blob: 3f2c50b7795ad4149dbd01101feda8dfedb04728 [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00005from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00006import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00007import select
Thomas Woutersed03b412007-08-28 21:37:11 +00008import time
Christian Heimes9424bb42013-06-17 15:32:57 +02009import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000010import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000011import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000012import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000013import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000014import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020015import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000016import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000017import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000018import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000019import platform
Antoine Pitrou23df4832010-08-04 17:14:06 +000020import functools
Christian Heimes892d66e2018-01-29 14:10:18 +010021import sysconfig
Christian Heimes888bbdc2017-09-07 14:18:21 -070022try:
23 import ctypes
24except ImportError:
25 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000026
Antoine Pitrou05d936d2010-10-13 11:38:36 +000027ssl = support.import_module("ssl")
28
Martin Panter3840b2a2016-03-27 01:53:46 +000029
Antoine Pitrou2463e5f2013-03-28 22:24:43 +010030PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
Benjamin Petersonee8712c2008-05-20 21:35:26 +000031HOST = support.HOST
Christian Heimes598894f2016-09-05 23:19:05 +020032IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
33IS_OPENSSL_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
Christian Heimes892d66e2018-01-29 14:10:18 +010034PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
Antoine Pitrou152efa22010-05-16 18:19:27 +000035
Christian Heimesefff7062013-11-21 03:35:02 +010036def data_file(*name):
37 return os.path.join(os.path.dirname(__file__), *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000038
Antoine Pitrou81564092010-10-08 23:06:24 +000039# The custom key and certificate files used in test_ssl are generated
40# using Lib/test/make_ssl_certs.py.
41# Other certificates are simply fetched from the Internet servers they
42# are meant to authenticate.
43
Antoine Pitrou152efa22010-05-16 18:19:27 +000044CERTFILE = data_file("keycert.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000045BYTES_CERTFILE = os.fsencode(CERTFILE)
Antoine Pitrou152efa22010-05-16 18:19:27 +000046ONLYCERT = data_file("ssl_cert.pem")
47ONLYKEY = data_file("ssl_key.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000048BYTES_ONLYCERT = os.fsencode(ONLYCERT)
49BYTES_ONLYKEY = os.fsencode(ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +020050CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
51ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
52KEY_PASSWORD = "somepass"
Antoine Pitrou152efa22010-05-16 18:19:27 +000053CAPATH = data_file("capath")
Victor Stinner313a1202010-06-11 23:56:51 +000054BYTES_CAPATH = os.fsencode(CAPATH)
Christian Heimesefff7062013-11-21 03:35:02 +010055CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
56CAFILE_CACERT = data_file("capath", "5ed36f99.0")
57
Christian Heimesbd5c7d22018-01-20 15:16:30 +010058CERTFILE_INFO = {
59 'issuer': ((('countryName', 'XY'),),
60 (('localityName', 'Castle Anthrax'),),
61 (('organizationName', 'Python Software Foundation'),),
62 (('commonName', 'localhost'),)),
63 'notAfter': 'Jan 17 19:09:06 2028 GMT',
64 'notBefore': 'Jan 19 19:09:06 2018 GMT',
65 'serialNumber': 'F9BA076D5B6ABD9B',
66 'subject': ((('countryName', 'XY'),),
67 (('localityName', 'Castle Anthrax'),),
68 (('organizationName', 'Python Software Foundation'),),
69 (('commonName', 'localhost'),)),
70 'subjectAltName': (('DNS', 'localhost'),),
71 'version': 3
72}
Antoine Pitrou152efa22010-05-16 18:19:27 +000073
Christian Heimes22587792013-11-21 23:56:13 +010074# empty CRL
75CRLFILE = data_file("revocation.crl")
76
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010077# Two keys and certs signed by the same CA (for SNI tests)
78SIGNED_CERTFILE = data_file("keycert3.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +020079SIGNED_CERTFILE_HOSTNAME = 'localhost'
Christian Heimesbd5c7d22018-01-20 15:16:30 +010080
81SIGNED_CERTFILE_INFO = {
82 'OCSP': ('http://testca.pythontest.net/testca/ocsp/',),
83 'caIssuers': ('http://testca.pythontest.net/testca/pycacert.cer',),
84 'crlDistributionPoints': ('http://testca.pythontest.net/testca/revocation.crl',),
85 'issuer': ((('countryName', 'XY'),),
86 (('organizationName', 'Python Software Foundation CA'),),
87 (('commonName', 'our-ca-server'),)),
88 'notAfter': 'Nov 28 19:09:06 2027 GMT',
89 'notBefore': 'Jan 19 19:09:06 2018 GMT',
90 'serialNumber': '82EDBF41C880919C',
91 'subject': ((('countryName', 'XY'),),
92 (('localityName', 'Castle Anthrax'),),
93 (('organizationName', 'Python Software Foundation'),),
94 (('commonName', 'localhost'),)),
95 'subjectAltName': (('DNS', 'localhost'),),
96 'version': 3
97}
98
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010099SIGNED_CERTFILE2 = data_file("keycert4.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +0200100SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100101SIGNED_CERTFILE_ECC = data_file("keycertecc.pem")
102SIGNED_CERTFILE_ECC_HOSTNAME = 'localhost-ecc'
103
Martin Panter3840b2a2016-03-27 01:53:46 +0000104# Same certificate as pycacert.pem, but without extra text in file
105SIGNING_CA = data_file("capath", "ceff1710.0")
Christian Heimes1c03abd2016-09-06 23:25:35 +0200106# cert with all kinds of subject alt names
107ALLSANFILE = data_file("allsans.pem")
Christian Heimes66e57422018-01-29 14:25:13 +0100108IDNSANSFILE = data_file("idnsans.pem")
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100109
Martin Panter3d81d932016-01-14 09:36:00 +0000110REMOTE_HOST = "self-signed.pythontest.net"
Antoine Pitrou152efa22010-05-16 18:19:27 +0000111
112EMPTYCERT = data_file("nullcert.pem")
113BADCERT = data_file("badcert.pem")
Martin Panter407b62f2016-01-30 03:41:43 +0000114NONEXISTINGCERT = data_file("XXXnonexisting.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000115BADKEY = data_file("badkey.pem")
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200116NOKIACERT = data_file("nokia.pem")
Christian Heimes824f7f32013-08-17 00:54:47 +0200117NULLBYTECERT = data_file("nullbytecert.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000118
Benjamin Petersona7eaf562015-04-02 00:04:06 -0400119DHFILE = data_file("dh1024.pem")
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100120BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +0000121
Christian Heimes358cfd42016-09-10 22:43:48 +0200122# Not defined in all versions of OpenSSL
123OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
124OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
125OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
126OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
127
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100128
Thomas Woutersed03b412007-08-28 21:37:11 +0000129def handle_error(prefix):
130 exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000131 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000132 sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +0000133
Antoine Pitroub5218772010-05-21 09:56:06 +0000134def can_clear_options():
135 # 0.9.8m or higher
Antoine Pitroub9ac25d2011-07-08 18:47:06 +0200136 return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
Antoine Pitroub5218772010-05-21 09:56:06 +0000137
138def no_sslv2_implies_sslv3_hello():
139 # 0.9.7h or higher
140 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
141
Christian Heimes2427b502013-11-23 11:24:32 +0100142def have_verify_flags():
143 # 0.9.8 or higher
144 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
145
Antoine Pitrouc695c952014-04-28 20:57:36 +0200146def utc_offset(): #NOTE: ignore issues like #1647654
147 # local time = utc time + utc offset
148 if time.daylight and time.localtime().tm_isdst > 0:
149 return -time.altzone # seconds
150 return -time.timezone
151
Christian Heimes9424bb42013-06-17 15:32:57 +0200152def asn1time(cert_time):
153 # Some versions of OpenSSL ignore seconds, see #18207
154 # 0.9.8.i
155 if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
156 fmt = "%b %d %H:%M:%S %Y GMT"
157 dt = datetime.datetime.strptime(cert_time, fmt)
158 dt = dt.replace(second=0)
159 cert_time = dt.strftime(fmt)
160 # %d adds leading zero but ASN1_TIME_print() uses leading space
161 if cert_time[4] == "0":
162 cert_time = cert_time[:4] + " " + cert_time[5:]
163
164 return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000165
Antoine Pitrou23df4832010-08-04 17:14:06 +0000166# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
167def skip_if_broken_ubuntu_ssl(func):
Victor Stinner3de49192011-05-09 00:42:58 +0200168 if hasattr(ssl, 'PROTOCOL_SSLv2'):
169 @functools.wraps(func)
170 def f(*args, **kwargs):
171 try:
172 ssl.SSLContext(ssl.PROTOCOL_SSLv2)
173 except ssl.SSLError:
174 if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
175 platform.linux_distribution() == ('debian', 'squeeze/sid', '')):
176 raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
177 return func(*args, **kwargs)
178 return f
179 else:
180 return func
Antoine Pitrou23df4832010-08-04 17:14:06 +0000181
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100182needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
183
Antoine Pitrou23df4832010-08-04 17:14:06 +0000184
Christian Heimesd0486372016-09-10 23:23:33 +0200185def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
186 cert_reqs=ssl.CERT_NONE, ca_certs=None,
187 ciphers=None, certfile=None, keyfile=None,
188 **kwargs):
189 context = ssl.SSLContext(ssl_version)
190 if cert_reqs is not None:
Christian Heimesa170fa12017-09-15 20:27:30 +0200191 if cert_reqs == ssl.CERT_NONE:
192 context.check_hostname = False
Christian Heimesd0486372016-09-10 23:23:33 +0200193 context.verify_mode = cert_reqs
194 if ca_certs is not None:
195 context.load_verify_locations(ca_certs)
196 if certfile is not None or keyfile is not None:
197 context.load_cert_chain(certfile, keyfile)
198 if ciphers is not None:
199 context.set_ciphers(ciphers)
200 return context.wrap_socket(sock, **kwargs)
201
Christian Heimesa170fa12017-09-15 20:27:30 +0200202
203def testing_context(server_cert=SIGNED_CERTFILE):
204 """Create context
205
206 client_context, server_context, hostname = testing_context()
207 """
208 if server_cert == SIGNED_CERTFILE:
209 hostname = SIGNED_CERTFILE_HOSTNAME
210 elif server_cert == SIGNED_CERTFILE2:
211 hostname = SIGNED_CERTFILE2_HOSTNAME
212 else:
213 raise ValueError(server_cert)
214
215 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
216 client_context.load_verify_locations(SIGNING_CA)
217
218 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
219 server_context.load_cert_chain(server_cert)
220
221 return client_context, server_context, hostname
222
223
Antoine Pitrou152efa22010-05-16 18:19:27 +0000224class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000225
Antoine Pitrou480a1242010-04-28 21:37:09 +0000226 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000227 ssl.CERT_NONE
228 ssl.CERT_OPTIONAL
229 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100230 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100231 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100232 if ssl.HAS_ECDH:
233 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100234 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
235 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000236 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100237 self.assertIn(ssl.HAS_ECDH, {True, False})
Christian Heimescb5b68a2017-09-07 18:07:00 -0700238 ssl.OP_NO_SSLv2
239 ssl.OP_NO_SSLv3
240 ssl.OP_NO_TLSv1
241 ssl.OP_NO_TLSv1_3
242 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
243 ssl.OP_NO_TLSv1_1
244 ssl.OP_NO_TLSv1_2
Christian Heimesa170fa12017-09-15 20:27:30 +0200245 self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000246
Antoine Pitrou172f0252014-04-18 20:33:08 +0200247 def test_str_for_enums(self):
248 # Make sure that the PROTOCOL_* constants have enum-like string
249 # reprs.
Christian Heimes598894f2016-09-05 23:19:05 +0200250 proto = ssl.PROTOCOL_TLS
251 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200252 ctx = ssl.SSLContext(proto)
253 self.assertIs(ctx.protocol, proto)
254
Antoine Pitrou480a1242010-04-28 21:37:09 +0000255 def test_random(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000256 v = ssl.RAND_status()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000257 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000258 sys.stdout.write("\n RAND_status is %d (%s)\n"
259 % (v, (v and "sufficient randomness") or
260 "insufficient randomness"))
Victor Stinner99c8b162011-05-24 12:05:19 +0200261
262 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
263 self.assertEqual(len(data), 16)
264 self.assertEqual(is_cryptographic, v == 1)
265 if v:
266 data = ssl.RAND_bytes(16)
267 self.assertEqual(len(data), 16)
Victor Stinner2e2baa92011-05-25 11:15:16 +0200268 else:
269 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
Victor Stinner99c8b162011-05-24 12:05:19 +0200270
Victor Stinner1e81a392013-12-19 16:47:04 +0100271 # negative num is invalid
272 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
273 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
274
Victor Stinnerbeeb5122014-11-28 13:28:25 +0100275 if hasattr(ssl, 'RAND_egd'):
276 self.assertRaises(TypeError, ssl.RAND_egd, 1)
277 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000278 ssl.RAND_add("this is a random string", 75.0)
Serhiy Storchaka8490f5a2015-03-20 09:00:36 +0200279 ssl.RAND_add(b"this is a random bytes object", 75.0)
280 ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000281
Christian Heimesf77b4b22013-08-21 13:26:05 +0200282 @unittest.skipUnless(os.name == 'posix', 'requires posix')
283 def test_random_fork(self):
284 status = ssl.RAND_status()
285 if not status:
286 self.fail("OpenSSL's PRNG has insufficient randomness")
287
288 rfd, wfd = os.pipe()
289 pid = os.fork()
290 if pid == 0:
291 try:
292 os.close(rfd)
293 child_random = ssl.RAND_pseudo_bytes(16)[0]
294 self.assertEqual(len(child_random), 16)
295 os.write(wfd, child_random)
296 os.close(wfd)
297 except BaseException:
298 os._exit(1)
299 else:
300 os._exit(0)
301 else:
302 os.close(wfd)
303 self.addCleanup(os.close, rfd)
304 _, status = os.waitpid(pid, 0)
305 self.assertEqual(status, 0)
306
307 child_random = os.read(rfd, 16)
308 self.assertEqual(len(child_random), 16)
309 parent_random = ssl.RAND_pseudo_bytes(16)[0]
310 self.assertEqual(len(parent_random), 16)
311
312 self.assertNotEqual(child_random, parent_random)
313
Antoine Pitrou480a1242010-04-28 21:37:09 +0000314 def test_parse_cert(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000315 # note that this uses an 'unofficial' function in _ssl.c,
316 # provided solely for this test, to exercise the certificate
317 # parsing code
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100318 self.assertEqual(
319 ssl._ssl._test_decode_cert(CERTFILE),
320 CERTFILE_INFO
321 )
322 self.assertEqual(
323 ssl._ssl._test_decode_cert(SIGNED_CERTFILE),
324 SIGNED_CERTFILE_INFO
325 )
326
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200327 # Issue #13034: the subjectAltName in some certificates
328 # (notably projects.developer.nokia.com:443) wasn't parsed
329 p = ssl._ssl._test_decode_cert(NOKIACERT)
330 if support.verbose:
331 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
332 self.assertEqual(p['subjectAltName'],
333 (('DNS', 'projects.developer.nokia.com'),
334 ('DNS', 'projects.forum.nokia.com'))
335 )
Christian Heimesbd3a7f92013-11-21 03:40:15 +0100336 # extra OCSP and AIA fields
337 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
338 self.assertEqual(p['caIssuers'],
339 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
340 self.assertEqual(p['crlDistributionPoints'],
341 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000342
Christian Heimes824f7f32013-08-17 00:54:47 +0200343 def test_parse_cert_CVE_2013_4238(self):
344 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
345 if support.verbose:
346 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
347 subject = ((('countryName', 'US'),),
348 (('stateOrProvinceName', 'Oregon'),),
349 (('localityName', 'Beaverton'),),
350 (('organizationName', 'Python Software Foundation'),),
351 (('organizationalUnitName', 'Python Core Development'),),
352 (('commonName', 'null.python.org\x00example.org'),),
353 (('emailAddress', 'python-dev@python.org'),))
354 self.assertEqual(p['subject'], subject)
355 self.assertEqual(p['issuer'], subject)
Christian Heimes157c9832013-08-25 14:12:41 +0200356 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
357 san = (('DNS', 'altnull.python.org\x00example.com'),
358 ('email', 'null@python.org\x00user@example.org'),
359 ('URI', 'http://null.python.org\x00http://example.org'),
360 ('IP Address', '192.0.2.1'),
361 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
362 else:
363 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
364 san = (('DNS', 'altnull.python.org\x00example.com'),
365 ('email', 'null@python.org\x00user@example.org'),
366 ('URI', 'http://null.python.org\x00http://example.org'),
367 ('IP Address', '192.0.2.1'),
368 ('IP Address', '<invalid>'))
369
370 self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200371
Christian Heimes1c03abd2016-09-06 23:25:35 +0200372 def test_parse_all_sans(self):
373 p = ssl._ssl._test_decode_cert(ALLSANFILE)
374 self.assertEqual(p['subjectAltName'],
375 (
376 ('DNS', 'allsans'),
377 ('othername', '<unsupported>'),
378 ('othername', '<unsupported>'),
379 ('email', 'user@example.org'),
380 ('DNS', 'www.example.org'),
381 ('DirName',
382 ((('countryName', 'XY'),),
383 (('localityName', 'Castle Anthrax'),),
384 (('organizationName', 'Python Software Foundation'),),
385 (('commonName', 'dirname example'),))),
386 ('URI', 'https://www.python.org/'),
387 ('IP Address', '127.0.0.1'),
388 ('IP Address', '0:0:0:0:0:0:0:1\n'),
389 ('Registered ID', '1.2.3.4.5')
390 )
391 )
392
Antoine Pitrou480a1242010-04-28 21:37:09 +0000393 def test_DER_to_PEM(self):
Martin Panter3d81d932016-01-14 09:36:00 +0000394 with open(CAFILE_CACERT, 'r') as f:
Antoine Pitrou480a1242010-04-28 21:37:09 +0000395 pem = f.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000396 d1 = ssl.PEM_cert_to_DER_cert(pem)
397 p2 = ssl.DER_cert_to_PEM_cert(d1)
398 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitrou18c913e2010-04-27 10:59:39 +0000399 self.assertEqual(d1, d2)
Antoine Pitrou9bfbe612010-04-27 22:08:08 +0000400 if not p2.startswith(ssl.PEM_HEADER + '\n'):
401 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
402 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
403 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000404
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000405 def test_openssl_version(self):
406 n = ssl.OPENSSL_VERSION_NUMBER
407 t = ssl.OPENSSL_VERSION_INFO
408 s = ssl.OPENSSL_VERSION
409 self.assertIsInstance(n, int)
410 self.assertIsInstance(t, tuple)
411 self.assertIsInstance(s, str)
412 # Some sanity checks follow
413 # >= 0.9
414 self.assertGreaterEqual(n, 0x900000)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400415 # < 3.0
416 self.assertLess(n, 0x30000000)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000417 major, minor, fix, patch, status = t
418 self.assertGreaterEqual(major, 0)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400419 self.assertLess(major, 3)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000420 self.assertGreaterEqual(minor, 0)
421 self.assertLess(minor, 256)
422 self.assertGreaterEqual(fix, 0)
423 self.assertLess(fix, 256)
424 self.assertGreaterEqual(patch, 0)
Ned Deily05784a72015-02-05 17:20:13 +1100425 self.assertLessEqual(patch, 63)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000426 self.assertGreaterEqual(status, 0)
427 self.assertLessEqual(status, 15)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400428 # Version string as returned by {Open,Libre}SSL, the format might change
Christian Heimes598894f2016-09-05 23:19:05 +0200429 if IS_LIBRESSL:
430 self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
Victor Stinner789b8052015-01-06 11:51:06 +0100431 (s, t, hex(n)))
Antoine Pitroudfab9352014-07-21 18:35:01 -0400432 else:
433 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
Victor Stinner789b8052015-01-06 11:51:06 +0100434 (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000435
Antoine Pitrou9d543662010-04-23 23:10:32 +0000436 @support.cpython_only
437 def test_refcycle(self):
438 # Issue #7943: an SSL object doesn't create reference cycles with
439 # itself.
440 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200441 ss = test_wrap_socket(s)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000442 wr = weakref.ref(ss)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100443 with support.check_warnings(("", ResourceWarning)):
444 del ss
Victor Stinnere0b75b72016-03-21 17:26:04 +0100445 self.assertEqual(wr(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000446
Antoine Pitroua468adc2010-09-14 14:43:44 +0000447 def test_wrapped_unconnected(self):
448 # Methods on an unconnected SSLSocket propagate the original
Andrew Svetlov0832af62012-12-18 23:10:48 +0200449 # OSError raise by the underlying socket object.
Antoine Pitroua468adc2010-09-14 14:43:44 +0000450 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200451 with test_wrap_socket(s) as ss:
Antoine Pitroue9bb4732013-01-12 21:56:56 +0100452 self.assertRaises(OSError, ss.recv, 1)
453 self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
454 self.assertRaises(OSError, ss.recvfrom, 1)
455 self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
456 self.assertRaises(OSError, ss.send, b'x')
457 self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
Miss Islington (bot)8fa84782018-02-24 12:51:56 -0800458 self.assertRaises(NotImplementedError, ss.sendmsg,
459 [b'x'], (), 0, ('0.0.0.0', 0))
Antoine Pitroua468adc2010-09-14 14:43:44 +0000460
Antoine Pitrou40f08742010-04-24 22:04:40 +0000461 def test_timeout(self):
462 # Issue #8524: when creating an SSL socket, the timeout of the
463 # original socket should be retained.
464 for timeout in (None, 0.0, 5.0):
465 s = socket.socket(socket.AF_INET)
466 s.settimeout(timeout)
Christian Heimesd0486372016-09-10 23:23:33 +0200467 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100468 self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000469
Christian Heimesd0486372016-09-10 23:23:33 +0200470 def test_errors_sslwrap(self):
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000471 sock = socket.socket()
Ezio Melottied3a7d22010-12-01 02:32:32 +0000472 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000473 "certfile must be specified",
474 ssl.wrap_socket, sock, keyfile=CERTFILE)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000475 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000476 "certfile must be specified for server-side operations",
477 ssl.wrap_socket, sock, server_side=True)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000478 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000479 "certfile must be specified for server-side operations",
Christian Heimesd0486372016-09-10 23:23:33 +0200480 ssl.wrap_socket, sock, server_side=True, certfile="")
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100481 with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
482 self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
Christian Heimesd0486372016-09-10 23:23:33 +0200483 s.connect, (HOST, 8080))
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200484 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000485 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000486 ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000487 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200488 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000489 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000490 ssl.wrap_socket(sock,
491 certfile=CERTFILE, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000492 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200493 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000494 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000495 ssl.wrap_socket(sock,
496 certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000497 self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000498
Martin Panter3464ea22016-02-01 21:58:11 +0000499 def bad_cert_test(self, certfile):
500 """Check that trying to use the given client certificate fails"""
501 certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
502 certfile)
503 sock = socket.socket()
504 self.addCleanup(sock.close)
505 with self.assertRaises(ssl.SSLError):
Christian Heimesd0486372016-09-10 23:23:33 +0200506 test_wrap_socket(sock,
Christian Heimesa170fa12017-09-15 20:27:30 +0200507 certfile=certfile)
Martin Panter3464ea22016-02-01 21:58:11 +0000508
509 def test_empty_cert(self):
510 """Wrapping with an empty cert file"""
511 self.bad_cert_test("nullcert.pem")
512
513 def test_malformed_cert(self):
514 """Wrapping with a badly formatted certificate (syntax error)"""
515 self.bad_cert_test("badcert.pem")
516
517 def test_malformed_key(self):
518 """Wrapping with a badly formatted key (syntax error)"""
519 self.bad_cert_test("badkey.pem")
520
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000521 def test_match_hostname(self):
522 def ok(cert, hostname):
523 ssl.match_hostname(cert, hostname)
524 def fail(cert, hostname):
525 self.assertRaises(ssl.CertificateError,
526 ssl.match_hostname, cert, hostname)
527
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100528 # -- Hostname matching --
529
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000530 cert = {'subject': ((('commonName', 'example.com'),),)}
531 ok(cert, 'example.com')
532 ok(cert, 'ExAmple.cOm')
533 fail(cert, 'www.example.com')
534 fail(cert, '.example.com')
535 fail(cert, 'example.org')
536 fail(cert, 'exampleXcom')
537
538 cert = {'subject': ((('commonName', '*.a.com'),),)}
539 ok(cert, 'foo.a.com')
540 fail(cert, 'bar.foo.a.com')
541 fail(cert, 'a.com')
542 fail(cert, 'Xa.com')
543 fail(cert, '.a.com')
544
Mandeep Singhede2ac92017-11-27 04:01:27 +0530545 # only match wildcards when they are the only thing
546 # in left-most segment
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000547 cert = {'subject': ((('commonName', 'f*.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530548 fail(cert, 'foo.com')
549 fail(cert, 'f.com')
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000550 fail(cert, 'bar.com')
551 fail(cert, 'foo.a.com')
552 fail(cert, 'bar.foo.com')
553
Christian Heimes824f7f32013-08-17 00:54:47 +0200554 # NULL bytes are bad, CVE-2013-4073
555 cert = {'subject': ((('commonName',
556 'null.python.org\x00example.org'),),)}
557 ok(cert, 'null.python.org\x00example.org') # or raise an error?
558 fail(cert, 'example.org')
559 fail(cert, 'null.python.org')
560
Georg Brandl72c98d32013-10-27 07:16:53 +0100561 # error cases with wildcards
562 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
563 fail(cert, 'bar.foo.a.com')
564 fail(cert, 'a.com')
565 fail(cert, 'Xa.com')
566 fail(cert, '.a.com')
567
568 cert = {'subject': ((('commonName', 'a.*.com'),),)}
569 fail(cert, 'a.foo.com')
570 fail(cert, 'a..com')
571 fail(cert, 'a.com')
572
573 # wildcard doesn't match IDNA prefix 'xn--'
574 idna = 'püthon.python.org'.encode("idna").decode("ascii")
575 cert = {'subject': ((('commonName', idna),),)}
576 ok(cert, idna)
577 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
578 fail(cert, idna)
579 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
580 fail(cert, idna)
581
582 # wildcard in first fragment and IDNA A-labels in sequent fragments
583 # are supported.
584 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
585 cert = {'subject': ((('commonName', idna),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530586 fail(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
587 fail(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
Georg Brandl72c98d32013-10-27 07:16:53 +0100588 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
589 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
590
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000591 # Slightly fake real-world example
592 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
593 'subject': ((('commonName', 'linuxfrz.org'),),),
594 'subjectAltName': (('DNS', 'linuxfr.org'),
595 ('DNS', 'linuxfr.com'),
596 ('othername', '<unsupported>'))}
597 ok(cert, 'linuxfr.org')
598 ok(cert, 'linuxfr.com')
599 # Not a "DNS" entry
600 fail(cert, '<unsupported>')
601 # When there is a subjectAltName, commonName isn't used
602 fail(cert, 'linuxfrz.org')
603
604 # A pristine real-world example
605 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
606 'subject': ((('countryName', 'US'),),
607 (('stateOrProvinceName', 'California'),),
608 (('localityName', 'Mountain View'),),
609 (('organizationName', 'Google Inc'),),
610 (('commonName', 'mail.google.com'),))}
611 ok(cert, 'mail.google.com')
612 fail(cert, 'gmail.com')
613 # Only commonName is considered
614 fail(cert, 'California')
615
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100616 # -- IPv4 matching --
617 cert = {'subject': ((('commonName', 'example.com'),),),
618 'subjectAltName': (('DNS', 'example.com'),
619 ('IP Address', '10.11.12.13'),
620 ('IP Address', '14.15.16.17'))}
621 ok(cert, '10.11.12.13')
622 ok(cert, '14.15.16.17')
623 fail(cert, '14.15.16.18')
624 fail(cert, 'example.net')
625
626 # -- IPv6 matching --
Miss Islington (bot)46632f42018-02-24 06:06:46 -0800627 if hasattr(socket, 'AF_INET6'):
628 cert = {'subject': ((('commonName', 'example.com'),),),
629 'subjectAltName': (
630 ('DNS', 'example.com'),
631 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
632 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
633 ok(cert, '2001::cafe')
634 ok(cert, '2003::baba')
635 fail(cert, '2003::bebe')
636 fail(cert, 'example.net')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100637
638 # -- Miscellaneous --
639
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000640 # Neither commonName nor subjectAltName
641 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
642 'subject': ((('countryName', 'US'),),
643 (('stateOrProvinceName', 'California'),),
644 (('localityName', 'Mountain View'),),
645 (('organizationName', 'Google Inc'),))}
646 fail(cert, 'mail.google.com')
647
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200648 # No DNS entry in subjectAltName but a commonName
649 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
650 'subject': ((('countryName', 'US'),),
651 (('stateOrProvinceName', 'California'),),
652 (('localityName', 'Mountain View'),),
653 (('commonName', 'mail.google.com'),)),
654 'subjectAltName': (('othername', 'blabla'), )}
655 ok(cert, 'mail.google.com')
656
657 # No DNS entry subjectAltName and no commonName
658 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
659 'subject': ((('countryName', 'US'),),
660 (('stateOrProvinceName', 'California'),),
661 (('localityName', 'Mountain View'),),
662 (('organizationName', 'Google Inc'),)),
663 'subjectAltName': (('othername', 'blabla'),)}
664 fail(cert, 'google.com')
665
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000666 # Empty cert / no cert
667 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
668 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
669
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200670 # Issue #17980: avoid denials of service by refusing more than one
671 # wildcard per fragment.
Miss Islington (bot)46632f42018-02-24 06:06:46 -0800672 cert = {'subject': ((('commonName', 'a*b.example.com'),),)}
673 with self.assertRaisesRegex(
674 ssl.CertificateError,
675 "partial wildcards in leftmost label are not supported"):
676 ssl.match_hostname(cert, 'axxb.example.com')
677
678 cert = {'subject': ((('commonName', 'www.*.example.com'),),)}
679 with self.assertRaisesRegex(
680 ssl.CertificateError,
681 "wildcard can only be present in the leftmost label"):
682 ssl.match_hostname(cert, 'www.sub.example.com')
683
684 cert = {'subject': ((('commonName', 'a*b*.example.com'),),)}
685 with self.assertRaisesRegex(
686 ssl.CertificateError,
687 "too many wildcards"):
688 ssl.match_hostname(cert, 'axxbxxc.example.com')
689
690 cert = {'subject': ((('commonName', '*'),),)}
691 with self.assertRaisesRegex(
692 ssl.CertificateError,
693 "sole wildcard without additional labels are not support"):
694 ssl.match_hostname(cert, 'host')
695
696 cert = {'subject': ((('commonName', '*.com'),),)}
697 with self.assertRaisesRegex(
698 ssl.CertificateError,
699 r"hostname 'com' doesn't match '\*.com'"):
700 ssl.match_hostname(cert, 'com')
701
702 # extra checks for _inet_paton()
703 for invalid in ['1', '', '1.2.3', '256.0.0.1', '127.0.0.1/24']:
704 with self.assertRaises(ValueError):
705 ssl._inet_paton(invalid)
706 for ipaddr in ['127.0.0.1', '192.168.0.1']:
707 self.assertTrue(ssl._inet_paton(ipaddr))
708 if hasattr(socket, 'AF_INET6'):
709 for ipaddr in ['::1', '2001:db8:85a3::8a2e:370:7334']:
710 self.assertTrue(ssl._inet_paton(ipaddr))
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200711
Antoine Pitroud5323212010-10-22 18:19:07 +0000712 def test_server_side(self):
713 # server_hostname doesn't work for server sockets
Christian Heimesa170fa12017-09-15 20:27:30 +0200714 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000715 with socket.socket() as sock:
716 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
717 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000718
Antoine Pitroud6494802011-07-21 01:11:30 +0200719 def test_unknown_channel_binding(self):
720 # should raise ValueError for unknown type
721 s = socket.socket(socket.AF_INET)
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200722 s.bind(('127.0.0.1', 0))
723 s.listen()
724 c = socket.socket(socket.AF_INET)
725 c.connect(s.getsockname())
Christian Heimesd0486372016-09-10 23:23:33 +0200726 with test_wrap_socket(c, do_handshake_on_connect=False) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100727 with self.assertRaises(ValueError):
728 ss.get_channel_binding("unknown-type")
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200729 s.close()
Antoine Pitroud6494802011-07-21 01:11:30 +0200730
731 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
732 "'tls-unique' channel binding not available")
733 def test_tls_unique_channel_binding(self):
734 # unconnected should return None for known type
735 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200736 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100737 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200738 # the same for server-side
739 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200740 with test_wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100741 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200742
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600743 def test_dealloc_warn(self):
Christian Heimesd0486372016-09-10 23:23:33 +0200744 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600745 r = repr(ss)
746 with self.assertWarns(ResourceWarning) as cm:
747 ss = None
748 support.gc_collect()
749 self.assertIn(r, str(cm.warning.args[0]))
750
Christian Heimes6d7ad132013-06-09 18:02:55 +0200751 def test_get_default_verify_paths(self):
752 paths = ssl.get_default_verify_paths()
753 self.assertEqual(len(paths), 6)
754 self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
755
756 with support.EnvironmentVarGuard() as env:
757 env["SSL_CERT_DIR"] = CAPATH
758 env["SSL_CERT_FILE"] = CERTFILE
759 paths = ssl.get_default_verify_paths()
760 self.assertEqual(paths.cafile, CERTFILE)
761 self.assertEqual(paths.capath, CAPATH)
762
Christian Heimes44109d72013-11-22 01:51:30 +0100763 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
764 def test_enum_certificates(self):
765 self.assertTrue(ssl.enum_certificates("CA"))
766 self.assertTrue(ssl.enum_certificates("ROOT"))
767
768 self.assertRaises(TypeError, ssl.enum_certificates)
769 self.assertRaises(WindowsError, ssl.enum_certificates, "")
770
Christian Heimesc2d65e12013-11-22 16:13:55 +0100771 trust_oids = set()
772 for storename in ("CA", "ROOT"):
773 store = ssl.enum_certificates(storename)
774 self.assertIsInstance(store, list)
775 for element in store:
776 self.assertIsInstance(element, tuple)
777 self.assertEqual(len(element), 3)
778 cert, enc, trust = element
779 self.assertIsInstance(cert, bytes)
780 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
781 self.assertIsInstance(trust, (set, bool))
782 if isinstance(trust, set):
783 trust_oids.update(trust)
Christian Heimes44109d72013-11-22 01:51:30 +0100784
785 serverAuth = "1.3.6.1.5.5.7.3.1"
Christian Heimesc2d65e12013-11-22 16:13:55 +0100786 self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200787
Christian Heimes46bebee2013-06-09 19:03:31 +0200788 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Christian Heimes44109d72013-11-22 01:51:30 +0100789 def test_enum_crls(self):
790 self.assertTrue(ssl.enum_crls("CA"))
791 self.assertRaises(TypeError, ssl.enum_crls)
792 self.assertRaises(WindowsError, ssl.enum_crls, "")
Christian Heimes46bebee2013-06-09 19:03:31 +0200793
Christian Heimes44109d72013-11-22 01:51:30 +0100794 crls = ssl.enum_crls("CA")
795 self.assertIsInstance(crls, list)
796 for element in crls:
797 self.assertIsInstance(element, tuple)
798 self.assertEqual(len(element), 2)
799 self.assertIsInstance(element[0], bytes)
800 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200801
Christian Heimes46bebee2013-06-09 19:03:31 +0200802
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100803 def test_asn1object(self):
804 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
805 '1.3.6.1.5.5.7.3.1')
806
807 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
808 self.assertEqual(val, expected)
809 self.assertEqual(val.nid, 129)
810 self.assertEqual(val.shortname, 'serverAuth')
811 self.assertEqual(val.longname, 'TLS Web Server Authentication')
812 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
813 self.assertIsInstance(val, ssl._ASN1Object)
814 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
815
816 val = ssl._ASN1Object.fromnid(129)
817 self.assertEqual(val, expected)
818 self.assertIsInstance(val, ssl._ASN1Object)
819 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100820 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
821 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100822 for i in range(1000):
823 try:
824 obj = ssl._ASN1Object.fromnid(i)
825 except ValueError:
826 pass
827 else:
828 self.assertIsInstance(obj.nid, int)
829 self.assertIsInstance(obj.shortname, str)
830 self.assertIsInstance(obj.longname, str)
831 self.assertIsInstance(obj.oid, (str, type(None)))
832
833 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
834 self.assertEqual(val, expected)
835 self.assertIsInstance(val, ssl._ASN1Object)
836 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
837 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
838 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100839 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
840 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100841
Christian Heimes72d28502013-11-23 13:56:58 +0100842 def test_purpose_enum(self):
843 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
844 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
845 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
846 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
847 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
848 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
849 '1.3.6.1.5.5.7.3.1')
850
851 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
852 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
853 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
854 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
855 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
856 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
857 '1.3.6.1.5.5.7.3.2')
858
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100859 def test_unsupported_dtls(self):
860 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
861 self.addCleanup(s.close)
862 with self.assertRaises(NotImplementedError) as cx:
Christian Heimesd0486372016-09-10 23:23:33 +0200863 test_wrap_socket(s, cert_reqs=ssl.CERT_NONE)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100864 self.assertEqual(str(cx.exception), "only stream sockets are supported")
Christian Heimesa170fa12017-09-15 20:27:30 +0200865 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100866 with self.assertRaises(NotImplementedError) as cx:
867 ctx.wrap_socket(s)
868 self.assertEqual(str(cx.exception), "only stream sockets are supported")
869
Antoine Pitrouc695c952014-04-28 20:57:36 +0200870 def cert_time_ok(self, timestring, timestamp):
871 self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)
872
873 def cert_time_fail(self, timestring):
874 with self.assertRaises(ValueError):
875 ssl.cert_time_to_seconds(timestring)
876
877 @unittest.skipUnless(utc_offset(),
878 'local time needs to be different from UTC')
879 def test_cert_time_to_seconds_timezone(self):
880 # Issue #19940: ssl.cert_time_to_seconds() returns wrong
881 # results if local timezone is not UTC
882 self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0)
883 self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0)
884
885 def test_cert_time_to_seconds(self):
886 timestring = "Jan 5 09:34:43 2018 GMT"
887 ts = 1515144883.0
888 self.cert_time_ok(timestring, ts)
889 # accept keyword parameter, assert its name
890 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
891 # accept both %e and %d (space or zero generated by strftime)
892 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
893 # case-insensitive
894 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
895 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
896 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
897 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
898 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
899 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
900 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
901 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
902
903 newyear_ts = 1230768000.0
904 # leap seconds
905 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
906 # same timestamp
907 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
908
909 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
910 # allow 60th second (even if it is not a leap second)
911 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
912 # allow 2nd leap second for compatibility with time.strptime()
913 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
914 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
915
Mike53f7a7c2017-12-14 14:04:53 +0300916 # no special treatment for the special value:
Antoine Pitrouc695c952014-04-28 20:57:36 +0200917 # 99991231235959Z (rfc 5280)
918 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
919
920 @support.run_with_locale('LC_ALL', '')
921 def test_cert_time_to_seconds_locale(self):
922 # `cert_time_to_seconds()` should be locale independent
923
924 def local_february_name():
925 return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
926
927 if local_february_name().lower() == 'feb':
928 self.skipTest("locale-specific month name needs to be "
929 "different from C locale")
930
931 # locale-independent
932 self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
933 self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
934
Martin Panter3840b2a2016-03-27 01:53:46 +0000935 def test_connect_ex_error(self):
936 server = socket.socket(socket.AF_INET)
937 self.addCleanup(server.close)
938 port = support.bind_port(server) # Reserve port but don't listen
Christian Heimesd0486372016-09-10 23:23:33 +0200939 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +0000940 cert_reqs=ssl.CERT_REQUIRED)
941 self.addCleanup(s.close)
942 rc = s.connect_ex((HOST, port))
943 # Issue #19919: Windows machines or VMs hosted on Windows
944 # machines sometimes return EWOULDBLOCK.
945 errors = (
946 errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
947 errno.EWOULDBLOCK,
948 )
949 self.assertIn(rc, errors)
950
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100951
Antoine Pitrou152efa22010-05-16 18:19:27 +0000952class ContextTests(unittest.TestCase):
953
Antoine Pitrou23df4832010-08-04 17:14:06 +0000954 @skip_if_broken_ubuntu_ssl
Antoine Pitrou152efa22010-05-16 18:19:27 +0000955 def test_constructor(self):
Antoine Pitrou2463e5f2013-03-28 22:24:43 +0100956 for protocol in PROTOCOLS:
957 ssl.SSLContext(protocol)
Christian Heimes598894f2016-09-05 23:19:05 +0200958 ctx = ssl.SSLContext()
959 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000960 self.assertRaises(ValueError, ssl.SSLContext, -1)
961 self.assertRaises(ValueError, ssl.SSLContext, 42)
962
Antoine Pitrou23df4832010-08-04 17:14:06 +0000963 @skip_if_broken_ubuntu_ssl
Antoine Pitrou152efa22010-05-16 18:19:27 +0000964 def test_protocol(self):
965 for proto in PROTOCOLS:
966 ctx = ssl.SSLContext(proto)
967 self.assertEqual(ctx.protocol, proto)
968
969 def test_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +0200970 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000971 ctx.set_ciphers("ALL")
972 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +0000973 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +0000974 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000975
Christian Heimes892d66e2018-01-29 14:10:18 +0100976 @unittest.skipUnless(PY_SSL_DEFAULT_CIPHERS == 1,
977 "Test applies only to Python default ciphers")
978 def test_python_ciphers(self):
979 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
980 ciphers = ctx.get_ciphers()
981 for suite in ciphers:
982 name = suite['name']
983 self.assertNotIn("PSK", name)
984 self.assertNotIn("SRP", name)
985 self.assertNotIn("MD5", name)
986 self.assertNotIn("RC4", name)
987 self.assertNotIn("3DES", name)
988
Christian Heimes25bfcd52016-09-06 00:04:45 +0200989 @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
990 def test_get_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +0200991 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesea9b2dc2016-09-06 10:45:44 +0200992 ctx.set_ciphers('AESGCM')
Christian Heimes25bfcd52016-09-06 00:04:45 +0200993 names = set(d['name'] for d in ctx.get_ciphers())
Christian Heimes582282b2016-09-06 11:27:25 +0200994 self.assertIn('AES256-GCM-SHA384', names)
995 self.assertIn('AES128-GCM-SHA256', names)
Christian Heimes25bfcd52016-09-06 00:04:45 +0200996
Antoine Pitrou23df4832010-08-04 17:14:06 +0000997 @skip_if_broken_ubuntu_ssl
Antoine Pitroub5218772010-05-21 09:56:06 +0000998 def test_options(self):
Christian Heimesa170fa12017-09-15 20:27:30 +0200999 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001000 # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
Christian Heimes598894f2016-09-05 23:19:05 +02001001 default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimes358cfd42016-09-10 22:43:48 +02001002 # SSLContext also enables these by default
1003 default |= (OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE |
1004 OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE)
Christian Heimes598894f2016-09-05 23:19:05 +02001005 self.assertEqual(default, ctx.options)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001006 ctx.options |= ssl.OP_NO_TLSv1
Christian Heimes598894f2016-09-05 23:19:05 +02001007 self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001008 if can_clear_options():
Christian Heimes598894f2016-09-05 23:19:05 +02001009 ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
1010 self.assertEqual(default, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001011 ctx.options = 0
Matthias Klosef7c56242016-06-12 23:40:00 -07001012 # Ubuntu has OP_NO_SSLv3 forced on by default
1013 self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
Antoine Pitroub5218772010-05-21 09:56:06 +00001014 else:
1015 with self.assertRaises(ValueError):
1016 ctx.options = 0
1017
Christian Heimesa170fa12017-09-15 20:27:30 +02001018 def test_verify_mode_protocol(self):
1019 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001020 # Default value
1021 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1022 ctx.verify_mode = ssl.CERT_OPTIONAL
1023 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1024 ctx.verify_mode = ssl.CERT_REQUIRED
1025 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1026 ctx.verify_mode = ssl.CERT_NONE
1027 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1028 with self.assertRaises(TypeError):
1029 ctx.verify_mode = None
1030 with self.assertRaises(ValueError):
1031 ctx.verify_mode = 42
1032
Christian Heimesa170fa12017-09-15 20:27:30 +02001033 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1034 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1035 self.assertFalse(ctx.check_hostname)
1036
1037 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1038 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1039 self.assertTrue(ctx.check_hostname)
1040
Christian Heimes61d478c2018-01-27 15:51:38 +01001041 def test_hostname_checks_common_name(self):
1042 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1043 self.assertTrue(ctx.hostname_checks_common_name)
1044 if ssl.HAS_NEVER_CHECK_COMMON_NAME:
1045 ctx.hostname_checks_common_name = True
1046 self.assertTrue(ctx.hostname_checks_common_name)
1047 ctx.hostname_checks_common_name = False
1048 self.assertFalse(ctx.hostname_checks_common_name)
1049 ctx.hostname_checks_common_name = True
1050 self.assertTrue(ctx.hostname_checks_common_name)
1051 else:
1052 with self.assertRaises(AttributeError):
1053 ctx.hostname_checks_common_name = True
Christian Heimesa170fa12017-09-15 20:27:30 +02001054
Christian Heimes2427b502013-11-23 11:24:32 +01001055 @unittest.skipUnless(have_verify_flags(),
1056 "verify_flags need OpenSSL > 0.9.8")
Christian Heimes22587792013-11-21 23:56:13 +01001057 def test_verify_flags(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001058 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Benjamin Peterson990fcaa2015-03-04 22:49:41 -05001059 # default value
1060 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
1061 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | tf)
Christian Heimes22587792013-11-21 23:56:13 +01001062 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
1063 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
1064 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
1065 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
1066 ctx.verify_flags = ssl.VERIFY_DEFAULT
1067 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
1068 # supports any value
1069 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
1070 self.assertEqual(ctx.verify_flags,
1071 ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
1072 with self.assertRaises(TypeError):
1073 ctx.verify_flags = None
1074
Antoine Pitrou152efa22010-05-16 18:19:27 +00001075 def test_load_cert_chain(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001076 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001077 # Combined key and cert in a single file
Benjamin Peterson1ea070e2014-11-03 21:05:01 -05001078 ctx.load_cert_chain(CERTFILE, keyfile=None)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001079 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
1080 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001081 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001082 ctx.load_cert_chain(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001083 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001084 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001085 ctx.load_cert_chain(BADCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001086 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001087 ctx.load_cert_chain(EMPTYCERT)
1088 # Separate key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001089 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001090 ctx.load_cert_chain(ONLYCERT, ONLYKEY)
1091 ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
1092 ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001093 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001094 ctx.load_cert_chain(ONLYCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001095 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001096 ctx.load_cert_chain(ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001097 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001098 ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
1099 # Mismatching key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001100 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001101 with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
Martin Panter3d81d932016-01-14 09:36:00 +00001102 ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +02001103 # Password protected key and cert
1104 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
1105 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
1106 ctx.load_cert_chain(CERTFILE_PROTECTED,
1107 password=bytearray(KEY_PASSWORD.encode()))
1108 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
1109 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
1110 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
1111 bytearray(KEY_PASSWORD.encode()))
1112 with self.assertRaisesRegex(TypeError, "should be a string"):
1113 ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
1114 with self.assertRaises(ssl.SSLError):
1115 ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
1116 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1117 # openssl has a fixed limit on the password buffer.
1118 # PEM_BUFSIZE is generally set to 1kb.
1119 # Return a string larger than this.
1120 ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
1121 # Password callback
1122 def getpass_unicode():
1123 return KEY_PASSWORD
1124 def getpass_bytes():
1125 return KEY_PASSWORD.encode()
1126 def getpass_bytearray():
1127 return bytearray(KEY_PASSWORD.encode())
1128 def getpass_badpass():
1129 return "badpass"
1130 def getpass_huge():
1131 return b'a' * (1024 * 1024)
1132 def getpass_bad_type():
1133 return 9
1134 def getpass_exception():
1135 raise Exception('getpass error')
1136 class GetPassCallable:
1137 def __call__(self):
1138 return KEY_PASSWORD
1139 def getpass(self):
1140 return KEY_PASSWORD
1141 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
1142 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
1143 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
1144 ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
1145 ctx.load_cert_chain(CERTFILE_PROTECTED,
1146 password=GetPassCallable().getpass)
1147 with self.assertRaises(ssl.SSLError):
1148 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
1149 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1150 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
1151 with self.assertRaisesRegex(TypeError, "must return a string"):
1152 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
1153 with self.assertRaisesRegex(Exception, "getpass error"):
1154 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
1155 # Make sure the password function isn't called if it isn't needed
1156 ctx.load_cert_chain(CERTFILE, password=getpass_exception)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001157
1158 def test_load_verify_locations(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001159 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001160 ctx.load_verify_locations(CERTFILE)
1161 ctx.load_verify_locations(cafile=CERTFILE, capath=None)
1162 ctx.load_verify_locations(BYTES_CERTFILE)
1163 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
1164 self.assertRaises(TypeError, ctx.load_verify_locations)
Christian Heimesefff7062013-11-21 03:35:02 +01001165 self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001166 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001167 ctx.load_verify_locations(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001168 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001169 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001170 ctx.load_verify_locations(BADCERT)
1171 ctx.load_verify_locations(CERTFILE, CAPATH)
1172 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
1173
Victor Stinner80f75e62011-01-29 11:31:20 +00001174 # Issue #10989: crash if the second argument type is invalid
1175 self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
1176
Christian Heimesefff7062013-11-21 03:35:02 +01001177 def test_load_verify_cadata(self):
1178 # test cadata
1179 with open(CAFILE_CACERT) as f:
1180 cacert_pem = f.read()
1181 cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
1182 with open(CAFILE_NEURONIO) as f:
1183 neuronio_pem = f.read()
1184 neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
1185
1186 # test PEM
Christian Heimesa170fa12017-09-15 20:27:30 +02001187 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001188 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
1189 ctx.load_verify_locations(cadata=cacert_pem)
1190 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
1191 ctx.load_verify_locations(cadata=neuronio_pem)
1192 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1193 # cert already in hash table
1194 ctx.load_verify_locations(cadata=neuronio_pem)
1195 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1196
1197 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001198 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001199 combined = "\n".join((cacert_pem, neuronio_pem))
1200 ctx.load_verify_locations(cadata=combined)
1201 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1202
1203 # with junk around the certs
Christian Heimesa170fa12017-09-15 20:27:30 +02001204 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001205 combined = ["head", cacert_pem, "other", neuronio_pem, "again",
1206 neuronio_pem, "tail"]
1207 ctx.load_verify_locations(cadata="\n".join(combined))
1208 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1209
1210 # test DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001211 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001212 ctx.load_verify_locations(cadata=cacert_der)
1213 ctx.load_verify_locations(cadata=neuronio_der)
1214 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1215 # cert already in hash table
1216 ctx.load_verify_locations(cadata=cacert_der)
1217 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1218
1219 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001220 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001221 combined = b"".join((cacert_der, neuronio_der))
1222 ctx.load_verify_locations(cadata=combined)
1223 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1224
1225 # error cases
Christian Heimesa170fa12017-09-15 20:27:30 +02001226 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001227 self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
1228
1229 with self.assertRaisesRegex(ssl.SSLError, "no start line"):
1230 ctx.load_verify_locations(cadata="broken")
1231 with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
1232 ctx.load_verify_locations(cadata=b"broken")
1233
1234
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001235 def test_load_dh_params(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001236 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001237 ctx.load_dh_params(DHFILE)
1238 if os.name != 'nt':
1239 ctx.load_dh_params(BYTES_DHFILE)
1240 self.assertRaises(TypeError, ctx.load_dh_params)
1241 self.assertRaises(TypeError, ctx.load_dh_params, None)
1242 with self.assertRaises(FileNotFoundError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001243 ctx.load_dh_params(NONEXISTINGCERT)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001244 self.assertEqual(cm.exception.errno, errno.ENOENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001245 with self.assertRaises(ssl.SSLError) as cm:
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001246 ctx.load_dh_params(CERTFILE)
1247
Antoine Pitroueb585ad2010-10-22 18:24:20 +00001248 @skip_if_broken_ubuntu_ssl
Antoine Pitroub0182c82010-10-12 20:09:02 +00001249 def test_session_stats(self):
1250 for proto in PROTOCOLS:
1251 ctx = ssl.SSLContext(proto)
1252 self.assertEqual(ctx.session_stats(), {
1253 'number': 0,
1254 'connect': 0,
1255 'connect_good': 0,
1256 'connect_renegotiate': 0,
1257 'accept': 0,
1258 'accept_good': 0,
1259 'accept_renegotiate': 0,
1260 'hits': 0,
1261 'misses': 0,
1262 'timeouts': 0,
1263 'cache_full': 0,
1264 })
1265
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001266 def test_set_default_verify_paths(self):
1267 # There's not much we can do to test that it acts as expected,
1268 # so just check it doesn't crash or raise an exception.
Christian Heimesa170fa12017-09-15 20:27:30 +02001269 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001270 ctx.set_default_verify_paths()
1271
Antoine Pitrou501da612011-12-21 09:27:41 +01001272 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001273 def test_set_ecdh_curve(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001274 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001275 ctx.set_ecdh_curve("prime256v1")
1276 ctx.set_ecdh_curve(b"prime256v1")
1277 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1278 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1279 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1280 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1281
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001282 @needs_sni
1283 def test_sni_callback(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001284 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001285
1286 # set_servername_callback expects a callable, or None
1287 self.assertRaises(TypeError, ctx.set_servername_callback)
1288 self.assertRaises(TypeError, ctx.set_servername_callback, 4)
1289 self.assertRaises(TypeError, ctx.set_servername_callback, "")
1290 self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
1291
1292 def dummycallback(sock, servername, ctx):
1293 pass
1294 ctx.set_servername_callback(None)
1295 ctx.set_servername_callback(dummycallback)
1296
1297 @needs_sni
1298 def test_sni_callback_refcycle(self):
1299 # Reference cycles through the servername callback are detected
1300 # and cleared.
Christian Heimesa170fa12017-09-15 20:27:30 +02001301 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001302 def dummycallback(sock, servername, ctx, cycle=ctx):
1303 pass
1304 ctx.set_servername_callback(dummycallback)
1305 wr = weakref.ref(ctx)
1306 del ctx, dummycallback
1307 gc.collect()
1308 self.assertIs(wr(), None)
1309
Christian Heimes9a5395a2013-06-17 15:44:12 +02001310 def test_cert_store_stats(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001311 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001312 self.assertEqual(ctx.cert_store_stats(),
1313 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1314 ctx.load_cert_chain(CERTFILE)
1315 self.assertEqual(ctx.cert_store_stats(),
1316 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1317 ctx.load_verify_locations(CERTFILE)
1318 self.assertEqual(ctx.cert_store_stats(),
1319 {'x509_ca': 0, 'crl': 0, 'x509': 1})
Martin Panterb55f8b72016-01-14 12:53:56 +00001320 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001321 self.assertEqual(ctx.cert_store_stats(),
1322 {'x509_ca': 1, 'crl': 0, 'x509': 2})
1323
1324 def test_get_ca_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001325 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001326 self.assertEqual(ctx.get_ca_certs(), [])
1327 # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
1328 ctx.load_verify_locations(CERTFILE)
1329 self.assertEqual(ctx.get_ca_certs(), [])
Martin Panterb55f8b72016-01-14 12:53:56 +00001330 # but CAFILE_CACERT is a CA cert
1331 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001332 self.assertEqual(ctx.get_ca_certs(),
1333 [{'issuer': ((('organizationName', 'Root CA'),),
1334 (('organizationalUnitName', 'http://www.cacert.org'),),
1335 (('commonName', 'CA Cert Signing Authority'),),
1336 (('emailAddress', 'support@cacert.org'),)),
1337 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
1338 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
1339 'serialNumber': '00',
Christian Heimesbd3a7f92013-11-21 03:40:15 +01001340 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
Christian Heimes9a5395a2013-06-17 15:44:12 +02001341 'subject': ((('organizationName', 'Root CA'),),
1342 (('organizationalUnitName', 'http://www.cacert.org'),),
1343 (('commonName', 'CA Cert Signing Authority'),),
1344 (('emailAddress', 'support@cacert.org'),)),
1345 'version': 3}])
1346
Martin Panterb55f8b72016-01-14 12:53:56 +00001347 with open(CAFILE_CACERT) as f:
Christian Heimes9a5395a2013-06-17 15:44:12 +02001348 pem = f.read()
1349 der = ssl.PEM_cert_to_DER_cert(pem)
1350 self.assertEqual(ctx.get_ca_certs(True), [der])
1351
Christian Heimes72d28502013-11-23 13:56:58 +01001352 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001353 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001354 ctx.load_default_certs()
1355
Christian Heimesa170fa12017-09-15 20:27:30 +02001356 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001357 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1358 ctx.load_default_certs()
1359
Christian Heimesa170fa12017-09-15 20:27:30 +02001360 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001361 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1362
Christian Heimesa170fa12017-09-15 20:27:30 +02001363 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001364 self.assertRaises(TypeError, ctx.load_default_certs, None)
1365 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1366
Benjamin Peterson91244e02014-10-03 18:17:15 -04001367 @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
Christian Heimes598894f2016-09-05 23:19:05 +02001368 @unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001369 def test_load_default_certs_env(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001370 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001371 with support.EnvironmentVarGuard() as env:
1372 env["SSL_CERT_DIR"] = CAPATH
1373 env["SSL_CERT_FILE"] = CERTFILE
1374 ctx.load_default_certs()
1375 self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0})
1376
Benjamin Peterson91244e02014-10-03 18:17:15 -04001377 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Steve Dower68d663c2017-07-17 11:15:48 +02001378 @unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
Benjamin Peterson91244e02014-10-03 18:17:15 -04001379 def test_load_default_certs_env_windows(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001380 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001381 ctx.load_default_certs()
1382 stats = ctx.cert_store_stats()
1383
Christian Heimesa170fa12017-09-15 20:27:30 +02001384 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001385 with support.EnvironmentVarGuard() as env:
1386 env["SSL_CERT_DIR"] = CAPATH
1387 env["SSL_CERT_FILE"] = CERTFILE
1388 ctx.load_default_certs()
1389 stats["x509"] += 1
1390 self.assertEqual(ctx.cert_store_stats(), stats)
1391
Christian Heimes358cfd42016-09-10 22:43:48 +02001392 def _assert_context_options(self, ctx):
1393 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1394 if OP_NO_COMPRESSION != 0:
1395 self.assertEqual(ctx.options & OP_NO_COMPRESSION,
1396 OP_NO_COMPRESSION)
1397 if OP_SINGLE_DH_USE != 0:
1398 self.assertEqual(ctx.options & OP_SINGLE_DH_USE,
1399 OP_SINGLE_DH_USE)
1400 if OP_SINGLE_ECDH_USE != 0:
1401 self.assertEqual(ctx.options & OP_SINGLE_ECDH_USE,
1402 OP_SINGLE_ECDH_USE)
1403 if OP_CIPHER_SERVER_PREFERENCE != 0:
1404 self.assertEqual(ctx.options & OP_CIPHER_SERVER_PREFERENCE,
1405 OP_CIPHER_SERVER_PREFERENCE)
1406
Christian Heimes4c05b472013-11-23 15:58:30 +01001407 def test_create_default_context(self):
1408 ctx = ssl.create_default_context()
Christian Heimes358cfd42016-09-10 22:43:48 +02001409
Christian Heimesa170fa12017-09-15 20:27:30 +02001410 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001411 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001412 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001413 self._assert_context_options(ctx)
1414
Christian Heimes4c05b472013-11-23 15:58:30 +01001415 with open(SIGNING_CA) as f:
1416 cadata = f.read()
1417 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1418 cadata=cadata)
Christian Heimesa170fa12017-09-15 20:27:30 +02001419 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001420 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes358cfd42016-09-10 22:43:48 +02001421 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001422
1423 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001424 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001425 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001426 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001427
Christian Heimes67986f92013-11-23 22:43:47 +01001428 def test__create_stdlib_context(self):
1429 ctx = ssl._create_stdlib_context()
Christian Heimesa170fa12017-09-15 20:27:30 +02001430 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001431 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001432 self.assertFalse(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001433 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001434
1435 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
1436 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1437 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001438 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001439
1440 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
Christian Heimesa02c69a2013-12-02 20:59:28 +01001441 cert_reqs=ssl.CERT_REQUIRED,
1442 check_hostname=True)
Christian Heimes67986f92013-11-23 22:43:47 +01001443 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1444 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimesa02c69a2013-12-02 20:59:28 +01001445 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001446 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001447
1448 ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001449 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001450 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001451 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001452
Christian Heimes1aa9a752013-12-02 02:41:19 +01001453 def test_check_hostname(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001454 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001455 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001456 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001457
Christian Heimese82c0342017-09-15 20:29:57 +02001458 # Auto set CERT_REQUIRED
1459 ctx.check_hostname = True
1460 self.assertTrue(ctx.check_hostname)
1461 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1462 ctx.check_hostname = False
Christian Heimes1aa9a752013-12-02 02:41:19 +01001463 ctx.verify_mode = ssl.CERT_REQUIRED
1464 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001465 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001466
Christian Heimese82c0342017-09-15 20:29:57 +02001467 # Changing verify_mode does not affect check_hostname
1468 ctx.check_hostname = False
1469 ctx.verify_mode = ssl.CERT_NONE
1470 ctx.check_hostname = False
1471 self.assertFalse(ctx.check_hostname)
1472 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1473 # Auto set
Christian Heimes1aa9a752013-12-02 02:41:19 +01001474 ctx.check_hostname = True
1475 self.assertTrue(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001476 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1477
1478 ctx.check_hostname = False
1479 ctx.verify_mode = ssl.CERT_OPTIONAL
1480 ctx.check_hostname = False
1481 self.assertFalse(ctx.check_hostname)
1482 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1483 # keep CERT_OPTIONAL
1484 ctx.check_hostname = True
1485 self.assertTrue(ctx.check_hostname)
1486 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001487
1488 # Cannot set CERT_NONE with check_hostname enabled
1489 with self.assertRaises(ValueError):
1490 ctx.verify_mode = ssl.CERT_NONE
1491 ctx.check_hostname = False
1492 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001493 ctx.verify_mode = ssl.CERT_NONE
1494 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001495
Christian Heimes5fe668c2016-09-12 00:01:11 +02001496 def test_context_client_server(self):
1497 # PROTOCOL_TLS_CLIENT has sane defaults
1498 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1499 self.assertTrue(ctx.check_hostname)
1500 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1501
1502 # PROTOCOL_TLS_SERVER has different but also sane defaults
1503 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1504 self.assertFalse(ctx.check_hostname)
1505 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1506
Christian Heimes4df60f12017-09-15 20:26:05 +02001507 def test_context_custom_class(self):
1508 class MySSLSocket(ssl.SSLSocket):
1509 pass
1510
1511 class MySSLObject(ssl.SSLObject):
1512 pass
1513
1514 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1515 ctx.sslsocket_class = MySSLSocket
1516 ctx.sslobject_class = MySSLObject
1517
1518 with ctx.wrap_socket(socket.socket(), server_side=True) as sock:
1519 self.assertIsInstance(sock, MySSLSocket)
1520 obj = ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO())
1521 self.assertIsInstance(obj, MySSLObject)
1522
Antoine Pitrou152efa22010-05-16 18:19:27 +00001523
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001524class SSLErrorTests(unittest.TestCase):
1525
1526 def test_str(self):
1527 # The str() of a SSLError doesn't include the errno
1528 e = ssl.SSLError(1, "foo")
1529 self.assertEqual(str(e), "foo")
1530 self.assertEqual(e.errno, 1)
1531 # Same for a subclass
1532 e = ssl.SSLZeroReturnError(1, "foo")
1533 self.assertEqual(str(e), "foo")
1534 self.assertEqual(e.errno, 1)
1535
1536 def test_lib_reason(self):
1537 # Test the library and reason attributes
Christian Heimesa170fa12017-09-15 20:27:30 +02001538 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001539 with self.assertRaises(ssl.SSLError) as cm:
1540 ctx.load_dh_params(CERTFILE)
1541 self.assertEqual(cm.exception.library, 'PEM')
1542 self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1543 s = str(cm.exception)
1544 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1545
1546 def test_subclass(self):
1547 # Check that the appropriate SSLError subclass is raised
1548 # (this only tests one of them)
Christian Heimesa170fa12017-09-15 20:27:30 +02001549 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1550 ctx.check_hostname = False
1551 ctx.verify_mode = ssl.CERT_NONE
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001552 with socket.socket() as s:
1553 s.bind(("127.0.0.1", 0))
Charles-François Natali6e204602014-07-23 19:28:13 +01001554 s.listen()
Antoine Pitroue1ceb502013-01-12 21:54:44 +01001555 c = socket.socket()
1556 c.connect(s.getsockname())
1557 c.setblocking(False)
1558 with ctx.wrap_socket(c, False, do_handshake_on_connect=False) as c:
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001559 with self.assertRaises(ssl.SSLWantReadError) as cm:
1560 c.do_handshake()
1561 s = str(cm.exception)
1562 self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1563 # For compatibility
1564 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
1565
1566
Christian Heimes61d478c2018-01-27 15:51:38 +01001567 def test_bad_server_hostname(self):
1568 ctx = ssl.create_default_context()
1569 with self.assertRaises(ValueError):
1570 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1571 server_hostname="")
1572 with self.assertRaises(ValueError):
1573 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1574 server_hostname=".example.org")
1575
1576
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001577class MemoryBIOTests(unittest.TestCase):
1578
1579 def test_read_write(self):
1580 bio = ssl.MemoryBIO()
1581 bio.write(b'foo')
1582 self.assertEqual(bio.read(), b'foo')
1583 self.assertEqual(bio.read(), b'')
1584 bio.write(b'foo')
1585 bio.write(b'bar')
1586 self.assertEqual(bio.read(), b'foobar')
1587 self.assertEqual(bio.read(), b'')
1588 bio.write(b'baz')
1589 self.assertEqual(bio.read(2), b'ba')
1590 self.assertEqual(bio.read(1), b'z')
1591 self.assertEqual(bio.read(1), b'')
1592
1593 def test_eof(self):
1594 bio = ssl.MemoryBIO()
1595 self.assertFalse(bio.eof)
1596 self.assertEqual(bio.read(), b'')
1597 self.assertFalse(bio.eof)
1598 bio.write(b'foo')
1599 self.assertFalse(bio.eof)
1600 bio.write_eof()
1601 self.assertFalse(bio.eof)
1602 self.assertEqual(bio.read(2), b'fo')
1603 self.assertFalse(bio.eof)
1604 self.assertEqual(bio.read(1), b'o')
1605 self.assertTrue(bio.eof)
1606 self.assertEqual(bio.read(), b'')
1607 self.assertTrue(bio.eof)
1608
1609 def test_pending(self):
1610 bio = ssl.MemoryBIO()
1611 self.assertEqual(bio.pending, 0)
1612 bio.write(b'foo')
1613 self.assertEqual(bio.pending, 3)
1614 for i in range(3):
1615 bio.read(1)
1616 self.assertEqual(bio.pending, 3-i-1)
1617 for i in range(3):
1618 bio.write(b'x')
1619 self.assertEqual(bio.pending, i+1)
1620 bio.read()
1621 self.assertEqual(bio.pending, 0)
1622
1623 def test_buffer_types(self):
1624 bio = ssl.MemoryBIO()
1625 bio.write(b'foo')
1626 self.assertEqual(bio.read(), b'foo')
1627 bio.write(bytearray(b'bar'))
1628 self.assertEqual(bio.read(), b'bar')
1629 bio.write(memoryview(b'baz'))
1630 self.assertEqual(bio.read(), b'baz')
1631
1632 def test_error_types(self):
1633 bio = ssl.MemoryBIO()
1634 self.assertRaises(TypeError, bio.write, 'foo')
1635 self.assertRaises(TypeError, bio.write, None)
1636 self.assertRaises(TypeError, bio.write, True)
1637 self.assertRaises(TypeError, bio.write, 1)
1638
1639
Martin Panter3840b2a2016-03-27 01:53:46 +00001640class SimpleBackgroundTests(unittest.TestCase):
Martin Panter3840b2a2016-03-27 01:53:46 +00001641 """Tests that connect to a simple server running in the background"""
1642
1643 def setUp(self):
1644 server = ThreadedEchoServer(SIGNED_CERTFILE)
1645 self.server_addr = (HOST, server.port)
1646 server.__enter__()
1647 self.addCleanup(server.__exit__, None, None, None)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001648
Antoine Pitrou480a1242010-04-28 21:37:09 +00001649 def test_connect(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001650 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001651 cert_reqs=ssl.CERT_NONE) as s:
1652 s.connect(self.server_addr)
1653 self.assertEqual({}, s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001654 self.assertFalse(s.server_side)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001655
Martin Panter3840b2a2016-03-27 01:53:46 +00001656 # this should succeed because we specify the root cert
Christian Heimesd0486372016-09-10 23:23:33 +02001657 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001658 cert_reqs=ssl.CERT_REQUIRED,
1659 ca_certs=SIGNING_CA) as s:
1660 s.connect(self.server_addr)
1661 self.assertTrue(s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001662 self.assertFalse(s.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001663
Martin Panter3840b2a2016-03-27 01:53:46 +00001664 def test_connect_fail(self):
1665 # This should fail because we have no verification certs. Connection
1666 # failure crashes ThreadedEchoServer, so run this in an independent
1667 # test method.
Christian Heimesd0486372016-09-10 23:23:33 +02001668 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001669 cert_reqs=ssl.CERT_REQUIRED)
1670 self.addCleanup(s.close)
1671 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1672 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001673
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001674 def test_connect_ex(self):
1675 # Issue #11326: check connect_ex() implementation
Christian Heimesd0486372016-09-10 23:23:33 +02001676 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001677 cert_reqs=ssl.CERT_REQUIRED,
1678 ca_certs=SIGNING_CA)
1679 self.addCleanup(s.close)
1680 self.assertEqual(0, s.connect_ex(self.server_addr))
1681 self.assertTrue(s.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001682
1683 def test_non_blocking_connect_ex(self):
1684 # Issue #11326: non-blocking connect_ex() should allow handshake
1685 # to proceed after the socket gets ready.
Christian Heimesd0486372016-09-10 23:23:33 +02001686 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001687 cert_reqs=ssl.CERT_REQUIRED,
1688 ca_certs=SIGNING_CA,
1689 do_handshake_on_connect=False)
1690 self.addCleanup(s.close)
1691 s.setblocking(False)
1692 rc = s.connect_ex(self.server_addr)
1693 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1694 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
1695 # Wait for connect to finish
1696 select.select([], [s], [], 5.0)
1697 # Non-blocking handshake
1698 while True:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001699 try:
Martin Panter3840b2a2016-03-27 01:53:46 +00001700 s.do_handshake()
1701 break
1702 except ssl.SSLWantReadError:
1703 select.select([s], [], [], 5.0)
1704 except ssl.SSLWantWriteError:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001705 select.select([], [s], [], 5.0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001706 # SSL established
1707 self.assertTrue(s.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001708
Antoine Pitrou152efa22010-05-16 18:19:27 +00001709 def test_connect_with_context(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001710 # Same as test_connect, but with a separately created context
Christian Heimesa170fa12017-09-15 20:27:30 +02001711 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001712 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1713 s.connect(self.server_addr)
1714 self.assertEqual({}, s.getpeercert())
1715 # Same with a server hostname
1716 with ctx.wrap_socket(socket.socket(socket.AF_INET),
1717 server_hostname="dummy") as s:
1718 s.connect(self.server_addr)
1719 ctx.verify_mode = ssl.CERT_REQUIRED
1720 # This should succeed because we specify the root cert
1721 ctx.load_verify_locations(SIGNING_CA)
1722 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1723 s.connect(self.server_addr)
1724 cert = s.getpeercert()
1725 self.assertTrue(cert)
1726
1727 def test_connect_with_context_fail(self):
1728 # This should fail because we have no verification certs. Connection
1729 # failure crashes ThreadedEchoServer, so run this in an independent
1730 # test method.
Christian Heimesa170fa12017-09-15 20:27:30 +02001731 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001732 ctx.verify_mode = ssl.CERT_REQUIRED
1733 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1734 self.addCleanup(s.close)
1735 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1736 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001737
1738 def test_connect_capath(self):
1739 # Verify server certificates using the `capath` argument
Antoine Pitrou467f28d2010-05-16 19:22:44 +00001740 # NOTE: the subject hashing algorithm has been changed between
1741 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
1742 # contain both versions of each certificate (same content, different
Antoine Pitroud7e4c1c2010-05-17 14:13:10 +00001743 # filename) for this test to be portable across OpenSSL releases.
Christian Heimesa170fa12017-09-15 20:27:30 +02001744 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001745 ctx.verify_mode = ssl.CERT_REQUIRED
1746 ctx.load_verify_locations(capath=CAPATH)
1747 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1748 s.connect(self.server_addr)
1749 cert = s.getpeercert()
1750 self.assertTrue(cert)
1751 # Same with a bytes `capath` argument
Christian Heimesa170fa12017-09-15 20:27:30 +02001752 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001753 ctx.verify_mode = ssl.CERT_REQUIRED
1754 ctx.load_verify_locations(capath=BYTES_CAPATH)
1755 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1756 s.connect(self.server_addr)
1757 cert = s.getpeercert()
1758 self.assertTrue(cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001759
Christian Heimesefff7062013-11-21 03:35:02 +01001760 def test_connect_cadata(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001761 with open(SIGNING_CA) as f:
Christian Heimesefff7062013-11-21 03:35:02 +01001762 pem = f.read()
1763 der = ssl.PEM_cert_to_DER_cert(pem)
Christian Heimesa170fa12017-09-15 20:27:30 +02001764 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001765 ctx.verify_mode = ssl.CERT_REQUIRED
1766 ctx.load_verify_locations(cadata=pem)
1767 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1768 s.connect(self.server_addr)
1769 cert = s.getpeercert()
1770 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001771
Martin Panter3840b2a2016-03-27 01:53:46 +00001772 # same with DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001773 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001774 ctx.verify_mode = ssl.CERT_REQUIRED
1775 ctx.load_verify_locations(cadata=der)
1776 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1777 s.connect(self.server_addr)
1778 cert = s.getpeercert()
1779 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001780
Antoine Pitroue3220242010-04-24 11:13:53 +00001781 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
1782 def test_makefile_close(self):
1783 # Issue #5238: creating a file-like object with makefile() shouldn't
1784 # delay closing the underlying "real socket" (here tested with its
1785 # file descriptor, hence skipping the test under Windows).
Christian Heimesd0486372016-09-10 23:23:33 +02001786 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Martin Panter3840b2a2016-03-27 01:53:46 +00001787 ss.connect(self.server_addr)
1788 fd = ss.fileno()
1789 f = ss.makefile()
1790 f.close()
1791 # The fd is still open
1792 os.read(fd, 0)
1793 # Closing the SSL socket should close the fd too
1794 ss.close()
1795 gc.collect()
1796 with self.assertRaises(OSError) as e:
Antoine Pitroue3220242010-04-24 11:13:53 +00001797 os.read(fd, 0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001798 self.assertEqual(e.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001799
Antoine Pitrou480a1242010-04-28 21:37:09 +00001800 def test_non_blocking_handshake(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001801 s = socket.socket(socket.AF_INET)
1802 s.connect(self.server_addr)
1803 s.setblocking(False)
Christian Heimesd0486372016-09-10 23:23:33 +02001804 s = test_wrap_socket(s,
Martin Panter3840b2a2016-03-27 01:53:46 +00001805 cert_reqs=ssl.CERT_NONE,
1806 do_handshake_on_connect=False)
1807 self.addCleanup(s.close)
1808 count = 0
1809 while True:
1810 try:
1811 count += 1
1812 s.do_handshake()
1813 break
1814 except ssl.SSLWantReadError:
1815 select.select([s], [], [])
1816 except ssl.SSLWantWriteError:
1817 select.select([], [s], [])
1818 if support.verbose:
1819 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001820
Antoine Pitrou480a1242010-04-28 21:37:09 +00001821 def test_get_server_certificate(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001822 _test_get_server_certificate(self, *self.server_addr, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001823
Martin Panter3840b2a2016-03-27 01:53:46 +00001824 def test_get_server_certificate_fail(self):
1825 # Connection failure crashes ThreadedEchoServer, so run this in an
1826 # independent test method
1827 _test_get_server_certificate_fail(self, *self.server_addr)
Bill Janssen54cc54c2007-12-14 22:08:56 +00001828
Antoine Pitrouf4c7bad2010-08-15 23:02:22 +00001829 def test_ciphers(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001830 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001831 cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
1832 s.connect(self.server_addr)
Christian Heimesd0486372016-09-10 23:23:33 +02001833 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001834 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
1835 s.connect(self.server_addr)
1836 # Error checking can happen at instantiation or when connecting
1837 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
1838 with socket.socket(socket.AF_INET) as sock:
Christian Heimesd0486372016-09-10 23:23:33 +02001839 s = test_wrap_socket(sock,
Martin Panter3840b2a2016-03-27 01:53:46 +00001840 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
1841 s.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00001842
Christian Heimes9a5395a2013-06-17 15:44:12 +02001843 def test_get_ca_certs_capath(self):
1844 # capath certs are loaded on request
Christian Heimesa170fa12017-09-15 20:27:30 +02001845 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Martin Panter3840b2a2016-03-27 01:53:46 +00001846 ctx.load_verify_locations(capath=CAPATH)
1847 self.assertEqual(ctx.get_ca_certs(), [])
Christian Heimesa170fa12017-09-15 20:27:30 +02001848 with ctx.wrap_socket(socket.socket(socket.AF_INET),
1849 server_hostname='localhost') as s:
Martin Panter3840b2a2016-03-27 01:53:46 +00001850 s.connect(self.server_addr)
1851 cert = s.getpeercert()
1852 self.assertTrue(cert)
1853 self.assertEqual(len(ctx.get_ca_certs()), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001854
Christian Heimes575596e2013-12-15 21:49:17 +01001855 @needs_sni
Christian Heimes8e7f3942013-12-05 07:41:08 +01001856 def test_context_setget(self):
1857 # Check that the context of a connected socket can be replaced.
Christian Heimesa170fa12017-09-15 20:27:30 +02001858 ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1859 ctx1.load_verify_locations(capath=CAPATH)
1860 ctx2 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1861 ctx2.load_verify_locations(capath=CAPATH)
Martin Panter3840b2a2016-03-27 01:53:46 +00001862 s = socket.socket(socket.AF_INET)
Christian Heimesa170fa12017-09-15 20:27:30 +02001863 with ctx1.wrap_socket(s, server_hostname='localhost') as ss:
Martin Panter3840b2a2016-03-27 01:53:46 +00001864 ss.connect(self.server_addr)
1865 self.assertIs(ss.context, ctx1)
1866 self.assertIs(ss._sslobj.context, ctx1)
1867 ss.context = ctx2
1868 self.assertIs(ss.context, ctx2)
1869 self.assertIs(ss._sslobj.context, ctx2)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001870
1871 def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
1872 # A simple IO loop. Call func(*args) depending on the error we get
1873 # (WANT_READ or WANT_WRITE) move data between the socket and the BIOs.
1874 timeout = kwargs.get('timeout', 10)
Victor Stinner50a72af2017-09-11 09:34:24 -07001875 deadline = time.monotonic() + timeout
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001876 count = 0
1877 while True:
Victor Stinner50a72af2017-09-11 09:34:24 -07001878 if time.monotonic() > deadline:
1879 self.fail("timeout")
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001880 errno = None
1881 count += 1
1882 try:
1883 ret = func(*args)
1884 except ssl.SSLError as e:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001885 if e.errno not in (ssl.SSL_ERROR_WANT_READ,
Martin Panter40b97ec2016-01-14 13:05:46 +00001886 ssl.SSL_ERROR_WANT_WRITE):
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001887 raise
1888 errno = e.errno
1889 # Get any data from the outgoing BIO irrespective of any error, and
1890 # send it to the socket.
1891 buf = outgoing.read()
1892 sock.sendall(buf)
1893 # If there's no error, we're done. For WANT_READ, we need to get
1894 # data from the socket and put it in the incoming BIO.
1895 if errno is None:
1896 break
1897 elif errno == ssl.SSL_ERROR_WANT_READ:
1898 buf = sock.recv(32768)
1899 if buf:
1900 incoming.write(buf)
1901 else:
1902 incoming.write_eof()
1903 if support.verbose:
1904 sys.stdout.write("Needed %d calls to complete %s().\n"
1905 % (count, func.__name__))
1906 return ret
1907
Martin Panter3840b2a2016-03-27 01:53:46 +00001908 def test_bio_handshake(self):
1909 sock = socket.socket(socket.AF_INET)
1910 self.addCleanup(sock.close)
1911 sock.connect(self.server_addr)
1912 incoming = ssl.MemoryBIO()
1913 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02001914 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1915 self.assertTrue(ctx.check_hostname)
1916 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Martin Panter3840b2a2016-03-27 01:53:46 +00001917 ctx.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +02001918 sslobj = ctx.wrap_bio(incoming, outgoing, False,
1919 SIGNED_CERTFILE_HOSTNAME)
Martin Panter3840b2a2016-03-27 01:53:46 +00001920 self.assertIs(sslobj._sslobj.owner, sslobj)
1921 self.assertIsNone(sslobj.cipher())
Christian Heimes68771112017-09-05 21:55:40 -07001922 self.assertIsNone(sslobj.version())
Christian Heimes01113fa2016-09-05 23:23:24 +02001923 self.assertIsNotNone(sslobj.shared_ciphers())
Martin Panter3840b2a2016-03-27 01:53:46 +00001924 self.assertRaises(ValueError, sslobj.getpeercert)
1925 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
1926 self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
1927 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
1928 self.assertTrue(sslobj.cipher())
Christian Heimes01113fa2016-09-05 23:23:24 +02001929 self.assertIsNotNone(sslobj.shared_ciphers())
Christian Heimes68771112017-09-05 21:55:40 -07001930 self.assertIsNotNone(sslobj.version())
Martin Panter3840b2a2016-03-27 01:53:46 +00001931 self.assertTrue(sslobj.getpeercert())
1932 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
1933 self.assertTrue(sslobj.get_channel_binding('tls-unique'))
1934 try:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001935 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Martin Panter3840b2a2016-03-27 01:53:46 +00001936 except ssl.SSLSyscallError:
1937 # If the server shuts down the TCP connection without sending a
1938 # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
1939 pass
1940 self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
1941
1942 def test_bio_read_write_data(self):
1943 sock = socket.socket(socket.AF_INET)
1944 self.addCleanup(sock.close)
1945 sock.connect(self.server_addr)
1946 incoming = ssl.MemoryBIO()
1947 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02001948 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001949 ctx.verify_mode = ssl.CERT_NONE
1950 sslobj = ctx.wrap_bio(incoming, outgoing, False)
1951 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
1952 req = b'FOO\n'
1953 self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
1954 buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
1955 self.assertEqual(buf, b'foo\n')
1956 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001957
1958
Martin Panter3840b2a2016-03-27 01:53:46 +00001959class NetworkedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001960
Martin Panter3840b2a2016-03-27 01:53:46 +00001961 def test_timeout_connect_ex(self):
1962 # Issue #12065: on a timeout, connect_ex() should return the original
1963 # errno (mimicking the behaviour of non-SSL sockets).
1964 with support.transient_internet(REMOTE_HOST):
Christian Heimesd0486372016-09-10 23:23:33 +02001965 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001966 cert_reqs=ssl.CERT_REQUIRED,
1967 do_handshake_on_connect=False)
1968 self.addCleanup(s.close)
1969 s.settimeout(0.0000001)
1970 rc = s.connect_ex((REMOTE_HOST, 443))
1971 if rc == 0:
1972 self.skipTest("REMOTE_HOST responded too quickly")
1973 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
1974
1975 @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
1976 def test_get_server_certificate_ipv6(self):
1977 with support.transient_internet('ipv6.google.com'):
1978 _test_get_server_certificate(self, 'ipv6.google.com', 443)
1979 _test_get_server_certificate_fail(self, 'ipv6.google.com', 443)
1980
Martin Panter3840b2a2016-03-27 01:53:46 +00001981
1982def _test_get_server_certificate(test, host, port, cert=None):
1983 pem = ssl.get_server_certificate((host, port))
1984 if not pem:
1985 test.fail("No server certificate on %s:%s!" % (host, port))
1986
1987 pem = ssl.get_server_certificate((host, port), ca_certs=cert)
1988 if not pem:
1989 test.fail("No server certificate on %s:%s!" % (host, port))
1990 if support.verbose:
1991 sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
1992
1993def _test_get_server_certificate_fail(test, host, port):
1994 try:
1995 pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
1996 except ssl.SSLError as x:
1997 #should fail
1998 if support.verbose:
1999 sys.stdout.write("%s\n" % x)
2000 else:
2001 test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
2002
2003
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002004from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002005
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002006class ThreadedEchoServer(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002007
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002008 class ConnectionHandler(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002009
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002010 """A mildly complicated class, because we want it to work both
2011 with and without the SSL wrapper around the socket connection, so
2012 that we can test the STARTTLS functionality."""
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002013
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002014 def __init__(self, server, connsock, addr):
2015 self.server = server
2016 self.running = False
2017 self.sock = connsock
2018 self.addr = addr
2019 self.sock.setblocking(1)
2020 self.sslconn = None
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002021 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00002022 self.daemon = True
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002023
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002024 def wrap_conn(self):
2025 try:
2026 self.sslconn = self.server.context.wrap_socket(
2027 self.sock, server_side=True)
2028 self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
2029 self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
2030 except (ssl.SSLError, ConnectionResetError, OSError) as e:
2031 # We treat ConnectionResetError as though it were an
2032 # SSLError - OpenSSL on Ubuntu abruptly closes the
2033 # connection when asked to use an unsupported protocol.
2034 #
2035 # OSError may occur with wrong protocols, e.g. both
2036 # sides use PROTOCOL_TLS_SERVER.
2037 #
2038 # XXX Various errors can have happened here, for example
2039 # a mismatching protocol version, an invalid certificate,
2040 # or a low-level bug. This should be made more discriminating.
2041 #
2042 # bpo-31323: Store the exception as string to prevent
2043 # a reference leak: server -> conn_errors -> exception
2044 # -> traceback -> self (ConnectionHandler) -> server
2045 self.server.conn_errors.append(str(e))
2046 if self.server.chatty:
2047 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
2048 self.running = False
2049 self.server.stop()
2050 self.close()
2051 return False
2052 else:
2053 self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
2054 if self.server.context.verify_mode == ssl.CERT_REQUIRED:
2055 cert = self.sslconn.getpeercert()
2056 if support.verbose and self.server.chatty:
2057 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
2058 cert_binary = self.sslconn.getpeercert(True)
2059 if support.verbose and self.server.chatty:
2060 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
2061 cipher = self.sslconn.cipher()
2062 if support.verbose and self.server.chatty:
2063 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
2064 sys.stdout.write(" server: selected protocol is now "
2065 + str(self.sslconn.selected_npn_protocol()) + "\n")
2066 return True
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002067
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002068 def read(self):
2069 if self.sslconn:
2070 return self.sslconn.read()
2071 else:
2072 return self.sock.recv(1024)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002073
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002074 def write(self, bytes):
2075 if self.sslconn:
2076 return self.sslconn.write(bytes)
2077 else:
2078 return self.sock.send(bytes)
2079
2080 def close(self):
2081 if self.sslconn:
2082 self.sslconn.close()
2083 else:
2084 self.sock.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002085
Antoine Pitrou480a1242010-04-28 21:37:09 +00002086 def run(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002087 self.running = True
2088 if not self.server.starttls_server:
2089 if not self.wrap_conn():
2090 return
2091 while self.running:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002092 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002093 msg = self.read()
2094 stripped = msg.strip()
2095 if not stripped:
2096 # eof, so quit this handler
2097 self.running = False
2098 try:
2099 self.sock = self.sslconn.unwrap()
2100 except OSError:
2101 # Many tests shut the TCP connection down
2102 # without an SSL shutdown. This causes
2103 # unwrap() to raise OSError with errno=0!
2104 pass
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002105 else:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002106 self.sslconn = None
2107 self.close()
2108 elif stripped == b'over':
2109 if support.verbose and self.server.connectionchatty:
2110 sys.stdout.write(" server: client closed connection\n")
2111 self.close()
2112 return
2113 elif (self.server.starttls_server and
2114 stripped == b'STARTTLS'):
2115 if support.verbose and self.server.connectionchatty:
2116 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
2117 self.write(b"OK\n")
2118 if not self.wrap_conn():
2119 return
2120 elif (self.server.starttls_server and self.sslconn
2121 and stripped == b'ENDTLS'):
2122 if support.verbose and self.server.connectionchatty:
2123 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
2124 self.write(b"OK\n")
2125 self.sock = self.sslconn.unwrap()
2126 self.sslconn = None
2127 if support.verbose and self.server.connectionchatty:
2128 sys.stdout.write(" server: connection is now unencrypted...\n")
2129 elif stripped == b'CB tls-unique':
2130 if support.verbose and self.server.connectionchatty:
2131 sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
2132 data = self.sslconn.get_channel_binding("tls-unique")
2133 self.write(repr(data).encode("us-ascii") + b"\n")
2134 else:
2135 if (support.verbose and
2136 self.server.connectionchatty):
2137 ctype = (self.sslconn and "encrypted") or "unencrypted"
2138 sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
2139 % (msg, ctype, msg.lower(), ctype))
2140 self.write(msg.lower())
2141 except OSError:
2142 if self.server.chatty:
2143 handle_error("Test server failure:\n")
Bill Janssen2f5799b2008-06-29 00:08:12 +00002144 self.close()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002145 self.running = False
2146 # normally, we'd just stop here, but for the test
2147 # harness, we want to stop the server
2148 self.server.stop()
Bill Janssen54cc54c2007-12-14 22:08:56 +00002149
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002150 def __init__(self, certificate=None, ssl_version=None,
2151 certreqs=None, cacerts=None,
2152 chatty=True, connectionchatty=False, starttls_server=False,
2153 npn_protocols=None, alpn_protocols=None,
2154 ciphers=None, context=None):
2155 if context:
2156 self.context = context
2157 else:
2158 self.context = ssl.SSLContext(ssl_version
2159 if ssl_version is not None
Christian Heimesa170fa12017-09-15 20:27:30 +02002160 else ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002161 self.context.verify_mode = (certreqs if certreqs is not None
2162 else ssl.CERT_NONE)
2163 if cacerts:
2164 self.context.load_verify_locations(cacerts)
2165 if certificate:
2166 self.context.load_cert_chain(certificate)
2167 if npn_protocols:
2168 self.context.set_npn_protocols(npn_protocols)
2169 if alpn_protocols:
2170 self.context.set_alpn_protocols(alpn_protocols)
2171 if ciphers:
2172 self.context.set_ciphers(ciphers)
2173 self.chatty = chatty
2174 self.connectionchatty = connectionchatty
2175 self.starttls_server = starttls_server
2176 self.sock = socket.socket()
2177 self.port = support.bind_port(self.sock)
2178 self.flag = None
2179 self.active = False
2180 self.selected_npn_protocols = []
2181 self.selected_alpn_protocols = []
2182 self.shared_ciphers = []
2183 self.conn_errors = []
2184 threading.Thread.__init__(self)
2185 self.daemon = True
2186
2187 def __enter__(self):
2188 self.start(threading.Event())
2189 self.flag.wait()
2190 return self
2191
2192 def __exit__(self, *args):
2193 self.stop()
2194 self.join()
2195
2196 def start(self, flag=None):
2197 self.flag = flag
2198 threading.Thread.start(self)
2199
2200 def run(self):
2201 self.sock.settimeout(0.05)
2202 self.sock.listen()
2203 self.active = True
2204 if self.flag:
2205 # signal an event
2206 self.flag.set()
2207 while self.active:
2208 try:
2209 newconn, connaddr = self.sock.accept()
2210 if support.verbose and self.chatty:
2211 sys.stdout.write(' server: new connection from '
2212 + repr(connaddr) + '\n')
2213 handler = self.ConnectionHandler(self, newconn, connaddr)
2214 handler.start()
2215 handler.join()
2216 except socket.timeout:
2217 pass
2218 except KeyboardInterrupt:
2219 self.stop()
2220 self.sock.close()
2221
2222 def stop(self):
2223 self.active = False
2224
2225class AsyncoreEchoServer(threading.Thread):
2226
2227 # this one's based on asyncore.dispatcher
2228
2229 class EchoServer (asyncore.dispatcher):
2230
2231 class ConnectionHandler(asyncore.dispatcher_with_send):
2232
2233 def __init__(self, conn, certfile):
2234 self.socket = test_wrap_socket(conn, server_side=True,
2235 certfile=certfile,
2236 do_handshake_on_connect=False)
2237 asyncore.dispatcher_with_send.__init__(self, self.socket)
2238 self._ssl_accepting = True
2239 self._do_ssl_handshake()
2240
2241 def readable(self):
2242 if isinstance(self.socket, ssl.SSLSocket):
2243 while self.socket.pending() > 0:
2244 self.handle_read_event()
2245 return True
2246
2247 def _do_ssl_handshake(self):
2248 try:
2249 self.socket.do_handshake()
2250 except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
2251 return
2252 except ssl.SSLEOFError:
2253 return self.handle_close()
2254 except ssl.SSLError:
Bill Janssen54cc54c2007-12-14 22:08:56 +00002255 raise
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002256 except OSError as err:
2257 if err.args[0] == errno.ECONNABORTED:
2258 return self.handle_close()
2259 else:
2260 self._ssl_accepting = False
Bill Janssen54cc54c2007-12-14 22:08:56 +00002261
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002262 def handle_read(self):
2263 if self._ssl_accepting:
2264 self._do_ssl_handshake()
2265 else:
2266 data = self.recv(1024)
2267 if support.verbose:
2268 sys.stdout.write(" server: read %s from client\n" % repr(data))
2269 if not data:
2270 self.close()
2271 else:
2272 self.send(data.lower())
Bill Janssen54cc54c2007-12-14 22:08:56 +00002273
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002274 def handle_close(self):
2275 self.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002276 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002277 sys.stdout.write(" server: closed connection %s\n" % self.socket)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002278
2279 def handle_error(self):
2280 raise
2281
Trent Nelson78520002008-04-10 20:54:35 +00002282 def __init__(self, certfile):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002283 self.certfile = certfile
2284 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2285 self.port = support.bind_port(sock, '')
2286 asyncore.dispatcher.__init__(self, sock)
2287 self.listen(5)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002288
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002289 def handle_accepted(self, sock_obj, addr):
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002290 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002291 sys.stdout.write(" server: new connection from %s:%s\n" %addr)
2292 self.ConnectionHandler(sock_obj, self.certfile)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002293
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002294 def handle_error(self):
2295 raise
Bill Janssen54cc54c2007-12-14 22:08:56 +00002296
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002297 def __init__(self, certfile):
2298 self.flag = None
2299 self.active = False
2300 self.server = self.EchoServer(certfile)
2301 self.port = self.server.port
2302 threading.Thread.__init__(self)
2303 self.daemon = True
Bill Janssen54cc54c2007-12-14 22:08:56 +00002304
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002305 def __str__(self):
2306 return "<%s %s>" % (self.__class__.__name__, self.server)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002307
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002308 def __enter__(self):
2309 self.start(threading.Event())
2310 self.flag.wait()
2311 return self
Thomas Woutersed03b412007-08-28 21:37:11 +00002312
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002313 def __exit__(self, *args):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002314 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002315 sys.stdout.write(" cleanup: stopping server.\n")
2316 self.stop()
2317 if support.verbose:
2318 sys.stdout.write(" cleanup: joining server thread.\n")
2319 self.join()
2320 if support.verbose:
2321 sys.stdout.write(" cleanup: successfully joined.\n")
2322 # make sure that ConnectionHandler is removed from socket_map
2323 asyncore.close_all(ignore_all=True)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002324
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002325 def start (self, flag=None):
2326 self.flag = flag
2327 threading.Thread.start(self)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002328
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002329 def run(self):
2330 self.active = True
2331 if self.flag:
2332 self.flag.set()
2333 while self.active:
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002334 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002335 asyncore.loop(1)
2336 except:
2337 pass
Trent Nelson6b240cd2008-04-10 20:12:06 +00002338
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002339 def stop(self):
2340 self.active = False
2341 self.server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002342
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002343def server_params_test(client_context, server_context, indata=b"FOO\n",
2344 chatty=True, connectionchatty=False, sni_name=None,
2345 session=None):
2346 """
2347 Launch a server, connect a client to it and try various reads
2348 and writes.
2349 """
2350 stats = {}
2351 server = ThreadedEchoServer(context=server_context,
2352 chatty=chatty,
2353 connectionchatty=False)
2354 with server:
2355 with client_context.wrap_socket(socket.socket(),
2356 server_hostname=sni_name, session=session) as s:
2357 s.connect((HOST, server.port))
2358 for arg in [indata, bytearray(indata), memoryview(indata)]:
2359 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002360 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002361 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002362 " client: sending %r...\n" % indata)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002363 s.write(arg)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002364 outdata = s.read()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002365 if connectionchatty:
2366 if support.verbose:
2367 sys.stdout.write(" client: read %r\n" % outdata)
Trent Nelson6b240cd2008-04-10 20:12:06 +00002368 if outdata != indata.lower():
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002369 raise AssertionError(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002370 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
2371 % (outdata[:20], len(outdata),
2372 indata[:20].lower(), len(indata)))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002373 s.write(b"over\n")
2374 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002375 if support.verbose:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002376 sys.stdout.write(" client: closing connection.\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002377 stats.update({
2378 'compression': s.compression(),
2379 'cipher': s.cipher(),
2380 'peercert': s.getpeercert(),
2381 'client_alpn_protocol': s.selected_alpn_protocol(),
2382 'client_npn_protocol': s.selected_npn_protocol(),
2383 'version': s.version(),
2384 'session_reused': s.session_reused,
2385 'session': s.session,
2386 })
2387 s.close()
2388 stats['server_alpn_protocols'] = server.selected_alpn_protocols
2389 stats['server_npn_protocols'] = server.selected_npn_protocols
2390 stats['server_shared_ciphers'] = server.shared_ciphers
2391 return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002392
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002393def try_protocol_combo(server_protocol, client_protocol, expect_success,
2394 certsreqs=None, server_options=0, client_options=0):
2395 """
2396 Try to SSL-connect using *client_protocol* to *server_protocol*.
2397 If *expect_success* is true, assert that the connection succeeds,
2398 if it's false, assert that the connection fails.
2399 Also, if *expect_success* is a string, assert that it is the protocol
2400 version actually used by the connection.
2401 """
2402 if certsreqs is None:
2403 certsreqs = ssl.CERT_NONE
2404 certtype = {
2405 ssl.CERT_NONE: "CERT_NONE",
2406 ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
2407 ssl.CERT_REQUIRED: "CERT_REQUIRED",
2408 }[certsreqs]
2409 if support.verbose:
2410 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
2411 sys.stdout.write(formatstr %
2412 (ssl.get_protocol_name(client_protocol),
2413 ssl.get_protocol_name(server_protocol),
2414 certtype))
2415 client_context = ssl.SSLContext(client_protocol)
2416 client_context.options |= client_options
2417 server_context = ssl.SSLContext(server_protocol)
2418 server_context.options |= server_options
2419
2420 # NOTE: we must enable "ALL" ciphers on the client, otherwise an
2421 # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
2422 # starting from OpenSSL 1.0.0 (see issue #8322).
Christian Heimesa170fa12017-09-15 20:27:30 +02002423 if client_context.protocol == ssl.PROTOCOL_TLS:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002424 client_context.set_ciphers("ALL")
2425
2426 for ctx in (client_context, server_context):
2427 ctx.verify_mode = certsreqs
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002428 ctx.load_cert_chain(SIGNED_CERTFILE)
2429 ctx.load_verify_locations(SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002430 try:
2431 stats = server_params_test(client_context, server_context,
2432 chatty=False, connectionchatty=False)
2433 # Protocol mismatch can result in either an SSLError, or a
2434 # "Connection reset by peer" error.
2435 except ssl.SSLError:
2436 if expect_success:
2437 raise
2438 except OSError as e:
2439 if expect_success or e.errno != errno.ECONNRESET:
2440 raise
2441 else:
2442 if not expect_success:
2443 raise AssertionError(
2444 "Client protocol %s succeeded with server protocol %s!"
2445 % (ssl.get_protocol_name(client_protocol),
2446 ssl.get_protocol_name(server_protocol)))
2447 elif (expect_success is not True
2448 and expect_success != stats['version']):
2449 raise AssertionError("version mismatch: expected %r, got %r"
2450 % (expect_success, stats['version']))
2451
2452
2453class ThreadedTests(unittest.TestCase):
2454
2455 @skip_if_broken_ubuntu_ssl
2456 def test_echo(self):
2457 """Basic test of an SSL client connecting to a server"""
2458 if support.verbose:
2459 sys.stdout.write("\n")
2460 for protocol in PROTOCOLS:
2461 if protocol in {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}:
2462 continue
2463 with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
2464 context = ssl.SSLContext(protocol)
2465 context.load_cert_chain(CERTFILE)
2466 server_params_test(context, context,
2467 chatty=True, connectionchatty=True)
2468
Christian Heimesa170fa12017-09-15 20:27:30 +02002469 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002470
2471 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_SERVER):
2472 server_params_test(client_context=client_context,
2473 server_context=server_context,
2474 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002475 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002476
2477 client_context.check_hostname = False
2478 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_CLIENT):
2479 with self.assertRaises(ssl.SSLError) as e:
2480 server_params_test(client_context=server_context,
2481 server_context=client_context,
2482 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002483 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002484 self.assertIn('called a function you should not call',
2485 str(e.exception))
2486
2487 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_SERVER):
2488 with self.assertRaises(ssl.SSLError) as e:
2489 server_params_test(client_context=server_context,
2490 server_context=server_context,
2491 chatty=True, connectionchatty=True)
2492 self.assertIn('called a function you should not call',
2493 str(e.exception))
2494
2495 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_CLIENT):
2496 with self.assertRaises(ssl.SSLError) as e:
2497 server_params_test(client_context=server_context,
2498 server_context=client_context,
2499 chatty=True, connectionchatty=True)
2500 self.assertIn('called a function you should not call',
2501 str(e.exception))
2502
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002503 def test_getpeercert(self):
2504 if support.verbose:
2505 sys.stdout.write("\n")
Christian Heimesa170fa12017-09-15 20:27:30 +02002506
2507 client_context, server_context, hostname = testing_context()
2508 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002509 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002510 with client_context.wrap_socket(socket.socket(),
2511 do_handshake_on_connect=False,
2512 server_hostname=hostname) as s:
2513 s.connect((HOST, server.port))
2514 # getpeercert() raise ValueError while the handshake isn't
2515 # done.
2516 with self.assertRaises(ValueError):
2517 s.getpeercert()
2518 s.do_handshake()
2519 cert = s.getpeercert()
2520 self.assertTrue(cert, "Can't get peer certificate.")
2521 cipher = s.cipher()
2522 if support.verbose:
2523 sys.stdout.write(pprint.pformat(cert) + '\n')
2524 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
2525 if 'subject' not in cert:
2526 self.fail("No subject field in certificate: %s." %
2527 pprint.pformat(cert))
2528 if ((('organizationName', 'Python Software Foundation'),)
2529 not in cert['subject']):
2530 self.fail(
2531 "Missing or invalid 'organizationName' field in certificate subject; "
2532 "should be 'Python Software Foundation'.")
2533 self.assertIn('notBefore', cert)
2534 self.assertIn('notAfter', cert)
2535 before = ssl.cert_time_to_seconds(cert['notBefore'])
2536 after = ssl.cert_time_to_seconds(cert['notAfter'])
2537 self.assertLess(before, after)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002538
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002539 @unittest.skipUnless(have_verify_flags(),
2540 "verify_flags need OpenSSL > 0.9.8")
2541 def test_crl_check(self):
2542 if support.verbose:
2543 sys.stdout.write("\n")
2544
Christian Heimesa170fa12017-09-15 20:27:30 +02002545 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002546
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002547 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
Christian Heimesa170fa12017-09-15 20:27:30 +02002548 self.assertEqual(client_context.verify_flags, ssl.VERIFY_DEFAULT | tf)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002549
2550 # VERIFY_DEFAULT should pass
2551 server = ThreadedEchoServer(context=server_context, chatty=True)
2552 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002553 with client_context.wrap_socket(socket.socket(),
2554 server_hostname=hostname) as s:
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002555 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002556 cert = s.getpeercert()
2557 self.assertTrue(cert, "Can't get peer certificate.")
Bill Janssen58afe4c2008-09-08 16:45:19 +00002558
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002559 # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
Christian Heimesa170fa12017-09-15 20:27:30 +02002560 client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
Bill Janssen58afe4c2008-09-08 16:45:19 +00002561
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002562 server = ThreadedEchoServer(context=server_context, chatty=True)
2563 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002564 with client_context.wrap_socket(socket.socket(),
2565 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002566 with self.assertRaisesRegex(ssl.SSLError,
2567 "certificate verify failed"):
2568 s.connect((HOST, server.port))
Bill Janssen58afe4c2008-09-08 16:45:19 +00002569
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002570 # now load a CRL file. The CRL file is signed by the CA.
Christian Heimesa170fa12017-09-15 20:27:30 +02002571 client_context.load_verify_locations(CRLFILE)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002572
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002573 server = ThreadedEchoServer(context=server_context, chatty=True)
2574 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002575 with client_context.wrap_socket(socket.socket(),
2576 server_hostname=hostname) as s:
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002577 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002578 cert = s.getpeercert()
2579 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002580
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002581 def test_check_hostname(self):
2582 if support.verbose:
2583 sys.stdout.write("\n")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002584
Christian Heimesa170fa12017-09-15 20:27:30 +02002585 client_context, server_context, hostname = testing_context()
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002586
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002587 # correct hostname should verify
2588 server = ThreadedEchoServer(context=server_context, chatty=True)
2589 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002590 with client_context.wrap_socket(socket.socket(),
2591 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002592 s.connect((HOST, server.port))
2593 cert = s.getpeercert()
2594 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002595
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002596 # incorrect hostname should raise an exception
2597 server = ThreadedEchoServer(context=server_context, chatty=True)
2598 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002599 with client_context.wrap_socket(socket.socket(),
2600 server_hostname="invalid") as s:
Christian Heimes61d478c2018-01-27 15:51:38 +01002601 with self.assertRaisesRegex(
2602 ssl.CertificateError,
2603 "Hostname mismatch, certificate is not valid for 'invalid'."):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002604 s.connect((HOST, server.port))
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002605
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002606 # missing server_hostname arg should cause an exception, too
2607 server = ThreadedEchoServer(context=server_context, chatty=True)
2608 with server:
2609 with socket.socket() as s:
2610 with self.assertRaisesRegex(ValueError,
2611 "check_hostname requires server_hostname"):
Christian Heimesa170fa12017-09-15 20:27:30 +02002612 client_context.wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002613
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002614 def test_ecc_cert(self):
2615 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2616 client_context.load_verify_locations(SIGNING_CA)
2617 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
2618 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2619
2620 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2621 # load ECC cert
2622 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2623
2624 # correct hostname should verify
2625 server = ThreadedEchoServer(context=server_context, chatty=True)
2626 with server:
2627 with client_context.wrap_socket(socket.socket(),
2628 server_hostname=hostname) as s:
2629 s.connect((HOST, server.port))
2630 cert = s.getpeercert()
2631 self.assertTrue(cert, "Can't get peer certificate.")
2632 cipher = s.cipher()[0].split('-')
2633 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2634
2635 def test_dual_rsa_ecc(self):
2636 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2637 client_context.load_verify_locations(SIGNING_CA)
2638 # only ECDSA certs
2639 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
2640 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2641
2642 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2643 # load ECC and RSA key/cert pairs
2644 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2645 server_context.load_cert_chain(SIGNED_CERTFILE)
2646
2647 # correct hostname should verify
2648 server = ThreadedEchoServer(context=server_context, chatty=True)
2649 with server:
2650 with client_context.wrap_socket(socket.socket(),
2651 server_hostname=hostname) as s:
2652 s.connect((HOST, server.port))
2653 cert = s.getpeercert()
2654 self.assertTrue(cert, "Can't get peer certificate.")
2655 cipher = s.cipher()[0].split('-')
2656 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2657
Christian Heimes66e57422018-01-29 14:25:13 +01002658 def test_check_hostname_idn(self):
2659 if support.verbose:
2660 sys.stdout.write("\n")
2661
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002662 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Christian Heimes66e57422018-01-29 14:25:13 +01002663 server_context.load_cert_chain(IDNSANSFILE)
2664
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002665 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes66e57422018-01-29 14:25:13 +01002666 context.verify_mode = ssl.CERT_REQUIRED
2667 context.check_hostname = True
2668 context.load_verify_locations(SIGNING_CA)
2669
2670 # correct hostname should verify, when specified in several
2671 # different ways
2672 idn_hostnames = [
2673 ('könig.idn.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002674 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002675 ('xn--knig-5qa.idn.pythontest.net',
2676 'xn--knig-5qa.idn.pythontest.net'),
2677 (b'xn--knig-5qa.idn.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002678 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002679
2680 ('königsgäßchen.idna2003.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002681 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002682 ('xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
2683 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2684 (b'xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002685 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2686
2687 # ('königsgäßchen.idna2008.pythontest.net',
2688 # 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2689 ('xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
2690 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2691 (b'xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
2692 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2693
Christian Heimes66e57422018-01-29 14:25:13 +01002694 ]
2695 for server_hostname, expected_hostname in idn_hostnames:
2696 server = ThreadedEchoServer(context=server_context, chatty=True)
2697 with server:
2698 with context.wrap_socket(socket.socket(),
2699 server_hostname=server_hostname) as s:
2700 self.assertEqual(s.server_hostname, expected_hostname)
2701 s.connect((HOST, server.port))
2702 cert = s.getpeercert()
2703 self.assertEqual(s.server_hostname, expected_hostname)
2704 self.assertTrue(cert, "Can't get peer certificate.")
2705
2706 with ssl.SSLSocket(socket.socket(),
2707 server_hostname=server_hostname) as s:
2708 s.connect((HOST, server.port))
2709 s.getpeercert()
2710 self.assertEqual(s.server_hostname, expected_hostname)
2711
Christian Heimes66e57422018-01-29 14:25:13 +01002712 # incorrect hostname should raise an exception
2713 server = ThreadedEchoServer(context=server_context, chatty=True)
2714 with server:
2715 with context.wrap_socket(socket.socket(),
2716 server_hostname="python.example.org") as s:
2717 with self.assertRaises(ssl.CertificateError):
2718 s.connect((HOST, server.port))
2719
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002720 def test_wrong_cert(self):
2721 """Connecting when the server rejects the client's certificate
2722
2723 Launch a server with CERT_REQUIRED, and check that trying to
2724 connect to it with a wrong client certificate fails.
2725 """
2726 certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
2727 "wrongcert.pem")
2728 server = ThreadedEchoServer(CERTFILE,
2729 certreqs=ssl.CERT_REQUIRED,
2730 cacerts=CERTFILE, chatty=False,
2731 connectionchatty=False)
2732 with server, \
2733 socket.socket() as sock, \
Christian Heimesa170fa12017-09-15 20:27:30 +02002734 test_wrap_socket(sock, certfile=certfile) as s:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002735 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002736 # Expect either an SSL error about the server rejecting
2737 # the connection, or a low-level connection reset (which
2738 # sometimes happens on Windows)
2739 s.connect((HOST, server.port))
2740 except ssl.SSLError as e:
2741 if support.verbose:
2742 sys.stdout.write("\nSSLError is %r\n" % e)
2743 except OSError as e:
2744 if e.errno != errno.ECONNRESET:
2745 raise
2746 if support.verbose:
2747 sys.stdout.write("\nsocket.error is %r\n" % e)
2748 else:
2749 self.fail("Use of invalid cert should have failed!")
2750
2751 def test_rude_shutdown(self):
2752 """A brutal shutdown of an SSL server should raise an OSError
2753 in the client when attempting handshake.
2754 """
2755 listener_ready = threading.Event()
2756 listener_gone = threading.Event()
2757
2758 s = socket.socket()
2759 port = support.bind_port(s, HOST)
2760
2761 # `listener` runs in a thread. It sits in an accept() until
2762 # the main thread connects. Then it rudely closes the socket,
2763 # and sets Event `listener_gone` to let the main thread know
2764 # the socket is gone.
2765 def listener():
2766 s.listen()
2767 listener_ready.set()
2768 newsock, addr = s.accept()
2769 newsock.close()
2770 s.close()
2771 listener_gone.set()
2772
2773 def connector():
2774 listener_ready.wait()
2775 with socket.socket() as c:
2776 c.connect((HOST, port))
2777 listener_gone.wait()
Antoine Pitrou40f08742010-04-24 22:04:40 +00002778 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002779 ssl_sock = test_wrap_socket(c)
2780 except OSError:
2781 pass
2782 else:
2783 self.fail('connecting to closed SSL socket should have failed')
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002784
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002785 t = threading.Thread(target=listener)
2786 t.start()
2787 try:
2788 connector()
2789 finally:
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002790 t.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002791
Christian Heimesb3ad0e52017-09-08 12:00:19 -07002792 def test_ssl_cert_verify_error(self):
2793 if support.verbose:
2794 sys.stdout.write("\n")
2795
2796 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2797 server_context.load_cert_chain(SIGNED_CERTFILE)
2798
2799 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2800
2801 server = ThreadedEchoServer(context=server_context, chatty=True)
2802 with server:
2803 with context.wrap_socket(socket.socket(),
Christian Heimesa170fa12017-09-15 20:27:30 +02002804 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
Christian Heimesb3ad0e52017-09-08 12:00:19 -07002805 try:
2806 s.connect((HOST, server.port))
2807 except ssl.SSLError as e:
2808 msg = 'unable to get local issuer certificate'
2809 self.assertIsInstance(e, ssl.SSLCertVerificationError)
2810 self.assertEqual(e.verify_code, 20)
2811 self.assertEqual(e.verify_message, msg)
2812 self.assertIn(msg, repr(e))
2813 self.assertIn('certificate verify failed', repr(e))
2814
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002815 @skip_if_broken_ubuntu_ssl
2816 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
2817 "OpenSSL is compiled without SSLv2 support")
2818 def test_protocol_sslv2(self):
2819 """Connecting to an SSLv2 server with various client options"""
2820 if support.verbose:
2821 sys.stdout.write("\n")
2822 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
2823 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
2824 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
Christian Heimesa170fa12017-09-15 20:27:30 +02002825 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002826 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2827 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
2828 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
2829 # SSLv23 client with specific SSL options
2830 if no_sslv2_implies_sslv3_hello():
2831 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02002832 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002833 client_options=ssl.OP_NO_SSLv2)
Christian Heimesa170fa12017-09-15 20:27:30 +02002834 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002835 client_options=ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02002836 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002837 client_options=ssl.OP_NO_TLSv1)
Antoine Pitrou242db722013-05-01 20:52:07 +02002838
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002839 @skip_if_broken_ubuntu_ssl
Christian Heimesa170fa12017-09-15 20:27:30 +02002840 def test_PROTOCOL_TLS(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002841 """Connecting to an SSLv23 server with various client options"""
2842 if support.verbose:
2843 sys.stdout.write("\n")
2844 if hasattr(ssl, 'PROTOCOL_SSLv2'):
Antoine Pitrou8f85f902012-01-03 22:46:48 +01002845 try:
Christian Heimesa170fa12017-09-15 20:27:30 +02002846 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv2, True)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002847 except OSError as x:
2848 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
2849 if support.verbose:
2850 sys.stdout.write(
2851 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
2852 % str(x))
2853 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02002854 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False)
2855 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True)
2856 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1')
Antoine Pitrou8f85f902012-01-03 22:46:48 +01002857
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002858 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02002859 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_OPTIONAL)
2860 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_OPTIONAL)
2861 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002862
2863 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02002864 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_REQUIRED)
2865 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_REQUIRED)
2866 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002867
2868 # Server with specific SSL options
2869 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02002870 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002871 server_options=ssl.OP_NO_SSLv3)
2872 # Will choose TLSv1
Christian Heimesa170fa12017-09-15 20:27:30 +02002873 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002874 server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02002875 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002876 server_options=ssl.OP_NO_TLSv1)
2877
2878
2879 @skip_if_broken_ubuntu_ssl
2880 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
2881 "OpenSSL is compiled without SSLv3 support")
2882 def test_protocol_sslv3(self):
2883 """Connecting to an SSLv3 server with various client options"""
2884 if support.verbose:
2885 sys.stdout.write("\n")
2886 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3')
2887 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
2888 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
2889 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2890 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02002891 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002892 client_options=ssl.OP_NO_SSLv3)
2893 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
2894 if no_sslv2_implies_sslv3_hello():
2895 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02002896 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002897 False, client_options=ssl.OP_NO_SSLv2)
2898
2899 @skip_if_broken_ubuntu_ssl
2900 def test_protocol_tlsv1(self):
2901 """Connecting to a TLSv1 server with various client options"""
2902 if support.verbose:
2903 sys.stdout.write("\n")
2904 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1')
2905 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
2906 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
2907 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2908 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
2909 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2910 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02002911 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002912 client_options=ssl.OP_NO_TLSv1)
2913
2914 @skip_if_broken_ubuntu_ssl
2915 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
2916 "TLS version 1.1 not supported.")
2917 def test_protocol_tlsv1_1(self):
2918 """Connecting to a TLSv1.1 server with various client options.
2919 Testing against older TLS versions."""
2920 if support.verbose:
2921 sys.stdout.write("\n")
2922 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
2923 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2924 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False)
2925 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2926 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02002927 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002928 client_options=ssl.OP_NO_TLSv1_1)
2929
Christian Heimesa170fa12017-09-15 20:27:30 +02002930 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002931 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False)
2932 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False)
2933
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002934 @skip_if_broken_ubuntu_ssl
2935 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
2936 "TLS version 1.2 not supported.")
2937 def test_protocol_tlsv1_2(self):
2938 """Connecting to a TLSv1.2 server with various client options.
2939 Testing against older TLS versions."""
2940 if support.verbose:
2941 sys.stdout.write("\n")
2942 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2',
2943 server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
2944 client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
2945 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2946 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False)
2947 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2948 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02002949 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002950 client_options=ssl.OP_NO_TLSv1_2)
2951
Christian Heimesa170fa12017-09-15 20:27:30 +02002952 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002953 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
2954 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
2955 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
2956 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
2957
2958 def test_starttls(self):
2959 """Switching from clear text to encrypted and back again."""
2960 msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")
2961
2962 server = ThreadedEchoServer(CERTFILE,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002963 starttls_server=True,
2964 chatty=True,
2965 connectionchatty=True)
2966 wrapped = False
2967 with server:
2968 s = socket.socket()
2969 s.setblocking(1)
2970 s.connect((HOST, server.port))
Antoine Pitroud6494802011-07-21 01:11:30 +02002971 if support.verbose:
2972 sys.stdout.write("\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002973 for indata in msgs:
Antoine Pitroud6494802011-07-21 01:11:30 +02002974 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002975 sys.stdout.write(
2976 " client: sending %r...\n" % indata)
2977 if wrapped:
2978 conn.write(indata)
2979 outdata = conn.read()
2980 else:
2981 s.send(indata)
2982 outdata = s.recv(1024)
2983 msg = outdata.strip().lower()
2984 if indata == b"STARTTLS" and msg.startswith(b"ok"):
2985 # STARTTLS ok, switch to secure mode
2986 if support.verbose:
2987 sys.stdout.write(
2988 " client: read %r from server, starting TLS...\n"
2989 % msg)
Christian Heimesa170fa12017-09-15 20:27:30 +02002990 conn = test_wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002991 wrapped = True
2992 elif indata == b"ENDTLS" and msg.startswith(b"ok"):
2993 # ENDTLS ok, switch back to clear text
2994 if support.verbose:
2995 sys.stdout.write(
2996 " client: read %r from server, ending TLS...\n"
2997 % msg)
2998 s = conn.unwrap()
2999 wrapped = False
3000 else:
3001 if support.verbose:
3002 sys.stdout.write(
3003 " client: read %r from server\n" % msg)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003004 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003005 sys.stdout.write(" client: closing connection.\n")
3006 if wrapped:
3007 conn.write(b"over\n")
3008 else:
3009 s.send(b"over\n")
3010 if wrapped:
3011 conn.close()
3012 else:
3013 s.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003014
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003015 def test_socketserver(self):
3016 """Using socketserver to create and manage SSL connections."""
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003017 server = make_https_server(self, certfile=SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003018 # try to connect
3019 if support.verbose:
3020 sys.stdout.write('\n')
3021 with open(CERTFILE, 'rb') as f:
3022 d1 = f.read()
3023 d2 = ''
3024 # now fetch the same data from the HTTPS server
3025 url = 'https://localhost:%d/%s' % (
3026 server.port, os.path.split(CERTFILE)[1])
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003027 context = ssl.create_default_context(cafile=SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003028 f = urllib.request.urlopen(url, context=context)
3029 try:
3030 dlen = f.info().get("content-length")
3031 if dlen and (int(dlen) > 0):
3032 d2 = f.read(int(dlen))
3033 if support.verbose:
3034 sys.stdout.write(
3035 " client: read %d bytes from remote server '%s'\n"
3036 % (len(d2), server))
3037 finally:
3038 f.close()
3039 self.assertEqual(d1, d2)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003040
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003041 def test_asyncore_server(self):
3042 """Check the example asyncore integration."""
3043 if support.verbose:
3044 sys.stdout.write("\n")
Antoine Pitrou0e576f12011-12-22 10:03:38 +01003045
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003046 indata = b"FOO\n"
3047 server = AsyncoreEchoServer(CERTFILE)
3048 with server:
3049 s = test_wrap_socket(socket.socket())
3050 s.connect(('127.0.0.1', server.port))
3051 if support.verbose:
3052 sys.stdout.write(
3053 " client: sending %r...\n" % indata)
3054 s.write(indata)
3055 outdata = s.read()
3056 if support.verbose:
3057 sys.stdout.write(" client: read %r\n" % outdata)
3058 if outdata != indata.lower():
3059 self.fail(
3060 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
3061 % (outdata[:20], len(outdata),
3062 indata[:20].lower(), len(indata)))
3063 s.write(b"over\n")
3064 if support.verbose:
3065 sys.stdout.write(" client: closing connection.\n")
3066 s.close()
3067 if support.verbose:
3068 sys.stdout.write(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05003069
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003070 def test_recv_send(self):
3071 """Test recv(), send() and friends."""
3072 if support.verbose:
3073 sys.stdout.write("\n")
3074
3075 server = ThreadedEchoServer(CERTFILE,
3076 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003077 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003078 cacerts=CERTFILE,
3079 chatty=True,
3080 connectionchatty=False)
3081 with server:
3082 s = test_wrap_socket(socket.socket(),
3083 server_side=False,
3084 certfile=CERTFILE,
3085 ca_certs=CERTFILE,
3086 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003087 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003088 s.connect((HOST, server.port))
3089 # helper methods for standardising recv* method signatures
3090 def _recv_into():
3091 b = bytearray(b"\0"*100)
3092 count = s.recv_into(b)
3093 return b[:count]
3094
3095 def _recvfrom_into():
3096 b = bytearray(b"\0"*100)
3097 count, addr = s.recvfrom_into(b)
3098 return b[:count]
3099
3100 # (name, method, expect success?, *args, return value func)
3101 send_methods = [
3102 ('send', s.send, True, [], len),
3103 ('sendto', s.sendto, False, ["some.address"], len),
3104 ('sendall', s.sendall, True, [], lambda x: None),
3105 ]
3106 # (name, method, whether to expect success, *args)
3107 recv_methods = [
3108 ('recv', s.recv, True, []),
3109 ('recvfrom', s.recvfrom, False, ["some.address"]),
3110 ('recv_into', _recv_into, True, []),
3111 ('recvfrom_into', _recvfrom_into, False, []),
3112 ]
3113 data_prefix = "PREFIX_"
3114
3115 for (meth_name, send_meth, expect_success, args,
3116 ret_val_meth) in send_methods:
3117 indata = (data_prefix + meth_name).encode('ascii')
3118 try:
3119 ret = send_meth(indata, *args)
3120 msg = "sending with {}".format(meth_name)
3121 self.assertEqual(ret, ret_val_meth(indata), msg=msg)
3122 outdata = s.read()
3123 if outdata != indata.lower():
3124 self.fail(
3125 "While sending with <<{name:s}>> bad data "
3126 "<<{outdata:r}>> ({nout:d}) received; "
3127 "expected <<{indata:r}>> ({nin:d})\n".format(
3128 name=meth_name, outdata=outdata[:20],
3129 nout=len(outdata),
3130 indata=indata[:20], nin=len(indata)
3131 )
3132 )
3133 except ValueError as e:
3134 if expect_success:
3135 self.fail(
3136 "Failed to send with method <<{name:s}>>; "
3137 "expected to succeed.\n".format(name=meth_name)
3138 )
3139 if not str(e).startswith(meth_name):
3140 self.fail(
3141 "Method <<{name:s}>> failed with unexpected "
3142 "exception message: {exp:s}\n".format(
3143 name=meth_name, exp=e
3144 )
3145 )
3146
3147 for meth_name, recv_meth, expect_success, args in recv_methods:
3148 indata = (data_prefix + meth_name).encode('ascii')
3149 try:
3150 s.send(indata)
3151 outdata = recv_meth(*args)
3152 if outdata != indata.lower():
3153 self.fail(
3154 "While receiving with <<{name:s}>> bad data "
3155 "<<{outdata:r}>> ({nout:d}) received; "
3156 "expected <<{indata:r}>> ({nin:d})\n".format(
3157 name=meth_name, outdata=outdata[:20],
3158 nout=len(outdata),
3159 indata=indata[:20], nin=len(indata)
3160 )
3161 )
3162 except ValueError as e:
3163 if expect_success:
3164 self.fail(
3165 "Failed to receive with method <<{name:s}>>; "
3166 "expected to succeed.\n".format(name=meth_name)
3167 )
3168 if not str(e).startswith(meth_name):
3169 self.fail(
3170 "Method <<{name:s}>> failed with unexpected "
3171 "exception message: {exp:s}\n".format(
3172 name=meth_name, exp=e
3173 )
3174 )
3175 # consume data
3176 s.read()
3177
3178 # read(-1, buffer) is supported, even though read(-1) is not
3179 data = b"data"
3180 s.send(data)
3181 buffer = bytearray(len(data))
3182 self.assertEqual(s.read(-1, buffer), len(data))
3183 self.assertEqual(buffer, data)
3184
Christian Heimes888bbdc2017-09-07 14:18:21 -07003185 # sendall accepts bytes-like objects
3186 if ctypes is not None:
3187 ubyte = ctypes.c_ubyte * len(data)
3188 byteslike = ubyte.from_buffer_copy(data)
3189 s.sendall(byteslike)
3190 self.assertEqual(s.read(), data)
3191
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003192 # Make sure sendmsg et al are disallowed to avoid
3193 # inadvertent disclosure of data and/or corruption
3194 # of the encrypted data stream
3195 self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
3196 self.assertRaises(NotImplementedError, s.recvmsg, 100)
3197 self.assertRaises(NotImplementedError,
3198 s.recvmsg_into, bytearray(100))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003199 s.write(b"over\n")
3200
3201 self.assertRaises(ValueError, s.recv, -1)
3202 self.assertRaises(ValueError, s.read, -1)
3203
3204 s.close()
3205
3206 def test_recv_zero(self):
3207 server = ThreadedEchoServer(CERTFILE)
3208 server.__enter__()
3209 self.addCleanup(server.__exit__, None, None)
3210 s = socket.create_connection((HOST, server.port))
3211 self.addCleanup(s.close)
3212 s = test_wrap_socket(s, suppress_ragged_eofs=False)
3213 self.addCleanup(s.close)
3214
3215 # recv/read(0) should return no data
3216 s.send(b"data")
3217 self.assertEqual(s.recv(0), b"")
3218 self.assertEqual(s.read(0), b"")
3219 self.assertEqual(s.read(), b"data")
3220
3221 # Should not block if the other end sends no data
3222 s.setblocking(False)
3223 self.assertEqual(s.recv(0), b"")
3224 self.assertEqual(s.recv_into(bytearray()), 0)
3225
3226 def test_nonblocking_send(self):
3227 server = ThreadedEchoServer(CERTFILE,
3228 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003229 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003230 cacerts=CERTFILE,
3231 chatty=True,
3232 connectionchatty=False)
3233 with server:
3234 s = test_wrap_socket(socket.socket(),
3235 server_side=False,
3236 certfile=CERTFILE,
3237 ca_certs=CERTFILE,
3238 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003239 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003240 s.connect((HOST, server.port))
3241 s.setblocking(False)
3242
3243 # If we keep sending data, at some point the buffers
3244 # will be full and the call will block
3245 buf = bytearray(8192)
3246 def fill_buffer():
3247 while True:
3248 s.send(buf)
3249 self.assertRaises((ssl.SSLWantWriteError,
3250 ssl.SSLWantReadError), fill_buffer)
3251
3252 # Now read all the output and discard it
3253 s.setblocking(True)
3254 s.close()
3255
3256 def test_handshake_timeout(self):
3257 # Issue #5103: SSL handshake must respect the socket timeout
3258 server = socket.socket(socket.AF_INET)
3259 host = "127.0.0.1"
3260 port = support.bind_port(server)
3261 started = threading.Event()
3262 finish = False
3263
3264 def serve():
3265 server.listen()
3266 started.set()
3267 conns = []
3268 while not finish:
3269 r, w, e = select.select([server], [], [], 0.1)
3270 if server in r:
3271 # Let the socket hang around rather than having
3272 # it closed by garbage collection.
3273 conns.append(server.accept()[0])
3274 for sock in conns:
3275 sock.close()
3276
3277 t = threading.Thread(target=serve)
3278 t.start()
3279 started.wait()
3280
3281 try:
3282 try:
3283 c = socket.socket(socket.AF_INET)
3284 c.settimeout(0.2)
3285 c.connect((host, port))
3286 # Will attempt handshake and time out
3287 self.assertRaisesRegex(socket.timeout, "timed out",
3288 test_wrap_socket, c)
3289 finally:
3290 c.close()
3291 try:
3292 c = socket.socket(socket.AF_INET)
3293 c = test_wrap_socket(c)
3294 c.settimeout(0.2)
3295 # Will attempt handshake and time out
3296 self.assertRaisesRegex(socket.timeout, "timed out",
3297 c.connect, (host, port))
3298 finally:
3299 c.close()
3300 finally:
3301 finish = True
3302 t.join()
3303 server.close()
3304
3305 def test_server_accept(self):
3306 # Issue #16357: accept() on a SSLSocket created through
3307 # SSLContext.wrap_socket().
Christian Heimesa170fa12017-09-15 20:27:30 +02003308 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003309 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003310 context.load_verify_locations(SIGNING_CA)
3311 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003312 server = socket.socket(socket.AF_INET)
3313 host = "127.0.0.1"
3314 port = support.bind_port(server)
3315 server = context.wrap_socket(server, server_side=True)
3316 self.assertTrue(server.server_side)
3317
3318 evt = threading.Event()
3319 remote = None
3320 peer = None
3321 def serve():
3322 nonlocal remote, peer
3323 server.listen()
3324 # Block on the accept and wait on the connection to close.
3325 evt.set()
3326 remote, peer = server.accept()
3327 remote.recv(1)
3328
3329 t = threading.Thread(target=serve)
3330 t.start()
3331 # Client wait until server setup and perform a connect.
3332 evt.wait()
3333 client = context.wrap_socket(socket.socket())
3334 client.connect((host, port))
3335 client_addr = client.getsockname()
3336 client.close()
3337 t.join()
3338 remote.close()
3339 server.close()
3340 # Sanity checks.
3341 self.assertIsInstance(remote, ssl.SSLSocket)
3342 self.assertEqual(peer, client_addr)
3343
3344 def test_getpeercert_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003345 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003346 with context.wrap_socket(socket.socket()) as sock:
3347 with self.assertRaises(OSError) as cm:
3348 sock.getpeercert()
3349 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3350
3351 def test_do_handshake_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003352 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003353 with context.wrap_socket(socket.socket()) as sock:
3354 with self.assertRaises(OSError) as cm:
3355 sock.do_handshake()
3356 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3357
3358 def test_default_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003359 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003360 try:
3361 # Force a set of weak ciphers on our client context
3362 context.set_ciphers("DES")
3363 except ssl.SSLError:
3364 self.skipTest("no DES cipher available")
3365 with ThreadedEchoServer(CERTFILE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003366 ssl_version=ssl.PROTOCOL_TLS,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003367 chatty=False) as server:
3368 with context.wrap_socket(socket.socket()) as s:
3369 with self.assertRaises(OSError):
3370 s.connect((HOST, server.port))
3371 self.assertIn("no shared cipher", server.conn_errors[0])
3372
3373 def test_version_basic(self):
3374 """
3375 Basic tests for SSLSocket.version().
3376 More tests are done in the test_protocol_*() methods.
3377 """
Christian Heimesa170fa12017-09-15 20:27:30 +02003378 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
3379 context.check_hostname = False
3380 context.verify_mode = ssl.CERT_NONE
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003381 with ThreadedEchoServer(CERTFILE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003382 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003383 chatty=False) as server:
3384 with context.wrap_socket(socket.socket()) as s:
3385 self.assertIs(s.version(), None)
Miss Islington (bot)8fa84782018-02-24 12:51:56 -08003386 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003387 s.connect((HOST, server.port))
Christian Heimesa170fa12017-09-15 20:27:30 +02003388 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
3389 self.assertEqual(s.version(), 'TLSv1.2')
3390 else: # 0.9.8 to 1.0.1
3391 self.assertIn(s.version(), ('TLSv1', 'TLSv1.2'))
Miss Islington (bot)8fa84782018-02-24 12:51:56 -08003392 self.assertIs(s._sslobj, None)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003393 self.assertIs(s.version(), None)
3394
Christian Heimescb5b68a2017-09-07 18:07:00 -07003395 @unittest.skipUnless(ssl.HAS_TLSv1_3,
3396 "test requires TLSv1.3 enabled OpenSSL")
3397 def test_tls1_3(self):
3398 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
3399 context.load_cert_chain(CERTFILE)
3400 # disable all but TLS 1.3
3401 context.options |= (
3402 ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
3403 )
3404 with ThreadedEchoServer(context=context) as server:
3405 with context.wrap_socket(socket.socket()) as s:
3406 s.connect((HOST, server.port))
3407 self.assertIn(s.cipher()[0], [
3408 'TLS13-AES-256-GCM-SHA384',
3409 'TLS13-CHACHA20-POLY1305-SHA256',
3410 'TLS13-AES-128-GCM-SHA256',
3411 ])
3412
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003413 @unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
3414 def test_default_ecdh_curve(self):
3415 # Issue #21015: elliptic curve-based Diffie Hellman key exchange
3416 # should be enabled by default on SSL contexts.
Christian Heimesa170fa12017-09-15 20:27:30 +02003417 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003418 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003419 # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
3420 # cipher name.
3421 context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003422 # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
3423 # explicitly using the 'ECCdraft' cipher alias. Otherwise,
3424 # our default cipher list should prefer ECDH-based ciphers
3425 # automatically.
3426 if ssl.OPENSSL_VERSION_INFO < (1, 0, 0):
3427 context.set_ciphers("ECCdraft:ECDH")
3428 with ThreadedEchoServer(context=context) as server:
3429 with context.wrap_socket(socket.socket()) as s:
3430 s.connect((HOST, server.port))
3431 self.assertIn("ECDH", s.cipher()[0])
3432
3433 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
3434 "'tls-unique' channel binding not available")
3435 def test_tls_unique_channel_binding(self):
3436 """Test tls-unique channel binding."""
3437 if support.verbose:
3438 sys.stdout.write("\n")
3439
3440 server = ThreadedEchoServer(CERTFILE,
3441 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003442 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003443 cacerts=CERTFILE,
3444 chatty=True,
3445 connectionchatty=False)
3446 with server:
3447 s = test_wrap_socket(socket.socket(),
3448 server_side=False,
3449 certfile=CERTFILE,
3450 ca_certs=CERTFILE,
3451 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003452 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003453 s.connect((HOST, server.port))
3454 # get the data
3455 cb_data = s.get_channel_binding("tls-unique")
3456 if support.verbose:
3457 sys.stdout.write(" got channel binding data: {0!r}\n"
3458 .format(cb_data))
3459
3460 # check if it is sane
3461 self.assertIsNotNone(cb_data)
3462 self.assertEqual(len(cb_data), 12) # True for TLSv1
3463
3464 # and compare with the peers version
3465 s.write(b"CB tls-unique\n")
3466 peer_data_repr = s.read().strip()
3467 self.assertEqual(peer_data_repr,
3468 repr(cb_data).encode("us-ascii"))
3469 s.close()
3470
3471 # now, again
3472 s = test_wrap_socket(socket.socket(),
3473 server_side=False,
3474 certfile=CERTFILE,
3475 ca_certs=CERTFILE,
3476 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003477 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003478 s.connect((HOST, server.port))
3479 new_cb_data = s.get_channel_binding("tls-unique")
3480 if support.verbose:
3481 sys.stdout.write(" got another channel binding data: {0!r}\n"
3482 .format(new_cb_data))
3483 # is it really unique
3484 self.assertNotEqual(cb_data, new_cb_data)
3485 self.assertIsNotNone(cb_data)
3486 self.assertEqual(len(cb_data), 12) # True for TLSv1
3487 s.write(b"CB tls-unique\n")
3488 peer_data_repr = s.read().strip()
3489 self.assertEqual(peer_data_repr,
3490 repr(new_cb_data).encode("us-ascii"))
3491 s.close()
3492
3493 def test_compression(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003494 client_context, server_context, hostname = testing_context()
3495 stats = server_params_test(client_context, server_context,
3496 chatty=True, connectionchatty=True,
3497 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003498 if support.verbose:
3499 sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
3500 self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' })
3501
3502 @unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
3503 "ssl.OP_NO_COMPRESSION needed for this test")
3504 def test_compression_disabled(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003505 client_context, server_context, hostname = testing_context()
3506 client_context.options |= ssl.OP_NO_COMPRESSION
3507 server_context.options |= ssl.OP_NO_COMPRESSION
3508 stats = server_params_test(client_context, server_context,
3509 chatty=True, connectionchatty=True,
3510 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003511 self.assertIs(stats['compression'], None)
3512
3513 def test_dh_params(self):
3514 # Check we can get a connection with ephemeral Diffie-Hellman
Christian Heimesa170fa12017-09-15 20:27:30 +02003515 client_context, server_context, hostname = testing_context()
3516 server_context.load_dh_params(DHFILE)
3517 server_context.set_ciphers("kEDH")
3518 stats = server_params_test(client_context, server_context,
3519 chatty=True, connectionchatty=True,
3520 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003521 cipher = stats["cipher"][0]
3522 parts = cipher.split("-")
3523 if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
3524 self.fail("Non-DH cipher: " + cipher[0])
3525
3526 def test_selected_alpn_protocol(self):
3527 # selected_alpn_protocol() is None unless ALPN is used.
Christian Heimesa170fa12017-09-15 20:27:30 +02003528 client_context, server_context, hostname = testing_context()
3529 stats = server_params_test(client_context, server_context,
3530 chatty=True, connectionchatty=True,
3531 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003532 self.assertIs(stats['client_alpn_protocol'], None)
3533
3534 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
3535 def test_selected_alpn_protocol_if_server_uses_alpn(self):
3536 # selected_alpn_protocol() is None unless ALPN is used by the client.
Christian Heimesa170fa12017-09-15 20:27:30 +02003537 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003538 server_context.set_alpn_protocols(['foo', 'bar'])
3539 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02003540 chatty=True, connectionchatty=True,
3541 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003542 self.assertIs(stats['client_alpn_protocol'], None)
3543
3544 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
3545 def test_alpn_protocols(self):
3546 server_protocols = ['foo', 'bar', 'milkshake']
3547 protocol_tests = [
3548 (['foo', 'bar'], 'foo'),
3549 (['bar', 'foo'], 'foo'),
3550 (['milkshake'], 'milkshake'),
3551 (['http/3.0', 'http/4.0'], None)
3552 ]
3553 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02003554 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003555 server_context.set_alpn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003556 client_context.set_alpn_protocols(client_protocols)
3557
3558 try:
3559 stats = server_params_test(client_context,
3560 server_context,
3561 chatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02003562 connectionchatty=True,
3563 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003564 except ssl.SSLError as e:
3565 stats = e
3566
3567 if (expected is None and IS_OPENSSL_1_1
3568 and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6)):
3569 # OpenSSL 1.1.0 to 1.1.0e raises handshake error
3570 self.assertIsInstance(stats, ssl.SSLError)
3571 else:
3572 msg = "failed trying %s (s) and %s (c).\n" \
3573 "was expecting %s, but got %%s from the %%s" \
3574 % (str(server_protocols), str(client_protocols),
3575 str(expected))
3576 client_result = stats['client_alpn_protocol']
3577 self.assertEqual(client_result, expected,
3578 msg % (client_result, "client"))
3579 server_result = stats['server_alpn_protocols'][-1] \
3580 if len(stats['server_alpn_protocols']) else 'nothing'
3581 self.assertEqual(server_result, expected,
3582 msg % (server_result, "server"))
3583
3584 def test_selected_npn_protocol(self):
3585 # selected_npn_protocol() is None unless NPN is used
Christian Heimesa170fa12017-09-15 20:27:30 +02003586 client_context, server_context, hostname = testing_context()
3587 stats = server_params_test(client_context, server_context,
3588 chatty=True, connectionchatty=True,
3589 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003590 self.assertIs(stats['client_npn_protocol'], None)
3591
3592 @unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
3593 def test_npn_protocols(self):
3594 server_protocols = ['http/1.1', 'spdy/2']
3595 protocol_tests = [
3596 (['http/1.1', 'spdy/2'], 'http/1.1'),
3597 (['spdy/2', 'http/1.1'], 'http/1.1'),
3598 (['spdy/2', 'test'], 'spdy/2'),
3599 (['abc', 'def'], 'abc')
3600 ]
3601 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02003602 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003603 server_context.set_npn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003604 client_context.set_npn_protocols(client_protocols)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003605 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02003606 chatty=True, connectionchatty=True,
3607 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003608 msg = "failed trying %s (s) and %s (c).\n" \
3609 "was expecting %s, but got %%s from the %%s" \
3610 % (str(server_protocols), str(client_protocols),
3611 str(expected))
3612 client_result = stats['client_npn_protocol']
3613 self.assertEqual(client_result, expected, msg % (client_result, "client"))
3614 server_result = stats['server_npn_protocols'][-1] \
3615 if len(stats['server_npn_protocols']) else 'nothing'
3616 self.assertEqual(server_result, expected, msg % (server_result, "server"))
3617
3618 def sni_contexts(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003619 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003620 server_context.load_cert_chain(SIGNED_CERTFILE)
Christian Heimesa170fa12017-09-15 20:27:30 +02003621 other_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003622 other_context.load_cert_chain(SIGNED_CERTFILE2)
Christian Heimesa170fa12017-09-15 20:27:30 +02003623 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003624 client_context.load_verify_locations(SIGNING_CA)
3625 return server_context, other_context, client_context
3626
3627 def check_common_name(self, stats, name):
3628 cert = stats['peercert']
3629 self.assertIn((('commonName', name),), cert['subject'])
3630
3631 @needs_sni
3632 def test_sni_callback(self):
3633 calls = []
3634 server_context, other_context, client_context = self.sni_contexts()
3635
Christian Heimesa170fa12017-09-15 20:27:30 +02003636 client_context.check_hostname = False
3637
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003638 def servername_cb(ssl_sock, server_name, initial_context):
3639 calls.append((server_name, initial_context))
3640 if server_name is not None:
3641 ssl_sock.context = other_context
3642 server_context.set_servername_callback(servername_cb)
3643
3644 stats = server_params_test(client_context, server_context,
3645 chatty=True,
3646 sni_name='supermessage')
3647 # The hostname was fetched properly, and the certificate was
3648 # changed for the connection.
3649 self.assertEqual(calls, [("supermessage", server_context)])
3650 # CERTFILE4 was selected
3651 self.check_common_name(stats, 'fakehostname')
3652
3653 calls = []
3654 # The callback is called with server_name=None
3655 stats = server_params_test(client_context, server_context,
3656 chatty=True,
3657 sni_name=None)
3658 self.assertEqual(calls, [(None, server_context)])
Christian Heimesa170fa12017-09-15 20:27:30 +02003659 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003660
3661 # Check disabling the callback
3662 calls = []
3663 server_context.set_servername_callback(None)
3664
3665 stats = server_params_test(client_context, server_context,
3666 chatty=True,
3667 sni_name='notfunny')
3668 # Certificate didn't change
Christian Heimesa170fa12017-09-15 20:27:30 +02003669 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003670 self.assertEqual(calls, [])
3671
3672 @needs_sni
3673 def test_sni_callback_alert(self):
3674 # Returning a TLS alert is reflected to the connecting client
3675 server_context, other_context, client_context = self.sni_contexts()
3676
3677 def cb_returning_alert(ssl_sock, server_name, initial_context):
3678 return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
3679 server_context.set_servername_callback(cb_returning_alert)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003680 with self.assertRaises(ssl.SSLError) as cm:
3681 stats = server_params_test(client_context, server_context,
3682 chatty=False,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003683 sni_name='supermessage')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003684 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003685
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003686 @needs_sni
3687 def test_sni_callback_raising(self):
3688 # Raising fails the connection with a TLS handshake failure alert.
3689 server_context, other_context, client_context = self.sni_contexts()
3690
3691 def cb_raising(ssl_sock, server_name, initial_context):
3692 1/0
3693 server_context.set_servername_callback(cb_raising)
3694
3695 with self.assertRaises(ssl.SSLError) as cm, \
3696 support.captured_stderr() as stderr:
Antoine Pitrou50b24d02013-04-11 20:48:42 +02003697 stats = server_params_test(client_context, server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003698 chatty=False,
3699 sni_name='supermessage')
3700 self.assertEqual(cm.exception.reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
3701 self.assertIn("ZeroDivisionError", stderr.getvalue())
Antoine Pitrou50b24d02013-04-11 20:48:42 +02003702
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003703 @needs_sni
3704 def test_sni_callback_wrong_return_type(self):
3705 # Returning the wrong return type terminates the TLS connection
3706 # with an internal error alert.
3707 server_context, other_context, client_context = self.sni_contexts()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003708
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003709 def cb_wrong_return_type(ssl_sock, server_name, initial_context):
3710 return "foo"
3711 server_context.set_servername_callback(cb_wrong_return_type)
3712
3713 with self.assertRaises(ssl.SSLError) as cm, \
3714 support.captured_stderr() as stderr:
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003715 stats = server_params_test(client_context, server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003716 chatty=False,
3717 sni_name='supermessage')
3718 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
3719 self.assertIn("TypeError", stderr.getvalue())
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003720
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003721 def test_shared_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003722 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003723 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
3724 client_context.set_ciphers("AES128:AES256")
3725 server_context.set_ciphers("AES256")
3726 alg1 = "AES256"
3727 alg2 = "AES-256"
3728 else:
3729 client_context.set_ciphers("AES:3DES")
3730 server_context.set_ciphers("3DES")
3731 alg1 = "3DES"
3732 alg2 = "DES-CBC3"
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003733
Christian Heimesa170fa12017-09-15 20:27:30 +02003734 stats = server_params_test(client_context, server_context,
3735 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003736 ciphers = stats['server_shared_ciphers'][0]
3737 self.assertGreater(len(ciphers), 0)
3738 for name, tls_version, bits in ciphers:
3739 if not alg1 in name.split("-") and alg2 not in name:
3740 self.fail(name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003741
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003742 def test_read_write_after_close_raises_valuerror(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003743 client_context, server_context, hostname = testing_context()
3744 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003745
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003746 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02003747 s = client_context.wrap_socket(socket.socket(),
3748 server_hostname=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003749 s.connect((HOST, server.port))
3750 s.close()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003751
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003752 self.assertRaises(ValueError, s.read, 1024)
3753 self.assertRaises(ValueError, s.write, b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003754
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003755 def test_sendfile(self):
3756 TEST_DATA = b"x" * 512
3757 with open(support.TESTFN, 'wb') as f:
3758 f.write(TEST_DATA)
3759 self.addCleanup(support.unlink, support.TESTFN)
Christian Heimesa170fa12017-09-15 20:27:30 +02003760 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003761 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003762 context.load_verify_locations(SIGNING_CA)
3763 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003764 server = ThreadedEchoServer(context=context, chatty=False)
3765 with server:
3766 with context.wrap_socket(socket.socket()) as s:
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003767 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003768 with open(support.TESTFN, 'rb') as file:
3769 s.sendfile(file)
3770 self.assertEqual(s.recv(1024), TEST_DATA)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003771
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003772 def test_session(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003773 client_context, server_context, hostname = testing_context()
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003774
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003775 # first connection without session
Christian Heimesa170fa12017-09-15 20:27:30 +02003776 stats = server_params_test(client_context, server_context,
3777 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003778 session = stats['session']
3779 self.assertTrue(session.id)
3780 self.assertGreater(session.time, 0)
3781 self.assertGreater(session.timeout, 0)
3782 self.assertTrue(session.has_ticket)
3783 if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
3784 self.assertGreater(session.ticket_lifetime_hint, 0)
3785 self.assertFalse(stats['session_reused'])
3786 sess_stat = server_context.session_stats()
3787 self.assertEqual(sess_stat['accept'], 1)
3788 self.assertEqual(sess_stat['hits'], 0)
Giampaolo Rodola'915d1412014-06-11 03:54:30 +02003789
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003790 # reuse session
Christian Heimesa170fa12017-09-15 20:27:30 +02003791 stats = server_params_test(client_context, server_context,
3792 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003793 sess_stat = server_context.session_stats()
3794 self.assertEqual(sess_stat['accept'], 2)
3795 self.assertEqual(sess_stat['hits'], 1)
3796 self.assertTrue(stats['session_reused'])
3797 session2 = stats['session']
3798 self.assertEqual(session2.id, session.id)
3799 self.assertEqual(session2, session)
3800 self.assertIsNot(session2, session)
3801 self.assertGreaterEqual(session2.time, session.time)
3802 self.assertGreaterEqual(session2.timeout, session.timeout)
Christian Heimes99a65702016-09-10 23:44:53 +02003803
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003804 # another one without session
Christian Heimesa170fa12017-09-15 20:27:30 +02003805 stats = server_params_test(client_context, server_context,
3806 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003807 self.assertFalse(stats['session_reused'])
3808 session3 = stats['session']
3809 self.assertNotEqual(session3.id, session.id)
3810 self.assertNotEqual(session3, session)
3811 sess_stat = server_context.session_stats()
3812 self.assertEqual(sess_stat['accept'], 3)
3813 self.assertEqual(sess_stat['hits'], 1)
Christian Heimes99a65702016-09-10 23:44:53 +02003814
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003815 # reuse session again
Christian Heimesa170fa12017-09-15 20:27:30 +02003816 stats = server_params_test(client_context, server_context,
3817 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003818 self.assertTrue(stats['session_reused'])
3819 session4 = stats['session']
3820 self.assertEqual(session4.id, session.id)
3821 self.assertEqual(session4, session)
3822 self.assertGreaterEqual(session4.time, session.time)
3823 self.assertGreaterEqual(session4.timeout, session.timeout)
3824 sess_stat = server_context.session_stats()
3825 self.assertEqual(sess_stat['accept'], 4)
3826 self.assertEqual(sess_stat['hits'], 2)
Christian Heimes99a65702016-09-10 23:44:53 +02003827
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003828 def test_session_handling(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003829 client_context, server_context, hostname = testing_context()
3830 client_context2, _, _ = testing_context()
Christian Heimes99a65702016-09-10 23:44:53 +02003831
Christian Heimescb5b68a2017-09-07 18:07:00 -07003832 # TODO: session reuse does not work with TLS 1.3
Christian Heimesa170fa12017-09-15 20:27:30 +02003833 client_context.options |= ssl.OP_NO_TLSv1_3
3834 client_context2.options |= ssl.OP_NO_TLSv1_3
Christian Heimescb5b68a2017-09-07 18:07:00 -07003835
Christian Heimesa170fa12017-09-15 20:27:30 +02003836 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003837 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02003838 with client_context.wrap_socket(socket.socket(),
3839 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003840 # session is None before handshake
3841 self.assertEqual(s.session, None)
3842 self.assertEqual(s.session_reused, None)
3843 s.connect((HOST, server.port))
3844 session = s.session
3845 self.assertTrue(session)
3846 with self.assertRaises(TypeError) as e:
3847 s.session = object
3848 self.assertEqual(str(e.exception), 'Value is not a SSLSession.')
Christian Heimes99a65702016-09-10 23:44:53 +02003849
Christian Heimesa170fa12017-09-15 20:27:30 +02003850 with client_context.wrap_socket(socket.socket(),
3851 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003852 s.connect((HOST, server.port))
3853 # cannot set session after handshake
3854 with self.assertRaises(ValueError) as e:
3855 s.session = session
3856 self.assertEqual(str(e.exception),
3857 'Cannot set session after handshake.')
Christian Heimes99a65702016-09-10 23:44:53 +02003858
Christian Heimesa170fa12017-09-15 20:27:30 +02003859 with client_context.wrap_socket(socket.socket(),
3860 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003861 # can set session before handshake and before the
3862 # connection was established
3863 s.session = session
3864 s.connect((HOST, server.port))
3865 self.assertEqual(s.session.id, session.id)
3866 self.assertEqual(s.session, session)
3867 self.assertEqual(s.session_reused, True)
Christian Heimes99a65702016-09-10 23:44:53 +02003868
Christian Heimesa170fa12017-09-15 20:27:30 +02003869 with client_context2.wrap_socket(socket.socket(),
3870 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003871 # cannot re-use session with a different SSLContext
3872 with self.assertRaises(ValueError) as e:
Christian Heimes99a65702016-09-10 23:44:53 +02003873 s.session = session
3874 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003875 self.assertEqual(str(e.exception),
3876 'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02003877
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003878
Thomas Woutersed03b412007-08-28 21:37:11 +00003879def test_main(verbose=False):
Antoine Pitrou15cee622010-08-04 16:45:21 +00003880 if support.verbose:
Berker Peksag9e7990a2015-05-16 23:21:26 +03003881 import warnings
Antoine Pitrou15cee622010-08-04 16:45:21 +00003882 plats = {
3883 'Linux': platform.linux_distribution,
3884 'Mac': platform.mac_ver,
3885 'Windows': platform.win32_ver,
3886 }
Berker Peksag9e7990a2015-05-16 23:21:26 +03003887 with warnings.catch_warnings():
3888 warnings.filterwarnings(
3889 'ignore',
R David Murray44b548d2016-09-08 13:59:53 -04003890 r'dist\(\) and linux_distribution\(\) '
Berker Peksag9e7990a2015-05-16 23:21:26 +03003891 'functions are deprecated .*',
3892 PendingDeprecationWarning,
3893 )
3894 for name, func in plats.items():
3895 plat = func()
3896 if plat and plat[0]:
3897 plat = '%s %r' % (name, plat)
3898 break
3899 else:
3900 plat = repr(platform.platform())
Antoine Pitrou15cee622010-08-04 16:45:21 +00003901 print("test_ssl: testing with %r %r" %
3902 (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
3903 print(" under %s" % plat)
Antoine Pitroud5323212010-10-22 18:19:07 +00003904 print(" HAS_SNI = %r" % ssl.HAS_SNI)
Antoine Pitrou609ef012013-03-29 18:09:06 +01003905 print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
3906 try:
3907 print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
3908 except AttributeError:
3909 pass
Antoine Pitrou15cee622010-08-04 16:45:21 +00003910
Antoine Pitrou152efa22010-05-16 18:19:27 +00003911 for filename in [
Martin Panter3840b2a2016-03-27 01:53:46 +00003912 CERTFILE, BYTES_CERTFILE,
Antoine Pitrou152efa22010-05-16 18:19:27 +00003913 ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003914 SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
Antoine Pitrou152efa22010-05-16 18:19:27 +00003915 BADCERT, BADKEY, EMPTYCERT]:
3916 if not os.path.exists(filename):
3917 raise support.TestFailed("Can't read certificate file %r" % filename)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00003918
Martin Panter3840b2a2016-03-27 01:53:46 +00003919 tests = [
3920 ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003921 SimpleBackgroundTests, ThreadedTests,
Martin Panter3840b2a2016-03-27 01:53:46 +00003922 ]
Thomas Woutersed03b412007-08-28 21:37:11 +00003923
Benjamin Petersonee8712c2008-05-20 21:35:26 +00003924 if support.is_resource_enabled('network'):
Bill Janssen6e027db2007-11-15 22:23:56 +00003925 tests.append(NetworkedTests)
Thomas Woutersed03b412007-08-28 21:37:11 +00003926
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003927 thread_info = support.threading_setup()
Antoine Pitrou480a1242010-04-28 21:37:09 +00003928 try:
3929 support.run_unittest(*tests)
3930 finally:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003931 support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00003932
3933if __name__ == "__main__":
3934 test_main()