# Test the support for SSL and sockets
2
3import sys
4import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00005from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00006import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00007import select
Thomas Woutersed03b412007-08-28 21:37:11 +00008import time
Christian Heimes9424bb42013-06-17 15:32:57 +02009import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000010import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000011import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000012import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000013import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000014import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020015import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000016import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000017import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000018import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000019import platform
Christian Heimes892d66e2018-01-29 14:10:18 +010020import sysconfig
Christian Heimes888bbdc2017-09-07 14:18:21 -070021try:
22 import ctypes
23except ImportError:
24 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000025
Antoine Pitrou05d936d2010-10-13 11:38:36 +000026ssl = support.import_module("ssl")
27
Martin Panter3840b2a2016-03-27 01:53:46 +000028
# Protocol identifiers known to the ssl module, in sorted order.
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
HOST = support.HOST
# Library detection flags.  The OpenSSL 1.1.x flags are always False when
# the version string identifies the library as LibreSSL.
IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
IS_OPENSSL_1_1_0 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
IS_OPENSSL_1_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
# Build-time configuration variable; may be None if it is not set.
PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
Antoine Pitrou152efa22010-05-16 18:19:27 +000035
def data_file(*name):
    """Return the path of a data file that lives next to this module.

    *name* is one or more path components; they are joined onto the
    directory that contains this file.
    """
    return os.path.join(os.path.dirname(__file__), *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000038
# The custom key and certificate files used in test_ssl are generated
# using Lib/test/make_ssl_certs.py.
# Other certificates are simply fetched from the Internet servers they
# are meant to authenticate.

CERTFILE = data_file("keycert.pem")
BYTES_CERTFILE = os.fsencode(CERTFILE)
ONLYCERT = data_file("ssl_cert.pem")
ONLYKEY = data_file("ssl_key.pem")
BYTES_ONLYCERT = os.fsencode(ONLYCERT)
BYTES_ONLYKEY = os.fsencode(ONLYKEY)
CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
KEY_PASSWORD = "somepass"
CAPATH = data_file("capath")
BYTES_CAPATH = os.fsencode(CAPATH)
CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
CAFILE_CACERT = data_file("capath", "5ed36f99.0")
57
# Expected decoding of CERTFILE (compared against the result of
# ssl._ssl._test_decode_cert() in test_parse_cert).
CERTFILE_INFO = {
    'issuer': ((('countryName', 'XY'),),
               (('localityName', 'Castle Anthrax'),),
               (('organizationName', 'Python Software Foundation'),),
               (('commonName', 'localhost'),)),
    'notAfter': 'Aug 26 14:23:15 2028 GMT',
    'notBefore': 'Aug 29 14:23:15 2018 GMT',
    'serialNumber': '98A7CF88C74A32ED',
    'subject': ((('countryName', 'XY'),),
                (('localityName', 'Castle Anthrax'),),
                (('organizationName', 'Python Software Foundation'),),
                (('commonName', 'localhost'),)),
    'subjectAltName': (('DNS', 'localhost'),),
    'version': 3
}
Antoine Pitrou152efa22010-05-16 18:19:27 +000073
# empty CRL
CRLFILE = data_file("revocation.crl")

# Two keys and certs signed by the same CA (for SNI tests)
SIGNED_CERTFILE = data_file("keycert3.pem")
SIGNED_CERTFILE_HOSTNAME = 'localhost'
Christian Heimesbd5c7d22018-01-20 15:16:30 +010080
# Expected decoding of SIGNED_CERTFILE (compared against the result of
# ssl._ssl._test_decode_cert() in test_parse_cert).
SIGNED_CERTFILE_INFO = {
    'OCSP': ('http://testca.pythontest.net/testca/ocsp/',),
    'caIssuers': ('http://testca.pythontest.net/testca/pycacert.cer',),
    'crlDistributionPoints': ('http://testca.pythontest.net/testca/revocation.crl',),
    'issuer': ((('countryName', 'XY'),),
               (('organizationName', 'Python Software Foundation CA'),),
               (('commonName', 'our-ca-server'),)),
    # Single-digit days are space-padded to two columns (see the note in
    # asn1time about ASN1_TIME_print() using a leading space), hence the
    # double space after the month.
    'notAfter': 'Jul  7 14:23:16 2028 GMT',
    'notBefore': 'Aug 29 14:23:16 2018 GMT',
    'serialNumber': 'CB2D80995A69525C',
    'subject': ((('countryName', 'XY'),),
                (('localityName', 'Castle Anthrax'),),
                (('organizationName', 'Python Software Foundation'),),
                (('commonName', 'localhost'),)),
    'subjectAltName': (('DNS', 'localhost'),),
    'version': 3
}
98
SIGNED_CERTFILE2 = data_file("keycert4.pem")
SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
SIGNED_CERTFILE_ECC = data_file("keycertecc.pem")
SIGNED_CERTFILE_ECC_HOSTNAME = 'localhost-ecc'

# Same certificate as pycacert.pem, but without extra text in file
SIGNING_CA = data_file("capath", "ceff1710.0")
# cert with all kinds of subject alt names
ALLSANFILE = data_file("allsans.pem")
IDNSANSFILE = data_file("idnsans.pem")

REMOTE_HOST = "self-signed.pythontest.net"

EMPTYCERT = data_file("nullcert.pem")
BADCERT = data_file("badcert.pem")
NONEXISTINGCERT = data_file("XXXnonexisting.pem")
BADKEY = data_file("badkey.pem")
NOKIACERT = data_file("nokia.pem")
NULLBYTECERT = data_file("nullbytecert.pem")

DHFILE = data_file("ffdh3072.pem")
BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +0000121
# Not defined in all versions of OpenSSL; fall back to 0 when the ssl
# module does not expose the option.
OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
OP_ENABLE_MIDDLEBOX_COMPAT = getattr(ssl, "OP_ENABLE_MIDDLEBOX_COMPAT", 0)
Christian Heimes358cfd42016-09-10 22:43:48 +0200128
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100129
def handle_error(prefix):
    """Write *prefix* plus the current exception's traceback to stdout.

    Nothing is written unless support.verbose is set.
    """
    if support.verbose:
        # Only build the traceback text when it is going to be printed.
        exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
        sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +0000134
def can_clear_options():
    """Return True if ssl._OPENSSL_API_VERSION is 0.9.8m or higher."""
    # 0.9.8m or higher
    return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
Antoine Pitroub5218772010-05-21 09:56:06 +0000138
def no_sslv2_implies_sslv3_hello():
    """Return True if ssl.OPENSSL_VERSION_INFO is 0.9.7h or higher."""
    # 0.9.7h or higher
    return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
142
def have_verify_flags():
    """Return True if ssl.OPENSSL_VERSION_INFO is 0.9.8 or higher."""
    # 0.9.8 or higher
    return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
146
def _have_secp_curves():
    """Return True if a server context accepts the "secp384r1" ECDH curve.

    Returns False when the ssl module reports no ECDH support, or when
    SSLContext.set_ecdh_curve() rejects the curve name with ValueError.
    """
    if not ssl.HAS_ECDH:
        return False
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    try:
        ctx.set_ecdh_curve("secp384r1")
    except ValueError:
        return False
    else:
        return True


# Probed once at import time.
HAVE_SECP_CURVES = _have_secp_curves()
160
161
def utc_offset():  # NOTE: ignore issues like #1647654
    """Return the current local UTC offset in seconds.

    The value satisfies: local time = utc time + utc offset.  The DST
    offset (time.altzone) is used when the platform defines a DST zone
    and DST is currently in effect; otherwise time.timezone is used.
    """
    # local time = utc time + utc offset
    if time.daylight and time.localtime().tm_isdst > 0:
        return -time.altzone  # seconds
    return -time.timezone
167
def asn1time(cert_time):
    """Adjust an expected certificate time string for OpenSSL 0.9.8i.

    *cert_time* is a string in the "%b %d %H:%M:%S %Y GMT" format.  When
    ssl._OPENSSL_API_VERSION is exactly 0.9.8i the seconds are zeroed and
    a leading zero in the day field is replaced by a space; for every
    other version the string is returned unchanged.
    """
    # Some versions of OpenSSL ignore seconds, see #18207
    # 0.9.8.i
    if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
        fmt = "%b %d %H:%M:%S %Y GMT"
        dt = datetime.datetime.strptime(cert_time, fmt)
        dt = dt.replace(second=0)
        cert_time = dt.strftime(fmt)
        # %d adds leading zero but ASN1_TIME_print() uses leading space
        if cert_time[4] == "0":
            cert_time = cert_time[:4] + " " + cert_time[5:]

    return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000181
# Decorator that skips a test unless the ssl module reports SNI support.
needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
183
Antoine Pitrou23df4832010-08-04 17:14:06 +0000184
def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
                     cert_reqs=ssl.CERT_NONE, ca_certs=None,
                     ciphers=None, certfile=None, keyfile=None,
                     **kwargs):
    """Wrap *sock* using a freshly created SSLContext.

    A context is created for *ssl_version* and configured from the
    keyword-only options that are not None; any remaining keyword
    arguments are passed through to SSLContext.wrap_socket().  Returns
    the object produced by wrap_socket().
    """
    context = ssl.SSLContext(ssl_version)
    if cert_reqs is not None:
        if cert_reqs == ssl.CERT_NONE:
            # check_hostname is switched off before verify_mode is set
            # to CERT_NONE.
            context.check_hostname = False
        context.verify_mode = cert_reqs
    if ca_certs is not None:
        context.load_verify_locations(ca_certs)
    if certfile is not None or keyfile is not None:
        context.load_cert_chain(certfile, keyfile)
    if ciphers is not None:
        context.set_ciphers(ciphers)
    return context.wrap_socket(sock, **kwargs)
201
Christian Heimesa170fa12017-09-15 20:27:30 +0200202
def testing_context(server_cert=SIGNED_CERTFILE):
    """Create a matching client/server context pair for tests.

    Usage:

        client_context, server_context, hostname = testing_context()

    *server_cert* must be SIGNED_CERTFILE or SIGNED_CERTFILE2; the
    returned hostname is the one associated with that certificate.
    Any other value raises ValueError.
    """
    if server_cert == SIGNED_CERTFILE:
        hostname = SIGNED_CERTFILE_HOSTNAME
    elif server_cert == SIGNED_CERTFILE2:
        hostname = SIGNED_CERTFILE2_HOSTNAME
    else:
        raise ValueError(server_cert)

    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(server_cert)
    server_context.load_verify_locations(SIGNING_CA)

    return client_context, server_context, hostname
223
224
Antoine Pitrou152efa22010-05-16 18:19:27 +0000225class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000226
def test_constants(self):
    # Bare attribute accesses: each one raises AttributeError if the ssl
    # module does not define the constant.
    ssl.CERT_NONE
    ssl.CERT_OPTIONAL
    ssl.CERT_REQUIRED
    ssl.OP_CIPHER_SERVER_PREFERENCE
    ssl.OP_SINGLE_DH_USE
    if ssl.HAS_ECDH:
        ssl.OP_SINGLE_ECDH_USE
    if ssl.OPENSSL_VERSION_INFO >= (1, 0):
        ssl.OP_NO_COMPRESSION
    self.assertIn(ssl.HAS_SNI, {True, False})
    self.assertIn(ssl.HAS_ECDH, {True, False})
    ssl.OP_NO_SSLv2
    ssl.OP_NO_SSLv3
    ssl.OP_NO_TLSv1
    ssl.OP_NO_TLSv1_3
    if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
        ssl.OP_NO_TLSv1_1
        ssl.OP_NO_TLSv1_2
    self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000247
def test_private_init(self):
    # Instantiating SSLSocket directly must fail with a TypeError whose
    # message mentions "public constructor".
    with self.assertRaisesRegex(TypeError, "public constructor"):
        with socket.socket() as s:
            ssl.SSLSocket(s)
252
def test_str_for_enums(self):
    # Make sure that the PROTOCOL_* constants have enum-like string
    # reprs.
    proto = ssl.PROTOCOL_TLS
    self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
    # The context must hand back the very same protocol object.
    ctx = ssl.SSLContext(proto)
    self.assertIs(ctx.protocol, proto)
260
def test_random(self):
    v = ssl.RAND_status()
    if support.verbose:
        sys.stdout.write("\n RAND_status is %d (%s)\n"
                         % (v, "sufficient randomness" if v
                            else "insufficient randomness"))

    data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
    self.assertEqual(len(data), 16)
    self.assertEqual(is_cryptographic, v == 1)
    if v:
        data = ssl.RAND_bytes(16)
        self.assertEqual(len(data), 16)
    else:
        self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)

    # negative num is invalid
    self.assertRaises(ValueError, ssl.RAND_bytes, -5)
    self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)

    # RAND_egd is only checked when the ssl module provides it.
    if hasattr(ssl, 'RAND_egd'):
        self.assertRaises(TypeError, ssl.RAND_egd, 1)
        self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
    # RAND_add is exercised with str, bytes and bytearray input.
    ssl.RAND_add("this is a random string", 75.0)
    ssl.RAND_add(b"this is a random bytes object", 75.0)
    ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000287
@unittest.skipUnless(os.name == 'posix', 'requires posix')
def test_random_fork(self):
    # After a fork, the child and the parent must not produce the same
    # pseudo-random bytes.
    status = ssl.RAND_status()
    if not status:
        self.fail("OpenSSL's PRNG has insufficient randomness")

    rfd, wfd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child: send 16 random bytes through the pipe, then exit
        # immediately with a status that reports success or failure.
        try:
            os.close(rfd)
            child_random = ssl.RAND_pseudo_bytes(16)[0]
            self.assertEqual(len(child_random), 16)
            os.write(wfd, child_random)
            os.close(wfd)
        except BaseException:
            os._exit(1)
        else:
            os._exit(0)
    else:
        # Parent: wait for the child, then compare its bytes with ours.
        os.close(wfd)
        self.addCleanup(os.close, rfd)
        _, status = os.waitpid(pid, 0)
        self.assertEqual(status, 0)

        child_random = os.read(rfd, 16)
        self.assertEqual(len(child_random), 16)
        parent_random = ssl.RAND_pseudo_bytes(16)[0]
        self.assertEqual(len(parent_random), 16)

        self.assertNotEqual(child_random, parent_random)
