blob: 74a91f6c23961e2a8d990ff367e8903e35f2df06 [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00005from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00006import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00007import select
Thomas Woutersed03b412007-08-28 21:37:11 +00008import time
Christian Heimes9424bb42013-06-17 15:32:57 +02009import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000010import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000011import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000012import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000013import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000014import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020015import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000016import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000017import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000018import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000019import platform
Christian Heimes892d66e2018-01-29 14:10:18 +010020import sysconfig
Christian Heimes888bbdc2017-09-07 14:18:21 -070021try:
22 import ctypes
23except ImportError:
24 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000025
# The ssl module is obtained through the test-support helper rather than a
# plain import statement.
ssl = support.import_module("ssl")


# Sorted iteration of ssl._PROTOCOL_NAMES.
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
HOST = support.HOST
# Flags describing the TLS library reported by ssl.OPENSSL_VERSION and
# ssl.OPENSSL_VERSION_INFO.
IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
IS_OPENSSL_1_1_0 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
IS_OPENSSL_1_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
# Build-time configuration variable of the same name (None when unset).
PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
Antoine Pitrou152efa22010-05-16 18:19:27 +000035
def data_file(*name):
    """Return the path built from this module's directory and *name* parts."""
    module_dir = os.path.dirname(__file__)
    return os.path.join(module_dir, *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000038
# The custom key and certificate files used in test_ssl are generated
# using Lib/test/make_ssl_certs.py.
# Other certificates are simply fetched from the Internet servers they
# are meant to authenticate.

# Paths are resolved relative to this module's directory via data_file().
# The BYTES_* names are the filesystem-encoded (bytes) form of the same path.
CERTFILE = data_file("keycert.pem")
BYTES_CERTFILE = os.fsencode(CERTFILE)
ONLYCERT = data_file("ssl_cert.pem")
ONLYKEY = data_file("ssl_key.pem")
BYTES_ONLYCERT = os.fsencode(ONLYCERT)
BYTES_ONLYKEY = os.fsencode(ONLYKEY)
# Password-protected variants; KEY_PASSWORD is presumably their passphrase
# (confirm against the tests that load them).
CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
KEY_PASSWORD = "somepass"
# CA directory and two individual files inside it.
CAPATH = data_file("capath")
BYTES_CAPATH = os.fsencode(CAPATH)
CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
CAFILE_CACERT = data_file("capath", "5ed36f99.0")
57
# Expected result of decoding CERTFILE (compared in test_parse_cert).
# Issuer and subject are identical in this certificate.
CERTFILE_INFO = {
    'issuer': ((('countryName', 'XY'),),
               (('localityName', 'Castle Anthrax'),),
               (('organizationName', 'Python Software Foundation'),),
               (('commonName', 'localhost'),)),
    'notAfter': 'Aug 26 14:23:15 2028 GMT',
    'notBefore': 'Aug 29 14:23:15 2018 GMT',
    'serialNumber': '98A7CF88C74A32ED',
    'subject': ((('countryName', 'XY'),),
                (('localityName', 'Castle Anthrax'),),
                (('organizationName', 'Python Software Foundation'),),
                (('commonName', 'localhost'),)),
    'subjectAltName': (('DNS', 'localhost'),),
    'version': 3
}
Antoine Pitrou152efa22010-05-16 18:19:27 +000073
# empty CRL
CRLFILE = data_file("revocation.crl")

# Two keys and certs signed by the same CA (for SNI tests)
SIGNED_CERTFILE = data_file("keycert3.pem")
SIGNED_CERTFILE_HOSTNAME = 'localhost'

# Expected result of decoding SIGNED_CERTFILE (compared in test_parse_cert).
# NOTE(review): asn1time() states that ASN1_TIME_print() pads single-digit
# days with a leading space; the single space in 'Jul 7' below may be a
# whitespace-collapse artifact of how this text was extracted -- confirm
# against the actual certificate.
SIGNED_CERTFILE_INFO = {
    'OCSP': ('http://testca.pythontest.net/testca/ocsp/',),
    'caIssuers': ('http://testca.pythontest.net/testca/pycacert.cer',),
    'crlDistributionPoints': ('http://testca.pythontest.net/testca/revocation.crl',),
    'issuer': ((('countryName', 'XY'),),
               (('organizationName', 'Python Software Foundation CA'),),
               (('commonName', 'our-ca-server'),)),
    'notAfter': 'Jul 7 14:23:16 2028 GMT',
    'notBefore': 'Aug 29 14:23:16 2018 GMT',
    'serialNumber': 'CB2D80995A69525C',
    'subject': ((('countryName', 'XY'),),
                (('localityName', 'Castle Anthrax'),),
                (('organizationName', 'Python Software Foundation'),),
                (('commonName', 'localhost'),)),
    'subjectAltName': (('DNS', 'localhost'),),
    'version': 3
}

SIGNED_CERTFILE2 = data_file("keycert4.pem")
SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
SIGNED_CERTFILE_ECC = data_file("keycertecc.pem")
SIGNED_CERTFILE_ECC_HOSTNAME = 'localhost-ecc'

# Same certificate as pycacert.pem, but without extra text in file
SIGNING_CA = data_file("capath", "ceff1710.0")
# cert with all kinds of subject alt names
ALLSANFILE = data_file("allsans.pem")
IDNSANSFILE = data_file("idnsans.pem")

REMOTE_HOST = "self-signed.pythontest.net"

# Deliberately empty, malformed or missing inputs used by the error tests.
EMPTYCERT = data_file("nullcert.pem")
BADCERT = data_file("badcert.pem")
NONEXISTINGCERT = data_file("XXXnonexisting.pem")
BADKEY = data_file("badkey.pem")
NOKIACERT = data_file("nokia.pem")
NULLBYTECERT = data_file("nullbytecert.pem")

# Diffie-Hellman parameter file, as str and as filesystem-encoded bytes.
DHFILE = data_file("ffdh3072.pem")
BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +0000121
# Not defined in all versions of OpenSSL
# Each name falls back to 0 when the ssl module does not provide it.
OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
OP_ENABLE_MIDDLEBOX_COMPAT = getattr(ssl, "OP_ENABLE_MIDDLEBOX_COMPAT", 0)
Christian Heimes358cfd42016-09-10 22:43:48 +0200128
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100129
def handle_error(prefix):
    """Write *prefix* followed by the current exception's traceback to stdout.

    The traceback is always formatted, but it is only written when
    ``support.verbose`` is true.
    """
    exc_type, exc_value, exc_tb = sys.exc_info()
    formatted = ' '.join(
        traceback.format_exception(exc_type, exc_value, exc_tb))
    if not support.verbose:
        return
    sys.stdout.write(prefix + formatted)
Thomas Woutersed03b412007-08-28 21:37:11 +0000134
def can_clear_options():
    """Return whether ssl._OPENSSL_API_VERSION is 0.9.8m or higher."""
    minimum_api_version = (0, 9, 8, 13, 15)  # 0.9.8m
    return ssl._OPENSSL_API_VERSION >= minimum_api_version
Antoine Pitroub5218772010-05-21 09:56:06 +0000138
def no_sslv2_implies_sslv3_hello():
    """Return whether ssl.OPENSSL_VERSION_INFO is 0.9.7h or higher."""
    minimum_version = (0, 9, 7, 8, 15)  # 0.9.7h
    return ssl.OPENSSL_VERSION_INFO >= minimum_version
142
def have_verify_flags():
    """Return whether ssl.OPENSSL_VERSION_INFO is 0.9.8 or higher."""
    minimum_version = (0, 9, 8, 0, 15)  # 0.9.8
    return ssl.OPENSSL_VERSION_INFO >= minimum_version
146
def _have_secp_curves():
    """Return True when a server context accepts the "secp384r1" curve.

    Returns False when ssl.HAS_ECDH is false or when set_ecdh_curve()
    rejects the curve name with ValueError.
    """
    if not ssl.HAS_ECDH:
        return False
    probe = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    try:
        probe.set_ecdh_curve("secp384r1")
    except ValueError:
        return False
    return True


# Evaluated once at import time.
HAVE_SECP_CURVES = _have_secp_curves()
160
161
def utc_offset():  # NOTE: ignore issues like #1647654
    """Return the local UTC offset in seconds.

    local time = utc time + utc offset
    """
    dst_in_effect = time.daylight and time.localtime().tm_isdst > 0
    seconds_west = time.altzone if dst_in_effect else time.timezone
    return -seconds_west
167
def asn1time(cert_time):
    """Adjust an expected certificate time string for OpenSSL 0.9.8i.

    Some versions of OpenSSL ignore seconds, see #18207.  On any other
    API version *cert_time* is returned unchanged.
    """
    # 0.9.8.i
    if ssl._OPENSSL_API_VERSION != (0, 9, 8, 9, 15):
        return cert_time

    fmt = "%b %d %H:%M:%S %Y GMT"
    parsed = datetime.datetime.strptime(cert_time, fmt)
    adjusted = parsed.replace(second=0).strftime(fmt)
    # %d adds leading zero but ASN1_TIME_print() uses leading space
    if adjusted[4] == "0":
        adjusted = adjusted[:4] + " " + adjusted[5:]
    return adjusted
Thomas Woutersed03b412007-08-28 21:37:11 +0000181
# Decorator that skips a test unless ssl.HAS_SNI is true.
needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
183
Antoine Pitrou23df4832010-08-04 17:14:06 +0000184
def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
                     cert_reqs=ssl.CERT_NONE, ca_certs=None,
                     ciphers=None, certfile=None, keyfile=None,
                     **kwargs):
    """Wrap *sock* using a freshly configured SSLContext.

    A context is created for *ssl_version* and configured from the
    keyword-only arguments that are not None; any remaining keyword
    arguments are forwarded to SSLContext.wrap_socket().  Returns the
    wrapped socket.
    """
    ctx = ssl.SSLContext(ssl_version)
    if cert_reqs is not None:
        # Hostname checking is switched off first when no verification
        # is requested.
        if cert_reqs == ssl.CERT_NONE:
            ctx.check_hostname = False
        ctx.verify_mode = cert_reqs
    if ca_certs is not None:
        ctx.load_verify_locations(ca_certs)
    cert_or_key_given = not (certfile is None and keyfile is None)
    if cert_or_key_given:
        ctx.load_cert_chain(certfile, keyfile)
    if ciphers is not None:
        ctx.set_ciphers(ciphers)
    wrapped = ctx.wrap_socket(sock, **kwargs)
    return wrapped
201
Christian Heimesa170fa12017-09-15 20:27:30 +0200202
def testing_context(server_cert=SIGNED_CERTFILE):
    """Create context

    client_context, server_context, hostname = testing_context()

    Raises ValueError when *server_cert* is neither SIGNED_CERTFILE nor
    SIGNED_CERTFILE2.
    """
    known_certs = (
        (SIGNED_CERTFILE, SIGNED_CERTFILE_HOSTNAME),
        (SIGNED_CERTFILE2, SIGNED_CERTFILE2_HOSTNAME),
    )
    for candidate, candidate_hostname in known_certs:
        if server_cert == candidate:
            hostname = candidate_hostname
            break
    else:
        raise ValueError(server_cert)

    # Client side: trusts the signing CA.
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)

    # Server side: presents server_cert and also loads the signing CA.
    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(server_cert)
    server_context.load_verify_locations(SIGNING_CA)

    return client_context, server_context, hostname
223
224
Antoine Pitrou152efa22010-05-16 18:19:27 +0000225class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000226
def test_constants(self):
    # Each lookup raises AttributeError when the constant is missing.
    for name in ("CERT_NONE", "CERT_OPTIONAL", "CERT_REQUIRED",
                 "OP_CIPHER_SERVER_PREFERENCE", "OP_SINGLE_DH_USE"):
        getattr(ssl, name)
    if ssl.HAS_ECDH:
        getattr(ssl, "OP_SINGLE_ECDH_USE")
    if ssl.OPENSSL_VERSION_INFO >= (1, 0):
        getattr(ssl, "OP_NO_COMPRESSION")
    booleans = {True, False}
    self.assertIn(ssl.HAS_SNI, booleans)
    self.assertIn(ssl.HAS_ECDH, booleans)
    for name in ("OP_NO_SSLv2", "OP_NO_SSLv3", "OP_NO_TLSv1",
                 "OP_NO_TLSv1_3"):
        getattr(ssl, name)
    if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
        for name in ("OP_NO_TLSv1_1", "OP_NO_TLSv1_2"):
            getattr(ssl, name)
    self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000247
def test_private_init(self):
    # Calling SSLSocket directly must be rejected with a TypeError.
    expect_rejection = self.assertRaisesRegex(TypeError, "public constructor")
    with expect_rejection, socket.socket() as plain_sock:
        ssl.SSLSocket(plain_sock)
252
def test_str_for_enums(self):
    # Make sure that the PROTOCOL_* constants have enum-like string
    # reprs.
    protocol = ssl.PROTOCOL_TLS
    rendered = str(protocol)
    self.assertEqual(rendered, '_SSLMethod.PROTOCOL_TLS')
    # The context must hand back the very same enum member.
    context = ssl.SSLContext(protocol)
    self.assertIs(context.protocol, protocol)
