# blob: 98d3e571b734bdf611753cf937aa7222e45337f4 [file] [log] [blame]
# Test the support for SSL and sockets
2
import sys
import unittest
from test import support
import socket
import select
import time
import datetime
import gc
import os
import errno
import pprint
import urllib.request
import threading
import traceback
import asyncore
import weakref
import platform
import functools
try:
    import ctypes
except ImportError:
    ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000025
# Import ssl through the test-support helper rather than a plain import
# statement.
ssl = support.import_module("ssl")


# Sorted keys of the ssl module's protocol-name table.
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
HOST = support.HOST
# Flags describing which TLS library the ssl module reports being built on.
IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
IS_OPENSSL_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
33
Antoine Pitrou152efa22010-05-16 18:19:27 +000034
def data_file(*name):
    """Return the path of a test data file stored next to this module.

    *name* is one or more path components that are joined onto the
    directory containing this file.
    """
    return os.path.join(os.path.dirname(__file__), *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000037
# The custom key and certificate files used in test_ssl are generated
# using Lib/test/make_ssl_certs.py.
# Other certificates are simply fetched from the Internet servers they
# are meant to authenticate.

CERTFILE = data_file("keycert.pem")
BYTES_CERTFILE = os.fsencode(CERTFILE)
ONLYCERT = data_file("ssl_cert.pem")
ONLYKEY = data_file("ssl_key.pem")
BYTES_ONLYCERT = os.fsencode(ONLYCERT)
BYTES_ONLYKEY = os.fsencode(ONLYKEY)
CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
KEY_PASSWORD = "somepass"
CAPATH = data_file("capath")
BYTES_CAPATH = os.fsencode(CAPATH)
CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
CAFILE_CACERT = data_file("capath", "5ed36f99.0")


# empty CRL
CRLFILE = data_file("revocation.crl")

# Two keys and certs signed by the same CA (for SNI tests)
SIGNED_CERTFILE = data_file("keycert3.pem")
SIGNED_CERTFILE_HOSTNAME = 'localhost'
SIGNED_CERTFILE2 = data_file("keycert4.pem")
SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
# Same certificate as pycacert.pem, but without extra text in file
SIGNING_CA = data_file("capath", "ceff1710.0")
# cert with all kinds of subject alt names
ALLSANFILE = data_file("allsans.pem")

REMOTE_HOST = "self-signed.pythontest.net"

EMPTYCERT = data_file("nullcert.pem")
BADCERT = data_file("badcert.pem")
NONEXISTINGCERT = data_file("XXXnonexisting.pem")
BADKEY = data_file("badkey.pem")
NOKIACERT = data_file("nokia.pem")
NULLBYTECERT = data_file("nullbytecert.pem")

DHFILE = data_file("dh1024.pem")
BYTES_DHFILE = os.fsencode(DHFILE)

# Not defined in all versions of OpenSSL; fall back to 0 (no option bit)
# when the ssl module does not expose the constant.
OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
88
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010089
def handle_error(prefix):
    """Write the exception currently being handled to stdout when verbose.

    The formatted traceback is prepended with *prefix*.  Nothing is
    written (and nothing is formatted) unless ``support.verbose`` is set.
    """
    if support.verbose:
        exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
        sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +000094
def can_clear_options():
    """Return True if the OpenSSL API version is 0.9.8m or higher."""
    # 0.9.8m or higher
    return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
Antoine Pitroub5218772010-05-21 09:56:06 +000098
def no_sslv2_implies_sslv3_hello():
    """Return True if the OpenSSL version is 0.9.7h or higher."""
    # 0.9.7h or higher
    return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
102
def have_verify_flags():
    """Return True if the OpenSSL version is 0.9.8 or higher."""
    # 0.9.8 or higher
    return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
106
def utc_offset(): #NOTE: ignore issues like #1647654
    """Return the local UTC offset in seconds.

    Uses the DST offset (``time.altzone``) when a DST zone is defined and
    DST is in effect for the current local time, else ``time.timezone``.
    """
    # local time = utc time + utc offset
    if time.daylight and time.localtime().tm_isdst > 0:
        return -time.altzone  # seconds
    return -time.timezone
112
def asn1time(cert_time):
    """Return *cert_time* adjusted to what the linked OpenSSL reports.

    *cert_time* is a string in the ``"%b %d %H:%M:%S %Y GMT"`` layout.
    It is returned unchanged except on OpenSSL API version 0.9.8i, where
    the seconds are zeroed and the day of month is re-padded with a space.
    """
    # Some versions of OpenSSL ignore seconds, see #18207
    # 0.9.8.i
    if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
        fmt = "%b %d %H:%M:%S %Y GMT"
        dt = datetime.datetime.strptime(cert_time, fmt)
        dt = dt.replace(second=0)
        cert_time = dt.strftime(fmt)
        # %d adds leading zero but ASN1_TIME_print() uses leading space
        if cert_time[4] == "0":
            cert_time = cert_time[:4] + " " + cert_time[5:]

    return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000126
# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
def skip_if_broken_ubuntu_ssl(func):
    """Decorator skipping *func* on the patched Ubuntu OpenSSL build.

    When the ssl module has no ``PROTOCOL_SSLv2``, *func* is returned
    unchanged.  Otherwise the wrapper first tries to create an SSLv2
    context; if that raises ``ssl.SSLError`` on the specific
    OpenSSL-version / distribution combination below, the test is skipped
    with ``unittest.SkipTest``.  In every other case *func* runs normally.
    """
    if not hasattr(ssl, 'PROTOCOL_SSLv2'):
        return func

    @functools.wraps(func)
    def f(*args, **kwargs):
        try:
            ssl.SSLContext(ssl.PROTOCOL_SSLv2)
        except ssl.SSLError:
            # platform.linux_distribution() no longer exists on newer
            # Python versions; without it the distribution cannot be
            # identified, so the test is simply not skipped.
            linux_distribution = getattr(platform, 'linux_distribution', None)
            if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
                    linux_distribution is not None and
                    linux_distribution() == ('debian', 'squeeze/sid', '')):
                raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
        return func(*args, **kwargs)
    return f
Antoine Pitrou23df4832010-08-04 17:14:06 +0000142
# Decorator: skip the decorated test unless the ssl module reports SNI support.
needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
144
Antoine Pitrou23df4832010-08-04 17:14:06 +0000145
def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
                     cert_reqs=ssl.CERT_NONE, ca_certs=None,
                     ciphers=None, certfile=None, keyfile=None,
                     **kwargs):
    """Wrap *sock* using a freshly built SSLContext and return the result.

    A context is created for *ssl_version* and configured from the
    keyword-only arguments (each is skipped when it is None); any extra
    keyword arguments are forwarded to ``SSLContext.wrap_socket``.
    """
    context = ssl.SSLContext(ssl_version)
    if cert_reqs is not None:
        if cert_reqs == ssl.CERT_NONE:
            # Turn hostname checking off before switching verify_mode to
            # CERT_NONE.
            context.check_hostname = False
        context.verify_mode = cert_reqs
    if ca_certs is not None:
        context.load_verify_locations(ca_certs)
    if certfile is not None or keyfile is not None:
        context.load_cert_chain(certfile, keyfile)
    if ciphers is not None:
        context.set_ciphers(ciphers)
    return context.wrap_socket(sock, **kwargs)
162
Christian Heimesa170fa12017-09-15 20:27:30 +0200163
def testing_context(server_cert=SIGNED_CERTFILE):
    """Create context

    client_context, server_context, hostname = testing_context()

    *server_cert* must be SIGNED_CERTFILE or SIGNED_CERTFILE2; any other
    value raises ValueError.  The returned hostname is the one associated
    with the chosen certificate.
    """
    if server_cert == SIGNED_CERTFILE:
        hostname = SIGNED_CERTFILE_HOSTNAME
    elif server_cert == SIGNED_CERTFILE2:
        hostname = SIGNED_CERTFILE2_HOSTNAME
    else:
        raise ValueError(server_cert)

    # The client trusts the CA file that signed both server certificates.
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(server_cert)

    return client_context, server_context, hostname