319
# unittest setting: None removes the length limit on failure diffs.
maxDiff = None
321
def test_parse_cert(self):
    # note that this uses an 'unofficial' function in _ssl.c,
    # provided solely for this test, to exercise the certificate
    # parsing code
    self.assertEqual(
        ssl._ssl._test_decode_cert(CERTFILE),
        CERTFILE_INFO
    )
    self.assertEqual(
        ssl._ssl._test_decode_cert(SIGNED_CERTFILE),
        SIGNED_CERTFILE_INFO
    )

    # Issue #13034: the subjectAltName in some certificates
    # (notably projects.developer.nokia.com:443) wasn't parsed
    p = ssl._ssl._test_decode_cert(NOKIACERT)
    if support.verbose:
        sys.stdout.write("\n" + pprint.pformat(p) + "\n")
    self.assertEqual(p['subjectAltName'],
                     (('DNS', 'projects.developer.nokia.com'),
                      ('DNS', 'projects.forum.nokia.com'))
                     )
    # extra OCSP and AIA fields
    self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
    self.assertEqual(p['caIssuers'],
                     ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
    self.assertEqual(p['crlDistributionPoints'],
                     ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000350
def test_parse_cert_CVE_2013_4238(self):
    # Embedded NUL bytes in certificate fields must be preserved in the
    # decoded values.
    p = ssl._ssl._test_decode_cert(NULLBYTECERT)
    if support.verbose:
        sys.stdout.write("\n" + pprint.pformat(p) + "\n")
    subject = ((('countryName', 'US'),),
               (('stateOrProvinceName', 'Oregon'),),
               (('localityName', 'Beaverton'),),
               (('organizationName', 'Python Software Foundation'),),
               (('organizationalUnitName', 'Python Core Development'),),
               (('commonName', 'null.python.org\x00example.org'),),
               (('emailAddress', 'python-dev@python.org'),))
    self.assertEqual(p['subject'], subject)
    self.assertEqual(p['issuer'], subject)
    if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
        san = (('DNS', 'altnull.python.org\x00example.com'),
               ('email', 'null@python.org\x00user@example.org'),
               ('URI', 'http://null.python.org\x00http://example.org'),
               ('IP Address', '192.0.2.1'),
               ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
    else:
        # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
        san = (('DNS', 'altnull.python.org\x00example.com'),
               ('email', 'null@python.org\x00user@example.org'),
               ('URI', 'http://null.python.org\x00http://example.org'),
               ('IP Address', '192.0.2.1'),
               ('IP Address', '<invalid>'))

    self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200379
def test_parse_all_sans(self):
    # ALLSANFILE is a cert with all kinds of subject alt names; check
    # how each entry is decoded.
    p = ssl._ssl._test_decode_cert(ALLSANFILE)
    self.assertEqual(p['subjectAltName'],
        (
            ('DNS', 'allsans'),
            ('othername', '<unsupported>'),
            ('othername', '<unsupported>'),
            ('email', 'user@example.org'),
            ('DNS', 'www.example.org'),
            ('DirName',
                ((('countryName', 'XY'),),
                (('localityName', 'Castle Anthrax'),),
                (('organizationName', 'Python Software Foundation'),),
                (('commonName', 'dirname example'),))),
            ('URI', 'https://www.python.org/'),
            ('IP Address', '127.0.0.1'),
            ('IP Address', '0:0:0:0:0:0:0:1\n'),
            ('Registered ID', '1.2.3.4.5')
        )
    )
400
def test_DER_to_PEM(self):
    # Round trip PEM -> DER -> PEM -> DER: both DER blobs must be equal
    # and the regenerated PEM must carry the standard header and footer.
    with open(CAFILE_CACERT, 'r') as f:
        pem = f.read()
    d1 = ssl.PEM_cert_to_DER_cert(pem)
    p2 = ssl.DER_cert_to_PEM_cert(d1)
    d2 = ssl.PEM_cert_to_DER_cert(p2)
    self.assertEqual(d1, d2)
    if not p2.startswith(ssl.PEM_HEADER + '\n'):
        self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
    if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
        self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000412
def test_openssl_version(self):
    n = ssl.OPENSSL_VERSION_NUMBER
    t = ssl.OPENSSL_VERSION_INFO
    s = ssl.OPENSSL_VERSION
    self.assertIsInstance(n, int)
    self.assertIsInstance(t, tuple)
    self.assertIsInstance(s, str)
    # Some sanity checks follow
    # >= 0.9
    self.assertGreaterEqual(n, 0x900000)
    # < 3.0
    self.assertLess(n, 0x30000000)
    major, minor, fix, patch, status = t
    self.assertGreaterEqual(major, 0)
    self.assertLess(major, 3)
    self.assertGreaterEqual(minor, 0)
    self.assertLess(minor, 256)
    self.assertGreaterEqual(fix, 0)
    self.assertLess(fix, 256)
    self.assertGreaterEqual(patch, 0)
    self.assertLessEqual(patch, 63)
    self.assertGreaterEqual(status, 0)
    self.assertLessEqual(status, 15)
    # Version string as returned by {Open,Libre}SSL, the format might change
    if IS_LIBRESSL:
        self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
                        (s, t, hex(n)))
    else:
        self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
                        (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000443
@support.cpython_only
def test_refcycle(self):
    # Issue #7943: an SSL object doesn't create reference cycles with
    # itself.
    s = socket.socket(socket.AF_INET)
    ss = test_wrap_socket(s)
    wr = weakref.ref(ss)
    with support.check_warnings(("", ResourceWarning)):
        del ss
    # The weak reference must be dead as soon as the last strong
    # reference is dropped.
    self.assertIsNone(wr())
Antoine Pitrou9d543662010-04-23 23:10:32 +0000454
def test_wrapped_unconnected(self):
    # Methods on an unconnected SSLSocket propagate the original
    # OSError raised by the underlying socket object.
    s = socket.socket(socket.AF_INET)
    with test_wrap_socket(s) as ss:
        self.assertRaises(OSError, ss.recv, 1)
        self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
        self.assertRaises(OSError, ss.recvfrom, 1)
        self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
        self.assertRaises(OSError, ss.send, b'x')
        self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
        # These methods are expected to raise NotImplementedError.
        self.assertRaises(NotImplementedError, ss.dup)
        self.assertRaises(NotImplementedError, ss.sendmsg,
                          [b'x'], (), 0, ('0.0.0.0', 0))
        self.assertRaises(NotImplementedError, ss.recvmsg, 100)
        self.assertRaises(NotImplementedError, ss.recvmsg_into,
                          [bytearray(100)])
Antoine Pitroua468adc2010-09-14 14:43:44 +0000472
def test_timeout(self):
    # Issue #8524: when creating an SSL socket, the timeout of the
    # original socket should be retained.
    for timeout in (None, 0.0, 5.0):
        s = socket.socket(socket.AF_INET)
        s.settimeout(timeout)
        with test_wrap_socket(s) as ss:
            self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000481
def test_errors_sslwrap(self):
    sock = socket.socket()
    # Invalid certfile/keyfile combinations are rejected with ValueError.
    self.assertRaisesRegex(ValueError,
                           "certfile must be specified",
                           ssl.wrap_socket, sock, keyfile=CERTFILE)
    self.assertRaisesRegex(ValueError,
                           "certfile must be specified for server-side operations",
                           ssl.wrap_socket, sock, server_side=True)
    self.assertRaisesRegex(ValueError,
                           "certfile must be specified for server-side operations",
                           ssl.wrap_socket, sock, server_side=True, certfile="")
    with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
        self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
                               s.connect, (HOST, 8080))
    # Missing certificate and/or key files surface as OSError(ENOENT).
    with self.assertRaises(OSError) as cm:
        with socket.socket() as sock:
            ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
    self.assertEqual(cm.exception.errno, errno.ENOENT)
    with self.assertRaises(OSError) as cm:
        with socket.socket() as sock:
            ssl.wrap_socket(sock,
                            certfile=CERTFILE, keyfile=NONEXISTINGCERT)
    self.assertEqual(cm.exception.errno, errno.ENOENT)
    with self.assertRaises(OSError) as cm:
        with socket.socket() as sock:
            ssl.wrap_socket(sock,
                            certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
    self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000510
def bad_cert_test(self, certfile):
    """Check that trying to use the given client certificate fails"""
    # *certfile* is a bare file name, resolved against the directory of
    # this module (or the current directory if that is empty).
    certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
                            certfile)
    sock = socket.socket()
    self.addCleanup(sock.close)
    with self.assertRaises(ssl.SSLError):
        test_wrap_socket(sock,
                         certfile=certfile)
Martin Panter3464ea22016-02-01 21:58:11 +0000520
def test_empty_cert(self):
    """Wrapping with an empty cert file"""
    self.bad_cert_test("nullcert.pem")
524
def test_malformed_cert(self):
    """Wrapping with a badly formatted certificate (syntax error)"""
    self.bad_cert_test("badcert.pem")
528
def test_malformed_key(self):
    """Wrapping with a badly formatted key (syntax error)"""
    self.bad_cert_test("badkey.pem")
532
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000533 def test_match_hostname(self):
534 def ok(cert, hostname):
535 ssl.match_hostname(cert, hostname)
536 def fail(cert, hostname):
537 self.assertRaises(ssl.CertificateError,
538 ssl.match_hostname, cert, hostname)
539
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100540 # -- Hostname matching --
541
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000542 cert = {'subject': ((('commonName', 'example.com'),),)}
543 ok(cert, 'example.com')
544 ok(cert, 'ExAmple.cOm')
545 fail(cert, 'www.example.com')
546 fail(cert, '.example.com')
547 fail(cert, 'example.org')
548 fail(cert, 'exampleXcom')
549
550 cert = {'subject': ((('commonName', '*.a.com'),),)}
551 ok(cert, 'foo.a.com')
552 fail(cert, 'bar.foo.a.com')
553 fail(cert, 'a.com')
554 fail(cert, 'Xa.com')
555 fail(cert, '.a.com')
556
Mandeep Singhede2ac92017-11-27 04:01:27 +0530557 # only match wildcards when they are the only thing
558 # in left-most segment
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000559 cert = {'subject': ((('commonName', 'f*.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530560 fail(cert, 'foo.com')
561 fail(cert, 'f.com')
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000562 fail(cert, 'bar.com')
563 fail(cert, 'foo.a.com')
564 fail(cert, 'bar.foo.com')
565
Christian Heimes824f7f32013-08-17 00:54:47 +0200566 # NULL bytes are bad, CVE-2013-4073
567 cert = {'subject': ((('commonName',
568 'null.python.org\x00example.org'),),)}
569 ok(cert, 'null.python.org\x00example.org') # or raise an error?
570 fail(cert, 'example.org')
571 fail(cert, 'null.python.org')
572
Georg Brandl72c98d32013-10-27 07:16:53 +0100573 # error cases with wildcards
574 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
575 fail(cert, 'bar.foo.a.com')
576 fail(cert, 'a.com')
577 fail(cert, 'Xa.com')
578 fail(cert, '.a.com')
579
580 cert = {'subject': ((('commonName', 'a.*.com'),),)}
581 fail(cert, 'a.foo.com')
582 fail(cert, 'a..com')
583 fail(cert, 'a.com')
584
585 # wildcard doesn't match IDNA prefix 'xn--'
586 idna = 'püthon.python.org'.encode("idna").decode("ascii")
587 cert = {'subject': ((('commonName', idna),),)}
588 ok(cert, idna)
589 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
590 fail(cert, idna)
591 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
592 fail(cert, idna)
593
594 # wildcard in first fragment and IDNA A-labels in sequent fragments
595 # are supported.
596 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
597 cert = {'subject': ((('commonName', idna),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530598 fail(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
599 fail(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
Georg Brandl72c98d32013-10-27 07:16:53 +0100600 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
601 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
602
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000603 # Slightly fake real-world example
604 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
605 'subject': ((('commonName', 'linuxfrz.org'),),),
606 'subjectAltName': (('DNS', 'linuxfr.org'),
607 ('DNS', 'linuxfr.com'),
608 ('othername', '<unsupported>'))}
609 ok(cert, 'linuxfr.org')
610 ok(cert, 'linuxfr.com')
611 # Not a "DNS" entry
612 fail(cert, '<unsupported>')
613 # When there is a subjectAltName, commonName isn't used
614 fail(cert, 'linuxfrz.org')
615
616 # A pristine real-world example
617 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
618 'subject': ((('countryName', 'US'),),
619 (('stateOrProvinceName', 'California'),),
620 (('localityName', 'Mountain View'),),
621 (('organizationName', 'Google Inc'),),
622 (('commonName', 'mail.google.com'),))}
623 ok(cert, 'mail.google.com')
624 fail(cert, 'gmail.com')
625 # Only commonName is considered
626 fail(cert, 'California')
627
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100628 # -- IPv4 matching --
629 cert = {'subject': ((('commonName', 'example.com'),),),
630 'subjectAltName': (('DNS', 'example.com'),
631 ('IP Address', '10.11.12.13'),
632 ('IP Address', '14.15.16.17'))}
633 ok(cert, '10.11.12.13')
634 ok(cert, '14.15.16.17')
635 fail(cert, '14.15.16.18')
636 fail(cert, 'example.net')
637
638 # -- IPv6 matching --
Christian Heimesaef12832018-02-24 14:35:56 +0100639 if hasattr(socket, 'AF_INET6'):
640 cert = {'subject': ((('commonName', 'example.com'),),),
641 'subjectAltName': (
642 ('DNS', 'example.com'),
643 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
644 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
645 ok(cert, '2001::cafe')
646 ok(cert, '2003::baba')
647 fail(cert, '2003::bebe')
648 fail(cert, 'example.net')
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100649
650 # -- Miscellaneous --
651
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000652 # Neither commonName nor subjectAltName
653 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
654 'subject': ((('countryName', 'US'),),
655 (('stateOrProvinceName', 'California'),),
656 (('localityName', 'Mountain View'),),
657 (('organizationName', 'Google Inc'),))}
658 fail(cert, 'mail.google.com')
659
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200660 # No DNS entry in subjectAltName but a commonName
661 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
662 'subject': ((('countryName', 'US'),),
663 (('stateOrProvinceName', 'California'),),
664 (('localityName', 'Mountain View'),),
665 (('commonName', 'mail.google.com'),)),
666 'subjectAltName': (('othername', 'blabla'), )}
667 ok(cert, 'mail.google.com')
668
669 # No DNS entry subjectAltName and no commonName
670 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
671 'subject': ((('countryName', 'US'),),
672 (('stateOrProvinceName', 'California'),),
673 (('localityName', 'Mountain View'),),
674 (('organizationName', 'Google Inc'),)),
675 'subjectAltName': (('othername', 'blabla'),)}
676 fail(cert, 'google.com')
677
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000678 # Empty cert / no cert
679 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
680 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
681
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200682 # Issue #17980: avoid denials of service by refusing more than one
683 # wildcard per fragment.
Christian Heimesaef12832018-02-24 14:35:56 +0100684 cert = {'subject': ((('commonName', 'a*b.example.com'),),)}
685 with self.assertRaisesRegex(
686 ssl.CertificateError,
687 "partial wildcards in leftmost label are not supported"):
688 ssl.match_hostname(cert, 'axxb.example.com')
689
690 cert = {'subject': ((('commonName', 'www.*.example.com'),),)}
691 with self.assertRaisesRegex(
692 ssl.CertificateError,
693 "wildcard can only be present in the leftmost label"):
694 ssl.match_hostname(cert, 'www.sub.example.com')
695
696 cert = {'subject': ((('commonName', 'a*b*.example.com'),),)}
697 with self.assertRaisesRegex(
698 ssl.CertificateError,
699 "too many wildcards"):
700 ssl.match_hostname(cert, 'axxbxxc.example.com')
701
702 cert = {'subject': ((('commonName', '*'),),)}
703 with self.assertRaisesRegex(
704 ssl.CertificateError,
705 "sole wildcard without additional labels are not support"):
706 ssl.match_hostname(cert, 'host')
707
708 cert = {'subject': ((('commonName', '*.com'),),)}
709 with self.assertRaisesRegex(
710 ssl.CertificateError,
711 r"hostname 'com' doesn't match '\*.com'"):
712 ssl.match_hostname(cert, 'com')
713
714 # extra checks for _inet_paton()
715 for invalid in ['1', '', '1.2.3', '256.0.0.1', '127.0.0.1/24']:
716 with self.assertRaises(ValueError):
717 ssl._inet_paton(invalid)
718 for ipaddr in ['127.0.0.1', '192.168.0.1']:
719 self.assertTrue(ssl._inet_paton(ipaddr))
720 if hasattr(socket, 'AF_INET6'):
721 for ipaddr in ['::1', '2001:db8:85a3::8a2e:370:7334']:
722 self.assertTrue(ssl._inet_paton(ipaddr))
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200723
Antoine Pitroud5323212010-10-22 18:19:07 +0000724 def test_server_side(self):
725 # server_hostname doesn't work for server sockets
Christian Heimesa170fa12017-09-15 20:27:30 +0200726 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000727 with socket.socket() as sock:
728 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
729 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000730
def test_unknown_channel_binding(self):
    # get_channel_binding() should raise ValueError for an unknown type.
    listener = socket.socket(socket.AF_INET)
    # Register the cleanup immediately so the listening socket is released
    # even when bind/connect or the assertion below fails.
    self.addCleanup(listener.close)
    listener.bind(('127.0.0.1', 0))
    listener.listen()
    client = socket.socket(socket.AF_INET)
    # Covers a failure before the socket is handed to the SSL wrapper;
    # closing an already closed socket is harmless.
    self.addCleanup(client.close)
    client.connect(listener.getsockname())
    with test_wrap_socket(client, do_handshake_on_connect=False) as ss:
        with self.assertRaises(ValueError):
            ss.get_channel_binding("unknown-type")
Antoine Pitroud6494802011-07-21 01:11:30 +0200742
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    # An unconnected socket should return None for a known binding type,
    # first on the client side ...
    client_sock = socket.socket(socket.AF_INET)
    with test_wrap_socket(client_sock) as wrapped:
        binding = wrapped.get_channel_binding("tls-unique")
        self.assertIsNone(binding)
    # ... and the same for the server side.
    server_sock = socket.socket(socket.AF_INET)
    with test_wrap_socket(server_sock, server_side=True,
                          certfile=CERTFILE) as wrapped:
        binding = wrapped.get_channel_binding("tls-unique")
        self.assertIsNone(binding)
Antoine Pitroud6494802011-07-21 01:11:30 +0200754
def test_dealloc_warn(self):
    # Dropping the last reference to an unclosed wrapped socket must emit
    # a ResourceWarning whose message contains the socket's repr.
    wrapped = test_wrap_socket(socket.socket(socket.AF_INET))
    expected_repr = repr(wrapped)
    with self.assertWarns(ResourceWarning) as caught:
        wrapped = None
        support.gc_collect()
    warning_text = str(caught.warning.args[0])
    self.assertIn(expected_repr, warning_text)
762
def test_get_default_verify_paths(self):
    # The result is a six-field DefaultVerifyPaths tuple.
    default_paths = ssl.get_default_verify_paths()
    self.assertEqual(len(default_paths), 6)
    self.assertIsInstance(default_paths, ssl.DefaultVerifyPaths)

    # SSL_CERT_DIR / SSL_CERT_FILE must be reflected in capath / cafile.
    with support.EnvironmentVarGuard() as env:
        env.set("SSL_CERT_DIR", CAPATH)
        env.set("SSL_CERT_FILE", CERTFILE)
        overridden = ssl.get_default_verify_paths()
        self.assertEqual(overridden.cafile, CERTFILE)
        self.assertEqual(overridden.capath, CAPATH)
774
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
def test_enum_certificates(self):
    # Both system stores are expected to be non-empty.
    for store_name in ("CA", "ROOT"):
        self.assertTrue(ssl.enum_certificates(store_name))

    # Missing store name -> TypeError, empty store name -> WindowsError.
    with self.assertRaises(TypeError):
        ssl.enum_certificates()
    with self.assertRaises(WindowsError):
        ssl.enum_certificates("")

    # Validate the shape of every entry and collect all trust OIDs.
    seen_trust_oids = set()
    for store_name in ("CA", "ROOT"):
        entries = ssl.enum_certificates(store_name)
        self.assertIsInstance(entries, list)
        for entry in entries:
            self.assertIsInstance(entry, tuple)
            self.assertEqual(len(entry), 3)
            cert_bytes, encoding, trust = entry
            self.assertIsInstance(cert_bytes, bytes)
            self.assertIn(encoding, {"x509_asn", "pkcs_7_asn"})
            self.assertIsInstance(trust, (set, bool))
            if isinstance(trust, set):
                seen_trust_oids |= trust

    server_auth_oid = "1.3.6.1.5.5.7.3.1"
    self.assertIn(server_auth_oid, seen_trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200799
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
def test_enum_crls(self):
    self.assertTrue(ssl.enum_crls("CA"))
    # Missing store name -> TypeError, empty store name -> WindowsError.
    with self.assertRaises(TypeError):
        ssl.enum_crls()
    with self.assertRaises(WindowsError):
        ssl.enum_crls("")

    # Every entry is a (bytes, encoding-name) pair.
    ca_crls = ssl.enum_crls("CA")
    self.assertIsInstance(ca_crls, list)
    for entry in ca_crls:
        self.assertIsInstance(entry, tuple)
        self.assertEqual(len(entry), 2)
        crl_bytes, encoding = entry
        self.assertIsInstance(crl_bytes, bytes)
        self.assertIn(encoding, {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200813
Christian Heimes46bebee2013-06-09 19:03:31 +0200814
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100815 def test_asn1object(self):
816 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
817 '1.3.6.1.5.5.7.3.1')
818
819 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
820 self.assertEqual(val, expected)
821 self.assertEqual(val.nid, 129)
822 self.assertEqual(val.shortname, 'serverAuth')
823 self.assertEqual(val.longname, 'TLS Web Server Authentication')
824 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
825 self.assertIsInstance(val, ssl._ASN1Object)
826 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
827
828 val = ssl._ASN1Object.fromnid(129)
829 self.assertEqual(val, expected)
830 self.assertIsInstance(val, ssl._ASN1Object)
831 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100832 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
833 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100834 for i in range(1000):
835 try:
836 obj = ssl._ASN1Object.fromnid(i)
837 except ValueError:
838 pass
839 else:
840 self.assertIsInstance(obj.nid, int)
841 self.assertIsInstance(obj.shortname, str)
842 self.assertIsInstance(obj.longname, str)
843 self.assertIsInstance(obj.oid, (str, type(None)))
844
845 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
846 self.assertEqual(val, expected)
847 self.assertIsInstance(val, ssl._ASN1Object)
848 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
849 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
850 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100851 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
852 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100853
Christian Heimes72d28502013-11-23 13:56:58 +0100854 def test_purpose_enum(self):
855 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
856 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
857 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
858 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
859 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
860 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
861 '1.3.6.1.5.5.7.3.1')
862
863 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
864 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
865 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
866 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
867 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
868 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
869 '1.3.6.1.5.5.7.3.2')
870
def test_unsupported_dtls(self):
    # Wrapping a datagram socket must fail with NotImplementedError, both
    # through the module-level helper and through SSLContext.wrap_socket().
    udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    self.addCleanup(udp_sock.close)
    with self.assertRaises(NotImplementedError) as caught:
        test_wrap_socket(udp_sock, cert_reqs=ssl.CERT_NONE)
    self.assertEqual(str(caught.exception),
                     "only stream sockets are supported")
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    with self.assertRaises(NotImplementedError) as caught:
        client_ctx.wrap_socket(udp_sock)
    self.assertEqual(str(caught.exception),
                     "only stream sockets are supported")
881
def cert_time_ok(self, timestring, timestamp):
    # Helper: assert that *timestring* converts to exactly *timestamp*.
    seconds = ssl.cert_time_to_seconds(timestring)
    self.assertEqual(seconds, timestamp)
884
def cert_time_fail(self, timestring):
    # Helper: assert that *timestring* is rejected with ValueError.
    self.assertRaises(ValueError, ssl.cert_time_to_seconds, timestring)
888
@unittest.skipUnless(utc_offset(),
                     'local time needs to be different from UTC')
def test_cert_time_to_seconds_timezone(self):
    # Issue #19940: ssl.cert_time_to_seconds() returns wrong
    # results if local timezone is not UTC
    known_conversions = (
        ("May 9 00:00:00 2007 GMT", 1178668800.0),
        ("Jan 5 09:34:43 2018 GMT", 1515144883.0),
    )
    for timestring, timestamp in known_conversions:
        self.cert_time_ok(timestring, timestamp)
896
def test_cert_time_to_seconds(self):
    reference = "Jan 5 09:34:43 2018 GMT"
    reference_ts = 1515144883.0
    self.cert_time_ok(reference, reference_ts)
    # accept keyword parameter, assert its name
    self.assertEqual(ssl.cert_time_to_seconds(cert_time=reference),
                     reference_ts)
    equivalent_spellings = (
        # accept both %e and %d (space or zero generated by strftime)
        "Jan 05 09:34:43 2018 GMT",
        # case-insensitive
        "JaN 5 09:34:43 2018 GmT",
    )
    for spelling in equivalent_spellings:
        self.cert_time_ok(spelling, reference_ts)

    malformed = (
        "Jan 5 09:34 2018 GMT",      # no seconds
        "Jan 5 09:34:43 2018",       # no GMT
        "Jan 5 09:34:43 2018 UTC",   # not GMT timezone
        "Jan 35 09:34:43 2018 GMT",  # invalid day
        "Jon 5 09:34:43 2018 GMT",   # invalid month
        "Jan 5 24:00:00 2018 GMT",   # invalid hour
        "Jan 5 09:60:43 2018 GMT",   # invalid minute
    )
    for bad_timestring in malformed:
        self.cert_time_fail(bad_timestring)

    newyear_ts = 1230768000.0
    # A leap second and the following midnight map to the same timestamp.
    for timestring in ("Dec 31 23:59:60 2008 GMT",
                       "Jan 1 00:00:00 2009 GMT"):
        self.cert_time_ok(timestring, newyear_ts)

    second_values = (
        ("Jan 5 09:34:59 2018 GMT", 1515144899),
        # allow 60th second (even if it is not a leap second)
        ("Jan 5 09:34:60 2018 GMT", 1515144900),
        # allow 2nd leap second for compatibility with time.strptime()
        ("Jan 5 09:34:61 2018 GMT", 1515144901),
    )
    for timestring, timestamp in second_values:
        self.cert_time_ok(timestring, timestamp)
    self.cert_time_fail("Jan 5 09:34:62 2018 GMT")  # invalid seconds

    # no special treatment for the special value:
    #   99991231235959Z (rfc 5280)
    self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
931
@support.run_with_locale('LC_ALL', '')
def test_cert_time_to_seconds_locale(self):
    # `cert_time_to_seconds()` should be locale independent

    # Abbreviated name of February as rendered by the active locale.
    local_february = time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
    if local_february.lower() == 'feb':
        self.skipTest("locale-specific month name needs to be "
                      "different from C locale")

    # The C-locale month name is accepted, the localized one is not.
    self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
    self.cert_time_fail(local_february + " 9 00:00:00 2007 GMT")
946
def test_connect_ex_error(self):
    # connect_ex() against a bound but non-listening port must return an
    # error code instead of raising.
    placeholder = socket.socket(socket.AF_INET)
    self.addCleanup(placeholder.close)
    port = support.bind_port(placeholder)  # Reserve port but don't listen
    client = test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_REQUIRED)
    self.addCleanup(client.close)
    result = client.connect_ex((HOST, port))
    # Issue #19919: Windows machines or VMs hosted on Windows
    # machines sometimes return EWOULDBLOCK.
    acceptable = (
        errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
        errno.EWOULDBLOCK,
    )
    self.assertIn(result, acceptable)
962
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100963
Antoine Pitrou152efa22010-05-16 18:19:27 +0000964class ContextTests(unittest.TestCase):
965
def test_constructor(self):
    # Every known protocol constant must be accepted.
    for proto in PROTOCOLS:
        ssl.SSLContext(proto)
    # Without an argument the context uses PROTOCOL_TLS.
    default_ctx = ssl.SSLContext()
    self.assertEqual(default_ctx.protocol, ssl.PROTOCOL_TLS)
    # Values that are not protocol constants are rejected.
    for bogus in (-1, 42):
        with self.assertRaises(ValueError):
            ssl.SSLContext(bogus)
973
def test_protocol(self):
    # The ``protocol`` attribute echoes the constructor argument.
    for expected in PROTOCOLS:
        created = ssl.SSLContext(expected)
        self.assertEqual(created.protocol, expected)
978
979 def test_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +0200980 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000981 ctx.set_ciphers("ALL")
982 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +0000983 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +0000984 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000985
@unittest.skipUnless(PY_SSL_DEFAULT_CIPHERS == 1,
                     "Test applies only to Python default ciphers")
def test_python_ciphers(self):
    # None of the default suites may mention these algorithm names.
    excluded_tokens = ("PSK", "SRP", "MD5", "RC4", "3DES")
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    for suite in client_ctx.get_ciphers():
        suite_name = suite['name']
        for token in excluded_tokens:
            self.assertNotIn(token, suite_name)
998
@unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
def test_get_ciphers(self):
    # After selecting 'AESGCM', get_ciphers() must list both suites below.
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_ctx.set_ciphers('AESGCM')
    cipher_names = {entry['name'] for entry in client_ctx.get_ciphers()}
    for wanted in ('AES256-GCM-SHA384', 'AES128-GCM-SHA256'):
        self.assertIn(wanted, cipher_names)
Christian Heimes25bfcd52016-09-06 00:04:45 +02001006
def test_options(self):
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value;
    # SSLContext also enables the remaining flags by default.
    expected = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3 |
                OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE |
                OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE |
                OP_ENABLE_MIDDLEBOX_COMPAT)
    self.assertEqual(expected, ctx.options)
    # Setting an extra flag is always possible.
    ctx.options |= ssl.OP_NO_TLSv1
    self.assertEqual(expected | ssl.OP_NO_TLSv1, ctx.options)
    if not can_clear_options():
        # Flags cannot be removed on this build.
        with self.assertRaises(ValueError):
            ctx.options = 0
        return
    ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
    self.assertEqual(expected, ctx.options)
    ctx.options = 0
    # Ubuntu has OP_NO_SSLv3 forced on by default
    self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
1027
Christian Heimesa170fa12017-09-15 20:27:30 +02001028 def test_verify_mode_protocol(self):
1029 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001030 # Default value
1031 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1032 ctx.verify_mode = ssl.CERT_OPTIONAL
1033 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1034 ctx.verify_mode = ssl.CERT_REQUIRED
1035 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1036 ctx.verify_mode = ssl.CERT_NONE
1037 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1038 with self.assertRaises(TypeError):
1039 ctx.verify_mode = None
1040 with self.assertRaises(ValueError):
1041 ctx.verify_mode = 42
1042
Christian Heimesa170fa12017-09-15 20:27:30 +02001043 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1044 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1045 self.assertFalse(ctx.check_hostname)
1046
1047 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1048 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1049 self.assertTrue(ctx.check_hostname)
1050
Christian Heimes61d478c2018-01-27 15:51:38 +01001051 def test_hostname_checks_common_name(self):
1052 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1053 self.assertTrue(ctx.hostname_checks_common_name)
1054 if ssl.HAS_NEVER_CHECK_COMMON_NAME:
1055 ctx.hostname_checks_common_name = True
1056 self.assertTrue(ctx.hostname_checks_common_name)
1057 ctx.hostname_checks_common_name = False
1058 self.assertFalse(ctx.hostname_checks_common_name)
1059 ctx.hostname_checks_common_name = True
1060 self.assertTrue(ctx.hostname_checks_common_name)
1061 else:
1062 with self.assertRaises(AttributeError):
1063 ctx.hostname_checks_common_name = True
Christian Heimesa170fa12017-09-15 20:27:30 +02001064
@unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
                     "required OpenSSL 1.1.0g")
def test_min_max_version(self):
    versions = ssl.TLSVersion
    lowest = versions.MINIMUM_SUPPORTED
    highest = versions.MAXIMUM_SUPPORTED

    # A fresh PROTOCOL_TLS_SERVER context is expected to report the
    # symbolic bounds.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    self.assertEqual(ctx.minimum_version, lowest)
    self.assertEqual(ctx.maximum_version, highest)

    # Explicit versions read back unchanged.
    ctx.minimum_version = versions.TLSv1_1
    ctx.maximum_version = versions.TLSv1_2
    self.assertEqual(ctx.minimum_version, versions.TLSv1_1)
    self.assertEqual(ctx.maximum_version, versions.TLSv1_2)

    ctx.minimum_version = lowest
    ctx.maximum_version = versions.TLSv1
    self.assertEqual(ctx.minimum_version, lowest)
    self.assertEqual(ctx.maximum_version, versions.TLSv1)

    ctx.maximum_version = highest
    self.assertEqual(ctx.maximum_version, highest)

    # Crossing the symbolic bounds reads back as a concrete version.
    ctx.maximum_version = lowest
    self.assertIn(ctx.maximum_version,
                  {versions.TLSv1, versions.SSLv3})

    ctx.minimum_version = highest
    self.assertIn(ctx.minimum_version,
                  {versions.TLSv1_2, versions.TLSv1_3})

    with self.assertRaises(ValueError):
        ctx.minimum_version = 42

    # A context pinned to one protocol reports the symbolic bounds but
    # refuses any assignment.
    pinned_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_1)
    self.assertEqual(pinned_ctx.minimum_version, lowest)
    self.assertEqual(pinned_ctx.maximum_version, highest)
    with self.assertRaises(ValueError):
        pinned_ctx.minimum_version = lowest
    with self.assertRaises(ValueError):
        pinned_ctx.maximum_version = versions.TLSv1
1126
1127
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_verify_flags(self):
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # default value; VERIFY_X509_TRUSTED_FIRST is included when the ssl
    # module defines it
    trusted_first = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | trusted_first)
    # Each individual flag value round-trips through the attribute.
    for flags in (ssl.VERIFY_CRL_CHECK_LEAF,
                  ssl.VERIFY_CRL_CHECK_CHAIN,
                  ssl.VERIFY_DEFAULT):
        ctx.verify_flags = flags
        self.assertEqual(ctx.verify_flags, flags)
    # supports any value
    combined = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
    ctx.verify_flags = combined
    self.assertEqual(ctx.verify_flags,
                     ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
    with self.assertRaises(TypeError):
        ctx.verify_flags = None
1147
def test_load_cert_chain(self):
    def new_server_context():
        return ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

    # Combined key and cert in a single file
    ctx = new_server_context()
    ctx.load_cert_chain(CERTFILE, keyfile=None)
    ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
    with self.assertRaises(TypeError):
        ctx.load_cert_chain(keyfile=CERTFILE)
    with self.assertRaises(OSError) as caught:
        ctx.load_cert_chain(NONEXISTINGCERT)
    self.assertEqual(caught.exception.errno, errno.ENOENT)
    for broken_file in (BADCERT, EMPTYCERT):
        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
            ctx.load_cert_chain(broken_file)

    # Separate key and cert
    ctx = new_server_context()
    ctx.load_cert_chain(ONLYCERT, ONLYKEY)
    ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
    ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
    # Half of a pair alone, or the pair swapped, is a PEM error.
    for incomplete in (ONLYCERT, ONLYKEY):
        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
            ctx.load_cert_chain(incomplete)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)

    # Mismatching key and cert
    ctx = new_server_context()
    with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
        ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)

    # Password protected key and cert: str, bytes and bytearray passwords
    # are accepted, by keyword and positionally.
    password_forms = (
        KEY_PASSWORD,
        KEY_PASSWORD.encode(),
        bytearray(KEY_PASSWORD.encode()),
    )
    for secret in password_forms:
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=secret)
    for secret in password_forms:
        ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, secret)
    with self.assertRaisesRegex(TypeError, "should be a string"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
    with self.assertRaises(ssl.SSLError):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        # openssl has a fixed limit on the password buffer.
        # PEM_BUFSIZE is generally set to 1kb.
        # Return a string larger than this.
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)

    # Password callback
    def failing_callback():
        raise Exception('getpass error')

    class PasswordProvider:
        def __call__(self):
            return KEY_PASSWORD

        def getpass(self):
            return KEY_PASSWORD

    working_callbacks = (
        lambda: KEY_PASSWORD,
        lambda: KEY_PASSWORD.encode(),
        lambda: bytearray(KEY_PASSWORD.encode()),
        PasswordProvider(),
        PasswordProvider().getpass,
    )
    for callback in working_callbacks:
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=callback)
    with self.assertRaises(ssl.SSLError):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=lambda: "badpass")
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        ctx.load_cert_chain(CERTFILE_PROTECTED,
                            password=lambda: b'a' * (1024 * 1024))
    with self.assertRaisesRegex(TypeError, "must return a string"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=lambda: 9)
    with self.assertRaisesRegex(Exception, "getpass error"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=failing_callback)
    # Make sure the password function isn't called if it isn't needed
    ctx.load_cert_chain(CERTFILE, password=failing_callback)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001230