260
def test_random(self):
    # Exercise the RAND_* helpers exposed by the ssl module.
    status = ssl.RAND_status()
    if support.verbose:
        if status:
            verdict = "sufficient randomness"
        else:
            verdict = "insufficient randomness"
        sys.stdout.write("\n RAND_status is %d (%s)\n" % (status, verdict))

    pseudo, is_cryptographic = ssl.RAND_pseudo_bytes(16)
    self.assertEqual(len(pseudo), 16)
    self.assertEqual(is_cryptographic, status == 1)
    if not status:
        self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
    else:
        strong = ssl.RAND_bytes(16)
        self.assertEqual(len(strong), 16)

    # negative num is invalid
    for rand_func in (ssl.RAND_bytes, ssl.RAND_pseudo_bytes):
        self.assertRaises(ValueError, rand_func, -5)

    if hasattr(ssl, 'RAND_egd'):
        for bad_args in ((1,), ('foo', 1)):
            self.assertRaises(TypeError, ssl.RAND_egd, *bad_args)
    # RAND_add is called with str, bytes and bytearray seeds.
    seeds = (
        "this is a random string",
        b"this is a random bytes object",
        bytearray(b"this is a random bytearray object"),
    )
    for seed in seeds:
        ssl.RAND_add(seed, 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000287
@unittest.skipUnless(os.name == 'posix', 'requires posix')
def test_random_fork(self):
    # Check that a forked child and its parent obtain different
    # pseudo-random bytes.
    status = ssl.RAND_status()
    if not status:
        self.fail("OpenSSL's PRNG has insufficient randomness")

    # The child sends its random bytes to the parent through this pipe.
    rfd, wfd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child process: always leaves through os._exit(); the exit status
        # (0 or 1) tells the parent whether anything went wrong.
        try:
            os.close(rfd)
            child_random = ssl.RAND_pseudo_bytes(16)[0]
            self.assertEqual(len(child_random), 16)
            os.write(wfd, child_random)
            os.close(wfd)
        except BaseException:
            os._exit(1)
        else:
            os._exit(0)
    else:
        # Parent process: close the unused write end, wait for the child,
        # then read what it wrote.
        os.close(wfd)
        self.addCleanup(os.close, rfd)
        _, status = os.waitpid(pid, 0)
        self.assertEqual(status, 0)

        child_random = os.read(rfd, 16)
        self.assertEqual(len(child_random), 16)
        parent_random = ssl.RAND_pseudo_bytes(16)[0]
        self.assertEqual(len(parent_random), 16)

        self.assertNotEqual(child_random, parent_random)
319
# unittest setting: None removes the length limit on assertion diffs.
maxDiff = None
321
def test_parse_cert(self):
    # note that this uses an 'unofficial' function in _ssl.c,
    # provided solely for this test, to exercise the certificate
    # parsing code
    decode = ssl._ssl._test_decode_cert
    known_certs = (
        (CERTFILE, CERTFILE_INFO),
        (SIGNED_CERTFILE, SIGNED_CERTFILE_INFO),
    )
    for cert_path, expected_info in known_certs:
        self.assertEqual(decode(cert_path), expected_info)

    # Issue #13034: the subjectAltName in some certificates
    # (notably projects.developer.nokia.com:443) wasn't parsed
    nokia = decode(NOKIACERT)
    if support.verbose:
        sys.stdout.write("\n" + pprint.pformat(nokia) + "\n")
    expected_san = (('DNS', 'projects.developer.nokia.com'),
                    ('DNS', 'projects.forum.nokia.com'))
    self.assertEqual(nokia['subjectAltName'], expected_san)
    # extra OCSP and AIA fields
    self.assertEqual(nokia['OCSP'], ('http://ocsp.verisign.com',))
    self.assertEqual(
        nokia['caIssuers'],
        ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
    self.assertEqual(
        nokia['crlDistributionPoints'],
        ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000350
def test_parse_cert_CVE_2013_4238(self):
    # Embedded NUL bytes in certificate names must be preserved by the
    # decoder.
    decoded = ssl._ssl._test_decode_cert(NULLBYTECERT)
    if support.verbose:
        sys.stdout.write("\n" + pprint.pformat(decoded) + "\n")
    expected_name = (
        (('countryName', 'US'),),
        (('stateOrProvinceName', 'Oregon'),),
        (('localityName', 'Beaverton'),),
        (('organizationName', 'Python Software Foundation'),),
        (('organizationalUnitName', 'Python Core Development'),),
        (('commonName', 'null.python.org\x00example.org'),),
        (('emailAddress', 'python-dev@python.org'),),
    )
    self.assertEqual(decoded['subject'], expected_name)
    self.assertEqual(decoded['issuer'], expected_name)

    # Only the last subjectAltName entry depends on the OpenSSL version.
    if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
        ipv6_entry = ('IP Address', '2001:DB8:0:0:0:0:0:1\n')
    else:
        # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
        ipv6_entry = ('IP Address', '<invalid>')
    expected_san = (
        ('DNS', 'altnull.python.org\x00example.com'),
        ('email', 'null@python.org\x00user@example.org'),
        ('URI', 'http://null.python.org\x00http://example.org'),
        ('IP Address', '192.0.2.1'),
        ipv6_entry,
    )

    self.assertEqual(decoded['subjectAltName'], expected_san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200379
def test_parse_all_sans(self):
    # Decode a certificate carrying many kinds of subjectAltName entries.
    decoded = ssl._ssl._test_decode_cert(ALLSANFILE)
    directory_name = (
        (('countryName', 'XY'),),
        (('localityName', 'Castle Anthrax'),),
        (('organizationName', 'Python Software Foundation'),),
        (('commonName', 'dirname example'),),
    )
    expected_san = (
        ('DNS', 'allsans'),
        ('othername', '<unsupported>'),
        ('othername', '<unsupported>'),
        ('email', 'user@example.org'),
        ('DNS', 'www.example.org'),
        ('DirName', directory_name),
        ('URI', 'https://www.python.org/'),
        ('IP Address', '127.0.0.1'),
        ('IP Address', '0:0:0:0:0:0:0:1\n'),
        ('Registered ID', '1.2.3.4.5'),
    )
    self.assertEqual(decoded['subjectAltName'], expected_san)
400
def test_DER_to_PEM(self):
    # Round-trip PEM -> DER -> PEM -> DER and compare the two DER forms.
    with open(CAFILE_CACERT, 'r') as pem_file:
        original_pem = pem_file.read()
    first_der = ssl.PEM_cert_to_DER_cert(original_pem)
    regenerated_pem = ssl.DER_cert_to_PEM_cert(first_der)
    second_der = ssl.PEM_cert_to_DER_cert(regenerated_pem)
    self.assertEqual(first_der, second_der)

    # The regenerated PEM must be framed by the standard header/footer.
    expected_start = ssl.PEM_HEADER + '\n'
    expected_end = '\n' + ssl.PEM_FOOTER + '\n'
    if not regenerated_pem.startswith(expected_start):
        self.fail("DER-to-PEM didn't include correct header:\n%r\n"
                  % regenerated_pem)
    if not regenerated_pem.endswith(expected_end):
        self.fail("DER-to-PEM didn't include correct footer:\n%r\n"
                  % regenerated_pem)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000412
def test_openssl_version(self):
    # Sanity-check the three version constants exported by the ssl module.
    n = ssl.OPENSSL_VERSION_NUMBER
    t = ssl.OPENSSL_VERSION_INFO
    s = ssl.OPENSSL_VERSION
    self.assertIsInstance(n, int)
    self.assertIsInstance(t, tuple)
    self.assertIsInstance(s, str)
    # Some sanity checks follow
    # >= 0.9
    self.assertGreaterEqual(n, 0x900000)
    # < 4.0 (the former "< 3.0" bound rejected OpenSSL 3.x builds)
    self.assertLess(n, 0x40000000)
    major, minor, fix, patch, status = t
    self.assertGreaterEqual(major, 0)
    self.assertLess(major, 4)
    self.assertGreaterEqual(minor, 0)
    self.assertLess(minor, 256)
    self.assertGreaterEqual(fix, 0)
    self.assertLess(fix, 256)
    self.assertGreaterEqual(patch, 0)
    self.assertLessEqual(patch, 63)
    self.assertGreaterEqual(status, 0)
    self.assertLessEqual(status, 15)
    # Version string as returned by {Open,Libre}SSL, the format might change
    if IS_LIBRESSL:
        expected_prefix = "LibreSSL {:d}".format(major)
    else:
        expected_prefix = "OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)
    # The tuple is shown as the failure message to ease diagnosis.
    self.assertTrue(s.startswith(expected_prefix), (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000443
@support.cpython_only
def test_refcycle(self):
    # Issue #7943: an SSL object doesn't create reference cycles with
    # itself.
    plain_sock = socket.socket(socket.AF_INET)
    wrapped = test_wrap_socket(plain_sock)
    wrapped_ref = weakref.ref(wrapped)
    # Dropping the last strong reference is done inside check_warnings
    # with a ResourceWarning filter.
    with support.check_warnings(("", ResourceWarning)):
        del wrapped
    self.assertEqual(wrapped_ref(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000454
def test_wrapped_unconnected(self):
    # Methods on an unconnected SSLSocket propagate the original
    # OSError raised by the underlying socket object.
    plain_sock = socket.socket(socket.AF_INET)
    with test_wrap_socket(plain_sock) as ss:
        any_address = ('0.0.0.0', 0)
        oserror_calls = (
            (ss.recv, (1,)),
            (ss.recv_into, (bytearray(b'x'),)),
            (ss.recvfrom, (1,)),
            (ss.recvfrom_into, (bytearray(b'x'), 1)),
            (ss.send, (b'x',)),
            (ss.sendto, (b'x', any_address)),
        )
        for method, args in oserror_calls:
            self.assertRaises(OSError, method, *args)
        # sendmsg() is expected to raise NotImplementedError instead.
        self.assertRaises(NotImplementedError, ss.sendmsg,
                          [b'x'], (), 0, any_address)
Antoine Pitroua468adc2010-09-14 14:43:44 +0000468
def test_timeout(self):
    # Issue #8524: when creating an SSL socket, the timeout of the
    # original socket should be retained.
    timeouts = (None, 0.0, 5.0)
    for expected_timeout in timeouts:
        plain_sock = socket.socket(socket.AF_INET)
        plain_sock.settimeout(expected_timeout)
        with test_wrap_socket(plain_sock) as wrapped:
            actual_timeout = wrapped.gettimeout()
            self.assertEqual(expected_timeout, actual_timeout)
Antoine Pitrou40f08742010-04-24 22:04:40 +0000477
def test_errors_sslwrap(self):
    # Check the errors raised by ssl.wrap_socket() for invalid arguments.
    sock = socket.socket()
    # Make sure the socket is closed even when one of the assertions below
    # fails; closing an already closed socket is harmless.
    self.addCleanup(sock.close)
    self.assertRaisesRegex(ValueError,
                           "certfile must be specified",
                           ssl.wrap_socket, sock, keyfile=CERTFILE)
    self.assertRaisesRegex(ValueError,
                           "certfile must be specified for server-side operations",
                           ssl.wrap_socket, sock, server_side=True)
    self.assertRaisesRegex(ValueError,
                           "certfile must be specified for server-side operations",
                           ssl.wrap_socket, sock, server_side=True, certfile="")
    with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
        self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
                               s.connect, (HOST, 8080))

    # A missing certificate and/or key file must surface as OSError(ENOENT).
    # Each case uses its own short-lived socket, named so that it does not
    # shadow the outer one.
    missing_file_cases = (
        {'certfile': NONEXISTINGCERT},
        {'certfile': CERTFILE, 'keyfile': NONEXISTINGCERT},
        {'certfile': NONEXISTINGCERT, 'keyfile': NONEXISTINGCERT},
    )
    for wrap_kwargs in missing_file_cases:
        with self.assertRaises(OSError) as cm:
            with socket.socket() as scratch_sock:
                ssl.wrap_socket(scratch_sock, **wrap_kwargs)
        self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000506
def bad_cert_test(self, certfile):
    """Check that trying to use the given client certificate fails"""
    # Resolve the name relative to this module, falling back to the
    # current directory when the module has no directory component.
    base_dir = os.path.dirname(__file__) or os.curdir
    cert_path = os.path.join(base_dir, certfile)
    plain_sock = socket.socket()
    self.addCleanup(plain_sock.close)
    with self.assertRaises(ssl.SSLError):
        test_wrap_socket(plain_sock, certfile=cert_path)
Martin Panter3464ea22016-02-01 21:58:11 +0000516
def test_empty_cert(self):
    """Wrapping with an empty cert file"""
    empty_cert_name = "nullcert.pem"
    self.bad_cert_test(empty_cert_name)
520
def test_malformed_cert(self):
    """Wrapping with a badly formatted certificate (syntax error)"""
    malformed_cert_name = "badcert.pem"
    self.bad_cert_test(malformed_cert_name)
524
def test_malformed_key(self):
    """Wrapping with a badly formatted key (syntax error)"""
    malformed_key_name = "badkey.pem"
    self.bad_cert_test(malformed_key_name)
528
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000529 def test_match_hostname(self):
530 def ok(cert, hostname):
531 ssl.match_hostname(cert, hostname)
532 def fail(cert, hostname):
533 self.assertRaises(ssl.CertificateError,
534 ssl.match_hostname, cert, hostname)
535
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100536 # -- Hostname matching --
537
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000538 cert = {'subject': ((('commonName', 'example.com'),),)}
539 ok(cert, 'example.com')
540 ok(cert, 'ExAmple.cOm')
541 fail(cert, 'www.example.com')
542 fail(cert, '.example.com')
543 fail(cert, 'example.org')
544 fail(cert, 'exampleXcom')
545
546 cert = {'subject': ((('commonName', '*.a.com'),),)}
547 ok(cert, 'foo.a.com')
548 fail(cert, 'bar.foo.a.com')
549 fail(cert, 'a.com')
550 fail(cert, 'Xa.com')
551 fail(cert, '.a.com')
552
Mandeep Singhede2ac92017-11-27 04:01:27 +0530553 # only match wildcards when they are the only thing
554 # in left-most segment
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000555 cert = {'subject': ((('commonName', 'f*.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530556 fail(cert, 'foo.com')
557 fail(cert, 'f.com')
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000558 fail(cert, 'bar.com')
559 fail(cert, 'foo.a.com')
560 fail(cert, 'bar.foo.com')
561
Christian Heimes824f7f32013-08-17 00:54:47 +0200562 # NULL bytes are bad, CVE-2013-4073
563 cert = {'subject': ((('commonName',
564 'null.python.org\x00example.org'),),)}
565 ok(cert, 'null.python.org\x00example.org') # or raise an error?
566 fail(cert, 'example.org')
567 fail(cert, 'null.python.org')
568
Georg Brandl72c98d32013-10-27 07:16:53 +0100569 # error cases with wildcards
570 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
571 fail(cert, 'bar.foo.a.com')
572 fail(cert, 'a.com')
573 fail(cert, 'Xa.com')
574 fail(cert, '.a.com')
575
576 cert = {'subject': ((('commonName', 'a.*.com'),),)}
577 fail(cert, 'a.foo.com')
578 fail(cert, 'a..com')
579 fail(cert, 'a.com')
580
581 # wildcard doesn't match IDNA prefix 'xn--'
582 idna = 'püthon.python.org'.encode("idna").decode("ascii")
583 cert = {'subject': ((('commonName', idna),),)}
584 ok(cert, idna)
585 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
586 fail(cert, idna)
587 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
588 fail(cert, idna)
589
590 # wildcard in first fragment and IDNA A-labels in sequent fragments
591 # are supported.
592 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
593 cert = {'subject': ((('commonName', idna),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530594 fail(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
595 fail(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
Georg Brandl72c98d32013-10-27 07:16:53 +0100596 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
597 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
598
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000599 # Slightly fake real-world example
600 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
601 'subject': ((('commonName', 'linuxfrz.org'),),),
602 'subjectAltName': (('DNS', 'linuxfr.org'),
603 ('DNS', 'linuxfr.com'),
604 ('othername', '<unsupported>'))}
605 ok(cert, 'linuxfr.org')
606 ok(cert, 'linuxfr.com')
607 # Not a "DNS" entry
608 fail(cert, '<unsupported>')
609 # When there is a subjectAltName, commonName isn't used
610 fail(cert, 'linuxfrz.org')
611
612 # A pristine real-world example
613 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
614 'subject': ((('countryName', 'US'),),
615 (('stateOrProvinceName', 'California'),),
616 (('localityName', 'Mountain View'),),
617 (('organizationName', 'Google Inc'),),
618 (('commonName', 'mail.google.com'),))}
619 ok(cert, 'mail.google.com')
620 fail(cert, 'gmail.com')
621 # Only commonName is considered
622 fail(cert, 'California')
623
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100624 # -- IPv4 matching --
625 cert = {'subject': ((('commonName', 'example.com'),),),
626 'subjectAltName': (('DNS', 'example.com'),
627 ('IP Address', '10.11.12.13'),
628 ('IP Address', '14.15.16.17'))}
629 ok(cert, '10.11.12.13')
630 ok(cert, '14.15.16.17')
631 fail(cert, '14.15.16.18')
632 fail(cert, 'example.net')
633
634 # -- IPv6 matching --
Christian Heimesaef12832018-02-24 14:35:56 +0100635 if hasattr(socket, 'AF_INET6'):
636 cert = {'subject': ((('commonName', 'example.com'),),),
637 'subjectAltName': (
638 ('DNS', 'example.com'),
639 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
640 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
641 ok(cert, '2001::cafe')
642 ok(cert, '2003::baba')
643 fail(cert, '2003::bebe')
644 fail(cert, 'example.net')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100645
646 # -- Miscellaneous --
647
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000648 # Neither commonName nor subjectAltName
649 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
650 'subject': ((('countryName', 'US'),),
651 (('stateOrProvinceName', 'California'),),
652 (('localityName', 'Mountain View'),),
653 (('organizationName', 'Google Inc'),))}
654 fail(cert, 'mail.google.com')
655
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200656 # No DNS entry in subjectAltName but a commonName
657 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
658 'subject': ((('countryName', 'US'),),
659 (('stateOrProvinceName', 'California'),),
660 (('localityName', 'Mountain View'),),
661 (('commonName', 'mail.google.com'),)),
662 'subjectAltName': (('othername', 'blabla'), )}
663 ok(cert, 'mail.google.com')
664
665 # No DNS entry subjectAltName and no commonName
666 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
667 'subject': ((('countryName', 'US'),),
668 (('stateOrProvinceName', 'California'),),
669 (('localityName', 'Mountain View'),),
670 (('organizationName', 'Google Inc'),)),
671 'subjectAltName': (('othername', 'blabla'),)}
672 fail(cert, 'google.com')
673
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000674 # Empty cert / no cert
675 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
676 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
677
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200678 # Issue #17980: avoid denials of service by refusing more than one
679 # wildcard per fragment.
Christian Heimesaef12832018-02-24 14:35:56 +0100680 cert = {'subject': ((('commonName', 'a*b.example.com'),),)}
681 with self.assertRaisesRegex(
682 ssl.CertificateError,
683 "partial wildcards in leftmost label are not supported"):
684 ssl.match_hostname(cert, 'axxb.example.com')
685
686 cert = {'subject': ((('commonName', 'www.*.example.com'),),)}
687 with self.assertRaisesRegex(
688 ssl.CertificateError,
689 "wildcard can only be present in the leftmost label"):
690 ssl.match_hostname(cert, 'www.sub.example.com')
691
692 cert = {'subject': ((('commonName', 'a*b*.example.com'),),)}
693 with self.assertRaisesRegex(
694 ssl.CertificateError,
695 "too many wildcards"):
696 ssl.match_hostname(cert, 'axxbxxc.example.com')
697
698 cert = {'subject': ((('commonName', '*'),),)}
699 with self.assertRaisesRegex(
700 ssl.CertificateError,
701 "sole wildcard without additional labels are not support"):
702 ssl.match_hostname(cert, 'host')
703
704 cert = {'subject': ((('commonName', '*.com'),),)}
705 with self.assertRaisesRegex(
706 ssl.CertificateError,
707 r"hostname 'com' doesn't match '\*.com'"):
708 ssl.match_hostname(cert, 'com')
709
710 # extra checks for _inet_paton()
711 for invalid in ['1', '', '1.2.3', '256.0.0.1', '127.0.0.1/24']:
712 with self.assertRaises(ValueError):
713 ssl._inet_paton(invalid)
714 for ipaddr in ['127.0.0.1', '192.168.0.1']:
715 self.assertTrue(ssl._inet_paton(ipaddr))
716 if hasattr(socket, 'AF_INET6'):
717 for ipaddr in ['::1', '2001:db8:85a3::8a2e:370:7334']:
718 self.assertTrue(ssl._inet_paton(ipaddr))
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200719
Antoine Pitroud5323212010-10-22 18:19:07 +0000720 def test_server_side(self):
721 # server_hostname doesn't work for server sockets
Christian Heimesa170fa12017-09-15 20:27:30 +0200722 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000723 with socket.socket() as sock:
724 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
725 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000726
def test_unknown_channel_binding(self):
    """get_channel_binding() raises ValueError for an unknown binding type.

    A listening socket is bound to an ephemeral loopback port and a client
    socket is connected to it; the client is then wrapped without performing
    the handshake on connect.
    """
    listener = socket.socket(socket.AF_INET)
    # Register the cleanup immediately so the listener is released even when
    # connecting, wrapping or the assertion below fails (it used to be closed
    # only as the very last statement, i.e. on the success path).
    self.addCleanup(listener.close)
    listener.bind(('127.0.0.1', 0))
    listener.listen()
    client = socket.socket(socket.AF_INET)
    # Covers a failure before the ``with`` block takes over the client.
    self.addCleanup(client.close)
    client.connect(listener.getsockname())
    with test_wrap_socket(client, do_handshake_on_connect=False) as ss:
        with self.assertRaises(ValueError):
            ss.get_channel_binding("unknown-type")
Antoine Pitroud6494802011-07-21 01:11:30 +0200738
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    """An unconnected socket reports None for the 'tls-unique' binding."""
    # First a client-side wrap, then the same check for a server-side wrap.
    for wrap_kwargs in ({}, {"server_side": True, "certfile": CERTFILE}):
        raw_sock = socket.socket(socket.AF_INET)
        with test_wrap_socket(raw_sock, **wrap_kwargs) as ss:
            self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200750
def test_dealloc_warn(self):
    """Dropping an unclosed wrapped socket warns and mentions its repr."""
    wrapped = test_wrap_socket(socket.socket(socket.AF_INET))
    expected_repr = repr(wrapped)
    with self.assertWarns(ResourceWarning) as caught:
        # Drop the only reference, then force a collection.
        del wrapped
        support.gc_collect()
    warning_text = str(caught.warning.args[0])
    self.assertIn(expected_repr, warning_text)
758
def test_get_default_verify_paths(self):
    """Check the result type of get_default_verify_paths() and env overrides."""
    default_paths = ssl.get_default_verify_paths()
    self.assertEqual(len(default_paths), 6)
    self.assertIsInstance(default_paths, ssl.DefaultVerifyPaths)

    with support.EnvironmentVarGuard() as environ:
        # With both variables set, the reported cafile/capath must match them.
        environ["SSL_CERT_DIR"] = CAPATH
        environ["SSL_CERT_FILE"] = CERTFILE
        overridden = ssl.get_default_verify_paths()
        self.assertEqual(overridden.cafile, CERTFILE)
        self.assertEqual(overridden.capath, CAPATH)
770
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
def test_enum_certificates(self):
    """Validate the structure of ssl.enum_certificates() results."""
    store_names = ("CA", "ROOT")
    for store_name in store_names:
        self.assertTrue(ssl.enum_certificates(store_name))

    with self.assertRaises(TypeError):
        ssl.enum_certificates()
    with self.assertRaises(WindowsError):
        ssl.enum_certificates("")

    seen_oids = set()
    for store_name in store_names:
        entries = ssl.enum_certificates(store_name)
        self.assertIsInstance(entries, list)
        for entry in entries:
            # Each entry is a (certificate bytes, encoding, trust) triple.
            self.assertIsInstance(entry, tuple)
            self.assertEqual(len(entry), 3)
            der_bytes, encoding, trust = entry
            self.assertIsInstance(der_bytes, bytes)
            self.assertIn(encoding, {"x509_asn", "pkcs_7_asn"})
            self.assertIsInstance(trust, (set, bool))
            if isinstance(trust, set):
                seen_oids |= trust

    server_auth_oid = "1.3.6.1.5.5.7.3.1"
    self.assertIn(server_auth_oid, seen_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200795
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
def test_enum_crls(self):
    """Validate the structure of ssl.enum_crls() results."""
    self.assertTrue(ssl.enum_crls("CA"))
    with self.assertRaises(TypeError):
        ssl.enum_crls()
    with self.assertRaises(WindowsError):
        ssl.enum_crls("")

    ca_crls = ssl.enum_crls("CA")
    self.assertIsInstance(ca_crls, list)
    for entry in ca_crls:
        # Each entry is a (CRL bytes, encoding) pair.
        self.assertIsInstance(entry, tuple)
        self.assertEqual(len(entry), 2)
        self.assertIsInstance(entry[0], bytes)
        self.assertIn(entry[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200809
Christian Heimes46bebee2013-06-09 19:03:31 +0200810
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100811 def test_asn1object(self):
812 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
813 '1.3.6.1.5.5.7.3.1')
814
815 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
816 self.assertEqual(val, expected)
817 self.assertEqual(val.nid, 129)
818 self.assertEqual(val.shortname, 'serverAuth')
819 self.assertEqual(val.longname, 'TLS Web Server Authentication')
820 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
821 self.assertIsInstance(val, ssl._ASN1Object)
822 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
823
824 val = ssl._ASN1Object.fromnid(129)
825 self.assertEqual(val, expected)
826 self.assertIsInstance(val, ssl._ASN1Object)
827 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100828 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
829 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100830 for i in range(1000):
831 try:
832 obj = ssl._ASN1Object.fromnid(i)
833 except ValueError:
834 pass
835 else:
836 self.assertIsInstance(obj.nid, int)
837 self.assertIsInstance(obj.shortname, str)
838 self.assertIsInstance(obj.longname, str)
839 self.assertIsInstance(obj.oid, (str, type(None)))
840
841 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
842 self.assertEqual(val, expected)
843 self.assertIsInstance(val, ssl._ASN1Object)
844 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
845 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
846 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100847 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
848 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100849
Christian Heimes72d28502013-11-23 13:56:58 +0100850 def test_purpose_enum(self):
851 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
852 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
853 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
854 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
855 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
856 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
857 '1.3.6.1.5.5.7.3.1')
858
859 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
860 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
861 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
862 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
863 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
864 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
865 '1.3.6.1.5.5.7.3.2')
866
def test_unsupported_dtls(self):
    """Wrapping a datagram socket raises NotImplementedError."""
    dgram_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    self.addCleanup(dgram_sock.close)
    expected_message = "only stream sockets are supported"

    # Through the module-level test helper ...
    with self.assertRaises(NotImplementedError) as raised:
        test_wrap_socket(dgram_sock, cert_reqs=ssl.CERT_NONE)
    self.assertEqual(str(raised.exception), expected_message)

    # ... and through an explicit client context.
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    with self.assertRaises(NotImplementedError) as raised:
        client_ctx.wrap_socket(dgram_sock)
    self.assertEqual(str(raised.exception), expected_message)
877
def cert_time_ok(self, timestring, timestamp):
    """Assert that *timestring* converts to exactly *timestamp*."""
    seconds = ssl.cert_time_to_seconds(timestring)
    self.assertEqual(seconds, timestamp)
880
def cert_time_fail(self, timestring):
    """Assert that ssl.cert_time_to_seconds() rejects *timestring*."""
    self.assertRaises(ValueError, ssl.cert_time_to_seconds, timestring)
884
@unittest.skipUnless(utc_offset(),
                     'local time needs to be different from UTC')
def test_cert_time_to_seconds_timezone(self):
    """Conversion must not depend on the local timezone."""
    # Issue #19940: ssl.cert_time_to_seconds() returns wrong
    #               results if local timezone is not UTC
    known_values = (
        ("May 9 00:00:00 2007 GMT", 1178668800.0),
        ("Jan 5 09:34:43 2018 GMT", 1515144883.0),
    )
    for timestring, timestamp in known_values:
        self.cert_time_ok(timestring, timestamp)
892
def test_cert_time_to_seconds(self):
    """Check accepted and rejected inputs of ssl.cert_time_to_seconds()."""
    reference = "Jan 5 09:34:43 2018 GMT"
    reference_ts = 1515144883.0
    self.cert_time_ok(reference, reference_ts)
    # accept keyword parameter, assert its name
    self.assertEqual(ssl.cert_time_to_seconds(cert_time=reference),
                     reference_ts)
    # accept both %e and %d (space or zero generated by strftime)
    self.cert_time_ok("Jan 05 09:34:43 2018 GMT", reference_ts)
    # case-insensitive
    self.cert_time_ok("JaN 5 09:34:43 2018 GmT", reference_ts)

    malformed = (
        "Jan 5 09:34 2018 GMT",      # no seconds
        "Jan 5 09:34:43 2018",       # no GMT
        "Jan 5 09:34:43 2018 UTC",   # not GMT timezone
        "Jan 35 09:34:43 2018 GMT",  # invalid day
        "Jon 5 09:34:43 2018 GMT",   # invalid month
        "Jan 5 24:00:00 2018 GMT",   # invalid hour
        "Jan 5 09:60:43 2018 GMT",   # invalid minute
    )
    for bad_timestring in malformed:
        self.cert_time_fail(bad_timestring)

    newyear_ts = 1230768000.0
    # leap seconds
    self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
    # same timestamp
    self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)

    self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
    # allow 60th second (even if it is not a leap second)
    self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
    # allow 2nd leap second for compatibility with time.strptime()
    self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
    self.cert_time_fail("Jan 5 09:34:62 2018 GMT")  # invalid seconds

    # no special treatment for the special value:
    #   99991231235959Z (rfc 5280)
    self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
927
@support.run_with_locale('LC_ALL', '')
def test_cert_time_to_seconds_locale(self):
    """`cert_time_to_seconds()` should be locale independent."""
    # Abbreviated month name produced by strftime for a February time tuple.
    local_february = time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
    if local_february.lower() == 'feb':
        self.skipTest("locale-specific month name needs to be "
                      "different from C locale")

    # locale-independent
    self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
    self.cert_time_fail(local_february + " 9 00:00:00 2007 GMT")
942
def test_connect_ex_error(self):
    """connect_ex() to a bound but non-listening port returns an errno."""
    listener = socket.socket(socket.AF_INET)
    self.addCleanup(listener.close)
    port = support.bind_port(listener)  # Reserve port but don't listen
    client = test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_REQUIRED)
    self.addCleanup(client.close)
    result = client.connect_ex((HOST, port))
    # Issue #19919: Windows machines or VMs hosted on Windows
    # machines sometimes return EWOULDBLOCK.
    acceptable = (errno.ECONNREFUSED, errno.EHOSTUNREACH,
                  errno.ETIMEDOUT, errno.EWOULDBLOCK)
    self.assertIn(result, acceptable)
958
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100959
Antoine Pitrou152efa22010-05-16 18:19:27 +0000960class ContextTests(unittest.TestCase):
961
def test_constructor(self):
    """SSLContext accepts every known protocol and rejects bogus values."""
    for proto in PROTOCOLS:
        ssl.SSLContext(proto)
    # Without an argument the protocol is PROTOCOL_TLS.
    default_ctx = ssl.SSLContext()
    self.assertEqual(default_ctx.protocol, ssl.PROTOCOL_TLS)
    for bogus in (-1, 42):
        with self.assertRaises(ValueError):
            ssl.SSLContext(bogus)
969
def test_protocol(self):
    """SSLContext.protocol reports the protocol it was constructed with."""
    for expected in PROTOCOLS:
        context = ssl.SSLContext(expected)
        self.assertEqual(context.protocol, expected)
974
975 def test_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +0200976 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000977 ctx.set_ciphers("ALL")
978 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +0000979 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +0000980 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000981
@unittest.skipUnless(PY_SSL_DEFAULT_CIPHERS == 1,
                     "Test applies only to Python default ciphers")
def test_python_ciphers(self):
    """No enabled cipher suite name contains an excluded algorithm tag."""
    excluded_tags = ("PSK", "SRP", "MD5", "RC4", "3DES")
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    for suite in client_ctx.get_ciphers():
        suite_name = suite['name']
        for tag in excluded_tags:
            self.assertNotIn(tag, suite_name)
994
Christian Heimes25bfcd52016-09-06 00:04:45 +0200995 @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
996 def test_get_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +0200997 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesea9b2dc2016-09-06 10:45:44 +0200998 ctx.set_ciphers('AESGCM')
Christian Heimes25bfcd52016-09-06 00:04:45 +0200999 names = set(d['name'] for d in ctx.get_ciphers())
Christian Heimes582282b2016-09-06 11:27:25 +02001000 self.assertIn('AES256-GCM-SHA384', names)
1001 self.assertIn('AES128-GCM-SHA256', names)
Christian Heimes25bfcd52016-09-06 00:04:45 +02001002
def test_options(self):
    """Check the default SSLContext.options value and how it can be edited."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value, and
    # SSLContext also enables the remaining flags by default.
    expected_default = (
        ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3
        | OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE
        | OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE
        | OP_ENABLE_MIDDLEBOX_COMPAT
    )
    self.assertEqual(expected_default, ctx.options)

    ctx.options = ctx.options | ssl.OP_NO_TLSv1
    self.assertEqual(expected_default | ssl.OP_NO_TLSv1, ctx.options)

    if not can_clear_options():
        # Options cannot be cleared here: resetting must be refused.
        with self.assertRaises(ValueError):
            ctx.options = 0
        return

    ctx.options = ctx.options & ~ssl.OP_NO_TLSv1
    self.assertEqual(expected_default, ctx.options)
    ctx.options = 0
    # Ubuntu has OP_NO_SSLv3 forced on by default
    self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
1023
Christian Heimesa170fa12017-09-15 20:27:30 +02001024 def test_verify_mode_protocol(self):
1025 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001026 # Default value
1027 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1028 ctx.verify_mode = ssl.CERT_OPTIONAL
1029 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1030 ctx.verify_mode = ssl.CERT_REQUIRED
1031 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1032 ctx.verify_mode = ssl.CERT_NONE
1033 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1034 with self.assertRaises(TypeError):
1035 ctx.verify_mode = None
1036 with self.assertRaises(ValueError):
1037 ctx.verify_mode = 42
1038
Christian Heimesa170fa12017-09-15 20:27:30 +02001039 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1040 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1041 self.assertFalse(ctx.check_hostname)
1042
1043 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1044 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1045 self.assertTrue(ctx.check_hostname)
1046
Christian Heimes61d478c2018-01-27 15:51:38 +01001047 def test_hostname_checks_common_name(self):
1048 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1049 self.assertTrue(ctx.hostname_checks_common_name)
1050 if ssl.HAS_NEVER_CHECK_COMMON_NAME:
1051 ctx.hostname_checks_common_name = True
1052 self.assertTrue(ctx.hostname_checks_common_name)
1053 ctx.hostname_checks_common_name = False
1054 self.assertFalse(ctx.hostname_checks_common_name)
1055 ctx.hostname_checks_common_name = True
1056 self.assertTrue(ctx.hostname_checks_common_name)
1057 else:
1058 with self.assertRaises(AttributeError):
1059 ctx.hostname_checks_common_name = True
Christian Heimesa170fa12017-09-15 20:27:30 +02001060
@unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
                     "required OpenSSL 1.1.0g")
def test_min_max_version(self):
    """Exercise SSLContext.minimum_version and SSLContext.maximum_version."""
    versions = ssl.TLSVersion
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    self.assertEqual(ctx.minimum_version, versions.MINIMUM_SUPPORTED)
    self.assertEqual(ctx.maximum_version, versions.MAXIMUM_SUPPORTED)

    ctx.minimum_version = versions.TLSv1_1
    ctx.maximum_version = versions.TLSv1_2
    self.assertEqual(ctx.minimum_version, versions.TLSv1_1)
    self.assertEqual(ctx.maximum_version, versions.TLSv1_2)

    ctx.minimum_version = versions.MINIMUM_SUPPORTED
    ctx.maximum_version = versions.TLSv1
    self.assertEqual(ctx.minimum_version, versions.MINIMUM_SUPPORTED)
    self.assertEqual(ctx.maximum_version, versions.TLSv1)

    ctx.maximum_version = versions.MAXIMUM_SUPPORTED
    self.assertEqual(ctx.maximum_version, versions.MAXIMUM_SUPPORTED)

    # Using the "minimum" sentinel as the upper bound is expected to read
    # back as a concrete protocol version.
    ctx.maximum_version = versions.MINIMUM_SUPPORTED
    self.assertIn(ctx.maximum_version, {versions.TLSv1, versions.SSLv3})

    # Likewise for the "maximum" sentinel used as the lower bound.
    ctx.minimum_version = versions.MAXIMUM_SUPPORTED
    self.assertIn(ctx.minimum_version, {versions.TLSv1_2, versions.TLSv1_3})

    with self.assertRaises(ValueError):
        ctx.minimum_version = 42

    # A context created for one specific protocol version refuses changes.
    pinned_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_1)

    self.assertEqual(pinned_ctx.minimum_version, versions.MINIMUM_SUPPORTED)
    self.assertEqual(pinned_ctx.maximum_version, versions.MAXIMUM_SUPPORTED)
    with self.assertRaises(ValueError):
        pinned_ctx.minimum_version = versions.MINIMUM_SUPPORTED
    with self.assertRaises(ValueError):
        pinned_ctx.maximum_version = versions.TLSv1
1121 ctx.maximum_version = ssl.TLSVersion.TLSv1
1122
1123
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_verify_flags(self):
    """Check the default verify_flags value and round-trip assignments."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # default value
    trusted_first = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | trusted_first)

    round_trip_values = (
        ssl.VERIFY_CRL_CHECK_LEAF,
        ssl.VERIFY_CRL_CHECK_CHAIN,
        ssl.VERIFY_DEFAULT,
        # supports any value
        ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT,
    )
    for flags in round_trip_values:
        ctx.verify_flags = flags
        self.assertEqual(ctx.verify_flags, flags)

    with self.assertRaises(TypeError):
        ctx.verify_flags = None
1142 ctx.verify_flags = None
1143
def test_load_cert_chain(self):
    """Exercise SSLContext.load_cert_chain() with valid and invalid input."""
    # Combined key and cert in a single file
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    ctx.load_cert_chain(CERTFILE, keyfile=None)
    ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
    with self.assertRaises(TypeError):
        ctx.load_cert_chain(keyfile=CERTFILE)
    with self.assertRaises(OSError) as missing:
        ctx.load_cert_chain(NONEXISTINGCERT)
    self.assertEqual(missing.exception.errno, errno.ENOENT)
    for broken in (BADCERT, EMPTYCERT):
        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
            ctx.load_cert_chain(broken)

    # Separate key and cert
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    ctx.load_cert_chain(ONLYCERT, ONLYKEY)
    ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
    ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
    for lone_file in (ONLYCERT, ONLYKEY):
        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
            ctx.load_cert_chain(lone_file)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)

    # Mismatching key and cert
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
        ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)

    # Password protected key and cert
    def password_variants():
        # str, bytes and a fresh bytearray spelling of the same password.
        encoded = KEY_PASSWORD.encode()
        return (KEY_PASSWORD, encoded, bytearray(encoded))

    for secret in password_variants():
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=secret)
    for secret in password_variants():
        ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, secret)
    with self.assertRaisesRegex(TypeError, "should be a string"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
    with self.assertRaises(ssl.SSLError):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        # openssl has a fixed limit on the password buffer.
        # PEM_BUFSIZE is generally set to 1kb.
        # Return a string larger than this.
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)

    # Password callback
    def supply_str():
        return KEY_PASSWORD

    def supply_bytes():
        return KEY_PASSWORD.encode()

    def supply_bytearray():
        return bytearray(KEY_PASSWORD.encode())

    def supply_wrong():
        return "badpass"

    def supply_huge():
        return b'a' * (1024 * 1024)

    def supply_wrong_type():
        return 9

    def supply_raises():
        raise Exception('getpass error')

    class PasswordProvider:
        # Usable both as a callable instance and through a bound method.
        def __call__(self):
            return KEY_PASSWORD

        def getpass(self):
            return KEY_PASSWORD

    working_callbacks = (
        supply_str,
        supply_bytes,
        supply_bytearray,
        PasswordProvider(),
        PasswordProvider().getpass,
    )
    for callback in working_callbacks:
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=callback)

    with self.assertRaises(ssl.SSLError):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=supply_wrong)
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=supply_huge)
    with self.assertRaisesRegex(TypeError, "must return a string"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=supply_wrong_type)
    with self.assertRaisesRegex(Exception, "getpass error"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=supply_raises)
    # Make sure the password function isn't called if it isn't needed
    ctx.load_cert_chain(CERTFILE, password=supply_raises)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001226
def test_load_verify_locations(self):
    """Exercise load_verify_locations() with file and directory arguments."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # Both str and bytes paths, positionally and by keyword.
    for cafile in (CERTFILE, BYTES_CERTFILE):
        ctx.load_verify_locations(cafile)
        ctx.load_verify_locations(cafile=cafile, capath=None)
    with self.assertRaises(TypeError):
        ctx.load_verify_locations()
    with self.assertRaises(TypeError):
        ctx.load_verify_locations(None, None, None)
    with self.assertRaises(OSError) as missing:
        ctx.load_verify_locations(NONEXISTINGCERT)
    self.assertEqual(missing.exception.errno, errno.ENOENT)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        ctx.load_verify_locations(BADCERT)
    ctx.load_verify_locations(CERTFILE, CAPATH)
    ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)

    # Issue #10989: crash if the second argument type is invalid
    with self.assertRaises(TypeError):
        ctx.load_verify_locations(None, True)
1245
def test_load_verify_cadata(self):
    """Load CA certificates through the ``cadata`` argument (PEM and DER)."""
    def read_pem_and_der(path):
        # Return the PEM text of *path* together with its DER conversion.
        with open(path) as stream:
            pem_text = stream.read()
        return pem_text, ssl.PEM_cert_to_DER_cert(pem_text)

    def new_client_context():
        return ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    def assert_ca_count(context, expected):
        self.assertEqual(context.cert_store_stats()["x509_ca"], expected)

    # test cadata
    cacert_pem, cacert_der = read_pem_and_der(CAFILE_CACERT)
    neuronio_pem, neuronio_der = read_pem_and_der(CAFILE_NEURONIO)

    # test PEM
    ctx = new_client_context()
    assert_ca_count(ctx, 0)
    ctx.load_verify_locations(cadata=cacert_pem)
    assert_ca_count(ctx, 1)
    ctx.load_verify_locations(cadata=neuronio_pem)
    assert_ca_count(ctx, 2)
    # cert already in hash table
    ctx.load_verify_locations(cadata=neuronio_pem)
    assert_ca_count(ctx, 2)

    # combined
    ctx = new_client_context()
    ctx.load_verify_locations(cadata="\n".join((cacert_pem, neuronio_pem)))
    assert_ca_count(ctx, 2)

    # with junk around the certs
    ctx = new_client_context()
    noisy_parts = ["head", cacert_pem, "other", neuronio_pem, "again",
                   neuronio_pem, "tail"]
    ctx.load_verify_locations(cadata="\n".join(noisy_parts))
    assert_ca_count(ctx, 2)

    # test DER
    ctx = new_client_context()
    ctx.load_verify_locations(cadata=cacert_der)
    ctx.load_verify_locations(cadata=neuronio_der)
    assert_ca_count(ctx, 2)
    # cert already in hash table
    ctx.load_verify_locations(cadata=cacert_der)
    assert_ca_count(ctx, 2)

    # combined
    ctx = new_client_context()
    ctx.load_verify_locations(cadata=b"".join((cacert_der, neuronio_der)))
    assert_ca_count(ctx, 2)

    # error cases
    ctx = new_client_context()
    with self.assertRaises(TypeError):
        ctx.load_verify_locations(cadata=object)

    with self.assertRaisesRegex(ssl.SSLError, "no start line"):
        ctx.load_verify_locations(cadata="broken")
    with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
        ctx.load_verify_locations(cadata=b"broken")
1302
1303
def test_load_dh_params(self):
    # Loading Diffie-Hellman parameters: good paths load, bad input raises.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    ctx.load_dh_params(DHFILE)
    if os.name != 'nt':
        # the bytes path is only exercised off Windows
        ctx.load_dh_params(BYTES_DHFILE)
    # the path argument is mandatory and may not be None
    with self.assertRaises(TypeError):
        ctx.load_dh_params()
    with self.assertRaises(TypeError):
        ctx.load_dh_params(None)
    with self.assertRaises(FileNotFoundError) as missing:
        ctx.load_dh_params(NONEXISTINGCERT)
    self.assertEqual(missing.exception.errno, errno.ENOENT)
    # a certificate file is rejected with an SSLError
    with self.assertRaises(ssl.SSLError):
        ctx.load_dh_params(CERTFILE)
1316
def test_session_stats(self):
    # A freshly created context reports zero for every session counter,
    # whatever the protocol.
    counters = (
        'number',
        'connect',
        'connect_good',
        'connect_renegotiate',
        'accept',
        'accept_good',
        'accept_renegotiate',
        'hits',
        'misses',
        'timeouts',
        'cache_full',
    )
    all_zero = dict.fromkeys(counters, 0)
    for proto in PROTOCOLS:
        fresh_ctx = ssl.SSLContext(proto)
        self.assertEqual(fresh_ctx.session_stats(), all_zero)
1333
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001334 def test_set_default_verify_paths(self):
1335 # There's not much we can do to test that it acts as expected,
1336 # so just check it doesn't crash or raise an exception.
Christian Heimesa170fa12017-09-15 20:27:30 +02001337 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001338 ctx.set_default_verify_paths()
1339
Antoine Pitrou501da612011-12-21 09:27:41 +01001340 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001341 def test_set_ecdh_curve(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001342 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001343 ctx.set_ecdh_curve("prime256v1")
1344 ctx.set_ecdh_curve(b"prime256v1")
1345 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1346 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1347 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1348 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1349
@needs_sni
def test_sni_callback(self):
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

    # set_servername_callback expects a callable, or None
    with self.assertRaises(TypeError):
        ctx.set_servername_callback()
    for not_callable in (4, "", ctx):
        with self.assertRaises(TypeError):
            ctx.set_servername_callback(not_callable)

    def dummycallback(sock, servername, ctx):
        pass

    # both None and a real callable are accepted
    for accepted in (None, dummycallback):
        ctx.set_servername_callback(accepted)
1364
@needs_sni
def test_sni_callback_refcycle(self):
    # Reference cycles through the servername callback are detected
    # and cleared.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

    # The default argument makes the callback refer back to the context,
    # which in turn holds the callback once it is installed.
    def dummycallback(sock, servername, ctx, cycle=ctx):
        pass

    ctx.set_servername_callback(dummycallback)
    ctx_ref = weakref.ref(ctx)
    del ctx, dummycallback
    gc.collect()
    # the weak reference is dead once the cycle has been collected
    self.assertIsNone(ctx_ref())
1377
def test_cert_store_stats(self):
    # Follow the store counters as certificates are added to the context.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    def assert_stats(x509_ca, crl, x509):
        self.assertEqual(ctx.cert_store_stats(),
                         dict(x509_ca=x509_ca, crl=crl, x509=x509))

    assert_stats(x509_ca=0, crl=0, x509=0)
    # loading a cert chain leaves the verification store counters at zero
    ctx.load_cert_chain(CERTFILE)
    assert_stats(x509_ca=0, crl=0, x509=0)
    # CERTFILE is counted as a plain x509 cert, not as a CA
    ctx.load_verify_locations(CERTFILE)
    assert_stats(x509_ca=0, crl=0, x509=1)
    # CAFILE_CACERT bumps both the x509 and the CA counter
    ctx.load_verify_locations(CAFILE_CACERT)
    assert_stats(x509_ca=1, crl=0, x509=2)
1391
def test_get_ca_certs(self):
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertEqual(ctx.get_ca_certs(), [])
    # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
    ctx.load_verify_locations(CERTFILE)
    self.assertEqual(ctx.get_ca_certs(), [])
    # but CAFILE_CACERT is a CA cert
    ctx.load_verify_locations(CAFILE_CACERT)

    # the expected issuer and subject are the very same name
    cacert_name = (
        (('organizationName', 'Root CA'),),
        (('organizationalUnitName', 'http://www.cacert.org'),),
        (('commonName', 'CA Cert Signing Authority'),),
        (('emailAddress', 'support@cacert.org'),),
    )
    expected_cert = {
        'issuer': cacert_name,
        'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
        'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
        'serialNumber': '00',
        'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
        'subject': cacert_name,
        'version': 3,
    }
    self.assertEqual(ctx.get_ca_certs(), [expected_cert])

    # with a true argument the certs come back as DER bytes instead
    with open(CAFILE_CACERT) as pem_file:
        pem = pem_file.read()
    der = ssl.PEM_cert_to_DER_cert(pem)
    self.assertEqual(ctx.get_ca_certs(True), [der])
1419
Christian Heimes72d28502013-11-23 13:56:58 +01001420 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001421 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001422 ctx.load_default_certs()
1423
Christian Heimesa170fa12017-09-15 20:27:30 +02001424 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001425 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1426 ctx.load_default_certs()
1427
Christian Heimesa170fa12017-09-15 20:27:30 +02001428 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001429 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1430
Christian Heimesa170fa12017-09-15 20:27:30 +02001431 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001432 self.assertRaises(TypeError, ctx.load_default_certs, None)
1433 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1434
@unittest.skipIf(sys.platform == "win32", "not-Windows specific")
@unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
def test_load_default_certs_env(self):
    # With SSL_CERT_DIR / SSL_CERT_FILE set, the default load is expected
    # to leave exactly one (non-CA) certificate in the store.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    expected_stats = {"crl": 0, "x509": 1, "x509_ca": 0}
    with support.EnvironmentVarGuard() as env:
        env["SSL_CERT_DIR"] = CAPATH
        env["SSL_CERT_FILE"] = CERTFILE
        ctx.load_default_certs()
        self.assertEqual(ctx.cert_store_stats(), expected_stats)
1444
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
@unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
def test_load_default_certs_env_windows(self):
    # Baseline: what the default load puts in the store without the
    # environment variables.
    baseline_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    baseline_ctx.load_default_certs()
    expected_stats = baseline_ctx.cert_store_stats()

    # With the variables set, one more x509 certificate is expected.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    with support.EnvironmentVarGuard() as env:
        env["SSL_CERT_DIR"] = CAPATH
        env["SSL_CERT_FILE"] = CERTFILE
        ctx.load_default_certs()
        expected_stats["x509"] += 1
        self.assertEqual(ctx.cert_store_stats(), expected_stats)
1459
def _assert_context_options(self, ctx):
    # OP_NO_SSLv2 must always be set on the context.
    self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
    # The remaining flags are only checked when their constant is
    # non-zero.
    optional_flags = (
        OP_NO_COMPRESSION,
        OP_SINGLE_DH_USE,
        OP_SINGLE_ECDH_USE,
        OP_CIPHER_SERVER_PREFERENCE,
    )
    for flag in optional_flags:
        if flag != 0:
            self.assertEqual(ctx.options & flag, flag)
1474
def test_create_default_context(self):
    # default purpose: certificates are required and hostnames checked
    default_ctx = ssl.create_default_context()

    self.assertEqual(default_ctx.protocol, ssl.PROTOCOL_TLS)
    self.assertEqual(default_ctx.verify_mode, ssl.CERT_REQUIRED)
    self.assertTrue(default_ctx.check_hostname)
    self._assert_context_options(default_ctx)

    # explicit CA locations given in all three forms at once
    with open(SIGNING_CA) as ca_file:
        cadata = ca_file.read()
    ca_locations = dict(cafile=SIGNING_CA, capath=CAPATH, cadata=cadata)
    located_ctx = ssl.create_default_context(**ca_locations)
    self.assertEqual(located_ctx.protocol, ssl.PROTOCOL_TLS)
    self.assertEqual(located_ctx.verify_mode, ssl.CERT_REQUIRED)
    self._assert_context_options(located_ctx)

    # CLIENT_AUTH purpose: no certificate is required
    server_side_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    self.assertEqual(server_side_ctx.protocol, ssl.PROTOCOL_TLS)
    self.assertEqual(server_side_ctx.verify_mode, ssl.CERT_NONE)
    self._assert_context_options(server_side_ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001495
def test__create_stdlib_context(self):
    # defaults: no verification and no hostname check
    ctx = ssl._create_stdlib_context()
    self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
    self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
    self.assertFalse(ctx.check_hostname)
    self._assert_context_options(ctx)

    # an explicit protocol is carried over to the context
    tls1 = ssl.PROTOCOL_TLSv1
    ctx = ssl._create_stdlib_context(tls1)
    self.assertEqual(ctx.protocol, tls1)
    self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
    self._assert_context_options(ctx)

    # cert_reqs and check_hostname are carried over as well
    ctx = ssl._create_stdlib_context(
        tls1,
        cert_reqs=ssl.CERT_REQUIRED,
        check_hostname=True,
    )
    self.assertEqual(ctx.protocol, tls1)
    self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
    self.assertTrue(ctx.check_hostname)
    self._assert_context_options(ctx)

    # CLIENT_AUTH purpose
    ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
    self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
    self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
    self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001520
Christian Heimes1aa9a752013-12-02 02:41:19 +01001521 def test_check_hostname(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001522 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001523 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001524 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001525
Christian Heimese82c0342017-09-15 20:29:57 +02001526 # Auto set CERT_REQUIRED
1527 ctx.check_hostname = True
1528 self.assertTrue(ctx.check_hostname)
1529 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1530 ctx.check_hostname = False
Christian Heimes1aa9a752013-12-02 02:41:19 +01001531 ctx.verify_mode = ssl.CERT_REQUIRED
1532 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001533 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001534
Christian Heimese82c0342017-09-15 20:29:57 +02001535 # Changing verify_mode does not affect check_hostname
1536 ctx.check_hostname = False
1537 ctx.verify_mode = ssl.CERT_NONE
1538 ctx.check_hostname = False
1539 self.assertFalse(ctx.check_hostname)
1540 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1541 # Auto set
Christian Heimes1aa9a752013-12-02 02:41:19 +01001542 ctx.check_hostname = True
1543 self.assertTrue(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001544 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1545
1546 ctx.check_hostname = False
1547 ctx.verify_mode = ssl.CERT_OPTIONAL
1548 ctx.check_hostname = False
1549 self.assertFalse(ctx.check_hostname)
1550 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1551 # keep CERT_OPTIONAL
1552 ctx.check_hostname = True
1553 self.assertTrue(ctx.check_hostname)
1554 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001555
1556 # Cannot set CERT_NONE with check_hostname enabled
1557 with self.assertRaises(ValueError):
1558 ctx.verify_mode = ssl.CERT_NONE
1559 ctx.check_hostname = False
1560 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001561 ctx.verify_mode = ssl.CERT_NONE
1562 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001563
Christian Heimes5fe668c2016-09-12 00:01:11 +02001564 def test_context_client_server(self):
1565 # PROTOCOL_TLS_CLIENT has sane defaults
1566 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1567 self.assertTrue(ctx.check_hostname)
1568 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1569
1570 # PROTOCOL_TLS_SERVER has different but also sane defaults
1571 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1572 self.assertFalse(ctx.check_hostname)
1573 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1574
Christian Heimes4df60f12017-09-15 20:26:05 +02001575 def test_context_custom_class(self):
1576 class MySSLSocket(ssl.SSLSocket):
1577 pass
1578
1579 class MySSLObject(ssl.SSLObject):
1580 pass
1581
1582 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1583 ctx.sslsocket_class = MySSLSocket
1584 ctx.sslobject_class = MySSLObject
1585
1586 with ctx.wrap_socket(socket.socket(), server_side=True) as sock:
1587 self.assertIsInstance(sock, MySSLSocket)
1588 obj = ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO())
1589 self.assertIsInstance(obj, MySSLObject)
1590
Antoine Pitrou152efa22010-05-16 18:19:27 +00001591
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001592class SSLErrorTests(unittest.TestCase):
1593
1594 def test_str(self):
1595 # The str() of a SSLError doesn't include the errno
1596 e = ssl.SSLError(1, "foo")
1597 self.assertEqual(str(e), "foo")
1598 self.assertEqual(e.errno, 1)
1599 # Same for a subclass
1600 e = ssl.SSLZeroReturnError(1, "foo")
1601 self.assertEqual(str(e), "foo")
1602 self.assertEqual(e.errno, 1)
1603
1604 def test_lib_reason(self):
1605 # Test the library and reason attributes
Christian Heimesa170fa12017-09-15 20:27:30 +02001606 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001607 with self.assertRaises(ssl.SSLError) as cm:
1608 ctx.load_dh_params(CERTFILE)
1609 self.assertEqual(cm.exception.library, 'PEM')
1610 self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1611 s = str(cm.exception)
1612 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1613
1614 def test_subclass(self):
1615 # Check that the appropriate SSLError subclass is raised
1616 # (this only tests one of them)
Christian Heimesa170fa12017-09-15 20:27:30 +02001617 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1618 ctx.check_hostname = False
1619 ctx.verify_mode = ssl.CERT_NONE
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001620 with socket.socket() as s:
1621 s.bind(("127.0.0.1", 0))
Charles-François Natali6e204602014-07-23 19:28:13 +01001622 s.listen()
Antoine Pitroue1ceb502013-01-12 21:54:44 +01001623 c = socket.socket()
1624 c.connect(s.getsockname())
1625 c.setblocking(False)
1626 with ctx.wrap_socket(c, False, do_handshake_on_connect=False) as c:
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001627 with self.assertRaises(ssl.SSLWantReadError) as cm:
1628 c.do_handshake()
1629 s = str(cm.exception)
1630 self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1631 # For compatibility
1632 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
1633
1634
Christian Heimes61d478c2018-01-27 15:51:38 +01001635 def test_bad_server_hostname(self):
1636 ctx = ssl.create_default_context()
1637 with self.assertRaises(ValueError):
1638 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1639 server_hostname="")
1640 with self.assertRaises(ValueError):
1641 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1642 server_hostname=".example.org")
Christian Heimesd02ac252018-03-25 12:36:13 +02001643 with self.assertRaises(TypeError):
1644 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1645 server_hostname="example.org\x00evil.com")
Christian Heimes61d478c2018-01-27 15:51:38 +01001646
1647
class MemoryBIOTests(unittest.TestCase):
    # Tests for the in-memory buffer object ssl.MemoryBIO.

    def test_read_write(self):
        bio = ssl.MemoryBIO()
        # a single write is read back whole, then the buffer is empty
        bio.write(b'foo')
        self.assertEqual(bio.read(), b'foo')
        self.assertEqual(bio.read(), b'')
        # consecutive writes are concatenated
        for chunk in (b'foo', b'bar'):
            bio.write(chunk)
        self.assertEqual(bio.read(), b'foobar')
        self.assertEqual(bio.read(), b'')
        # sized reads hand the data out piecewise
        bio.write(b'baz')
        for size, expected in ((2, b'ba'), (1, b'z'), (1, b'')):
            self.assertEqual(bio.read(size), expected)

    def test_eof(self):
        bio = ssl.MemoryBIO()
        self.assertFalse(bio.eof)
        self.assertEqual(bio.read(), b'')
        self.assertFalse(bio.eof)
        bio.write(b'foo')
        self.assertFalse(bio.eof)
        bio.write_eof()
        # eof only turns true once the pending data has been drained
        self.assertFalse(bio.eof)
        self.assertEqual(bio.read(2), b'fo')
        self.assertFalse(bio.eof)
        self.assertEqual(bio.read(1), b'o')
        self.assertTrue(bio.eof)
        self.assertEqual(bio.read(), b'')
        self.assertTrue(bio.eof)

    def test_pending(self):
        bio = ssl.MemoryBIO()
        self.assertEqual(bio.pending, 0)
        bio.write(b'foo')
        self.assertEqual(bio.pending, 3)
        # every one-byte read lowers the count by one
        for remaining in (2, 1, 0):
            bio.read(1)
            self.assertEqual(bio.pending, remaining)
        # every one-byte write raises it by one
        for buffered in (1, 2, 3):
            bio.write(b'x')
            self.assertEqual(bio.pending, buffered)
        bio.read()
        self.assertEqual(bio.pending, 0)

    def test_buffer_types(self):
        bio = ssl.MemoryBIO()
        # bytes, bytearray and memoryview are all accepted by write()
        samples = (
            (b'foo', b'foo'),
            (bytearray(b'bar'), b'bar'),
            (memoryview(b'baz'), b'baz'),
        )
        for data, expected in samples:
            bio.write(data)
            self.assertEqual(bio.read(), expected)

    def test_error_types(self):
        bio = ssl.MemoryBIO()
        # str, None, bool and int are rejected by write()
        for bad_value in ('foo', None, True, 1):
            with self.assertRaises(TypeError):
                bio.write(bad_value)
1709
1710
Christian Heimes9d50ab52018-02-27 10:17:30 +01001711class SSLObjectTests(unittest.TestCase):
1712 def test_private_init(self):
1713 bio = ssl.MemoryBIO()
1714 with self.assertRaisesRegex(TypeError, "public constructor"):
1715 ssl.SSLObject(bio, bio)
1716
Nathaniel J. Smithc0da5822018-09-21 21:44:12 -07001717 def test_unwrap(self):
1718 client_ctx, server_ctx, hostname = testing_context()
1719 c_in = ssl.MemoryBIO()
1720 c_out = ssl.MemoryBIO()
1721 s_in = ssl.MemoryBIO()
1722 s_out = ssl.MemoryBIO()
1723 client = client_ctx.wrap_bio(c_in, c_out, server_hostname=hostname)
1724 server = server_ctx.wrap_bio(s_in, s_out, server_side=True)
1725
1726 # Loop on the handshake for a bit to get it settled
1727 for _ in range(5):
1728 try:
1729 client.do_handshake()
1730 except ssl.SSLWantReadError:
1731 pass
1732 if c_out.pending:
1733 s_in.write(c_out.read())
1734 try:
1735 server.do_handshake()
1736 except ssl.SSLWantReadError:
1737 pass
1738 if s_out.pending:
1739 c_in.write(s_out.read())
1740 # Now the handshakes should be complete (don't raise WantReadError)
1741 client.do_handshake()
1742 server.do_handshake()
1743
1744 # Now if we unwrap one side unilaterally, it should send close-notify
1745 # and raise WantReadError:
1746 with self.assertRaises(ssl.SSLWantReadError):
1747 client.unwrap()
1748
1749 # But server.unwrap() does not raise, because it reads the client's
1750 # close-notify:
1751 s_in.write(c_out.read())
1752 server.unwrap()
1753
1754 # And now that the client gets the server's close-notify, it doesn't
1755 # raise either.
1756 c_in.write(s_out.read())
1757 client.unwrap()
Christian Heimes9d50ab52018-02-27 10:17:30 +01001758
Martin Panter3840b2a2016-03-27 01:53:46 +00001759class SimpleBackgroundTests(unittest.TestCase):
Martin Panter3840b2a2016-03-27 01:53:46 +00001760 """Tests that connect to a simple server running in the background"""
1761
def setUp(self):
    # Start a background echo server for the test and arrange for it to
    # be shut down again afterwards.
    echo_server = ThreadedEchoServer(SIGNED_CERTFILE)
    self.server_addr = (HOST, echo_server.port)
    echo_server.__enter__()
    # leave the server's context with "no exception" when the test ends
    no_exception = (None, None, None)
    self.addCleanup(echo_server.__exit__, *no_exception)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001767
def test_connect(self):
    # with CERT_NONE the peer certificate comes back as an empty dict
    plain = socket.socket(socket.AF_INET)
    with test_wrap_socket(plain, cert_reqs=ssl.CERT_NONE) as s:
        s.connect(self.server_addr)
        self.assertEqual({}, s.getpeercert())
        self.assertFalse(s.server_side)

    # this should succeed because we specify the root cert
    plain = socket.socket(socket.AF_INET)
    with test_wrap_socket(plain,
                          cert_reqs=ssl.CERT_REQUIRED,
                          ca_certs=SIGNING_CA) as s:
        s.connect(self.server_addr)
        self.assertTrue(s.getpeercert())
        self.assertFalse(s.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001782
def test_connect_fail(self):
    # This should fail because we have no verification certs. Connection
    # failure crashes ThreadedEchoServer, so run this in an independent
    # test method.
    wrapped = test_wrap_socket(socket.socket(socket.AF_INET),
                               cert_reqs=ssl.CERT_REQUIRED)
    self.addCleanup(wrapped.close)
    with self.assertRaisesRegex(ssl.SSLError, "certificate verify failed"):
        wrapped.connect(self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001792
def test_connect_ex(self):
    # Issue #11326: check connect_ex() implementation
    wrapped = test_wrap_socket(socket.socket(socket.AF_INET),
                               cert_reqs=ssl.CERT_REQUIRED,
                               ca_certs=SIGNING_CA)
    self.addCleanup(wrapped.close)
    # connect_ex() reports success as 0
    result = wrapped.connect_ex(self.server_addr)
    self.assertEqual(0, result)
    self.assertTrue(wrapped.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001801
def test_non_blocking_connect_ex(self):
    # Issue #11326: non-blocking connect_ex() should allow handshake
    # to proceed after the socket gets ready.
    wait = 5.0
    s = test_wrap_socket(socket.socket(socket.AF_INET),
                         cert_reqs=ssl.CERT_REQUIRED,
                         ca_certs=SIGNING_CA,
                         do_handshake_on_connect=False)
    self.addCleanup(s.close)
    s.setblocking(False)
    rc = s.connect_ex(self.server_addr)
    # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
    self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
    # Wait for connect to finish
    select.select([], [s], [], wait)
    # Non-blocking handshake: retry until it no longer asks to wait
    handshake_done = False
    while not handshake_done:
        try:
            s.do_handshake()
        except ssl.SSLWantReadError:
            select.select([s], [], [], wait)
        except ssl.SSLWantWriteError:
            select.select([], [s], [], wait)
        else:
            handshake_done = True
    # SSL established
    self.assertTrue(s.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001827
def test_connect_with_context(self):
    # Same as test_connect, but with a separately created context
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)

    def tcp_socket():
        return socket.socket(socket.AF_INET)

    with ctx.wrap_socket(tcp_socket()) as s:
        s.connect(self.server_addr)
        self.assertEqual({}, s.getpeercert())
    # Same with a server hostname
    with ctx.wrap_socket(tcp_socket(), server_hostname="dummy") as s:
        s.connect(self.server_addr)
    ctx.verify_mode = ssl.CERT_REQUIRED
    # This should succeed because we specify the root cert
    ctx.load_verify_locations(SIGNING_CA)
    with ctx.wrap_socket(tcp_socket()) as s:
        s.connect(self.server_addr)
        self.assertTrue(s.getpeercert())
1845
def test_connect_with_context_fail(self):
    # This should fail because we have no verification certs. Connection
    # failure crashes ThreadedEchoServer, so run this in an independent
    # test method.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    ctx.verify_mode = ssl.CERT_REQUIRED
    wrapped = ctx.wrap_socket(socket.socket(socket.AF_INET))
    self.addCleanup(wrapped.close)
    with self.assertRaisesRegex(ssl.SSLError, "certificate verify failed"):
        wrapped.connect(self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001856
def test_connect_capath(self):
    # Verify server certificates using the `capath` argument
    # NOTE: the subject hashing algorithm has been changed between
    # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
    # contain both versions of each certificate (same content, different
    # filename) for this test to be portable across OpenSSL releases.
    #
    # The same scenario runs twice: with a str `capath` argument first,
    # then with a bytes one.
    for capath in (CAPATH, BYTES_CAPATH):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.load_verify_locations(capath=capath)
        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
            s.connect(self.server_addr)
            self.assertTrue(s.getpeercert())
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001879
def test_connect_cadata(self):
    # Verify the server certificate against CA data handed over in
    # memory: as PEM text first, then the same certificate as DER bytes.
    with open(SIGNING_CA) as ca_file:
        pem = ca_file.read()
    der = ssl.PEM_cert_to_DER_cert(pem)
    for cadata in (pem, der):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.load_verify_locations(cadata=cadata)
        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
            s.connect(self.server_addr)
            self.assertTrue(s.getpeercert())
Christian Heimesefff7062013-11-21 03:35:02 +01001900
@unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
def test_makefile_close(self):
    # Issue #5238: creating a file-like object with makefile() shouldn't
    # delay closing the underlying "real socket" (here tested with its
    # file descriptor, hence skipping the test under Windows).
    ss = test_wrap_socket(socket.socket(socket.AF_INET))
    # Release the socket even when one of the steps before the explicit
    # close() below fails; after a successful close() this second call
    # has nothing left to do.
    self.addCleanup(ss.close)
    ss.connect(self.server_addr)
    fd = ss.fileno()
    f = ss.makefile()
    f.close()
    # The fd is still open
    os.read(fd, 0)
    # Closing the SSL socket should close the fd too
    ss.close()
    gc.collect()
    with self.assertRaises(OSError) as e:
        os.read(fd, 0)
    self.assertEqual(e.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001919
def test_non_blocking_handshake(self):
    """Drive a TLS handshake by hand on a non-blocking socket."""
    raw_sock = socket.socket(socket.AF_INET)
    raw_sock.connect(self.server_addr)
    raw_sock.setblocking(False)
    tls_sock = test_wrap_socket(raw_sock,
                                cert_reqs=ssl.CERT_NONE,
                                do_handshake_on_connect=False)
    self.addCleanup(tls_sock.close)

    attempts = 0
    handshake_done = False
    while not handshake_done:
        attempts += 1
        try:
            tls_sock.do_handshake()
        except ssl.SSLWantReadError:
            # Block until the socket has data for the handshake to read.
            select.select([tls_sock], [], [])
        except ssl.SSLWantWriteError:
            # Block until the socket can accept more handshake output.
            select.select([], [tls_sock], [])
        else:
            handshake_done = True

    if support.verbose:
        sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % attempts)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001940
def test_get_server_certificate(self):
    """Fetch the local test server's certificate, verifying with SIGNING_CA."""
    _test_get_server_certificate(
        self, *self.server_addr, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001943
def test_get_server_certificate_fail(self):
    """Check that fetching the certificate fails with the wrong CA file."""
    # Kept as its own test method: a connection failure crashes
    # ThreadedEchoServer, so it must not share a server with other checks.
    _test_get_server_certificate_fail(
        self, *self.server_addr)
Bill Janssen54cc54c2007-12-14 22:08:56 +00001948
def test_ciphers(self):
    """Connect with valid cipher strings and reject a nonsensical one."""
    for cipher_spec in ("ALL", "DEFAULT"):
        with test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_NONE,
                              ciphers=cipher_spec) as conn:
            conn.connect(self.server_addr)

    # Error checking can happen at instantiation or when connecting, so
    # both steps live inside the assertRaisesRegex block.
    with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"), \
            socket.socket(socket.AF_INET) as plain_sock:
        conn = test_wrap_socket(plain_sock,
                                cert_reqs=ssl.CERT_NONE,
                                ciphers="^$:,;?*'dorothyx")
        conn.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00001962
def test_get_ca_certs_capath(self):
    """Check that capath certificates only show up after they are used."""
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.load_verify_locations(capath=CAPATH)
    # capath certs are loaded on request: nothing is listed before a
    # connection has needed them.
    self.assertEqual(context.get_ca_certs(), [])

    plain_sock = socket.socket(socket.AF_INET)
    with context.wrap_socket(plain_sock,
                             server_hostname='localhost') as conn:
        conn.connect(self.server_addr)
        peer_cert = conn.getpeercert()
        self.assertTrue(peer_cert)

    loaded_certs = context.get_ca_certs()
    self.assertEqual(len(loaded_certs), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001974
@needs_sni
def test_context_setget(self):
    """Check that the context of a connected socket can be replaced."""
    def make_context():
        # Both contexts are configured identically; only identity matters.
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        context.load_verify_locations(capath=CAPATH)
        return context

    first_ctx = make_context()
    second_ctx = make_context()
    plain_sock = socket.socket(socket.AF_INET)
    with first_ctx.wrap_socket(plain_sock, server_hostname='localhost') as conn:
        conn.connect(self.server_addr)
        self.assertIs(conn.context, first_ctx)
        self.assertIs(conn._sslobj.context, first_ctx)
        # Swapping the context must be reflected on both the Python-level
        # socket and the underlying _sslobj.
        conn.context = second_ctx
        self.assertIs(conn.context, second_ctx)
        self.assertIs(conn._sslobj.context, second_ctx)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001990
def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
    """Run func(*args) to completion, shuttling data between sock and BIOs.

    func is retried for as long as it raises an SSLError whose errno is
    SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE; any other SSLError
    propagates.  After every call, whatever sits in the outgoing BIO is
    sent on sock, and on WANT_READ up to 32768 bytes are received from
    sock and fed to the incoming BIO (or EOF is signalled on an empty
    read).  The only keyword consulted is 'timeout' (seconds, default 10);
    the test fails with "timeout" once that deadline has passed.

    Returns the value returned by the successful call to func.
    """
    deadline = time.monotonic() + kwargs.get('timeout', 10)
    retryable = (ssl.SSL_ERROR_WANT_READ, ssl.SSL_ERROR_WANT_WRITE)
    attempts = 0
    while True:
        if time.monotonic() > deadline:
            self.fail("timeout")
        attempts += 1
        wanted = None
        try:
            result = func(*args)
        except ssl.SSLError as exc:
            if exc.errno not in retryable:
                raise
            wanted = exc.errno
        # Flush the outgoing BIO to the socket irrespective of any error.
        sock.sendall(outgoing.read())
        if wanted is None:
            # No error: the operation completed.
            break
        if wanted == ssl.SSL_ERROR_WANT_READ:
            chunk = sock.recv(32768)
            if chunk:
                incoming.write(chunk)
            else:
                incoming.write_eof()
    if support.verbose:
        sys.stdout.write("Needed %d calls to complete %s().\n"
                         % (attempts, func.__name__))
    return result
2027
def test_bio_handshake(self):
    """Handshake and shut down through MemoryBIOs, checking state around it."""
    tcp_sock = socket.socket(socket.AF_INET)
    self.addCleanup(tcp_sock.close)
    tcp_sock.connect(self.server_addr)
    bio_in = ssl.MemoryBIO()
    bio_out = ssl.MemoryBIO()

    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertTrue(context.check_hostname)
    self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
    context.load_verify_locations(SIGNING_CA)
    tls = context.wrap_bio(bio_in, bio_out, False,
                           SIGNED_CERTFILE_HOSTNAME)
    self.assertIs(tls._sslobj.owner, tls)

    # Before the handshake no connection parameters are available.
    self.assertIsNone(tls.cipher())
    self.assertIsNone(tls.version())
    self.assertIsNotNone(tls.shared_ciphers())
    with self.assertRaises(ValueError):
        tls.getpeercert()
    if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
        self.assertIsNone(tls.get_channel_binding('tls-unique'))

    self.ssl_io_loop(tcp_sock, bio_in, bio_out, tls.do_handshake)

    # After the handshake they must all be populated.
    self.assertTrue(tls.cipher())
    self.assertIsNotNone(tls.shared_ciphers())
    self.assertIsNotNone(tls.version())
    self.assertTrue(tls.getpeercert())
    if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
        self.assertTrue(tls.get_channel_binding('tls-unique'))

    try:
        self.ssl_io_loop(tcp_sock, bio_in, bio_out, tls.unwrap)
    except ssl.SSLSyscallError:
        # If the server shuts down the TCP connection without sending a
        # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
        pass
    with self.assertRaises(ssl.SSLError):
        tls.write(b'foo')
2061
def test_bio_read_write_data(self):
    """Echo one request through an SSLObject driven via MemoryBIOs."""
    tcp_sock = socket.socket(socket.AF_INET)
    self.addCleanup(tcp_sock.close)
    tcp_sock.connect(self.server_addr)
    bio_in = ssl.MemoryBIO()
    bio_out = ssl.MemoryBIO()
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_NONE
    tls = context.wrap_bio(bio_in, bio_out, False)

    def pump(operation, *op_args):
        # Every step goes through the same socket/BIO plumbing.
        return self.ssl_io_loop(tcp_sock, bio_in, bio_out,
                                operation, *op_args)

    pump(tls.do_handshake)
    request = b'FOO\n'
    pump(tls.write, request)
    reply = pump(tls.read, 1024)
    # The reply is compared against the lower-cased request.
    self.assertEqual(reply, b'foo\n')
    pump(tls.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002077
2078
class NetworkedTests(unittest.TestCase):
    """Tests that talk to remote hosts (guarded by transient_internet)."""

    def test_timeout_connect_ex(self):
        """connect_ex() on a timeout returns the original errno.

        Issue #12065: this mimics the behaviour of non-SSL sockets.
        """
        with support.transient_internet(REMOTE_HOST):
            conn = test_wrap_socket(socket.socket(socket.AF_INET),
                                    cert_reqs=ssl.CERT_REQUIRED,
                                    do_handshake_on_connect=False)
            self.addCleanup(conn.close)
            # A tiny timeout so the connect attempt is expected to time out.
            conn.settimeout(0.0000001)
            result = conn.connect_ex((REMOTE_HOST, 443))
            if result == 0:
                self.skipTest("REMOTE_HOST responded too quickly")
            self.assertIn(result, (errno.EAGAIN, errno.EWOULDBLOCK))

    @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
    def test_get_server_certificate_ipv6(self):
        """Run both get_server_certificate helpers against an IPv6 host."""
        with support.transient_internet('ipv6.google.com'):
            _test_get_server_certificate(self, 'ipv6.google.com', 443)
            _test_get_server_certificate_fail(self, 'ipv6.google.com', 443)
2100
Martin Panter3840b2a2016-03-27 01:53:46 +00002101
def _test_get_server_certificate(test, host, port, cert=None):
    """Fetch the certificate of host:port without and with ca_certs=cert.

    Fails *test* if either fetch returns an empty value.
    """
    address = (host, port)

    unverified_pem = ssl.get_server_certificate(address)
    if not unverified_pem:
        test.fail("No server certificate on %s:%s!" % (host, port))

    verified_pem = ssl.get_server_certificate(address, ca_certs=cert)
    if not verified_pem:
        test.fail("No server certificate on %s:%s!" % (host, port))
    if support.verbose:
        sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n"
                         % (host, port, verified_pem))
2112
def _test_get_server_certificate_fail(test, host, port):
    """Expect get_server_certificate() to raise SSLError with CERTFILE as CA.

    Fails *test* if a certificate is returned instead.
    """
    try:
        pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
    except ssl.SSLError as err:
        # This is the expected outcome.
        if support.verbose:
            sys.stdout.write("%s\n" % err)
        return
    test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
2122
2123
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002124from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002125
class ThreadedEchoServer(threading.Thread):
    """Threaded TLS echo server used as a test fixture.

    The server thread accepts connections on a freshly bound port
    (``self.port``) and hands each one to a ConnectionHandler thread, which
    it joins before accepting the next connection.  Use it as a context
    manager: entering starts the thread and waits until it is listening,
    leaving stops and joins it.
    """

    class ConnectionHandler(threading.Thread):

        """A mildly complicated class, because we want it to work both
        with and without the SSL wrapper around the socket connection, so
        that we can test the STARTTLS functionality."""

        def __init__(self, server, connsock, addr):
            self.server = server
            self.running = False
            self.sock = connsock
            self.addr = addr
            self.sock.setblocking(1)
            # None while the connection is plain text; the SSLSocket once
            # wrap_conn() has succeeded.
            self.sslconn = None
            threading.Thread.__init__(self)
            self.daemon = True

        def wrap_conn(self):
            """Wrap the connection with the server's SSL context.

            Returns True on success.  On failure the error text is recorded
            in server.conn_errors, the connection is closed and False is
            returned; for SSLError/OSError other than connection
            reset/broken pipe the whole server is stopped as well.
            """
            try:
                self.sslconn = self.server.context.wrap_socket(
                    self.sock, server_side=True)
                self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
                self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
            except (ConnectionResetError, BrokenPipeError) as e:
                # We treat ConnectionResetError as though it were an
                # SSLError - OpenSSL on Ubuntu abruptly closes the
                # connection when asked to use an unsupported protocol.
                #
                # BrokenPipeError is raised in TLS 1.3 mode, when OpenSSL
                # tries to send session tickets after handshake.
                # https://github.com/openssl/openssl/issues/6342
                self.server.conn_errors.append(str(e))
                if self.server.chatty:
                    handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
                self.running = False
                self.close()
                return False
            except (ssl.SSLError, OSError) as e:
                # OSError may occur with wrong protocols, e.g. both
                # sides use PROTOCOL_TLS_SERVER.
                #
                # XXX Various errors can have happened here, for example
                # a mismatching protocol version, an invalid certificate,
                # or a low-level bug. This should be made more discriminating.
                #
                # bpo-31323: Store the exception as string to prevent
                # a reference leak: server -> conn_errors -> exception
                # -> traceback -> self (ConnectionHandler) -> server
                self.server.conn_errors.append(str(e))
                if self.server.chatty:
                    handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
                self.running = False
                self.server.stop()
                self.close()
                return False
            else:
                self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
                if self.server.context.verify_mode == ssl.CERT_REQUIRED:
                    cert = self.sslconn.getpeercert()
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
                    cert_binary = self.sslconn.getpeercert(True)
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
                cipher = self.sslconn.cipher()
                if support.verbose and self.server.chatty:
                    sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
                    sys.stdout.write(" server: selected protocol is now "
                            + str(self.sslconn.selected_npn_protocol()) + "\n")
                return True

        def read(self):
            """Read from the TLS layer if present, else from the raw socket."""
            if self.sslconn:
                return self.sslconn.read()
            else:
                return self.sock.recv(1024)

        def write(self, bytes):
            """Write to the TLS layer if present, else to the raw socket."""
            if self.sslconn:
                return self.sslconn.write(bytes)
            else:
                return self.sock.send(bytes)

        def close(self):
            """Close the TLS layer if present, else the raw socket."""
            if self.sslconn:
                self.sslconn.close()
            else:
                self.sock.close()

        def run(self):
            """Serve one connection until EOF, b'over' or an error.

            Recognised messages (after stripping whitespace): b'over',
            b'STARTTLS' / b'ENDTLS' (STARTTLS servers only),
            b'CB tls-unique', b'PHA', b'HASCERT' and b'GETCERT'.  Anything
            else is echoed back lower-cased.
            """
            self.running = True
            if not self.server.starttls_server:
                if not self.wrap_conn():
                    return
            while self.running:
                try:
                    msg = self.read()
                    stripped = msg.strip()
                    if not stripped:
                        # eof, so quit this handler
                        self.running = False
                        # A STARTTLS connection may reach EOF while still
                        # (or again) unencrypted; there is then no TLS
                        # layer to unwrap and calling unwrap() on None
                        # would raise AttributeError out of this thread,
                        # leaving the socket open.
                        if self.sslconn is not None:
                            try:
                                self.sock = self.sslconn.unwrap()
                            except OSError:
                                # Many tests shut the TCP connection down
                                # without an SSL shutdown. This causes
                                # unwrap() to raise OSError with errno=0!
                                pass
                            else:
                                self.sslconn = None
                        self.close()
                    elif stripped == b'over':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: client closed connection\n")
                        self.close()
                        return
                    elif (self.server.starttls_server and
                          stripped == b'STARTTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        if not self.wrap_conn():
                            return
                    elif (self.server.starttls_server and self.sslconn
                          and stripped == b'ENDTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        self.sock = self.sslconn.unwrap()
                        self.sslconn = None
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: connection is now unencrypted...\n")
                    elif stripped == b'CB tls-unique':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
                        data = self.sslconn.get_channel_binding("tls-unique")
                        self.write(repr(data).encode("us-ascii") + b"\n")
                    elif stripped == b'PHA':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: initiating post handshake auth\n")
                        try:
                            self.sslconn.verify_client_post_handshake()
                        except ssl.SSLError as e:
                            self.write(repr(e).encode("us-ascii") + b"\n")
                        else:
                            self.write(b"OK\n")
                    elif stripped == b'HASCERT':
                        if self.sslconn.getpeercert() is not None:
                            self.write(b'TRUE\n')
                        else:
                            self.write(b'FALSE\n')
                    elif stripped == b'GETCERT':
                        cert = self.sslconn.getpeercert()
                        self.write(repr(cert).encode("us-ascii") + b"\n")
                    else:
                        if (support.verbose and
                            self.server.connectionchatty):
                            ctype = (self.sslconn and "encrypted") or "unencrypted"
                            sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
                                             % (msg, ctype, msg.lower(), ctype))
                        self.write(msg.lower())
                except ConnectionResetError:
                    # XXX: OpenSSL 1.1.1 sometimes raises ConnectionResetError
                    # when connection is not shut down gracefully.
                    if self.server.chatty and support.verbose:
                        sys.stdout.write(
                            " Connection reset by peer: {}\n".format(
                                self.addr)
                        )
                    self.close()
                    self.running = False
                except OSError:
                    if self.server.chatty:
                        handle_error("Test server failure:\n")
                    self.close()
                    self.running = False

                    # normally, we'd just stop here, but for the test
                    # harness, we want to stop the server
                    self.server.stop()

    def __init__(self, certificate=None, ssl_version=None,
                 certreqs=None, cacerts=None,
                 chatty=True, connectionchatty=False, starttls_server=False,
                 npn_protocols=None, alpn_protocols=None,
                 ciphers=None, context=None):
        """Create the server and bind its listening socket.

        If *context* is given it is used as-is and the certificate,
        protocol, verification, NPN/ALPN and cipher arguments are ignored;
        otherwise a new SSLContext is built from them (defaulting to
        PROTOCOL_TLS_SERVER and CERT_NONE).
        """
        if context:
            self.context = context
        else:
            self.context = ssl.SSLContext(ssl_version
                                          if ssl_version is not None
                                          else ssl.PROTOCOL_TLS_SERVER)
            self.context.verify_mode = (certreqs if certreqs is not None
                                        else ssl.CERT_NONE)
            if cacerts:
                self.context.load_verify_locations(cacerts)
            if certificate:
                self.context.load_cert_chain(certificate)
            if npn_protocols:
                self.context.set_npn_protocols(npn_protocols)
            if alpn_protocols:
                self.context.set_alpn_protocols(alpn_protocols)
            if ciphers:
                self.context.set_ciphers(ciphers)
        self.chatty = chatty
        self.connectionchatty = connectionchatty
        self.starttls_server = starttls_server
        self.sock = socket.socket()
        self.port = support.bind_port(self.sock)
        self.flag = None
        self.active = False
        # Per-connection results appended by ConnectionHandler.wrap_conn().
        self.selected_npn_protocols = []
        self.selected_alpn_protocols = []
        self.shared_ciphers = []
        self.conn_errors = []
        threading.Thread.__init__(self)
        self.daemon = True

    def __enter__(self):
        self.start(threading.Event())
        # Wait until run() has started listening.
        self.flag.wait()
        return self

    def __exit__(self, *args):
        self.stop()
        self.join()

    def start(self, flag=None):
        """Start the thread; *flag*, if given, is set once listening."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        """Accept loop: handle connections one at a time until stopped."""
        # The short accept() timeout lets the loop notice stop() promptly.
        self.sock.settimeout(0.05)
        self.sock.listen()
        self.active = True
        if self.flag:
            # signal an event
            self.flag.set()
        while self.active:
            try:
                newconn, connaddr = self.sock.accept()
                if support.verbose and self.chatty:
                    sys.stdout.write(' server: new connection from '
                                     + repr(connaddr) + '\n')
                handler = self.ConnectionHandler(self, newconn, connaddr)
                handler.start()
                handler.join()
            except socket.timeout:
                pass
            except KeyboardInterrupt:
                self.stop()
            except BaseException as e:
                if support.verbose and self.chatty:
                    sys.stdout.write(
                        ' connection handling failed: ' + repr(e) + '\n')

        self.sock.close()

    def stop(self):
        """Ask the accept loop to exit after its current iteration."""
        self.active = False
2387
class AsyncoreEchoServer(threading.Thread):
    """Echo server thread built on asyncore.dispatcher.

    Used as a context manager: entering starts the thread and waits for
    it to be running, leaving stops it, joins it and clears asyncore's
    socket map.
    """

    class EchoServer (asyncore.dispatcher):
        """Listening dispatcher that spawns a ConnectionHandler per client."""

        class ConnectionHandler(asyncore.dispatcher_with_send):
            """Server side of one TLS connection; echoes data lower-cased."""

            def __init__(self, conn, certfile):
                self.socket = test_wrap_socket(conn, server_side=True,
                                               certfile=certfile,
                                               do_handshake_on_connect=False)
                asyncore.dispatcher_with_send.__init__(self, self.socket)
                # True until the handshake has completed.
                self._ssl_accepting = True
                self._do_ssl_handshake()

            def readable(self):
                # Dispatch read events for as long as the SSL socket
                # reports pending() bytes.
                if isinstance(self.socket, ssl.SSLSocket):
                    while self.socket.pending() > 0:
                        self.handle_read_event()
                return True

            def _do_ssl_handshake(self):
                """Advance the handshake by one step."""
                try:
                    self.socket.do_handshake()
                except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
                    # Not finished yet; retried on the next read event.
                    return
                except ssl.SSLEOFError:
                    return self.handle_close()
                except ssl.SSLError:
                    raise
                except OSError as err:
                    if err.args[0] == errno.ECONNABORTED:
                        return self.handle_close()
                else:
                    self._ssl_accepting = False

            def handle_read(self):
                if self._ssl_accepting:
                    self._do_ssl_handshake()
                    return
                data = self.recv(1024)
                if support.verbose:
                    sys.stdout.write(" server: read %s from client\n" % repr(data))
                if data:
                    self.send(data.lower())
                else:
                    self.close()

            def handle_close(self):
                self.close()
                if support.verbose:
                    sys.stdout.write(" server: closed connection %s\n" % self.socket)

            def handle_error(self):
                # Re-raise the active exception instead of handling it here.
                raise

        def __init__(self, certfile):
            self.certfile = certfile
            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.port = support.bind_port(listener, '')
            asyncore.dispatcher.__init__(self, listener)
            self.listen(5)

        def handle_accepted(self, sock_obj, addr):
            if support.verbose:
                sys.stdout.write(" server: new connection from %s:%s\n" %addr)
            self.ConnectionHandler(sock_obj, self.certfile)

        def handle_error(self):
            # Re-raise the active exception instead of handling it here.
            raise

    def __init__(self, certfile):
        self.flag = None
        self.active = False
        self.server = self.EchoServer(certfile)
        self.port = self.server.port
        threading.Thread.__init__(self)
        self.daemon = True

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.server)

    def __enter__(self):
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        def note(message):
            # Progress output is only produced in verbose mode.
            if support.verbose:
                sys.stdout.write(message)

        note(" cleanup: stopping server.\n")
        self.stop()
        note(" cleanup: joining server thread.\n")
        self.join()
        note(" cleanup: successfully joined.\n")
        # make sure that ConnectionHandler is removed from socket_map
        asyncore.close_all(ignore_all=True)

    def start(self, flag=None):
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        self.active = True
        if self.flag:
            self.flag.set()
        while self.active:
            # Best effort: any error escaping asyncore.loop() is ignored and
            # the loop is simply re-entered while the server is active.
            try:
                asyncore.loop(1)
            except:
                pass

    def stop(self):
        self.active = False
        self.server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002505
def server_params_test(client_context, server_context, indata=b"FOO\n",
                       chatty=True, connectionchatty=False, sni_name=None,
                       session=None):
    """
    Launch a server, connect a client to it and try various reads
    and writes.

    *indata* is sent as bytes, bytearray and memoryview in turn, and each
    reply must equal indata.lower(), otherwise AssertionError is raised.
    Returns a dict of connection statistics collected from the client
    socket and from the server.
    """
    stats = {}
    echo_server = ThreadedEchoServer(context=server_context,
                                     chatty=chatty,
                                     connectionchatty=False)
    expected = indata.lower()
    with echo_server:
        client_sock = client_context.wrap_socket(socket.socket(),
                                                 server_hostname=sni_name,
                                                 session=session)
        with client_sock as s:
            s.connect((HOST, echo_server.port))
            payloads = [indata, bytearray(indata), memoryview(indata)]
            for arg in payloads:
                if connectionchatty and support.verbose:
                    sys.stdout.write(
                        " client: sending %r...\n" % indata)
                s.write(arg)
                outdata = s.read()
                if connectionchatty and support.verbose:
                    sys.stdout.write(" client: read %r\n" % outdata)
                if outdata != expected:
                    raise AssertionError(
                        "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                        % (outdata[:20], len(outdata),
                           indata[:20].lower(), len(indata)))
            # b"over" tells the echo server to close this connection.
            s.write(b"over\n")
            if connectionchatty and support.verbose:
                sys.stdout.write(" client: closing connection.\n")
            stats['compression'] = s.compression()
            stats['cipher'] = s.cipher()
            stats['peercert'] = s.getpeercert()
            stats['client_alpn_protocol'] = s.selected_alpn_protocol()
            stats['client_npn_protocol'] = s.selected_npn_protocol()
            stats['version'] = s.version()
            stats['session_reused'] = s.session_reused
            stats['session'] = s.session
            s.close()
        stats['server_alpn_protocols'] = echo_server.selected_alpn_protocols
        stats['server_npn_protocols'] = echo_server.selected_npn_protocols
        stats['server_shared_ciphers'] = echo_server.shared_ciphers
    return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002555
def try_protocol_combo(server_protocol, client_protocol, expect_success,
                       certsreqs=None, server_options=0, client_options=0):
    """
    Try to SSL-connect using *client_protocol* to *server_protocol*.
    If *expect_success* is true, assert that the connection succeeds,
    if it's false, assert that the connection fails.
    Also, if *expect_success* is a string, assert that it is the protocol
    version actually used by the connection.
    """
    if certsreqs is None:
        certsreqs = ssl.CERT_NONE
    cert_mode_names = {
        ssl.CERT_NONE: "CERT_NONE",
        ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
        ssl.CERT_REQUIRED: "CERT_REQUIRED",
    }
    certtype = cert_mode_names[certsreqs]
    if support.verbose:
        # Combinations expected to fail are shown in braces.
        formatstr = " %s->%s %s\n" if expect_success else " {%s->%s} %s\n"
        sys.stdout.write(formatstr %
                         (ssl.get_protocol_name(client_protocol),
                          ssl.get_protocol_name(server_protocol),
                          certtype))

    client_context = ssl.SSLContext(client_protocol)
    client_context.options |= client_options
    server_context = ssl.SSLContext(server_protocol)
    server_context.options |= server_options

    # NOTE: we must enable "ALL" ciphers on the client, otherwise an
    # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
    # starting from OpenSSL 1.0.0 (see issue #8322).
    if client_context.protocol == ssl.PROTOCOL_TLS:
        client_context.set_ciphers("ALL")

    for context in (client_context, server_context):
        context.verify_mode = certsreqs
        context.load_cert_chain(SIGNED_CERTFILE)
        context.load_verify_locations(SIGNING_CA)

    try:
        stats = server_params_test(client_context, server_context,
                                   chatty=False, connectionchatty=False)
    # Protocol mismatch can result in either an SSLError, or a
    # "Connection reset by peer" error.
    except ssl.SSLError:
        if expect_success:
            raise
        return
    except OSError as exc:
        if expect_success or exc.errno != errno.ECONNRESET:
            raise
        return

    # The connection went through.
    if not expect_success:
        raise AssertionError(
            "Client protocol %s succeeded with server protocol %s!"
            % (ssl.get_protocol_name(client_protocol),
               ssl.get_protocol_name(server_protocol)))
    if expect_success is not True and expect_success != stats['version']:
        raise AssertionError("version mismatch: expected %r, got %r"
                             % (expect_success, stats['version']))
2614
2615
2616class ThreadedTests(unittest.TestCase):
2617
def test_echo(self):
    """Basic test of an SSL client connecting to a server.

    First every protocol in PROTOCOLS (except the role-specific
    PROTOCOL_TLS_CLIENT / PROTOCOL_TLS_SERVER) is exercised with a single
    context shared by both ends.  Then the contexts from testing_context()
    are used in their proper roles, and finally in wrong roles, which must
    raise ssl.SSLError.
    """
    if support.verbose:
        sys.stdout.write("\n")

    role_specific = {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}
    for protocol in PROTOCOLS:
        if protocol in role_specific:
            continue
        with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
            shared_context = ssl.SSLContext(protocol)
            shared_context.load_cert_chain(CERTFILE)
            server_params_test(shared_context, shared_context,
                               chatty=True, connectionchatty=True)

    client_context, server_context, hostname = testing_context()

    # Contexts used in their intended roles: must succeed.
    with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT,
                      server=ssl.PROTOCOL_TLS_SERVER):
        server_params_test(client_context=client_context,
                           server_context=server_context,
                           chatty=True, connectionchatty=True,
                           sni_name=hostname)

    # Contexts used in the wrong role: each (subTest labels, call
    # arguments) pair below must fail with the same OpenSSL error.
    client_context.check_hostname = False
    misuse_cases = (
        ({'client': ssl.PROTOCOL_TLS_SERVER,
          'server': ssl.PROTOCOL_TLS_CLIENT},
         {'client_context': server_context,
          'server_context': client_context,
          'sni_name': hostname}),
        ({'client': ssl.PROTOCOL_TLS_SERVER,
          'server': ssl.PROTOCOL_TLS_SERVER},
         {'client_context': server_context,
          'server_context': server_context}),
        # NOTE(review): this case is labelled TLS_CLIENT/TLS_CLIENT but
        # passes the same contexts as the first case (without sni_name);
        # confirm whether client_context was meant for both ends.
        ({'client': ssl.PROTOCOL_TLS_CLIENT,
          'server': ssl.PROTOCOL_TLS_CLIENT},
         {'client_context': server_context,
          'server_context': client_context}),
    )
    for labels, call_args in misuse_cases:
        with self.subTest(**labels):
            with self.assertRaises(ssl.SSLError) as e:
                server_params_test(chatty=True, connectionchatty=True,
                                   **call_args)
            self.assertIn('called a function you should not call',
                          str(e.exception))
2664
def test_getpeercert(self):
    """Check getpeercert() before and after the handshake.

    Before do_handshake() the call must raise ValueError; afterwards the
    returned certificate must carry the expected subject and a validity
    window whose start precedes its end.
    """
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()
    server = ThreadedEchoServer(context=server_context, chatty=False)
    with server, \
            client_context.wrap_socket(socket.socket(),
                                       do_handshake_on_connect=False,
                                       server_hostname=hostname) as s:
        s.connect((HOST, server.port))
        # getpeercert() raises ValueError while the handshake isn't done.
        with self.assertRaises(ValueError):
            s.getpeercert()
        s.do_handshake()

        cert = s.getpeercert()
        self.assertTrue(cert, "Can't get peer certificate.")
        cipher = s.cipher()
        if support.verbose:
            sys.stdout.write(pprint.pformat(cert) + '\n')
            sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')

        if 'subject' not in cert:
            self.fail("No subject field in certificate: %s." %
                      pprint.pformat(cert))
        expected_org = (('organizationName', 'Python Software Foundation'),)
        if expected_org not in cert['subject']:
            self.fail(
                "Missing or invalid 'organizationName' field in certificate subject; "
                "should be 'Python Software Foundation'.")

        for field in ('notBefore', 'notAfter'):
            self.assertIn(field, cert)
        valid_from = ssl.cert_time_to_seconds(cert['notBefore'])
        valid_until = ssl.cert_time_to_seconds(cert['notAfter'])
        self.assertLess(valid_from, valid_until)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002700
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_crl_check(self):
    """Check the effect of VERIFY_CRL_CHECK_LEAF on the client handshake.

    The handshake must succeed with default verify flags, fail once
    VERIFY_CRL_CHECK_LEAF is set with no CRL loaded, and succeed again
    after CRLFILE has been loaded.
    """
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    trusted_first = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(client_context.verify_flags,
                     ssl.VERIFY_DEFAULT | trusted_first)

    def new_client_socket():
        # Fresh, not yet connected client socket for each attempt.
        return client_context.wrap_socket(socket.socket(),
                                          server_hostname=hostname)

    # VERIFY_DEFAULT should pass
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, new_client_socket() as s:
        s.connect((HOST, server.port))
        cert = s.getpeercert()
        self.assertTrue(cert, "Can't get peer certificate.")

    # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
    client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF

    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, new_client_socket() as s:
        with self.assertRaisesRegex(ssl.SSLError,
                                    "certificate verify failed"):
            s.connect((HOST, server.port))

    # now load a CRL file. The CRL file is signed by the CA.
    client_context.load_verify_locations(CRLFILE)

    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, new_client_socket() as s:
        s.connect((HOST, server.port))
        cert = s.getpeercert()
        self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002742
def test_check_hostname(self):
    """Check hostname verification on the client side.

    A matching server_hostname must verify, a non-matching one must raise
    ssl.CertificateError during connect(), and omitting server_hostname
    must make wrap_socket() raise ValueError.
    """
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    # correct hostname should verify
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, \
            client_context.wrap_socket(socket.socket(),
                                       server_hostname=hostname) as s:
        s.connect((HOST, server.port))
        cert = s.getpeercert()
        self.assertTrue(cert, "Can't get peer certificate.")

    # incorrect hostname should raise an exception
    mismatch_pattern = (
        "Hostname mismatch, certificate is not valid for 'invalid'.")
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, \
            client_context.wrap_socket(socket.socket(),
                                       server_hostname="invalid") as s:
        with self.assertRaisesRegex(ssl.CertificateError, mismatch_pattern):
            s.connect((HOST, server.port))

    # missing server_hostname arg should cause an exception, too
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, socket.socket() as s:
        with self.assertRaisesRegex(ValueError,
                                    "check_hostname requires server_hostname"):
            client_context.wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002775
def test_ecc_cert(self):
    """Connect to a server that only has an ECC certificate.

    The client is restricted to ECDHE/ECDSA cipher suites and must be able
    to complete the handshake and retrieve the peer certificate.
    """
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)
    client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
    hostname = SIGNED_CERTFILE_ECC_HOSTNAME

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # load ECC cert
    server_context.load_cert_chain(SIGNED_CERTFILE_ECC)

    # correct hostname should verify
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()[0].split('-')
            # The previous assertTrue(cipher[:2], (...)) treated the tuple
            # as the failure message and never compared anything.  TLS 1.3
            # suite names (e.g. TLS_AES_256_GCM_SHA384) do not encode the
            # key exchange / authentication algorithms, so the name check
            # is only meaningful for earlier protocol versions.
            if s.version() != 'TLSv1.3':
                self.assertEqual(cipher[:2], ['ECDHE', 'ECDSA'])
2796
def test_dual_rsa_ecc(self):
    """Connect to a server holding both an ECC and an RSA certificate.

    The client only offers ECDHE/ECDSA suites (TLS 1.3 is disabled), so
    the negotiated cipher must be an ECDHE-ECDSA one.
    """
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)
    # TODO: fix TLSv1.3 once SSLContext can restrict signature
    #       algorithms.
    client_context.options |= ssl.OP_NO_TLSv1_3
    # only ECDSA certs
    client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
    hostname = SIGNED_CERTFILE_ECC_HOSTNAME

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # load ECC and RSA key/cert pairs
    server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    # correct hostname should verify
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()[0].split('-')
            # assertTrue(cipher[:2], (...)) treated the tuple as the
            # failure message and never compared anything; compare the
            # split name (a list) for real.
            self.assertEqual(cipher[:2], ['ECDHE', 'ECDSA'])
2822
def test_check_hostname_idn(self):
    """Check hostname verification with internationalized domain names.

    Each hostname is given as a Unicode name, as its ASCII (A-label) form,
    or as bytes; SSLSocket.server_hostname must always report the A-label
    form and the certificate from IDNSANSFILE must verify.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(IDNSANSFILE)

    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.verify_mode = ssl.CERT_REQUIRED
    context.check_hostname = True
    context.load_verify_locations(SIGNING_CA)

    # correct hostname should verify, when specified in several
    # different ways
    koenig = 'xn--knig-5qa.idn.pythontest.net'
    idna2003 = 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'
    idna2008 = 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'
    idn_hostnames = [
        ('könig.idn.pythontest.net', koenig),
        (koenig, koenig),
        (koenig.encode('ascii'), koenig),

        ('königsgäßchen.idna2003.pythontest.net', idna2003),
        (idna2003, idna2003),
        (idna2003.encode('ascii'), idna2003),

        # ('königsgäßchen.idna2008.pythontest.net',
        #  'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
        (idna2008, idna2008),
        (idna2008.encode('ascii'), idna2008),
    ]
    for server_hostname, expected_hostname in idn_hostnames:
        server = ThreadedEchoServer(context=server_context, chatty=True)
        with server, \
                context.wrap_socket(socket.socket(),
                                    server_hostname=server_hostname) as s:
            # The normalized name must be reported both before and
            # after the handshake.
            self.assertEqual(s.server_hostname, expected_hostname)
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertEqual(s.server_hostname, expected_hostname)
            self.assertTrue(cert, "Can't get peer certificate.")

    # incorrect hostname should raise an exception
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, \
            context.wrap_socket(socket.socket(),
                                server_hostname="python.example.org") as s:
        with self.assertRaises(ssl.CertificateError):
            s.connect((HOST, server.port))
2878
def test_wrong_cert_tls12(self):
    """Connecting when the server rejects the client's certificate

    The server requires a client certificate (CERT_REQUIRED) and the
    client presents CERTFILE, which is not signed by a trusted CA.  The
    client is capped at TLS 1.2, so connect() itself is expected to fail.
    """
    client_context, server_context, hostname = testing_context()
    # load client cert that is not signed by trusted CA
    client_context.load_cert_chain(CERTFILE)
    # require TLS client authentication
    server_context.verify_mode = ssl.CERT_REQUIRED
    # TLS 1.3 has different handshake
    client_context.maximum_version = ssl.TLSVersion.TLSv1_2

    server = ThreadedEchoServer(
        context=server_context, chatty=True, connectionchatty=True,
    )

    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            # Expect either an SSL error about the server rejecting
            # the connection, or a low-level connection reset (which
            # sometimes happens on Windows)
            try:
                s.connect((HOST, server.port))
            except ssl.SSLError as ssl_exc:
                if support.verbose:
                    sys.stdout.write("\nSSLError is %r\n" % ssl_exc)
            except OSError as os_exc:
                if os_exc.errno != errno.ECONNRESET:
                    raise
                if support.verbose:
                    sys.stdout.write("\nsocket.error is %r\n" % os_exc)
            else:
                self.fail("Use of invalid cert should have failed!")
2915
@unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
def test_wrong_cert_tls13(self):
    """Connecting with an untrusted client certificate over TLS 1.3.

    Both sides are pinned to TLS 1.3 as the minimum version.  connect()
    is expected to succeed; the failure must show up on the first
    write/read after the handshake.
    """
    client_context, server_context, hostname = testing_context()
    # load client cert that is not signed by trusted CA
    client_context.load_cert_chain(CERTFILE)
    server_context.verify_mode = ssl.CERT_REQUIRED
    for ctx in (server_context, client_context):
        ctx.minimum_version = ssl.TLSVersion.TLSv1_3

    server = ThreadedEchoServer(
        context=server_context, chatty=True, connectionchatty=True,
    )
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            # TLS 1.3 perform client cert exchange after handshake
            s.connect((HOST, server.port))
            try:
                s.write(b'data')
                s.read(4)
            except ssl.SSLError as ssl_exc:
                if support.verbose:
                    sys.stdout.write("\nSSLError is %r\n" % ssl_exc)
            except OSError as os_exc:
                if os_exc.errno != errno.ECONNRESET:
                    raise
                if support.verbose:
                    sys.stdout.write("\nsocket.error is %r\n" % os_exc)
            else:
                self.fail("Use of invalid cert should have failed!")
2946
def test_rude_shutdown(self):
    """A brutal shutdown of an SSL server should raise an OSError
    in the client when attempting handshake.

    A plain listening socket accepts one connection in a helper thread
    and immediately closes it; the main thread then tries to wrap its end
    of the connection and must get an OSError.
    """
    accepting = threading.Event()
    hung_up = threading.Event()

    s = socket.socket()
    port = support.bind_port(s, HOST)

    def listener():
        # Runs in a thread: accept one connection, rudely close both the
        # accepted socket and the listening socket, then signal the main
        # thread that the peer is gone.
        s.listen()
        accepting.set()
        accepted, _peer = s.accept()
        accepted.close()
        s.close()
        hung_up.set()

    def connector():
        accepting.wait()
        with socket.socket() as c:
            c.connect((HOST, port))
            hung_up.wait()
            try:
                wrapped = test_wrap_socket(c)
            except OSError:
                return
            self.fail('connecting to closed SSL socket should have failed')

    listener_thread = threading.Thread(target=listener)
    listener_thread.start()
    try:
        connector()
    finally:
        listener_thread.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002987
def test_ssl_cert_verify_error(self):
    """Check the details carried by a certificate verification failure.

    The client context has no CA certificates loaded, so verifying the
    server certificate must fail with ssl.SSLCertVerificationError,
    verify code 20 and the matching verify message.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with context.wrap_socket(socket.socket(),
                                 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
            # assertRaises makes the test fail if the handshake
            # unexpectedly succeeds; the previous try/except without an
            # else branch passed silently in that case.
            with self.assertRaises(ssl.SSLError) as cm:
                s.connect((HOST, server.port))
            e = cm.exception
            msg = 'unable to get local issuer certificate'
            self.assertIsInstance(e, ssl.SSLCertVerificationError)
            self.assertEqual(e.verify_code, 20)
            self.assertEqual(e.verify_message, msg)
            self.assertIn(msg, repr(e))
            self.assertIn('certificate verify failed', repr(e))
3010
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
                     "OpenSSL is compiled without SSLv2 support")
def test_protocol_sslv2(self):
    """Connecting to an SSLv2 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    sslv2 = ssl.PROTOCOL_SSLv2

    # SSLv2 on both ends succeeds for every certificate requirement.
    try_protocol_combo(sslv2, sslv2, True)
    for cert_mode in (ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(sslv2, sslv2, True, cert_mode)

    # Any other protocol on the second argument is expected to fail.
    try_protocol_combo(sslv2, ssl.PROTOCOL_TLS, False)
    if hasattr(ssl, 'PROTOCOL_SSLv3'):
        try_protocol_combo(sslv2, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(sslv2, ssl.PROTOCOL_TLSv1, False)

    # SSLv23 client with specific SSL options
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(sslv2, ssl.PROTOCOL_TLS, False,
                           client_options=ssl.OP_NO_SSLv2)
    for disabled in (ssl.OP_NO_SSLv3, ssl.OP_NO_TLSv1):
        try_protocol_combo(sslv2, ssl.PROTOCOL_TLS, False,
                           client_options=disabled)
Antoine Pitrou242db722013-05-01 20:52:07 +02003033
def test_PROTOCOL_TLS(self):
    """Connecting to an SSLv23 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    tls = ssl.PROTOCOL_TLS

    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try:
            try_protocol_combo(tls, ssl.PROTOCOL_SSLv2, True)
        except OSError as x:
            # this fails on some older versions of OpenSSL (0.9.7l, for instance)
            if support.verbose:
                sys.stdout.write(
                    " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
                    % str(x))

    # Default certificate requirement.
    if hasattr(ssl, 'PROTOCOL_SSLv3'):
        try_protocol_combo(tls, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(tls, tls, True)
    try_protocol_combo(tls, ssl.PROTOCOL_TLSv1, 'TLSv1')

    # Same three combinations with explicit certificate requirements.
    for cert_mode in (ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        if hasattr(ssl, 'PROTOCOL_SSLv3'):
            try_protocol_combo(tls, ssl.PROTOCOL_SSLv3, False, cert_mode)
        try_protocol_combo(tls, tls, True, cert_mode)
        try_protocol_combo(tls, ssl.PROTOCOL_TLSv1, 'TLSv1', cert_mode)

    # Server with specific SSL options
    if hasattr(ssl, 'PROTOCOL_SSLv3'):
        try_protocol_combo(tls, ssl.PROTOCOL_SSLv3, False,
                           server_options=ssl.OP_NO_SSLv3)
    # Will choose TLSv1
    try_protocol_combo(tls, tls, True,
                       server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
    try_protocol_combo(tls, ssl.PROTOCOL_TLSv1, False,
                       server_options=ssl.OP_NO_TLSv1)
3071
3072
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
                     "OpenSSL is compiled without SSLv3 support")
def test_protocol_sslv3(self):
    """Connecting to an SSLv3 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    sslv3 = ssl.PROTOCOL_SSLv3

    # SSLv3 on both ends must negotiate 'SSLv3' for every certificate
    # requirement.
    try_protocol_combo(sslv3, sslv3, 'SSLv3')
    for cert_mode in (ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(sslv3, sslv3, 'SSLv3', cert_mode)

    # Mismatching combinations are expected to fail.
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(sslv3, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(sslv3, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_SSLv3)
    try_protocol_combo(sslv3, ssl.PROTOCOL_TLSv1, False)
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(sslv3, ssl.PROTOCOL_TLS,
                           False, client_options=ssl.OP_NO_SSLv2)
3091
def test_protocol_tlsv1(self):
    """Connecting to a TLSv1 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1 = ssl.PROTOCOL_TLSv1

    # TLSv1 on both ends must negotiate 'TLSv1' for every certificate
    # requirement.
    try_protocol_combo(tlsv1, tlsv1, 'TLSv1')
    for cert_mode in (ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(tlsv1, tlsv1, 'TLSv1', cert_mode)

    # Older protocols, where available, are expected to fail.
    for legacy_name in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy_name):
            try_protocol_combo(tlsv1, getattr(ssl, legacy_name), False)
    try_protocol_combo(tlsv1, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1)
3105
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
                     "TLS version 1.1 not supported.")
def test_protocol_tlsv1_1(self):
    """Connecting to a TLSv1.1 server with various client options.
       Testing against older TLS versions."""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1_1 = ssl.PROTOCOL_TLSv1_1

    try_protocol_combo(tlsv1_1, tlsv1_1, 'TLSv1.1')
    # Older protocols, where available, are expected to fail.
    for legacy_name in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy_name):
            try_protocol_combo(tlsv1_1, getattr(ssl, legacy_name), False)
    try_protocol_combo(tlsv1_1, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1_1)

    try_protocol_combo(ssl.PROTOCOL_TLS, tlsv1_1, 'TLSv1.1')
    # TLSv1.1 and TLSv1 must not interoperate in either direction.
    for first, second in ((tlsv1_1, ssl.PROTOCOL_TLSv1),
                          (ssl.PROTOCOL_TLSv1, tlsv1_1)):
        try_protocol_combo(first, second, False)
3124
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
                     "TLS version 1.2 not supported.")
def test_protocol_tlsv1_2(self):
    """Connecting to a TLSv1.2 server with various client options.
       Testing against older TLS versions."""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1_2 = ssl.PROTOCOL_TLSv1_2
    no_legacy_ssl = ssl.OP_NO_SSLv3 | ssl.OP_NO_SSLv2

    try_protocol_combo(tlsv1_2, tlsv1_2, 'TLSv1.2',
                       server_options=no_legacy_ssl,
                       client_options=no_legacy_ssl)
    # Older protocols, where available, are expected to fail.
    for legacy_name in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy_name):
            try_protocol_combo(tlsv1_2, getattr(ssl, legacy_name), False)
    try_protocol_combo(tlsv1_2, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1_2)

    try_protocol_combo(ssl.PROTOCOL_TLS, tlsv1_2, 'TLSv1.2')
    # TLSv1.2 must not interoperate with TLSv1 or TLSv1.1 in either
    # direction.
    mismatches = (
        (tlsv1_2, ssl.PROTOCOL_TLSv1),
        (ssl.PROTOCOL_TLSv1, tlsv1_2),
        (tlsv1_2, ssl.PROTOCOL_TLSv1_1),
        (ssl.PROTOCOL_TLSv1_1, tlsv1_2),
    )
    for first, second in mismatches:
        try_protocol_combo(first, second, False)
3147
def test_starttls(self):
    """Switching from clear text to encrypted and back again.

    Sends a fixed sequence of messages to a STARTTLS-enabled echo server.
    When the server acknowledges b"STARTTLS" the plain socket is wrapped;
    when it acknowledges b"ENDTLS" the TLS layer is unwrapped again.
    """
    msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")

    def say(fmt, *args):
        # Progress output, only in verbose mode.
        if support.verbose:
            sys.stdout.write(fmt % args)

    server = ThreadedEchoServer(CERTFILE,
                                starttls_server=True,
                                chatty=True,
                                connectionchatty=True)
    secure = False  # True while traffic goes through the wrapped socket
    with server:
        s = socket.socket()
        s.setblocking(1)
        s.connect((HOST, server.port))
        say("\n")
        for indata in msgs:
            say(" client: sending %r...\n", indata)
            if secure:
                conn.write(indata)
                outdata = conn.read()
            else:
                s.send(indata)
                outdata = s.recv(1024)
            msg = outdata.strip().lower()
            acknowledged = msg.startswith(b"ok")
            if acknowledged and indata == b"STARTTLS":
                # STARTTLS ok, switch to secure mode
                say(" client: read %r from server, starting TLS...\n", msg)
                conn = test_wrap_socket(s)
                secure = True
            elif acknowledged and indata == b"ENDTLS":
                # ENDTLS ok, switch back to clear text
                say(" client: read %r from server, ending TLS...\n", msg)
                s = conn.unwrap()
                secure = False
            else:
                say(" client: read %r from server\n", msg)
        say(" client: closing connection.\n")
        # Say goodbye and close whichever channel is currently active.
        if secure:
            conn.write(b"over\n")
            conn.close()
        else:
            s.send(b"over\n")
            s.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003204
def test_socketserver(self):
    """Using socketserver to create and manage SSL connections.

    Fetches CERTFILE through the HTTPS server created by
    make_https_server() and checks that the bytes received equal the
    bytes read directly from disk.
    """
    server = make_https_server(self, certfile=SIGNED_CERTFILE)
    # try to connect
    if support.verbose:
        sys.stdout.write('\n')
    with open(CERTFILE, 'rb') as f:
        d1 = f.read()
    # Start from empty *bytes*: d1 is bytes, so a str default would make
    # the final comparison mix str and bytes (a BytesWarning under -b/-bb)
    # whenever the response carries no usable content-length.
    d2 = b''
    # now fetch the same data from the HTTPS server
    url = 'https://localhost:%d/%s' % (
        server.port, os.path.split(CERTFILE)[1])
    context = ssl.create_default_context(cafile=SIGNING_CA)
    f = urllib.request.urlopen(url, context=context)
    try:
        dlen = f.info().get("content-length")
        if dlen and (int(dlen) > 0):
            d2 = f.read(int(dlen))
            if support.verbose:
                sys.stdout.write(
                    " client: read %d bytes from remote server '%s'\n"
                    % (len(d2), server))
    finally:
        f.close()
    self.assertEqual(d1, d2)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003230
def test_asyncore_server(self):
    """Check the example asyncore integration.

    Sends one message to an AsyncoreEchoServer and expects the lowercased
    message back.
    """
    def say(text):
        # Progress output, only in verbose mode.
        if support.verbose:
            sys.stdout.write(text)

    say("\n")

    indata = b"FOO\n"
    expected = indata.lower()
    server = AsyncoreEchoServer(CERTFILE)
    with server:
        s = test_wrap_socket(socket.socket())
        s.connect(('127.0.0.1', server.port))
        if support.verbose:
            sys.stdout.write(
                " client: sending %r...\n" % indata)
        s.write(indata)
        outdata = s.read()
        if support.verbose:
            sys.stdout.write(" client: read %r\n" % outdata)
        if outdata != expected:
            self.fail(
                "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                % (outdata[:20], len(outdata),
                   expected[:20], len(indata)))
        s.write(b"over\n")
        say(" client: closing connection.\n")
        s.close()
        say(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05003259
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003260 def test_recv_send(self):
3261 """Test recv(), send() and friends."""
3262 if support.verbose:
3263 sys.stdout.write("\n")
3264
3265 server = ThreadedEchoServer(CERTFILE,
3266 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003267 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003268 cacerts=CERTFILE,
3269 chatty=True,
3270 connectionchatty=False)
3271 with server:
3272 s = test_wrap_socket(socket.socket(),
3273 server_side=False,
3274 certfile=CERTFILE,
3275 ca_certs=CERTFILE,
3276 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003277 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003278 s.connect((HOST, server.port))
3279 # helper methods for standardising recv* method signatures
3280 def _recv_into():
3281 b = bytearray(b"\0"*100)
3282 count = s.recv_into(b)
3283 return b[:count]
3284
3285 def _recvfrom_into():
3286 b = bytearray(b"\0"*100)
3287 count, addr = s.recvfrom_into(b)
3288 return b[:count]
3289
3290 # (name, method, expect success?, *args, return value func)
3291 send_methods = [
3292 ('send', s.send, True, [], len),
3293 ('sendto', s.sendto, False, ["some.address"], len),
3294 ('sendall', s.sendall, True, [], lambda x: None),
3295 ]
3296 # (name, method, whether to expect success, *args)
3297 recv_methods = [
3298 ('recv', s.recv, True, []),
3299 ('recvfrom', s.recvfrom, False, ["some.address"]),
3300 ('recv_into', _recv_into, True, []),
3301 ('recvfrom_into', _recvfrom_into, False, []),
3302 ]
3303 data_prefix = "PREFIX_"
3304
3305 for (meth_name, send_meth, expect_success, args,
3306 ret_val_meth) in send_methods:
3307 indata = (data_prefix + meth_name).encode('ascii')
3308 try:
3309 ret = send_meth(indata, *args)
3310 msg = "sending with {}".format(meth_name)
3311 self.assertEqual(ret, ret_val_meth(indata), msg=msg)
3312 outdata = s.read()
3313 if outdata != indata.lower():
3314 self.fail(
3315 "While sending with <<{name:s}>> bad data "
3316 "<<{outdata:r}>> ({nout:d}) received; "
3317 "expected <<{indata:r}>> ({nin:d})\n".format(
3318 name=meth_name, outdata=outdata[:20],
3319 nout=len(outdata),
3320 indata=indata[:20], nin=len(indata)
3321 )
3322 )
3323 except ValueError as e:
3324 if expect_success:
3325 self.fail(
3326 "Failed to send with method <<{name:s}>>; "
3327 "expected to succeed.\n".format(name=meth_name)
3328 )
3329 if not str(e).startswith(meth_name):
3330 self.fail(
3331 "Method <<{name:s}>> failed with unexpected "
3332 "exception message: {exp:s}\n".format(
3333 name=meth_name, exp=e
3334 )
3335 )
3336
3337 for meth_name, recv_meth, expect_success, args in recv_methods:
3338 indata = (data_prefix + meth_name).encode('ascii')
3339 try:
3340 s.send(indata)
3341 outdata = recv_meth(*args)
3342 if outdata != indata.lower():
3343 self.fail(
3344 "While receiving with <<{name:s}>> bad data "
3345 "<<{outdata:r}>> ({nout:d}) received; "
3346 "expected <<{indata:r}>> ({nin:d})\n".format(
3347 name=meth_name, outdata=outdata[:20],
3348 nout=len(outdata),
3349 indata=indata[:20], nin=len(indata)
3350 )
3351 )
3352 except ValueError as e:
3353 if expect_success:
3354 self.fail(
3355 "Failed to receive with method <<{name:s}>>; "
3356 "expected to succeed.\n".format(name=meth_name)
3357 )
3358 if not str(e).startswith(meth_name):
3359 self.fail(
3360 "Method <<{name:s}>> failed with unexpected "
3361 "exception message: {exp:s}\n".format(
3362 name=meth_name, exp=e
3363 )
3364 )
3365 # consume data
3366 s.read()
3367
3368 # read(-1, buffer) is supported, even though read(-1) is not
3369 data = b"data"
3370 s.send(data)
3371 buffer = bytearray(len(data))
3372 self.assertEqual(s.read(-1, buffer), len(data))
3373 self.assertEqual(buffer, data)
3374
Christian Heimes888bbdc2017-09-07 14:18:21 -07003375 # sendall accepts bytes-like objects
3376 if ctypes is not None:
3377 ubyte = ctypes.c_ubyte * len(data)
3378 byteslike = ubyte.from_buffer_copy(data)
3379 s.sendall(byteslike)
3380 self.assertEqual(s.read(), data)
3381
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003382 # Make sure sendmsg et al are disallowed to avoid
3383 # inadvertent disclosure of data and/or corruption
3384 # of the encrypted data stream
3385 self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
3386 self.assertRaises(NotImplementedError, s.recvmsg, 100)
3387 self.assertRaises(NotImplementedError,
3388 s.recvmsg_into, bytearray(100))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003389 s.write(b"over\n")
3390
3391 self.assertRaises(ValueError, s.recv, -1)
3392 self.assertRaises(ValueError, s.read, -1)
3393
3394 s.close()
3395
3396 def test_recv_zero(self):
3397 server = ThreadedEchoServer(CERTFILE)
3398 server.__enter__()
3399 self.addCleanup(server.__exit__, None, None)
3400 s = socket.create_connection((HOST, server.port))
3401 self.addCleanup(s.close)
3402 s = test_wrap_socket(s, suppress_ragged_eofs=False)
3403 self.addCleanup(s.close)
3404
3405 # recv/read(0) should return no data
3406 s.send(b"data")
3407 self.assertEqual(s.recv(0), b"")
3408 self.assertEqual(s.read(0), b"")
3409 self.assertEqual(s.read(), b"data")
3410
3411 # Should not block if the other end sends no data
3412 s.setblocking(False)
3413 self.assertEqual(s.recv(0), b"")
3414 self.assertEqual(s.recv_into(bytearray()), 0)
3415
3416 def test_nonblocking_send(self):
3417 server = ThreadedEchoServer(CERTFILE,
3418 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003419 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003420 cacerts=CERTFILE,
3421 chatty=True,
3422 connectionchatty=False)
3423 with server:
3424 s = test_wrap_socket(socket.socket(),
3425 server_side=False,
3426 certfile=CERTFILE,
3427 ca_certs=CERTFILE,
3428 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003429 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003430 s.connect((HOST, server.port))
3431 s.setblocking(False)
3432
3433 # If we keep sending data, at some point the buffers
3434 # will be full and the call will block
3435 buf = bytearray(8192)
3436 def fill_buffer():
3437 while True:
3438 s.send(buf)
3439 self.assertRaises((ssl.SSLWantWriteError,
3440 ssl.SSLWantReadError), fill_buffer)
3441
3442 # Now read all the output and discard it
3443 s.setblocking(True)
3444 s.close()
3445
    def test_handshake_timeout(self):
        # Issue #5103: SSL handshake must respect the socket timeout
        #
        # A plain TCP server accepts connections but never performs a TLS
        # handshake, so a client handshake can only end by timing out.
        server = socket.socket(socket.AF_INET)
        host = "127.0.0.1"
        port = support.bind_port(server)
        started = threading.Event()
        # Read by serve() through the closure; set to True in the outer
        # "finally" below to make the server loop exit.
        finish = False

        def serve():
            server.listen()
            started.set()
            conns = []
            while not finish:
                # Poll with a short timeout so the loop notices "finish".
                r, w, e = select.select([server], [], [], 0.1)
                if server in r:
                    # Let the socket hang around rather than having
                    # it closed by garbage collection.
                    conns.append(server.accept()[0])
            for sock in conns:
                sock.close()

        t = threading.Thread(target=serve)
        t.start()
        # Do not connect before the server is listening.
        started.wait()

        try:
            # Case 1: connect a plain socket first, then wrap it; the
            # wrapping call is the one expected to time out.
            try:
                c = socket.socket(socket.AF_INET)
                c.settimeout(0.2)
                c.connect((host, port))
                # Will attempt handshake and time out
                self.assertRaisesRegex(socket.timeout, "timed out",
                                       test_wrap_socket, c)
            finally:
                c.close()
            # Case 2: wrap first, then connect; connect() is the call
            # expected to time out.
            try:
                c = socket.socket(socket.AF_INET)
                c = test_wrap_socket(c)
                c.settimeout(0.2)
                # Will attempt handshake and time out
                self.assertRaisesRegex(socket.timeout, "timed out",
                                       c.connect, (host, port))
            finally:
                c.close()
        finally:
            # Stop the server thread and release the listening socket.
            finish = True
            t.join()
            server.close()
3494
    def test_server_accept(self):
        # Issue #16357: accept() on a SSLSocket created through
        # SSLContext.wrap_socket().
        #
        # The same context is used for both the server and the client side.
        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(SIGNING_CA)
        context.load_cert_chain(SIGNED_CERTFILE)
        server = socket.socket(socket.AF_INET)
        host = "127.0.0.1"
        port = support.bind_port(server)
        # Wrap the already-bound listening socket.
        server = context.wrap_socket(server, server_side=True)
        self.assertTrue(server.server_side)

        evt = threading.Event()
        # Filled in by serve() via "nonlocal" once a client is accepted.
        remote = None
        peer = None
        def serve():
            nonlocal remote, peer
            server.listen()
            # Signal the main thread that the server is listening, then
            # block in accept().
            evt.set()
            remote, peer = server.accept()
            # Echo the first 4 bytes back to the client.
            remote.send(remote.recv(4))

        t = threading.Thread(target=serve)
        t.start()
        # The client waits until the server is set up, then connects.
        evt.wait()
        client = context.wrap_socket(socket.socket())
        client.connect((host, port))
        client.send(b'data')
        client.recv()
        # Record the client's own address before closing, for comparison
        # with the peer address reported by accept().
        client_addr = client.getsockname()
        client.close()
        t.join()
        remote.close()
        server.close()
        # Sanity checks.
        self.assertIsInstance(remote, ssl.SSLSocket)
        self.assertEqual(peer, client_addr)
3535
3536 def test_getpeercert_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003537 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003538 with context.wrap_socket(socket.socket()) as sock:
3539 with self.assertRaises(OSError) as cm:
3540 sock.getpeercert()
3541 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3542
3543 def test_do_handshake_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003544 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003545 with context.wrap_socket(socket.socket()) as sock:
3546 with self.assertRaises(OSError) as cm:
3547 sock.do_handshake()
3548 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3549
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003550 def test_no_shared_ciphers(self):
3551 client_context, server_context, hostname = testing_context()
3552 # OpenSSL enables all TLS 1.3 ciphers, enforce TLS 1.2 for test
3553 client_context.options |= ssl.OP_NO_TLSv1_3
Victor Stinner5e922652018-09-07 17:30:33 +02003554 # Force different suites on client and server
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003555 client_context.set_ciphers("AES128")
3556 server_context.set_ciphers("AES256")
3557 with ThreadedEchoServer(context=server_context) as server:
3558 with client_context.wrap_socket(socket.socket(),
3559 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003560 with self.assertRaises(OSError):
3561 s.connect((HOST, server.port))
3562 self.assertIn("no shared cipher", server.conn_errors[0])
3563
3564 def test_version_basic(self):
3565 """
3566 Basic tests for SSLSocket.version().
3567 More tests are done in the test_protocol_*() methods.
3568 """
Christian Heimesa170fa12017-09-15 20:27:30 +02003569 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
3570 context.check_hostname = False
3571 context.verify_mode = ssl.CERT_NONE
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003572 with ThreadedEchoServer(CERTFILE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003573 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003574 chatty=False) as server:
3575 with context.wrap_socket(socket.socket()) as s:
3576 self.assertIs(s.version(), None)
Christian Heimes141c5e82018-02-24 21:10:57 +01003577 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003578 s.connect((HOST, server.port))
Christian Heimes529525f2018-05-23 22:24:45 +02003579 if IS_OPENSSL_1_1_1 and ssl.HAS_TLSv1_3:
Christian Heimes05d9fe32018-02-27 08:55:39 +01003580 self.assertEqual(s.version(), 'TLSv1.3')
3581 elif ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
Christian Heimesa170fa12017-09-15 20:27:30 +02003582 self.assertEqual(s.version(), 'TLSv1.2')
3583 else: # 0.9.8 to 1.0.1
3584 self.assertIn(s.version(), ('TLSv1', 'TLSv1.2'))
Christian Heimes141c5e82018-02-24 21:10:57 +01003585 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003586 self.assertIs(s.version(), None)
3587
Christian Heimescb5b68a2017-09-07 18:07:00 -07003588 @unittest.skipUnless(ssl.HAS_TLSv1_3,
3589 "test requires TLSv1.3 enabled OpenSSL")
3590 def test_tls1_3(self):
3591 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
3592 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003593 context.options |= (
3594 ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
3595 )
3596 with ThreadedEchoServer(context=context) as server:
3597 with context.wrap_socket(socket.socket()) as s:
3598 s.connect((HOST, server.port))
Christian Heimes05d9fe32018-02-27 08:55:39 +01003599 self.assertIn(s.cipher()[0], {
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003600 'TLS_AES_256_GCM_SHA384',
3601 'TLS_CHACHA20_POLY1305_SHA256',
3602 'TLS_AES_128_GCM_SHA256',
Christian Heimes05d9fe32018-02-27 08:55:39 +01003603 })
3604 self.assertEqual(s.version(), 'TLSv1.3')
Christian Heimescb5b68a2017-09-07 18:07:00 -07003605
Christian Heimes698dde12018-02-27 11:54:43 +01003606 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
3607 "required OpenSSL 1.1.0g")
3608 def test_min_max_version(self):
3609 client_context, server_context, hostname = testing_context()
3610 # client TLSv1.0 to 1.2
3611 client_context.minimum_version = ssl.TLSVersion.TLSv1
3612 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
3613 # server only TLSv1.2
3614 server_context.minimum_version = ssl.TLSVersion.TLSv1_2
3615 server_context.maximum_version = ssl.TLSVersion.TLSv1_2
3616
3617 with ThreadedEchoServer(context=server_context) as server:
3618 with client_context.wrap_socket(socket.socket(),
3619 server_hostname=hostname) as s:
3620 s.connect((HOST, server.port))
3621 self.assertEqual(s.version(), 'TLSv1.2')
3622
3623 # client 1.0 to 1.2, server 1.0 to 1.1
3624 server_context.minimum_version = ssl.TLSVersion.TLSv1
3625 server_context.maximum_version = ssl.TLSVersion.TLSv1_1
3626
3627 with ThreadedEchoServer(context=server_context) as server:
3628 with client_context.wrap_socket(socket.socket(),
3629 server_hostname=hostname) as s:
3630 s.connect((HOST, server.port))
3631 self.assertEqual(s.version(), 'TLSv1.1')
3632
3633 # client 1.0, server 1.2 (mismatch)
3634 server_context.minimum_version = ssl.TLSVersion.TLSv1_2
3635 server_context.maximum_version = ssl.TLSVersion.TLSv1_2
3636 client_context.minimum_version = ssl.TLSVersion.TLSv1
3637 client_context.maximum_version = ssl.TLSVersion.TLSv1
3638 with ThreadedEchoServer(context=server_context) as server:
3639 with client_context.wrap_socket(socket.socket(),
3640 server_hostname=hostname) as s:
3641 with self.assertRaises(ssl.SSLError) as e:
3642 s.connect((HOST, server.port))
3643 self.assertIn("alert", str(e.exception))
3644
3645
3646 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
3647 "required OpenSSL 1.1.0g")
3648 @unittest.skipUnless(ssl.HAS_SSLv3, "requires SSLv3 support")
3649 def test_min_max_version_sslv3(self):
3650 client_context, server_context, hostname = testing_context()
3651 server_context.minimum_version = ssl.TLSVersion.SSLv3
3652 client_context.minimum_version = ssl.TLSVersion.SSLv3
3653 client_context.maximum_version = ssl.TLSVersion.SSLv3
3654 with ThreadedEchoServer(context=server_context) as server:
3655 with client_context.wrap_socket(socket.socket(),
3656 server_hostname=hostname) as s:
3657 s.connect((HOST, server.port))
3658 self.assertEqual(s.version(), 'SSLv3')
3659
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003660 @unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
3661 def test_default_ecdh_curve(self):
3662 # Issue #21015: elliptic curve-based Diffie Hellman key exchange
3663 # should be enabled by default on SSL contexts.
Christian Heimesa170fa12017-09-15 20:27:30 +02003664 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003665 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003666 # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
3667 # cipher name.
3668 context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003669 # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
3670 # explicitly using the 'ECCdraft' cipher alias. Otherwise,
3671 # our default cipher list should prefer ECDH-based ciphers
3672 # automatically.
3673 if ssl.OPENSSL_VERSION_INFO < (1, 0, 0):
3674 context.set_ciphers("ECCdraft:ECDH")
3675 with ThreadedEchoServer(context=context) as server:
3676 with context.wrap_socket(socket.socket()) as s:
3677 s.connect((HOST, server.port))
3678 self.assertIn("ECDH", s.cipher()[0])
3679
3680 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
3681 "'tls-unique' channel binding not available")
3682 def test_tls_unique_channel_binding(self):
3683 """Test tls-unique channel binding."""
3684 if support.verbose:
3685 sys.stdout.write("\n")
3686
Christian Heimes05d9fe32018-02-27 08:55:39 +01003687 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003688
3689 server = ThreadedEchoServer(context=server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003690 chatty=True,
3691 connectionchatty=False)
Christian Heimes05d9fe32018-02-27 08:55:39 +01003692
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003693 with server:
Christian Heimes05d9fe32018-02-27 08:55:39 +01003694 with client_context.wrap_socket(
3695 socket.socket(),
3696 server_hostname=hostname) as s:
3697 s.connect((HOST, server.port))
3698 # get the data
3699 cb_data = s.get_channel_binding("tls-unique")
3700 if support.verbose:
3701 sys.stdout.write(
3702 " got channel binding data: {0!r}\n".format(cb_data))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003703
Christian Heimes05d9fe32018-02-27 08:55:39 +01003704 # check if it is sane
3705 self.assertIsNotNone(cb_data)
Christian Heimes529525f2018-05-23 22:24:45 +02003706 if s.version() == 'TLSv1.3':
3707 self.assertEqual(len(cb_data), 48)
3708 else:
3709 self.assertEqual(len(cb_data), 12) # True for TLSv1
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003710
Christian Heimes05d9fe32018-02-27 08:55:39 +01003711 # and compare with the peers version
3712 s.write(b"CB tls-unique\n")
3713 peer_data_repr = s.read().strip()
3714 self.assertEqual(peer_data_repr,
3715 repr(cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003716
3717 # now, again
Christian Heimes05d9fe32018-02-27 08:55:39 +01003718 with client_context.wrap_socket(
3719 socket.socket(),
3720 server_hostname=hostname) as s:
3721 s.connect((HOST, server.port))
3722 new_cb_data = s.get_channel_binding("tls-unique")
3723 if support.verbose:
3724 sys.stdout.write(
3725 "got another channel binding data: {0!r}\n".format(
3726 new_cb_data)
3727 )
3728 # is it really unique
3729 self.assertNotEqual(cb_data, new_cb_data)
3730 self.assertIsNotNone(cb_data)
Christian Heimes529525f2018-05-23 22:24:45 +02003731 if s.version() == 'TLSv1.3':
3732 self.assertEqual(len(cb_data), 48)
3733 else:
3734 self.assertEqual(len(cb_data), 12) # True for TLSv1
Christian Heimes05d9fe32018-02-27 08:55:39 +01003735 s.write(b"CB tls-unique\n")
3736 peer_data_repr = s.read().strip()
3737 self.assertEqual(peer_data_repr,
3738 repr(new_cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003739
3740 def test_compression(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003741 client_context, server_context, hostname = testing_context()
3742 stats = server_params_test(client_context, server_context,
3743 chatty=True, connectionchatty=True,
3744 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003745 if support.verbose:
3746 sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
3747 self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' })
3748
3749 @unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
3750 "ssl.OP_NO_COMPRESSION needed for this test")
3751 def test_compression_disabled(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003752 client_context, server_context, hostname = testing_context()
3753 client_context.options |= ssl.OP_NO_COMPRESSION
3754 server_context.options |= ssl.OP_NO_COMPRESSION
3755 stats = server_params_test(client_context, server_context,
3756 chatty=True, connectionchatty=True,
3757 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003758 self.assertIs(stats['compression'], None)
3759
3760 def test_dh_params(self):
3761 # Check we can get a connection with ephemeral Diffie-Hellman
Christian Heimesa170fa12017-09-15 20:27:30 +02003762 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003763 # test scenario needs TLS <= 1.2
3764 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesa170fa12017-09-15 20:27:30 +02003765 server_context.load_dh_params(DHFILE)
3766 server_context.set_ciphers("kEDH")
Christian Heimes05d9fe32018-02-27 08:55:39 +01003767 server_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesa170fa12017-09-15 20:27:30 +02003768 stats = server_params_test(client_context, server_context,
3769 chatty=True, connectionchatty=True,
3770 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003771 cipher = stats["cipher"][0]
3772 parts = cipher.split("-")
3773 if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
3774 self.fail("Non-DH cipher: " + cipher[0])
3775
Christian Heimesb7b92252018-02-25 09:49:31 +01003776 @unittest.skipUnless(HAVE_SECP_CURVES, "needs secp384r1 curve support")
Christian Heimes05d9fe32018-02-27 08:55:39 +01003777 @unittest.skipIf(IS_OPENSSL_1_1_1, "TODO: Test doesn't work on 1.1.1")
Christian Heimesb7b92252018-02-25 09:49:31 +01003778 def test_ecdh_curve(self):
3779 # server secp384r1, client auto
3780 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003781
Christian Heimesb7b92252018-02-25 09:49:31 +01003782 server_context.set_ecdh_curve("secp384r1")
3783 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3784 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3785 stats = server_params_test(client_context, server_context,
3786 chatty=True, connectionchatty=True,
3787 sni_name=hostname)
3788
3789 # server auto, client secp384r1
3790 client_context, server_context, hostname = testing_context()
3791 client_context.set_ecdh_curve("secp384r1")
3792 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3793 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3794 stats = server_params_test(client_context, server_context,
3795 chatty=True, connectionchatty=True,
3796 sni_name=hostname)
3797
3798 # server / client curve mismatch
3799 client_context, server_context, hostname = testing_context()
3800 client_context.set_ecdh_curve("prime256v1")
3801 server_context.set_ecdh_curve("secp384r1")
3802 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3803 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3804 try:
3805 stats = server_params_test(client_context, server_context,
3806 chatty=True, connectionchatty=True,
3807 sni_name=hostname)
3808 except ssl.SSLError:
3809 pass
3810 else:
3811 # OpenSSL 1.0.2 does not fail although it should.
Christian Heimes05d9fe32018-02-27 08:55:39 +01003812 if IS_OPENSSL_1_1_0:
Christian Heimesb7b92252018-02-25 09:49:31 +01003813 self.fail("mismatch curve did not fail")
3814
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003815 def test_selected_alpn_protocol(self):
3816 # selected_alpn_protocol() is None unless ALPN is used.
Christian Heimesa170fa12017-09-15 20:27:30 +02003817 client_context, server_context, hostname = testing_context()
3818 stats = server_params_test(client_context, server_context,
3819 chatty=True, connectionchatty=True,
3820 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003821 self.assertIs(stats['client_alpn_protocol'], None)
3822
3823 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
3824 def test_selected_alpn_protocol_if_server_uses_alpn(self):
3825 # selected_alpn_protocol() is None unless ALPN is used by the client.
Christian Heimesa170fa12017-09-15 20:27:30 +02003826 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003827 server_context.set_alpn_protocols(['foo', 'bar'])
3828 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02003829 chatty=True, connectionchatty=True,
3830 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003831 self.assertIs(stats['client_alpn_protocol'], None)
3832
3833 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
3834 def test_alpn_protocols(self):
3835 server_protocols = ['foo', 'bar', 'milkshake']
3836 protocol_tests = [
3837 (['foo', 'bar'], 'foo'),
3838 (['bar', 'foo'], 'foo'),
3839 (['milkshake'], 'milkshake'),
3840 (['http/3.0', 'http/4.0'], None)
3841 ]
3842 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02003843 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003844 server_context.set_alpn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003845 client_context.set_alpn_protocols(client_protocols)
3846
3847 try:
3848 stats = server_params_test(client_context,
3849 server_context,
3850 chatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02003851 connectionchatty=True,
3852 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003853 except ssl.SSLError as e:
3854 stats = e
3855
Christian Heimes05d9fe32018-02-27 08:55:39 +01003856 if (expected is None and IS_OPENSSL_1_1_0
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003857 and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6)):
3858 # OpenSSL 1.1.0 to 1.1.0e raises handshake error
3859 self.assertIsInstance(stats, ssl.SSLError)
3860 else:
3861 msg = "failed trying %s (s) and %s (c).\n" \
3862 "was expecting %s, but got %%s from the %%s" \
3863 % (str(server_protocols), str(client_protocols),
3864 str(expected))
3865 client_result = stats['client_alpn_protocol']
3866 self.assertEqual(client_result, expected,
3867 msg % (client_result, "client"))
3868 server_result = stats['server_alpn_protocols'][-1] \
3869 if len(stats['server_alpn_protocols']) else 'nothing'
3870 self.assertEqual(server_result, expected,
3871 msg % (server_result, "server"))
3872
3873 def test_selected_npn_protocol(self):
3874 # selected_npn_protocol() is None unless NPN is used
Christian Heimesa170fa12017-09-15 20:27:30 +02003875 client_context, server_context, hostname = testing_context()
3876 stats = server_params_test(client_context, server_context,
3877 chatty=True, connectionchatty=True,
3878 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003879 self.assertIs(stats['client_npn_protocol'], None)
3880
3881 @unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
3882 def test_npn_protocols(self):
3883 server_protocols = ['http/1.1', 'spdy/2']
3884 protocol_tests = [
3885 (['http/1.1', 'spdy/2'], 'http/1.1'),
3886 (['spdy/2', 'http/1.1'], 'http/1.1'),
3887 (['spdy/2', 'test'], 'spdy/2'),
3888 (['abc', 'def'], 'abc')
3889 ]
3890 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02003891 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003892 server_context.set_npn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003893 client_context.set_npn_protocols(client_protocols)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003894 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02003895 chatty=True, connectionchatty=True,
3896 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003897 msg = "failed trying %s (s) and %s (c).\n" \
3898 "was expecting %s, but got %%s from the %%s" \
3899 % (str(server_protocols), str(client_protocols),
3900 str(expected))
3901 client_result = stats['client_npn_protocol']
3902 self.assertEqual(client_result, expected, msg % (client_result, "client"))
3903 server_result = stats['server_npn_protocols'][-1] \
3904 if len(stats['server_npn_protocols']) else 'nothing'
3905 self.assertEqual(server_result, expected, msg % (server_result, "server"))
3906
3907 def sni_contexts(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003908 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003909 server_context.load_cert_chain(SIGNED_CERTFILE)
Christian Heimesa170fa12017-09-15 20:27:30 +02003910 other_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003911 other_context.load_cert_chain(SIGNED_CERTFILE2)
Christian Heimesa170fa12017-09-15 20:27:30 +02003912 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003913 client_context.load_verify_locations(SIGNING_CA)
3914 return server_context, other_context, client_context
3915
3916 def check_common_name(self, stats, name):
3917 cert = stats['peercert']
3918 self.assertIn((('commonName', name),), cert['subject'])
3919
@needs_sni
def test_sni_callback(self):
    # The servername callback is handed the SNI name sent by the client and
    # the context the connection started with; assigning a different
    # context to the socket changes the certificate that gets served.
    server_context, other_context, client_context = self.sni_contexts()

    # Hostname checking is switched off on the client for this test.
    client_context.check_hostname = False

    seen = []

    def servername_cb(ssl_sock, server_name, initial_context):
        seen.append((server_name, initial_context))
        if server_name is None:
            return
        ssl_sock.context = other_context

    server_context.set_servername_callback(servername_cb)

    # With an SNI name: the callback receives it and swaps in
    # other_context, so the client sees the certificate loaded there
    # (SIGNED_CERTFILE2, see sni_contexts()).
    stats = server_params_test(client_context, server_context,
                               chatty=True, sni_name='supermessage')
    self.assertEqual(seen, [("supermessage", server_context)])
    self.check_common_name(stats, 'fakehostname')

    # Without an SNI name: the callback is still called, with
    # server_name=None, and the certificate is left alone.
    seen.clear()
    stats = server_params_test(client_context, server_context,
                               chatty=True, sni_name=None)
    self.assertEqual(seen, [(None, server_context)])
    self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)

    # Callback disabled: nothing is recorded and the certificate does not
    # change even though an SNI name is sent.
    seen.clear()
    server_context.set_servername_callback(None)
    stats = server_params_test(client_context, server_context,
                               chatty=True, sni_name='notfunny')
    self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
    self.assertEqual(seen, [])
3960
@needs_sni
def test_sni_callback_alert(self):
    # An alert description returned by the servername callback must reach
    # the connecting client as the reason of the SSLError it gets.
    server_context, _other_context, client_context = self.sni_contexts()

    def cb_returning_alert(ssl_sock, server_name, initial_context):
        return ssl.ALERT_DESCRIPTION_ACCESS_DENIED

    server_context.set_servername_callback(cb_returning_alert)

    with self.assertRaises(ssl.SSLError) as caught:
        server_params_test(client_context, server_context,
                           chatty=False, sni_name='supermessage')
    self.assertEqual(caught.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003974
@needs_sni
def test_sni_callback_raising(self):
    # An exception escaping the servername callback fails the connection
    # with a TLS handshake failure alert, and the exception is reported on
    # stderr.
    server_context, _other_context, client_context = self.sni_contexts()

    def cb_raising(ssl_sock, server_name, initial_context):
        1 / 0

    server_context.set_servername_callback(cb_raising)

    with self.assertRaises(ssl.SSLError) as caught:
        with support.captured_stderr() as captured:
            server_params_test(client_context, server_context,
                               chatty=False, sni_name='supermessage')
    self.assertEqual(caught.exception.reason,
                     'SSLV3_ALERT_HANDSHAKE_FAILURE')
    self.assertIn("ZeroDivisionError", captured.getvalue())
Antoine Pitrou50b24d02013-04-11 20:48:42 +02003991
@needs_sni
def test_sni_callback_wrong_return_type(self):
    # A servername callback returning a value of the wrong type terminates
    # the TLS connection with an internal error alert; the TypeError is
    # reported on stderr.
    server_context, _other_context, client_context = self.sni_contexts()

    def cb_wrong_return_type(ssl_sock, server_name, initial_context):
        return "foo"

    server_context.set_servername_callback(cb_wrong_return_type)

    with self.assertRaises(ssl.SSLError) as caught:
        with support.captured_stderr() as captured:
            server_params_test(client_context, server_context,
                               chatty=False, sni_name='supermessage')
    self.assertEqual(caught.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
    self.assertIn("TypeError", captured.getvalue())
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004009
def test_shared_ciphers(self):
    # The client offers AES128 and AES256 while the server only allows
    # AES256, so every cipher the server reports as shared must be an
    # AES-256 suite or one of the TLS 1.3 suites.
    client_context, server_context, hostname = testing_context()
    client_context.set_ciphers("AES128:AES256")
    server_context.set_ciphers("AES256")
    acceptable_fragments = (
        "AES256", "AES-256",
        # TLS 1.3 ciphers are always enabled
        "TLS_CHACHA20", "TLS_AES",
    )

    stats = server_params_test(client_context, server_context,
                               sni_name=hostname)
    shared = stats['server_shared_ciphers'][0]
    self.assertGreater(len(shared), 0)
    for cipher_name, _tls_version, _bits in shared:
        for fragment in acceptable_fragments:
            if fragment in cipher_name:
                break
        else:
            # No acceptable fragment occurs in this cipher's name.
            self.fail(cipher_name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004027
def test_read_write_after_close_raises_valuerror(self):
    # read() and write() on an SSL socket that has already been closed
    # must raise ValueError.
    client_context, server_context, hostname = testing_context()
    echo_server = ThreadedEchoServer(context=server_context, chatty=False)

    with echo_server:
        sock = client_context.wrap_socket(socket.socket(),
                                          server_hostname=hostname)
        sock.connect((HOST, echo_server.port))
        sock.close()

        with self.assertRaises(ValueError):
            sock.read(1024)
        with self.assertRaises(ValueError):
            sock.write(b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004040
def test_sendfile(self):
    # Send a small file through SSLSocket.sendfile() and expect to receive
    # the same bytes back from the server.
    payload = b"x" * 512
    with open(support.TESTFN, 'wb') as out:
        out.write(payload)
    self.addCleanup(support.unlink, support.TESTFN)

    # One context is used for both the server and the client side.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(SIGNING_CA)
    context.load_cert_chain(SIGNED_CERTFILE)

    echo_server = ThreadedEchoServer(context=context, chatty=False)
    with echo_server, context.wrap_socket(socket.socket()) as sock:
        sock.connect((HOST, echo_server.port))
        with open(support.TESTFN, 'rb') as source:
            sock.sendfile(source)
            self.assertEqual(sock.recv(1024), payload)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004057
def test_session(self):
    # Run four handshakes against one server context, alternating between
    # fresh sessions and reuse of the first session, and check both the
    # client-side session objects and the server's accept/hit counters.
    client_context, server_context, hostname = testing_context()
    # TODO: sessions aren't compatible with TLSv1.3 yet
    client_context.options |= ssl.OP_NO_TLSv1_3

    def handshake(**extra):
        # One client/server round trip; returns the collected stats.
        return server_params_test(client_context, server_context,
                                  sni_name=hostname, **extra)

    def check_server_counters(accept, hits):
        counters = server_context.session_stats()
        self.assertEqual(counters['accept'], accept)
        self.assertEqual(counters['hits'], hits)

    # first connection without session
    stats = handshake()
    original = stats['session']
    self.assertTrue(original.id)
    self.assertGreater(original.time, 0)
    self.assertGreater(original.timeout, 0)
    self.assertTrue(original.has_ticket)
    if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
        self.assertGreater(original.ticket_lifetime_hint, 0)
    self.assertFalse(stats['session_reused'])
    check_server_counters(accept=1, hits=0)

    # reuse session
    stats = handshake(session=original)
    check_server_counters(accept=2, hits=1)
    self.assertTrue(stats['session_reused'])
    resumed = stats['session']
    self.assertEqual(resumed.id, original.id)
    # Equal to the original session, but a distinct object.
    self.assertEqual(resumed, original)
    self.assertIsNot(resumed, original)
    self.assertGreaterEqual(resumed.time, original.time)
    self.assertGreaterEqual(resumed.timeout, original.timeout)

    # another one without session
    stats = handshake()
    self.assertFalse(stats['session_reused'])
    fresh = stats['session']
    self.assertNotEqual(fresh.id, original.id)
    self.assertNotEqual(fresh, original)
    check_server_counters(accept=3, hits=1)

    # reuse session again
    stats = handshake(session=original)
    self.assertTrue(stats['session_reused'])
    resumed_again = stats['session']
    self.assertEqual(resumed_again.id, original.id)
    self.assertEqual(resumed_again, original)
    self.assertGreaterEqual(resumed_again.time, original.time)
    self.assertGreaterEqual(resumed_again.timeout, original.timeout)
    check_server_counters(accept=4, hits=2)
Christian Heimes99a65702016-09-10 23:44:53 +02004115
def test_session_handling(self):
    # Exercise the SSLSocket.session / session_reused attributes:
    #  - both are None before the handshake;
    #  - assigning something that is not an SSLSession raises TypeError;
    #  - assigning a session after the handshake raises ValueError;
    #  - a session assigned before connecting is reused;
    #  - a session created under another SSLContext is rejected.
    client_context, server_context, hostname = testing_context()
    client_context2, _, _ = testing_context()

    # TODO: session reuse does not work with TLSv1.3
    client_context.options |= ssl.OP_NO_TLSv1_3
    client_context2.options |= ssl.OP_NO_TLSv1_3

    server = ThreadedEchoServer(context=server_context, chatty=False)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            # session is None before handshake
            self.assertIsNone(s.session)
            self.assertIsNone(s.session_reused)
            s.connect((HOST, server.port))
            session = s.session
            self.assertTrue(session)
            with self.assertRaises(TypeError) as e:
                s.session = object
            self.assertEqual(str(e.exception), 'Value is not a SSLSession.')

        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            # cannot set session after handshake
            with self.assertRaises(ValueError) as e:
                s.session = session
            self.assertEqual(str(e.exception),
                             'Cannot set session after handshake.')

        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            # can set session before handshake and before the
            # connection was established
            s.session = session
            s.connect((HOST, server.port))
            self.assertEqual(s.session.id, session.id)
            self.assertEqual(s.session, session)
            self.assertEqual(s.session_reused, True)

        with client_context2.wrap_socket(socket.socket(),
                                         server_hostname=hostname) as s:
            # cannot re-use session with a different SSLContext
            # Only the assignment belongs inside assertRaises: a connect()
            # placed after it could never run when the assignment raises,
            # and a ValueError from connect() could otherwise hide an
            # assignment that wrongly succeeded.
            with self.assertRaises(ValueError) as e:
                s.session = session
            self.assertEqual(str(e.exception),
                             'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02004165
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01004166
@unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
class TestPostHandshakeAuth(unittest.TestCase):
    """Tests for post-handshake authentication (PHA) of TLS clients.

    Apart from test_pha_setter, each test talks to a ThreadedEchoServer
    using the byte commands b'HASCERT', b'PHA' and b'GETCERT' and checks
    the replies that come back.
    """

    @staticmethod
    def _command(sock, request, bufsize=1024):
        """Write *request* to *sock* and return the next chunk received."""
        sock.write(request)
        return sock.recv(bufsize)

    def test_pha_setter(self):
        # post_handshake_auth defaults to False and can be toggled
        # independently of verify_mode, for every generic TLS protocol
        # constant.
        for protocol in (ssl.PROTOCOL_TLS,
                         ssl.PROTOCOL_TLS_SERVER,
                         ssl.PROTOCOL_TLS_CLIENT):
            ctx = ssl.SSLContext(protocol)
            self.assertEqual(ctx.post_handshake_auth, False)

            ctx.post_handshake_auth = True
            self.assertEqual(ctx.post_handshake_auth, True)

            ctx.verify_mode = ssl.CERT_REQUIRED
            self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
            self.assertEqual(ctx.post_handshake_auth, True)

            ctx.post_handshake_auth = False
            self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
            self.assertEqual(ctx.post_handshake_auth, False)

            ctx.verify_mode = ssl.CERT_OPTIONAL
            ctx.post_handshake_auth = True
            self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
            self.assertEqual(ctx.post_handshake_auth, True)

    def test_pha_required(self):
        # Both sides enable PHA and the client has a certificate: no cert
        # is reported until PHA is requested, afterwards it is.
        client_context, server_context, hostname = testing_context()
        server_context.post_handshake_auth = True
        server_context.verify_mode = ssl.CERT_REQUIRED
        client_context.post_handshake_auth = True
        client_context.load_cert_chain(SIGNED_CERTFILE)

        echo_server = ThreadedEchoServer(context=server_context,
                                         chatty=False)
        with echo_server, client_context.wrap_socket(
                socket.socket(), server_hostname=hostname) as sock:
            sock.connect((HOST, echo_server.port))
            self.assertEqual(self._command(sock, b'HASCERT'), b'FALSE\n')
            self.assertEqual(self._command(sock, b'PHA'), b'OK\n')
            self.assertEqual(self._command(sock, b'HASCERT'), b'TRUE\n')
            # PHA method just returns true when cert is already available
            self.assertEqual(self._command(sock, b'PHA'), b'OK\n')
            cert_text = self._command(sock, b'GETCERT',
                                      4096).decode('us-ascii')
            self.assertIn('Python Software Foundation CA', cert_text)

    def test_pha_required_nocert(self):
        # The server requires a certificate through PHA but the client has
        # none loaded: the client ends up receiving an alert.
        client_context, server_context, hostname = testing_context()
        server_context.post_handshake_auth = True
        server_context.verify_mode = ssl.CERT_REQUIRED
        client_context.post_handshake_auth = True

        echo_server = ThreadedEchoServer(context=server_context,
                                         chatty=False)
        with echo_server, client_context.wrap_socket(
                socket.socket(), server_hostname=hostname) as sock:
            sock.connect((HOST, echo_server.port))
            # receive CertificateRequest
            self.assertEqual(self._command(sock, b'PHA'), b'OK\n')
            # send empty Certificate + Finish
            sock.write(b'HASCERT')
            # receive alert
            with self.assertRaisesRegex(
                    ssl.SSLError,
                    'tlsv13 alert certificate required'):
                sock.recv(1024)

    def test_pha_optional(self):
        # Same flow as test_pha_required, with the server's verify_mode
        # set to CERT_OPTIONAL before the server is started.
        if support.verbose:
            sys.stdout.write("\n")

        client_context, server_context, hostname = testing_context()
        server_context.post_handshake_auth = True
        server_context.verify_mode = ssl.CERT_REQUIRED
        client_context.post_handshake_auth = True
        client_context.load_cert_chain(SIGNED_CERTFILE)

        # check CERT_OPTIONAL
        server_context.verify_mode = ssl.CERT_OPTIONAL
        echo_server = ThreadedEchoServer(context=server_context,
                                         chatty=False)
        with echo_server, client_context.wrap_socket(
                socket.socket(), server_hostname=hostname) as sock:
            sock.connect((HOST, echo_server.port))
            self.assertEqual(self._command(sock, b'HASCERT'), b'FALSE\n')
            self.assertEqual(self._command(sock, b'PHA'), b'OK\n')
            self.assertEqual(self._command(sock, b'HASCERT'), b'TRUE\n')

    def test_pha_optional_nocert(self):
        # CERT_OPTIONAL on the server and no client certificate: the PHA
        # request succeeds and the server still reports no certificate.
        if support.verbose:
            sys.stdout.write("\n")

        client_context, server_context, hostname = testing_context()
        server_context.post_handshake_auth = True
        server_context.verify_mode = ssl.CERT_OPTIONAL
        client_context.post_handshake_auth = True

        echo_server = ThreadedEchoServer(context=server_context,
                                         chatty=False)
        with echo_server, client_context.wrap_socket(
                socket.socket(), server_hostname=hostname) as sock:
            sock.connect((HOST, echo_server.port))
            self.assertEqual(self._command(sock, b'HASCERT'), b'FALSE\n')
            self.assertEqual(self._command(sock, b'PHA'), b'OK\n')
            # optional doesn't fail when client does not have a cert
            self.assertEqual(self._command(sock, b'HASCERT'), b'FALSE\n')

    def test_pha_no_pha_client(self):
        # The client does not enable post_handshake_auth: requesting PHA
        # from the client side raises, and the server's reply to b'PHA'
        # mentions that the extension was not received.
        client_context, server_context, hostname = testing_context()
        server_context.post_handshake_auth = True
        server_context.verify_mode = ssl.CERT_REQUIRED
        client_context.load_cert_chain(SIGNED_CERTFILE)

        echo_server = ThreadedEchoServer(context=server_context,
                                         chatty=False)
        with echo_server, client_context.wrap_socket(
                socket.socket(), server_hostname=hostname) as sock:
            sock.connect((HOST, echo_server.port))
            with self.assertRaisesRegex(ssl.SSLError, 'not server'):
                sock.verify_client_post_handshake()
            self.assertIn(b'extension not received',
                          self._command(sock, b'PHA'))

    def test_pha_no_pha_server(self):
        # server doesn't have PHA enabled, cert is requested in handshake
        client_context, server_context, hostname = testing_context()
        server_context.verify_mode = ssl.CERT_REQUIRED
        client_context.post_handshake_auth = True
        client_context.load_cert_chain(SIGNED_CERTFILE)

        echo_server = ThreadedEchoServer(context=server_context,
                                         chatty=False)
        with echo_server, client_context.wrap_socket(
                socket.socket(), server_hostname=hostname) as sock:
            sock.connect((HOST, echo_server.port))
            self.assertEqual(self._command(sock, b'HASCERT'), b'TRUE\n')
            # PHA doesn't fail if there is already a cert
            self.assertEqual(self._command(sock, b'PHA'), b'OK\n')
            self.assertEqual(self._command(sock, b'HASCERT'), b'TRUE\n')

    def test_pha_not_tls13(self):
        # TLS 1.2
        client_context, server_context, hostname = testing_context()
        server_context.verify_mode = ssl.CERT_REQUIRED
        client_context.maximum_version = ssl.TLSVersion.TLSv1_2
        client_context.post_handshake_auth = True
        client_context.load_cert_chain(SIGNED_CERTFILE)

        echo_server = ThreadedEchoServer(context=server_context,
                                         chatty=False)
        with echo_server, client_context.wrap_socket(
                socket.socket(), server_hostname=hostname) as sock:
            sock.connect((HOST, echo_server.port))
            # PHA fails for TLS != 1.3
            self.assertIn(b'WRONG_SSL_VERSION',
                          self._command(sock, b'PHA'))
4338
4339
def test_main(verbose=False):
    """Run the test_ssl test classes.

    When ``support.verbose`` is set, first print the OpenSSL version, the
    platform and a few ssl module constants.  Then check that every
    certificate fixture exists and run the test classes, adding
    NetworkedTests when the 'network' resource is enabled.

    The *verbose* parameter is accepted but not consulted; verbosity is
    read from ``support.verbose``.

    Raises support.TestFailed if a certificate file is missing.
    """
    if support.verbose:
        plats = {
            'Mac': platform.mac_ver,
            'Windows': platform.win32_ver,
        }
        # Use the first platform probe that reports a version; fall back
        # to the generic platform string when none does.
        for name, func in plats.items():
            plat = func()
            if plat and plat[0]:
                plat = '%s %r' % (name, plat)
                break
        else:
            plat = repr(platform.platform())
        print("test_ssl: testing with %r %r" %
              (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
        print(" under %s" % plat)
        print(" HAS_SNI = %r" % ssl.HAS_SNI)
        print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
        try:
            print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
        except AttributeError:
            # The constant is not defined by this ssl module.
            pass

    for filename in [
            CERTFILE, BYTES_CERTFILE,
            ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
            SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
            BADCERT, BADKEY, EMPTYCERT]:
        if not os.path.exists(filename):
            raise support.TestFailed("Can't read certificate file %r" % filename)

    tests = [
        ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
        SSLObjectTests, SimpleBackgroundTests, ThreadedTests,
        TestPostHandshakeAuth
    ]

    if support.is_resource_enabled('network'):
        tests.append(NetworkedTests)

    thread_info = support.threading_setup()
    try:
        support.run_unittest(*tests)
    finally:
        support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00004386
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()