blob: e120a2f9adf22e3e7f97f6b13c479170a050d1ce [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00005from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00006import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00007import select
Thomas Woutersed03b412007-08-28 21:37:11 +00008import time
Christian Heimes9424bb42013-06-17 15:32:57 +02009import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000010import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000011import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000012import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000013import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000014import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020015import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000016import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000017import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000018import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000019import platform
Antoine Pitrou23df4832010-08-04 17:14:06 +000020import functools
Christian Heimes892d66e2018-01-29 14:10:18 +010021import sysconfig
Christian Heimes888bbdc2017-09-07 14:18:21 -070022try:
23 import ctypes
24except ImportError:
25 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000026
Antoine Pitrou05d936d2010-10-13 11:38:36 +000027ssl = support.import_module("ssl")
28
Martin Panter3840b2a2016-03-27 01:53:46 +000029
Antoine Pitrou2463e5f2013-03-28 22:24:43 +010030PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
Benjamin Petersonee8712c2008-05-20 21:35:26 +000031HOST = support.HOST
Christian Heimes598894f2016-09-05 23:19:05 +020032IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
Christian Heimes05d9fe32018-02-27 08:55:39 +010033IS_OPENSSL_1_1_0 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
34IS_OPENSSL_1_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
Christian Heimes892d66e2018-01-29 14:10:18 +010035PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
Antoine Pitrou152efa22010-05-16 18:19:27 +000036
Christian Heimesefff7062013-11-21 03:35:02 +010037def data_file(*name):
38 return os.path.join(os.path.dirname(__file__), *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000039
Antoine Pitrou81564092010-10-08 23:06:24 +000040# The custom key and certificate files used in test_ssl are generated
41# using Lib/test/make_ssl_certs.py.
42# Other certificates are simply fetched from the Internet servers they
43# are meant to authenticate.
44
Antoine Pitrou152efa22010-05-16 18:19:27 +000045CERTFILE = data_file("keycert.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000046BYTES_CERTFILE = os.fsencode(CERTFILE)
Antoine Pitrou152efa22010-05-16 18:19:27 +000047ONLYCERT = data_file("ssl_cert.pem")
48ONLYKEY = data_file("ssl_key.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000049BYTES_ONLYCERT = os.fsencode(ONLYCERT)
50BYTES_ONLYKEY = os.fsencode(ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +020051CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
52ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
53KEY_PASSWORD = "somepass"
Antoine Pitrou152efa22010-05-16 18:19:27 +000054CAPATH = data_file("capath")
Victor Stinner313a1202010-06-11 23:56:51 +000055BYTES_CAPATH = os.fsencode(CAPATH)
Christian Heimesefff7062013-11-21 03:35:02 +010056CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
57CAFILE_CACERT = data_file("capath", "5ed36f99.0")
58
Christian Heimesbd5c7d22018-01-20 15:16:30 +010059CERTFILE_INFO = {
60 'issuer': ((('countryName', 'XY'),),
61 (('localityName', 'Castle Anthrax'),),
62 (('organizationName', 'Python Software Foundation'),),
63 (('commonName', 'localhost'),)),
Christian Heimese6dac002018-08-30 07:25:49 +020064 'notAfter': 'Aug 26 14:23:15 2028 GMT',
65 'notBefore': 'Aug 29 14:23:15 2018 GMT',
66 'serialNumber': '98A7CF88C74A32ED',
Christian Heimesbd5c7d22018-01-20 15:16:30 +010067 'subject': ((('countryName', 'XY'),),
68 (('localityName', 'Castle Anthrax'),),
69 (('organizationName', 'Python Software Foundation'),),
70 (('commonName', 'localhost'),)),
71 'subjectAltName': (('DNS', 'localhost'),),
72 'version': 3
73}
Antoine Pitrou152efa22010-05-16 18:19:27 +000074
Christian Heimes22587792013-11-21 23:56:13 +010075# empty CRL
76CRLFILE = data_file("revocation.crl")
77
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010078# Two keys and certs signed by the same CA (for SNI tests)
79SIGNED_CERTFILE = data_file("keycert3.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +020080SIGNED_CERTFILE_HOSTNAME = 'localhost'
Christian Heimesbd5c7d22018-01-20 15:16:30 +010081
82SIGNED_CERTFILE_INFO = {
83 'OCSP': ('http://testca.pythontest.net/testca/ocsp/',),
84 'caIssuers': ('http://testca.pythontest.net/testca/pycacert.cer',),
85 'crlDistributionPoints': ('http://testca.pythontest.net/testca/revocation.crl',),
86 'issuer': ((('countryName', 'XY'),),
87 (('organizationName', 'Python Software Foundation CA'),),
88 (('commonName', 'our-ca-server'),)),
Christian Heimese6dac002018-08-30 07:25:49 +020089 'notAfter': 'Jul 7 14:23:16 2028 GMT',
90 'notBefore': 'Aug 29 14:23:16 2018 GMT',
91 'serialNumber': 'CB2D80995A69525C',
Christian Heimesbd5c7d22018-01-20 15:16:30 +010092 'subject': ((('countryName', 'XY'),),
93 (('localityName', 'Castle Anthrax'),),
94 (('organizationName', 'Python Software Foundation'),),
95 (('commonName', 'localhost'),)),
96 'subjectAltName': (('DNS', 'localhost'),),
97 'version': 3
98}
99
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100100SIGNED_CERTFILE2 = data_file("keycert4.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +0200101SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100102SIGNED_CERTFILE_ECC = data_file("keycertecc.pem")
103SIGNED_CERTFILE_ECC_HOSTNAME = 'localhost-ecc'
104
Martin Panter3840b2a2016-03-27 01:53:46 +0000105# Same certificate as pycacert.pem, but without extra text in file
106SIGNING_CA = data_file("capath", "ceff1710.0")
Christian Heimes1c03abd2016-09-06 23:25:35 +0200107# cert with all kinds of subject alt names
108ALLSANFILE = data_file("allsans.pem")
Christian Heimes66e57422018-01-29 14:25:13 +0100109IDNSANSFILE = data_file("idnsans.pem")
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100110
Martin Panter3d81d932016-01-14 09:36:00 +0000111REMOTE_HOST = "self-signed.pythontest.net"
Antoine Pitrou152efa22010-05-16 18:19:27 +0000112
113EMPTYCERT = data_file("nullcert.pem")
114BADCERT = data_file("badcert.pem")
Martin Panter407b62f2016-01-30 03:41:43 +0000115NONEXISTINGCERT = data_file("XXXnonexisting.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000116BADKEY = data_file("badkey.pem")
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200117NOKIACERT = data_file("nokia.pem")
Christian Heimes824f7f32013-08-17 00:54:47 +0200118NULLBYTECERT = data_file("nullbytecert.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000119
Christian Heimes88bfd0b2018-08-14 12:54:19 +0200120DHFILE = data_file("ffdh3072.pem")
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100121BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +0000122
Christian Heimes358cfd42016-09-10 22:43:48 +0200123# Not defined in all versions of OpenSSL
124OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
125OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
126OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
127OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
Christian Heimes05d9fe32018-02-27 08:55:39 +0100128OP_ENABLE_MIDDLEBOX_COMPAT = getattr(ssl, "OP_ENABLE_MIDDLEBOX_COMPAT", 0)
Christian Heimes358cfd42016-09-10 22:43:48 +0200129
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100130
Thomas Woutersed03b412007-08-28 21:37:11 +0000131def handle_error(prefix):
132 exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000133 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000134 sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +0000135
Antoine Pitroub5218772010-05-21 09:56:06 +0000136def can_clear_options():
137 # 0.9.8m or higher
Antoine Pitroub9ac25d2011-07-08 18:47:06 +0200138 return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
Antoine Pitroub5218772010-05-21 09:56:06 +0000139
140def no_sslv2_implies_sslv3_hello():
141 # 0.9.7h or higher
142 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
143
Christian Heimes2427b502013-11-23 11:24:32 +0100144def have_verify_flags():
145 # 0.9.8 or higher
146 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
147
Christian Heimesb7b92252018-02-25 09:49:31 +0100148def _have_secp_curves():
149 if not ssl.HAS_ECDH:
150 return False
151 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
152 try:
153 ctx.set_ecdh_curve("secp384r1")
154 except ValueError:
155 return False
156 else:
157 return True
158
159
160HAVE_SECP_CURVES = _have_secp_curves()
161
162
Antoine Pitrouc695c952014-04-28 20:57:36 +0200163def utc_offset(): #NOTE: ignore issues like #1647654
164 # local time = utc time + utc offset
165 if time.daylight and time.localtime().tm_isdst > 0:
166 return -time.altzone # seconds
167 return -time.timezone
168
Christian Heimes9424bb42013-06-17 15:32:57 +0200169def asn1time(cert_time):
170 # Some versions of OpenSSL ignore seconds, see #18207
171 # 0.9.8.i
172 if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
173 fmt = "%b %d %H:%M:%S %Y GMT"
174 dt = datetime.datetime.strptime(cert_time, fmt)
175 dt = dt.replace(second=0)
176 cert_time = dt.strftime(fmt)
177 # %d adds leading zero but ASN1_TIME_print() uses leading space
178 if cert_time[4] == "0":
179 cert_time = cert_time[:4] + " " + cert_time[5:]
180
181 return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000182
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100183needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
184
Antoine Pitrou23df4832010-08-04 17:14:06 +0000185
Christian Heimesd0486372016-09-10 23:23:33 +0200186def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
187 cert_reqs=ssl.CERT_NONE, ca_certs=None,
188 ciphers=None, certfile=None, keyfile=None,
189 **kwargs):
190 context = ssl.SSLContext(ssl_version)
191 if cert_reqs is not None:
Christian Heimesa170fa12017-09-15 20:27:30 +0200192 if cert_reqs == ssl.CERT_NONE:
193 context.check_hostname = False
Christian Heimesd0486372016-09-10 23:23:33 +0200194 context.verify_mode = cert_reqs
195 if ca_certs is not None:
196 context.load_verify_locations(ca_certs)
197 if certfile is not None or keyfile is not None:
198 context.load_cert_chain(certfile, keyfile)
199 if ciphers is not None:
200 context.set_ciphers(ciphers)
201 return context.wrap_socket(sock, **kwargs)
202
Christian Heimesa170fa12017-09-15 20:27:30 +0200203
204def testing_context(server_cert=SIGNED_CERTFILE):
205 """Create context
206
207 client_context, server_context, hostname = testing_context()
208 """
209 if server_cert == SIGNED_CERTFILE:
210 hostname = SIGNED_CERTFILE_HOSTNAME
211 elif server_cert == SIGNED_CERTFILE2:
212 hostname = SIGNED_CERTFILE2_HOSTNAME
213 else:
214 raise ValueError(server_cert)
215
216 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
217 client_context.load_verify_locations(SIGNING_CA)
218
219 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
220 server_context.load_cert_chain(server_cert)
Christian Heimes05d9fe32018-02-27 08:55:39 +0100221 client_context.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +0200222
223 return client_context, server_context, hostname
224
225
Antoine Pitrou152efa22010-05-16 18:19:27 +0000226class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000227
Antoine Pitrou480a1242010-04-28 21:37:09 +0000228 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000229 ssl.CERT_NONE
230 ssl.CERT_OPTIONAL
231 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100232 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100233 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100234 if ssl.HAS_ECDH:
235 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100236 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
237 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000238 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100239 self.assertIn(ssl.HAS_ECDH, {True, False})
Christian Heimescb5b68a2017-09-07 18:07:00 -0700240 ssl.OP_NO_SSLv2
241 ssl.OP_NO_SSLv3
242 ssl.OP_NO_TLSv1
243 ssl.OP_NO_TLSv1_3
244 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
245 ssl.OP_NO_TLSv1_1
246 ssl.OP_NO_TLSv1_2
Christian Heimesa170fa12017-09-15 20:27:30 +0200247 self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000248
Christian Heimes9d50ab52018-02-27 10:17:30 +0100249 def test_private_init(self):
250 with self.assertRaisesRegex(TypeError, "public constructor"):
251 with socket.socket() as s:
252 ssl.SSLSocket(s)
253
Antoine Pitrou172f0252014-04-18 20:33:08 +0200254 def test_str_for_enums(self):
255 # Make sure that the PROTOCOL_* constants have enum-like string
256 # reprs.
Christian Heimes598894f2016-09-05 23:19:05 +0200257 proto = ssl.PROTOCOL_TLS
258 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200259 ctx = ssl.SSLContext(proto)
260 self.assertIs(ctx.protocol, proto)
261
Antoine Pitrou480a1242010-04-28 21:37:09 +0000262 def test_random(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000263 v = ssl.RAND_status()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000264 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000265 sys.stdout.write("\n RAND_status is %d (%s)\n"
266 % (v, (v and "sufficient randomness") or
267 "insufficient randomness"))
Victor Stinner99c8b162011-05-24 12:05:19 +0200268
269 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
270 self.assertEqual(len(data), 16)
271 self.assertEqual(is_cryptographic, v == 1)
272 if v:
273 data = ssl.RAND_bytes(16)
274 self.assertEqual(len(data), 16)
Victor Stinner2e2baa92011-05-25 11:15:16 +0200275 else:
276 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
Victor Stinner99c8b162011-05-24 12:05:19 +0200277
Victor Stinner1e81a392013-12-19 16:47:04 +0100278 # negative num is invalid
279 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
280 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
281
Victor Stinnerbeeb5122014-11-28 13:28:25 +0100282 if hasattr(ssl, 'RAND_egd'):
283 self.assertRaises(TypeError, ssl.RAND_egd, 1)
284 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000285 ssl.RAND_add("this is a random string", 75.0)
Serhiy Storchaka8490f5a2015-03-20 09:00:36 +0200286 ssl.RAND_add(b"this is a random bytes object", 75.0)
287 ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000288
Christian Heimesf77b4b22013-08-21 13:26:05 +0200289 @unittest.skipUnless(os.name == 'posix', 'requires posix')
290 def test_random_fork(self):
291 status = ssl.RAND_status()
292 if not status:
293 self.fail("OpenSSL's PRNG has insufficient randomness")
294
295 rfd, wfd = os.pipe()
296 pid = os.fork()
297 if pid == 0:
298 try:
299 os.close(rfd)
300 child_random = ssl.RAND_pseudo_bytes(16)[0]
301 self.assertEqual(len(child_random), 16)
302 os.write(wfd, child_random)
303 os.close(wfd)
304 except BaseException:
305 os._exit(1)
306 else:
307 os._exit(0)
308 else:
309 os.close(wfd)
310 self.addCleanup(os.close, rfd)
311 _, status = os.waitpid(pid, 0)
312 self.assertEqual(status, 0)
313
314 child_random = os.read(rfd, 16)
315 self.assertEqual(len(child_random), 16)
316 parent_random = ssl.RAND_pseudo_bytes(16)[0]
317 self.assertEqual(len(parent_random), 16)
318
319 self.assertNotEqual(child_random, parent_random)
320
Christian Heimese6dac002018-08-30 07:25:49 +0200321 maxDiff = None
322
Antoine Pitrou480a1242010-04-28 21:37:09 +0000323 def test_parse_cert(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000324 # note that this uses an 'unofficial' function in _ssl.c,
325 # provided solely for this test, to exercise the certificate
326 # parsing code
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100327 self.assertEqual(
328 ssl._ssl._test_decode_cert(CERTFILE),
329 CERTFILE_INFO
330 )
331 self.assertEqual(
332 ssl._ssl._test_decode_cert(SIGNED_CERTFILE),
333 SIGNED_CERTFILE_INFO
334 )
335
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200336 # Issue #13034: the subjectAltName in some certificates
337 # (notably projects.developer.nokia.com:443) wasn't parsed
338 p = ssl._ssl._test_decode_cert(NOKIACERT)
339 if support.verbose:
340 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
341 self.assertEqual(p['subjectAltName'],
342 (('DNS', 'projects.developer.nokia.com'),
343 ('DNS', 'projects.forum.nokia.com'))
344 )
Christian Heimesbd3a7f92013-11-21 03:40:15 +0100345 # extra OCSP and AIA fields
346 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
347 self.assertEqual(p['caIssuers'],
348 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
349 self.assertEqual(p['crlDistributionPoints'],
350 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000351
Christian Heimes824f7f32013-08-17 00:54:47 +0200352 def test_parse_cert_CVE_2013_4238(self):
353 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
354 if support.verbose:
355 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
356 subject = ((('countryName', 'US'),),
357 (('stateOrProvinceName', 'Oregon'),),
358 (('localityName', 'Beaverton'),),
359 (('organizationName', 'Python Software Foundation'),),
360 (('organizationalUnitName', 'Python Core Development'),),
361 (('commonName', 'null.python.org\x00example.org'),),
362 (('emailAddress', 'python-dev@python.org'),))
363 self.assertEqual(p['subject'], subject)
364 self.assertEqual(p['issuer'], subject)
Christian Heimes157c9832013-08-25 14:12:41 +0200365 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
366 san = (('DNS', 'altnull.python.org\x00example.com'),
367 ('email', 'null@python.org\x00user@example.org'),
368 ('URI', 'http://null.python.org\x00http://example.org'),
369 ('IP Address', '192.0.2.1'),
370 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
371 else:
372 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
373 san = (('DNS', 'altnull.python.org\x00example.com'),
374 ('email', 'null@python.org\x00user@example.org'),
375 ('URI', 'http://null.python.org\x00http://example.org'),
376 ('IP Address', '192.0.2.1'),
377 ('IP Address', '<invalid>'))
378
379 self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200380
Christian Heimes1c03abd2016-09-06 23:25:35 +0200381 def test_parse_all_sans(self):
382 p = ssl._ssl._test_decode_cert(ALLSANFILE)
383 self.assertEqual(p['subjectAltName'],
384 (
385 ('DNS', 'allsans'),
386 ('othername', '<unsupported>'),
387 ('othername', '<unsupported>'),
388 ('email', 'user@example.org'),
389 ('DNS', 'www.example.org'),
390 ('DirName',
391 ((('countryName', 'XY'),),
392 (('localityName', 'Castle Anthrax'),),
393 (('organizationName', 'Python Software Foundation'),),
394 (('commonName', 'dirname example'),))),
395 ('URI', 'https://www.python.org/'),
396 ('IP Address', '127.0.0.1'),
397 ('IP Address', '0:0:0:0:0:0:0:1\n'),
398 ('Registered ID', '1.2.3.4.5')
399 )
400 )
401
Antoine Pitrou480a1242010-04-28 21:37:09 +0000402 def test_DER_to_PEM(self):
Martin Panter3d81d932016-01-14 09:36:00 +0000403 with open(CAFILE_CACERT, 'r') as f:
Antoine Pitrou480a1242010-04-28 21:37:09 +0000404 pem = f.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000405 d1 = ssl.PEM_cert_to_DER_cert(pem)
406 p2 = ssl.DER_cert_to_PEM_cert(d1)
407 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitrou18c913e2010-04-27 10:59:39 +0000408 self.assertEqual(d1, d2)
Antoine Pitrou9bfbe612010-04-27 22:08:08 +0000409 if not p2.startswith(ssl.PEM_HEADER + '\n'):
410 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
411 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
412 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000413
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000414 def test_openssl_version(self):
415 n = ssl.OPENSSL_VERSION_NUMBER
416 t = ssl.OPENSSL_VERSION_INFO
417 s = ssl.OPENSSL_VERSION
418 self.assertIsInstance(n, int)
419 self.assertIsInstance(t, tuple)
420 self.assertIsInstance(s, str)
421 # Some sanity checks follow
422 # >= 0.9
423 self.assertGreaterEqual(n, 0x900000)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400424 # < 3.0
425 self.assertLess(n, 0x30000000)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000426 major, minor, fix, patch, status = t
427 self.assertGreaterEqual(major, 0)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400428 self.assertLess(major, 3)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000429 self.assertGreaterEqual(minor, 0)
430 self.assertLess(minor, 256)
431 self.assertGreaterEqual(fix, 0)
432 self.assertLess(fix, 256)
433 self.assertGreaterEqual(patch, 0)
Ned Deily05784a72015-02-05 17:20:13 +1100434 self.assertLessEqual(patch, 63)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000435 self.assertGreaterEqual(status, 0)
436 self.assertLessEqual(status, 15)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400437 # Version string as returned by {Open,Libre}SSL, the format might change
Christian Heimes598894f2016-09-05 23:19:05 +0200438 if IS_LIBRESSL:
439 self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
Victor Stinner789b8052015-01-06 11:51:06 +0100440 (s, t, hex(n)))
Antoine Pitroudfab9352014-07-21 18:35:01 -0400441 else:
442 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
Victor Stinner789b8052015-01-06 11:51:06 +0100443 (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000444
Antoine Pitrou9d543662010-04-23 23:10:32 +0000445 @support.cpython_only
446 def test_refcycle(self):
447 # Issue #7943: an SSL object doesn't create reference cycles with
448 # itself.
449 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200450 ss = test_wrap_socket(s)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000451 wr = weakref.ref(ss)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100452 with support.check_warnings(("", ResourceWarning)):
453 del ss
Victor Stinnere0b75b72016-03-21 17:26:04 +0100454 self.assertEqual(wr(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000455
Antoine Pitroua468adc2010-09-14 14:43:44 +0000456 def test_wrapped_unconnected(self):
457 # Methods on an unconnected SSLSocket propagate the original
Andrew Svetlov0832af62012-12-18 23:10:48 +0200458 # OSError raise by the underlying socket object.
Antoine Pitroua468adc2010-09-14 14:43:44 +0000459 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200460 with test_wrap_socket(s) as ss:
Antoine Pitroue9bb4732013-01-12 21:56:56 +0100461 self.assertRaises(OSError, ss.recv, 1)
462 self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
463 self.assertRaises(OSError, ss.recvfrom, 1)
464 self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
465 self.assertRaises(OSError, ss.send, b'x')
466 self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
Christian Heimes141c5e82018-02-24 21:10:57 +0100467 self.assertRaises(NotImplementedError, ss.sendmsg,
468 [b'x'], (), 0, ('0.0.0.0', 0))
Antoine Pitroua468adc2010-09-14 14:43:44 +0000469
Antoine Pitrou40f08742010-04-24 22:04:40 +0000470 def test_timeout(self):
471 # Issue #8524: when creating an SSL socket, the timeout of the
472 # original socket should be retained.
473 for timeout in (None, 0.0, 5.0):
474 s = socket.socket(socket.AF_INET)
475 s.settimeout(timeout)
Christian Heimesd0486372016-09-10 23:23:33 +0200476 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100477 self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000478
Christian Heimesd0486372016-09-10 23:23:33 +0200479 def test_errors_sslwrap(self):
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000480 sock = socket.socket()
Ezio Melottied3a7d22010-12-01 02:32:32 +0000481 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000482 "certfile must be specified",
483 ssl.wrap_socket, sock, keyfile=CERTFILE)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000484 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000485 "certfile must be specified for server-side operations",
486 ssl.wrap_socket, sock, server_side=True)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000487 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000488 "certfile must be specified for server-side operations",
Christian Heimesd0486372016-09-10 23:23:33 +0200489 ssl.wrap_socket, sock, server_side=True, certfile="")
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100490 with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
491 self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
Christian Heimesd0486372016-09-10 23:23:33 +0200492 s.connect, (HOST, 8080))
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200493 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000494 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000495 ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000496 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200497 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000498 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000499 ssl.wrap_socket(sock,
500 certfile=CERTFILE, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000501 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200502 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000503 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000504 ssl.wrap_socket(sock,
505 certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000506 self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000507
Martin Panter3464ea22016-02-01 21:58:11 +0000508 def bad_cert_test(self, certfile):
509 """Check that trying to use the given client certificate fails"""
510 certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
511 certfile)
512 sock = socket.socket()
513 self.addCleanup(sock.close)
514 with self.assertRaises(ssl.SSLError):
Christian Heimesd0486372016-09-10 23:23:33 +0200515 test_wrap_socket(sock,
Christian Heimesa170fa12017-09-15 20:27:30 +0200516 certfile=certfile)
Martin Panter3464ea22016-02-01 21:58:11 +0000517
518 def test_empty_cert(self):
519 """Wrapping with an empty cert file"""
520 self.bad_cert_test("nullcert.pem")
521
522 def test_malformed_cert(self):
523 """Wrapping with a badly formatted certificate (syntax error)"""
524 self.bad_cert_test("badcert.pem")
525
526 def test_malformed_key(self):
527 """Wrapping with a badly formatted key (syntax error)"""
528 self.bad_cert_test("badkey.pem")
529
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000530 def test_match_hostname(self):
531 def ok(cert, hostname):
532 ssl.match_hostname(cert, hostname)
533 def fail(cert, hostname):
534 self.assertRaises(ssl.CertificateError,
535 ssl.match_hostname, cert, hostname)
536
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100537 # -- Hostname matching --
538
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000539 cert = {'subject': ((('commonName', 'example.com'),),)}
540 ok(cert, 'example.com')
541 ok(cert, 'ExAmple.cOm')
542 fail(cert, 'www.example.com')
543 fail(cert, '.example.com')
544 fail(cert, 'example.org')
545 fail(cert, 'exampleXcom')
546
547 cert = {'subject': ((('commonName', '*.a.com'),),)}
548 ok(cert, 'foo.a.com')
549 fail(cert, 'bar.foo.a.com')
550 fail(cert, 'a.com')
551 fail(cert, 'Xa.com')
552 fail(cert, '.a.com')
553
Mandeep Singhede2ac92017-11-27 04:01:27 +0530554 # only match wildcards when they are the only thing
555 # in left-most segment
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000556 cert = {'subject': ((('commonName', 'f*.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530557 fail(cert, 'foo.com')
558 fail(cert, 'f.com')
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000559 fail(cert, 'bar.com')
560 fail(cert, 'foo.a.com')
561 fail(cert, 'bar.foo.com')
562
Christian Heimes824f7f32013-08-17 00:54:47 +0200563 # NULL bytes are bad, CVE-2013-4073
564 cert = {'subject': ((('commonName',
565 'null.python.org\x00example.org'),),)}
566 ok(cert, 'null.python.org\x00example.org') # or raise an error?
567 fail(cert, 'example.org')
568 fail(cert, 'null.python.org')
569
Georg Brandl72c98d32013-10-27 07:16:53 +0100570 # error cases with wildcards
571 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
572 fail(cert, 'bar.foo.a.com')
573 fail(cert, 'a.com')
574 fail(cert, 'Xa.com')
575 fail(cert, '.a.com')
576
577 cert = {'subject': ((('commonName', 'a.*.com'),),)}
578 fail(cert, 'a.foo.com')
579 fail(cert, 'a..com')
580 fail(cert, 'a.com')
581
582 # wildcard doesn't match IDNA prefix 'xn--'
583 idna = 'püthon.python.org'.encode("idna").decode("ascii")
584 cert = {'subject': ((('commonName', idna),),)}
585 ok(cert, idna)
586 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
587 fail(cert, idna)
588 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
589 fail(cert, idna)
590
591 # wildcard in first fragment and IDNA A-labels in sequent fragments
592 # are supported.
593 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
594 cert = {'subject': ((('commonName', idna),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530595 fail(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
596 fail(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
Georg Brandl72c98d32013-10-27 07:16:53 +0100597 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
598 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
599
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000600 # Slightly fake real-world example
601 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
602 'subject': ((('commonName', 'linuxfrz.org'),),),
603 'subjectAltName': (('DNS', 'linuxfr.org'),
604 ('DNS', 'linuxfr.com'),
605 ('othername', '<unsupported>'))}
606 ok(cert, 'linuxfr.org')
607 ok(cert, 'linuxfr.com')
608 # Not a "DNS" entry
609 fail(cert, '<unsupported>')
610 # When there is a subjectAltName, commonName isn't used
611 fail(cert, 'linuxfrz.org')
612
613 # A pristine real-world example
614 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
615 'subject': ((('countryName', 'US'),),
616 (('stateOrProvinceName', 'California'),),
617 (('localityName', 'Mountain View'),),
618 (('organizationName', 'Google Inc'),),
619 (('commonName', 'mail.google.com'),))}
620 ok(cert, 'mail.google.com')
621 fail(cert, 'gmail.com')
622 # Only commonName is considered
623 fail(cert, 'California')
624
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100625 # -- IPv4 matching --
626 cert = {'subject': ((('commonName', 'example.com'),),),
627 'subjectAltName': (('DNS', 'example.com'),
628 ('IP Address', '10.11.12.13'),
629 ('IP Address', '14.15.16.17'))}
630 ok(cert, '10.11.12.13')
631 ok(cert, '14.15.16.17')
632 fail(cert, '14.15.16.18')
633 fail(cert, 'example.net')
634
635 # -- IPv6 matching --
Christian Heimesaef12832018-02-24 14:35:56 +0100636 if hasattr(socket, 'AF_INET6'):
637 cert = {'subject': ((('commonName', 'example.com'),),),
638 'subjectAltName': (
639 ('DNS', 'example.com'),
640 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
641 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
642 ok(cert, '2001::cafe')
643 ok(cert, '2003::baba')
644 fail(cert, '2003::bebe')
645 fail(cert, 'example.net')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100646
647 # -- Miscellaneous --
648
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000649 # Neither commonName nor subjectAltName
650 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
651 'subject': ((('countryName', 'US'),),
652 (('stateOrProvinceName', 'California'),),
653 (('localityName', 'Mountain View'),),
654 (('organizationName', 'Google Inc'),))}
655 fail(cert, 'mail.google.com')
656
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200657 # No DNS entry in subjectAltName but a commonName
658 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
659 'subject': ((('countryName', 'US'),),
660 (('stateOrProvinceName', 'California'),),
661 (('localityName', 'Mountain View'),),
662 (('commonName', 'mail.google.com'),)),
663 'subjectAltName': (('othername', 'blabla'), )}
664 ok(cert, 'mail.google.com')
665
666 # No DNS entry subjectAltName and no commonName
667 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
668 'subject': ((('countryName', 'US'),),
669 (('stateOrProvinceName', 'California'),),
670 (('localityName', 'Mountain View'),),
671 (('organizationName', 'Google Inc'),)),
672 'subjectAltName': (('othername', 'blabla'),)}
673 fail(cert, 'google.com')
674
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000675 # Empty cert / no cert
676 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
677 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
678
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200679 # Issue #17980: avoid denials of service by refusing more than one
680 # wildcard per fragment.
Christian Heimesaef12832018-02-24 14:35:56 +0100681 cert = {'subject': ((('commonName', 'a*b.example.com'),),)}
682 with self.assertRaisesRegex(
683 ssl.CertificateError,
684 "partial wildcards in leftmost label are not supported"):
685 ssl.match_hostname(cert, 'axxb.example.com')
686
687 cert = {'subject': ((('commonName', 'www.*.example.com'),),)}
688 with self.assertRaisesRegex(
689 ssl.CertificateError,
690 "wildcard can only be present in the leftmost label"):
691 ssl.match_hostname(cert, 'www.sub.example.com')
692
693 cert = {'subject': ((('commonName', 'a*b*.example.com'),),)}
694 with self.assertRaisesRegex(
695 ssl.CertificateError,
696 "too many wildcards"):
697 ssl.match_hostname(cert, 'axxbxxc.example.com')
698
699 cert = {'subject': ((('commonName', '*'),),)}
700 with self.assertRaisesRegex(
701 ssl.CertificateError,
702 "sole wildcard without additional labels are not support"):
703 ssl.match_hostname(cert, 'host')
704
705 cert = {'subject': ((('commonName', '*.com'),),)}
706 with self.assertRaisesRegex(
707 ssl.CertificateError,
708 r"hostname 'com' doesn't match '\*.com'"):
709 ssl.match_hostname(cert, 'com')
710
711 # extra checks for _inet_paton()
712 for invalid in ['1', '', '1.2.3', '256.0.0.1', '127.0.0.1/24']:
713 with self.assertRaises(ValueError):
714 ssl._inet_paton(invalid)
715 for ipaddr in ['127.0.0.1', '192.168.0.1']:
716 self.assertTrue(ssl._inet_paton(ipaddr))
717 if hasattr(socket, 'AF_INET6'):
718 for ipaddr in ['::1', '2001:db8:85a3::8a2e:370:7334']:
719 self.assertTrue(ssl._inet_paton(ipaddr))
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200720
Antoine Pitroud5323212010-10-22 18:19:07 +0000721 def test_server_side(self):
722 # server_hostname doesn't work for server sockets
Christian Heimesa170fa12017-09-15 20:27:30 +0200723 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000724 with socket.socket() as sock:
725 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
726 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000727
Antoine Pitroud6494802011-07-21 01:11:30 +0200728 def test_unknown_channel_binding(self):
729 # should raise ValueError for unknown type
730 s = socket.socket(socket.AF_INET)
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200731 s.bind(('127.0.0.1', 0))
732 s.listen()
733 c = socket.socket(socket.AF_INET)
734 c.connect(s.getsockname())
Christian Heimesd0486372016-09-10 23:23:33 +0200735 with test_wrap_socket(c, do_handshake_on_connect=False) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100736 with self.assertRaises(ValueError):
737 ss.get_channel_binding("unknown-type")
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200738 s.close()
Antoine Pitroud6494802011-07-21 01:11:30 +0200739
740 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
741 "'tls-unique' channel binding not available")
742 def test_tls_unique_channel_binding(self):
743 # unconnected should return None for known type
744 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200745 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100746 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200747 # the same for server-side
748 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200749 with test_wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100750 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200751
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600752 def test_dealloc_warn(self):
Christian Heimesd0486372016-09-10 23:23:33 +0200753 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600754 r = repr(ss)
755 with self.assertWarns(ResourceWarning) as cm:
756 ss = None
757 support.gc_collect()
758 self.assertIn(r, str(cm.warning.args[0]))
759
Christian Heimes6d7ad132013-06-09 18:02:55 +0200760 def test_get_default_verify_paths(self):
761 paths = ssl.get_default_verify_paths()
762 self.assertEqual(len(paths), 6)
763 self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
764
765 with support.EnvironmentVarGuard() as env:
766 env["SSL_CERT_DIR"] = CAPATH
767 env["SSL_CERT_FILE"] = CERTFILE
768 paths = ssl.get_default_verify_paths()
769 self.assertEqual(paths.cafile, CERTFILE)
770 self.assertEqual(paths.capath, CAPATH)
771
Christian Heimes44109d72013-11-22 01:51:30 +0100772 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
773 def test_enum_certificates(self):
774 self.assertTrue(ssl.enum_certificates("CA"))
775 self.assertTrue(ssl.enum_certificates("ROOT"))
776
777 self.assertRaises(TypeError, ssl.enum_certificates)
778 self.assertRaises(WindowsError, ssl.enum_certificates, "")
779
Christian Heimesc2d65e12013-11-22 16:13:55 +0100780 trust_oids = set()
781 for storename in ("CA", "ROOT"):
782 store = ssl.enum_certificates(storename)
783 self.assertIsInstance(store, list)
784 for element in store:
785 self.assertIsInstance(element, tuple)
786 self.assertEqual(len(element), 3)
787 cert, enc, trust = element
788 self.assertIsInstance(cert, bytes)
789 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
790 self.assertIsInstance(trust, (set, bool))
791 if isinstance(trust, set):
792 trust_oids.update(trust)
Christian Heimes44109d72013-11-22 01:51:30 +0100793
794 serverAuth = "1.3.6.1.5.5.7.3.1"
Christian Heimesc2d65e12013-11-22 16:13:55 +0100795 self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200796
Christian Heimes46bebee2013-06-09 19:03:31 +0200797 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Christian Heimes44109d72013-11-22 01:51:30 +0100798 def test_enum_crls(self):
799 self.assertTrue(ssl.enum_crls("CA"))
800 self.assertRaises(TypeError, ssl.enum_crls)
801 self.assertRaises(WindowsError, ssl.enum_crls, "")
Christian Heimes46bebee2013-06-09 19:03:31 +0200802
Christian Heimes44109d72013-11-22 01:51:30 +0100803 crls = ssl.enum_crls("CA")
804 self.assertIsInstance(crls, list)
805 for element in crls:
806 self.assertIsInstance(element, tuple)
807 self.assertEqual(len(element), 2)
808 self.assertIsInstance(element[0], bytes)
809 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200810
Christian Heimes46bebee2013-06-09 19:03:31 +0200811
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100812 def test_asn1object(self):
813 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
814 '1.3.6.1.5.5.7.3.1')
815
816 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
817 self.assertEqual(val, expected)
818 self.assertEqual(val.nid, 129)
819 self.assertEqual(val.shortname, 'serverAuth')
820 self.assertEqual(val.longname, 'TLS Web Server Authentication')
821 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
822 self.assertIsInstance(val, ssl._ASN1Object)
823 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
824
825 val = ssl._ASN1Object.fromnid(129)
826 self.assertEqual(val, expected)
827 self.assertIsInstance(val, ssl._ASN1Object)
828 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100829 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
830 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100831 for i in range(1000):
832 try:
833 obj = ssl._ASN1Object.fromnid(i)
834 except ValueError:
835 pass
836 else:
837 self.assertIsInstance(obj.nid, int)
838 self.assertIsInstance(obj.shortname, str)
839 self.assertIsInstance(obj.longname, str)
840 self.assertIsInstance(obj.oid, (str, type(None)))
841
842 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
843 self.assertEqual(val, expected)
844 self.assertIsInstance(val, ssl._ASN1Object)
845 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
846 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
847 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100848 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
849 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100850
Christian Heimes72d28502013-11-23 13:56:58 +0100851 def test_purpose_enum(self):
852 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
853 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
854 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
855 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
856 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
857 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
858 '1.3.6.1.5.5.7.3.1')
859
860 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
861 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
862 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
863 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
864 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
865 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
866 '1.3.6.1.5.5.7.3.2')
867
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100868 def test_unsupported_dtls(self):
869 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
870 self.addCleanup(s.close)
871 with self.assertRaises(NotImplementedError) as cx:
Christian Heimesd0486372016-09-10 23:23:33 +0200872 test_wrap_socket(s, cert_reqs=ssl.CERT_NONE)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100873 self.assertEqual(str(cx.exception), "only stream sockets are supported")
Christian Heimesa170fa12017-09-15 20:27:30 +0200874 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100875 with self.assertRaises(NotImplementedError) as cx:
876 ctx.wrap_socket(s)
877 self.assertEqual(str(cx.exception), "only stream sockets are supported")
878
Antoine Pitrouc695c952014-04-28 20:57:36 +0200879 def cert_time_ok(self, timestring, timestamp):
880 self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)
881
882 def cert_time_fail(self, timestring):
883 with self.assertRaises(ValueError):
884 ssl.cert_time_to_seconds(timestring)
885
886 @unittest.skipUnless(utc_offset(),
887 'local time needs to be different from UTC')
888 def test_cert_time_to_seconds_timezone(self):
889 # Issue #19940: ssl.cert_time_to_seconds() returns wrong
890 # results if local timezone is not UTC
891 self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0)
892 self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0)
893
894 def test_cert_time_to_seconds(self):
895 timestring = "Jan 5 09:34:43 2018 GMT"
896 ts = 1515144883.0
897 self.cert_time_ok(timestring, ts)
898 # accept keyword parameter, assert its name
899 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
900 # accept both %e and %d (space or zero generated by strftime)
901 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
902 # case-insensitive
903 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
904 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
905 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
906 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
907 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
908 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
909 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
910 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
911
912 newyear_ts = 1230768000.0
913 # leap seconds
914 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
915 # same timestamp
916 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
917
918 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
919 # allow 60th second (even if it is not a leap second)
920 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
921 # allow 2nd leap second for compatibility with time.strptime()
922 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
923 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
924
Mike53f7a7c2017-12-14 14:04:53 +0300925 # no special treatment for the special value:
Antoine Pitrouc695c952014-04-28 20:57:36 +0200926 # 99991231235959Z (rfc 5280)
927 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
928
929 @support.run_with_locale('LC_ALL', '')
930 def test_cert_time_to_seconds_locale(self):
931 # `cert_time_to_seconds()` should be locale independent
932
933 def local_february_name():
934 return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
935
936 if local_february_name().lower() == 'feb':
937 self.skipTest("locale-specific month name needs to be "
938 "different from C locale")
939
940 # locale-independent
941 self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
942 self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
943
Martin Panter3840b2a2016-03-27 01:53:46 +0000944 def test_connect_ex_error(self):
945 server = socket.socket(socket.AF_INET)
946 self.addCleanup(server.close)
947 port = support.bind_port(server) # Reserve port but don't listen
Christian Heimesd0486372016-09-10 23:23:33 +0200948 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +0000949 cert_reqs=ssl.CERT_REQUIRED)
950 self.addCleanup(s.close)
951 rc = s.connect_ex((HOST, port))
952 # Issue #19919: Windows machines or VMs hosted on Windows
953 # machines sometimes return EWOULDBLOCK.
954 errors = (
955 errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
956 errno.EWOULDBLOCK,
957 )
958 self.assertIn(rc, errors)
959
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100960
Antoine Pitrou152efa22010-05-16 18:19:27 +0000961class ContextTests(unittest.TestCase):
962
963 def test_constructor(self):
Antoine Pitrou2463e5f2013-03-28 22:24:43 +0100964 for protocol in PROTOCOLS:
965 ssl.SSLContext(protocol)
Christian Heimes598894f2016-09-05 23:19:05 +0200966 ctx = ssl.SSLContext()
967 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000968 self.assertRaises(ValueError, ssl.SSLContext, -1)
969 self.assertRaises(ValueError, ssl.SSLContext, 42)
970
971 def test_protocol(self):
972 for proto in PROTOCOLS:
973 ctx = ssl.SSLContext(proto)
974 self.assertEqual(ctx.protocol, proto)
975
976 def test_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +0200977 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000978 ctx.set_ciphers("ALL")
979 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +0000980 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +0000981 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000982
Christian Heimes892d66e2018-01-29 14:10:18 +0100983 @unittest.skipUnless(PY_SSL_DEFAULT_CIPHERS == 1,
984 "Test applies only to Python default ciphers")
985 def test_python_ciphers(self):
986 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
987 ciphers = ctx.get_ciphers()
988 for suite in ciphers:
989 name = suite['name']
990 self.assertNotIn("PSK", name)
991 self.assertNotIn("SRP", name)
992 self.assertNotIn("MD5", name)
993 self.assertNotIn("RC4", name)
994 self.assertNotIn("3DES", name)
995
Christian Heimes25bfcd52016-09-06 00:04:45 +0200996 @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
997 def test_get_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +0200998 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesea9b2dc2016-09-06 10:45:44 +0200999 ctx.set_ciphers('AESGCM')
Christian Heimes25bfcd52016-09-06 00:04:45 +02001000 names = set(d['name'] for d in ctx.get_ciphers())
Christian Heimes582282b2016-09-06 11:27:25 +02001001 self.assertIn('AES256-GCM-SHA384', names)
1002 self.assertIn('AES128-GCM-SHA256', names)
Christian Heimes25bfcd52016-09-06 00:04:45 +02001003
Antoine Pitroub5218772010-05-21 09:56:06 +00001004 def test_options(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001005 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001006 # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
Christian Heimes598894f2016-09-05 23:19:05 +02001007 default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimes358cfd42016-09-10 22:43:48 +02001008 # SSLContext also enables these by default
1009 default |= (OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE |
Christian Heimes05d9fe32018-02-27 08:55:39 +01001010 OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE |
1011 OP_ENABLE_MIDDLEBOX_COMPAT)
Christian Heimes598894f2016-09-05 23:19:05 +02001012 self.assertEqual(default, ctx.options)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001013 ctx.options |= ssl.OP_NO_TLSv1
Christian Heimes598894f2016-09-05 23:19:05 +02001014 self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001015 if can_clear_options():
Christian Heimes598894f2016-09-05 23:19:05 +02001016 ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
1017 self.assertEqual(default, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001018 ctx.options = 0
Matthias Klosef7c56242016-06-12 23:40:00 -07001019 # Ubuntu has OP_NO_SSLv3 forced on by default
1020 self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
Antoine Pitroub5218772010-05-21 09:56:06 +00001021 else:
1022 with self.assertRaises(ValueError):
1023 ctx.options = 0
1024
Christian Heimesa170fa12017-09-15 20:27:30 +02001025 def test_verify_mode_protocol(self):
1026 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001027 # Default value
1028 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1029 ctx.verify_mode = ssl.CERT_OPTIONAL
1030 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1031 ctx.verify_mode = ssl.CERT_REQUIRED
1032 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1033 ctx.verify_mode = ssl.CERT_NONE
1034 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1035 with self.assertRaises(TypeError):
1036 ctx.verify_mode = None
1037 with self.assertRaises(ValueError):
1038 ctx.verify_mode = 42
1039
Christian Heimesa170fa12017-09-15 20:27:30 +02001040 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1041 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1042 self.assertFalse(ctx.check_hostname)
1043
1044 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1045 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1046 self.assertTrue(ctx.check_hostname)
1047
Christian Heimes61d478c2018-01-27 15:51:38 +01001048 def test_hostname_checks_common_name(self):
1049 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1050 self.assertTrue(ctx.hostname_checks_common_name)
1051 if ssl.HAS_NEVER_CHECK_COMMON_NAME:
1052 ctx.hostname_checks_common_name = True
1053 self.assertTrue(ctx.hostname_checks_common_name)
1054 ctx.hostname_checks_common_name = False
1055 self.assertFalse(ctx.hostname_checks_common_name)
1056 ctx.hostname_checks_common_name = True
1057 self.assertTrue(ctx.hostname_checks_common_name)
1058 else:
1059 with self.assertRaises(AttributeError):
1060 ctx.hostname_checks_common_name = True
Christian Heimesa170fa12017-09-15 20:27:30 +02001061
Christian Heimes698dde12018-02-27 11:54:43 +01001062 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
1063 "required OpenSSL 1.1.0g")
1064 def test_min_max_version(self):
1065 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1066 self.assertEqual(
1067 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1068 )
1069 self.assertEqual(
1070 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1071 )
1072
1073 ctx.minimum_version = ssl.TLSVersion.TLSv1_1
1074 ctx.maximum_version = ssl.TLSVersion.TLSv1_2
1075 self.assertEqual(
1076 ctx.minimum_version, ssl.TLSVersion.TLSv1_1
1077 )
1078 self.assertEqual(
1079 ctx.maximum_version, ssl.TLSVersion.TLSv1_2
1080 )
1081
1082 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1083 ctx.maximum_version = ssl.TLSVersion.TLSv1
1084 self.assertEqual(
1085 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1086 )
1087 self.assertEqual(
1088 ctx.maximum_version, ssl.TLSVersion.TLSv1
1089 )
1090
1091 ctx.maximum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1092 self.assertEqual(
1093 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1094 )
1095
1096 ctx.maximum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1097 self.assertIn(
1098 ctx.maximum_version,
1099 {ssl.TLSVersion.TLSv1, ssl.TLSVersion.SSLv3}
1100 )
1101
1102 ctx.minimum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
1103 self.assertIn(
1104 ctx.minimum_version,
1105 {ssl.TLSVersion.TLSv1_2, ssl.TLSVersion.TLSv1_3}
1106 )
1107
1108 with self.assertRaises(ValueError):
1109 ctx.minimum_version = 42
1110
1111 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_1)
1112
1113 self.assertEqual(
1114 ctx.minimum_version, ssl.TLSVersion.MINIMUM_SUPPORTED
1115 )
1116 self.assertEqual(
1117 ctx.maximum_version, ssl.TLSVersion.MAXIMUM_SUPPORTED
1118 )
1119 with self.assertRaises(ValueError):
1120 ctx.minimum_version = ssl.TLSVersion.MINIMUM_SUPPORTED
1121 with self.assertRaises(ValueError):
1122 ctx.maximum_version = ssl.TLSVersion.TLSv1
1123
1124
Christian Heimes2427b502013-11-23 11:24:32 +01001125 @unittest.skipUnless(have_verify_flags(),
1126 "verify_flags need OpenSSL > 0.9.8")
Christian Heimes22587792013-11-21 23:56:13 +01001127 def test_verify_flags(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001128 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Benjamin Peterson990fcaa2015-03-04 22:49:41 -05001129 # default value
1130 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
1131 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | tf)
Christian Heimes22587792013-11-21 23:56:13 +01001132 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
1133 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
1134 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
1135 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
1136 ctx.verify_flags = ssl.VERIFY_DEFAULT
1137 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
1138 # supports any value
1139 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
1140 self.assertEqual(ctx.verify_flags,
1141 ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
1142 with self.assertRaises(TypeError):
1143 ctx.verify_flags = None
1144
Antoine Pitrou152efa22010-05-16 18:19:27 +00001145 def test_load_cert_chain(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001146 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001147 # Combined key and cert in a single file
Benjamin Peterson1ea070e2014-11-03 21:05:01 -05001148 ctx.load_cert_chain(CERTFILE, keyfile=None)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001149 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
1150 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001151 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001152 ctx.load_cert_chain(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001153 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001154 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001155 ctx.load_cert_chain(BADCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001156 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001157 ctx.load_cert_chain(EMPTYCERT)
1158 # Separate key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001159 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001160 ctx.load_cert_chain(ONLYCERT, ONLYKEY)
1161 ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
1162 ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001163 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001164 ctx.load_cert_chain(ONLYCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001165 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001166 ctx.load_cert_chain(ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001167 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001168 ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
1169 # Mismatching key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001170 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001171 with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
Martin Panter3d81d932016-01-14 09:36:00 +00001172 ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +02001173 # Password protected key and cert
1174 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
1175 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
1176 ctx.load_cert_chain(CERTFILE_PROTECTED,
1177 password=bytearray(KEY_PASSWORD.encode()))
1178 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
1179 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
1180 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
1181 bytearray(KEY_PASSWORD.encode()))
1182 with self.assertRaisesRegex(TypeError, "should be a string"):
1183 ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
1184 with self.assertRaises(ssl.SSLError):
1185 ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
1186 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1187 # openssl has a fixed limit on the password buffer.
1188 # PEM_BUFSIZE is generally set to 1kb.
1189 # Return a string larger than this.
1190 ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
1191 # Password callback
1192 def getpass_unicode():
1193 return KEY_PASSWORD
1194 def getpass_bytes():
1195 return KEY_PASSWORD.encode()
1196 def getpass_bytearray():
1197 return bytearray(KEY_PASSWORD.encode())
1198 def getpass_badpass():
1199 return "badpass"
1200 def getpass_huge():
1201 return b'a' * (1024 * 1024)
1202 def getpass_bad_type():
1203 return 9
1204 def getpass_exception():
1205 raise Exception('getpass error')
1206 class GetPassCallable:
1207 def __call__(self):
1208 return KEY_PASSWORD
1209 def getpass(self):
1210 return KEY_PASSWORD
1211 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
1212 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
1213 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
1214 ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
1215 ctx.load_cert_chain(CERTFILE_PROTECTED,
1216 password=GetPassCallable().getpass)
1217 with self.assertRaises(ssl.SSLError):
1218 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
1219 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1220 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
1221 with self.assertRaisesRegex(TypeError, "must return a string"):
1222 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
1223 with self.assertRaisesRegex(Exception, "getpass error"):
1224 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
1225 # Make sure the password function isn't called if it isn't needed
1226 ctx.load_cert_chain(CERTFILE, password=getpass_exception)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001227
1228 def test_load_verify_locations(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001229 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001230 ctx.load_verify_locations(CERTFILE)
1231 ctx.load_verify_locations(cafile=CERTFILE, capath=None)
1232 ctx.load_verify_locations(BYTES_CERTFILE)
1233 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
1234 self.assertRaises(TypeError, ctx.load_verify_locations)
Christian Heimesefff7062013-11-21 03:35:02 +01001235 self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001236 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001237 ctx.load_verify_locations(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001238 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001239 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001240 ctx.load_verify_locations(BADCERT)
1241 ctx.load_verify_locations(CERTFILE, CAPATH)
1242 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
1243
Victor Stinner80f75e62011-01-29 11:31:20 +00001244 # Issue #10989: crash if the second argument type is invalid
1245 self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
1246
Christian Heimesefff7062013-11-21 03:35:02 +01001247 def test_load_verify_cadata(self):
1248 # test cadata
1249 with open(CAFILE_CACERT) as f:
1250 cacert_pem = f.read()
1251 cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
1252 with open(CAFILE_NEURONIO) as f:
1253 neuronio_pem = f.read()
1254 neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
1255
1256 # test PEM
Christian Heimesa170fa12017-09-15 20:27:30 +02001257 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001258 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
1259 ctx.load_verify_locations(cadata=cacert_pem)
1260 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
1261 ctx.load_verify_locations(cadata=neuronio_pem)
1262 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1263 # cert already in hash table
1264 ctx.load_verify_locations(cadata=neuronio_pem)
1265 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1266
1267 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001268 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001269 combined = "\n".join((cacert_pem, neuronio_pem))
1270 ctx.load_verify_locations(cadata=combined)
1271 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1272
1273 # with junk around the certs
Christian Heimesa170fa12017-09-15 20:27:30 +02001274 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001275 combined = ["head", cacert_pem, "other", neuronio_pem, "again",
1276 neuronio_pem, "tail"]
1277 ctx.load_verify_locations(cadata="\n".join(combined))
1278 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1279
1280 # test DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001281 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001282 ctx.load_verify_locations(cadata=cacert_der)
1283 ctx.load_verify_locations(cadata=neuronio_der)
1284 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1285 # cert already in hash table
1286 ctx.load_verify_locations(cadata=cacert_der)
1287 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1288
1289 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001290 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001291 combined = b"".join((cacert_der, neuronio_der))
1292 ctx.load_verify_locations(cadata=combined)
1293 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1294
1295 # error cases
Christian Heimesa170fa12017-09-15 20:27:30 +02001296 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001297 self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
1298
1299 with self.assertRaisesRegex(ssl.SSLError, "no start line"):
1300 ctx.load_verify_locations(cadata="broken")
1301 with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
1302 ctx.load_verify_locations(cadata=b"broken")
1303
1304
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001305 def test_load_dh_params(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001306 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001307 ctx.load_dh_params(DHFILE)
1308 if os.name != 'nt':
1309 ctx.load_dh_params(BYTES_DHFILE)
1310 self.assertRaises(TypeError, ctx.load_dh_params)
1311 self.assertRaises(TypeError, ctx.load_dh_params, None)
1312 with self.assertRaises(FileNotFoundError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001313 ctx.load_dh_params(NONEXISTINGCERT)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001314 self.assertEqual(cm.exception.errno, errno.ENOENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001315 with self.assertRaises(ssl.SSLError) as cm:
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001316 ctx.load_dh_params(CERTFILE)
1317
Antoine Pitroub0182c82010-10-12 20:09:02 +00001318 def test_session_stats(self):
1319 for proto in PROTOCOLS:
1320 ctx = ssl.SSLContext(proto)
1321 self.assertEqual(ctx.session_stats(), {
1322 'number': 0,
1323 'connect': 0,
1324 'connect_good': 0,
1325 'connect_renegotiate': 0,
1326 'accept': 0,
1327 'accept_good': 0,
1328 'accept_renegotiate': 0,
1329 'hits': 0,
1330 'misses': 0,
1331 'timeouts': 0,
1332 'cache_full': 0,
1333 })
1334
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001335 def test_set_default_verify_paths(self):
1336 # There's not much we can do to test that it acts as expected,
1337 # so just check it doesn't crash or raise an exception.
Christian Heimesa170fa12017-09-15 20:27:30 +02001338 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001339 ctx.set_default_verify_paths()
1340
Antoine Pitrou501da612011-12-21 09:27:41 +01001341 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001342 def test_set_ecdh_curve(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001343 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001344 ctx.set_ecdh_curve("prime256v1")
1345 ctx.set_ecdh_curve(b"prime256v1")
1346 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1347 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1348 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1349 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1350
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001351 @needs_sni
1352 def test_sni_callback(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001353 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001354
1355 # set_servername_callback expects a callable, or None
1356 self.assertRaises(TypeError, ctx.set_servername_callback)
1357 self.assertRaises(TypeError, ctx.set_servername_callback, 4)
1358 self.assertRaises(TypeError, ctx.set_servername_callback, "")
1359 self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
1360
1361 def dummycallback(sock, servername, ctx):
1362 pass
1363 ctx.set_servername_callback(None)
1364 ctx.set_servername_callback(dummycallback)
1365
1366 @needs_sni
1367 def test_sni_callback_refcycle(self):
1368 # Reference cycles through the servername callback are detected
1369 # and cleared.
Christian Heimesa170fa12017-09-15 20:27:30 +02001370 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001371 def dummycallback(sock, servername, ctx, cycle=ctx):
1372 pass
1373 ctx.set_servername_callback(dummycallback)
1374 wr = weakref.ref(ctx)
1375 del ctx, dummycallback
1376 gc.collect()
1377 self.assertIs(wr(), None)
1378
Christian Heimes9a5395a2013-06-17 15:44:12 +02001379 def test_cert_store_stats(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001380 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001381 self.assertEqual(ctx.cert_store_stats(),
1382 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1383 ctx.load_cert_chain(CERTFILE)
1384 self.assertEqual(ctx.cert_store_stats(),
1385 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1386 ctx.load_verify_locations(CERTFILE)
1387 self.assertEqual(ctx.cert_store_stats(),
1388 {'x509_ca': 0, 'crl': 0, 'x509': 1})
Martin Panterb55f8b72016-01-14 12:53:56 +00001389 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001390 self.assertEqual(ctx.cert_store_stats(),
1391 {'x509_ca': 1, 'crl': 0, 'x509': 2})
1392
1393 def test_get_ca_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001394 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001395 self.assertEqual(ctx.get_ca_certs(), [])
1396 # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
1397 ctx.load_verify_locations(CERTFILE)
1398 self.assertEqual(ctx.get_ca_certs(), [])
Martin Panterb55f8b72016-01-14 12:53:56 +00001399 # but CAFILE_CACERT is a CA cert
1400 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001401 self.assertEqual(ctx.get_ca_certs(),
1402 [{'issuer': ((('organizationName', 'Root CA'),),
1403 (('organizationalUnitName', 'http://www.cacert.org'),),
1404 (('commonName', 'CA Cert Signing Authority'),),
1405 (('emailAddress', 'support@cacert.org'),)),
1406 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
1407 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
1408 'serialNumber': '00',
Christian Heimesbd3a7f92013-11-21 03:40:15 +01001409 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
Christian Heimes9a5395a2013-06-17 15:44:12 +02001410 'subject': ((('organizationName', 'Root CA'),),
1411 (('organizationalUnitName', 'http://www.cacert.org'),),
1412 (('commonName', 'CA Cert Signing Authority'),),
1413 (('emailAddress', 'support@cacert.org'),)),
1414 'version': 3}])
1415
Martin Panterb55f8b72016-01-14 12:53:56 +00001416 with open(CAFILE_CACERT) as f:
Christian Heimes9a5395a2013-06-17 15:44:12 +02001417 pem = f.read()
1418 der = ssl.PEM_cert_to_DER_cert(pem)
1419 self.assertEqual(ctx.get_ca_certs(True), [der])
1420
Christian Heimes72d28502013-11-23 13:56:58 +01001421 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001422 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001423 ctx.load_default_certs()
1424
Christian Heimesa170fa12017-09-15 20:27:30 +02001425 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001426 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1427 ctx.load_default_certs()
1428
Christian Heimesa170fa12017-09-15 20:27:30 +02001429 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001430 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1431
Christian Heimesa170fa12017-09-15 20:27:30 +02001432 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001433 self.assertRaises(TypeError, ctx.load_default_certs, None)
1434 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1435
Benjamin Peterson91244e02014-10-03 18:17:15 -04001436 @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
Christian Heimes598894f2016-09-05 23:19:05 +02001437 @unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001438 def test_load_default_certs_env(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001439 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001440 with support.EnvironmentVarGuard() as env:
1441 env["SSL_CERT_DIR"] = CAPATH
1442 env["SSL_CERT_FILE"] = CERTFILE
1443 ctx.load_default_certs()
1444 self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0})
1445
Benjamin Peterson91244e02014-10-03 18:17:15 -04001446 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Steve Dower68d663c2017-07-17 11:15:48 +02001447 @unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
Benjamin Peterson91244e02014-10-03 18:17:15 -04001448 def test_load_default_certs_env_windows(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001449 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001450 ctx.load_default_certs()
1451 stats = ctx.cert_store_stats()
1452
Christian Heimesa170fa12017-09-15 20:27:30 +02001453 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001454 with support.EnvironmentVarGuard() as env:
1455 env["SSL_CERT_DIR"] = CAPATH
1456 env["SSL_CERT_FILE"] = CERTFILE
1457 ctx.load_default_certs()
1458 stats["x509"] += 1
1459 self.assertEqual(ctx.cert_store_stats(), stats)
1460
Christian Heimes358cfd42016-09-10 22:43:48 +02001461 def _assert_context_options(self, ctx):
1462 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1463 if OP_NO_COMPRESSION != 0:
1464 self.assertEqual(ctx.options & OP_NO_COMPRESSION,
1465 OP_NO_COMPRESSION)
1466 if OP_SINGLE_DH_USE != 0:
1467 self.assertEqual(ctx.options & OP_SINGLE_DH_USE,
1468 OP_SINGLE_DH_USE)
1469 if OP_SINGLE_ECDH_USE != 0:
1470 self.assertEqual(ctx.options & OP_SINGLE_ECDH_USE,
1471 OP_SINGLE_ECDH_USE)
1472 if OP_CIPHER_SERVER_PREFERENCE != 0:
1473 self.assertEqual(ctx.options & OP_CIPHER_SERVER_PREFERENCE,
1474 OP_CIPHER_SERVER_PREFERENCE)
1475
Christian Heimes4c05b472013-11-23 15:58:30 +01001476 def test_create_default_context(self):
1477 ctx = ssl.create_default_context()
Christian Heimes358cfd42016-09-10 22:43:48 +02001478
Christian Heimesa170fa12017-09-15 20:27:30 +02001479 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001480 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001481 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001482 self._assert_context_options(ctx)
1483
Christian Heimes4c05b472013-11-23 15:58:30 +01001484 with open(SIGNING_CA) as f:
1485 cadata = f.read()
1486 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1487 cadata=cadata)
Christian Heimesa170fa12017-09-15 20:27:30 +02001488 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001489 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes358cfd42016-09-10 22:43:48 +02001490 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001491
1492 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001493 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001494 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001495 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001496
Christian Heimes67986f92013-11-23 22:43:47 +01001497 def test__create_stdlib_context(self):
1498 ctx = ssl._create_stdlib_context()
Christian Heimesa170fa12017-09-15 20:27:30 +02001499 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001500 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001501 self.assertFalse(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001502 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001503
1504 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
1505 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1506 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001507 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001508
1509 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
Christian Heimesa02c69a2013-12-02 20:59:28 +01001510 cert_reqs=ssl.CERT_REQUIRED,
1511 check_hostname=True)
Christian Heimes67986f92013-11-23 22:43:47 +01001512 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1513 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimesa02c69a2013-12-02 20:59:28 +01001514 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001515 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001516
1517 ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001518 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001519 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001520 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001521
Christian Heimes1aa9a752013-12-02 02:41:19 +01001522 def test_check_hostname(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001523 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001524 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001525 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001526
Christian Heimese82c0342017-09-15 20:29:57 +02001527 # Auto set CERT_REQUIRED
1528 ctx.check_hostname = True
1529 self.assertTrue(ctx.check_hostname)
1530 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1531 ctx.check_hostname = False
Christian Heimes1aa9a752013-12-02 02:41:19 +01001532 ctx.verify_mode = ssl.CERT_REQUIRED
1533 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001534 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001535
Christian Heimese82c0342017-09-15 20:29:57 +02001536 # Changing verify_mode does not affect check_hostname
1537 ctx.check_hostname = False
1538 ctx.verify_mode = ssl.CERT_NONE
1539 ctx.check_hostname = False
1540 self.assertFalse(ctx.check_hostname)
1541 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1542 # Auto set
Christian Heimes1aa9a752013-12-02 02:41:19 +01001543 ctx.check_hostname = True
1544 self.assertTrue(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001545 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1546
1547 ctx.check_hostname = False
1548 ctx.verify_mode = ssl.CERT_OPTIONAL
1549 ctx.check_hostname = False
1550 self.assertFalse(ctx.check_hostname)
1551 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1552 # keep CERT_OPTIONAL
1553 ctx.check_hostname = True
1554 self.assertTrue(ctx.check_hostname)
1555 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001556
1557 # Cannot set CERT_NONE with check_hostname enabled
1558 with self.assertRaises(ValueError):
1559 ctx.verify_mode = ssl.CERT_NONE
1560 ctx.check_hostname = False
1561 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001562 ctx.verify_mode = ssl.CERT_NONE
1563 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001564
Christian Heimes5fe668c2016-09-12 00:01:11 +02001565 def test_context_client_server(self):
1566 # PROTOCOL_TLS_CLIENT has sane defaults
1567 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1568 self.assertTrue(ctx.check_hostname)
1569 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1570
1571 # PROTOCOL_TLS_SERVER has different but also sane defaults
1572 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1573 self.assertFalse(ctx.check_hostname)
1574 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1575
Christian Heimes4df60f12017-09-15 20:26:05 +02001576 def test_context_custom_class(self):
1577 class MySSLSocket(ssl.SSLSocket):
1578 pass
1579
1580 class MySSLObject(ssl.SSLObject):
1581 pass
1582
1583 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1584 ctx.sslsocket_class = MySSLSocket
1585 ctx.sslobject_class = MySSLObject
1586
1587 with ctx.wrap_socket(socket.socket(), server_side=True) as sock:
1588 self.assertIsInstance(sock, MySSLSocket)
1589 obj = ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO())
1590 self.assertIsInstance(obj, MySSLObject)
1591
Antoine Pitrou152efa22010-05-16 18:19:27 +00001592
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001593class SSLErrorTests(unittest.TestCase):
1594
1595 def test_str(self):
1596 # The str() of a SSLError doesn't include the errno
1597 e = ssl.SSLError(1, "foo")
1598 self.assertEqual(str(e), "foo")
1599 self.assertEqual(e.errno, 1)
1600 # Same for a subclass
1601 e = ssl.SSLZeroReturnError(1, "foo")
1602 self.assertEqual(str(e), "foo")
1603 self.assertEqual(e.errno, 1)
1604
1605 def test_lib_reason(self):
1606 # Test the library and reason attributes
Christian Heimesa170fa12017-09-15 20:27:30 +02001607 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001608 with self.assertRaises(ssl.SSLError) as cm:
1609 ctx.load_dh_params(CERTFILE)
1610 self.assertEqual(cm.exception.library, 'PEM')
1611 self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1612 s = str(cm.exception)
1613 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1614
1615 def test_subclass(self):
1616 # Check that the appropriate SSLError subclass is raised
1617 # (this only tests one of them)
Christian Heimesa170fa12017-09-15 20:27:30 +02001618 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1619 ctx.check_hostname = False
1620 ctx.verify_mode = ssl.CERT_NONE
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001621 with socket.socket() as s:
1622 s.bind(("127.0.0.1", 0))
Charles-François Natali6e204602014-07-23 19:28:13 +01001623 s.listen()
Antoine Pitroue1ceb502013-01-12 21:54:44 +01001624 c = socket.socket()
1625 c.connect(s.getsockname())
1626 c.setblocking(False)
1627 with ctx.wrap_socket(c, False, do_handshake_on_connect=False) as c:
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001628 with self.assertRaises(ssl.SSLWantReadError) as cm:
1629 c.do_handshake()
1630 s = str(cm.exception)
1631 self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1632 # For compatibility
1633 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
1634
1635
Christian Heimes61d478c2018-01-27 15:51:38 +01001636 def test_bad_server_hostname(self):
1637 ctx = ssl.create_default_context()
1638 with self.assertRaises(ValueError):
1639 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1640 server_hostname="")
1641 with self.assertRaises(ValueError):
1642 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1643 server_hostname=".example.org")
Christian Heimesd02ac252018-03-25 12:36:13 +02001644 with self.assertRaises(TypeError):
1645 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1646 server_hostname="example.org\x00evil.com")
Christian Heimes61d478c2018-01-27 15:51:38 +01001647
1648
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001649class MemoryBIOTests(unittest.TestCase):
1650
1651 def test_read_write(self):
1652 bio = ssl.MemoryBIO()
1653 bio.write(b'foo')
1654 self.assertEqual(bio.read(), b'foo')
1655 self.assertEqual(bio.read(), b'')
1656 bio.write(b'foo')
1657 bio.write(b'bar')
1658 self.assertEqual(bio.read(), b'foobar')
1659 self.assertEqual(bio.read(), b'')
1660 bio.write(b'baz')
1661 self.assertEqual(bio.read(2), b'ba')
1662 self.assertEqual(bio.read(1), b'z')
1663 self.assertEqual(bio.read(1), b'')
1664
1665 def test_eof(self):
1666 bio = ssl.MemoryBIO()
1667 self.assertFalse(bio.eof)
1668 self.assertEqual(bio.read(), b'')
1669 self.assertFalse(bio.eof)
1670 bio.write(b'foo')
1671 self.assertFalse(bio.eof)
1672 bio.write_eof()
1673 self.assertFalse(bio.eof)
1674 self.assertEqual(bio.read(2), b'fo')
1675 self.assertFalse(bio.eof)
1676 self.assertEqual(bio.read(1), b'o')
1677 self.assertTrue(bio.eof)
1678 self.assertEqual(bio.read(), b'')
1679 self.assertTrue(bio.eof)
1680
1681 def test_pending(self):
1682 bio = ssl.MemoryBIO()
1683 self.assertEqual(bio.pending, 0)
1684 bio.write(b'foo')
1685 self.assertEqual(bio.pending, 3)
1686 for i in range(3):
1687 bio.read(1)
1688 self.assertEqual(bio.pending, 3-i-1)
1689 for i in range(3):
1690 bio.write(b'x')
1691 self.assertEqual(bio.pending, i+1)
1692 bio.read()
1693 self.assertEqual(bio.pending, 0)
1694
1695 def test_buffer_types(self):
1696 bio = ssl.MemoryBIO()
1697 bio.write(b'foo')
1698 self.assertEqual(bio.read(), b'foo')
1699 bio.write(bytearray(b'bar'))
1700 self.assertEqual(bio.read(), b'bar')
1701 bio.write(memoryview(b'baz'))
1702 self.assertEqual(bio.read(), b'baz')
1703
1704 def test_error_types(self):
1705 bio = ssl.MemoryBIO()
1706 self.assertRaises(TypeError, bio.write, 'foo')
1707 self.assertRaises(TypeError, bio.write, None)
1708 self.assertRaises(TypeError, bio.write, True)
1709 self.assertRaises(TypeError, bio.write, 1)
1710
1711
Christian Heimes9d50ab52018-02-27 10:17:30 +01001712class SSLObjectTests(unittest.TestCase):
1713 def test_private_init(self):
1714 bio = ssl.MemoryBIO()
1715 with self.assertRaisesRegex(TypeError, "public constructor"):
1716 ssl.SSLObject(bio, bio)
1717
1718
Martin Panter3840b2a2016-03-27 01:53:46 +00001719class SimpleBackgroundTests(unittest.TestCase):
Martin Panter3840b2a2016-03-27 01:53:46 +00001720 """Tests that connect to a simple server running in the background"""
1721
1722 def setUp(self):
1723 server = ThreadedEchoServer(SIGNED_CERTFILE)
1724 self.server_addr = (HOST, server.port)
1725 server.__enter__()
1726 self.addCleanup(server.__exit__, None, None, None)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001727
Antoine Pitrou480a1242010-04-28 21:37:09 +00001728 def test_connect(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001729 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001730 cert_reqs=ssl.CERT_NONE) as s:
1731 s.connect(self.server_addr)
1732 self.assertEqual({}, s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001733 self.assertFalse(s.server_side)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001734
Martin Panter3840b2a2016-03-27 01:53:46 +00001735 # this should succeed because we specify the root cert
Christian Heimesd0486372016-09-10 23:23:33 +02001736 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001737 cert_reqs=ssl.CERT_REQUIRED,
1738 ca_certs=SIGNING_CA) as s:
1739 s.connect(self.server_addr)
1740 self.assertTrue(s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001741 self.assertFalse(s.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001742
Martin Panter3840b2a2016-03-27 01:53:46 +00001743 def test_connect_fail(self):
1744 # This should fail because we have no verification certs. Connection
1745 # failure crashes ThreadedEchoServer, so run this in an independent
1746 # test method.
Christian Heimesd0486372016-09-10 23:23:33 +02001747 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001748 cert_reqs=ssl.CERT_REQUIRED)
1749 self.addCleanup(s.close)
1750 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1751 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001752
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001753 def test_connect_ex(self):
1754 # Issue #11326: check connect_ex() implementation
Christian Heimesd0486372016-09-10 23:23:33 +02001755 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001756 cert_reqs=ssl.CERT_REQUIRED,
1757 ca_certs=SIGNING_CA)
1758 self.addCleanup(s.close)
1759 self.assertEqual(0, s.connect_ex(self.server_addr))
1760 self.assertTrue(s.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001761
1762 def test_non_blocking_connect_ex(self):
1763 # Issue #11326: non-blocking connect_ex() should allow handshake
1764 # to proceed after the socket gets ready.
Christian Heimesd0486372016-09-10 23:23:33 +02001765 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001766 cert_reqs=ssl.CERT_REQUIRED,
1767 ca_certs=SIGNING_CA,
1768 do_handshake_on_connect=False)
1769 self.addCleanup(s.close)
1770 s.setblocking(False)
1771 rc = s.connect_ex(self.server_addr)
1772 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1773 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
1774 # Wait for connect to finish
1775 select.select([], [s], [], 5.0)
1776 # Non-blocking handshake
1777 while True:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001778 try:
Martin Panter3840b2a2016-03-27 01:53:46 +00001779 s.do_handshake()
1780 break
1781 except ssl.SSLWantReadError:
1782 select.select([s], [], [], 5.0)
1783 except ssl.SSLWantWriteError:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001784 select.select([], [s], [], 5.0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001785 # SSL established
1786 self.assertTrue(s.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001787
Antoine Pitrou152efa22010-05-16 18:19:27 +00001788 def test_connect_with_context(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001789 # Same as test_connect, but with a separately created context
Christian Heimesa170fa12017-09-15 20:27:30 +02001790 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001791 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1792 s.connect(self.server_addr)
1793 self.assertEqual({}, s.getpeercert())
1794 # Same with a server hostname
1795 with ctx.wrap_socket(socket.socket(socket.AF_INET),
1796 server_hostname="dummy") as s:
1797 s.connect(self.server_addr)
1798 ctx.verify_mode = ssl.CERT_REQUIRED
1799 # This should succeed because we specify the root cert
1800 ctx.load_verify_locations(SIGNING_CA)
1801 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1802 s.connect(self.server_addr)
1803 cert = s.getpeercert()
1804 self.assertTrue(cert)
1805
1806 def test_connect_with_context_fail(self):
1807 # This should fail because we have no verification certs. Connection
1808 # failure crashes ThreadedEchoServer, so run this in an independent
1809 # test method.
Christian Heimesa170fa12017-09-15 20:27:30 +02001810 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001811 ctx.verify_mode = ssl.CERT_REQUIRED
1812 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1813 self.addCleanup(s.close)
1814 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1815 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001816
1817 def test_connect_capath(self):
1818 # Verify server certificates using the `capath` argument
Antoine Pitrou467f28d2010-05-16 19:22:44 +00001819 # NOTE: the subject hashing algorithm has been changed between
1820 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
1821 # contain both versions of each certificate (same content, different
Antoine Pitroud7e4c1c2010-05-17 14:13:10 +00001822 # filename) for this test to be portable across OpenSSL releases.
Christian Heimesa170fa12017-09-15 20:27:30 +02001823 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001824 ctx.verify_mode = ssl.CERT_REQUIRED
1825 ctx.load_verify_locations(capath=CAPATH)
1826 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1827 s.connect(self.server_addr)
1828 cert = s.getpeercert()
1829 self.assertTrue(cert)
Christian Heimes529525f2018-05-23 22:24:45 +02001830
Martin Panter3840b2a2016-03-27 01:53:46 +00001831 # Same with a bytes `capath` argument
Christian Heimesa170fa12017-09-15 20:27:30 +02001832 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001833 ctx.verify_mode = ssl.CERT_REQUIRED
1834 ctx.load_verify_locations(capath=BYTES_CAPATH)
1835 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1836 s.connect(self.server_addr)
1837 cert = s.getpeercert()
1838 self.assertTrue(cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001839
Christian Heimesefff7062013-11-21 03:35:02 +01001840 def test_connect_cadata(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001841 with open(SIGNING_CA) as f:
Christian Heimesefff7062013-11-21 03:35:02 +01001842 pem = f.read()
1843 der = ssl.PEM_cert_to_DER_cert(pem)
Christian Heimesa170fa12017-09-15 20:27:30 +02001844 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001845 ctx.verify_mode = ssl.CERT_REQUIRED
1846 ctx.load_verify_locations(cadata=pem)
1847 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1848 s.connect(self.server_addr)
1849 cert = s.getpeercert()
1850 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001851
Martin Panter3840b2a2016-03-27 01:53:46 +00001852 # same with DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001853 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001854 ctx.verify_mode = ssl.CERT_REQUIRED
1855 ctx.load_verify_locations(cadata=der)
1856 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1857 s.connect(self.server_addr)
1858 cert = s.getpeercert()
1859 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001860
Antoine Pitroue3220242010-04-24 11:13:53 +00001861 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
1862 def test_makefile_close(self):
1863 # Issue #5238: creating a file-like object with makefile() shouldn't
1864 # delay closing the underlying "real socket" (here tested with its
1865 # file descriptor, hence skipping the test under Windows).
Christian Heimesd0486372016-09-10 23:23:33 +02001866 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Martin Panter3840b2a2016-03-27 01:53:46 +00001867 ss.connect(self.server_addr)
1868 fd = ss.fileno()
1869 f = ss.makefile()
1870 f.close()
1871 # The fd is still open
1872 os.read(fd, 0)
1873 # Closing the SSL socket should close the fd too
1874 ss.close()
1875 gc.collect()
1876 with self.assertRaises(OSError) as e:
Antoine Pitroue3220242010-04-24 11:13:53 +00001877 os.read(fd, 0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001878 self.assertEqual(e.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001879
Antoine Pitrou480a1242010-04-28 21:37:09 +00001880 def test_non_blocking_handshake(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001881 s = socket.socket(socket.AF_INET)
1882 s.connect(self.server_addr)
1883 s.setblocking(False)
Christian Heimesd0486372016-09-10 23:23:33 +02001884 s = test_wrap_socket(s,
Martin Panter3840b2a2016-03-27 01:53:46 +00001885 cert_reqs=ssl.CERT_NONE,
1886 do_handshake_on_connect=False)
1887 self.addCleanup(s.close)
1888 count = 0
1889 while True:
1890 try:
1891 count += 1
1892 s.do_handshake()
1893 break
1894 except ssl.SSLWantReadError:
1895 select.select([s], [], [])
1896 except ssl.SSLWantWriteError:
1897 select.select([], [s], [])
1898 if support.verbose:
1899 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001900
Antoine Pitrou480a1242010-04-28 21:37:09 +00001901 def test_get_server_certificate(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001902 _test_get_server_certificate(self, *self.server_addr, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001903
Martin Panter3840b2a2016-03-27 01:53:46 +00001904 def test_get_server_certificate_fail(self):
1905 # Connection failure crashes ThreadedEchoServer, so run this in an
1906 # independent test method
1907 _test_get_server_certificate_fail(self, *self.server_addr)
Bill Janssen54cc54c2007-12-14 22:08:56 +00001908
Antoine Pitrouf4c7bad2010-08-15 23:02:22 +00001909 def test_ciphers(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001910 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001911 cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
1912 s.connect(self.server_addr)
Christian Heimesd0486372016-09-10 23:23:33 +02001913 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001914 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
1915 s.connect(self.server_addr)
1916 # Error checking can happen at instantiation or when connecting
1917 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
1918 with socket.socket(socket.AF_INET) as sock:
Christian Heimesd0486372016-09-10 23:23:33 +02001919 s = test_wrap_socket(sock,
Martin Panter3840b2a2016-03-27 01:53:46 +00001920 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
1921 s.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00001922
Christian Heimes9a5395a2013-06-17 15:44:12 +02001923 def test_get_ca_certs_capath(self):
1924 # capath certs are loaded on request
Christian Heimesa170fa12017-09-15 20:27:30 +02001925 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Martin Panter3840b2a2016-03-27 01:53:46 +00001926 ctx.load_verify_locations(capath=CAPATH)
1927 self.assertEqual(ctx.get_ca_certs(), [])
Christian Heimesa170fa12017-09-15 20:27:30 +02001928 with ctx.wrap_socket(socket.socket(socket.AF_INET),
1929 server_hostname='localhost') as s:
Martin Panter3840b2a2016-03-27 01:53:46 +00001930 s.connect(self.server_addr)
1931 cert = s.getpeercert()
1932 self.assertTrue(cert)
1933 self.assertEqual(len(ctx.get_ca_certs()), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001934
Christian Heimes575596e2013-12-15 21:49:17 +01001935 @needs_sni
Christian Heimes8e7f3942013-12-05 07:41:08 +01001936 def test_context_setget(self):
1937 # Check that the context of a connected socket can be replaced.
Christian Heimesa170fa12017-09-15 20:27:30 +02001938 ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1939 ctx1.load_verify_locations(capath=CAPATH)
1940 ctx2 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1941 ctx2.load_verify_locations(capath=CAPATH)
Martin Panter3840b2a2016-03-27 01:53:46 +00001942 s = socket.socket(socket.AF_INET)
Christian Heimesa170fa12017-09-15 20:27:30 +02001943 with ctx1.wrap_socket(s, server_hostname='localhost') as ss:
Martin Panter3840b2a2016-03-27 01:53:46 +00001944 ss.connect(self.server_addr)
1945 self.assertIs(ss.context, ctx1)
1946 self.assertIs(ss._sslobj.context, ctx1)
1947 ss.context = ctx2
1948 self.assertIs(ss.context, ctx2)
1949 self.assertIs(ss._sslobj.context, ctx2)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001950
1951 def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
1952 # A simple IO loop. Call func(*args) depending on the error we get
1953 # (WANT_READ or WANT_WRITE) move data between the socket and the BIOs.
1954 timeout = kwargs.get('timeout', 10)
Victor Stinner50a72af2017-09-11 09:34:24 -07001955 deadline = time.monotonic() + timeout
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001956 count = 0
1957 while True:
Victor Stinner50a72af2017-09-11 09:34:24 -07001958 if time.monotonic() > deadline:
1959 self.fail("timeout")
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001960 errno = None
1961 count += 1
1962 try:
1963 ret = func(*args)
1964 except ssl.SSLError as e:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001965 if e.errno not in (ssl.SSL_ERROR_WANT_READ,
Martin Panter40b97ec2016-01-14 13:05:46 +00001966 ssl.SSL_ERROR_WANT_WRITE):
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001967 raise
1968 errno = e.errno
1969 # Get any data from the outgoing BIO irrespective of any error, and
1970 # send it to the socket.
1971 buf = outgoing.read()
1972 sock.sendall(buf)
1973 # If there's no error, we're done. For WANT_READ, we need to get
1974 # data from the socket and put it in the incoming BIO.
1975 if errno is None:
1976 break
1977 elif errno == ssl.SSL_ERROR_WANT_READ:
1978 buf = sock.recv(32768)
1979 if buf:
1980 incoming.write(buf)
1981 else:
1982 incoming.write_eof()
1983 if support.verbose:
1984 sys.stdout.write("Needed %d calls to complete %s().\n"
1985 % (count, func.__name__))
1986 return ret
1987
Martin Panter3840b2a2016-03-27 01:53:46 +00001988 def test_bio_handshake(self):
1989 sock = socket.socket(socket.AF_INET)
1990 self.addCleanup(sock.close)
1991 sock.connect(self.server_addr)
1992 incoming = ssl.MemoryBIO()
1993 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02001994 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1995 self.assertTrue(ctx.check_hostname)
1996 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Martin Panter3840b2a2016-03-27 01:53:46 +00001997 ctx.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +02001998 sslobj = ctx.wrap_bio(incoming, outgoing, False,
1999 SIGNED_CERTFILE_HOSTNAME)
Martin Panter3840b2a2016-03-27 01:53:46 +00002000 self.assertIs(sslobj._sslobj.owner, sslobj)
2001 self.assertIsNone(sslobj.cipher())
Christian Heimes68771112017-09-05 21:55:40 -07002002 self.assertIsNone(sslobj.version())
Christian Heimes01113fa2016-09-05 23:23:24 +02002003 self.assertIsNotNone(sslobj.shared_ciphers())
Martin Panter3840b2a2016-03-27 01:53:46 +00002004 self.assertRaises(ValueError, sslobj.getpeercert)
2005 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
2006 self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
2007 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2008 self.assertTrue(sslobj.cipher())
Christian Heimes01113fa2016-09-05 23:23:24 +02002009 self.assertIsNotNone(sslobj.shared_ciphers())
Christian Heimes68771112017-09-05 21:55:40 -07002010 self.assertIsNotNone(sslobj.version())
Martin Panter3840b2a2016-03-27 01:53:46 +00002011 self.assertTrue(sslobj.getpeercert())
2012 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
2013 self.assertTrue(sslobj.get_channel_binding('tls-unique'))
2014 try:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002015 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Martin Panter3840b2a2016-03-27 01:53:46 +00002016 except ssl.SSLSyscallError:
2017 # If the server shuts down the TCP connection without sending a
2018 # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
2019 pass
2020 self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
2021
2022 def test_bio_read_write_data(self):
2023 sock = socket.socket(socket.AF_INET)
2024 self.addCleanup(sock.close)
2025 sock.connect(self.server_addr)
2026 incoming = ssl.MemoryBIO()
2027 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02002028 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00002029 ctx.verify_mode = ssl.CERT_NONE
2030 sslobj = ctx.wrap_bio(incoming, outgoing, False)
2031 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
2032 req = b'FOO\n'
2033 self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
2034 buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
2035 self.assertEqual(buf, b'foo\n')
2036 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002037
2038
Martin Panter3840b2a2016-03-27 01:53:46 +00002039class NetworkedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002040
Martin Panter3840b2a2016-03-27 01:53:46 +00002041 def test_timeout_connect_ex(self):
2042 # Issue #12065: on a timeout, connect_ex() should return the original
2043 # errno (mimicking the behaviour of non-SSL sockets).
2044 with support.transient_internet(REMOTE_HOST):
Christian Heimesd0486372016-09-10 23:23:33 +02002045 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00002046 cert_reqs=ssl.CERT_REQUIRED,
2047 do_handshake_on_connect=False)
2048 self.addCleanup(s.close)
2049 s.settimeout(0.0000001)
2050 rc = s.connect_ex((REMOTE_HOST, 443))
2051 if rc == 0:
2052 self.skipTest("REMOTE_HOST responded too quickly")
2053 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
2054
2055 @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
2056 def test_get_server_certificate_ipv6(self):
2057 with support.transient_internet('ipv6.google.com'):
2058 _test_get_server_certificate(self, 'ipv6.google.com', 443)
2059 _test_get_server_certificate_fail(self, 'ipv6.google.com', 443)
2060
Martin Panter3840b2a2016-03-27 01:53:46 +00002061
2062def _test_get_server_certificate(test, host, port, cert=None):
2063 pem = ssl.get_server_certificate((host, port))
2064 if not pem:
2065 test.fail("No server certificate on %s:%s!" % (host, port))
2066
2067 pem = ssl.get_server_certificate((host, port), ca_certs=cert)
2068 if not pem:
2069 test.fail("No server certificate on %s:%s!" % (host, port))
2070 if support.verbose:
2071 sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
2072
2073def _test_get_server_certificate_fail(test, host, port):
2074 try:
2075 pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
2076 except ssl.SSLError as x:
2077 #should fail
2078 if support.verbose:
2079 sys.stdout.write("%s\n" % x)
2080 else:
2081 test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
2082
2083
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002084from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002085
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002086class ThreadedEchoServer(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002087
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002088 class ConnectionHandler(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002089
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002090 """A mildly complicated class, because we want it to work both
2091 with and without the SSL wrapper around the socket connection, so
2092 that we can test the STARTTLS functionality."""
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002093
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002094 def __init__(self, server, connsock, addr):
2095 self.server = server
2096 self.running = False
2097 self.sock = connsock
2098 self.addr = addr
2099 self.sock.setblocking(1)
2100 self.sslconn = None
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002101 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00002102 self.daemon = True
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002103
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002104 def wrap_conn(self):
2105 try:
2106 self.sslconn = self.server.context.wrap_socket(
2107 self.sock, server_side=True)
2108 self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
2109 self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
Christian Heimes529525f2018-05-23 22:24:45 +02002110 except (ConnectionResetError, BrokenPipeError) as e:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002111 # We treat ConnectionResetError as though it were an
2112 # SSLError - OpenSSL on Ubuntu abruptly closes the
2113 # connection when asked to use an unsupported protocol.
2114 #
Christian Heimes529525f2018-05-23 22:24:45 +02002115 # BrokenPipeError is raised in TLS 1.3 mode, when OpenSSL
2116 # tries to send session tickets after handshake.
2117 # https://github.com/openssl/openssl/issues/6342
2118 self.server.conn_errors.append(str(e))
2119 if self.server.chatty:
2120 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
2121 self.running = False
2122 self.close()
2123 return False
2124 except (ssl.SSLError, OSError) as e:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002125 # OSError may occur with wrong protocols, e.g. both
2126 # sides use PROTOCOL_TLS_SERVER.
2127 #
2128 # XXX Various errors can have happened here, for example
2129 # a mismatching protocol version, an invalid certificate,
2130 # or a low-level bug. This should be made more discriminating.
2131 #
2132 # bpo-31323: Store the exception as string to prevent
2133 # a reference leak: server -> conn_errors -> exception
2134 # -> traceback -> self (ConnectionHandler) -> server
2135 self.server.conn_errors.append(str(e))
2136 if self.server.chatty:
2137 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
2138 self.running = False
2139 self.server.stop()
2140 self.close()
2141 return False
2142 else:
2143 self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
2144 if self.server.context.verify_mode == ssl.CERT_REQUIRED:
2145 cert = self.sslconn.getpeercert()
2146 if support.verbose and self.server.chatty:
2147 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
2148 cert_binary = self.sslconn.getpeercert(True)
2149 if support.verbose and self.server.chatty:
2150 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
2151 cipher = self.sslconn.cipher()
2152 if support.verbose and self.server.chatty:
2153 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
2154 sys.stdout.write(" server: selected protocol is now "
2155 + str(self.sslconn.selected_npn_protocol()) + "\n")
2156 return True
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002157
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002158 def read(self):
2159 if self.sslconn:
2160 return self.sslconn.read()
2161 else:
2162 return self.sock.recv(1024)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002163
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002164 def write(self, bytes):
2165 if self.sslconn:
2166 return self.sslconn.write(bytes)
2167 else:
2168 return self.sock.send(bytes)
2169
2170 def close(self):
2171 if self.sslconn:
2172 self.sslconn.close()
2173 else:
2174 self.sock.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002175
Antoine Pitrou480a1242010-04-28 21:37:09 +00002176 def run(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002177 self.running = True
2178 if not self.server.starttls_server:
2179 if not self.wrap_conn():
2180 return
2181 while self.running:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002182 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002183 msg = self.read()
2184 stripped = msg.strip()
2185 if not stripped:
2186 # eof, so quit this handler
2187 self.running = False
2188 try:
2189 self.sock = self.sslconn.unwrap()
2190 except OSError:
2191 # Many tests shut the TCP connection down
2192 # without an SSL shutdown. This causes
2193 # unwrap() to raise OSError with errno=0!
2194 pass
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002195 else:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002196 self.sslconn = None
2197 self.close()
2198 elif stripped == b'over':
2199 if support.verbose and self.server.connectionchatty:
2200 sys.stdout.write(" server: client closed connection\n")
2201 self.close()
2202 return
2203 elif (self.server.starttls_server and
2204 stripped == b'STARTTLS'):
2205 if support.verbose and self.server.connectionchatty:
2206 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
2207 self.write(b"OK\n")
2208 if not self.wrap_conn():
2209 return
2210 elif (self.server.starttls_server and self.sslconn
2211 and stripped == b'ENDTLS'):
2212 if support.verbose and self.server.connectionchatty:
2213 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
2214 self.write(b"OK\n")
2215 self.sock = self.sslconn.unwrap()
2216 self.sslconn = None
2217 if support.verbose and self.server.connectionchatty:
2218 sys.stdout.write(" server: connection is now unencrypted...\n")
2219 elif stripped == b'CB tls-unique':
2220 if support.verbose and self.server.connectionchatty:
2221 sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
2222 data = self.sslconn.get_channel_binding("tls-unique")
2223 self.write(repr(data).encode("us-ascii") + b"\n")
2224 else:
2225 if (support.verbose and
2226 self.server.connectionchatty):
2227 ctype = (self.sslconn and "encrypted") or "unencrypted"
2228 sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
2229 % (msg, ctype, msg.lower(), ctype))
2230 self.write(msg.lower())
Christian Heimes529525f2018-05-23 22:24:45 +02002231 except ConnectionResetError:
2232 # XXX: OpenSSL 1.1.1 sometimes raises ConnectionResetError
2233 # when connection is not shut down gracefully.
2234 if self.server.chatty and support.verbose:
2235 sys.stdout.write(
2236 " Connection reset by peer: {}\n".format(
2237 self.addr)
2238 )
2239 self.close()
2240 self.running = False
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002241 except OSError:
2242 if self.server.chatty:
2243 handle_error("Test server failure:\n")
Bill Janssen2f5799b2008-06-29 00:08:12 +00002244 self.close()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002245 self.running = False
Christian Heimes529525f2018-05-23 22:24:45 +02002246
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002247 # normally, we'd just stop here, but for the test
2248 # harness, we want to stop the server
2249 self.server.stop()
Bill Janssen54cc54c2007-12-14 22:08:56 +00002250
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002251 def __init__(self, certificate=None, ssl_version=None,
2252 certreqs=None, cacerts=None,
2253 chatty=True, connectionchatty=False, starttls_server=False,
2254 npn_protocols=None, alpn_protocols=None,
2255 ciphers=None, context=None):
2256 if context:
2257 self.context = context
2258 else:
2259 self.context = ssl.SSLContext(ssl_version
2260 if ssl_version is not None
Christian Heimesa170fa12017-09-15 20:27:30 +02002261 else ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002262 self.context.verify_mode = (certreqs if certreqs is not None
2263 else ssl.CERT_NONE)
2264 if cacerts:
2265 self.context.load_verify_locations(cacerts)
2266 if certificate:
2267 self.context.load_cert_chain(certificate)
2268 if npn_protocols:
2269 self.context.set_npn_protocols(npn_protocols)
2270 if alpn_protocols:
2271 self.context.set_alpn_protocols(alpn_protocols)
2272 if ciphers:
2273 self.context.set_ciphers(ciphers)
2274 self.chatty = chatty
2275 self.connectionchatty = connectionchatty
2276 self.starttls_server = starttls_server
2277 self.sock = socket.socket()
2278 self.port = support.bind_port(self.sock)
2279 self.flag = None
2280 self.active = False
2281 self.selected_npn_protocols = []
2282 self.selected_alpn_protocols = []
2283 self.shared_ciphers = []
2284 self.conn_errors = []
2285 threading.Thread.__init__(self)
2286 self.daemon = True
2287
2288 def __enter__(self):
2289 self.start(threading.Event())
2290 self.flag.wait()
2291 return self
2292
2293 def __exit__(self, *args):
2294 self.stop()
2295 self.join()
2296
2297 def start(self, flag=None):
2298 self.flag = flag
2299 threading.Thread.start(self)
2300
2301 def run(self):
2302 self.sock.settimeout(0.05)
2303 self.sock.listen()
2304 self.active = True
2305 if self.flag:
2306 # signal an event
2307 self.flag.set()
2308 while self.active:
2309 try:
2310 newconn, connaddr = self.sock.accept()
2311 if support.verbose and self.chatty:
2312 sys.stdout.write(' server: new connection from '
2313 + repr(connaddr) + '\n')
2314 handler = self.ConnectionHandler(self, newconn, connaddr)
2315 handler.start()
2316 handler.join()
2317 except socket.timeout:
2318 pass
2319 except KeyboardInterrupt:
2320 self.stop()
Christian Heimes529525f2018-05-23 22:24:45 +02002321 except BaseException as e:
2322 if support.verbose and self.chatty:
2323 sys.stdout.write(
2324 ' connection handling failed: ' + repr(e) + '\n')
2325
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002326 self.sock.close()
2327
2328 def stop(self):
2329 self.active = False
2330
2331class AsyncoreEchoServer(threading.Thread):
2332
2333 # this one's based on asyncore.dispatcher
2334
2335 class EchoServer (asyncore.dispatcher):
2336
2337 class ConnectionHandler(asyncore.dispatcher_with_send):
2338
2339 def __init__(self, conn, certfile):
2340 self.socket = test_wrap_socket(conn, server_side=True,
2341 certfile=certfile,
2342 do_handshake_on_connect=False)
2343 asyncore.dispatcher_with_send.__init__(self, self.socket)
2344 self._ssl_accepting = True
2345 self._do_ssl_handshake()
2346
2347 def readable(self):
2348 if isinstance(self.socket, ssl.SSLSocket):
2349 while self.socket.pending() > 0:
2350 self.handle_read_event()
2351 return True
2352
2353 def _do_ssl_handshake(self):
2354 try:
2355 self.socket.do_handshake()
2356 except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
2357 return
2358 except ssl.SSLEOFError:
2359 return self.handle_close()
2360 except ssl.SSLError:
Bill Janssen54cc54c2007-12-14 22:08:56 +00002361 raise
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002362 except OSError as err:
2363 if err.args[0] == errno.ECONNABORTED:
2364 return self.handle_close()
2365 else:
2366 self._ssl_accepting = False
Bill Janssen54cc54c2007-12-14 22:08:56 +00002367
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002368 def handle_read(self):
2369 if self._ssl_accepting:
2370 self._do_ssl_handshake()
2371 else:
2372 data = self.recv(1024)
2373 if support.verbose:
2374 sys.stdout.write(" server: read %s from client\n" % repr(data))
2375 if not data:
2376 self.close()
2377 else:
2378 self.send(data.lower())
Bill Janssen54cc54c2007-12-14 22:08:56 +00002379
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002380 def handle_close(self):
2381 self.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002382 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002383 sys.stdout.write(" server: closed connection %s\n" % self.socket)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002384
2385 def handle_error(self):
2386 raise
2387
Trent Nelson78520002008-04-10 20:54:35 +00002388 def __init__(self, certfile):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002389 self.certfile = certfile
2390 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2391 self.port = support.bind_port(sock, '')
2392 asyncore.dispatcher.__init__(self, sock)
2393 self.listen(5)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002394
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002395 def handle_accepted(self, sock_obj, addr):
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002396 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002397 sys.stdout.write(" server: new connection from %s:%s\n" %addr)
2398 self.ConnectionHandler(sock_obj, self.certfile)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002399
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002400 def handle_error(self):
2401 raise
Bill Janssen54cc54c2007-12-14 22:08:56 +00002402
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002403 def __init__(self, certfile):
2404 self.flag = None
2405 self.active = False
2406 self.server = self.EchoServer(certfile)
2407 self.port = self.server.port
2408 threading.Thread.__init__(self)
2409 self.daemon = True
Bill Janssen54cc54c2007-12-14 22:08:56 +00002410
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002411 def __str__(self):
2412 return "<%s %s>" % (self.__class__.__name__, self.server)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002413
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002414 def __enter__(self):
2415 self.start(threading.Event())
2416 self.flag.wait()
2417 return self
Thomas Woutersed03b412007-08-28 21:37:11 +00002418
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002419 def __exit__(self, *args):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002420 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002421 sys.stdout.write(" cleanup: stopping server.\n")
2422 self.stop()
2423 if support.verbose:
2424 sys.stdout.write(" cleanup: joining server thread.\n")
2425 self.join()
2426 if support.verbose:
2427 sys.stdout.write(" cleanup: successfully joined.\n")
2428 # make sure that ConnectionHandler is removed from socket_map
2429 asyncore.close_all(ignore_all=True)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002430
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002431 def start (self, flag=None):
2432 self.flag = flag
2433 threading.Thread.start(self)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002434
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002435 def run(self):
2436 self.active = True
2437 if self.flag:
2438 self.flag.set()
2439 while self.active:
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002440 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002441 asyncore.loop(1)
2442 except:
2443 pass
Trent Nelson6b240cd2008-04-10 20:12:06 +00002444
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002445 def stop(self):
2446 self.active = False
2447 self.server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002448
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002449def server_params_test(client_context, server_context, indata=b"FOO\n",
2450 chatty=True, connectionchatty=False, sni_name=None,
2451 session=None):
2452 """
2453 Launch a server, connect a client to it and try various reads
2454 and writes.
2455 """
2456 stats = {}
2457 server = ThreadedEchoServer(context=server_context,
2458 chatty=chatty,
2459 connectionchatty=False)
2460 with server:
2461 with client_context.wrap_socket(socket.socket(),
2462 server_hostname=sni_name, session=session) as s:
2463 s.connect((HOST, server.port))
2464 for arg in [indata, bytearray(indata), memoryview(indata)]:
2465 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002466 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002467 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002468 " client: sending %r...\n" % indata)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002469 s.write(arg)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002470 outdata = s.read()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002471 if connectionchatty:
2472 if support.verbose:
2473 sys.stdout.write(" client: read %r\n" % outdata)
Trent Nelson6b240cd2008-04-10 20:12:06 +00002474 if outdata != indata.lower():
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002475 raise AssertionError(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002476 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
2477 % (outdata[:20], len(outdata),
2478 indata[:20].lower(), len(indata)))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002479 s.write(b"over\n")
2480 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002481 if support.verbose:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002482 sys.stdout.write(" client: closing connection.\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002483 stats.update({
2484 'compression': s.compression(),
2485 'cipher': s.cipher(),
2486 'peercert': s.getpeercert(),
2487 'client_alpn_protocol': s.selected_alpn_protocol(),
2488 'client_npn_protocol': s.selected_npn_protocol(),
2489 'version': s.version(),
2490 'session_reused': s.session_reused,
2491 'session': s.session,
2492 })
2493 s.close()
2494 stats['server_alpn_protocols'] = server.selected_alpn_protocols
2495 stats['server_npn_protocols'] = server.selected_npn_protocols
2496 stats['server_shared_ciphers'] = server.shared_ciphers
2497 return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002498
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002499def try_protocol_combo(server_protocol, client_protocol, expect_success,
2500 certsreqs=None, server_options=0, client_options=0):
2501 """
2502 Try to SSL-connect using *client_protocol* to *server_protocol*.
2503 If *expect_success* is true, assert that the connection succeeds,
2504 if it's false, assert that the connection fails.
2505 Also, if *expect_success* is a string, assert that it is the protocol
2506 version actually used by the connection.
2507 """
2508 if certsreqs is None:
2509 certsreqs = ssl.CERT_NONE
2510 certtype = {
2511 ssl.CERT_NONE: "CERT_NONE",
2512 ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
2513 ssl.CERT_REQUIRED: "CERT_REQUIRED",
2514 }[certsreqs]
2515 if support.verbose:
2516 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
2517 sys.stdout.write(formatstr %
2518 (ssl.get_protocol_name(client_protocol),
2519 ssl.get_protocol_name(server_protocol),
2520 certtype))
2521 client_context = ssl.SSLContext(client_protocol)
2522 client_context.options |= client_options
2523 server_context = ssl.SSLContext(server_protocol)
2524 server_context.options |= server_options
2525
2526 # NOTE: we must enable "ALL" ciphers on the client, otherwise an
2527 # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
2528 # starting from OpenSSL 1.0.0 (see issue #8322).
Christian Heimesa170fa12017-09-15 20:27:30 +02002529 if client_context.protocol == ssl.PROTOCOL_TLS:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002530 client_context.set_ciphers("ALL")
2531
2532 for ctx in (client_context, server_context):
2533 ctx.verify_mode = certsreqs
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002534 ctx.load_cert_chain(SIGNED_CERTFILE)
2535 ctx.load_verify_locations(SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002536 try:
2537 stats = server_params_test(client_context, server_context,
2538 chatty=False, connectionchatty=False)
2539 # Protocol mismatch can result in either an SSLError, or a
2540 # "Connection reset by peer" error.
2541 except ssl.SSLError:
2542 if expect_success:
2543 raise
2544 except OSError as e:
2545 if expect_success or e.errno != errno.ECONNRESET:
2546 raise
2547 else:
2548 if not expect_success:
2549 raise AssertionError(
2550 "Client protocol %s succeeded with server protocol %s!"
2551 % (ssl.get_protocol_name(client_protocol),
2552 ssl.get_protocol_name(server_protocol)))
2553 elif (expect_success is not True
2554 and expect_success != stats['version']):
2555 raise AssertionError("version mismatch: expected %r, got %r"
2556 % (expect_success, stats['version']))
2557
2558
2559class ThreadedTests(unittest.TestCase):
2560
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002561 def test_echo(self):
2562 """Basic test of an SSL client connecting to a server"""
2563 if support.verbose:
2564 sys.stdout.write("\n")
2565 for protocol in PROTOCOLS:
2566 if protocol in {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}:
2567 continue
2568 with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
2569 context = ssl.SSLContext(protocol)
2570 context.load_cert_chain(CERTFILE)
2571 server_params_test(context, context,
2572 chatty=True, connectionchatty=True)
2573
Christian Heimesa170fa12017-09-15 20:27:30 +02002574 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002575
2576 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_SERVER):
2577 server_params_test(client_context=client_context,
2578 server_context=server_context,
2579 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002580 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002581
2582 client_context.check_hostname = False
2583 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_CLIENT):
2584 with self.assertRaises(ssl.SSLError) as e:
2585 server_params_test(client_context=server_context,
2586 server_context=client_context,
2587 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002588 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002589 self.assertIn('called a function you should not call',
2590 str(e.exception))
2591
2592 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_SERVER):
2593 with self.assertRaises(ssl.SSLError) as e:
2594 server_params_test(client_context=server_context,
2595 server_context=server_context,
2596 chatty=True, connectionchatty=True)
2597 self.assertIn('called a function you should not call',
2598 str(e.exception))
2599
2600 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_CLIENT):
2601 with self.assertRaises(ssl.SSLError) as e:
2602 server_params_test(client_context=server_context,
2603 server_context=client_context,
2604 chatty=True, connectionchatty=True)
2605 self.assertIn('called a function you should not call',
2606 str(e.exception))
2607
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002608 def test_getpeercert(self):
2609 if support.verbose:
2610 sys.stdout.write("\n")
Christian Heimesa170fa12017-09-15 20:27:30 +02002611
2612 client_context, server_context, hostname = testing_context()
2613 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002614 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002615 with client_context.wrap_socket(socket.socket(),
2616 do_handshake_on_connect=False,
2617 server_hostname=hostname) as s:
2618 s.connect((HOST, server.port))
2619 # getpeercert() raise ValueError while the handshake isn't
2620 # done.
2621 with self.assertRaises(ValueError):
2622 s.getpeercert()
2623 s.do_handshake()
2624 cert = s.getpeercert()
2625 self.assertTrue(cert, "Can't get peer certificate.")
2626 cipher = s.cipher()
2627 if support.verbose:
2628 sys.stdout.write(pprint.pformat(cert) + '\n')
2629 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
2630 if 'subject' not in cert:
2631 self.fail("No subject field in certificate: %s." %
2632 pprint.pformat(cert))
2633 if ((('organizationName', 'Python Software Foundation'),)
2634 not in cert['subject']):
2635 self.fail(
2636 "Missing or invalid 'organizationName' field in certificate subject; "
2637 "should be 'Python Software Foundation'.")
2638 self.assertIn('notBefore', cert)
2639 self.assertIn('notAfter', cert)
2640 before = ssl.cert_time_to_seconds(cert['notBefore'])
2641 after = ssl.cert_time_to_seconds(cert['notAfter'])
2642 self.assertLess(before, after)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002643
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002644 @unittest.skipUnless(have_verify_flags(),
2645 "verify_flags need OpenSSL > 0.9.8")
2646 def test_crl_check(self):
2647 if support.verbose:
2648 sys.stdout.write("\n")
2649
Christian Heimesa170fa12017-09-15 20:27:30 +02002650 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002651
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002652 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
Christian Heimesa170fa12017-09-15 20:27:30 +02002653 self.assertEqual(client_context.verify_flags, ssl.VERIFY_DEFAULT | tf)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002654
2655 # VERIFY_DEFAULT should pass
2656 server = ThreadedEchoServer(context=server_context, chatty=True)
2657 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002658 with client_context.wrap_socket(socket.socket(),
2659 server_hostname=hostname) as s:
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002660 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002661 cert = s.getpeercert()
2662 self.assertTrue(cert, "Can't get peer certificate.")
Bill Janssen58afe4c2008-09-08 16:45:19 +00002663
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002664 # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
Christian Heimesa170fa12017-09-15 20:27:30 +02002665 client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
Bill Janssen58afe4c2008-09-08 16:45:19 +00002666
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002667 server = ThreadedEchoServer(context=server_context, chatty=True)
2668 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002669 with client_context.wrap_socket(socket.socket(),
2670 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002671 with self.assertRaisesRegex(ssl.SSLError,
2672 "certificate verify failed"):
2673 s.connect((HOST, server.port))
Bill Janssen58afe4c2008-09-08 16:45:19 +00002674
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002675 # now load a CRL file. The CRL file is signed by the CA.
Christian Heimesa170fa12017-09-15 20:27:30 +02002676 client_context.load_verify_locations(CRLFILE)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002677
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002678 server = ThreadedEchoServer(context=server_context, chatty=True)
2679 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002680 with client_context.wrap_socket(socket.socket(),
2681 server_hostname=hostname) as s:
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002682 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002683 cert = s.getpeercert()
2684 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002685
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002686 def test_check_hostname(self):
2687 if support.verbose:
2688 sys.stdout.write("\n")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002689
Christian Heimesa170fa12017-09-15 20:27:30 +02002690 client_context, server_context, hostname = testing_context()
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002691
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002692 # correct hostname should verify
2693 server = ThreadedEchoServer(context=server_context, chatty=True)
2694 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002695 with client_context.wrap_socket(socket.socket(),
2696 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002697 s.connect((HOST, server.port))
2698 cert = s.getpeercert()
2699 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002700
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002701 # incorrect hostname should raise an exception
2702 server = ThreadedEchoServer(context=server_context, chatty=True)
2703 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002704 with client_context.wrap_socket(socket.socket(),
2705 server_hostname="invalid") as s:
Christian Heimes61d478c2018-01-27 15:51:38 +01002706 with self.assertRaisesRegex(
2707 ssl.CertificateError,
2708 "Hostname mismatch, certificate is not valid for 'invalid'."):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002709 s.connect((HOST, server.port))
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002710
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002711 # missing server_hostname arg should cause an exception, too
2712 server = ThreadedEchoServer(context=server_context, chatty=True)
2713 with server:
2714 with socket.socket() as s:
2715 with self.assertRaisesRegex(ValueError,
2716 "check_hostname requires server_hostname"):
Christian Heimesa170fa12017-09-15 20:27:30 +02002717 client_context.wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002718
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002719 def test_ecc_cert(self):
2720 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2721 client_context.load_verify_locations(SIGNING_CA)
Christian Heimese8eb6cb2018-05-22 22:50:12 +02002722 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002723 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2724
2725 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2726 # load ECC cert
2727 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2728
2729 # correct hostname should verify
2730 server = ThreadedEchoServer(context=server_context, chatty=True)
2731 with server:
2732 with client_context.wrap_socket(socket.socket(),
2733 server_hostname=hostname) as s:
2734 s.connect((HOST, server.port))
2735 cert = s.getpeercert()
2736 self.assertTrue(cert, "Can't get peer certificate.")
2737 cipher = s.cipher()[0].split('-')
2738 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2739
2740 def test_dual_rsa_ecc(self):
2741 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2742 client_context.load_verify_locations(SIGNING_CA)
Christian Heimes05d9fe32018-02-27 08:55:39 +01002743 # TODO: fix TLSv1.3 once SSLContext can restrict signature
2744 # algorithms.
2745 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002746 # only ECDSA certs
2747 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
2748 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2749
2750 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2751 # load ECC and RSA key/cert pairs
2752 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2753 server_context.load_cert_chain(SIGNED_CERTFILE)
2754
2755 # correct hostname should verify
2756 server = ThreadedEchoServer(context=server_context, chatty=True)
2757 with server:
2758 with client_context.wrap_socket(socket.socket(),
2759 server_hostname=hostname) as s:
2760 s.connect((HOST, server.port))
2761 cert = s.getpeercert()
2762 self.assertTrue(cert, "Can't get peer certificate.")
2763 cipher = s.cipher()[0].split('-')
2764 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2765
Christian Heimes66e57422018-01-29 14:25:13 +01002766 def test_check_hostname_idn(self):
2767 if support.verbose:
2768 sys.stdout.write("\n")
2769
Christian Heimes11a14932018-02-24 02:35:08 +01002770 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Christian Heimes66e57422018-01-29 14:25:13 +01002771 server_context.load_cert_chain(IDNSANSFILE)
2772
Christian Heimes11a14932018-02-24 02:35:08 +01002773 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes66e57422018-01-29 14:25:13 +01002774 context.verify_mode = ssl.CERT_REQUIRED
2775 context.check_hostname = True
2776 context.load_verify_locations(SIGNING_CA)
2777
2778 # correct hostname should verify, when specified in several
2779 # different ways
2780 idn_hostnames = [
2781 ('könig.idn.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01002782 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002783 ('xn--knig-5qa.idn.pythontest.net',
2784 'xn--knig-5qa.idn.pythontest.net'),
2785 (b'xn--knig-5qa.idn.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01002786 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002787
2788 ('königsgäßchen.idna2003.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01002789 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002790 ('xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
2791 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2792 (b'xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
Christian Heimes11a14932018-02-24 02:35:08 +01002793 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2794
2795 # ('königsgäßchen.idna2008.pythontest.net',
2796 # 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2797 ('xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
2798 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2799 (b'xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
2800 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2801
Christian Heimes66e57422018-01-29 14:25:13 +01002802 ]
2803 for server_hostname, expected_hostname in idn_hostnames:
2804 server = ThreadedEchoServer(context=server_context, chatty=True)
2805 with server:
2806 with context.wrap_socket(socket.socket(),
2807 server_hostname=server_hostname) as s:
2808 self.assertEqual(s.server_hostname, expected_hostname)
2809 s.connect((HOST, server.port))
2810 cert = s.getpeercert()
2811 self.assertEqual(s.server_hostname, expected_hostname)
2812 self.assertTrue(cert, "Can't get peer certificate.")
2813
Christian Heimes66e57422018-01-29 14:25:13 +01002814 # incorrect hostname should raise an exception
2815 server = ThreadedEchoServer(context=server_context, chatty=True)
2816 with server:
2817 with context.wrap_socket(socket.socket(),
2818 server_hostname="python.example.org") as s:
2819 with self.assertRaises(ssl.CertificateError):
2820 s.connect((HOST, server.port))
2821
Christian Heimes529525f2018-05-23 22:24:45 +02002822 def test_wrong_cert_tls12(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002823 """Connecting when the server rejects the client's certificate
2824
2825 Launch a server with CERT_REQUIRED, and check that trying to
2826 connect to it with a wrong client certificate fails.
2827 """
Christian Heimes05d9fe32018-02-27 08:55:39 +01002828 client_context, server_context, hostname = testing_context()
Christian Heimes88bfd0b2018-08-14 12:54:19 +02002829 # load client cert that is not signed by trusted CA
2830 client_context.load_cert_chain(CERTFILE)
Christian Heimes05d9fe32018-02-27 08:55:39 +01002831 # require TLS client authentication
2832 server_context.verify_mode = ssl.CERT_REQUIRED
Christian Heimes529525f2018-05-23 22:24:45 +02002833 # TLS 1.3 has different handshake
2834 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
Christian Heimes05d9fe32018-02-27 08:55:39 +01002835
2836 server = ThreadedEchoServer(
2837 context=server_context, chatty=True, connectionchatty=True,
2838 )
2839
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002840 with server, \
Christian Heimes05d9fe32018-02-27 08:55:39 +01002841 client_context.wrap_socket(socket.socket(),
2842 server_hostname=hostname) as s:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002843 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002844 # Expect either an SSL error about the server rejecting
2845 # the connection, or a low-level connection reset (which
2846 # sometimes happens on Windows)
2847 s.connect((HOST, server.port))
2848 except ssl.SSLError as e:
2849 if support.verbose:
2850 sys.stdout.write("\nSSLError is %r\n" % e)
2851 except OSError as e:
2852 if e.errno != errno.ECONNRESET:
2853 raise
2854 if support.verbose:
2855 sys.stdout.write("\nsocket.error is %r\n" % e)
2856 else:
2857 self.fail("Use of invalid cert should have failed!")
2858
Christian Heimes529525f2018-05-23 22:24:45 +02002859 @unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
2860 def test_wrong_cert_tls13(self):
2861 client_context, server_context, hostname = testing_context()
Christian Heimes88bfd0b2018-08-14 12:54:19 +02002862 # load client cert that is not signed by trusted CA
2863 client_context.load_cert_chain(CERTFILE)
Christian Heimes529525f2018-05-23 22:24:45 +02002864 server_context.verify_mode = ssl.CERT_REQUIRED
2865 server_context.minimum_version = ssl.TLSVersion.TLSv1_3
2866 client_context.minimum_version = ssl.TLSVersion.TLSv1_3
2867
2868 server = ThreadedEchoServer(
2869 context=server_context, chatty=True, connectionchatty=True,
2870 )
2871 with server, \
2872 client_context.wrap_socket(socket.socket(),
2873 server_hostname=hostname) as s:
2874 # TLS 1.3 perform client cert exchange after handshake
2875 s.connect((HOST, server.port))
2876 try:
2877 s.write(b'data')
2878 s.read(4)
2879 except ssl.SSLError as e:
2880 if support.verbose:
2881 sys.stdout.write("\nSSLError is %r\n" % e)
2882 except OSError as e:
2883 if e.errno != errno.ECONNRESET:
2884 raise
2885 if support.verbose:
2886 sys.stdout.write("\nsocket.error is %r\n" % e)
2887 else:
2888 self.fail("Use of invalid cert should have failed!")
2889
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002890 def test_rude_shutdown(self):
2891 """A brutal shutdown of an SSL server should raise an OSError
2892 in the client when attempting handshake.
2893 """
2894 listener_ready = threading.Event()
2895 listener_gone = threading.Event()
2896
2897 s = socket.socket()
2898 port = support.bind_port(s, HOST)
2899
2900 # `listener` runs in a thread. It sits in an accept() until
2901 # the main thread connects. Then it rudely closes the socket,
2902 # and sets Event `listener_gone` to let the main thread know
2903 # the socket is gone.
2904 def listener():
2905 s.listen()
2906 listener_ready.set()
2907 newsock, addr = s.accept()
2908 newsock.close()
2909 s.close()
2910 listener_gone.set()
2911
2912 def connector():
2913 listener_ready.wait()
2914 with socket.socket() as c:
2915 c.connect((HOST, port))
2916 listener_gone.wait()
Antoine Pitrou40f08742010-04-24 22:04:40 +00002917 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002918 ssl_sock = test_wrap_socket(c)
2919 except OSError:
2920 pass
2921 else:
2922 self.fail('connecting to closed SSL socket should have failed')
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002923
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002924 t = threading.Thread(target=listener)
2925 t.start()
2926 try:
2927 connector()
2928 finally:
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002929 t.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002930
Christian Heimesb3ad0e52017-09-08 12:00:19 -07002931 def test_ssl_cert_verify_error(self):
2932 if support.verbose:
2933 sys.stdout.write("\n")
2934
2935 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2936 server_context.load_cert_chain(SIGNED_CERTFILE)
2937
2938 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2939
2940 server = ThreadedEchoServer(context=server_context, chatty=True)
2941 with server:
2942 with context.wrap_socket(socket.socket(),
Christian Heimesa170fa12017-09-15 20:27:30 +02002943 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
Christian Heimesb3ad0e52017-09-08 12:00:19 -07002944 try:
2945 s.connect((HOST, server.port))
2946 except ssl.SSLError as e:
2947 msg = 'unable to get local issuer certificate'
2948 self.assertIsInstance(e, ssl.SSLCertVerificationError)
2949 self.assertEqual(e.verify_code, 20)
2950 self.assertEqual(e.verify_message, msg)
2951 self.assertIn(msg, repr(e))
2952 self.assertIn('certificate verify failed', repr(e))
2953
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002954 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
2955 "OpenSSL is compiled without SSLv2 support")
2956 def test_protocol_sslv2(self):
2957 """Connecting to an SSLv2 server with various client options"""
2958 if support.verbose:
2959 sys.stdout.write("\n")
2960 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
2961 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
2962 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
Christian Heimesa170fa12017-09-15 20:27:30 +02002963 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002964 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2965 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
2966 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
2967 # SSLv23 client with specific SSL options
2968 if no_sslv2_implies_sslv3_hello():
2969 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02002970 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002971 client_options=ssl.OP_NO_SSLv2)
Christian Heimesa170fa12017-09-15 20:27:30 +02002972 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002973 client_options=ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02002974 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002975 client_options=ssl.OP_NO_TLSv1)
Antoine Pitrou242db722013-05-01 20:52:07 +02002976
Christian Heimesa170fa12017-09-15 20:27:30 +02002977 def test_PROTOCOL_TLS(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002978 """Connecting to an SSLv23 server with various client options"""
2979 if support.verbose:
2980 sys.stdout.write("\n")
2981 if hasattr(ssl, 'PROTOCOL_SSLv2'):
Antoine Pitrou8f85f902012-01-03 22:46:48 +01002982 try:
Christian Heimesa170fa12017-09-15 20:27:30 +02002983 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv2, True)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002984 except OSError as x:
2985 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
2986 if support.verbose:
2987 sys.stdout.write(
2988 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
2989 % str(x))
2990 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02002991 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False)
2992 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True)
2993 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1')
Antoine Pitrou8f85f902012-01-03 22:46:48 +01002994
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002995 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02002996 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_OPTIONAL)
2997 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_OPTIONAL)
2998 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002999
3000 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003001 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_REQUIRED)
3002 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_REQUIRED)
3003 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003004
3005 # Server with specific SSL options
3006 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02003007 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003008 server_options=ssl.OP_NO_SSLv3)
3009 # Will choose TLSv1
Christian Heimesa170fa12017-09-15 20:27:30 +02003010 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003011 server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02003012 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003013 server_options=ssl.OP_NO_TLSv1)
3014
3015
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003016 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
3017 "OpenSSL is compiled without SSLv3 support")
3018 def test_protocol_sslv3(self):
3019 """Connecting to an SSLv3 server with various client options"""
3020 if support.verbose:
3021 sys.stdout.write("\n")
3022 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3')
3023 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
3024 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
3025 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3026 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003027 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003028 client_options=ssl.OP_NO_SSLv3)
3029 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
3030 if no_sslv2_implies_sslv3_hello():
3031 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02003032 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003033 False, client_options=ssl.OP_NO_SSLv2)
3034
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003035 def test_protocol_tlsv1(self):
3036 """Connecting to a TLSv1 server with various client options"""
3037 if support.verbose:
3038 sys.stdout.write("\n")
3039 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1')
3040 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
3041 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
3042 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3043 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
3044 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3045 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003046 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003047 client_options=ssl.OP_NO_TLSv1)
3048
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003049 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
3050 "TLS version 1.1 not supported.")
3051 def test_protocol_tlsv1_1(self):
3052 """Connecting to a TLSv1.1 server with various client options.
3053 Testing against older TLS versions."""
3054 if support.verbose:
3055 sys.stdout.write("\n")
3056 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
3057 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3058 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False)
3059 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3060 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003061 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003062 client_options=ssl.OP_NO_TLSv1_1)
3063
Christian Heimesa170fa12017-09-15 20:27:30 +02003064 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003065 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False)
3066 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False)
3067
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003068 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
3069 "TLS version 1.2 not supported.")
3070 def test_protocol_tlsv1_2(self):
3071 """Connecting to a TLSv1.2 server with various client options.
3072 Testing against older TLS versions."""
3073 if support.verbose:
3074 sys.stdout.write("\n")
3075 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2',
3076 server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
3077 client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
3078 if hasattr(ssl, 'PROTOCOL_SSLv2'):
3079 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False)
3080 if hasattr(ssl, 'PROTOCOL_SSLv3'):
3081 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02003082 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003083 client_options=ssl.OP_NO_TLSv1_2)
3084
Christian Heimesa170fa12017-09-15 20:27:30 +02003085 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003086 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
3087 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
3088 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
3089 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
3090
3091 def test_starttls(self):
3092 """Switching from clear text to encrypted and back again."""
3093 msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")
3094
3095 server = ThreadedEchoServer(CERTFILE,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003096 starttls_server=True,
3097 chatty=True,
3098 connectionchatty=True)
3099 wrapped = False
3100 with server:
3101 s = socket.socket()
3102 s.setblocking(1)
3103 s.connect((HOST, server.port))
Antoine Pitroud6494802011-07-21 01:11:30 +02003104 if support.verbose:
3105 sys.stdout.write("\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003106 for indata in msgs:
Antoine Pitroud6494802011-07-21 01:11:30 +02003107 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003108 sys.stdout.write(
3109 " client: sending %r...\n" % indata)
3110 if wrapped:
3111 conn.write(indata)
3112 outdata = conn.read()
3113 else:
3114 s.send(indata)
3115 outdata = s.recv(1024)
3116 msg = outdata.strip().lower()
3117 if indata == b"STARTTLS" and msg.startswith(b"ok"):
3118 # STARTTLS ok, switch to secure mode
3119 if support.verbose:
3120 sys.stdout.write(
3121 " client: read %r from server, starting TLS...\n"
3122 % msg)
Christian Heimesa170fa12017-09-15 20:27:30 +02003123 conn = test_wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003124 wrapped = True
3125 elif indata == b"ENDTLS" and msg.startswith(b"ok"):
3126 # ENDTLS ok, switch back to clear text
3127 if support.verbose:
3128 sys.stdout.write(
3129 " client: read %r from server, ending TLS...\n"
3130 % msg)
3131 s = conn.unwrap()
3132 wrapped = False
3133 else:
3134 if support.verbose:
3135 sys.stdout.write(
3136 " client: read %r from server\n" % msg)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003137 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003138 sys.stdout.write(" client: closing connection.\n")
3139 if wrapped:
3140 conn.write(b"over\n")
3141 else:
3142 s.send(b"over\n")
3143 if wrapped:
3144 conn.close()
3145 else:
3146 s.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003147
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003148 def test_socketserver(self):
3149 """Using socketserver to create and manage SSL connections."""
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003150 server = make_https_server(self, certfile=SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003151 # try to connect
3152 if support.verbose:
3153 sys.stdout.write('\n')
3154 with open(CERTFILE, 'rb') as f:
3155 d1 = f.read()
3156 d2 = ''
3157 # now fetch the same data from the HTTPS server
3158 url = 'https://localhost:%d/%s' % (
3159 server.port, os.path.split(CERTFILE)[1])
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003160 context = ssl.create_default_context(cafile=SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003161 f = urllib.request.urlopen(url, context=context)
3162 try:
3163 dlen = f.info().get("content-length")
3164 if dlen and (int(dlen) > 0):
3165 d2 = f.read(int(dlen))
3166 if support.verbose:
3167 sys.stdout.write(
3168 " client: read %d bytes from remote server '%s'\n"
3169 % (len(d2), server))
3170 finally:
3171 f.close()
3172 self.assertEqual(d1, d2)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003173
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003174 def test_asyncore_server(self):
3175 """Check the example asyncore integration."""
3176 if support.verbose:
3177 sys.stdout.write("\n")
Antoine Pitrou0e576f12011-12-22 10:03:38 +01003178
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003179 indata = b"FOO\n"
3180 server = AsyncoreEchoServer(CERTFILE)
3181 with server:
3182 s = test_wrap_socket(socket.socket())
3183 s.connect(('127.0.0.1', server.port))
3184 if support.verbose:
3185 sys.stdout.write(
3186 " client: sending %r...\n" % indata)
3187 s.write(indata)
3188 outdata = s.read()
3189 if support.verbose:
3190 sys.stdout.write(" client: read %r\n" % outdata)
3191 if outdata != indata.lower():
3192 self.fail(
3193 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
3194 % (outdata[:20], len(outdata),
3195 indata[:20].lower(), len(indata)))
3196 s.write(b"over\n")
3197 if support.verbose:
3198 sys.stdout.write(" client: closing connection.\n")
3199 s.close()
3200 if support.verbose:
3201 sys.stdout.write(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05003202
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003203 def test_recv_send(self):
3204 """Test recv(), send() and friends."""
3205 if support.verbose:
3206 sys.stdout.write("\n")
3207
3208 server = ThreadedEchoServer(CERTFILE,
3209 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003210 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003211 cacerts=CERTFILE,
3212 chatty=True,
3213 connectionchatty=False)
3214 with server:
3215 s = test_wrap_socket(socket.socket(),
3216 server_side=False,
3217 certfile=CERTFILE,
3218 ca_certs=CERTFILE,
3219 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003220 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003221 s.connect((HOST, server.port))
3222 # helper methods for standardising recv* method signatures
3223 def _recv_into():
3224 b = bytearray(b"\0"*100)
3225 count = s.recv_into(b)
3226 return b[:count]
3227
3228 def _recvfrom_into():
3229 b = bytearray(b"\0"*100)
3230 count, addr = s.recvfrom_into(b)
3231 return b[:count]
3232
3233 # (name, method, expect success?, *args, return value func)
3234 send_methods = [
3235 ('send', s.send, True, [], len),
3236 ('sendto', s.sendto, False, ["some.address"], len),
3237 ('sendall', s.sendall, True, [], lambda x: None),
3238 ]
3239 # (name, method, whether to expect success, *args)
3240 recv_methods = [
3241 ('recv', s.recv, True, []),
3242 ('recvfrom', s.recvfrom, False, ["some.address"]),
3243 ('recv_into', _recv_into, True, []),
3244 ('recvfrom_into', _recvfrom_into, False, []),
3245 ]
3246 data_prefix = "PREFIX_"
3247
3248 for (meth_name, send_meth, expect_success, args,
3249 ret_val_meth) in send_methods:
3250 indata = (data_prefix + meth_name).encode('ascii')
3251 try:
3252 ret = send_meth(indata, *args)
3253 msg = "sending with {}".format(meth_name)
3254 self.assertEqual(ret, ret_val_meth(indata), msg=msg)
3255 outdata = s.read()
3256 if outdata != indata.lower():
3257 self.fail(
3258 "While sending with <<{name:s}>> bad data "
3259 "<<{outdata:r}>> ({nout:d}) received; "
3260 "expected <<{indata:r}>> ({nin:d})\n".format(
3261 name=meth_name, outdata=outdata[:20],
3262 nout=len(outdata),
3263 indata=indata[:20], nin=len(indata)
3264 )
3265 )
3266 except ValueError as e:
3267 if expect_success:
3268 self.fail(
3269 "Failed to send with method <<{name:s}>>; "
3270 "expected to succeed.\n".format(name=meth_name)
3271 )
3272 if not str(e).startswith(meth_name):
3273 self.fail(
3274 "Method <<{name:s}>> failed with unexpected "
3275 "exception message: {exp:s}\n".format(
3276 name=meth_name, exp=e
3277 )
3278 )
3279
3280 for meth_name, recv_meth, expect_success, args in recv_methods:
3281 indata = (data_prefix + meth_name).encode('ascii')
3282 try:
3283 s.send(indata)
3284 outdata = recv_meth(*args)
3285 if outdata != indata.lower():
3286 self.fail(
3287 "While receiving with <<{name:s}>> bad data "
3288 "<<{outdata:r}>> ({nout:d}) received; "
3289 "expected <<{indata:r}>> ({nin:d})\n".format(
3290 name=meth_name, outdata=outdata[:20],
3291 nout=len(outdata),
3292 indata=indata[:20], nin=len(indata)
3293 )
3294 )
3295 except ValueError as e:
3296 if expect_success:
3297 self.fail(
3298 "Failed to receive with method <<{name:s}>>; "
3299 "expected to succeed.\n".format(name=meth_name)
3300 )
3301 if not str(e).startswith(meth_name):
3302 self.fail(
3303 "Method <<{name:s}>> failed with unexpected "
3304 "exception message: {exp:s}\n".format(
3305 name=meth_name, exp=e
3306 )
3307 )
3308 # consume data
3309 s.read()
3310
3311 # read(-1, buffer) is supported, even though read(-1) is not
3312 data = b"data"
3313 s.send(data)
3314 buffer = bytearray(len(data))
3315 self.assertEqual(s.read(-1, buffer), len(data))
3316 self.assertEqual(buffer, data)
3317
Christian Heimes888bbdc2017-09-07 14:18:21 -07003318 # sendall accepts bytes-like objects
3319 if ctypes is not None:
3320 ubyte = ctypes.c_ubyte * len(data)
3321 byteslike = ubyte.from_buffer_copy(data)
3322 s.sendall(byteslike)
3323 self.assertEqual(s.read(), data)
3324
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003325 # Make sure sendmsg et al are disallowed to avoid
3326 # inadvertent disclosure of data and/or corruption
3327 # of the encrypted data stream
3328 self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
3329 self.assertRaises(NotImplementedError, s.recvmsg, 100)
3330 self.assertRaises(NotImplementedError,
3331 s.recvmsg_into, bytearray(100))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003332 s.write(b"over\n")
3333
3334 self.assertRaises(ValueError, s.recv, -1)
3335 self.assertRaises(ValueError, s.read, -1)
3336
3337 s.close()
3338
3339 def test_recv_zero(self):
3340 server = ThreadedEchoServer(CERTFILE)
3341 server.__enter__()
3342 self.addCleanup(server.__exit__, None, None)
3343 s = socket.create_connection((HOST, server.port))
3344 self.addCleanup(s.close)
3345 s = test_wrap_socket(s, suppress_ragged_eofs=False)
3346 self.addCleanup(s.close)
3347
3348 # recv/read(0) should return no data
3349 s.send(b"data")
3350 self.assertEqual(s.recv(0), b"")
3351 self.assertEqual(s.read(0), b"")
3352 self.assertEqual(s.read(), b"data")
3353
3354 # Should not block if the other end sends no data
3355 s.setblocking(False)
3356 self.assertEqual(s.recv(0), b"")
3357 self.assertEqual(s.recv_into(bytearray()), 0)
3358
3359 def test_nonblocking_send(self):
3360 server = ThreadedEchoServer(CERTFILE,
3361 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003362 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003363 cacerts=CERTFILE,
3364 chatty=True,
3365 connectionchatty=False)
3366 with server:
3367 s = test_wrap_socket(socket.socket(),
3368 server_side=False,
3369 certfile=CERTFILE,
3370 ca_certs=CERTFILE,
3371 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003372 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003373 s.connect((HOST, server.port))
3374 s.setblocking(False)
3375
3376 # If we keep sending data, at some point the buffers
3377 # will be full and the call will block
3378 buf = bytearray(8192)
3379 def fill_buffer():
3380 while True:
3381 s.send(buf)
3382 self.assertRaises((ssl.SSLWantWriteError,
3383 ssl.SSLWantReadError), fill_buffer)
3384
3385 # Now read all the output and discard it
3386 s.setblocking(True)
3387 s.close()
3388
3389 def test_handshake_timeout(self):
3390 # Issue #5103: SSL handshake must respect the socket timeout
3391 server = socket.socket(socket.AF_INET)
3392 host = "127.0.0.1"
3393 port = support.bind_port(server)
3394 started = threading.Event()
3395 finish = False
3396
3397 def serve():
3398 server.listen()
3399 started.set()
3400 conns = []
3401 while not finish:
3402 r, w, e = select.select([server], [], [], 0.1)
3403 if server in r:
3404 # Let the socket hang around rather than having
3405 # it closed by garbage collection.
3406 conns.append(server.accept()[0])
3407 for sock in conns:
3408 sock.close()
3409
3410 t = threading.Thread(target=serve)
3411 t.start()
3412 started.wait()
3413
3414 try:
3415 try:
3416 c = socket.socket(socket.AF_INET)
3417 c.settimeout(0.2)
3418 c.connect((host, port))
3419 # Will attempt handshake and time out
3420 self.assertRaisesRegex(socket.timeout, "timed out",
3421 test_wrap_socket, c)
3422 finally:
3423 c.close()
3424 try:
3425 c = socket.socket(socket.AF_INET)
3426 c = test_wrap_socket(c)
3427 c.settimeout(0.2)
3428 # Will attempt handshake and time out
3429 self.assertRaisesRegex(socket.timeout, "timed out",
3430 c.connect, (host, port))
3431 finally:
3432 c.close()
3433 finally:
3434 finish = True
3435 t.join()
3436 server.close()
3437
3438 def test_server_accept(self):
3439 # Issue #16357: accept() on a SSLSocket created through
3440 # SSLContext.wrap_socket().
Christian Heimesa170fa12017-09-15 20:27:30 +02003441 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003442 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003443 context.load_verify_locations(SIGNING_CA)
3444 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003445 server = socket.socket(socket.AF_INET)
3446 host = "127.0.0.1"
3447 port = support.bind_port(server)
3448 server = context.wrap_socket(server, server_side=True)
3449 self.assertTrue(server.server_side)
3450
3451 evt = threading.Event()
3452 remote = None
3453 peer = None
3454 def serve():
3455 nonlocal remote, peer
3456 server.listen()
3457 # Block on the accept and wait on the connection to close.
3458 evt.set()
3459 remote, peer = server.accept()
Christian Heimes529525f2018-05-23 22:24:45 +02003460 remote.send(remote.recv(4))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003461
3462 t = threading.Thread(target=serve)
3463 t.start()
3464 # Client wait until server setup and perform a connect.
3465 evt.wait()
3466 client = context.wrap_socket(socket.socket())
3467 client.connect((host, port))
Christian Heimes529525f2018-05-23 22:24:45 +02003468 client.send(b'data')
3469 client.recv()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003470 client_addr = client.getsockname()
3471 client.close()
3472 t.join()
3473 remote.close()
3474 server.close()
3475 # Sanity checks.
3476 self.assertIsInstance(remote, ssl.SSLSocket)
3477 self.assertEqual(peer, client_addr)
3478
3479 def test_getpeercert_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003480 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003481 with context.wrap_socket(socket.socket()) as sock:
3482 with self.assertRaises(OSError) as cm:
3483 sock.getpeercert()
3484 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3485
3486 def test_do_handshake_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003487 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003488 with context.wrap_socket(socket.socket()) as sock:
3489 with self.assertRaises(OSError) as cm:
3490 sock.do_handshake()
3491 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3492
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003493 def test_no_shared_ciphers(self):
3494 client_context, server_context, hostname = testing_context()
3495 # OpenSSL enables all TLS 1.3 ciphers, enforce TLS 1.2 for test
3496 client_context.options |= ssl.OP_NO_TLSv1_3
Victor Stinner5e922652018-09-07 17:30:33 +02003497 # Force different suites on client and server
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003498 client_context.set_ciphers("AES128")
3499 server_context.set_ciphers("AES256")
3500 with ThreadedEchoServer(context=server_context) as server:
3501 with client_context.wrap_socket(socket.socket(),
3502 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003503 with self.assertRaises(OSError):
3504 s.connect((HOST, server.port))
3505 self.assertIn("no shared cipher", server.conn_errors[0])
3506
3507 def test_version_basic(self):
3508 """
3509 Basic tests for SSLSocket.version().
3510 More tests are done in the test_protocol_*() methods.
3511 """
Christian Heimesa170fa12017-09-15 20:27:30 +02003512 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
3513 context.check_hostname = False
3514 context.verify_mode = ssl.CERT_NONE
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003515 with ThreadedEchoServer(CERTFILE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003516 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003517 chatty=False) as server:
3518 with context.wrap_socket(socket.socket()) as s:
3519 self.assertIs(s.version(), None)
Christian Heimes141c5e82018-02-24 21:10:57 +01003520 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003521 s.connect((HOST, server.port))
Christian Heimes529525f2018-05-23 22:24:45 +02003522 if IS_OPENSSL_1_1_1 and ssl.HAS_TLSv1_3:
Christian Heimes05d9fe32018-02-27 08:55:39 +01003523 self.assertEqual(s.version(), 'TLSv1.3')
3524 elif ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
Christian Heimesa170fa12017-09-15 20:27:30 +02003525 self.assertEqual(s.version(), 'TLSv1.2')
3526 else: # 0.9.8 to 1.0.1
3527 self.assertIn(s.version(), ('TLSv1', 'TLSv1.2'))
Christian Heimes141c5e82018-02-24 21:10:57 +01003528 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003529 self.assertIs(s.version(), None)
3530
Christian Heimescb5b68a2017-09-07 18:07:00 -07003531 @unittest.skipUnless(ssl.HAS_TLSv1_3,
3532 "test requires TLSv1.3 enabled OpenSSL")
3533 def test_tls1_3(self):
3534 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
3535 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003536 context.options |= (
3537 ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
3538 )
3539 with ThreadedEchoServer(context=context) as server:
3540 with context.wrap_socket(socket.socket()) as s:
3541 s.connect((HOST, server.port))
Christian Heimes05d9fe32018-02-27 08:55:39 +01003542 self.assertIn(s.cipher()[0], {
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003543 'TLS_AES_256_GCM_SHA384',
3544 'TLS_CHACHA20_POLY1305_SHA256',
3545 'TLS_AES_128_GCM_SHA256',
Christian Heimes05d9fe32018-02-27 08:55:39 +01003546 })
3547 self.assertEqual(s.version(), 'TLSv1.3')
Christian Heimescb5b68a2017-09-07 18:07:00 -07003548
Christian Heimes698dde12018-02-27 11:54:43 +01003549 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
3550 "required OpenSSL 1.1.0g")
3551 def test_min_max_version(self):
3552 client_context, server_context, hostname = testing_context()
3553 # client TLSv1.0 to 1.2
3554 client_context.minimum_version = ssl.TLSVersion.TLSv1
3555 client_context.maximum_version = ssl.TLSVersion.TLSv1_2
3556 # server only TLSv1.2
3557 server_context.minimum_version = ssl.TLSVersion.TLSv1_2
3558 server_context.maximum_version = ssl.TLSVersion.TLSv1_2
3559
3560 with ThreadedEchoServer(context=server_context) as server:
3561 with client_context.wrap_socket(socket.socket(),
3562 server_hostname=hostname) as s:
3563 s.connect((HOST, server.port))
3564 self.assertEqual(s.version(), 'TLSv1.2')
3565
3566 # client 1.0 to 1.2, server 1.0 to 1.1
3567 server_context.minimum_version = ssl.TLSVersion.TLSv1
3568 server_context.maximum_version = ssl.TLSVersion.TLSv1_1
3569
3570 with ThreadedEchoServer(context=server_context) as server:
3571 with client_context.wrap_socket(socket.socket(),
3572 server_hostname=hostname) as s:
3573 s.connect((HOST, server.port))
3574 self.assertEqual(s.version(), 'TLSv1.1')
3575
3576 # client 1.0, server 1.2 (mismatch)
3577 server_context.minimum_version = ssl.TLSVersion.TLSv1_2
3578 server_context.maximum_version = ssl.TLSVersion.TLSv1_2
3579 client_context.minimum_version = ssl.TLSVersion.TLSv1
3580 client_context.maximum_version = ssl.TLSVersion.TLSv1
3581 with ThreadedEchoServer(context=server_context) as server:
3582 with client_context.wrap_socket(socket.socket(),
3583 server_hostname=hostname) as s:
3584 with self.assertRaises(ssl.SSLError) as e:
3585 s.connect((HOST, server.port))
3586 self.assertIn("alert", str(e.exception))
3587
3588
3589 @unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
3590 "required OpenSSL 1.1.0g")
3591 @unittest.skipUnless(ssl.HAS_SSLv3, "requires SSLv3 support")
3592 def test_min_max_version_sslv3(self):
3593 client_context, server_context, hostname = testing_context()
3594 server_context.minimum_version = ssl.TLSVersion.SSLv3
3595 client_context.minimum_version = ssl.TLSVersion.SSLv3
3596 client_context.maximum_version = ssl.TLSVersion.SSLv3
3597 with ThreadedEchoServer(context=server_context) as server:
3598 with client_context.wrap_socket(socket.socket(),
3599 server_hostname=hostname) as s:
3600 s.connect((HOST, server.port))
3601 self.assertEqual(s.version(), 'SSLv3')
3602
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003603 @unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
3604 def test_default_ecdh_curve(self):
3605 # Issue #21015: elliptic curve-based Diffie Hellman key exchange
3606 # should be enabled by default on SSL contexts.
Christian Heimesa170fa12017-09-15 20:27:30 +02003607 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003608 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003609 # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
3610 # cipher name.
3611 context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003612 # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
3613 # explicitly using the 'ECCdraft' cipher alias. Otherwise,
3614 # our default cipher list should prefer ECDH-based ciphers
3615 # automatically.
3616 if ssl.OPENSSL_VERSION_INFO < (1, 0, 0):
3617 context.set_ciphers("ECCdraft:ECDH")
3618 with ThreadedEchoServer(context=context) as server:
3619 with context.wrap_socket(socket.socket()) as s:
3620 s.connect((HOST, server.port))
3621 self.assertIn("ECDH", s.cipher()[0])
3622
3623 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
3624 "'tls-unique' channel binding not available")
3625 def test_tls_unique_channel_binding(self):
3626 """Test tls-unique channel binding."""
3627 if support.verbose:
3628 sys.stdout.write("\n")
3629
Christian Heimes05d9fe32018-02-27 08:55:39 +01003630 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003631
3632 server = ThreadedEchoServer(context=server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003633 chatty=True,
3634 connectionchatty=False)
Christian Heimes05d9fe32018-02-27 08:55:39 +01003635
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003636 with server:
Christian Heimes05d9fe32018-02-27 08:55:39 +01003637 with client_context.wrap_socket(
3638 socket.socket(),
3639 server_hostname=hostname) as s:
3640 s.connect((HOST, server.port))
3641 # get the data
3642 cb_data = s.get_channel_binding("tls-unique")
3643 if support.verbose:
3644 sys.stdout.write(
3645 " got channel binding data: {0!r}\n".format(cb_data))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003646
Christian Heimes05d9fe32018-02-27 08:55:39 +01003647 # check if it is sane
3648 self.assertIsNotNone(cb_data)
Christian Heimes529525f2018-05-23 22:24:45 +02003649 if s.version() == 'TLSv1.3':
3650 self.assertEqual(len(cb_data), 48)
3651 else:
3652 self.assertEqual(len(cb_data), 12) # True for TLSv1
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003653
Christian Heimes05d9fe32018-02-27 08:55:39 +01003654 # and compare with the peers version
3655 s.write(b"CB tls-unique\n")
3656 peer_data_repr = s.read().strip()
3657 self.assertEqual(peer_data_repr,
3658 repr(cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003659
3660 # now, again
Christian Heimes05d9fe32018-02-27 08:55:39 +01003661 with client_context.wrap_socket(
3662 socket.socket(),
3663 server_hostname=hostname) as s:
3664 s.connect((HOST, server.port))
3665 new_cb_data = s.get_channel_binding("tls-unique")
3666 if support.verbose:
3667 sys.stdout.write(
3668 "got another channel binding data: {0!r}\n".format(
3669 new_cb_data)
3670 )
3671 # is it really unique
3672 self.assertNotEqual(cb_data, new_cb_data)
3673 self.assertIsNotNone(cb_data)
Christian Heimes529525f2018-05-23 22:24:45 +02003674 if s.version() == 'TLSv1.3':
3675 self.assertEqual(len(cb_data), 48)
3676 else:
3677 self.assertEqual(len(cb_data), 12) # True for TLSv1
Christian Heimes05d9fe32018-02-27 08:55:39 +01003678 s.write(b"CB tls-unique\n")
3679 peer_data_repr = s.read().strip()
3680 self.assertEqual(peer_data_repr,
3681 repr(new_cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003682
3683 def test_compression(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003684 client_context, server_context, hostname = testing_context()
3685 stats = server_params_test(client_context, server_context,
3686 chatty=True, connectionchatty=True,
3687 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003688 if support.verbose:
3689 sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
3690 self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' })
3691
3692 @unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
3693 "ssl.OP_NO_COMPRESSION needed for this test")
3694 def test_compression_disabled(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003695 client_context, server_context, hostname = testing_context()
3696 client_context.options |= ssl.OP_NO_COMPRESSION
3697 server_context.options |= ssl.OP_NO_COMPRESSION
3698 stats = server_params_test(client_context, server_context,
3699 chatty=True, connectionchatty=True,
3700 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003701 self.assertIs(stats['compression'], None)
3702
3703 def test_dh_params(self):
3704 # Check we can get a connection with ephemeral Diffie-Hellman
Christian Heimesa170fa12017-09-15 20:27:30 +02003705 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003706 # test scenario needs TLS <= 1.2
3707 client_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesa170fa12017-09-15 20:27:30 +02003708 server_context.load_dh_params(DHFILE)
3709 server_context.set_ciphers("kEDH")
Christian Heimes05d9fe32018-02-27 08:55:39 +01003710 server_context.options |= ssl.OP_NO_TLSv1_3
Christian Heimesa170fa12017-09-15 20:27:30 +02003711 stats = server_params_test(client_context, server_context,
3712 chatty=True, connectionchatty=True,
3713 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003714 cipher = stats["cipher"][0]
3715 parts = cipher.split("-")
3716 if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
3717 self.fail("Non-DH cipher: " + cipher[0])
3718
Christian Heimesb7b92252018-02-25 09:49:31 +01003719 @unittest.skipUnless(HAVE_SECP_CURVES, "needs secp384r1 curve support")
Christian Heimes05d9fe32018-02-27 08:55:39 +01003720 @unittest.skipIf(IS_OPENSSL_1_1_1, "TODO: Test doesn't work on 1.1.1")
Christian Heimesb7b92252018-02-25 09:49:31 +01003721 def test_ecdh_curve(self):
3722 # server secp384r1, client auto
3723 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01003724
Christian Heimesb7b92252018-02-25 09:49:31 +01003725 server_context.set_ecdh_curve("secp384r1")
3726 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3727 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3728 stats = server_params_test(client_context, server_context,
3729 chatty=True, connectionchatty=True,
3730 sni_name=hostname)
3731
3732 # server auto, client secp384r1
3733 client_context, server_context, hostname = testing_context()
3734 client_context.set_ecdh_curve("secp384r1")
3735 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3736 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3737 stats = server_params_test(client_context, server_context,
3738 chatty=True, connectionchatty=True,
3739 sni_name=hostname)
3740
3741 # server / client curve mismatch
3742 client_context, server_context, hostname = testing_context()
3743 client_context.set_ecdh_curve("prime256v1")
3744 server_context.set_ecdh_curve("secp384r1")
3745 server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
3746 server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
3747 try:
3748 stats = server_params_test(client_context, server_context,
3749 chatty=True, connectionchatty=True,
3750 sni_name=hostname)
3751 except ssl.SSLError:
3752 pass
3753 else:
3754 # OpenSSL 1.0.2 does not fail although it should.
Christian Heimes05d9fe32018-02-27 08:55:39 +01003755 if IS_OPENSSL_1_1_0:
Christian Heimesb7b92252018-02-25 09:49:31 +01003756 self.fail("mismatch curve did not fail")
3757
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003758 def test_selected_alpn_protocol(self):
3759 # selected_alpn_protocol() is None unless ALPN is used.
Christian Heimesa170fa12017-09-15 20:27:30 +02003760 client_context, server_context, hostname = testing_context()
3761 stats = server_params_test(client_context, server_context,
3762 chatty=True, connectionchatty=True,
3763 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003764 self.assertIs(stats['client_alpn_protocol'], None)
3765
3766 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
3767 def test_selected_alpn_protocol_if_server_uses_alpn(self):
3768 # selected_alpn_protocol() is None unless ALPN is used by the client.
Christian Heimesa170fa12017-09-15 20:27:30 +02003769 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003770 server_context.set_alpn_protocols(['foo', 'bar'])
3771 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02003772 chatty=True, connectionchatty=True,
3773 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003774 self.assertIs(stats['client_alpn_protocol'], None)
3775
3776 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
3777 def test_alpn_protocols(self):
3778 server_protocols = ['foo', 'bar', 'milkshake']
3779 protocol_tests = [
3780 (['foo', 'bar'], 'foo'),
3781 (['bar', 'foo'], 'foo'),
3782 (['milkshake'], 'milkshake'),
3783 (['http/3.0', 'http/4.0'], None)
3784 ]
3785 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02003786 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003787 server_context.set_alpn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003788 client_context.set_alpn_protocols(client_protocols)
3789
3790 try:
3791 stats = server_params_test(client_context,
3792 server_context,
3793 chatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02003794 connectionchatty=True,
3795 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003796 except ssl.SSLError as e:
3797 stats = e
3798
Christian Heimes05d9fe32018-02-27 08:55:39 +01003799 if (expected is None and IS_OPENSSL_1_1_0
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003800 and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6)):
3801 # OpenSSL 1.1.0 to 1.1.0e raises handshake error
3802 self.assertIsInstance(stats, ssl.SSLError)
3803 else:
3804 msg = "failed trying %s (s) and %s (c).\n" \
3805 "was expecting %s, but got %%s from the %%s" \
3806 % (str(server_protocols), str(client_protocols),
3807 str(expected))
3808 client_result = stats['client_alpn_protocol']
3809 self.assertEqual(client_result, expected,
3810 msg % (client_result, "client"))
3811 server_result = stats['server_alpn_protocols'][-1] \
3812 if len(stats['server_alpn_protocols']) else 'nothing'
3813 self.assertEqual(server_result, expected,
3814 msg % (server_result, "server"))
3815
3816 def test_selected_npn_protocol(self):
3817 # selected_npn_protocol() is None unless NPN is used
Christian Heimesa170fa12017-09-15 20:27:30 +02003818 client_context, server_context, hostname = testing_context()
3819 stats = server_params_test(client_context, server_context,
3820 chatty=True, connectionchatty=True,
3821 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003822 self.assertIs(stats['client_npn_protocol'], None)
3823
3824 @unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
3825 def test_npn_protocols(self):
3826 server_protocols = ['http/1.1', 'spdy/2']
3827 protocol_tests = [
3828 (['http/1.1', 'spdy/2'], 'http/1.1'),
3829 (['spdy/2', 'http/1.1'], 'http/1.1'),
3830 (['spdy/2', 'test'], 'spdy/2'),
3831 (['abc', 'def'], 'abc')
3832 ]
3833 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02003834 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003835 server_context.set_npn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003836 client_context.set_npn_protocols(client_protocols)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003837 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02003838 chatty=True, connectionchatty=True,
3839 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003840 msg = "failed trying %s (s) and %s (c).\n" \
3841 "was expecting %s, but got %%s from the %%s" \
3842 % (str(server_protocols), str(client_protocols),
3843 str(expected))
3844 client_result = stats['client_npn_protocol']
3845 self.assertEqual(client_result, expected, msg % (client_result, "client"))
3846 server_result = stats['server_npn_protocols'][-1] \
3847 if len(stats['server_npn_protocols']) else 'nothing'
3848 self.assertEqual(server_result, expected, msg % (server_result, "server"))
3849
3850 def sni_contexts(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003851 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003852 server_context.load_cert_chain(SIGNED_CERTFILE)
Christian Heimesa170fa12017-09-15 20:27:30 +02003853 other_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003854 other_context.load_cert_chain(SIGNED_CERTFILE2)
Christian Heimesa170fa12017-09-15 20:27:30 +02003855 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003856 client_context.load_verify_locations(SIGNING_CA)
3857 return server_context, other_context, client_context
3858
3859 def check_common_name(self, stats, name):
3860 cert = stats['peercert']
3861 self.assertIn((('commonName', name),), cert['subject'])
3862
3863 @needs_sni
3864 def test_sni_callback(self):
3865 calls = []
3866 server_context, other_context, client_context = self.sni_contexts()
3867
Christian Heimesa170fa12017-09-15 20:27:30 +02003868 client_context.check_hostname = False
3869
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003870 def servername_cb(ssl_sock, server_name, initial_context):
3871 calls.append((server_name, initial_context))
3872 if server_name is not None:
3873 ssl_sock.context = other_context
3874 server_context.set_servername_callback(servername_cb)
3875
3876 stats = server_params_test(client_context, server_context,
3877 chatty=True,
3878 sni_name='supermessage')
3879 # The hostname was fetched properly, and the certificate was
3880 # changed for the connection.
3881 self.assertEqual(calls, [("supermessage", server_context)])
3882 # CERTFILE4 was selected
3883 self.check_common_name(stats, 'fakehostname')
3884
3885 calls = []
3886 # The callback is called with server_name=None
3887 stats = server_params_test(client_context, server_context,
3888 chatty=True,
3889 sni_name=None)
3890 self.assertEqual(calls, [(None, server_context)])
Christian Heimesa170fa12017-09-15 20:27:30 +02003891 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003892
3893 # Check disabling the callback
3894 calls = []
3895 server_context.set_servername_callback(None)
3896
3897 stats = server_params_test(client_context, server_context,
3898 chatty=True,
3899 sni_name='notfunny')
3900 # Certificate didn't change
Christian Heimesa170fa12017-09-15 20:27:30 +02003901 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003902 self.assertEqual(calls, [])
3903
3904 @needs_sni
3905 def test_sni_callback_alert(self):
3906 # Returning a TLS alert is reflected to the connecting client
3907 server_context, other_context, client_context = self.sni_contexts()
3908
3909 def cb_returning_alert(ssl_sock, server_name, initial_context):
3910 return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
3911 server_context.set_servername_callback(cb_returning_alert)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003912 with self.assertRaises(ssl.SSLError) as cm:
3913 stats = server_params_test(client_context, server_context,
3914 chatty=False,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003915 sni_name='supermessage')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003916 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003917
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003918 @needs_sni
3919 def test_sni_callback_raising(self):
3920 # Raising fails the connection with a TLS handshake failure alert.
3921 server_context, other_context, client_context = self.sni_contexts()
3922
3923 def cb_raising(ssl_sock, server_name, initial_context):
3924 1/0
3925 server_context.set_servername_callback(cb_raising)
3926
3927 with self.assertRaises(ssl.SSLError) as cm, \
3928 support.captured_stderr() as stderr:
Antoine Pitrou50b24d02013-04-11 20:48:42 +02003929 stats = server_params_test(client_context, server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003930 chatty=False,
3931 sni_name='supermessage')
3932 self.assertEqual(cm.exception.reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
3933 self.assertIn("ZeroDivisionError", stderr.getvalue())
Antoine Pitrou50b24d02013-04-11 20:48:42 +02003934
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003935 @needs_sni
3936 def test_sni_callback_wrong_return_type(self):
3937 # Returning the wrong return type terminates the TLS connection
3938 # with an internal error alert.
3939 server_context, other_context, client_context = self.sni_contexts()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003940
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003941 def cb_wrong_return_type(ssl_sock, server_name, initial_context):
3942 return "foo"
3943 server_context.set_servername_callback(cb_wrong_return_type)
3944
3945 with self.assertRaises(ssl.SSLError) as cm, \
3946 support.captured_stderr() as stderr:
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003947 stats = server_params_test(client_context, server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003948 chatty=False,
3949 sni_name='supermessage')
3950 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
3951 self.assertIn("TypeError", stderr.getvalue())
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003952
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003953 def test_shared_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003954 client_context, server_context, hostname = testing_context()
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003955 client_context.set_ciphers("AES128:AES256")
3956 server_context.set_ciphers("AES256")
3957 expected_algs = [
3958 "AES256", "AES-256",
3959 # TLS 1.3 ciphers are always enabled
3960 "TLS_CHACHA20", "TLS_AES",
3961 ]
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003962
Christian Heimesa170fa12017-09-15 20:27:30 +02003963 stats = server_params_test(client_context, server_context,
3964 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003965 ciphers = stats['server_shared_ciphers'][0]
3966 self.assertGreater(len(ciphers), 0)
3967 for name, tls_version, bits in ciphers:
Christian Heimese8eb6cb2018-05-22 22:50:12 +02003968 if not any(alg in name for alg in expected_algs):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003969 self.fail(name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003970
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003971 def test_read_write_after_close_raises_valuerror(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003972 client_context, server_context, hostname = testing_context()
3973 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003974
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003975 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02003976 s = client_context.wrap_socket(socket.socket(),
3977 server_hostname=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003978 s.connect((HOST, server.port))
3979 s.close()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003980
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003981 self.assertRaises(ValueError, s.read, 1024)
3982 self.assertRaises(ValueError, s.write, b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003983
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003984 def test_sendfile(self):
3985 TEST_DATA = b"x" * 512
3986 with open(support.TESTFN, 'wb') as f:
3987 f.write(TEST_DATA)
3988 self.addCleanup(support.unlink, support.TESTFN)
Christian Heimesa170fa12017-09-15 20:27:30 +02003989 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003990 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003991 context.load_verify_locations(SIGNING_CA)
3992 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003993 server = ThreadedEchoServer(context=context, chatty=False)
3994 with server:
3995 with context.wrap_socket(socket.socket()) as s:
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003996 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003997 with open(support.TESTFN, 'rb') as file:
3998 s.sendfile(file)
3999 self.assertEqual(s.recv(1024), TEST_DATA)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004000
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004001 def test_session(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004002 client_context, server_context, hostname = testing_context()
Christian Heimes05d9fe32018-02-27 08:55:39 +01004003 # TODO: sessions aren't compatible with TLSv1.3 yet
4004 client_context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004005
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004006 # first connection without session
Christian Heimesa170fa12017-09-15 20:27:30 +02004007 stats = server_params_test(client_context, server_context,
4008 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004009 session = stats['session']
4010 self.assertTrue(session.id)
4011 self.assertGreater(session.time, 0)
4012 self.assertGreater(session.timeout, 0)
4013 self.assertTrue(session.has_ticket)
4014 if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
4015 self.assertGreater(session.ticket_lifetime_hint, 0)
4016 self.assertFalse(stats['session_reused'])
4017 sess_stat = server_context.session_stats()
4018 self.assertEqual(sess_stat['accept'], 1)
4019 self.assertEqual(sess_stat['hits'], 0)
Giampaolo Rodola'915d1412014-06-11 03:54:30 +02004020
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004021 # reuse session
Christian Heimesa170fa12017-09-15 20:27:30 +02004022 stats = server_params_test(client_context, server_context,
4023 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004024 sess_stat = server_context.session_stats()
4025 self.assertEqual(sess_stat['accept'], 2)
4026 self.assertEqual(sess_stat['hits'], 1)
4027 self.assertTrue(stats['session_reused'])
4028 session2 = stats['session']
4029 self.assertEqual(session2.id, session.id)
4030 self.assertEqual(session2, session)
4031 self.assertIsNot(session2, session)
4032 self.assertGreaterEqual(session2.time, session.time)
4033 self.assertGreaterEqual(session2.timeout, session.timeout)
Christian Heimes99a65702016-09-10 23:44:53 +02004034
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004035 # another one without session
Christian Heimesa170fa12017-09-15 20:27:30 +02004036 stats = server_params_test(client_context, server_context,
4037 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004038 self.assertFalse(stats['session_reused'])
4039 session3 = stats['session']
4040 self.assertNotEqual(session3.id, session.id)
4041 self.assertNotEqual(session3, session)
4042 sess_stat = server_context.session_stats()
4043 self.assertEqual(sess_stat['accept'], 3)
4044 self.assertEqual(sess_stat['hits'], 1)
Christian Heimes99a65702016-09-10 23:44:53 +02004045
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004046 # reuse session again
Christian Heimesa170fa12017-09-15 20:27:30 +02004047 stats = server_params_test(client_context, server_context,
4048 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004049 self.assertTrue(stats['session_reused'])
4050 session4 = stats['session']
4051 self.assertEqual(session4.id, session.id)
4052 self.assertEqual(session4, session)
4053 self.assertGreaterEqual(session4.time, session.time)
4054 self.assertGreaterEqual(session4.timeout, session.timeout)
4055 sess_stat = server_context.session_stats()
4056 self.assertEqual(sess_stat['accept'], 4)
4057 self.assertEqual(sess_stat['hits'], 2)
Christian Heimes99a65702016-09-10 23:44:53 +02004058
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004059 def test_session_handling(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02004060 client_context, server_context, hostname = testing_context()
4061 client_context2, _, _ = testing_context()
Christian Heimes99a65702016-09-10 23:44:53 +02004062
Christian Heimes05d9fe32018-02-27 08:55:39 +01004063 # TODO: session reuse does not work with TLSv1.3
Christian Heimesa170fa12017-09-15 20:27:30 +02004064 client_context.options |= ssl.OP_NO_TLSv1_3
4065 client_context2.options |= ssl.OP_NO_TLSv1_3
Christian Heimescb5b68a2017-09-07 18:07:00 -07004066
Christian Heimesa170fa12017-09-15 20:27:30 +02004067 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004068 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02004069 with client_context.wrap_socket(socket.socket(),
4070 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004071 # session is None before handshake
4072 self.assertEqual(s.session, None)
4073 self.assertEqual(s.session_reused, None)
4074 s.connect((HOST, server.port))
4075 session = s.session
4076 self.assertTrue(session)
4077 with self.assertRaises(TypeError) as e:
4078 s.session = object
Ned Deily4531ec72018-06-11 20:26:28 -04004079 self.assertEqual(str(e.exception), 'Value is not a SSLSession.')
Christian Heimes99a65702016-09-10 23:44:53 +02004080
Christian Heimesa170fa12017-09-15 20:27:30 +02004081 with client_context.wrap_socket(socket.socket(),
4082 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004083 s.connect((HOST, server.port))
4084 # cannot set session after handshake
4085 with self.assertRaises(ValueError) as e:
4086 s.session = session
4087 self.assertEqual(str(e.exception),
4088 'Cannot set session after handshake.')
Christian Heimes99a65702016-09-10 23:44:53 +02004089
Christian Heimesa170fa12017-09-15 20:27:30 +02004090 with client_context.wrap_socket(socket.socket(),
4091 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004092 # can set session before handshake and before the
4093 # connection was established
4094 s.session = session
4095 s.connect((HOST, server.port))
4096 self.assertEqual(s.session.id, session.id)
4097 self.assertEqual(s.session, session)
4098 self.assertEqual(s.session_reused, True)
Christian Heimes99a65702016-09-10 23:44:53 +02004099
Christian Heimesa170fa12017-09-15 20:27:30 +02004100 with client_context2.wrap_socket(socket.socket(),
4101 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004102 # cannot re-use session with a different SSLContext
4103 with self.assertRaises(ValueError) as e:
Christian Heimes99a65702016-09-10 23:44:53 +02004104 s.session = session
4105 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004106 self.assertEqual(str(e.exception),
4107 'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02004108
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01004109
Thomas Woutersed03b412007-08-28 21:37:11 +00004110def test_main(verbose=False):
Antoine Pitrou15cee622010-08-04 16:45:21 +00004111 if support.verbose:
Berker Peksag9e7990a2015-05-16 23:21:26 +03004112 import warnings
Antoine Pitrou15cee622010-08-04 16:45:21 +00004113 plats = {
Antoine Pitrou15cee622010-08-04 16:45:21 +00004114 'Mac': platform.mac_ver,
4115 'Windows': platform.win32_ver,
4116 }
Petr Viktorin8b94b412018-05-16 11:51:18 -04004117 for name, func in plats.items():
4118 plat = func()
4119 if plat and plat[0]:
4120 plat = '%s %r' % (name, plat)
4121 break
4122 else:
4123 plat = repr(platform.platform())
Antoine Pitrou15cee622010-08-04 16:45:21 +00004124 print("test_ssl: testing with %r %r" %
4125 (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
4126 print(" under %s" % plat)
Antoine Pitroud5323212010-10-22 18:19:07 +00004127 print(" HAS_SNI = %r" % ssl.HAS_SNI)
Antoine Pitrou609ef012013-03-29 18:09:06 +01004128 print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
4129 try:
4130 print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
4131 except AttributeError:
4132 pass
Antoine Pitrou15cee622010-08-04 16:45:21 +00004133
Antoine Pitrou152efa22010-05-16 18:19:27 +00004134 for filename in [
Martin Panter3840b2a2016-03-27 01:53:46 +00004135 CERTFILE, BYTES_CERTFILE,
Antoine Pitrou152efa22010-05-16 18:19:27 +00004136 ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004137 SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
Antoine Pitrou152efa22010-05-16 18:19:27 +00004138 BADCERT, BADKEY, EMPTYCERT]:
4139 if not os.path.exists(filename):
4140 raise support.TestFailed("Can't read certificate file %r" % filename)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00004141
Martin Panter3840b2a2016-03-27 01:53:46 +00004142 tests = [
4143 ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
Christian Heimes9d50ab52018-02-27 10:17:30 +01004144 SSLObjectTests, SimpleBackgroundTests, ThreadedTests,
Martin Panter3840b2a2016-03-27 01:53:46 +00004145 ]
Thomas Woutersed03b412007-08-28 21:37:11 +00004146
Benjamin Petersonee8712c2008-05-20 21:35:26 +00004147 if support.is_resource_enabled('network'):
Bill Janssen6e027db2007-11-15 22:23:56 +00004148 tests.append(NetworkedTests)
Thomas Woutersed03b412007-08-28 21:37:11 +00004149
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004150 thread_info = support.threading_setup()
Antoine Pitrou480a1242010-04-28 21:37:09 +00004151 try:
4152 support.run_unittest(*tests)
4153 finally:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004154 support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00004155
4156if __name__ == "__main__":
4157 test_main()