blob: 523322da2f60ed4bebe6d9f79cd8fba08e3c179e [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00005from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00006import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00007import select
Thomas Woutersed03b412007-08-28 21:37:11 +00008import time
Christian Heimes9424bb42013-06-17 15:32:57 +02009import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000010import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000011import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000012import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000013import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000014import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020015import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000016import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000017import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000018import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000019import platform
Antoine Pitrou23df4832010-08-04 17:14:06 +000020import functools
Christian Heimes888bbdc2017-09-07 14:18:21 -070021try:
22 import ctypes
23except ImportError:
24 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000025
Antoine Pitrou05d936d2010-10-13 11:38:36 +000026ssl = support.import_module("ssl")
27
Martin Panter3840b2a2016-03-27 01:53:46 +000028
Antoine Pitrou2463e5f2013-03-28 22:24:43 +010029PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
Benjamin Petersonee8712c2008-05-20 21:35:26 +000030HOST = support.HOST
Christian Heimes598894f2016-09-05 23:19:05 +020031IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
32IS_OPENSSL_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
33
Antoine Pitrou152efa22010-05-16 18:19:27 +000034
Christian Heimesefff7062013-11-21 03:35:02 +010035def data_file(*name):
36 return os.path.join(os.path.dirname(__file__), *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000037
Antoine Pitrou81564092010-10-08 23:06:24 +000038# The custom key and certificate files used in test_ssl are generated
39# using Lib/test/make_ssl_certs.py.
40# Other certificates are simply fetched from the Internet servers they
41# are meant to authenticate.
42
Antoine Pitrou152efa22010-05-16 18:19:27 +000043CERTFILE = data_file("keycert.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000044BYTES_CERTFILE = os.fsencode(CERTFILE)
Antoine Pitrou152efa22010-05-16 18:19:27 +000045ONLYCERT = data_file("ssl_cert.pem")
46ONLYKEY = data_file("ssl_key.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000047BYTES_ONLYCERT = os.fsencode(ONLYCERT)
48BYTES_ONLYKEY = os.fsencode(ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +020049CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
50ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
51KEY_PASSWORD = "somepass"
Antoine Pitrou152efa22010-05-16 18:19:27 +000052CAPATH = data_file("capath")
Victor Stinner313a1202010-06-11 23:56:51 +000053BYTES_CAPATH = os.fsencode(CAPATH)
Christian Heimesefff7062013-11-21 03:35:02 +010054CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
55CAFILE_CACERT = data_file("capath", "5ed36f99.0")
56
Antoine Pitrou152efa22010-05-16 18:19:27 +000057
Christian Heimes22587792013-11-21 23:56:13 +010058# empty CRL
59CRLFILE = data_file("revocation.crl")
60
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010061# Two keys and certs signed by the same CA (for SNI tests)
62SIGNED_CERTFILE = data_file("keycert3.pem")
63SIGNED_CERTFILE2 = data_file("keycert4.pem")
Martin Panter3840b2a2016-03-27 01:53:46 +000064# Same certificate as pycacert.pem, but without extra text in file
65SIGNING_CA = data_file("capath", "ceff1710.0")
Christian Heimes1c03abd2016-09-06 23:25:35 +020066# cert with all kinds of subject alt names
67ALLSANFILE = data_file("allsans.pem")
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010068
Martin Panter3d81d932016-01-14 09:36:00 +000069REMOTE_HOST = "self-signed.pythontest.net"
Antoine Pitrou152efa22010-05-16 18:19:27 +000070
71EMPTYCERT = data_file("nullcert.pem")
72BADCERT = data_file("badcert.pem")
Martin Panter407b62f2016-01-30 03:41:43 +000073NONEXISTINGCERT = data_file("XXXnonexisting.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +000074BADKEY = data_file("badkey.pem")
Antoine Pitroud8c347a2011-10-01 19:20:25 +020075NOKIACERT = data_file("nokia.pem")
Christian Heimes824f7f32013-08-17 00:54:47 +020076NULLBYTECERT = data_file("nullbytecert.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +000077
Benjamin Petersona7eaf562015-04-02 00:04:06 -040078DHFILE = data_file("dh1024.pem")
Antoine Pitrou0e576f12011-12-22 10:03:38 +010079BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +000080
Christian Heimes358cfd42016-09-10 22:43:48 +020081# Not defined in all versions of OpenSSL
82OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
83OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
84OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
85OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
86
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010087
Thomas Woutersed03b412007-08-28 21:37:11 +000088def handle_error(prefix):
89 exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
Benjamin Petersonee8712c2008-05-20 21:35:26 +000090 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +000091 sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +000092
Antoine Pitroub5218772010-05-21 09:56:06 +000093def can_clear_options():
94 # 0.9.8m or higher
Antoine Pitroub9ac25d2011-07-08 18:47:06 +020095 return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
Antoine Pitroub5218772010-05-21 09:56:06 +000096
97def no_sslv2_implies_sslv3_hello():
98 # 0.9.7h or higher
99 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
100
Christian Heimes2427b502013-11-23 11:24:32 +0100101def have_verify_flags():
102 # 0.9.8 or higher
103 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
104
Antoine Pitrouc695c952014-04-28 20:57:36 +0200105def utc_offset(): #NOTE: ignore issues like #1647654
106 # local time = utc time + utc offset
107 if time.daylight and time.localtime().tm_isdst > 0:
108 return -time.altzone # seconds
109 return -time.timezone
110
Christian Heimes9424bb42013-06-17 15:32:57 +0200111def asn1time(cert_time):
112 # Some versions of OpenSSL ignore seconds, see #18207
113 # 0.9.8.i
114 if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
115 fmt = "%b %d %H:%M:%S %Y GMT"
116 dt = datetime.datetime.strptime(cert_time, fmt)
117 dt = dt.replace(second=0)
118 cert_time = dt.strftime(fmt)
119 # %d adds leading zero but ASN1_TIME_print() uses leading space
120 if cert_time[4] == "0":
121 cert_time = cert_time[:4] + " " + cert_time[5:]
122
123 return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000124
Antoine Pitrou23df4832010-08-04 17:14:06 +0000125# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
126def skip_if_broken_ubuntu_ssl(func):
Victor Stinner3de49192011-05-09 00:42:58 +0200127 if hasattr(ssl, 'PROTOCOL_SSLv2'):
128 @functools.wraps(func)
129 def f(*args, **kwargs):
130 try:
131 ssl.SSLContext(ssl.PROTOCOL_SSLv2)
132 except ssl.SSLError:
133 if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
134 platform.linux_distribution() == ('debian', 'squeeze/sid', '')):
135 raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
136 return func(*args, **kwargs)
137 return f
138 else:
139 return func
Antoine Pitrou23df4832010-08-04 17:14:06 +0000140
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100141needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
142
Antoine Pitrou23df4832010-08-04 17:14:06 +0000143
Christian Heimesd0486372016-09-10 23:23:33 +0200144def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
145 cert_reqs=ssl.CERT_NONE, ca_certs=None,
146 ciphers=None, certfile=None, keyfile=None,
147 **kwargs):
148 context = ssl.SSLContext(ssl_version)
149 if cert_reqs is not None:
150 context.verify_mode = cert_reqs
151 if ca_certs is not None:
152 context.load_verify_locations(ca_certs)
153 if certfile is not None or keyfile is not None:
154 context.load_cert_chain(certfile, keyfile)
155 if ciphers is not None:
156 context.set_ciphers(ciphers)
157 return context.wrap_socket(sock, **kwargs)
158
Antoine Pitrou152efa22010-05-16 18:19:27 +0000159class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000160
Antoine Pitrou480a1242010-04-28 21:37:09 +0000161 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000162 ssl.CERT_NONE
163 ssl.CERT_OPTIONAL
164 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100165 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100166 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100167 if ssl.HAS_ECDH:
168 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100169 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
170 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000171 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100172 self.assertIn(ssl.HAS_ECDH, {True, False})
Christian Heimescb5b68a2017-09-07 18:07:00 -0700173 ssl.OP_NO_SSLv2
174 ssl.OP_NO_SSLv3
175 ssl.OP_NO_TLSv1
176 ssl.OP_NO_TLSv1_3
177 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
178 ssl.OP_NO_TLSv1_1
179 ssl.OP_NO_TLSv1_2
Thomas Woutersed03b412007-08-28 21:37:11 +0000180
Antoine Pitrou172f0252014-04-18 20:33:08 +0200181 def test_str_for_enums(self):
182 # Make sure that the PROTOCOL_* constants have enum-like string
183 # reprs.
Christian Heimes598894f2016-09-05 23:19:05 +0200184 proto = ssl.PROTOCOL_TLS
185 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200186 ctx = ssl.SSLContext(proto)
187 self.assertIs(ctx.protocol, proto)
188
Antoine Pitrou480a1242010-04-28 21:37:09 +0000189 def test_random(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000190 v = ssl.RAND_status()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000191 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000192 sys.stdout.write("\n RAND_status is %d (%s)\n"
193 % (v, (v and "sufficient randomness") or
194 "insufficient randomness"))
Victor Stinner99c8b162011-05-24 12:05:19 +0200195
196 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
197 self.assertEqual(len(data), 16)
198 self.assertEqual(is_cryptographic, v == 1)
199 if v:
200 data = ssl.RAND_bytes(16)
201 self.assertEqual(len(data), 16)
Victor Stinner2e2baa92011-05-25 11:15:16 +0200202 else:
203 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
Victor Stinner99c8b162011-05-24 12:05:19 +0200204
Victor Stinner1e81a392013-12-19 16:47:04 +0100205 # negative num is invalid
206 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
207 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
208
Victor Stinnerbeeb5122014-11-28 13:28:25 +0100209 if hasattr(ssl, 'RAND_egd'):
210 self.assertRaises(TypeError, ssl.RAND_egd, 1)
211 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000212 ssl.RAND_add("this is a random string", 75.0)
Serhiy Storchaka8490f5a2015-03-20 09:00:36 +0200213 ssl.RAND_add(b"this is a random bytes object", 75.0)
214 ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000215
Christian Heimesf77b4b22013-08-21 13:26:05 +0200216 @unittest.skipUnless(os.name == 'posix', 'requires posix')
217 def test_random_fork(self):
218 status = ssl.RAND_status()
219 if not status:
220 self.fail("OpenSSL's PRNG has insufficient randomness")
221
222 rfd, wfd = os.pipe()
223 pid = os.fork()
224 if pid == 0:
225 try:
226 os.close(rfd)
227 child_random = ssl.RAND_pseudo_bytes(16)[0]
228 self.assertEqual(len(child_random), 16)
229 os.write(wfd, child_random)
230 os.close(wfd)
231 except BaseException:
232 os._exit(1)
233 else:
234 os._exit(0)
235 else:
236 os.close(wfd)
237 self.addCleanup(os.close, rfd)
238 _, status = os.waitpid(pid, 0)
239 self.assertEqual(status, 0)
240
241 child_random = os.read(rfd, 16)
242 self.assertEqual(len(child_random), 16)
243 parent_random = ssl.RAND_pseudo_bytes(16)[0]
244 self.assertEqual(len(parent_random), 16)
245
246 self.assertNotEqual(child_random, parent_random)
247
Antoine Pitrou480a1242010-04-28 21:37:09 +0000248 def test_parse_cert(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000249 # note that this uses an 'unofficial' function in _ssl.c,
250 # provided solely for this test, to exercise the certificate
251 # parsing code
Antoine Pitroufb046912010-11-09 20:21:19 +0000252 p = ssl._ssl._test_decode_cert(CERTFILE)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000253 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000254 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200255 self.assertEqual(p['issuer'],
256 ((('countryName', 'XY'),),
257 (('localityName', 'Castle Anthrax'),),
258 (('organizationName', 'Python Software Foundation'),),
259 (('commonName', 'localhost'),))
260 )
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100261 # Note the next three asserts will fail if the keys are regenerated
Christian Heimes9424bb42013-06-17 15:32:57 +0200262 self.assertEqual(p['notAfter'], asn1time('Oct 5 23:01:56 2020 GMT'))
263 self.assertEqual(p['notBefore'], asn1time('Oct 8 23:01:56 2010 GMT'))
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200264 self.assertEqual(p['serialNumber'], 'D7C7381919AFC24E')
265 self.assertEqual(p['subject'],
266 ((('countryName', 'XY'),),
267 (('localityName', 'Castle Anthrax'),),
268 (('organizationName', 'Python Software Foundation'),),
269 (('commonName', 'localhost'),))
270 )
271 self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),))
272 # Issue #13034: the subjectAltName in some certificates
273 # (notably projects.developer.nokia.com:443) wasn't parsed
274 p = ssl._ssl._test_decode_cert(NOKIACERT)
275 if support.verbose:
276 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
277 self.assertEqual(p['subjectAltName'],
278 (('DNS', 'projects.developer.nokia.com'),
279 ('DNS', 'projects.forum.nokia.com'))
280 )
Christian Heimesbd3a7f92013-11-21 03:40:15 +0100281 # extra OCSP and AIA fields
282 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
283 self.assertEqual(p['caIssuers'],
284 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
285 self.assertEqual(p['crlDistributionPoints'],
286 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000287
Christian Heimes824f7f32013-08-17 00:54:47 +0200288 def test_parse_cert_CVE_2013_4238(self):
289 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
290 if support.verbose:
291 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
292 subject = ((('countryName', 'US'),),
293 (('stateOrProvinceName', 'Oregon'),),
294 (('localityName', 'Beaverton'),),
295 (('organizationName', 'Python Software Foundation'),),
296 (('organizationalUnitName', 'Python Core Development'),),
297 (('commonName', 'null.python.org\x00example.org'),),
298 (('emailAddress', 'python-dev@python.org'),))
299 self.assertEqual(p['subject'], subject)
300 self.assertEqual(p['issuer'], subject)
Christian Heimes157c9832013-08-25 14:12:41 +0200301 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
302 san = (('DNS', 'altnull.python.org\x00example.com'),
303 ('email', 'null@python.org\x00user@example.org'),
304 ('URI', 'http://null.python.org\x00http://example.org'),
305 ('IP Address', '192.0.2.1'),
306 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
307 else:
308 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
309 san = (('DNS', 'altnull.python.org\x00example.com'),
310 ('email', 'null@python.org\x00user@example.org'),
311 ('URI', 'http://null.python.org\x00http://example.org'),
312 ('IP Address', '192.0.2.1'),
313 ('IP Address', '<invalid>'))
314
315 self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200316
Christian Heimes1c03abd2016-09-06 23:25:35 +0200317 def test_parse_all_sans(self):
318 p = ssl._ssl._test_decode_cert(ALLSANFILE)
319 self.assertEqual(p['subjectAltName'],
320 (
321 ('DNS', 'allsans'),
322 ('othername', '<unsupported>'),
323 ('othername', '<unsupported>'),
324 ('email', 'user@example.org'),
325 ('DNS', 'www.example.org'),
326 ('DirName',
327 ((('countryName', 'XY'),),
328 (('localityName', 'Castle Anthrax'),),
329 (('organizationName', 'Python Software Foundation'),),
330 (('commonName', 'dirname example'),))),
331 ('URI', 'https://www.python.org/'),
332 ('IP Address', '127.0.0.1'),
333 ('IP Address', '0:0:0:0:0:0:0:1\n'),
334 ('Registered ID', '1.2.3.4.5')
335 )
336 )
337
Antoine Pitrou480a1242010-04-28 21:37:09 +0000338 def test_DER_to_PEM(self):
Martin Panter3d81d932016-01-14 09:36:00 +0000339 with open(CAFILE_CACERT, 'r') as f:
Antoine Pitrou480a1242010-04-28 21:37:09 +0000340 pem = f.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000341 d1 = ssl.PEM_cert_to_DER_cert(pem)
342 p2 = ssl.DER_cert_to_PEM_cert(d1)
343 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitrou18c913e2010-04-27 10:59:39 +0000344 self.assertEqual(d1, d2)
Antoine Pitrou9bfbe612010-04-27 22:08:08 +0000345 if not p2.startswith(ssl.PEM_HEADER + '\n'):
346 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
347 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
348 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000349
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000350 def test_openssl_version(self):
351 n = ssl.OPENSSL_VERSION_NUMBER
352 t = ssl.OPENSSL_VERSION_INFO
353 s = ssl.OPENSSL_VERSION
354 self.assertIsInstance(n, int)
355 self.assertIsInstance(t, tuple)
356 self.assertIsInstance(s, str)
357 # Some sanity checks follow
358 # >= 0.9
359 self.assertGreaterEqual(n, 0x900000)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400360 # < 3.0
361 self.assertLess(n, 0x30000000)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000362 major, minor, fix, patch, status = t
363 self.assertGreaterEqual(major, 0)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400364 self.assertLess(major, 3)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000365 self.assertGreaterEqual(minor, 0)
366 self.assertLess(minor, 256)
367 self.assertGreaterEqual(fix, 0)
368 self.assertLess(fix, 256)
369 self.assertGreaterEqual(patch, 0)
Ned Deily05784a72015-02-05 17:20:13 +1100370 self.assertLessEqual(patch, 63)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000371 self.assertGreaterEqual(status, 0)
372 self.assertLessEqual(status, 15)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400373 # Version string as returned by {Open,Libre}SSL, the format might change
Christian Heimes598894f2016-09-05 23:19:05 +0200374 if IS_LIBRESSL:
375 self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
Victor Stinner789b8052015-01-06 11:51:06 +0100376 (s, t, hex(n)))
Antoine Pitroudfab9352014-07-21 18:35:01 -0400377 else:
378 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
Victor Stinner789b8052015-01-06 11:51:06 +0100379 (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000380
Antoine Pitrou9d543662010-04-23 23:10:32 +0000381 @support.cpython_only
382 def test_refcycle(self):
383 # Issue #7943: an SSL object doesn't create reference cycles with
384 # itself.
385 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200386 ss = test_wrap_socket(s)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000387 wr = weakref.ref(ss)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100388 with support.check_warnings(("", ResourceWarning)):
389 del ss
Victor Stinnere0b75b72016-03-21 17:26:04 +0100390 self.assertEqual(wr(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000391
Antoine Pitroua468adc2010-09-14 14:43:44 +0000392 def test_wrapped_unconnected(self):
393 # Methods on an unconnected SSLSocket propagate the original
Andrew Svetlov0832af62012-12-18 23:10:48 +0200394 # OSError raise by the underlying socket object.
Antoine Pitroua468adc2010-09-14 14:43:44 +0000395 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200396 with test_wrap_socket(s) as ss:
Antoine Pitroue9bb4732013-01-12 21:56:56 +0100397 self.assertRaises(OSError, ss.recv, 1)
398 self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
399 self.assertRaises(OSError, ss.recvfrom, 1)
400 self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
401 self.assertRaises(OSError, ss.send, b'x')
402 self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
Antoine Pitroua468adc2010-09-14 14:43:44 +0000403
Antoine Pitrou40f08742010-04-24 22:04:40 +0000404 def test_timeout(self):
405 # Issue #8524: when creating an SSL socket, the timeout of the
406 # original socket should be retained.
407 for timeout in (None, 0.0, 5.0):
408 s = socket.socket(socket.AF_INET)
409 s.settimeout(timeout)
Christian Heimesd0486372016-09-10 23:23:33 +0200410 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100411 self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000412
Christian Heimesd0486372016-09-10 23:23:33 +0200413 def test_errors_sslwrap(self):
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000414 sock = socket.socket()
Ezio Melottied3a7d22010-12-01 02:32:32 +0000415 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000416 "certfile must be specified",
417 ssl.wrap_socket, sock, keyfile=CERTFILE)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000418 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000419 "certfile must be specified for server-side operations",
420 ssl.wrap_socket, sock, server_side=True)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000421 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000422 "certfile must be specified for server-side operations",
Christian Heimesd0486372016-09-10 23:23:33 +0200423 ssl.wrap_socket, sock, server_side=True, certfile="")
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100424 with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
425 self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
Christian Heimesd0486372016-09-10 23:23:33 +0200426 s.connect, (HOST, 8080))
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200427 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000428 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000429 ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000430 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200431 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000432 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000433 ssl.wrap_socket(sock,
434 certfile=CERTFILE, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000435 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200436 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000437 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000438 ssl.wrap_socket(sock,
439 certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000440 self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000441
Martin Panter3464ea22016-02-01 21:58:11 +0000442 def bad_cert_test(self, certfile):
443 """Check that trying to use the given client certificate fails"""
444 certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
445 certfile)
446 sock = socket.socket()
447 self.addCleanup(sock.close)
448 with self.assertRaises(ssl.SSLError):
Christian Heimesd0486372016-09-10 23:23:33 +0200449 test_wrap_socket(sock,
Martin Panter3464ea22016-02-01 21:58:11 +0000450 certfile=certfile,
451 ssl_version=ssl.PROTOCOL_TLSv1)
452
453 def test_empty_cert(self):
454 """Wrapping with an empty cert file"""
455 self.bad_cert_test("nullcert.pem")
456
457 def test_malformed_cert(self):
458 """Wrapping with a badly formatted certificate (syntax error)"""
459 self.bad_cert_test("badcert.pem")
460
461 def test_malformed_key(self):
462 """Wrapping with a badly formatted key (syntax error)"""
463 self.bad_cert_test("badkey.pem")
464
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000465 def test_match_hostname(self):
466 def ok(cert, hostname):
467 ssl.match_hostname(cert, hostname)
468 def fail(cert, hostname):
469 self.assertRaises(ssl.CertificateError,
470 ssl.match_hostname, cert, hostname)
471
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100472 # -- Hostname matching --
473
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000474 cert = {'subject': ((('commonName', 'example.com'),),)}
475 ok(cert, 'example.com')
476 ok(cert, 'ExAmple.cOm')
477 fail(cert, 'www.example.com')
478 fail(cert, '.example.com')
479 fail(cert, 'example.org')
480 fail(cert, 'exampleXcom')
481
482 cert = {'subject': ((('commonName', '*.a.com'),),)}
483 ok(cert, 'foo.a.com')
484 fail(cert, 'bar.foo.a.com')
485 fail(cert, 'a.com')
486 fail(cert, 'Xa.com')
487 fail(cert, '.a.com')
488
Georg Brandl72c98d32013-10-27 07:16:53 +0100489 # only match one left-most wildcard
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000490 cert = {'subject': ((('commonName', 'f*.com'),),)}
491 ok(cert, 'foo.com')
492 ok(cert, 'f.com')
493 fail(cert, 'bar.com')
494 fail(cert, 'foo.a.com')
495 fail(cert, 'bar.foo.com')
496
Christian Heimes824f7f32013-08-17 00:54:47 +0200497 # NULL bytes are bad, CVE-2013-4073
498 cert = {'subject': ((('commonName',
499 'null.python.org\x00example.org'),),)}
500 ok(cert, 'null.python.org\x00example.org') # or raise an error?
501 fail(cert, 'example.org')
502 fail(cert, 'null.python.org')
503
Georg Brandl72c98d32013-10-27 07:16:53 +0100504 # error cases with wildcards
505 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
506 fail(cert, 'bar.foo.a.com')
507 fail(cert, 'a.com')
508 fail(cert, 'Xa.com')
509 fail(cert, '.a.com')
510
511 cert = {'subject': ((('commonName', 'a.*.com'),),)}
512 fail(cert, 'a.foo.com')
513 fail(cert, 'a..com')
514 fail(cert, 'a.com')
515
516 # wildcard doesn't match IDNA prefix 'xn--'
517 idna = 'püthon.python.org'.encode("idna").decode("ascii")
518 cert = {'subject': ((('commonName', idna),),)}
519 ok(cert, idna)
520 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
521 fail(cert, idna)
522 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
523 fail(cert, idna)
524
525 # wildcard in first fragment and IDNA A-labels in sequent fragments
526 # are supported.
527 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
528 cert = {'subject': ((('commonName', idna),),)}
529 ok(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
530 ok(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
531 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
532 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
533
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000534 # Slightly fake real-world example
535 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
536 'subject': ((('commonName', 'linuxfrz.org'),),),
537 'subjectAltName': (('DNS', 'linuxfr.org'),
538 ('DNS', 'linuxfr.com'),
539 ('othername', '<unsupported>'))}
540 ok(cert, 'linuxfr.org')
541 ok(cert, 'linuxfr.com')
542 # Not a "DNS" entry
543 fail(cert, '<unsupported>')
544 # When there is a subjectAltName, commonName isn't used
545 fail(cert, 'linuxfrz.org')
546
547 # A pristine real-world example
548 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
549 'subject': ((('countryName', 'US'),),
550 (('stateOrProvinceName', 'California'),),
551 (('localityName', 'Mountain View'),),
552 (('organizationName', 'Google Inc'),),
553 (('commonName', 'mail.google.com'),))}
554 ok(cert, 'mail.google.com')
555 fail(cert, 'gmail.com')
556 # Only commonName is considered
557 fail(cert, 'California')
558
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100559 # -- IPv4 matching --
560 cert = {'subject': ((('commonName', 'example.com'),),),
561 'subjectAltName': (('DNS', 'example.com'),
562 ('IP Address', '10.11.12.13'),
563 ('IP Address', '14.15.16.17'))}
564 ok(cert, '10.11.12.13')
565 ok(cert, '14.15.16.17')
566 fail(cert, '14.15.16.18')
567 fail(cert, 'example.net')
568
569 # -- IPv6 matching --
570 cert = {'subject': ((('commonName', 'example.com'),),),
571 'subjectAltName': (('DNS', 'example.com'),
572 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
573 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
574 ok(cert, '2001::cafe')
575 ok(cert, '2003::baba')
576 fail(cert, '2003::bebe')
577 fail(cert, 'example.net')
578
579 # -- Miscellaneous --
580
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000581 # Neither commonName nor subjectAltName
582 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
583 'subject': ((('countryName', 'US'),),
584 (('stateOrProvinceName', 'California'),),
585 (('localityName', 'Mountain View'),),
586 (('organizationName', 'Google Inc'),))}
587 fail(cert, 'mail.google.com')
588
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200589 # No DNS entry in subjectAltName but a commonName
590 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
591 'subject': ((('countryName', 'US'),),
592 (('stateOrProvinceName', 'California'),),
593 (('localityName', 'Mountain View'),),
594 (('commonName', 'mail.google.com'),)),
595 'subjectAltName': (('othername', 'blabla'), )}
596 ok(cert, 'mail.google.com')
597
598 # No DNS entry subjectAltName and no commonName
599 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
600 'subject': ((('countryName', 'US'),),
601 (('stateOrProvinceName', 'California'),),
602 (('localityName', 'Mountain View'),),
603 (('organizationName', 'Google Inc'),)),
604 'subjectAltName': (('othername', 'blabla'),)}
605 fail(cert, 'google.com')
606
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000607 # Empty cert / no cert
608 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
609 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
610
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200611 # Issue #17980: avoid denials of service by refusing more than one
612 # wildcard per fragment.
613 cert = {'subject': ((('commonName', 'a*b.com'),),)}
614 ok(cert, 'axxb.com')
615 cert = {'subject': ((('commonName', 'a*b.co*'),),)}
Georg Brandl72c98d32013-10-27 07:16:53 +0100616 fail(cert, 'axxb.com')
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200617 cert = {'subject': ((('commonName', 'a*b*.com'),),)}
618 with self.assertRaises(ssl.CertificateError) as cm:
619 ssl.match_hostname(cert, 'axxbxxc.com')
620 self.assertIn("too many wildcards", str(cm.exception))
621
Antoine Pitroud5323212010-10-22 18:19:07 +0000622 def test_server_side(self):
623 # server_hostname doesn't work for server sockets
624 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000625 with socket.socket() as sock:
626 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
627 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000628
Antoine Pitroud6494802011-07-21 01:11:30 +0200629 def test_unknown_channel_binding(self):
630 # should raise ValueError for unknown type
631 s = socket.socket(socket.AF_INET)
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200632 s.bind(('127.0.0.1', 0))
633 s.listen()
634 c = socket.socket(socket.AF_INET)
635 c.connect(s.getsockname())
Christian Heimesd0486372016-09-10 23:23:33 +0200636 with test_wrap_socket(c, do_handshake_on_connect=False) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100637 with self.assertRaises(ValueError):
638 ss.get_channel_binding("unknown-type")
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200639 s.close()
Antoine Pitroud6494802011-07-21 01:11:30 +0200640
641 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
642 "'tls-unique' channel binding not available")
643 def test_tls_unique_channel_binding(self):
644 # unconnected should return None for known type
645 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200646 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100647 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200648 # the same for server-side
649 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200650 with test_wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100651 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200652
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600653 def test_dealloc_warn(self):
Christian Heimesd0486372016-09-10 23:23:33 +0200654 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600655 r = repr(ss)
656 with self.assertWarns(ResourceWarning) as cm:
657 ss = None
658 support.gc_collect()
659 self.assertIn(r, str(cm.warning.args[0]))
660
Christian Heimes6d7ad132013-06-09 18:02:55 +0200661 def test_get_default_verify_paths(self):
662 paths = ssl.get_default_verify_paths()
663 self.assertEqual(len(paths), 6)
664 self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
665
666 with support.EnvironmentVarGuard() as env:
667 env["SSL_CERT_DIR"] = CAPATH
668 env["SSL_CERT_FILE"] = CERTFILE
669 paths = ssl.get_default_verify_paths()
670 self.assertEqual(paths.cafile, CERTFILE)
671 self.assertEqual(paths.capath, CAPATH)
672
Christian Heimes44109d72013-11-22 01:51:30 +0100673 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
674 def test_enum_certificates(self):
675 self.assertTrue(ssl.enum_certificates("CA"))
676 self.assertTrue(ssl.enum_certificates("ROOT"))
677
678 self.assertRaises(TypeError, ssl.enum_certificates)
679 self.assertRaises(WindowsError, ssl.enum_certificates, "")
680
Christian Heimesc2d65e12013-11-22 16:13:55 +0100681 trust_oids = set()
682 for storename in ("CA", "ROOT"):
683 store = ssl.enum_certificates(storename)
684 self.assertIsInstance(store, list)
685 for element in store:
686 self.assertIsInstance(element, tuple)
687 self.assertEqual(len(element), 3)
688 cert, enc, trust = element
689 self.assertIsInstance(cert, bytes)
690 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
691 self.assertIsInstance(trust, (set, bool))
692 if isinstance(trust, set):
693 trust_oids.update(trust)
Christian Heimes44109d72013-11-22 01:51:30 +0100694
695 serverAuth = "1.3.6.1.5.5.7.3.1"
Christian Heimesc2d65e12013-11-22 16:13:55 +0100696 self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200697
Christian Heimes46bebee2013-06-09 19:03:31 +0200698 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Christian Heimes44109d72013-11-22 01:51:30 +0100699 def test_enum_crls(self):
700 self.assertTrue(ssl.enum_crls("CA"))
701 self.assertRaises(TypeError, ssl.enum_crls)
702 self.assertRaises(WindowsError, ssl.enum_crls, "")
Christian Heimes46bebee2013-06-09 19:03:31 +0200703
Christian Heimes44109d72013-11-22 01:51:30 +0100704 crls = ssl.enum_crls("CA")
705 self.assertIsInstance(crls, list)
706 for element in crls:
707 self.assertIsInstance(element, tuple)
708 self.assertEqual(len(element), 2)
709 self.assertIsInstance(element[0], bytes)
710 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200711
Christian Heimes46bebee2013-06-09 19:03:31 +0200712
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100713 def test_asn1object(self):
714 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
715 '1.3.6.1.5.5.7.3.1')
716
717 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
718 self.assertEqual(val, expected)
719 self.assertEqual(val.nid, 129)
720 self.assertEqual(val.shortname, 'serverAuth')
721 self.assertEqual(val.longname, 'TLS Web Server Authentication')
722 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
723 self.assertIsInstance(val, ssl._ASN1Object)
724 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
725
726 val = ssl._ASN1Object.fromnid(129)
727 self.assertEqual(val, expected)
728 self.assertIsInstance(val, ssl._ASN1Object)
729 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100730 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
731 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100732 for i in range(1000):
733 try:
734 obj = ssl._ASN1Object.fromnid(i)
735 except ValueError:
736 pass
737 else:
738 self.assertIsInstance(obj.nid, int)
739 self.assertIsInstance(obj.shortname, str)
740 self.assertIsInstance(obj.longname, str)
741 self.assertIsInstance(obj.oid, (str, type(None)))
742
743 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
744 self.assertEqual(val, expected)
745 self.assertIsInstance(val, ssl._ASN1Object)
746 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
747 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
748 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100749 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
750 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100751
Christian Heimes72d28502013-11-23 13:56:58 +0100752 def test_purpose_enum(self):
753 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
754 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
755 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
756 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
757 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
758 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
759 '1.3.6.1.5.5.7.3.1')
760
761 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
762 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
763 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
764 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
765 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
766 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
767 '1.3.6.1.5.5.7.3.2')
768
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100769 def test_unsupported_dtls(self):
770 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
771 self.addCleanup(s.close)
772 with self.assertRaises(NotImplementedError) as cx:
Christian Heimesd0486372016-09-10 23:23:33 +0200773 test_wrap_socket(s, cert_reqs=ssl.CERT_NONE)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100774 self.assertEqual(str(cx.exception), "only stream sockets are supported")
775 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
776 with self.assertRaises(NotImplementedError) as cx:
777 ctx.wrap_socket(s)
778 self.assertEqual(str(cx.exception), "only stream sockets are supported")
779
Antoine Pitrouc695c952014-04-28 20:57:36 +0200780 def cert_time_ok(self, timestring, timestamp):
781 self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)
782
783 def cert_time_fail(self, timestring):
784 with self.assertRaises(ValueError):
785 ssl.cert_time_to_seconds(timestring)
786
787 @unittest.skipUnless(utc_offset(),
788 'local time needs to be different from UTC')
789 def test_cert_time_to_seconds_timezone(self):
790 # Issue #19940: ssl.cert_time_to_seconds() returns wrong
791 # results if local timezone is not UTC
792 self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0)
793 self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0)
794
795 def test_cert_time_to_seconds(self):
796 timestring = "Jan 5 09:34:43 2018 GMT"
797 ts = 1515144883.0
798 self.cert_time_ok(timestring, ts)
799 # accept keyword parameter, assert its name
800 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
801 # accept both %e and %d (space or zero generated by strftime)
802 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
803 # case-insensitive
804 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
805 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
806 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
807 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
808 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
809 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
810 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
811 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
812
813 newyear_ts = 1230768000.0
814 # leap seconds
815 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
816 # same timestamp
817 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
818
819 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
820 # allow 60th second (even if it is not a leap second)
821 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
822 # allow 2nd leap second for compatibility with time.strptime()
823 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
824 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
825
826 # no special treatement for the special value:
827 # 99991231235959Z (rfc 5280)
828 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
829
830 @support.run_with_locale('LC_ALL', '')
831 def test_cert_time_to_seconds_locale(self):
832 # `cert_time_to_seconds()` should be locale independent
833
834 def local_february_name():
835 return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
836
837 if local_february_name().lower() == 'feb':
838 self.skipTest("locale-specific month name needs to be "
839 "different from C locale")
840
841 # locale-independent
842 self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
843 self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
844
Martin Panter3840b2a2016-03-27 01:53:46 +0000845 def test_connect_ex_error(self):
846 server = socket.socket(socket.AF_INET)
847 self.addCleanup(server.close)
848 port = support.bind_port(server) # Reserve port but don't listen
Christian Heimesd0486372016-09-10 23:23:33 +0200849 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +0000850 cert_reqs=ssl.CERT_REQUIRED)
851 self.addCleanup(s.close)
852 rc = s.connect_ex((HOST, port))
853 # Issue #19919: Windows machines or VMs hosted on Windows
854 # machines sometimes return EWOULDBLOCK.
855 errors = (
856 errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
857 errno.EWOULDBLOCK,
858 )
859 self.assertIn(rc, errors)
860
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100861
Antoine Pitrou152efa22010-05-16 18:19:27 +0000862class ContextTests(unittest.TestCase):
863
Antoine Pitrou23df4832010-08-04 17:14:06 +0000864 @skip_if_broken_ubuntu_ssl
Antoine Pitrou152efa22010-05-16 18:19:27 +0000865 def test_constructor(self):
Antoine Pitrou2463e5f2013-03-28 22:24:43 +0100866 for protocol in PROTOCOLS:
867 ssl.SSLContext(protocol)
Christian Heimes598894f2016-09-05 23:19:05 +0200868 ctx = ssl.SSLContext()
869 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000870 self.assertRaises(ValueError, ssl.SSLContext, -1)
871 self.assertRaises(ValueError, ssl.SSLContext, 42)
872
Antoine Pitrou23df4832010-08-04 17:14:06 +0000873 @skip_if_broken_ubuntu_ssl
Antoine Pitrou152efa22010-05-16 18:19:27 +0000874 def test_protocol(self):
875 for proto in PROTOCOLS:
876 ctx = ssl.SSLContext(proto)
877 self.assertEqual(ctx.protocol, proto)
878
879 def test_ciphers(self):
880 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
881 ctx.set_ciphers("ALL")
882 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +0000883 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +0000884 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000885
Christian Heimes25bfcd52016-09-06 00:04:45 +0200886 @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
887 def test_get_ciphers(self):
888 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
Christian Heimesea9b2dc2016-09-06 10:45:44 +0200889 ctx.set_ciphers('AESGCM')
Christian Heimes25bfcd52016-09-06 00:04:45 +0200890 names = set(d['name'] for d in ctx.get_ciphers())
Christian Heimes582282b2016-09-06 11:27:25 +0200891 self.assertIn('AES256-GCM-SHA384', names)
892 self.assertIn('AES128-GCM-SHA256', names)
Christian Heimes25bfcd52016-09-06 00:04:45 +0200893
Antoine Pitrou23df4832010-08-04 17:14:06 +0000894 @skip_if_broken_ubuntu_ssl
Antoine Pitroub5218772010-05-21 09:56:06 +0000895 def test_options(self):
896 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -0800897 # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
Christian Heimes598894f2016-09-05 23:19:05 +0200898 default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimes358cfd42016-09-10 22:43:48 +0200899 # SSLContext also enables these by default
900 default |= (OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE |
901 OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE)
Christian Heimes598894f2016-09-05 23:19:05 +0200902 self.assertEqual(default, ctx.options)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -0800903 ctx.options |= ssl.OP_NO_TLSv1
Christian Heimes598894f2016-09-05 23:19:05 +0200904 self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +0000905 if can_clear_options():
Christian Heimes598894f2016-09-05 23:19:05 +0200906 ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
907 self.assertEqual(default, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +0000908 ctx.options = 0
Matthias Klosef7c56242016-06-12 23:40:00 -0700909 # Ubuntu has OP_NO_SSLv3 forced on by default
910 self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
Antoine Pitroub5218772010-05-21 09:56:06 +0000911 else:
912 with self.assertRaises(ValueError):
913 ctx.options = 0
914
Christian Heimes22587792013-11-21 23:56:13 +0100915 def test_verify_mode(self):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000916 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
917 # Default value
918 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
919 ctx.verify_mode = ssl.CERT_OPTIONAL
920 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
921 ctx.verify_mode = ssl.CERT_REQUIRED
922 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
923 ctx.verify_mode = ssl.CERT_NONE
924 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
925 with self.assertRaises(TypeError):
926 ctx.verify_mode = None
927 with self.assertRaises(ValueError):
928 ctx.verify_mode = 42
929
Christian Heimes2427b502013-11-23 11:24:32 +0100930 @unittest.skipUnless(have_verify_flags(),
931 "verify_flags need OpenSSL > 0.9.8")
Christian Heimes22587792013-11-21 23:56:13 +0100932 def test_verify_flags(self):
933 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
Benjamin Peterson990fcaa2015-03-04 22:49:41 -0500934 # default value
935 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
936 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | tf)
Christian Heimes22587792013-11-21 23:56:13 +0100937 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
938 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
939 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
940 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
941 ctx.verify_flags = ssl.VERIFY_DEFAULT
942 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
943 # supports any value
944 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
945 self.assertEqual(ctx.verify_flags,
946 ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
947 with self.assertRaises(TypeError):
948 ctx.verify_flags = None
949
Antoine Pitrou152efa22010-05-16 18:19:27 +0000950 def test_load_cert_chain(self):
951 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
952 # Combined key and cert in a single file
Benjamin Peterson1ea070e2014-11-03 21:05:01 -0500953 ctx.load_cert_chain(CERTFILE, keyfile=None)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000954 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
955 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200956 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +0000957 ctx.load_cert_chain(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000958 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000959 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000960 ctx.load_cert_chain(BADCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000961 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000962 ctx.load_cert_chain(EMPTYCERT)
963 # Separate key and cert
Antoine Pitroud0919502010-05-17 10:30:00 +0000964 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000965 ctx.load_cert_chain(ONLYCERT, ONLYKEY)
966 ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
967 ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000968 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000969 ctx.load_cert_chain(ONLYCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000970 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000971 ctx.load_cert_chain(ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000972 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000973 ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
974 # Mismatching key and cert
Antoine Pitroud0919502010-05-17 10:30:00 +0000975 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000976 with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
Martin Panter3d81d932016-01-14 09:36:00 +0000977 ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +0200978 # Password protected key and cert
979 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
980 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
981 ctx.load_cert_chain(CERTFILE_PROTECTED,
982 password=bytearray(KEY_PASSWORD.encode()))
983 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
984 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
985 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
986 bytearray(KEY_PASSWORD.encode()))
987 with self.assertRaisesRegex(TypeError, "should be a string"):
988 ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
989 with self.assertRaises(ssl.SSLError):
990 ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
991 with self.assertRaisesRegex(ValueError, "cannot be longer"):
992 # openssl has a fixed limit on the password buffer.
993 # PEM_BUFSIZE is generally set to 1kb.
994 # Return a string larger than this.
995 ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
996 # Password callback
997 def getpass_unicode():
998 return KEY_PASSWORD
999 def getpass_bytes():
1000 return KEY_PASSWORD.encode()
1001 def getpass_bytearray():
1002 return bytearray(KEY_PASSWORD.encode())
1003 def getpass_badpass():
1004 return "badpass"
1005 def getpass_huge():
1006 return b'a' * (1024 * 1024)
1007 def getpass_bad_type():
1008 return 9
1009 def getpass_exception():
1010 raise Exception('getpass error')
1011 class GetPassCallable:
1012 def __call__(self):
1013 return KEY_PASSWORD
1014 def getpass(self):
1015 return KEY_PASSWORD
1016 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
1017 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
1018 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
1019 ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
1020 ctx.load_cert_chain(CERTFILE_PROTECTED,
1021 password=GetPassCallable().getpass)
1022 with self.assertRaises(ssl.SSLError):
1023 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
1024 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1025 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
1026 with self.assertRaisesRegex(TypeError, "must return a string"):
1027 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
1028 with self.assertRaisesRegex(Exception, "getpass error"):
1029 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
1030 # Make sure the password function isn't called if it isn't needed
1031 ctx.load_cert_chain(CERTFILE, password=getpass_exception)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001032
1033 def test_load_verify_locations(self):
1034 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1035 ctx.load_verify_locations(CERTFILE)
1036 ctx.load_verify_locations(cafile=CERTFILE, capath=None)
1037 ctx.load_verify_locations(BYTES_CERTFILE)
1038 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
1039 self.assertRaises(TypeError, ctx.load_verify_locations)
Christian Heimesefff7062013-11-21 03:35:02 +01001040 self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001041 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001042 ctx.load_verify_locations(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001043 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001044 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001045 ctx.load_verify_locations(BADCERT)
1046 ctx.load_verify_locations(CERTFILE, CAPATH)
1047 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
1048
Victor Stinner80f75e62011-01-29 11:31:20 +00001049 # Issue #10989: crash if the second argument type is invalid
1050 self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
1051
Christian Heimesefff7062013-11-21 03:35:02 +01001052 def test_load_verify_cadata(self):
1053 # test cadata
1054 with open(CAFILE_CACERT) as f:
1055 cacert_pem = f.read()
1056 cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
1057 with open(CAFILE_NEURONIO) as f:
1058 neuronio_pem = f.read()
1059 neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
1060
1061 # test PEM
1062 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1063 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
1064 ctx.load_verify_locations(cadata=cacert_pem)
1065 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
1066 ctx.load_verify_locations(cadata=neuronio_pem)
1067 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1068 # cert already in hash table
1069 ctx.load_verify_locations(cadata=neuronio_pem)
1070 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1071
1072 # combined
1073 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1074 combined = "\n".join((cacert_pem, neuronio_pem))
1075 ctx.load_verify_locations(cadata=combined)
1076 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1077
1078 # with junk around the certs
1079 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1080 combined = ["head", cacert_pem, "other", neuronio_pem, "again",
1081 neuronio_pem, "tail"]
1082 ctx.load_verify_locations(cadata="\n".join(combined))
1083 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1084
1085 # test DER
1086 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1087 ctx.load_verify_locations(cadata=cacert_der)
1088 ctx.load_verify_locations(cadata=neuronio_der)
1089 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1090 # cert already in hash table
1091 ctx.load_verify_locations(cadata=cacert_der)
1092 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1093
1094 # combined
1095 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1096 combined = b"".join((cacert_der, neuronio_der))
1097 ctx.load_verify_locations(cadata=combined)
1098 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1099
1100 # error cases
1101 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1102 self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
1103
1104 with self.assertRaisesRegex(ssl.SSLError, "no start line"):
1105 ctx.load_verify_locations(cadata="broken")
1106 with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
1107 ctx.load_verify_locations(cadata=b"broken")
1108
1109
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001110 def test_load_dh_params(self):
1111 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1112 ctx.load_dh_params(DHFILE)
1113 if os.name != 'nt':
1114 ctx.load_dh_params(BYTES_DHFILE)
1115 self.assertRaises(TypeError, ctx.load_dh_params)
1116 self.assertRaises(TypeError, ctx.load_dh_params, None)
1117 with self.assertRaises(FileNotFoundError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001118 ctx.load_dh_params(NONEXISTINGCERT)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001119 self.assertEqual(cm.exception.errno, errno.ENOENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001120 with self.assertRaises(ssl.SSLError) as cm:
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001121 ctx.load_dh_params(CERTFILE)
1122
Antoine Pitroueb585ad2010-10-22 18:24:20 +00001123 @skip_if_broken_ubuntu_ssl
Antoine Pitroub0182c82010-10-12 20:09:02 +00001124 def test_session_stats(self):
1125 for proto in PROTOCOLS:
1126 ctx = ssl.SSLContext(proto)
1127 self.assertEqual(ctx.session_stats(), {
1128 'number': 0,
1129 'connect': 0,
1130 'connect_good': 0,
1131 'connect_renegotiate': 0,
1132 'accept': 0,
1133 'accept_good': 0,
1134 'accept_renegotiate': 0,
1135 'hits': 0,
1136 'misses': 0,
1137 'timeouts': 0,
1138 'cache_full': 0,
1139 })
1140
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001141 def test_set_default_verify_paths(self):
1142 # There's not much we can do to test that it acts as expected,
1143 # so just check it doesn't crash or raise an exception.
1144 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1145 ctx.set_default_verify_paths()
1146
Antoine Pitrou501da612011-12-21 09:27:41 +01001147 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001148 def test_set_ecdh_curve(self):
1149 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1150 ctx.set_ecdh_curve("prime256v1")
1151 ctx.set_ecdh_curve(b"prime256v1")
1152 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1153 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1154 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1155 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1156
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001157 @needs_sni
1158 def test_sni_callback(self):
1159 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1160
1161 # set_servername_callback expects a callable, or None
1162 self.assertRaises(TypeError, ctx.set_servername_callback)
1163 self.assertRaises(TypeError, ctx.set_servername_callback, 4)
1164 self.assertRaises(TypeError, ctx.set_servername_callback, "")
1165 self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
1166
1167 def dummycallback(sock, servername, ctx):
1168 pass
1169 ctx.set_servername_callback(None)
1170 ctx.set_servername_callback(dummycallback)
1171
1172 @needs_sni
1173 def test_sni_callback_refcycle(self):
1174 # Reference cycles through the servername callback are detected
1175 # and cleared.
1176 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1177 def dummycallback(sock, servername, ctx, cycle=ctx):
1178 pass
1179 ctx.set_servername_callback(dummycallback)
1180 wr = weakref.ref(ctx)
1181 del ctx, dummycallback
1182 gc.collect()
1183 self.assertIs(wr(), None)
1184
Christian Heimes9a5395a2013-06-17 15:44:12 +02001185 def test_cert_store_stats(self):
1186 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1187 self.assertEqual(ctx.cert_store_stats(),
1188 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1189 ctx.load_cert_chain(CERTFILE)
1190 self.assertEqual(ctx.cert_store_stats(),
1191 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1192 ctx.load_verify_locations(CERTFILE)
1193 self.assertEqual(ctx.cert_store_stats(),
1194 {'x509_ca': 0, 'crl': 0, 'x509': 1})
Martin Panterb55f8b72016-01-14 12:53:56 +00001195 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001196 self.assertEqual(ctx.cert_store_stats(),
1197 {'x509_ca': 1, 'crl': 0, 'x509': 2})
1198
1199 def test_get_ca_certs(self):
1200 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1201 self.assertEqual(ctx.get_ca_certs(), [])
1202 # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
1203 ctx.load_verify_locations(CERTFILE)
1204 self.assertEqual(ctx.get_ca_certs(), [])
Martin Panterb55f8b72016-01-14 12:53:56 +00001205 # but CAFILE_CACERT is a CA cert
1206 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001207 self.assertEqual(ctx.get_ca_certs(),
1208 [{'issuer': ((('organizationName', 'Root CA'),),
1209 (('organizationalUnitName', 'http://www.cacert.org'),),
1210 (('commonName', 'CA Cert Signing Authority'),),
1211 (('emailAddress', 'support@cacert.org'),)),
1212 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
1213 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
1214 'serialNumber': '00',
Christian Heimesbd3a7f92013-11-21 03:40:15 +01001215 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
Christian Heimes9a5395a2013-06-17 15:44:12 +02001216 'subject': ((('organizationName', 'Root CA'),),
1217 (('organizationalUnitName', 'http://www.cacert.org'),),
1218 (('commonName', 'CA Cert Signing Authority'),),
1219 (('emailAddress', 'support@cacert.org'),)),
1220 'version': 3}])
1221
Martin Panterb55f8b72016-01-14 12:53:56 +00001222 with open(CAFILE_CACERT) as f:
Christian Heimes9a5395a2013-06-17 15:44:12 +02001223 pem = f.read()
1224 der = ssl.PEM_cert_to_DER_cert(pem)
1225 self.assertEqual(ctx.get_ca_certs(True), [der])
1226
Christian Heimes72d28502013-11-23 13:56:58 +01001227 def test_load_default_certs(self):
1228 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1229 ctx.load_default_certs()
1230
1231 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1232 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1233 ctx.load_default_certs()
1234
1235 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1236 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1237
1238 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1239 self.assertRaises(TypeError, ctx.load_default_certs, None)
1240 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1241
Benjamin Peterson91244e02014-10-03 18:17:15 -04001242 @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
Christian Heimes598894f2016-09-05 23:19:05 +02001243 @unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001244 def test_load_default_certs_env(self):
1245 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1246 with support.EnvironmentVarGuard() as env:
1247 env["SSL_CERT_DIR"] = CAPATH
1248 env["SSL_CERT_FILE"] = CERTFILE
1249 ctx.load_default_certs()
1250 self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0})
1251
Benjamin Peterson91244e02014-10-03 18:17:15 -04001252 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Steve Dower68d663c2017-07-17 11:15:48 +02001253 @unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
Benjamin Peterson91244e02014-10-03 18:17:15 -04001254 def test_load_default_certs_env_windows(self):
1255 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1256 ctx.load_default_certs()
1257 stats = ctx.cert_store_stats()
1258
1259 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1260 with support.EnvironmentVarGuard() as env:
1261 env["SSL_CERT_DIR"] = CAPATH
1262 env["SSL_CERT_FILE"] = CERTFILE
1263 ctx.load_default_certs()
1264 stats["x509"] += 1
1265 self.assertEqual(ctx.cert_store_stats(), stats)
1266
Christian Heimes358cfd42016-09-10 22:43:48 +02001267 def _assert_context_options(self, ctx):
1268 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1269 if OP_NO_COMPRESSION != 0:
1270 self.assertEqual(ctx.options & OP_NO_COMPRESSION,
1271 OP_NO_COMPRESSION)
1272 if OP_SINGLE_DH_USE != 0:
1273 self.assertEqual(ctx.options & OP_SINGLE_DH_USE,
1274 OP_SINGLE_DH_USE)
1275 if OP_SINGLE_ECDH_USE != 0:
1276 self.assertEqual(ctx.options & OP_SINGLE_ECDH_USE,
1277 OP_SINGLE_ECDH_USE)
1278 if OP_CIPHER_SERVER_PREFERENCE != 0:
1279 self.assertEqual(ctx.options & OP_CIPHER_SERVER_PREFERENCE,
1280 OP_CIPHER_SERVER_PREFERENCE)
1281
Christian Heimes4c05b472013-11-23 15:58:30 +01001282 def test_create_default_context(self):
1283 ctx = ssl.create_default_context()
Christian Heimes358cfd42016-09-10 22:43:48 +02001284
Donald Stufft6a2ba942014-03-23 19:05:28 -04001285 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
Christian Heimes4c05b472013-11-23 15:58:30 +01001286 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001287 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001288 self._assert_context_options(ctx)
1289
Christian Heimes4c05b472013-11-23 15:58:30 +01001290
1291 with open(SIGNING_CA) as f:
1292 cadata = f.read()
1293 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1294 cadata=cadata)
Donald Stufft6a2ba942014-03-23 19:05:28 -04001295 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
Christian Heimes4c05b472013-11-23 15:58:30 +01001296 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes358cfd42016-09-10 22:43:48 +02001297 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001298
1299 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
Donald Stufft6a2ba942014-03-23 19:05:28 -04001300 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
Christian Heimes4c05b472013-11-23 15:58:30 +01001301 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001302 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001303
Christian Heimes67986f92013-11-23 22:43:47 +01001304 def test__create_stdlib_context(self):
1305 ctx = ssl._create_stdlib_context()
1306 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1307 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001308 self.assertFalse(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001309 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001310
1311 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
1312 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1313 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001314 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001315
1316 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
Christian Heimesa02c69a2013-12-02 20:59:28 +01001317 cert_reqs=ssl.CERT_REQUIRED,
1318 check_hostname=True)
Christian Heimes67986f92013-11-23 22:43:47 +01001319 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1320 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimesa02c69a2013-12-02 20:59:28 +01001321 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001322 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001323
1324 ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
1325 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1326 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001327 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001328
Christian Heimes1aa9a752013-12-02 02:41:19 +01001329 def test_check_hostname(self):
1330 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1331 self.assertFalse(ctx.check_hostname)
1332
1333 # Requires CERT_REQUIRED or CERT_OPTIONAL
1334 with self.assertRaises(ValueError):
1335 ctx.check_hostname = True
1336 ctx.verify_mode = ssl.CERT_REQUIRED
1337 self.assertFalse(ctx.check_hostname)
1338 ctx.check_hostname = True
1339 self.assertTrue(ctx.check_hostname)
1340
1341 ctx.verify_mode = ssl.CERT_OPTIONAL
1342 ctx.check_hostname = True
1343 self.assertTrue(ctx.check_hostname)
1344
1345 # Cannot set CERT_NONE with check_hostname enabled
1346 with self.assertRaises(ValueError):
1347 ctx.verify_mode = ssl.CERT_NONE
1348 ctx.check_hostname = False
1349 self.assertFalse(ctx.check_hostname)
1350
Christian Heimes5fe668c2016-09-12 00:01:11 +02001351 def test_context_client_server(self):
1352 # PROTOCOL_TLS_CLIENT has sane defaults
1353 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1354 self.assertTrue(ctx.check_hostname)
1355 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1356
1357 # PROTOCOL_TLS_SERVER has different but also sane defaults
1358 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1359 self.assertFalse(ctx.check_hostname)
1360 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1361
Antoine Pitrou152efa22010-05-16 18:19:27 +00001362
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001363class SSLErrorTests(unittest.TestCase):
1364
1365 def test_str(self):
1366 # The str() of a SSLError doesn't include the errno
1367 e = ssl.SSLError(1, "foo")
1368 self.assertEqual(str(e), "foo")
1369 self.assertEqual(e.errno, 1)
1370 # Same for a subclass
1371 e = ssl.SSLZeroReturnError(1, "foo")
1372 self.assertEqual(str(e), "foo")
1373 self.assertEqual(e.errno, 1)
1374
1375 def test_lib_reason(self):
1376 # Test the library and reason attributes
1377 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1378 with self.assertRaises(ssl.SSLError) as cm:
1379 ctx.load_dh_params(CERTFILE)
1380 self.assertEqual(cm.exception.library, 'PEM')
1381 self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1382 s = str(cm.exception)
1383 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1384
1385 def test_subclass(self):
1386 # Check that the appropriate SSLError subclass is raised
1387 # (this only tests one of them)
1388 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1389 with socket.socket() as s:
1390 s.bind(("127.0.0.1", 0))
Charles-François Natali6e204602014-07-23 19:28:13 +01001391 s.listen()
Antoine Pitroue1ceb502013-01-12 21:54:44 +01001392 c = socket.socket()
1393 c.connect(s.getsockname())
1394 c.setblocking(False)
1395 with ctx.wrap_socket(c, False, do_handshake_on_connect=False) as c:
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001396 with self.assertRaises(ssl.SSLWantReadError) as cm:
1397 c.do_handshake()
1398 s = str(cm.exception)
1399 self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1400 # For compatibility
1401 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
1402
Nathaniel J. Smith59fdf0f2017-06-09 02:35:16 -07001403 def test_bad_idna_in_server_hostname(self):
1404 # Note: this test is testing some code that probably shouldn't exist
1405 # in the first place, so if it starts failing at some point because
1406 # you made the ssl module stop doing IDNA decoding then please feel
1407 # free to remove it. The test was mainly added because this case used
1408 # to cause memory corruption (see bpo-30594).
1409 ctx = ssl.create_default_context()
1410 with self.assertRaises(UnicodeError):
1411 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1412 server_hostname="xn--.com")
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001413
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001414class MemoryBIOTests(unittest.TestCase):
1415
1416 def test_read_write(self):
1417 bio = ssl.MemoryBIO()
1418 bio.write(b'foo')
1419 self.assertEqual(bio.read(), b'foo')
1420 self.assertEqual(bio.read(), b'')
1421 bio.write(b'foo')
1422 bio.write(b'bar')
1423 self.assertEqual(bio.read(), b'foobar')
1424 self.assertEqual(bio.read(), b'')
1425 bio.write(b'baz')
1426 self.assertEqual(bio.read(2), b'ba')
1427 self.assertEqual(bio.read(1), b'z')
1428 self.assertEqual(bio.read(1), b'')
1429
1430 def test_eof(self):
1431 bio = ssl.MemoryBIO()
1432 self.assertFalse(bio.eof)
1433 self.assertEqual(bio.read(), b'')
1434 self.assertFalse(bio.eof)
1435 bio.write(b'foo')
1436 self.assertFalse(bio.eof)
1437 bio.write_eof()
1438 self.assertFalse(bio.eof)
1439 self.assertEqual(bio.read(2), b'fo')
1440 self.assertFalse(bio.eof)
1441 self.assertEqual(bio.read(1), b'o')
1442 self.assertTrue(bio.eof)
1443 self.assertEqual(bio.read(), b'')
1444 self.assertTrue(bio.eof)
1445
1446 def test_pending(self):
1447 bio = ssl.MemoryBIO()
1448 self.assertEqual(bio.pending, 0)
1449 bio.write(b'foo')
1450 self.assertEqual(bio.pending, 3)
1451 for i in range(3):
1452 bio.read(1)
1453 self.assertEqual(bio.pending, 3-i-1)
1454 for i in range(3):
1455 bio.write(b'x')
1456 self.assertEqual(bio.pending, i+1)
1457 bio.read()
1458 self.assertEqual(bio.pending, 0)
1459
1460 def test_buffer_types(self):
1461 bio = ssl.MemoryBIO()
1462 bio.write(b'foo')
1463 self.assertEqual(bio.read(), b'foo')
1464 bio.write(bytearray(b'bar'))
1465 self.assertEqual(bio.read(), b'bar')
1466 bio.write(memoryview(b'baz'))
1467 self.assertEqual(bio.read(), b'baz')
1468
1469 def test_error_types(self):
1470 bio = ssl.MemoryBIO()
1471 self.assertRaises(TypeError, bio.write, 'foo')
1472 self.assertRaises(TypeError, bio.write, None)
1473 self.assertRaises(TypeError, bio.write, True)
1474 self.assertRaises(TypeError, bio.write, 1)
1475
1476
Martin Panter3840b2a2016-03-27 01:53:46 +00001477class SimpleBackgroundTests(unittest.TestCase):
1478
1479 """Tests that connect to a simple server running in the background"""
1480
1481 def setUp(self):
1482 server = ThreadedEchoServer(SIGNED_CERTFILE)
1483 self.server_addr = (HOST, server.port)
1484 server.__enter__()
1485 self.addCleanup(server.__exit__, None, None, None)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001486
Antoine Pitrou480a1242010-04-28 21:37:09 +00001487 def test_connect(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001488 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001489 cert_reqs=ssl.CERT_NONE) as s:
1490 s.connect(self.server_addr)
1491 self.assertEqual({}, s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001492 self.assertFalse(s.server_side)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001493
Martin Panter3840b2a2016-03-27 01:53:46 +00001494 # this should succeed because we specify the root cert
Christian Heimesd0486372016-09-10 23:23:33 +02001495 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001496 cert_reqs=ssl.CERT_REQUIRED,
1497 ca_certs=SIGNING_CA) as s:
1498 s.connect(self.server_addr)
1499 self.assertTrue(s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001500 self.assertFalse(s.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001501
Martin Panter3840b2a2016-03-27 01:53:46 +00001502 def test_connect_fail(self):
1503 # This should fail because we have no verification certs. Connection
1504 # failure crashes ThreadedEchoServer, so run this in an independent
1505 # test method.
Christian Heimesd0486372016-09-10 23:23:33 +02001506 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001507 cert_reqs=ssl.CERT_REQUIRED)
1508 self.addCleanup(s.close)
1509 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1510 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001511
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001512 def test_connect_ex(self):
1513 # Issue #11326: check connect_ex() implementation
Christian Heimesd0486372016-09-10 23:23:33 +02001514 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001515 cert_reqs=ssl.CERT_REQUIRED,
1516 ca_certs=SIGNING_CA)
1517 self.addCleanup(s.close)
1518 self.assertEqual(0, s.connect_ex(self.server_addr))
1519 self.assertTrue(s.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001520
1521 def test_non_blocking_connect_ex(self):
1522 # Issue #11326: non-blocking connect_ex() should allow handshake
1523 # to proceed after the socket gets ready.
Christian Heimesd0486372016-09-10 23:23:33 +02001524 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001525 cert_reqs=ssl.CERT_REQUIRED,
1526 ca_certs=SIGNING_CA,
1527 do_handshake_on_connect=False)
1528 self.addCleanup(s.close)
1529 s.setblocking(False)
1530 rc = s.connect_ex(self.server_addr)
1531 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1532 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
1533 # Wait for connect to finish
1534 select.select([], [s], [], 5.0)
1535 # Non-blocking handshake
1536 while True:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001537 try:
Martin Panter3840b2a2016-03-27 01:53:46 +00001538 s.do_handshake()
1539 break
1540 except ssl.SSLWantReadError:
1541 select.select([s], [], [], 5.0)
1542 except ssl.SSLWantWriteError:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001543 select.select([], [s], [], 5.0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001544 # SSL established
1545 self.assertTrue(s.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001546
Antoine Pitrou152efa22010-05-16 18:19:27 +00001547 def test_connect_with_context(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001548 # Same as test_connect, but with a separately created context
1549 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1550 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1551 s.connect(self.server_addr)
1552 self.assertEqual({}, s.getpeercert())
1553 # Same with a server hostname
1554 with ctx.wrap_socket(socket.socket(socket.AF_INET),
1555 server_hostname="dummy") as s:
1556 s.connect(self.server_addr)
1557 ctx.verify_mode = ssl.CERT_REQUIRED
1558 # This should succeed because we specify the root cert
1559 ctx.load_verify_locations(SIGNING_CA)
1560 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1561 s.connect(self.server_addr)
1562 cert = s.getpeercert()
1563 self.assertTrue(cert)
1564
1565 def test_connect_with_context_fail(self):
1566 # This should fail because we have no verification certs. Connection
1567 # failure crashes ThreadedEchoServer, so run this in an independent
1568 # test method.
1569 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1570 ctx.verify_mode = ssl.CERT_REQUIRED
1571 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1572 self.addCleanup(s.close)
1573 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1574 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001575
1576 def test_connect_capath(self):
1577 # Verify server certificates using the `capath` argument
Antoine Pitrou467f28d2010-05-16 19:22:44 +00001578 # NOTE: the subject hashing algorithm has been changed between
1579 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
1580 # contain both versions of each certificate (same content, different
Antoine Pitroud7e4c1c2010-05-17 14:13:10 +00001581 # filename) for this test to be portable across OpenSSL releases.
Martin Panter3840b2a2016-03-27 01:53:46 +00001582 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1583 ctx.verify_mode = ssl.CERT_REQUIRED
1584 ctx.load_verify_locations(capath=CAPATH)
1585 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1586 s.connect(self.server_addr)
1587 cert = s.getpeercert()
1588 self.assertTrue(cert)
1589 # Same with a bytes `capath` argument
1590 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1591 ctx.verify_mode = ssl.CERT_REQUIRED
1592 ctx.load_verify_locations(capath=BYTES_CAPATH)
1593 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1594 s.connect(self.server_addr)
1595 cert = s.getpeercert()
1596 self.assertTrue(cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001597
Christian Heimesefff7062013-11-21 03:35:02 +01001598 def test_connect_cadata(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001599 with open(SIGNING_CA) as f:
Christian Heimesefff7062013-11-21 03:35:02 +01001600 pem = f.read()
1601 der = ssl.PEM_cert_to_DER_cert(pem)
Martin Panter3840b2a2016-03-27 01:53:46 +00001602 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1603 ctx.verify_mode = ssl.CERT_REQUIRED
1604 ctx.load_verify_locations(cadata=pem)
1605 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1606 s.connect(self.server_addr)
1607 cert = s.getpeercert()
1608 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001609
Martin Panter3840b2a2016-03-27 01:53:46 +00001610 # same with DER
1611 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1612 ctx.verify_mode = ssl.CERT_REQUIRED
1613 ctx.load_verify_locations(cadata=der)
1614 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1615 s.connect(self.server_addr)
1616 cert = s.getpeercert()
1617 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001618
Antoine Pitroue3220242010-04-24 11:13:53 +00001619 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
1620 def test_makefile_close(self):
1621 # Issue #5238: creating a file-like object with makefile() shouldn't
1622 # delay closing the underlying "real socket" (here tested with its
1623 # file descriptor, hence skipping the test under Windows).
Christian Heimesd0486372016-09-10 23:23:33 +02001624 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Martin Panter3840b2a2016-03-27 01:53:46 +00001625 ss.connect(self.server_addr)
1626 fd = ss.fileno()
1627 f = ss.makefile()
1628 f.close()
1629 # The fd is still open
1630 os.read(fd, 0)
1631 # Closing the SSL socket should close the fd too
1632 ss.close()
1633 gc.collect()
1634 with self.assertRaises(OSError) as e:
Antoine Pitroue3220242010-04-24 11:13:53 +00001635 os.read(fd, 0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001636 self.assertEqual(e.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001637
Antoine Pitrou480a1242010-04-28 21:37:09 +00001638 def test_non_blocking_handshake(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001639 s = socket.socket(socket.AF_INET)
1640 s.connect(self.server_addr)
1641 s.setblocking(False)
Christian Heimesd0486372016-09-10 23:23:33 +02001642 s = test_wrap_socket(s,
Martin Panter3840b2a2016-03-27 01:53:46 +00001643 cert_reqs=ssl.CERT_NONE,
1644 do_handshake_on_connect=False)
1645 self.addCleanup(s.close)
1646 count = 0
1647 while True:
1648 try:
1649 count += 1
1650 s.do_handshake()
1651 break
1652 except ssl.SSLWantReadError:
1653 select.select([s], [], [])
1654 except ssl.SSLWantWriteError:
1655 select.select([], [s], [])
1656 if support.verbose:
1657 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001658
Antoine Pitrou480a1242010-04-28 21:37:09 +00001659 def test_get_server_certificate(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001660 _test_get_server_certificate(self, *self.server_addr, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001661
Martin Panter3840b2a2016-03-27 01:53:46 +00001662 def test_get_server_certificate_fail(self):
1663 # Connection failure crashes ThreadedEchoServer, so run this in an
1664 # independent test method
1665 _test_get_server_certificate_fail(self, *self.server_addr)
Bill Janssen54cc54c2007-12-14 22:08:56 +00001666
Antoine Pitrouf4c7bad2010-08-15 23:02:22 +00001667 def test_ciphers(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001668 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001669 cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
1670 s.connect(self.server_addr)
Christian Heimesd0486372016-09-10 23:23:33 +02001671 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001672 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
1673 s.connect(self.server_addr)
1674 # Error checking can happen at instantiation or when connecting
1675 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
1676 with socket.socket(socket.AF_INET) as sock:
Christian Heimesd0486372016-09-10 23:23:33 +02001677 s = test_wrap_socket(sock,
Martin Panter3840b2a2016-03-27 01:53:46 +00001678 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
1679 s.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00001680
Christian Heimes9a5395a2013-06-17 15:44:12 +02001681 def test_get_ca_certs_capath(self):
1682 # capath certs are loaded on request
Martin Panter3840b2a2016-03-27 01:53:46 +00001683 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1684 ctx.verify_mode = ssl.CERT_REQUIRED
1685 ctx.load_verify_locations(capath=CAPATH)
1686 self.assertEqual(ctx.get_ca_certs(), [])
1687 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1688 s.connect(self.server_addr)
1689 cert = s.getpeercert()
1690 self.assertTrue(cert)
1691 self.assertEqual(len(ctx.get_ca_certs()), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001692
Christian Heimes575596e2013-12-15 21:49:17 +01001693 @needs_sni
Christian Heimes8e7f3942013-12-05 07:41:08 +01001694 def test_context_setget(self):
1695 # Check that the context of a connected socket can be replaced.
Martin Panter3840b2a2016-03-27 01:53:46 +00001696 ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1697 ctx2 = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1698 s = socket.socket(socket.AF_INET)
1699 with ctx1.wrap_socket(s) as ss:
1700 ss.connect(self.server_addr)
1701 self.assertIs(ss.context, ctx1)
1702 self.assertIs(ss._sslobj.context, ctx1)
1703 ss.context = ctx2
1704 self.assertIs(ss.context, ctx2)
1705 self.assertIs(ss._sslobj.context, ctx2)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001706
1707 def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
1708 # A simple IO loop. Call func(*args) depending on the error we get
1709 # (WANT_READ or WANT_WRITE) move data between the socket and the BIOs.
1710 timeout = kwargs.get('timeout', 10)
Victor Stinner50a72af2017-09-11 09:34:24 -07001711 deadline = time.monotonic() + timeout
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001712 count = 0
1713 while True:
Victor Stinner50a72af2017-09-11 09:34:24 -07001714 if time.monotonic() > deadline:
1715 self.fail("timeout")
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001716 errno = None
1717 count += 1
1718 try:
1719 ret = func(*args)
1720 except ssl.SSLError as e:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001721 if e.errno not in (ssl.SSL_ERROR_WANT_READ,
Martin Panter40b97ec2016-01-14 13:05:46 +00001722 ssl.SSL_ERROR_WANT_WRITE):
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001723 raise
1724 errno = e.errno
1725 # Get any data from the outgoing BIO irrespective of any error, and
1726 # send it to the socket.
1727 buf = outgoing.read()
1728 sock.sendall(buf)
1729 # If there's no error, we're done. For WANT_READ, we need to get
1730 # data from the socket and put it in the incoming BIO.
1731 if errno is None:
1732 break
1733 elif errno == ssl.SSL_ERROR_WANT_READ:
1734 buf = sock.recv(32768)
1735 if buf:
1736 incoming.write(buf)
1737 else:
1738 incoming.write_eof()
1739 if support.verbose:
1740 sys.stdout.write("Needed %d calls to complete %s().\n"
1741 % (count, func.__name__))
1742 return ret
1743
Martin Panter3840b2a2016-03-27 01:53:46 +00001744 def test_bio_handshake(self):
1745 sock = socket.socket(socket.AF_INET)
1746 self.addCleanup(sock.close)
1747 sock.connect(self.server_addr)
1748 incoming = ssl.MemoryBIO()
1749 outgoing = ssl.MemoryBIO()
1750 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1751 ctx.verify_mode = ssl.CERT_REQUIRED
1752 ctx.load_verify_locations(SIGNING_CA)
1753 ctx.check_hostname = True
1754 sslobj = ctx.wrap_bio(incoming, outgoing, False, 'localhost')
1755 self.assertIs(sslobj._sslobj.owner, sslobj)
1756 self.assertIsNone(sslobj.cipher())
Christian Heimes68771112017-09-05 21:55:40 -07001757 self.assertIsNone(sslobj.version())
Christian Heimes01113fa2016-09-05 23:23:24 +02001758 self.assertIsNotNone(sslobj.shared_ciphers())
Martin Panter3840b2a2016-03-27 01:53:46 +00001759 self.assertRaises(ValueError, sslobj.getpeercert)
1760 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
1761 self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
1762 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
1763 self.assertTrue(sslobj.cipher())
Christian Heimes01113fa2016-09-05 23:23:24 +02001764 self.assertIsNotNone(sslobj.shared_ciphers())
Christian Heimes68771112017-09-05 21:55:40 -07001765 self.assertIsNotNone(sslobj.version())
Martin Panter3840b2a2016-03-27 01:53:46 +00001766 self.assertTrue(sslobj.getpeercert())
1767 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
1768 self.assertTrue(sslobj.get_channel_binding('tls-unique'))
1769 try:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001770 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Martin Panter3840b2a2016-03-27 01:53:46 +00001771 except ssl.SSLSyscallError:
1772 # If the server shuts down the TCP connection without sending a
1773 # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
1774 pass
1775 self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
1776
1777 def test_bio_read_write_data(self):
1778 sock = socket.socket(socket.AF_INET)
1779 self.addCleanup(sock.close)
1780 sock.connect(self.server_addr)
1781 incoming = ssl.MemoryBIO()
1782 outgoing = ssl.MemoryBIO()
1783 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1784 ctx.verify_mode = ssl.CERT_NONE
1785 sslobj = ctx.wrap_bio(incoming, outgoing, False)
1786 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
1787 req = b'FOO\n'
1788 self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
1789 buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
1790 self.assertEqual(buf, b'foo\n')
1791 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001792
1793
Martin Panter3840b2a2016-03-27 01:53:46 +00001794class NetworkedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001795
Martin Panter3840b2a2016-03-27 01:53:46 +00001796 def test_timeout_connect_ex(self):
1797 # Issue #12065: on a timeout, connect_ex() should return the original
1798 # errno (mimicking the behaviour of non-SSL sockets).
1799 with support.transient_internet(REMOTE_HOST):
Christian Heimesd0486372016-09-10 23:23:33 +02001800 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001801 cert_reqs=ssl.CERT_REQUIRED,
1802 do_handshake_on_connect=False)
1803 self.addCleanup(s.close)
1804 s.settimeout(0.0000001)
1805 rc = s.connect_ex((REMOTE_HOST, 443))
1806 if rc == 0:
1807 self.skipTest("REMOTE_HOST responded too quickly")
1808 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
1809
1810 @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
1811 def test_get_server_certificate_ipv6(self):
1812 with support.transient_internet('ipv6.google.com'):
1813 _test_get_server_certificate(self, 'ipv6.google.com', 443)
1814 _test_get_server_certificate_fail(self, 'ipv6.google.com', 443)
1815
Martin Panter3840b2a2016-03-27 01:53:46 +00001816
1817def _test_get_server_certificate(test, host, port, cert=None):
1818 pem = ssl.get_server_certificate((host, port))
1819 if not pem:
1820 test.fail("No server certificate on %s:%s!" % (host, port))
1821
1822 pem = ssl.get_server_certificate((host, port), ca_certs=cert)
1823 if not pem:
1824 test.fail("No server certificate on %s:%s!" % (host, port))
1825 if support.verbose:
1826 sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
1827
1828def _test_get_server_certificate_fail(test, host, port):
1829 try:
1830 pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
1831 except ssl.SSLError as x:
1832 #should fail
1833 if support.verbose:
1834 sys.stdout.write("%s\n" % x)
1835 else:
1836 test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
1837
1838
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02001839from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00001840
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02001841class ThreadedEchoServer(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001842
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02001843 class ConnectionHandler(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001844
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02001845 """A mildly complicated class, because we want it to work both
1846 with and without the SSL wrapper around the socket connection, so
1847 that we can test the STARTTLS functionality."""
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001848
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02001849 def __init__(self, server, connsock, addr):
1850 self.server = server
1851 self.running = False
1852 self.sock = connsock
1853 self.addr = addr
1854 self.sock.setblocking(1)
1855 self.sslconn = None
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001856 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00001857 self.daemon = True
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001858
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02001859 def wrap_conn(self):
1860 try:
1861 self.sslconn = self.server.context.wrap_socket(
1862 self.sock, server_side=True)
1863 self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
1864 self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
1865 except (ssl.SSLError, ConnectionResetError, OSError) as e:
1866 # We treat ConnectionResetError as though it were an
1867 # SSLError - OpenSSL on Ubuntu abruptly closes the
1868 # connection when asked to use an unsupported protocol.
1869 #
1870 # OSError may occur with wrong protocols, e.g. both
1871 # sides use PROTOCOL_TLS_SERVER.
1872 #
1873 # XXX Various errors can have happened here, for example
1874 # a mismatching protocol version, an invalid certificate,
1875 # or a low-level bug. This should be made more discriminating.
1876 #
1877 # bpo-31323: Store the exception as string to prevent
1878 # a reference leak: server -> conn_errors -> exception
1879 # -> traceback -> self (ConnectionHandler) -> server
1880 self.server.conn_errors.append(str(e))
1881 if self.server.chatty:
1882 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
1883 self.running = False
1884 self.server.stop()
1885 self.close()
1886 return False
1887 else:
1888 self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
1889 if self.server.context.verify_mode == ssl.CERT_REQUIRED:
1890 cert = self.sslconn.getpeercert()
1891 if support.verbose and self.server.chatty:
1892 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
1893 cert_binary = self.sslconn.getpeercert(True)
1894 if support.verbose and self.server.chatty:
1895 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
1896 cipher = self.sslconn.cipher()
1897 if support.verbose and self.server.chatty:
1898 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
1899 sys.stdout.write(" server: selected protocol is now "
1900 + str(self.sslconn.selected_npn_protocol()) + "\n")
1901 return True
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01001902
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02001903 def read(self):
1904 if self.sslconn:
1905 return self.sslconn.read()
1906 else:
1907 return self.sock.recv(1024)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01001908
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02001909 def write(self, bytes):
1910 if self.sslconn:
1911 return self.sslconn.write(bytes)
1912 else:
1913 return self.sock.send(bytes)
1914
1915 def close(self):
1916 if self.sslconn:
1917 self.sslconn.close()
1918 else:
1919 self.sock.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001920
Antoine Pitrou480a1242010-04-28 21:37:09 +00001921 def run(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02001922 self.running = True
1923 if not self.server.starttls_server:
1924 if not self.wrap_conn():
1925 return
1926 while self.running:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001927 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02001928 msg = self.read()
1929 stripped = msg.strip()
1930 if not stripped:
1931 # eof, so quit this handler
1932 self.running = False
1933 try:
1934 self.sock = self.sslconn.unwrap()
1935 except OSError:
1936 # Many tests shut the TCP connection down
1937 # without an SSL shutdown. This causes
1938 # unwrap() to raise OSError with errno=0!
1939 pass
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00001940 else:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02001941 self.sslconn = None
1942 self.close()
1943 elif stripped == b'over':
1944 if support.verbose and self.server.connectionchatty:
1945 sys.stdout.write(" server: client closed connection\n")
1946 self.close()
1947 return
1948 elif (self.server.starttls_server and
1949 stripped == b'STARTTLS'):
1950 if support.verbose and self.server.connectionchatty:
1951 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
1952 self.write(b"OK\n")
1953 if not self.wrap_conn():
1954 return
1955 elif (self.server.starttls_server and self.sslconn
1956 and stripped == b'ENDTLS'):
1957 if support.verbose and self.server.connectionchatty:
1958 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
1959 self.write(b"OK\n")
1960 self.sock = self.sslconn.unwrap()
1961 self.sslconn = None
1962 if support.verbose and self.server.connectionchatty:
1963 sys.stdout.write(" server: connection is now unencrypted...\n")
1964 elif stripped == b'CB tls-unique':
1965 if support.verbose and self.server.connectionchatty:
1966 sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
1967 data = self.sslconn.get_channel_binding("tls-unique")
1968 self.write(repr(data).encode("us-ascii") + b"\n")
1969 else:
1970 if (support.verbose and
1971 self.server.connectionchatty):
1972 ctype = (self.sslconn and "encrypted") or "unencrypted"
1973 sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
1974 % (msg, ctype, msg.lower(), ctype))
1975 self.write(msg.lower())
1976 except OSError:
1977 if self.server.chatty:
1978 handle_error("Test server failure:\n")
Bill Janssen2f5799b2008-06-29 00:08:12 +00001979 self.close()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02001980 self.running = False
1981 # normally, we'd just stop here, but for the test
1982 # harness, we want to stop the server
1983 self.server.stop()
Bill Janssen54cc54c2007-12-14 22:08:56 +00001984
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02001985 def __init__(self, certificate=None, ssl_version=None,
1986 certreqs=None, cacerts=None,
1987 chatty=True, connectionchatty=False, starttls_server=False,
1988 npn_protocols=None, alpn_protocols=None,
1989 ciphers=None, context=None):
1990 if context:
1991 self.context = context
1992 else:
1993 self.context = ssl.SSLContext(ssl_version
1994 if ssl_version is not None
1995 else ssl.PROTOCOL_TLSv1)
1996 self.context.verify_mode = (certreqs if certreqs is not None
1997 else ssl.CERT_NONE)
1998 if cacerts:
1999 self.context.load_verify_locations(cacerts)
2000 if certificate:
2001 self.context.load_cert_chain(certificate)
2002 if npn_protocols:
2003 self.context.set_npn_protocols(npn_protocols)
2004 if alpn_protocols:
2005 self.context.set_alpn_protocols(alpn_protocols)
2006 if ciphers:
2007 self.context.set_ciphers(ciphers)
2008 self.chatty = chatty
2009 self.connectionchatty = connectionchatty
2010 self.starttls_server = starttls_server
2011 self.sock = socket.socket()
2012 self.port = support.bind_port(self.sock)
2013 self.flag = None
2014 self.active = False
2015 self.selected_npn_protocols = []
2016 self.selected_alpn_protocols = []
2017 self.shared_ciphers = []
2018 self.conn_errors = []
2019 threading.Thread.__init__(self)
2020 self.daemon = True
2021
2022 def __enter__(self):
2023 self.start(threading.Event())
2024 self.flag.wait()
2025 return self
2026
2027 def __exit__(self, *args):
2028 self.stop()
2029 self.join()
2030
2031 def start(self, flag=None):
2032 self.flag = flag
2033 threading.Thread.start(self)
2034
2035 def run(self):
2036 self.sock.settimeout(0.05)
2037 self.sock.listen()
2038 self.active = True
2039 if self.flag:
2040 # signal an event
2041 self.flag.set()
2042 while self.active:
2043 try:
2044 newconn, connaddr = self.sock.accept()
2045 if support.verbose and self.chatty:
2046 sys.stdout.write(' server: new connection from '
2047 + repr(connaddr) + '\n')
2048 handler = self.ConnectionHandler(self, newconn, connaddr)
2049 handler.start()
2050 handler.join()
2051 except socket.timeout:
2052 pass
2053 except KeyboardInterrupt:
2054 self.stop()
2055 self.sock.close()
2056
2057 def stop(self):
2058 self.active = False
2059
2060class AsyncoreEchoServer(threading.Thread):
2061
2062 # this one's based on asyncore.dispatcher
2063
2064 class EchoServer (asyncore.dispatcher):
2065
2066 class ConnectionHandler(asyncore.dispatcher_with_send):
2067
2068 def __init__(self, conn, certfile):
2069 self.socket = test_wrap_socket(conn, server_side=True,
2070 certfile=certfile,
2071 do_handshake_on_connect=False)
2072 asyncore.dispatcher_with_send.__init__(self, self.socket)
2073 self._ssl_accepting = True
2074 self._do_ssl_handshake()
2075
2076 def readable(self):
2077 if isinstance(self.socket, ssl.SSLSocket):
2078 while self.socket.pending() > 0:
2079 self.handle_read_event()
2080 return True
2081
2082 def _do_ssl_handshake(self):
2083 try:
2084 self.socket.do_handshake()
2085 except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
2086 return
2087 except ssl.SSLEOFError:
2088 return self.handle_close()
2089 except ssl.SSLError:
Bill Janssen54cc54c2007-12-14 22:08:56 +00002090 raise
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002091 except OSError as err:
2092 if err.args[0] == errno.ECONNABORTED:
2093 return self.handle_close()
2094 else:
2095 self._ssl_accepting = False
Bill Janssen54cc54c2007-12-14 22:08:56 +00002096
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002097 def handle_read(self):
2098 if self._ssl_accepting:
2099 self._do_ssl_handshake()
2100 else:
2101 data = self.recv(1024)
2102 if support.verbose:
2103 sys.stdout.write(" server: read %s from client\n" % repr(data))
2104 if not data:
2105 self.close()
2106 else:
2107 self.send(data.lower())
Bill Janssen54cc54c2007-12-14 22:08:56 +00002108
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002109 def handle_close(self):
2110 self.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002111 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002112 sys.stdout.write(" server: closed connection %s\n" % self.socket)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002113
2114 def handle_error(self):
2115 raise
2116
Trent Nelson78520002008-04-10 20:54:35 +00002117 def __init__(self, certfile):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002118 self.certfile = certfile
2119 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2120 self.port = support.bind_port(sock, '')
2121 asyncore.dispatcher.__init__(self, sock)
2122 self.listen(5)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002123
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002124 def handle_accepted(self, sock_obj, addr):
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002125 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002126 sys.stdout.write(" server: new connection from %s:%s\n" %addr)
2127 self.ConnectionHandler(sock_obj, self.certfile)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002128
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002129 def handle_error(self):
2130 raise
Bill Janssen54cc54c2007-12-14 22:08:56 +00002131
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002132 def __init__(self, certfile):
2133 self.flag = None
2134 self.active = False
2135 self.server = self.EchoServer(certfile)
2136 self.port = self.server.port
2137 threading.Thread.__init__(self)
2138 self.daemon = True
Bill Janssen54cc54c2007-12-14 22:08:56 +00002139
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002140 def __str__(self):
2141 return "<%s %s>" % (self.__class__.__name__, self.server)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002142
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002143 def __enter__(self):
2144 self.start(threading.Event())
2145 self.flag.wait()
2146 return self
Thomas Woutersed03b412007-08-28 21:37:11 +00002147
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002148 def __exit__(self, *args):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002149 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002150 sys.stdout.write(" cleanup: stopping server.\n")
2151 self.stop()
2152 if support.verbose:
2153 sys.stdout.write(" cleanup: joining server thread.\n")
2154 self.join()
2155 if support.verbose:
2156 sys.stdout.write(" cleanup: successfully joined.\n")
2157 # make sure that ConnectionHandler is removed from socket_map
2158 asyncore.close_all(ignore_all=True)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002159
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002160 def start (self, flag=None):
2161 self.flag = flag
2162 threading.Thread.start(self)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002163
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002164 def run(self):
2165 self.active = True
2166 if self.flag:
2167 self.flag.set()
2168 while self.active:
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002169 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002170 asyncore.loop(1)
2171 except:
2172 pass
Trent Nelson6b240cd2008-04-10 20:12:06 +00002173
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002174 def stop(self):
2175 self.active = False
2176 self.server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002177
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002178def server_params_test(client_context, server_context, indata=b"FOO\n",
2179 chatty=True, connectionchatty=False, sni_name=None,
2180 session=None):
2181 """
2182 Launch a server, connect a client to it and try various reads
2183 and writes.
2184 """
2185 stats = {}
2186 server = ThreadedEchoServer(context=server_context,
2187 chatty=chatty,
2188 connectionchatty=False)
2189 with server:
2190 with client_context.wrap_socket(socket.socket(),
2191 server_hostname=sni_name, session=session) as s:
2192 s.connect((HOST, server.port))
2193 for arg in [indata, bytearray(indata), memoryview(indata)]:
2194 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002195 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002196 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002197 " client: sending %r...\n" % indata)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002198 s.write(arg)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002199 outdata = s.read()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002200 if connectionchatty:
2201 if support.verbose:
2202 sys.stdout.write(" client: read %r\n" % outdata)
Trent Nelson6b240cd2008-04-10 20:12:06 +00002203 if outdata != indata.lower():
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002204 raise AssertionError(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002205 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
2206 % (outdata[:20], len(outdata),
2207 indata[:20].lower(), len(indata)))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002208 s.write(b"over\n")
2209 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002210 if support.verbose:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002211 sys.stdout.write(" client: closing connection.\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002212 stats.update({
2213 'compression': s.compression(),
2214 'cipher': s.cipher(),
2215 'peercert': s.getpeercert(),
2216 'client_alpn_protocol': s.selected_alpn_protocol(),
2217 'client_npn_protocol': s.selected_npn_protocol(),
2218 'version': s.version(),
2219 'session_reused': s.session_reused,
2220 'session': s.session,
2221 })
2222 s.close()
2223 stats['server_alpn_protocols'] = server.selected_alpn_protocols
2224 stats['server_npn_protocols'] = server.selected_npn_protocols
2225 stats['server_shared_ciphers'] = server.shared_ciphers
2226 return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002227
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002228def try_protocol_combo(server_protocol, client_protocol, expect_success,
2229 certsreqs=None, server_options=0, client_options=0):
2230 """
2231 Try to SSL-connect using *client_protocol* to *server_protocol*.
2232 If *expect_success* is true, assert that the connection succeeds,
2233 if it's false, assert that the connection fails.
2234 Also, if *expect_success* is a string, assert that it is the protocol
2235 version actually used by the connection.
2236 """
2237 if certsreqs is None:
2238 certsreqs = ssl.CERT_NONE
2239 certtype = {
2240 ssl.CERT_NONE: "CERT_NONE",
2241 ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
2242 ssl.CERT_REQUIRED: "CERT_REQUIRED",
2243 }[certsreqs]
2244 if support.verbose:
2245 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
2246 sys.stdout.write(formatstr %
2247 (ssl.get_protocol_name(client_protocol),
2248 ssl.get_protocol_name(server_protocol),
2249 certtype))
2250 client_context = ssl.SSLContext(client_protocol)
2251 client_context.options |= client_options
2252 server_context = ssl.SSLContext(server_protocol)
2253 server_context.options |= server_options
2254
2255 # NOTE: we must enable "ALL" ciphers on the client, otherwise an
2256 # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
2257 # starting from OpenSSL 1.0.0 (see issue #8322).
2258 if client_context.protocol == ssl.PROTOCOL_SSLv23:
2259 client_context.set_ciphers("ALL")
2260
2261 for ctx in (client_context, server_context):
2262 ctx.verify_mode = certsreqs
2263 ctx.load_cert_chain(CERTFILE)
2264 ctx.load_verify_locations(CERTFILE)
2265 try:
2266 stats = server_params_test(client_context, server_context,
2267 chatty=False, connectionchatty=False)
2268 # Protocol mismatch can result in either an SSLError, or a
2269 # "Connection reset by peer" error.
2270 except ssl.SSLError:
2271 if expect_success:
2272 raise
2273 except OSError as e:
2274 if expect_success or e.errno != errno.ECONNRESET:
2275 raise
2276 else:
2277 if not expect_success:
2278 raise AssertionError(
2279 "Client protocol %s succeeded with server protocol %s!"
2280 % (ssl.get_protocol_name(client_protocol),
2281 ssl.get_protocol_name(server_protocol)))
2282 elif (expect_success is not True
2283 and expect_success != stats['version']):
2284 raise AssertionError("version mismatch: expected %r, got %r"
2285 % (expect_success, stats['version']))
2286
2287
2288class ThreadedTests(unittest.TestCase):
2289
2290 @skip_if_broken_ubuntu_ssl
2291 def test_echo(self):
2292 """Basic test of an SSL client connecting to a server"""
2293 if support.verbose:
2294 sys.stdout.write("\n")
2295 for protocol in PROTOCOLS:
2296 if protocol in {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}:
2297 continue
2298 with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
2299 context = ssl.SSLContext(protocol)
2300 context.load_cert_chain(CERTFILE)
2301 server_params_test(context, context,
2302 chatty=True, connectionchatty=True)
2303
2304 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2305 client_context.load_verify_locations(SIGNING_CA)
2306 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2307 # server_context.load_verify_locations(SIGNING_CA)
2308 server_context.load_cert_chain(SIGNED_CERTFILE2)
2309
2310 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_SERVER):
2311 server_params_test(client_context=client_context,
2312 server_context=server_context,
2313 chatty=True, connectionchatty=True,
2314 sni_name='fakehostname')
2315
2316 client_context.check_hostname = False
2317 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_CLIENT):
2318 with self.assertRaises(ssl.SSLError) as e:
2319 server_params_test(client_context=server_context,
2320 server_context=client_context,
2321 chatty=True, connectionchatty=True,
2322 sni_name='fakehostname')
2323 self.assertIn('called a function you should not call',
2324 str(e.exception))
2325
2326 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_SERVER):
2327 with self.assertRaises(ssl.SSLError) as e:
2328 server_params_test(client_context=server_context,
2329 server_context=server_context,
2330 chatty=True, connectionchatty=True)
2331 self.assertIn('called a function you should not call',
2332 str(e.exception))
2333
2334 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_CLIENT):
2335 with self.assertRaises(ssl.SSLError) as e:
2336 server_params_test(client_context=server_context,
2337 server_context=client_context,
2338 chatty=True, connectionchatty=True)
2339 self.assertIn('called a function you should not call',
2340 str(e.exception))
2341
2342
2343 def test_getpeercert(self):
2344 if support.verbose:
2345 sys.stdout.write("\n")
2346 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2347 context.verify_mode = ssl.CERT_REQUIRED
2348 context.load_verify_locations(CERTFILE)
2349 context.load_cert_chain(CERTFILE)
2350 server = ThreadedEchoServer(context=context, chatty=False)
2351 with server:
2352 s = context.wrap_socket(socket.socket(),
2353 do_handshake_on_connect=False)
2354 s.connect((HOST, server.port))
2355 # getpeercert() raise ValueError while the handshake isn't
2356 # done.
2357 with self.assertRaises(ValueError):
2358 s.getpeercert()
2359 s.do_handshake()
2360 cert = s.getpeercert()
2361 self.assertTrue(cert, "Can't get peer certificate.")
2362 cipher = s.cipher()
Bill Janssen58afe4c2008-09-08 16:45:19 +00002363 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002364 sys.stdout.write(pprint.pformat(cert) + '\n')
2365 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
2366 if 'subject' not in cert:
2367 self.fail("No subject field in certificate: %s." %
2368 pprint.pformat(cert))
2369 if ((('organizationName', 'Python Software Foundation'),)
2370 not in cert['subject']):
2371 self.fail(
2372 "Missing or invalid 'organizationName' field in certificate subject; "
2373 "should be 'Python Software Foundation'.")
2374 self.assertIn('notBefore', cert)
2375 self.assertIn('notAfter', cert)
2376 before = ssl.cert_time_to_seconds(cert['notBefore'])
2377 after = ssl.cert_time_to_seconds(cert['notAfter'])
2378 self.assertLess(before, after)
2379 s.close()
Bill Janssen58afe4c2008-09-08 16:45:19 +00002380
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002381 @unittest.skipUnless(have_verify_flags(),
2382 "verify_flags need OpenSSL > 0.9.8")
2383 def test_crl_check(self):
2384 if support.verbose:
2385 sys.stdout.write("\n")
2386
2387 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2388 server_context.load_cert_chain(SIGNED_CERTFILE)
2389
2390 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2391 context.verify_mode = ssl.CERT_REQUIRED
2392 context.load_verify_locations(SIGNING_CA)
2393 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
2394 self.assertEqual(context.verify_flags, ssl.VERIFY_DEFAULT | tf)
2395
2396 # VERIFY_DEFAULT should pass
2397 server = ThreadedEchoServer(context=server_context, chatty=True)
2398 with server:
2399 with context.wrap_socket(socket.socket()) as s:
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002400 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002401 cert = s.getpeercert()
2402 self.assertTrue(cert, "Can't get peer certificate.")
Bill Janssen58afe4c2008-09-08 16:45:19 +00002403
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002404 # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
2405 context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
Bill Janssen58afe4c2008-09-08 16:45:19 +00002406
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002407 server = ThreadedEchoServer(context=server_context, chatty=True)
2408 with server:
2409 with context.wrap_socket(socket.socket()) as s:
2410 with self.assertRaisesRegex(ssl.SSLError,
2411 "certificate verify failed"):
2412 s.connect((HOST, server.port))
Bill Janssen58afe4c2008-09-08 16:45:19 +00002413
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002414 # now load a CRL file. The CRL file is signed by the CA.
2415 context.load_verify_locations(CRLFILE)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002416
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002417 server = ThreadedEchoServer(context=server_context, chatty=True)
2418 with server:
2419 with context.wrap_socket(socket.socket()) as s:
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002420 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002421 cert = s.getpeercert()
2422 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002423
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002424 def test_check_hostname(self):
2425 if support.verbose:
2426 sys.stdout.write("\n")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002427
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002428 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2429 server_context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002430
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002431 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2432 context.verify_mode = ssl.CERT_REQUIRED
2433 context.check_hostname = True
2434 context.load_verify_locations(SIGNING_CA)
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002435
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002436 # correct hostname should verify
2437 server = ThreadedEchoServer(context=server_context, chatty=True)
2438 with server:
2439 with context.wrap_socket(socket.socket(),
2440 server_hostname="localhost") as s:
2441 s.connect((HOST, server.port))
2442 cert = s.getpeercert()
2443 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002444
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002445 # incorrect hostname should raise an exception
2446 server = ThreadedEchoServer(context=server_context, chatty=True)
2447 with server:
2448 with context.wrap_socket(socket.socket(),
2449 server_hostname="invalid") as s:
2450 with self.assertRaisesRegex(ssl.CertificateError,
2451 "hostname 'invalid' doesn't match 'localhost'"):
2452 s.connect((HOST, server.port))
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002453
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002454 # missing server_hostname arg should cause an exception, too
2455 server = ThreadedEchoServer(context=server_context, chatty=True)
2456 with server:
2457 with socket.socket() as s:
2458 with self.assertRaisesRegex(ValueError,
2459 "check_hostname requires server_hostname"):
2460 context.wrap_socket(s)
2461
2462 def test_wrong_cert(self):
2463 """Connecting when the server rejects the client's certificate
2464
2465 Launch a server with CERT_REQUIRED, and check that trying to
2466 connect to it with a wrong client certificate fails.
2467 """
2468 certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
2469 "wrongcert.pem")
2470 server = ThreadedEchoServer(CERTFILE,
2471 certreqs=ssl.CERT_REQUIRED,
2472 cacerts=CERTFILE, chatty=False,
2473 connectionchatty=False)
2474 with server, \
2475 socket.socket() as sock, \
2476 test_wrap_socket(sock,
2477 certfile=certfile,
2478 ssl_version=ssl.PROTOCOL_TLSv1) as s:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002479 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002480 # Expect either an SSL error about the server rejecting
2481 # the connection, or a low-level connection reset (which
2482 # sometimes happens on Windows)
2483 s.connect((HOST, server.port))
2484 except ssl.SSLError as e:
2485 if support.verbose:
2486 sys.stdout.write("\nSSLError is %r\n" % e)
2487 except OSError as e:
2488 if e.errno != errno.ECONNRESET:
2489 raise
2490 if support.verbose:
2491 sys.stdout.write("\nsocket.error is %r\n" % e)
2492 else:
2493 self.fail("Use of invalid cert should have failed!")
2494
2495 def test_rude_shutdown(self):
2496 """A brutal shutdown of an SSL server should raise an OSError
2497 in the client when attempting handshake.
2498 """
2499 listener_ready = threading.Event()
2500 listener_gone = threading.Event()
2501
2502 s = socket.socket()
2503 port = support.bind_port(s, HOST)
2504
2505 # `listener` runs in a thread. It sits in an accept() until
2506 # the main thread connects. Then it rudely closes the socket,
2507 # and sets Event `listener_gone` to let the main thread know
2508 # the socket is gone.
2509 def listener():
2510 s.listen()
2511 listener_ready.set()
2512 newsock, addr = s.accept()
2513 newsock.close()
2514 s.close()
2515 listener_gone.set()
2516
2517 def connector():
2518 listener_ready.wait()
2519 with socket.socket() as c:
2520 c.connect((HOST, port))
2521 listener_gone.wait()
Antoine Pitrou40f08742010-04-24 22:04:40 +00002522 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002523 ssl_sock = test_wrap_socket(c)
2524 except OSError:
2525 pass
2526 else:
2527 self.fail('connecting to closed SSL socket should have failed')
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002528
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002529 t = threading.Thread(target=listener)
2530 t.start()
2531 try:
2532 connector()
2533 finally:
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002534 t.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002535
Christian Heimesb3ad0e52017-09-08 12:00:19 -07002536 def test_ssl_cert_verify_error(self):
2537 if support.verbose:
2538 sys.stdout.write("\n")
2539
2540 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2541 server_context.load_cert_chain(SIGNED_CERTFILE)
2542
2543 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2544
2545 server = ThreadedEchoServer(context=server_context, chatty=True)
2546 with server:
2547 with context.wrap_socket(socket.socket(),
2548 server_hostname="localhost") as s:
2549 try:
2550 s.connect((HOST, server.port))
2551 except ssl.SSLError as e:
2552 msg = 'unable to get local issuer certificate'
2553 self.assertIsInstance(e, ssl.SSLCertVerificationError)
2554 self.assertEqual(e.verify_code, 20)
2555 self.assertEqual(e.verify_message, msg)
2556 self.assertIn(msg, repr(e))
2557 self.assertIn('certificate verify failed', repr(e))
2558
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002559 @skip_if_broken_ubuntu_ssl
2560 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
2561 "OpenSSL is compiled without SSLv2 support")
2562 def test_protocol_sslv2(self):
2563 """Connecting to an SSLv2 server with various client options"""
2564 if support.verbose:
2565 sys.stdout.write("\n")
2566 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
2567 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
2568 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
2569 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False)
2570 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2571 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
2572 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
2573 # SSLv23 client with specific SSL options
2574 if no_sslv2_implies_sslv3_hello():
2575 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
2576 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
2577 client_options=ssl.OP_NO_SSLv2)
2578 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
2579 client_options=ssl.OP_NO_SSLv3)
2580 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
2581 client_options=ssl.OP_NO_TLSv1)
Antoine Pitrou242db722013-05-01 20:52:07 +02002582
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002583 @skip_if_broken_ubuntu_ssl
2584 def test_protocol_sslv23(self):
2585 """Connecting to an SSLv23 server with various client options"""
2586 if support.verbose:
2587 sys.stdout.write("\n")
2588 if hasattr(ssl, 'PROTOCOL_SSLv2'):
Antoine Pitrou8f85f902012-01-03 22:46:48 +01002589 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002590 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True)
2591 except OSError as x:
2592 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
2593 if support.verbose:
2594 sys.stdout.write(
2595 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
2596 % str(x))
2597 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2598 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False)
2599 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True)
2600 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1')
Antoine Pitrou8f85f902012-01-03 22:46:48 +01002601
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002602 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2603 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False, ssl.CERT_OPTIONAL)
2604 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL)
2605 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
2606
2607 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2608 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False, ssl.CERT_REQUIRED)
2609 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED)
2610 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
2611
2612 # Server with specific SSL options
2613 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2614 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False,
2615 server_options=ssl.OP_NO_SSLv3)
2616 # Will choose TLSv1
2617 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True,
2618 server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
2619 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, False,
2620 server_options=ssl.OP_NO_TLSv1)
2621
2622
2623 @skip_if_broken_ubuntu_ssl
2624 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
2625 "OpenSSL is compiled without SSLv3 support")
2626 def test_protocol_sslv3(self):
2627 """Connecting to an SSLv3 server with various client options"""
2628 if support.verbose:
2629 sys.stdout.write("\n")
2630 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3')
2631 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
2632 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
2633 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2634 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
2635 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False,
2636 client_options=ssl.OP_NO_SSLv3)
2637 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
2638 if no_sslv2_implies_sslv3_hello():
2639 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
2640 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23,
2641 False, client_options=ssl.OP_NO_SSLv2)
2642
2643 @skip_if_broken_ubuntu_ssl
2644 def test_protocol_tlsv1(self):
2645 """Connecting to a TLSv1 server with various client options"""
2646 if support.verbose:
2647 sys.stdout.write("\n")
2648 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1')
2649 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
2650 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
2651 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2652 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
2653 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2654 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
2655 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False,
2656 client_options=ssl.OP_NO_TLSv1)
2657
2658 @skip_if_broken_ubuntu_ssl
2659 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
2660 "TLS version 1.1 not supported.")
2661 def test_protocol_tlsv1_1(self):
2662 """Connecting to a TLSv1.1 server with various client options.
2663 Testing against older TLS versions."""
2664 if support.verbose:
2665 sys.stdout.write("\n")
2666 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
2667 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2668 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False)
2669 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2670 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
2671 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv23, False,
2672 client_options=ssl.OP_NO_TLSv1_1)
2673
2674 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
2675 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False)
2676 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False)
2677
2678
2679 @skip_if_broken_ubuntu_ssl
2680 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
2681 "TLS version 1.2 not supported.")
2682 def test_protocol_tlsv1_2(self):
2683 """Connecting to a TLSv1.2 server with various client options.
2684 Testing against older TLS versions."""
2685 if support.verbose:
2686 sys.stdout.write("\n")
2687 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2',
2688 server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
2689 client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
2690 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2691 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False)
2692 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2693 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
2694 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv23, False,
2695 client_options=ssl.OP_NO_TLSv1_2)
2696
2697 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2')
2698 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
2699 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
2700 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
2701 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
2702
2703 def test_starttls(self):
2704 """Switching from clear text to encrypted and back again."""
2705 msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")
2706
2707 server = ThreadedEchoServer(CERTFILE,
Antoine Pitrou47e40422014-09-04 21:00:10 +02002708 ssl_version=ssl.PROTOCOL_TLSv1,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002709 starttls_server=True,
2710 chatty=True,
2711 connectionchatty=True)
2712 wrapped = False
2713 with server:
2714 s = socket.socket()
2715 s.setblocking(1)
2716 s.connect((HOST, server.port))
Antoine Pitroud6494802011-07-21 01:11:30 +02002717 if support.verbose:
2718 sys.stdout.write("\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002719 for indata in msgs:
Antoine Pitroud6494802011-07-21 01:11:30 +02002720 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002721 sys.stdout.write(
2722 " client: sending %r...\n" % indata)
2723 if wrapped:
2724 conn.write(indata)
2725 outdata = conn.read()
2726 else:
2727 s.send(indata)
2728 outdata = s.recv(1024)
2729 msg = outdata.strip().lower()
2730 if indata == b"STARTTLS" and msg.startswith(b"ok"):
2731 # STARTTLS ok, switch to secure mode
2732 if support.verbose:
2733 sys.stdout.write(
2734 " client: read %r from server, starting TLS...\n"
2735 % msg)
2736 conn = test_wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
2737 wrapped = True
2738 elif indata == b"ENDTLS" and msg.startswith(b"ok"):
2739 # ENDTLS ok, switch back to clear text
2740 if support.verbose:
2741 sys.stdout.write(
2742 " client: read %r from server, ending TLS...\n"
2743 % msg)
2744 s = conn.unwrap()
2745 wrapped = False
2746 else:
2747 if support.verbose:
2748 sys.stdout.write(
2749 " client: read %r from server\n" % msg)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01002750 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002751 sys.stdout.write(" client: closing connection.\n")
2752 if wrapped:
2753 conn.write(b"over\n")
2754 else:
2755 s.send(b"over\n")
2756 if wrapped:
2757 conn.close()
2758 else:
2759 s.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01002760
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002761 def test_socketserver(self):
2762 """Using socketserver to create and manage SSL connections."""
2763 server = make_https_server(self, certfile=CERTFILE)
2764 # try to connect
2765 if support.verbose:
2766 sys.stdout.write('\n')
2767 with open(CERTFILE, 'rb') as f:
2768 d1 = f.read()
2769 d2 = ''
2770 # now fetch the same data from the HTTPS server
2771 url = 'https://localhost:%d/%s' % (
2772 server.port, os.path.split(CERTFILE)[1])
2773 context = ssl.create_default_context(cafile=CERTFILE)
2774 f = urllib.request.urlopen(url, context=context)
2775 try:
2776 dlen = f.info().get("content-length")
2777 if dlen and (int(dlen) > 0):
2778 d2 = f.read(int(dlen))
2779 if support.verbose:
2780 sys.stdout.write(
2781 " client: read %d bytes from remote server '%s'\n"
2782 % (len(d2), server))
2783 finally:
2784 f.close()
2785 self.assertEqual(d1, d2)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01002786
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002787 def test_asyncore_server(self):
2788 """Check the example asyncore integration."""
2789 if support.verbose:
2790 sys.stdout.write("\n")
Antoine Pitrou0e576f12011-12-22 10:03:38 +01002791
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002792 indata = b"FOO\n"
2793 server = AsyncoreEchoServer(CERTFILE)
2794 with server:
2795 s = test_wrap_socket(socket.socket())
2796 s.connect(('127.0.0.1', server.port))
2797 if support.verbose:
2798 sys.stdout.write(
2799 " client: sending %r...\n" % indata)
2800 s.write(indata)
2801 outdata = s.read()
2802 if support.verbose:
2803 sys.stdout.write(" client: read %r\n" % outdata)
2804 if outdata != indata.lower():
2805 self.fail(
2806 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
2807 % (outdata[:20], len(outdata),
2808 indata[:20].lower(), len(indata)))
2809 s.write(b"over\n")
2810 if support.verbose:
2811 sys.stdout.write(" client: closing connection.\n")
2812 s.close()
2813 if support.verbose:
2814 sys.stdout.write(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05002815
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002816 def test_recv_send(self):
2817 """Test recv(), send() and friends."""
2818 if support.verbose:
2819 sys.stdout.write("\n")
2820
2821 server = ThreadedEchoServer(CERTFILE,
2822 certreqs=ssl.CERT_NONE,
2823 ssl_version=ssl.PROTOCOL_TLSv1,
2824 cacerts=CERTFILE,
2825 chatty=True,
2826 connectionchatty=False)
2827 with server:
2828 s = test_wrap_socket(socket.socket(),
2829 server_side=False,
2830 certfile=CERTFILE,
2831 ca_certs=CERTFILE,
2832 cert_reqs=ssl.CERT_NONE,
2833 ssl_version=ssl.PROTOCOL_TLSv1)
2834 s.connect((HOST, server.port))
2835 # helper methods for standardising recv* method signatures
2836 def _recv_into():
2837 b = bytearray(b"\0"*100)
2838 count = s.recv_into(b)
2839 return b[:count]
2840
2841 def _recvfrom_into():
2842 b = bytearray(b"\0"*100)
2843 count, addr = s.recvfrom_into(b)
2844 return b[:count]
2845
2846 # (name, method, expect success?, *args, return value func)
2847 send_methods = [
2848 ('send', s.send, True, [], len),
2849 ('sendto', s.sendto, False, ["some.address"], len),
2850 ('sendall', s.sendall, True, [], lambda x: None),
2851 ]
2852 # (name, method, whether to expect success, *args)
2853 recv_methods = [
2854 ('recv', s.recv, True, []),
2855 ('recvfrom', s.recvfrom, False, ["some.address"]),
2856 ('recv_into', _recv_into, True, []),
2857 ('recvfrom_into', _recvfrom_into, False, []),
2858 ]
2859 data_prefix = "PREFIX_"
2860
2861 for (meth_name, send_meth, expect_success, args,
2862 ret_val_meth) in send_methods:
2863 indata = (data_prefix + meth_name).encode('ascii')
2864 try:
2865 ret = send_meth(indata, *args)
2866 msg = "sending with {}".format(meth_name)
2867 self.assertEqual(ret, ret_val_meth(indata), msg=msg)
2868 outdata = s.read()
2869 if outdata != indata.lower():
2870 self.fail(
2871 "While sending with <<{name:s}>> bad data "
2872 "<<{outdata:r}>> ({nout:d}) received; "
2873 "expected <<{indata:r}>> ({nin:d})\n".format(
2874 name=meth_name, outdata=outdata[:20],
2875 nout=len(outdata),
2876 indata=indata[:20], nin=len(indata)
2877 )
2878 )
2879 except ValueError as e:
2880 if expect_success:
2881 self.fail(
2882 "Failed to send with method <<{name:s}>>; "
2883 "expected to succeed.\n".format(name=meth_name)
2884 )
2885 if not str(e).startswith(meth_name):
2886 self.fail(
2887 "Method <<{name:s}>> failed with unexpected "
2888 "exception message: {exp:s}\n".format(
2889 name=meth_name, exp=e
2890 )
2891 )
2892
2893 for meth_name, recv_meth, expect_success, args in recv_methods:
2894 indata = (data_prefix + meth_name).encode('ascii')
2895 try:
2896 s.send(indata)
2897 outdata = recv_meth(*args)
2898 if outdata != indata.lower():
2899 self.fail(
2900 "While receiving with <<{name:s}>> bad data "
2901 "<<{outdata:r}>> ({nout:d}) received; "
2902 "expected <<{indata:r}>> ({nin:d})\n".format(
2903 name=meth_name, outdata=outdata[:20],
2904 nout=len(outdata),
2905 indata=indata[:20], nin=len(indata)
2906 )
2907 )
2908 except ValueError as e:
2909 if expect_success:
2910 self.fail(
2911 "Failed to receive with method <<{name:s}>>; "
2912 "expected to succeed.\n".format(name=meth_name)
2913 )
2914 if not str(e).startswith(meth_name):
2915 self.fail(
2916 "Method <<{name:s}>> failed with unexpected "
2917 "exception message: {exp:s}\n".format(
2918 name=meth_name, exp=e
2919 )
2920 )
2921 # consume data
2922 s.read()
2923
2924 # read(-1, buffer) is supported, even though read(-1) is not
2925 data = b"data"
2926 s.send(data)
2927 buffer = bytearray(len(data))
2928 self.assertEqual(s.read(-1, buffer), len(data))
2929 self.assertEqual(buffer, data)
2930
Christian Heimes888bbdc2017-09-07 14:18:21 -07002931 # sendall accepts bytes-like objects
2932 if ctypes is not None:
2933 ubyte = ctypes.c_ubyte * len(data)
2934 byteslike = ubyte.from_buffer_copy(data)
2935 s.sendall(byteslike)
2936 self.assertEqual(s.read(), data)
2937
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002938 # Make sure sendmsg et al are disallowed to avoid
2939 # inadvertent disclosure of data and/or corruption
2940 # of the encrypted data stream
2941 self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
2942 self.assertRaises(NotImplementedError, s.recvmsg, 100)
2943 self.assertRaises(NotImplementedError,
2944 s.recvmsg_into, bytearray(100))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002945 s.write(b"over\n")
2946
2947 self.assertRaises(ValueError, s.recv, -1)
2948 self.assertRaises(ValueError, s.read, -1)
2949
2950 s.close()
2951
2952 def test_recv_zero(self):
2953 server = ThreadedEchoServer(CERTFILE)
2954 server.__enter__()
2955 self.addCleanup(server.__exit__, None, None)
2956 s = socket.create_connection((HOST, server.port))
2957 self.addCleanup(s.close)
2958 s = test_wrap_socket(s, suppress_ragged_eofs=False)
2959 self.addCleanup(s.close)
2960
2961 # recv/read(0) should return no data
2962 s.send(b"data")
2963 self.assertEqual(s.recv(0), b"")
2964 self.assertEqual(s.read(0), b"")
2965 self.assertEqual(s.read(), b"data")
2966
2967 # Should not block if the other end sends no data
2968 s.setblocking(False)
2969 self.assertEqual(s.recv(0), b"")
2970 self.assertEqual(s.recv_into(bytearray()), 0)
2971
2972 def test_nonblocking_send(self):
2973 server = ThreadedEchoServer(CERTFILE,
2974 certreqs=ssl.CERT_NONE,
2975 ssl_version=ssl.PROTOCOL_TLSv1,
2976 cacerts=CERTFILE,
2977 chatty=True,
2978 connectionchatty=False)
2979 with server:
2980 s = test_wrap_socket(socket.socket(),
2981 server_side=False,
2982 certfile=CERTFILE,
2983 ca_certs=CERTFILE,
2984 cert_reqs=ssl.CERT_NONE,
2985 ssl_version=ssl.PROTOCOL_TLSv1)
2986 s.connect((HOST, server.port))
2987 s.setblocking(False)
2988
2989 # If we keep sending data, at some point the buffers
2990 # will be full and the call will block
2991 buf = bytearray(8192)
2992 def fill_buffer():
2993 while True:
2994 s.send(buf)
2995 self.assertRaises((ssl.SSLWantWriteError,
2996 ssl.SSLWantReadError), fill_buffer)
2997
2998 # Now read all the output and discard it
2999 s.setblocking(True)
3000 s.close()
3001
3002 def test_handshake_timeout(self):
3003 # Issue #5103: SSL handshake must respect the socket timeout
3004 server = socket.socket(socket.AF_INET)
3005 host = "127.0.0.1"
3006 port = support.bind_port(server)
3007 started = threading.Event()
3008 finish = False
3009
3010 def serve():
3011 server.listen()
3012 started.set()
3013 conns = []
3014 while not finish:
3015 r, w, e = select.select([server], [], [], 0.1)
3016 if server in r:
3017 # Let the socket hang around rather than having
3018 # it closed by garbage collection.
3019 conns.append(server.accept()[0])
3020 for sock in conns:
3021 sock.close()
3022
3023 t = threading.Thread(target=serve)
3024 t.start()
3025 started.wait()
3026
3027 try:
3028 try:
3029 c = socket.socket(socket.AF_INET)
3030 c.settimeout(0.2)
3031 c.connect((host, port))
3032 # Will attempt handshake and time out
3033 self.assertRaisesRegex(socket.timeout, "timed out",
3034 test_wrap_socket, c)
3035 finally:
3036 c.close()
3037 try:
3038 c = socket.socket(socket.AF_INET)
3039 c = test_wrap_socket(c)
3040 c.settimeout(0.2)
3041 # Will attempt handshake and time out
3042 self.assertRaisesRegex(socket.timeout, "timed out",
3043 c.connect, (host, port))
3044 finally:
3045 c.close()
3046 finally:
3047 finish = True
3048 t.join()
3049 server.close()
3050
3051 def test_server_accept(self):
3052 # Issue #16357: accept() on a SSLSocket created through
3053 # SSLContext.wrap_socket().
3054 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
3055 context.verify_mode = ssl.CERT_REQUIRED
3056 context.load_verify_locations(CERTFILE)
3057 context.load_cert_chain(CERTFILE)
3058 server = socket.socket(socket.AF_INET)
3059 host = "127.0.0.1"
3060 port = support.bind_port(server)
3061 server = context.wrap_socket(server, server_side=True)
3062 self.assertTrue(server.server_side)
3063
3064 evt = threading.Event()
3065 remote = None
3066 peer = None
3067 def serve():
3068 nonlocal remote, peer
3069 server.listen()
3070 # Block on the accept and wait on the connection to close.
3071 evt.set()
3072 remote, peer = server.accept()
3073 remote.recv(1)
3074
3075 t = threading.Thread(target=serve)
3076 t.start()
3077 # Client wait until server setup and perform a connect.
3078 evt.wait()
3079 client = context.wrap_socket(socket.socket())
3080 client.connect((host, port))
3081 client_addr = client.getsockname()
3082 client.close()
3083 t.join()
3084 remote.close()
3085 server.close()
3086 # Sanity checks.
3087 self.assertIsInstance(remote, ssl.SSLSocket)
3088 self.assertEqual(peer, client_addr)
3089
3090 def test_getpeercert_enotconn(self):
3091 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
3092 with context.wrap_socket(socket.socket()) as sock:
3093 with self.assertRaises(OSError) as cm:
3094 sock.getpeercert()
3095 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3096
3097 def test_do_handshake_enotconn(self):
3098 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
3099 with context.wrap_socket(socket.socket()) as sock:
3100 with self.assertRaises(OSError) as cm:
3101 sock.do_handshake()
3102 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3103
3104 def test_default_ciphers(self):
3105 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
3106 try:
3107 # Force a set of weak ciphers on our client context
3108 context.set_ciphers("DES")
3109 except ssl.SSLError:
3110 self.skipTest("no DES cipher available")
3111 with ThreadedEchoServer(CERTFILE,
3112 ssl_version=ssl.PROTOCOL_SSLv23,
3113 chatty=False) as server:
3114 with context.wrap_socket(socket.socket()) as s:
3115 with self.assertRaises(OSError):
3116 s.connect((HOST, server.port))
3117 self.assertIn("no shared cipher", server.conn_errors[0])
3118
3119 def test_version_basic(self):
3120 """
3121 Basic tests for SSLSocket.version().
3122 More tests are done in the test_protocol_*() methods.
3123 """
3124 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3125 with ThreadedEchoServer(CERTFILE,
3126 ssl_version=ssl.PROTOCOL_TLSv1,
3127 chatty=False) as server:
3128 with context.wrap_socket(socket.socket()) as s:
3129 self.assertIs(s.version(), None)
3130 s.connect((HOST, server.port))
3131 self.assertEqual(s.version(), 'TLSv1')
3132 self.assertIs(s.version(), None)
3133
Christian Heimescb5b68a2017-09-07 18:07:00 -07003134 @unittest.skipUnless(ssl.HAS_TLSv1_3,
3135 "test requires TLSv1.3 enabled OpenSSL")
3136 def test_tls1_3(self):
3137 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
3138 context.load_cert_chain(CERTFILE)
3139 # disable all but TLS 1.3
3140 context.options |= (
3141 ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
3142 )
3143 with ThreadedEchoServer(context=context) as server:
3144 with context.wrap_socket(socket.socket()) as s:
3145 s.connect((HOST, server.port))
3146 self.assertIn(s.cipher()[0], [
3147 'TLS13-AES-256-GCM-SHA384',
3148 'TLS13-CHACHA20-POLY1305-SHA256',
3149 'TLS13-AES-128-GCM-SHA256',
3150 ])
3151
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003152 @unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
3153 def test_default_ecdh_curve(self):
3154 # Issue #21015: elliptic curve-based Diffie Hellman key exchange
3155 # should be enabled by default on SSL contexts.
3156 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
3157 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003158 # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
3159 # cipher name.
3160 context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003161 # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
3162 # explicitly using the 'ECCdraft' cipher alias. Otherwise,
3163 # our default cipher list should prefer ECDH-based ciphers
3164 # automatically.
3165 if ssl.OPENSSL_VERSION_INFO < (1, 0, 0):
3166 context.set_ciphers("ECCdraft:ECDH")
3167 with ThreadedEchoServer(context=context) as server:
3168 with context.wrap_socket(socket.socket()) as s:
3169 s.connect((HOST, server.port))
3170 self.assertIn("ECDH", s.cipher()[0])
3171
3172 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
3173 "'tls-unique' channel binding not available")
3174 def test_tls_unique_channel_binding(self):
3175 """Test tls-unique channel binding."""
3176 if support.verbose:
3177 sys.stdout.write("\n")
3178
3179 server = ThreadedEchoServer(CERTFILE,
3180 certreqs=ssl.CERT_NONE,
3181 ssl_version=ssl.PROTOCOL_TLSv1,
3182 cacerts=CERTFILE,
3183 chatty=True,
3184 connectionchatty=False)
3185 with server:
3186 s = test_wrap_socket(socket.socket(),
3187 server_side=False,
3188 certfile=CERTFILE,
3189 ca_certs=CERTFILE,
3190 cert_reqs=ssl.CERT_NONE,
3191 ssl_version=ssl.PROTOCOL_TLSv1)
3192 s.connect((HOST, server.port))
3193 # get the data
3194 cb_data = s.get_channel_binding("tls-unique")
3195 if support.verbose:
3196 sys.stdout.write(" got channel binding data: {0!r}\n"
3197 .format(cb_data))
3198
3199 # check if it is sane
3200 self.assertIsNotNone(cb_data)
3201 self.assertEqual(len(cb_data), 12) # True for TLSv1
3202
3203 # and compare with the peers version
3204 s.write(b"CB tls-unique\n")
3205 peer_data_repr = s.read().strip()
3206 self.assertEqual(peer_data_repr,
3207 repr(cb_data).encode("us-ascii"))
3208 s.close()
3209
3210 # now, again
3211 s = test_wrap_socket(socket.socket(),
3212 server_side=False,
3213 certfile=CERTFILE,
3214 ca_certs=CERTFILE,
3215 cert_reqs=ssl.CERT_NONE,
3216 ssl_version=ssl.PROTOCOL_TLSv1)
3217 s.connect((HOST, server.port))
3218 new_cb_data = s.get_channel_binding("tls-unique")
3219 if support.verbose:
3220 sys.stdout.write(" got another channel binding data: {0!r}\n"
3221 .format(new_cb_data))
3222 # is it really unique
3223 self.assertNotEqual(cb_data, new_cb_data)
3224 self.assertIsNotNone(cb_data)
3225 self.assertEqual(len(cb_data), 12) # True for TLSv1
3226 s.write(b"CB tls-unique\n")
3227 peer_data_repr = s.read().strip()
3228 self.assertEqual(peer_data_repr,
3229 repr(new_cb_data).encode("us-ascii"))
3230 s.close()
3231
3232 def test_compression(self):
3233 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3234 context.load_cert_chain(CERTFILE)
3235 stats = server_params_test(context, context,
3236 chatty=True, connectionchatty=True)
3237 if support.verbose:
3238 sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
3239 self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' })
3240
3241 @unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
3242 "ssl.OP_NO_COMPRESSION needed for this test")
3243 def test_compression_disabled(self):
3244 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3245 context.load_cert_chain(CERTFILE)
3246 context.options |= ssl.OP_NO_COMPRESSION
3247 stats = server_params_test(context, context,
3248 chatty=True, connectionchatty=True)
3249 self.assertIs(stats['compression'], None)
3250
3251 def test_dh_params(self):
3252 # Check we can get a connection with ephemeral Diffie-Hellman
3253 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3254 context.load_cert_chain(CERTFILE)
3255 context.load_dh_params(DHFILE)
3256 context.set_ciphers("kEDH")
3257 stats = server_params_test(context, context,
3258 chatty=True, connectionchatty=True)
3259 cipher = stats["cipher"][0]
3260 parts = cipher.split("-")
3261 if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
3262 self.fail("Non-DH cipher: " + cipher[0])
3263
3264 def test_selected_alpn_protocol(self):
3265 # selected_alpn_protocol() is None unless ALPN is used.
3266 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3267 context.load_cert_chain(CERTFILE)
3268 stats = server_params_test(context, context,
3269 chatty=True, connectionchatty=True)
3270 self.assertIs(stats['client_alpn_protocol'], None)
3271
3272 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
3273 def test_selected_alpn_protocol_if_server_uses_alpn(self):
3274 # selected_alpn_protocol() is None unless ALPN is used by the client.
3275 client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3276 client_context.load_verify_locations(CERTFILE)
3277 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3278 server_context.load_cert_chain(CERTFILE)
3279 server_context.set_alpn_protocols(['foo', 'bar'])
3280 stats = server_params_test(client_context, server_context,
3281 chatty=True, connectionchatty=True)
3282 self.assertIs(stats['client_alpn_protocol'], None)
3283
3284 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
3285 def test_alpn_protocols(self):
3286 server_protocols = ['foo', 'bar', 'milkshake']
3287 protocol_tests = [
3288 (['foo', 'bar'], 'foo'),
3289 (['bar', 'foo'], 'foo'),
3290 (['milkshake'], 'milkshake'),
3291 (['http/3.0', 'http/4.0'], None)
3292 ]
3293 for client_protocols, expected in protocol_tests:
3294 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
3295 server_context.load_cert_chain(CERTFILE)
3296 server_context.set_alpn_protocols(server_protocols)
3297 client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
3298 client_context.load_cert_chain(CERTFILE)
3299 client_context.set_alpn_protocols(client_protocols)
3300
3301 try:
3302 stats = server_params_test(client_context,
3303 server_context,
3304 chatty=True,
3305 connectionchatty=True)
3306 except ssl.SSLError as e:
3307 stats = e
3308
3309 if (expected is None and IS_OPENSSL_1_1
3310 and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6)):
3311 # OpenSSL 1.1.0 to 1.1.0e raises handshake error
3312 self.assertIsInstance(stats, ssl.SSLError)
3313 else:
3314 msg = "failed trying %s (s) and %s (c).\n" \
3315 "was expecting %s, but got %%s from the %%s" \
3316 % (str(server_protocols), str(client_protocols),
3317 str(expected))
3318 client_result = stats['client_alpn_protocol']
3319 self.assertEqual(client_result, expected,
3320 msg % (client_result, "client"))
3321 server_result = stats['server_alpn_protocols'][-1] \
3322 if len(stats['server_alpn_protocols']) else 'nothing'
3323 self.assertEqual(server_result, expected,
3324 msg % (server_result, "server"))
3325
3326 def test_selected_npn_protocol(self):
3327 # selected_npn_protocol() is None unless NPN is used
3328 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3329 context.load_cert_chain(CERTFILE)
3330 stats = server_params_test(context, context,
3331 chatty=True, connectionchatty=True)
3332 self.assertIs(stats['client_npn_protocol'], None)
3333
3334 @unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
3335 def test_npn_protocols(self):
3336 server_protocols = ['http/1.1', 'spdy/2']
3337 protocol_tests = [
3338 (['http/1.1', 'spdy/2'], 'http/1.1'),
3339 (['spdy/2', 'http/1.1'], 'http/1.1'),
3340 (['spdy/2', 'test'], 'spdy/2'),
3341 (['abc', 'def'], 'abc')
3342 ]
3343 for client_protocols, expected in protocol_tests:
Benjamin Petersoncca27322015-01-23 16:35:37 -05003344 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3345 server_context.load_cert_chain(CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003346 server_context.set_npn_protocols(server_protocols)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003347 client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003348 client_context.load_cert_chain(CERTFILE)
3349 client_context.set_npn_protocols(client_protocols)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003350 stats = server_params_test(client_context, server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003351 chatty=True, connectionchatty=True)
3352
3353 msg = "failed trying %s (s) and %s (c).\n" \
3354 "was expecting %s, but got %%s from the %%s" \
3355 % (str(server_protocols), str(client_protocols),
3356 str(expected))
3357 client_result = stats['client_npn_protocol']
3358 self.assertEqual(client_result, expected, msg % (client_result, "client"))
3359 server_result = stats['server_npn_protocols'][-1] \
3360 if len(stats['server_npn_protocols']) else 'nothing'
3361 self.assertEqual(server_result, expected, msg % (server_result, "server"))
3362
3363 def sni_contexts(self):
3364 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3365 server_context.load_cert_chain(SIGNED_CERTFILE)
3366 other_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3367 other_context.load_cert_chain(SIGNED_CERTFILE2)
3368 client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3369 client_context.verify_mode = ssl.CERT_REQUIRED
3370 client_context.load_verify_locations(SIGNING_CA)
3371 return server_context, other_context, client_context
3372
3373 def check_common_name(self, stats, name):
3374 cert = stats['peercert']
3375 self.assertIn((('commonName', name),), cert['subject'])
3376
3377 @needs_sni
3378 def test_sni_callback(self):
3379 calls = []
3380 server_context, other_context, client_context = self.sni_contexts()
3381
3382 def servername_cb(ssl_sock, server_name, initial_context):
3383 calls.append((server_name, initial_context))
3384 if server_name is not None:
3385 ssl_sock.context = other_context
3386 server_context.set_servername_callback(servername_cb)
3387
3388 stats = server_params_test(client_context, server_context,
3389 chatty=True,
3390 sni_name='supermessage')
3391 # The hostname was fetched properly, and the certificate was
3392 # changed for the connection.
3393 self.assertEqual(calls, [("supermessage", server_context)])
3394 # CERTFILE4 was selected
3395 self.check_common_name(stats, 'fakehostname')
3396
3397 calls = []
3398 # The callback is called with server_name=None
3399 stats = server_params_test(client_context, server_context,
3400 chatty=True,
3401 sni_name=None)
3402 self.assertEqual(calls, [(None, server_context)])
3403 self.check_common_name(stats, 'localhost')
3404
3405 # Check disabling the callback
3406 calls = []
3407 server_context.set_servername_callback(None)
3408
3409 stats = server_params_test(client_context, server_context,
3410 chatty=True,
3411 sni_name='notfunny')
3412 # Certificate didn't change
3413 self.check_common_name(stats, 'localhost')
3414 self.assertEqual(calls, [])
3415
3416 @needs_sni
3417 def test_sni_callback_alert(self):
3418 # Returning a TLS alert is reflected to the connecting client
3419 server_context, other_context, client_context = self.sni_contexts()
3420
3421 def cb_returning_alert(ssl_sock, server_name, initial_context):
3422 return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
3423 server_context.set_servername_callback(cb_returning_alert)
3424
3425 with self.assertRaises(ssl.SSLError) as cm:
3426 stats = server_params_test(client_context, server_context,
3427 chatty=False,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003428 sni_name='supermessage')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003429 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003430
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003431 @needs_sni
3432 def test_sni_callback_raising(self):
3433 # Raising fails the connection with a TLS handshake failure alert.
3434 server_context, other_context, client_context = self.sni_contexts()
3435
3436 def cb_raising(ssl_sock, server_name, initial_context):
3437 1/0
3438 server_context.set_servername_callback(cb_raising)
3439
3440 with self.assertRaises(ssl.SSLError) as cm, \
3441 support.captured_stderr() as stderr:
Antoine Pitrou50b24d02013-04-11 20:48:42 +02003442 stats = server_params_test(client_context, server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003443 chatty=False,
3444 sni_name='supermessage')
3445 self.assertEqual(cm.exception.reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
3446 self.assertIn("ZeroDivisionError", stderr.getvalue())
Antoine Pitrou50b24d02013-04-11 20:48:42 +02003447
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003448 @needs_sni
3449 def test_sni_callback_wrong_return_type(self):
3450 # Returning the wrong return type terminates the TLS connection
3451 # with an internal error alert.
3452 server_context, other_context, client_context = self.sni_contexts()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003453
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003454 def cb_wrong_return_type(ssl_sock, server_name, initial_context):
3455 return "foo"
3456 server_context.set_servername_callback(cb_wrong_return_type)
3457
3458 with self.assertRaises(ssl.SSLError) as cm, \
3459 support.captured_stderr() as stderr:
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003460 stats = server_params_test(client_context, server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003461 chatty=False,
3462 sni_name='supermessage')
3463 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
3464 self.assertIn("TypeError", stderr.getvalue())
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003465
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003466 def test_shared_ciphers(self):
3467 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3468 server_context.load_cert_chain(SIGNED_CERTFILE)
3469 client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3470 client_context.verify_mode = ssl.CERT_REQUIRED
3471 client_context.load_verify_locations(SIGNING_CA)
3472 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
3473 client_context.set_ciphers("AES128:AES256")
3474 server_context.set_ciphers("AES256")
3475 alg1 = "AES256"
3476 alg2 = "AES-256"
3477 else:
3478 client_context.set_ciphers("AES:3DES")
3479 server_context.set_ciphers("3DES")
3480 alg1 = "3DES"
3481 alg2 = "DES-CBC3"
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003482
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003483 stats = server_params_test(client_context, server_context)
3484 ciphers = stats['server_shared_ciphers'][0]
3485 self.assertGreater(len(ciphers), 0)
3486 for name, tls_version, bits in ciphers:
3487 if not alg1 in name.split("-") and alg2 not in name:
3488 self.fail(name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003489
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003490 def test_read_write_after_close_raises_valuerror(self):
3491 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
3492 context.verify_mode = ssl.CERT_REQUIRED
3493 context.load_verify_locations(CERTFILE)
3494 context.load_cert_chain(CERTFILE)
3495 server = ThreadedEchoServer(context=context, chatty=False)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003496
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003497 with server:
3498 s = context.wrap_socket(socket.socket())
3499 s.connect((HOST, server.port))
3500 s.close()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003501
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003502 self.assertRaises(ValueError, s.read, 1024)
3503 self.assertRaises(ValueError, s.write, b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003504
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003505 def test_sendfile(self):
3506 TEST_DATA = b"x" * 512
3507 with open(support.TESTFN, 'wb') as f:
3508 f.write(TEST_DATA)
3509 self.addCleanup(support.unlink, support.TESTFN)
3510 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
3511 context.verify_mode = ssl.CERT_REQUIRED
3512 context.load_verify_locations(CERTFILE)
3513 context.load_cert_chain(CERTFILE)
3514 server = ThreadedEchoServer(context=context, chatty=False)
3515 with server:
3516 with context.wrap_socket(socket.socket()) as s:
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003517 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003518 with open(support.TESTFN, 'rb') as file:
3519 s.sendfile(file)
3520 self.assertEqual(s.recv(1024), TEST_DATA)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003521
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003522 def test_session(self):
3523 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3524 server_context.load_cert_chain(SIGNED_CERTFILE)
3525 client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3526 client_context.verify_mode = ssl.CERT_REQUIRED
3527 client_context.load_verify_locations(SIGNING_CA)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003528
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003529 # first connection without session
3530 stats = server_params_test(client_context, server_context)
3531 session = stats['session']
3532 self.assertTrue(session.id)
3533 self.assertGreater(session.time, 0)
3534 self.assertGreater(session.timeout, 0)
3535 self.assertTrue(session.has_ticket)
3536 if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
3537 self.assertGreater(session.ticket_lifetime_hint, 0)
3538 self.assertFalse(stats['session_reused'])
3539 sess_stat = server_context.session_stats()
3540 self.assertEqual(sess_stat['accept'], 1)
3541 self.assertEqual(sess_stat['hits'], 0)
Giampaolo Rodola'915d1412014-06-11 03:54:30 +02003542
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003543 # reuse session
3544 stats = server_params_test(client_context, server_context, session=session)
3545 sess_stat = server_context.session_stats()
3546 self.assertEqual(sess_stat['accept'], 2)
3547 self.assertEqual(sess_stat['hits'], 1)
3548 self.assertTrue(stats['session_reused'])
3549 session2 = stats['session']
3550 self.assertEqual(session2.id, session.id)
3551 self.assertEqual(session2, session)
3552 self.assertIsNot(session2, session)
3553 self.assertGreaterEqual(session2.time, session.time)
3554 self.assertGreaterEqual(session2.timeout, session.timeout)
Christian Heimes99a65702016-09-10 23:44:53 +02003555
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003556 # another one without session
3557 stats = server_params_test(client_context, server_context)
3558 self.assertFalse(stats['session_reused'])
3559 session3 = stats['session']
3560 self.assertNotEqual(session3.id, session.id)
3561 self.assertNotEqual(session3, session)
3562 sess_stat = server_context.session_stats()
3563 self.assertEqual(sess_stat['accept'], 3)
3564 self.assertEqual(sess_stat['hits'], 1)
Christian Heimes99a65702016-09-10 23:44:53 +02003565
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003566 # reuse session again
3567 stats = server_params_test(client_context, server_context, session=session)
3568 self.assertTrue(stats['session_reused'])
3569 session4 = stats['session']
3570 self.assertEqual(session4.id, session.id)
3571 self.assertEqual(session4, session)
3572 self.assertGreaterEqual(session4.time, session.time)
3573 self.assertGreaterEqual(session4.timeout, session.timeout)
3574 sess_stat = server_context.session_stats()
3575 self.assertEqual(sess_stat['accept'], 4)
3576 self.assertEqual(sess_stat['hits'], 2)
Christian Heimes99a65702016-09-10 23:44:53 +02003577
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003578 def test_session_handling(self):
3579 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
3580 context.verify_mode = ssl.CERT_REQUIRED
3581 context.load_verify_locations(CERTFILE)
3582 context.load_cert_chain(CERTFILE)
Christian Heimes99a65702016-09-10 23:44:53 +02003583
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003584 context2 = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
3585 context2.verify_mode = ssl.CERT_REQUIRED
3586 context2.load_verify_locations(CERTFILE)
3587 context2.load_cert_chain(CERTFILE)
Christian Heimes99a65702016-09-10 23:44:53 +02003588
Christian Heimescb5b68a2017-09-07 18:07:00 -07003589 # TODO: session reuse does not work with TLS 1.3
3590 context.options |= ssl.OP_NO_TLSv1_3
3591 context2.options |= ssl.OP_NO_TLSv1_3
3592
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003593 server = ThreadedEchoServer(context=context, chatty=False)
3594 with server:
3595 with context.wrap_socket(socket.socket()) as s:
3596 # session is None before handshake
3597 self.assertEqual(s.session, None)
3598 self.assertEqual(s.session_reused, None)
3599 s.connect((HOST, server.port))
3600 session = s.session
3601 self.assertTrue(session)
3602 with self.assertRaises(TypeError) as e:
3603 s.session = object
3604 self.assertEqual(str(e.exception), 'Value is not a SSLSession.')
Christian Heimes99a65702016-09-10 23:44:53 +02003605
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003606 with context.wrap_socket(socket.socket()) as s:
3607 s.connect((HOST, server.port))
3608 # cannot set session after handshake
3609 with self.assertRaises(ValueError) as e:
3610 s.session = session
3611 self.assertEqual(str(e.exception),
3612 'Cannot set session after handshake.')
Christian Heimes99a65702016-09-10 23:44:53 +02003613
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003614 with context.wrap_socket(socket.socket()) as s:
3615 # can set session before handshake and before the
3616 # connection was established
3617 s.session = session
3618 s.connect((HOST, server.port))
3619 self.assertEqual(s.session.id, session.id)
3620 self.assertEqual(s.session, session)
3621 self.assertEqual(s.session_reused, True)
Christian Heimes99a65702016-09-10 23:44:53 +02003622
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003623 with context2.wrap_socket(socket.socket()) as s:
3624 # cannot re-use session with a different SSLContext
3625 with self.assertRaises(ValueError) as e:
Christian Heimes99a65702016-09-10 23:44:53 +02003626 s.session = session
3627 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003628 self.assertEqual(str(e.exception),
3629 'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02003630
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003631
Thomas Woutersed03b412007-08-28 21:37:11 +00003632def test_main(verbose=False):
Antoine Pitrou15cee622010-08-04 16:45:21 +00003633 if support.verbose:
Berker Peksag9e7990a2015-05-16 23:21:26 +03003634 import warnings
Antoine Pitrou15cee622010-08-04 16:45:21 +00003635 plats = {
3636 'Linux': platform.linux_distribution,
3637 'Mac': platform.mac_ver,
3638 'Windows': platform.win32_ver,
3639 }
Berker Peksag9e7990a2015-05-16 23:21:26 +03003640 with warnings.catch_warnings():
3641 warnings.filterwarnings(
3642 'ignore',
R David Murray44b548d2016-09-08 13:59:53 -04003643 r'dist\(\) and linux_distribution\(\) '
Berker Peksag9e7990a2015-05-16 23:21:26 +03003644 'functions are deprecated .*',
3645 PendingDeprecationWarning,
3646 )
3647 for name, func in plats.items():
3648 plat = func()
3649 if plat and plat[0]:
3650 plat = '%s %r' % (name, plat)
3651 break
3652 else:
3653 plat = repr(platform.platform())
Antoine Pitrou15cee622010-08-04 16:45:21 +00003654 print("test_ssl: testing with %r %r" %
3655 (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
3656 print(" under %s" % plat)
Antoine Pitroud5323212010-10-22 18:19:07 +00003657 print(" HAS_SNI = %r" % ssl.HAS_SNI)
Antoine Pitrou609ef012013-03-29 18:09:06 +01003658 print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
3659 try:
3660 print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
3661 except AttributeError:
3662 pass
Antoine Pitrou15cee622010-08-04 16:45:21 +00003663
Antoine Pitrou152efa22010-05-16 18:19:27 +00003664 for filename in [
Martin Panter3840b2a2016-03-27 01:53:46 +00003665 CERTFILE, BYTES_CERTFILE,
Antoine Pitrou152efa22010-05-16 18:19:27 +00003666 ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003667 SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
Antoine Pitrou152efa22010-05-16 18:19:27 +00003668 BADCERT, BADKEY, EMPTYCERT]:
3669 if not os.path.exists(filename):
3670 raise support.TestFailed("Can't read certificate file %r" % filename)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00003671
Martin Panter3840b2a2016-03-27 01:53:46 +00003672 tests = [
3673 ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003674 SimpleBackgroundTests, ThreadedTests,
Martin Panter3840b2a2016-03-27 01:53:46 +00003675 ]
Thomas Woutersed03b412007-08-28 21:37:11 +00003676
Benjamin Petersonee8712c2008-05-20 21:35:26 +00003677 if support.is_resource_enabled('network'):
Bill Janssen6e027db2007-11-15 22:23:56 +00003678 tests.append(NetworkedTests)
Thomas Woutersed03b412007-08-28 21:37:11 +00003679
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003680 thread_info = support.threading_setup()
Antoine Pitrou480a1242010-04-28 21:37:09 +00003681 try:
3682 support.run_unittest(*tests)
3683 finally:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003684 support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00003685
3686if __name__ == "__main__":
3687 test_main()