183
184
class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000186
Antoine Pitrou480a1242010-04-28 21:37:09 +0000187 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000188 ssl.CERT_NONE
189 ssl.CERT_OPTIONAL
190 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100191 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100192 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100193 if ssl.HAS_ECDH:
194 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100195 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
196 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000197 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100198 self.assertIn(ssl.HAS_ECDH, {True, False})
Christian Heimescb5b68a2017-09-07 18:07:00 -0700199 ssl.OP_NO_SSLv2
200 ssl.OP_NO_SSLv3
201 ssl.OP_NO_TLSv1
202 ssl.OP_NO_TLSv1_3
203 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
204 ssl.OP_NO_TLSv1_1
205 ssl.OP_NO_TLSv1_2
Christian Heimesa170fa12017-09-15 20:27:30 +0200206 self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000207
Antoine Pitrou172f0252014-04-18 20:33:08 +0200208 def test_str_for_enums(self):
209 # Make sure that the PROTOCOL_* constants have enum-like string
210 # reprs.
Christian Heimes598894f2016-09-05 23:19:05 +0200211 proto = ssl.PROTOCOL_TLS
212 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200213 ctx = ssl.SSLContext(proto)
214 self.assertIs(ctx.protocol, proto)
215
Antoine Pitrou480a1242010-04-28 21:37:09 +0000216 def test_random(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000217 v = ssl.RAND_status()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000218 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000219 sys.stdout.write("\n RAND_status is %d (%s)\n"
220 % (v, (v and "sufficient randomness") or
221 "insufficient randomness"))
Victor Stinner99c8b162011-05-24 12:05:19 +0200222
223 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
224 self.assertEqual(len(data), 16)
225 self.assertEqual(is_cryptographic, v == 1)
226 if v:
227 data = ssl.RAND_bytes(16)
228 self.assertEqual(len(data), 16)
Victor Stinner2e2baa92011-05-25 11:15:16 +0200229 else:
230 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
Victor Stinner99c8b162011-05-24 12:05:19 +0200231
Victor Stinner1e81a392013-12-19 16:47:04 +0100232 # negative num is invalid
233 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
234 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
235
Victor Stinnerbeeb5122014-11-28 13:28:25 +0100236 if hasattr(ssl, 'RAND_egd'):
237 self.assertRaises(TypeError, ssl.RAND_egd, 1)
238 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000239 ssl.RAND_add("this is a random string", 75.0)
Serhiy Storchaka8490f5a2015-03-20 09:00:36 +0200240 ssl.RAND_add(b"this is a random bytes object", 75.0)
241 ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000242
Christian Heimesf77b4b22013-08-21 13:26:05 +0200243 @unittest.skipUnless(os.name == 'posix', 'requires posix')
244 def test_random_fork(self):
245 status = ssl.RAND_status()
246 if not status:
247 self.fail("OpenSSL's PRNG has insufficient randomness")
248
249 rfd, wfd = os.pipe()
250 pid = os.fork()
251 if pid == 0:
252 try:
253 os.close(rfd)
254 child_random = ssl.RAND_pseudo_bytes(16)[0]
255 self.assertEqual(len(child_random), 16)
256 os.write(wfd, child_random)
257 os.close(wfd)
258 except BaseException:
259 os._exit(1)
260 else:
261 os._exit(0)
262 else:
263 os.close(wfd)
264 self.addCleanup(os.close, rfd)
265 _, status = os.waitpid(pid, 0)
266 self.assertEqual(status, 0)
267
268 child_random = os.read(rfd, 16)
269 self.assertEqual(len(child_random), 16)
270 parent_random = ssl.RAND_pseudo_bytes(16)[0]
271 self.assertEqual(len(parent_random), 16)
272
273 self.assertNotEqual(child_random, parent_random)
274
Antoine Pitrou480a1242010-04-28 21:37:09 +0000275 def test_parse_cert(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000276 # note that this uses an 'unofficial' function in _ssl.c,
277 # provided solely for this test, to exercise the certificate
278 # parsing code
Antoine Pitroufb046912010-11-09 20:21:19 +0000279 p = ssl._ssl._test_decode_cert(CERTFILE)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000280 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000281 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200282 self.assertEqual(p['issuer'],
283 ((('countryName', 'XY'),),
284 (('localityName', 'Castle Anthrax'),),
285 (('organizationName', 'Python Software Foundation'),),
286 (('commonName', 'localhost'),))
287 )
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100288 # Note the next three asserts will fail if the keys are regenerated
Christian Heimes9424bb42013-06-17 15:32:57 +0200289 self.assertEqual(p['notAfter'], asn1time('Oct 5 23:01:56 2020 GMT'))
290 self.assertEqual(p['notBefore'], asn1time('Oct 8 23:01:56 2010 GMT'))
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200291 self.assertEqual(p['serialNumber'], 'D7C7381919AFC24E')
292 self.assertEqual(p['subject'],
293 ((('countryName', 'XY'),),
294 (('localityName', 'Castle Anthrax'),),
295 (('organizationName', 'Python Software Foundation'),),
296 (('commonName', 'localhost'),))
297 )
298 self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),))
299 # Issue #13034: the subjectAltName in some certificates
300 # (notably projects.developer.nokia.com:443) wasn't parsed
301 p = ssl._ssl._test_decode_cert(NOKIACERT)
302 if support.verbose:
303 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
304 self.assertEqual(p['subjectAltName'],
305 (('DNS', 'projects.developer.nokia.com'),
306 ('DNS', 'projects.forum.nokia.com'))
307 )
Christian Heimesbd3a7f92013-11-21 03:40:15 +0100308 # extra OCSP and AIA fields
309 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
310 self.assertEqual(p['caIssuers'],
311 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
312 self.assertEqual(p['crlDistributionPoints'],
313 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000314
Christian Heimes824f7f32013-08-17 00:54:47 +0200315 def test_parse_cert_CVE_2013_4238(self):
316 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
317 if support.verbose:
318 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
319 subject = ((('countryName', 'US'),),
320 (('stateOrProvinceName', 'Oregon'),),
321 (('localityName', 'Beaverton'),),
322 (('organizationName', 'Python Software Foundation'),),
323 (('organizationalUnitName', 'Python Core Development'),),
324 (('commonName', 'null.python.org\x00example.org'),),
325 (('emailAddress', 'python-dev@python.org'),))
326 self.assertEqual(p['subject'], subject)
327 self.assertEqual(p['issuer'], subject)
Christian Heimes157c9832013-08-25 14:12:41 +0200328 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
329 san = (('DNS', 'altnull.python.org\x00example.com'),
330 ('email', 'null@python.org\x00user@example.org'),
331 ('URI', 'http://null.python.org\x00http://example.org'),
332 ('IP Address', '192.0.2.1'),
333 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
334 else:
335 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
336 san = (('DNS', 'altnull.python.org\x00example.com'),
337 ('email', 'null@python.org\x00user@example.org'),
338 ('URI', 'http://null.python.org\x00http://example.org'),
339 ('IP Address', '192.0.2.1'),
340 ('IP Address', '<invalid>'))
341
342 self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200343
Christian Heimes1c03abd2016-09-06 23:25:35 +0200344 def test_parse_all_sans(self):
345 p = ssl._ssl._test_decode_cert(ALLSANFILE)
346 self.assertEqual(p['subjectAltName'],
347 (
348 ('DNS', 'allsans'),
349 ('othername', '<unsupported>'),
350 ('othername', '<unsupported>'),
351 ('email', 'user@example.org'),
352 ('DNS', 'www.example.org'),
353 ('DirName',
354 ((('countryName', 'XY'),),
355 (('localityName', 'Castle Anthrax'),),
356 (('organizationName', 'Python Software Foundation'),),
357 (('commonName', 'dirname example'),))),
358 ('URI', 'https://www.python.org/'),
359 ('IP Address', '127.0.0.1'),
360 ('IP Address', '0:0:0:0:0:0:0:1\n'),
361 ('Registered ID', '1.2.3.4.5')
362 )
363 )
364
Antoine Pitrou480a1242010-04-28 21:37:09 +0000365 def test_DER_to_PEM(self):
Martin Panter3d81d932016-01-14 09:36:00 +0000366 with open(CAFILE_CACERT, 'r') as f:
Antoine Pitrou480a1242010-04-28 21:37:09 +0000367 pem = f.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000368 d1 = ssl.PEM_cert_to_DER_cert(pem)
369 p2 = ssl.DER_cert_to_PEM_cert(d1)
370 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitrou18c913e2010-04-27 10:59:39 +0000371 self.assertEqual(d1, d2)
Antoine Pitrou9bfbe612010-04-27 22:08:08 +0000372 if not p2.startswith(ssl.PEM_HEADER + '\n'):
373 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
374 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
375 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000376
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000377 def test_openssl_version(self):
378 n = ssl.OPENSSL_VERSION_NUMBER
379 t = ssl.OPENSSL_VERSION_INFO
380 s = ssl.OPENSSL_VERSION
381 self.assertIsInstance(n, int)
382 self.assertIsInstance(t, tuple)
383 self.assertIsInstance(s, str)
384 # Some sanity checks follow
385 # >= 0.9
386 self.assertGreaterEqual(n, 0x900000)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400387 # < 3.0
388 self.assertLess(n, 0x30000000)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000389 major, minor, fix, patch, status = t
390 self.assertGreaterEqual(major, 0)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400391 self.assertLess(major, 3)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000392 self.assertGreaterEqual(minor, 0)
393 self.assertLess(minor, 256)
394 self.assertGreaterEqual(fix, 0)
395 self.assertLess(fix, 256)
396 self.assertGreaterEqual(patch, 0)
Ned Deily05784a72015-02-05 17:20:13 +1100397 self.assertLessEqual(patch, 63)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000398 self.assertGreaterEqual(status, 0)
399 self.assertLessEqual(status, 15)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400400 # Version string as returned by {Open,Libre}SSL, the format might change
Christian Heimes598894f2016-09-05 23:19:05 +0200401 if IS_LIBRESSL:
402 self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
Victor Stinner789b8052015-01-06 11:51:06 +0100403 (s, t, hex(n)))
Antoine Pitroudfab9352014-07-21 18:35:01 -0400404 else:
405 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
Victor Stinner789b8052015-01-06 11:51:06 +0100406 (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000407
Antoine Pitrou9d543662010-04-23 23:10:32 +0000408 @support.cpython_only
409 def test_refcycle(self):
410 # Issue #7943: an SSL object doesn't create reference cycles with
411 # itself.
412 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200413 ss = test_wrap_socket(s)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000414 wr = weakref.ref(ss)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100415 with support.check_warnings(("", ResourceWarning)):
416 del ss
Victor Stinnere0b75b72016-03-21 17:26:04 +0100417 self.assertEqual(wr(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000418
Antoine Pitroua468adc2010-09-14 14:43:44 +0000419 def test_wrapped_unconnected(self):
420 # Methods on an unconnected SSLSocket propagate the original
Andrew Svetlov0832af62012-12-18 23:10:48 +0200421 # OSError raise by the underlying socket object.
Antoine Pitroua468adc2010-09-14 14:43:44 +0000422 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200423 with test_wrap_socket(s) as ss:
Antoine Pitroue9bb4732013-01-12 21:56:56 +0100424 self.assertRaises(OSError, ss.recv, 1)
425 self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
426 self.assertRaises(OSError, ss.recvfrom, 1)
427 self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
428 self.assertRaises(OSError, ss.send, b'x')
429 self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
Antoine Pitroua468adc2010-09-14 14:43:44 +0000430
Antoine Pitrou40f08742010-04-24 22:04:40 +0000431 def test_timeout(self):
432 # Issue #8524: when creating an SSL socket, the timeout of the
433 # original socket should be retained.
434 for timeout in (None, 0.0, 5.0):
435 s = socket.socket(socket.AF_INET)
436 s.settimeout(timeout)
Christian Heimesd0486372016-09-10 23:23:33 +0200437 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100438 self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000439
Christian Heimesd0486372016-09-10 23:23:33 +0200440 def test_errors_sslwrap(self):
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000441 sock = socket.socket()
Ezio Melottied3a7d22010-12-01 02:32:32 +0000442 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000443 "certfile must be specified",
444 ssl.wrap_socket, sock, keyfile=CERTFILE)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000445 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000446 "certfile must be specified for server-side operations",
447 ssl.wrap_socket, sock, server_side=True)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000448 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000449 "certfile must be specified for server-side operations",
Christian Heimesd0486372016-09-10 23:23:33 +0200450 ssl.wrap_socket, sock, server_side=True, certfile="")
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100451 with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
452 self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
Christian Heimesd0486372016-09-10 23:23:33 +0200453 s.connect, (HOST, 8080))
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200454 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000455 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000456 ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000457 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200458 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000459 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000460 ssl.wrap_socket(sock,
461 certfile=CERTFILE, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000462 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200463 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000464 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000465 ssl.wrap_socket(sock,
466 certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000467 self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000468
Martin Panter3464ea22016-02-01 21:58:11 +0000469 def bad_cert_test(self, certfile):
470 """Check that trying to use the given client certificate fails"""
471 certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
472 certfile)
473 sock = socket.socket()
474 self.addCleanup(sock.close)
475 with self.assertRaises(ssl.SSLError):
Christian Heimesd0486372016-09-10 23:23:33 +0200476 test_wrap_socket(sock,
Christian Heimesa170fa12017-09-15 20:27:30 +0200477 certfile=certfile)
Martin Panter3464ea22016-02-01 21:58:11 +0000478
479 def test_empty_cert(self):
480 """Wrapping with an empty cert file"""
481 self.bad_cert_test("nullcert.pem")
482
483 def test_malformed_cert(self):
484 """Wrapping with a badly formatted certificate (syntax error)"""
485 self.bad_cert_test("badcert.pem")
486
487 def test_malformed_key(self):
488 """Wrapping with a badly formatted key (syntax error)"""
489 self.bad_cert_test("badkey.pem")
490
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000491 def test_match_hostname(self):
492 def ok(cert, hostname):
493 ssl.match_hostname(cert, hostname)
494 def fail(cert, hostname):
495 self.assertRaises(ssl.CertificateError,
496 ssl.match_hostname, cert, hostname)
497
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100498 # -- Hostname matching --
499
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000500 cert = {'subject': ((('commonName', 'example.com'),),)}
501 ok(cert, 'example.com')
502 ok(cert, 'ExAmple.cOm')
503 fail(cert, 'www.example.com')
504 fail(cert, '.example.com')
505 fail(cert, 'example.org')
506 fail(cert, 'exampleXcom')
507
508 cert = {'subject': ((('commonName', '*.a.com'),),)}
509 ok(cert, 'foo.a.com')
510 fail(cert, 'bar.foo.a.com')
511 fail(cert, 'a.com')
512 fail(cert, 'Xa.com')
513 fail(cert, '.a.com')
514
Mandeep Singhede2ac92017-11-27 04:01:27 +0530515 # only match wildcards when they are the only thing
516 # in left-most segment
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000517 cert = {'subject': ((('commonName', 'f*.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530518 fail(cert, 'foo.com')
519 fail(cert, 'f.com')
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000520 fail(cert, 'bar.com')
521 fail(cert, 'foo.a.com')
522 fail(cert, 'bar.foo.com')
523
Christian Heimes824f7f32013-08-17 00:54:47 +0200524 # NULL bytes are bad, CVE-2013-4073
525 cert = {'subject': ((('commonName',
526 'null.python.org\x00example.org'),),)}
527 ok(cert, 'null.python.org\x00example.org') # or raise an error?
528 fail(cert, 'example.org')
529 fail(cert, 'null.python.org')
530
Georg Brandl72c98d32013-10-27 07:16:53 +0100531 # error cases with wildcards
532 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
533 fail(cert, 'bar.foo.a.com')
534 fail(cert, 'a.com')
535 fail(cert, 'Xa.com')
536 fail(cert, '.a.com')
537
538 cert = {'subject': ((('commonName', 'a.*.com'),),)}
539 fail(cert, 'a.foo.com')
540 fail(cert, 'a..com')
541 fail(cert, 'a.com')
542
543 # wildcard doesn't match IDNA prefix 'xn--'
544 idna = 'püthon.python.org'.encode("idna").decode("ascii")
545 cert = {'subject': ((('commonName', idna),),)}
546 ok(cert, idna)
547 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
548 fail(cert, idna)
549 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
550 fail(cert, idna)
551
552 # wildcard in first fragment and IDNA A-labels in sequent fragments
553 # are supported.
554 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
555 cert = {'subject': ((('commonName', idna),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530556 fail(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
557 fail(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
Georg Brandl72c98d32013-10-27 07:16:53 +0100558 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
559 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
560
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000561 # Slightly fake real-world example
562 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
563 'subject': ((('commonName', 'linuxfrz.org'),),),
564 'subjectAltName': (('DNS', 'linuxfr.org'),
565 ('DNS', 'linuxfr.com'),
566 ('othername', '<unsupported>'))}
567 ok(cert, 'linuxfr.org')
568 ok(cert, 'linuxfr.com')
569 # Not a "DNS" entry
570 fail(cert, '<unsupported>')
571 # When there is a subjectAltName, commonName isn't used
572 fail(cert, 'linuxfrz.org')
573
574 # A pristine real-world example
575 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
576 'subject': ((('countryName', 'US'),),
577 (('stateOrProvinceName', 'California'),),
578 (('localityName', 'Mountain View'),),
579 (('organizationName', 'Google Inc'),),
580 (('commonName', 'mail.google.com'),))}
581 ok(cert, 'mail.google.com')
582 fail(cert, 'gmail.com')
583 # Only commonName is considered
584 fail(cert, 'California')
585
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100586 # -- IPv4 matching --
587 cert = {'subject': ((('commonName', 'example.com'),),),
588 'subjectAltName': (('DNS', 'example.com'),
589 ('IP Address', '10.11.12.13'),
590 ('IP Address', '14.15.16.17'))}
591 ok(cert, '10.11.12.13')
592 ok(cert, '14.15.16.17')
593 fail(cert, '14.15.16.18')
594 fail(cert, 'example.net')
595
596 # -- IPv6 matching --
597 cert = {'subject': ((('commonName', 'example.com'),),),
598 'subjectAltName': (('DNS', 'example.com'),
599 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
600 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
601 ok(cert, '2001::cafe')
602 ok(cert, '2003::baba')
603 fail(cert, '2003::bebe')
604 fail(cert, 'example.net')
605
606 # -- Miscellaneous --
607
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000608 # Neither commonName nor subjectAltName
609 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
610 'subject': ((('countryName', 'US'),),
611 (('stateOrProvinceName', 'California'),),
612 (('localityName', 'Mountain View'),),
613 (('organizationName', 'Google Inc'),))}
614 fail(cert, 'mail.google.com')
615
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200616 # No DNS entry in subjectAltName but a commonName
617 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
618 'subject': ((('countryName', 'US'),),
619 (('stateOrProvinceName', 'California'),),
620 (('localityName', 'Mountain View'),),
621 (('commonName', 'mail.google.com'),)),
622 'subjectAltName': (('othername', 'blabla'), )}
623 ok(cert, 'mail.google.com')
624
625 # No DNS entry subjectAltName and no commonName
626 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
627 'subject': ((('countryName', 'US'),),
628 (('stateOrProvinceName', 'California'),),
629 (('localityName', 'Mountain View'),),
630 (('organizationName', 'Google Inc'),)),
631 'subjectAltName': (('othername', 'blabla'),)}
632 fail(cert, 'google.com')
633
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000634 # Empty cert / no cert
635 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
636 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
637
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200638 # Issue #17980: avoid denials of service by refusing more than one
639 # wildcard per fragment.
640 cert = {'subject': ((('commonName', 'a*b.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530641 fail(cert, 'axxb.com')
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200642 cert = {'subject': ((('commonName', 'a*b.co*'),),)}
Georg Brandl72c98d32013-10-27 07:16:53 +0100643 fail(cert, 'axxb.com')
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200644 cert = {'subject': ((('commonName', 'a*b*.com'),),)}
645 with self.assertRaises(ssl.CertificateError) as cm:
646 ssl.match_hostname(cert, 'axxbxxc.com')
647 self.assertIn("too many wildcards", str(cm.exception))
648
def test_server_side(self):
    """wrap_socket() must refuse server_hostname when wrapping server-side."""
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    with socket.socket() as plain_sock:
        with self.assertRaises(ValueError):
            # True is handed over as the second positional argument,
            # together with the server_hostname keyword under test.
            server_ctx.wrap_socket(plain_sock, True,
                                   server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000655
def test_unknown_channel_binding(self):
    """get_channel_binding() raises ValueError for an unknown binding type.

    A local listening socket is created only so that the client socket
    has something to connect to; no TLS handshake is performed.
    """
    s = socket.socket(socket.AF_INET)
    # Register cleanups right after creation so that both sockets are
    # released even if bind/connect/wrapping or the assertion fails
    # (previously the listener was only closed on the success path).
    self.addCleanup(s.close)
    s.bind(('127.0.0.1', 0))
    s.listen()
    c = socket.socket(socket.AF_INET)
    self.addCleanup(c.close)
    c.connect(s.getsockname())
    with test_wrap_socket(c, do_handshake_on_connect=False) as ss:
        with self.assertRaises(ValueError):
            ss.get_channel_binding("unknown-type")
Antoine Pitroud6494802011-07-21 01:11:30 +0200667
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    """An unconnected socket reports no 'tls-unique' binding data."""
    # First a default wrap, then the same check for a server-side wrap.
    wrap_variants = (
        {},
        {"server_side": True, "certfile": CERTFILE},
    )
    for wrap_kwargs in wrap_variants:
        raw_sock = socket.socket(socket.AF_INET)
        with test_wrap_socket(raw_sock, **wrap_kwargs) as wrapped:
            binding = wrapped.get_channel_binding("tls-unique")
            self.assertIsNone(binding)
Antoine Pitroud6494802011-07-21 01:11:30 +0200679
def test_dealloc_warn(self):
    """Dropping an unclosed wrapped socket must emit a ResourceWarning.

    The warning text is expected to contain the repr() of the object
    returned by test_wrap_socket().
    """
    ss = test_wrap_socket(socket.socket(socket.AF_INET))
    # Capture the repr while the object is still alive; it is compared
    # against the warning message below.
    r = repr(ss)
    with self.assertWarns(ResourceWarning) as cm:
        # Drop the only reference held here, then force a collection so
        # the warning is raised inside the assertWarns() block.
        ss = None
        support.gc_collect()
    self.assertIn(r, str(cm.warning.args[0]))
687
def test_get_default_verify_paths(self):
    """get_default_verify_paths() returns a 6-item DefaultVerifyPaths and
    reflects SSL_CERT_DIR / SSL_CERT_FILE when they are set."""
    default_paths = ssl.get_default_verify_paths()
    self.assertEqual(len(default_paths), 6)
    self.assertIsInstance(default_paths, ssl.DefaultVerifyPaths)

    with support.EnvironmentVarGuard() as environ:
        # Point both variables at test fixtures and query again.
        environ["SSL_CERT_DIR"] = CAPATH
        environ["SSL_CERT_FILE"] = CERTFILE
        overridden = ssl.get_default_verify_paths()
        self.assertEqual(overridden.cafile, CERTFILE)
        self.assertEqual(overridden.capath, CAPATH)
699
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
def test_enum_certificates(self):
    """Check the shape of ssl.enum_certificates() results for CA and ROOT."""
    store_names = ("CA", "ROOT")
    for name in store_names:
        self.assertTrue(ssl.enum_certificates(name))

    # A store name is mandatory, and an empty one is an OS-level error.
    with self.assertRaises(TypeError):
        ssl.enum_certificates()
    with self.assertRaises(WindowsError):
        ssl.enum_certificates("")

    known_encodings = {"x509_asn", "pkcs_7_asn"}
    seen_trust_oids = set()
    for name in store_names:
        entries = ssl.enum_certificates(name)
        self.assertIsInstance(entries, list)
        for entry in entries:
            self.assertIsInstance(entry, tuple)
            self.assertEqual(len(entry), 3)
            cert_bytes, encoding, trust = entry
            self.assertIsInstance(cert_bytes, bytes)
            self.assertIn(encoding, known_encodings)
            self.assertIsInstance(trust, (set, bool))
            # Only set-valued trust entries contribute OIDs.
            if isinstance(trust, set):
                seen_trust_oids |= trust

    server_auth_oid = "1.3.6.1.5.5.7.3.1"
    self.assertIn(server_auth_oid, seen_trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200724
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
def test_enum_crls(self):
    """Check the shape of ssl.enum_crls() results for the CA store."""
    self.assertTrue(ssl.enum_crls("CA"))
    with self.assertRaises(TypeError):
        ssl.enum_crls()
    with self.assertRaises(WindowsError):
        ssl.enum_crls("")

    known_encodings = {"x509_asn", "pkcs_7_asn"}
    crl_entries = ssl.enum_crls("CA")
    self.assertIsInstance(crl_entries, list)
    for entry in crl_entries:
        self.assertIsInstance(entry, tuple)
        self.assertEqual(len(entry), 2)
        crl_bytes, encoding = entry
        self.assertIsInstance(crl_bytes, bytes)
        self.assertIn(encoding, known_encodings)
Christian Heimes46bebee2013-06-09 19:03:31 +0200738
Christian Heimes46bebee2013-06-09 19:03:31 +0200739
def test_asn1object(self):
    """Exercise ssl._ASN1Object lookups by OID, by NID and by name."""
    expected = (129, 'serverAuth', 'TLS Web Server Authentication',
                '1.3.6.1.5.5.7.3.1')
    asn1_cls = ssl._ASN1Object

    # Plain constructor: takes the dotted OID, rejects a short name.
    by_oid = asn1_cls('1.3.6.1.5.5.7.3.1')
    self.assertEqual(by_oid, expected)
    self.assertEqual(by_oid.nid, 129)
    self.assertEqual(by_oid.shortname, 'serverAuth')
    self.assertEqual(by_oid.longname, 'TLS Web Server Authentication')
    self.assertEqual(by_oid.oid, '1.3.6.1.5.5.7.3.1')
    self.assertIsInstance(by_oid, asn1_cls)
    with self.assertRaises(ValueError):
        asn1_cls('serverAuth')

    # fromnid(): numeric identifier lookup.
    by_nid = asn1_cls.fromnid(129)
    self.assertEqual(by_nid, expected)
    self.assertIsInstance(by_nid, asn1_cls)
    with self.assertRaises(ValueError):
        asn1_cls.fromnid(-1)
    with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
        asn1_cls.fromnid(100000)
    # Sweep the first 1000 NIDs; the ones that resolve must expose
    # correctly typed fields, the others are simply skipped.
    for candidate in range(1000):
        try:
            resolved = asn1_cls.fromnid(candidate)
        except ValueError:
            continue
        self.assertIsInstance(resolved.nid, int)
        self.assertIsInstance(resolved.shortname, str)
        self.assertIsInstance(resolved.longname, str)
        self.assertIsInstance(resolved.oid, (str, type(None)))

    # fromname(): long name, short name and OID text all resolve;
    # the lookup is case-sensitive.
    by_name = asn1_cls.fromname('TLS Web Server Authentication')
    self.assertEqual(by_name, expected)
    self.assertIsInstance(by_name, asn1_cls)
    for alias in ('serverAuth', '1.3.6.1.5.5.7.3.1'):
        self.assertEqual(asn1_cls.fromname(alias), expected)
    with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
        asn1_cls.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100778
def test_purpose_enum(self):
    """ssl.Purpose members are _ASN1Object values with the expected fields."""
    cases = (
        (ssl.Purpose.SERVER_AUTH, 129, 'serverAuth', '1.3.6.1.5.5.7.3.1'),
        (ssl.Purpose.CLIENT_AUTH, 130, 'clientAuth', '1.3.6.1.5.5.7.3.2'),
    )
    for purpose, nid, shortname, oid in cases:
        # Reference object built straight from the dotted OID.
        reference = ssl._ASN1Object(oid)
        self.assertIsInstance(purpose, ssl._ASN1Object)
        self.assertEqual(purpose, reference)
        self.assertEqual(purpose.nid, nid)
        self.assertEqual(purpose.shortname, shortname)
        self.assertEqual(purpose.oid, oid)
795
def test_unsupported_dtls(self):
    """Wrapping a datagram socket raises NotImplementedError both through
    test_wrap_socket() and through SSLContext.wrap_socket()."""
    expected_message = "only stream sockets are supported"
    udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    self.addCleanup(udp_sock.close)

    with self.assertRaises(NotImplementedError) as caught:
        test_wrap_socket(udp_sock, cert_reqs=ssl.CERT_NONE)
    self.assertEqual(str(caught.exception), expected_message)

    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    with self.assertRaises(NotImplementedError) as caught:
        client_ctx.wrap_socket(udp_sock)
    self.assertEqual(str(caught.exception), expected_message)
806
def cert_time_ok(self, timestring, timestamp):
    """Assert that *timestring* converts to exactly *timestamp* seconds."""
    seconds = ssl.cert_time_to_seconds(timestring)
    self.assertEqual(seconds, timestamp)
809
def cert_time_fail(self, timestring):
    """Assert that converting *timestring* raises ValueError."""
    self.assertRaises(ValueError, ssl.cert_time_to_seconds, timestring)
813
@unittest.skipUnless(utc_offset(),
                     'local time needs to be different from UTC')
def test_cert_time_to_seconds_timezone(self):
    """Conversion results must not depend on the local timezone."""
    # Issue #19940: ssl.cert_time_to_seconds() returns wrong
    # results if local timezone is not UTC
    known_conversions = (
        ("May 9 00:00:00 2007 GMT", 1178668800.0),
        ("Jan 5 09:34:43 2018 GMT", 1515144883.0),
    )
    for timestring, expected_seconds in known_conversions:
        self.cert_time_ok(timestring, expected_seconds)
821
def test_cert_time_to_seconds(self):
    """Accepted and rejected inputs of ssl.cert_time_to_seconds()."""
    reference = "Jan 5 09:34:43 2018 GMT"
    reference_ts = 1515144883.0
    self.cert_time_ok(reference, reference_ts)
    # accept keyword parameter, assert its name
    self.assertEqual(ssl.cert_time_to_seconds(cert_time=reference),
                     reference_ts)
    equivalent_spellings = (
        # accept both %e and %d (space or zero generated by strftime)
        "Jan 05 09:34:43 2018 GMT",
        # case-insensitive
        "JaN 5 09:34:43 2018 GmT",
    )
    for spelling in equivalent_spellings:
        self.cert_time_ok(spelling, reference_ts)

    malformed = (
        "Jan 5 09:34 2018 GMT",      # no seconds
        "Jan 5 09:34:43 2018",       # no GMT
        "Jan 5 09:34:43 2018 UTC",   # not GMT timezone
        "Jan 35 09:34:43 2018 GMT",  # invalid day
        "Jon 5 09:34:43 2018 GMT",   # invalid month
        "Jan 5 24:00:00 2018 GMT",   # invalid hour
        "Jan 5 09:60:43 2018 GMT",   # invalid minute
    )
    for timestring in malformed:
        self.cert_time_fail(timestring)

    newyear_ts = 1230768000.0
    new_year_spellings = (
        "Dec 31 23:59:60 2008 GMT",  # leap seconds
        "Jan 1 00:00:00 2009 GMT",   # same timestamp
    )
    for timestring in new_year_spellings:
        self.cert_time_ok(timestring, newyear_ts)

    second_rollover = (
        ("Jan 5 09:34:59 2018 GMT", 1515144899),
        # allow 60th second (even if it is not a leap second)
        ("Jan 5 09:34:60 2018 GMT", 1515144900),
        # allow 2nd leap second for compatibility with time.strptime()
        ("Jan 5 09:34:61 2018 GMT", 1515144901),
    )
    for timestring, expected_seconds in second_rollover:
        self.cert_time_ok(timestring, expected_seconds)
    self.cert_time_fail("Jan 5 09:34:62 2018 GMT")  # invalid seconds

    # no special treatment for the special value:
    #   99991231235959Z (rfc 5280)
    self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
856
@support.run_with_locale('LC_ALL', '')
def test_cert_time_to_seconds_locale(self):
    """`cert_time_to_seconds()` should be locale independent."""
    # Abbreviated month name for February as produced by strftime()
    # while this test runs.
    local_feb = time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))

    if local_feb.lower() == 'feb':
        self.skipTest("locale-specific month name needs to be "
                      "different from C locale")

    # locale-independent
    self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
    self.cert_time_fail(local_feb + " 9 00:00:00 2007 GMT")
871
def test_connect_ex_error(self):
    """connect_ex() to a bound but non-listening port returns an errno."""
    # Issue #19919: Windows machines or VMs hosted on Windows
    # machines sometimes return EWOULDBLOCK.
    acceptable_codes = (
        errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
        errno.EWOULDBLOCK,
    )
    placeholder = socket.socket(socket.AF_INET)
    self.addCleanup(placeholder.close)
    # Reserve port but don't listen
    reserved_port = support.bind_port(placeholder)
    client = test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_REQUIRED)
    self.addCleanup(client.close)
    result = client.connect_ex((HOST, reserved_port))
    self.assertIn(result, acceptable_codes)
887
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100888
Antoine Pitrou152efa22010-05-16 18:19:27 +0000889class ContextTests(unittest.TestCase):
890
@skip_if_broken_ubuntu_ssl
def test_constructor(self):
    """SSLContext accepts every known protocol, defaults to PROTOCOL_TLS
    and rejects unknown protocol numbers."""
    for known_protocol in PROTOCOLS:
        ssl.SSLContext(known_protocol)
    default_ctx = ssl.SSLContext()
    self.assertEqual(default_ctx.protocol, ssl.PROTOCOL_TLS)
    for bogus_protocol in (-1, 42):
        with self.assertRaises(ValueError):
            ssl.SSLContext(bogus_protocol)
899
@skip_if_broken_ubuntu_ssl
def test_protocol(self):
    """The protocol attribute echoes the constructor argument."""
    for requested in PROTOCOLS:
        self.assertEqual(ssl.SSLContext(requested).protocol, requested)
905
def test_ciphers(self):
    """set_ciphers() accepts known cipher strings and rejects garbage."""
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    for cipher_string in ("ALL", "DEFAULT"):
        client_ctx.set_ciphers(cipher_string)
    self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected",
                           client_ctx.set_ciphers, "^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000912
@unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
def test_get_ciphers(self):
    """get_ciphers() lists the AES-GCM suites after selecting 'AESGCM'."""
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_ctx.set_ciphers('AESGCM')
    cipher_names = {cipher['name'] for cipher in client_ctx.get_ciphers()}
    for required in ('AES256-GCM-SHA384', 'AES128-GCM-SHA256'):
        self.assertIn(required, cipher_names)
Christian Heimes25bfcd52016-09-06 00:04:45 +0200920
@skip_if_broken_ubuntu_ssl
def test_options(self):
    """Default SSLContext.options value, and setting/clearing of flags."""
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value, and
    # SSLContext also enables the remaining four by default.
    expected = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3 |
                OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE |
                OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE)
    self.assertEqual(expected, client_ctx.options)

    client_ctx.options = client_ctx.options | ssl.OP_NO_TLSv1
    self.assertEqual(expected | ssl.OP_NO_TLSv1, client_ctx.options)

    if not can_clear_options():
        # Clearing is refused when the helper reports it as unsupported.
        with self.assertRaises(ValueError):
            client_ctx.options = 0
        return

    client_ctx.options = client_ctx.options & ~ssl.OP_NO_TLSv1
    self.assertEqual(expected, client_ctx.options)
    client_ctx.options = 0
    # Ubuntu has OP_NO_SSLv3 forced on by default
    self.assertEqual(0, client_ctx.options & ~ssl.OP_NO_SSLv3)
941
def test_verify_mode_protocol(self):
    """verify_mode defaults and validation for the PROTOCOL_TLS* variants."""
    generic_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    # Default value
    self.assertEqual(generic_ctx.verify_mode, ssl.CERT_NONE)
    # Each mode can be assigned and is read back unchanged.
    for mode in (ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED, ssl.CERT_NONE):
        generic_ctx.verify_mode = mode
        self.assertEqual(generic_ctx.verify_mode, mode)
    # A wrong type and an unknown number are rejected differently.
    for bogus_value, expected_error in ((None, TypeError), (42, ValueError)):
        with self.assertRaises(expected_error):
            generic_ctx.verify_mode = bogus_value

    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    self.assertEqual(server_ctx.verify_mode, ssl.CERT_NONE)
    self.assertFalse(server_ctx.check_hostname)

    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertEqual(client_ctx.verify_mode, ssl.CERT_REQUIRED)
    self.assertTrue(client_ctx.check_hostname)
964
965
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_verify_flags(self):
    """verify_flags default value, round-tripping and type checking."""
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # default value; VERIFY_X509_TRUSTED_FIRST counts as 0 when the ssl
    # module does not define it
    trusted_first = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(server_ctx.verify_flags,
                     ssl.VERIFY_DEFAULT | trusted_first)
    round_trip_values = (
        ssl.VERIFY_CRL_CHECK_LEAF,
        ssl.VERIFY_CRL_CHECK_CHAIN,
        ssl.VERIFY_DEFAULT,
        # supports any value
        ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT,
    )
    for flags in round_trip_values:
        server_ctx.verify_flags = flags
        self.assertEqual(server_ctx.verify_flags, flags)
    with self.assertRaises(TypeError):
        server_ctx.verify_flags = None
985
def test_load_cert_chain(self):
    """Exercise SSLContext.load_cert_chain() with valid and invalid input.

    Sections, in order: combined key+cert file, separate key and cert,
    mismatching key and cert, password-protected keys given as literal
    passwords, and passwords supplied through callables.
    """
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # Combined key and cert in a single file
    ctx.load_cert_chain(CERTFILE, keyfile=None)
    ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
    # certfile is mandatory: passing only keyfile is a TypeError
    self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
    with self.assertRaises(OSError) as cm:
        ctx.load_cert_chain(NONEXISTINGCERT)
    self.assertEqual(cm.exception.errno, errno.ENOENT)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        ctx.load_cert_chain(BADCERT)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        ctx.load_cert_chain(EMPTYCERT)
    # Separate key and cert
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    ctx.load_cert_chain(ONLYCERT, ONLYKEY)
    ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
    ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
    # A cert without key, a key without cert, and swapped arguments
    # are all reported as PEM errors.
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        ctx.load_cert_chain(ONLYCERT)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        ctx.load_cert_chain(ONLYKEY)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
    # Mismatching key and cert
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
        ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)
    # Password protected key and cert
    # (the password is accepted as str, bytes and bytearray, both as a
    # keyword and as the third positional argument)
    ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
    ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
    ctx.load_cert_chain(CERTFILE_PROTECTED,
                        password=bytearray(KEY_PASSWORD.encode()))
    ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
    ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
    ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
                        bytearray(KEY_PASSWORD.encode()))
    with self.assertRaisesRegex(TypeError, "should be a string"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
    with self.assertRaises(ssl.SSLError):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        # openssl has a fixed limit on the password buffer.
        # PEM_BUFSIZE is generally set to 1kb.
        # Return a string larger than this.
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
    # Password callback
    # Each helper below is one flavour of password provider: the three
    # accepted return types, a wrong password, an oversized password,
    # a wrong return type and a provider that raises.
    def getpass_unicode():
        return KEY_PASSWORD
    def getpass_bytes():
        return KEY_PASSWORD.encode()
    def getpass_bytearray():
        return bytearray(KEY_PASSWORD.encode())
    def getpass_badpass():
        return "badpass"
    def getpass_huge():
        return b'a' * (1024 * 1024)
    def getpass_bad_type():
        return 9
    def getpass_exception():
        raise Exception('getpass error')
    # Used both as a callable instance and through its bound method.
    class GetPassCallable:
        def __call__(self):
            return KEY_PASSWORD
        def getpass(self):
            return KEY_PASSWORD
    ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
    ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
    ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
    ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
    ctx.load_cert_chain(CERTFILE_PROTECTED,
                        password=GetPassCallable().getpass)
    with self.assertRaises(ssl.SSLError):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
    with self.assertRaisesRegex(TypeError, "must return a string"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
    # An exception raised by the callback must reach the caller.
    with self.assertRaisesRegex(Exception, "getpass error"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
    # Make sure the password function isn't called if it isn't needed
    ctx.load_cert_chain(CERTFILE, password=getpass_exception)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001068
def test_load_verify_locations(self):
    """load_verify_locations() with str/bytes paths and invalid arguments."""
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    load = server_ctx.load_verify_locations
    # str path first, then bytes path; each positionally and by keyword.
    for cafile in (CERTFILE, BYTES_CERTFILE):
        load(cafile)
        load(cafile=cafile, capath=None)
    # No argument at all, or nothing but None values, is a TypeError.
    for bad_args in ((), (None, None, None)):
        with self.assertRaises(TypeError):
            load(*bad_args)
    with self.assertRaises(OSError) as caught:
        load(NONEXISTINGCERT)
    self.assertEqual(caught.exception.errno, errno.ENOENT)
    self.assertRaisesRegex(ssl.SSLError, "PEM lib", load, BADCERT)
    load(CERTFILE, CAPATH)
    load(CERTFILE, capath=BYTES_CAPATH)

    # Issue #10989: crash if the second argument type is invalid
    with self.assertRaises(TypeError):
        load(None, True)
1087
def test_load_verify_cadata(self):
    """load_verify_locations(cadata=...) with PEM text and DER bytes."""

    def read_pem_and_der(path):
        # Read a PEM file and also derive its DER form.
        with open(path) as pem_file:
            pem_text = pem_file.read()
        return pem_text, ssl.PEM_cert_to_DER_cert(pem_text)

    def new_client_ctx():
        return ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    def assert_ca_count(context, expected):
        self.assertEqual(context.cert_store_stats()["x509_ca"], expected)

    cacert_pem, cacert_der = read_pem_and_der(CAFILE_CACERT)
    neuronio_pem, neuronio_der = read_pem_and_der(CAFILE_NEURONIO)

    # test PEM
    ctx = new_client_ctx()
    assert_ca_count(ctx, 0)
    ctx.load_verify_locations(cadata=cacert_pem)
    assert_ca_count(ctx, 1)
    ctx.load_verify_locations(cadata=neuronio_pem)
    assert_ca_count(ctx, 2)
    # cert already in hash table
    ctx.load_verify_locations(cadata=neuronio_pem)
    assert_ca_count(ctx, 2)

    # combined
    ctx = new_client_ctx()
    ctx.load_verify_locations(cadata="\n".join((cacert_pem, neuronio_pem)))
    assert_ca_count(ctx, 2)

    # with junk around the certs
    ctx = new_client_ctx()
    junk_wrapped = ["head", cacert_pem, "other", neuronio_pem, "again",
                    neuronio_pem, "tail"]
    ctx.load_verify_locations(cadata="\n".join(junk_wrapped))
    assert_ca_count(ctx, 2)

    # test DER
    ctx = new_client_ctx()
    for der_blob in (cacert_der, neuronio_der):
        ctx.load_verify_locations(cadata=der_blob)
    assert_ca_count(ctx, 2)
    # cert already in hash table
    ctx.load_verify_locations(cadata=cacert_der)
    assert_ca_count(ctx, 2)

    # combined
    ctx = new_client_ctx()
    ctx.load_verify_locations(cadata=b"".join((cacert_der, neuronio_der)))
    assert_ca_count(ctx, 2)

    # error cases
    ctx = new_client_ctx()
    with self.assertRaises(TypeError):
        ctx.load_verify_locations(cadata=object)

    broken_inputs = (
        ("broken", "no start line"),
        (b"broken", "not enough data"),
    )
    for cadata, message in broken_inputs:
        with self.assertRaisesRegex(ssl.SSLError, message):
            ctx.load_verify_locations(cadata=cadata)
1144
1145
def test_load_dh_params(self):
    """load_dh_params() with valid, missing and unsuitable files."""
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_ctx.load_dh_params(DHFILE)
    # The bytes path variant is only exercised when os.name is not 'nt'.
    if os.name != 'nt':
        server_ctx.load_dh_params(BYTES_DHFILE)
    for bad_args in ((), (None,)):
        with self.assertRaises(TypeError):
            server_ctx.load_dh_params(*bad_args)
    with self.assertRaises(FileNotFoundError) as caught:
        server_ctx.load_dh_params(NONEXISTINGCERT)
    self.assertEqual(caught.exception.errno, errno.ENOENT)
    # A certificate file is not accepted as a DH parameter file.
    self.assertRaises(ssl.SSLError, server_ctx.load_dh_params, CERTFILE)
1158
@skip_if_broken_ubuntu_ssl
def test_session_stats(self):
    """A fresh context of any protocol reports all-zero session stats."""
    counter_names = (
        'number',
        'connect', 'connect_good', 'connect_renegotiate',
        'accept', 'accept_good', 'accept_renegotiate',
        'hits', 'misses', 'timeouts', 'cache_full',
    )
    all_zero = dict.fromkeys(counter_names, 0)
    for proto in PROTOCOLS:
        fresh_ctx = ssl.SSLContext(proto)
        self.assertEqual(fresh_ctx.session_stats(), all_zero)
1176
def test_set_default_verify_paths(self):
    """set_default_verify_paths() can be called without raising."""
    # There's not much we can do to test that it acts as expected,
    # so just check it doesn't crash or raise an exception.
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_ctx.set_default_verify_paths()
1182
@unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
def test_set_ecdh_curve(self):
    """set_ecdh_curve() takes a str or bytes curve name and validates it."""
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    for curve_name in ("prime256v1", b"prime256v1"):
        server_ctx.set_ecdh_curve(curve_name)
    # Missing or None argument: wrong type.
    for bad_args in ((), (None,)):
        with self.assertRaises(TypeError):
            server_ctx.set_ecdh_curve(*bad_args)
    # Right type, but not a curve name that is accepted.
    for unknown_curve in ("foo", b"foo"):
        with self.assertRaises(ValueError):
            server_ctx.set_ecdh_curve(unknown_curve)
1192
@needs_sni
def test_sni_callback(self):
    """set_servername_callback() accepts only a callable or None."""
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

    # set_servername_callback expects a callable, or None
    with self.assertRaises(TypeError):
        server_ctx.set_servername_callback()
    for not_callable in (4, "", server_ctx):
        with self.assertRaises(TypeError):
            server_ctx.set_servername_callback(not_callable)

    def noop_callback(sock, servername, ctx):
        pass

    for accepted in (None, noop_callback):
        server_ctx.set_servername_callback(accepted)
1207
@needs_sni
def test_sni_callback_refcycle(self):
    """A context referenced by its own servername callback is collectable."""
    # Reference cycles through the servername callback are detected
    # and cleared.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # The default argument makes the callback hold a reference to the
    # context, which in turn is given the callback: a reference cycle.
    def dummycallback(sock, servername, ctx, cycle=ctx):
        pass
    ctx.set_servername_callback(dummycallback)
    wr = weakref.ref(ctx)
    # Drop both local names, then collect; the weak reference going
    # dead shows that the context was freed.
    del ctx, dummycallback
    gc.collect()
    self.assertIs(wr(), None)
1220
def test_cert_store_stats(self):
    """cert_store_stats() counts only what load_verify_locations() adds."""
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    def assert_stats(x509_ca, x509):
        # No CRL is loaded anywhere in this test, so 'crl' stays at 0.
        self.assertEqual(client_ctx.cert_store_stats(),
                         {'x509_ca': x509_ca, 'crl': 0, 'x509': x509})

    assert_stats(x509_ca=0, x509=0)
    # Loading the context's own certificate chain leaves the counters alone.
    client_ctx.load_cert_chain(CERTFILE)
    assert_stats(x509_ca=0, x509=0)
    client_ctx.load_verify_locations(CERTFILE)
    assert_stats(x509_ca=0, x509=1)
    client_ctx.load_verify_locations(CAFILE_CACERT)
    assert_stats(x509_ca=1, x509=2)
1234
def test_get_ca_certs(self):
    """get_ca_certs() lists loaded CA certificates as dicts or DER bytes."""
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertEqual(client_ctx.get_ca_certs(), [])
    # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
    client_ctx.load_verify_locations(CERTFILE)
    self.assertEqual(client_ctx.get_ca_certs(), [])
    # but CAFILE_CACERT is a CA cert
    client_ctx.load_verify_locations(CAFILE_CACERT)
    # Issuer and subject are expected to carry the same name.
    root_ca_name = (
        (('organizationName', 'Root CA'),),
        (('organizationalUnitName', 'http://www.cacert.org'),),
        (('commonName', 'CA Cert Signing Authority'),),
        (('emailAddress', 'support@cacert.org'),),
    )
    expected_entry = {
        'issuer': root_ca_name,
        'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
        'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
        'serialNumber': '00',
        'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
        'subject': root_ca_name,
        'version': 3,
    }
    self.assertEqual(client_ctx.get_ca_certs(), [expected_entry])

    # With a true argument the same certificate comes back as DER bytes.
    with open(CAFILE_CACERT) as pem_file:
        pem_text = pem_file.read()
    expected_der = ssl.PEM_cert_to_DER_cert(pem_text)
    self.assertEqual(client_ctx.get_ca_certs(True), [expected_der])
1262
Christian Heimes72d28502013-11-23 13:56:58 +01001263 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001264 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001265 ctx.load_default_certs()
1266
Christian Heimesa170fa12017-09-15 20:27:30 +02001267 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001268 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1269 ctx.load_default_certs()
1270
Christian Heimesa170fa12017-09-15 20:27:30 +02001271 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001272 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1273
Christian Heimesa170fa12017-09-15 20:27:30 +02001274 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001275 self.assertRaises(TypeError, ctx.load_default_certs, None)
1276 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1277
@unittest.skipIf(sys.platform == "win32", "not-Windows specific")
@unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
def test_load_default_certs_env(self):
    # With SSL_CERT_DIR / SSL_CERT_FILE pointing at the test data,
    # load_default_certs() is expected to load exactly one certificate.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    overrides = (("SSL_CERT_DIR", CAPATH), ("SSL_CERT_FILE", CERTFILE))
    with support.EnvironmentVarGuard() as env:
        for variable, location in overrides:
            env[variable] = location
        ctx.load_default_certs()
        loaded = ctx.cert_store_stats()
        self.assertEqual(loaded, {"crl": 0, "x509": 1, "x509_ca": 0})
1287
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
@unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
def test_load_default_certs_env_windows(self):
    # Baseline: whatever the default load produces without the
    # environment overrides.
    baseline_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    baseline_ctx.load_default_certs()
    expected = baseline_ctx.cert_store_stats()

    # With the overrides in place, one extra x509 entry is expected on
    # top of the baseline.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    overrides = (("SSL_CERT_DIR", CAPATH), ("SSL_CERT_FILE", CERTFILE))
    with support.EnvironmentVarGuard() as env:
        for variable, location in overrides:
            env[variable] = location
        ctx.load_default_certs()
        expected["x509"] += 1
        self.assertEqual(ctx.cert_store_stats(), expected)
1302
def _assert_context_options(self, ctx):
    # Shared check: *ctx* must have OP_NO_SSLv2 set, plus every optional
    # hardening flag that is non-zero in this build.
    self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
    optional_flags = (
        OP_NO_COMPRESSION,
        OP_SINGLE_DH_USE,
        OP_SINGLE_ECDH_USE,
        OP_CIPHER_SERVER_PREFERENCE,
    )
    for flag in optional_flags:
        # A zero value is skipped, exactly like the per-flag guards.
        if flag != 0:
            self.assertEqual(ctx.options & flag, flag)
1317
def test_create_default_context(self):
    # Check protocol, verify mode and options of contexts produced by
    # ssl.create_default_context() for several argument combinations.
    def check(ctx, expected_mode, hostname_checked=False):
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
        self.assertEqual(ctx.verify_mode, expected_mode)
        if hostname_checked:
            self.assertTrue(ctx.check_hostname)
        self._assert_context_options(ctx)

    # No arguments.
    check(ssl.create_default_context(), ssl.CERT_REQUIRED,
          hostname_checked=True)

    # Explicit CA material given as file, directory and in-memory data.
    with open(SIGNING_CA) as ca_file:
        ca_text = ca_file.read()
    custom_ca_ctx = ssl.create_default_context(cafile=SIGNING_CA,
                                               capath=CAPATH,
                                               cadata=ca_text)
    check(custom_ca_ctx, ssl.CERT_REQUIRED)

    # A CLIENT_AUTH context does not verify peers.
    check(ssl.create_default_context(ssl.Purpose.CLIENT_AUTH),
          ssl.CERT_NONE)
Christian Heimes4c05b472013-11-23 15:58:30 +01001338
def test__create_stdlib_context(self):
    # Check the contexts built by the private ssl._create_stdlib_context()
    # helper for several argument combinations.
    def check(ctx, expected_protocol, expected_mode, hostname_checked=None):
        self.assertEqual(ctx.protocol, expected_protocol)
        self.assertEqual(ctx.verify_mode, expected_mode)
        # check_hostname is only asserted where a value is given.
        if hostname_checked is True:
            self.assertTrue(ctx.check_hostname)
        elif hostname_checked is False:
            self.assertFalse(ctx.check_hostname)
        self._assert_context_options(ctx)

    # No arguments: no verification at all.
    check(ssl._create_stdlib_context(),
          ssl.PROTOCOL_TLS, ssl.CERT_NONE, hostname_checked=False)

    # An explicit protocol is honoured.
    check(ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1),
          ssl.PROTOCOL_TLSv1, ssl.CERT_NONE)

    # Verification settings are passed through.
    strict_ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
                                            cert_reqs=ssl.CERT_REQUIRED,
                                            check_hostname=True)
    check(strict_ctx, ssl.PROTOCOL_TLSv1, ssl.CERT_REQUIRED,
          hostname_checked=True)

    # Purpose given by keyword.
    check(ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH),
          ssl.PROTOCOL_TLS, ssl.CERT_NONE)
Christian Heimes4c05b472013-11-23 15:58:30 +01001363
Christian Heimes1aa9a752013-12-02 02:41:19 +01001364 def test_check_hostname(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001365 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001366 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001367 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001368
Christian Heimese82c0342017-09-15 20:29:57 +02001369 # Auto set CERT_REQUIRED
1370 ctx.check_hostname = True
1371 self.assertTrue(ctx.check_hostname)
1372 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1373 ctx.check_hostname = False
Christian Heimes1aa9a752013-12-02 02:41:19 +01001374 ctx.verify_mode = ssl.CERT_REQUIRED
1375 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001376 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001377
Christian Heimese82c0342017-09-15 20:29:57 +02001378 # Changing verify_mode does not affect check_hostname
1379 ctx.check_hostname = False
1380 ctx.verify_mode = ssl.CERT_NONE
1381 ctx.check_hostname = False
1382 self.assertFalse(ctx.check_hostname)
1383 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1384 # Auto set
Christian Heimes1aa9a752013-12-02 02:41:19 +01001385 ctx.check_hostname = True
1386 self.assertTrue(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001387 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1388
1389 ctx.check_hostname = False
1390 ctx.verify_mode = ssl.CERT_OPTIONAL
1391 ctx.check_hostname = False
1392 self.assertFalse(ctx.check_hostname)
1393 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1394 # keep CERT_OPTIONAL
1395 ctx.check_hostname = True
1396 self.assertTrue(ctx.check_hostname)
1397 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001398
1399 # Cannot set CERT_NONE with check_hostname enabled
1400 with self.assertRaises(ValueError):
1401 ctx.verify_mode = ssl.CERT_NONE
1402 ctx.check_hostname = False
1403 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001404 ctx.verify_mode = ssl.CERT_NONE
1405 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001406
Christian Heimes5fe668c2016-09-12 00:01:11 +02001407 def test_context_client_server(self):
1408 # PROTOCOL_TLS_CLIENT has sane defaults
1409 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1410 self.assertTrue(ctx.check_hostname)
1411 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1412
1413 # PROTOCOL_TLS_SERVER has different but also sane defaults
1414 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1415 self.assertFalse(ctx.check_hostname)
1416 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1417
Christian Heimes4df60f12017-09-15 20:26:05 +02001418 def test_context_custom_class(self):
1419 class MySSLSocket(ssl.SSLSocket):
1420 pass
1421
1422 class MySSLObject(ssl.SSLObject):
1423 pass
1424
1425 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1426 ctx.sslsocket_class = MySSLSocket
1427 ctx.sslobject_class = MySSLObject
1428
1429 with ctx.wrap_socket(socket.socket(), server_side=True) as sock:
1430 self.assertIsInstance(sock, MySSLSocket)
1431 obj = ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO())
1432 self.assertIsInstance(obj, MySSLObject)
1433
Antoine Pitrou152efa22010-05-16 18:19:27 +00001434
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001435class SSLErrorTests(unittest.TestCase):
1436
1437 def test_str(self):
1438 # The str() of a SSLError doesn't include the errno
1439 e = ssl.SSLError(1, "foo")
1440 self.assertEqual(str(e), "foo")
1441 self.assertEqual(e.errno, 1)
1442 # Same for a subclass
1443 e = ssl.SSLZeroReturnError(1, "foo")
1444 self.assertEqual(str(e), "foo")
1445 self.assertEqual(e.errno, 1)
1446
1447 def test_lib_reason(self):
1448 # Test the library and reason attributes
Christian Heimesa170fa12017-09-15 20:27:30 +02001449 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001450 with self.assertRaises(ssl.SSLError) as cm:
1451 ctx.load_dh_params(CERTFILE)
1452 self.assertEqual(cm.exception.library, 'PEM')
1453 self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1454 s = str(cm.exception)
1455 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1456
1457 def test_subclass(self):
1458 # Check that the appropriate SSLError subclass is raised
1459 # (this only tests one of them)
Christian Heimesa170fa12017-09-15 20:27:30 +02001460 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1461 ctx.check_hostname = False
1462 ctx.verify_mode = ssl.CERT_NONE
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001463 with socket.socket() as s:
1464 s.bind(("127.0.0.1", 0))
Charles-François Natali6e204602014-07-23 19:28:13 +01001465 s.listen()
Antoine Pitroue1ceb502013-01-12 21:54:44 +01001466 c = socket.socket()
1467 c.connect(s.getsockname())
1468 c.setblocking(False)
1469 with ctx.wrap_socket(c, False, do_handshake_on_connect=False) as c:
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001470 with self.assertRaises(ssl.SSLWantReadError) as cm:
1471 c.do_handshake()
1472 s = str(cm.exception)
1473 self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1474 # For compatibility
1475 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
1476
Nathaniel J. Smith59fdf0f2017-06-09 02:35:16 -07001477 def test_bad_idna_in_server_hostname(self):
1478 # Note: this test is testing some code that probably shouldn't exist
1479 # in the first place, so if it starts failing at some point because
1480 # you made the ssl module stop doing IDNA decoding then please feel
1481 # free to remove it. The test was mainly added because this case used
1482 # to cause memory corruption (see bpo-30594).
1483 ctx = ssl.create_default_context()
1484 with self.assertRaises(UnicodeError):
1485 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1486 server_hostname="xn--.com")
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001487
class MemoryBIOTests(unittest.TestCase):
    # Tests for ssl.MemoryBIO: read/write round trips, the eof flag, the
    # pending byte count and the accepted argument types.

    def test_read_write(self):
        buf = ssl.MemoryBIO()
        # A single write is returned by one read; the next read is empty.
        buf.write(b'foo')
        self.assertEqual(buf.read(), b'foo')
        self.assertEqual(buf.read(), b'')
        # Consecutive writes are concatenated.
        for piece in (b'foo', b'bar'):
            buf.write(piece)
        self.assertEqual(buf.read(), b'foobar')
        self.assertEqual(buf.read(), b'')
        # Sized reads consume the buffer piecewise.
        buf.write(b'baz')
        for size, expected in ((2, b'ba'), (1, b'z'), (1, b'')):
            self.assertEqual(buf.read(size), expected)

    def test_eof(self):
        buf = ssl.MemoryBIO()
        # Not at EOF initially, nor after an empty read or a write.
        self.assertFalse(buf.eof)
        self.assertEqual(buf.read(), b'')
        self.assertFalse(buf.eof)
        buf.write(b'foo')
        self.assertFalse(buf.eof)
        # write_eof() alone does not flip the flag while data is buffered.
        buf.write_eof()
        self.assertFalse(buf.eof)
        self.assertEqual(buf.read(2), b'fo')
        self.assertFalse(buf.eof)
        # The flag turns true once the last byte has been read ...
        self.assertEqual(buf.read(1), b'o')
        self.assertTrue(buf.eof)
        # ... and stays true on further (empty) reads.
        self.assertEqual(buf.read(), b'')
        self.assertTrue(buf.eof)

    def test_pending(self):
        buf = ssl.MemoryBIO()
        self.assertEqual(buf.pending, 0)
        buf.write(b'foo')
        self.assertEqual(buf.pending, 3)
        # Reading one byte at a time counts down to zero.
        for remaining in (2, 1, 0):
            buf.read(1)
            self.assertEqual(buf.pending, remaining)
        # Writing one byte at a time counts back up.
        for buffered in (1, 2, 3):
            buf.write(b'x')
            self.assertEqual(buf.pending, buffered)
        buf.read()
        self.assertEqual(buf.pending, 0)

    def test_buffer_types(self):
        buf = ssl.MemoryBIO()
        # bytes, bytearray and memoryview are all accepted by write().
        samples = (
            (b'foo', b'foo'),
            (bytearray(b'bar'), b'bar'),
            (memoryview(b'baz'), b'baz'),
        )
        for payload, expected in samples:
            buf.write(payload)
            self.assertEqual(buf.read(), expected)

    def test_error_types(self):
        buf = ssl.MemoryBIO()
        # Objects that are not bytes-like are refused.
        for not_bytes in ('foo', None, True, 1):
            self.assertRaises(TypeError, buf.write, not_bytes)
1549
1550
Martin Panter3840b2a2016-03-27 01:53:46 +00001551class SimpleBackgroundTests(unittest.TestCase):
1552
1553 """Tests that connect to a simple server running in the background"""
1554
1555 def setUp(self):
1556 server = ThreadedEchoServer(SIGNED_CERTFILE)
1557 self.server_addr = (HOST, server.port)
1558 server.__enter__()
1559 self.addCleanup(server.__exit__, None, None, None)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001560
Antoine Pitrou480a1242010-04-28 21:37:09 +00001561 def test_connect(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001562 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001563 cert_reqs=ssl.CERT_NONE) as s:
1564 s.connect(self.server_addr)
1565 self.assertEqual({}, s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001566 self.assertFalse(s.server_side)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001567
Martin Panter3840b2a2016-03-27 01:53:46 +00001568 # this should succeed because we specify the root cert
Christian Heimesd0486372016-09-10 23:23:33 +02001569 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001570 cert_reqs=ssl.CERT_REQUIRED,
1571 ca_certs=SIGNING_CA) as s:
1572 s.connect(self.server_addr)
1573 self.assertTrue(s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001574 self.assertFalse(s.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001575
Martin Panter3840b2a2016-03-27 01:53:46 +00001576 def test_connect_fail(self):
1577 # This should fail because we have no verification certs. Connection
1578 # failure crashes ThreadedEchoServer, so run this in an independent
1579 # test method.
Christian Heimesd0486372016-09-10 23:23:33 +02001580 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001581 cert_reqs=ssl.CERT_REQUIRED)
1582 self.addCleanup(s.close)
1583 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1584 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001585
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001586 def test_connect_ex(self):
1587 # Issue #11326: check connect_ex() implementation
Christian Heimesd0486372016-09-10 23:23:33 +02001588 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001589 cert_reqs=ssl.CERT_REQUIRED,
1590 ca_certs=SIGNING_CA)
1591 self.addCleanup(s.close)
1592 self.assertEqual(0, s.connect_ex(self.server_addr))
1593 self.assertTrue(s.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001594
1595 def test_non_blocking_connect_ex(self):
1596 # Issue #11326: non-blocking connect_ex() should allow handshake
1597 # to proceed after the socket gets ready.
Christian Heimesd0486372016-09-10 23:23:33 +02001598 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001599 cert_reqs=ssl.CERT_REQUIRED,
1600 ca_certs=SIGNING_CA,
1601 do_handshake_on_connect=False)
1602 self.addCleanup(s.close)
1603 s.setblocking(False)
1604 rc = s.connect_ex(self.server_addr)
1605 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1606 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
1607 # Wait for connect to finish
1608 select.select([], [s], [], 5.0)
1609 # Non-blocking handshake
1610 while True:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001611 try:
Martin Panter3840b2a2016-03-27 01:53:46 +00001612 s.do_handshake()
1613 break
1614 except ssl.SSLWantReadError:
1615 select.select([s], [], [], 5.0)
1616 except ssl.SSLWantWriteError:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001617 select.select([], [s], [], 5.0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001618 # SSL established
1619 self.assertTrue(s.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001620
Antoine Pitrou152efa22010-05-16 18:19:27 +00001621 def test_connect_with_context(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001622 # Same as test_connect, but with a separately created context
Christian Heimesa170fa12017-09-15 20:27:30 +02001623 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001624 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1625 s.connect(self.server_addr)
1626 self.assertEqual({}, s.getpeercert())
1627 # Same with a server hostname
1628 with ctx.wrap_socket(socket.socket(socket.AF_INET),
1629 server_hostname="dummy") as s:
1630 s.connect(self.server_addr)
1631 ctx.verify_mode = ssl.CERT_REQUIRED
1632 # This should succeed because we specify the root cert
1633 ctx.load_verify_locations(SIGNING_CA)
1634 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1635 s.connect(self.server_addr)
1636 cert = s.getpeercert()
1637 self.assertTrue(cert)
1638
1639 def test_connect_with_context_fail(self):
1640 # This should fail because we have no verification certs. Connection
1641 # failure crashes ThreadedEchoServer, so run this in an independent
1642 # test method.
Christian Heimesa170fa12017-09-15 20:27:30 +02001643 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001644 ctx.verify_mode = ssl.CERT_REQUIRED
1645 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1646 self.addCleanup(s.close)
1647 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1648 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001649
1650 def test_connect_capath(self):
1651 # Verify server certificates using the `capath` argument
Antoine Pitrou467f28d2010-05-16 19:22:44 +00001652 # NOTE: the subject hashing algorithm has been changed between
1653 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
1654 # contain both versions of each certificate (same content, different
Antoine Pitroud7e4c1c2010-05-17 14:13:10 +00001655 # filename) for this test to be portable across OpenSSL releases.
Christian Heimesa170fa12017-09-15 20:27:30 +02001656 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001657 ctx.verify_mode = ssl.CERT_REQUIRED
1658 ctx.load_verify_locations(capath=CAPATH)
1659 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1660 s.connect(self.server_addr)
1661 cert = s.getpeercert()
1662 self.assertTrue(cert)
1663 # Same with a bytes `capath` argument
Christian Heimesa170fa12017-09-15 20:27:30 +02001664 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001665 ctx.verify_mode = ssl.CERT_REQUIRED
1666 ctx.load_verify_locations(capath=BYTES_CAPATH)
1667 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1668 s.connect(self.server_addr)
1669 cert = s.getpeercert()
1670 self.assertTrue(cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001671
Christian Heimesefff7062013-11-21 03:35:02 +01001672 def test_connect_cadata(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001673 with open(SIGNING_CA) as f:
Christian Heimesefff7062013-11-21 03:35:02 +01001674 pem = f.read()
1675 der = ssl.PEM_cert_to_DER_cert(pem)
Christian Heimesa170fa12017-09-15 20:27:30 +02001676 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001677 ctx.verify_mode = ssl.CERT_REQUIRED
1678 ctx.load_verify_locations(cadata=pem)
1679 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1680 s.connect(self.server_addr)
1681 cert = s.getpeercert()
1682 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001683
Martin Panter3840b2a2016-03-27 01:53:46 +00001684 # same with DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001685 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001686 ctx.verify_mode = ssl.CERT_REQUIRED
1687 ctx.load_verify_locations(cadata=der)
1688 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1689 s.connect(self.server_addr)
1690 cert = s.getpeercert()
1691 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001692
Antoine Pitroue3220242010-04-24 11:13:53 +00001693 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
1694 def test_makefile_close(self):
1695 # Issue #5238: creating a file-like object with makefile() shouldn't
1696 # delay closing the underlying "real socket" (here tested with its
1697 # file descriptor, hence skipping the test under Windows).
Christian Heimesd0486372016-09-10 23:23:33 +02001698 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Martin Panter3840b2a2016-03-27 01:53:46 +00001699 ss.connect(self.server_addr)
1700 fd = ss.fileno()
1701 f = ss.makefile()
1702 f.close()
1703 # The fd is still open
1704 os.read(fd, 0)
1705 # Closing the SSL socket should close the fd too
1706 ss.close()
1707 gc.collect()
1708 with self.assertRaises(OSError) as e:
Antoine Pitroue3220242010-04-24 11:13:53 +00001709 os.read(fd, 0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001710 self.assertEqual(e.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001711
Antoine Pitrou480a1242010-04-28 21:37:09 +00001712 def test_non_blocking_handshake(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001713 s = socket.socket(socket.AF_INET)
1714 s.connect(self.server_addr)
1715 s.setblocking(False)
Christian Heimesd0486372016-09-10 23:23:33 +02001716 s = test_wrap_socket(s,
Martin Panter3840b2a2016-03-27 01:53:46 +00001717 cert_reqs=ssl.CERT_NONE,
1718 do_handshake_on_connect=False)
1719 self.addCleanup(s.close)
1720 count = 0
1721 while True:
1722 try:
1723 count += 1
1724 s.do_handshake()
1725 break
1726 except ssl.SSLWantReadError:
1727 select.select([s], [], [])
1728 except ssl.SSLWantWriteError:
1729 select.select([], [s], [])
1730 if support.verbose:
1731 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001732
Antoine Pitrou480a1242010-04-28 21:37:09 +00001733 def test_get_server_certificate(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001734 _test_get_server_certificate(self, *self.server_addr, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001735
Martin Panter3840b2a2016-03-27 01:53:46 +00001736 def test_get_server_certificate_fail(self):
1737 # Connection failure crashes ThreadedEchoServer, so run this in an
1738 # independent test method
1739 _test_get_server_certificate_fail(self, *self.server_addr)
Bill Janssen54cc54c2007-12-14 22:08:56 +00001740
Antoine Pitrouf4c7bad2010-08-15 23:02:22 +00001741 def test_ciphers(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001742 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001743 cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
1744 s.connect(self.server_addr)
Christian Heimesd0486372016-09-10 23:23:33 +02001745 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001746 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
1747 s.connect(self.server_addr)
1748 # Error checking can happen at instantiation or when connecting
1749 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
1750 with socket.socket(socket.AF_INET) as sock:
Christian Heimesd0486372016-09-10 23:23:33 +02001751 s = test_wrap_socket(sock,
Martin Panter3840b2a2016-03-27 01:53:46 +00001752 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
1753 s.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00001754
Christian Heimes9a5395a2013-06-17 15:44:12 +02001755 def test_get_ca_certs_capath(self):
1756 # capath certs are loaded on request
Christian Heimesa170fa12017-09-15 20:27:30 +02001757 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Martin Panter3840b2a2016-03-27 01:53:46 +00001758 ctx.load_verify_locations(capath=CAPATH)
1759 self.assertEqual(ctx.get_ca_certs(), [])
Christian Heimesa170fa12017-09-15 20:27:30 +02001760 with ctx.wrap_socket(socket.socket(socket.AF_INET),
1761 server_hostname='localhost') as s:
Martin Panter3840b2a2016-03-27 01:53:46 +00001762 s.connect(self.server_addr)
1763 cert = s.getpeercert()
1764 self.assertTrue(cert)
1765 self.assertEqual(len(ctx.get_ca_certs()), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001766
Christian Heimes575596e2013-12-15 21:49:17 +01001767 @needs_sni
Christian Heimes8e7f3942013-12-05 07:41:08 +01001768 def test_context_setget(self):
1769 # Check that the context of a connected socket can be replaced.
Christian Heimesa170fa12017-09-15 20:27:30 +02001770 ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1771 ctx1.load_verify_locations(capath=CAPATH)
1772 ctx2 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1773 ctx2.load_verify_locations(capath=CAPATH)
Martin Panter3840b2a2016-03-27 01:53:46 +00001774 s = socket.socket(socket.AF_INET)
Christian Heimesa170fa12017-09-15 20:27:30 +02001775 with ctx1.wrap_socket(s, server_hostname='localhost') as ss:
Martin Panter3840b2a2016-03-27 01:53:46 +00001776 ss.connect(self.server_addr)
1777 self.assertIs(ss.context, ctx1)
1778 self.assertIs(ss._sslobj.context, ctx1)
1779 ss.context = ctx2
1780 self.assertIs(ss.context, ctx2)
1781 self.assertIs(ss._sslobj.context, ctx2)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001782
1783 def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
1784 # A simple IO loop. Call func(*args) depending on the error we get
1785 # (WANT_READ or WANT_WRITE) move data between the socket and the BIOs.
1786 timeout = kwargs.get('timeout', 10)
Victor Stinner50a72af2017-09-11 09:34:24 -07001787 deadline = time.monotonic() + timeout
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001788 count = 0
1789 while True:
Victor Stinner50a72af2017-09-11 09:34:24 -07001790 if time.monotonic() > deadline:
1791 self.fail("timeout")
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001792 errno = None
1793 count += 1
1794 try:
1795 ret = func(*args)
1796 except ssl.SSLError as e:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001797 if e.errno not in (ssl.SSL_ERROR_WANT_READ,
Martin Panter40b97ec2016-01-14 13:05:46 +00001798 ssl.SSL_ERROR_WANT_WRITE):
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001799 raise
1800 errno = e.errno
1801 # Get any data from the outgoing BIO irrespective of any error, and
1802 # send it to the socket.
1803 buf = outgoing.read()
1804 sock.sendall(buf)
1805 # If there's no error, we're done. For WANT_READ, we need to get
1806 # data from the socket and put it in the incoming BIO.
1807 if errno is None:
1808 break
1809 elif errno == ssl.SSL_ERROR_WANT_READ:
1810 buf = sock.recv(32768)
1811 if buf:
1812 incoming.write(buf)
1813 else:
1814 incoming.write_eof()
1815 if support.verbose:
1816 sys.stdout.write("Needed %d calls to complete %s().\n"
1817 % (count, func.__name__))
1818 return ret
1819
def test_bio_handshake(self):
    """Handshake and shut down a TLS session driven through MemoryBIOs.

    The SSLObject never touches the socket itself: ssl_io_loop() moves
    bytes between the two BIOs and the TCP connection.  The object's
    accessors are checked both before and after the handshake.
    """
    tcp = socket.socket(socket.AF_INET)
    self.addCleanup(tcp.close)
    tcp.connect(self.server_addr)
    bio_in = ssl.MemoryBIO()
    bio_out = ssl.MemoryBIO()

    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # A PROTOCOL_TLS_CLIENT context is expected to verify out of the box.
    self.assertTrue(ctx.check_hostname)
    self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
    ctx.load_verify_locations(SIGNING_CA)

    sslobj = ctx.wrap_bio(bio_in, bio_out, False, SIGNED_CERTFILE_HOSTNAME)
    self.assertIs(sslobj._sslobj.owner, sslobj)
    has_tls_unique = 'tls-unique' in ssl.CHANNEL_BINDING_TYPES

    # State before the handshake has run.
    self.assertIsNone(sslobj.cipher())
    self.assertIsNone(sslobj.version())
    self.assertIsNotNone(sslobj.shared_ciphers())
    with self.assertRaises(ValueError):
        sslobj.getpeercert()
    if has_tls_unique:
        self.assertIsNone(sslobj.get_channel_binding('tls-unique'))

    self.ssl_io_loop(tcp, bio_in, bio_out, sslobj.do_handshake)

    # State once the handshake has completed.
    self.assertTrue(sslobj.cipher())
    self.assertIsNotNone(sslobj.shared_ciphers())
    self.assertIsNotNone(sslobj.version())
    self.assertTrue(sslobj.getpeercert())
    if has_tls_unique:
        self.assertTrue(sslobj.get_channel_binding('tls-unique'))

    try:
        self.ssl_io_loop(tcp, bio_in, bio_out, sslobj.unwrap)
    except ssl.SSLSyscallError:
        # If the server shuts down the TCP connection without sending a
        # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
        pass
    # After the shutdown attempt, writing must be rejected.
    with self.assertRaises(ssl.SSLError):
        sslobj.write(b'foo')
1853
def test_bio_read_write_data(self):
    """Send a request and read the echoed reply through MemoryBIOs.

    Certificate verification is switched off; the test only checks that
    application data makes the round trip (the server answers in lower
    case) and that the session can be unwrapped afterwards.
    """
    tcp = socket.socket(socket.AF_INET)
    self.addCleanup(tcp.close)
    tcp.connect(self.server_addr)
    bio_in = ssl.MemoryBIO()
    bio_out = ssl.MemoryBIO()

    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    ctx.verify_mode = ssl.CERT_NONE
    sslobj = ctx.wrap_bio(bio_in, bio_out, False)

    def pump(func, *args):
        # Run one SSLObject operation to completion over the socket.
        return self.ssl_io_loop(tcp, bio_in, bio_out, func, *args)

    pump(sslobj.do_handshake)
    pump(sslobj.write, b'FOO\n')
    reply = pump(sslobj.read, 1024)
    self.assertEqual(reply, b'foo\n')
    pump(sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001869
1870
class NetworkedTests(unittest.TestCase):
    """Tests that talk to hosts on the public internet."""

    def test_timeout_connect_ex(self):
        # Issue #12065: on a timeout, connect_ex() should return the original
        # errno (mimicking the behaviour of non-SSL sockets).
        with support.transient_internet(REMOTE_HOST):
            raw = socket.socket(socket.AF_INET)
            tls_sock = test_wrap_socket(raw,
                                        cert_reqs=ssl.CERT_REQUIRED,
                                        do_handshake_on_connect=False)
            self.addCleanup(tls_sock.close)
            # A timeout this small should expire before the connect finishes.
            tls_sock.settimeout(0.0000001)
            result = tls_sock.connect_ex((REMOTE_HOST, 443))
            if result == 0:
                self.skipTest("REMOTE_HOST responded too quickly")
            self.assertIn(result, (errno.EAGAIN, errno.EWOULDBLOCK))

    @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
    def test_get_server_certificate_ipv6(self):
        # Run both the success and the failure helper against an IPv6 host.
        ipv6_host = 'ipv6.google.com'
        with support.transient_internet(ipv6_host):
            _test_get_server_certificate(self, ipv6_host, 443)
            _test_get_server_certificate_fail(self, ipv6_host, 443)
1892
Martin Panter3840b2a2016-03-27 01:53:46 +00001893
def _test_get_server_certificate(test, host, port, cert=None):
    """Fetch the certificate of host:port twice and fail *test* if empty.

    The first fetch uses the defaults, the second passes *cert* as
    ``ca_certs``.  In verbose mode the PEM from the second fetch is
    printed.
    """
    address = (host, port)

    pem = ssl.get_server_certificate(address)
    if not pem:
        test.fail("No server certificate on %s:%s!" % address)

    pem = ssl.get_server_certificate(address, ca_certs=cert)
    if not pem:
        test.fail("No server certificate on %s:%s!" % address)

    if support.verbose:
        sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n"
                         % (host, port, pem))
1904
def _test_get_server_certificate_fail(test, host, port):
    """Check that fetching host:port's certificate with CERTFILE as CA fails.

    An ssl.SSLError is the expected outcome (printed in verbose mode);
    getting a certificate back makes *test* fail.
    """
    try:
        pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
    except ssl.SSLError as exc:
        # This is the expected path.
        if support.verbose:
            sys.stdout.write("%s\n" % exc)
        return
    test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
1914
1915
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02001916from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00001917
class ThreadedEchoServer(threading.Thread):
    """Echo server for the tests, running in a background thread.

    The server accepts one connection at a time, hands it to a
    ConnectionHandler thread and waits for that handler to finish before
    accepting the next one.  Use it as a context manager: entering starts
    the thread and waits until it is listening, leaving stops and joins it.

    Per-connection results are collected on the server in
    ``selected_npn_protocols``, ``selected_alpn_protocols``,
    ``shared_ciphers`` and ``conn_errors``.
    """

    class ConnectionHandler(threading.Thread):

        """A mildly complicated class, because we want it to work both
        with and without the SSL wrapper around the socket connection, so
        that we can test the STARTTLS functionality."""

        def __init__(self, server, connsock, addr):
            self.server = server
            self.running = False
            self.sock = connsock
            self.addr = addr
            self.sock.setblocking(1)
            # None while the connection is in plaintext.
            self.sslconn = None
            threading.Thread.__init__(self)
            self.daemon = True

        def wrap_conn(self):
            """Wrap the connection in TLS using the server's context.

            Returns True on success.  On failure the error text is stored
            in ``server.conn_errors``, the server is stopped, the
            connection is closed and False is returned.
            """
            try:
                self.sslconn = self.server.context.wrap_socket(
                    self.sock, server_side=True)
                self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
                self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
            except (ssl.SSLError, ConnectionResetError, OSError) as e:
                # We treat ConnectionResetError as though it were an
                # SSLError - OpenSSL on Ubuntu abruptly closes the
                # connection when asked to use an unsupported protocol.
                #
                # OSError may occur with wrong protocols, e.g. both
                # sides use PROTOCOL_TLS_SERVER.
                #
                # XXX Various errors can have happened here, for example
                # a mismatching protocol version, an invalid certificate,
                # or a low-level bug. This should be made more discriminating.
                #
                # bpo-31323: Store the exception as string to prevent
                # a reference leak: server -> conn_errors -> exception
                # -> traceback -> self (ConnectionHandler) -> server
                self.server.conn_errors.append(str(e))
                if self.server.chatty:
                    handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
                self.running = False
                self.server.stop()
                self.close()
                return False
            else:
                self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
                if self.server.context.verify_mode == ssl.CERT_REQUIRED:
                    cert = self.sslconn.getpeercert()
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
                    cert_binary = self.sslconn.getpeercert(True)
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
                cipher = self.sslconn.cipher()
                if support.verbose and self.server.chatty:
                    sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
                    sys.stdout.write(" server: selected protocol is now "
                            + str(self.sslconn.selected_npn_protocol()) + "\n")
                return True

        def read(self):
            """Read from the TLS connection if wrapped, else the raw socket."""
            if self.sslconn:
                return self.sslconn.read()
            else:
                return self.sock.recv(1024)

        def write(self, bytes):
            """Write to the TLS connection if wrapped, else the raw socket."""
            if self.sslconn:
                return self.sslconn.write(bytes)
            else:
                return self.sock.send(bytes)

        def close(self):
            """Close the TLS connection if wrapped, else the raw socket."""
            if self.sslconn:
                self.sslconn.close()
            else:
                self.sock.close()

        def run(self):
            """Serve the connection until EOF, b'over', or an OSError.

            Recognised commands are ``over``, ``STARTTLS`` and ``ENDTLS``
            (the latter two only on a STARTTLS server) and
            ``CB tls-unique``; anything else is echoed back lower-cased.
            """
            self.running = True
            if not self.server.starttls_server:
                if not self.wrap_conn():
                    return
            while self.running:
                try:
                    msg = self.read()
                    stripped = msg.strip()
                    if not stripped:
                        # eof, so quit this handler
                        self.running = False
                        # On a STARTTLS server the connection may still be
                        # in plaintext here; there is then nothing to
                        # unwrap, and calling unwrap() on None would kill
                        # the thread before the socket gets closed.
                        if self.sslconn is not None:
                            try:
                                self.sock = self.sslconn.unwrap()
                            except OSError:
                                # Many tests shut the TCP connection down
                                # without an SSL shutdown. This causes
                                # unwrap() to raise OSError with errno=0!
                                pass
                            else:
                                self.sslconn = None
                        self.close()
                    elif stripped == b'over':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: client closed connection\n")
                        self.close()
                        return
                    elif (self.server.starttls_server and
                          stripped == b'STARTTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        if not self.wrap_conn():
                            return
                    elif (self.server.starttls_server and self.sslconn
                          and stripped == b'ENDTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        self.sock = self.sslconn.unwrap()
                        self.sslconn = None
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: connection is now unencrypted...\n")
                    elif stripped == b'CB tls-unique':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
                        data = self.sslconn.get_channel_binding("tls-unique")
                        self.write(repr(data).encode("us-ascii") + b"\n")
                    else:
                        if (support.verbose and
                            self.server.connectionchatty):
                            ctype = (self.sslconn and "encrypted") or "unencrypted"
                            sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
                                             % (msg, ctype, msg.lower(), ctype))
                        self.write(msg.lower())
                except OSError:
                    if self.server.chatty:
                        handle_error("Test server failure:\n")
                    self.close()
                    self.running = False
                    # normally, we'd just stop here, but for the test
                    # harness, we want to stop the server
                    self.server.stop()

    def __init__(self, certificate=None, ssl_version=None,
                 certreqs=None, cacerts=None,
                 chatty=True, connectionchatty=False, starttls_server=False,
                 npn_protocols=None, alpn_protocols=None,
                 ciphers=None, context=None):
        """Set up the TLS context and bind the listening socket.

        If *context* is given it is used as is and the remaining TLS
        arguments are ignored.  Otherwise a context is built from
        *ssl_version* (default PROTOCOL_TLS_SERVER), *certreqs* (default
        CERT_NONE), *cacerts*, *certificate*, *npn_protocols*,
        *alpn_protocols* and *ciphers*.
        """
        if context:
            self.context = context
        else:
            self.context = ssl.SSLContext(ssl_version
                                          if ssl_version is not None
                                          else ssl.PROTOCOL_TLS_SERVER)
            self.context.verify_mode = (certreqs if certreqs is not None
                                        else ssl.CERT_NONE)
            if cacerts:
                self.context.load_verify_locations(cacerts)
            if certificate:
                self.context.load_cert_chain(certificate)
            if npn_protocols:
                self.context.set_npn_protocols(npn_protocols)
            if alpn_protocols:
                self.context.set_alpn_protocols(alpn_protocols)
            if ciphers:
                self.context.set_ciphers(ciphers)
        self.chatty = chatty
        self.connectionchatty = connectionchatty
        self.starttls_server = starttls_server
        self.sock = socket.socket()
        self.port = support.bind_port(self.sock)
        self.flag = None
        self.active = False
        self.selected_npn_protocols = []
        self.selected_alpn_protocols = []
        self.shared_ciphers = []
        self.conn_errors = []
        threading.Thread.__init__(self)
        self.daemon = True

    def __enter__(self):
        # Start the thread and block until run() reports it is listening.
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        self.stop()
        self.join()

    def start(self, flag=None):
        """Start the server thread; *flag* (an Event) is set once listening."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        """Accept and serve connections one at a time until stop() is called."""
        try:
            # The short timeout lets the loop notice stop() promptly.
            self.sock.settimeout(0.05)
            self.sock.listen()
            self.active = True
            if self.flag:
                # signal an event
                self.flag.set()
            while self.active:
                try:
                    newconn, connaddr = self.sock.accept()
                    if support.verbose and self.chatty:
                        sys.stdout.write(' server: new connection from '
                                         + repr(connaddr) + '\n')
                    handler = self.ConnectionHandler(self, newconn, connaddr)
                    handler.start()
                    handler.join()
                except socket.timeout:
                    pass
                except KeyboardInterrupt:
                    self.stop()
        finally:
            # Release the listening socket even if the loop ends with an
            # unexpected exception.
            self.sock.close()

    def stop(self):
        """Ask the accept loop to exit after its current iteration."""
        self.active = False
2136
class AsyncoreEchoServer(threading.Thread):
    """Echo server built on asyncore, with its loop run in a thread.

    Use it as a context manager: entering starts the thread, leaving stops
    it, joins it and clears asyncore's socket map.
    """

    # this one's based on asyncore.dispatcher

    class EchoServer (asyncore.dispatcher):
        """Listening dispatcher that creates a handler per connection."""

        class ConnectionHandler(asyncore.dispatcher_with_send):
            """Handshake without blocking, then echo data lower-cased."""

            def __init__(self, conn, certfile):
                self.socket = test_wrap_socket(conn, server_side=True,
                                               certfile=certfile,
                                               do_handshake_on_connect=False)
                asyncore.dispatcher_with_send.__init__(self, self.socket)
                # True until the TLS handshake has completed.
                self._ssl_accepting = True
                self._do_ssl_handshake()

            def readable(self):
                # Handle data the SSL layer has already buffered before
                # reporting readiness to the asyncore loop.
                if isinstance(self.socket, ssl.SSLSocket):
                    while self.socket.pending() > 0:
                        self.handle_read_event()
                return True

            def _do_ssl_handshake(self):
                # Advance the handshake by one step.  Want-read/want-write
                # leave it pending for the next read event.
                try:
                    self.socket.do_handshake()
                except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
                    return None
                except ssl.SSLEOFError:
                    return self.handle_close()
                except ssl.SSLError:
                    raise
                except OSError as exc:
                    if exc.args[0] != errno.ECONNABORTED:
                        # Other OS errors are ignored; still accepting.
                        return None
                    return self.handle_close()
                self._ssl_accepting = False

            def handle_read(self):
                if self._ssl_accepting:
                    self._do_ssl_handshake()
                    return
                payload = self.recv(1024)
                if support.verbose:
                    sys.stdout.write(" server: read %s from client\n" % repr(payload))
                if payload:
                    self.send(payload.lower())
                else:
                    self.close()

            def handle_close(self):
                self.close()
                if support.verbose:
                    sys.stdout.write(" server: closed connection %s\n" % self.socket)

            def handle_error(self):
                # Re-raise the active exception instead of logging it.
                raise

        def __init__(self, certfile):
            self.certfile = certfile
            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.port = support.bind_port(listener, '')
            asyncore.dispatcher.__init__(self, listener)
            self.listen(5)

        def handle_accepted(self, sock_obj, addr):
            if support.verbose:
                sys.stdout.write(" server: new connection from %s:%s\n" % addr)
            self.ConnectionHandler(sock_obj, self.certfile)

        def handle_error(self):
            # Re-raise the active exception instead of logging it.
            raise

    def __init__(self, certfile):
        self.flag = None
        self.active = False
        self.server = self.EchoServer(certfile)
        self.port = self.server.port
        threading.Thread.__init__(self)
        self.daemon = True

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.server)

    def __enter__(self):
        # Start the thread and block until run() has set the event.
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        verbose = support.verbose
        if verbose:
            sys.stdout.write(" cleanup: stopping server.\n")
        self.stop()
        if verbose:
            sys.stdout.write(" cleanup: joining server thread.\n")
        self.join()
        if verbose:
            sys.stdout.write(" cleanup: successfully joined.\n")
        # make sure that ConnectionHandler is removed from socket_map
        asyncore.close_all(ignore_all=True)

    def start (self, flag=None):
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        self.active = True
        if self.flag:
            self.flag.set()
        while self.active:
            # Errors raised by the loop are ignored; the loop is simply
            # re-entered until stop() clears self.active.
            try:
                asyncore.loop(1)
            except:
                pass

    def stop(self):
        self.active = False
        self.server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002254
def server_params_test(client_context, server_context, indata=b"FOO\n",
                       chatty=True, connectionchatty=False, sni_name=None,
                       session=None):
    """
    Launch a server, connect a client to it and try various reads
    and writes.

    *indata* is sent as bytes, bytearray and memoryview in turn, and each
    reply must equal ``indata.lower()``.  Returns a dict describing the
    client connection (cipher, version, session, ...) together with the
    protocol and cipher lists recorded by the server.
    """
    # Note: the server is always created with connectionchatty=False;
    # the *connectionchatty* argument only controls client-side output.
    server = ThreadedEchoServer(context=server_context,
                                chatty=chatty,
                                connectionchatty=False)
    expected = indata.lower()
    with server:
        with client_context.wrap_socket(socket.socket(),
                server_hostname=sni_name, session=session) as s:
            s.connect((HOST, server.port))
            for payload in (indata, bytearray(indata), memoryview(indata)):
                if connectionchatty and support.verbose:
                    sys.stdout.write(
                        " client: sending %r...\n" % indata)
                s.write(payload)
                outdata = s.read()
                if connectionchatty and support.verbose:
                    sys.stdout.write(" client: read %r\n" % outdata)
                if outdata != expected:
                    raise AssertionError(
                        "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                        % (outdata[:20], len(outdata),
                           indata[:20].lower(), len(indata)))
            s.write(b"over\n")
            if connectionchatty and support.verbose:
                sys.stdout.write(" client: closing connection.\n")
            # Take the connection details before closing the socket.
            stats = {
                'compression': s.compression(),
                'cipher': s.cipher(),
                'peercert': s.getpeercert(),
                'client_alpn_protocol': s.selected_alpn_protocol(),
                'client_npn_protocol': s.selected_npn_protocol(),
                'version': s.version(),
                'session_reused': s.session_reused,
                'session': s.session,
            }
            s.close()
        stats['server_alpn_protocols'] = server.selected_alpn_protocols
        stats['server_npn_protocols'] = server.selected_npn_protocols
        stats['server_shared_ciphers'] = server.shared_ciphers
    return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002304
def try_protocol_combo(server_protocol, client_protocol, expect_success,
                       certsreqs=None, server_options=0, client_options=0):
    """
    Try to SSL-connect using *client_protocol* to *server_protocol*.
    If *expect_success* is true, assert that the connection succeeds,
    if it's false, assert that the connection fails.
    Also, if *expect_success* is a string, assert that it is the protocol
    version actually used by the connection.
    """
    if certsreqs is None:
        certsreqs = ssl.CERT_NONE
    cert_names = {
        ssl.CERT_NONE: "CERT_NONE",
        ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
        ssl.CERT_REQUIRED: "CERT_REQUIRED",
    }
    certtype = cert_names[certsreqs]
    client_name = ssl.get_protocol_name(client_protocol)
    server_name = ssl.get_protocol_name(server_protocol)
    if support.verbose:
        # Combinations that are expected to fail are shown in braces.
        template = " %s->%s %s\n" if expect_success else " {%s->%s} %s\n"
        sys.stdout.write(template % (client_name, server_name, certtype))

    client_context = ssl.SSLContext(client_protocol)
    client_context.options |= client_options
    server_context = ssl.SSLContext(server_protocol)
    server_context.options |= server_options

    # NOTE: we must enable "ALL" ciphers on the client, otherwise an
    # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
    # starting from OpenSSL 1.0.0 (see issue #8322).
    if client_context.protocol == ssl.PROTOCOL_TLS:
        client_context.set_ciphers("ALL")

    for context in (client_context, server_context):
        context.verify_mode = certsreqs
        context.load_cert_chain(CERTFILE)
        context.load_verify_locations(CERTFILE)

    try:
        stats = server_params_test(client_context, server_context,
                                   chatty=False, connectionchatty=False)
    # Protocol mismatch can result in either an SSLError, or a
    # "Connection reset by peer" error.
    except ssl.SSLError:
        if expect_success:
            raise
        return
    except OSError as exc:
        if expect_success or exc.errno != errno.ECONNRESET:
            raise
        return

    # The connection succeeded: check that this was expected.
    if not expect_success:
        raise AssertionError(
            "Client protocol %s succeeded with server protocol %s!"
            % (client_name, server_name))
    if expect_success is not True and expect_success != stats['version']:
        raise AssertionError("version mismatch: expected %r, got %r"
                             % (expect_success, stats['version']))
2363
2364
2365class ThreadedTests(unittest.TestCase):
2366
2367 @skip_if_broken_ubuntu_ssl
def test_echo(self):
    """Basic test of an SSL client connecting to a server"""
    if support.verbose:
        sys.stdout.write("\n")

    # Every generic protocol talking to itself, with one shared context.
    role_specific = {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}
    for protocol in PROTOCOLS:
        if protocol in role_specific:
            continue
        with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
            context = ssl.SSLContext(protocol)
            context.load_cert_chain(CERTFILE)
            server_params_test(context, context,
                               chatty=True, connectionchatty=True)

    client_context, server_context, hostname = testing_context()

    # The matching client/server pair must work.
    with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_SERVER):
        server_params_test(client_context=client_context,
                           server_context=server_context,
                           chatty=True, connectionchatty=True,
                           sni_name=hostname)

    client_context.check_hostname = False

    # Mismatched roles must fail.  Each entry is: subTest labels, context
    # used for the client, context used for the server, extra arguments.
    # NOTE(review): the last entry is labelled CLIENT/CLIENT but passes
    # the same contexts as the first one - confirm this is intended.
    wrong_role_cases = (
        (dict(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_CLIENT),
         server_context, client_context, dict(sni_name=hostname)),
        (dict(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_SERVER),
         server_context, server_context, {}),
        (dict(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_CLIENT),
         server_context, client_context, {}),
    )
    for labels, as_client, as_server, extra in wrong_role_cases:
        with self.subTest(**labels):
            with self.assertRaises(ssl.SSLError) as e:
                server_params_test(client_context=as_client,
                                   server_context=as_server,
                                   chatty=True, connectionchatty=True,
                                   **extra)
            self.assertIn('called a function you should not call',
                          str(e.exception))
2414
def test_getpeercert(self):
    """getpeercert() raises before the handshake and works after it."""
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()
    server = ThreadedEchoServer(context=server_context, chatty=False)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        do_handshake_on_connect=False,
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            # getpeercert() raise ValueError while the handshake isn't
            # done.
            self.assertRaises(ValueError, s.getpeercert)
            s.do_handshake()
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()
            if support.verbose:
                sys.stdout.write(pprint.pformat(cert) + '\n')
                sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
            if 'subject' not in cert:
                self.fail("No subject field in certificate: %s." %
                          pprint.pformat(cert))
            # The subject is searched for this exact one-attribute entry.
            wanted_rdn = (('organizationName', 'Python Software Foundation'),)
            if wanted_rdn not in cert['subject']:
                self.fail(
                    "Missing or invalid 'organizationName' field in certificate subject; "
                    "should be 'Python Software Foundation'.")
            self.assertIn('notBefore', cert)
            self.assertIn('notAfter', cert)
            valid_from = ssl.cert_time_to_seconds(cert['notBefore'])
            valid_until = ssl.cert_time_to_seconds(cert['notAfter'])
            self.assertLess(valid_from, valid_until)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002450
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002451 @unittest.skipUnless(have_verify_flags(),
2452 "verify_flags need OpenSSL > 0.9.8")
def test_crl_check(self):
    """Exercise VERIFY_CRL_CHECK_LEAF without and then with a CRL loaded."""
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    trusted_first = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(client_context.verify_flags,
                     ssl.VERIFY_DEFAULT | trusted_first)

    def connect_and_get_cert():
        # Start a fresh server, connect and require a peer certificate.
        server = ThreadedEchoServer(context=server_context, chatty=True)
        with server:
            with client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname) as s:
                s.connect((HOST, server.port))
                cert = s.getpeercert()
                self.assertTrue(cert, "Can't get peer certificate.")

    # VERIFY_DEFAULT should pass
    connect_and_get_cert()

    # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
    client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF

    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            with self.assertRaisesRegex(ssl.SSLError,
                                        "certificate verify failed"):
                s.connect((HOST, server.port))

    # now load a CRL file. The CRL file is signed by the CA.
    client_context.load_verify_locations(CRLFILE)
    connect_and_get_cert()
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002492
def test_check_hostname(self):
    """Check hostname matching with a right, wrong and missing name."""
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    def new_server():
        # Each scenario gets its own server instance.
        return ThreadedEchoServer(context=server_context, chatty=True)

    # correct hostname should verify
    with new_server() as server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")

    # incorrect hostname should raise an exception
    with new_server() as server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname="invalid") as s:
            with self.assertRaisesRegex(ssl.CertificateError,
                                        "hostname 'invalid' doesn't match 'localhost'"):
                s.connect((HOST, server.port))

    # missing server_hostname arg should cause an exception, too
    with new_server() as server:
        with socket.socket() as s:
            with self.assertRaisesRegex(ValueError,
                                        "check_hostname requires server_hostname"):
                client_context.wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002524
def test_wrong_cert(self):
    """Connecting when the server rejects the client's certificate

    Launch a server with CERT_REQUIRED, and check that trying to
    connect to it with a wrong client certificate fails.
    """
    here = os.path.dirname(__file__) or os.curdir
    certfile = os.path.join(here, "wrongcert.pem")
    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_REQUIRED,
                                cacerts=CERTFILE, chatty=False,
                                connectionchatty=False)
    with server:
        with socket.socket() as sock:
            with test_wrap_socket(sock, certfile=certfile) as s:
                try:
                    # Expect either an SSL error about the server rejecting
                    # the connection, or a low-level connection reset (which
                    # sometimes happens on Windows)
                    s.connect((HOST, server.port))
                except ssl.SSLError as exc:
                    if support.verbose:
                        sys.stdout.write("\nSSLError is %r\n" % exc)
                except OSError as exc:
                    if exc.errno != errno.ECONNRESET:
                        raise
                    if support.verbose:
                        sys.stdout.write("\nsocket.error is %r\n" % exc)
                else:
                    self.fail("Use of invalid cert should have failed!")
2555
def test_rude_shutdown(self):
    """An abruptly closed server makes the client-side handshake fail.

    Wrapping a connection whose peer has already gone away must raise
    an OSError in the client.
    """
    listener_ready = threading.Event()
    listener_gone = threading.Event()

    s = socket.socket()
    port = support.bind_port(s, HOST)

    def listener():
        # Helper thread: accept a single connection, drop it at once
        # together with the listening socket, then tell the main thread
        # (via `listener_gone`) that both are closed.
        s.listen()
        listener_ready.set()
        accepted, _ = s.accept()
        accepted.close()
        s.close()
        listener_gone.set()

    worker = threading.Thread(target=listener)
    worker.start()
    try:
        # Client side, executed in the main thread.
        listener_ready.wait()
        with socket.socket() as c:
            c.connect((HOST, port))
            listener_gone.wait()
            try:
                ssl_sock = test_wrap_socket(c)
            except OSError:
                pass
            else:
                self.fail('connecting to closed SSL socket should have failed')
    finally:
        worker.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002596
def test_ssl_cert_verify_error(self):
    # The client context below never loads any CA certificate, so the
    # server certificate cannot be validated and connect() is expected
    # to raise ssl.SSLCertVerificationError carrying OpenSSL's
    # "unable to get local issuer certificate" details.
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with context.wrap_socket(socket.socket(),
                                 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
            try:
                s.connect((HOST, server.port))
            except ssl.SSLError as e:
                msg = 'unable to get local issuer certificate'
                self.assertIsInstance(e, ssl.SSLCertVerificationError)
                self.assertEqual(e.verify_code, 20)
                self.assertEqual(e.verify_message, msg)
                self.assertIn(msg, repr(e))
                self.assertIn('certificate verify failed', repr(e))
            else:
                # Without this branch a handshake that wrongly succeeds
                # would let the test pass without checking anything.
                self.fail("connecting without a trusted CA should have "
                          "raised ssl.SSLCertVerificationError")
2619
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
                     "OpenSSL is compiled without SSLv2 support")
def test_protocol_sslv2(self):
    """Run an SSLv2 server against clients using each protocol."""
    if support.verbose:
        sys.stdout.write("\n")
    sslv2 = ssl.PROTOCOL_SSLv2
    # Matching protocols: once without an explicit certificate
    # requirement, then with CERT_OPTIONAL and CERT_REQUIRED.
    for certreqs in ((), (ssl.CERT_OPTIONAL,), (ssl.CERT_REQUIRED,)):
        try_protocol_combo(sslv2, sslv2, True, *certreqs)
    try_protocol_combo(sslv2, ssl.PROTOCOL_TLS, False)
    if hasattr(ssl, 'PROTOCOL_SSLv3'):
        try_protocol_combo(sslv2, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(sslv2, ssl.PROTOCOL_TLSv1, False)
    # SSLv23 client with specific SSL options
    if no_sslv2_implies_sslv3_hello():
        # With SSLv2 disabled, recent OpenSSLs send an SSLv3 hello
        try_protocol_combo(sslv2, ssl.PROTOCOL_TLS, False,
                           client_options=ssl.OP_NO_SSLv2)
    for disabled in (ssl.OP_NO_SSLv3, ssl.OP_NO_TLSv1):
        try_protocol_combo(sslv2, ssl.PROTOCOL_TLS, False,
                           client_options=disabled)
Antoine Pitrou242db722013-05-01 20:52:07 +02002643
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002644 @skip_if_broken_ubuntu_ssl
Christian Heimesa170fa12017-09-15 20:27:30 +02002645 def test_PROTOCOL_TLS(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002646 """Connecting to an SSLv23 server with various client options"""
2647 if support.verbose:
2648 sys.stdout.write("\n")
2649 if hasattr(ssl, 'PROTOCOL_SSLv2'):
Antoine Pitrou8f85f902012-01-03 22:46:48 +01002650 try:
Christian Heimesa170fa12017-09-15 20:27:30 +02002651 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv2, True)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002652 except OSError as x:
2653 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
2654 if support.verbose:
2655 sys.stdout.write(
2656 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
2657 % str(x))
2658 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02002659 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False)
2660 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True)
2661 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1')
Antoine Pitrou8f85f902012-01-03 22:46:48 +01002662
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002663 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02002664 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_OPTIONAL)
2665 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_OPTIONAL)
2666 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002667
2668 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02002669 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_REQUIRED)
2670 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_REQUIRED)
2671 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002672
2673 # Server with specific SSL options
2674 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02002675 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002676 server_options=ssl.OP_NO_SSLv3)
2677 # Will choose TLSv1
Christian Heimesa170fa12017-09-15 20:27:30 +02002678 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002679 server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02002680 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002681 server_options=ssl.OP_NO_TLSv1)
2682
2683
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
                     "OpenSSL is compiled without SSLv3 support")
def test_protocol_sslv3(self):
    """Run an SSLv3 server against clients using each protocol."""
    if support.verbose:
        sys.stdout.write("\n")
    sslv3 = ssl.PROTOCOL_SSLv3
    # Matching protocols: once without an explicit certificate
    # requirement, then with CERT_OPTIONAL and CERT_REQUIRED.
    for certreqs in ((), (ssl.CERT_OPTIONAL,), (ssl.CERT_REQUIRED,)):
        try_protocol_combo(sslv3, sslv3, 'SSLv3', *certreqs)
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(sslv3, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(sslv3, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_SSLv3)
    try_protocol_combo(sslv3, ssl.PROTOCOL_TLSv1, False)
    if no_sslv2_implies_sslv3_hello():
        # With SSLv2 disabled, recent OpenSSLs send an SSLv3 hello
        try_protocol_combo(sslv3, ssl.PROTOCOL_TLS,
                           False, client_options=ssl.OP_NO_SSLv2)
2703
@skip_if_broken_ubuntu_ssl
def test_protocol_tlsv1(self):
    """Run a TLSv1 server against clients using each protocol."""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1 = ssl.PROTOCOL_TLSv1
    # Matching protocols: once without an explicit certificate
    # requirement, then with CERT_OPTIONAL and CERT_REQUIRED.
    for certreqs in ((), (ssl.CERT_OPTIONAL,), (ssl.CERT_REQUIRED,)):
        try_protocol_combo(tlsv1, tlsv1, 'TLSv1', *certreqs)
    # Older protocols are only tried when this build exposes them.
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(tlsv1, ssl.PROTOCOL_SSLv2, False)
    if hasattr(ssl, 'PROTOCOL_SSLv3'):
        try_protocol_combo(tlsv1, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(tlsv1, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1)
2718
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
                     "TLS version 1.1 not supported.")
def test_protocol_tlsv1_1(self):
    """Run a TLSv1.1 server against several clients.

    Also checks combinations with older TLS versions."""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1_1 = ssl.PROTOCOL_TLSv1_1
    try_protocol_combo(tlsv1_1, tlsv1_1, 'TLSv1.1')
    # Older protocols are only tried when this build exposes them.
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(tlsv1_1, ssl.PROTOCOL_SSLv2, False)
    if hasattr(ssl, 'PROTOCOL_SSLv3'):
        try_protocol_combo(tlsv1_1, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(tlsv1_1, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1_1)

    try_protocol_combo(ssl.PROTOCOL_TLS, tlsv1_1, 'TLSv1.1')
    # TLSv1 and TLSv1.1 must not interoperate in either direction.
    tlsv1 = ssl.PROTOCOL_TLSv1
    try_protocol_combo(tlsv1_1, tlsv1, False)
    try_protocol_combo(tlsv1, tlsv1_1, False)
2738
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
                     "TLS version 1.2 not supported.")
def test_protocol_tlsv1_2(self):
    """Run a TLSv1.2 server against several clients.

    Also checks combinations with older TLS versions."""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1_2 = ssl.PROTOCOL_TLSv1_2
    no_legacy_ssl = ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2
    try_protocol_combo(tlsv1_2, tlsv1_2, 'TLSv1.2',
                       server_options=no_legacy_ssl,
                       client_options=no_legacy_ssl,)
    # Older protocols are only tried when this build exposes them.
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(tlsv1_2, ssl.PROTOCOL_SSLv2, False)
    if hasattr(ssl, 'PROTOCOL_SSLv3'):
        try_protocol_combo(tlsv1_2, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(tlsv1_2, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1_2)

    try_protocol_combo(ssl.PROTOCOL_TLS, tlsv1_2, 'TLSv1.2')
    # TLSv1.2 must not interoperate with TLSv1 or TLSv1.1, whichever
    # side uses the older version.
    try_protocol_combo(tlsv1_2, ssl.PROTOCOL_TLSv1, False)
    try_protocol_combo(ssl.PROTOCOL_TLSv1, tlsv1_2, False)
    try_protocol_combo(tlsv1_2, ssl.PROTOCOL_TLSv1_1, False)
    try_protocol_combo(ssl.PROTOCOL_TLSv1_1, tlsv1_2, False)
2762
def test_starttls(self):
    """Upgrade a clear-text connection to TLS and downgrade it again."""
    msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")

    server = ThreadedEchoServer(CERTFILE,
                                starttls_server=True,
                                chatty=True,
                                connectionchatty=True)
    # `wrapped` tracks which of the two sockets is currently in use:
    # the plain one (`s`) or the TLS one (`conn`).
    wrapped = False
    with server:
        s = socket.socket()
        s.setblocking(1)
        s.connect((HOST, server.port))
        if support.verbose:
            sys.stdout.write("\n")
        for indata in msgs:
            if support.verbose:
                sys.stdout.write(
                    " client: sending %r...\n" % indata)
            if wrapped:
                conn.write(indata)
                outdata = conn.read()
            else:
                s.send(indata)
                outdata = s.recv(1024)
            msg = outdata.strip().lower()
            if indata == b"STARTTLS" and msg.startswith(b"ok"):
                # Server agreed: continue over TLS from here on.
                if support.verbose:
                    sys.stdout.write(
                        " client: read %r from server, starting TLS...\n"
                        % msg)
                conn = test_wrap_socket(s)
                wrapped = True
                continue
            if indata == b"ENDTLS" and msg.startswith(b"ok"):
                # Server agreed: fall back to clear text.
                if support.verbose:
                    sys.stdout.write(
                        " client: read %r from server, ending TLS...\n"
                        % msg)
                s = conn.unwrap()
                wrapped = False
                continue
            if support.verbose:
                sys.stdout.write(
                    " client: read %r from server\n" % msg)
        if support.verbose:
            sys.stdout.write(" client: closing connection.\n")
        # Say goodbye and close whichever socket is active.
        if wrapped:
            conn.write(b"over\n")
            conn.close()
        else:
            s.send(b"over\n")
            s.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01002819
def test_socketserver(self):
    """Using socketserver to create and manage SSL connections.

    Serves CERTFILE over HTTPS and checks that the bytes fetched through
    urllib are identical to the bytes read from disk.
    """
    server = make_https_server(self, certfile=CERTFILE)
    # try to connect
    if support.verbose:
        sys.stdout.write('\n')
    with open(CERTFILE, 'rb') as f:
        d1 = f.read()
    # Start from empty *bytes*: d1 is bytes, and comparing bytes with str
    # triggers BytesWarning (an error under -bb) instead of a clean
    # assertion failure when no body is received.
    d2 = b''
    # now fetch the same data from the HTTPS server
    url = 'https://localhost:%d/%s' % (
        server.port, os.path.split(CERTFILE)[1])
    context = ssl.create_default_context(cafile=CERTFILE)
    # The response object is a context manager; it is closed on exit
    # even if reading fails.
    with urllib.request.urlopen(url, context=context) as f:
        dlen = f.info().get("content-length")
        if dlen and (int(dlen) > 0):
            d2 = f.read(int(dlen))
            if support.verbose:
                sys.stdout.write(
                    " client: read %d bytes from remote server '%s'\n"
                    % (len(d2), server))
    self.assertEqual(d1, d2)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01002845
def test_asyncore_server(self):
    """Exchange one message with the asyncore-based echo server."""

    def note(text):
        # Progress output, only shown in verbose test runs.
        if support.verbose:
            sys.stdout.write(text)

    note("\n")

    indata = b"FOO\n"
    server = AsyncoreEchoServer(CERTFILE)
    with server:
        s = test_wrap_socket(socket.socket())
        s.connect(('127.0.0.1', server.port))
        note(" client: sending %r...\n" % indata)
        s.write(indata)
        outdata = s.read()
        note(" client: read %r\n" % outdata)
        # The reply is expected to be the lower-cased request.
        if outdata != indata.lower():
            self.fail(
                "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                % (outdata[:20], len(outdata),
                   indata[:20].lower(), len(indata)))
        s.write(b"over\n")
        note(" client: closing connection.\n")
        s.close()
        note(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05002874
def test_recv_send(self):
    """Test recv(), send() and friends.

    Each send/recv flavour of the SSL socket is exercised against the
    echo server (which replies with the lower-cased payload).  Methods
    flagged as "not expected to succeed" must raise a ValueError whose
    message starts with the method name.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLS_SERVER,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = test_wrap_socket(socket.socket(),
                             server_side=False,
                             certfile=CERTFILE,
                             ca_certs=CERTFILE,
                             cert_reqs=ssl.CERT_NONE,
                             ssl_version=ssl.PROTOCOL_TLS_CLIENT)
        s.connect((HOST, server.port))
        # helper methods for standardising recv* method signatures
        def _recv_into():
            b = bytearray(b"\0"*100)
            count = s.recv_into(b)
            return b[:count]

        def _recvfrom_into():
            b = bytearray(b"\0"*100)
            count, addr = s.recvfrom_into(b)
            return b[:count]

        # (name, method, expect success?, *args, return value func)
        send_methods = [
            ('send', s.send, True, [], len),
            ('sendto', s.sendto, False, ["some.address"], len),
            ('sendall', s.sendall, True, [], lambda x: None),
        ]
        # (name, method, whether to expect success, *args)
        recv_methods = [
            ('recv', s.recv, True, []),
            ('recvfrom', s.recvfrom, False, ["some.address"]),
            ('recv_into', _recv_into, True, []),
            ('recvfrom_into', _recvfrom_into, False, []),
        ]
        data_prefix = "PREFIX_"

        # NOTE: the failure messages below use the conversions "!r" and
        # "!s".  The format specs ":r" / ":s" are rejected by bytes and
        # exception objects (TypeError), which would hide the real
        # failure behind a formatting error.
        for (meth_name, send_meth, expect_success, args,
                ret_val_meth) in send_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                ret = send_meth(indata, *args)
                msg = "sending with {}".format(meth_name)
                self.assertEqual(ret, ret_val_meth(indata), msg=msg)
                outdata = s.read()
                if outdata != indata.lower():
                    self.fail(
                        "While sending with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20], nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to send with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )

        for meth_name, recv_meth, expect_success, args in recv_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                s.send(indata)
                outdata = recv_meth(*args)
                if outdata != indata.lower():
                    self.fail(
                        "While receiving with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20], nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to receive with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )
                # The payload was sent before the receive call was
                # rejected, so drain the echoed reply.
                s.read()

        # read(-1, buffer) is supported, even though read(-1) is not
        data = b"data"
        s.send(data)
        buffer = bytearray(len(data))
        self.assertEqual(s.read(-1, buffer), len(data))
        self.assertEqual(buffer, data)

        # sendall accepts bytes-like objects
        if ctypes is not None:
            ubyte = ctypes.c_ubyte * len(data)
            byteslike = ubyte.from_buffer_copy(data)
            s.sendall(byteslike)
            self.assertEqual(s.read(), data)

        # Make sure sendmsg et al are disallowed to avoid
        # inadvertent disclosure of data and/or corruption
        # of the encrypted data stream
        self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
        self.assertRaises(NotImplementedError, s.recvmsg, 100)
        self.assertRaises(NotImplementedError,
                          s.recvmsg_into, bytearray(100))
        s.write(b"over\n")

        self.assertRaises(ValueError, s.recv, -1)
        self.assertRaises(ValueError, s.read, -1)

        s.close()
3010
def test_recv_zero(self):
    # Zero-length reads must return immediately with no data and must
    # not consume anything that is pending on the connection.
    server = ThreadedEchoServer(CERTFILE)
    server.__enter__()
    # Call __exit__ the way a "with" statement does, i.e. with the full
    # (exc_type, exc_value, traceback) triple.
    self.addCleanup(server.__exit__, None, None, None)
    s = socket.create_connection((HOST, server.port))
    self.addCleanup(s.close)
    s = test_wrap_socket(s, suppress_ragged_eofs=False)
    self.addCleanup(s.close)

    # recv/read(0) should return no data
    s.send(b"data")
    self.assertEqual(s.recv(0), b"")
    self.assertEqual(s.read(0), b"")
    self.assertEqual(s.read(), b"data")

    # Should not block if the other end sends no data
    s.setblocking(False)
    self.assertEqual(s.recv(0), b"")
    self.assertEqual(s.recv_into(bytearray()), 0)
3030
def test_nonblocking_send(self):
    # A non-blocking SSL socket must raise SSLWantWriteError or
    # SSLWantReadError instead of blocking.
    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLS_SERVER,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = test_wrap_socket(socket.socket(),
                             server_side=False,
                             certfile=CERTFILE,
                             ca_certs=CERTFILE,
                             cert_reqs=ssl.CERT_NONE,
                             ssl_version=ssl.PROTOCOL_TLS_CLIENT)
        s.connect((HOST, server.port))
        s.setblocking(False)

        # Keep writing without ever reading: sooner or later the
        # buffers fill up and send() can no longer complete.
        payload = bytearray(8192)
        want_errors = (ssl.SSLWantWriteError, ssl.SSLWantReadError)
        with self.assertRaises(want_errors):
            while True:
                s.send(payload)

        # Return to blocking mode before closing the connection.
        s.setblocking(True)
        s.close()
3060
def test_handshake_timeout(self):
    # Issue #5103: SSL handshake must respect the socket timeout.
    # The helper thread accepts TCP connections but never answers, so
    # each handshake attempt below is expected to time out.
    server = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(server)
    started = threading.Event()
    finish = False

    def serve():
        server.listen()
        started.set()
        accepted = []
        while not finish:
            readable, _, _ = select.select([server], [], [], 0.1)
            if server in readable:
                # Hold on to the connection so that it is not closed
                # by garbage collection while the client is waiting.
                accepted.append(server.accept()[0])
        for held in accepted:
            held.close()

    worker = threading.Thread(target=serve)
    worker.start()
    started.wait()

    try:
        # Case 1: wrap a socket that is already connected.
        try:
            c = socket.socket(socket.AF_INET)
            c.settimeout(0.2)
            c.connect((host, port))
            # Will attempt handshake and time out
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                test_wrap_socket(c)
        finally:
            c.close()
        # Case 2: wrap first, then connect.
        try:
            c = socket.socket(socket.AF_INET)
            c = test_wrap_socket(c)
            c.settimeout(0.2)
            # Will attempt handshake and time out
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                c.connect((host, port))
        finally:
            c.close()
    finally:
        # Ask the helper thread to leave its loop, then tidy up.
        finish = True
        worker.join()
        server.close()
3109
def test_server_accept(self):
    # Issue #16357: accept() on a SSLSocket created through
    # SSLContext.wrap_socket().
    host = "127.0.0.1"
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(CERTFILE)
    context.load_cert_chain(CERTFILE)
    server = socket.socket(socket.AF_INET)
    port = support.bind_port(server)
    server = context.wrap_socket(server, server_side=True)
    self.assertTrue(server.server_side)

    listening = threading.Event()
    remote = None
    peer = None
    def serve():
        nonlocal remote, peer
        server.listen()
        # Signal readiness, then block in accept() and wait until the
        # connection is closed by the client.
        listening.set()
        remote, peer = server.accept()
        remote.recv(1)

    worker = threading.Thread(target=serve)
    worker.start()
    # The client connects only once the server is listening.
    listening.wait()
    client = context.wrap_socket(socket.socket())
    client.connect((host, port))
    client_addr = client.getsockname()
    client.close()
    worker.join()
    remote.close()
    server.close()
    # Sanity checks.
    self.assertIsInstance(remote, ssl.SSLSocket)
    self.assertEqual(peer, client_addr)
3148
def test_getpeercert_enotconn(self):
    # getpeercert() on a wrapped socket that was never connected must
    # fail with ENOTCONN.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    with ctx.wrap_socket(socket.socket()) as unconnected:
        with self.assertRaises(OSError) as caught:
            unconnected.getpeercert()
        self.assertEqual(caught.exception.errno, errno.ENOTCONN)
3155
def test_do_handshake_enotconn(self):
    # do_handshake() on a wrapped socket that was never connected must
    # fail with ENOTCONN.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    with ctx.wrap_socket(socket.socket()) as unconnected:
        with self.assertRaises(OSError) as caught:
            unconnected.do_handshake()
        self.assertEqual(caught.exception.errno, errno.ENOTCONN)
3162
def test_default_ciphers(self):
    # A client restricted to DES ciphers must be turned away by a
    # server using its default cipher list.
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    try:
        # Force a set of weak ciphers on our client context
        client_ctx.set_ciphers("DES")
    except ssl.SSLError:
        self.skipTest("no DES cipher available")
    with ThreadedEchoServer(CERTFILE,
                            ssl_version=ssl.PROTOCOL_TLS,
                            chatty=False) as server:
        with client_ctx.wrap_socket(socket.socket()) as s:
            with self.assertRaises(OSError):
                s.connect((HOST, server.port))
    # The server should have recorded why the handshake was refused.
    first_error = server.conn_errors[0]
    self.assertIn("no shared cipher", first_error)
3177
def test_version_basic(self):
    """
    Check SSLSocket.version() before, during and after a connection.
    The test_protocol_*() methods cover it in more depth.
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE
    with ThreadedEchoServer(CERTFILE,
                            ssl_version=ssl.PROTOCOL_TLS_SERVER,
                            chatty=False) as server:
        with context.wrap_socket(socket.socket()) as s:
            # No protocol version until a handshake has taken place.
            self.assertIs(s.version(), None)
            s.connect((HOST, server.port))
            negotiated = s.version()
            if ssl.OPENSSL_VERSION_INFO < (1, 0, 2):
                # 0.9.8 to 1.0.1
                self.assertIn(negotiated, ('TLSv1', 'TLSv1.2'))
            else:
                self.assertEqual(negotiated, 'TLSv1.2')
        # Once the socket is closed the version is gone again.
        self.assertIs(s.version(), None)
3197
@unittest.skipUnless(ssl.HAS_TLSv1_3,
                     "test requires TLSv1.3 enabled OpenSSL")
def test_tls1_3(self):
    # With TLS 1.0, 1.1 and 1.2 switched off, the negotiated cipher
    # must be one of the TLS 1.3 suites.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.load_cert_chain(CERTFILE)
    # disable all but TLS 1.3
    older_versions = ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
    context.options |= older_versions
    tls13_ciphers = [
        'TLS13-AES-256-GCM-SHA384',
        'TLS13-CHACHA20-POLY1305-SHA256',
        'TLS13-AES-128-GCM-SHA256',
    ]
    # The same context is used for both server and client.
    with ThreadedEchoServer(context=context) as server:
        with context.wrap_socket(socket.socket()) as s:
            s.connect((HOST, server.port))
            cipher_name = s.cipher()[0]
            self.assertIn(cipher_name, tls13_ciphers)
3215
@unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
def test_default_ecdh_curve(self):
    # Issue #21015: SSL contexts should offer elliptic curve-based
    # Diffie Hellman key exchange without extra configuration.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.load_cert_chain(CERTFILE)
    # TLSv1.3 defaults to PFS key agreement and its cipher names no
    # longer carry the key exchange algorithm, so keep it out.
    context.options |= ssl.OP_NO_TLSv1_3
    # Before OpenSSL 1.0.0 the ECDH ciphers must be requested through
    # the 'ECCdraft' alias; newer versions prefer them on their own.
    needs_explicit_ecdh = ssl.OPENSSL_VERSION_INFO < (1, 0, 0)
    if needs_explicit_ecdh:
        context.set_ciphers("ECCdraft:ECDH")
    # The same context is used for both server and client.
    with ThreadedEchoServer(context=context) as server:
        with context.wrap_socket(socket.socket()) as s:
            s.connect((HOST, server.port))
            cipher_name = s.cipher()[0]
            self.assertIn("ECDH", cipher_name)
3235
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    """Test tls-unique channel binding.

    Two successive connections are made.  For each one the binding data
    must be present, 12 bytes long and equal to what the server reports;
    the data of the two connections must differ.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLS_SERVER,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = test_wrap_socket(socket.socket(),
                             server_side=False,
                             certfile=CERTFILE,
                             ca_certs=CERTFILE,
                             cert_reqs=ssl.CERT_NONE,
                             ssl_version=ssl.PROTOCOL_TLS_CLIENT)
        s.connect((HOST, server.port))
        # get the data
        cb_data = s.get_channel_binding("tls-unique")
        if support.verbose:
            sys.stdout.write(" got channel binding data: {0!r}\n"
                             .format(cb_data))

        # check if it is sane
        self.assertIsNotNone(cb_data)
        self.assertEqual(len(cb_data), 12) # True for TLSv1

        # and compare with the peers version
        s.write(b"CB tls-unique\n")
        peer_data_repr = s.read().strip()
        self.assertEqual(peer_data_repr,
                         repr(cb_data).encode("us-ascii"))
        s.close()

        # now, again
        s = test_wrap_socket(socket.socket(),
                             server_side=False,
                             certfile=CERTFILE,
                             ca_certs=CERTFILE,
                             cert_reqs=ssl.CERT_NONE,
                             ssl_version=ssl.PROTOCOL_TLS_CLIENT)
        s.connect((HOST, server.port))
        new_cb_data = s.get_channel_binding("tls-unique")
        if support.verbose:
            sys.stdout.write(" got another channel binding data: {0!r}\n"
                             .format(new_cb_data))
        # is it really unique
        self.assertNotEqual(cb_data, new_cb_data)
        # Sanity-check the data of the *second* connection; the first
        # connection's data was already validated above.
        self.assertIsNotNone(new_cb_data)
        self.assertEqual(len(new_cb_data), 12) # True for TLSv1
        s.write(b"CB tls-unique\n")
        peer_data_repr = s.read().strip()
        self.assertEqual(peer_data_repr,
                         repr(new_cb_data).encode("us-ascii"))
        s.close()
3295
def test_compression(self):
    # Whatever compression gets negotiated must be one of the known
    # values (or None when compression is not in use).
    client_context, server_context, hostname = testing_context()
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    negotiated = stats['compression']
    if support.verbose:
        sys.stdout.write(" got compression: {!r}\n".format(negotiated))
    self.assertIn(negotiated, { None, 'ZLIB', 'RLE' })
3304
@unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
                     "ssl.OP_NO_COMPRESSION needed for this test")
def test_compression_disabled(self):
    """With OP_NO_COMPRESSION set on both contexts, the stats must report
    no compression (None).
    """
    client_ctx, server_ctx, sni = testing_context()
    # Turn compression off on each side of the connection.
    for ctx in (client_ctx, server_ctx):
        ctx.options |= ssl.OP_NO_COMPRESSION
    result = server_params_test(
        client_ctx, server_ctx,
        chatty=True, connectionchatty=True,
        sni_name=sni,
    )
    self.assertIs(result['compression'], None)
3315
def test_dh_params(self):
    """Check we can get a connection with ephemeral Diffie-Hellman.

    The server context loads DH parameters from DHFILE and is restricted
    to the "kEDH" cipher string; the negotiated cipher name must then
    contain one of the "ADH", "EDH" or "DHE" components.
    """
    client_context, server_context, hostname = testing_context()
    server_context.load_dh_params(DHFILE)
    server_context.set_ciphers("kEDH")
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    # First element of the cipher entry is the cipher name string.
    cipher = stats["cipher"][0]
    parts = cipher.split("-")
    if not {"ADH", "EDH", "DHE"}.intersection(parts):
        # `cipher` is already the name; indexing it again would report
        # only its first character in the failure message.
        self.fail("Non-DH cipher: " + cipher)
3328
def test_selected_alpn_protocol(self):
    """selected_alpn_protocol() is None unless ALPN is used."""
    client_ctx, server_ctx, sni = testing_context()
    result = server_params_test(
        client_ctx, server_ctx,
        chatty=True, connectionchatty=True,
        sni_name=sni,
    )
    self.assertIs(result['client_alpn_protocol'], None)
3336
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
def test_selected_alpn_protocol_if_server_uses_alpn(self):
    """selected_alpn_protocol() is None unless ALPN is used by the client.

    Only the server context advertises ALPN protocols here.
    """
    client_ctx, server_ctx, sni = testing_context()
    server_ctx.set_alpn_protocols(['foo', 'bar'])
    result = server_params_test(
        client_ctx, server_ctx,
        chatty=True, connectionchatty=True,
        sni_name=sni,
    )
    self.assertIs(result['client_alpn_protocol'], None)
3346
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
def test_alpn_protocols(self):
    """Negotiate ALPN for several client protocol lists against a fixed
    server list and check the protocol reported by both sides.
    """
    server_protocols = ['foo', 'bar', 'milkshake']
    # (protocols offered by the client, expected negotiation result)
    cases = (
        (['foo', 'bar'], 'foo'),
        (['bar', 'foo'], 'foo'),
        (['milkshake'], 'milkshake'),
        (['http/3.0', 'http/4.0'], None),
    )
    for client_protocols, expected in cases:
        client_context, server_context, hostname = testing_context()
        server_context.set_alpn_protocols(server_protocols)
        client_context.set_alpn_protocols(client_protocols)

        try:
            stats = server_params_test(
                client_context, server_context,
                chatty=True, connectionchatty=True,
                sni_name=hostname,
            )
        except ssl.SSLError as exc:
            # Keep the error so it can be checked below.
            stats = exc

        # OpenSSL 1.1.0 to 1.1.0e raises handshake error
        handshake_error_expected = (
            expected is None
            and IS_OPENSSL_1_1
            and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6)
        )
        if handshake_error_expected:
            self.assertIsInstance(stats, ssl.SSLError)
            continue

        # The two remaining placeholders are filled in per assertion.
        msg = ("failed trying %s (s) and %s (c).\n"
               "was expecting %s, but got %%s from the %%s"
               % (server_protocols, client_protocols, expected))

        client_result = stats['client_alpn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))

        server_seen = stats['server_alpn_protocols']
        server_result = server_seen[-1] if len(server_seen) else 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
3386
def test_selected_npn_protocol(self):
    """selected_npn_protocol() is None unless NPN is used."""
    client_ctx, server_ctx, sni = testing_context()
    result = server_params_test(
        client_ctx, server_ctx,
        chatty=True, connectionchatty=True,
        sni_name=sni,
    )
    self.assertIs(result['client_npn_protocol'], None)
3394
@unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
def test_npn_protocols(self):
    """Negotiate NPN for several client protocol lists against a fixed
    server list and check the protocol reported by both sides.
    """
    server_protocols = ['http/1.1', 'spdy/2']
    # (protocols offered by the client, expected negotiation result)
    cases = (
        (['http/1.1', 'spdy/2'], 'http/1.1'),
        (['spdy/2', 'http/1.1'], 'http/1.1'),
        (['spdy/2', 'test'], 'spdy/2'),
        (['abc', 'def'], 'abc'),
    )
    for client_protocols, expected in cases:
        client_context, server_context, hostname = testing_context()
        server_context.set_npn_protocols(server_protocols)
        client_context.set_npn_protocols(client_protocols)
        stats = server_params_test(
            client_context, server_context,
            chatty=True, connectionchatty=True,
            sni_name=hostname,
        )

        # The two remaining placeholders are filled in per assertion.
        msg = ("failed trying %s (s) and %s (c).\n"
               "was expecting %s, but got %%s from the %%s"
               % (server_protocols, client_protocols, expected))

        client_result = stats['client_npn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))

        server_seen = stats['server_npn_protocols']
        server_result = server_seen[-1] if len(server_seen) else 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
3420
def sni_contexts(self):
    """Build the contexts used by the SNI callback tests.

    Returns a (server_context, other_context, client_context) tuple:
    two PROTOCOL_TLS_SERVER contexts loaded with SIGNED_CERTFILE and
    SIGNED_CERTFILE2 respectively, and a PROTOCOL_TLS_CLIENT context
    that loads SIGNING_CA as its verify location.
    """
    def make_server(certfile):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        ctx.load_cert_chain(certfile)
        return ctx

    primary = make_server(SIGNED_CERTFILE)
    alternate = make_server(SIGNED_CERTFILE2)
    client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client.load_verify_locations(SIGNING_CA)
    return primary, alternate, client
3429
def check_common_name(self, stats, name):
    """Assert that the subject of stats['peercert'] contains a
    commonName entry equal to *name*.
    """
    expected_rdn = (('commonName', name),)
    subject = stats['peercert']['subject']
    self.assertIn(expected_rdn, subject)
3433
@needs_sni
def test_sni_callback(self):
    """Exercise the servername callback: context switching on a given
    server name, invocation with server_name=None, and disabling the
    callback again.
    """
    calls = []
    server_context, other_context, client_context = self.sni_contexts()

    client_context.check_hostname = False

    def servername_cb(ssl_sock, server_name, initial_context):
        # Record every invocation; switch contexts only for a real name.
        calls.append((server_name, initial_context))
        if server_name is None:
            return
        ssl_sock.context = other_context

    def handshake(sni_name):
        return server_params_test(client_context, server_context,
                                  chatty=True,
                                  sni_name=sni_name)

    server_context.set_servername_callback(servername_cb)

    stats = handshake('supermessage')
    # The hostname was fetched properly, and the certificate was
    # changed for the connection.
    self.assertEqual(calls, [("supermessage", server_context)])
    # The certificate of other_context was selected
    self.check_common_name(stats, 'fakehostname')

    # The callback is called with server_name=None
    calls = []
    stats = handshake(None)
    self.assertEqual(calls, [(None, server_context)])
    self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)

    # Check disabling the callback
    calls = []
    server_context.set_servername_callback(None)

    stats = handshake('notfunny')
    # Certificate didn't change
    self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
    self.assertEqual(calls, [])
3474
@needs_sni
def test_sni_callback_alert(self):
    """Returning a TLS alert is reflected to the connecting client."""
    server_context, other_context, client_context = self.sni_contexts()

    def cb_returning_alert(ssl_sock, server_name, initial_context):
        return ssl.ALERT_DESCRIPTION_ACCESS_DENIED

    server_context.set_servername_callback(cb_returning_alert)

    with self.assertRaises(ssl.SSLError) as caught:
        server_params_test(client_context, server_context,
                           chatty=False,
                           sni_name='supermessage')
    self.assertEqual(caught.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003488
@needs_sni
def test_sni_callback_raising(self):
    """Raising fails the connection with a TLS handshake failure alert."""
    server_context, other_context, client_context = self.sni_contexts()

    def cb_raising(ssl_sock, server_name, initial_context):
        1 / 0

    server_context.set_servername_callback(cb_raising)

    with self.assertRaises(ssl.SSLError) as caught:
        # Capture stderr so the output can be inspected below.
        with support.captured_stderr() as captured:
            server_params_test(client_context, server_context,
                               chatty=False,
                               sni_name='supermessage')
    self.assertEqual(caught.exception.reason,
                     'SSLV3_ALERT_HANDSHAKE_FAILURE')
    self.assertIn("ZeroDivisionError", captured.getvalue())
Antoine Pitrou50b24d02013-04-11 20:48:42 +02003505
@needs_sni
def test_sni_callback_wrong_return_type(self):
    """Returning the wrong return type terminates the TLS connection
    with an internal error alert.
    """
    server_context, other_context, client_context = self.sni_contexts()

    def cb_wrong_return_type(ssl_sock, server_name, initial_context):
        return "foo"

    server_context.set_servername_callback(cb_wrong_return_type)

    with self.assertRaises(ssl.SSLError) as caught:
        # Capture stderr so the output can be inspected below.
        with support.captured_stderr() as captured:
            server_params_test(client_context, server_context,
                               chatty=False,
                               sni_name='supermessage')
    self.assertEqual(caught.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
    self.assertIn("TypeError", captured.getvalue())
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003523
def test_shared_ciphers(self):
    """Restrict client and server to overlapping cipher strings and check
    that every entry in the reported shared-cipher list matches the
    server's algorithm.
    """
    client_context, server_context, hostname = testing_context()
    # Pick cipher strings and the name fragments to look for, by
    # OpenSSL version.
    if ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
        client_spec, server_spec = "AES128:AES256", "AES256"
        alg1, alg2 = "AES256", "AES-256"
    else:
        client_spec, server_spec = "AES:3DES", "3DES"
        alg1, alg2 = "3DES", "DES-CBC3"
    client_context.set_ciphers(client_spec)
    server_context.set_ciphers(server_spec)

    stats = server_params_test(client_context, server_context,
                               sni_name=hostname)
    ciphers = stats['server_shared_ciphers'][0]
    self.assertGreater(len(ciphers), 0)
    for name, tls_version, bits in ciphers:
        # Accept either a dash-separated component equal to alg1 or
        # alg2 appearing anywhere in the name.
        if alg1 in name.split("-") or alg2 in name:
            continue
        self.fail(name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003544
def test_read_write_after_close_raises_valuerror(self):
    """read() and write() on a closed SSL socket must raise ValueError."""
    client_context, server_context, hostname = testing_context()
    server = ThreadedEchoServer(context=server_context, chatty=False)

    with server:
        sock = client_context.wrap_socket(socket.socket(),
                                          server_hostname=hostname)
        sock.connect((HOST, server.port))
        sock.close()

        with self.assertRaises(ValueError):
            sock.read(1024)
        with self.assertRaises(ValueError):
            sock.write(b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003557
def test_sendfile(self):
    """Send a small file with sendfile() over an SSL socket and check
    that the same bytes are received back from the echo server.
    """
    payload = b"x" * 512
    with open(support.TESTFN, 'wb') as out:
        out.write(payload)
    self.addCleanup(support.unlink, support.TESTFN)

    # One context is used for both the server and the client socket.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(CERTFILE)
    context.load_cert_chain(CERTFILE)

    server = ThreadedEchoServer(context=context, chatty=False)
    with server, context.wrap_socket(socket.socket()) as sock:
        sock.connect((HOST, server.port))
        with open(support.TESTFN, 'rb') as src:
            sock.sendfile(src)
            self.assertEqual(sock.recv(1024), payload)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003574
def test_session(self):
    """Check session creation and reuse across four connections, along
    with the server context's 'accept' and 'hits' session statistics.
    """
    client_context, server_context, hostname = testing_context()

    def handshake(**extra):
        return server_params_test(client_context, server_context,
                                  sni_name=hostname, **extra)

    # first connection without session
    stats = handshake()
    session = stats['session']
    self.assertTrue(session.id)
    self.assertGreater(session.time, 0)
    self.assertGreater(session.timeout, 0)
    self.assertTrue(session.has_ticket)
    if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
        self.assertGreater(session.ticket_lifetime_hint, 0)
    self.assertFalse(stats['session_reused'])
    server_stats = server_context.session_stats()
    self.assertEqual(server_stats['accept'], 1)
    self.assertEqual(server_stats['hits'], 0)

    # reuse session
    stats = handshake(session=session)
    server_stats = server_context.session_stats()
    self.assertEqual(server_stats['accept'], 2)
    self.assertEqual(server_stats['hits'], 1)
    self.assertTrue(stats['session_reused'])
    second = stats['session']
    self.assertEqual(second.id, session.id)
    # Equal to the first session, but a distinct object.
    self.assertEqual(second, session)
    self.assertIsNot(second, session)
    self.assertGreaterEqual(second.time, session.time)
    self.assertGreaterEqual(second.timeout, session.timeout)

    # another one without session
    stats = handshake()
    self.assertFalse(stats['session_reused'])
    third = stats['session']
    self.assertNotEqual(third.id, session.id)
    self.assertNotEqual(third, session)
    server_stats = server_context.session_stats()
    self.assertEqual(server_stats['accept'], 3)
    self.assertEqual(server_stats['hits'], 1)

    # reuse session again
    stats = handshake(session=session)
    self.assertTrue(stats['session_reused'])
    fourth = stats['session']
    self.assertEqual(fourth.id, session.id)
    self.assertEqual(fourth, session)
    self.assertGreaterEqual(fourth.time, session.time)
    self.assertGreaterEqual(fourth.timeout, session.timeout)
    server_stats = server_context.session_stats()
    self.assertEqual(server_stats['accept'], 4)
    self.assertEqual(server_stats['hits'], 2)
Christian Heimes99a65702016-09-10 23:44:53 +02003630
def test_session_handling(self):
    """Check the rules for reading and assigning SSLSocket.session:
    before and after the handshake, with a wrong value type, and with a
    session that belongs to a different context.
    """
    client_context, server_context, hostname = testing_context()
    client_context2, _, _ = testing_context()

    # TODO: session reuse does not work with TLS 1.3
    for ctx in (client_context, client_context2):
        ctx.options |= ssl.OP_NO_TLSv1_3

    def wrap(ctx):
        return ctx.wrap_socket(socket.socket(), server_hostname=hostname)

    server = ThreadedEchoServer(context=server_context, chatty=False)
    with server:
        address = (HOST, server.port)

        with wrap(client_context) as s:
            # session is None before handshake
            self.assertEqual(s.session, None)
            self.assertEqual(s.session_reused, None)
            s.connect(address)
            session = s.session
            self.assertTrue(session)
            with self.assertRaises(TypeError) as e:
                s.session = object
            self.assertEqual(str(e.exception), 'Value is not a SSLSession.')

        with wrap(client_context) as s:
            s.connect(address)
            # cannot set session after handshake
            with self.assertRaises(ValueError) as e:
                s.session = session
            self.assertEqual(str(e.exception),
                             'Cannot set session after handshake.')

        with wrap(client_context) as s:
            # can set session before handshake and before the
            # connection was established
            s.session = session
            s.connect(address)
            self.assertEqual(s.session.id, session.id)
            self.assertEqual(s.session, session)
            self.assertEqual(s.session_reused, True)

        with wrap(client_context2) as s:
            # cannot re-use session with a different SSLContext
            with self.assertRaises(ValueError) as e:
                s.session = session
                s.connect(address)
            self.assertEqual(str(e.exception),
                             'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02003680
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003681
def test_main(verbose=False):
    """Run the SSL test classes.

    When support.verbose is set, first print a banner with the OpenSSL
    version, the platform and a few ssl constants.  Then check that the
    certificate files exist, and run the test classes between
    support.threading_setup() and support.threading_cleanup().

    The *verbose* parameter is accepted but not read here; verbosity is
    taken from support.verbose.

    Raises support.TestFailed if a certificate file is missing.
    """
    if support.verbose:
        import warnings
        # platform.linux_distribution() was deprecated and later removed
        # from the platform module; look it up defensively so the banner
        # cannot fail with AttributeError before any test has run.
        plats = [
            ('Linux', getattr(platform, 'linux_distribution', None)),
            ('Mac', platform.mac_ver),
            ('Windows', platform.win32_ver),
        ]
        with warnings.catch_warnings():
            warnings.filterwarnings(
                'ignore',
                r'dist\(\) and linux_distribution\(\) '
                'functions are deprecated .*',
                PendingDeprecationWarning,
            )
            for name, func in plats:
                if func is None:
                    continue
                plat = func()
                if plat and plat[0]:
                    plat = '%s %r' % (name, plat)
                    break
            else:
                # No probe produced a usable answer.
                plat = repr(platform.platform())
        print("test_ssl: testing with %r %r" %
              (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
        print(" under %s" % plat)
        print(" HAS_SNI = %r" % ssl.HAS_SNI)
        print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
        try:
            print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
        except AttributeError:
            # This constant may be missing from the ssl module.
            pass

    # Fail early if a certificate file is missing.
    for filename in [
        CERTFILE, BYTES_CERTFILE,
        ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
        SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
        BADCERT, BADKEY, EMPTYCERT]:
        if not os.path.exists(filename):
            raise support.TestFailed("Can't read certificate file %r" % filename)

    tests = [
        ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
        SimpleBackgroundTests, ThreadedTests,
    ]

    if support.is_resource_enabled('network'):
        tests.append(NetworkedTests)

    thread_info = support.threading_setup()
    try:
        support.run_unittest(*tests)
    finally:
        support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00003735
# Run the test suite when this module is executed as a script.
if __name__ == "__main__":
    test_main()