def test_load_verify_locations(self):
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # str and bytes paths are accepted, positionally and by keyword.
    for ca_path in (CERTFILE, BYTES_CERTFILE):
        ctx.load_verify_locations(ca_path)
        ctx.load_verify_locations(cafile=ca_path, capath=None)
    # At least one source has to be given.
    with self.assertRaises(TypeError):
        ctx.load_verify_locations()
    with self.assertRaises(TypeError):
        ctx.load_verify_locations(None, None, None)
    with self.assertRaises(OSError) as caught:
        ctx.load_verify_locations(NONEXISTINGCERT)
    self.assertEqual(caught.exception.errno, errno.ENOENT)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        ctx.load_verify_locations(BADCERT)
    ctx.load_verify_locations(CERTFILE, CAPATH)
    ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)

    # Issue #10989: crash if the second argument type is invalid
    with self.assertRaises(TypeError):
        ctx.load_verify_locations(None, True)
1249
def test_load_verify_cadata(self):
    def new_client_context():
        return ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    def assert_ca_count(context, expected):
        # Compare the number of CA certs reported by the store statistics.
        self.assertEqual(context.cert_store_stats()["x509_ca"], expected)

    # test cadata: read both CA certs as PEM text and derive the DER form
    with open(CAFILE_CACERT) as pem_file:
        cacert_pem = pem_file.read()
    cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
    with open(CAFILE_NEURONIO) as pem_file:
        neuronio_pem = pem_file.read()
    neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)

    # test PEM
    ctx = new_client_context()
    assert_ca_count(ctx, 0)
    ctx.load_verify_locations(cadata=cacert_pem)
    assert_ca_count(ctx, 1)
    ctx.load_verify_locations(cadata=neuronio_pem)
    assert_ca_count(ctx, 2)
    # cert already in hash table
    ctx.load_verify_locations(cadata=neuronio_pem)
    assert_ca_count(ctx, 2)

    # combined
    ctx = new_client_context()
    pem_bundle = "\n".join((cacert_pem, neuronio_pem))
    ctx.load_verify_locations(cadata=pem_bundle)
    assert_ca_count(ctx, 2)

    # with junk around the certs
    ctx = new_client_context()
    noisy_parts = ["head", cacert_pem, "other", neuronio_pem, "again",
                   neuronio_pem, "tail"]
    ctx.load_verify_locations(cadata="\n".join(noisy_parts))
    assert_ca_count(ctx, 2)

    # test DER
    ctx = new_client_context()
    ctx.load_verify_locations(cadata=cacert_der)
    ctx.load_verify_locations(cadata=neuronio_der)
    assert_ca_count(ctx, 2)
    # cert already in hash table
    ctx.load_verify_locations(cadata=cacert_der)
    assert_ca_count(ctx, 2)

    # combined
    ctx = new_client_context()
    der_bundle = b"".join((cacert_der, neuronio_der))
    ctx.load_verify_locations(cadata=der_bundle)
    assert_ca_count(ctx, 2)

    # error cases
    ctx = new_client_context()
    with self.assertRaises(TypeError):
        ctx.load_verify_locations(cadata=object)

    with self.assertRaisesRegex(ssl.SSLError, "no start line"):
        ctx.load_verify_locations(cadata="broken")
    with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
        ctx.load_verify_locations(cadata=b"broken")
1306
1307
def test_load_dh_params(self):
    # load_dh_params() takes a str path (and a bytes path outside
    # Windows), and rejects a missing argument, None, a missing file and
    # a file that holds no DH parameters.
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_ctx.load_dh_params(DHFILE)
    if os.name != 'nt':
        server_ctx.load_dh_params(BYTES_DHFILE)

    with self.assertRaises(TypeError):
        server_ctx.load_dh_params()
    with self.assertRaises(TypeError):
        server_ctx.load_dh_params(None)

    with self.assertRaises(FileNotFoundError) as missing:
        server_ctx.load_dh_params(NONEXISTINGCERT)
    self.assertEqual(missing.exception.errno, errno.ENOENT)

    # A certificate file is not a DH parameter file
    with self.assertRaises(ssl.SSLError):
        server_ctx.load_dh_params(CERTFILE)
1320
def test_session_stats(self):
    # A context that has never been used reports zero for every counter,
    # whichever protocol it was created with.
    counters = (
        'number',
        'connect',
        'connect_good',
        'connect_renegotiate',
        'accept',
        'accept_good',
        'accept_renegotiate',
        'hits',
        'misses',
        'timeouts',
        'cache_full',
    )
    all_zero = dict.fromkeys(counters, 0)
    for protocol in PROTOCOLS:
        unused_ctx = ssl.SSLContext(protocol)
        self.assertEqual(unused_ctx.session_stats(), all_zero)
1337
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001338 def test_set_default_verify_paths(self):
1339 # There's not much we can do to test that it acts as expected,
1340 # so just check it doesn't crash or raise an exception.
Christian Heimesa170fa12017-09-15 20:27:30 +02001341 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001342 ctx.set_default_verify_paths()
1343
Antoine Pitrou501da612011-12-21 09:27:41 +01001344 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001345 def test_set_ecdh_curve(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001346 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001347 ctx.set_ecdh_curve("prime256v1")
1348 ctx.set_ecdh_curve(b"prime256v1")
1349 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1350 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1351 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1352 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1353
@needs_sni
def test_sni_callback(self):
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

    # set_servername_callback expects exactly one argument that is either
    # a callable or None
    with self.assertRaises(TypeError):
        server_ctx.set_servername_callback()
    for not_callable in (4, "", server_ctx):
        with self.assertRaises(TypeError):
            server_ctx.set_servername_callback(not_callable)

    def noop_callback(sock, servername, ctx):
        pass

    server_ctx.set_servername_callback(None)
    server_ctx.set_servername_callback(noop_callback)
1368
@needs_sni
def test_sni_callback_refcycle(self):
    # The callback keeps the context alive through its default argument
    # while the context keeps the callback alive: that cycle must be
    # found and broken by the garbage collector.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

    def cyclic_callback(sock, servername, ctx, cycle=context):
        pass

    context.set_servername_callback(cyclic_callback)
    context_ref = weakref.ref(context)
    # Drop every strong reference held by this frame before collecting
    del cyclic_callback, context
    gc.collect()
    self.assertIs(context_ref(), None)
1381
def test_cert_store_stats(self):
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    def expect_counts(ca, total):
        self.assertEqual(client_ctx.cert_store_stats(),
                         {'x509_ca': ca, 'crl': 0, 'x509': total})

    # Nothing loaded yet
    expect_counts(0, 0)
    # load_cert_chain() leaves the verification store untouched
    client_ctx.load_cert_chain(CERTFILE)
    expect_counts(0, 0)
    # CERTFILE is counted as a plain certificate, not as a CA
    client_ctx.load_verify_locations(CERTFILE)
    expect_counts(0, 1)
    # CAFILE_CACERT increments both counters
    client_ctx.load_verify_locations(CAFILE_CACERT)
    expect_counts(1, 2)
1395
def test_get_ca_certs(self):
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertEqual(client_ctx.get_ca_certs(), [])
    # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
    client_ctx.load_verify_locations(CERTFILE)
    self.assertEqual(client_ctx.get_ca_certs(), [])
    # but CAFILE_CACERT is a CA cert
    client_ctx.load_verify_locations(CAFILE_CACERT)
    reported = client_ctx.get_ca_certs()

    # The expected certificate is self-signed: issuer and subject carry
    # the same distinguished name.
    cacert_name = (
        (('organizationName', 'Root CA'),),
        (('organizationalUnitName', 'http://www.cacert.org'),),
        (('commonName', 'CA Cert Signing Authority'),),
        (('emailAddress', 'support@cacert.org'),),
    )
    expected = {
        'issuer': cacert_name,
        'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
        'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
        'serialNumber': '00',
        'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
        'subject': cacert_name,
        'version': 3,
    }
    self.assertEqual(reported, [expected])

    # In binary form the list holds the DER encoding of the same cert
    with open(CAFILE_CACERT) as pem_file:
        pem_text = pem_file.read()
    der_bytes = ssl.PEM_cert_to_DER_cert(pem_text)
    self.assertEqual(client_ctx.get_ca_certs(True), [der_bytes])
1423
Christian Heimes72d28502013-11-23 13:56:58 +01001424 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001425 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001426 ctx.load_default_certs()
1427
Christian Heimesa170fa12017-09-15 20:27:30 +02001428 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001429 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1430 ctx.load_default_certs()
1431
Christian Heimesa170fa12017-09-15 20:27:30 +02001432 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001433 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1434
Christian Heimesa170fa12017-09-15 20:27:30 +02001435 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001436 self.assertRaises(TypeError, ctx.load_default_certs, None)
1437 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1438
@unittest.skipIf(sys.platform == "win32", "not-Windows specific")
@unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
def test_load_default_certs_env(self):
    # With SSL_CERT_DIR / SSL_CERT_FILE pointing at the test fixtures,
    # load_default_certs() must pick up exactly one non-CA certificate.
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    expected_stats = {"crl": 0, "x509": 1, "x509_ca": 0}
    with support.EnvironmentVarGuard() as environ:
        environ["SSL_CERT_DIR"] = CAPATH
        environ["SSL_CERT_FILE"] = CERTFILE
        client_ctx.load_default_certs()
        self.assertEqual(client_ctx.cert_store_stats(), expected_stats)
1448
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
@unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
def test_load_default_certs_env_windows(self):
    # Baseline: what a context gets from the defaults alone
    baseline_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    baseline_ctx.load_default_certs()
    expected_stats = baseline_ctx.cert_store_stats()

    # With the environment variables set, exactly one extra certificate
    # is expected on top of the baseline.
    env_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    with support.EnvironmentVarGuard() as environ:
        environ["SSL_CERT_DIR"] = CAPATH
        environ["SSL_CERT_FILE"] = CERTFILE
        env_ctx.load_default_certs()
        expected_stats["x509"] += 1
        self.assertEqual(env_ctx.cert_store_stats(), expected_stats)
1463
def _assert_context_options(self, ctx):
    # OP_NO_SSLv2 must always be set on *ctx*.  Each remaining flag is
    # only checked when its constant is non-zero.
    self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
    optional_flags = (
        OP_NO_COMPRESSION,
        OP_SINGLE_DH_USE,
        OP_SINGLE_ECDH_USE,
        OP_CIPHER_SERVER_PREFERENCE,
    )
    for flag in optional_flags:
        if flag != 0:
            self.assertEqual(ctx.options & flag, flag)
1478
def test_create_default_context(self):
    # No arguments: verification and hostname checking are both enabled
    context = ssl.create_default_context()

    self.assertEqual(context.protocol, ssl.PROTOCOL_TLS)
    self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
    self.assertTrue(context.check_hostname)
    self._assert_context_options(context)

    # Trust anchors passed through cafile, capath and cadata at once
    with open(SIGNING_CA) as ca_file:
        ca_pem = ca_file.read()
    context = ssl.create_default_context(cafile=SIGNING_CA,
                                         capath=CAPATH,
                                         cadata=ca_pem)
    self.assertEqual(context.protocol, ssl.PROTOCOL_TLS)
    self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
    self._assert_context_options(context)

    # Purpose.CLIENT_AUTH: peer certificates are not requested
    context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    self.assertEqual(context.protocol, ssl.PROTOCOL_TLS)
    self.assertEqual(context.verify_mode, ssl.CERT_NONE)
    self._assert_context_options(context)
Christian Heimes4c05b472013-11-23 15:58:30 +01001499
def test__create_stdlib_context(self):
    def verify(context, protocol, mode, hostname_checked=None):
        # Shared assertions; check_hostname is only inspected when an
        # expectation is given.
        self.assertEqual(context.protocol, protocol)
        self.assertEqual(context.verify_mode, mode)
        if hostname_checked is True:
            self.assertTrue(context.check_hostname)
        elif hostname_checked is False:
            self.assertFalse(context.check_hostname)
        self._assert_context_options(context)

    # Defaults: no verification at all
    verify(ssl._create_stdlib_context(),
           ssl.PROTOCOL_TLS, ssl.CERT_NONE, hostname_checked=False)

    # Explicit protocol
    verify(ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1),
           ssl.PROTOCOL_TLSv1, ssl.CERT_NONE)

    # Explicit protocol plus verification settings
    strict = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
                                        cert_reqs=ssl.CERT_REQUIRED,
                                        check_hostname=True)
    verify(strict, ssl.PROTOCOL_TLSv1, ssl.CERT_REQUIRED,
           hostname_checked=True)

    # Explicit purpose
    verify(ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH),
           ssl.PROTOCOL_TLS, ssl.CERT_NONE)
