blob: 7aa112335cdbc2ac950d89af8976acb0e2443b3e [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00005from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00006import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00007import select
Thomas Woutersed03b412007-08-28 21:37:11 +00008import time
Christian Heimes9424bb42013-06-17 15:32:57 +02009import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000010import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000011import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000012import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000013import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000014import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020015import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000016import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000017import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000018import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000019import platform
Antoine Pitrou23df4832010-08-04 17:14:06 +000020import functools
Christian Heimes892d66e2018-01-29 14:10:18 +010021import sysconfig
Christian Heimes888bbdc2017-09-07 14:18:21 -070022try:
23 import ctypes
24except ImportError:
25 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000026
Antoine Pitrou05d936d2010-10-13 11:38:36 +000027ssl = support.import_module("ssl")
28
Martin Panter3840b2a2016-03-27 01:53:46 +000029
Antoine Pitrou2463e5f2013-03-28 22:24:43 +010030PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
Benjamin Petersonee8712c2008-05-20 21:35:26 +000031HOST = support.HOST
Christian Heimes598894f2016-09-05 23:19:05 +020032IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
33IS_OPENSSL_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
Christian Heimes892d66e2018-01-29 14:10:18 +010034PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
Antoine Pitrou152efa22010-05-16 18:19:27 +000035
Christian Heimesefff7062013-11-21 03:35:02 +010036def data_file(*name):
37 return os.path.join(os.path.dirname(__file__), *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000038
Antoine Pitrou81564092010-10-08 23:06:24 +000039# The custom key and certificate files used in test_ssl are generated
40# using Lib/test/make_ssl_certs.py.
41# Other certificates are simply fetched from the Internet servers they
42# are meant to authenticate.
43
Antoine Pitrou152efa22010-05-16 18:19:27 +000044CERTFILE = data_file("keycert.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000045BYTES_CERTFILE = os.fsencode(CERTFILE)
Antoine Pitrou152efa22010-05-16 18:19:27 +000046ONLYCERT = data_file("ssl_cert.pem")
47ONLYKEY = data_file("ssl_key.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000048BYTES_ONLYCERT = os.fsencode(ONLYCERT)
49BYTES_ONLYKEY = os.fsencode(ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +020050CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
51ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
52KEY_PASSWORD = "somepass"
Antoine Pitrou152efa22010-05-16 18:19:27 +000053CAPATH = data_file("capath")
Victor Stinner313a1202010-06-11 23:56:51 +000054BYTES_CAPATH = os.fsencode(CAPATH)
Christian Heimesefff7062013-11-21 03:35:02 +010055CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
56CAFILE_CACERT = data_file("capath", "5ed36f99.0")
57
Christian Heimesbd5c7d22018-01-20 15:16:30 +010058CERTFILE_INFO = {
59 'issuer': ((('countryName', 'XY'),),
60 (('localityName', 'Castle Anthrax'),),
61 (('organizationName', 'Python Software Foundation'),),
62 (('commonName', 'localhost'),)),
63 'notAfter': 'Jan 17 19:09:06 2028 GMT',
64 'notBefore': 'Jan 19 19:09:06 2018 GMT',
65 'serialNumber': 'F9BA076D5B6ABD9B',
66 'subject': ((('countryName', 'XY'),),
67 (('localityName', 'Castle Anthrax'),),
68 (('organizationName', 'Python Software Foundation'),),
69 (('commonName', 'localhost'),)),
70 'subjectAltName': (('DNS', 'localhost'),),
71 'version': 3
72}
Antoine Pitrou152efa22010-05-16 18:19:27 +000073
Christian Heimes22587792013-11-21 23:56:13 +010074# empty CRL
75CRLFILE = data_file("revocation.crl")
76
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010077# Two keys and certs signed by the same CA (for SNI tests)
78SIGNED_CERTFILE = data_file("keycert3.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +020079SIGNED_CERTFILE_HOSTNAME = 'localhost'
Christian Heimesbd5c7d22018-01-20 15:16:30 +010080
81SIGNED_CERTFILE_INFO = {
82 'OCSP': ('http://testca.pythontest.net/testca/ocsp/',),
83 'caIssuers': ('http://testca.pythontest.net/testca/pycacert.cer',),
84 'crlDistributionPoints': ('http://testca.pythontest.net/testca/revocation.crl',),
85 'issuer': ((('countryName', 'XY'),),
86 (('organizationName', 'Python Software Foundation CA'),),
87 (('commonName', 'our-ca-server'),)),
88 'notAfter': 'Nov 28 19:09:06 2027 GMT',
89 'notBefore': 'Jan 19 19:09:06 2018 GMT',
90 'serialNumber': '82EDBF41C880919C',
91 'subject': ((('countryName', 'XY'),),
92 (('localityName', 'Castle Anthrax'),),
93 (('organizationName', 'Python Software Foundation'),),
94 (('commonName', 'localhost'),)),
95 'subjectAltName': (('DNS', 'localhost'),),
96 'version': 3
97}
98
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010099SIGNED_CERTFILE2 = data_file("keycert4.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +0200100SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100101SIGNED_CERTFILE_ECC = data_file("keycertecc.pem")
102SIGNED_CERTFILE_ECC_HOSTNAME = 'localhost-ecc'
103
Martin Panter3840b2a2016-03-27 01:53:46 +0000104# Same certificate as pycacert.pem, but without extra text in file
105SIGNING_CA = data_file("capath", "ceff1710.0")
Christian Heimes1c03abd2016-09-06 23:25:35 +0200106# cert with all kinds of subject alt names
107ALLSANFILE = data_file("allsans.pem")
Christian Heimes66e57422018-01-29 14:25:13 +0100108IDNSANSFILE = data_file("idnsans.pem")
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100109
Martin Panter3d81d932016-01-14 09:36:00 +0000110REMOTE_HOST = "self-signed.pythontest.net"
Antoine Pitrou152efa22010-05-16 18:19:27 +0000111
112EMPTYCERT = data_file("nullcert.pem")
113BADCERT = data_file("badcert.pem")
Martin Panter407b62f2016-01-30 03:41:43 +0000114NONEXISTINGCERT = data_file("XXXnonexisting.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000115BADKEY = data_file("badkey.pem")
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200116NOKIACERT = data_file("nokia.pem")
Christian Heimes824f7f32013-08-17 00:54:47 +0200117NULLBYTECERT = data_file("nullbytecert.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000118
Benjamin Petersona7eaf562015-04-02 00:04:06 -0400119DHFILE = data_file("dh1024.pem")
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100120BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +0000121
Christian Heimes358cfd42016-09-10 22:43:48 +0200122# Not defined in all versions of OpenSSL
123OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
124OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
125OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
126OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
127
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100128
Thomas Woutersed03b412007-08-28 21:37:11 +0000129def handle_error(prefix):
130 exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000131 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000132 sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +0000133
Antoine Pitroub5218772010-05-21 09:56:06 +0000134def can_clear_options():
135 # 0.9.8m or higher
Antoine Pitroub9ac25d2011-07-08 18:47:06 +0200136 return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
Antoine Pitroub5218772010-05-21 09:56:06 +0000137
138def no_sslv2_implies_sslv3_hello():
139 # 0.9.7h or higher
140 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
141
Christian Heimes2427b502013-11-23 11:24:32 +0100142def have_verify_flags():
143 # 0.9.8 or higher
144 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
145
Antoine Pitrouc695c952014-04-28 20:57:36 +0200146def utc_offset(): #NOTE: ignore issues like #1647654
147 # local time = utc time + utc offset
148 if time.daylight and time.localtime().tm_isdst > 0:
149 return -time.altzone # seconds
150 return -time.timezone
151
Christian Heimes9424bb42013-06-17 15:32:57 +0200152def asn1time(cert_time):
153 # Some versions of OpenSSL ignore seconds, see #18207
154 # 0.9.8.i
155 if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
156 fmt = "%b %d %H:%M:%S %Y GMT"
157 dt = datetime.datetime.strptime(cert_time, fmt)
158 dt = dt.replace(second=0)
159 cert_time = dt.strftime(fmt)
160 # %d adds leading zero but ASN1_TIME_print() uses leading space
161 if cert_time[4] == "0":
162 cert_time = cert_time[:4] + " " + cert_time[5:]
163
164 return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000165
Antoine Pitrou23df4832010-08-04 17:14:06 +0000166# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
167def skip_if_broken_ubuntu_ssl(func):
Victor Stinner3de49192011-05-09 00:42:58 +0200168 if hasattr(ssl, 'PROTOCOL_SSLv2'):
169 @functools.wraps(func)
170 def f(*args, **kwargs):
171 try:
172 ssl.SSLContext(ssl.PROTOCOL_SSLv2)
173 except ssl.SSLError:
174 if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
175 platform.linux_distribution() == ('debian', 'squeeze/sid', '')):
176 raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
177 return func(*args, **kwargs)
178 return f
179 else:
180 return func
Antoine Pitrou23df4832010-08-04 17:14:06 +0000181
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100182needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
183
Antoine Pitrou23df4832010-08-04 17:14:06 +0000184
Christian Heimesd0486372016-09-10 23:23:33 +0200185def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
186 cert_reqs=ssl.CERT_NONE, ca_certs=None,
187 ciphers=None, certfile=None, keyfile=None,
188 **kwargs):
189 context = ssl.SSLContext(ssl_version)
190 if cert_reqs is not None:
Christian Heimesa170fa12017-09-15 20:27:30 +0200191 if cert_reqs == ssl.CERT_NONE:
192 context.check_hostname = False
Christian Heimesd0486372016-09-10 23:23:33 +0200193 context.verify_mode = cert_reqs
194 if ca_certs is not None:
195 context.load_verify_locations(ca_certs)
196 if certfile is not None or keyfile is not None:
197 context.load_cert_chain(certfile, keyfile)
198 if ciphers is not None:
199 context.set_ciphers(ciphers)
200 return context.wrap_socket(sock, **kwargs)
201
Christian Heimesa170fa12017-09-15 20:27:30 +0200202
203def testing_context(server_cert=SIGNED_CERTFILE):
204 """Create context
205
206 client_context, server_context, hostname = testing_context()
207 """
208 if server_cert == SIGNED_CERTFILE:
209 hostname = SIGNED_CERTFILE_HOSTNAME
210 elif server_cert == SIGNED_CERTFILE2:
211 hostname = SIGNED_CERTFILE2_HOSTNAME
212 else:
213 raise ValueError(server_cert)
214
215 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
216 client_context.load_verify_locations(SIGNING_CA)
217
218 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
219 server_context.load_cert_chain(server_cert)
220
221 return client_context, server_context, hostname
222
223
Antoine Pitrou152efa22010-05-16 18:19:27 +0000224class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000225
Antoine Pitrou480a1242010-04-28 21:37:09 +0000226 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000227 ssl.CERT_NONE
228 ssl.CERT_OPTIONAL
229 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100230 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100231 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100232 if ssl.HAS_ECDH:
233 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100234 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
235 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000236 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100237 self.assertIn(ssl.HAS_ECDH, {True, False})
Christian Heimescb5b68a2017-09-07 18:07:00 -0700238 ssl.OP_NO_SSLv2
239 ssl.OP_NO_SSLv3
240 ssl.OP_NO_TLSv1
241 ssl.OP_NO_TLSv1_3
242 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
243 ssl.OP_NO_TLSv1_1
244 ssl.OP_NO_TLSv1_2
Christian Heimesa170fa12017-09-15 20:27:30 +0200245 self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000246
Antoine Pitrou172f0252014-04-18 20:33:08 +0200247 def test_str_for_enums(self):
248 # Make sure that the PROTOCOL_* constants have enum-like string
249 # reprs.
Christian Heimes598894f2016-09-05 23:19:05 +0200250 proto = ssl.PROTOCOL_TLS
251 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200252 ctx = ssl.SSLContext(proto)
253 self.assertIs(ctx.protocol, proto)
254
Antoine Pitrou480a1242010-04-28 21:37:09 +0000255 def test_random(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000256 v = ssl.RAND_status()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000257 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000258 sys.stdout.write("\n RAND_status is %d (%s)\n"
259 % (v, (v and "sufficient randomness") or
260 "insufficient randomness"))
Victor Stinner99c8b162011-05-24 12:05:19 +0200261
262 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
263 self.assertEqual(len(data), 16)
264 self.assertEqual(is_cryptographic, v == 1)
265 if v:
266 data = ssl.RAND_bytes(16)
267 self.assertEqual(len(data), 16)
Victor Stinner2e2baa92011-05-25 11:15:16 +0200268 else:
269 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
Victor Stinner99c8b162011-05-24 12:05:19 +0200270
Victor Stinner1e81a392013-12-19 16:47:04 +0100271 # negative num is invalid
272 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
273 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
274
Victor Stinnerbeeb5122014-11-28 13:28:25 +0100275 if hasattr(ssl, 'RAND_egd'):
276 self.assertRaises(TypeError, ssl.RAND_egd, 1)
277 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000278 ssl.RAND_add("this is a random string", 75.0)
Serhiy Storchaka8490f5a2015-03-20 09:00:36 +0200279 ssl.RAND_add(b"this is a random bytes object", 75.0)
280 ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000281
Christian Heimesf77b4b22013-08-21 13:26:05 +0200282 @unittest.skipUnless(os.name == 'posix', 'requires posix')
283 def test_random_fork(self):
284 status = ssl.RAND_status()
285 if not status:
286 self.fail("OpenSSL's PRNG has insufficient randomness")
287
288 rfd, wfd = os.pipe()
289 pid = os.fork()
290 if pid == 0:
291 try:
292 os.close(rfd)
293 child_random = ssl.RAND_pseudo_bytes(16)[0]
294 self.assertEqual(len(child_random), 16)
295 os.write(wfd, child_random)
296 os.close(wfd)
297 except BaseException:
298 os._exit(1)
299 else:
300 os._exit(0)
301 else:
302 os.close(wfd)
303 self.addCleanup(os.close, rfd)
304 _, status = os.waitpid(pid, 0)
305 self.assertEqual(status, 0)
306
307 child_random = os.read(rfd, 16)
308 self.assertEqual(len(child_random), 16)
309 parent_random = ssl.RAND_pseudo_bytes(16)[0]
310 self.assertEqual(len(parent_random), 16)
311
312 self.assertNotEqual(child_random, parent_random)
313
Antoine Pitrou480a1242010-04-28 21:37:09 +0000314 def test_parse_cert(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000315 # note that this uses an 'unofficial' function in _ssl.c,
316 # provided solely for this test, to exercise the certificate
317 # parsing code
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100318 self.assertEqual(
319 ssl._ssl._test_decode_cert(CERTFILE),
320 CERTFILE_INFO
321 )
322 self.assertEqual(
323 ssl._ssl._test_decode_cert(SIGNED_CERTFILE),
324 SIGNED_CERTFILE_INFO
325 )
326
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200327 # Issue #13034: the subjectAltName in some certificates
328 # (notably projects.developer.nokia.com:443) wasn't parsed
329 p = ssl._ssl._test_decode_cert(NOKIACERT)
330 if support.verbose:
331 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
332 self.assertEqual(p['subjectAltName'],
333 (('DNS', 'projects.developer.nokia.com'),
334 ('DNS', 'projects.forum.nokia.com'))
335 )
Christian Heimesbd3a7f92013-11-21 03:40:15 +0100336 # extra OCSP and AIA fields
337 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
338 self.assertEqual(p['caIssuers'],
339 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
340 self.assertEqual(p['crlDistributionPoints'],
341 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000342
Christian Heimes824f7f32013-08-17 00:54:47 +0200343 def test_parse_cert_CVE_2013_4238(self):
344 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
345 if support.verbose:
346 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
347 subject = ((('countryName', 'US'),),
348 (('stateOrProvinceName', 'Oregon'),),
349 (('localityName', 'Beaverton'),),
350 (('organizationName', 'Python Software Foundation'),),
351 (('organizationalUnitName', 'Python Core Development'),),
352 (('commonName', 'null.python.org\x00example.org'),),
353 (('emailAddress', 'python-dev@python.org'),))
354 self.assertEqual(p['subject'], subject)
355 self.assertEqual(p['issuer'], subject)
Christian Heimes157c9832013-08-25 14:12:41 +0200356 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
357 san = (('DNS', 'altnull.python.org\x00example.com'),
358 ('email', 'null@python.org\x00user@example.org'),
359 ('URI', 'http://null.python.org\x00http://example.org'),
360 ('IP Address', '192.0.2.1'),
361 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
362 else:
363 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
364 san = (('DNS', 'altnull.python.org\x00example.com'),
365 ('email', 'null@python.org\x00user@example.org'),
366 ('URI', 'http://null.python.org\x00http://example.org'),
367 ('IP Address', '192.0.2.1'),
368 ('IP Address', '<invalid>'))
369
370 self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200371
Christian Heimes1c03abd2016-09-06 23:25:35 +0200372 def test_parse_all_sans(self):
373 p = ssl._ssl._test_decode_cert(ALLSANFILE)
374 self.assertEqual(p['subjectAltName'],
375 (
376 ('DNS', 'allsans'),
377 ('othername', '<unsupported>'),
378 ('othername', '<unsupported>'),
379 ('email', 'user@example.org'),
380 ('DNS', 'www.example.org'),
381 ('DirName',
382 ((('countryName', 'XY'),),
383 (('localityName', 'Castle Anthrax'),),
384 (('organizationName', 'Python Software Foundation'),),
385 (('commonName', 'dirname example'),))),
386 ('URI', 'https://www.python.org/'),
387 ('IP Address', '127.0.0.1'),
388 ('IP Address', '0:0:0:0:0:0:0:1\n'),
389 ('Registered ID', '1.2.3.4.5')
390 )
391 )
392
Antoine Pitrou480a1242010-04-28 21:37:09 +0000393 def test_DER_to_PEM(self):
Martin Panter3d81d932016-01-14 09:36:00 +0000394 with open(CAFILE_CACERT, 'r') as f:
Antoine Pitrou480a1242010-04-28 21:37:09 +0000395 pem = f.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000396 d1 = ssl.PEM_cert_to_DER_cert(pem)
397 p2 = ssl.DER_cert_to_PEM_cert(d1)
398 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitrou18c913e2010-04-27 10:59:39 +0000399 self.assertEqual(d1, d2)
Antoine Pitrou9bfbe612010-04-27 22:08:08 +0000400 if not p2.startswith(ssl.PEM_HEADER + '\n'):
401 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
402 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
403 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000404
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000405 def test_openssl_version(self):
406 n = ssl.OPENSSL_VERSION_NUMBER
407 t = ssl.OPENSSL_VERSION_INFO
408 s = ssl.OPENSSL_VERSION
409 self.assertIsInstance(n, int)
410 self.assertIsInstance(t, tuple)
411 self.assertIsInstance(s, str)
412 # Some sanity checks follow
413 # >= 0.9
414 self.assertGreaterEqual(n, 0x900000)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400415 # < 3.0
416 self.assertLess(n, 0x30000000)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000417 major, minor, fix, patch, status = t
418 self.assertGreaterEqual(major, 0)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400419 self.assertLess(major, 3)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000420 self.assertGreaterEqual(minor, 0)
421 self.assertLess(minor, 256)
422 self.assertGreaterEqual(fix, 0)
423 self.assertLess(fix, 256)
424 self.assertGreaterEqual(patch, 0)
Ned Deily05784a72015-02-05 17:20:13 +1100425 self.assertLessEqual(patch, 63)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000426 self.assertGreaterEqual(status, 0)
427 self.assertLessEqual(status, 15)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400428 # Version string as returned by {Open,Libre}SSL, the format might change
Christian Heimes598894f2016-09-05 23:19:05 +0200429 if IS_LIBRESSL:
430 self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
Victor Stinner789b8052015-01-06 11:51:06 +0100431 (s, t, hex(n)))
Antoine Pitroudfab9352014-07-21 18:35:01 -0400432 else:
433 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
Victor Stinner789b8052015-01-06 11:51:06 +0100434 (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000435
Antoine Pitrou9d543662010-04-23 23:10:32 +0000436 @support.cpython_only
437 def test_refcycle(self):
438 # Issue #7943: an SSL object doesn't create reference cycles with
439 # itself.
440 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200441 ss = test_wrap_socket(s)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000442 wr = weakref.ref(ss)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100443 with support.check_warnings(("", ResourceWarning)):
444 del ss
Victor Stinnere0b75b72016-03-21 17:26:04 +0100445 self.assertEqual(wr(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000446
Antoine Pitroua468adc2010-09-14 14:43:44 +0000447 def test_wrapped_unconnected(self):
448 # Methods on an unconnected SSLSocket propagate the original
Andrew Svetlov0832af62012-12-18 23:10:48 +0200449 # OSError raise by the underlying socket object.
Antoine Pitroua468adc2010-09-14 14:43:44 +0000450 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200451 with test_wrap_socket(s) as ss:
Antoine Pitroue9bb4732013-01-12 21:56:56 +0100452 self.assertRaises(OSError, ss.recv, 1)
453 self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
454 self.assertRaises(OSError, ss.recvfrom, 1)
455 self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
456 self.assertRaises(OSError, ss.send, b'x')
457 self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
Antoine Pitroua468adc2010-09-14 14:43:44 +0000458
Antoine Pitrou40f08742010-04-24 22:04:40 +0000459 def test_timeout(self):
460 # Issue #8524: when creating an SSL socket, the timeout of the
461 # original socket should be retained.
462 for timeout in (None, 0.0, 5.0):
463 s = socket.socket(socket.AF_INET)
464 s.settimeout(timeout)
Christian Heimesd0486372016-09-10 23:23:33 +0200465 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100466 self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000467
Christian Heimesd0486372016-09-10 23:23:33 +0200468 def test_errors_sslwrap(self):
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000469 sock = socket.socket()
Ezio Melottied3a7d22010-12-01 02:32:32 +0000470 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000471 "certfile must be specified",
472 ssl.wrap_socket, sock, keyfile=CERTFILE)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000473 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000474 "certfile must be specified for server-side operations",
475 ssl.wrap_socket, sock, server_side=True)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000476 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000477 "certfile must be specified for server-side operations",
Christian Heimesd0486372016-09-10 23:23:33 +0200478 ssl.wrap_socket, sock, server_side=True, certfile="")
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100479 with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
480 self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
Christian Heimesd0486372016-09-10 23:23:33 +0200481 s.connect, (HOST, 8080))
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200482 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000483 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000484 ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000485 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200486 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000487 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000488 ssl.wrap_socket(sock,
489 certfile=CERTFILE, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000490 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200491 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000492 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000493 ssl.wrap_socket(sock,
494 certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000495 self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000496
Martin Panter3464ea22016-02-01 21:58:11 +0000497 def bad_cert_test(self, certfile):
498 """Check that trying to use the given client certificate fails"""
499 certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
500 certfile)
501 sock = socket.socket()
502 self.addCleanup(sock.close)
503 with self.assertRaises(ssl.SSLError):
Christian Heimesd0486372016-09-10 23:23:33 +0200504 test_wrap_socket(sock,
Christian Heimesa170fa12017-09-15 20:27:30 +0200505 certfile=certfile)
Martin Panter3464ea22016-02-01 21:58:11 +0000506
507 def test_empty_cert(self):
508 """Wrapping with an empty cert file"""
509 self.bad_cert_test("nullcert.pem")
510
511 def test_malformed_cert(self):
512 """Wrapping with a badly formatted certificate (syntax error)"""
513 self.bad_cert_test("badcert.pem")
514
515 def test_malformed_key(self):
516 """Wrapping with a badly formatted key (syntax error)"""
517 self.bad_cert_test("badkey.pem")
518
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000519 def test_match_hostname(self):
520 def ok(cert, hostname):
521 ssl.match_hostname(cert, hostname)
522 def fail(cert, hostname):
523 self.assertRaises(ssl.CertificateError,
524 ssl.match_hostname, cert, hostname)
525
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100526 # -- Hostname matching --
527
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000528 cert = {'subject': ((('commonName', 'example.com'),),)}
529 ok(cert, 'example.com')
530 ok(cert, 'ExAmple.cOm')
531 fail(cert, 'www.example.com')
532 fail(cert, '.example.com')
533 fail(cert, 'example.org')
534 fail(cert, 'exampleXcom')
535
536 cert = {'subject': ((('commonName', '*.a.com'),),)}
537 ok(cert, 'foo.a.com')
538 fail(cert, 'bar.foo.a.com')
539 fail(cert, 'a.com')
540 fail(cert, 'Xa.com')
541 fail(cert, '.a.com')
542
Mandeep Singhede2ac92017-11-27 04:01:27 +0530543 # only match wildcards when they are the only thing
544 # in left-most segment
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000545 cert = {'subject': ((('commonName', 'f*.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530546 fail(cert, 'foo.com')
547 fail(cert, 'f.com')
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000548 fail(cert, 'bar.com')
549 fail(cert, 'foo.a.com')
550 fail(cert, 'bar.foo.com')
551
Christian Heimes824f7f32013-08-17 00:54:47 +0200552 # NULL bytes are bad, CVE-2013-4073
553 cert = {'subject': ((('commonName',
554 'null.python.org\x00example.org'),),)}
555 ok(cert, 'null.python.org\x00example.org') # or raise an error?
556 fail(cert, 'example.org')
557 fail(cert, 'null.python.org')
558
Georg Brandl72c98d32013-10-27 07:16:53 +0100559 # error cases with wildcards
560 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
561 fail(cert, 'bar.foo.a.com')
562 fail(cert, 'a.com')
563 fail(cert, 'Xa.com')
564 fail(cert, '.a.com')
565
566 cert = {'subject': ((('commonName', 'a.*.com'),),)}
567 fail(cert, 'a.foo.com')
568 fail(cert, 'a..com')
569 fail(cert, 'a.com')
570
571 # wildcard doesn't match IDNA prefix 'xn--'
572 idna = 'püthon.python.org'.encode("idna").decode("ascii")
573 cert = {'subject': ((('commonName', idna),),)}
574 ok(cert, idna)
575 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
576 fail(cert, idna)
577 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
578 fail(cert, idna)
579
580 # wildcard in first fragment and IDNA A-labels in sequent fragments
581 # are supported.
582 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
583 cert = {'subject': ((('commonName', idna),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530584 fail(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
585 fail(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
Georg Brandl72c98d32013-10-27 07:16:53 +0100586 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
587 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
588
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000589 # Slightly fake real-world example
590 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
591 'subject': ((('commonName', 'linuxfrz.org'),),),
592 'subjectAltName': (('DNS', 'linuxfr.org'),
593 ('DNS', 'linuxfr.com'),
594 ('othername', '<unsupported>'))}
595 ok(cert, 'linuxfr.org')
596 ok(cert, 'linuxfr.com')
597 # Not a "DNS" entry
598 fail(cert, '<unsupported>')
599 # When there is a subjectAltName, commonName isn't used
600 fail(cert, 'linuxfrz.org')
601
602 # A pristine real-world example
603 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
604 'subject': ((('countryName', 'US'),),
605 (('stateOrProvinceName', 'California'),),
606 (('localityName', 'Mountain View'),),
607 (('organizationName', 'Google Inc'),),
608 (('commonName', 'mail.google.com'),))}
609 ok(cert, 'mail.google.com')
610 fail(cert, 'gmail.com')
611 # Only commonName is considered
612 fail(cert, 'California')
613
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100614 # -- IPv4 matching --
615 cert = {'subject': ((('commonName', 'example.com'),),),
616 'subjectAltName': (('DNS', 'example.com'),
617 ('IP Address', '10.11.12.13'),
618 ('IP Address', '14.15.16.17'))}
619 ok(cert, '10.11.12.13')
620 ok(cert, '14.15.16.17')
621 fail(cert, '14.15.16.18')
622 fail(cert, 'example.net')
623
624 # -- IPv6 matching --
Miss Islington (bot)46632f42018-02-24 06:06:46 -0800625 if hasattr(socket, 'AF_INET6'):
626 cert = {'subject': ((('commonName', 'example.com'),),),
627 'subjectAltName': (
628 ('DNS', 'example.com'),
629 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
630 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
631 ok(cert, '2001::cafe')
632 ok(cert, '2003::baba')
633 fail(cert, '2003::bebe')
634 fail(cert, 'example.net')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100635
636 # -- Miscellaneous --
637
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000638 # Neither commonName nor subjectAltName
639 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
640 'subject': ((('countryName', 'US'),),
641 (('stateOrProvinceName', 'California'),),
642 (('localityName', 'Mountain View'),),
643 (('organizationName', 'Google Inc'),))}
644 fail(cert, 'mail.google.com')
645
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200646 # No DNS entry in subjectAltName but a commonName
647 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
648 'subject': ((('countryName', 'US'),),
649 (('stateOrProvinceName', 'California'),),
650 (('localityName', 'Mountain View'),),
651 (('commonName', 'mail.google.com'),)),
652 'subjectAltName': (('othername', 'blabla'), )}
653 ok(cert, 'mail.google.com')
654
655 # No DNS entry subjectAltName and no commonName
656 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
657 'subject': ((('countryName', 'US'),),
658 (('stateOrProvinceName', 'California'),),
659 (('localityName', 'Mountain View'),),
660 (('organizationName', 'Google Inc'),)),
661 'subjectAltName': (('othername', 'blabla'),)}
662 fail(cert, 'google.com')
663
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000664 # Empty cert / no cert
665 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
666 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
667
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200668 # Issue #17980: avoid denials of service by refusing more than one
669 # wildcard per fragment.
Miss Islington (bot)46632f42018-02-24 06:06:46 -0800670 cert = {'subject': ((('commonName', 'a*b.example.com'),),)}
671 with self.assertRaisesRegex(
672 ssl.CertificateError,
673 "partial wildcards in leftmost label are not supported"):
674 ssl.match_hostname(cert, 'axxb.example.com')
675
676 cert = {'subject': ((('commonName', 'www.*.example.com'),),)}
677 with self.assertRaisesRegex(
678 ssl.CertificateError,
679 "wildcard can only be present in the leftmost label"):
680 ssl.match_hostname(cert, 'www.sub.example.com')
681
682 cert = {'subject': ((('commonName', 'a*b*.example.com'),),)}
683 with self.assertRaisesRegex(
684 ssl.CertificateError,
685 "too many wildcards"):
686 ssl.match_hostname(cert, 'axxbxxc.example.com')
687
688 cert = {'subject': ((('commonName', '*'),),)}
689 with self.assertRaisesRegex(
690 ssl.CertificateError,
691 "sole wildcard without additional labels are not support"):
692 ssl.match_hostname(cert, 'host')
693
694 cert = {'subject': ((('commonName', '*.com'),),)}
695 with self.assertRaisesRegex(
696 ssl.CertificateError,
697 r"hostname 'com' doesn't match '\*.com'"):
698 ssl.match_hostname(cert, 'com')
699
700 # extra checks for _inet_paton()
701 for invalid in ['1', '', '1.2.3', '256.0.0.1', '127.0.0.1/24']:
702 with self.assertRaises(ValueError):
703 ssl._inet_paton(invalid)
704 for ipaddr in ['127.0.0.1', '192.168.0.1']:
705 self.assertTrue(ssl._inet_paton(ipaddr))
706 if hasattr(socket, 'AF_INET6'):
707 for ipaddr in ['::1', '2001:db8:85a3::8a2e:370:7334']:
708 self.assertTrue(ssl._inet_paton(ipaddr))
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200709
Antoine Pitroud5323212010-10-22 18:19:07 +0000710 def test_server_side(self):
711 # server_hostname doesn't work for server sockets
Christian Heimesa170fa12017-09-15 20:27:30 +0200712 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000713 with socket.socket() as sock:
714 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
715 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000716
Antoine Pitroud6494802011-07-21 01:11:30 +0200717 def test_unknown_channel_binding(self):
718 # should raise ValueError for unknown type
719 s = socket.socket(socket.AF_INET)
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200720 s.bind(('127.0.0.1', 0))
721 s.listen()
722 c = socket.socket(socket.AF_INET)
723 c.connect(s.getsockname())
Christian Heimesd0486372016-09-10 23:23:33 +0200724 with test_wrap_socket(c, do_handshake_on_connect=False) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100725 with self.assertRaises(ValueError):
726 ss.get_channel_binding("unknown-type")
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200727 s.close()
Antoine Pitroud6494802011-07-21 01:11:30 +0200728
729 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
730 "'tls-unique' channel binding not available")
731 def test_tls_unique_channel_binding(self):
732 # unconnected should return None for known type
733 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200734 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100735 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200736 # the same for server-side
737 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200738 with test_wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100739 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200740
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600741 def test_dealloc_warn(self):
Christian Heimesd0486372016-09-10 23:23:33 +0200742 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600743 r = repr(ss)
744 with self.assertWarns(ResourceWarning) as cm:
745 ss = None
746 support.gc_collect()
747 self.assertIn(r, str(cm.warning.args[0]))
748
Christian Heimes6d7ad132013-06-09 18:02:55 +0200749 def test_get_default_verify_paths(self):
750 paths = ssl.get_default_verify_paths()
751 self.assertEqual(len(paths), 6)
752 self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
753
754 with support.EnvironmentVarGuard() as env:
755 env["SSL_CERT_DIR"] = CAPATH
756 env["SSL_CERT_FILE"] = CERTFILE
757 paths = ssl.get_default_verify_paths()
758 self.assertEqual(paths.cafile, CERTFILE)
759 self.assertEqual(paths.capath, CAPATH)
760
Christian Heimes44109d72013-11-22 01:51:30 +0100761 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
762 def test_enum_certificates(self):
763 self.assertTrue(ssl.enum_certificates("CA"))
764 self.assertTrue(ssl.enum_certificates("ROOT"))
765
766 self.assertRaises(TypeError, ssl.enum_certificates)
767 self.assertRaises(WindowsError, ssl.enum_certificates, "")
768
Christian Heimesc2d65e12013-11-22 16:13:55 +0100769 trust_oids = set()
770 for storename in ("CA", "ROOT"):
771 store = ssl.enum_certificates(storename)
772 self.assertIsInstance(store, list)
773 for element in store:
774 self.assertIsInstance(element, tuple)
775 self.assertEqual(len(element), 3)
776 cert, enc, trust = element
777 self.assertIsInstance(cert, bytes)
778 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
779 self.assertIsInstance(trust, (set, bool))
780 if isinstance(trust, set):
781 trust_oids.update(trust)
Christian Heimes44109d72013-11-22 01:51:30 +0100782
783 serverAuth = "1.3.6.1.5.5.7.3.1"
Christian Heimesc2d65e12013-11-22 16:13:55 +0100784 self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200785
Christian Heimes46bebee2013-06-09 19:03:31 +0200786 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Christian Heimes44109d72013-11-22 01:51:30 +0100787 def test_enum_crls(self):
788 self.assertTrue(ssl.enum_crls("CA"))
789 self.assertRaises(TypeError, ssl.enum_crls)
790 self.assertRaises(WindowsError, ssl.enum_crls, "")
Christian Heimes46bebee2013-06-09 19:03:31 +0200791
Christian Heimes44109d72013-11-22 01:51:30 +0100792 crls = ssl.enum_crls("CA")
793 self.assertIsInstance(crls, list)
794 for element in crls:
795 self.assertIsInstance(element, tuple)
796 self.assertEqual(len(element), 2)
797 self.assertIsInstance(element[0], bytes)
798 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200799
Christian Heimes46bebee2013-06-09 19:03:31 +0200800
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100801 def test_asn1object(self):
802 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
803 '1.3.6.1.5.5.7.3.1')
804
805 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
806 self.assertEqual(val, expected)
807 self.assertEqual(val.nid, 129)
808 self.assertEqual(val.shortname, 'serverAuth')
809 self.assertEqual(val.longname, 'TLS Web Server Authentication')
810 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
811 self.assertIsInstance(val, ssl._ASN1Object)
812 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
813
814 val = ssl._ASN1Object.fromnid(129)
815 self.assertEqual(val, expected)
816 self.assertIsInstance(val, ssl._ASN1Object)
817 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100818 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
819 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100820 for i in range(1000):
821 try:
822 obj = ssl._ASN1Object.fromnid(i)
823 except ValueError:
824 pass
825 else:
826 self.assertIsInstance(obj.nid, int)
827 self.assertIsInstance(obj.shortname, str)
828 self.assertIsInstance(obj.longname, str)
829 self.assertIsInstance(obj.oid, (str, type(None)))
830
831 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
832 self.assertEqual(val, expected)
833 self.assertIsInstance(val, ssl._ASN1Object)
834 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
835 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
836 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100837 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
838 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100839
Christian Heimes72d28502013-11-23 13:56:58 +0100840 def test_purpose_enum(self):
841 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
842 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
843 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
844 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
845 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
846 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
847 '1.3.6.1.5.5.7.3.1')
848
849 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
850 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
851 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
852 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
853 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
854 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
855 '1.3.6.1.5.5.7.3.2')
856
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100857 def test_unsupported_dtls(self):
858 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
859 self.addCleanup(s.close)
860 with self.assertRaises(NotImplementedError) as cx:
Christian Heimesd0486372016-09-10 23:23:33 +0200861 test_wrap_socket(s, cert_reqs=ssl.CERT_NONE)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100862 self.assertEqual(str(cx.exception), "only stream sockets are supported")
Christian Heimesa170fa12017-09-15 20:27:30 +0200863 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100864 with self.assertRaises(NotImplementedError) as cx:
865 ctx.wrap_socket(s)
866 self.assertEqual(str(cx.exception), "only stream sockets are supported")
867
Antoine Pitrouc695c952014-04-28 20:57:36 +0200868 def cert_time_ok(self, timestring, timestamp):
869 self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)
870
871 def cert_time_fail(self, timestring):
872 with self.assertRaises(ValueError):
873 ssl.cert_time_to_seconds(timestring)
874
875 @unittest.skipUnless(utc_offset(),
876 'local time needs to be different from UTC')
877 def test_cert_time_to_seconds_timezone(self):
878 # Issue #19940: ssl.cert_time_to_seconds() returns wrong
879 # results if local timezone is not UTC
880 self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0)
881 self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0)
882
883 def test_cert_time_to_seconds(self):
884 timestring = "Jan 5 09:34:43 2018 GMT"
885 ts = 1515144883.0
886 self.cert_time_ok(timestring, ts)
887 # accept keyword parameter, assert its name
888 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
889 # accept both %e and %d (space or zero generated by strftime)
890 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
891 # case-insensitive
892 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
893 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
894 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
895 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
896 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
897 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
898 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
899 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
900
901 newyear_ts = 1230768000.0
902 # leap seconds
903 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
904 # same timestamp
905 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
906
907 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
908 # allow 60th second (even if it is not a leap second)
909 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
910 # allow 2nd leap second for compatibility with time.strptime()
911 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
912 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
913
Mike53f7a7c2017-12-14 14:04:53 +0300914 # no special treatment for the special value:
Antoine Pitrouc695c952014-04-28 20:57:36 +0200915 # 99991231235959Z (rfc 5280)
916 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
917
918 @support.run_with_locale('LC_ALL', '')
919 def test_cert_time_to_seconds_locale(self):
920 # `cert_time_to_seconds()` should be locale independent
921
922 def local_february_name():
923 return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
924
925 if local_february_name().lower() == 'feb':
926 self.skipTest("locale-specific month name needs to be "
927 "different from C locale")
928
929 # locale-independent
930 self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
931 self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
932
Martin Panter3840b2a2016-03-27 01:53:46 +0000933 def test_connect_ex_error(self):
934 server = socket.socket(socket.AF_INET)
935 self.addCleanup(server.close)
936 port = support.bind_port(server) # Reserve port but don't listen
Christian Heimesd0486372016-09-10 23:23:33 +0200937 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +0000938 cert_reqs=ssl.CERT_REQUIRED)
939 self.addCleanup(s.close)
940 rc = s.connect_ex((HOST, port))
941 # Issue #19919: Windows machines or VMs hosted on Windows
942 # machines sometimes return EWOULDBLOCK.
943 errors = (
944 errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
945 errno.EWOULDBLOCK,
946 )
947 self.assertIn(rc, errors)
948
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100949
Antoine Pitrou152efa22010-05-16 18:19:27 +0000950class ContextTests(unittest.TestCase):
951
Antoine Pitrou23df4832010-08-04 17:14:06 +0000952 @skip_if_broken_ubuntu_ssl
Antoine Pitrou152efa22010-05-16 18:19:27 +0000953 def test_constructor(self):
Antoine Pitrou2463e5f2013-03-28 22:24:43 +0100954 for protocol in PROTOCOLS:
955 ssl.SSLContext(protocol)
Christian Heimes598894f2016-09-05 23:19:05 +0200956 ctx = ssl.SSLContext()
957 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000958 self.assertRaises(ValueError, ssl.SSLContext, -1)
959 self.assertRaises(ValueError, ssl.SSLContext, 42)
960
Antoine Pitrou23df4832010-08-04 17:14:06 +0000961 @skip_if_broken_ubuntu_ssl
Antoine Pitrou152efa22010-05-16 18:19:27 +0000962 def test_protocol(self):
963 for proto in PROTOCOLS:
964 ctx = ssl.SSLContext(proto)
965 self.assertEqual(ctx.protocol, proto)
966
967 def test_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +0200968 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000969 ctx.set_ciphers("ALL")
970 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +0000971 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +0000972 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000973
Christian Heimes892d66e2018-01-29 14:10:18 +0100974 @unittest.skipUnless(PY_SSL_DEFAULT_CIPHERS == 1,
975 "Test applies only to Python default ciphers")
976 def test_python_ciphers(self):
977 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
978 ciphers = ctx.get_ciphers()
979 for suite in ciphers:
980 name = suite['name']
981 self.assertNotIn("PSK", name)
982 self.assertNotIn("SRP", name)
983 self.assertNotIn("MD5", name)
984 self.assertNotIn("RC4", name)
985 self.assertNotIn("3DES", name)
986
Christian Heimes25bfcd52016-09-06 00:04:45 +0200987 @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
988 def test_get_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +0200989 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesea9b2dc2016-09-06 10:45:44 +0200990 ctx.set_ciphers('AESGCM')
Christian Heimes25bfcd52016-09-06 00:04:45 +0200991 names = set(d['name'] for d in ctx.get_ciphers())
Christian Heimes582282b2016-09-06 11:27:25 +0200992 self.assertIn('AES256-GCM-SHA384', names)
993 self.assertIn('AES128-GCM-SHA256', names)
Christian Heimes25bfcd52016-09-06 00:04:45 +0200994
Antoine Pitrou23df4832010-08-04 17:14:06 +0000995 @skip_if_broken_ubuntu_ssl
Antoine Pitroub5218772010-05-21 09:56:06 +0000996 def test_options(self):
Christian Heimesa170fa12017-09-15 20:27:30 +0200997 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -0800998 # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
Christian Heimes598894f2016-09-05 23:19:05 +0200999 default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimes358cfd42016-09-10 22:43:48 +02001000 # SSLContext also enables these by default
1001 default |= (OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE |
1002 OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE)
Christian Heimes598894f2016-09-05 23:19:05 +02001003 self.assertEqual(default, ctx.options)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08001004 ctx.options |= ssl.OP_NO_TLSv1
Christian Heimes598894f2016-09-05 23:19:05 +02001005 self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001006 if can_clear_options():
Christian Heimes598894f2016-09-05 23:19:05 +02001007 ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
1008 self.assertEqual(default, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +00001009 ctx.options = 0
Matthias Klosef7c56242016-06-12 23:40:00 -07001010 # Ubuntu has OP_NO_SSLv3 forced on by default
1011 self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
Antoine Pitroub5218772010-05-21 09:56:06 +00001012 else:
1013 with self.assertRaises(ValueError):
1014 ctx.options = 0
1015
Christian Heimesa170fa12017-09-15 20:27:30 +02001016 def test_verify_mode_protocol(self):
1017 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001018 # Default value
1019 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1020 ctx.verify_mode = ssl.CERT_OPTIONAL
1021 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1022 ctx.verify_mode = ssl.CERT_REQUIRED
1023 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1024 ctx.verify_mode = ssl.CERT_NONE
1025 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1026 with self.assertRaises(TypeError):
1027 ctx.verify_mode = None
1028 with self.assertRaises(ValueError):
1029 ctx.verify_mode = 42
1030
Christian Heimesa170fa12017-09-15 20:27:30 +02001031 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1032 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1033 self.assertFalse(ctx.check_hostname)
1034
1035 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1036 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1037 self.assertTrue(ctx.check_hostname)
1038
Christian Heimes61d478c2018-01-27 15:51:38 +01001039 def test_hostname_checks_common_name(self):
1040 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1041 self.assertTrue(ctx.hostname_checks_common_name)
1042 if ssl.HAS_NEVER_CHECK_COMMON_NAME:
1043 ctx.hostname_checks_common_name = True
1044 self.assertTrue(ctx.hostname_checks_common_name)
1045 ctx.hostname_checks_common_name = False
1046 self.assertFalse(ctx.hostname_checks_common_name)
1047 ctx.hostname_checks_common_name = True
1048 self.assertTrue(ctx.hostname_checks_common_name)
1049 else:
1050 with self.assertRaises(AttributeError):
1051 ctx.hostname_checks_common_name = True
Christian Heimesa170fa12017-09-15 20:27:30 +02001052
Christian Heimes2427b502013-11-23 11:24:32 +01001053 @unittest.skipUnless(have_verify_flags(),
1054 "verify_flags need OpenSSL > 0.9.8")
Christian Heimes22587792013-11-21 23:56:13 +01001055 def test_verify_flags(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001056 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Benjamin Peterson990fcaa2015-03-04 22:49:41 -05001057 # default value
1058 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
1059 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | tf)
Christian Heimes22587792013-11-21 23:56:13 +01001060 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
1061 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
1062 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
1063 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
1064 ctx.verify_flags = ssl.VERIFY_DEFAULT
1065 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
1066 # supports any value
1067 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
1068 self.assertEqual(ctx.verify_flags,
1069 ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
1070 with self.assertRaises(TypeError):
1071 ctx.verify_flags = None
1072
Antoine Pitrou152efa22010-05-16 18:19:27 +00001073 def test_load_cert_chain(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001074 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001075 # Combined key and cert in a single file
Benjamin Peterson1ea070e2014-11-03 21:05:01 -05001076 ctx.load_cert_chain(CERTFILE, keyfile=None)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001077 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
1078 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001079 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001080 ctx.load_cert_chain(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001081 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001082 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001083 ctx.load_cert_chain(BADCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001084 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001085 ctx.load_cert_chain(EMPTYCERT)
1086 # Separate key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001087 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001088 ctx.load_cert_chain(ONLYCERT, ONLYKEY)
1089 ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
1090 ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001091 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001092 ctx.load_cert_chain(ONLYCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001093 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001094 ctx.load_cert_chain(ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001095 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001096 ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
1097 # Mismatching key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001098 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001099 with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
Martin Panter3d81d932016-01-14 09:36:00 +00001100 ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +02001101 # Password protected key and cert
1102 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
1103 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
1104 ctx.load_cert_chain(CERTFILE_PROTECTED,
1105 password=bytearray(KEY_PASSWORD.encode()))
1106 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
1107 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
1108 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
1109 bytearray(KEY_PASSWORD.encode()))
1110 with self.assertRaisesRegex(TypeError, "should be a string"):
1111 ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
1112 with self.assertRaises(ssl.SSLError):
1113 ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
1114 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1115 # openssl has a fixed limit on the password buffer.
1116 # PEM_BUFSIZE is generally set to 1kb.
1117 # Return a string larger than this.
1118 ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
1119 # Password callback
1120 def getpass_unicode():
1121 return KEY_PASSWORD
1122 def getpass_bytes():
1123 return KEY_PASSWORD.encode()
1124 def getpass_bytearray():
1125 return bytearray(KEY_PASSWORD.encode())
1126 def getpass_badpass():
1127 return "badpass"
1128 def getpass_huge():
1129 return b'a' * (1024 * 1024)
1130 def getpass_bad_type():
1131 return 9
1132 def getpass_exception():
1133 raise Exception('getpass error')
1134 class GetPassCallable:
1135 def __call__(self):
1136 return KEY_PASSWORD
1137 def getpass(self):
1138 return KEY_PASSWORD
1139 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
1140 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
1141 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
1142 ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
1143 ctx.load_cert_chain(CERTFILE_PROTECTED,
1144 password=GetPassCallable().getpass)
1145 with self.assertRaises(ssl.SSLError):
1146 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
1147 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1148 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
1149 with self.assertRaisesRegex(TypeError, "must return a string"):
1150 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
1151 with self.assertRaisesRegex(Exception, "getpass error"):
1152 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
1153 # Make sure the password function isn't called if it isn't needed
1154 ctx.load_cert_chain(CERTFILE, password=getpass_exception)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001155
1156 def test_load_verify_locations(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001157 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001158 ctx.load_verify_locations(CERTFILE)
1159 ctx.load_verify_locations(cafile=CERTFILE, capath=None)
1160 ctx.load_verify_locations(BYTES_CERTFILE)
1161 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
1162 self.assertRaises(TypeError, ctx.load_verify_locations)
Christian Heimesefff7062013-11-21 03:35:02 +01001163 self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001164 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001165 ctx.load_verify_locations(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001166 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001167 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001168 ctx.load_verify_locations(BADCERT)
1169 ctx.load_verify_locations(CERTFILE, CAPATH)
1170 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
1171
Victor Stinner80f75e62011-01-29 11:31:20 +00001172 # Issue #10989: crash if the second argument type is invalid
1173 self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
1174
Christian Heimesefff7062013-11-21 03:35:02 +01001175 def test_load_verify_cadata(self):
1176 # test cadata
1177 with open(CAFILE_CACERT) as f:
1178 cacert_pem = f.read()
1179 cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
1180 with open(CAFILE_NEURONIO) as f:
1181 neuronio_pem = f.read()
1182 neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
1183
1184 # test PEM
Christian Heimesa170fa12017-09-15 20:27:30 +02001185 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001186 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
1187 ctx.load_verify_locations(cadata=cacert_pem)
1188 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
1189 ctx.load_verify_locations(cadata=neuronio_pem)
1190 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1191 # cert already in hash table
1192 ctx.load_verify_locations(cadata=neuronio_pem)
1193 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1194
1195 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001196 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001197 combined = "\n".join((cacert_pem, neuronio_pem))
1198 ctx.load_verify_locations(cadata=combined)
1199 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1200
1201 # with junk around the certs
Christian Heimesa170fa12017-09-15 20:27:30 +02001202 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001203 combined = ["head", cacert_pem, "other", neuronio_pem, "again",
1204 neuronio_pem, "tail"]
1205 ctx.load_verify_locations(cadata="\n".join(combined))
1206 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1207
1208 # test DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001209 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001210 ctx.load_verify_locations(cadata=cacert_der)
1211 ctx.load_verify_locations(cadata=neuronio_der)
1212 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1213 # cert already in hash table
1214 ctx.load_verify_locations(cadata=cacert_der)
1215 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1216
1217 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001218 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001219 combined = b"".join((cacert_der, neuronio_der))
1220 ctx.load_verify_locations(cadata=combined)
1221 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1222
1223 # error cases
Christian Heimesa170fa12017-09-15 20:27:30 +02001224 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001225 self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
1226
1227 with self.assertRaisesRegex(ssl.SSLError, "no start line"):
1228 ctx.load_verify_locations(cadata="broken")
1229 with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
1230 ctx.load_verify_locations(cadata=b"broken")
1231
1232
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001233 def test_load_dh_params(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001234 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001235 ctx.load_dh_params(DHFILE)
1236 if os.name != 'nt':
1237 ctx.load_dh_params(BYTES_DHFILE)
1238 self.assertRaises(TypeError, ctx.load_dh_params)
1239 self.assertRaises(TypeError, ctx.load_dh_params, None)
1240 with self.assertRaises(FileNotFoundError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001241 ctx.load_dh_params(NONEXISTINGCERT)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001242 self.assertEqual(cm.exception.errno, errno.ENOENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001243 with self.assertRaises(ssl.SSLError) as cm:
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001244 ctx.load_dh_params(CERTFILE)
1245
Antoine Pitroueb585ad2010-10-22 18:24:20 +00001246 @skip_if_broken_ubuntu_ssl
Antoine Pitroub0182c82010-10-12 20:09:02 +00001247 def test_session_stats(self):
1248 for proto in PROTOCOLS:
1249 ctx = ssl.SSLContext(proto)
1250 self.assertEqual(ctx.session_stats(), {
1251 'number': 0,
1252 'connect': 0,
1253 'connect_good': 0,
1254 'connect_renegotiate': 0,
1255 'accept': 0,
1256 'accept_good': 0,
1257 'accept_renegotiate': 0,
1258 'hits': 0,
1259 'misses': 0,
1260 'timeouts': 0,
1261 'cache_full': 0,
1262 })
1263
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001264 def test_set_default_verify_paths(self):
1265 # There's not much we can do to test that it acts as expected,
1266 # so just check it doesn't crash or raise an exception.
Christian Heimesa170fa12017-09-15 20:27:30 +02001267 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001268 ctx.set_default_verify_paths()
1269
Antoine Pitrou501da612011-12-21 09:27:41 +01001270 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001271 def test_set_ecdh_curve(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001272 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001273 ctx.set_ecdh_curve("prime256v1")
1274 ctx.set_ecdh_curve(b"prime256v1")
1275 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1276 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1277 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1278 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1279
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001280 @needs_sni
1281 def test_sni_callback(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001282 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001283
1284 # set_servername_callback expects a callable, or None
1285 self.assertRaises(TypeError, ctx.set_servername_callback)
1286 self.assertRaises(TypeError, ctx.set_servername_callback, 4)
1287 self.assertRaises(TypeError, ctx.set_servername_callback, "")
1288 self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
1289
1290 def dummycallback(sock, servername, ctx):
1291 pass
1292 ctx.set_servername_callback(None)
1293 ctx.set_servername_callback(dummycallback)
1294
1295 @needs_sni
1296 def test_sni_callback_refcycle(self):
1297 # Reference cycles through the servername callback are detected
1298 # and cleared.
Christian Heimesa170fa12017-09-15 20:27:30 +02001299 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001300 def dummycallback(sock, servername, ctx, cycle=ctx):
1301 pass
1302 ctx.set_servername_callback(dummycallback)
1303 wr = weakref.ref(ctx)
1304 del ctx, dummycallback
1305 gc.collect()
1306 self.assertIs(wr(), None)
1307
Christian Heimes9a5395a2013-06-17 15:44:12 +02001308 def test_cert_store_stats(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001309 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001310 self.assertEqual(ctx.cert_store_stats(),
1311 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1312 ctx.load_cert_chain(CERTFILE)
1313 self.assertEqual(ctx.cert_store_stats(),
1314 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1315 ctx.load_verify_locations(CERTFILE)
1316 self.assertEqual(ctx.cert_store_stats(),
1317 {'x509_ca': 0, 'crl': 0, 'x509': 1})
Martin Panterb55f8b72016-01-14 12:53:56 +00001318 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001319 self.assertEqual(ctx.cert_store_stats(),
1320 {'x509_ca': 1, 'crl': 0, 'x509': 2})
1321
1322 def test_get_ca_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001323 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001324 self.assertEqual(ctx.get_ca_certs(), [])
1325 # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
1326 ctx.load_verify_locations(CERTFILE)
1327 self.assertEqual(ctx.get_ca_certs(), [])
Martin Panterb55f8b72016-01-14 12:53:56 +00001328 # but CAFILE_CACERT is a CA cert
1329 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001330 self.assertEqual(ctx.get_ca_certs(),
1331 [{'issuer': ((('organizationName', 'Root CA'),),
1332 (('organizationalUnitName', 'http://www.cacert.org'),),
1333 (('commonName', 'CA Cert Signing Authority'),),
1334 (('emailAddress', 'support@cacert.org'),)),
1335 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
1336 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
1337 'serialNumber': '00',
Christian Heimesbd3a7f92013-11-21 03:40:15 +01001338 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
Christian Heimes9a5395a2013-06-17 15:44:12 +02001339 'subject': ((('organizationName', 'Root CA'),),
1340 (('organizationalUnitName', 'http://www.cacert.org'),),
1341 (('commonName', 'CA Cert Signing Authority'),),
1342 (('emailAddress', 'support@cacert.org'),)),
1343 'version': 3}])
1344
Martin Panterb55f8b72016-01-14 12:53:56 +00001345 with open(CAFILE_CACERT) as f:
Christian Heimes9a5395a2013-06-17 15:44:12 +02001346 pem = f.read()
1347 der = ssl.PEM_cert_to_DER_cert(pem)
1348 self.assertEqual(ctx.get_ca_certs(True), [der])
1349
Christian Heimes72d28502013-11-23 13:56:58 +01001350 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001351 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001352 ctx.load_default_certs()
1353
Christian Heimesa170fa12017-09-15 20:27:30 +02001354 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001355 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1356 ctx.load_default_certs()
1357
Christian Heimesa170fa12017-09-15 20:27:30 +02001358 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001359 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1360
Christian Heimesa170fa12017-09-15 20:27:30 +02001361 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001362 self.assertRaises(TypeError, ctx.load_default_certs, None)
1363 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1364
Benjamin Peterson91244e02014-10-03 18:17:15 -04001365 @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
Christian Heimes598894f2016-09-05 23:19:05 +02001366 @unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001367 def test_load_default_certs_env(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001368 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001369 with support.EnvironmentVarGuard() as env:
1370 env["SSL_CERT_DIR"] = CAPATH
1371 env["SSL_CERT_FILE"] = CERTFILE
1372 ctx.load_default_certs()
1373 self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0})
1374
Benjamin Peterson91244e02014-10-03 18:17:15 -04001375 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Steve Dower68d663c2017-07-17 11:15:48 +02001376 @unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
Benjamin Peterson91244e02014-10-03 18:17:15 -04001377 def test_load_default_certs_env_windows(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001378 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001379 ctx.load_default_certs()
1380 stats = ctx.cert_store_stats()
1381
Christian Heimesa170fa12017-09-15 20:27:30 +02001382 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001383 with support.EnvironmentVarGuard() as env:
1384 env["SSL_CERT_DIR"] = CAPATH
1385 env["SSL_CERT_FILE"] = CERTFILE
1386 ctx.load_default_certs()
1387 stats["x509"] += 1
1388 self.assertEqual(ctx.cert_store_stats(), stats)
1389
Christian Heimes358cfd42016-09-10 22:43:48 +02001390 def _assert_context_options(self, ctx):
1391 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1392 if OP_NO_COMPRESSION != 0:
1393 self.assertEqual(ctx.options & OP_NO_COMPRESSION,
1394 OP_NO_COMPRESSION)
1395 if OP_SINGLE_DH_USE != 0:
1396 self.assertEqual(ctx.options & OP_SINGLE_DH_USE,
1397 OP_SINGLE_DH_USE)
1398 if OP_SINGLE_ECDH_USE != 0:
1399 self.assertEqual(ctx.options & OP_SINGLE_ECDH_USE,
1400 OP_SINGLE_ECDH_USE)
1401 if OP_CIPHER_SERVER_PREFERENCE != 0:
1402 self.assertEqual(ctx.options & OP_CIPHER_SERVER_PREFERENCE,
1403 OP_CIPHER_SERVER_PREFERENCE)
1404
Christian Heimes4c05b472013-11-23 15:58:30 +01001405 def test_create_default_context(self):
1406 ctx = ssl.create_default_context()
Christian Heimes358cfd42016-09-10 22:43:48 +02001407
Christian Heimesa170fa12017-09-15 20:27:30 +02001408 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001409 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001410 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001411 self._assert_context_options(ctx)
1412
Christian Heimes4c05b472013-11-23 15:58:30 +01001413 with open(SIGNING_CA) as f:
1414 cadata = f.read()
1415 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1416 cadata=cadata)
Christian Heimesa170fa12017-09-15 20:27:30 +02001417 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001418 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes358cfd42016-09-10 22:43:48 +02001419 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001420
1421 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001422 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001423 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001424 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001425
Christian Heimes67986f92013-11-23 22:43:47 +01001426 def test__create_stdlib_context(self):
1427 ctx = ssl._create_stdlib_context()
Christian Heimesa170fa12017-09-15 20:27:30 +02001428 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001429 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001430 self.assertFalse(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001431 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001432
1433 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
1434 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1435 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001436 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001437
1438 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
Christian Heimesa02c69a2013-12-02 20:59:28 +01001439 cert_reqs=ssl.CERT_REQUIRED,
1440 check_hostname=True)
Christian Heimes67986f92013-11-23 22:43:47 +01001441 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1442 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimesa02c69a2013-12-02 20:59:28 +01001443 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001444 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001445
1446 ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001447 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001448 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001449 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001450
Christian Heimes1aa9a752013-12-02 02:41:19 +01001451 def test_check_hostname(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001452 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001453 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001454 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001455
Christian Heimese82c0342017-09-15 20:29:57 +02001456 # Auto set CERT_REQUIRED
1457 ctx.check_hostname = True
1458 self.assertTrue(ctx.check_hostname)
1459 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1460 ctx.check_hostname = False
Christian Heimes1aa9a752013-12-02 02:41:19 +01001461 ctx.verify_mode = ssl.CERT_REQUIRED
1462 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001463 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001464
Christian Heimese82c0342017-09-15 20:29:57 +02001465 # Changing verify_mode does not affect check_hostname
1466 ctx.check_hostname = False
1467 ctx.verify_mode = ssl.CERT_NONE
1468 ctx.check_hostname = False
1469 self.assertFalse(ctx.check_hostname)
1470 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1471 # Auto set
Christian Heimes1aa9a752013-12-02 02:41:19 +01001472 ctx.check_hostname = True
1473 self.assertTrue(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001474 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1475
1476 ctx.check_hostname = False
1477 ctx.verify_mode = ssl.CERT_OPTIONAL
1478 ctx.check_hostname = False
1479 self.assertFalse(ctx.check_hostname)
1480 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1481 # keep CERT_OPTIONAL
1482 ctx.check_hostname = True
1483 self.assertTrue(ctx.check_hostname)
1484 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001485
1486 # Cannot set CERT_NONE with check_hostname enabled
1487 with self.assertRaises(ValueError):
1488 ctx.verify_mode = ssl.CERT_NONE
1489 ctx.check_hostname = False
1490 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001491 ctx.verify_mode = ssl.CERT_NONE
1492 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001493
Christian Heimes5fe668c2016-09-12 00:01:11 +02001494 def test_context_client_server(self):
1495 # PROTOCOL_TLS_CLIENT has sane defaults
1496 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1497 self.assertTrue(ctx.check_hostname)
1498 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1499
1500 # PROTOCOL_TLS_SERVER has different but also sane defaults
1501 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1502 self.assertFalse(ctx.check_hostname)
1503 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1504
Christian Heimes4df60f12017-09-15 20:26:05 +02001505 def test_context_custom_class(self):
1506 class MySSLSocket(ssl.SSLSocket):
1507 pass
1508
1509 class MySSLObject(ssl.SSLObject):
1510 pass
1511
1512 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1513 ctx.sslsocket_class = MySSLSocket
1514 ctx.sslobject_class = MySSLObject
1515
1516 with ctx.wrap_socket(socket.socket(), server_side=True) as sock:
1517 self.assertIsInstance(sock, MySSLSocket)
1518 obj = ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO())
1519 self.assertIsInstance(obj, MySSLObject)
1520
Antoine Pitrou152efa22010-05-16 18:19:27 +00001521
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001522class SSLErrorTests(unittest.TestCase):
1523
1524 def test_str(self):
1525 # The str() of a SSLError doesn't include the errno
1526 e = ssl.SSLError(1, "foo")
1527 self.assertEqual(str(e), "foo")
1528 self.assertEqual(e.errno, 1)
1529 # Same for a subclass
1530 e = ssl.SSLZeroReturnError(1, "foo")
1531 self.assertEqual(str(e), "foo")
1532 self.assertEqual(e.errno, 1)
1533
1534 def test_lib_reason(self):
1535 # Test the library and reason attributes
Christian Heimesa170fa12017-09-15 20:27:30 +02001536 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001537 with self.assertRaises(ssl.SSLError) as cm:
1538 ctx.load_dh_params(CERTFILE)
1539 self.assertEqual(cm.exception.library, 'PEM')
1540 self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1541 s = str(cm.exception)
1542 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1543
1544 def test_subclass(self):
1545 # Check that the appropriate SSLError subclass is raised
1546 # (this only tests one of them)
Christian Heimesa170fa12017-09-15 20:27:30 +02001547 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1548 ctx.check_hostname = False
1549 ctx.verify_mode = ssl.CERT_NONE
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001550 with socket.socket() as s:
1551 s.bind(("127.0.0.1", 0))
Charles-François Natali6e204602014-07-23 19:28:13 +01001552 s.listen()
Antoine Pitroue1ceb502013-01-12 21:54:44 +01001553 c = socket.socket()
1554 c.connect(s.getsockname())
1555 c.setblocking(False)
1556 with ctx.wrap_socket(c, False, do_handshake_on_connect=False) as c:
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001557 with self.assertRaises(ssl.SSLWantReadError) as cm:
1558 c.do_handshake()
1559 s = str(cm.exception)
1560 self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1561 # For compatibility
1562 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
1563
1564
Christian Heimes61d478c2018-01-27 15:51:38 +01001565 def test_bad_server_hostname(self):
1566 ctx = ssl.create_default_context()
1567 with self.assertRaises(ValueError):
1568 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1569 server_hostname="")
1570 with self.assertRaises(ValueError):
1571 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1572 server_hostname=".example.org")
1573
1574
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001575class MemoryBIOTests(unittest.TestCase):
1576
1577 def test_read_write(self):
1578 bio = ssl.MemoryBIO()
1579 bio.write(b'foo')
1580 self.assertEqual(bio.read(), b'foo')
1581 self.assertEqual(bio.read(), b'')
1582 bio.write(b'foo')
1583 bio.write(b'bar')
1584 self.assertEqual(bio.read(), b'foobar')
1585 self.assertEqual(bio.read(), b'')
1586 bio.write(b'baz')
1587 self.assertEqual(bio.read(2), b'ba')
1588 self.assertEqual(bio.read(1), b'z')
1589 self.assertEqual(bio.read(1), b'')
1590
1591 def test_eof(self):
1592 bio = ssl.MemoryBIO()
1593 self.assertFalse(bio.eof)
1594 self.assertEqual(bio.read(), b'')
1595 self.assertFalse(bio.eof)
1596 bio.write(b'foo')
1597 self.assertFalse(bio.eof)
1598 bio.write_eof()
1599 self.assertFalse(bio.eof)
1600 self.assertEqual(bio.read(2), b'fo')
1601 self.assertFalse(bio.eof)
1602 self.assertEqual(bio.read(1), b'o')
1603 self.assertTrue(bio.eof)
1604 self.assertEqual(bio.read(), b'')
1605 self.assertTrue(bio.eof)
1606
1607 def test_pending(self):
1608 bio = ssl.MemoryBIO()
1609 self.assertEqual(bio.pending, 0)
1610 bio.write(b'foo')
1611 self.assertEqual(bio.pending, 3)
1612 for i in range(3):
1613 bio.read(1)
1614 self.assertEqual(bio.pending, 3-i-1)
1615 for i in range(3):
1616 bio.write(b'x')
1617 self.assertEqual(bio.pending, i+1)
1618 bio.read()
1619 self.assertEqual(bio.pending, 0)
1620
1621 def test_buffer_types(self):
1622 bio = ssl.MemoryBIO()
1623 bio.write(b'foo')
1624 self.assertEqual(bio.read(), b'foo')
1625 bio.write(bytearray(b'bar'))
1626 self.assertEqual(bio.read(), b'bar')
1627 bio.write(memoryview(b'baz'))
1628 self.assertEqual(bio.read(), b'baz')
1629
1630 def test_error_types(self):
1631 bio = ssl.MemoryBIO()
1632 self.assertRaises(TypeError, bio.write, 'foo')
1633 self.assertRaises(TypeError, bio.write, None)
1634 self.assertRaises(TypeError, bio.write, True)
1635 self.assertRaises(TypeError, bio.write, 1)
1636
1637
Martin Panter3840b2a2016-03-27 01:53:46 +00001638class SimpleBackgroundTests(unittest.TestCase):
Martin Panter3840b2a2016-03-27 01:53:46 +00001639 """Tests that connect to a simple server running in the background"""
1640
1641 def setUp(self):
1642 server = ThreadedEchoServer(SIGNED_CERTFILE)
1643 self.server_addr = (HOST, server.port)
1644 server.__enter__()
1645 self.addCleanup(server.__exit__, None, None, None)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001646
Antoine Pitrou480a1242010-04-28 21:37:09 +00001647 def test_connect(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001648 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001649 cert_reqs=ssl.CERT_NONE) as s:
1650 s.connect(self.server_addr)
1651 self.assertEqual({}, s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001652 self.assertFalse(s.server_side)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001653
Martin Panter3840b2a2016-03-27 01:53:46 +00001654 # this should succeed because we specify the root cert
Christian Heimesd0486372016-09-10 23:23:33 +02001655 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001656 cert_reqs=ssl.CERT_REQUIRED,
1657 ca_certs=SIGNING_CA) as s:
1658 s.connect(self.server_addr)
1659 self.assertTrue(s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001660 self.assertFalse(s.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001661
Martin Panter3840b2a2016-03-27 01:53:46 +00001662 def test_connect_fail(self):
1663 # This should fail because we have no verification certs. Connection
1664 # failure crashes ThreadedEchoServer, so run this in an independent
1665 # test method.
Christian Heimesd0486372016-09-10 23:23:33 +02001666 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001667 cert_reqs=ssl.CERT_REQUIRED)
1668 self.addCleanup(s.close)
1669 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1670 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001671
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001672 def test_connect_ex(self):
1673 # Issue #11326: check connect_ex() implementation
Christian Heimesd0486372016-09-10 23:23:33 +02001674 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001675 cert_reqs=ssl.CERT_REQUIRED,
1676 ca_certs=SIGNING_CA)
1677 self.addCleanup(s.close)
1678 self.assertEqual(0, s.connect_ex(self.server_addr))
1679 self.assertTrue(s.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001680
1681 def test_non_blocking_connect_ex(self):
1682 # Issue #11326: non-blocking connect_ex() should allow handshake
1683 # to proceed after the socket gets ready.
Christian Heimesd0486372016-09-10 23:23:33 +02001684 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001685 cert_reqs=ssl.CERT_REQUIRED,
1686 ca_certs=SIGNING_CA,
1687 do_handshake_on_connect=False)
1688 self.addCleanup(s.close)
1689 s.setblocking(False)
1690 rc = s.connect_ex(self.server_addr)
1691 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1692 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
1693 # Wait for connect to finish
1694 select.select([], [s], [], 5.0)
1695 # Non-blocking handshake
1696 while True:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001697 try:
Martin Panter3840b2a2016-03-27 01:53:46 +00001698 s.do_handshake()
1699 break
1700 except ssl.SSLWantReadError:
1701 select.select([s], [], [], 5.0)
1702 except ssl.SSLWantWriteError:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001703 select.select([], [s], [], 5.0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001704 # SSL established
1705 self.assertTrue(s.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001706
Antoine Pitrou152efa22010-05-16 18:19:27 +00001707 def test_connect_with_context(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001708 # Same as test_connect, but with a separately created context
Christian Heimesa170fa12017-09-15 20:27:30 +02001709 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001710 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1711 s.connect(self.server_addr)
1712 self.assertEqual({}, s.getpeercert())
1713 # Same with a server hostname
1714 with ctx.wrap_socket(socket.socket(socket.AF_INET),
1715 server_hostname="dummy") as s:
1716 s.connect(self.server_addr)
1717 ctx.verify_mode = ssl.CERT_REQUIRED
1718 # This should succeed because we specify the root cert
1719 ctx.load_verify_locations(SIGNING_CA)
1720 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1721 s.connect(self.server_addr)
1722 cert = s.getpeercert()
1723 self.assertTrue(cert)
1724
1725 def test_connect_with_context_fail(self):
1726 # This should fail because we have no verification certs. Connection
1727 # failure crashes ThreadedEchoServer, so run this in an independent
1728 # test method.
Christian Heimesa170fa12017-09-15 20:27:30 +02001729 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001730 ctx.verify_mode = ssl.CERT_REQUIRED
1731 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1732 self.addCleanup(s.close)
1733 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1734 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001735
1736 def test_connect_capath(self):
1737 # Verify server certificates using the `capath` argument
Antoine Pitrou467f28d2010-05-16 19:22:44 +00001738 # NOTE: the subject hashing algorithm has been changed between
1739 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
1740 # contain both versions of each certificate (same content, different
Antoine Pitroud7e4c1c2010-05-17 14:13:10 +00001741 # filename) for this test to be portable across OpenSSL releases.
Christian Heimesa170fa12017-09-15 20:27:30 +02001742 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001743 ctx.verify_mode = ssl.CERT_REQUIRED
1744 ctx.load_verify_locations(capath=CAPATH)
1745 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1746 s.connect(self.server_addr)
1747 cert = s.getpeercert()
1748 self.assertTrue(cert)
1749 # Same with a bytes `capath` argument
Christian Heimesa170fa12017-09-15 20:27:30 +02001750 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001751 ctx.verify_mode = ssl.CERT_REQUIRED
1752 ctx.load_verify_locations(capath=BYTES_CAPATH)
1753 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1754 s.connect(self.server_addr)
1755 cert = s.getpeercert()
1756 self.assertTrue(cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001757
Christian Heimesefff7062013-11-21 03:35:02 +01001758 def test_connect_cadata(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001759 with open(SIGNING_CA) as f:
Christian Heimesefff7062013-11-21 03:35:02 +01001760 pem = f.read()
1761 der = ssl.PEM_cert_to_DER_cert(pem)
Christian Heimesa170fa12017-09-15 20:27:30 +02001762 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001763 ctx.verify_mode = ssl.CERT_REQUIRED
1764 ctx.load_verify_locations(cadata=pem)
1765 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1766 s.connect(self.server_addr)
1767 cert = s.getpeercert()
1768 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001769
Martin Panter3840b2a2016-03-27 01:53:46 +00001770 # same with DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001771 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001772 ctx.verify_mode = ssl.CERT_REQUIRED
1773 ctx.load_verify_locations(cadata=der)
1774 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1775 s.connect(self.server_addr)
1776 cert = s.getpeercert()
1777 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001778
Antoine Pitroue3220242010-04-24 11:13:53 +00001779 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
1780 def test_makefile_close(self):
1781 # Issue #5238: creating a file-like object with makefile() shouldn't
1782 # delay closing the underlying "real socket" (here tested with its
1783 # file descriptor, hence skipping the test under Windows).
Christian Heimesd0486372016-09-10 23:23:33 +02001784 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Martin Panter3840b2a2016-03-27 01:53:46 +00001785 ss.connect(self.server_addr)
1786 fd = ss.fileno()
1787 f = ss.makefile()
1788 f.close()
1789 # The fd is still open
1790 os.read(fd, 0)
1791 # Closing the SSL socket should close the fd too
1792 ss.close()
1793 gc.collect()
1794 with self.assertRaises(OSError) as e:
Antoine Pitroue3220242010-04-24 11:13:53 +00001795 os.read(fd, 0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001796 self.assertEqual(e.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001797
Antoine Pitrou480a1242010-04-28 21:37:09 +00001798 def test_non_blocking_handshake(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001799 s = socket.socket(socket.AF_INET)
1800 s.connect(self.server_addr)
1801 s.setblocking(False)
Christian Heimesd0486372016-09-10 23:23:33 +02001802 s = test_wrap_socket(s,
Martin Panter3840b2a2016-03-27 01:53:46 +00001803 cert_reqs=ssl.CERT_NONE,
1804 do_handshake_on_connect=False)
1805 self.addCleanup(s.close)
1806 count = 0
1807 while True:
1808 try:
1809 count += 1
1810 s.do_handshake()
1811 break
1812 except ssl.SSLWantReadError:
1813 select.select([s], [], [])
1814 except ssl.SSLWantWriteError:
1815 select.select([], [s], [])
1816 if support.verbose:
1817 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001818
Antoine Pitrou480a1242010-04-28 21:37:09 +00001819 def test_get_server_certificate(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001820 _test_get_server_certificate(self, *self.server_addr, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001821
Martin Panter3840b2a2016-03-27 01:53:46 +00001822 def test_get_server_certificate_fail(self):
1823 # Connection failure crashes ThreadedEchoServer, so run this in an
1824 # independent test method
1825 _test_get_server_certificate_fail(self, *self.server_addr)
Bill Janssen54cc54c2007-12-14 22:08:56 +00001826
Antoine Pitrouf4c7bad2010-08-15 23:02:22 +00001827 def test_ciphers(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001828 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001829 cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
1830 s.connect(self.server_addr)
Christian Heimesd0486372016-09-10 23:23:33 +02001831 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001832 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
1833 s.connect(self.server_addr)
1834 # Error checking can happen at instantiation or when connecting
1835 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
1836 with socket.socket(socket.AF_INET) as sock:
Christian Heimesd0486372016-09-10 23:23:33 +02001837 s = test_wrap_socket(sock,
Martin Panter3840b2a2016-03-27 01:53:46 +00001838 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
1839 s.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00001840
Christian Heimes9a5395a2013-06-17 15:44:12 +02001841 def test_get_ca_certs_capath(self):
1842 # capath certs are loaded on request
Christian Heimesa170fa12017-09-15 20:27:30 +02001843 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Martin Panter3840b2a2016-03-27 01:53:46 +00001844 ctx.load_verify_locations(capath=CAPATH)
1845 self.assertEqual(ctx.get_ca_certs(), [])
Christian Heimesa170fa12017-09-15 20:27:30 +02001846 with ctx.wrap_socket(socket.socket(socket.AF_INET),
1847 server_hostname='localhost') as s:
Martin Panter3840b2a2016-03-27 01:53:46 +00001848 s.connect(self.server_addr)
1849 cert = s.getpeercert()
1850 self.assertTrue(cert)
1851 self.assertEqual(len(ctx.get_ca_certs()), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001852
Christian Heimes575596e2013-12-15 21:49:17 +01001853 @needs_sni
Christian Heimes8e7f3942013-12-05 07:41:08 +01001854 def test_context_setget(self):
1855 # Check that the context of a connected socket can be replaced.
Christian Heimesa170fa12017-09-15 20:27:30 +02001856 ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1857 ctx1.load_verify_locations(capath=CAPATH)
1858 ctx2 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1859 ctx2.load_verify_locations(capath=CAPATH)
Martin Panter3840b2a2016-03-27 01:53:46 +00001860 s = socket.socket(socket.AF_INET)
Christian Heimesa170fa12017-09-15 20:27:30 +02001861 with ctx1.wrap_socket(s, server_hostname='localhost') as ss:
Martin Panter3840b2a2016-03-27 01:53:46 +00001862 ss.connect(self.server_addr)
1863 self.assertIs(ss.context, ctx1)
1864 self.assertIs(ss._sslobj.context, ctx1)
1865 ss.context = ctx2
1866 self.assertIs(ss.context, ctx2)
1867 self.assertIs(ss._sslobj.context, ctx2)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001868
1869 def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
1870 # A simple IO loop. Call func(*args) depending on the error we get
1871 # (WANT_READ or WANT_WRITE) move data between the socket and the BIOs.
1872 timeout = kwargs.get('timeout', 10)
Victor Stinner50a72af2017-09-11 09:34:24 -07001873 deadline = time.monotonic() + timeout
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001874 count = 0
1875 while True:
Victor Stinner50a72af2017-09-11 09:34:24 -07001876 if time.monotonic() > deadline:
1877 self.fail("timeout")
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001878 errno = None
1879 count += 1
1880 try:
1881 ret = func(*args)
1882 except ssl.SSLError as e:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001883 if e.errno not in (ssl.SSL_ERROR_WANT_READ,
Martin Panter40b97ec2016-01-14 13:05:46 +00001884 ssl.SSL_ERROR_WANT_WRITE):
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001885 raise
1886 errno = e.errno
1887 # Get any data from the outgoing BIO irrespective of any error, and
1888 # send it to the socket.
1889 buf = outgoing.read()
1890 sock.sendall(buf)
1891 # If there's no error, we're done. For WANT_READ, we need to get
1892 # data from the socket and put it in the incoming BIO.
1893 if errno is None:
1894 break
1895 elif errno == ssl.SSL_ERROR_WANT_READ:
1896 buf = sock.recv(32768)
1897 if buf:
1898 incoming.write(buf)
1899 else:
1900 incoming.write_eof()
1901 if support.verbose:
1902 sys.stdout.write("Needed %d calls to complete %s().\n"
1903 % (count, func.__name__))
1904 return ret
1905
Martin Panter3840b2a2016-03-27 01:53:46 +00001906 def test_bio_handshake(self):
1907 sock = socket.socket(socket.AF_INET)
1908 self.addCleanup(sock.close)
1909 sock.connect(self.server_addr)
1910 incoming = ssl.MemoryBIO()
1911 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02001912 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1913 self.assertTrue(ctx.check_hostname)
1914 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Martin Panter3840b2a2016-03-27 01:53:46 +00001915 ctx.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +02001916 sslobj = ctx.wrap_bio(incoming, outgoing, False,
1917 SIGNED_CERTFILE_HOSTNAME)
Martin Panter3840b2a2016-03-27 01:53:46 +00001918 self.assertIs(sslobj._sslobj.owner, sslobj)
1919 self.assertIsNone(sslobj.cipher())
Christian Heimes68771112017-09-05 21:55:40 -07001920 self.assertIsNone(sslobj.version())
Christian Heimes01113fa2016-09-05 23:23:24 +02001921 self.assertIsNotNone(sslobj.shared_ciphers())
Martin Panter3840b2a2016-03-27 01:53:46 +00001922 self.assertRaises(ValueError, sslobj.getpeercert)
1923 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
1924 self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
1925 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
1926 self.assertTrue(sslobj.cipher())
Christian Heimes01113fa2016-09-05 23:23:24 +02001927 self.assertIsNotNone(sslobj.shared_ciphers())
Christian Heimes68771112017-09-05 21:55:40 -07001928 self.assertIsNotNone(sslobj.version())
Martin Panter3840b2a2016-03-27 01:53:46 +00001929 self.assertTrue(sslobj.getpeercert())
1930 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
1931 self.assertTrue(sslobj.get_channel_binding('tls-unique'))
1932 try:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001933 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Martin Panter3840b2a2016-03-27 01:53:46 +00001934 except ssl.SSLSyscallError:
1935 # If the server shuts down the TCP connection without sending a
1936 # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
1937 pass
1938 self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
1939
1940 def test_bio_read_write_data(self):
1941 sock = socket.socket(socket.AF_INET)
1942 self.addCleanup(sock.close)
1943 sock.connect(self.server_addr)
1944 incoming = ssl.MemoryBIO()
1945 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02001946 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001947 ctx.verify_mode = ssl.CERT_NONE
1948 sslobj = ctx.wrap_bio(incoming, outgoing, False)
1949 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
1950 req = b'FOO\n'
1951 self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
1952 buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
1953 self.assertEqual(buf, b'foo\n')
1954 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001955
1956
Martin Panter3840b2a2016-03-27 01:53:46 +00001957class NetworkedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001958
Martin Panter3840b2a2016-03-27 01:53:46 +00001959 def test_timeout_connect_ex(self):
1960 # Issue #12065: on a timeout, connect_ex() should return the original
1961 # errno (mimicking the behaviour of non-SSL sockets).
1962 with support.transient_internet(REMOTE_HOST):
Christian Heimesd0486372016-09-10 23:23:33 +02001963 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001964 cert_reqs=ssl.CERT_REQUIRED,
1965 do_handshake_on_connect=False)
1966 self.addCleanup(s.close)
1967 s.settimeout(0.0000001)
1968 rc = s.connect_ex((REMOTE_HOST, 443))
1969 if rc == 0:
1970 self.skipTest("REMOTE_HOST responded too quickly")
1971 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
1972
1973 @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
1974 def test_get_server_certificate_ipv6(self):
1975 with support.transient_internet('ipv6.google.com'):
1976 _test_get_server_certificate(self, 'ipv6.google.com', 443)
1977 _test_get_server_certificate_fail(self, 'ipv6.google.com', 443)
1978
Martin Panter3840b2a2016-03-27 01:53:46 +00001979
1980def _test_get_server_certificate(test, host, port, cert=None):
1981 pem = ssl.get_server_certificate((host, port))
1982 if not pem:
1983 test.fail("No server certificate on %s:%s!" % (host, port))
1984
1985 pem = ssl.get_server_certificate((host, port), ca_certs=cert)
1986 if not pem:
1987 test.fail("No server certificate on %s:%s!" % (host, port))
1988 if support.verbose:
1989 sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
1990
1991def _test_get_server_certificate_fail(test, host, port):
1992 try:
1993 pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
1994 except ssl.SSLError as x:
1995 #should fail
1996 if support.verbose:
1997 sys.stdout.write("%s\n" % x)
1998 else:
1999 test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
2000
2001
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002002from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002003
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002004class ThreadedEchoServer(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002005
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002006 class ConnectionHandler(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002007
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002008 """A mildly complicated class, because we want it to work both
2009 with and without the SSL wrapper around the socket connection, so
2010 that we can test the STARTTLS functionality."""
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002011
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002012 def __init__(self, server, connsock, addr):
2013 self.server = server
2014 self.running = False
2015 self.sock = connsock
2016 self.addr = addr
2017 self.sock.setblocking(1)
2018 self.sslconn = None
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002019 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00002020 self.daemon = True
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002021
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002022 def wrap_conn(self):
2023 try:
2024 self.sslconn = self.server.context.wrap_socket(
2025 self.sock, server_side=True)
2026 self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
2027 self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
2028 except (ssl.SSLError, ConnectionResetError, OSError) as e:
2029 # We treat ConnectionResetError as though it were an
2030 # SSLError - OpenSSL on Ubuntu abruptly closes the
2031 # connection when asked to use an unsupported protocol.
2032 #
2033 # OSError may occur with wrong protocols, e.g. both
2034 # sides use PROTOCOL_TLS_SERVER.
2035 #
2036 # XXX Various errors can have happened here, for example
2037 # a mismatching protocol version, an invalid certificate,
2038 # or a low-level bug. This should be made more discriminating.
2039 #
2040 # bpo-31323: Store the exception as string to prevent
2041 # a reference leak: server -> conn_errors -> exception
2042 # -> traceback -> self (ConnectionHandler) -> server
2043 self.server.conn_errors.append(str(e))
2044 if self.server.chatty:
2045 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
2046 self.running = False
2047 self.server.stop()
2048 self.close()
2049 return False
2050 else:
2051 self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
2052 if self.server.context.verify_mode == ssl.CERT_REQUIRED:
2053 cert = self.sslconn.getpeercert()
2054 if support.verbose and self.server.chatty:
2055 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
2056 cert_binary = self.sslconn.getpeercert(True)
2057 if support.verbose and self.server.chatty:
2058 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
2059 cipher = self.sslconn.cipher()
2060 if support.verbose and self.server.chatty:
2061 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
2062 sys.stdout.write(" server: selected protocol is now "
2063 + str(self.sslconn.selected_npn_protocol()) + "\n")
2064 return True
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002065
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002066 def read(self):
2067 if self.sslconn:
2068 return self.sslconn.read()
2069 else:
2070 return self.sock.recv(1024)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002071
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002072 def write(self, bytes):
2073 if self.sslconn:
2074 return self.sslconn.write(bytes)
2075 else:
2076 return self.sock.send(bytes)
2077
2078 def close(self):
2079 if self.sslconn:
2080 self.sslconn.close()
2081 else:
2082 self.sock.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002083
Antoine Pitrou480a1242010-04-28 21:37:09 +00002084 def run(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002085 self.running = True
2086 if not self.server.starttls_server:
2087 if not self.wrap_conn():
2088 return
2089 while self.running:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002090 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002091 msg = self.read()
2092 stripped = msg.strip()
2093 if not stripped:
2094 # eof, so quit this handler
2095 self.running = False
2096 try:
2097 self.sock = self.sslconn.unwrap()
2098 except OSError:
2099 # Many tests shut the TCP connection down
2100 # without an SSL shutdown. This causes
2101 # unwrap() to raise OSError with errno=0!
2102 pass
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002103 else:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002104 self.sslconn = None
2105 self.close()
2106 elif stripped == b'over':
2107 if support.verbose and self.server.connectionchatty:
2108 sys.stdout.write(" server: client closed connection\n")
2109 self.close()
2110 return
2111 elif (self.server.starttls_server and
2112 stripped == b'STARTTLS'):
2113 if support.verbose and self.server.connectionchatty:
2114 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
2115 self.write(b"OK\n")
2116 if not self.wrap_conn():
2117 return
2118 elif (self.server.starttls_server and self.sslconn
2119 and stripped == b'ENDTLS'):
2120 if support.verbose and self.server.connectionchatty:
2121 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
2122 self.write(b"OK\n")
2123 self.sock = self.sslconn.unwrap()
2124 self.sslconn = None
2125 if support.verbose and self.server.connectionchatty:
2126 sys.stdout.write(" server: connection is now unencrypted...\n")
2127 elif stripped == b'CB tls-unique':
2128 if support.verbose and self.server.connectionchatty:
2129 sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
2130 data = self.sslconn.get_channel_binding("tls-unique")
2131 self.write(repr(data).encode("us-ascii") + b"\n")
2132 else:
2133 if (support.verbose and
2134 self.server.connectionchatty):
2135 ctype = (self.sslconn and "encrypted") or "unencrypted"
2136 sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
2137 % (msg, ctype, msg.lower(), ctype))
2138 self.write(msg.lower())
2139 except OSError:
2140 if self.server.chatty:
2141 handle_error("Test server failure:\n")
Bill Janssen2f5799b2008-06-29 00:08:12 +00002142 self.close()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002143 self.running = False
2144 # normally, we'd just stop here, but for the test
2145 # harness, we want to stop the server
2146 self.server.stop()
Bill Janssen54cc54c2007-12-14 22:08:56 +00002147
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002148 def __init__(self, certificate=None, ssl_version=None,
2149 certreqs=None, cacerts=None,
2150 chatty=True, connectionchatty=False, starttls_server=False,
2151 npn_protocols=None, alpn_protocols=None,
2152 ciphers=None, context=None):
2153 if context:
2154 self.context = context
2155 else:
2156 self.context = ssl.SSLContext(ssl_version
2157 if ssl_version is not None
Christian Heimesa170fa12017-09-15 20:27:30 +02002158 else ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002159 self.context.verify_mode = (certreqs if certreqs is not None
2160 else ssl.CERT_NONE)
2161 if cacerts:
2162 self.context.load_verify_locations(cacerts)
2163 if certificate:
2164 self.context.load_cert_chain(certificate)
2165 if npn_protocols:
2166 self.context.set_npn_protocols(npn_protocols)
2167 if alpn_protocols:
2168 self.context.set_alpn_protocols(alpn_protocols)
2169 if ciphers:
2170 self.context.set_ciphers(ciphers)
2171 self.chatty = chatty
2172 self.connectionchatty = connectionchatty
2173 self.starttls_server = starttls_server
2174 self.sock = socket.socket()
2175 self.port = support.bind_port(self.sock)
2176 self.flag = None
2177 self.active = False
2178 self.selected_npn_protocols = []
2179 self.selected_alpn_protocols = []
2180 self.shared_ciphers = []
2181 self.conn_errors = []
2182 threading.Thread.__init__(self)
2183 self.daemon = True
2184
2185 def __enter__(self):
2186 self.start(threading.Event())
2187 self.flag.wait()
2188 return self
2189
2190 def __exit__(self, *args):
2191 self.stop()
2192 self.join()
2193
2194 def start(self, flag=None):
2195 self.flag = flag
2196 threading.Thread.start(self)
2197
2198 def run(self):
2199 self.sock.settimeout(0.05)
2200 self.sock.listen()
2201 self.active = True
2202 if self.flag:
2203 # signal an event
2204 self.flag.set()
2205 while self.active:
2206 try:
2207 newconn, connaddr = self.sock.accept()
2208 if support.verbose and self.chatty:
2209 sys.stdout.write(' server: new connection from '
2210 + repr(connaddr) + '\n')
2211 handler = self.ConnectionHandler(self, newconn, connaddr)
2212 handler.start()
2213 handler.join()
2214 except socket.timeout:
2215 pass
2216 except KeyboardInterrupt:
2217 self.stop()
2218 self.sock.close()
2219
2220 def stop(self):
2221 self.active = False
2222
2223class AsyncoreEchoServer(threading.Thread):
2224
2225 # this one's based on asyncore.dispatcher
2226
2227 class EchoServer (asyncore.dispatcher):
2228
2229 class ConnectionHandler(asyncore.dispatcher_with_send):
2230
2231 def __init__(self, conn, certfile):
2232 self.socket = test_wrap_socket(conn, server_side=True,
2233 certfile=certfile,
2234 do_handshake_on_connect=False)
2235 asyncore.dispatcher_with_send.__init__(self, self.socket)
2236 self._ssl_accepting = True
2237 self._do_ssl_handshake()
2238
2239 def readable(self):
2240 if isinstance(self.socket, ssl.SSLSocket):
2241 while self.socket.pending() > 0:
2242 self.handle_read_event()
2243 return True
2244
2245 def _do_ssl_handshake(self):
2246 try:
2247 self.socket.do_handshake()
2248 except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
2249 return
2250 except ssl.SSLEOFError:
2251 return self.handle_close()
2252 except ssl.SSLError:
Bill Janssen54cc54c2007-12-14 22:08:56 +00002253 raise
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002254 except OSError as err:
2255 if err.args[0] == errno.ECONNABORTED:
2256 return self.handle_close()
2257 else:
2258 self._ssl_accepting = False
Bill Janssen54cc54c2007-12-14 22:08:56 +00002259
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002260 def handle_read(self):
2261 if self._ssl_accepting:
2262 self._do_ssl_handshake()
2263 else:
2264 data = self.recv(1024)
2265 if support.verbose:
2266 sys.stdout.write(" server: read %s from client\n" % repr(data))
2267 if not data:
2268 self.close()
2269 else:
2270 self.send(data.lower())
Bill Janssen54cc54c2007-12-14 22:08:56 +00002271
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002272 def handle_close(self):
2273 self.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002274 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002275 sys.stdout.write(" server: closed connection %s\n" % self.socket)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002276
2277 def handle_error(self):
2278 raise
2279
Trent Nelson78520002008-04-10 20:54:35 +00002280 def __init__(self, certfile):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002281 self.certfile = certfile
2282 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2283 self.port = support.bind_port(sock, '')
2284 asyncore.dispatcher.__init__(self, sock)
2285 self.listen(5)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002286
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002287 def handle_accepted(self, sock_obj, addr):
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002288 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002289 sys.stdout.write(" server: new connection from %s:%s\n" %addr)
2290 self.ConnectionHandler(sock_obj, self.certfile)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002291
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002292 def handle_error(self):
2293 raise
Bill Janssen54cc54c2007-12-14 22:08:56 +00002294
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002295 def __init__(self, certfile):
2296 self.flag = None
2297 self.active = False
2298 self.server = self.EchoServer(certfile)
2299 self.port = self.server.port
2300 threading.Thread.__init__(self)
2301 self.daemon = True
Bill Janssen54cc54c2007-12-14 22:08:56 +00002302
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002303 def __str__(self):
2304 return "<%s %s>" % (self.__class__.__name__, self.server)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002305
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002306 def __enter__(self):
2307 self.start(threading.Event())
2308 self.flag.wait()
2309 return self
Thomas Woutersed03b412007-08-28 21:37:11 +00002310
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002311 def __exit__(self, *args):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002312 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002313 sys.stdout.write(" cleanup: stopping server.\n")
2314 self.stop()
2315 if support.verbose:
2316 sys.stdout.write(" cleanup: joining server thread.\n")
2317 self.join()
2318 if support.verbose:
2319 sys.stdout.write(" cleanup: successfully joined.\n")
2320 # make sure that ConnectionHandler is removed from socket_map
2321 asyncore.close_all(ignore_all=True)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002322
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002323 def start (self, flag=None):
2324 self.flag = flag
2325 threading.Thread.start(self)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002326
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002327 def run(self):
2328 self.active = True
2329 if self.flag:
2330 self.flag.set()
2331 while self.active:
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002332 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002333 asyncore.loop(1)
2334 except:
2335 pass
Trent Nelson6b240cd2008-04-10 20:12:06 +00002336
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002337 def stop(self):
2338 self.active = False
2339 self.server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002340
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002341def server_params_test(client_context, server_context, indata=b"FOO\n",
2342 chatty=True, connectionchatty=False, sni_name=None,
2343 session=None):
2344 """
2345 Launch a server, connect a client to it and try various reads
2346 and writes.
2347 """
2348 stats = {}
2349 server = ThreadedEchoServer(context=server_context,
2350 chatty=chatty,
2351 connectionchatty=False)
2352 with server:
2353 with client_context.wrap_socket(socket.socket(),
2354 server_hostname=sni_name, session=session) as s:
2355 s.connect((HOST, server.port))
2356 for arg in [indata, bytearray(indata), memoryview(indata)]:
2357 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002358 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002359 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002360 " client: sending %r...\n" % indata)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002361 s.write(arg)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002362 outdata = s.read()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002363 if connectionchatty:
2364 if support.verbose:
2365 sys.stdout.write(" client: read %r\n" % outdata)
Trent Nelson6b240cd2008-04-10 20:12:06 +00002366 if outdata != indata.lower():
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002367 raise AssertionError(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002368 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
2369 % (outdata[:20], len(outdata),
2370 indata[:20].lower(), len(indata)))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002371 s.write(b"over\n")
2372 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002373 if support.verbose:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002374 sys.stdout.write(" client: closing connection.\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002375 stats.update({
2376 'compression': s.compression(),
2377 'cipher': s.cipher(),
2378 'peercert': s.getpeercert(),
2379 'client_alpn_protocol': s.selected_alpn_protocol(),
2380 'client_npn_protocol': s.selected_npn_protocol(),
2381 'version': s.version(),
2382 'session_reused': s.session_reused,
2383 'session': s.session,
2384 })
2385 s.close()
2386 stats['server_alpn_protocols'] = server.selected_alpn_protocols
2387 stats['server_npn_protocols'] = server.selected_npn_protocols
2388 stats['server_shared_ciphers'] = server.shared_ciphers
2389 return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002390
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002391def try_protocol_combo(server_protocol, client_protocol, expect_success,
2392 certsreqs=None, server_options=0, client_options=0):
2393 """
2394 Try to SSL-connect using *client_protocol* to *server_protocol*.
2395 If *expect_success* is true, assert that the connection succeeds,
2396 if it's false, assert that the connection fails.
2397 Also, if *expect_success* is a string, assert that it is the protocol
2398 version actually used by the connection.
2399 """
2400 if certsreqs is None:
2401 certsreqs = ssl.CERT_NONE
2402 certtype = {
2403 ssl.CERT_NONE: "CERT_NONE",
2404 ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
2405 ssl.CERT_REQUIRED: "CERT_REQUIRED",
2406 }[certsreqs]
2407 if support.verbose:
2408 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
2409 sys.stdout.write(formatstr %
2410 (ssl.get_protocol_name(client_protocol),
2411 ssl.get_protocol_name(server_protocol),
2412 certtype))
2413 client_context = ssl.SSLContext(client_protocol)
2414 client_context.options |= client_options
2415 server_context = ssl.SSLContext(server_protocol)
2416 server_context.options |= server_options
2417
2418 # NOTE: we must enable "ALL" ciphers on the client, otherwise an
2419 # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
2420 # starting from OpenSSL 1.0.0 (see issue #8322).
Christian Heimesa170fa12017-09-15 20:27:30 +02002421 if client_context.protocol == ssl.PROTOCOL_TLS:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002422 client_context.set_ciphers("ALL")
2423
2424 for ctx in (client_context, server_context):
2425 ctx.verify_mode = certsreqs
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002426 ctx.load_cert_chain(SIGNED_CERTFILE)
2427 ctx.load_verify_locations(SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002428 try:
2429 stats = server_params_test(client_context, server_context,
2430 chatty=False, connectionchatty=False)
2431 # Protocol mismatch can result in either an SSLError, or a
2432 # "Connection reset by peer" error.
2433 except ssl.SSLError:
2434 if expect_success:
2435 raise
2436 except OSError as e:
2437 if expect_success or e.errno != errno.ECONNRESET:
2438 raise
2439 else:
2440 if not expect_success:
2441 raise AssertionError(
2442 "Client protocol %s succeeded with server protocol %s!"
2443 % (ssl.get_protocol_name(client_protocol),
2444 ssl.get_protocol_name(server_protocol)))
2445 elif (expect_success is not True
2446 and expect_success != stats['version']):
2447 raise AssertionError("version mismatch: expected %r, got %r"
2448 % (expect_success, stats['version']))
2449
2450
2451class ThreadedTests(unittest.TestCase):
2452
2453 @skip_if_broken_ubuntu_ssl
2454 def test_echo(self):
2455 """Basic test of an SSL client connecting to a server"""
2456 if support.verbose:
2457 sys.stdout.write("\n")
2458 for protocol in PROTOCOLS:
2459 if protocol in {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}:
2460 continue
2461 with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
2462 context = ssl.SSLContext(protocol)
2463 context.load_cert_chain(CERTFILE)
2464 server_params_test(context, context,
2465 chatty=True, connectionchatty=True)
2466
Christian Heimesa170fa12017-09-15 20:27:30 +02002467 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002468
2469 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_SERVER):
2470 server_params_test(client_context=client_context,
2471 server_context=server_context,
2472 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002473 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002474
2475 client_context.check_hostname = False
2476 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_CLIENT):
2477 with self.assertRaises(ssl.SSLError) as e:
2478 server_params_test(client_context=server_context,
2479 server_context=client_context,
2480 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002481 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002482 self.assertIn('called a function you should not call',
2483 str(e.exception))
2484
2485 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_SERVER):
2486 with self.assertRaises(ssl.SSLError) as e:
2487 server_params_test(client_context=server_context,
2488 server_context=server_context,
2489 chatty=True, connectionchatty=True)
2490 self.assertIn('called a function you should not call',
2491 str(e.exception))
2492
2493 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_CLIENT):
2494 with self.assertRaises(ssl.SSLError) as e:
2495 server_params_test(client_context=server_context,
2496 server_context=client_context,
2497 chatty=True, connectionchatty=True)
2498 self.assertIn('called a function you should not call',
2499 str(e.exception))
2500
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002501 def test_getpeercert(self):
2502 if support.verbose:
2503 sys.stdout.write("\n")
Christian Heimesa170fa12017-09-15 20:27:30 +02002504
2505 client_context, server_context, hostname = testing_context()
2506 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002507 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002508 with client_context.wrap_socket(socket.socket(),
2509 do_handshake_on_connect=False,
2510 server_hostname=hostname) as s:
2511 s.connect((HOST, server.port))
2512 # getpeercert() raise ValueError while the handshake isn't
2513 # done.
2514 with self.assertRaises(ValueError):
2515 s.getpeercert()
2516 s.do_handshake()
2517 cert = s.getpeercert()
2518 self.assertTrue(cert, "Can't get peer certificate.")
2519 cipher = s.cipher()
2520 if support.verbose:
2521 sys.stdout.write(pprint.pformat(cert) + '\n')
2522 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
2523 if 'subject' not in cert:
2524 self.fail("No subject field in certificate: %s." %
2525 pprint.pformat(cert))
2526 if ((('organizationName', 'Python Software Foundation'),)
2527 not in cert['subject']):
2528 self.fail(
2529 "Missing or invalid 'organizationName' field in certificate subject; "
2530 "should be 'Python Software Foundation'.")
2531 self.assertIn('notBefore', cert)
2532 self.assertIn('notAfter', cert)
2533 before = ssl.cert_time_to_seconds(cert['notBefore'])
2534 after = ssl.cert_time_to_seconds(cert['notAfter'])
2535 self.assertLess(before, after)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002536
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002537 @unittest.skipUnless(have_verify_flags(),
2538 "verify_flags need OpenSSL > 0.9.8")
2539 def test_crl_check(self):
2540 if support.verbose:
2541 sys.stdout.write("\n")
2542
Christian Heimesa170fa12017-09-15 20:27:30 +02002543 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002544
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002545 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
Christian Heimesa170fa12017-09-15 20:27:30 +02002546 self.assertEqual(client_context.verify_flags, ssl.VERIFY_DEFAULT | tf)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002547
2548 # VERIFY_DEFAULT should pass
2549 server = ThreadedEchoServer(context=server_context, chatty=True)
2550 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002551 with client_context.wrap_socket(socket.socket(),
2552 server_hostname=hostname) as s:
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002553 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002554 cert = s.getpeercert()
2555 self.assertTrue(cert, "Can't get peer certificate.")
Bill Janssen58afe4c2008-09-08 16:45:19 +00002556
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002557 # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
Christian Heimesa170fa12017-09-15 20:27:30 +02002558 client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
Bill Janssen58afe4c2008-09-08 16:45:19 +00002559
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002560 server = ThreadedEchoServer(context=server_context, chatty=True)
2561 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002562 with client_context.wrap_socket(socket.socket(),
2563 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002564 with self.assertRaisesRegex(ssl.SSLError,
2565 "certificate verify failed"):
2566 s.connect((HOST, server.port))
Bill Janssen58afe4c2008-09-08 16:45:19 +00002567
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002568 # now load a CRL file. The CRL file is signed by the CA.
Christian Heimesa170fa12017-09-15 20:27:30 +02002569 client_context.load_verify_locations(CRLFILE)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002570
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002571 server = ThreadedEchoServer(context=server_context, chatty=True)
2572 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002573 with client_context.wrap_socket(socket.socket(),
2574 server_hostname=hostname) as s:
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002575 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002576 cert = s.getpeercert()
2577 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002578
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002579 def test_check_hostname(self):
2580 if support.verbose:
2581 sys.stdout.write("\n")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002582
Christian Heimesa170fa12017-09-15 20:27:30 +02002583 client_context, server_context, hostname = testing_context()
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002584
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002585 # correct hostname should verify
2586 server = ThreadedEchoServer(context=server_context, chatty=True)
2587 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002588 with client_context.wrap_socket(socket.socket(),
2589 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002590 s.connect((HOST, server.port))
2591 cert = s.getpeercert()
2592 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002593
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002594 # incorrect hostname should raise an exception
2595 server = ThreadedEchoServer(context=server_context, chatty=True)
2596 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002597 with client_context.wrap_socket(socket.socket(),
2598 server_hostname="invalid") as s:
Christian Heimes61d478c2018-01-27 15:51:38 +01002599 with self.assertRaisesRegex(
2600 ssl.CertificateError,
2601 "Hostname mismatch, certificate is not valid for 'invalid'."):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002602 s.connect((HOST, server.port))
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002603
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002604 # missing server_hostname arg should cause an exception, too
2605 server = ThreadedEchoServer(context=server_context, chatty=True)
2606 with server:
2607 with socket.socket() as s:
2608 with self.assertRaisesRegex(ValueError,
2609 "check_hostname requires server_hostname"):
Christian Heimesa170fa12017-09-15 20:27:30 +02002610 client_context.wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002611
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002612 def test_ecc_cert(self):
2613 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2614 client_context.load_verify_locations(SIGNING_CA)
2615 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
2616 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2617
2618 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2619 # load ECC cert
2620 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2621
2622 # correct hostname should verify
2623 server = ThreadedEchoServer(context=server_context, chatty=True)
2624 with server:
2625 with client_context.wrap_socket(socket.socket(),
2626 server_hostname=hostname) as s:
2627 s.connect((HOST, server.port))
2628 cert = s.getpeercert()
2629 self.assertTrue(cert, "Can't get peer certificate.")
2630 cipher = s.cipher()[0].split('-')
2631 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2632
2633 def test_dual_rsa_ecc(self):
2634 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2635 client_context.load_verify_locations(SIGNING_CA)
2636 # only ECDSA certs
2637 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
2638 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2639
2640 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2641 # load ECC and RSA key/cert pairs
2642 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2643 server_context.load_cert_chain(SIGNED_CERTFILE)
2644
2645 # correct hostname should verify
2646 server = ThreadedEchoServer(context=server_context, chatty=True)
2647 with server:
2648 with client_context.wrap_socket(socket.socket(),
2649 server_hostname=hostname) as s:
2650 s.connect((HOST, server.port))
2651 cert = s.getpeercert()
2652 self.assertTrue(cert, "Can't get peer certificate.")
2653 cipher = s.cipher()[0].split('-')
2654 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2655
Christian Heimes66e57422018-01-29 14:25:13 +01002656 def test_check_hostname_idn(self):
2657 if support.verbose:
2658 sys.stdout.write("\n")
2659
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002660 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Christian Heimes66e57422018-01-29 14:25:13 +01002661 server_context.load_cert_chain(IDNSANSFILE)
2662
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002663 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes66e57422018-01-29 14:25:13 +01002664 context.verify_mode = ssl.CERT_REQUIRED
2665 context.check_hostname = True
2666 context.load_verify_locations(SIGNING_CA)
2667
2668 # correct hostname should verify, when specified in several
2669 # different ways
2670 idn_hostnames = [
2671 ('könig.idn.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002672 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002673 ('xn--knig-5qa.idn.pythontest.net',
2674 'xn--knig-5qa.idn.pythontest.net'),
2675 (b'xn--knig-5qa.idn.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002676 'xn--knig-5qa.idn.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002677
2678 ('königsgäßchen.idna2003.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002679 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
Christian Heimes66e57422018-01-29 14:25:13 +01002680 ('xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
2681 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2682 (b'xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
Miss Islington (bot)1c37e272018-02-23 19:18:28 -08002683 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2684
2685 # ('königsgäßchen.idna2008.pythontest.net',
2686 # 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2687 ('xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
2688 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2689 (b'xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
2690 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
2691
Christian Heimes66e57422018-01-29 14:25:13 +01002692 ]
2693 for server_hostname, expected_hostname in idn_hostnames:
2694 server = ThreadedEchoServer(context=server_context, chatty=True)
2695 with server:
2696 with context.wrap_socket(socket.socket(),
2697 server_hostname=server_hostname) as s:
2698 self.assertEqual(s.server_hostname, expected_hostname)
2699 s.connect((HOST, server.port))
2700 cert = s.getpeercert()
2701 self.assertEqual(s.server_hostname, expected_hostname)
2702 self.assertTrue(cert, "Can't get peer certificate.")
2703
2704 with ssl.SSLSocket(socket.socket(),
2705 server_hostname=server_hostname) as s:
2706 s.connect((HOST, server.port))
2707 s.getpeercert()
2708 self.assertEqual(s.server_hostname, expected_hostname)
2709
Christian Heimes66e57422018-01-29 14:25:13 +01002710 # incorrect hostname should raise an exception
2711 server = ThreadedEchoServer(context=server_context, chatty=True)
2712 with server:
2713 with context.wrap_socket(socket.socket(),
2714 server_hostname="python.example.org") as s:
2715 with self.assertRaises(ssl.CertificateError):
2716 s.connect((HOST, server.port))
2717
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002718 def test_wrong_cert(self):
2719 """Connecting when the server rejects the client's certificate
2720
2721 Launch a server with CERT_REQUIRED, and check that trying to
2722 connect to it with a wrong client certificate fails.
2723 """
2724 certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
2725 "wrongcert.pem")
2726 server = ThreadedEchoServer(CERTFILE,
2727 certreqs=ssl.CERT_REQUIRED,
2728 cacerts=CERTFILE, chatty=False,
2729 connectionchatty=False)
2730 with server, \
2731 socket.socket() as sock, \
Christian Heimesa170fa12017-09-15 20:27:30 +02002732 test_wrap_socket(sock, certfile=certfile) as s:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002733 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002734 # Expect either an SSL error about the server rejecting
2735 # the connection, or a low-level connection reset (which
2736 # sometimes happens on Windows)
2737 s.connect((HOST, server.port))
2738 except ssl.SSLError as e:
2739 if support.verbose:
2740 sys.stdout.write("\nSSLError is %r\n" % e)
2741 except OSError as e:
2742 if e.errno != errno.ECONNRESET:
2743 raise
2744 if support.verbose:
2745 sys.stdout.write("\nsocket.error is %r\n" % e)
2746 else:
2747 self.fail("Use of invalid cert should have failed!")
2748
2749 def test_rude_shutdown(self):
2750 """A brutal shutdown of an SSL server should raise an OSError
2751 in the client when attempting handshake.
2752 """
2753 listener_ready = threading.Event()
2754 listener_gone = threading.Event()
2755
2756 s = socket.socket()
2757 port = support.bind_port(s, HOST)
2758
2759 # `listener` runs in a thread. It sits in an accept() until
2760 # the main thread connects. Then it rudely closes the socket,
2761 # and sets Event `listener_gone` to let the main thread know
2762 # the socket is gone.
2763 def listener():
2764 s.listen()
2765 listener_ready.set()
2766 newsock, addr = s.accept()
2767 newsock.close()
2768 s.close()
2769 listener_gone.set()
2770
2771 def connector():
2772 listener_ready.wait()
2773 with socket.socket() as c:
2774 c.connect((HOST, port))
2775 listener_gone.wait()
Antoine Pitrou40f08742010-04-24 22:04:40 +00002776 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002777 ssl_sock = test_wrap_socket(c)
2778 except OSError:
2779 pass
2780 else:
2781 self.fail('connecting to closed SSL socket should have failed')
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002782
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002783 t = threading.Thread(target=listener)
2784 t.start()
2785 try:
2786 connector()
2787 finally:
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002788 t.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002789
Christian Heimesb3ad0e52017-09-08 12:00:19 -07002790 def test_ssl_cert_verify_error(self):
2791 if support.verbose:
2792 sys.stdout.write("\n")
2793
2794 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2795 server_context.load_cert_chain(SIGNED_CERTFILE)
2796
2797 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2798
2799 server = ThreadedEchoServer(context=server_context, chatty=True)
2800 with server:
2801 with context.wrap_socket(socket.socket(),
Christian Heimesa170fa12017-09-15 20:27:30 +02002802 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
Christian Heimesb3ad0e52017-09-08 12:00:19 -07002803 try:
2804 s.connect((HOST, server.port))
2805 except ssl.SSLError as e:
2806 msg = 'unable to get local issuer certificate'
2807 self.assertIsInstance(e, ssl.SSLCertVerificationError)
2808 self.assertEqual(e.verify_code, 20)
2809 self.assertEqual(e.verify_message, msg)
2810 self.assertIn(msg, repr(e))
2811 self.assertIn('certificate verify failed', repr(e))
2812
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002813 @skip_if_broken_ubuntu_ssl
2814 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
2815 "OpenSSL is compiled without SSLv2 support")
2816 def test_protocol_sslv2(self):
2817 """Connecting to an SSLv2 server with various client options"""
2818 if support.verbose:
2819 sys.stdout.write("\n")
2820 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
2821 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
2822 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
Christian Heimesa170fa12017-09-15 20:27:30 +02002823 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002824 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2825 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
2826 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
2827 # SSLv23 client with specific SSL options
2828 if no_sslv2_implies_sslv3_hello():
2829 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02002830 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002831 client_options=ssl.OP_NO_SSLv2)
Christian Heimesa170fa12017-09-15 20:27:30 +02002832 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002833 client_options=ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02002834 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002835 client_options=ssl.OP_NO_TLSv1)
Antoine Pitrou242db722013-05-01 20:52:07 +02002836
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002837 @skip_if_broken_ubuntu_ssl
Christian Heimesa170fa12017-09-15 20:27:30 +02002838 def test_PROTOCOL_TLS(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002839 """Connecting to an SSLv23 server with various client options"""
2840 if support.verbose:
2841 sys.stdout.write("\n")
2842 if hasattr(ssl, 'PROTOCOL_SSLv2'):
Antoine Pitrou8f85f902012-01-03 22:46:48 +01002843 try:
Christian Heimesa170fa12017-09-15 20:27:30 +02002844 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv2, True)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002845 except OSError as x:
2846 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
2847 if support.verbose:
2848 sys.stdout.write(
2849 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
2850 % str(x))
2851 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02002852 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False)
2853 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True)
2854 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1')
Antoine Pitrou8f85f902012-01-03 22:46:48 +01002855
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002856 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02002857 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_OPTIONAL)
2858 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_OPTIONAL)
2859 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002860
2861 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02002862 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_REQUIRED)
2863 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_REQUIRED)
2864 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002865
2866 # Server with specific SSL options
2867 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02002868 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002869 server_options=ssl.OP_NO_SSLv3)
2870 # Will choose TLSv1
Christian Heimesa170fa12017-09-15 20:27:30 +02002871 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002872 server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02002873 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002874 server_options=ssl.OP_NO_TLSv1)
2875
2876
2877 @skip_if_broken_ubuntu_ssl
2878 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
2879 "OpenSSL is compiled without SSLv3 support")
2880 def test_protocol_sslv3(self):
2881 """Connecting to an SSLv3 server with various client options"""
2882 if support.verbose:
2883 sys.stdout.write("\n")
2884 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3')
2885 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
2886 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
2887 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2888 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02002889 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002890 client_options=ssl.OP_NO_SSLv3)
2891 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
2892 if no_sslv2_implies_sslv3_hello():
2893 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02002894 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002895 False, client_options=ssl.OP_NO_SSLv2)
2896
2897 @skip_if_broken_ubuntu_ssl
2898 def test_protocol_tlsv1(self):
2899 """Connecting to a TLSv1 server with various client options"""
2900 if support.verbose:
2901 sys.stdout.write("\n")
2902 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1')
2903 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
2904 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
2905 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2906 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
2907 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2908 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02002909 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002910 client_options=ssl.OP_NO_TLSv1)
2911
2912 @skip_if_broken_ubuntu_ssl
2913 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
2914 "TLS version 1.1 not supported.")
2915 def test_protocol_tlsv1_1(self):
2916 """Connecting to a TLSv1.1 server with various client options.
2917 Testing against older TLS versions."""
2918 if support.verbose:
2919 sys.stdout.write("\n")
2920 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
2921 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2922 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False)
2923 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2924 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02002925 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002926 client_options=ssl.OP_NO_TLSv1_1)
2927
Christian Heimesa170fa12017-09-15 20:27:30 +02002928 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002929 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False)
2930 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False)
2931
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002932 @skip_if_broken_ubuntu_ssl
2933 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
2934 "TLS version 1.2 not supported.")
2935 def test_protocol_tlsv1_2(self):
2936 """Connecting to a TLSv1.2 server with various client options.
2937 Testing against older TLS versions."""
2938 if support.verbose:
2939 sys.stdout.write("\n")
2940 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2',
2941 server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
2942 client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
2943 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2944 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False)
2945 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2946 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02002947 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002948 client_options=ssl.OP_NO_TLSv1_2)
2949
Christian Heimesa170fa12017-09-15 20:27:30 +02002950 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002951 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
2952 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
2953 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
2954 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
2955
2956 def test_starttls(self):
2957 """Switching from clear text to encrypted and back again."""
2958 msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")
2959
2960 server = ThreadedEchoServer(CERTFILE,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002961 starttls_server=True,
2962 chatty=True,
2963 connectionchatty=True)
2964 wrapped = False
2965 with server:
2966 s = socket.socket()
2967 s.setblocking(1)
2968 s.connect((HOST, server.port))
Antoine Pitroud6494802011-07-21 01:11:30 +02002969 if support.verbose:
2970 sys.stdout.write("\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002971 for indata in msgs:
Antoine Pitroud6494802011-07-21 01:11:30 +02002972 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002973 sys.stdout.write(
2974 " client: sending %r...\n" % indata)
2975 if wrapped:
2976 conn.write(indata)
2977 outdata = conn.read()
2978 else:
2979 s.send(indata)
2980 outdata = s.recv(1024)
2981 msg = outdata.strip().lower()
2982 if indata == b"STARTTLS" and msg.startswith(b"ok"):
2983 # STARTTLS ok, switch to secure mode
2984 if support.verbose:
2985 sys.stdout.write(
2986 " client: read %r from server, starting TLS...\n"
2987 % msg)
Christian Heimesa170fa12017-09-15 20:27:30 +02002988 conn = test_wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002989 wrapped = True
2990 elif indata == b"ENDTLS" and msg.startswith(b"ok"):
2991 # ENDTLS ok, switch back to clear text
2992 if support.verbose:
2993 sys.stdout.write(
2994 " client: read %r from server, ending TLS...\n"
2995 % msg)
2996 s = conn.unwrap()
2997 wrapped = False
2998 else:
2999 if support.verbose:
3000 sys.stdout.write(
3001 " client: read %r from server\n" % msg)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003002 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003003 sys.stdout.write(" client: closing connection.\n")
3004 if wrapped:
3005 conn.write(b"over\n")
3006 else:
3007 s.send(b"over\n")
3008 if wrapped:
3009 conn.close()
3010 else:
3011 s.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003012
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003013 def test_socketserver(self):
3014 """Using socketserver to create and manage SSL connections."""
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003015 server = make_https_server(self, certfile=SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003016 # try to connect
3017 if support.verbose:
3018 sys.stdout.write('\n')
3019 with open(CERTFILE, 'rb') as f:
3020 d1 = f.read()
3021 d2 = ''
3022 # now fetch the same data from the HTTPS server
3023 url = 'https://localhost:%d/%s' % (
3024 server.port, os.path.split(CERTFILE)[1])
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003025 context = ssl.create_default_context(cafile=SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003026 f = urllib.request.urlopen(url, context=context)
3027 try:
3028 dlen = f.info().get("content-length")
3029 if dlen and (int(dlen) > 0):
3030 d2 = f.read(int(dlen))
3031 if support.verbose:
3032 sys.stdout.write(
3033 " client: read %d bytes from remote server '%s'\n"
3034 % (len(d2), server))
3035 finally:
3036 f.close()
3037 self.assertEqual(d1, d2)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003038
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003039 def test_asyncore_server(self):
3040 """Check the example asyncore integration."""
3041 if support.verbose:
3042 sys.stdout.write("\n")
Antoine Pitrou0e576f12011-12-22 10:03:38 +01003043
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003044 indata = b"FOO\n"
3045 server = AsyncoreEchoServer(CERTFILE)
3046 with server:
3047 s = test_wrap_socket(socket.socket())
3048 s.connect(('127.0.0.1', server.port))
3049 if support.verbose:
3050 sys.stdout.write(
3051 " client: sending %r...\n" % indata)
3052 s.write(indata)
3053 outdata = s.read()
3054 if support.verbose:
3055 sys.stdout.write(" client: read %r\n" % outdata)
3056 if outdata != indata.lower():
3057 self.fail(
3058 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
3059 % (outdata[:20], len(outdata),
3060 indata[:20].lower(), len(indata)))
3061 s.write(b"over\n")
3062 if support.verbose:
3063 sys.stdout.write(" client: closing connection.\n")
3064 s.close()
3065 if support.verbose:
3066 sys.stdout.write(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05003067
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003068 def test_recv_send(self):
3069 """Test recv(), send() and friends."""
3070 if support.verbose:
3071 sys.stdout.write("\n")
3072
3073 server = ThreadedEchoServer(CERTFILE,
3074 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003075 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003076 cacerts=CERTFILE,
3077 chatty=True,
3078 connectionchatty=False)
3079 with server:
3080 s = test_wrap_socket(socket.socket(),
3081 server_side=False,
3082 certfile=CERTFILE,
3083 ca_certs=CERTFILE,
3084 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003085 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003086 s.connect((HOST, server.port))
3087 # helper methods for standardising recv* method signatures
3088 def _recv_into():
3089 b = bytearray(b"\0"*100)
3090 count = s.recv_into(b)
3091 return b[:count]
3092
3093 def _recvfrom_into():
3094 b = bytearray(b"\0"*100)
3095 count, addr = s.recvfrom_into(b)
3096 return b[:count]
3097
3098 # (name, method, expect success?, *args, return value func)
3099 send_methods = [
3100 ('send', s.send, True, [], len),
3101 ('sendto', s.sendto, False, ["some.address"], len),
3102 ('sendall', s.sendall, True, [], lambda x: None),
3103 ]
3104 # (name, method, whether to expect success, *args)
3105 recv_methods = [
3106 ('recv', s.recv, True, []),
3107 ('recvfrom', s.recvfrom, False, ["some.address"]),
3108 ('recv_into', _recv_into, True, []),
3109 ('recvfrom_into', _recvfrom_into, False, []),
3110 ]
3111 data_prefix = "PREFIX_"
3112
3113 for (meth_name, send_meth, expect_success, args,
3114 ret_val_meth) in send_methods:
3115 indata = (data_prefix + meth_name).encode('ascii')
3116 try:
3117 ret = send_meth(indata, *args)
3118 msg = "sending with {}".format(meth_name)
3119 self.assertEqual(ret, ret_val_meth(indata), msg=msg)
3120 outdata = s.read()
3121 if outdata != indata.lower():
3122 self.fail(
3123 "While sending with <<{name:s}>> bad data "
3124 "<<{outdata:r}>> ({nout:d}) received; "
3125 "expected <<{indata:r}>> ({nin:d})\n".format(
3126 name=meth_name, outdata=outdata[:20],
3127 nout=len(outdata),
3128 indata=indata[:20], nin=len(indata)
3129 )
3130 )
3131 except ValueError as e:
3132 if expect_success:
3133 self.fail(
3134 "Failed to send with method <<{name:s}>>; "
3135 "expected to succeed.\n".format(name=meth_name)
3136 )
3137 if not str(e).startswith(meth_name):
3138 self.fail(
3139 "Method <<{name:s}>> failed with unexpected "
3140 "exception message: {exp:s}\n".format(
3141 name=meth_name, exp=e
3142 )
3143 )
3144
3145 for meth_name, recv_meth, expect_success, args in recv_methods:
3146 indata = (data_prefix + meth_name).encode('ascii')
3147 try:
3148 s.send(indata)
3149 outdata = recv_meth(*args)
3150 if outdata != indata.lower():
3151 self.fail(
3152 "While receiving with <<{name:s}>> bad data "
3153 "<<{outdata:r}>> ({nout:d}) received; "
3154 "expected <<{indata:r}>> ({nin:d})\n".format(
3155 name=meth_name, outdata=outdata[:20],
3156 nout=len(outdata),
3157 indata=indata[:20], nin=len(indata)
3158 )
3159 )
3160 except ValueError as e:
3161 if expect_success:
3162 self.fail(
3163 "Failed to receive with method <<{name:s}>>; "
3164 "expected to succeed.\n".format(name=meth_name)
3165 )
3166 if not str(e).startswith(meth_name):
3167 self.fail(
3168 "Method <<{name:s}>> failed with unexpected "
3169 "exception message: {exp:s}\n".format(
3170 name=meth_name, exp=e
3171 )
3172 )
3173 # consume data
3174 s.read()
3175
3176 # read(-1, buffer) is supported, even though read(-1) is not
3177 data = b"data"
3178 s.send(data)
3179 buffer = bytearray(len(data))
3180 self.assertEqual(s.read(-1, buffer), len(data))
3181 self.assertEqual(buffer, data)
3182
Christian Heimes888bbdc2017-09-07 14:18:21 -07003183 # sendall accepts bytes-like objects
3184 if ctypes is not None:
3185 ubyte = ctypes.c_ubyte * len(data)
3186 byteslike = ubyte.from_buffer_copy(data)
3187 s.sendall(byteslike)
3188 self.assertEqual(s.read(), data)
3189
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003190 # Make sure sendmsg et al are disallowed to avoid
3191 # inadvertent disclosure of data and/or corruption
3192 # of the encrypted data stream
3193 self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
3194 self.assertRaises(NotImplementedError, s.recvmsg, 100)
3195 self.assertRaises(NotImplementedError,
3196 s.recvmsg_into, bytearray(100))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003197 s.write(b"over\n")
3198
3199 self.assertRaises(ValueError, s.recv, -1)
3200 self.assertRaises(ValueError, s.read, -1)
3201
3202 s.close()
3203
3204 def test_recv_zero(self):
3205 server = ThreadedEchoServer(CERTFILE)
3206 server.__enter__()
3207 self.addCleanup(server.__exit__, None, None)
3208 s = socket.create_connection((HOST, server.port))
3209 self.addCleanup(s.close)
3210 s = test_wrap_socket(s, suppress_ragged_eofs=False)
3211 self.addCleanup(s.close)
3212
3213 # recv/read(0) should return no data
3214 s.send(b"data")
3215 self.assertEqual(s.recv(0), b"")
3216 self.assertEqual(s.read(0), b"")
3217 self.assertEqual(s.read(), b"data")
3218
3219 # Should not block if the other end sends no data
3220 s.setblocking(False)
3221 self.assertEqual(s.recv(0), b"")
3222 self.assertEqual(s.recv_into(bytearray()), 0)
3223
3224 def test_nonblocking_send(self):
3225 server = ThreadedEchoServer(CERTFILE,
3226 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003227 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003228 cacerts=CERTFILE,
3229 chatty=True,
3230 connectionchatty=False)
3231 with server:
3232 s = test_wrap_socket(socket.socket(),
3233 server_side=False,
3234 certfile=CERTFILE,
3235 ca_certs=CERTFILE,
3236 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003237 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003238 s.connect((HOST, server.port))
3239 s.setblocking(False)
3240
3241 # If we keep sending data, at some point the buffers
3242 # will be full and the call will block
3243 buf = bytearray(8192)
3244 def fill_buffer():
3245 while True:
3246 s.send(buf)
3247 self.assertRaises((ssl.SSLWantWriteError,
3248 ssl.SSLWantReadError), fill_buffer)
3249
3250 # Now read all the output and discard it
3251 s.setblocking(True)
3252 s.close()
3253
3254 def test_handshake_timeout(self):
3255 # Issue #5103: SSL handshake must respect the socket timeout
3256 server = socket.socket(socket.AF_INET)
3257 host = "127.0.0.1"
3258 port = support.bind_port(server)
3259 started = threading.Event()
3260 finish = False
3261
3262 def serve():
3263 server.listen()
3264 started.set()
3265 conns = []
3266 while not finish:
3267 r, w, e = select.select([server], [], [], 0.1)
3268 if server in r:
3269 # Let the socket hang around rather than having
3270 # it closed by garbage collection.
3271 conns.append(server.accept()[0])
3272 for sock in conns:
3273 sock.close()
3274
3275 t = threading.Thread(target=serve)
3276 t.start()
3277 started.wait()
3278
3279 try:
3280 try:
3281 c = socket.socket(socket.AF_INET)
3282 c.settimeout(0.2)
3283 c.connect((host, port))
3284 # Will attempt handshake and time out
3285 self.assertRaisesRegex(socket.timeout, "timed out",
3286 test_wrap_socket, c)
3287 finally:
3288 c.close()
3289 try:
3290 c = socket.socket(socket.AF_INET)
3291 c = test_wrap_socket(c)
3292 c.settimeout(0.2)
3293 # Will attempt handshake and time out
3294 self.assertRaisesRegex(socket.timeout, "timed out",
3295 c.connect, (host, port))
3296 finally:
3297 c.close()
3298 finally:
3299 finish = True
3300 t.join()
3301 server.close()
3302
3303 def test_server_accept(self):
3304 # Issue #16357: accept() on a SSLSocket created through
3305 # SSLContext.wrap_socket().
Christian Heimesa170fa12017-09-15 20:27:30 +02003306 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003307 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003308 context.load_verify_locations(SIGNING_CA)
3309 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003310 server = socket.socket(socket.AF_INET)
3311 host = "127.0.0.1"
3312 port = support.bind_port(server)
3313 server = context.wrap_socket(server, server_side=True)
3314 self.assertTrue(server.server_side)
3315
3316 evt = threading.Event()
3317 remote = None
3318 peer = None
3319 def serve():
3320 nonlocal remote, peer
3321 server.listen()
3322 # Block on the accept and wait on the connection to close.
3323 evt.set()
3324 remote, peer = server.accept()
3325 remote.recv(1)
3326
3327 t = threading.Thread(target=serve)
3328 t.start()
3329 # Client wait until server setup and perform a connect.
3330 evt.wait()
3331 client = context.wrap_socket(socket.socket())
3332 client.connect((host, port))
3333 client_addr = client.getsockname()
3334 client.close()
3335 t.join()
3336 remote.close()
3337 server.close()
3338 # Sanity checks.
3339 self.assertIsInstance(remote, ssl.SSLSocket)
3340 self.assertEqual(peer, client_addr)
3341
3342 def test_getpeercert_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003343 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003344 with context.wrap_socket(socket.socket()) as sock:
3345 with self.assertRaises(OSError) as cm:
3346 sock.getpeercert()
3347 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3348
3349 def test_do_handshake_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003350 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003351 with context.wrap_socket(socket.socket()) as sock:
3352 with self.assertRaises(OSError) as cm:
3353 sock.do_handshake()
3354 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3355
3356 def test_default_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003357 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003358 try:
3359 # Force a set of weak ciphers on our client context
3360 context.set_ciphers("DES")
3361 except ssl.SSLError:
3362 self.skipTest("no DES cipher available")
3363 with ThreadedEchoServer(CERTFILE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003364 ssl_version=ssl.PROTOCOL_TLS,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003365 chatty=False) as server:
3366 with context.wrap_socket(socket.socket()) as s:
3367 with self.assertRaises(OSError):
3368 s.connect((HOST, server.port))
3369 self.assertIn("no shared cipher", server.conn_errors[0])
3370
3371 def test_version_basic(self):
3372 """
3373 Basic tests for SSLSocket.version().
3374 More tests are done in the test_protocol_*() methods.
3375 """
Christian Heimesa170fa12017-09-15 20:27:30 +02003376 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
3377 context.check_hostname = False
3378 context.verify_mode = ssl.CERT_NONE
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003379 with ThreadedEchoServer(CERTFILE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003380 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003381 chatty=False) as server:
3382 with context.wrap_socket(socket.socket()) as s:
3383 self.assertIs(s.version(), None)
3384 s.connect((HOST, server.port))
Christian Heimesa170fa12017-09-15 20:27:30 +02003385 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
3386 self.assertEqual(s.version(), 'TLSv1.2')
3387 else: # 0.9.8 to 1.0.1
3388 self.assertIn(s.version(), ('TLSv1', 'TLSv1.2'))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003389 self.assertIs(s.version(), None)
3390
Christian Heimescb5b68a2017-09-07 18:07:00 -07003391 @unittest.skipUnless(ssl.HAS_TLSv1_3,
3392 "test requires TLSv1.3 enabled OpenSSL")
3393 def test_tls1_3(self):
3394 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
3395 context.load_cert_chain(CERTFILE)
3396 # disable all but TLS 1.3
3397 context.options |= (
3398 ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
3399 )
3400 with ThreadedEchoServer(context=context) as server:
3401 with context.wrap_socket(socket.socket()) as s:
3402 s.connect((HOST, server.port))
3403 self.assertIn(s.cipher()[0], [
3404 'TLS13-AES-256-GCM-SHA384',
3405 'TLS13-CHACHA20-POLY1305-SHA256',
3406 'TLS13-AES-128-GCM-SHA256',
3407 ])
3408
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003409 @unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
3410 def test_default_ecdh_curve(self):
3411 # Issue #21015: elliptic curve-based Diffie Hellman key exchange
3412 # should be enabled by default on SSL contexts.
Christian Heimesa170fa12017-09-15 20:27:30 +02003413 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003414 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003415 # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
3416 # cipher name.
3417 context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003418 # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
3419 # explicitly using the 'ECCdraft' cipher alias. Otherwise,
3420 # our default cipher list should prefer ECDH-based ciphers
3421 # automatically.
3422 if ssl.OPENSSL_VERSION_INFO < (1, 0, 0):
3423 context.set_ciphers("ECCdraft:ECDH")
3424 with ThreadedEchoServer(context=context) as server:
3425 with context.wrap_socket(socket.socket()) as s:
3426 s.connect((HOST, server.port))
3427 self.assertIn("ECDH", s.cipher()[0])
3428
3429 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
3430 "'tls-unique' channel binding not available")
3431 def test_tls_unique_channel_binding(self):
3432 """Test tls-unique channel binding."""
3433 if support.verbose:
3434 sys.stdout.write("\n")
3435
3436 server = ThreadedEchoServer(CERTFILE,
3437 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003438 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003439 cacerts=CERTFILE,
3440 chatty=True,
3441 connectionchatty=False)
3442 with server:
3443 s = test_wrap_socket(socket.socket(),
3444 server_side=False,
3445 certfile=CERTFILE,
3446 ca_certs=CERTFILE,
3447 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003448 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003449 s.connect((HOST, server.port))
3450 # get the data
3451 cb_data = s.get_channel_binding("tls-unique")
3452 if support.verbose:
3453 sys.stdout.write(" got channel binding data: {0!r}\n"
3454 .format(cb_data))
3455
3456 # check if it is sane
3457 self.assertIsNotNone(cb_data)
3458 self.assertEqual(len(cb_data), 12) # True for TLSv1
3459
3460 # and compare with the peers version
3461 s.write(b"CB tls-unique\n")
3462 peer_data_repr = s.read().strip()
3463 self.assertEqual(peer_data_repr,
3464 repr(cb_data).encode("us-ascii"))
3465 s.close()
3466
3467 # now, again
3468 s = test_wrap_socket(socket.socket(),
3469 server_side=False,
3470 certfile=CERTFILE,
3471 ca_certs=CERTFILE,
3472 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003473 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003474 s.connect((HOST, server.port))
3475 new_cb_data = s.get_channel_binding("tls-unique")
3476 if support.verbose:
3477 sys.stdout.write(" got another channel binding data: {0!r}\n"
3478 .format(new_cb_data))
3479 # is it really unique
3480 self.assertNotEqual(cb_data, new_cb_data)
3481 self.assertIsNotNone(cb_data)
3482 self.assertEqual(len(cb_data), 12) # True for TLSv1
3483 s.write(b"CB tls-unique\n")
3484 peer_data_repr = s.read().strip()
3485 self.assertEqual(peer_data_repr,
3486 repr(new_cb_data).encode("us-ascii"))
3487 s.close()
3488
3489 def test_compression(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003490 client_context, server_context, hostname = testing_context()
3491 stats = server_params_test(client_context, server_context,
3492 chatty=True, connectionchatty=True,
3493 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003494 if support.verbose:
3495 sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
3496 self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' })
3497
3498 @unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
3499 "ssl.OP_NO_COMPRESSION needed for this test")
3500 def test_compression_disabled(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003501 client_context, server_context, hostname = testing_context()
3502 client_context.options |= ssl.OP_NO_COMPRESSION
3503 server_context.options |= ssl.OP_NO_COMPRESSION
3504 stats = server_params_test(client_context, server_context,
3505 chatty=True, connectionchatty=True,
3506 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003507 self.assertIs(stats['compression'], None)
3508
3509 def test_dh_params(self):
3510 # Check we can get a connection with ephemeral Diffie-Hellman
Christian Heimesa170fa12017-09-15 20:27:30 +02003511 client_context, server_context, hostname = testing_context()
3512 server_context.load_dh_params(DHFILE)
3513 server_context.set_ciphers("kEDH")
3514 stats = server_params_test(client_context, server_context,
3515 chatty=True, connectionchatty=True,
3516 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003517 cipher = stats["cipher"][0]
3518 parts = cipher.split("-")
3519 if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
3520 self.fail("Non-DH cipher: " + cipher[0])
3521
3522 def test_selected_alpn_protocol(self):
3523 # selected_alpn_protocol() is None unless ALPN is used.
Christian Heimesa170fa12017-09-15 20:27:30 +02003524 client_context, server_context, hostname = testing_context()
3525 stats = server_params_test(client_context, server_context,
3526 chatty=True, connectionchatty=True,
3527 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003528 self.assertIs(stats['client_alpn_protocol'], None)
3529
3530 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
3531 def test_selected_alpn_protocol_if_server_uses_alpn(self):
3532 # selected_alpn_protocol() is None unless ALPN is used by the client.
Christian Heimesa170fa12017-09-15 20:27:30 +02003533 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003534 server_context.set_alpn_protocols(['foo', 'bar'])
3535 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02003536 chatty=True, connectionchatty=True,
3537 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003538 self.assertIs(stats['client_alpn_protocol'], None)
3539
3540 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
3541 def test_alpn_protocols(self):
3542 server_protocols = ['foo', 'bar', 'milkshake']
3543 protocol_tests = [
3544 (['foo', 'bar'], 'foo'),
3545 (['bar', 'foo'], 'foo'),
3546 (['milkshake'], 'milkshake'),
3547 (['http/3.0', 'http/4.0'], None)
3548 ]
3549 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02003550 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003551 server_context.set_alpn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003552 client_context.set_alpn_protocols(client_protocols)
3553
3554 try:
3555 stats = server_params_test(client_context,
3556 server_context,
3557 chatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02003558 connectionchatty=True,
3559 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003560 except ssl.SSLError as e:
3561 stats = e
3562
3563 if (expected is None and IS_OPENSSL_1_1
3564 and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6)):
3565 # OpenSSL 1.1.0 to 1.1.0e raises handshake error
3566 self.assertIsInstance(stats, ssl.SSLError)
3567 else:
3568 msg = "failed trying %s (s) and %s (c).\n" \
3569 "was expecting %s, but got %%s from the %%s" \
3570 % (str(server_protocols), str(client_protocols),
3571 str(expected))
3572 client_result = stats['client_alpn_protocol']
3573 self.assertEqual(client_result, expected,
3574 msg % (client_result, "client"))
3575 server_result = stats['server_alpn_protocols'][-1] \
3576 if len(stats['server_alpn_protocols']) else 'nothing'
3577 self.assertEqual(server_result, expected,
3578 msg % (server_result, "server"))
3579
3580 def test_selected_npn_protocol(self):
3581 # selected_npn_protocol() is None unless NPN is used
Christian Heimesa170fa12017-09-15 20:27:30 +02003582 client_context, server_context, hostname = testing_context()
3583 stats = server_params_test(client_context, server_context,
3584 chatty=True, connectionchatty=True,
3585 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003586 self.assertIs(stats['client_npn_protocol'], None)
3587
3588 @unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
3589 def test_npn_protocols(self):
3590 server_protocols = ['http/1.1', 'spdy/2']
3591 protocol_tests = [
3592 (['http/1.1', 'spdy/2'], 'http/1.1'),
3593 (['spdy/2', 'http/1.1'], 'http/1.1'),
3594 (['spdy/2', 'test'], 'spdy/2'),
3595 (['abc', 'def'], 'abc')
3596 ]
3597 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02003598 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003599 server_context.set_npn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003600 client_context.set_npn_protocols(client_protocols)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003601 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02003602 chatty=True, connectionchatty=True,
3603 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003604 msg = "failed trying %s (s) and %s (c).\n" \
3605 "was expecting %s, but got %%s from the %%s" \
3606 % (str(server_protocols), str(client_protocols),
3607 str(expected))
3608 client_result = stats['client_npn_protocol']
3609 self.assertEqual(client_result, expected, msg % (client_result, "client"))
3610 server_result = stats['server_npn_protocols'][-1] \
3611 if len(stats['server_npn_protocols']) else 'nothing'
3612 self.assertEqual(server_result, expected, msg % (server_result, "server"))
3613
3614 def sni_contexts(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003615 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003616 server_context.load_cert_chain(SIGNED_CERTFILE)
Christian Heimesa170fa12017-09-15 20:27:30 +02003617 other_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003618 other_context.load_cert_chain(SIGNED_CERTFILE2)
Christian Heimesa170fa12017-09-15 20:27:30 +02003619 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003620 client_context.load_verify_locations(SIGNING_CA)
3621 return server_context, other_context, client_context
3622
3623 def check_common_name(self, stats, name):
3624 cert = stats['peercert']
3625 self.assertIn((('commonName', name),), cert['subject'])
3626
3627 @needs_sni
3628 def test_sni_callback(self):
3629 calls = []
3630 server_context, other_context, client_context = self.sni_contexts()
3631
Christian Heimesa170fa12017-09-15 20:27:30 +02003632 client_context.check_hostname = False
3633
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003634 def servername_cb(ssl_sock, server_name, initial_context):
3635 calls.append((server_name, initial_context))
3636 if server_name is not None:
3637 ssl_sock.context = other_context
3638 server_context.set_servername_callback(servername_cb)
3639
3640 stats = server_params_test(client_context, server_context,
3641 chatty=True,
3642 sni_name='supermessage')
3643 # The hostname was fetched properly, and the certificate was
3644 # changed for the connection.
3645 self.assertEqual(calls, [("supermessage", server_context)])
3646 # CERTFILE4 was selected
3647 self.check_common_name(stats, 'fakehostname')
3648
3649 calls = []
3650 # The callback is called with server_name=None
3651 stats = server_params_test(client_context, server_context,
3652 chatty=True,
3653 sni_name=None)
3654 self.assertEqual(calls, [(None, server_context)])
Christian Heimesa170fa12017-09-15 20:27:30 +02003655 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003656
3657 # Check disabling the callback
3658 calls = []
3659 server_context.set_servername_callback(None)
3660
3661 stats = server_params_test(client_context, server_context,
3662 chatty=True,
3663 sni_name='notfunny')
3664 # Certificate didn't change
Christian Heimesa170fa12017-09-15 20:27:30 +02003665 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003666 self.assertEqual(calls, [])
3667
3668 @needs_sni
3669 def test_sni_callback_alert(self):
3670 # Returning a TLS alert is reflected to the connecting client
3671 server_context, other_context, client_context = self.sni_contexts()
3672
3673 def cb_returning_alert(ssl_sock, server_name, initial_context):
3674 return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
3675 server_context.set_servername_callback(cb_returning_alert)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003676 with self.assertRaises(ssl.SSLError) as cm:
3677 stats = server_params_test(client_context, server_context,
3678 chatty=False,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003679 sni_name='supermessage')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003680 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003681
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003682 @needs_sni
3683 def test_sni_callback_raising(self):
3684 # Raising fails the connection with a TLS handshake failure alert.
3685 server_context, other_context, client_context = self.sni_contexts()
3686
3687 def cb_raising(ssl_sock, server_name, initial_context):
3688 1/0
3689 server_context.set_servername_callback(cb_raising)
3690
3691 with self.assertRaises(ssl.SSLError) as cm, \
3692 support.captured_stderr() as stderr:
Antoine Pitrou50b24d02013-04-11 20:48:42 +02003693 stats = server_params_test(client_context, server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003694 chatty=False,
3695 sni_name='supermessage')
3696 self.assertEqual(cm.exception.reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
3697 self.assertIn("ZeroDivisionError", stderr.getvalue())
Antoine Pitrou50b24d02013-04-11 20:48:42 +02003698
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003699 @needs_sni
3700 def test_sni_callback_wrong_return_type(self):
3701 # Returning the wrong return type terminates the TLS connection
3702 # with an internal error alert.
3703 server_context, other_context, client_context = self.sni_contexts()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003704
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003705 def cb_wrong_return_type(ssl_sock, server_name, initial_context):
3706 return "foo"
3707 server_context.set_servername_callback(cb_wrong_return_type)
3708
3709 with self.assertRaises(ssl.SSLError) as cm, \
3710 support.captured_stderr() as stderr:
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003711 stats = server_params_test(client_context, server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003712 chatty=False,
3713 sni_name='supermessage')
3714 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
3715 self.assertIn("TypeError", stderr.getvalue())
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003716
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003717 def test_shared_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003718 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003719 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
3720 client_context.set_ciphers("AES128:AES256")
3721 server_context.set_ciphers("AES256")
3722 alg1 = "AES256"
3723 alg2 = "AES-256"
3724 else:
3725 client_context.set_ciphers("AES:3DES")
3726 server_context.set_ciphers("3DES")
3727 alg1 = "3DES"
3728 alg2 = "DES-CBC3"
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003729
Christian Heimesa170fa12017-09-15 20:27:30 +02003730 stats = server_params_test(client_context, server_context,
3731 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003732 ciphers = stats['server_shared_ciphers'][0]
3733 self.assertGreater(len(ciphers), 0)
3734 for name, tls_version, bits in ciphers:
3735 if not alg1 in name.split("-") and alg2 not in name:
3736 self.fail(name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003737
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003738 def test_read_write_after_close_raises_valuerror(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003739 client_context, server_context, hostname = testing_context()
3740 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003741
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003742 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02003743 s = client_context.wrap_socket(socket.socket(),
3744 server_hostname=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003745 s.connect((HOST, server.port))
3746 s.close()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003747
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003748 self.assertRaises(ValueError, s.read, 1024)
3749 self.assertRaises(ValueError, s.write, b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003750
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003751 def test_sendfile(self):
3752 TEST_DATA = b"x" * 512
3753 with open(support.TESTFN, 'wb') as f:
3754 f.write(TEST_DATA)
3755 self.addCleanup(support.unlink, support.TESTFN)
Christian Heimesa170fa12017-09-15 20:27:30 +02003756 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003757 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003758 context.load_verify_locations(SIGNING_CA)
3759 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003760 server = ThreadedEchoServer(context=context, chatty=False)
3761 with server:
3762 with context.wrap_socket(socket.socket()) as s:
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003763 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003764 with open(support.TESTFN, 'rb') as file:
3765 s.sendfile(file)
3766 self.assertEqual(s.recv(1024), TEST_DATA)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003767
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003768 def test_session(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003769 client_context, server_context, hostname = testing_context()
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003770
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003771 # first connection without session
Christian Heimesa170fa12017-09-15 20:27:30 +02003772 stats = server_params_test(client_context, server_context,
3773 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003774 session = stats['session']
3775 self.assertTrue(session.id)
3776 self.assertGreater(session.time, 0)
3777 self.assertGreater(session.timeout, 0)
3778 self.assertTrue(session.has_ticket)
3779 if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
3780 self.assertGreater(session.ticket_lifetime_hint, 0)
3781 self.assertFalse(stats['session_reused'])
3782 sess_stat = server_context.session_stats()
3783 self.assertEqual(sess_stat['accept'], 1)
3784 self.assertEqual(sess_stat['hits'], 0)
Giampaolo Rodola'915d1412014-06-11 03:54:30 +02003785
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003786 # reuse session
Christian Heimesa170fa12017-09-15 20:27:30 +02003787 stats = server_params_test(client_context, server_context,
3788 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003789 sess_stat = server_context.session_stats()
3790 self.assertEqual(sess_stat['accept'], 2)
3791 self.assertEqual(sess_stat['hits'], 1)
3792 self.assertTrue(stats['session_reused'])
3793 session2 = stats['session']
3794 self.assertEqual(session2.id, session.id)
3795 self.assertEqual(session2, session)
3796 self.assertIsNot(session2, session)
3797 self.assertGreaterEqual(session2.time, session.time)
3798 self.assertGreaterEqual(session2.timeout, session.timeout)
Christian Heimes99a65702016-09-10 23:44:53 +02003799
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003800 # another one without session
Christian Heimesa170fa12017-09-15 20:27:30 +02003801 stats = server_params_test(client_context, server_context,
3802 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003803 self.assertFalse(stats['session_reused'])
3804 session3 = stats['session']
3805 self.assertNotEqual(session3.id, session.id)
3806 self.assertNotEqual(session3, session)
3807 sess_stat = server_context.session_stats()
3808 self.assertEqual(sess_stat['accept'], 3)
3809 self.assertEqual(sess_stat['hits'], 1)
Christian Heimes99a65702016-09-10 23:44:53 +02003810
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003811 # reuse session again
Christian Heimesa170fa12017-09-15 20:27:30 +02003812 stats = server_params_test(client_context, server_context,
3813 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003814 self.assertTrue(stats['session_reused'])
3815 session4 = stats['session']
3816 self.assertEqual(session4.id, session.id)
3817 self.assertEqual(session4, session)
3818 self.assertGreaterEqual(session4.time, session.time)
3819 self.assertGreaterEqual(session4.timeout, session.timeout)
3820 sess_stat = server_context.session_stats()
3821 self.assertEqual(sess_stat['accept'], 4)
3822 self.assertEqual(sess_stat['hits'], 2)
Christian Heimes99a65702016-09-10 23:44:53 +02003823
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003824 def test_session_handling(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003825 client_context, server_context, hostname = testing_context()
3826 client_context2, _, _ = testing_context()
Christian Heimes99a65702016-09-10 23:44:53 +02003827
Christian Heimescb5b68a2017-09-07 18:07:00 -07003828 # TODO: session reuse does not work with TLS 1.3
Christian Heimesa170fa12017-09-15 20:27:30 +02003829 client_context.options |= ssl.OP_NO_TLSv1_3
3830 client_context2.options |= ssl.OP_NO_TLSv1_3
Christian Heimescb5b68a2017-09-07 18:07:00 -07003831
Christian Heimesa170fa12017-09-15 20:27:30 +02003832 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003833 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02003834 with client_context.wrap_socket(socket.socket(),
3835 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003836 # session is None before handshake
3837 self.assertEqual(s.session, None)
3838 self.assertEqual(s.session_reused, None)
3839 s.connect((HOST, server.port))
3840 session = s.session
3841 self.assertTrue(session)
3842 with self.assertRaises(TypeError) as e:
3843 s.session = object
3844 self.assertEqual(str(e.exception), 'Value is not a SSLSession.')
Christian Heimes99a65702016-09-10 23:44:53 +02003845
Christian Heimesa170fa12017-09-15 20:27:30 +02003846 with client_context.wrap_socket(socket.socket(),
3847 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003848 s.connect((HOST, server.port))
3849 # cannot set session after handshake
3850 with self.assertRaises(ValueError) as e:
3851 s.session = session
3852 self.assertEqual(str(e.exception),
3853 'Cannot set session after handshake.')
Christian Heimes99a65702016-09-10 23:44:53 +02003854
Christian Heimesa170fa12017-09-15 20:27:30 +02003855 with client_context.wrap_socket(socket.socket(),
3856 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003857 # can set session before handshake and before the
3858 # connection was established
3859 s.session = session
3860 s.connect((HOST, server.port))
3861 self.assertEqual(s.session.id, session.id)
3862 self.assertEqual(s.session, session)
3863 self.assertEqual(s.session_reused, True)
Christian Heimes99a65702016-09-10 23:44:53 +02003864
Christian Heimesa170fa12017-09-15 20:27:30 +02003865 with client_context2.wrap_socket(socket.socket(),
3866 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003867 # cannot re-use session with a different SSLContext
3868 with self.assertRaises(ValueError) as e:
Christian Heimes99a65702016-09-10 23:44:53 +02003869 s.session = session
3870 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003871 self.assertEqual(str(e.exception),
3872 'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02003873
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003874
Thomas Woutersed03b412007-08-28 21:37:11 +00003875def test_main(verbose=False):
Antoine Pitrou15cee622010-08-04 16:45:21 +00003876 if support.verbose:
Berker Peksag9e7990a2015-05-16 23:21:26 +03003877 import warnings
Antoine Pitrou15cee622010-08-04 16:45:21 +00003878 plats = {
3879 'Linux': platform.linux_distribution,
3880 'Mac': platform.mac_ver,
3881 'Windows': platform.win32_ver,
3882 }
Berker Peksag9e7990a2015-05-16 23:21:26 +03003883 with warnings.catch_warnings():
3884 warnings.filterwarnings(
3885 'ignore',
R David Murray44b548d2016-09-08 13:59:53 -04003886 r'dist\(\) and linux_distribution\(\) '
Berker Peksag9e7990a2015-05-16 23:21:26 +03003887 'functions are deprecated .*',
3888 PendingDeprecationWarning,
3889 )
3890 for name, func in plats.items():
3891 plat = func()
3892 if plat and plat[0]:
3893 plat = '%s %r' % (name, plat)
3894 break
3895 else:
3896 plat = repr(platform.platform())
Antoine Pitrou15cee622010-08-04 16:45:21 +00003897 print("test_ssl: testing with %r %r" %
3898 (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
3899 print(" under %s" % plat)
Antoine Pitroud5323212010-10-22 18:19:07 +00003900 print(" HAS_SNI = %r" % ssl.HAS_SNI)
Antoine Pitrou609ef012013-03-29 18:09:06 +01003901 print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
3902 try:
3903 print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
3904 except AttributeError:
3905 pass
Antoine Pitrou15cee622010-08-04 16:45:21 +00003906
Antoine Pitrou152efa22010-05-16 18:19:27 +00003907 for filename in [
Martin Panter3840b2a2016-03-27 01:53:46 +00003908 CERTFILE, BYTES_CERTFILE,
Antoine Pitrou152efa22010-05-16 18:19:27 +00003909 ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003910 SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
Antoine Pitrou152efa22010-05-16 18:19:27 +00003911 BADCERT, BADKEY, EMPTYCERT]:
3912 if not os.path.exists(filename):
3913 raise support.TestFailed("Can't read certificate file %r" % filename)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00003914
Martin Panter3840b2a2016-03-27 01:53:46 +00003915 tests = [
3916 ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003917 SimpleBackgroundTests, ThreadedTests,
Martin Panter3840b2a2016-03-27 01:53:46 +00003918 ]
Thomas Woutersed03b412007-08-28 21:37:11 +00003919
Benjamin Petersonee8712c2008-05-20 21:35:26 +00003920 if support.is_resource_enabled('network'):
Bill Janssen6e027db2007-11-15 22:23:56 +00003921 tests.append(NetworkedTests)
Thomas Woutersed03b412007-08-28 21:37:11 +00003922
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003923 thread_info = support.threading_setup()
Antoine Pitrou480a1242010-04-28 21:37:09 +00003924 try:
3925 support.run_unittest(*tests)
3926 finally:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003927 support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00003928
3929if __name__ == "__main__":
3930 test_main()