blob: 6fd2002e5eec86dad38b47e253c68945f1e2a78e [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00005from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00006import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00007import select
Thomas Woutersed03b412007-08-28 21:37:11 +00008import time
Christian Heimes9424bb42013-06-17 15:32:57 +02009import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000010import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000011import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000012import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000013import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000014import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020015import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000016import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000017import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000018import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000019import platform
Antoine Pitrou23df4832010-08-04 17:14:06 +000020import functools
Christian Heimes892d66e2018-01-29 14:10:18 +010021import sysconfig
Christian Heimes888bbdc2017-09-07 14:18:21 -070022try:
23 import ctypes
24except ImportError:
25 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000026
Antoine Pitrou05d936d2010-10-13 11:38:36 +000027ssl = support.import_module("ssl")
28
Martin Panter3840b2a2016-03-27 01:53:46 +000029
Antoine Pitrou2463e5f2013-03-28 22:24:43 +010030PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
Benjamin Petersonee8712c2008-05-20 21:35:26 +000031HOST = support.HOST
Christian Heimes598894f2016-09-05 23:19:05 +020032IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
Christian Heimes05d9fe32018-02-27 08:55:39 +010033IS_OPENSSL_1_1_0 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
34IS_OPENSSL_1_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
Christian Heimes892d66e2018-01-29 14:10:18 +010035PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
Antoine Pitrou152efa22010-05-16 18:19:27 +000036
Christian Heimesefff7062013-11-21 03:35:02 +010037def data_file(*name):
38 return os.path.join(os.path.dirname(__file__), *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000039
Antoine Pitrou81564092010-10-08 23:06:24 +000040# The custom key and certificate files used in test_ssl are generated
41# using Lib/test/make_ssl_certs.py.
42# Other certificates are simply fetched from the Internet servers they
43# are meant to authenticate.
44
Antoine Pitrou152efa22010-05-16 18:19:27 +000045CERTFILE = data_file("keycert.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000046BYTES_CERTFILE = os.fsencode(CERTFILE)
Antoine Pitrou152efa22010-05-16 18:19:27 +000047ONLYCERT = data_file("ssl_cert.pem")
48ONLYKEY = data_file("ssl_key.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000049BYTES_ONLYCERT = os.fsencode(ONLYCERT)
50BYTES_ONLYKEY = os.fsencode(ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +020051CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
52ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
53KEY_PASSWORD = "somepass"
Antoine Pitrou152efa22010-05-16 18:19:27 +000054CAPATH = data_file("capath")
Victor Stinner313a1202010-06-11 23:56:51 +000055BYTES_CAPATH = os.fsencode(CAPATH)
Christian Heimesefff7062013-11-21 03:35:02 +010056CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
57CAFILE_CACERT = data_file("capath", "5ed36f99.0")
58
Christian Heimesbd5c7d22018-01-20 15:16:30 +010059CERTFILE_INFO = {
60 'issuer': ((('countryName', 'XY'),),
61 (('localityName', 'Castle Anthrax'),),
62 (('organizationName', 'Python Software Foundation'),),
63 (('commonName', 'localhost'),)),
Christian Heimese6dac002018-08-30 07:25:49 +020064 'notAfter': 'Aug 26 14:23:15 2028 GMT',
65 'notBefore': 'Aug 29 14:23:15 2018 GMT',
66 'serialNumber': '98A7CF88C74A32ED',
Christian Heimesbd5c7d22018-01-20 15:16:30 +010067 'subject': ((('countryName', 'XY'),),
68 (('localityName', 'Castle Anthrax'),),
69 (('organizationName', 'Python Software Foundation'),),
70 (('commonName', 'localhost'),)),
71 'subjectAltName': (('DNS', 'localhost'),),
72 'version': 3
73}
Antoine Pitrou152efa22010-05-16 18:19:27 +000074
Christian Heimes22587792013-11-21 23:56:13 +010075# empty CRL
76CRLFILE = data_file("revocation.crl")
77
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010078# Two keys and certs signed by the same CA (for SNI tests)
79SIGNED_CERTFILE = data_file("keycert3.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +020080SIGNED_CERTFILE_HOSTNAME = 'localhost'
Christian Heimesbd5c7d22018-01-20 15:16:30 +010081
82SIGNED_CERTFILE_INFO = {
83 'OCSP': ('http://testca.pythontest.net/testca/ocsp/',),
84 'caIssuers': ('http://testca.pythontest.net/testca/pycacert.cer',),
85 'crlDistributionPoints': ('http://testca.pythontest.net/testca/revocation.crl',),
86 'issuer': ((('countryName', 'XY'),),
87 (('organizationName', 'Python Software Foundation CA'),),
88 (('commonName', 'our-ca-server'),)),
Christian Heimese6dac002018-08-30 07:25:49 +020089 'notAfter': 'Jul 7 14:23:16 2028 GMT',
90 'notBefore': 'Aug 29 14:23:16 2018 GMT',
91 'serialNumber': 'CB2D80995A69525C',
Christian Heimesbd5c7d22018-01-20 15:16:30 +010092 'subject': ((('countryName', 'XY'),),
93 (('localityName', 'Castle Anthrax'),),
94 (('organizationName', 'Python Software Foundation'),),
95 (('commonName', 'localhost'),)),
96 'subjectAltName': (('DNS', 'localhost'),),
97 'version': 3
98}
99
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100100SIGNED_CERTFILE2 = data_file("keycert4.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +0200101SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100102SIGNED_CERTFILE_ECC = data_file("keycertecc.pem")
103SIGNED_CERTFILE_ECC_HOSTNAME = 'localhost-ecc'
104
Martin Panter3840b2a2016-03-27 01:53:46 +0000105# Same certificate as pycacert.pem, but without extra text in file
106SIGNING_CA = data_file("capath", "ceff1710.0")
Christian Heimes1c03abd2016-09-06 23:25:35 +0200107# cert with all kinds of subject alt names
108ALLSANFILE = data_file("allsans.pem")
Christian Heimes66e57422018-01-29 14:25:13 +0100109IDNSANSFILE = data_file("idnsans.pem")
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100110
Martin Panter3d81d932016-01-14 09:36:00 +0000111REMOTE_HOST = "self-signed.pythontest.net"
Antoine Pitrou152efa22010-05-16 18:19:27 +0000112
113EMPTYCERT = data_file("nullcert.pem")
114BADCERT = data_file("badcert.pem")
Martin Panter407b62f2016-01-30 03:41:43 +0000115NONEXISTINGCERT = data_file("XXXnonexisting.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000116BADKEY = data_file("badkey.pem")
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200117NOKIACERT = data_file("nokia.pem")
Christian Heimes824f7f32013-08-17 00:54:47 +0200118NULLBYTECERT = data_file("nullbytecert.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000119
Christian Heimes88bfd0b2018-08-14 12:54:19 +0200120DHFILE = data_file("ffdh3072.pem")
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100121BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +0000122
Christian Heimes358cfd42016-09-10 22:43:48 +0200123# Not defined in all versions of OpenSSL
124OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
125OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
126OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
127OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
Christian Heimes05d9fe32018-02-27 08:55:39 +0100128OP_ENABLE_MIDDLEBOX_COMPAT = getattr(ssl, "OP_ENABLE_MIDDLEBOX_COMPAT", 0)
Christian Heimes358cfd42016-09-10 22:43:48 +0200129
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100130
Thomas Woutersed03b412007-08-28 21:37:11 +0000131def handle_error(prefix):
132 exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000133 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000134 sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +0000135
Antoine Pitroub5218772010-05-21 09:56:06 +0000136def can_clear_options():
137 # 0.9.8m or higher
Antoine Pitroub9ac25d2011-07-08 18:47:06 +0200138 return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
Antoine Pitroub5218772010-05-21 09:56:06 +0000139
140def no_sslv2_implies_sslv3_hello():
141 # 0.9.7h or higher
142 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
143
Christian Heimes2427b502013-11-23 11:24:32 +0100144def have_verify_flags():
145 # 0.9.8 or higher
146 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
147
Christian Heimesb7b92252018-02-25 09:49:31 +0100148def _have_secp_curves():
149 if not ssl.HAS_ECDH:
150 return False
151 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
152 try:
153 ctx.set_ecdh_curve("secp384r1")
154 except ValueError:
155 return False
156 else:
157 return True
158
159
160HAVE_SECP_CURVES = _have_secp_curves()
161
162
Antoine Pitrouc695c952014-04-28 20:57:36 +0200163def utc_offset(): #NOTE: ignore issues like #1647654
164 # local time = utc time + utc offset
165 if time.daylight and time.localtime().tm_isdst > 0:
166 return -time.altzone # seconds
167 return -time.timezone
168
Christian Heimes9424bb42013-06-17 15:32:57 +0200169def asn1time(cert_time):
170 # Some versions of OpenSSL ignore seconds, see #18207
171 # 0.9.8.i
172 if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
173 fmt = "%b %d %H:%M:%S %Y GMT"
174 dt = datetime.datetime.strptime(cert_time, fmt)
175 dt = dt.replace(second=0)
176 cert_time = dt.strftime(fmt)
177 # %d adds leading zero but ASN1_TIME_print() uses leading space
178 if cert_time[4] == "0":
179 cert_time = cert_time[:4] + " " + cert_time[5:]
180
181 return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000182
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100183needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
184
Antoine Pitrou23df4832010-08-04 17:14:06 +0000185
Christian Heimesd0486372016-09-10 23:23:33 +0200186def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
187 cert_reqs=ssl.CERT_NONE, ca_certs=None,
188 ciphers=None, certfile=None, keyfile=None,
189 **kwargs):
190 context = ssl.SSLContext(ssl_version)
191 if cert_reqs is not None:
Christian Heimesa170fa12017-09-15 20:27:30 +0200192 if cert_reqs == ssl.CERT_NONE:
193 context.check_hostname = False
Christian Heimesd0486372016-09-10 23:23:33 +0200194 context.verify_mode = cert_reqs
195 if ca_certs is not None:
196 context.load_verify_locations(ca_certs)
197 if certfile is not None or keyfile is not None:
198 context.load_cert_chain(certfile, keyfile)
199 if ciphers is not None:
200 context.set_ciphers(ciphers)
201 return context.wrap_socket(sock, **kwargs)
202
Christian Heimesa170fa12017-09-15 20:27:30 +0200203
204def testing_context(server_cert=SIGNED_CERTFILE):
205 """Create context
206
207 client_context, server_context, hostname = testing_context()
208 """
209 if server_cert == SIGNED_CERTFILE:
210 hostname = SIGNED_CERTFILE_HOSTNAME
211 elif server_cert == SIGNED_CERTFILE2:
212 hostname = SIGNED_CERTFILE2_HOSTNAME
213 else:
214 raise ValueError(server_cert)
215
216 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
217 client_context.load_verify_locations(SIGNING_CA)
218
219 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
220 server_context.load_cert_chain(server_cert)
Christian Heimes9fb051f2018-09-23 08:32:31 +0200221 server_context.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +0200222
223 return client_context, server_context, hostname
224
225
Antoine Pitrou152efa22010-05-16 18:19:27 +0000226class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000227
Antoine Pitrou480a1242010-04-28 21:37:09 +0000228 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000229 ssl.CERT_NONE
230 ssl.CERT_OPTIONAL
231 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100232 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100233 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100234 if ssl.HAS_ECDH:
235 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100236 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
237 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000238 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100239 self.assertIn(ssl.HAS_ECDH, {True, False})
Christian Heimescb5b68a2017-09-07 18:07:00 -0700240 ssl.OP_NO_SSLv2
241 ssl.OP_NO_SSLv3
242 ssl.OP_NO_TLSv1
243 ssl.OP_NO_TLSv1_3
244 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
245 ssl.OP_NO_TLSv1_1
246 ssl.OP_NO_TLSv1_2
Christian Heimesa170fa12017-09-15 20:27:30 +0200247 self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000248
Christian Heimes9d50ab52018-02-27 10:17:30 +0100249 def test_private_init(self):
250 with self.assertRaisesRegex(TypeError, "public constructor"):
251 with socket.socket() as s:
252 ssl.SSLSocket(s)
253
Antoine Pitrou172f0252014-04-18 20:33:08 +0200254 def test_str_for_enums(self):
255 # Make sure that the PROTOCOL_* constants have enum-like string
256 # reprs.
Christian Heimes598894f2016-09-05 23:19:05 +0200257 proto = ssl.PROTOCOL_TLS
258 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200259 ctx = ssl.SSLContext(proto)
260 self.assertIs(ctx.protocol, proto)
261
Antoine Pitrou480a1242010-04-28 21:37:09 +0000262 def test_random(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000263 v = ssl.RAND_status()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000264 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000265 sys.stdout.write("\n RAND_status is %d (%s)\n"
266 % (v, (v and "sufficient randomness") or
267 "insufficient randomness"))
Victor Stinner99c8b162011-05-24 12:05:19 +0200268
269 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
270 self.assertEqual(len(data), 16)
271 self.assertEqual(is_cryptographic, v == 1)
272 if v:
273 data = ssl.RAND_bytes(16)
274 self.assertEqual(len(data), 16)
Victor Stinner2e2baa92011-05-25 11:15:16 +0200275 else:
276 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
Victor Stinner99c8b162011-05-24 12:05:19 +0200277
Victor Stinner1e81a392013-12-19 16:47:04 +0100278 # negative num is invalid
279 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
280 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
281
Victor Stinnerbeeb5122014-11-28 13:28:25 +0100282 if hasattr(ssl, 'RAND_egd'):
283 self.assertRaises(TypeError, ssl.RAND_egd, 1)
284 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000285 ssl.RAND_add("this is a random string", 75.0)
Serhiy Storchaka8490f5a2015-03-20 09:00:36 +0200286 ssl.RAND_add(b"this is a random bytes object", 75.0)
287 ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000288
Christian Heimesf77b4b22013-08-21 13:26:05 +0200289 @unittest.skipUnless(os.name == 'posix', 'requires posix')
290 def test_random_fork(self):
291 status = ssl.RAND_status()
292 if not status:
293 self.fail("OpenSSL's PRNG has insufficient randomness")
294
295 rfd, wfd = os.pipe()
296 pid = os.fork()
297 if pid == 0:
298 try:
299 os.close(rfd)
300 child_random = ssl.RAND_pseudo_bytes(16)[0]
301 self.assertEqual(len(child_random), 16)
302 os.write(wfd, child_random)
303 os.close(wfd)
304 except BaseException:
305 os._exit(1)
306 else:
307 os._exit(0)
308 else:
309 os.close(wfd)
310 self.addCleanup(os.close, rfd)
311 _, status = os.waitpid(pid, 0)
312 self.assertEqual(status, 0)
313
314 child_random = os.read(rfd, 16)
315 self.assertEqual(len(child_random), 16)
316 parent_random = ssl.RAND_pseudo_bytes(16)[0]
317 self.assertEqual(len(parent_random), 16)
318
319 self.assertNotEqual(child_random, parent_random)
320
Christian Heimese6dac002018-08-30 07:25:49 +0200321 maxDiff = None
322
Antoine Pitrou480a1242010-04-28 21:37:09 +0000323 def test_parse_cert(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000324 # note that this uses an 'unofficial' function in _ssl.c,
325 # provided solely for this test, to exercise the certificate
326 # parsing code
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100327 self.assertEqual(
328 ssl._ssl._test_decode_cert(CERTFILE),
329 CERTFILE_INFO
330 )
331 self.assertEqual(
332 ssl._ssl._test_decode_cert(SIGNED_CERTFILE),
333 SIGNED_CERTFILE_INFO
334 )
335
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200336 # Issue #13034: the subjectAltName in some certificates
337 # (notably projects.developer.nokia.com:443) wasn't parsed
338 p = ssl._ssl._test_decode_cert(NOKIACERT)
339 if support.verbose:
340 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
341 self.assertEqual(p['subjectAltName'],
342 (('DNS', 'projects.developer.nokia.com'),
343 ('DNS', 'projects.forum.nokia.com'))
344 )
Christian Heimesbd3a7f92013-11-21 03:40:15 +0100345 # extra OCSP and AIA fields
346 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
347 self.assertEqual(p['caIssuers'],
348 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
349 self.assertEqual(p['crlDistributionPoints'],
350 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000351
Christian Heimes824f7f32013-08-17 00:54:47 +0200352 def test_parse_cert_CVE_2013_4238(self):
353 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
354 if support.verbose:
355 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
356 subject = ((('countryName', 'US'),),
357 (('stateOrProvinceName', 'Oregon'),),
358 (('localityName', 'Beaverton'),),
359 (('organizationName', 'Python Software Foundation'),),
360 (('organizationalUnitName', 'Python Core Development'),),
361 (('commonName', 'null.python.org\x00example.org'),),
362 (('emailAddress', 'python-dev@python.org'),))
363 self.assertEqual(p['subject'], subject)
364 self.assertEqual(p['issuer'], subject)
Christian Heimes157c9832013-08-25 14:12:41 +0200365 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
366 san = (('DNS', 'altnull.python.org\x00example.com'),
367 ('email', 'null@python.org\x00user@example.org'),
368 ('URI', 'http://null.python.org\x00http://example.org'),
369 ('IP Address', '192.0.2.1'),
370 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
371 else:
372 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
373 san = (('DNS', 'altnull.python.org\x00example.com'),
374 ('email', 'null@python.org\x00user@example.org'),
375 ('URI', 'http://null.python.org\x00http://example.org'),
376 ('IP Address', '192.0.2.1'),
377 ('IP Address', '<invalid>'))
378
379 self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200380
Christian Heimes1c03abd2016-09-06 23:25:35 +0200381 def test_parse_all_sans(self):
382 p = ssl._ssl._test_decode_cert(ALLSANFILE)
383 self.assertEqual(p['subjectAltName'],
384 (
385 ('DNS', 'allsans'),
386 ('othername', '<unsupported>'),
387 ('othername', '<unsupported>'),
388 ('email', 'user@example.org'),
389 ('DNS', 'www.example.org'),
390 ('DirName',
391 ((('countryName', 'XY'),),
392 (('localityName', 'Castle Anthrax'),),
393 (('organizationName', 'Python Software Foundation'),),
394 (('commonName', 'dirname example'),))),
395 ('URI', 'https://www.python.org/'),
396 ('IP Address', '127.0.0.1'),
397 ('IP Address', '0:0:0:0:0:0:0:1\n'),
398 ('Registered ID', '1.2.3.4.5')
399 )
400 )
401
Antoine Pitrou480a1242010-04-28 21:37:09 +0000402 def test_DER_to_PEM(self):
Martin Panter3d81d932016-01-14 09:36:00 +0000403 with open(CAFILE_CACERT, 'r') as f:
Antoine Pitrou480a1242010-04-28 21:37:09 +0000404 pem = f.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000405 d1 = ssl.PEM_cert_to_DER_cert(pem)
406 p2 = ssl.DER_cert_to_PEM_cert(d1)
407 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitrou18c913e2010-04-27 10:59:39 +0000408 self.assertEqual(d1, d2)
Antoine Pitrou9bfbe612010-04-27 22:08:08 +0000409 if not p2.startswith(ssl.PEM_HEADER + '\n'):
410 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
411 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
412 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000413
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000414 def test_openssl_version(self):
415 n = ssl.OPENSSL_VERSION_NUMBER
416 t = ssl.OPENSSL_VERSION_INFO
417 s = ssl.OPENSSL_VERSION
418 self.assertIsInstance(n, int)
419 self.assertIsInstance(t, tuple)
420 self.assertIsInstance(s, str)
421 # Some sanity checks follow
422 # >= 0.9
423 self.assertGreaterEqual(n, 0x900000)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400424 # < 3.0
425 self.assertLess(n, 0x30000000)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000426 major, minor, fix, patch, status = t
427 self.assertGreaterEqual(major, 0)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400428 self.assertLess(major, 3)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000429 self.assertGreaterEqual(minor, 0)
430 self.assertLess(minor, 256)
431 self.assertGreaterEqual(fix, 0)
432 self.assertLess(fix, 256)
433 self.assertGreaterEqual(patch, 0)
Ned Deily05784a72015-02-05 17:20:13 +1100434 self.assertLessEqual(patch, 63)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000435 self.assertGreaterEqual(status, 0)
436 self.assertLessEqual(status, 15)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400437 # Version string as returned by {Open,Libre}SSL, the format might change
Christian Heimes598894f2016-09-05 23:19:05 +0200438 if IS_LIBRESSL:
439 self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
Victor Stinner789b8052015-01-06 11:51:06 +0100440 (s, t, hex(n)))
Antoine Pitroudfab9352014-07-21 18:35:01 -0400441 else:
442 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
Victor Stinner789b8052015-01-06 11:51:06 +0100443 (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000444
Antoine Pitrou9d543662010-04-23 23:10:32 +0000445 @support.cpython_only
446 def test_refcycle(self):
447 # Issue #7943: an SSL object doesn't create reference cycles with
448 # itself.
449 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200450 ss = test_wrap_socket(s)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000451 wr = weakref.ref(ss)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100452 with support.check_warnings(("", ResourceWarning)):
453 del ss
Victor Stinnere0b75b72016-03-21 17:26:04 +0100454 self.assertEqual(wr(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000455
Antoine Pitroua468adc2010-09-14 14:43:44 +0000456 def test_wrapped_unconnected(self):
457 # Methods on an unconnected SSLSocket propagate the original
Andrew Svetlov0832af62012-12-18 23:10:48 +0200458 # OSError raise by the underlying socket object.
Antoine Pitroua468adc2010-09-14 14:43:44 +0000459 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200460 with test_wrap_socket(s) as ss:
Antoine Pitroue9bb4732013-01-12 21:56:56 +0100461 self.assertRaises(OSError, ss.recv, 1)
462 self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
463 self.assertRaises(OSError, ss.recvfrom, 1)
464 self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
465 self.assertRaises(OSError, ss.send, b'x')
466 self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
Christian Heimes141c5e82018-02-24 21:10:57 +0100467 self.assertRaises(NotImplementedError, ss.sendmsg,
468 [b'x'], (), 0, ('0.0.0.0', 0))
Antoine Pitroua468adc2010-09-14 14:43:44 +0000469
Antoine Pitrou40f08742010-04-24 22:04:40 +0000470 def test_timeout(self):
471 # Issue #8524: when creating an SSL socket, the timeout of the
472 # original socket should be retained.
473 for timeout in (None, 0.0, 5.0):
474 s = socket.socket(socket.AF_INET)
475 s.settimeout(timeout)
Christian Heimesd0486372016-09-10 23:23:33 +0200476 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100477 self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000478
Christian Heimesd0486372016-09-10 23:23:33 +0200479 def test_errors_sslwrap(self):
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000480 sock = socket.socket()
Ezio Melottied3a7d22010-12-01 02:32:32 +0000481 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000482 "certfile must be specified",
483 ssl.wrap_socket, sock, keyfile=CERTFILE)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000484 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000485 "certfile must be specified for server-side operations",
486 ssl.wrap_socket, sock, server_side=True)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000487 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000488 "certfile must be specified for server-side operations",
Christian Heimesd0486372016-09-10 23:23:33 +0200489 ssl.wrap_socket, sock, server_side=True, certfile="")
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100490 with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
491 self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
Christian Heimesd0486372016-09-10 23:23:33 +0200492 s.connect, (HOST, 8080))
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200493 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000494 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000495 ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000496 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200497 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000498 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000499 ssl.wrap_socket(sock,
500 certfile=CERTFILE, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000501 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200502 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000503 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000504 ssl.wrap_socket(sock,
505 certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000506 self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000507
Martin Panter3464ea22016-02-01 21:58:11 +0000508 def bad_cert_test(self, certfile):
509 """Check that trying to use the given client certificate fails"""
510 certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
511 certfile)
512 sock = socket.socket()
513 self.addCleanup(sock.close)
514 with self.assertRaises(ssl.SSLError):
Christian Heimesd0486372016-09-10 23:23:33 +0200515 test_wrap_socket(sock,
Christian Heimesa170fa12017-09-15 20:27:30 +0200516 certfile=certfile)
Martin Panter3464ea22016-02-01 21:58:11 +0000517
518 def test_empty_cert(self):
519 """Wrapping with an empty cert file"""
520 self.bad_cert_test("nullcert.pem")
521
522 def test_malformed_cert(self):
523 """Wrapping with a badly formatted certificate (syntax error)"""
524 self.bad_cert_test("badcert.pem")
525
526 def test_malformed_key(self):
527 """Wrapping with a badly formatted key (syntax error)"""
528 self.bad_cert_test("badkey.pem")
529
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000530 def test_match_hostname(self):
531 def ok(cert, hostname):
532 ssl.match_hostname(cert, hostname)
533 def fail(cert, hostname):
534 self.assertRaises(ssl.CertificateError,
535 ssl.match_hostname, cert, hostname)
536
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100537 # -- Hostname matching --
538
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000539 cert = {'subject': ((('commonName', 'example.com'),),)}
540 ok(cert, 'example.com')
541 ok(cert, 'ExAmple.cOm')
542 fail(cert, 'www.example.com')
543 fail(cert, '.example.com')
544 fail(cert, 'example.org')
545 fail(cert, 'exampleXcom')
546
547 cert = {'subject': ((('commonName', '*.a.com'),),)}
548 ok(cert, 'foo.a.com')
549 fail(cert, 'bar.foo.a.com')
550 fail(cert, 'a.com')
551 fail(cert, 'Xa.com')
552 fail(cert, '.a.com')
553
Mandeep Singhede2ac92017-11-27 04:01:27 +0530554 # only match wildcards when they are the only thing
555 # in left-most segment
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000556 cert = {'subject': ((('commonName', 'f*.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530557 fail(cert, 'foo.com')
558 fail(cert, 'f.com')
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000559 fail(cert, 'bar.com')
560 fail(cert, 'foo.a.com')
561 fail(cert, 'bar.foo.com')
562
Christian Heimes824f7f32013-08-17 00:54:47 +0200563 # NULL bytes are bad, CVE-2013-4073
564 cert = {'subject': ((('commonName',
565 'null.python.org\x00example.org'),),)}
566 ok(cert, 'null.python.org\x00example.org') # or raise an error?
567 fail(cert, 'example.org')
568 fail(cert, 'null.python.org')
569
Georg Brandl72c98d32013-10-27 07:16:53 +0100570 # error cases with wildcards
571 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
572 fail(cert, 'bar.foo.a.com')
573 fail(cert, 'a.com')
574 fail(cert, 'Xa.com')
575 fail(cert, '.a.com')
576
577 cert = {'subject': ((('commonName', 'a.*.com'),),)}
578 fail(cert, 'a.foo.com')
579 fail(cert, 'a..com')
580 fail(cert, 'a.com')
581
582 # wildcard doesn't match IDNA prefix 'xn--'
583 idna = 'püthon.python.org'.encode("idna").decode("ascii")
584 cert = {'subject': ((('commonName', idna),),)}
585 ok(cert, idna)
586 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
587 fail(cert, idna)
588 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
589 fail(cert, idna)
590
591 # wildcard in first fragment and IDNA A-labels in sequent fragments
592 # are supported.
593 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
594 cert = {'subject': ((('commonName', idna),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530595 fail(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
596 fail(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
Georg Brandl72c98d32013-10-27 07:16:53 +0100597 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
598 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
599
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000600 # Slightly fake real-world example
601 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
602 'subject': ((('commonName', 'linuxfrz.org'),),),
603 'subjectAltName': (('DNS', 'linuxfr.org'),
604 ('DNS', 'linuxfr.com'),
605 ('othername', '<unsupported>'))}
606 ok(cert, 'linuxfr.org')
607 ok(cert, 'linuxfr.com')
608 # Not a "DNS" entry
609 fail(cert, '<unsupported>')
610 # When there is a subjectAltName, commonName isn't used
611 fail(cert, 'linuxfrz.org')
612
613 # A pristine real-world example
614 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
615 'subject': ((('countryName', 'US'),),
616 (('stateOrProvinceName', 'California'),),
617 (('localityName', 'Mountain View'),),
618 (('organizationName', 'Google Inc'),),
619 (('commonName', 'mail.google.com'),))}
620 ok(cert, 'mail.google.com')
621 fail(cert, 'gmail.com')
622 # Only commonName is considered
623 fail(cert, 'California')
624
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100625 # -- IPv4 matching --
626 cert = {'subject': ((('commonName', 'example.com'),),),
627 'subjectAltName': (('DNS', 'example.com'),
628 ('IP Address', '10.11.12.13'),
629 ('IP Address', '14.15.16.17'))}
630 ok(cert, '10.11.12.13')
631 ok(cert, '14.15.16.17')
632 fail(cert, '14.15.16.18')
633 fail(cert, 'example.net')
634
635 # -- IPv6 matching --
Christian Heimesaef12832018-02-24 14:35:56 +0100636 if hasattr(socket, 'AF_INET6'):
637 cert = {'subject': ((('commonName', 'example.com'),),),
638 'subjectAltName': (
639 ('DNS', 'example.com'),
640 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
641 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
642 ok(cert, '2001::cafe')
643 ok(cert, '2003::baba')
644 fail(cert, '2003::bebe')
645 fail(cert, 'example.net')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100646
647 # -- Miscellaneous --
648
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000649 # Neither commonName nor subjectAltName
650 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
651 'subject': ((('countryName', 'US'),),
652 (('stateOrProvinceName', 'California'),),
653 (('localityName', 'Mountain View'),),
654 (('organizationName', 'Google Inc'),))}
655 fail(cert, 'mail.google.com')
656
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200657 # No DNS entry in subjectAltName but a commonName
658 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
659 'subject': ((('countryName', 'US'),),
660 (('stateOrProvinceName', 'California'),),
661 (('localityName', 'Mountain View'),),
662 (('commonName', 'mail.google.com'),)),
663 'subjectAltName': (('othername', 'blabla'), )}
664 ok(cert, 'mail.google.com')
665
666 # No DNS entry subjectAltName and no commonName
667 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
668 'subject': ((('countryName', 'US'),),
669 (('stateOrProvinceName', 'California'),),
670 (('localityName', 'Mountain View'),),
671 (('organizationName', 'Google Inc'),)),
672 'subjectAltName': (('othername', 'blabla'),)}
673 fail(cert, 'google.com')
674
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000675 # Empty cert / no cert
676 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
677 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
678
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200679 # Issue #17980: avoid denials of service by refusing more than one
680 # wildcard per fragment.
Christian Heimesaef12832018-02-24 14:35:56 +0100681 cert = {'subject': ((('commonName', 'a*b.example.com'),),)}
682 with self.assertRaisesRegex(
683 ssl.CertificateError,
684 "partial wildcards in leftmost label are not supported"):
685 ssl.match_hostname(cert, 'axxb.example.com')
686
687 cert = {'subject': ((('commonName', 'www.*.example.com'),),)}
688 with self.assertRaisesRegex(
689 ssl.CertificateError,
690 "wildcard can only be present in the leftmost label"):
691 ssl.match_hostname(cert, 'www.sub.example.com')
692
693 cert = {'subject': ((('commonName', 'a*b*.example.com'),),)}
694 with self.assertRaisesRegex(
695 ssl.CertificateError,
696 "too many wildcards"):
697 ssl.match_hostname(cert, 'axxbxxc.example.com')
698
699 cert = {'subject': ((('commonName', '*'),),)}
700 with self.assertRaisesRegex(
701 ssl.CertificateError,
702 "sole wildcard without additional labels are not support"):
703 ssl.match_hostname(cert, 'host')
704
705 cert = {'subject': ((('commonName', '*.com'),),)}
706 with self.assertRaisesRegex(
707 ssl.CertificateError,
708 r"hostname 'com' doesn't match '\*.com'"):
709 ssl.match_hostname(cert, 'com')
710
711 # extra checks for _inet_paton()
712 for invalid in ['1', '', '1.2.3', '256.0.0.1', '127.0.0.1/24']:
713 with self.assertRaises(ValueError):
714 ssl._inet_paton(invalid)
715 for ipaddr in ['127.0.0.1', '192.168.0.1']:
716 self.assertTrue(ssl._inet_paton(ipaddr))
717 if hasattr(socket, 'AF_INET6'):
718 for ipaddr in ['::1', '2001:db8:85a3::8a2e:370:7334']:
719 self.assertTrue(ssl._inet_paton(ipaddr))
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200720
Antoine Pitroud5323212010-10-22 18:19:07 +0000721 def test_server_side(self):
722 # server_hostname doesn't work for server sockets
Christian Heimesa170fa12017-09-15 20:27:30 +0200723 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000724 with socket.socket() as sock:
725 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
726 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000727
Antoine Pitroud6494802011-07-21 01:11:30 +0200728 def test_unknown_channel_binding(self):
729 # should raise ValueError for unknown type
730 s = socket.socket(socket.AF_INET)
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200731 s.bind(('127.0.0.1', 0))
732 s.listen()
733 c = socket.socket(socket.AF_INET)
734 c.connect(s.getsockname())
Christian Heimesd0486372016-09-10 23:23:33 +0200735 with test_wrap_socket(c, do_handshake_on_connect=False) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100736 with self.assertRaises(ValueError):
737 ss.get_channel_binding("unknown-type")
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200738 s.close()
Antoine Pitroud6494802011-07-21 01:11:30 +0200739
740 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
741 "'tls-unique' channel binding not available")
742 def test_tls_unique_channel_binding(self):
743 # unconnected should return None for known type
744 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200745 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100746 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200747 # the same for server-side
748 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200749 with test_wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100750 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200751
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600752 def test_dealloc_warn(self):
Christian Heimesd0486372016-09-10 23:23:33 +0200753 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600754 r = repr(ss)
755 with self.assertWarns(ResourceWarning) as cm:
756 ss = None
757 support.gc_collect()
758 self.assertIn(r, str(cm.warning.args[0]))
759
Christian Heimes6d7ad132013-06-09 18:02:55 +0200760 def test_get_default_verify_paths(self):
761 paths = ssl.get_default_verify_paths()
762 self.assertEqual(len(paths), 6)
763 self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
764
765 with support.EnvironmentVarGuard() as env:
766 env["SSL_CERT_DIR"] = CAPATH
767 env["SSL_CERT_FILE"] = CERTFILE
768 paths = ssl.get_default_verify_paths()
769 self.assertEqual(paths.cafile, CERTFILE)
770 self.assertEqual(paths.capath, CAPATH)
771
Christian Heimes44109d72013-11-22 01:51:30 +0100772 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
773 def test_enum_certificates(self):
774 self.assertTrue(ssl.enum_certificates("CA"))
775 self.assertTrue(ssl.enum_certificates("ROOT"))
776
777 self.assertRaises(TypeError, ssl.enum_certificates)
778 self.assertRaises(WindowsError, ssl.enum_certificates, "")
779
Christian Heimesc2d65e12013-11-22 16:13:55 +0100780 trust_oids = set()
781 for storename in ("CA", "ROOT"):
782 store = ssl.enum_certificates(storename)
783 self.assertIsInstance(store, list)
784 for element in store:
785 self.assertIsInstance(element, tuple)
786 self.assertEqual(len(element), 3)
787 cert, enc, trust = element
788 self.assertIsInstance(cert, bytes)
789 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
790 self.assertIsInstance(trust, (set, bool))
791 if isinstance(trust, set):
792 trust_oids.update(trust)
Christian Heimes44109d72013-11-22 01:51:30 +0100793
794 serverAuth = "1.3.6.1.5.5.7.3.1"
Christian Heimesc2d65e12013-11-22 16:13:55 +0100795 self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200796
Christian Heimes46bebee2013-06-09 19:03:31 +0200797 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Christian Heimes44109d72013-11-22 01:51:30 +0100798 def test_enum_crls(self):
799 self.assertTrue(ssl.enum_crls("CA"))
800 self.assertRaises(TypeError, ssl.enum_crls)
801 self.assertRaises(WindowsError, ssl.enum_crls, "")
Christian Heimes46bebee2013-06-09 19:03:31 +0200802
Christian Heimes44109d72013-11-22 01:51:30 +0100803 crls = ssl.enum_crls("CA")
804 self.assertIsInstance(crls, list)
805 for element in crls:
806 self.assertIsInstance(element, tuple)
807 self.assertEqual(len(element), 2)
808 self.assertIsInstance(element[0], bytes)
809 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200810
Christian Heimes46bebee2013-06-09 19:03:31 +0200811
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100812 def test_asn1object(self):
813 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
814 '1.3.6.1.5.5.7.3.1')
815
816 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
817 self.assertEqual(val, expected)
818 self.assertEqual(val.nid, 129)
819 self.assertEqual(val.shortname, 'serverAuth')
820 self.assertEqual(val.longname, 'TLS Web Server Authentication')
821 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
822 self.assertIsInstance(val, ssl._ASN1Object)
823 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
824
825 val = ssl._ASN1Object.fromnid(129)
826 self.assertEqual(val, expected)
827 self.assertIsInstance(val, ssl._ASN1Object)
828 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100829 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
830 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100831 for i in range(1000):
832 try:
833 obj = ssl._ASN1Object.fromnid(i)
834 except ValueError:
835 pass
836 else:
837 self.assertIsInstance(obj.nid, int)
838 self.assertIsInstance(obj.shortname, str)
839 self.assertIsInstance(obj.longname, str)
840 self.assertIsInstance(obj.oid, (str, type(None)))
841
842 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
843 self.assertEqual(val, expected)
844 self.assertIsInstance(val, ssl._ASN1Object)
845 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
846 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
847 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100848 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
849 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100850
Christian Heimes72d28502013-11-23 13:56:58 +0100851 def test_purpose_enum(self):
852 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
853 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
854 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
855 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
856 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
857 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
858 '1.3.6.1.5.5.7.3.1')
859
860 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
861 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
862 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
863 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
864 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
865 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
866 '1.3.6.1.5.5.7.3.2')
867
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100868 def test_unsupported_dtls(self):
869 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
870 self.addCleanup(s.close)
871 with self.assertRaises(NotImplementedError) as cx:
Christian Heimesd0486372016-09-10 23:23:33 +0200872 test_wrap_socket(s, cert_reqs=ssl.CERT_NONE)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100873 self.assertEqual(str(cx.exception), "only stream sockets are supported")
Christian Heimesa170fa12017-09-15 20:27:30 +0200874 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100875 with self.assertRaises(NotImplementedError) as cx:
876 ctx.wrap_socket(s)
877 self.assertEqual(str(cx.exception), "only stream sockets are supported")
878
Antoine Pitrouc695c952014-04-28 20:57:36 +0200879 def cert_time_ok(self, timestring, timestamp):
880 self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)
881
882 def cert_time_fail(self, timestring):
883 with self.assertRaises(ValueError):
884 ssl.cert_time_to_seconds(timestring)
885
886 @unittest.skipUnless(utc_offset(),
887 'local time needs to be different from UTC')
888 def test_cert_time_to_seconds_timezone(self):
889 # Issue #19940: ssl.cert_time_to_seconds() returns wrong
890 # results if local timezone is not UTC
891 self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0)
892 self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0)
893
894 def test_cert_time_to_seconds(self):
895 timestring = "Jan 5 09:34:43 2018 GMT"
896 ts = 1515144883.0
897 self.cert_time_ok(timestring, ts)
898 # accept keyword parameter, assert its name
899 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
900 # accept both %e and %d (space or zero generated by strftime)
901 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
902 # case-insensitive
903 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
904 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
905 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
906 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
907 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
908 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
909 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
910 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
911
912 newyear_ts = 1230768000.0
913 # leap seconds
914 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
915 # same timestamp
916 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
917
918 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
919 # allow 60th second (even if it is not a leap second)
920 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
921 # allow 2nd leap second for compatibility with time.strptime()
922 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
923 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
924
Mike53f7a7c2017-12-14 14:04:53 +0300925 # no special treatment for the special value:
Antoine Pitrouc695c952014-04-28 20:57:36 +0200926 # 99991231235959Z (rfc 5280)
927 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
928
929 @support.run_with_locale('LC_ALL', '')
930 def test_cert_time_to_seconds_locale(self):
931 # `cert_time_to_seconds()` should be locale independent
932
933 def local_february_name():
934 return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
935
936 if local_february_name().lower() == 'feb':
937 self.skipTest("locale-specific month name needs to be "
938 "different from C locale")
939
940 # locale-independent
941 self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
942 self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
943
Martin Panter3840b2a2016-03-27 01:53:46 +0000944 def test_connect_ex_error(self):
945 server = socket.socket(socket.AF_INET)
946 self.addCleanup(server.close)
947 port = support.bind_port(server) # Reserve port but don't listen
Christian Heimesd0486372016-09-10 23:23:33 +0200948 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +0000949 cert_reqs=ssl.CERT_REQUIRED)
950 self.addCleanup(s.close)
951 rc = s.connect_ex((HOST, port))
952 # Issue #19919: Windows machines or VMs hosted on Windows
953 # machines sometimes return EWOULDBLOCK.
954 errors = (
955 errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
956 errno.EWOULDBLOCK,
957 )
958 self.assertIn(rc, errors)
959
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100960
Antoine Pitrou152efa22010-05-16 18:19:27 +0000961class ContextTests(unittest.TestCase):
962
963 def test_constructor(self):
Antoine Pitrou2463e5f2013-03-28 22:24:43 +0100964 for protocol in PROTOCOLS:
965 ssl.SSLContext(protocol)
Christian Heimes598894f2016-09-05 23:19:05 +0200966 ctx = ssl.SSLContext()
967 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000968 self.assertRaises(ValueError, ssl.SSLContext, -1)
969 self.assertRaises(ValueError, ssl.SSLContext, 42)
970
971 def test_protocol(self):
972 for proto in PROTOCOLS:
973 ctx = ssl.SSLContext(proto)
974 self.assertEqual(ctx.protocol, proto)
975
976 def test_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +0200977 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000978 ctx.set_ciphers("ALL")
979 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +0000980 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +0000981 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000982
Christian Heimes892d66e2018-01-29 14:10:18 +0100983 @unittest.skipUnless(PY_SSL_DEFAULT_CIPHERS == 1,
984 "Test applies only to Python default ciphers")
985 def test_python_ciphers(self):
986 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
987 ciphers = ctx.get_ciphers()
988 for suite in ciphers:
989 name = suite['name']
990 self.assertNotIn("PSK", name)
991 self.assertNotIn("SRP", name)
992 self.assertNotIn("MD5", name)
993 self.assertNotIn("RC4", name)
994 self.assertNotIn("3DES", name)
995
Christian Heimes25bfcd52016-09-06 00:04:45 +0200996 @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
997 def test_get_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +0200998 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesea9b2dc2016-09-06 10:45:44 +0200999 ctx.set_ciphers('AESGCM')
Christian Heimes25bfcd52016-09-06 00:04:45 +02001000 names = set(d['name'] for d in ctx.get_ciphers())
Christian Heimes582282b2016-09-06 11:27:25 +02001001 self.assertIn('AES256-GCM-SHA384', names)
1002 self.assertIn('AES128-GCM-SHA256', names)
Christian Heimes25bfcd52016-09-06 00:04:45 +02001003
Antoine Pitroub5218772010-05-21 09:56:06 +00001004 def test_options(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001005 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001006 # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
Christian Heimes598894f2016-09-05 23:19:05 +02001007 default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimes358cfd42016-09-10 22:43:48 +02001008 # SSLContext also enables these by default
1009 default |= (OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE |
Christian Heimes05d9fe32018-02-27 08:55:39 +01001010 OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE |
1011 OP_ENABLE_MIDDLEBOX_COMPAT)
Christian Heimes598894f2016-09-05 23:19:05 +02001012 self.assertEqual(default, ctx.options)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001013 ctx.options |= ssl.OP_NO_TLSv1
Christian Heimes598894f2016-09-05 23:19:05 +02001014 self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001015 if can_clear_options():
Christian Heimes598894f2016-09-05 23:19:05 +02001016 ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
1017 self.assertEqual(default, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001018 ctx.options = 0
Matthias Klosef7c56242016-06-12 23:40:00 -07001019 # Ubuntu has OP_NO_SSLv3 forced on by default
1020 self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
Antoine Pitroub5218772010-05-21 09:56:06 +00001021 else:
1022 with self.assertRaises(ValueError):
1023 ctx.options = 0
1024
Christian Heimesa170fa12017-09-15 20:27:30 +02001025 def test_verify_mode_protocol(self):
1026 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001027 # Default value
1028 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1029 ctx.verify_mode = ssl.CERT_OPTIONAL
1030 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1031 ctx.verify_mode = ssl.CERT_REQUIRED
1032 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1033 ctx.verify_mode = ssl.CERT_NONE
1034 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1035 with self.assertRaises(TypeError):
1036 ctx.verify_mode = None
1037 with self.assertRaises(ValueError):
1038 ctx.verify_mode = 42
1039
Christian Heimesa170fa12017-09-15 20:27:30 +02001040 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1041 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1042 self.assertFalse(ctx.check_hostname)
1043
1044 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1045 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1046 self.assertTrue(ctx.check_hostname)
1047
Christian Heimes61d478c2018-01-27 15:51:38 +01001048 def test_hostname_checks_common_name(self):
1049 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1050 self.assertTrue(ctx.hostname_checks_common_name)
1051 if ssl.HAS_NEVER_CHECK_COMMON_NAME:
1052 ctx.hostname_checks_common_name = True
1053 self.assertTrue(ctx.hostname_checks_common_name)
1054 ctx.hostname_checks_common_name = False
1055 self.assertFalse(ctx.hostname_checks_common_name)
1056 ctx.hostname_checks_common_name = True
1057 self.assertTrue(ctx.hostname_checks_common_name)
1058 else:
1059 with self.assertRaises(AttributeError):
1060 ctx.hostname_checks_common_name = True
Christian Heimesa170fa12017-09-15 20:27:30 +02001061
Christian Heimes698dde12018-02-27 11:54:43 +01001062 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
1063 "required OpenSSL 1.1.0g")
1064 def test_min_max_version(self):
1065 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1066 self.assertEqual(
1067 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1068 )
1069 self.assertEqual(
1070 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1071 )
1072
1073 ctx.minimum_version = ssl.TLSVersion.TLSv1_1
1074 ctx.maximum_version = ssl.TLSVersion.TLSv1_2
1075 self.assertEqual(
1076 ctx.minimum_version, ssl.TLSVersion.TLSv1_1
1077 )
1078 self.assertEqual(
1079 ctx.maximum_version, ssl.TLSVersion.TLSv1_2
1080 )
1081
1082 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1083 ctx.maximum_version = ssl.TLSVersion.TLSv1
1084 self.assertEqual(
1085 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1086 )
1087 self.assertEqual(
1088 ctx.maximum_version, ssl.TLSVersion.TLSv1
1089 )
1090
1091 ctx.maximum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1092 self.assertEqual(
1093 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1094 )
1095
1096 ctx.maximum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1097 self.assertIn(
1098 ctx.maximum_version,
1099 {ssl.TLSVersion.TLSv1, ssl.TLSVersion.SSLv3}
1100 )
1101
1102 ctx.minimum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1103 self.assertIn(
1104 ctx.minimum_version,
1105 {ssl.TLSVersion.TLSv1_2, ssl.TLSVersion.TLSv1_3}
1106 )
1107
1108 with self.assertRaises(ValueError):
1109 ctx.minimum_version = 42
1110
1111 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_1)
1112
1113 self.assertEqual(
1114 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1115 )
1116 self.assertEqual(
1117 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1118 )
1119 with self.assertRaises(ValueError):
1120 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1121 with self.assertRaises(ValueError):
1122 ctx.maximum_version = ssl.TLSVersion.TLSv1
1123
1124
Christian Heimes2427b502013-11-23 11:24:32 +01001125 @unittest.skipUnless(have_verify_flags(),
1126 "verify_flags need OpenSSL > 0.9.8")
Christian Heimes22587792013-11-21 23:56:13 +01001127 def test_verify_flags(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001128 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Benjamin Peterson990fcaa2015-03-04 22:49:41 -05001129 # default value
1130 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
1131 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | tf)
Christian Heimes22587792013-11-21 23:56:13 +01001132 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
1133 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
1134 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
1135 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
1136 ctx.verify_flags = ssl.VERIFY_DEFAULT
1137 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
1138 # supports any value
1139 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
1140 self.assertEqual(ctx.verify_flags,
1141 ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
1142 with self.assertRaises(TypeError):
1143 ctx.verify_flags = None
1144
Antoine Pitrou152efa22010-05-16 18:19:27 +00001145 def test_load_cert_chain(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001146 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001147 # Combined key and cert in a single file
Benjamin Peterson1ea070e2014-11-03 21:05:01 -05001148 ctx.load_cert_chain(CERTFILE, keyfile=None)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001149 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
1150 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001151 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001152 ctx.load_cert_chain(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001153 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001154 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001155 ctx.load_cert_chain(BADCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001156 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001157 ctx.load_cert_chain(EMPTYCERT)
1158 # Separate key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001159 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001160 ctx.load_cert_chain(ONLYCERT, ONLYKEY)
1161 ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
1162 ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001163 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001164 ctx.load_cert_chain(ONLYCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001165 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001166 ctx.load_cert_chain(ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001167 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001168 ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
1169 # Mismatching key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001170 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001171 with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
Martin Panter3d81d932016-01-14 09:36:00 +00001172 ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +02001173 # Password protected key and cert
1174 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
1175 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
1176 ctx.load_cert_chain(CERTFILE_PROTECTED,
1177 password=bytearray(KEY_PASSWORD.encode()))
1178 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
1179 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
1180 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
1181 bytearray(KEY_PASSWORD.encode()))
1182 with self.assertRaisesRegex(TypeError, "should be a string"):
1183 ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
1184 with self.assertRaises(ssl.SSLError):
1185 ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
1186 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1187 # openssl has a fixed limit on the password buffer.
1188 # PEM_BUFSIZE is generally set to 1kb.
1189 # Return a string larger than this.
1190 ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
1191 # Password callback
1192 def getpass_unicode():
1193 return KEY_PASSWORD
1194 def getpass_bytes():
1195 return KEY_PASSWORD.encode()
1196 def getpass_bytearray():
1197 return bytearray(KEY_PASSWORD.encode())
1198 def getpass_badpass():
1199 return "badpass"
1200 def getpass_huge():
1201 return b'a' * (1024 * 1024)
1202 def getpass_bad_type():
1203 return 9
1204 def getpass_exception():
1205 raise Exception('getpass error')
1206 class GetPassCallable:
1207 def __call__(self):
1208 return KEY_PASSWORD
1209 def getpass(self):
1210 return KEY_PASSWORD
1211 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
1212 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
1213 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
1214 ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
1215 ctx.load_cert_chain(CERTFILE_PROTECTED,
1216 password=GetPassCallable().getpass)
1217 with self.assertRaises(ssl.SSLError):
1218 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
1219 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1220 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
1221 with self.assertRaisesRegex(TypeError, "must return a string"):
1222 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
1223 with self.assertRaisesRegex(Exception, "getpass error"):
1224 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
1225 # Make sure the password function isn't called if it isn't needed
1226 ctx.load_cert_chain(CERTFILE, password=getpass_exception)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001227
1228 def test_load_verify_locations(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001229 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001230 ctx.load_verify_locations(CERTFILE)
1231 ctx.load_verify_locations(cafile=CERTFILE, capath=None)
1232 ctx.load_verify_locations(BYTES_CERTFILE)
1233 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
1234 self.assertRaises(TypeError, ctx.load_verify_locations)
Christian Heimesefff7062013-11-21 03:35:02 +01001235 self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001236 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001237 ctx.load_verify_locations(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001238 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001239 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001240 ctx.load_verify_locations(BADCERT)
1241 ctx.load_verify_locations(CERTFILE, CAPATH)
1242 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
1243
Victor Stinner80f75e62011-01-29 11:31:20 +00001244 # Issue #10989: crash if the second argument type is invalid
1245 self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
1246
Christian Heimesefff7062013-11-21 03:35:02 +01001247 def test_load_verify_cadata(self):
1248 # test cadata
1249 with open(CAFILE_CACERT) as f:
1250 cacert_pem = f.read()
1251 cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
1252 with open(CAFILE_NEURONIO) as f:
1253 neuronio_pem = f.read()
1254 neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
1255
1256 # test PEM
Christian Heimesa170fa12017-09-15 20:27:30 +02001257 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001258 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
1259 ctx.load_verify_locations(cadata=cacert_pem)
1260 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
1261 ctx.load_verify_locations(cadata=neuronio_pem)
1262 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1263 # cert already in hash table
1264 ctx.load_verify_locations(cadata=neuronio_pem)
1265 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1266
1267 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001268 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001269 combined = "\n".join((cacert_pem, neuronio_pem))
1270 ctx.load_verify_locations(cadata=combined)
1271 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1272
1273 # with junk around the certs
Christian Heimesa170fa12017-09-15 20:27:30 +02001274 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001275 combined = ["head", cacert_pem, "other", neuronio_pem, "again",
1276 neuronio_pem, "tail"]
1277 ctx.load_verify_locations(cadata="\n".join(combined))
1278 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1279
1280 # test DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001281 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001282 ctx.load_verify_locations(cadata=cacert_der)
1283 ctx.load_verify_locations(cadata=neuronio_der)
1284 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1285 # cert already in hash table
1286 ctx.load_verify_locations(cadata=cacert_der)
1287 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1288
1289 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001290 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001291 combined = b"".join((cacert_der, neuronio_der))
1292 ctx.load_verify_locations(cadata=combined)
1293 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1294
1295 # error cases
Christian Heimesa170fa12017-09-15 20:27:30 +02001296 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001297 self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
1298
1299 with self.assertRaisesRegex(ssl.SSLError, "no start line"):
1300 ctx.load_verify_locations(cadata="broken")
1301 with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
1302 ctx.load_verify_locations(cadata=b"broken")
1303
1304
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001305 def test_load_dh_params(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001306 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001307 ctx.load_dh_params(DHFILE)
1308 if os.name != 'nt':
1309 ctx.load_dh_params(BYTES_DHFILE)
1310 self.assertRaises(TypeError, ctx.load_dh_params)
1311 self.assertRaises(TypeError, ctx.load_dh_params, None)
1312 with self.assertRaises(FileNotFoundError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001313 ctx.load_dh_params(NONEXISTINGCERT)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001314 self.assertEqual(cm.exception.errno, errno.ENOENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001315 with self.assertRaises(ssl.SSLError) as cm:
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001316 ctx.load_dh_params(CERTFILE)
1317
Antoine Pitroub0182c82010-10-12 20:09:02 +00001318 def test_session_stats(self):
1319 for proto in PROTOCOLS:
1320 ctx = ssl.SSLContext(proto)
1321 self.assertEqual(ctx.session_stats(), {
1322 'number': 0,
1323 'connect': 0,
1324 'connect_good': 0,
1325 'connect_renegotiate': 0,
1326 'accept': 0,
1327 'accept_good': 0,
1328 'accept_renegotiate': 0,
1329 'hits': 0,
1330 'misses': 0,
1331 'timeouts': 0,
1332 'cache_full': 0,
1333 })
1334
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001335 def test_set_default_verify_paths(self):
1336 # There's not much we can do to test that it acts as expected,
1337 # so just check it doesn't crash or raise an exception.
Christian Heimesa170fa12017-09-15 20:27:30 +02001338 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001339 ctx.set_default_verify_paths()
1340
Antoine Pitrou501da612011-12-21 09:27:41 +01001341 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001342 def test_set_ecdh_curve(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001343 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001344 ctx.set_ecdh_curve("prime256v1")
1345 ctx.set_ecdh_curve(b"prime256v1")
1346 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1347 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1348 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1349 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1350
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001351 @needs_sni
1352 def test_sni_callback(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001353 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001354
1355 # set_servername_callback expects a callable, or None
1356 self.assertRaises(TypeError, ctx.set_servername_callback)
1357 self.assertRaises(TypeError, ctx.set_servername_callback, 4)
1358 self.assertRaises(TypeError, ctx.set_servername_callback, "")
1359 self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
1360
1361 def dummycallback(sock, servername, ctx):
1362 pass
1363 ctx.set_servername_callback(None)
1364 ctx.set_servername_callback(dummycallback)
1365
1366 @needs_sni
1367 def test_sni_callback_refcycle(self):
1368 # Reference cycles through the servername callback are detected
1369 # and cleared.
Christian Heimesa170fa12017-09-15 20:27:30 +02001370 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001371 def dummycallback(sock, servername, ctx, cycle=ctx):
1372 pass
1373 ctx.set_servername_callback(dummycallback)
1374 wr = weakref.ref(ctx)
1375 del ctx, dummycallback
1376 gc.collect()
1377 self.assertIs(wr(), None)
1378
Christian Heimes9a5395a2013-06-17 15:44:12 +02001379 def test_cert_store_stats(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001380 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001381 self.assertEqual(ctx.cert_store_stats(),
1382 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1383 ctx.load_cert_chain(CERTFILE)
1384 self.assertEqual(ctx.cert_store_stats(),
1385 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1386 ctx.load_verify_locations(CERTFILE)
1387 self.assertEqual(ctx.cert_store_stats(),
1388 {'x509_ca': 0, 'crl': 0, 'x509': 1})
Martin Panterb55f8b72016-01-14 12:53:56 +00001389 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001390 self.assertEqual(ctx.cert_store_stats(),
1391 {'x509_ca': 1, 'crl': 0, 'x509': 2})
1392
1393 def test_get_ca_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001394 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001395 self.assertEqual(ctx.get_ca_certs(), [])
1396 # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
1397 ctx.load_verify_locations(CERTFILE)
1398 self.assertEqual(ctx.get_ca_certs(), [])
Martin Panterb55f8b72016-01-14 12:53:56 +00001399 # but CAFILE_CACERT is a CA cert
1400 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001401 self.assertEqual(ctx.get_ca_certs(),
1402 [{'issuer': ((('organizationName', 'Root CA'),),
1403 (('organizationalUnitName', 'http://www.cacert.org'),),
1404 (('commonName', 'CA Cert Signing Authority'),),
1405 (('emailAddress', 'support@cacert.org'),)),
1406 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
1407 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
1408 'serialNumber': '00',
Christian Heimesbd3a7f92013-11-21 03:40:15 +01001409 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
Christian Heimes9a5395a2013-06-17 15:44:12 +02001410 'subject': ((('organizationName', 'Root CA'),),
1411 (('organizationalUnitName', 'http://www.cacert.org'),),
1412 (('commonName', 'CA Cert Signing Authority'),),
1413 (('emailAddress', 'support@cacert.org'),)),
1414 'version': 3}])
1415
Martin Panterb55f8b72016-01-14 12:53:56 +00001416 with open(CAFILE_CACERT) as f:
Christian Heimes9a5395a2013-06-17 15:44:12 +02001417 pem = f.read()
1418 der = ssl.PEM_cert_to_DER_cert(pem)
1419 self.assertEqual(ctx.get_ca_certs(True), [der])
1420
Christian Heimes72d28502013-11-23 13:56:58 +01001421 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001422 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001423 ctx.load_default_certs()
1424
Christian Heimesa170fa12017-09-15 20:27:30 +02001425 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001426 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1427 ctx.load_default_certs()
1428
Christian Heimesa170fa12017-09-15 20:27:30 +02001429 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001430 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1431
Christian Heimesa170fa12017-09-15 20:27:30 +02001432 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001433 self.assertRaises(TypeError, ctx.load_default_certs, None)
1434 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1435
Benjamin Peterson91244e02014-10-03 18:17:15 -04001436 @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
Christian Heimes598894f2016-09-05 23:19:05 +02001437 @unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001438 def test_load_default_certs_env(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001439 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001440 with support.EnvironmentVarGuard() as env:
1441 env["SSL_CERT_DIR"] = CAPATH
1442 env["SSL_CERT_FILE"] = CERTFILE
1443 ctx.load_default_certs()
1444 self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0})
1445
Benjamin Peterson91244e02014-10-03 18:17:15 -04001446 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Steve Dower68d663c2017-07-17 11:15:48 +02001447 @unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
Benjamin Peterson91244e02014-10-03 18:17:15 -04001448 def test_load_default_certs_env_windows(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001449 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001450 ctx.load_default_certs()
1451 stats = ctx.cert_store_stats()
1452
Christian Heimesa170fa12017-09-15 20:27:30 +02001453 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001454 with support.EnvironmentVarGuard() as env:
1455 env["SSL_CERT_DIR"] = CAPATH
1456 env["SSL_CERT_FILE"] = CERTFILE
1457 ctx.load_default_certs()
1458 stats["x509"] += 1
1459 self.assertEqual(ctx.cert_store_stats(), stats)
1460
Christian Heimes358cfd42016-09-10 22:43:48 +02001461 def _assert_context_options(self, ctx):
1462 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1463 if OP_NO_COMPRESSION != 0:
1464 self.assertEqual(ctx.options & OP_NO_COMPRESSION,
1465 OP_NO_COMPRESSION)
1466 if OP_SINGLE_DH_USE != 0:
1467 self.assertEqual(ctx.options & OP_SINGLE_DH_USE,
1468 OP_SINGLE_DH_USE)
1469 if OP_SINGLE_ECDH_USE != 0:
1470 self.assertEqual(ctx.options & OP_SINGLE_ECDH_USE,
1471 OP_SINGLE_ECDH_USE)
1472 if OP_CIPHER_SERVER_PREFERENCE != 0:
1473 self.assertEqual(ctx.options & OP_CIPHER_SERVER_PREFERENCE,
1474 OP_CIPHER_SERVER_PREFERENCE)
1475
Christian Heimes4c05b472013-11-23 15:58:30 +01001476 def test_create_default_context(self):
1477 ctx = ssl.create_default_context()
Christian Heimes358cfd42016-09-10 22:43:48 +02001478
Christian Heimesa170fa12017-09-15 20:27:30 +02001479 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001480 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001481 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001482 self._assert_context_options(ctx)
1483
Christian Heimes4c05b472013-11-23 15:58:30 +01001484 with open(SIGNING_CA) as f:
1485 cadata = f.read()
1486 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1487 cadata=cadata)
Christian Heimesa170fa12017-09-15 20:27:30 +02001488 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001489 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes358cfd42016-09-10 22:43:48 +02001490 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001491
1492 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001493 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001494 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001495 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001496
Christian Heimes67986f92013-11-23 22:43:47 +01001497 def test__create_stdlib_context(self):
1498 ctx = ssl._create_stdlib_context()
Christian Heimesa170fa12017-09-15 20:27:30 +02001499 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001500 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001501 self.assertFalse(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001502 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001503
1504 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
1505 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1506 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001507 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001508
1509 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
Christian Heimesa02c69a2013-12-02 20:59:28 +01001510 cert_reqs=ssl.CERT_REQUIRED,
1511 check_hostname=True)
Christian Heimes67986f92013-11-23 22:43:47 +01001512 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1513 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimesa02c69a2013-12-02 20:59:28 +01001514 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001515 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001516
1517 ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001518 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001519 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001520 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001521
Christian Heimes1aa9a752013-12-02 02:41:19 +01001522 def test_check_hostname(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001523 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001524 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001525 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001526
Christian Heimese82c0342017-09-15 20:29:57 +02001527 # Auto set CERT_REQUIRED
1528 ctx.check_hostname = True
1529 self.assertTrue(ctx.check_hostname)
1530 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1531 ctx.check_hostname = False
Christian Heimes1aa9a752013-12-02 02:41:19 +01001532 ctx.verify_mode = ssl.CERT_REQUIRED
1533 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001534 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001535
Christian Heimese82c0342017-09-15 20:29:57 +02001536 # Changing verify_mode does not affect check_hostname
1537 ctx.check_hostname = False
1538 ctx.verify_mode = ssl.CERT_NONE
1539 ctx.check_hostname = False
1540 self.assertFalse(ctx.check_hostname)
1541 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1542 # Auto set
Christian Heimes1aa9a752013-12-02 02:41:19 +01001543 ctx.check_hostname = True
1544 self.assertTrue(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001545 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1546
1547 ctx.check_hostname = False
1548 ctx.verify_mode = ssl.CERT_OPTIONAL
1549 ctx.check_hostname = False
1550 self.assertFalse(ctx.check_hostname)
1551 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1552 # keep CERT_OPTIONAL
1553 ctx.check_hostname = True
1554 self.assertTrue(ctx.check_hostname)
1555 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001556
1557 # Cannot set CERT_NONE with check_hostname enabled
1558 with self.assertRaises(ValueError):
1559 ctx.verify_mode = ssl.CERT_NONE
1560 ctx.check_hostname = False
1561 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001562 ctx.verify_mode = ssl.CERT_NONE
1563 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001564
Christian Heimes5fe668c2016-09-12 00:01:11 +02001565 def test_context_client_server(self):
1566 # PROTOCOL_TLS_CLIENT has sane defaults
1567 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1568 self.assertTrue(ctx.check_hostname)
1569 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1570
1571 # PROTOCOL_TLS_SERVER has different but also sane defaults
1572 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1573 self.assertFalse(ctx.check_hostname)
1574 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1575
Christian Heimes4df60f12017-09-15 20:26:05 +02001576 def test_context_custom_class(self):
1577 class MySSLSocket(ssl.SSLSocket):
1578 pass
1579
1580 class MySSLObject(ssl.SSLObject):
1581 pass
1582
1583 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1584 ctx.sslsocket_class = MySSLSocket
1585 ctx.sslobject_class = MySSLObject
1586
1587 with ctx.wrap_socket(socket.socket(), server_side=True) as sock:
1588 self.assertIsInstance(sock, MySSLSocket)
1589 obj = ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO())
1590 self.assertIsInstance(obj, MySSLObject)
1591
Antoine Pitrou152efa22010-05-16 18:19:27 +00001592
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001593class SSLErrorTests(unittest.TestCase):
1594
1595 def test_str(self):
1596 # The str() of a SSLError doesn't include the errno
1597 e = ssl.SSLError(1, "foo")
1598 self.assertEqual(str(e), "foo")
1599 self.assertEqual(e.errno, 1)
1600 # Same for a subclass
1601 e = ssl.SSLZeroReturnError(1, "foo")
1602 self.assertEqual(str(e), "foo")
1603 self.assertEqual(e.errno, 1)
1604
1605 def test_lib_reason(self):
1606 # Test the library and reason attributes
Christian Heimesa170fa12017-09-15 20:27:30 +02001607 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001608 with self.assertRaises(ssl.SSLError) as cm:
1609 ctx.load_dh_params(CERTFILE)
1610 self.assertEqual(cm.exception.library, 'PEM')
1611 self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1612 s = str(cm.exception)
1613 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1614
1615 def test_subclass(self):
1616 # Check that the appropriate SSLError subclass is raised
1617 # (this only tests one of them)
Christian Heimesa170fa12017-09-15 20:27:30 +02001618 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1619 ctx.check_hostname = False
1620 ctx.verify_mode = ssl.CERT_NONE
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001621 with socket.socket() as s:
1622 s.bind(("127.0.0.1", 0))
Charles-François Natali6e204602014-07-23 19:28:13 +01001623 s.listen()
Antoine Pitroue1ceb502013-01-12 21:54:44 +01001624 c = socket.socket()
1625 c.connect(s.getsockname())
1626 c.setblocking(False)
1627 with ctx.wrap_socket(c, False, do_handshake_on_connect=False) as c:
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001628 with self.assertRaises(ssl.SSLWantReadError) as cm:
1629 c.do_handshake()
1630 s = str(cm.exception)
1631 self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1632 # For compatibility
1633 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
1634
1635
Christian Heimes61d478c2018-01-27 15:51:38 +01001636 def test_bad_server_hostname(self):
1637 ctx = ssl.create_default_context()
1638 with self.assertRaises(ValueError):
1639 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1640 server_hostname="")
1641 with self.assertRaises(ValueError):
1642 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1643 server_hostname=".example.org")
Christian Heimesd02ac252018-03-25 12:36:13 +02001644 with self.assertRaises(TypeError):
1645 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1646 server_hostname="example.org\x00evil.com")
Christian Heimes61d478c2018-01-27 15:51:38 +01001647
1648
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001649class MemoryBIOTests(unittest.TestCase):
1650
1651 def test_read_write(self):
1652 bio = ssl.MemoryBIO()
1653 bio.write(b'foo')
1654 self.assertEqual(bio.read(), b'foo')
1655 self.assertEqual(bio.read(), b'')
1656 bio.write(b'foo')
1657 bio.write(b'bar')
1658 self.assertEqual(bio.read(), b'foobar')
1659 self.assertEqual(bio.read(), b'')
1660 bio.write(b'baz')
1661 self.assertEqual(bio.read(2), b'ba')
1662 self.assertEqual(bio.read(1), b'z')
1663 self.assertEqual(bio.read(1), b'')
1664
1665 def test_eof(self):
1666 bio = ssl.MemoryBIO()
1667 self.assertFalse(bio.eof)
1668 self.assertEqual(bio.read(), b'')
1669 self.assertFalse(bio.eof)
1670 bio.write(b'foo')
1671 self.assertFalse(bio.eof)
1672 bio.write_eof()
1673 self.assertFalse(bio.eof)
1674 self.assertEqual(bio.read(2), b'fo')
1675 self.assertFalse(bio.eof)
1676 self.assertEqual(bio.read(1), b'o')
1677 self.assertTrue(bio.eof)
1678 self.assertEqual(bio.read(), b'')
1679 self.assertTrue(bio.eof)
1680
1681 def test_pending(self):
1682 bio = ssl.MemoryBIO()
1683 self.assertEqual(bio.pending, 0)
1684 bio.write(b'foo')
1685 self.assertEqual(bio.pending, 3)
1686 for i in range(3):
1687 bio.read(1)
1688 self.assertEqual(bio.pending, 3-i-1)
1689 for i in range(3):
1690 bio.write(b'x')
1691 self.assertEqual(bio.pending, i+1)
1692 bio.read()
1693 self.assertEqual(bio.pending, 0)
1694
1695 def test_buffer_types(self):
1696 bio = ssl.MemoryBIO()
1697 bio.write(b'foo')
1698 self.assertEqual(bio.read(), b'foo')
1699 bio.write(bytearray(b'bar'))
1700 self.assertEqual(bio.read(), b'bar')
1701 bio.write(memoryview(b'baz'))
1702 self.assertEqual(bio.read(), b'baz')
1703
1704 def test_error_types(self):
1705 bio = ssl.MemoryBIO()
1706 self.assertRaises(TypeError, bio.write, 'foo')
1707 self.assertRaises(TypeError, bio.write, None)
1708 self.assertRaises(TypeError, bio.write, True)
1709 self.assertRaises(TypeError, bio.write, 1)
1710
1711
Christian Heimes9d50ab52018-02-27 10:17:30 +01001712class SSLObjectTests(unittest.TestCase):
1713 def test_private_init(self):
1714 bio = ssl.MemoryBIO()
1715 with self.assertRaisesRegex(TypeError, "public constructor"):
1716 ssl.SSLObject(bio, bio)
1717
Nathaniel J. Smithc0da5822018-09-21 21:44:12 -07001718 def test_unwrap(self):
1719 client_ctx, server_ctx, hostname = testing_context()
1720 c_in = ssl.MemoryBIO()
1721 c_out = ssl.MemoryBIO()
1722 s_in = ssl.MemoryBIO()
1723 s_out = ssl.MemoryBIO()
1724 client = client_ctx.wrap_bio(c_in, c_out, server_hostname=hostname)
1725 server = server_ctx.wrap_bio(s_in, s_out, server_side=True)
1726
1727 # Loop on the handshake for a bit to get it settled
1728 for _ in range(5):
1729 try:
1730 client.do_handshake()
1731 except ssl.SSLWantReadError:
1732 pass
1733 if c_out.pending:
1734 s_in.write(c_out.read())
1735 try:
1736 server.do_handshake()
1737 except ssl.SSLWantReadError:
1738 pass
1739 if s_out.pending:
1740 c_in.write(s_out.read())
1741 # Now the handshakes should be complete (don't raise WantReadError)
1742 client.do_handshake()
1743 server.do_handshake()
1744
1745 # Now if we unwrap one side unilaterally, it should send close-notify
1746 # and raise WantReadError:
1747 with self.assertRaises(ssl.SSLWantReadError):
1748 client.unwrap()
1749
1750 # But server.unwrap() does not raise, because it reads the client's
1751 # close-notify:
1752 s_in.write(c_out.read())
1753 server.unwrap()
1754
1755 # And now that the client gets the server's close-notify, it doesn't
1756 # raise either.
1757 c_in.write(s_out.read())
1758 client.unwrap()
Christian Heimes9d50ab52018-02-27 10:17:30 +01001759
Martin Panter3840b2a2016-03-27 01:53:46 +00001760class SimpleBackgroundTests(unittest.TestCase):
Martin Panter3840b2a2016-03-27 01:53:46 +00001761 """Tests that connect to a simple server running in the background"""
1762
1763 def setUp(self):
1764 server = ThreadedEchoServer(SIGNED_CERTFILE)
1765 self.server_addr = (HOST, server.port)
1766 server.__enter__()
1767 self.addCleanup(server.__exit__, None, None, None)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001768
Antoine Pitrou480a1242010-04-28 21:37:09 +00001769 def test_connect(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001770 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001771 cert_reqs=ssl.CERT_NONE) as s:
1772 s.connect(self.server_addr)
1773 self.assertEqual({}, s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001774 self.assertFalse(s.server_side)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001775
Martin Panter3840b2a2016-03-27 01:53:46 +00001776 # this should succeed because we specify the root cert
Christian Heimesd0486372016-09-10 23:23:33 +02001777 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001778 cert_reqs=ssl.CERT_REQUIRED,
1779 ca_certs=SIGNING_CA) as s:
1780 s.connect(self.server_addr)
1781 self.assertTrue(s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001782 self.assertFalse(s.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001783
Martin Panter3840b2a2016-03-27 01:53:46 +00001784 def test_connect_fail(self):
1785 # This should fail because we have no verification certs. Connection
1786 # failure crashes ThreadedEchoServer, so run this in an independent
1787 # test method.
Christian Heimesd0486372016-09-10 23:23:33 +02001788 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001789 cert_reqs=ssl.CERT_REQUIRED)
1790 self.addCleanup(s.close)
1791 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1792 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001793
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001794 def test_connect_ex(self):
1795 # Issue #11326: check connect_ex() implementation
Christian Heimesd0486372016-09-10 23:23:33 +02001796 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001797 cert_reqs=ssl.CERT_REQUIRED,
1798 ca_certs=SIGNING_CA)
1799 self.addCleanup(s.close)
1800 self.assertEqual(0, s.connect_ex(self.server_addr))
1801 self.assertTrue(s.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001802
1803 def test_non_blocking_connect_ex(self):
1804 # Issue #11326: non-blocking connect_ex() should allow handshake
1805 # to proceed after the socket gets ready.
Christian Heimesd0486372016-09-10 23:23:33 +02001806 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001807 cert_reqs=ssl.CERT_REQUIRED,
1808 ca_certs=SIGNING_CA,
1809 do_handshake_on_connect=False)
1810 self.addCleanup(s.close)
1811 s.setblocking(False)
1812 rc = s.connect_ex(self.server_addr)
1813 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1814 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
1815 # Wait for connect to finish
1816 select.select([], [s], [], 5.0)
1817 # Non-blocking handshake
1818 while True:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001819 try:
Martin Panter3840b2a2016-03-27 01:53:46 +00001820 s.do_handshake()
1821 break
1822 except ssl.SSLWantReadError:
1823 select.select([s], [], [], 5.0)
1824 except ssl.SSLWantWriteError:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001825 select.select([], [s], [], 5.0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001826 # SSL established
1827 self.assertTrue(s.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001828
Antoine Pitrou152efa22010-05-16 18:19:27 +00001829 def test_connect_with_context(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001830 # Same as test_connect, but with a separately created context
Christian Heimesa170fa12017-09-15 20:27:30 +02001831 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001832 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1833 s.connect(self.server_addr)
1834 self.assertEqual({}, s.getpeercert())
1835 # Same with a server hostname
1836 with ctx.wrap_socket(socket.socket(socket.AF_INET),
1837 server_hostname="dummy") as s:
1838 s.connect(self.server_addr)
1839 ctx.verify_mode = ssl.CERT_REQUIRED
1840 # This should succeed because we specify the root cert
1841 ctx.load_verify_locations(SIGNING_CA)
1842 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1843 s.connect(self.server_addr)
1844 cert = s.getpeercert()
1845 self.assertTrue(cert)
1846
1847 def test_connect_with_context_fail(self):
1848 # This should fail because we have no verification certs. Connection
1849 # failure crashes ThreadedEchoServer, so run this in an independent
1850 # test method.
Christian Heimesa170fa12017-09-15 20:27:30 +02001851 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001852 ctx.verify_mode = ssl.CERT_REQUIRED
1853 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1854 self.addCleanup(s.close)
1855 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1856 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001857
1858 def test_connect_capath(self):
1859 # Verify server certificates using the `capath` argument
Antoine Pitrou467f28d2010-05-16 19:22:44 +00001860 # NOTE: the subject hashing algorithm has been changed between
1861 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
1862 # contain both versions of each certificate (same content, different
Antoine Pitroud7e4c1c2010-05-17 14:13:10 +00001863 # filename) for this test to be portable across OpenSSL releases.
Christian Heimesa170fa12017-09-15 20:27:30 +02001864 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001865 ctx.verify_mode = ssl.CERT_REQUIRED
1866 ctx.load_verify_locations(capath=CAPATH)
1867 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1868 s.connect(self.server_addr)
1869 cert = s.getpeercert()
1870 self.assertTrue(cert)
Christian Heimes529525f2018-05-23 22:24:45 +02001871
Martin Panter3840b2a2016-03-27 01:53:46 +00001872 # Same with a bytes `capath` argument
Christian Heimesa170fa12017-09-15 20:27:30 +02001873 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001874 ctx.verify_mode = ssl.CERT_REQUIRED
1875 ctx.load_verify_locations(capath=BYTES_CAPATH)
1876 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1877 s.connect(self.server_addr)
1878 cert = s.getpeercert()
1879 self.assertTrue(cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001880
Christian Heimesefff7062013-11-21 03:35:02 +01001881 def test_connect_cadata(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001882 with open(SIGNING_CA) as f:
Christian Heimesefff7062013-11-21 03:35:02 +01001883 pem = f.read()
1884 der = ssl.PEM_cert_to_DER_cert(pem)
Christian Heimesa170fa12017-09-15 20:27:30 +02001885 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001886 ctx.verify_mode = ssl.CERT_REQUIRED
1887 ctx.load_verify_locations(cadata=pem)
1888 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1889 s.connect(self.server_addr)
1890 cert = s.getpeercert()
1891 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001892
Martin Panter3840b2a2016-03-27 01:53:46 +00001893 # same with DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001894 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001895 ctx.verify_mode = ssl.CERT_REQUIRED
1896 ctx.load_verify_locations(cadata=der)
1897 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1898 s.connect(self.server_addr)
1899 cert = s.getpeercert()
1900 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001901
Antoine Pitroue3220242010-04-24 11:13:53 +00001902 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
1903 def test_makefile_close(self):
1904 # Issue #5238: creating a file-like object with makefile() shouldn't
1905 # delay closing the underlying "real socket" (here tested with its
1906 # file descriptor, hence skipping the test under Windows).
Christian Heimesd0486372016-09-10 23:23:33 +02001907 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Martin Panter3840b2a2016-03-27 01:53:46 +00001908 ss.connect(self.server_addr)
1909 fd = ss.fileno()
1910 f = ss.makefile()
1911 f.close()
1912 # The fd is still open
1913 os.read(fd, 0)
1914 # Closing the SSL socket should close the fd too
1915 ss.close()
1916 gc.collect()
1917 with self.assertRaises(OSError) as e:
Antoine Pitroue3220242010-04-24 11:13:53 +00001918 os.read(fd, 0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001919 self.assertEqual(e.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001920
Antoine Pitrou480a1242010-04-28 21:37:09 +00001921 def test_non_blocking_handshake(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001922 s = socket.socket(socket.AF_INET)
1923 s.connect(self.server_addr)
1924 s.setblocking(False)
Christian Heimesd0486372016-09-10 23:23:33 +02001925 s = test_wrap_socket(s,
Martin Panter3840b2a2016-03-27 01:53:46 +00001926 cert_reqs=ssl.CERT_NONE,
1927 do_handshake_on_connect=False)
1928 self.addCleanup(s.close)
1929 count = 0
1930 while True:
1931 try:
1932 count += 1
1933 s.do_handshake()
1934 break
1935 except ssl.SSLWantReadError:
1936 select.select([s], [], [])
1937 except ssl.SSLWantWriteError:
1938 select.select([], [s], [])
1939 if support.verbose:
1940 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001941
Antoine Pitrou480a1242010-04-28 21:37:09 +00001942 def test_get_server_certificate(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001943 _test_get_server_certificate(self, *self.server_addr, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001944
Martin Panter3840b2a2016-03-27 01:53:46 +00001945 def test_get_server_certificate_fail(self):
1946 # Connection failure crashes ThreadedEchoServer, so run this in an
1947 # independent test method
1948 _test_get_server_certificate_fail(self, *self.server_addr)
Bill Janssen54cc54c2007-12-14 22:08:56 +00001949
Antoine Pitrouf4c7bad2010-08-15 23:02:22 +00001950 def test_ciphers(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001951 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001952 cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
1953 s.connect(self.server_addr)
Christian Heimesd0486372016-09-10 23:23:33 +02001954 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001955 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
1956 s.connect(self.server_addr)
1957 # Error checking can happen at instantiation or when connecting
1958 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
1959 with socket.socket(socket.AF_INET) as sock:
Christian Heimesd0486372016-09-10 23:23:33 +02001960 s = test_wrap_socket(sock,
Martin Panter3840b2a2016-03-27 01:53:46 +00001961 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
1962 s.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00001963
Christian Heimes9a5395a2013-06-17 15:44:12 +02001964 def test_get_ca_certs_capath(self):
1965 # capath certs are loaded on request
Christian Heimesa170fa12017-09-15 20:27:30 +02001966 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Martin Panter3840b2a2016-03-27 01:53:46 +00001967 ctx.load_verify_locations(capath=CAPATH)
1968 self.assertEqual(ctx.get_ca_certs(), [])
Christian Heimesa170fa12017-09-15 20:27:30 +02001969 with ctx.wrap_socket(socket.socket(socket.AF_INET),
1970 server_hostname='localhost') as s:
Martin Panter3840b2a2016-03-27 01:53:46 +00001971 s.connect(self.server_addr)
1972 cert = s.getpeercert()
1973 self.assertTrue(cert)
1974 self.assertEqual(len(ctx.get_ca_certs()), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001975
Christian Heimes575596e2013-12-15 21:49:17 +01001976 @needs_sni
Christian Heimes8e7f3942013-12-05 07:41:08 +01001977 def test_context_setget(self):
1978 # Check that the context of a connected socket can be replaced.
Christian Heimesa170fa12017-09-15 20:27:30 +02001979 ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1980 ctx1.load_verify_locations(capath=CAPATH)
1981 ctx2 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1982 ctx2.load_verify_locations(capath=CAPATH)
Martin Panter3840b2a2016-03-27 01:53:46 +00001983 s = socket.socket(socket.AF_INET)
Christian Heimesa170fa12017-09-15 20:27:30 +02001984 with ctx1.wrap_socket(s, server_hostname='localhost') as ss:
Martin Panter3840b2a2016-03-27 01:53:46 +00001985 ss.connect(self.server_addr)
1986 self.assertIs(ss.context, ctx1)
1987 self.assertIs(ss._sslobj.context, ctx1)
1988 ss.context = ctx2
1989 self.assertIs(ss.context, ctx2)
1990 self.assertIs(ss._sslobj.context, ctx2)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001991
1992 def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
1993 # A simple IO loop. Call func(*args) depending on the error we get
1994 # (WANT_READ or WANT_WRITE) move data between the socket and the BIOs.
1995 timeout = kwargs.get('timeout', 10)
Victor Stinner50a72af2017-09-11 09:34:24 -07001996 deadline = time.monotonic() + timeout
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001997 count = 0
1998 while True:
Victor Stinner50a72af2017-09-11 09:34:24 -07001999 if time.monotonic() > deadline:
2000 self.fail("timeout")
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002001 errno = None
2002 count += 1
2003 try:
2004 ret = func(*args)
2005 except ssl.SSLError as e:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002006 if e.errno not in (ssl.SSL_ERROR_WANT_READ,
Martin Panter40b97ec2016-01-14 13:05:46 +00002007 ssl.SSL_ERROR_WANT_WRITE):
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002008 raise
2009 errno = e.errno
2010 # Get any data from the outgoing BIO irrespective of any error, and
2011 # send it to the socket.
2012 buf = outgoing.read()
2013 sock.sendall(buf)
2014 # If there's no error, we're done. For WANT_READ, we need to get
2015 # data from the socket and put it in the incoming BIO.
2016 if errno is None:
2017 break
2018 elif errno == ssl.SSL_ERROR_WANT_READ:
2019 buf = sock.recv(32768)
2020 if buf:
2021 incoming.write(buf)
2022 else:
2023 incoming.write_eof()
2024 if support.verbose:
2025 sys.stdout.write("Needed %d calls to complete %s().\n"
2026 % (count, func.__name__))
2027 return ret
2028
Martin Panter3840b2a2016-03-27 01:53:46 +00002029 def test_bio_handshake(self):
2030 sock = socket.socket(socket.AF_INET)
2031 self.addCleanup(sock.close)
2032 sock.connect(self.server_addr)
2033 incoming = ssl.MemoryBIO()
2034 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02002035 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2036 self.assertTrue(ctx.check_hostname)
2037 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Martin Panter3840b2a2016-03-27 01:53:46 +00002038 ctx.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +02002039 sslobj = ctx.wrap_bio(incoming, outgoing, False,
2040 SIGNED_CERTFILE_HOSTNAME)
Martin Panter3840b2a2016-03-27 01:53:46 +00002041 self.assertIs(sslobj._sslobj.owner, sslobj)
2042 self.assertIsNone(sslobj.cipher())
Christian Heimes68771112017-09-05 21:55:40 -07002043 self.assertIsNone(sslobj.version())
Christian Heimes01113fa2016-09-05 23:23:24 +02002044 self.assertIsNotNone(sslobj.shared_ciphers())
Martin Panter3840b2a2016-03-27 01:53:46 +00002045 self.assertRaises(ValueError, sslobj.getpeercert)
2046 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
2047 self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
2048 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2049 self.assertTrue(sslobj.cipher())
Christian Heimes01113fa2016-09-05 23:23:24 +02002050 self.assertIsNotNone(sslobj.shared_ciphers())
Christian Heimes68771112017-09-05 21:55:40 -07002051 self.assertIsNotNone(sslobj.version())
Martin Panter3840b2a2016-03-27 01:53:46 +00002052 self.assertTrue(sslobj.getpeercert())
2053 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
2054 self.assertTrue(sslobj.get_channel_binding('tls-unique'))
2055 try:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002056 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Martin Panter3840b2a2016-03-27 01:53:46 +00002057 except ssl.SSLSyscallError:
2058 # If the server shuts down the TCP connection without sending a
2059 # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
2060 pass
2061 self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
2062
2063 def test_bio_read_write_data(self):
2064 sock = socket.socket(socket.AF_INET)
2065 self.addCleanup(sock.close)
2066 sock.connect(self.server_addr)
2067 incoming = ssl.MemoryBIO()
2068 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02002069 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002070 ctx.verify_mode = ssl.CERT_NONE
2071 sslobj = ctx.wrap_bio(incoming, outgoing, False)
2072 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2073 req = b'FOO\n'
2074 self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
2075 buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
2076 self.assertEqual(buf, b'foo\n')
2077 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002078
2079
Martin Panter3840b2a2016-03-27 01:53:46 +00002080class NetworkedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002081
Martin Panter3840b2a2016-03-27 01:53:46 +00002082 def test_timeout_connect_ex(self):
2083 # Issue #12065: on a timeout, connect_ex() should return the original
2084 # errno (mimicking the behaviour of non-SSL sockets).
2085 with support.transient_internet(REMOTE_HOST):
Christian Heimesd0486372016-09-10 23:23:33 +02002086 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00002087 cert_reqs=ssl.CERT_REQUIRED,
2088 do_handshake_on_connect=False)
2089 self.addCleanup(s.close)
2090 s.settimeout(0.0000001)
2091 rc = s.connect_ex((REMOTE_HOST, 443))
2092 if rc == 0:
2093 self.skipTest("REMOTE_HOST responded too quickly")
2094 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
2095
2096 @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
2097 def test_get_server_certificate_ipv6(self):
2098 with support.transient_internet('ipv6.google.com'):
2099 _test_get_server_certificate(self, 'ipv6.google.com', 443)
2100 _test_get_server_certificate_fail(self, 'ipv6.google.com', 443)
2101
Martin Panter3840b2a2016-03-27 01:53:46 +00002102
2103def _test_get_server_certificate(test, host, port, cert=None):
2104 pem = ssl.get_server_certificate((host, port))
2105 if not pem:
2106 test.fail("No server certificate on %s:%s!" % (host, port))
2107
2108 pem = ssl.get_server_certificate((host, port), ca_certs=cert)
2109 if not pem:
2110 test.fail("No server certificate on %s:%s!" % (host, port))
2111 if support.verbose:
2112 sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
2113
2114def _test_get_server_certificate_fail(test, host, port):
2115 try:
2116 pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
2117 except ssl.SSLError as x:
2118 #should fail
2119 if support.verbose:
2120 sys.stdout.write("%s\n" % x)
2121 else:
2122 test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
2123
2124
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002125from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002126
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002127class ThreadedEchoServer(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002128
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002129 class ConnectionHandler(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002130
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002131 """A mildly complicated class, because we want it to work both
2132 with and without the SSL wrapper around the socket connection, so
2133 that we can test the STARTTLS functionality."""
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002134
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002135 def __init__(self, server, connsock, addr):
2136 self.server = server
2137 self.running = False
2138 self.sock = connsock
2139 self.addr = addr
2140 self.sock.setblocking(1)
2141 self.sslconn = None
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002142 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00002143 self.daemon = True
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002144
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002145 def wrap_conn(self):
2146 try:
2147 self.sslconn = self.server.context.wrap_socket(
2148 self.sock, server_side=True)
2149 self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
2150 self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
Christian Heimes529525f2018-05-23 22:24:45 +02002151 except (ConnectionResetError, BrokenPipeError) as e:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002152 # We treat ConnectionResetError as though it were an
2153 # SSLError - OpenSSL on Ubuntu abruptly closes the
2154 # connection when asked to use an unsupported protocol.
2155 #
Christian Heimes529525f2018-05-23 22:24:45 +02002156 # BrokenPipeError is raised in TLS 1.3 mode, when OpenSSL
2157 # tries to send session tickets after handshake.
2158 # https://github.com/openssl/openssl/issues/6342
2159 self.server.conn_errors.append(str(e))
2160 if self.server.chatty:
2161 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
2162 self.running = False
2163 self.close()
2164 return False
2165 except (ssl.SSLError, OSError) as e:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002166 # OSError may occur with wrong protocols, e.g. both
2167 # sides use PROTOCOL_TLS_SERVER.
2168 #
2169 # XXX Various errors can have happened here, for example
2170 # a mismatching protocol version, an invalid certificate,
2171 # or a low-level bug. This should be made more discriminating.
2172 #
2173 # bpo-31323: Store the exception as string to prevent
2174 # a reference leak: server -> conn_errors -> exception
2175 # -> traceback -> self (ConnectionHandler) -> server
2176 self.server.conn_errors.append(str(e))
2177 if self.server.chatty:
2178 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
2179 self.running = False
2180 self.server.stop()
2181 self.close()
2182 return False
2183 else:
2184 self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
2185 if self.server.context.verify_mode == ssl.CERT_REQUIRED:
2186 cert = self.sslconn.getpeercert()
2187 if support.verbose and self.server.chatty:
2188 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
2189 cert_binary = self.sslconn.getpeercert(True)
2190 if support.verbose and self.server.chatty:
2191 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
2192 cipher = self.sslconn.cipher()
2193 if support.verbose and self.server.chatty:
2194 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
2195 sys.stdout.write(" server: selected protocol is now "
2196 + str(self.sslconn.selected_npn_protocol()) + "\n")
2197 return True
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002198
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002199 def read(self):
2200 if self.sslconn:
2201 return self.sslconn.read()
2202 else:
2203 return self.sock.recv(1024)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002204
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002205 def write(self, bytes):
2206 if self.sslconn:
2207 return self.sslconn.write(bytes)
2208 else:
2209 return self.sock.send(bytes)
2210
2211 def close(self):
2212 if self.sslconn:
2213 self.sslconn.close()
2214 else:
2215 self.sock.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002216
Antoine Pitrou480a1242010-04-28 21:37:09 +00002217 def run(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002218 self.running = True
2219 if not self.server.starttls_server:
2220 if not self.wrap_conn():
2221 return
2222 while self.running:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002223 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002224 msg = self.read()
2225 stripped = msg.strip()
2226 if not stripped:
2227 # eof, so quit this handler
2228 self.running = False
2229 try:
2230 self.sock = self.sslconn.unwrap()
2231 except OSError:
2232 # Many tests shut the TCP connection down
2233 # without an SSL shutdown. This causes
2234 # unwrap() to raise OSError with errno=0!
2235 pass
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002236 else:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002237 self.sslconn = None
2238 self.close()
2239 elif stripped == b'over':
2240 if support.verbose and self.server.connectionchatty:
2241 sys.stdout.write(" server: client closed connection\n")
2242 self.close()
2243 return
2244 elif (self.server.starttls_server and
2245 stripped == b'STARTTLS'):
2246 if support.verbose and self.server.connectionchatty:
2247 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
2248 self.write(b"OK\n")
2249 if not self.wrap_conn():
2250 return
2251 elif (self.server.starttls_server and self.sslconn
2252 and stripped == b'ENDTLS'):
2253 if support.verbose and self.server.connectionchatty:
2254 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
2255 self.write(b"OK\n")
2256 self.sock = self.sslconn.unwrap()
2257 self.sslconn = None
2258 if support.verbose and self.server.connectionchatty:
2259 sys.stdout.write(" server: connection is now unencrypted...\n")
2260 elif stripped == b'CB tls-unique':
2261 if support.verbose and self.server.connectionchatty:
2262 sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
2263 data = self.sslconn.get_channel_binding("tls-unique")
2264 self.write(repr(data).encode("us-ascii") + b"\n")
Christian Heimes9fb051f2018-09-23 08:32:31 +02002265 elif stripped == b'PHA':
2266 if support.verbose and self.server.connectionchatty:
2267 sys.stdout.write(" server: initiating post handshake auth\n")
2268 try:
2269 self.sslconn.verify_client_post_handshake()
2270 except ssl.SSLError as e:
2271 self.write(repr(e).encode("us-ascii") + b"\n")
2272 else:
2273 self.write(b"OK\n")
2274 elif stripped == b'HASCERT':
2275 if self.sslconn.getpeercert() is not None:
2276 self.write(b'TRUE\n')
2277 else:
2278 self.write(b'FALSE\n')
2279 elif stripped == b'GETCERT':
2280 cert = self.sslconn.getpeercert()
2281 self.write(repr(cert).encode("us-ascii") + b"\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002282 else:
2283 if (support.verbose and
2284 self.server.connectionchatty):
2285 ctype = (self.sslconn and "encrypted") or "unencrypted"
2286 sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
2287 % (msg, ctype, msg.lower(), ctype))
2288 self.write(msg.lower())
Christian Heimes529525f2018-05-23 22:24:45 +02002289 except ConnectionResetError:
2290 # XXX: OpenSSL 1.1.1 sometimes raises ConnectionResetError
2291 # when connection is not shut down gracefully.
2292 if self.server.chatty and support.verbose:
2293 sys.stdout.write(
2294 " Connection reset by peer: {}\n".format(
2295 self.addr)
2296 )
2297 self.close()
2298 self.running = False
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002299 except OSError:
2300 if self.server.chatty:
2301 handle_error("Test server failure:\n")
Bill Janssen2f5799b2008-06-29 00:08:12 +00002302 self.close()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002303 self.running = False
Christian Heimes529525f2018-05-23 22:24:45 +02002304
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002305 # normally, we'd just stop here, but for the test
2306 # harness, we want to stop the server
2307 self.server.stop()
Bill Janssen54cc54c2007-12-14 22:08:56 +00002308
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002309 def __init__(self, certificate=None, ssl_version=None,
2310 certreqs=None, cacerts=None,
2311 chatty=True, connectionchatty=False, starttls_server=False,
2312 npn_protocols=None, alpn_protocols=None,
2313 ciphers=None, context=None):
2314 if context:
2315 self.context = context
2316 else:
2317 self.context = ssl.SSLContext(ssl_version
2318 if ssl_version is not None
Christian Heimesa170fa12017-09-15 20:27:30 +02002319 else ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002320 self.context.verify_mode = (certreqs if certreqs is not None
2321 else ssl.CERT_NONE)
2322 if cacerts:
2323 self.context.load_verify_locations(cacerts)
2324 if certificate:
2325 self.context.load_cert_chain(certificate)
2326 if npn_protocols:
2327 self.context.set_npn_protocols(npn_protocols)
2328 if alpn_protocols:
2329 self.context.set_alpn_protocols(alpn_protocols)
2330 if ciphers:
2331 self.context.set_ciphers(ciphers)
2332 self.chatty = chatty
2333 self.connectionchatty = connectionchatty
2334 self.starttls_server = starttls_server
2335 self.sock = socket.socket()
2336 self.port = support.bind_port(self.sock)
2337 self.flag = None
2338 self.active = False
2339 self.selected_npn_protocols = []
2340 self.selected_alpn_protocols = []
2341 self.shared_ciphers = []
2342 self.conn_errors = []
2343 threading.Thread.__init__(self)
2344 self.daemon = True
2345
2346 def __enter__(self):
2347 self.start(threading.Event())
2348 self.flag.wait()
2349 return self
2350
2351 def __exit__(self, *args):
2352 self.stop()
2353 self.join()
2354
2355 def start(self, flag=None):
2356 self.flag = flag
2357 threading.Thread.start(self)
2358
2359 def run(self):
2360 self.sock.settimeout(0.05)
2361 self.sock.listen()
2362 self.active = True
2363 if self.flag:
2364 # signal an event
2365 self.flag.set()
2366 while self.active:
2367 try:
2368 newconn, connaddr = self.sock.accept()
2369 if support.verbose and self.chatty:
2370 sys.stdout.write(' server: new connection from '
2371 + repr(connaddr) + '\n')
2372 handler = self.ConnectionHandler(self, newconn, connaddr)
2373 handler.start()
2374 handler.join()
2375 except socket.timeout:
2376 pass
2377 except KeyboardInterrupt:
2378 self.stop()
Christian Heimes529525f2018-05-23 22:24:45 +02002379 except BaseException as e:
2380 if support.verbose and self.chatty:
2381 sys.stdout.write(
2382 ' connection handling failed: ' + repr(e) + '\n')
2383
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002384 self.sock.close()
2385
2386 def stop(self):
2387 self.active = False
2388
2389class AsyncoreEchoServer(threading.Thread):
2390
2391 # this one's based on asyncore.dispatcher
2392
2393 class EchoServer (asyncore.dispatcher):
2394
2395 class ConnectionHandler(asyncore.dispatcher_with_send):
2396
2397 def __init__(self, conn, certfile):
2398 self.socket = test_wrap_socket(conn, server_side=True,
2399 certfile=certfile,
2400 do_handshake_on_connect=False)
2401 asyncore.dispatcher_with_send.__init__(self, self.socket)
2402 self._ssl_accepting = True
2403 self._do_ssl_handshake()
2404
2405 def readable(self):
2406 if isinstance(self.socket, ssl.SSLSocket):
2407 while self.socket.pending() > 0:
2408 self.handle_read_event()
2409 return True
2410
2411 def _do_ssl_handshake(self):
2412 try:
2413 self.socket.do_handshake()
2414 except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
2415 return
2416 except ssl.SSLEOFError:
2417 return self.handle_close()
2418 except ssl.SSLError:
Bill Janssen54cc54c2007-12-14 22:08:56 +00002419 raise
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002420 except OSError as err:
2421 if err.args[0] == errno.ECONNABORTED:
2422 return self.handle_close()
2423 else:
2424 self._ssl_accepting = False
Bill Janssen54cc54c2007-12-14 22:08:56 +00002425
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002426 def handle_read(self):
2427 if self._ssl_accepting:
2428 self._do_ssl_handshake()
2429 else:
2430 data = self.recv(1024)
2431 if support.verbose:
2432 sys.stdout.write(" server: read %s from client\n" % repr(data))
2433 if not data:
2434 self.close()
2435 else:
2436 self.send(data.lower())
Bill Janssen54cc54c2007-12-14 22:08:56 +00002437
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002438 def handle_close(self):
2439 self.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002440 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002441 sys.stdout.write(" server: closed connection %s\n" % self.socket)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002442
2443 def handle_error(self):
2444 raise
2445
Trent Nelson78520002008-04-10 20:54:35 +00002446 def __init__(self, certfile):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002447 self.certfile = certfile
2448 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2449 self.port = support.bind_port(sock, '')
2450 asyncore.dispatcher.__init__(self, sock)
2451 self.listen(5)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002452
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002453 def handle_accepted(self, sock_obj, addr):
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002454 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002455 sys.stdout.write(" server: new connection from %s:%s\n" %addr)
2456 self.ConnectionHandler(sock_obj, self.certfile)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002457
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002458 def handle_error(self):
2459 raise
Bill Janssen54cc54c2007-12-14 22:08:56 +00002460
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002461 def __init__(self, certfile):
2462 self.flag = None
2463 self.active = False
2464 self.server = self.EchoServer(certfile)
2465 self.port = self.server.port
2466 threading.Thread.__init__(self)
2467 self.daemon = True
Bill Janssen54cc54c2007-12-14 22:08:56 +00002468
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002469 def __str__(self):
2470 return "<%s %s>" % (self.__class__.__name__, self.server)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002471
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002472 def __enter__(self):
2473 self.start(threading.Event())
2474 self.flag.wait()
2475 return self
Thomas Woutersed03b412007-08-28 21:37:11 +00002476
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002477 def __exit__(self, *args):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002478 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002479 sys.stdout.write(" cleanup: stopping server.\n")
2480 self.stop()
2481 if support.verbose:
2482 sys.stdout.write(" cleanup: joining server thread.\n")
2483 self.join()
2484 if support.verbose:
2485 sys.stdout.write(" cleanup: successfully joined.\n")
2486 # make sure that ConnectionHandler is removed from socket_map
2487 asyncore.close_all(ignore_all=True)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002488
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002489 def start (self, flag=None):
2490 self.flag = flag
2491 threading.Thread.start(self)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002492
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002493 def run(self):
2494 self.active = True
2495 if self.flag:
2496 self.flag.set()
2497 while self.active:
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002498 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002499 asyncore.loop(1)
2500 except:
2501 pass
Trent Nelson6b240cd2008-04-10 20:12:06 +00002502
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002503 def stop(self):
2504 self.active = False
2505 self.server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002506
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002507def server_params_test(client_context, server_context, indata=b"FOO\n",
2508 chatty=True, connectionchatty=False, sni_name=None,
2509 session=None):
2510 """
2511 Launch a server, connect a client to it and try various reads
2512 and writes.
2513 """
2514 stats = {}
2515 server = ThreadedEchoServer(context=server_context,
2516 chatty=chatty,
2517 connectionchatty=False)
2518 with server:
2519 with client_context.wrap_socket(socket.socket(),
2520 server_hostname=sni_name, session=session) as s:
2521 s.connect((HOST, server.port))
2522 for arg in [indata, bytearray(indata), memoryview(indata)]:
2523 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002524 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002525 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002526 " client: sending %r...\n" % indata)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002527 s.write(arg)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002528 outdata = s.read()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002529 if connectionchatty:
2530 if support.verbose:
2531 sys.stdout.write(" client: read %r\n" % outdata)
Trent Nelson6b240cd2008-04-10 20:12:06 +00002532 if outdata != indata.lower():
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002533 raise AssertionError(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002534 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
2535 % (outdata[:20], len(outdata),
2536 indata[:20].lower(), len(indata)))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002537 s.write(b"over\n")
2538 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002539 if support.verbose:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002540 sys.stdout.write(" client: closing connection.\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002541 stats.update({
2542 'compression': s.compression(),
2543 'cipher': s.cipher(),
2544 'peercert': s.getpeercert(),
2545 'client_alpn_protocol': s.selected_alpn_protocol(),
2546 'client_npn_protocol': s.selected_npn_protocol(),
2547 'version': s.version(),
2548 'session_reused': s.session_reused,
2549 'session': s.session,
2550 })
2551 s.close()
2552 stats['server_alpn_protocols'] = server.selected_alpn_protocols
2553 stats['server_npn_protocols'] = server.selected_npn_protocols
2554 stats['server_shared_ciphers'] = server.shared_ciphers
2555 return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002556
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002557def try_protocol_combo(server_protocol, client_protocol, expect_success,
2558 certsreqs=None, server_options=0, client_options=0):
2559 """
2560 Try to SSL-connect using *client_protocol* to *server_protocol*.
2561 If *expect_success* is true, assert that the connection succeeds,
2562 if it's false, assert that the connection fails.
2563 Also, if *expect_success* is a string, assert that it is the protocol
2564 version actually used by the connection.
2565 """
2566 if certsreqs is None:
2567 certsreqs = ssl.CERT_NONE
2568 certtype = {
2569 ssl.CERT_NONE: "CERT_NONE",
2570 ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
2571 ssl.CERT_REQUIRED: "CERT_REQUIRED",
2572 }[certsreqs]
2573 if support.verbose:
2574 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
2575 sys.stdout.write(formatstr %
2576 (ssl.get_protocol_name(client_protocol),
2577 ssl.get_protocol_name(server_protocol),
2578 certtype))
2579 client_context = ssl.SSLContext(client_protocol)
2580 client_context.options |= client_options
2581 server_context = ssl.SSLContext(server_protocol)
2582 server_context.options |= server_options
2583
2584 # NOTE: we must enable "ALL" ciphers on the client, otherwise an
2585 # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
2586 # starting from OpenSSL 1.0.0 (see issue #8322).
Christian Heimesa170fa12017-09-15 20:27:30 +02002587 if client_context.protocol == ssl.PROTOCOL_TLS:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002588 client_context.set_ciphers("ALL")
2589
2590 for ctx in (client_context, server_context):
2591 ctx.verify_mode = certsreqs
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002592 ctx.load_cert_chain(SIGNED_CERTFILE)
2593 ctx.load_verify_locations(SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002594 try:
2595 stats = server_params_test(client_context, server_context,
2596 chatty=False, connectionchatty=False)
2597 # Protocol mismatch can result in either an SSLError, or a
2598 # "Connection reset by peer" error.
2599 except ssl.SSLError:
2600 if expect_success:
2601 raise
2602 except OSError as e:
2603 if expect_success or e.errno != errno.ECONNRESET:
2604 raise
2605 else:
2606 if not expect_success:
2607 raise AssertionError(
2608 "Client protocol %s succeeded with server protocol %s!"
2609 % (ssl.get_protocol_name(client_protocol),
2610 ssl.get_protocol_name(server_protocol)))
2611 elif (expect_success is not True
2612 and expect_success != stats['version']):
2613 raise AssertionError("version mismatch: expected %r, got %r"
2614 % (expect_success, stats['version']))
2615
2616
2617class ThreadedTests(unittest.TestCase):
2618
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002619 def test_echo(self):
2620 """Basic test of an SSL client connecting to a server"""
2621 if support.verbose:
2622 sys.stdout.write("\n")
2623 for protocol in PROTOCOLS:
2624 if protocol in {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}:
2625 continue
2626 with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
2627 context = ssl.SSLContext(protocol)
2628 context.load_cert_chain(CERTFILE)
2629 server_params_test(context, context,
2630 chatty=True, connectionchatty=True)
2631
Christian Heimesa170fa12017-09-15 20:27:30 +02002632 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002633
2634 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_SERVER):
2635 server_params_test(client_context=client_context,
2636 server_context=server_context,
2637 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002638 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002639
2640 client_context.check_hostname = False
2641 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_CLIENT):
2642 with self.assertRaises(ssl.SSLError) as e:
2643 server_params_test(client_context=server_context,
2644 server_context=client_context,
2645 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002646 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002647 self.assertIn('called a function you should not call',
2648 str(e.exception))
2649
2650 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_SERVER):
2651 with self.assertRaises(ssl.SSLError) as e:
2652 server_params_test(client_context=server_context,
2653 server_context=server_context,
2654 chatty=True, connectionchatty=True)
2655 self.assertIn('called a function you should not call',
2656 str(e.exception))
2657
2658 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_CLIENT):
2659 with self.assertRaises(ssl.SSLError) as e:
2660 server_params_test(client_context=server_context,
2661 server_context=client_context,
2662 chatty=True, connectionchatty=True)
2663 self.assertIn('called a function you should not call',
2664 str(e.exception))
2665
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002666 def test_getpeercert(self):
2667 if support.verbose:
2668 sys.stdout.write("\n")
Christian Heimesa170fa12017-09-15 20:27:30 +02002669
2670 client_context, server_context, hostname = testing_context()
2671 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002672 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002673 with client_context.wrap_socket(socket.socket(),
2674 do_handshake_on_connect=False,
2675 server_hostname=hostname) as s:
2676 s.connect((HOST, server.port))
2677 # getpeercert() raise ValueError while the handshake isn't
2678 # done.
2679 with self.assertRaises(ValueError):
2680 s.getpeercert()
2681 s.do_handshake()
2682 cert = s.getpeercert()
2683 self.assertTrue(cert, "Can't get peer certificate.")
2684 cipher = s.cipher()
2685 if support.verbose:
2686 sys.stdout.write(pprint.pformat(cert) + '\n')
2687 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
2688 if 'subject' not in cert:
2689 self.fail("No subject field in certificate: %s." %
2690 pprint.pformat(cert))
2691 if ((('organizationName', 'Python Software Foundation'),)
2692 not in cert['subject']):
2693 self.fail(
2694 "Missing or invalid 'organizationName' field in certificate subject; "
2695 "should be 'Python Software Foundation'.")
2696 self.assertIn('notBefore', cert)
2697 self.assertIn('notAfter', cert)
2698 before = ssl.cert_time_to_seconds(cert['notBefore'])
2699 after = ssl.cert_time_to_seconds(cert['notAfter'])
2700 self.assertLess(before, after)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002701
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002702 @unittest.skipUnless(have_verify_flags(),
2703 "verify_flags need OpenSSL > 0.9.8")
2704 def test_crl_check(self):
2705 if support.verbose:
2706 sys.stdout.write("\n")
2707
Christian Heimesa170fa12017-09-15 20:27:30 +02002708 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002709
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002710 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
Christian Heimesa170fa12017-09-15 20:27:30 +02002711 self.assertEqual(client_context.verify_flags, ssl.VERIFY_DEFAULT | tf)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002712
2713 # VERIFY_DEFAULT should pass
2714 server = ThreadedEchoServer(context=server_context, chatty=True)
2715 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002716 with client_context.wrap_socket(socket.socket(),
2717 server_hostname=hostname) as s:
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002718 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002719 cert = s.getpeercert()
2720 self.assertTrue(cert, "Can't get peer certificate.")
Bill Janssen58afe4c2008-09-08 16:45:19 +00002721
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002722 # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
Christian Heimesa170fa12017-09-15 20:27:30 +02002723 client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
Bill Janssen58afe4c2008-09-08 16:45:19 +00002724
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002725 server = ThreadedEchoServer(context=server_context, chatty=True)
2726 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002727 with client_context.wrap_socket(socket.socket(),
2728 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002729 with self.assertRaisesRegex(ssl.SSLError,
2730 "certificate verify failed"):
2731 s.connect((HOST, server.port))
Bill Janssen58afe4c2008-09-08 16:45:19 +00002732
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002733 # now load a CRL file. The CRL file is signed by the CA.
Christian Heimesa170fa12017-09-15 20:27:30 +02002734 client_context.load_verify_locations(CRLFILE)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002735
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002736 server = ThreadedEchoServer(context=server_context, chatty=True)
2737 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002738 with client_context.wrap_socket(socket.socket(),
2739 server_hostname=hostname) as s:
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002740 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002741 cert = s.getpeercert()
2742 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002743
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002744 def test_check_hostname(self):
2745 if support.verbose:
2746 sys.stdout.write("\n")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002747
Christian Heimesa170fa12017-09-15 20:27:30 +02002748 client_context, server_context, hostname = testing_context()
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002749
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002750 # correct hostname should verify
2751 server = ThreadedEchoServer(context=server_context, chatty=True)
2752 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002753 with client_context.wrap_socket(socket.socket(),
2754 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002755 s.connect((HOST, server.port))
2756 cert = s.getpeercert()
2757 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002758
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002759 # incorrect hostname should raise an exception
2760 server = ThreadedEchoServer(context=server_context, chatty=True)
2761 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002762 with client_context.wrap_socket(socket.socket(),
2763 server_hostname="invalid") as s:
Christian Heimes61d478c2018-01-27 15:51:38 +01002764 with self.assertRaisesRegex(
2765 ssl.CertificateError,
2766 "Hostname mismatch, certificate is not valid for 'invalid'."):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002767 s.connect((HOST, server.port))
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002768
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002769 # missing server_hostname arg should cause an exception, too
2770 server = ThreadedEchoServer(context=server_context, chatty=True)
2771 with server:
2772 with socket.socket() as s:
2773 with self.assertRaisesRegex(ValueError,
2774 "check_hostname requires server_hostname"):
Christian Heimesa170fa12017-09-15 20:27:30 +02002775 client_context.wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002776
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002777 def test_ecc_cert(self):
2778 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2779 client_context.load_verify_locations(SIGNING_CA)
Christian Heimese8eb6cb2018-05-22 22:50:12 +02002780 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002781 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2782
2783 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2784 # load ECC cert
2785 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2786
2787 # correct hostname should verify
2788 server = ThreadedEchoServer(context=server_context, chatty=True)
2789 with server:
2790 with client_context.wrap_socket(socket.socket(),
2791 server_hostname=hostname) as s:
2792 s.connect((HOST, server.port))
2793 cert = s.getpeercert()
2794 self.assertTrue(cert, "Can't get peer certificate.")
2795 cipher = s.cipher()[0].split('-')
2796 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2797
2798 def test_dual_rsa_ecc(self):
2799 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2800 client_context.load_verify_locations(SIGNING_CA)
Christian Heimes05d9fe32018-02-27 08:55:39 +01002801 # TODO: fix TLSv1.3 once SSLContext can restrict signature
2802 # algorithms.
2803 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002804 # only ECDSA certs
2805 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
2806 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2807
2808 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2809 # load ECC and RSA key/cert pairs
2810 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2811 server_context.load_cert_chain(SIGNED_CERTFILE)
2812
2813 # correct hostname should verify
2814 server = ThreadedEchoServer(context=server_context, chatty=True)
2815 with server:
2816 with client_context.wrap_socket(socket.socket(),
2817 server_hostname=hostname) as s:
2818 s.connect((HOST, server.port))
2819 cert = s.getpeercert()
2820 self.assertTrue(cert, "Can't get peer certificate.")
2821 cipher = s.cipher()[0].split('-')
2822 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2823
Christian Heimes66e57422018-01-29 14:25:13 +01002824 def test_check_hostname_idn(self):
2825 if support.verbose:
2826 sys.stdout.write("\n")
2827
Christian Heimes11a14932018-02-24 02:35:08 +01002828 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Christian Heimes66e57422018-01-29 14:25:13 +01002829 server_context.load_cert_chain(IDNSANSFILE)
2830
Christian Heimes11a14932018-02-24 02:35:08 +01002831 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes66e57422018-01-29 14:25:13 +01002832 context.verify_mode = ssl.CERT_REQUIRED
2833 context.check_hostname = True
2834 context.load_verify_locations(SIGNING_CA)
2835
2836 # correct hostname should verify, when specified in several
2837 # different ways
2838 idn_hostnames = [
2839 ('könig.idn.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01002840 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002841 ('xn--knig-5qa.idn.pythontest.net',
2842 'xn--knig-5qa.idn.pythontest.net'),
2843 (b'xn--knig-5qa.idn.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01002844 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002845
2846 ('königsgäßchen.idna2003.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01002847 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002848 ('xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
2849 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2850 (b'xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01002851 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2852
2853 # ('königsgäßchen.idna2008.pythontest.net',
2854 # 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2855 ('xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
2856 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2857 (b'xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
2858 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2859
Christian Heimes66e57422018-01-29 14:25:13 +01002860 ]
2861 for server_hostname, expected_hostname in idn_hostnames:
2862 server = ThreadedEchoServer(context=server_context, chatty=True)
2863 with server:
2864 with context.wrap_socket(socket.socket(),
2865 server_hostname=server_hostname) as s:
2866 self.assertEqual(s.server_hostname, expected_hostname)
2867 s.connect((HOST, server.port))
2868 cert = s.getpeercert()
2869 self.assertEqual(s.server_hostname, expected_hostname)
2870 self.assertTrue(cert, "Can't get peer certificate.")
2871
Christian Heimes66e57422018-01-29 14:25:13 +01002872 # incorrect hostname should raise an exception
2873 server = ThreadedEchoServer(context=server_context, chatty=True)
2874 with server:
2875 with context.wrap_socket(socket.socket(),
2876 server_hostname="python.example.org") as s:
2877 with self.assertRaises(ssl.CertificateError):
2878 s.connect((HOST, server.port))
2879
Christian Heimes529525f2018-05-23 22:24:45 +02002880 def test_wrong_cert_tls12(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002881 """Connecting when the server rejects the client's certificate
2882
2883 Launch a server with CERT_REQUIRED, and check that trying to
2884 connect to it with a wrong client certificate fails.
2885 """
Christian Heimes05d9fe32018-02-27 08:55:39 +01002886 client_context, server_context, hostname = testing_context()
Christian Heimes88bfd0b2018-08-14 12:54:19 +02002887 # load client cert that is not signed by trusted CA
2888 client_context.load_cert_chain(CERTFILE)
Christian Heimes05d9fe32018-02-27 08:55:39 +01002889 # require TLS client authentication
2890 server_context.verify_mode = ssl.CERT_REQUIRED
Christian Heimes529525f2018-05-23 22:24:45 +02002891 # TLS 1.3 has different handshake
2892 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
Christian Heimes05d9fe32018-02-27 08:55:39 +01002893
2894 server = ThreadedEchoServer(
2895 context=server_context, chatty=True, connectionchatty=True,
2896 )
2897
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002898 with server, \
Christian Heimes05d9fe32018-02-27 08:55:39 +01002899 client_context.wrap_socket(socket.socket(),
2900 server_hostname=hostname) as s:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002901 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002902 # Expect either an SSL error about the server rejecting
2903 # the connection, or a low-level connection reset (which
2904 # sometimes happens on Windows)
2905 s.connect((HOST, server.port))
2906 except ssl.SSLError as e:
2907 if support.verbose:
2908 sys.stdout.write("\nSSLError is %r\n" % e)
2909 except OSError as e:
2910 if e.errno != errno.ECONNRESET:
2911 raise
2912 if support.verbose:
2913 sys.stdout.write("\nsocket.error is %r\n" % e)
2914 else:
2915 self.fail("Use of invalid cert should have failed!")
2916
Christian Heimes529525f2018-05-23 22:24:45 +02002917 @unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
2918 def test_wrong_cert_tls13(self):
2919 client_context, server_context, hostname = testing_context()
Christian Heimes88bfd0b2018-08-14 12:54:19 +02002920 # load client cert that is not signed by trusted CA
2921 client_context.load_cert_chain(CERTFILE)
Christian Heimes529525f2018-05-23 22:24:45 +02002922 server_context.verify_mode = ssl.CERT_REQUIRED
2923 server_context.minimum_version = ssl.TLSVersion.TLSv1_3
2924 client_context.minimum_version = ssl.TLSVersion.TLSv1_3
2925
2926 server = ThreadedEchoServer(
2927 context=server_context, chatty=True, connectionchatty=True,
2928 )
2929 with server, \
2930 client_context.wrap_socket(socket.socket(),
2931 server_hostname=hostname) as s:
2932 # TLS 1.3 perform client cert exchange after handshake
2933 s.connect((HOST, server.port))
2934 try:
2935 s.write(b'data')
2936 s.read(4)
2937 except ssl.SSLError as e:
2938 if support.verbose:
2939 sys.stdout.write("\nSSLError is %r\n" % e)
2940 except OSError as e:
2941 if e.errno != errno.ECONNRESET:
2942 raise
2943 if support.verbose:
2944 sys.stdout.write("\nsocket.error is %r\n" % e)
2945 else:
2946 self.fail("Use of invalid cert should have failed!")
2947
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002948 def test_rude_shutdown(self):
2949 """A brutal shutdown of an SSL server should raise an OSError
2950 in the client when attempting handshake.
2951 """
2952 listener_ready = threading.Event()
2953 listener_gone = threading.Event()
2954
2955 s = socket.socket()
2956 port = support.bind_port(s, HOST)
2957
2958 # `listener` runs in a thread. It sits in an accept() until
2959 # the main thread connects. Then it rudely closes the socket,
2960 # and sets Event `listener_gone` to let the main thread know
2961 # the socket is gone.
2962 def listener():
2963 s.listen()
2964 listener_ready.set()
2965 newsock, addr = s.accept()
2966 newsock.close()
2967 s.close()
2968 listener_gone.set()
2969
2970 def connector():
2971 listener_ready.wait()
2972 with socket.socket() as c:
2973 c.connect((HOST, port))
2974 listener_gone.wait()
Antoine Pitrou40f08742010-04-24 22:04:40 +00002975 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002976 ssl_sock = test_wrap_socket(c)
2977 except OSError:
2978 pass
2979 else:
2980 self.fail('connecting to closed SSL socket should have failed')
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002981
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002982 t = threading.Thread(target=listener)
2983 t.start()
2984 try:
2985 connector()
2986 finally:
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002987 t.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002988
Christian Heimesb3ad0e52017-09-08 12:00:19 -07002989 def test_ssl_cert_verify_error(self):
2990 if support.verbose:
2991 sys.stdout.write("\n")
2992
2993 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2994 server_context.load_cert_chain(SIGNED_CERTFILE)
2995
2996 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2997
2998 server = ThreadedEchoServer(context=server_context, chatty=True)
2999 with server:
3000 with context.wrap_socket(socket.socket(),
Christian Heimesa170fa12017-09-15 20:27:30 +02003001 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
Christian Heimesb3ad0e52017-09-08 12:00:19 -07003002 try:
3003 s.connect((HOST, server.port))
3004 except ssl.SSLError as e:
3005 msg = 'unable to get local issuer certificate'
3006 self.assertIsInstance(e, ssl.SSLCertVerificationError)
3007 self.assertEqual(e.verify_code, 20)
3008 self.assertEqual(e.verify_message, msg)
3009 self.assertIn(msg, repr(e))
3010 self.assertIn('certificate verify failed', repr(e))
3011
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003012 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
3013 "OpenSSL is compiled without SSLv2 support")
3014 def test_protocol_sslv2(self):
3015 """Connecting to an SSLv2 server with various client options"""
3016 if support.verbose:
3017 sys.stdout.write("\n")
3018 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
3019 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
3020 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
Christian Heimesa170fa12017-09-15 20:27:30 +02003021 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003022 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3023 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
3024 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
3025 # SSLv23 client with specific SSL options
3026 if no_sslv2_implies_sslv3_hello():
3027 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02003028 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003029 client_options=ssl.OP_NO_SSLv2)
Christian Heimesa170fa12017-09-15 20:27:30 +02003030 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003031 client_options=ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02003032 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003033 client_options=ssl.OP_NO_TLSv1)
Antoine Pitrou242db722013-05-01 20:52:07 +02003034
Christian Heimesa170fa12017-09-15 20:27:30 +02003035 def test_PROTOCOL_TLS(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003036 """Connecting to an SSLv23 server with various client options"""
3037 if support.verbose:
3038 sys.stdout.write("\n")
3039 if hasattr(ssl, 'PROTOCOL_SSLv2'):
Antoine Pitrou8f85f902012-01-03 22:46:48 +01003040 try:
Christian Heimesa170fa12017-09-15 20:27:30 +02003041 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv2, True)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003042 except OSError as x:
3043 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
3044 if support.verbose:
3045 sys.stdout.write(
3046 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
3047 % str(x))
3048 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003049 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False)
3050 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True)
3051 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1')
Antoine Pitrou8f85f902012-01-03 22:46:48 +01003052
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003053 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003054 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_OPTIONAL)
3055 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_OPTIONAL)
3056 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003057
3058 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003059 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_REQUIRED)
3060 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_REQUIRED)
3061 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003062
3063 # Server with specific SSL options
3064 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003065 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003066 server_options=ssl.OP_NO_SSLv3)
3067 # Will choose TLSv1
Christian Heimesa170fa12017-09-15 20:27:30 +02003068 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003069 server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02003070 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003071 server_options=ssl.OP_NO_TLSv1)
3072
3073
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003074 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
3075 "OpenSSL is compiled without SSLv3 support")
3076 def test_protocol_sslv3(self):
3077 """Connecting to an SSLv3 server with various client options"""
3078 if support.verbose:
3079 sys.stdout.write("\n")
3080 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3')
3081 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
3082 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
3083 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3084 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003085 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003086 client_options=ssl.OP_NO_SSLv3)
3087 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
3088 if no_sslv2_implies_sslv3_hello():
3089 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02003090 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003091 False, client_options=ssl.OP_NO_SSLv2)
3092
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003093 def test_protocol_tlsv1(self):
3094 """Connecting to a TLSv1 server with various client options"""
3095 if support.verbose:
3096 sys.stdout.write("\n")
3097 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1')
3098 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
3099 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
3100 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3101 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
3102 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3103 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003104 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003105 client_options=ssl.OP_NO_TLSv1)
3106
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003107 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
3108 "TLS version 1.1 not supported.")
3109 def test_protocol_tlsv1_1(self):
3110 """Connecting to a TLSv1.1 server with various client options.
3111 Testing against older TLS versions."""
3112 if support.verbose:
3113 sys.stdout.write("\n")
3114 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
3115 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3116 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False)
3117 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3118 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003119 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003120 client_options=ssl.OP_NO_TLSv1_1)
3121
Christian Heimesa170fa12017-09-15 20:27:30 +02003122 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003123 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False)
3124 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False)
3125
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003126 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
3127 "TLS version 1.2 not supported.")
3128 def test_protocol_tlsv1_2(self):
3129 """Connecting to a TLSv1.2 server with various client options.
3130 Testing against older TLS versions."""
3131 if support.verbose:
3132 sys.stdout.write("\n")
3133 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2',
3134 server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
3135 client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
3136 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3137 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False)
3138 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3139 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003140 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003141 client_options=ssl.OP_NO_TLSv1_2)
3142
Christian Heimesa170fa12017-09-15 20:27:30 +02003143 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003144 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
3145 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
3146 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
3147 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
3148
3149 def test_starttls(self):
3150 """Switching from clear text to encrypted and back again."""
3151 msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")
3152
3153 server = ThreadedEchoServer(CERTFILE,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003154 starttls_server=True,
3155 chatty=True,
3156 connectionchatty=True)
3157 wrapped = False
3158 with server:
3159 s = socket.socket()
3160 s.setblocking(1)
3161 s.connect((HOST, server.port))
Antoine Pitroud6494802011-07-21 01:11:30 +02003162 if support.verbose:
3163 sys.stdout.write("\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003164 for indata in msgs:
Antoine Pitroud6494802011-07-21 01:11:30 +02003165 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003166 sys.stdout.write(
3167 " client: sending %r...\n" % indata)
3168 if wrapped:
3169 conn.write(indata)
3170 outdata = conn.read()
3171 else:
3172 s.send(indata)
3173 outdata = s.recv(1024)
3174 msg = outdata.strip().lower()
3175 if indata == b"STARTTLS" and msg.startswith(b"ok"):
3176 # STARTTLS ok, switch to secure mode
3177 if support.verbose:
3178 sys.stdout.write(
3179 " client: read %r from server, starting TLS...\n"
3180 % msg)
Christian Heimesa170fa12017-09-15 20:27:30 +02003181 conn = test_wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003182 wrapped = True
3183 elif indata == b"ENDTLS" and msg.startswith(b"ok"):
3184 # ENDTLS ok, switch back to clear text
3185 if support.verbose:
3186 sys.stdout.write(
3187 " client: read %r from server, ending TLS...\n"
3188 % msg)
3189 s = conn.unwrap()
3190 wrapped = False
3191 else:
3192 if support.verbose:
3193 sys.stdout.write(
3194 " client: read %r from server\n" % msg)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003195 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003196 sys.stdout.write(" client: closing connection.\n")
3197 if wrapped:
3198 conn.write(b"over\n")
3199 else:
3200 s.send(b"over\n")
3201 if wrapped:
3202 conn.close()
3203 else:
3204 s.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003205
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003206 def test_socketserver(self):
3207 """Using socketserver to create and manage SSL connections."""
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003208 server = make_https_server(self, certfile=SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003209 # try to connect
3210 if support.verbose:
3211 sys.stdout.write('\n')
3212 with open(CERTFILE, 'rb') as f:
3213 d1 = f.read()
3214 d2 = ''
3215 # now fetch the same data from the HTTPS server
3216 url = 'https://localhost:%d/%s' % (
3217 server.port, os.path.split(CERTFILE)[1])
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003218 context = ssl.create_default_context(cafile=SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003219 f = urllib.request.urlopen(url, context=context)
3220 try:
3221 dlen = f.info().get("content-length")
3222 if dlen and (int(dlen) > 0):
3223 d2 = f.read(int(dlen))
3224 if support.verbose:
3225 sys.stdout.write(
3226 " client: read %d bytes from remote server '%s'\n"
3227 % (len(d2), server))
3228 finally:
3229 f.close()
3230 self.assertEqual(d1, d2)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003231
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003232 def test_asyncore_server(self):
3233 """Check the example asyncore integration."""
3234 if support.verbose:
3235 sys.stdout.write("\n")
Antoine Pitrou0e576f12011-12-22 10:03:38 +01003236
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003237 indata = b"FOO\n"
3238 server = AsyncoreEchoServer(CERTFILE)
3239 with server:
3240 s = test_wrap_socket(socket.socket())
3241 s.connect(('127.0.0.1', server.port))
3242 if support.verbose:
3243 sys.stdout.write(
3244 " client: sending %r...\n" % indata)
3245 s.write(indata)
3246 outdata = s.read()
3247 if support.verbose:
3248 sys.stdout.write(" client: read %r\n" % outdata)
3249 if outdata != indata.lower():
3250 self.fail(
3251 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
3252 % (outdata[:20], len(outdata),
3253 indata[:20].lower(), len(indata)))
3254 s.write(b"over\n")
3255 if support.verbose:
3256 sys.stdout.write(" client: closing connection.\n")
3257 s.close()
3258 if support.verbose:
3259 sys.stdout.write(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05003260
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003261 def test_recv_send(self):
3262 """Test recv(), send() and friends."""
3263 if support.verbose:
3264 sys.stdout.write("\n")
3265
3266 server = ThreadedEchoServer(CERTFILE,
3267 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003268 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003269 cacerts=CERTFILE,
3270 chatty=True,
3271 connectionchatty=False)
3272 with server:
3273 s = test_wrap_socket(socket.socket(),
3274 server_side=False,
3275 certfile=CERTFILE,
3276 ca_certs=CERTFILE,
3277 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003278 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003279 s.connect((HOST, server.port))
3280 # helper methods for standardising recv* method signatures
3281 def _recv_into():
3282 b = bytearray(b"\0"*100)
3283 count = s.recv_into(b)
3284 return b[:count]
3285
3286 def _recvfrom_into():
3287 b = bytearray(b"\0"*100)
3288 count, addr = s.recvfrom_into(b)
3289 return b[:count]
3290
3291 # (name, method, expect success?, *args, return value func)
3292 send_methods = [
3293 ('send', s.send, True, [], len),
3294 ('sendto', s.sendto, False, ["some.address"], len),
3295 ('sendall', s.sendall, True, [], lambda x: None),
3296 ]
3297 # (name, method, whether to expect success, *args)
3298 recv_methods = [
3299 ('recv', s.recv, True, []),
3300 ('recvfrom', s.recvfrom, False, ["some.address"]),
3301 ('recv_into', _recv_into, True, []),
3302 ('recvfrom_into', _recvfrom_into, False, []),
3303 ]
3304 data_prefix = "PREFIX_"
3305
3306 for (meth_name, send_meth, expect_success, args,
3307 ret_val_meth) in send_methods:
3308 indata = (data_prefix + meth_name).encode('ascii')
3309 try:
3310 ret = send_meth(indata, *args)
3311 msg = "sending with {}".format(meth_name)
3312 self.assertEqual(ret, ret_val_meth(indata), msg=msg)
3313 outdata = s.read()
3314 if outdata != indata.lower():
3315 self.fail(
3316 "While sending with <<{name:s}>> bad data "
3317 "<<{outdata:r}>> ({nout:d}) received; "
3318 "expected <<{indata:r}>> ({nin:d})\n".format(
3319 name=meth_name, outdata=outdata[:20],
3320 nout=len(outdata),
3321 indata=indata[:20], nin=len(indata)
3322 )
3323 )
3324 except ValueError as e:
3325 if expect_success:
3326 self.fail(
3327 "Failed to send with method <<{name:s}>>; "
3328 "expected to succeed.\n".format(name=meth_name)
3329 )
3330 if not str(e).startswith(meth_name):
3331 self.fail(
3332 "Method <<{name:s}>> failed with unexpected "
3333 "exception message: {exp:s}\n".format(
3334 name=meth_name, exp=e
3335 )
3336 )
3337
3338 for meth_name, recv_meth, expect_success, args in recv_methods:
3339 indata = (data_prefix + meth_name).encode('ascii')
3340 try:
3341 s.send(indata)
3342 outdata = recv_meth(*args)
3343 if outdata != indata.lower():
3344 self.fail(
3345 "While receiving with <<{name:s}>> bad data "
3346 "<<{outdata:r}>> ({nout:d}) received; "
3347 "expected <<{indata:r}>> ({nin:d})\n".format(
3348 name=meth_name, outdata=outdata[:20],
3349 nout=len(outdata),
3350 indata=indata[:20], nin=len(indata)
3351 )
3352 )
3353 except ValueError as e:
3354 if expect_success:
3355 self.fail(
3356 "Failed to receive with method <<{name:s}>>; "
3357 "expected to succeed.\n".format(name=meth_name)
3358 )
3359 if not str(e).startswith(meth_name):
3360 self.fail(
3361 "Method <<{name:s}>> failed with unexpected "
3362 "exception message: {exp:s}\n".format(
3363 name=meth_name, exp=e
3364 )
3365 )
3366 # consume data
3367 s.read()
3368
3369 # read(-1, buffer) is supported, even though read(-1) is not
3370 data = b"data"
3371 s.send(data)
3372 buffer = bytearray(len(data))
3373 self.assertEqual(s.read(-1, buffer), len(data))
3374 self.assertEqual(buffer, data)
3375
Christian Heimes888bbdc2017-09-07 14:18:21 -07003376 # sendall accepts bytes-like objects
3377 if ctypes is not None:
3378 ubyte = ctypes.c_ubyte * len(data)
3379 byteslike = ubyte.from_buffer_copy(data)
3380 s.sendall(byteslike)
3381 self.assertEqual(s.read(), data)
3382
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003383 # Make sure sendmsg et al are disallowed to avoid
3384 # inadvertent disclosure of data and/or corruption
3385 # of the encrypted data stream
3386 self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
3387 self.assertRaises(NotImplementedError, s.recvmsg, 100)
3388 self.assertRaises(NotImplementedError,
3389 s.recvmsg_into, bytearray(100))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003390 s.write(b"over\n")
3391
3392 self.assertRaises(ValueError, s.recv, -1)
3393 self.assertRaises(ValueError, s.read, -1)
3394
3395 s.close()
3396
3397 def test_recv_zero(self):
3398 server = ThreadedEchoServer(CERTFILE)
3399 server.__enter__()
3400 self.addCleanup(server.__exit__, None, None)
3401 s = socket.create_connection((HOST, server.port))
3402 self.addCleanup(s.close)
3403 s = test_wrap_socket(s, suppress_ragged_eofs=False)
3404 self.addCleanup(s.close)
3405
3406 # recv/read(0) should return no data
3407 s.send(b"data")
3408 self.assertEqual(s.recv(0), b"")
3409 self.assertEqual(s.read(0), b"")
3410 self.assertEqual(s.read(), b"data")
3411
3412 # Should not block if the other end sends no data
3413 s.setblocking(False)
3414 self.assertEqual(s.recv(0), b"")
3415 self.assertEqual(s.recv_into(bytearray()), 0)
3416
3417 def test_nonblocking_send(self):
3418 server = ThreadedEchoServer(CERTFILE,
3419 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003420 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003421 cacerts=CERTFILE,
3422 chatty=True,
3423 connectionchatty=False)
3424 with server:
3425 s = test_wrap_socket(socket.socket(),
3426 server_side=False,
3427 certfile=CERTFILE,
3428 ca_certs=CERTFILE,
3429 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003430 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003431 s.connect((HOST, server.port))
3432 s.setblocking(False)
3433
3434 # If we keep sending data, at some point the buffers
3435 # will be full and the call will block
3436 buf = bytearray(8192)
3437 def fill_buffer():
3438 while True:
3439 s.send(buf)
3440 self.assertRaises((ssl.SSLWantWriteError,
3441 ssl.SSLWantReadError), fill_buffer)
3442
3443 # Now read all the output and discard it
3444 s.setblocking(True)
3445 s.close()
3446
3447 def test_handshake_timeout(self):
3448 # Issue #5103: SSL handshake must respect the socket timeout
3449 server = socket.socket(socket.AF_INET)
3450 host = "127.0.0.1"
3451 port = support.bind_port(server)
3452 started = threading.Event()
3453 finish = False
3454
3455 def serve():
3456 server.listen()
3457 started.set()
3458 conns = []
3459 while not finish:
3460 r, w, e = select.select([server], [], [], 0.1)
3461 if server in r:
3462 # Let the socket hang around rather than having
3463 # it closed by garbage collection.
3464 conns.append(server.accept()[0])
3465 for sock in conns:
3466 sock.close()
3467
3468 t = threading.Thread(target=serve)
3469 t.start()
3470 started.wait()
3471
3472 try:
3473 try:
3474 c = socket.socket(socket.AF_INET)
3475 c.settimeout(0.2)
3476 c.connect((host, port))
3477 # Will attempt handshake and time out
3478 self.assertRaisesRegex(socket.timeout, "timed out",
3479 test_wrap_socket, c)
3480 finally:
3481 c.close()
3482 try:
3483 c = socket.socket(socket.AF_INET)
3484 c = test_wrap_socket(c)
3485 c.settimeout(0.2)
3486 # Will attempt handshake and time out
3487 self.assertRaisesRegex(socket.timeout, "timed out",
3488 c.connect, (host, port))
3489 finally:
3490 c.close()
3491 finally:
3492 finish = True
3493 t.join()
3494 server.close()
3495
3496 def test_server_accept(self):
3497 # Issue #16357: accept() on a SSLSocket created through
3498 # SSLContext.wrap_socket().
Christian Heimesa170fa12017-09-15 20:27:30 +02003499 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003500 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003501 context.load_verify_locations(SIGNING_CA)
3502 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003503 server = socket.socket(socket.AF_INET)
3504 host = "127.0.0.1"
3505 port = support.bind_port(server)
3506 server = context.wrap_socket(server, server_side=True)
3507 self.assertTrue(server.server_side)
3508
3509 evt = threading.Event()
3510 remote = None
3511 peer = None
3512 def serve():
3513 nonlocal remote, peer
3514 server.listen()
3515 # Block on the accept and wait on the connection to close.
3516 evt.set()
3517 remote, peer = server.accept()
Christian Heimes529525f2018-05-23 22:24:45 +02003518 remote.send(remote.recv(4))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003519
3520 t = threading.Thread(target=serve)
3521 t.start()
3522 # Client wait until server setup and perform a connect.
3523 evt.wait()
3524 client = context.wrap_socket(socket.socket())
3525 client.connect((host, port))
Christian Heimes529525f2018-05-23 22:24:45 +02003526 client.send(b'data')
3527 client.recv()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003528 client_addr = client.getsockname()
3529 client.close()
3530 t.join()
3531 remote.close()
3532 server.close()
3533 # Sanity checks.
3534 self.assertIsInstance(remote, ssl.SSLSocket)
3535 self.assertEqual(peer, client_addr)
3536
3537 def test_getpeercert_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003538 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003539 with context.wrap_socket(socket.socket()) as sock:
3540 with self.assertRaises(OSError) as cm:
3541 sock.getpeercert()
3542 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3543
3544 def test_do_handshake_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003545 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003546 with context.wrap_socket(socket.socket()) as sock:
3547 with self.assertRaises(OSError) as cm:
3548 sock.do_handshake()
3549 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3550
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003551 def test_no_shared_ciphers(self):
3552 client_context, server_context, hostname = testing_context()
3553 # OpenSSL enables all TLS 1.3 ciphers, enforce TLS 1.2 for test
3554 client_context.options |= ssl.OP_NO_TLSv1_3
Victor Stinner5e922652018-09-07 17:30:33 +02003555 # Force different suites on client and server
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003556 client_context.set_ciphers("AES128")
3557 server_context.set_ciphers("AES256")
3558 with ThreadedEchoServer(context=server_context) as server:
3559 with client_context.wrap_socket(socket.socket(),
3560 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003561 with self.assertRaises(OSError):
3562 s.connect((HOST, server.port))
3563 self.assertIn("no shared cipher", server.conn_errors[0])
3564
3565 def test_version_basic(self):
3566 """
3567 Basic tests for SSLSocket.version().
3568 More tests are done in the test_protocol_*() methods.
3569 """
Christian Heimesa170fa12017-09-15 20:27:30 +02003570 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
3571 context.check_hostname = False
3572 context.verify_mode = ssl.CERT_NONE
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003573 with ThreadedEchoServer(CERTFILE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003574 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003575 chatty=False) as server:
3576 with context.wrap_socket(socket.socket()) as s:
3577 self.assertIs(s.version(), None)
Christian Heimes141c5e82018-02-24 21:10:57 +01003578 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003579 s.connect((HOST, server.port))
Christian Heimes529525f2018-05-23 22:24:45 +02003580 if IS_OPENSSL_1_1_1 and ssl.HAS_TLSv1_3:
Christian Heimes05d9fe32018-02-27 08:55:39 +01003581 self.assertEqual(s.version(), 'TLSv1.3')
3582 elif ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
Christian Heimesa170fa12017-09-15 20:27:30 +02003583 self.assertEqual(s.version(), 'TLSv1.2')
3584 else: # 0.9.8 to 1.0.1
3585 self.assertIn(s.version(), ('TLSv1', 'TLSv1.2'))
Christian Heimes141c5e82018-02-24 21:10:57 +01003586 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003587 self.assertIs(s.version(), None)
3588
Christian Heimescb5b68a2017-09-07 18:07:00 -07003589 @unittest.skipUnless(ssl.HAS_TLSv1_3,
3590 "test requires TLSv1.3 enabled OpenSSL")
3591 def test_tls1_3(self):
3592 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
3593 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003594 context.options |= (
3595 ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
3596 )
3597 with ThreadedEchoServer(context=context) as server:
3598 with context.wrap_socket(socket.socket()) as s:
3599 s.connect((HOST, server.port))
Christian Heimes05d9fe32018-02-27 08:55:39 +01003600 self.assertIn(s.cipher()[0], {
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003601 'TLS_AES_256_GCM_SHA384',
3602 'TLS_CHACHA20_POLY1305_SHA256',
3603 'TLS_AES_128_GCM_SHA256',
Christian Heimes05d9fe32018-02-27 08:55:39 +01003604 })
3605 self.assertEqual(s.version(), 'TLSv1.3')
Christian Heimescb5b68a2017-09-07 18:07:00 -07003606
Christian Heimes698dde12018-02-27 11:54:43 +01003607 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
3608 "required OpenSSL 1.1.0g")
3609 def test_min_max_version(self):
3610 client_context, server_context, hostname = testing_context()
3611 # client TLSv1.0 to 1.2
3612 client_context.minimum_version = ssl.TLSVersion.TLSv1
3613 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
3614 # server only TLSv1.2
3615 server_context.minimum_version = ssl.TLSVersion.TLSv1_2
3616 server_context.maximum_version = ssl.TLSVersion.TLSv1_2
3617
3618 with ThreadedEchoServer(context=server_context) as server:
3619 with client_context.wrap_socket(socket.socket(),
3620 server_hostname=hostname) as s:
3621 s.connect((HOST, server.port))
3622 self.assertEqual(s.version(), 'TLSv1.2')
3623
3624 # client 1.0 to 1.2, server 1.0 to 1.1
3625 server_context.minimum_version = ssl.TLSVersion.TLSv1
3626 server_context.maximum_version = ssl.TLSVersion.TLSv1_1
3627
3628 with ThreadedEchoServer(context=server_context) as server:
3629 with client_context.wrap_socket(socket.socket(),
3630 server_hostname=hostname) as s:
3631 s.connect((HOST, server.port))
3632 self.assertEqual(s.version(), 'TLSv1.1')
3633
3634 # client 1.0, server 1.2 (mismatch)
3635 server_context.minimum_version = ssl.TLSVersion.TLSv1_2
3636 server_context.maximum_version = ssl.TLSVersion.TLSv1_2
3637 client_context.minimum_version = ssl.TLSVersion.TLSv1
3638 client_context.maximum_version = ssl.TLSVersion.TLSv1
3639 with ThreadedEchoServer(context=server_context) as server:
3640 with client_context.wrap_socket(socket.socket(),
3641 server_hostname=hostname) as s:
3642 with self.assertRaises(ssl.SSLError) as e:
3643 s.connect((HOST, server.port))
3644 self.assertIn("alert", str(e.exception))
3645
3646
3647 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
3648 "required OpenSSL 1.1.0g")
3649 @unittest.skipUnless(ssl.HAS_SSLv3, "requires SSLv3 support")
3650 def test_min_max_version_sslv3(self):
3651 client_context, server_context, hostname = testing_context()
3652 server_context.minimum_version = ssl.TLSVersion.SSLv3
3653 client_context.minimum_version = ssl.TLSVersion.SSLv3
3654 client_context.maximum_version = ssl.TLSVersion.SSLv3
3655 with ThreadedEchoServer(context=server_context) as server:
3656 with client_context.wrap_socket(socket.socket(),
3657 server_hostname=hostname) as s:
3658 s.connect((HOST, server.port))
3659 self.assertEqual(s.version(), 'SSLv3')
3660
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003661 @unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
3662 def test_default_ecdh_curve(self):
3663 # Issue #21015: elliptic curve-based Diffie Hellman key exchange
3664 # should be enabled by default on SSL contexts.
Christian Heimesa170fa12017-09-15 20:27:30 +02003665 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003666 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003667 # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
3668 # cipher name.
3669 context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003670 # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
3671 # explicitly using the 'ECCdraft' cipher alias. Otherwise,
3672 # our default cipher list should prefer ECDH-based ciphers
3673 # automatically.
3674 if ssl.OPENSSL_VERSION_INFO < (1, 0, 0):
3675 context.set_ciphers("ECCdraft:ECDH")
3676 with ThreadedEchoServer(context=context) as server:
3677 with context.wrap_socket(socket.socket()) as s:
3678 s.connect((HOST, server.port))
3679 self.assertIn("ECDH", s.cipher()[0])
3680
3681 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
3682 "'tls-unique' channel binding not available")
3683 def test_tls_unique_channel_binding(self):
3684 """Test tls-unique channel binding."""
3685 if support.verbose:
3686 sys.stdout.write("\n")
3687
Christian Heimes05d9fe32018-02-27 08:55:39 +01003688 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003689
3690 server = ThreadedEchoServer(context=server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003691 chatty=True,
3692 connectionchatty=False)
Christian Heimes05d9fe32018-02-27 08:55:39 +01003693
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003694 with server:
Christian Heimes05d9fe32018-02-27 08:55:39 +01003695 with client_context.wrap_socket(
3696 socket.socket(),
3697 server_hostname=hostname) as s:
3698 s.connect((HOST, server.port))
3699 # get the data
3700 cb_data = s.get_channel_binding("tls-unique")
3701 if support.verbose:
3702 sys.stdout.write(
3703 " got channel binding data: {0!r}\n".format(cb_data))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003704
Christian Heimes05d9fe32018-02-27 08:55:39 +01003705 # check if it is sane
3706 self.assertIsNotNone(cb_data)
Christian Heimes529525f2018-05-23 22:24:45 +02003707 if s.version() == 'TLSv1.3':
3708 self.assertEqual(len(cb_data), 48)
3709 else:
3710 self.assertEqual(len(cb_data), 12) # True for TLSv1
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003711
Christian Heimes05d9fe32018-02-27 08:55:39 +01003712 # and compare with the peers version
3713 s.write(b"CB tls-unique\n")
3714 peer_data_repr = s.read().strip()
3715 self.assertEqual(peer_data_repr,
3716 repr(cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003717
3718 # now, again
Christian Heimes05d9fe32018-02-27 08:55:39 +01003719 with client_context.wrap_socket(
3720 socket.socket(),
3721 server_hostname=hostname) as s:
3722 s.connect((HOST, server.port))
3723 new_cb_data = s.get_channel_binding("tls-unique")
3724 if support.verbose:
3725 sys.stdout.write(
3726 "got another channel binding data: {0!r}\n".format(
3727 new_cb_data)
3728 )
3729 # is it really unique
3730 self.assertNotEqual(cb_data, new_cb_data)
3731 self.assertIsNotNone(cb_data)
Christian Heimes529525f2018-05-23 22:24:45 +02003732 if s.version() == 'TLSv1.3':
3733 self.assertEqual(len(cb_data), 48)
3734 else:
3735 self.assertEqual(len(cb_data), 12) # True for TLSv1
Christian Heimes05d9fe32018-02-27 08:55:39 +01003736 s.write(b"CB tls-unique\n")
3737 peer_data_repr = s.read().strip()
3738 self.assertEqual(peer_data_repr,
3739 repr(new_cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003740
3741 def test_compression(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003742 client_context, server_context, hostname = testing_context()
3743 stats = server_params_test(client_context, server_context,
3744 chatty=True, connectionchatty=True,
3745 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003746 if support.verbose:
3747 sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
3748 self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' })
3749
3750 @unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
3751 "ssl.OP_NO_COMPRESSION needed for this test")
3752 def test_compression_disabled(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003753 client_context, server_context, hostname = testing_context()
3754 client_context.options |= ssl.OP_NO_COMPRESSION
3755 server_context.options |= ssl.OP_NO_COMPRESSION
3756 stats = server_params_test(client_context, server_context,
3757 chatty=True, connectionchatty=True,
3758 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003759 self.assertIs(stats['compression'], None)
3760
3761 def test_dh_params(self):
3762 # Check we can get a connection with ephemeral Diffie-Hellman
Christian Heimesa170fa12017-09-15 20:27:30 +02003763 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003764 # test scenario needs TLS <= 1.2
3765 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesa170fa12017-09-15 20:27:30 +02003766 server_context.load_dh_params(DHFILE)
3767 server_context.set_ciphers("kEDH")
Christian Heimes05d9fe32018-02-27 08:55:39 +01003768 server_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesa170fa12017-09-15 20:27:30 +02003769 stats = server_params_test(client_context, server_context,
3770 chatty=True, connectionchatty=True,
3771 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003772 cipher = stats["cipher"][0]
3773 parts = cipher.split("-")
3774 if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
3775 self.fail("Non-DH cipher: " + cipher[0])
3776
Christian Heimesb7b92252018-02-25 09:49:31 +01003777 @unittest.skipUnless(HAVE_SECP_CURVES, "needs secp384r1 curve support")
Christian Heimes05d9fe32018-02-27 08:55:39 +01003778 @unittest.skipIf(IS_OPENSSL_1_1_1, "TODO: Test doesn't work on 1.1.1")
Christian Heimesb7b92252018-02-25 09:49:31 +01003779 def test_ecdh_curve(self):
3780 # server secp384r1, client auto
3781 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003782
Christian Heimesb7b92252018-02-25 09:49:31 +01003783 server_context.set_ecdh_curve("secp384r1")
3784 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3785 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3786 stats = server_params_test(client_context, server_context,
3787 chatty=True, connectionchatty=True,
3788 sni_name=hostname)
3789
3790 # server auto, client secp384r1
3791 client_context, server_context, hostname = testing_context()
3792 client_context.set_ecdh_curve("secp384r1")
3793 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3794 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3795 stats = server_params_test(client_context, server_context,
3796 chatty=True, connectionchatty=True,
3797 sni_name=hostname)
3798
3799 # server / client curve mismatch
3800 client_context, server_context, hostname = testing_context()
3801 client_context.set_ecdh_curve("prime256v1")
3802 server_context.set_ecdh_curve("secp384r1")
3803 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3804 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3805 try:
3806 stats = server_params_test(client_context, server_context,
3807 chatty=True, connectionchatty=True,
3808 sni_name=hostname)
3809 except ssl.SSLError:
3810 pass
3811 else:
3812 # OpenSSL 1.0.2 does not fail although it should.
Christian Heimes05d9fe32018-02-27 08:55:39 +01003813 if IS_OPENSSL_1_1_0:
Christian Heimesb7b92252018-02-25 09:49:31 +01003814 self.fail("mismatch curve did not fail")
3815
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003816 def test_selected_alpn_protocol(self):
3817 # selected_alpn_protocol() is None unless ALPN is used.
Christian Heimesa170fa12017-09-15 20:27:30 +02003818 client_context, server_context, hostname = testing_context()
3819 stats = server_params_test(client_context, server_context,
3820 chatty=True, connectionchatty=True,
3821 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003822 self.assertIs(stats['client_alpn_protocol'], None)
3823
3824 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
3825 def test_selected_alpn_protocol_if_server_uses_alpn(self):
3826 # selected_alpn_protocol() is None unless ALPN is used by the client.
Christian Heimesa170fa12017-09-15 20:27:30 +02003827 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003828 server_context.set_alpn_protocols(['foo', 'bar'])
3829 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02003830 chatty=True, connectionchatty=True,
3831 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003832 self.assertIs(stats['client_alpn_protocol'], None)
3833
3834 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
3835 def test_alpn_protocols(self):
3836 server_protocols = ['foo', 'bar', 'milkshake']
3837 protocol_tests = [
3838 (['foo', 'bar'], 'foo'),
3839 (['bar', 'foo'], 'foo'),
3840 (['milkshake'], 'milkshake'),
3841 (['http/3.0', 'http/4.0'], None)
3842 ]
3843 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02003844 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003845 server_context.set_alpn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003846 client_context.set_alpn_protocols(client_protocols)
3847
3848 try:
3849 stats = server_params_test(client_context,
3850 server_context,
3851 chatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02003852 connectionchatty=True,
3853 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003854 except ssl.SSLError as e:
3855 stats = e
3856
Christian Heimes05d9fe32018-02-27 08:55:39 +01003857 if (expected is None and IS_OPENSSL_1_1_0
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003858 and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6)):
3859 # OpenSSL 1.1.0 to 1.1.0e raises handshake error
3860 self.assertIsInstance(stats, ssl.SSLError)
3861 else:
3862 msg = "failed trying %s (s) and %s (c).\n" \
3863 "was expecting %s, but got %%s from the %%s" \
3864 % (str(server_protocols), str(client_protocols),
3865 str(expected))
3866 client_result = stats['client_alpn_protocol']
3867 self.assertEqual(client_result, expected,
3868 msg % (client_result, "client"))
3869 server_result = stats['server_alpn_protocols'][-1] \
3870 if len(stats['server_alpn_protocols']) else 'nothing'
3871 self.assertEqual(server_result, expected,
3872 msg % (server_result, "server"))
3873
3874 def test_selected_npn_protocol(self):
3875 # selected_npn_protocol() is None unless NPN is used
Christian Heimesa170fa12017-09-15 20:27:30 +02003876 client_context, server_context, hostname = testing_context()
3877 stats = server_params_test(client_context, server_context,
3878 chatty=True, connectionchatty=True,
3879 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003880 self.assertIs(stats['client_npn_protocol'], None)
3881
3882 @unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
3883 def test_npn_protocols(self):
3884 server_protocols = ['http/1.1', 'spdy/2']
3885 protocol_tests = [
3886 (['http/1.1', 'spdy/2'], 'http/1.1'),
3887 (['spdy/2', 'http/1.1'], 'http/1.1'),
3888 (['spdy/2', 'test'], 'spdy/2'),
3889 (['abc', 'def'], 'abc')
3890 ]
3891 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02003892 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003893 server_context.set_npn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003894 client_context.set_npn_protocols(client_protocols)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003895 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02003896 chatty=True, connectionchatty=True,
3897 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003898 msg = "failed trying %s (s) and %s (c).\n" \
3899 "was expecting %s, but got %%s from the %%s" \
3900 % (str(server_protocols), str(client_protocols),
3901 str(expected))
3902 client_result = stats['client_npn_protocol']
3903 self.assertEqual(client_result, expected, msg % (client_result, "client"))
3904 server_result = stats['server_npn_protocols'][-1] \
3905 if len(stats['server_npn_protocols']) else 'nothing'
3906 self.assertEqual(server_result, expected, msg % (server_result, "server"))
3907
3908 def sni_contexts(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003909 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003910 server_context.load_cert_chain(SIGNED_CERTFILE)
Christian Heimesa170fa12017-09-15 20:27:30 +02003911 other_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003912 other_context.load_cert_chain(SIGNED_CERTFILE2)
Christian Heimesa170fa12017-09-15 20:27:30 +02003913 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003914 client_context.load_verify_locations(SIGNING_CA)
3915 return server_context, other_context, client_context
3916
3917 def check_common_name(self, stats, name):
3918 cert = stats['peercert']
3919 self.assertIn((('commonName', name),), cert['subject'])
3920
3921 @needs_sni
3922 def test_sni_callback(self):
3923 calls = []
3924 server_context, other_context, client_context = self.sni_contexts()
3925
Christian Heimesa170fa12017-09-15 20:27:30 +02003926 client_context.check_hostname = False
3927
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003928 def servername_cb(ssl_sock, server_name, initial_context):
3929 calls.append((server_name, initial_context))
3930 if server_name is not None:
3931 ssl_sock.context = other_context
3932 server_context.set_servername_callback(servername_cb)
3933
3934 stats = server_params_test(client_context, server_context,
3935 chatty=True,
3936 sni_name='supermessage')
3937 # The hostname was fetched properly, and the certificate was
3938 # changed for the connection.
3939 self.assertEqual(calls, [("supermessage", server_context)])
3940 # CERTFILE4 was selected
3941 self.check_common_name(stats, 'fakehostname')
3942
3943 calls = []
3944 # The callback is called with server_name=None
3945 stats = server_params_test(client_context, server_context,
3946 chatty=True,
3947 sni_name=None)
3948 self.assertEqual(calls, [(None, server_context)])
Christian Heimesa170fa12017-09-15 20:27:30 +02003949 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003950
3951 # Check disabling the callback
3952 calls = []
3953 server_context.set_servername_callback(None)
3954
3955 stats = server_params_test(client_context, server_context,
3956 chatty=True,
3957 sni_name='notfunny')
3958 # Certificate didn't change
Christian Heimesa170fa12017-09-15 20:27:30 +02003959 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003960 self.assertEqual(calls, [])
3961
3962 @needs_sni
3963 def test_sni_callback_alert(self):
3964 # Returning a TLS alert is reflected to the connecting client
3965 server_context, other_context, client_context = self.sni_contexts()
3966
3967 def cb_returning_alert(ssl_sock, server_name, initial_context):
3968 return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
3969 server_context.set_servername_callback(cb_returning_alert)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003970 with self.assertRaises(ssl.SSLError) as cm:
3971 stats = server_params_test(client_context, server_context,
3972 chatty=False,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003973 sni_name='supermessage')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003974 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003975
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003976 @needs_sni
3977 def test_sni_callback_raising(self):
3978 # Raising fails the connection with a TLS handshake failure alert.
3979 server_context, other_context, client_context = self.sni_contexts()
3980
3981 def cb_raising(ssl_sock, server_name, initial_context):
3982 1/0
3983 server_context.set_servername_callback(cb_raising)
3984
3985 with self.assertRaises(ssl.SSLError) as cm, \
3986 support.captured_stderr() as stderr:
Antoine Pitrou50b24d02013-04-11 20:48:42 +02003987 stats = server_params_test(client_context, server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003988 chatty=False,
3989 sni_name='supermessage')
3990 self.assertEqual(cm.exception.reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
3991 self.assertIn("ZeroDivisionError", stderr.getvalue())
Antoine Pitrou50b24d02013-04-11 20:48:42 +02003992
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003993 @needs_sni
3994 def test_sni_callback_wrong_return_type(self):
3995 # Returning the wrong return type terminates the TLS connection
3996 # with an internal error alert.
3997 server_context, other_context, client_context = self.sni_contexts()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003998
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003999 def cb_wrong_return_type(ssl_sock, server_name, initial_context):
4000 return "foo"
4001 server_context.set_servername_callback(cb_wrong_return_type)
4002
4003 with self.assertRaises(ssl.SSLError) as cm, \
4004 support.captured_stderr() as stderr:
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004005 stats = server_params_test(client_context, server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004006 chatty=False,
4007 sni_name='supermessage')
4008 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
4009 self.assertIn("TypeError", stderr.getvalue())
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004010
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004011 def test_shared_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004012 client_context, server_context, hostname = testing_context()
Christian Heimese8eb6cb2018-05-22 22:50:12 +02004013 client_context.set_ciphers("AES128:AES256")
4014 server_context.set_ciphers("AES256")
4015 expected_algs = [
4016 "AES256", "AES-256",
4017 # TLS 1.3 ciphers are always enabled
4018 "TLS_CHACHA20", "TLS_AES",
4019 ]
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004020
Christian Heimesa170fa12017-09-15 20:27:30 +02004021 stats = server_params_test(client_context, server_context,
4022 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004023 ciphers = stats['server_shared_ciphers'][0]
4024 self.assertGreater(len(ciphers), 0)
4025 for name, tls_version, bits in ciphers:
Christian Heimese8eb6cb2018-05-22 22:50:12 +02004026 if not any(alg in name for alg in expected_algs):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004027 self.fail(name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004028
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004029 def test_read_write_after_close_raises_valuerror(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004030 client_context, server_context, hostname = testing_context()
4031 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004032
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004033 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02004034 s = client_context.wrap_socket(socket.socket(),
4035 server_hostname=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004036 s.connect((HOST, server.port))
4037 s.close()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004038
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004039 self.assertRaises(ValueError, s.read, 1024)
4040 self.assertRaises(ValueError, s.write, b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004041
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004042 def test_sendfile(self):
4043 TEST_DATA = b"x" * 512
4044 with open(support.TESTFN, 'wb') as f:
4045 f.write(TEST_DATA)
4046 self.addCleanup(support.unlink, support.TESTFN)
Christian Heimesa170fa12017-09-15 20:27:30 +02004047 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004048 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01004049 context.load_verify_locations(SIGNING_CA)
4050 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004051 server = ThreadedEchoServer(context=context, chatty=False)
4052 with server:
4053 with context.wrap_socket(socket.socket()) as s:
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004054 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004055 with open(support.TESTFN, 'rb') as file:
4056 s.sendfile(file)
4057 self.assertEqual(s.recv(1024), TEST_DATA)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004058
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004059 def test_session(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004060 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01004061 # TODO: sessions aren't compatible with TLSv1.3 yet
4062 client_context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004063
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004064 # first connection without session
Christian Heimesa170fa12017-09-15 20:27:30 +02004065 stats = server_params_test(client_context, server_context,
4066 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004067 session = stats['session']
4068 self.assertTrue(session.id)
4069 self.assertGreater(session.time, 0)
4070 self.assertGreater(session.timeout, 0)
4071 self.assertTrue(session.has_ticket)
4072 if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
4073 self.assertGreater(session.ticket_lifetime_hint, 0)
4074 self.assertFalse(stats['session_reused'])
4075 sess_stat = server_context.session_stats()
4076 self.assertEqual(sess_stat['accept'], 1)
4077 self.assertEqual(sess_stat['hits'], 0)
Giampaolo Rodola'915d1412014-06-11 03:54:30 +02004078
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004079 # reuse session
Christian Heimesa170fa12017-09-15 20:27:30 +02004080 stats = server_params_test(client_context, server_context,
4081 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004082 sess_stat = server_context.session_stats()
4083 self.assertEqual(sess_stat['accept'], 2)
4084 self.assertEqual(sess_stat['hits'], 1)
4085 self.assertTrue(stats['session_reused'])
4086 session2 = stats['session']
4087 self.assertEqual(session2.id, session.id)
4088 self.assertEqual(session2, session)
4089 self.assertIsNot(session2, session)
4090 self.assertGreaterEqual(session2.time, session.time)
4091 self.assertGreaterEqual(session2.timeout, session.timeout)
Christian Heimes99a65702016-09-10 23:44:53 +02004092
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004093 # another one without session
Christian Heimesa170fa12017-09-15 20:27:30 +02004094 stats = server_params_test(client_context, server_context,
4095 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004096 self.assertFalse(stats['session_reused'])
4097 session3 = stats['session']
4098 self.assertNotEqual(session3.id, session.id)
4099 self.assertNotEqual(session3, session)
4100 sess_stat = server_context.session_stats()
4101 self.assertEqual(sess_stat['accept'], 3)
4102 self.assertEqual(sess_stat['hits'], 1)
Christian Heimes99a65702016-09-10 23:44:53 +02004103
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004104 # reuse session again
Christian Heimesa170fa12017-09-15 20:27:30 +02004105 stats = server_params_test(client_context, server_context,
4106 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004107 self.assertTrue(stats['session_reused'])
4108 session4 = stats['session']
4109 self.assertEqual(session4.id, session.id)
4110 self.assertEqual(session4, session)
4111 self.assertGreaterEqual(session4.time, session.time)
4112 self.assertGreaterEqual(session4.timeout, session.timeout)
4113 sess_stat = server_context.session_stats()
4114 self.assertEqual(sess_stat['accept'], 4)
4115 self.assertEqual(sess_stat['hits'], 2)
Christian Heimes99a65702016-09-10 23:44:53 +02004116
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004117 def test_session_handling(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004118 client_context, server_context, hostname = testing_context()
4119 client_context2, _, _ = testing_context()
Christian Heimes99a65702016-09-10 23:44:53 +02004120
Christian Heimes05d9fe32018-02-27 08:55:39 +01004121 # TODO: session reuse does not work with TLSv1.3
Christian Heimesa170fa12017-09-15 20:27:30 +02004122 client_context.options |= ssl.OP_NO_TLSv1_3
4123 client_context2.options |= ssl.OP_NO_TLSv1_3
Christian Heimescb5b68a2017-09-07 18:07:00 -07004124
Christian Heimesa170fa12017-09-15 20:27:30 +02004125 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004126 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02004127 with client_context.wrap_socket(socket.socket(),
4128 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004129 # session is None before handshake
4130 self.assertEqual(s.session, None)
4131 self.assertEqual(s.session_reused, None)
4132 s.connect((HOST, server.port))
4133 session = s.session
4134 self.assertTrue(session)
4135 with self.assertRaises(TypeError) as e:
4136 s.session = object
Ned Deily4531ec72018-06-11 20:26:28 -04004137 self.assertEqual(str(e.exception), 'Value is not a SSLSession.')
Christian Heimes99a65702016-09-10 23:44:53 +02004138
Christian Heimesa170fa12017-09-15 20:27:30 +02004139 with client_context.wrap_socket(socket.socket(),
4140 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004141 s.connect((HOST, server.port))
4142 # cannot set session after handshake
4143 with self.assertRaises(ValueError) as e:
4144 s.session = session
4145 self.assertEqual(str(e.exception),
4146 'Cannot set session after handshake.')
Christian Heimes99a65702016-09-10 23:44:53 +02004147
Christian Heimesa170fa12017-09-15 20:27:30 +02004148 with client_context.wrap_socket(socket.socket(),
4149 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004150 # can set session before handshake and before the
4151 # connection was established
4152 s.session = session
4153 s.connect((HOST, server.port))
4154 self.assertEqual(s.session.id, session.id)
4155 self.assertEqual(s.session, session)
4156 self.assertEqual(s.session_reused, True)
Christian Heimes99a65702016-09-10 23:44:53 +02004157
Christian Heimesa170fa12017-09-15 20:27:30 +02004158 with client_context2.wrap_socket(socket.socket(),
4159 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004160 # cannot re-use session with a different SSLContext
4161 with self.assertRaises(ValueError) as e:
Christian Heimes99a65702016-09-10 23:44:53 +02004162 s.session = session
4163 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004164 self.assertEqual(str(e.exception),
4165 'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02004166
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01004167
Christian Heimes9fb051f2018-09-23 08:32:31 +02004168@unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
4169class TestPostHandshakeAuth(unittest.TestCase):
4170 def test_pha_setter(self):
4171 protocols = [
4172 ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_CLIENT
4173 ]
4174 for protocol in protocols:
4175 ctx = ssl.SSLContext(protocol)
4176 self.assertEqual(ctx.post_handshake_auth, False)
4177
4178 ctx.post_handshake_auth = True
4179 self.assertEqual(ctx.post_handshake_auth, True)
4180
4181 ctx.verify_mode = ssl.CERT_REQUIRED
4182 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
4183 self.assertEqual(ctx.post_handshake_auth, True)
4184
4185 ctx.post_handshake_auth = False
4186 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
4187 self.assertEqual(ctx.post_handshake_auth, False)
4188
4189 ctx.verify_mode = ssl.CERT_OPTIONAL
4190 ctx.post_handshake_auth = True
4191 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
4192 self.assertEqual(ctx.post_handshake_auth, True)
4193
4194 def test_pha_required(self):
4195 client_context, server_context, hostname = testing_context()
4196 server_context.post_handshake_auth = True
4197 server_context.verify_mode = ssl.CERT_REQUIRED
4198 client_context.post_handshake_auth = True
4199 client_context.load_cert_chain(SIGNED_CERTFILE)
4200
4201 server = ThreadedEchoServer(context=server_context, chatty=False)
4202 with server:
4203 with client_context.wrap_socket(socket.socket(),
4204 server_hostname=hostname) as s:
4205 s.connect((HOST, server.port))
4206 s.write(b'HASCERT')
4207 self.assertEqual(s.recv(1024), b'FALSE\n')
4208 s.write(b'PHA')
4209 self.assertEqual(s.recv(1024), b'OK\n')
4210 s.write(b'HASCERT')
4211 self.assertEqual(s.recv(1024), b'TRUE\n')
4212 # PHA method just returns true when cert is already available
4213 s.write(b'PHA')
4214 self.assertEqual(s.recv(1024), b'OK\n')
4215 s.write(b'GETCERT')
4216 cert_text = s.recv(4096).decode('us-ascii')
4217 self.assertIn('Python Software Foundation CA', cert_text)
4218
4219 def test_pha_required_nocert(self):
4220 client_context, server_context, hostname = testing_context()
4221 server_context.post_handshake_auth = True
4222 server_context.verify_mode = ssl.CERT_REQUIRED
4223 client_context.post_handshake_auth = True
4224
4225 server = ThreadedEchoServer(context=server_context, chatty=False)
4226 with server:
4227 with client_context.wrap_socket(socket.socket(),
4228 server_hostname=hostname) as s:
4229 s.connect((HOST, server.port))
4230 s.write(b'PHA')
4231 # receive CertificateRequest
4232 self.assertEqual(s.recv(1024), b'OK\n')
4233 # send empty Certificate + Finish
4234 s.write(b'HASCERT')
4235 # receive alert
4236 with self.assertRaisesRegex(
4237 ssl.SSLError,
4238 'tlsv13 alert certificate required'):
4239 s.recv(1024)
4240
4241 def test_pha_optional(self):
4242 if support.verbose:
4243 sys.stdout.write("\n")
4244
4245 client_context, server_context, hostname = testing_context()
4246 server_context.post_handshake_auth = True
4247 server_context.verify_mode = ssl.CERT_REQUIRED
4248 client_context.post_handshake_auth = True
4249 client_context.load_cert_chain(SIGNED_CERTFILE)
4250
4251 # check CERT_OPTIONAL
4252 server_context.verify_mode = ssl.CERT_OPTIONAL
4253 server = ThreadedEchoServer(context=server_context, chatty=False)
4254 with server:
4255 with client_context.wrap_socket(socket.socket(),
4256 server_hostname=hostname) as s:
4257 s.connect((HOST, server.port))
4258 s.write(b'HASCERT')
4259 self.assertEqual(s.recv(1024), b'FALSE\n')
4260 s.write(b'PHA')
4261 self.assertEqual(s.recv(1024), b'OK\n')
4262 s.write(b'HASCERT')
4263 self.assertEqual(s.recv(1024), b'TRUE\n')
4264
4265 def test_pha_optional_nocert(self):
4266 if support.verbose:
4267 sys.stdout.write("\n")
4268
4269 client_context, server_context, hostname = testing_context()
4270 server_context.post_handshake_auth = True
4271 server_context.verify_mode = ssl.CERT_OPTIONAL
4272 client_context.post_handshake_auth = True
4273
4274 server = ThreadedEchoServer(context=server_context, chatty=False)
4275 with server:
4276 with client_context.wrap_socket(socket.socket(),
4277 server_hostname=hostname) as s:
4278 s.connect((HOST, server.port))
4279 s.write(b'HASCERT')
4280 self.assertEqual(s.recv(1024), b'FALSE\n')
4281 s.write(b'PHA')
4282 self.assertEqual(s.recv(1024), b'OK\n')
4283 # optional doens't fail when client does not have a cert
4284 s.write(b'HASCERT')
4285 self.assertEqual(s.recv(1024), b'FALSE\n')
4286
4287 def test_pha_no_pha_client(self):
4288 client_context, server_context, hostname = testing_context()
4289 server_context.post_handshake_auth = True
4290 server_context.verify_mode = ssl.CERT_REQUIRED
4291 client_context.load_cert_chain(SIGNED_CERTFILE)
4292
4293 server = ThreadedEchoServer(context=server_context, chatty=False)
4294 with server:
4295 with client_context.wrap_socket(socket.socket(),
4296 server_hostname=hostname) as s:
4297 s.connect((HOST, server.port))
4298 with self.assertRaisesRegex(ssl.SSLError, 'not server'):
4299 s.verify_client_post_handshake()
4300 s.write(b'PHA')
4301 self.assertIn(b'extension not received', s.recv(1024))
4302
4303 def test_pha_no_pha_server(self):
4304 # server doesn't have PHA enabled, cert is requested in handshake
4305 client_context, server_context, hostname = testing_context()
4306 server_context.verify_mode = ssl.CERT_REQUIRED
4307 client_context.post_handshake_auth = True
4308 client_context.load_cert_chain(SIGNED_CERTFILE)
4309
4310 server = ThreadedEchoServer(context=server_context, chatty=False)
4311 with server:
4312 with client_context.wrap_socket(socket.socket(),
4313 server_hostname=hostname) as s:
4314 s.connect((HOST, server.port))
4315 s.write(b'HASCERT')
4316 self.assertEqual(s.recv(1024), b'TRUE\n')
4317 # PHA doesn't fail if there is already a cert
4318 s.write(b'PHA')
4319 self.assertEqual(s.recv(1024), b'OK\n')
4320 s.write(b'HASCERT')
4321 self.assertEqual(s.recv(1024), b'TRUE\n')
4322
4323 def test_pha_not_tls13(self):
4324 # TLS 1.2
4325 client_context, server_context, hostname = testing_context()
4326 server_context.verify_mode = ssl.CERT_REQUIRED
4327 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
4328 client_context.post_handshake_auth = True
4329 client_context.load_cert_chain(SIGNED_CERTFILE)
4330
4331 server = ThreadedEchoServer(context=server_context, chatty=False)
4332 with server:
4333 with client_context.wrap_socket(socket.socket(),
4334 server_hostname=hostname) as s:
4335 s.connect((HOST, server.port))
4336 # PHA fails for TLS != 1.3
4337 s.write(b'PHA')
4338 self.assertIn(b'WRONG_SSL_VERSION', s.recv(1024))
4339
4340
Thomas Woutersed03b412007-08-28 21:37:11 +00004341def test_main(verbose=False):
Antoine Pitrou15cee622010-08-04 16:45:21 +00004342 if support.verbose:
Berker Peksag9e7990a2015-05-16 23:21:26 +03004343 import warnings
Antoine Pitrou15cee622010-08-04 16:45:21 +00004344 plats = {
Antoine Pitrou15cee622010-08-04 16:45:21 +00004345 'Mac': platform.mac_ver,
4346 'Windows': platform.win32_ver,
4347 }
Petr Viktorin8b94b412018-05-16 11:51:18 -04004348 for name, func in plats.items():
4349 plat = func()
4350 if plat and plat[0]:
4351 plat = '%s %r' % (name, plat)
4352 break
4353 else:
4354 plat = repr(platform.platform())
Antoine Pitrou15cee622010-08-04 16:45:21 +00004355 print("test_ssl: testing with %r %r" %
4356 (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
4357 print(" under %s" % plat)
Antoine Pitroud5323212010-10-22 18:19:07 +00004358 print(" HAS_SNI = %r" % ssl.HAS_SNI)
Antoine Pitrou609ef012013-03-29 18:09:06 +01004359 print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
4360 try:
4361 print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
4362 except AttributeError:
4363 pass
Antoine Pitrou15cee622010-08-04 16:45:21 +00004364
Antoine Pitrou152efa22010-05-16 18:19:27 +00004365 for filename in [
Martin Panter3840b2a2016-03-27 01:53:46 +00004366 CERTFILE, BYTES_CERTFILE,
Antoine Pitrou152efa22010-05-16 18:19:27 +00004367 ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004368 SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
Antoine Pitrou152efa22010-05-16 18:19:27 +00004369 BADCERT, BADKEY, EMPTYCERT]:
4370 if not os.path.exists(filename):
4371 raise support.TestFailed("Can't read certificate file %r" % filename)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00004372
Martin Panter3840b2a2016-03-27 01:53:46 +00004373 tests = [
4374 ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
Christian Heimes9d50ab52018-02-27 10:17:30 +01004375 SSLObjectTests, SimpleBackgroundTests, ThreadedTests,
Christian Heimes9fb051f2018-09-23 08:32:31 +02004376 TestPostHandshakeAuth
Martin Panter3840b2a2016-03-27 01:53:46 +00004377 ]
Thomas Woutersed03b412007-08-28 21:37:11 +00004378
Benjamin Petersonee8712c2008-05-20 21:35:26 +00004379 if support.is_resource_enabled('network'):
Bill Janssen6e027db2007-11-15 22:23:56 +00004380 tests.append(NetworkedTests)
Thomas Woutersed03b412007-08-28 21:37:11 +00004381
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004382 thread_info = support.threading_setup()
Antoine Pitrou480a1242010-04-28 21:37:09 +00004383 try:
4384 support.run_unittest(*tests)
4385 finally:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004386 support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00004387
4388if __name__ == "__main__":
4389 test_main()