Christian Heimes4c05b472013-11-23 15:58:30 +01001524
Christian Heimes1aa9a752013-12-02 02:41:19 +01001525 def test_check_hostname(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001526 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001527 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001528 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001529
Christian Heimese82c0342017-09-15 20:29:57 +02001530 # Auto set CERT_REQUIRED
1531 ctx.check_hostname = True
1532 self.assertTrue(ctx.check_hostname)
1533 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1534 ctx.check_hostname = False
Christian Heimes1aa9a752013-12-02 02:41:19 +01001535 ctx.verify_mode = ssl.CERT_REQUIRED
1536 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001537 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001538
Christian Heimese82c0342017-09-15 20:29:57 +02001539 # Changing verify_mode does not affect check_hostname
1540 ctx.check_hostname = False
1541 ctx.verify_mode = ssl.CERT_NONE
1542 ctx.check_hostname = False
1543 self.assertFalse(ctx.check_hostname)
1544 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1545 # Auto set
Christian Heimes1aa9a752013-12-02 02:41:19 +01001546 ctx.check_hostname = True
1547 self.assertTrue(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001548 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1549
1550 ctx.check_hostname = False
1551 ctx.verify_mode = ssl.CERT_OPTIONAL
1552 ctx.check_hostname = False
1553 self.assertFalse(ctx.check_hostname)
1554 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1555 # keep CERT_OPTIONAL
1556 ctx.check_hostname = True
1557 self.assertTrue(ctx.check_hostname)
1558 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001559
1560 # Cannot set CERT_NONE with check_hostname enabled
1561 with self.assertRaises(ValueError):
1562 ctx.verify_mode = ssl.CERT_NONE
1563 ctx.check_hostname = False
1564 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001565 ctx.verify_mode = ssl.CERT_NONE
1566 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001567
Christian Heimes5fe668c2016-09-12 00:01:11 +02001568 def test_context_client_server(self):
1569 # PROTOCOL_TLS_CLIENT has sane defaults
1570 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1571 self.assertTrue(ctx.check_hostname)
1572 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1573
1574 # PROTOCOL_TLS_SERVER has different but also sane defaults
1575 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1576 self.assertFalse(ctx.check_hostname)
1577 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1578
Christian Heimes4df60f12017-09-15 20:26:05 +02001579 def test_context_custom_class(self):
1580 class MySSLSocket(ssl.SSLSocket):
1581 pass
1582
1583 class MySSLObject(ssl.SSLObject):
1584 pass
1585
1586 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1587 ctx.sslsocket_class = MySSLSocket
1588 ctx.sslobject_class = MySSLObject
1589
1590 with ctx.wrap_socket(socket.socket(), server_side=True) as sock:
1591 self.assertIsInstance(sock, MySSLSocket)
1592 obj = ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO())
1593 self.assertIsInstance(obj, MySSLObject)
1594
Antoine Pitrou152efa22010-05-16 18:19:27 +00001595
class SSLErrorTests(unittest.TestCase):

    def test_str(self):
        # The str() of a SSLError doesn't include the errno, and the
        # same holds for a subclass
        for error_class in (ssl.SSLError, ssl.SSLZeroReturnError):
            error = error_class(1, "foo")
            self.assertEqual(str(error), "foo")
            self.assertEqual(error.errno, 1)

    def test_lib_reason(self):
        # Test the library and reason attributes
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        with self.assertRaises(ssl.SSLError) as caught:
            context.load_dh_params(CERTFILE)
        error = caught.exception
        self.assertEqual(error.library, 'PEM')
        self.assertEqual(error.reason, 'NO_START_LINE')
        text = str(error)
        self.assertTrue(
            text.startswith("[PEM: NO_START_LINE] no start line"), text)

    def test_subclass(self):
        # Check that the appropriate SSLError subclass is raised
        # (this only tests one of them)
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        with socket.socket() as listener:
            listener.bind(("127.0.0.1", 0))
            listener.listen()
            plain = socket.socket()
            plain.connect(listener.getsockname())
            plain.setblocking(False)
            # The listener never accepts or answers, so a non-blocking
            # handshake is left waiting for data to read.
            with context.wrap_socket(plain, False,
                                     do_handshake_on_connect=False) as tls:
                with self.assertRaises(ssl.SSLWantReadError) as caught:
                    tls.do_handshake()
                text = str(caught.exception)
                self.assertTrue(
                    text.startswith("The operation did not complete (read)"),
                    text)
                # For compatibility
                self.assertEqual(caught.exception.errno,
                                 ssl.SSL_ERROR_WANT_READ)

    def test_bad_server_hostname(self):
        # Invalid server_hostname values are refused by wrap_bio(): an
        # empty name and a leading dot raise ValueError, an embedded NUL
        # raises TypeError.
        context = ssl.create_default_context()
        rejected = (
            (ValueError, ""),
            (ValueError, ".example.org"),
            (TypeError, "example.org\x00evil.com"),
        )
        for error_class, hostname in rejected:
            with self.assertRaises(error_class):
                context.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
                                 server_hostname=hostname)
Christian Heimes61d478c2018-01-27 15:51:38 +01001650
1651
class MemoryBIOTests(unittest.TestCase):

    def test_read_write(self):
        # Data comes back in the order written; reads drain the buffer
        buf = ssl.MemoryBIO()
        buf.write(b'foo')
        self.assertEqual(buf.read(), b'foo')
        self.assertEqual(buf.read(), b'')
        for piece in (b'foo', b'bar'):
            buf.write(piece)
        self.assertEqual(buf.read(), b'foobar')
        self.assertEqual(buf.read(), b'')
        # Sized reads return at most the requested number of bytes
        buf.write(b'baz')
        for size, expected in ((2, b'ba'), (1, b'z'), (1, b'')):
            self.assertEqual(buf.read(size), expected)

    def test_eof(self):
        # .eof only turns true once write_eof() was called *and* all
        # buffered data has been read
        buf = ssl.MemoryBIO()
        self.assertFalse(buf.eof)
        self.assertEqual(buf.read(), b'')
        self.assertFalse(buf.eof)
        buf.write(b'foo')
        self.assertFalse(buf.eof)
        buf.write_eof()
        self.assertFalse(buf.eof)
        self.assertEqual(buf.read(2), b'fo')
        self.assertFalse(buf.eof)
        self.assertEqual(buf.read(1), b'o')
        self.assertTrue(buf.eof)
        self.assertEqual(buf.read(), b'')
        self.assertTrue(buf.eof)

    def test_pending(self):
        # .pending tracks the number of bytes waiting to be read
        buf = ssl.MemoryBIO()
        self.assertEqual(buf.pending, 0)
        buf.write(b'foo')
        self.assertEqual(buf.pending, 3)
        for left in (2, 1, 0):
            buf.read(1)
            self.assertEqual(buf.pending, left)
        for queued in (1, 2, 3):
            buf.write(b'x')
            self.assertEqual(buf.pending, queued)
        buf.read()
        self.assertEqual(buf.pending, 0)

    def test_buffer_types(self):
        # write() accepts bytes, bytearray and memoryview alike
        buf = ssl.MemoryBIO()
        payloads = (
            (b'foo', b'foo'),
            (bytearray(b'bar'), b'bar'),
            (memoryview(b'baz'), b'baz'),
        )
        for payload, expected in payloads:
            buf.write(payload)
            self.assertEqual(buf.read(), expected)

    def test_error_types(self):
        # write() refuses str, None, bool and int
        buf = ssl.MemoryBIO()
        for unsupported in ('foo', None, True, 1):
            with self.assertRaises(TypeError):
                buf.write(unsupported)
1713
1714
class SSLObjectTests(unittest.TestCase):
    def test_private_init(self):
        # SSLObject cannot be instantiated directly
        memory_bio = ssl.MemoryBIO()
        with self.assertRaisesRegex(TypeError, "public constructor"):
            ssl.SSLObject(memory_bio, memory_bio)

    def test_unwrap(self):
        client_ctx, server_ctx, hostname = testing_context()
        c_in, c_out = ssl.MemoryBIO(), ssl.MemoryBIO()
        s_in, s_out = ssl.MemoryBIO(), ssl.MemoryBIO()
        client = client_ctx.wrap_bio(c_in, c_out, server_hostname=hostname)
        server = server_ctx.wrap_bio(s_in, s_out, server_side=True)

        def advance(endpoint, outgoing, peer_incoming):
            # One handshake step, then hand whatever the endpoint
            # produced over to its peer.
            try:
                endpoint.do_handshake()
            except ssl.SSLWantReadError:
                pass
            if outgoing.pending:
                peer_incoming.write(outgoing.read())

        # Loop on the handshake for a bit to get it settled
        for _ in range(5):
            advance(client, c_out, s_in)
            advance(server, s_out, c_in)
        # Now the handshakes should be complete (don't raise WantReadError)
        client.do_handshake()
        server.do_handshake()

        # Now if we unwrap one side unilaterally, it should send
        # close-notify and raise WantReadError:
        with self.assertRaises(ssl.SSLWantReadError):
            client.unwrap()

        # But server.unwrap() does not raise, because it reads the
        # client's close-notify:
        s_in.write(c_out.read())
        server.unwrap()

        # And now that the client gets the server's close-notify, it
        # doesn't raise either.
        c_in.write(s_out.read())
        client.unwrap()
Christian Heimes9d50ab52018-02-27 10:17:30 +01001762
Martin Panter3840b2a2016-03-27 01:53:46 +00001763class SimpleBackgroundTests(unittest.TestCase):
Martin Panter3840b2a2016-03-27 01:53:46 +00001764 """Tests that connect to a simple server running in the background"""
1765
def setUp(self):
    # Start one echo server per test and register its shutdown as a
    # cleanup.
    echo_server = ThreadedEchoServer(SIGNED_CERTFILE)
    self.server_addr = (HOST, echo_server.port)
    echo_server.__enter__()
    self.addCleanup(echo_server.__exit__, None, None, None)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001771
def test_connect(self):
    # Without verification the peer certificate is reported as empty
    unverified = test_wrap_socket(socket.socket(socket.AF_INET),
                                  cert_reqs=ssl.CERT_NONE)
    with unverified as conn:
        conn.connect(self.server_addr)
        self.assertEqual({}, conn.getpeercert())
        self.assertFalse(conn.server_side)

    # this should succeed because we specify the root cert
    verified = test_wrap_socket(socket.socket(socket.AF_INET),
                                cert_reqs=ssl.CERT_REQUIRED,
                                ca_certs=SIGNING_CA)
    with verified as conn:
        conn.connect(self.server_addr)
        self.assertTrue(conn.getpeercert())
        self.assertFalse(conn.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001786
def test_connect_fail(self):
    # This should fail because we have no verification certs. Connection
    # failure crashes ThreadedEchoServer, so run this in an independent
    # test method.
    conn = test_wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED)
    self.addCleanup(conn.close)
    with self.assertRaisesRegex(ssl.SSLError, "certificate verify failed"):
        conn.connect(self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001796
def test_connect_ex(self):
    # Issue #11326: check connect_ex() implementation
    conn = test_wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED,
                            ca_certs=SIGNING_CA)
    self.addCleanup(conn.close)
    status = conn.connect_ex(self.server_addr)
    self.assertEqual(0, status)
    self.assertTrue(conn.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001805
def test_non_blocking_connect_ex(self):
    # Issue #11326: non-blocking connect_ex() should allow handshake
    # to proceed after the socket gets ready.
    conn = test_wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED,
                            ca_certs=SIGNING_CA,
                            do_handshake_on_connect=False)
    self.addCleanup(conn.close)
    conn.setblocking(False)
    status = conn.connect_ex(self.server_addr)
    # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
    self.assertIn(status, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
    # Wait for connect to finish
    select.select([], [conn], [], 5.0)
    # Non-blocking handshake: retry until it no longer asks for I/O
    handshake_done = False
    while not handshake_done:
        try:
            conn.do_handshake()
        except ssl.SSLWantReadError:
            select.select([conn], [], [], 5.0)
        except ssl.SSLWantWriteError:
            select.select([], [conn], [], 5.0)
        else:
            handshake_done = True
    # SSL established
    self.assertTrue(conn.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001831
def test_connect_with_context(self):
    # Same as test_connect, but with a separately created context
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)

    def wrapped(**options):
        return context.wrap_socket(socket.socket(socket.AF_INET), **options)

    with wrapped() as conn:
        conn.connect(self.server_addr)
        self.assertEqual({}, conn.getpeercert())
    # Same with a server hostname
    with wrapped(server_hostname="dummy") as conn:
        conn.connect(self.server_addr)
    context.verify_mode = ssl.CERT_REQUIRED
    # This should succeed because we specify the root cert
    context.load_verify_locations(SIGNING_CA)
    with wrapped() as conn:
        conn.connect(self.server_addr)
        peer_cert = conn.getpeercert()
        self.assertTrue(peer_cert)
1849
def test_connect_with_context_fail(self):
    # This should fail because we have no verification certs. Connection
    # failure crashes ThreadedEchoServer, so run this in an independent
    # test method.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_REQUIRED
    conn = context.wrap_socket(socket.socket(socket.AF_INET))
    self.addCleanup(conn.close)
    with self.assertRaisesRegex(ssl.SSLError, "certificate verify failed"):
        conn.connect(self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001860
def test_connect_capath(self):
    # Verify server certificates using the `capath` argument
    # NOTE: the subject hashing algorithm has been changed between
    # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
    # contain both versions of each certificate (same content, different
    # filename) for this test to be portable across OpenSSL releases.
    #
    # The second pass repeats the check with a bytes `capath` argument.
    for ca_directory in (CAPATH, BYTES_CAPATH):
        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(capath=ca_directory)
        with context.wrap_socket(socket.socket(socket.AF_INET)) as conn:
            conn.connect(self.server_addr)
            peer_cert = conn.getpeercert()
            self.assertTrue(peer_cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001883
def test_connect_cadata(self):
    # Verify the server certificate against a CA supplied in memory,
    # first as PEM text and then as DER bytes.
    with open(SIGNING_CA) as ca_file:
        ca_pem = ca_file.read()
    ca_der = ssl.PEM_cert_to_DER_cert(ca_pem)
    for ca_blob in (ca_pem, ca_der):
        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(cadata=ca_blob)
        with context.wrap_socket(socket.socket(socket.AF_INET)) as conn:
            conn.connect(self.server_addr)
            peer_cert = conn.getpeercert()
            self.assertTrue(peer_cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001904
@unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
def test_makefile_close(self):
    # Issue #5238: creating a file-like object with makefile() shouldn't
    # delay closing the underlying "real socket" (here tested with its
    # file descriptor, hence skipping the test under Windows).
    tls_sock = test_wrap_socket(socket.socket(socket.AF_INET))
    tls_sock.connect(self.server_addr)
    descriptor = tls_sock.fileno()
    stream = tls_sock.makefile()
    stream.close()
    # The fd is still open
    os.read(descriptor, 0)
    # Closing the SSL socket should close the fd too
    tls_sock.close()
    gc.collect()
    with self.assertRaises(OSError) as caught:
        os.read(descriptor, 0)
    self.assertEqual(caught.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001923
def test_non_blocking_handshake(self):
    # Drive the TLS handshake by hand on a non-blocking socket, waiting
    # with select() each time OpenSSL reports it needs to read or write.
    plain = socket.socket(socket.AF_INET)
    plain.connect(self.server_addr)
    plain.setblocking(False)
    tls = test_wrap_socket(plain,
                           cert_reqs=ssl.CERT_NONE,
                           do_handshake_on_connect=False)
    self.addCleanup(tls.close)

    attempts = 0
    handshake_done = False
    while not handshake_done:
        attempts += 1
        try:
            tls.do_handshake()
        except ssl.SSLWantReadError:
            select.select([tls], [], [])
        except ssl.SSLWantWriteError:
            select.select([], [tls], [])
        else:
            handshake_done = True

    if support.verbose:
        sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % attempts)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001944
def test_get_server_certificate(self):
    # Fetch the local test server's certificate, validating it against
    # the signing CA.
    address = self.server_addr
    _test_get_server_certificate(self, *address, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001947
def test_get_server_certificate_fail(self):
    # A connection failure crashes ThreadedEchoServer, so this check
    # lives in its own test method instead of sharing one with the
    # success case.
    address = self.server_addr
    _test_get_server_certificate_fail(self, *address)
Bill Janssen54cc54c2007-12-14 22:08:56 +00001952
def test_ciphers(self):
    # Well-formed cipher strings must allow a connection to be made.
    for cipher_spec in ("ALL", "DEFAULT"):
        with test_wrap_socket(socket.socket(socket.AF_INET),
                              cert_reqs=ssl.CERT_NONE,
                              ciphers=cipher_spec) as client:
            client.connect(self.server_addr)

    # A nonsense cipher string must be rejected.  Error checking can
    # happen at instantiation or when connecting, so both steps sit
    # inside the assertRaisesRegex block.
    with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"), \
            socket.socket(socket.AF_INET) as raw:
        client = test_wrap_socket(raw,
                                  cert_reqs=ssl.CERT_NONE,
                                  ciphers="^$:,;?*'dorothyx")
        client.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00001966
def test_get_ca_certs_capath(self):
    # Certificates found through `capath` are loaded on request: the
    # context reports none until a connection has actually needed one.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.load_verify_locations(capath=CAPATH)
    self.assertEqual(context.get_ca_certs(), [])

    raw = socket.socket(socket.AF_INET)
    with context.wrap_socket(raw, server_hostname='localhost') as client:
        client.connect(self.server_addr)
        peer_cert = client.getpeercert()
        self.assertTrue(peer_cert)

    self.assertEqual(len(context.get_ca_certs()), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001978
@needs_sni
def test_context_setget(self):
    # Check that the context of a connected socket can be replaced, and
    # that both the Python wrapper and the low-level object see the swap.
    first_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    first_ctx.load_verify_locations(capath=CAPATH)
    second_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    second_ctx.load_verify_locations(capath=CAPATH)

    raw = socket.socket(socket.AF_INET)
    with first_ctx.wrap_socket(raw, server_hostname='localhost') as tls:
        tls.connect(self.server_addr)
        self.assertIs(tls.context, first_ctx)
        self.assertIs(tls._sslobj.context, first_ctx)

        tls.context = second_ctx
        self.assertIs(tls.context, second_ctx)
        self.assertIs(tls._sslobj.context, second_ctx)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001994
def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
    """Run func(*args) to completion, shuttling bytes between sock and the BIOs.

    *func* is called repeatedly.  While it raises an SSLError whose errno
    is SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE, whatever is queued in
    *outgoing* is sent on *sock* and, for WANT_READ, data received from
    *sock* is fed into *incoming* (or EOF is written if the peer closed).
    Any other SSLError propagates.  The only keyword consulted is
    ``timeout`` (seconds, default 10); exceeding it fails the test.

    Returns the value of the first call to *func* that did not raise.
    """
    deadline = time.monotonic() + kwargs.get('timeout', 10)
    retryable = (ssl.SSL_ERROR_WANT_READ, ssl.SSL_ERROR_WANT_WRITE)
    calls = 0
    while True:
        if time.monotonic() > deadline:
            self.fail("timeout")
        calls += 1
        # Which WANT_* condition the call hit; None means it completed.
        pending = None
        try:
            ret = func(*args)
        except ssl.SSLError as exc:
            if exc.errno not in retryable:
                raise
            pending = exc.errno
        # Flush the outgoing BIO to the socket whether or not the call
        # succeeded: even a failed call may have produced bytes to send.
        sock.sendall(outgoing.read())
        if pending is None:
            break
        if pending == ssl.SSL_ERROR_WANT_READ:
            # Pull more data off the wire for the next attempt.
            chunk = sock.recv(32768)
            if chunk:
                incoming.write(chunk)
            else:
                incoming.write_eof()
    if support.verbose:
        sys.stdout.write("Needed %d calls to complete %s().\n"
                         % (calls, func.__name__))
    return ret
2031
def test_bio_handshake(self):
    # Perform a full handshake and shutdown through a pair of MemoryBIOs,
    # checking what the SSLObject reports before and after the handshake.
    tcp = socket.socket(socket.AF_INET)
    self.addCleanup(tcp.close)
    tcp.connect(self.server_addr)
    bio_in = ssl.MemoryBIO()
    bio_out = ssl.MemoryBIO()

    def pump(func, *args):
        return self.ssl_io_loop(tcp, bio_in, bio_out, func, *args)

    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    self.assertTrue(context.check_hostname)
    self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
    context.load_verify_locations(SIGNING_CA)
    tls = context.wrap_bio(bio_in, bio_out, False,
                           SIGNED_CERTFILE_HOSTNAME)
    has_tls_unique = 'tls-unique' in ssl.CHANNEL_BINDING_TYPES

    # State before the handshake has run.
    self.assertIs(tls._sslobj.owner, tls)
    self.assertIsNone(tls.cipher())
    self.assertIsNone(tls.version())
    self.assertIsNotNone(tls.shared_ciphers())
    self.assertRaises(ValueError, tls.getpeercert)
    if has_tls_unique:
        self.assertIsNone(tls.get_channel_binding('tls-unique'))

    pump(tls.do_handshake)

    # State once the handshake has completed.
    self.assertTrue(tls.cipher())
    self.assertIsNotNone(tls.shared_ciphers())
    self.assertIsNotNone(tls.version())
    self.assertTrue(tls.getpeercert())
    if has_tls_unique:
        self.assertTrue(tls.get_channel_binding('tls-unique'))

    try:
        pump(tls.unwrap)
    except ssl.SSLSyscallError:
        # If the server shuts down the TCP connection without sending a
        # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
        pass
    self.assertRaises(ssl.SSLError, tls.write, b'foo')
2065
def test_bio_read_write_data(self):
    # Send one request through an SSLObject backed by MemoryBIOs and
    # check that the echo server answers with the lower-cased payload.
    tcp = socket.socket(socket.AF_INET)
    self.addCleanup(tcp.close)
    tcp.connect(self.server_addr)
    bio_in = ssl.MemoryBIO()
    bio_out = ssl.MemoryBIO()

    def pump(func, *args):
        return self.ssl_io_loop(tcp, bio_in, bio_out, func, *args)

    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_NONE
    tls = context.wrap_bio(bio_in, bio_out, False)

    pump(tls.do_handshake)
    request = b'FOO\n'
    pump(tls.write, request)
    reply = pump(tls.read, 1024)
    self.assertEqual(reply, b'foo\n')
    pump(tls.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02002081
2082
class NetworkedTests(unittest.TestCase):
    """Tests that talk to remote hosts rather than the local test server."""

    def test_timeout_connect_ex(self):
        # Issue #12065: on a timeout, connect_ex() should return the original
        # errno (mimicking the behaviour of non-SSL sockets).
        with support.transient_internet(REMOTE_HOST):
            client = test_wrap_socket(socket.socket(socket.AF_INET),
                                      cert_reqs=ssl.CERT_REQUIRED,
                                      do_handshake_on_connect=False)
            self.addCleanup(client.close)
            # A tiny timeout so the connect is expected not to finish.
            client.settimeout(0.0000001)
            status = client.connect_ex((REMOTE_HOST, 443))
            if status == 0:
                self.skipTest("REMOTE_HOST responded too quickly")
            self.assertIn(status, (errno.EAGAIN, errno.EWOULDBLOCK))

    @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
    def test_get_server_certificate_ipv6(self):
        ipv6_host = 'ipv6.google.com'
        with support.transient_internet(ipv6_host):
            _test_get_server_certificate(self, ipv6_host, 443)
            _test_get_server_certificate_fail(self, ipv6_host, 443)
2104
Martin Panter3840b2a2016-03-27 01:53:46 +00002105
def _test_get_server_certificate(test, host, port, cert=None):
    """Check that ssl.get_server_certificate() returns a certificate.

    The certificate is fetched twice from (host, port): first without any
    CA file, then with *cert* passed as ``ca_certs``.  An empty result in
    either case fails *test*.
    """
    address = (host, port)

    fetched = ssl.get_server_certificate(address)
    if not fetched:
        test.fail("No server certificate on %s:%s!" % (host, port))

    fetched = ssl.get_server_certificate(address, ca_certs=cert)
    if not fetched:
        test.fail("No server certificate on %s:%s!" % (host, port))

    if support.verbose:
        sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port, fetched))
2116
def _test_get_server_certificate_fail(test, host, port):
    """Check that fetching the certificate with CERTFILE as CA raises SSLError.

    If ssl.get_server_certificate() returns instead of raising, *test* is
    failed with the certificate that was obtained.
    """
    address = (host, port)
    try:
        fetched = ssl.get_server_certificate(address, ca_certs=CERTFILE)
    except ssl.SSLError as err:
        # This is the expected outcome.
        if support.verbose:
            sys.stdout.write("%s\n" % err)
    else:
        test.fail("Got server certificate %s for %s:%s!" % (fetched, host, port))
2126
2127
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002128from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002129
class ThreadedEchoServer(threading.Thread):
    """Echo server run in a background thread for the TLS tests.

    The server accepts connections on a local port (see ``port``) and
    hands each one to a ConnectionHandler thread, which it joins before
    accepting the next.  Use it as a context manager: entering starts the
    thread and waits until it is listening, exiting stops and joins it.
    """

    class ConnectionHandler(threading.Thread):
        """Serve a single client connection.

        Works both with and without the SSL wrapper around the socket so
        that the STARTTLS functionality can be exercised.
        """

        def __init__(self, server, connsock, addr):
            self.server = server
            self.running = False
            self.sock = connsock
            self.addr = addr
            self.sock.setblocking(1)
            # Set once the connection has been wrapped with TLS.
            self.sslconn = None
            threading.Thread.__init__(self, daemon=True)

        def wrap_conn(self):
            """Wrap the connection with TLS; return True on success.

            On failure the error text is recorded in the server's
            ``conn_errors``, the connection is closed and False is
            returned.
            """
            owner = self.server
            try:
                self.sslconn = owner.context.wrap_socket(
                    self.sock, server_side=True)
                owner.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
                owner.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
            except (ssl.SSLError, OSError) as exc:
                # OSError may occur with wrong protocols, e.g. both
                # sides use PROTOCOL_TLS_SERVER.
                #
                # XXX Various errors can have happened here, for example
                # a mismatching protocol version, an invalid certificate,
                # or a low-level bug. This should be made more discriminating.
                #
                # bpo-31323: Store the exception as string to prevent
                # a reference leak: server -> conn_errors -> exception
                # -> traceback -> self (ConnectionHandler) -> server
                owner.conn_errors.append(str(exc))
                if owner.chatty:
                    handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
                self.running = False
                # ConnectionResetError is treated as though it were an
                # SSLError - OpenSSL on Ubuntu abruptly closes the
                # connection when asked to use an unsupported protocol.
                #
                # BrokenPipeError is raised in TLS 1.3 mode, when OpenSSL
                # tries to send session tickets after handshake.
                # https://github.com/openssl/openssl/issues/6342
                #
                # Neither of those two stops the whole server; every
                # other failure does.
                if not isinstance(exc, (ConnectionResetError, BrokenPipeError)):
                    owner.stop()
                self.close()
                return False

            # Handshake succeeded: record what was negotiated.
            owner.shared_ciphers.append(self.sslconn.shared_ciphers())
            if owner.context.verify_mode == ssl.CERT_REQUIRED:
                cert = self.sslconn.getpeercert()
                if support.verbose and owner.chatty:
                    sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
                cert_binary = self.sslconn.getpeercert(True)
                if support.verbose and owner.chatty:
                    sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
            cipher = self.sslconn.cipher()
            if support.verbose and owner.chatty:
                sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
                sys.stdout.write(" server: selected protocol is now "
                                 + str(self.sslconn.selected_npn_protocol()) + "\n")
            return True

        def read(self):
            """Read from the TLS connection if wrapped, else from the raw socket."""
            if not self.sslconn:
                return self.sock.recv(1024)
            return self.sslconn.read()

        def write(self, bytes):
            """Write to the TLS connection if wrapped, else to the raw socket."""
            if not self.sslconn:
                return self.sock.send(bytes)
            return self.sslconn.write(bytes)

        def close(self):
            """Close the TLS connection if wrapped, else the raw socket."""
            if not self.sslconn:
                self.sock.close()
            else:
                self.sslconn.close()

        def run(self):
            """Read commands from the client and answer them until told to stop."""
            def chat(text):
                # Per-connection chatter, shown only in verbose runs.
                if support.verbose and self.server.connectionchatty:
                    sys.stdout.write(text)

            self.running = True
            if not self.server.starttls_server and not self.wrap_conn():
                return
            while self.running:
                try:
                    msg = self.read()
                    stripped = msg.strip()
                    if not stripped:
                        # eof, so quit this handler
                        self.running = False
                        try:
                            self.sock = self.sslconn.unwrap()
                        except OSError:
                            # Many tests shut the TCP connection down
                            # without an SSL shutdown. This causes
                            # unwrap() to raise OSError with errno=0!
                            pass
                        else:
                            self.sslconn = None
                        self.close()
                    elif stripped == b'over':
                        chat(" server: client closed connection\n")
                        self.close()
                        return
                    elif (self.server.starttls_server and
                          stripped == b'STARTTLS'):
                        chat(" server: read STARTTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        if not self.wrap_conn():
                            return
                    elif (self.server.starttls_server and self.sslconn
                          and stripped == b'ENDTLS'):
                        chat(" server: read ENDTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        self.sock = self.sslconn.unwrap()
                        self.sslconn = None
                        chat(" server: connection is now unencrypted...\n")
                    elif stripped == b'CB tls-unique':
                        chat(" server: read CB tls-unique from client, sending our CB data...\n")
                        data = self.sslconn.get_channel_binding("tls-unique")
                        self.write(repr(data).encode("us-ascii") + b"\n")
                    elif stripped == b'PHA':
                        chat(" server: initiating post handshake auth\n")
                        try:
                            self.sslconn.verify_client_post_handshake()
                        except ssl.SSLError as e:
                            self.write(repr(e).encode("us-ascii") + b"\n")
                        else:
                            self.write(b"OK\n")
                    elif stripped == b'HASCERT':
                        if self.sslconn.getpeercert() is not None:
                            self.write(b'TRUE\n')
                        else:
                            self.write(b'FALSE\n')
                    elif stripped == b'GETCERT':
                        cert = self.sslconn.getpeercert()
                        self.write(repr(cert).encode("us-ascii") + b"\n")
                    else:
                        # Anything else is echoed back lower-cased.
                        if (support.verbose and
                                self.server.connectionchatty):
                            ctype = "encrypted" if self.sslconn else "unencrypted"
                            sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
                                             % (msg, ctype, msg.lower(), ctype))
                        self.write(msg.lower())
                except ConnectionResetError:
                    # XXX: OpenSSL 1.1.1 sometimes raises ConnectionResetError
                    # when connection is not shut down gracefully.
                    if self.server.chatty and support.verbose:
                        sys.stdout.write(
                            " Connection reset by peer: {}\n".format(
                                self.addr)
                        )
                    self.close()
                    self.running = False
                except OSError:
                    if self.server.chatty:
                        handle_error("Test server failure:\n")
                    self.close()
                    self.running = False

                    # normally, we'd just stop here, but for the test
                    # harness, we want to stop the server
                    self.server.stop()

    def __init__(self, certificate=None, ssl_version=None,
                 certreqs=None, cacerts=None,
                 chatty=True, connectionchatty=False, starttls_server=False,
                 npn_protocols=None, alpn_protocols=None,
                 ciphers=None, context=None):
        if context:
            # A caller-supplied context is used as is; the remaining
            # TLS-related arguments are then not consulted.
            self.context = context
        else:
            if ssl_version is None:
                ssl_version = ssl.PROTOCOL_TLS_SERVER
            if certreqs is None:
                certreqs = ssl.CERT_NONE
            self.context = ssl.SSLContext(ssl_version)
            self.context.verify_mode = certreqs
            if cacerts:
                self.context.load_verify_locations(cacerts)
            if certificate:
                self.context.load_cert_chain(certificate)
            if npn_protocols:
                self.context.set_npn_protocols(npn_protocols)
            if alpn_protocols:
                self.context.set_alpn_protocols(alpn_protocols)
            if ciphers:
                self.context.set_ciphers(ciphers)
        self.chatty = chatty
        self.connectionchatty = connectionchatty
        self.starttls_server = starttls_server
        self.sock = socket.socket()
        self.port = support.bind_port(self.sock)
        # Event set by run() once the server is listening (may be None).
        self.flag = None
        self.active = False
        # Per-connection results appended by ConnectionHandler.wrap_conn().
        self.selected_npn_protocols = []
        self.selected_alpn_protocols = []
        self.shared_ciphers = []
        self.conn_errors = []
        threading.Thread.__init__(self, daemon=True)

    def __enter__(self):
        ready = threading.Event()
        self.start(ready)
        self.flag.wait()
        return self

    def __exit__(self, *args):
        self.stop()
        self.join()

    def start(self, flag=None):
        """Start the thread; *flag*, if given, is set once listening."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        """Accept connections, serving one at a time, until stop() is called."""
        # The short accept() timeout lets the loop notice stop() promptly.
        self.sock.settimeout(0.05)
        self.sock.listen()
        self.active = True
        if self.flag:
            # signal an event
            self.flag.set()
        while self.active:
            try:
                conn, peer = self.sock.accept()
                if support.verbose and self.chatty:
                    sys.stdout.write(' server: new connection from '
                                     + repr(peer) + '\n')
                worker = self.ConnectionHandler(self, conn, peer)
                worker.start()
                worker.join()
            except socket.timeout:
                pass
            except KeyboardInterrupt:
                self.stop()
            except BaseException as exc:
                if support.verbose and self.chatty:
                    sys.stdout.write(
                        ' connection handling failed: ' + repr(exc) + '\n')

        self.sock.close()

    def stop(self):
        """Ask the accept loop in run() to finish."""
        self.active = False
2391
class AsyncoreEchoServer(threading.Thread):
    """Echo server built on asyncore.dispatcher, run in a background thread.

    Use it as a context manager: entering starts the thread and waits for
    it to be running, exiting stops it, joins it and clears asyncore's
    socket map.
    """

    class EchoServer (asyncore.dispatcher):
        """Listening dispatcher that creates a ConnectionHandler per client."""

        class ConnectionHandler(asyncore.dispatcher_with_send):
            """Server side of one TLS connection; echoes data lower-cased."""

            def __init__(self, conn, certfile):
                self.socket = test_wrap_socket(conn, server_side=True,
                                               certfile=certfile,
                                               do_handshake_on_connect=False)
                asyncore.dispatcher_with_send.__init__(self, self.socket)
                # True until the TLS handshake has completed.
                self._ssl_accepting = True
                self._do_ssl_handshake()

            def readable(self):
                # Handle data already buffered inside the SSL layer before
                # going back to the event loop.
                if isinstance(self.socket, ssl.SSLSocket):
                    while self.socket.pending() > 0:
                        self.handle_read_event()
                return True

            def _do_ssl_handshake(self):
                # Advance the handshake one step; a later read event will
                # retry if OpenSSL still needs more I/O.
                try:
                    self.socket.do_handshake()
                except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
                    return
                except ssl.SSLEOFError:
                    return self.handle_close()
                except ssl.SSLError:
                    raise
                except OSError as exc:
                    if exc.args[0] == errno.ECONNABORTED:
                        return self.handle_close()
                else:
                    self._ssl_accepting = False

            def handle_read(self):
                if self._ssl_accepting:
                    self._do_ssl_handshake()
                    return
                payload = self.recv(1024)
                if support.verbose:
                    sys.stdout.write(" server: read %s from client\n" % repr(payload))
                if payload:
                    self.send(payload.lower())
                else:
                    self.close()

            def handle_close(self):
                self.close()
                if support.verbose:
                    sys.stdout.write(" server: closed connection %s\n" % self.socket)

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, certfile):
            self.certfile = certfile
            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.port = support.bind_port(listener, '')
            asyncore.dispatcher.__init__(self, listener)
            self.listen(5)

        def handle_accepted(self, sock_obj, addr):
            if support.verbose:
                sys.stdout.write(" server: new connection from %s:%s\n" % addr)
            self.ConnectionHandler(sock_obj, self.certfile)

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise

    def __init__(self, certfile):
        # Event set by run() once the loop is about to start (may be None).
        self.flag = None
        self.active = False
        self.server = self.EchoServer(certfile)
        self.port = self.server.port
        threading.Thread.__init__(self, daemon=True)

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.server)

    def __enter__(self):
        ready = threading.Event()
        self.start(ready)
        self.flag.wait()
        return self

    def __exit__(self, *args):
        verbose = support.verbose
        if verbose:
            sys.stdout.write(" cleanup: stopping server.\n")
        self.stop()
        if verbose:
            sys.stdout.write(" cleanup: joining server thread.\n")
        self.join()
        if verbose:
            sys.stdout.write(" cleanup: successfully joined.\n")
        # make sure that ConnectionHandler is removed from socket_map
        asyncore.close_all(ignore_all=True)

    def start (self, flag=None):
        """Start the thread; *flag*, if given, is set once run() begins."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        """Run the asyncore loop until stop() is called."""
        self.active = True
        if self.flag:
            self.flag.set()
        while self.active:
            try:
                asyncore.loop(1)
            except BaseException:
                # Best effort: errors raised out of the loop are dropped
                # and the loop is simply re-entered.
                pass

    def stop(self):
        """End the loop in run() and close the listening dispatcher."""
        self.active = False
        self.server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002509
def server_params_test(client_context, server_context, indata=b"FOO\n",
                       chatty=True, connectionchatty=False, sni_name=None,
                       session=None):
    """
    Launch a server, connect a client to it and try various reads
    and writes.

    *indata* is sent as bytes, bytearray and memoryview; each time the
    reply must equal ``indata.lower()`` or AssertionError is raised.
    Returns a dict describing the connection as seen by the client
    (cipher, version, session, ...) and by the server (negotiated
    protocols and shared ciphers).
    """
    # Evaluated once; controls the client-side progress messages.
    noisy = connectionchatty and support.verbose
    results = {}
    echo_server = ThreadedEchoServer(context=server_context,
                                     chatty=chatty,
                                     connectionchatty=False)
    with echo_server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=sni_name,
                                        session=session) as client:
            client.connect((HOST, echo_server.port))
            for payload in (indata, bytearray(indata), memoryview(indata)):
                if noisy:
                    sys.stdout.write(
                        " client: sending %r...\n" % indata)
                client.write(payload)
                outdata = client.read()
                if noisy:
                    sys.stdout.write(" client: read %r\n" % outdata)
                if outdata != indata.lower():
                    raise AssertionError(
                        "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                        % (outdata[:20], len(outdata),
                           indata[:20].lower(), len(indata)))
            # Tell the handler thread this connection is finished.
            client.write(b"over\n")
            if noisy:
                sys.stdout.write(" client: closing connection.\n")
            results.update({
                'compression': client.compression(),
                'cipher': client.cipher(),
                'peercert': client.getpeercert(),
                'client_alpn_protocol': client.selected_alpn_protocol(),
                'client_npn_protocol': client.selected_npn_protocol(),
                'version': client.version(),
                'session_reused': client.session_reused,
                'session': client.session,
            })
            client.close()
        results['server_alpn_protocols'] = echo_server.selected_alpn_protocols
        results['server_npn_protocols'] = echo_server.selected_npn_protocols
        results['server_shared_ciphers'] = echo_server.shared_ciphers
    return results
Trent Nelson6b240cd2008-04-10 20:12:06 +00002559
def try_protocol_combo(server_protocol, client_protocol, expect_success,
                       certsreqs=None, server_options=0, client_options=0):
    """
    Try to SSL-connect using *client_protocol* to *server_protocol*.
    If *expect_success* is true, assert that the connection succeeds,
    if it's false, assert that the connection fails.
    Also, if *expect_success* is a string, assert that it is the protocol
    version actually used by the connection.
    """
    if certsreqs is None:
        certsreqs = ssl.CERT_NONE
    cert_mode_names = {
        ssl.CERT_NONE: "CERT_NONE",
        ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
        ssl.CERT_REQUIRED: "CERT_REQUIRED",
    }
    certtype = cert_mode_names[certsreqs]
    if support.verbose:
        # Combinations expected to fail are shown in braces.
        if expect_success:
            formatstr = " %s->%s %s\n"
        else:
            formatstr = " {%s->%s} %s\n"
        sys.stdout.write(formatstr %
                         (ssl.get_protocol_name(client_protocol),
                          ssl.get_protocol_name(server_protocol),
                          certtype))

    client_context = ssl.SSLContext(client_protocol)
    client_context.options |= client_options
    server_context = ssl.SSLContext(server_protocol)
    server_context.options |= server_options

    # NOTE: we must enable "ALL" ciphers on the client, otherwise an
    # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
    # starting from OpenSSL 1.0.0 (see issue #8322).
    if client_context.protocol == ssl.PROTOCOL_TLS:
        client_context.set_ciphers("ALL")

    for context in (client_context, server_context):
        context.verify_mode = certsreqs
        context.load_cert_chain(SIGNED_CERTFILE)
        context.load_verify_locations(SIGNING_CA)

    try:
        stats = server_params_test(client_context, server_context,
                                   chatty=False, connectionchatty=False)
    # Protocol mismatch can result in either an SSLError, or a
    # "Connection reset by peer" error.
    except ssl.SSLError:
        if expect_success:
            raise
    except OSError as exc:
        if expect_success or exc.errno != errno.ECONNRESET:
            raise
    else:
        if not expect_success:
            raise AssertionError(
                "Client protocol %s succeeded with server protocol %s!"
                % (ssl.get_protocol_name(client_protocol),
                   ssl.get_protocol_name(server_protocol)))
        if (expect_success is not True
                and expect_success != stats['version']):
            raise AssertionError("version mismatch: expected %r, got %r"
                                 % (expect_success, stats['version']))
2618
2619
2620class ThreadedTests(unittest.TestCase):
2621
def test_echo(self):
    """Basic test of an SSL client connecting to a server"""
    if support.verbose:
        sys.stdout.write("\n")

    # Every generic protocol is exercised with one context playing both
    # roles; the role-specific protocols get dedicated sub-tests below.
    role_specific = {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}
    for protocol in (p for p in PROTOCOLS if p not in role_specific):
        with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
            shared_ctx = ssl.SSLContext(protocol)
            shared_ctx.load_cert_chain(CERTFILE)
            server_params_test(shared_ctx, shared_ctx,
                               chatty=True, connectionchatty=True)

    client_context, server_context, hostname = testing_context()

    # Matching roles: the echo exchange is expected to work.
    with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT,
                      server=ssl.PROTOCOL_TLS_SERVER):
        server_params_test(client_context=client_context,
                           server_context=server_context,
                           chatty=True, connectionchatty=True,
                           sni_name=hostname)

    client_context.check_hostname = False

    # Mismatched roles: each row is
    # (client label, server label, client ctx, server ctx, extra kwargs).
    # NOTE(review): the last row is labelled CLIENT/CLIENT but passes the
    # same contexts as the first row -- confirm whether that is intended.
    mismatches = (
        (ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_CLIENT,
         server_context, client_context, {'sni_name': hostname}),
        (ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_SERVER,
         server_context, server_context, {}),
        (ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_CLIENT,
         server_context, client_context, {}),
    )
    for client_label, server_label, cli_ctx, srv_ctx, extra in mismatches:
        with self.subTest(client=client_label, server=server_label):
            with self.assertRaises(ssl.SSLError) as caught:
                server_params_test(client_context=cli_ctx,
                                   server_context=srv_ctx,
                                   chatty=True, connectionchatty=True,
                                   **extra)
            self.assertIn('called a function you should not call',
                          str(caught.exception))
2668
def test_getpeercert(self):
    # Check getpeercert() before and after the handshake, then inspect
    # the subject and validity fields of the returned certificate.
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()
    server = ThreadedEchoServer(context=server_context, chatty=False)
    with server, client_context.wrap_socket(
            socket.socket(),
            do_handshake_on_connect=False,
            server_hostname=hostname) as tls_sock:
        tls_sock.connect((HOST, server.port))
        # getpeercert() raises ValueError while the handshake isn't done.
        with self.assertRaises(ValueError):
            tls_sock.getpeercert()
        tls_sock.do_handshake()

        peer_cert = tls_sock.getpeercert()
        self.assertTrue(peer_cert, "Can't get peer certificate.")
        negotiated = tls_sock.cipher()
        if support.verbose:
            sys.stdout.write(pprint.pformat(peer_cert) + '\n')
            sys.stdout.write(
                "Connection cipher is " + str(negotiated) + '.\n')

        if 'subject' not in peer_cert:
            self.fail("No subject field in certificate: %s." %
                      pprint.pformat(peer_cert))
        expected_rdn = (('organizationName', 'Python Software Foundation'),)
        if expected_rdn not in peer_cert['subject']:
            self.fail(
                "Missing or invalid 'organizationName' field in certificate subject; "
                "should be 'Python Software Foundation'.")

        # The validity window must be present and correctly ordered.
        self.assertIn('notBefore', peer_cert)
        self.assertIn('notAfter', peer_cert)
        not_before = ssl.cert_time_to_seconds(peer_cert['notBefore'])
        not_after = ssl.cert_time_to_seconds(peer_cert['notAfter'])
        self.assertLess(not_before, not_after)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002704
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_crl_check(self):
    # Walk the client through three CRL states: default flags, leaf CRL
    # checking with no CRL loaded, and leaf CRL checking with a CRL loaded.
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    trusted_first = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(client_context.verify_flags,
                     ssl.VERIFY_DEFAULT | trusted_first)

    def new_server():
        # A fresh echo server is started for every phase.
        return ThreadedEchoServer(context=server_context, chatty=True)

    def new_client():
        return client_context.wrap_socket(socket.socket(),
                                          server_hostname=hostname)

    # VERIFY_DEFAULT should pass
    server = new_server()
    with server, new_client() as tls_sock:
        tls_sock.connect((HOST, server.port))
        peer_cert = tls_sock.getpeercert()
        self.assertTrue(peer_cert, "Can't get peer certificate.")

    # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
    client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF

    server = new_server()
    with server, new_client() as tls_sock:
        with self.assertRaisesRegex(ssl.SSLError,
                                    "certificate verify failed"):
            tls_sock.connect((HOST, server.port))

    # now load a CRL file. The CRL file is signed by the CA.
    client_context.load_verify_locations(CRLFILE)

    server = new_server()
    with server, new_client() as tls_sock:
        tls_sock.connect((HOST, server.port))
        peer_cert = tls_sock.getpeercert()
        self.assertTrue(peer_cert, "Can't get peer certificate.")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002746
def test_check_hostname(self):
    # Exercise hostname checking with a matching name, a non-matching
    # name, and no server_hostname at all.
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    # correct hostname should verify
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, client_context.wrap_socket(
            socket.socket(), server_hostname=hostname) as tls_sock:
        tls_sock.connect((HOST, server.port))
        peer_cert = tls_sock.getpeercert()
        self.assertTrue(peer_cert, "Can't get peer certificate.")

    # incorrect hostname should raise an exception
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, client_context.wrap_socket(
            socket.socket(), server_hostname="invalid") as tls_sock:
        with self.assertRaisesRegex(
                ssl.CertificateError,
                "Hostname mismatch, certificate is not valid for 'invalid'."):
            tls_sock.connect((HOST, server.port))

    # missing server_hostname arg should cause an exception, too
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, socket.socket() as raw_sock:
        with self.assertRaisesRegex(
                ValueError,
                "check_hostname requires server_hostname"):
            client_context.wrap_socket(raw_sock)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002779
def test_ecc_cert(self):
    # Connect to a server that only has an ECC certificate, using a client
    # whose cipher string excludes RSA authentication, and check the
    # negotiated cipher suite.
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)
    client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
    hostname = SIGNED_CERTFILE_ECC_HOSTNAME

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # load ECC cert
    server_context.load_cert_chain(SIGNED_CERTFILE_ECC)

    # correct hostname should verify
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()[0].split('-')
            # The previous assertTrue(cipher[:2], (...)) passed the tuple
            # as the failure *message*, so nothing was compared.  TLS 1.3
            # suite names (e.g. TLS_AES_256_GCM_SHA384) do not encode the
            # key exchange or authentication algorithm, so the prefix can
            # only be checked for earlier protocol versions.
            if s.version() != 'TLSv1.3':
                self.assertEqual(cipher[:2], ['ECDHE', 'ECDSA'])
2800
def test_dual_rsa_ecc(self):
    # The server loads both an ECC and an RSA key/cert pair; the client's
    # cipher string excludes RSA authentication, so the negotiated suite
    # is expected to be an ECDHE-ECDSA one.
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)
    # TODO: fix TLSv1.3 once SSLContext can restrict signature
    #       algorithms.
    client_context.options |= ssl.OP_NO_TLSv1_3
    # only ECDSA certs
    client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
    hostname = SIGNED_CERTFILE_ECC_HOSTNAME

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # load ECC and RSA key/cert pairs
    server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    # correct hostname should verify
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()[0].split('-')
            # The previous assertTrue(cipher[:2], (...)) passed the tuple
            # as the failure *message*, so nothing was compared.  split()
            # yields a list, hence the list literal on the right.
            self.assertEqual(cipher[:2], ['ECDHE', 'ECDSA'])
2826
def test_check_hostname_idn(self):
    # Hostname checking with internationalized names: each spelling passed
    # as server_hostname must be exposed in its A-label form and must
    # verify against the IDN test certificate.
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(IDNSANSFILE)

    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_ctx.verify_mode = ssl.CERT_REQUIRED
    client_ctx.check_hostname = True
    client_ctx.load_verify_locations(SIGNING_CA)

    # correct hostname should verify, when specified in several
    # different ways; each entry is (name passed in, name expected back)
    idn_hostnames = [
        ('könig.idn.pythontest.net',
         'xn--knig-5qa.idn.pythontest.net'),
        ('xn--knig-5qa.idn.pythontest.net',
         'xn--knig-5qa.idn.pythontest.net'),
        (b'xn--knig-5qa.idn.pythontest.net',
         'xn--knig-5qa.idn.pythontest.net'),

        ('königsgäßchen.idna2003.pythontest.net',
         'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
        ('xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
         'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
        (b'xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
         'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),

        # The U-label spelling of the idna2008 name is intentionally not
        # exercised here:
        # ('königsgäßchen.idna2008.pythontest.net',
        #  'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
        ('xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
         'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
        (b'xn--knigsgchen-b4a3dun.idna2008.pythontest.net',
         'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'),
    ]
    for requested, expected in idn_hostnames:
        server = ThreadedEchoServer(context=server_context, chatty=True)
        with server, client_ctx.wrap_socket(
                socket.socket(), server_hostname=requested) as tls_sock:
            # The attribute is checked both before and after connecting.
            self.assertEqual(tls_sock.server_hostname, expected)
            tls_sock.connect((HOST, server.port))
            peer_cert = tls_sock.getpeercert()
            self.assertEqual(tls_sock.server_hostname, expected)
            self.assertTrue(peer_cert, "Can't get peer certificate.")

    # incorrect hostname should raise an exception
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, client_ctx.wrap_socket(
            socket.socket(),
            server_hostname="python.example.org") as tls_sock:
        with self.assertRaises(ssl.CertificateError):
            tls_sock.connect((HOST, server.port))
2882
def test_wrong_cert_tls12(self):
    """Connecting when the server rejects the client's certificate

    Launch a server with CERT_REQUIRED, and check that trying to
    connect to it with a wrong client certificate fails.
    """
    client_context, server_context, hostname = testing_context()
    # load client cert that is not signed by trusted CA
    client_context.load_cert_chain(CERTFILE)
    # require TLS client authentication
    server_context.verify_mode = ssl.CERT_REQUIRED
    # TLS 1.3 has different handshake
    client_context.maximum_version = ssl.TLSVersion.TLSv1_2

    server = ThreadedEchoServer(
        context=server_context, chatty=True, connectionchatty=True,
    )

    with server:
        with client_context.wrap_socket(
                socket.socket(), server_hostname=hostname) as tls_sock:
            try:
                # Expect either an SSL error about the server rejecting
                # the connection, or a low-level connection reset (which
                # sometimes happens on Windows)
                tls_sock.connect((HOST, server.port))
            except ssl.SSLError as exc:
                if support.verbose:
                    sys.stdout.write("\nSSLError is %r\n" % exc)
            except OSError as exc:
                # Any OSError other than a connection reset is unexpected.
                if exc.errno != errno.ECONNRESET:
                    raise
                if support.verbose:
                    sys.stdout.write("\nsocket.error is %r\n" % exc)
            else:
                self.fail("Use of invalid cert should have failed!")
2919
@unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
def test_wrong_cert_tls13(self):
    # TLS 1.3 counterpart of the wrong-client-certificate test: both sides
    # are pinned to TLS 1.3 and the failure is expected on the first
    # write/read after connect() rather than in connect() itself.
    client_context, server_context, hostname = testing_context()
    # load client cert that is not signed by trusted CA
    client_context.load_cert_chain(CERTFILE)
    server_context.verify_mode = ssl.CERT_REQUIRED
    for ctx in (server_context, client_context):
        ctx.minimum_version = ssl.TLSVersion.TLSv1_3

    server = ThreadedEchoServer(
        context=server_context, chatty=True, connectionchatty=True,
    )
    with server:
        with client_context.wrap_socket(
                socket.socket(), server_hostname=hostname) as tls_sock:
            # TLS 1.3 perform client cert exchange after handshake
            tls_sock.connect((HOST, server.port))
            try:
                tls_sock.write(b'data')
                tls_sock.read(4)
            except ssl.SSLError as exc:
                if support.verbose:
                    sys.stdout.write("\nSSLError is %r\n" % exc)
            except OSError as exc:
                # Any OSError other than a connection reset is unexpected.
                if exc.errno != errno.ECONNRESET:
                    raise
                if support.verbose:
                    sys.stdout.write("\nsocket.error is %r\n" % exc)
            else:
                self.fail("Use of invalid cert should have failed!")
2950
def test_rude_shutdown(self):
    """A brutal shutdown of an SSL server should raise an OSError
    in the client when attempting handshake.
    """
    accepting = threading.Event()
    hung_up = threading.Event()

    listen_sock = socket.socket()
    port = support.bind_port(listen_sock, HOST)

    def listener():
        # Runs in a worker thread: accept one connection, close it and the
        # listening socket straight away, then signal the main thread.
        listen_sock.listen()
        accepting.set()
        peer, _addr = listen_sock.accept()
        peer.close()
        listen_sock.close()
        hung_up.set()

    def connector():
        # Runs in the main thread: connect in clear text, wait until the
        # listener has gone away, then attempt to wrap the socket.
        accepting.wait()
        with socket.socket() as client:
            client.connect((HOST, port))
            hung_up.wait()
            try:
                ssl_sock = test_wrap_socket(client)
            except OSError:
                pass
            else:
                self.fail('connecting to closed SSL socket should have failed')

    worker = threading.Thread(target=listener)
    worker.start()
    try:
        connector()
    finally:
        # Always reap the listener thread, even if the connector failed.
        worker.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002991
def test_ssl_cert_verify_error(self):
    # The client context below has no CA certificates loaded, so the
    # handshake is expected to fail; the resulting exception's type,
    # verify_code, verify_message and repr() are then checked.
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with context.wrap_socket(socket.socket(),
                                 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
            try:
                s.connect((HOST, server.port))
            except ssl.SSLError as e:
                msg = 'unable to get local issuer certificate'
                self.assertIsInstance(e, ssl.SSLCertVerificationError)
                self.assertEqual(e.verify_code, 20)
                self.assertEqual(e.verify_message, msg)
                self.assertIn(msg, repr(e))
                self.assertIn('certificate verify failed', repr(e))
            else:
                # Without this branch the test passed silently whenever
                # the handshake unexpectedly succeeded.
                self.fail("Expected certificate verification to fail")
3014
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
                     "OpenSSL is compiled without SSLv2 support")
def test_protocol_sslv2(self):
    """Connecting to an SSLv2 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    sslv2 = ssl.PROTOCOL_SSLv2

    # SSLv2 on both ends is expected to succeed for every cert mode.
    try_protocol_combo(sslv2, sslv2, True)
    for cert_mode in (ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(sslv2, sslv2, True, cert_mode)

    # Every other client protocol is expected to fail.
    try_protocol_combo(sslv2, ssl.PROTOCOL_TLS, False)
    if hasattr(ssl, 'PROTOCOL_SSLv3'):
        try_protocol_combo(sslv2, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(sslv2, ssl.PROTOCOL_TLSv1, False)

    # SSLv23 client with specific SSL options
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(sslv2, ssl.PROTOCOL_TLS, False,
                           client_options=ssl.OP_NO_SSLv2)
    for disabled in (ssl.OP_NO_SSLv3, ssl.OP_NO_TLSv1):
        try_protocol_combo(sslv2, ssl.PROTOCOL_TLS, False,
                           client_options=disabled)
Antoine Pitrou242db722013-05-01 20:52:07 +02003037
def test_PROTOCOL_TLS(self):
    """Connecting to an SSLv23 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    tls = ssl.PROTOCOL_TLS
    have_sslv3 = hasattr(ssl, 'PROTOCOL_SSLv3')

    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try:
            try_protocol_combo(tls, ssl.PROTOCOL_SSLv2, True)
        except OSError as exc:
            # this fails on some older versions of OpenSSL (0.9.7l, for instance)
            if support.verbose:
                sys.stdout.write(
                    " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
                    % str(exc))

    # Default certificate requirements.
    if have_sslv3:
        try_protocol_combo(tls, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(tls, tls, True)
    try_protocol_combo(tls, ssl.PROTOCOL_TLSv1, 'TLSv1')

    # The same three combinations with explicit certificate requirements.
    for cert_mode in (ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        if have_sslv3:
            try_protocol_combo(tls, ssl.PROTOCOL_SSLv3, False, cert_mode)
        try_protocol_combo(tls, tls, True, cert_mode)
        try_protocol_combo(tls, ssl.PROTOCOL_TLSv1, 'TLSv1', cert_mode)

    # Server with specific SSL options
    if have_sslv3:
        try_protocol_combo(tls, ssl.PROTOCOL_SSLv3, False,
                           server_options=ssl.OP_NO_SSLv3)
    # Will choose TLSv1
    try_protocol_combo(tls, tls, True,
                       server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
    try_protocol_combo(tls, ssl.PROTOCOL_TLSv1, False,
                       server_options=ssl.OP_NO_TLSv1)
3075
3076
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
                     "OpenSSL is compiled without SSLv3 support")
def test_protocol_sslv3(self):
    """Connecting to an SSLv3 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    sslv3 = ssl.PROTOCOL_SSLv3

    # SSLv3 on both ends must negotiate 'SSLv3' for every cert mode.
    try_protocol_combo(sslv3, sslv3, 'SSLv3')
    for cert_mode in (ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(sslv3, sslv3, 'SSLv3', cert_mode)

    # Every other client configuration is expected to fail.
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(sslv3, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(sslv3, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_SSLv3)
    try_protocol_combo(sslv3, ssl.PROTOCOL_TLSv1, False)
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(sslv3, ssl.PROTOCOL_TLS,
                           False, client_options=ssl.OP_NO_SSLv2)
3095
def test_protocol_tlsv1(self):
    """Connecting to a TLSv1 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1 = ssl.PROTOCOL_TLSv1

    # TLSv1 on both ends must negotiate 'TLSv1' for every cert mode.
    try_protocol_combo(tlsv1, tlsv1, 'TLSv1')
    for cert_mode in (ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(tlsv1, tlsv1, 'TLSv1', cert_mode)

    # Older protocol clients, where compiled in, are expected to fail.
    for legacy_name in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy_name):
            try_protocol_combo(tlsv1, getattr(ssl, legacy_name), False)

    try_protocol_combo(tlsv1, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1)
3109
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
                     "TLS version 1.1 not supported.")
def test_protocol_tlsv1_1(self):
    """Connecting to a TLSv1.1 server with various client options.
       Testing against older TLS versions."""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1_1 = ssl.PROTOCOL_TLSv1_1

    try_protocol_combo(tlsv1_1, tlsv1_1, 'TLSv1.1')

    # Older protocol clients, where compiled in, are expected to fail.
    for legacy_name in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy_name):
            try_protocol_combo(tlsv1_1, getattr(ssl, legacy_name), False)

    try_protocol_combo(tlsv1_1, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1_1)

    try_protocol_combo(ssl.PROTOCOL_TLS, tlsv1_1, 'TLSv1.1')
    # TLSv1 and TLSv1.1 must not interoperate in either direction.
    try_protocol_combo(tlsv1_1, ssl.PROTOCOL_TLSv1, False)
    try_protocol_combo(ssl.PROTOCOL_TLSv1, tlsv1_1, False)
3128
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
                     "TLS version 1.2 not supported.")
def test_protocol_tlsv1_2(self):
    """Connecting to a TLSv1.2 server with various client options.
       Testing against older TLS versions."""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1_2 = ssl.PROTOCOL_TLSv1_2
    no_legacy_ssl = ssl.OP_NO_SSLv3 | ssl.OP_NO_SSLv2

    try_protocol_combo(tlsv1_2, tlsv1_2, 'TLSv1.2',
                       server_options=no_legacy_ssl,
                       client_options=no_legacy_ssl)

    # Older protocol clients, where compiled in, are expected to fail.
    for legacy_name in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy_name):
            try_protocol_combo(tlsv1_2, getattr(ssl, legacy_name), False)

    try_protocol_combo(tlsv1_2, ssl.PROTOCOL_TLS, False,
                       client_options=ssl.OP_NO_TLSv1_2)

    try_protocol_combo(ssl.PROTOCOL_TLS, tlsv1_2, 'TLSv1.2')
    # TLSv1.2 must not interoperate with TLSv1 or TLSv1.1 in either
    # direction.
    try_protocol_combo(tlsv1_2, ssl.PROTOCOL_TLSv1, False)
    try_protocol_combo(ssl.PROTOCOL_TLSv1, tlsv1_2, False)
    try_protocol_combo(tlsv1_2, ssl.PROTOCOL_TLSv1_1, False)
    try_protocol_combo(ssl.PROTOCOL_TLSv1_1, tlsv1_2, False)
3151
def test_starttls(self):
    """Switching from clear text to encrypted and back again."""
    msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4",
            b"ENDTLS", b"msg 5", b"msg 6")

    server = ThreadedEchoServer(CERTFILE,
                                starttls_server=True,
                                chatty=True,
                                connectionchatty=True)
    # True while traffic goes through the TLS-wrapped socket ``conn``;
    # otherwise the plain socket ``s`` is used.
    wrapped = False
    with server:
        s = socket.socket()
        s.setblocking(1)
        s.connect((HOST, server.port))
        if support.verbose:
            sys.stdout.write("\n")

        for indata in msgs:
            if support.verbose:
                sys.stdout.write(
                    " client: sending %r...\n" % indata)

            # Send the message and read the reply over the current channel.
            if not wrapped:
                s.send(indata)
                outdata = s.recv(1024)
            else:
                conn.write(indata)
                outdata = conn.read()
            msg = outdata.strip().lower()

            if indata == b"STARTTLS" and msg.startswith(b"ok"):
                # STARTTLS ok, switch to secure mode
                if support.verbose:
                    sys.stdout.write(
                        " client: read %r from server, starting TLS...\n"
                        % msg)
                conn = test_wrap_socket(s)
                wrapped = True
            elif indata == b"ENDTLS" and msg.startswith(b"ok"):
                # ENDTLS ok, switch back to clear text
                if support.verbose:
                    sys.stdout.write(
                        " client: read %r from server, ending TLS...\n"
                        % msg)
                s = conn.unwrap()
                wrapped = False
            elif support.verbose:
                sys.stdout.write(
                    " client: read %r from server\n" % msg)

        if support.verbose:
            sys.stdout.write(" client: closing connection.\n")
        # Say goodbye and close whichever channel is currently active.
        if wrapped:
            conn.write(b"over\n")
            conn.close()
        else:
            s.send(b"over\n")
            s.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003208
def test_socketserver(self):
    """Using socketserver to create and manage SSL connections."""
    server = make_https_server(self, certfile=SIGNED_CERTFILE)
    # try to connect
    if support.verbose:
        sys.stdout.write('\n')

    # Reference copy of the file, read straight from disk.
    with open(CERTFILE, 'rb') as local_file:
        d1 = local_file.read()
    d2 = ''

    # now fetch the same data from the HTTPS server
    url = 'https://localhost:%d/%s' % (
        server.port, os.path.basename(CERTFILE))
    context = ssl.create_default_context(cafile=SIGNING_CA)
    response = urllib.request.urlopen(url, context=context)
    try:
        dlen = response.info().get("content-length")
        # Only read a body when the server announced a positive length.
        if dlen and (int(dlen) > 0):
            d2 = response.read(int(dlen))
            if support.verbose:
                sys.stdout.write(
                    " client: read %d bytes from remote server '%s'\n"
                    % (len(d2), server))
    finally:
        response.close()
    self.assertEqual(d1, d2)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003234
def test_asyncore_server(self):
    """Check the example asyncore integration.

    Sends one message to an AsyncoreEchoServer over a wrapped socket and
    expects the lower-cased message to come back.
    """
    if support.verbose:
        sys.stdout.write("\n")

    request = b"FOO\n"
    expected = request.lower()
    echo_server = AsyncoreEchoServer(CERTFILE)
    with echo_server:
        client = test_wrap_socket(socket.socket())
        client.connect(('127.0.0.1', echo_server.port))
        if support.verbose:
            sys.stdout.write(
                " client: sending %r...\n" % request)
        client.write(request)
        reply = client.read()
        if support.verbose:
            sys.stdout.write(" client: read %r\n" % reply)
        if reply != expected:
            self.fail(
                "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                % (reply[:20], len(reply),
                   expected[:20], len(request)))
        client.write(b"over\n")
        if support.verbose:
            sys.stdout.write(" client: closing connection.\n")
        client.close()
        if support.verbose:
            sys.stdout.write(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05003263
def test_recv_send(self):
    """Test recv(), send() and friends.

    Each send/recv variant of a connected SSL socket is exercised against
    a ThreadedEchoServer.  Variants flagged as "expect success" must
    round-trip the payload (the reply is compared with the lower-cased
    payload); the others must raise a ValueError whose message starts
    with the method name.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLS_SERVER,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = test_wrap_socket(socket.socket(),
                             server_side=False,
                             certfile=CERTFILE,
                             ca_certs=CERTFILE,
                             cert_reqs=ssl.CERT_NONE,
                             ssl_version=ssl.PROTOCOL_TLS_CLIENT)
        s.connect((HOST, server.port))

        # helper methods for standardising recv* method signatures
        def _recv_into():
            b = bytearray(b"\0"*100)
            count = s.recv_into(b)
            return b[:count]

        def _recvfrom_into():
            b = bytearray(b"\0"*100)
            count, addr = s.recvfrom_into(b)
            return b[:count]

        # (name, method, expect success?, *args, return value func)
        send_methods = [
            ('send', s.send, True, [], len),
            ('sendto', s.sendto, False, ["some.address"], len),
            ('sendall', s.sendall, True, [], lambda x: None),
        ]
        # (name, method, whether to expect success, *args)
        recv_methods = [
            ('recv', s.recv, True, []),
            ('recvfrom', s.recvfrom, False, ["some.address"]),
            ('recv_into', _recv_into, True, []),
            ('recvfrom_into', _recvfrom_into, False, []),
        ]
        data_prefix = "PREFIX_"

        # NOTE: the failure messages below use the "!r" / "!s" conversion
        # flags.  A ":r" format spec is not valid for bytes and ":s" is
        # not valid for exception objects, so they would raise TypeError
        # while building the message and hide the real failure.
        for (meth_name, send_meth, expect_success, args,
                ret_val_meth) in send_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                ret = send_meth(indata, *args)
                msg = "sending with {}".format(meth_name)
                self.assertEqual(ret, ret_val_meth(indata), msg=msg)
                outdata = s.read()
                if outdata != indata.lower():
                    self.fail(
                        "While sending with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20], nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to send with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )

        for meth_name, recv_meth, expect_success, args in recv_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                s.send(indata)
                outdata = recv_meth(*args)
                if outdata != indata.lower():
                    self.fail(
                        "While receiving with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20], nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to receive with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )
                # consume data: the payload was already sent before the
                # rejected receive call, so its echo is still pending
                s.read()

        # read(-1, buffer) is supported, even though read(-1) is not
        data = b"data"
        s.send(data)
        buffer = bytearray(len(data))
        self.assertEqual(s.read(-1, buffer), len(data))
        self.assertEqual(buffer, data)

        # sendall accepts bytes-like objects
        if ctypes is not None:
            ubyte = ctypes.c_ubyte * len(data)
            byteslike = ubyte.from_buffer_copy(data)
            s.sendall(byteslike)
            self.assertEqual(s.read(), data)

        # Make sure sendmsg et al are disallowed to avoid
        # inadvertent disclosure of data and/or corruption
        # of the encrypted data stream
        self.assertRaises(NotImplementedError, s.dup)
        self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
        self.assertRaises(NotImplementedError, s.recvmsg, 100)
        self.assertRaises(NotImplementedError,
                          s.recvmsg_into, [bytearray(100)])
        s.write(b"over\n")

        self.assertRaises(ValueError, s.recv, -1)
        self.assertRaises(ValueError, s.read, -1)

        s.close()
3400
def test_recv_zero(self):
    """recv(0)/read(0) return no data and do not block."""
    server = ThreadedEchoServer(CERTFILE)
    server.__enter__()
    # __exit__ receives (exc_type, exc_value, traceback) under the
    # context-manager protocol; pass all three so the cleanup works
    # regardless of how the server declares the parameters.
    self.addCleanup(server.__exit__, None, None, None)
    s = socket.create_connection((HOST, server.port))
    self.addCleanup(s.close)
    s = test_wrap_socket(s, suppress_ragged_eofs=False)
    self.addCleanup(s.close)

    # recv/read(0) should return no data
    s.send(b"data")
    self.assertEqual(s.recv(0), b"")
    self.assertEqual(s.read(0), b"")
    self.assertEqual(s.read(), b"data")

    # Should not block if the other end sends no data
    s.setblocking(False)
    self.assertEqual(s.recv(0), b"")
    self.assertEqual(s.recv_into(bytearray()), 0)
3420
def test_nonblocking_send(self):
    """Sending forever on a non-blocking SSL socket raises SSLWant*Error."""
    echo_server = ThreadedEchoServer(CERTFILE,
                                     certreqs=ssl.CERT_NONE,
                                     ssl_version=ssl.PROTOCOL_TLS_SERVER,
                                     cacerts=CERTFILE,
                                     chatty=True,
                                     connectionchatty=False)
    with echo_server:
        client = test_wrap_socket(socket.socket(),
                                  server_side=False,
                                  certfile=CERTFILE,
                                  ca_certs=CERTFILE,
                                  cert_reqs=ssl.CERT_NONE,
                                  ssl_version=ssl.PROTOCOL_TLS_CLIENT)
        client.connect((HOST, echo_server.port))
        client.setblocking(False)

        # Keep pushing data: once the buffers fill up, a non-blocking
        # send has to report that it would block.
        chunk = bytearray(8192)
        would_block = (ssl.SSLWantWriteError, ssl.SSLWantReadError)
        with self.assertRaises(would_block):
            while True:
                client.send(chunk)

        # Back to blocking mode before closing the connection.
        client.setblocking(True)
        client.close()
3450
def test_handshake_timeout(self):
    """Issue #5103: the SSL handshake must respect the socket timeout."""
    listener = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(listener)
    listening = threading.Event()
    finish = False

    def serve():
        # Accept plain TCP connections and just hold on to them; no TLS
        # handshake is ever answered on this side.
        listener.listen()
        listening.set()
        accepted = []
        while not finish:
            readable, _, _ = select.select([listener], [], [], 0.1)
            if listener in readable:
                # Let the socket hang around rather than having
                # it closed by garbage collection.
                accepted.append(listener.accept()[0])
        for conn in accepted:
            conn.close()

    worker = threading.Thread(target=serve)
    worker.start()
    listening.wait()

    try:
        # Case 1: wrapping an already connected socket.
        try:
            c = socket.socket(socket.AF_INET)
            c.settimeout(0.2)
            c.connect((host, port))
            # Will attempt handshake and time out
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                test_wrap_socket(c)
        finally:
            c.close()
        # Case 2: connecting an already wrapped socket.
        try:
            c = socket.socket(socket.AF_INET)
            c = test_wrap_socket(c)
            c.settimeout(0.2)
            # Will attempt handshake and time out
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                c.connect((host, port))
        finally:
            c.close()
    finally:
        finish = True
        worker.join()
        listener.close()
3499
def test_server_accept(self):
    """Issue #16357: accept() on a SSLSocket made by SSLContext.wrap_socket()."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    ctx.verify_mode = ssl.CERT_REQUIRED
    ctx.load_verify_locations(SIGNING_CA)
    ctx.load_cert_chain(SIGNED_CERTFILE)
    listener = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(listener)
    listener = ctx.wrap_socket(listener, server_side=True)
    self.assertTrue(listener.server_side)

    listening = threading.Event()
    remote = None
    peer = None

    def serve():
        nonlocal remote, peer
        listener.listen()
        # Signal the client, then block in accept() and echo 4 bytes.
        listening.set()
        remote, peer = listener.accept()
        remote.send(remote.recv(4))

    worker = threading.Thread(target=serve)
    worker.start()
    # The client waits for the server to be set up before connecting.
    listening.wait()
    client = ctx.wrap_socket(socket.socket())
    client.connect((host, port))
    client.send(b'data')
    client.recv()
    client_addr = client.getsockname()
    client.close()
    worker.join()
    remote.close()
    listener.close()
    # Sanity checks.
    self.assertIsInstance(remote, ssl.SSLSocket)
    self.assertEqual(peer, client_addr)
3540
def test_getpeercert_enotconn(self):
    """getpeercert() on an unconnected SSL socket fails with ENOTCONN."""
    unconnected = ssl.SSLContext(ssl.PROTOCOL_TLS).wrap_socket(
        socket.socket())
    with unconnected:
        with self.assertRaises(OSError) as caught:
            unconnected.getpeercert()
        self.assertEqual(caught.exception.errno, errno.ENOTCONN)
3547
def test_do_handshake_enotconn(self):
    """do_handshake() on an unconnected SSL socket fails with ENOTCONN."""
    unconnected = ssl.SSLContext(ssl.PROTOCOL_TLS).wrap_socket(
        socket.socket())
    with unconnected:
        with self.assertRaises(OSError) as caught:
            unconnected.do_handshake()
        self.assertEqual(caught.exception.errno, errno.ENOTCONN)
3554
def test_no_shared_ciphers(self):
    """A handshake with disjoint cipher lists fails on both sides."""
    client_context, server_context, hostname = testing_context()
    # Force different suites on client and server
    server_context.set_ciphers("AES256")
    client_context.set_ciphers("AES128")
    # OpenSSL enables all TLS 1.3 ciphers, enforce TLS 1.2 for test
    client_context.options |= ssl.OP_NO_TLSv1_3
    with ThreadedEchoServer(context=server_context) as server:
        client = client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname)
        with client as s:
            with self.assertRaises(OSError):
                s.connect((HOST, server.port))
    first_error = server.conn_errors[0]
    self.assertIn("no shared cipher", first_error)
3568
def test_version_basic(self):
    """
    Basic tests for SSLSocket.version().
    More tests are done in the test_protocol_*() methods.

    version() and _sslobj are None before connecting and again after the
    socket has been closed.
    """
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_ctx.check_hostname = False
    client_ctx.verify_mode = ssl.CERT_NONE
    echo_server = ThreadedEchoServer(CERTFILE,
                                     ssl_version=ssl.PROTOCOL_TLS_SERVER,
                                     chatty=False)
    with echo_server as server:
        with client_ctx.wrap_socket(socket.socket()) as s:
            self.assertIs(s.version(), None)
            self.assertIs(s._sslobj, None)
            s.connect((HOST, server.port))
            if IS_OPENSSL_1_1_1 and ssl.HAS_TLSv1_3:
                self.assertEqual(s.version(), 'TLSv1.3')
            elif ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
                self.assertEqual(s.version(), 'TLSv1.2')
            else:  # 0.9.8 to 1.0.1
                self.assertIn(s.version(), ('TLSv1', 'TLSv1.2'))
        # The socket is closed at this point.
        self.assertIs(s._sslobj, None)
        self.assertIs(s.version(), None)
3592
@unittest.skipUnless(ssl.HAS_TLSv1_3,
                     "test requires TLSv1.3 enabled OpenSSL")
def test_tls1_3(self):
    """With TLS 1.0-1.2 disabled, the connection negotiates TLSv1.3."""
    tls13_ciphers = {
        'TLS_AES_256_GCM_SHA384',
        'TLS_CHACHA20_POLY1305_SHA256',
        'TLS_AES_128_GCM_SHA256',
    }
    older_versions_off = (
        ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
    )
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    ctx.load_cert_chain(CERTFILE)
    ctx.options |= older_versions_off
    # The same context is used for the server and the client side.
    with ThreadedEchoServer(context=ctx) as server:
        with ctx.wrap_socket(socket.socket()) as s:
            s.connect((HOST, server.port))
            self.assertIn(s.cipher()[0], tls13_ciphers)
            self.assertEqual(s.version(), 'TLSv1.3')
Christian Heimescb5b68a2017-09-07 18:07:00 -07003610
@unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
                     "required OpenSSL 1.1.0g")
def test_min_max_version(self):
    """minimum_version/maximum_version constrain the negotiated version."""
    TLSVersion = ssl.TLSVersion
    client_context, server_context, hostname = testing_context()

    # client TLSv1.0 to 1.2
    client_context.minimum_version = TLSVersion.TLSv1
    client_context.maximum_version = TLSVersion.TLSv1_2
    # server only TLSv1.2
    server_context.minimum_version = TLSVersion.TLSv1_2
    server_context.maximum_version = TLSVersion.TLSv1_2

    with ThreadedEchoServer(context=server_context) as server:
        client = client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname)
        with client as s:
            s.connect((HOST, server.port))
            self.assertEqual(s.version(), 'TLSv1.2')

    # client 1.0 to 1.2, server 1.0 to 1.1
    server_context.minimum_version = TLSVersion.TLSv1
    server_context.maximum_version = TLSVersion.TLSv1_1

    with ThreadedEchoServer(context=server_context) as server:
        client = client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname)
        with client as s:
            s.connect((HOST, server.port))
            self.assertEqual(s.version(), 'TLSv1.1')

    # client 1.0, server 1.2 (mismatch)
    server_context.minimum_version = TLSVersion.TLSv1_2
    server_context.maximum_version = TLSVersion.TLSv1_2
    client_context.minimum_version = TLSVersion.TLSv1
    client_context.maximum_version = TLSVersion.TLSv1
    with ThreadedEchoServer(context=server_context) as server:
        client = client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname)
        with client as s:
            with self.assertRaises(ssl.SSLError) as e:
                s.connect((HOST, server.port))
            self.assertIn("alert", str(e.exception))
3649
3650
@unittest.skipUnless(hasattr(ssl.SSLContext, 'minimum_version'),
                     "required OpenSSL 1.1.0g")
@unittest.skipUnless(ssl.HAS_SSLv3, "requires SSLv3 support")
def test_min_max_version_sslv3(self):
    """A client pinned to SSLv3 negotiates SSLv3 with a permissive server."""
    sslv3 = ssl.TLSVersion.SSLv3
    client_context, server_context, hostname = testing_context()
    server_context.minimum_version = sslv3
    client_context.minimum_version = sslv3
    client_context.maximum_version = sslv3
    with ThreadedEchoServer(context=server_context) as server:
        client = client_context.wrap_socket(socket.socket(),
                                            server_hostname=hostname)
        with client as s:
            s.connect((HOST, server.port))
            self.assertEqual(s.version(), 'SSLv3')
3664
@unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
def test_default_ecdh_curve(self):
    """Issue #21015: ECDH key exchange is enabled by default on contexts."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    ctx.load_cert_chain(CERTFILE)
    # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
    # cipher name.
    ctx.options |= ssl.OP_NO_TLSv1_3
    # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
    # explicitly using the 'ECCdraft' cipher alias.  Otherwise,
    # our default cipher list should prefer ECDH-based ciphers
    # automatically.
    needs_explicit_ecdh = ssl.OPENSSL_VERSION_INFO < (1, 0, 0)
    if needs_explicit_ecdh:
        ctx.set_ciphers("ECCdraft:ECDH")
    with ThreadedEchoServer(context=ctx) as server:
        with ctx.wrap_socket(socket.socket()) as s:
            s.connect((HOST, server.port))
            cipher_name = s.cipher()[0]
            self.assertIn("ECDH", cipher_name)
3684
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    """Test tls-unique channel binding.

    Two consecutive connections to the same server must each yield
    channel binding data of the expected length that matches what the
    server reports for the connection, and the two values must differ.
    """
    if support.verbose:
        sys.stdout.write("\n")

    client_context, server_context, hostname = testing_context()

    server = ThreadedEchoServer(context=server_context,
                                chatty=True,
                                connectionchatty=False)

    with server:
        with client_context.wrap_socket(
                socket.socket(),
                server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            # get the data
            cb_data = s.get_channel_binding("tls-unique")
            if support.verbose:
                sys.stdout.write(
                    " got channel binding data: {0!r}\n".format(cb_data))

            # check if it is sane
            self.assertIsNotNone(cb_data)
            if s.version() == 'TLSv1.3':
                self.assertEqual(len(cb_data), 48)
            else:
                self.assertEqual(len(cb_data), 12)  # True for TLSv1

            # and compare with the peers version
            s.write(b"CB tls-unique\n")
            peer_data_repr = s.read().strip()
            self.assertEqual(peer_data_repr,
                             repr(cb_data).encode("us-ascii"))

        # now, again
        with client_context.wrap_socket(
                socket.socket(),
                server_hostname=hostname) as s:
            s.connect((HOST, server.port))
            new_cb_data = s.get_channel_binding("tls-unique")
            if support.verbose:
                sys.stdout.write(
                    "got another channel binding data: {0!r}\n".format(
                        new_cb_data)
                )
            # is it really unique
            self.assertNotEqual(cb_data, new_cb_data)
            # Validate the data of *this* connection; checking cb_data
            # again would leave new_cb_data unverified.
            self.assertIsNotNone(new_cb_data)
            if s.version() == 'TLSv1.3':
                self.assertEqual(len(new_cb_data), 48)
            else:
                self.assertEqual(len(new_cb_data), 12)  # True for TLSv1
            s.write(b"CB tls-unique\n")
            peer_data_repr = s.read().strip()
            self.assertEqual(peer_data_repr,
                             repr(new_cb_data).encode("us-ascii"))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003744
def test_compression(self):
    """The reported compression method is None, 'ZLIB' or 'RLE'."""
    client_context, server_context, hostname = testing_context()
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    if support.verbose:
        sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
    allowed = {None, 'ZLIB', 'RLE'}
    self.assertIn(stats['compression'], allowed)
3753
@unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
                     "ssl.OP_NO_COMPRESSION needed for this test")
def test_compression_disabled(self):
    """With OP_NO_COMPRESSION on both sides no compression is reported."""
    client_context, server_context, hostname = testing_context()
    for ctx in (client_context, server_context):
        ctx.options |= ssl.OP_NO_COMPRESSION
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    self.assertIs(stats['compression'], None)
3764
def test_dh_params(self):
    """Check we can get a connection with ephemeral Diffie-Hellman.

    The server loads DH parameters and is restricted to "kEDH" ciphers;
    the negotiated cipher name must contain an ADH, EDH or DHE component.
    """
    client_context, server_context, hostname = testing_context()
    # test scenario needs TLS <= 1.2
    client_context.options |= ssl.OP_NO_TLSv1_3
    server_context.load_dh_params(DHFILE)
    server_context.set_ciphers("kEDH")
    server_context.options |= ssl.OP_NO_TLSv1_3
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    cipher = stats["cipher"][0]
    parts = cipher.split("-")
    if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
        # 'cipher' is already the cipher name; report it in full rather
        # than only its first character.
        self.fail("Non-DH cipher: " + cipher)
3780
@unittest.skipUnless(HAVE_SECP_CURVES, "needs secp384r1 curve support")
@unittest.skipIf(IS_OPENSSL_1_1_1, "TODO: Test doesn't work on 1.1.1")
def test_ecdh_curve(self):
    """Exercise set_ecdh_curve() on the server side, client side and both."""
    # server secp384r1, client auto
    client_context, server_context, hostname = testing_context()

    server_context.set_ecdh_curve("secp384r1")
    server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
    server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
    server_params_test(client_context, server_context,
                       chatty=True, connectionchatty=True,
                       sni_name=hostname)

    # server auto, client secp384r1
    client_context, server_context, hostname = testing_context()
    client_context.set_ecdh_curve("secp384r1")
    server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
    server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
    server_params_test(client_context, server_context,
                       chatty=True, connectionchatty=True,
                       sni_name=hostname)

    # server / client curve mismatch
    client_context, server_context, hostname = testing_context()
    client_context.set_ecdh_curve("prime256v1")
    server_context.set_ecdh_curve("secp384r1")
    server_context.set_ciphers("ECDHE:!eNULL:!aNULL")
    server_context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
    try:
        server_params_test(client_context, server_context,
                           chatty=True, connectionchatty=True,
                           sni_name=hostname)
    except ssl.SSLError:
        # The mismatch was rejected, as it should be.
        return
    # OpenSSL 1.0.2 does not fail although it should.
    if IS_OPENSSL_1_1_0:
        self.fail("mismatch curve did not fail")
3819
def test_selected_alpn_protocol(self):
    """selected_alpn_protocol() is None unless ALPN is used."""
    client_context, server_context, hostname = testing_context()
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    negotiated = stats['client_alpn_protocol']
    self.assertIs(negotiated, None)
3827
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
def test_selected_alpn_protocol_if_server_uses_alpn(self):
    """selected_alpn_protocol() is None unless ALPN is used by the client."""
    client_context, server_context, hostname = testing_context()
    # Only the server advertises protocols here.
    server_context.set_alpn_protocols(['foo', 'bar'])
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    negotiated = stats['client_alpn_protocol']
    self.assertIs(negotiated, None)
3837
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
def test_alpn_protocols(self):
    """ALPN negotiation picks the expected protocol on both sides."""
    server_protocols = ['foo', 'bar', 'milkshake']
    # (protocols offered by the client, expected negotiated protocol)
    protocol_tests = [
        (['foo', 'bar'], 'foo'),
        (['bar', 'foo'], 'foo'),
        (['milkshake'], 'milkshake'),
        (['http/3.0', 'http/4.0'], None)
    ]
    for client_protocols, expected in protocol_tests:
        client_context, server_context, hostname = testing_context()
        server_context.set_alpn_protocols(server_protocols)
        client_context.set_alpn_protocols(client_protocols)

        try:
            stats = server_params_test(client_context,
                                       server_context,
                                       chatty=True,
                                       connectionchatty=True,
                                       sni_name=hostname)
        except ssl.SSLError as e:
            # Keep the error; some OpenSSL versions are expected to fail.
            stats = e

        handshake_error_expected = (
            expected is None and IS_OPENSSL_1_1_0
            and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6))
        if handshake_error_expected:
            # OpenSSL 1.1.0 to 1.1.0e raises handshake error
            self.assertIsInstance(stats, ssl.SSLError)
            continue

        msg = "failed trying %s (s) and %s (c).\n" \
              "was expecting %s, but got %%s from the %%s" \
                  % (str(server_protocols), str(client_protocols),
                     str(expected))
        client_result = stats['client_alpn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))
        server_seen = stats['server_alpn_protocols']
        server_result = server_seen[-1] if len(server_seen) else 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
3877
def test_selected_npn_protocol(self):
    """selected_npn_protocol() is None unless NPN is used."""
    client_context, server_context, hostname = testing_context()
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True,
                               sni_name=hostname)
    negotiated = stats['client_npn_protocol']
    self.assertIs(negotiated, None)
3885
@unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
def test_npn_protocols(self):
    """NPN negotiation picks the expected protocol on both sides."""
    server_protocols = ['http/1.1', 'spdy/2']
    # (protocols offered by the client, expected negotiated protocol)
    protocol_tests = [
        (['http/1.1', 'spdy/2'], 'http/1.1'),
        (['spdy/2', 'http/1.1'], 'http/1.1'),
        (['spdy/2', 'test'], 'spdy/2'),
        (['abc', 'def'], 'abc')
    ]
    for client_protocols, expected in protocol_tests:
        client_context, server_context, hostname = testing_context()
        server_context.set_npn_protocols(server_protocols)
        client_context.set_npn_protocols(client_protocols)
        stats = server_params_test(client_context, server_context,
                                   chatty=True, connectionchatty=True,
                                   sni_name=hostname)
        msg = "failed trying %s (s) and %s (c).\n" \
              "was expecting %s, but got %%s from the %%s" \
                  % (str(server_protocols), str(client_protocols),
                     str(expected))
        client_result = stats['client_npn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))
        server_seen = stats['server_npn_protocols']
        server_result = server_seen[-1] if len(server_seen) else 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
3911
def sni_contexts(self):
    """Build the contexts used by the SNI tests.

    Returns a (server_context, other_context, client_context) tuple: two
    server contexts loaded with SIGNED_CERTFILE and SIGNED_CERTFILE2
    respectively, and a client context that trusts SIGNING_CA.
    """
    def make_server_context(certfile):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        ctx.load_cert_chain(certfile)
        return ctx

    server_context = make_server_context(SIGNED_CERTFILE)
    other_context = make_server_context(SIGNED_CERTFILE2)
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    client_context.load_verify_locations(SIGNING_CA)
    return server_context, other_context, client_context
3920
def check_common_name(self, stats, name):
    """Assert that the peer certificate in *stats* has commonName *name*.

    stats['peercert']['subject'] must contain the single-attribute RDN
    (('commonName', name),).
    """
    expected_rdn = (('commonName', name),)
    subject = stats['peercert']['subject']
    self.assertIn(expected_rdn, subject)
3924
@needs_sni
def test_sni_callback(self):
    """The servername callback sees the SNI name and can swap contexts."""
    seen = []
    server_context, other_context, client_context = self.sni_contexts()

    client_context.check_hostname = False

    def servername_cb(ssl_sock, server_name, initial_context):
        # Record the call; switch to the other context when a name is sent.
        seen.append((server_name, initial_context))
        if server_name is not None:
            ssl_sock.context = other_context
    server_context.set_servername_callback(servername_cb)

    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name='supermessage')
    # The hostname was fetched properly, and the certificate was
    # changed for the connection.
    self.assertEqual(seen, [("supermessage", server_context)])
    # The certificate of other_context was served.
    self.check_common_name(stats, 'fakehostname')

    del seen[:]
    # The callback is called with server_name=None
    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name=None)
    self.assertEqual(seen, [(None, server_context)])
    self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)

    # Check disabling the callback
    del seen[:]
    server_context.set_servername_callback(None)

    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name='notfunny')
    # Certificate didn't change
    self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
    self.assertEqual(seen, [])
3965
@needs_sni
def test_sni_callback_alert(self):
    # Returning a TLS alert is reflected to the connecting client
    server_context, _, client_context = self.sni_contexts()

    def cb_returning_alert(ssl_sock, server_name, initial_context):
        return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
    server_context.set_servername_callback(cb_returning_alert)

    # The handshake is expected to raise, so there is no return value to
    # keep; the call is made only for the exception it produces.
    with self.assertRaises(ssl.SSLError) as cm:
        server_params_test(client_context, server_context,
                           chatty=False,
                           sni_name='supermessage')
    self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003979
@needs_sni
def test_sni_callback_raising(self):
    # Raising fails the connection with a TLS handshake failure alert.
    server_context, _, client_context = self.sni_contexts()

    def cb_raising(ssl_sock, server_name, initial_context):
        1/0
    server_context.set_servername_callback(cb_raising)

    # stderr is captured so the test can check what was written there
    # while the handshake failed.  The call itself is expected to raise,
    # so its return value is not bound.
    with self.assertRaises(ssl.SSLError) as cm, \
            support.captured_stderr() as stderr:
        server_params_test(client_context, server_context,
                           chatty=False,
                           sni_name='supermessage')
    self.assertEqual(cm.exception.reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
    self.assertIn("ZeroDivisionError", stderr.getvalue())
Antoine Pitrou50b24d02013-04-11 20:48:42 +02003996
@needs_sni
def test_sni_callback_wrong_return_type(self):
    # Returning the wrong return type terminates the TLS connection
    # with an internal error alert.
    server_context, _, client_context = self.sni_contexts()

    def cb_wrong_return_type(ssl_sock, server_name, initial_context):
        return "foo"
    server_context.set_servername_callback(cb_wrong_return_type)

    # stderr is captured so the test can check what was written there
    # while the handshake failed.  The call itself is expected to raise,
    # so its return value is not bound.
    with self.assertRaises(ssl.SSLError) as cm, \
            support.captured_stderr() as stderr:
        server_params_test(client_context, server_context,
                           chatty=False,
                           sni_name='supermessage')
    self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
    self.assertIn("TypeError", stderr.getvalue())
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004014
def test_shared_ciphers(self):
    # Every cipher the server reports as shared must contain one of the
    # accepted name fragments listed below.
    client_context, server_context, hostname = testing_context()
    client_context.set_ciphers("AES128:AES256")
    server_context.set_ciphers("AES256")
    accepted = (
        "AES256", "AES-256",
        # TLS 1.3 ciphers are always enabled
        "TLS_CHACHA20", "TLS_AES",
    )

    result = server_params_test(client_context, server_context,
                                sni_name=hostname)
    shared = result['server_shared_ciphers'][0]
    self.assertGreater(len(shared), 0)

    # Lazily look for the first cipher name matching none of the
    # fragments; the test fails with that name as the message.
    offenders = (
        cipher_name
        for cipher_name, _tls_version, _bits in shared
        if all(fragment not in cipher_name for fragment in accepted)
    )
    first_offender = next(offenders, None)
    if first_offender is not None:
        self.fail(first_offender)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004032
def test_read_write_after_close_raises_valuerror(self):
    # read() and write() on an SSL socket that was connected and then
    # closed must both raise ValueError.
    client_context, server_context, hostname = testing_context()
    server = ThreadedEchoServer(context=server_context, chatty=False)

    with server:
        sock = client_context.wrap_socket(socket.socket(),
                                          server_hostname=hostname)
        sock.connect((HOST, server.port))
        sock.close()

        with self.assertRaises(ValueError):
            sock.read(1024)
        with self.assertRaises(ValueError):
            sock.write(b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01004045
def test_sendfile(self):
    # Write a small file, push it through an SSL socket with sendfile()
    # and expect to receive the very same bytes back from the server.
    payload = b"x" * 512
    with open(support.TESTFN, 'wb') as out:
        out.write(payload)
    self.addCleanup(support.unlink, support.TESTFN)

    # A single context is used for both ends of the connection.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(SIGNING_CA)
    context.load_cert_chain(SIGNED_CERTFILE)

    server = ThreadedEchoServer(context=context, chatty=False)
    with server, context.wrap_socket(socket.socket()) as sock:
        sock.connect((HOST, server.port))
        with open(support.TESTFN, 'rb') as src:
            sock.sendfile(src)
            self.assertEqual(sock.recv(1024), payload)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02004062
def test_session(self):
    # Perform four handshakes against the same server context,
    # alternating between a fresh session and reuse of the first one,
    # and check the session objects and the server-side counters after
    # each handshake.
    client_context, server_context, hostname = testing_context()
    # TODO: sessions aren't compatible with TLSv1.3 yet
    client_context.options |= ssl.OP_NO_TLSv1_3

    def handshake(**extra):
        return server_params_test(client_context, server_context,
                                  sni_name=hostname, **extra)

    def check_counters(accept, hits):
        counters = server_context.session_stats()
        self.assertEqual(counters['accept'], accept)
        self.assertEqual(counters['hits'], hits)

    # first connection without session
    first = handshake()
    session = first['session']
    self.assertTrue(session.id)
    for field in ('time', 'timeout'):
        self.assertGreater(getattr(session, field), 0)
    self.assertTrue(session.has_ticket)
    if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
        self.assertGreater(session.ticket_lifetime_hint, 0)
    self.assertFalse(first['session_reused'])
    check_counters(accept=1, hits=0)

    # reuse session
    second = handshake(session=session)
    check_counters(accept=2, hits=1)
    self.assertTrue(second['session_reused'])
    session2 = second['session']
    self.assertEqual(session2.id, session.id)
    # Equal to the original session, yet a distinct object.
    self.assertEqual(session2, session)
    self.assertIsNot(session2, session)
    self.assertGreaterEqual(session2.time, session.time)
    self.assertGreaterEqual(session2.timeout, session.timeout)

    # another one without session
    third = handshake()
    self.assertFalse(third['session_reused'])
    session3 = third['session']
    self.assertNotEqual(session3.id, session.id)
    self.assertNotEqual(session3, session)
    check_counters(accept=3, hits=1)

    # reuse session again
    fourth = handshake(session=session)
    self.assertTrue(fourth['session_reused'])
    session4 = fourth['session']
    self.assertEqual(session4.id, session.id)
    self.assertEqual(session4, session)
    self.assertGreaterEqual(session4.time, session.time)
    self.assertGreaterEqual(session4.timeout, session.timeout)
    check_counters(accept=4, hits=2)
Christian Heimes99a65702016-09-10 23:44:53 +02004120
def test_session_handling(self):
    # Check when the session attribute of a client socket can be read
    # and when it can be assigned.
    client_context, server_context, hostname = testing_context()
    client_context2, _, _ = testing_context()

    # TODO: session reuse does not work with TLSv1.3
    client_context.options |= ssl.OP_NO_TLSv1_3
    client_context2.options |= ssl.OP_NO_TLSv1_3

    def wrap(ctx):
        # Fresh, unconnected client socket bound to the given context.
        return ctx.wrap_socket(socket.socket(), server_hostname=hostname)

    server = ThreadedEchoServer(context=server_context, chatty=False)
    with server:
        with wrap(client_context) as sock:
            # session is None before handshake
            self.assertEqual(sock.session, None)
            self.assertEqual(sock.session_reused, None)
            sock.connect((HOST, server.port))
            session = sock.session
            self.assertTrue(session)
            with self.assertRaises(TypeError) as caught:
                sock.session = object
            self.assertEqual(str(caught.exception),
                             'Value is not a SSLSession.')

        with wrap(client_context) as sock:
            sock.connect((HOST, server.port))
            # cannot set session after handshake
            with self.assertRaises(ValueError) as caught:
                sock.session = session
            self.assertEqual(str(caught.exception),
                             'Cannot set session after handshake.')

        with wrap(client_context) as sock:
            # can set session before handshake and before the
            # connection was established
            sock.session = session
            sock.connect((HOST, server.port))
            self.assertEqual(sock.session.id, session.id)
            self.assertEqual(sock.session, session)
            self.assertEqual(sock.session_reused, True)

        with wrap(client_context2) as sock:
            # cannot re-use session with a different SSLContext
            # Both statements stay inside the assertRaises block, so the
            # ValueError is accepted from either the assignment or the
            # connect call.
            with self.assertRaises(ValueError) as caught:
                sock.session = session
                sock.connect((HOST, server.port))
            self.assertEqual(str(caught.exception),
                             'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02004170
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01004171
@unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
class TestPostHandshakeAuth(unittest.TestCase):
    # Tests for post-handshake authentication (PHA).  Apart from the
    # setter test, every test connects to a ThreadedEchoServer and drives
    # it with the commands b'HASCERT', b'PHA' and b'GETCERT'.

    def _ask(self, sock, command, expected):
        # Send *command* and require the next read to equal *expected*.
        sock.write(command)
        self.assertEqual(sock.recv(1024), expected)

    def _check_ctx(self, ctx, pha, verify_mode=None):
        # Assert the context's verify_mode (when given) and then its
        # post_handshake_auth flag.
        if verify_mode is not None:
            self.assertEqual(ctx.verify_mode, verify_mode)
        self.assertEqual(ctx.post_handshake_auth, pha)

    def test_pha_setter(self):
        # post_handshake_auth and verify_mode can be changed
        # independently of each other on every kind of TLS context.
        for protocol in (ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS_SERVER,
                         ssl.PROTOCOL_TLS_CLIENT):
            ctx = ssl.SSLContext(protocol)
            self._check_ctx(ctx, pha=False)

            ctx.post_handshake_auth = True
            self._check_ctx(ctx, pha=True)

            ctx.verify_mode = ssl.CERT_REQUIRED
            self._check_ctx(ctx, pha=True, verify_mode=ssl.CERT_REQUIRED)

            ctx.post_handshake_auth = False
            self._check_ctx(ctx, pha=False, verify_mode=ssl.CERT_REQUIRED)

            ctx.verify_mode = ssl.CERT_OPTIONAL
            ctx.post_handshake_auth = True
            self._check_ctx(ctx, pha=True, verify_mode=ssl.CERT_OPTIONAL)

    def test_pha_required(self):
        client_context, server_context, hostname = testing_context()
        server_context.post_handshake_auth = True
        server_context.verify_mode = ssl.CERT_REQUIRED
        client_context.post_handshake_auth = True
        client_context.load_cert_chain(SIGNED_CERTFILE)

        server = ThreadedEchoServer(context=server_context, chatty=False)
        with server, client_context.wrap_socket(
                socket.socket(), server_hostname=hostname) as sock:
            sock.connect((HOST, server.port))
            self._ask(sock, b'HASCERT', b'FALSE\n')
            self._ask(sock, b'PHA', b'OK\n')
            self._ask(sock, b'HASCERT', b'TRUE\n')
            # PHA method just returns true when cert is already available
            self._ask(sock, b'PHA', b'OK\n')
            sock.write(b'GETCERT')
            cert_text = sock.recv(4096).decode('us-ascii')
            self.assertIn('Python Software Foundation CA', cert_text)

    def test_pha_required_nocert(self):
        client_context, server_context, hostname = testing_context()
        server_context.post_handshake_auth = True
        server_context.verify_mode = ssl.CERT_REQUIRED
        client_context.post_handshake_auth = True

        server = ThreadedEchoServer(context=server_context, chatty=False)
        with server, client_context.wrap_socket(
                socket.socket(), server_hostname=hostname) as sock:
            sock.connect((HOST, server.port))
            # receive CertificateRequest
            self._ask(sock, b'PHA', b'OK\n')
            # send empty Certificate + Finish
            sock.write(b'HASCERT')
            # receive alert
            with self.assertRaisesRegex(
                    ssl.SSLError,
                    'tlsv13 alert certificate required'):
                sock.recv(1024)

    def test_pha_optional(self):
        if support.verbose:
            sys.stdout.write("\n")

        client_context, server_context, hostname = testing_context()
        server_context.post_handshake_auth = True
        server_context.verify_mode = ssl.CERT_REQUIRED
        client_context.post_handshake_auth = True
        client_context.load_cert_chain(SIGNED_CERTFILE)

        # check CERT_OPTIONAL
        server_context.verify_mode = ssl.CERT_OPTIONAL
        server = ThreadedEchoServer(context=server_context, chatty=False)
        with server, client_context.wrap_socket(
                socket.socket(), server_hostname=hostname) as sock:
            sock.connect((HOST, server.port))
            self._ask(sock, b'HASCERT', b'FALSE\n')
            self._ask(sock, b'PHA', b'OK\n')
            self._ask(sock, b'HASCERT', b'TRUE\n')

    def test_pha_optional_nocert(self):
        if support.verbose:
            sys.stdout.write("\n")

        client_context, server_context, hostname = testing_context()
        server_context.post_handshake_auth = True
        server_context.verify_mode = ssl.CERT_OPTIONAL
        client_context.post_handshake_auth = True

        server = ThreadedEchoServer(context=server_context, chatty=False)
        with server, client_context.wrap_socket(
                socket.socket(), server_hostname=hostname) as sock:
            sock.connect((HOST, server.port))
            self._ask(sock, b'HASCERT', b'FALSE\n')
            self._ask(sock, b'PHA', b'OK\n')
            # optional doesn't fail when client does not have a cert
            self._ask(sock, b'HASCERT', b'FALSE\n')

    def test_pha_no_pha_client(self):
        # The client context never enables post_handshake_auth here.
        client_context, server_context, hostname = testing_context()
        server_context.post_handshake_auth = True
        server_context.verify_mode = ssl.CERT_REQUIRED
        client_context.load_cert_chain(SIGNED_CERTFILE)

        server = ThreadedEchoServer(context=server_context, chatty=False)
        with server, client_context.wrap_socket(
                socket.socket(), server_hostname=hostname) as sock:
            sock.connect((HOST, server.port))
            with self.assertRaisesRegex(ssl.SSLError, 'not server'):
                sock.verify_client_post_handshake()
            sock.write(b'PHA')
            self.assertIn(b'extension not received', sock.recv(1024))

    def test_pha_no_pha_server(self):
        # server doesn't have PHA enabled, cert is requested in handshake
        client_context, server_context, hostname = testing_context()
        server_context.verify_mode = ssl.CERT_REQUIRED
        client_context.post_handshake_auth = True
        client_context.load_cert_chain(SIGNED_CERTFILE)

        server = ThreadedEchoServer(context=server_context, chatty=False)
        with server, client_context.wrap_socket(
                socket.socket(), server_hostname=hostname) as sock:
            sock.connect((HOST, server.port))
            self._ask(sock, b'HASCERT', b'TRUE\n')
            # PHA doesn't fail if there is already a cert
            self._ask(sock, b'PHA', b'OK\n')
            self._ask(sock, b'HASCERT', b'TRUE\n')

    def test_pha_not_tls13(self):
        # TLS 1.2
        client_context, server_context, hostname = testing_context()
        server_context.verify_mode = ssl.CERT_REQUIRED
        client_context.maximum_version = ssl.TLSVersion.TLSv1_2
        client_context.post_handshake_auth = True
        client_context.load_cert_chain(SIGNED_CERTFILE)

        server = ThreadedEchoServer(context=server_context, chatty=False)
        with server, client_context.wrap_socket(
                socket.socket(), server_hostname=hostname) as sock:
            sock.connect((HOST, server.port))
            # PHA fails for TLS != 1.3
            sock.write(b'PHA')
            self.assertIn(b'WRONG_SSL_VERSION', sock.recv(1024))
4343
4344
def test_main(verbose=False):
    """Run the test_ssl test classes through test.support.

    The *verbose* parameter is kept for backward compatibility but is not
    consulted; the diagnostic banner is controlled by ``support.verbose``.

    Raises support.TestFailed if any of the certificate files listed
    below does not exist.
    """
    if support.verbose:
        plats = {
            'Mac': platform.mac_ver,
            'Windows': platform.win32_ver,
        }
        # Use the first platform-specific version probe that reports a
        # non-empty first field; otherwise fall back to the generic
        # platform string.
        for name, func in plats.items():
            plat = func()
            if plat and plat[0]:
                plat = '%s %r' % (name, plat)
                break
        else:
            plat = repr(platform.platform())
        print("test_ssl: testing with %r %r" %
            (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
        print(" under %s" % plat)
        print(" HAS_SNI = %r" % ssl.HAS_SNI)
        # Zero-pad the hex value so the output never contains blanks
        # between the "0x" prefix and the digits.
        print(" OP_ALL = 0x%08x" % ssl.OP_ALL)
        try:
            print(" OP_NO_TLSv1_1 = 0x%08x" % ssl.OP_NO_TLSv1_1)
        except AttributeError:
            # The constant is not defined by every ssl build.
            pass

    # Fail early, with a clear message, when a fixture file is missing.
    for filename in [
            CERTFILE, BYTES_CERTFILE,
            ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
            SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
            BADCERT, BADKEY, EMPTYCERT]:
        if not os.path.exists(filename):
            raise support.TestFailed("Can't read certificate file %r" % filename)

    tests = [
        ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
        SSLObjectTests, SimpleBackgroundTests, ThreadedTests,
        TestPostHandshakeAuth
    ]

    # Tests that need real network access are opt-in.
    if support.is_resource_enabled('network'):
        tests.append(NetworkedTests)

    thread_info = support.threading_setup()
    try:
        support.run_unittest(*tests)
    finally:
        support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00004391
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()