blob: 0bdb86d5f23451659b54ab2129afe8f4ebd5600b [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00005from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00006import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00007import select
Thomas Woutersed03b412007-08-28 21:37:11 +00008import time
Christian Heimes9424bb42013-06-17 15:32:57 +02009import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000010import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000011import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000012import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000013import pprint
Antoine Pitrou152efa22010-05-16 18:19:27 +000014import tempfile
Antoine Pitrou803e6d62010-10-13 10:36:15 +000015import urllib.request
Thomas Woutersed03b412007-08-28 21:37:11 +000016import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000017import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000018import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000019import platform
Antoine Pitrou23df4832010-08-04 17:14:06 +000020import functools
Thomas Woutersed03b412007-08-28 21:37:11 +000021
# Bind the ssl module through the test-support helper rather than a plain
# import.  NOTE(review): presumably this skips the whole module when ssl is
# unavailable -- confirm against test.support.import_module.
ssl = support.import_module("ssl")

# Record whether the threading module can be imported in this build.
try:
    import threading
except ImportError:
    _have_threads = False
else:
    _have_threads = True

# Sorted entries of ssl._PROTOCOL_NAMES.
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
HOST = support.HOST
# True when the version string of the linked library starts with "LibreSSL".
IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
# True only when the library is not LibreSSL and reports version >= 1.1.0.
IS_OPENSSL_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
35
Antoine Pitrou152efa22010-05-16 18:19:27 +000036
def data_file(*name):
    """Return the path of a test data file stored next to this module.

    The path components given in *name* are joined, in order, onto the
    directory that contains this file.
    """
    here = os.path.dirname(__file__)
    return os.path.join(here, *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000039
# The custom key and certificate files used in test_ssl are generated
# using Lib/test/make_ssl_certs.py.
# Other certificates are simply fetched from the Internet servers they
# are meant to authenticate.

# Each BYTES_* constant is the os.fsencode() form of the path defined just
# before it, for exercising APIs with bytes file names.
CERTFILE = data_file("keycert.pem")
BYTES_CERTFILE = os.fsencode(CERTFILE)
ONLYCERT = data_file("ssl_cert.pem")
ONLYKEY = data_file("ssl_key.pem")
BYTES_ONLYCERT = os.fsencode(ONLYCERT)
BYTES_ONLYKEY = os.fsencode(ONLYKEY)
# NOTE(review): judging by the file names these are password-protected
# variants, with KEY_PASSWORD as their passphrase -- confirm.
CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
KEY_PASSWORD = "somepass"
CAPATH = data_file("capath")
BYTES_CAPATH = os.fsencode(CAPATH)
# Individual files inside the capath directory.
CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
CAFILE_CACERT = data_file("capath", "5ed36f99.0")


# empty CRL
CRLFILE = data_file("revocation.crl")

# Two keys and certs signed by the same CA (for SNI tests)
SIGNED_CERTFILE = data_file("keycert3.pem")
SIGNED_CERTFILE2 = data_file("keycert4.pem")
# Same certificate as pycacert.pem, but without extra text in file
SIGNING_CA = data_file("capath", "ceff1710.0")
# cert with all kinds of subject alt names
ALLSANFILE = data_file("allsans.pem")

# Host name of the remote server used by tests that need a real peer.
REMOTE_HOST = "self-signed.pythontest.net"

EMPTYCERT = data_file("nullcert.pem")
BADCERT = data_file("badcert.pem")
# Path that is expected not to exist on disk.
NONEXISTINGCERT = data_file("XXXnonexisting.pem")
BADKEY = data_file("badkey.pem")
NOKIACERT = data_file("nokia.pem")
NULLBYTECERT = data_file("nullbytecert.pem")

DHFILE = data_file("dh1024.pem")
BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +000082
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010083
def handle_error(prefix):
    """Write *prefix* followed by the current exception's traceback.

    The formatted traceback of the exception being handled (as reported by
    sys.exc_info()) goes to stdout, and only when the test run is verbose.
    """
    formatted = ' '.join(traceback.format_exception(*sys.exc_info()))
    if not support.verbose:
        return
    sys.stdout.write(prefix + formatted)
Thomas Woutersed03b412007-08-28 21:37:11 +000088
def can_clear_options():
    """Return True when the OpenSSL API version is 0.9.8m or higher."""
    minimum_api_version = (0, 9, 8, 13, 15)
    return ssl._OPENSSL_API_VERSION >= minimum_api_version
Antoine Pitroub5218772010-05-21 09:56:06 +000092
def no_sslv2_implies_sslv3_hello():
    """Return True when the linked OpenSSL is version 0.9.7h or higher."""
    minimum_version = (0, 9, 7, 8, 15)
    return ssl.OPENSSL_VERSION_INFO >= minimum_version
96
def have_verify_flags():
    """Return True when the linked OpenSSL is version 0.9.8 or higher."""
    minimum_version = (0, 9, 8, 0, 15)
    return ssl.OPENSSL_VERSION_INFO >= minimum_version
100
def utc_offset():  # NOTE: ignore issues like #1647654
    """Return the local UTC offset in seconds.

    The offset satisfies: local time = utc time + utc offset.  The DST
    offset (time.altzone) is used only when the platform defines a DST zone
    and the current local time reports DST as active.
    """
    dst_active = time.daylight and time.localtime().tm_isdst > 0
    seconds_west = time.altzone if dst_active else time.timezone
    return -seconds_west
106
def asn1time(cert_time):
    """Adapt an expected certificate time string to the running OpenSSL.

    For every API version except 0.9.8i the string is returned unchanged.
    On 0.9.8i the seconds field is zeroed and the result is re-formatted
    with a space-padded day of month.
    """
    # Some versions of OpenSSL ignore seconds, see #18207
    # 0.9.8.i
    if ssl._OPENSSL_API_VERSION != (0, 9, 8, 9, 15):
        return cert_time

    fmt = "%b %d %H:%M:%S %Y GMT"
    parsed = datetime.datetime.strptime(cert_time, fmt).replace(second=0)
    adjusted = parsed.strftime(fmt)
    # %d adds leading zero but ASN1_TIME_print() uses leading space
    if adjusted[4] == "0":
        adjusted = adjusted[:4] + " " + adjusted[5:]
    return adjusted
Thomas Woutersed03b412007-08-28 21:37:11 +0000120
# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
def skip_if_broken_ubuntu_ssl(func):
    """Decorate *func* so it is skipped on the patched Ubuntu OpenSSL build.

    When the ssl module has no PROTOCOL_SSLv2 attribute, *func* is returned
    untouched.  Otherwise the wrapper tries to create an SSLv2 context first;
    if that raises ssl.SSLError on OpenSSL 0.9.8o as shipped by
    debian squeeze/sid, unittest.SkipTest is raised instead of running *func*.
    """
    if not hasattr(ssl, 'PROTOCOL_SSLv2'):
        return func

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            ssl.SSLContext(ssl.PROTOCOL_SSLv2)
        except ssl.SSLError:
            patched_build = (
                ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
                platform.linux_distribution() == ('debian', 'squeeze/sid', ''))
            if patched_build:
                raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
        return func(*args, **kwargs)
    return wrapper
Antoine Pitrou23df4832010-08-04 17:14:06 +0000136
# Decorator: skip the decorated test unless ssl.HAS_SNI is true.
needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
138
Antoine Pitrou23df4832010-08-04 17:14:06 +0000139
Antoine Pitrou152efa22010-05-16 18:19:27 +0000140class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000141
def test_constants(self):
    """Check that the expected constants are exposed by the ssl module."""
    # Attribute access alone is the check: a missing name raises
    # AttributeError.
    for name in ("CERT_NONE", "CERT_OPTIONAL", "CERT_REQUIRED",
                 "OP_CIPHER_SERVER_PREFERENCE", "OP_SINGLE_DH_USE"):
        getattr(ssl, name)
    if ssl.HAS_ECDH:
        ssl.OP_SINGLE_ECDH_USE
    if ssl.OPENSSL_VERSION_INFO >= (1, 0):
        ssl.OP_NO_COMPRESSION
    for feature_flag in (ssl.HAS_SNI, ssl.HAS_ECDH):
        self.assertIn(feature_flag, {True, False})
Thomas Woutersed03b412007-08-28 21:37:11 +0000154
def test_str_for_enums(self):
    """PROTOCOL_* constants have enum-like string reprs."""
    protocol = ssl.PROTOCOL_TLS
    self.assertEqual(str(protocol), '_SSLMethod.PROTOCOL_TLS')
    # The context must hand back the very same enum member.
    context = ssl.SSLContext(protocol)
    self.assertIs(context.protocol, protocol)
162
def test_random(self):
    """Exercise RAND_status, RAND_pseudo_bytes, RAND_bytes, RAND_egd, RAND_add."""
    status = ssl.RAND_status()
    if support.verbose:
        verdict = ("sufficient randomness" if status
                   else "insufficient randomness")
        sys.stdout.write("\n RAND_status is %d (%s)\n" % (status, verdict))

    pseudo, is_cryptographic = ssl.RAND_pseudo_bytes(16)
    self.assertEqual(len(pseudo), 16)
    self.assertEqual(is_cryptographic, status == 1)
    if not status:
        # Without enough entropy RAND_bytes must refuse to produce output.
        self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
    else:
        strong = ssl.RAND_bytes(16)
        self.assertEqual(len(strong), 16)

    # negative num is invalid
    for generator in (ssl.RAND_bytes, ssl.RAND_pseudo_bytes):
        self.assertRaises(ValueError, generator, -5)

    if hasattr(ssl, 'RAND_egd'):
        self.assertRaises(TypeError, ssl.RAND_egd, 1)
        self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
    # RAND_add is called with str, bytes and bytearray input.
    seeds = ("this is a random string",
             b"this is a random bytes object",
             bytearray(b"this is a random bytearray object"))
    for seed in seeds:
        ssl.RAND_add(seed, 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000189
@unittest.skipUnless(os.name == 'posix', 'requires posix')
def test_random_fork(self):
    """Check that parent and forked child draw different pseudo-random bytes.

    The child writes 16 pseudo-random bytes into a pipe; the parent reads
    them, draws 16 bytes of its own and asserts that the two values differ.
    """
    status = ssl.RAND_status()
    if not status:
        self.fail("OpenSSL's PRNG has insufficient randomness")

    rfd, wfd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child process: report the outcome through the exit status only.
        # os._exit() is used in both outcomes, so the child never returns
        # into the test runner.
        try:
            os.close(rfd)
            child_random = ssl.RAND_pseudo_bytes(16)[0]
            self.assertEqual(len(child_random), 16)
            os.write(wfd, child_random)
            os.close(wfd)
        except BaseException:
            os._exit(1)
        else:
            os._exit(0)
    else:
        # Parent process: close the write end, wait for the child, then read
        # what it wrote.
        os.close(wfd)
        self.addCleanup(os.close, rfd)
        _, status = os.waitpid(pid, 0)
        self.assertEqual(status, 0)

        child_random = os.read(rfd, 16)
        self.assertEqual(len(child_random), 16)
        parent_random = ssl.RAND_pseudo_bytes(16)[0]
        self.assertEqual(len(parent_random), 16)

        self.assertNotEqual(child_random, parent_random)
221
def test_parse_cert(self):
    """Decode CERTFILE and NOKIACERT and compare the parsed fields.

    Covers issuer, validity dates, serial number, subject and subjectAltName
    of the local test certificate, then the subjectAltName, OCSP, caIssuers
    and crlDistributionPoints fields of the Nokia certificate.
    """
    # note that this uses an 'unofficial' function in _ssl.c,
    # provided solely for this test, to exercise the certificate
    # parsing code
    p = ssl._ssl._test_decode_cert(CERTFILE)
    if support.verbose:
        sys.stdout.write("\n" + pprint.pformat(p) + "\n")
    # The test certificate is self-issued: issuer and subject are identical.
    expected_name = ((('countryName', 'XY'),),
                     (('localityName', 'Castle Anthrax'),),
                     (('organizationName', 'Python Software Foundation'),),
                     (('commonName', 'localhost'),))
    self.assertEqual(p['issuer'], expected_name)
    # Note the next three asserts will fail if the keys are regenerated.
    # Single-digit days are padded with a space ("Oct  5"), the form
    # ASN1_TIME_print() produces; asn1time() relies on that layout too.
    self.assertEqual(p['notAfter'], asn1time('Oct  5 23:01:56 2020 GMT'))
    self.assertEqual(p['notBefore'], asn1time('Oct  8 23:01:56 2010 GMT'))
    self.assertEqual(p['serialNumber'], 'D7C7381919AFC24E')
    self.assertEqual(p['subject'], expected_name)
    self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),))
    # Issue #13034: the subjectAltName in some certificates
    # (notably projects.developer.nokia.com:443) wasn't parsed
    p = ssl._ssl._test_decode_cert(NOKIACERT)
    if support.verbose:
        sys.stdout.write("\n" + pprint.pformat(p) + "\n")
    self.assertEqual(p['subjectAltName'],
                     (('DNS', 'projects.developer.nokia.com'),
                      ('DNS', 'projects.forum.nokia.com'))
                    )
    # extra OCSP and AIA fields
    self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
    self.assertEqual(p['caIssuers'],
                     ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
    self.assertEqual(p['crlDistributionPoints'],
                     ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000261
def test_parse_cert_CVE_2013_4238(self):
    """Embedded NUL bytes in certificate names must survive decoding."""
    decoded = ssl._ssl._test_decode_cert(NULLBYTECERT)
    if support.verbose:
        sys.stdout.write("\n" + pprint.pformat(decoded) + "\n")
    expected_name = (
        (('countryName', 'US'),),
        (('stateOrProvinceName', 'Oregon'),),
        (('localityName', 'Beaverton'),),
        (('organizationName', 'Python Software Foundation'),),
        (('organizationalUnitName', 'Python Core Development'),),
        (('commonName', 'null.python.org\x00example.org'),),
        (('emailAddress', 'python-dev@python.org'),),
    )
    self.assertEqual(decoded['subject'], expected_name)
    self.assertEqual(decoded['issuer'], expected_name)

    # Only the last subjectAltName entry depends on the OpenSSL version.
    if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
        ipv6_entry = ('IP Address', '2001:DB8:0:0:0:0:0:1\n')
    else:
        # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
        ipv6_entry = ('IP Address', '<invalid>')
    expected_san = (
        ('DNS', 'altnull.python.org\x00example.com'),
        ('email', 'null@python.org\x00user@example.org'),
        ('URI', 'http://null.python.org\x00http://example.org'),
        ('IP Address', '192.0.2.1'),
        ipv6_entry,
    )

    self.assertEqual(decoded['subjectAltName'], expected_san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200290
def test_parse_all_sans(self):
    """Decode a certificate carrying every kind of subjectAltName entry."""
    decoded = ssl._ssl._test_decode_cert(ALLSANFILE)
    directory_name = (
        (('countryName', 'XY'),),
        (('localityName', 'Castle Anthrax'),),
        (('organizationName', 'Python Software Foundation'),),
        (('commonName', 'dirname example'),),
    )
    expected_san = (
        ('DNS', 'allsans'),
        ('othername', '<unsupported>'),
        ('othername', '<unsupported>'),
        ('email', 'user@example.org'),
        ('DNS', 'www.example.org'),
        ('DirName', directory_name),
        ('URI', 'https://www.python.org/'),
        ('IP Address', '127.0.0.1'),
        ('IP Address', '0:0:0:0:0:0:0:1\n'),
        ('Registered ID', '1.2.3.4.5'),
    )
    self.assertEqual(decoded['subjectAltName'], expected_san)
311
def test_DER_to_PEM(self):
    """Round-trip a certificate PEM -> DER -> PEM -> DER and compare."""
    with open(CAFILE_CACERT, 'r') as pem_file:
        original_pem = pem_file.read()
    first_der = ssl.PEM_cert_to_DER_cert(original_pem)
    regenerated_pem = ssl.DER_cert_to_PEM_cert(first_der)
    second_der = ssl.PEM_cert_to_DER_cert(regenerated_pem)
    self.assertEqual(first_der, second_der)
    # The regenerated PEM text must be framed by the standard markers.
    has_header = regenerated_pem.startswith(ssl.PEM_HEADER + '\n')
    if not has_header:
        self.fail("DER-to-PEM didn't include correct header:\n%r\n" % regenerated_pem)
    has_footer = regenerated_pem.endswith('\n' + ssl.PEM_FOOTER + '\n')
    if not has_footer:
        self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % regenerated_pem)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000323
def test_openssl_version(self):
    """Sanity-check the three OPENSSL_VERSION* constants."""
    number = ssl.OPENSSL_VERSION_NUMBER
    info = ssl.OPENSSL_VERSION_INFO
    text = ssl.OPENSSL_VERSION
    for value, expected_type in ((number, int), (info, tuple), (text, str)):
        self.assertIsInstance(value, expected_type)
    # Some sanity checks follow
    # >= 0.9
    self.assertGreaterEqual(number, 0x900000)
    # < 3.0
    self.assertLess(number, 0x30000000)
    major, minor, fix, patch, status = info
    self.assertGreaterEqual(major, 0)
    self.assertLess(major, 3)
    for component in (minor, fix):
        self.assertGreaterEqual(component, 0)
        self.assertLess(component, 256)
    self.assertGreaterEqual(patch, 0)
    self.assertLessEqual(patch, 63)
    self.assertGreaterEqual(status, 0)
    self.assertLessEqual(status, 15)
    # Version string as returned by {Open,Libre}SSL, the format might change
    if IS_LIBRESSL:
        expected_prefix = "LibreSSL {:d}".format(major)
    else:
        expected_prefix = "OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)
    self.assertTrue(text.startswith(expected_prefix),
                    (text, info, hex(number)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000354
@support.cpython_only
def test_refcycle(self):
    """Issue #7943: an SSL object doesn't create reference cycles with itself."""
    plain = socket.socket(socket.AF_INET)
    wrapped = ssl.wrap_socket(plain)
    watcher = weakref.ref(wrapped)
    # Dropping the only strong reference may emit a ResourceWarning.
    with support.check_warnings(("", ResourceWarning)):
        del wrapped
    self.assertEqual(watcher(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000365
def test_wrapped_unconnected(self):
    """Unconnected SSLSocket methods propagate the underlying OSError.

    Methods on an unconnected SSLSocket propagate the original OSError
    raised by the underlying socket object.
    """
    plain = socket.socket(socket.AF_INET)
    with ssl.wrap_socket(plain) as wrapped:
        attempts = (
            (wrapped.recv, (1,)),
            (wrapped.recv_into, (bytearray(b'x'),)),
            (wrapped.recvfrom, (1,)),
            (wrapped.recvfrom_into, (bytearray(b'x'), 1)),
            (wrapped.send, (b'x',)),
            (wrapped.sendto, (b'x', ('0.0.0.0', 0))),
        )
        for method, args in attempts:
            self.assertRaises(OSError, method, *args)
Antoine Pitroua468adc2010-09-14 14:43:44 +0000377
def test_timeout(self):
    """Issue #8524: wrapping a socket keeps the original socket's timeout."""
    for expected_timeout in (None, 0.0, 5.0):
        plain = socket.socket(socket.AF_INET)
        plain.settimeout(expected_timeout)
        with ssl.wrap_socket(plain) as wrapped:
            self.assertEqual(expected_timeout, wrapped.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000386
def test_errors(self):
    """Invalid wrap_socket() argument combinations raise the right errors."""
    sock = socket.socket()
    self.assertRaisesRegex(ValueError,
                           "certfile must be specified",
                           ssl.wrap_socket, sock, keyfile=CERTFILE)
    server_side_message = "certfile must be specified for server-side operations"
    self.assertRaisesRegex(ValueError, server_side_message,
                           ssl.wrap_socket, sock, server_side=True)
    self.assertRaisesRegex(ValueError, server_side_message,
                           ssl.wrap_socket, sock, server_side=True,
                           certfile="")
    with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
        self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
                               s.connect, (HOST, 8080))

    # A missing certificate and/or key file must surface as ENOENT.
    missing_file_cases = (
        dict(certfile=NONEXISTINGCERT),
        dict(certfile=CERTFILE, keyfile=NONEXISTINGCERT),
        dict(certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT),
    )
    for wrap_kwargs in missing_file_cases:
        with self.assertRaises(OSError) as cm:
            with socket.socket() as sock:
                ssl.wrap_socket(sock, **wrap_kwargs)
        self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000415
def bad_cert_test(self, certfile):
    """Check that trying to use the given client certificate fails"""
    # Resolve the name relative to this file, or the current directory when
    # __file__ has no directory part.
    base_dir = os.path.dirname(__file__) or os.curdir
    cert_path = os.path.join(base_dir, certfile)
    sock = socket.socket()
    self.addCleanup(sock.close)
    with self.assertRaises(ssl.SSLError):
        ssl.wrap_socket(sock, certfile=cert_path,
                        ssl_version=ssl.PROTOCOL_TLSv1)
426
def test_empty_cert(self):
    """Wrapping with an empty cert file"""
    empty_cert_name = "nullcert.pem"
    self.bad_cert_test(empty_cert_name)
430
def test_malformed_cert(self):
    """Wrapping with a badly formatted certificate (syntax error)"""
    malformed_cert_name = "badcert.pem"
    self.bad_cert_test(malformed_cert_name)
434
def test_malformed_key(self):
    """Wrapping with a badly formatted key (syntax error)"""
    malformed_key_name = "badkey.pem"
    self.bad_cert_test(malformed_key_name)
438
def test_match_hostname(self):
    """Exercise ssl.match_hostname() against hand-built certificate dicts.

    Each section rebinds ``cert`` to a new fixture and then lists host names
    that must match (``ok``) or must be rejected (``fail``).
    """
    def ok(cert, hostname):
        # A match is signalled by match_hostname() not raising.
        ssl.match_hostname(cert, hostname)
    def fail(cert, hostname):
        # A mismatch must raise ssl.CertificateError.
        self.assertRaises(ssl.CertificateError,
                          ssl.match_hostname, cert, hostname)

    # -- Hostname matching --

    cert = {'subject': ((('commonName', 'example.com'),),)}
    ok(cert, 'example.com')
    ok(cert, 'ExAmple.cOm')
    fail(cert, 'www.example.com')
    fail(cert, '.example.com')
    fail(cert, 'example.org')
    fail(cert, 'exampleXcom')

    cert = {'subject': ((('commonName', '*.a.com'),),)}
    ok(cert, 'foo.a.com')
    fail(cert, 'bar.foo.a.com')
    fail(cert, 'a.com')
    fail(cert, 'Xa.com')
    fail(cert, '.a.com')

    # only match one left-most wildcard
    cert = {'subject': ((('commonName', 'f*.com'),),)}
    ok(cert, 'foo.com')
    ok(cert, 'f.com')
    fail(cert, 'bar.com')
    fail(cert, 'foo.a.com')
    fail(cert, 'bar.foo.com')

    # NULL bytes are bad, CVE-2013-4073
    cert = {'subject': ((('commonName',
                          'null.python.org\x00example.org'),),)}
    ok(cert, 'null.python.org\x00example.org') # or raise an error?
    fail(cert, 'example.org')
    fail(cert, 'null.python.org')

    # error cases with wildcards
    cert = {'subject': ((('commonName', '*.*.a.com'),),)}
    fail(cert, 'bar.foo.a.com')
    fail(cert, 'a.com')
    fail(cert, 'Xa.com')
    fail(cert, '.a.com')

    cert = {'subject': ((('commonName', 'a.*.com'),),)}
    fail(cert, 'a.foo.com')
    fail(cert, 'a..com')
    fail(cert, 'a.com')

    # wildcard doesn't match IDNA prefix 'xn--'
    idna = 'püthon.python.org'.encode("idna").decode("ascii")
    cert = {'subject': ((('commonName', idna),),)}
    ok(cert, idna)
    cert = {'subject': ((('commonName', 'x*.python.org'),),)}
    fail(cert, idna)
    cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
    fail(cert, idna)

    # wildcard in first fragment and IDNA A-labels in sequent fragments
    # are supported.
    idna = 'www*.pythön.org'.encode("idna").decode("ascii")
    cert = {'subject': ((('commonName', idna),),)}
    ok(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
    ok(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
    fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
    fail(cert, 'pythön.org'.encode("idna").decode("ascii"))

    # Slightly fake real-world example
    cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
            'subject': ((('commonName', 'linuxfrz.org'),),),
            'subjectAltName': (('DNS', 'linuxfr.org'),
                               ('DNS', 'linuxfr.com'),
                               ('othername', '<unsupported>'))}
    ok(cert, 'linuxfr.org')
    ok(cert, 'linuxfr.com')
    # Not a "DNS" entry
    fail(cert, '<unsupported>')
    # When there is a subjectAltName, commonName isn't used
    fail(cert, 'linuxfrz.org')

    # A pristine real-world example
    cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
            'subject': ((('countryName', 'US'),),
                        (('stateOrProvinceName', 'California'),),
                        (('localityName', 'Mountain View'),),
                        (('organizationName', 'Google Inc'),),
                        (('commonName', 'mail.google.com'),))}
    ok(cert, 'mail.google.com')
    fail(cert, 'gmail.com')
    # Only commonName is considered
    fail(cert, 'California')

    # -- IPv4 matching --
    cert = {'subject': ((('commonName', 'example.com'),),),
            'subjectAltName': (('DNS', 'example.com'),
                               ('IP Address', '10.11.12.13'),
                               ('IP Address', '14.15.16.17'))}
    ok(cert, '10.11.12.13')
    ok(cert, '14.15.16.17')
    fail(cert, '14.15.16.18')
    fail(cert, 'example.net')

    # -- IPv6 matching --
    # The certificate entries use the expanded upper-case form with a
    # trailing newline; the host names use the compressed lower-case form.
    cert = {'subject': ((('commonName', 'example.com'),),),
            'subjectAltName': (('DNS', 'example.com'),
                               ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
                               ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
    ok(cert, '2001::cafe')
    ok(cert, '2003::baba')
    fail(cert, '2003::bebe')
    fail(cert, 'example.net')

    # -- Miscellaneous --

    # Neither commonName nor subjectAltName
    cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
            'subject': ((('countryName', 'US'),),
                        (('stateOrProvinceName', 'California'),),
                        (('localityName', 'Mountain View'),),
                        (('organizationName', 'Google Inc'),))}
    fail(cert, 'mail.google.com')

    # No DNS entry in subjectAltName but a commonName
    cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
            'subject': ((('countryName', 'US'),),
                        (('stateOrProvinceName', 'California'),),
                        (('localityName', 'Mountain View'),),
                        (('commonName', 'mail.google.com'),)),
            'subjectAltName': (('othername', 'blabla'), )}
    ok(cert, 'mail.google.com')

    # No DNS entry subjectAltName and no commonName
    cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
            'subject': ((('countryName', 'US'),),
                        (('stateOrProvinceName', 'California'),),
                        (('localityName', 'Mountain View'),),
                        (('organizationName', 'Google Inc'),)),
            'subjectAltName': (('othername', 'blabla'),)}
    fail(cert, 'google.com')

    # Empty cert / no cert
    self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
    self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')

    # Issue #17980: avoid denials of service by refusing more than one
    # wildcard per fragment.
    cert = {'subject': ((('commonName', 'a*b.com'),),)}
    ok(cert, 'axxb.com')
    cert = {'subject': ((('commonName', 'a*b.co*'),),)}
    fail(cert, 'axxb.com')
    cert = {'subject': ((('commonName', 'a*b*.com'),),)}
    with self.assertRaises(ssl.CertificateError) as cm:
        ssl.match_hostname(cert, 'axxbxxc.com')
    self.assertIn("too many wildcards", str(cm.exception))
595
def test_server_side(self):
    """server_hostname doesn't work for server sockets."""
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    with socket.socket() as plain:
        # The positional True is wrap_socket()'s server_side argument.
        with self.assertRaises(ValueError):
            context.wrap_socket(plain, True, server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000602
def test_unknown_channel_binding(self):
    """get_channel_binding() raises ValueError for an unknown binding type.

    A local listening socket gives the client something to connect to; no
    TLS handshake is performed (do_handshake_on_connect=False).
    """
    listener = socket.socket(socket.AF_INET)
    # Register the close up front so the listening socket is released even
    # when a later step raises or an assertion fails.
    self.addCleanup(listener.close)
    listener.bind(('127.0.0.1', 0))
    listener.listen()
    client = socket.socket(socket.AF_INET)
    # Covers the window before wrap_socket() takes ownership of the socket
    # (e.g. a failing connect()).
    self.addCleanup(client.close)
    client.connect(listener.getsockname())
    with ssl.wrap_socket(client, do_handshake_on_connect=False) as ss:
        # should raise ValueError for unknown type
        with self.assertRaises(ValueError):
            ss.get_channel_binding("unknown-type")
Antoine Pitroud6494802011-07-21 01:11:30 +0200614
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    """An unconnected socket reports None for the 'tls-unique' binding."""
    # unconnected should return None for known type
    client_sock = socket.socket(socket.AF_INET)
    with ssl.wrap_socket(client_sock) as wrapped_client:
        binding = wrapped_client.get_channel_binding("tls-unique")
        self.assertIsNone(binding)
    # the same for server-side
    server_sock = socket.socket(socket.AF_INET)
    with ssl.wrap_socket(server_sock, server_side=True,
                         certfile=CERTFILE) as wrapped_server:
        binding = wrapped_server.get_channel_binding("tls-unique")
        self.assertIsNone(binding)
Antoine Pitroud6494802011-07-21 01:11:30 +0200626
def test_dealloc_warn(self):
    """Dropping an unclosed SSLSocket warns and names the socket."""
    wrapped = ssl.wrap_socket(socket.socket(socket.AF_INET))
    # Capture the repr first; the object is gone by the time it is compared.
    expected_repr = repr(wrapped)
    with self.assertWarns(ResourceWarning) as caught:
        wrapped = None
        support.gc_collect()
    warning_text = str(caught.warning.args[0])
    self.assertIn(expected_repr, warning_text)
634
def test_get_default_verify_paths(self):
    """get_default_verify_paths() honours SSL_CERT_DIR and SSL_CERT_FILE."""
    default_paths = ssl.get_default_verify_paths()
    self.assertEqual(len(default_paths), 6)
    self.assertIsInstance(default_paths, ssl.DefaultVerifyPaths)

    # Override both variables inside the guard and query the paths again.
    with support.EnvironmentVarGuard() as environ:
        environ["SSL_CERT_DIR"] = CAPATH
        environ["SSL_CERT_FILE"] = CERTFILE
        overridden = ssl.get_default_verify_paths()
        self.assertEqual(overridden.cafile, CERTFILE)
        self.assertEqual(overridden.capath, CAPATH)
646
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
def test_enum_certificates(self):
    """Check the shape of entries returned by ssl.enum_certificates()."""
    store_names = ("CA", "ROOT")
    for store_name in store_names:
        self.assertTrue(ssl.enum_certificates(store_name))

    self.assertRaises(TypeError, ssl.enum_certificates)
    self.assertRaises(WindowsError, ssl.enum_certificates, "")

    # Collect every trust OID seen in either store.
    seen_trust_oids = set()
    for store_name in store_names:
        entries = ssl.enum_certificates(store_name)
        self.assertIsInstance(entries, list)
        for entry in entries:
            self.assertIsInstance(entry, tuple)
            self.assertEqual(len(entry), 3)
            cert_bytes, encoding, trust = entry
            self.assertIsInstance(cert_bytes, bytes)
            self.assertIn(encoding, {"x509_asn", "pkcs_7_asn"})
            self.assertIsInstance(trust, (set, bool))
            if isinstance(trust, set):
                seen_trust_oids |= trust

    server_auth_oid = "1.3.6.1.5.5.7.3.1"
    self.assertIn(server_auth_oid, seen_trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200671
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
def test_enum_crls(self):
    self.assertTrue(ssl.enum_crls("CA"))
    # The store name is mandatory and must not be empty.
    with self.assertRaises(TypeError):
        ssl.enum_crls()
    with self.assertRaises(WindowsError):
        ssl.enum_crls("")

    known_encodings = {"x509_asn", "pkcs_7_asn"}
    revocation_lists = ssl.enum_crls("CA")
    self.assertIsInstance(revocation_lists, list)
    for entry in revocation_lists:
        # Each entry is a (crl bytes, encoding) pair.
        self.assertIsInstance(entry, tuple)
        self.assertEqual(len(entry), 2)
        self.assertIsInstance(entry[0], bytes)
        self.assertIn(entry[1], known_encodings)
Christian Heimes46bebee2013-06-09 19:03:31 +0200685
Christian Heimes46bebee2013-06-09 19:03:31 +0200686
def test_asn1object(self):
    # _ASN1Object can be built from an OID, a NID or a name, and compares
    # equal to a plain (nid, shortname, longname, oid) tuple.
    server_auth_oid = '1.3.6.1.5.5.7.3.1'
    expected = (129, 'serverAuth', 'TLS Web Server Authentication',
                server_auth_oid)

    # Construction from a dotted OID string.
    from_oid = ssl._ASN1Object(server_auth_oid)
    self.assertEqual(from_oid, expected)
    self.assertEqual(from_oid.nid, 129)
    self.assertEqual(from_oid.shortname, 'serverAuth')
    self.assertEqual(from_oid.longname, 'TLS Web Server Authentication')
    self.assertEqual(from_oid.oid, server_auth_oid)
    self.assertIsInstance(from_oid, ssl._ASN1Object)
    # The constructor rejects a short name.
    with self.assertRaises(ValueError):
        ssl._ASN1Object('serverAuth')

    # Construction from a numeric identifier.
    from_nid = ssl._ASN1Object.fromnid(129)
    self.assertEqual(from_nid, expected)
    self.assertIsInstance(from_nid, ssl._ASN1Object)
    with self.assertRaises(ValueError):
        ssl._ASN1Object.fromnid(-1)
    with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
        ssl._ASN1Object.fromnid(100000)
    # Sweep the low NIDs: unknown ones raise, known ones are well typed.
    for nid in range(1000):
        try:
            candidate = ssl._ASN1Object.fromnid(nid)
        except ValueError:
            continue
        self.assertIsInstance(candidate.nid, int)
        self.assertIsInstance(candidate.shortname, str)
        self.assertIsInstance(candidate.longname, str)
        self.assertIsInstance(candidate.oid, (str, type(None)))

    # Construction from a long name, a short name or an OID string.
    from_name = ssl._ASN1Object.fromname('TLS Web Server Authentication')
    self.assertEqual(from_name, expected)
    self.assertIsInstance(from_name, ssl._ASN1Object)
    for alias in ('serverAuth', '1.3.6.1.5.5.7.3.1'):
        self.assertEqual(ssl._ASN1Object.fromname(alias), expected)
    # Name lookup is case-sensitive.
    with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
        ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100725
def test_purpose_enum(self):
    # Each Purpose member is an _ASN1Object equal to the one built from
    # its OID, with the expected NID and short name.
    cases = (
        (ssl.Purpose.SERVER_AUTH, '1.3.6.1.5.5.7.3.1', 129, 'serverAuth'),
        (ssl.Purpose.CLIENT_AUTH, '1.3.6.1.5.5.7.3.2', 130, 'clientAuth'),
    )
    for purpose, oid, nid, shortname in cases:
        reference = ssl._ASN1Object(oid)
        self.assertIsInstance(purpose, ssl._ASN1Object)
        self.assertEqual(purpose, reference)
        self.assertEqual(purpose.nid, nid)
        self.assertEqual(purpose.shortname, shortname)
        self.assertEqual(purpose.oid, oid)
742
def test_unsupported_dtls(self):
    # Wrapping a datagram socket is refused, both through the module-level
    # helper and through SSLContext.wrap_socket().
    expected_message = "only stream sockets are supported"
    udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    self.addCleanup(udp_sock.close)

    with self.assertRaises(NotImplementedError) as caught:
        ssl.wrap_socket(udp_sock, cert_reqs=ssl.CERT_NONE)
    self.assertEqual(str(caught.exception), expected_message)

    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    with self.assertRaises(NotImplementedError) as caught:
        context.wrap_socket(udp_sock)
    self.assertEqual(str(caught.exception), expected_message)
753
def cert_time_ok(self, timestring, timestamp):
    # Helper: assert that ssl.cert_time_to_seconds() converts
    # *timestring* to exactly *timestamp*.
    seconds = ssl.cert_time_to_seconds(timestring)
    self.assertEqual(seconds, timestamp)
756
def cert_time_fail(self, timestring):
    # Helper: assert that ssl.cert_time_to_seconds() rejects *timestring*
    # with a ValueError.
    self.assertRaises(ValueError, ssl.cert_time_to_seconds, timestring)
760
@unittest.skipUnless(utc_offset(),
                     'local time needs to be different from UTC')
def test_cert_time_to_seconds_timezone(self):
    # Issue #19940: ssl.cert_time_to_seconds() returns wrong
    # results if local timezone is not UTC
    samples = (
        ("May 9 00:00:00 2007 GMT", 1178668800.0),
        ("Jan 5 09:34:43 2018 GMT", 1515144883.0),
    )
    for cert_time, expected_seconds in samples:
        self.cert_time_ok(cert_time, expected_seconds)
768
def test_cert_time_to_seconds(self):
    reference = "Jan 5 09:34:43 2018 GMT"
    reference_ts = 1515144883.0
    self.cert_time_ok(reference, reference_ts)
    # accept keyword parameter, assert its name
    self.assertEqual(ssl.cert_time_to_seconds(cert_time=reference),
                     reference_ts)

    equivalent_spellings = (
        # accept both %e and %d (space or zero generated by strftime)
        "Jan 05 09:34:43 2018 GMT",
        # case-insensitive
        "JaN 5 09:34:43 2018 GmT",
    )
    for spelling in equivalent_spellings:
        self.cert_time_ok(spelling, reference_ts)

    malformed = (
        "Jan 5 09:34 2018 GMT",      # no seconds
        "Jan 5 09:34:43 2018",       # no GMT
        "Jan 5 09:34:43 2018 UTC",   # not GMT timezone
        "Jan 35 09:34:43 2018 GMT",  # invalid day
        "Jon 5 09:34:43 2018 GMT",   # invalid month
        "Jan 5 24:00:00 2018 GMT",   # invalid hour
        "Jan 5 09:60:43 2018 GMT",   # invalid minute
    )
    for bad_time in malformed:
        self.cert_time_fail(bad_time)

    newyear_ts = 1230768000.0
    same_instant = (
        "Dec 31 23:59:60 2008 GMT",  # leap seconds
        "Jan 1 00:00:00 2009 GMT",   # same timestamp
    )
    for cert_time in same_instant:
        self.cert_time_ok(cert_time, newyear_ts)

    seconds_field_cases = (
        ("Jan 5 09:34:59 2018 GMT", 1515144899),
        # allow 60th second (even if it is not a leap second)
        ("Jan 5 09:34:60 2018 GMT", 1515144900),
        # allow 2nd leap second for compatibility with time.strptime()
        ("Jan 5 09:34:61 2018 GMT", 1515144901),
    )
    for cert_time, expected_seconds in seconds_field_cases:
        self.cert_time_ok(cert_time, expected_seconds)
    self.cert_time_fail("Jan 5 09:34:62 2018 GMT")  # invalid seconds

    # no special treatment for the special value:
    #   99991231235959Z (rfc 5280)
    self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
803
@support.run_with_locale('LC_ALL', '')
def test_cert_time_to_seconds_locale(self):
    # `cert_time_to_seconds()` should be locale independent

    # Abbreviated name of month 2 as rendered by the active locale.
    february = time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
    if february.lower() == 'feb':
        self.skipTest("locale-specific month name needs to be "
                      "different from C locale")

    # The C-locale spelling is always accepted ...
    self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
    # ... while the localized spelling is rejected.
    self.cert_time_fail(february + " 9 00:00:00 2007 GMT")
818
def test_connect_ex_error(self):
    # connect_ex() to a port that is bound but not listening must return
    # an error code rather than raise.
    placeholder = socket.socket(socket.AF_INET)
    self.addCleanup(placeholder.close)
    port = support.bind_port(placeholder)  # Reserve port but don't listen
    # Issue #19919: Windows machines or VMs hosted on Windows
    # machines sometimes return EWOULDBLOCK.
    acceptable_codes = (
        errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
        errno.EWOULDBLOCK,
    )
    client = ssl.wrap_socket(socket.socket(socket.AF_INET),
                             cert_reqs=ssl.CERT_REQUIRED)
    self.addCleanup(client.close)
    result = client.connect_ex((HOST, port))
    self.assertIn(result, acceptable_codes)
834
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100835
Antoine Pitrou152efa22010-05-16 18:19:27 +0000836class ContextTests(unittest.TestCase):
837
@skip_if_broken_ubuntu_ssl
def test_constructor(self):
    # Every known protocol constant is accepted by the constructor.
    for known_protocol in PROTOCOLS:
        ssl.SSLContext(known_protocol)
    # Without an argument the context reports PROTOCOL_TLS.
    default_context = ssl.SSLContext()
    self.assertEqual(default_context.protocol, ssl.PROTOCOL_TLS)
    # Values that are not protocol constants are rejected.
    for bogus_protocol in (-1, 42):
        with self.assertRaises(ValueError):
            ssl.SSLContext(bogus_protocol)
846
@skip_if_broken_ubuntu_ssl
def test_protocol(self):
    # The protocol attribute echoes the constructor argument.
    for requested in PROTOCOLS:
        context = ssl.SSLContext(requested)
        reported = context.protocol
        self.assertEqual(reported, requested)
852
def test_ciphers(self):
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    # Well-known cipher strings are accepted.
    for cipher_string in ("ALL", "DEFAULT"):
        context.set_ciphers(cipher_string)
    # A string matching no cipher at all is reported as an SSLError.
    nonsense = "^$:,;?*'dorothyx"
    with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
        context.set_ciphers(nonsense)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000859
@unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
def test_get_ciphers(self):
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.set_ciphers('AESGCM')
    # get_ciphers() returns dicts; collect their 'name' entries.
    cipher_names = {cipher['name'] for cipher in context.get_ciphers()}
    for required in ('AES256-GCM-SHA384', 'AES128-GCM-SHA256'):
        self.assertIn(required, cipher_names)
Christian Heimes25bfcd52016-09-06 00:04:45 +0200867
@skip_if_broken_ubuntu_ssl
def test_options(self):
    # Check the default value of SSLContext.options, that a flag can be
    # added, and -- depending on can_clear_options() -- that flags can be
    # removed again or that clearing is refused with ValueError.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
    default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
    # Use the module-level IS_OPENSSL_1_1 probe rather than repeating the
    # LibreSSL/version comparison here, so the two cannot drift apart.
    if IS_OPENSSL_1_1:
        default |= ssl.OP_NO_COMPRESSION
    self.assertEqual(default, ctx.options)
    ctx.options |= ssl.OP_NO_TLSv1
    self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
    if can_clear_options():
        # Removing the flag just added must restore the default value.
        ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
        self.assertEqual(default, ctx.options)
        ctx.options = 0
        # Ubuntu has OP_NO_SSLv3 forced on by default
        self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
    else:
        with self.assertRaises(ValueError):
            ctx.options = 0
887
def test_verify_mode(self):
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    # Default value
    self.assertEqual(context.verify_mode, ssl.CERT_NONE)
    # Every valid mode must round-trip through the attribute.
    for mode in (ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED, ssl.CERT_NONE):
        context.verify_mode = mode
        self.assertEqual(context.verify_mode, mode)
    # Wrong type and out-of-range value raise different exceptions.
    rejected = ((None, TypeError), (42, ValueError))
    for bad_value, expected_error in rejected:
        with self.assertRaises(expected_error):
            context.verify_mode = bad_value
902
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_verify_flags(self):
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    # default value
    trusted_first = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(context.verify_flags,
                     ssl.VERIFY_DEFAULT | trusted_first)
    # Each assigned value must be read back unchanged; the last entry
    # shows that combined flags are supported too.
    round_trip_values = (
        ssl.VERIFY_CRL_CHECK_LEAF,
        ssl.VERIFY_CRL_CHECK_CHAIN,
        ssl.VERIFY_DEFAULT,
        ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT,
    )
    for flags in round_trip_values:
        context.verify_flags = flags
        self.assertEqual(context.verify_flags, flags)
    with self.assertRaises(TypeError):
        context.verify_flags = None
922
def test_load_cert_chain(self):
    # Exercise SSLContext.load_cert_chain() with combined and separate
    # key/cert files, mismatching pairs, and password-protected keys whose
    # password is given either as a value or as a callable.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    # Combined key and cert in a single file
    ctx.load_cert_chain(CERTFILE, keyfile=None)
    ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
    # certfile is a required argument
    self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
    # A missing file surfaces as OSError carrying ENOENT
    with self.assertRaises(OSError) as cm:
        ctx.load_cert_chain(NONEXISTINGCERT)
    self.assertEqual(cm.exception.errno, errno.ENOENT)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        ctx.load_cert_chain(BADCERT)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        ctx.load_cert_chain(EMPTYCERT)
    # Separate key and cert
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    ctx.load_cert_chain(ONLYCERT, ONLYKEY)
    ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
    ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
    # Passing only one half of the pair, or the halves swapped, must fail
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        ctx.load_cert_chain(ONLYCERT)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        ctx.load_cert_chain(ONLYKEY)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
    # Mismatching key and cert
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
        ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)
    # Password protected key and cert
    # (the password is accepted as str, bytes or bytearray)
    ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
    ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
    ctx.load_cert_chain(CERTFILE_PROTECTED,
                        password=bytearray(KEY_PASSWORD.encode()))
    ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
    ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
    ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
                        bytearray(KEY_PASSWORD.encode()))
    with self.assertRaisesRegex(TypeError, "should be a string"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
    with self.assertRaises(ssl.SSLError):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        # openssl has a fixed limit on the password buffer.
        # PEM_BUFSIZE is generally set to 1kb.
        # Return a string larger than this.
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
    # Password callback
    # One callable per return type / failure mode exercised below.
    def getpass_unicode():
        return KEY_PASSWORD
    def getpass_bytes():
        return KEY_PASSWORD.encode()
    def getpass_bytearray():
        return bytearray(KEY_PASSWORD.encode())
    def getpass_badpass():
        return "badpass"
    def getpass_huge():
        return b'a' * (1024 * 1024)
    def getpass_bad_type():
        return 9
    def getpass_exception():
        raise Exception('getpass error')
    # Used both as a callable instance and as a source of a bound method.
    class GetPassCallable:
        def __call__(self):
            return KEY_PASSWORD
        def getpass(self):
            return KEY_PASSWORD
    ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
    ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
    ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
    ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
    ctx.load_cert_chain(CERTFILE_PROTECTED,
                        password=GetPassCallable().getpass)
    with self.assertRaises(ssl.SSLError):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
    with self.assertRaisesRegex(TypeError, "must return a string"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
    # An exception raised inside the callback propagates to the caller
    with self.assertRaisesRegex(Exception, "getpass error"):
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
    # Make sure the password function isn't called if it isn't needed
    ctx.load_cert_chain(CERTFILE, password=getpass_exception)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001005
def test_load_verify_locations(self):
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    load = context.load_verify_locations
    # The CA file may be given as str or bytes, positionally or by keyword.
    for cafile in (CERTFILE, BYTES_CERTFILE):
        load(cafile)
        load(cafile=cafile, capath=None)
    # At least one real location is required.
    with self.assertRaises(TypeError):
        load()
    with self.assertRaises(TypeError):
        load(None, None, None)
    with self.assertRaises(OSError) as missing:
        load(NONEXISTINGCERT)
    self.assertEqual(missing.exception.errno, errno.ENOENT)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        load(BADCERT)
    load(CERTFILE, CAPATH)
    load(CERTFILE, capath=BYTES_CAPATH)

    # Issue #10989: crash if the second argument type is invalid
    with self.assertRaises(TypeError):
        load(None, True)
1024
def test_load_verify_cadata(self):
    # test cadata
    def read_pem_and_der(path):
        # Return the PEM text of *path* together with its DER conversion.
        with open(path) as stream:
            pem_text = stream.read()
        return pem_text, ssl.PEM_cert_to_DER_cert(pem_text)

    def new_context():
        return ssl.SSLContext(ssl.PROTOCOL_TLSv1)

    def assert_ca_count(context, expected):
        self.assertEqual(context.cert_store_stats()["x509_ca"], expected)

    cacert_pem, cacert_der = read_pem_and_der(CAFILE_CACERT)
    neuronio_pem, neuronio_der = read_pem_and_der(CAFILE_NEURONIO)

    # test PEM
    context = new_context()
    assert_ca_count(context, 0)
    context.load_verify_locations(cadata=cacert_pem)
    assert_ca_count(context, 1)
    context.load_verify_locations(cadata=neuronio_pem)
    assert_ca_count(context, 2)
    # cert already in hash table
    context.load_verify_locations(cadata=neuronio_pem)
    assert_ca_count(context, 2)

    # combined
    context = new_context()
    pem_bundle = "\n".join((cacert_pem, neuronio_pem))
    context.load_verify_locations(cadata=pem_bundle)
    assert_ca_count(context, 2)

    # with junk around the certs
    context = new_context()
    noisy_parts = ["head", cacert_pem, "other", neuronio_pem, "again",
                   neuronio_pem, "tail"]
    context.load_verify_locations(cadata="\n".join(noisy_parts))
    assert_ca_count(context, 2)

    # test DER
    context = new_context()
    context.load_verify_locations(cadata=cacert_der)
    context.load_verify_locations(cadata=neuronio_der)
    assert_ca_count(context, 2)
    # cert already in hash table
    context.load_verify_locations(cadata=cacert_der)
    assert_ca_count(context, 2)

    # combined
    context = new_context()
    der_bundle = b"".join((cacert_der, neuronio_der))
    context.load_verify_locations(cadata=der_bundle)
    assert_ca_count(context, 2)

    # error cases
    context = new_context()
    with self.assertRaises(TypeError):
        context.load_verify_locations(cadata=object)

    with self.assertRaisesRegex(ssl.SSLError, "no start line"):
        context.load_verify_locations(cadata="broken")
    with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
        context.load_verify_locations(cadata=b"broken")
1081
1082
def test_load_dh_params(self):
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.load_dh_params(DHFILE)
    # The bytes path is only exercised off Windows.
    if os.name != 'nt':
        context.load_dh_params(BYTES_DHFILE)
    # A path argument is mandatory and may not be None.
    for bad_args in ((), (None,)):
        with self.assertRaises(TypeError):
            context.load_dh_params(*bad_args)
    with self.assertRaises(FileNotFoundError) as missing:
        context.load_dh_params(NONEXISTINGCERT)
    self.assertEqual(missing.exception.errno, errno.ENOENT)
    # A certificate file is not a DH parameter file.
    with self.assertRaises(ssl.SSLError):
        context.load_dh_params(CERTFILE)
1095
@skip_if_broken_ubuntu_ssl
def test_session_stats(self):
    # A freshly created context reports zero for every session counter.
    counter_names = (
        'number',
        'connect',
        'connect_good',
        'connect_renegotiate',
        'accept',
        'accept_good',
        'accept_renegotiate',
        'hits',
        'misses',
        'timeouts',
        'cache_full',
    )
    all_zero = dict.fromkeys(counter_names, 0)
    for protocol in PROTOCOLS:
        context = ssl.SSLContext(protocol)
        self.assertEqual(context.session_stats(), all_zero)
1113
def test_set_default_verify_paths(self):
    # There's not much we can do to test that it acts as expected,
    # so just check it doesn't crash or raise an exception.
    ssl.SSLContext(ssl.PROTOCOL_TLSv1).set_default_verify_paths()
1119
@unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
def test_set_ecdh_curve(self):
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    # A known curve name is accepted as str and as bytes.
    for curve_name in ("prime256v1", b"prime256v1"):
        context.set_ecdh_curve(curve_name)
    # The name is mandatory and may not be None.
    for bad_args in ((), (None,)):
        with self.assertRaises(TypeError):
            context.set_ecdh_curve(*bad_args)
    # Unknown curve names are rejected.
    for unknown_curve in ("foo", b"foo"):
        with self.assertRaises(ValueError):
            context.set_ecdh_curve(unknown_curve)
1129
@needs_sni
def test_sni_callback(self):
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    register = context.set_servername_callback

    # set_servername_callback expects a callable, or None
    with self.assertRaises(TypeError):
        register()
    for not_callable in (4, "", context):
        with self.assertRaises(TypeError):
            register(not_callable)

    def noop_callback(sock, servername, ctx):
        pass

    register(None)
    register(noop_callback)
1144
@needs_sni
def test_sni_callback_refcycle(self):
    # Reference cycles through the servername callback are detected
    # and cleared.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)

    # The default argument makes the callback hold the context, which in
    # turn holds the callback once it is registered.
    def servername_cb(sock, servername, ctx, cycle=context):
        pass

    context.set_servername_callback(servername_cb)
    context_ref = weakref.ref(context)
    # Drop this frame's references so only the cycle keeps them alive.
    del context, servername_cb
    gc.collect()
    self.assertIs(context_ref(), None)
1157
def test_cert_store_stats(self):
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)

    def assert_stats(x509_ca, crl, x509):
        self.assertEqual(context.cert_store_stats(),
                         {'x509_ca': x509_ca, 'crl': crl, 'x509': x509})

    # A new context has an empty store.
    assert_stats(x509_ca=0, crl=0, x509=0)
    # Loading the context's own cert chain leaves the counters unchanged.
    context.load_cert_chain(CERTFILE)
    assert_stats(x509_ca=0, crl=0, x509=0)
    # CERTFILE is counted as a plain certificate, not as a CA.
    context.load_verify_locations(CERTFILE)
    assert_stats(x509_ca=0, crl=0, x509=1)
    # CAFILE_CACERT increments both the CA and the total counter.
    context.load_verify_locations(CAFILE_CACERT)
    assert_stats(x509_ca=1, crl=0, x509=2)
1171
def test_get_ca_certs(self):
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    self.assertEqual(context.get_ca_certs(), [])
    # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
    context.load_verify_locations(CERTFILE)
    self.assertEqual(context.get_ca_certs(), [])
    # but CAFILE_CACERT is a CA cert
    context.load_verify_locations(CAFILE_CACERT)
    # The expected certificate has identical issuer and subject names.
    root_ca_name = (
        (('organizationName', 'Root CA'),),
        (('organizationalUnitName', 'http://www.cacert.org'),),
        (('commonName', 'CA Cert Signing Authority'),),
        (('emailAddress', 'support@cacert.org'),),
    )
    expected_info = {
        'issuer': root_ca_name,
        'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
        'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
        'serialNumber': '00',
        'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
        'subject': root_ca_name,
        'version': 3,
    }
    self.assertEqual(context.get_ca_certs(), [expected_info])

    # With a true argument the certificates come back as DER bytes.
    with open(CAFILE_CACERT) as stream:
        pem_text = stream.read()
    der_bytes = ssl.PEM_cert_to_DER_cert(pem_text)
    self.assertEqual(context.get_ca_certs(True), [der_bytes])
1199
def test_load_default_certs(self):
    def new_context():
        return ssl.SSLContext(ssl.PROTOCOL_TLSv1)

    # No explicit purpose.
    new_context().load_default_certs()

    # Explicit SERVER_AUTH followed by a second, argument-less call.
    context = new_context()
    context.load_default_certs(ssl.Purpose.SERVER_AUTH)
    context.load_default_certs()

    new_context().load_default_certs(ssl.Purpose.CLIENT_AUTH)

    # None and a plain string are both rejected as the purpose.
    context = new_context()
    for bad_purpose in (None, 'SERVER_AUTH'):
        with self.assertRaises(TypeError):
            context.load_default_certs(bad_purpose)
1214
@unittest.skipIf(sys.platform == "win32", "not-Windows specific")
@unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
def test_load_default_certs_env(self):
    # With SSL_CERT_DIR / SSL_CERT_FILE overridden, load_default_certs()
    # must leave exactly one non-CA certificate in the store.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    with support.EnvironmentVarGuard() as environ:
        environ["SSL_CERT_DIR"] = CAPATH
        environ["SSL_CERT_FILE"] = CERTFILE
        context.load_default_certs()
        stats = context.cert_store_stats()
        self.assertEqual(stats, {"crl": 0, "x509": 1, "x509_ca": 0})
1224
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
def test_load_default_certs_env_windows(self):
    # Baseline: store counters after loading the defaults with the
    # environment untouched.
    baseline_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    baseline_context.load_default_certs()
    expected_stats = baseline_context.cert_store_stats()

    # With the environment overrides, one more certificate is expected
    # on top of the baseline.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    with support.EnvironmentVarGuard() as environ:
        environ["SSL_CERT_DIR"] = CAPATH
        environ["SSL_CERT_FILE"] = CERTFILE
        context.load_default_certs()
        expected_stats["x509"] += 1
        self.assertEqual(context.cert_store_stats(), expected_stats)
1238
def test_create_default_context(self):
    def assert_options_include(context, flag):
        self.assertEqual(context.options & flag, flag)

    def optional_flag(name):
        # A flag missing from the ssl module becomes 0, which turns the
        # corresponding check into a no-op.
        return getattr(ssl, name, 0)

    # Default purpose, no explicit CA material.
    context = ssl.create_default_context()
    self.assertEqual(context.protocol, ssl.PROTOCOL_SSLv23)
    self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
    self.assertTrue(context.check_hostname)
    assert_options_include(context, ssl.OP_NO_SSLv2)
    assert_options_include(context, optional_flag("OP_NO_COMPRESSION"))

    # CA material supplied through all three channels at once.
    with open(SIGNING_CA) as stream:
        cadata = stream.read()
    context = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
                                         cadata=cadata)
    self.assertEqual(context.protocol, ssl.PROTOCOL_SSLv23)
    self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
    assert_options_include(context, ssl.OP_NO_SSLv2)
    assert_options_include(context, optional_flag("OP_NO_COMPRESSION"))

    # CLIENT_AUTH purpose: no verification, plus the single-use DH flags.
    context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    self.assertEqual(context.protocol, ssl.PROTOCOL_SSLv23)
    self.assertEqual(context.verify_mode, ssl.CERT_NONE)
    assert_options_include(context, ssl.OP_NO_SSLv2)
    for flag_name in ("OP_NO_COMPRESSION", "OP_SINGLE_DH_USE",
                      "OP_SINGLE_ECDH_USE"):
        assert_options_include(context, optional_flag(flag_name))
Christian Heimes4c05b472013-11-23 15:58:30 +01001278
def test__create_stdlib_context(self):
    def check(context, protocol, verify_mode, check_hostname=None):
        # check_hostname=None means the attribute is not asserted.
        self.assertEqual(context.protocol, protocol)
        self.assertEqual(context.verify_mode, verify_mode)
        if check_hostname is True:
            self.assertTrue(context.check_hostname)
        elif check_hostname is False:
            self.assertFalse(context.check_hostname)
        self.assertEqual(context.options & ssl.OP_NO_SSLv2,
                         ssl.OP_NO_SSLv2)

    # Defaults: SSLv23, no verification, no hostname checking.
    check(ssl._create_stdlib_context(),
          ssl.PROTOCOL_SSLv23, ssl.CERT_NONE, check_hostname=False)

    # An explicit protocol is honoured.
    check(ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1),
          ssl.PROTOCOL_TLSv1, ssl.CERT_NONE)

    # Verification and hostname checking can be requested explicitly.
    strict_context = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
                                                cert_reqs=ssl.CERT_REQUIRED,
                                                check_hostname=True)
    check(strict_context,
          ssl.PROTOCOL_TLSv1, ssl.CERT_REQUIRED, check_hostname=True)

    # The CLIENT_AUTH purpose keeps the default protocol and verify mode.
    check(ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH),
          ssl.PROTOCOL_SSLv23, ssl.CERT_NONE)
Christian Heimes4c05b472013-11-23 15:58:30 +01001303
def test_check_hostname(self):
    """Check how check_hostname and verify_mode constrain each other."""
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    self.assertFalse(context.check_hostname)

    # Hostname checking requires CERT_REQUIRED or CERT_OPTIONAL, so
    # turning it on while nothing is verified must be rejected.
    self.assertRaises(ValueError, setattr, context, "check_hostname", True)

    # Switching the verify mode alone does not enable hostname checking.
    context.verify_mode = ssl.CERT_REQUIRED
    self.assertFalse(context.check_hostname)
    context.check_hostname = True
    self.assertTrue(context.check_hostname)

    context.verify_mode = ssl.CERT_OPTIONAL
    context.check_hostname = True
    self.assertTrue(context.check_hostname)

    # Verification cannot be dropped while hostname checking is enabled.
    self.assertRaises(ValueError, setattr, context, "verify_mode",
                      ssl.CERT_NONE)
    context.check_hostname = False
    self.assertFalse(context.check_hostname)
1325
Antoine Pitrou152efa22010-05-16 18:19:27 +00001326
class SSLErrorTests(unittest.TestCase):
    """Tests for the attributes and str() of SSLError and its subclasses."""

    def test_str(self):
        # The str() of a SSLError (or of a subclass) doesn't include the
        # errno, which stays available as an attribute.
        for exc_class in (ssl.SSLError, ssl.SSLZeroReturnError):
            err = exc_class(1, "foo")
            self.assertEqual(str(err), "foo")
            self.assertEqual(err.errno, 1)

    def test_lib_reason(self):
        # Loading a certificate file as DH parameters fails inside the PEM
        # library; check the library and reason attributes of the error.
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        with self.assertRaises(ssl.SSLError) as caught:
            context.load_dh_params(CERTFILE)
        error = caught.exception
        self.assertEqual(error.library, 'PEM')
        self.assertEqual(error.reason, 'NO_START_LINE')
        text = str(error)
        self.assertTrue(text.startswith("[PEM: NO_START_LINE] no start line"),
                        text)

    def test_subclass(self):
        # Check that the appropriate SSLError subclass is raised
        # (this only tests one of them).  The listening socket never
        # answers, so a non-blocking handshake must report WANT_READ.
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        with socket.socket() as listener:
            listener.bind(("127.0.0.1", 0))
            listener.listen()
            client = socket.socket()
            client.connect(listener.getsockname())
            client.setblocking(False)
            wrapped = context.wrap_socket(client, False,
                                          do_handshake_on_connect=False)
            with wrapped as tls_client:
                with self.assertRaises(ssl.SSLWantReadError) as caught:
                    tls_client.do_handshake()
                text = str(caught.exception)
                self.assertTrue(
                    text.startswith("The operation did not complete (read)"),
                    text)
                # For compatibility
                self.assertEqual(caught.exception.errno,
                                 ssl.SSL_ERROR_WANT_READ)
1366
1367
class MemoryBIOTests(unittest.TestCase):
    """Tests for ssl.MemoryBIO: reads, writes, EOF and pending counts."""

    def test_read_write(self):
        # Data comes back in write order; an empty BIO reads as b''.
        buf = ssl.MemoryBIO()
        buf.write(b'foo')
        self.assertEqual(buf.read(), b'foo')
        self.assertEqual(buf.read(), b'')
        for piece in (b'foo', b'bar'):
            buf.write(piece)
        self.assertEqual(buf.read(), b'foobar')
        self.assertEqual(buf.read(), b'')
        # Sized reads return at most the requested number of bytes.
        buf.write(b'baz')
        self.assertEqual(buf.read(2), b'ba')
        self.assertEqual(buf.read(1), b'z')
        self.assertEqual(buf.read(1), b'')

    def test_eof(self):
        # eof only becomes true once write_eof() has been called AND all
        # buffered data has been read.
        buf = ssl.MemoryBIO()
        self.assertFalse(buf.eof)
        self.assertEqual(buf.read(), b'')
        self.assertFalse(buf.eof)
        buf.write(b'foo')
        self.assertFalse(buf.eof)
        buf.write_eof()
        self.assertFalse(buf.eof)
        self.assertEqual(buf.read(2), b'fo')
        self.assertFalse(buf.eof)
        self.assertEqual(buf.read(1), b'o')
        self.assertTrue(buf.eof)
        self.assertEqual(buf.read(), b'')
        self.assertTrue(buf.eof)

    def test_pending(self):
        # pending tracks the number of buffered, unread bytes.
        buf = ssl.MemoryBIO()
        self.assertEqual(buf.pending, 0)
        buf.write(b'foo')
        self.assertEqual(buf.pending, 3)
        for remaining in (2, 1, 0):
            buf.read(1)
            self.assertEqual(buf.pending, remaining)
        for buffered in (1, 2, 3):
            buf.write(b'x')
            self.assertEqual(buf.pending, buffered)
        buf.read()
        self.assertEqual(buf.pending, 0)

    def test_buffer_types(self):
        # Any bytes-like object can be written; reads always give bytes.
        buf = ssl.MemoryBIO()
        samples = (
            (b'foo', b'foo'),
            (bytearray(b'bar'), b'bar'),
            (memoryview(b'baz'), b'baz'),
        )
        for payload, expected in samples:
            buf.write(payload)
            self.assertEqual(buf.read(), expected)

    def test_error_types(self):
        # Objects that are not bytes-like are rejected with TypeError.
        buf = ssl.MemoryBIO()
        for bad_value in ('foo', None, True, 1):
            self.assertRaises(TypeError, buf.write, bad_value)
1429
1430
@unittest.skipUnless(_have_threads, "Needs threading module")
class SimpleBackgroundTests(unittest.TestCase):

    """Tests that connect to a simple server running in the background"""

    def setUp(self):
        # Every test gets its own echo server; the cleanup stops and joins
        # it again whatever the outcome of the test.
        server = ThreadedEchoServer(SIGNED_CERTFILE)
        self.server_addr = (HOST, server.port)
        server.__enter__()
        self.addCleanup(server.__exit__, None, None, None)

    def test_connect(self):
        with ssl.wrap_socket(socket.socket(socket.AF_INET),
                             cert_reqs=ssl.CERT_NONE) as s:
            s.connect(self.server_addr)
            self.assertEqual({}, s.getpeercert())

        # this should succeed because we specify the root cert
        with ssl.wrap_socket(socket.socket(socket.AF_INET),
                             cert_reqs=ssl.CERT_REQUIRED,
                             ca_certs=SIGNING_CA) as s:
            s.connect(self.server_addr)
            self.assertTrue(s.getpeercert())

    def test_connect_fail(self):
        # This should fail because we have no verification certs. Connection
        # failure crashes ThreadedEchoServer, so run this in an independent
        # test method.
        s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED)
        self.addCleanup(s.close)
        self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
                               s.connect, self.server_addr)

    def test_connect_ex(self):
        # Issue #11326: check connect_ex() implementation
        s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED,
                            ca_certs=SIGNING_CA)
        self.addCleanup(s.close)
        self.assertEqual(0, s.connect_ex(self.server_addr))
        self.assertTrue(s.getpeercert())

    def test_non_blocking_connect_ex(self):
        # Issue #11326: non-blocking connect_ex() should allow handshake
        # to proceed after the socket gets ready.
        s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED,
                            ca_certs=SIGNING_CA,
                            do_handshake_on_connect=False)
        self.addCleanup(s.close)
        s.setblocking(False)
        rc = s.connect_ex(self.server_addr)
        # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
        self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
        # Wait for connect to finish
        select.select([], [s], [], 5.0)
        # Non-blocking handshake
        while True:
            try:
                s.do_handshake()
                break
            except ssl.SSLWantReadError:
                select.select([s], [], [], 5.0)
            except ssl.SSLWantWriteError:
                select.select([], [s], [], 5.0)
        # SSL established
        self.assertTrue(s.getpeercert())

    def test_connect_with_context(self):
        # Same as test_connect, but with a separately created context
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
            s.connect(self.server_addr)
            self.assertEqual({}, s.getpeercert())
        # Same with a server hostname
        with ctx.wrap_socket(socket.socket(socket.AF_INET),
                             server_hostname="dummy") as s:
            s.connect(self.server_addr)
        ctx.verify_mode = ssl.CERT_REQUIRED
        # This should succeed because we specify the root cert
        ctx.load_verify_locations(SIGNING_CA)
        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
            s.connect(self.server_addr)
            cert = s.getpeercert()
            self.assertTrue(cert)

    def test_connect_with_context_fail(self):
        # This should fail because we have no verification certs. Connection
        # failure crashes ThreadedEchoServer, so run this in an independent
        # test method.
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        ctx.verify_mode = ssl.CERT_REQUIRED
        s = ctx.wrap_socket(socket.socket(socket.AF_INET))
        self.addCleanup(s.close)
        self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
                               s.connect, self.server_addr)

    def test_connect_capath(self):
        # Verify server certificates using the `capath` argument
        # NOTE: the subject hashing algorithm has been changed between
        # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
        # contain both versions of each certificate (same content, different
        # filename) for this test to be portable across OpenSSL releases.
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.load_verify_locations(capath=CAPATH)
        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
            s.connect(self.server_addr)
            cert = s.getpeercert()
            self.assertTrue(cert)
        # Same with a bytes `capath` argument
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.load_verify_locations(capath=BYTES_CAPATH)
        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
            s.connect(self.server_addr)
            cert = s.getpeercert()
            self.assertTrue(cert)

    def test_connect_cadata(self):
        # Verify the server certificate using in-memory CA data, first in
        # PEM form and then in DER form.
        with open(SIGNING_CA) as f:
            pem = f.read()
        der = ssl.PEM_cert_to_DER_cert(pem)
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.load_verify_locations(cadata=pem)
        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
            s.connect(self.server_addr)
            cert = s.getpeercert()
            self.assertTrue(cert)

        # same with DER
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.load_verify_locations(cadata=der)
        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
            s.connect(self.server_addr)
            cert = s.getpeercert()
            self.assertTrue(cert)

    @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
    def test_makefile_close(self):
        # Issue #5238: creating a file-like object with makefile() shouldn't
        # delay closing the underlying "real socket" (here tested with its
        # file descriptor, hence skipping the test under Windows).
        ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
        ss.connect(self.server_addr)
        fd = ss.fileno()
        f = ss.makefile()
        f.close()
        # The fd is still open
        os.read(fd, 0)
        # Closing the SSL socket should close the fd too
        ss.close()
        gc.collect()
        with self.assertRaises(OSError) as e:
            os.read(fd, 0)
        self.assertEqual(e.exception.errno, errno.EBADF)

    def test_non_blocking_handshake(self):
        # Connect in blocking mode, then drive the handshake by hand on the
        # non-blocking socket, waiting with select() as OpenSSL asks.
        s = socket.socket(socket.AF_INET)
        s.connect(self.server_addr)
        s.setblocking(False)
        s = ssl.wrap_socket(s,
                            cert_reqs=ssl.CERT_NONE,
                            do_handshake_on_connect=False)
        self.addCleanup(s.close)
        count = 0
        while True:
            try:
                count += 1
                s.do_handshake()
                break
            except ssl.SSLWantReadError:
                select.select([s], [], [])
            except ssl.SSLWantWriteError:
                select.select([], [s], [])
        if support.verbose:
            sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)

    def test_get_server_certificate(self):
        _test_get_server_certificate(self, *self.server_addr, cert=SIGNING_CA)

    def test_get_server_certificate_fail(self):
        # Connection failure crashes ThreadedEchoServer, so run this in an
        # independent test method
        _test_get_server_certificate_fail(self, *self.server_addr)

    def test_ciphers(self):
        with ssl.wrap_socket(socket.socket(socket.AF_INET),
                             cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
            s.connect(self.server_addr)
        with ssl.wrap_socket(socket.socket(socket.AF_INET),
                             cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
            s.connect(self.server_addr)
        # Error checking can happen at instantiation or when connecting
        with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
            with socket.socket(socket.AF_INET) as sock:
                s = ssl.wrap_socket(sock,
                                    cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
                s.connect(self.server_addr)

    def test_get_ca_certs_capath(self):
        # capath certs are loaded on request
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.load_verify_locations(capath=CAPATH)
        self.assertEqual(ctx.get_ca_certs(), [])
        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
            s.connect(self.server_addr)
            cert = s.getpeercert()
            self.assertTrue(cert)
        self.assertEqual(len(ctx.get_ca_certs()), 1)

    @needs_sni
    def test_context_setget(self):
        # Check that the context of a connected socket can be replaced.
        ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx2 = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        s = socket.socket(socket.AF_INET)
        with ctx1.wrap_socket(s) as ss:
            ss.connect(self.server_addr)
            self.assertIs(ss.context, ctx1)
            self.assertIs(ss._sslobj.context, ctx1)
            ss.context = ctx2
            self.assertIs(ss.context, ctx2)
            self.assertIs(ss._sslobj.context, ctx2)

    def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
        """Run func(*args) to completion, shuttling data for its BIOs.

        A simple IO loop.  func(*args) is called repeatedly; whenever it
        raises WANT_READ or WANT_WRITE, data is moved between *sock* and
        the *incoming* / *outgoing* memory BIOs and the call is retried.
        Any other SSLError propagates to the caller.

        The only keyword argument consulted is ``timeout`` (seconds,
        default 10); no keyword argument is forwarded to *func*.  The test
        is failed if the loop is still running once the timeout has
        elapsed.  The deadline is only checked between iterations, so a
        single blocking sendall() or recv() can still outlast it.

        Returns the value of the call to *func* that finally succeeded.
        """
        timeout = kwargs.get('timeout', 10)
        deadline = time.monotonic() + timeout
        count = 0
        while True:
            if time.monotonic() > deadline:
                self.fail("%s() did not complete within %s seconds"
                          % (func.__name__, timeout))
            # Named `want` rather than `errno` so that the imported errno
            # module is not shadowed inside this method.
            want = None
            count += 1
            try:
                ret = func(*args)
            except ssl.SSLError as e:
                if e.errno not in (ssl.SSL_ERROR_WANT_READ,
                                   ssl.SSL_ERROR_WANT_WRITE):
                    raise
                want = e.errno
            # Get any data from the outgoing BIO irrespective of any error, and
            # send it to the socket.
            buf = outgoing.read()
            sock.sendall(buf)
            # If there's no error, we're done. For WANT_READ, we need to get
            # data from the socket and put it in the incoming BIO.
            if want is None:
                break
            elif want == ssl.SSL_ERROR_WANT_READ:
                buf = sock.recv(32768)
                if buf:
                    incoming.write(buf)
                else:
                    incoming.write_eof()
        if support.verbose:
            sys.stdout.write("Needed %d calls to complete %s().\n"
                             % (count, func.__name__))
        return ret

    def test_bio_handshake(self):
        # Handshake through memory BIOs and check the state of the SSL
        # object before and after the handshake.
        sock = socket.socket(socket.AF_INET)
        self.addCleanup(sock.close)
        sock.connect(self.server_addr)
        incoming = ssl.MemoryBIO()
        outgoing = ssl.MemoryBIO()
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.load_verify_locations(SIGNING_CA)
        ctx.check_hostname = True
        sslobj = ctx.wrap_bio(incoming, outgoing, False, 'localhost')
        self.assertIs(sslobj._sslobj.owner, sslobj)
        self.assertIsNone(sslobj.cipher())
        self.assertIsNotNone(sslobj.shared_ciphers())
        self.assertRaises(ValueError, sslobj.getpeercert)
        if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
            self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
        self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
        self.assertTrue(sslobj.cipher())
        self.assertIsNotNone(sslobj.shared_ciphers())
        self.assertTrue(sslobj.getpeercert())
        if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
            self.assertTrue(sslobj.get_channel_binding('tls-unique'))
        try:
            self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
        except ssl.SSLSyscallError:
            # If the server shuts down the TCP connection without sending a
            # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
            pass
        self.assertRaises(ssl.SSLError, sslobj.write, b'foo')

    def test_bio_read_write_data(self):
        # Send a request through memory BIOs and check the echo server's
        # lower-cased reply.
        sock = socket.socket(socket.AF_INET)
        self.addCleanup(sock.close)
        sock.connect(self.server_addr)
        incoming = ssl.MemoryBIO()
        outgoing = ssl.MemoryBIO()
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        ctx.verify_mode = ssl.CERT_NONE
        sslobj = ctx.wrap_bio(incoming, outgoing, False)
        self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
        req = b'FOO\n'
        self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
        buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
        self.assertEqual(buf, b'foo\n')
        self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001740
1741
class NetworkedTests(unittest.TestCase):
    """Tests that talk to hosts on the public internet."""

    def test_timeout_connect_ex(self):
        # Issue #12065: on a timeout, connect_ex() should return the original
        # errno (mimicking the behaviour of non-SSL sockets).
        with support.transient_internet(REMOTE_HOST):
            tls_sock = ssl.wrap_socket(socket.socket(socket.AF_INET),
                                       cert_reqs=ssl.CERT_REQUIRED,
                                       do_handshake_on_connect=False)
            self.addCleanup(tls_sock.close)
            # A timeout this short is expected to expire before the
            # connection can be established.
            tls_sock.settimeout(0.0000001)
            result = tls_sock.connect_ex((REMOTE_HOST, 443))
            if result == 0:
                self.skipTest("REMOTE_HOST responded too quickly")
            self.assertIn(result, (errno.EAGAIN, errno.EWOULDBLOCK))

    @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
    def test_get_server_certificate_ipv6(self):
        # Run both the success and the failure variant of the certificate
        # fetch helpers against an IPv6 host.
        ipv6_host = 'ipv6.google.com'
        with support.transient_internet(ipv6_host):
            _test_get_server_certificate(self, ipv6_host, 443)
            _test_get_server_certificate_fail(self, ipv6_host, 443)

    def test_algorithms(self):
        # Issue #8484: all algorithms should be available when verifying a
        # certificate.
        # SHA256 was added in OpenSSL 0.9.8
        if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15):
            self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION)
        # sha256.tbs-internet.com needs SNI to use the correct certificate
        if not ssl.HAS_SNI:
            self.skipTest("SNI needed for this test")
        # https://sha2.hboeck.de/ was used until 2011-01-08 (no route to host)
        hostname = "sha256.tbs-internet.com"
        remote = (hostname, 443)
        ca_file = os.path.join(os.path.dirname(__file__), "sha256.pem")
        with support.transient_internet(hostname):
            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
            context.verify_mode = ssl.CERT_REQUIRED
            context.load_verify_locations(ca_file)
            tls_sock = context.wrap_socket(socket.socket(socket.AF_INET),
                                           server_hostname=hostname)
            # The socket is closed on the way out, whether or not the
            # connection attempt succeeds.
            with tls_sock:
                tls_sock.connect(remote)
                if support.verbose:
                    out = sys.stdout
                    out.write("\nCipher with %r is %r\n" %
                              (remote, tls_sock.cipher()))
                    out.write("Certificate is:\n%s\n" %
                              pprint.pformat(tls_sock.getpeercert()))
1790 s.close()
1791
1792
def _test_get_server_certificate(test, host, port, cert=None):
    """Fetch the certificate of (host, port) twice and fail *test* if empty.

    The first fetch is done without CA certificates, the second one with
    *cert* passed as ``ca_certs``.  In verbose mode the certificate from
    the second fetch is written to stdout.
    """
    address = (host, port)
    # ca_certs defaults to None, so the first round is the unverified fetch.
    for ca_file in (None, cert):
        pem = ssl.get_server_certificate(address, ca_certs=ca_file)
        if not pem:
            test.fail("No server certificate on %s:%s!" % (host, port))
    if support.verbose:
        sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
1803
def _test_get_server_certificate_fail(test, host, port):
    """Fail *test* unless fetching the certificate raises ssl.SSLError.

    The fetch passes CERTFILE as ``ca_certs``.  In verbose mode the
    error is written to stdout.
    """
    try:
        pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
    except ssl.SSLError as error:
        # This is the expected outcome.
        if support.verbose:
            sys.stdout.write("%s\n" % error)
        return
    test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
1813
1814
1815if _have_threads:
Antoine Pitrou803e6d62010-10-13 10:36:15 +00001816 from test.ssl_servers import make_https_server
1817
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001818 class ThreadedEchoServer(threading.Thread):
1819
class ConnectionHandler(threading.Thread):

    """A mildly complicated class, because we want it to work both
    with and without the SSL wrapper around the socket connection, so
    that we can test the STARTTLS functionality."""

    def __init__(self, server, connsock, addr):
        """Prepare a handler thread for one accepted connection.

        *server* is the owning server object, *connsock* the accepted
        socket (switched to blocking mode here) and *addr* the peer
        address, which is only used in error messages.
        """
        self.server = server
        self.running = False
        self.sock = connsock
        self.addr = addr
        self.sock.setblocking(1)
        # Set by wrap_conn() while the connection is encrypted.
        self.sslconn = None
        threading.Thread.__init__(self)
        self.daemon = True

    def wrap_conn(self):
        """Wrap the connection in SSL using the server's context.

        Returns True on success.  On SSLError or ConnectionResetError the
        error is recorded in server.conn_errors, the server is stopped,
        the connection is closed and False is returned.
        """
        try:
            self.sslconn = self.server.context.wrap_socket(
                self.sock, server_side=True)
            self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
            self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
        except (ssl.SSLError, ConnectionResetError) as e:
            # We treat ConnectionResetError as though it were an
            # SSLError - OpenSSL on Ubuntu abruptly closes the
            # connection when asked to use an unsupported protocol.
            #
            # XXX Various errors can have happened here, for example
            # a mismatching protocol version, an invalid certificate,
            # or a low-level bug. This should be made more discriminating.
            self.server.conn_errors.append(e)
            if self.server.chatty:
                handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
            self.running = False
            self.server.stop()
            self.close()
            return False
        else:
            self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
            if self.server.context.verify_mode == ssl.CERT_REQUIRED:
                cert = self.sslconn.getpeercert()
                if support.verbose and self.server.chatty:
                    sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
                cert_binary = self.sslconn.getpeercert(True)
                if support.verbose and self.server.chatty:
                    sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
            cipher = self.sslconn.cipher()
            if support.verbose and self.server.chatty:
                sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
                sys.stdout.write(" server: selected protocol is now "
                                 + str(self.sslconn.selected_npn_protocol()) + "\n")
            return True

    def read(self):
        """Read from the SSL connection if there is one, else the socket."""
        if self.sslconn:
            return self.sslconn.read()
        else:
            return self.sock.recv(1024)

    def write(self, bytes):
        """Write to the SSL connection if there is one, else the socket."""
        if self.sslconn:
            return self.sslconn.write(bytes)
        else:
            return self.sock.send(bytes)

    def close(self):
        """Close the SSL connection if there is one, else the socket."""
        if self.sslconn:
            self.sslconn.close()
        else:
            self.sock.close()

    def run(self):
        """Serve the connection until EOF, b'over' or an OSError.

        Unless the server is a STARTTLS server the connection is wrapped
        in SSL first.  Every message is then either handled as a command
        (over, STARTTLS, ENDTLS, CB tls-unique) or echoed back in lower
        case.
        """
        self.running = True
        if not self.server.starttls_server:
            if not self.wrap_conn():
                return
        while self.running:
            try:
                msg = self.read()
                stripped = msg.strip()
                if not stripped:
                    # eof, so quit this handler
                    self.running = False
                    # A STARTTLS connection that was never upgraded has
                    # no SSL layer to shut down; without this guard the
                    # unwrap() call raised AttributeError and the
                    # socket was never closed.
                    if self.sslconn is not None:
                        try:
                            self.sock = self.sslconn.unwrap()
                        except OSError:
                            # Many tests shut the TCP connection down
                            # without an SSL shutdown. This causes
                            # unwrap() to raise OSError with errno=0!
                            pass
                        else:
                            self.sslconn = None
                    self.close()
                elif stripped == b'over':
                    if support.verbose and self.server.connectionchatty:
                        sys.stdout.write(" server: client closed connection\n")
                    self.close()
                    return
                elif (self.server.starttls_server and
                      stripped == b'STARTTLS'):
                    if support.verbose and self.server.connectionchatty:
                        sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
                    self.write(b"OK\n")
                    if not self.wrap_conn():
                        return
                elif (self.server.starttls_server and self.sslconn
                      and stripped == b'ENDTLS'):
                    if support.verbose and self.server.connectionchatty:
                        sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
                    self.write(b"OK\n")
                    self.sock = self.sslconn.unwrap()
                    self.sslconn = None
                    if support.verbose and self.server.connectionchatty:
                        sys.stdout.write(" server: connection is now unencrypted...\n")
                elif stripped == b'CB tls-unique':
                    if support.verbose and self.server.connectionchatty:
                        sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
                    data = self.sslconn.get_channel_binding("tls-unique")
                    self.write(repr(data).encode("us-ascii") + b"\n")
                else:
                    if (support.verbose and
                            self.server.connectionchatty):
                        ctype = (self.sslconn and "encrypted") or "unencrypted"
                        sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
                                         % (msg, ctype, msg.lower(), ctype))
                    self.write(msg.lower())
            except OSError:
                if self.server.chatty:
                    handle_error("Test server failure:\n")
                self.close()
                self.running = False
                # normally, we'd just stop here, but for the test
                # harness, we want to stop the server
                self.server.stop()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001954
def __init__(self, certificate=None, ssl_version=None,
             certreqs=None, cacerts=None,
             chatty=True, connectionchatty=False, starttls_server=False,
             npn_protocols=None, alpn_protocols=None,
             ciphers=None, context=None):
    """Create the server thread and bind its listening socket.

    When *context* is given it is used as is.  Otherwise a new
    SSLContext is built from *ssl_version* (default PROTOCOL_TLSv1) and
    *certreqs* (default CERT_NONE) and configured from *cacerts*,
    *certificate*, *npn_protocols*, *alpn_protocols* and *ciphers*,
    each of which is skipped when falsy.

    *chatty*, *connectionchatty* and *starttls_server* are stored as
    attributes of the same name.
    """
    if not context:
        protocol = ssl.PROTOCOL_TLSv1 if ssl_version is None else ssl_version
        self.context = ssl.SSLContext(protocol)
        built = self.context
        built.verify_mode = ssl.CERT_NONE if certreqs is None else certreqs
        if cacerts:
            built.load_verify_locations(cacerts)
        if certificate:
            built.load_cert_chain(certificate)
        if npn_protocols:
            built.set_npn_protocols(npn_protocols)
        if alpn_protocols:
            built.set_alpn_protocols(alpn_protocols)
        if ciphers:
            built.set_ciphers(ciphers)
    else:
        self.context = context

    # Verbosity and protocol behaviour switches.
    self.chatty = chatty
    self.connectionchatty = connectionchatty
    self.starttls_server = starttls_server

    # Listening socket, bound here so that the port is known at once.
    self.sock = socket.socket()
    self.port = support.bind_port(self.sock)

    self.flag = None
    self.active = False

    # Per-connection results appended by the connection handlers.
    self.selected_npn_protocols = []
    self.selected_alpn_protocols = []
    self.shared_ciphers = []
    self.conn_errors = []

    threading.Thread.__init__(self)
    self.daemon = True
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001991
def __enter__(self):
    """Start the server thread, wait for its ready event, return self."""
    ready = threading.Event()
    self.start(ready)
    # start() stored the event as self.flag; wait until it is set.
    self.flag.wait()
    return self
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01001996
def __exit__(self, *args):
    """Ask the server loop to stop, then wait for the thread to finish."""
    self.stop()
    self.join()
2000
def start(self, flag=None):
    """Remember *flag* on the instance, then launch the thread.

    *flag* is stored before the thread starts so that it is already
    available as ``self.flag`` when the thread body runs.
    """
    self.flag = flag
    threading.Thread.start(self)
2004
def run(self):
    """Accept and serve connections one at a time until stopped.

    The listening socket gets a short timeout so the loop can notice
    ``self.active`` being cleared.  Once listening, ``self.flag`` (if
    set) is signalled.  Each accepted connection is handed to a
    ``ConnectionHandler`` which is started and joined before the next
    accept.

    The listening socket is always closed on the way out, including when
    an unexpected exception escapes the loop, so a failure cannot leak it.
    """
    try:
        self.sock.settimeout(0.05)
        self.sock.listen()
        self.active = True
        if self.flag:
            # signal an event
            self.flag.set()
        while self.active:
            try:
                newconn, connaddr = self.sock.accept()
                if support.verbose and self.chatty:
                    sys.stdout.write(' server: new connection from '
                                     + repr(connaddr) + '\n')
                handler = self.ConnectionHandler(self, newconn, connaddr)
                handler.start()
                handler.join()
            except socket.timeout:
                # No client within the timeout; re-check self.active.
                pass
            except KeyboardInterrupt:
                self.stop()
    finally:
        self.sock.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002026
def stop(self):
    """Clear ``self.active`` to request that the serving loop end."""
    self.active = False
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002029
class AsyncoreEchoServer(threading.Thread):

    # Echo server built on asyncore.dispatcher, driven from its own thread.

    class EchoServer (asyncore.dispatcher):

        class ConnectionHandler (asyncore.dispatcher_with_send):

            def __init__(self, conn, certfile):
                # Wrap the accepted socket server-side without
                # handshaking yet; the handshake is attempted below and
                # retried from handle_read().
                self.socket = ssl.wrap_socket(conn, server_side=True,
                                              certfile=certfile,
                                              do_handshake_on_connect=False)
                asyncore.dispatcher_with_send.__init__(self, self.socket)
                self._ssl_accepting = True
                self._do_ssl_handshake()

            def readable(self):
                # Handle data the SSL object reports as pending before
                # telling asyncore that we are readable.
                if isinstance(self.socket, ssl.SSLSocket):
                    while self.socket.pending() > 0:
                        self.handle_read_event()
                return True

            def _do_ssl_handshake(self):
                # Try the handshake once; clear _ssl_accepting only when
                # it completes.
                try:
                    self.socket.do_handshake()
                except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
                    # Not finished yet; try again on the next read event.
                    return
                except ssl.SSLEOFError:
                    return self.handle_close()
                except ssl.SSLError:
                    # Listed before OSError so that other SSL errors
                    # propagate instead of reaching the branch below.
                    raise
                except OSError as err:
                    if err.args[0] == errno.ECONNABORTED:
                        return self.handle_close()
                else:
                    self._ssl_accepting = False

            def handle_read(self):
                if self._ssl_accepting:
                    self._do_ssl_handshake()
                    return
                data = self.recv(1024)
                if support.verbose:
                    sys.stdout.write(" server: read %s from client\n" % repr(data))
                if data:
                    # Echo the payload back lower-cased.
                    self.send(data.lower())
                else:
                    self.close()

            def handle_close(self):
                self.close()
                if support.verbose:
                    sys.stdout.write(" server: closed connection %s\n" % self.socket)

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, certfile):
            self.certfile = certfile
            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.port = support.bind_port(listener, '')
            asyncore.dispatcher.__init__(self, listener)
            self.listen(5)

        def handle_accepted(self, sock_obj, addr):
            if support.verbose:
                sys.stdout.write(" server: new connection from %s:%s\n" %addr)
            # The handler registers itself through the asyncore
            # dispatcher constructor; no reference is kept here.
            self.ConnectionHandler(sock_obj, self.certfile)

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise

    def __init__(self, certfile):
        self.flag = None
        self.active = False
        self.server = self.EchoServer(certfile)
        self.port = self.server.port
        threading.Thread.__init__(self)
        self.daemon = True

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.server)

    def __enter__(self):
        # Start the thread and wait until run() sets the event.
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        def note(text):
            # Progress messages are written only in verbose mode.
            if support.verbose:
                sys.stdout.write(text)

        note(" cleanup: stopping server.\n")
        self.stop()
        note(" cleanup: joining server thread.\n")
        self.join()
        note(" cleanup: successfully joined.\n")

    def start (self, flag=None):
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        self.active = True
        if self.flag:
            self.flag.set()
        while self.active:
            try:
                asyncore.loop(1)
            except:
                # Deliberate best effort: anything raised by the loop is
                # ignored and the loop re-entered until stop() is called.
                pass

    def stop(self):
        self.active = False
        self.server.close()
2145
def server_params_test(client_context, server_context, indata=b"FOO\n",
                       chatty=True, connectionchatty=False, sni_name=None):
    """
    Run one echo exchange between a client and a ThreadedEchoServer.

    A server using *server_context* is started, a client socket wrapped
    with *client_context* (SNI name *sni_name*) connects to it and sends
    *indata* as bytes, bytearray and memoryview, checking each time that
    the lower-cased payload comes back.  Returns a dict describing the
    negotiated connection as seen by the client and by the server.
    """
    # NOTE(review): source indentation was lost; the server-side stats
    # are assumed to be collected while the server is still running.
    stats = {}
    talkative = connectionchatty
    server = ThreadedEchoServer(context=server_context,
                                chatty=chatty,
                                connectionchatty=False)
    with server:
        client_sock = client_context.wrap_socket(socket.socket(),
                                                 server_hostname=sni_name)
        with client_sock as s:
            s.connect((HOST, server.port))
            payloads = (indata, bytearray(indata), memoryview(indata))
            for arg in payloads:
                if talkative and support.verbose:
                    sys.stdout.write(
                        " client: sending %r...\n" % indata)
                s.write(arg)
                outdata = s.read()
                if talkative and support.verbose:
                    sys.stdout.write(" client: read %r\n" % outdata)
                if outdata != indata.lower():
                    raise AssertionError(
                        "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                        % (outdata[:20], len(outdata),
                           indata[:20].lower(), len(indata)))
            s.write(b"over\n")
            if talkative and support.verbose:
                sys.stdout.write(" client: closing connection.\n")
            # Client-side view of the negotiated parameters.
            stats['compression'] = s.compression()
            stats['cipher'] = s.cipher()
            stats['peercert'] = s.getpeercert()
            stats['client_alpn_protocol'] = s.selected_alpn_protocol()
            stats['client_npn_protocol'] = s.selected_npn_protocol()
            stats['version'] = s.version()
            s.close()
        # Server-side view, as recorded on the server object.
        stats['server_alpn_protocols'] = server.selected_alpn_protocols
        stats['server_npn_protocols'] = server.selected_npn_protocols
        stats['server_shared_ciphers'] = server.shared_ciphers
    return stats
Thomas Woutersed03b412007-08-28 21:37:11 +00002192
def try_protocol_combo(server_protocol, client_protocol, expect_success,
                       certsreqs=None, server_options=0, client_options=0):
    """
    Attempt an SSL connection from *client_protocol* to *server_protocol*.

    A true *expect_success* means the connection must succeed; a false
    one means it must fail.  When *expect_success* is a string, the
    connection must succeed and report that protocol version.
    *certsreqs* (default ``ssl.CERT_NONE``) is the verify mode for both
    sides; *server_options* / *client_options* are OR-ed into the
    respective context options.
    """
    if certsreqs is None:
        certsreqs = ssl.CERT_NONE
    certtype_names = {
        ssl.CERT_NONE: "CERT_NONE",
        ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
        ssl.CERT_REQUIRED: "CERT_REQUIRED",
    }
    certtype = certtype_names[certsreqs]
    if support.verbose:
        # Combinations expected to fail are shown in braces.
        formatstr = " %s->%s %s\n" if expect_success else " {%s->%s} %s\n"
        sys.stdout.write(formatstr %
                         (ssl.get_protocol_name(client_protocol),
                          ssl.get_protocol_name(server_protocol),
                          certtype))
    client_context = ssl.SSLContext(client_protocol)
    client_context.options |= client_options
    server_context = ssl.SSLContext(server_protocol)
    server_context.options |= server_options

    # NOTE: we must enable "ALL" ciphers on the client, otherwise an
    # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
    # starting from OpenSSL 1.0.0 (see issue #8322).
    if client_context.protocol == ssl.PROTOCOL_SSLv23:
        client_context.set_ciphers("ALL")

    for ctx in (client_context, server_context):
        ctx.verify_mode = certsreqs
        ctx.load_cert_chain(CERTFILE)
        ctx.load_verify_locations(CERTFILE)
    try:
        stats = server_params_test(client_context, server_context,
                                   chatty=False, connectionchatty=False)
    # Protocol mismatch can result in either an SSLError, or a
    # "Connection reset by peer" error.
    except ssl.SSLError:
        if expect_success:
            raise
    except OSError as exc:
        tolerated = (not expect_success) and exc.errno == errno.ECONNRESET
        if not tolerated:
            raise
    else:
        if not expect_success:
            raise AssertionError(
                "Client protocol %s succeeded with server protocol %s!"
                % (ssl.get_protocol_name(client_protocol),
                   ssl.get_protocol_name(server_protocol)))
        if expect_success is not True and expect_success != stats['version']:
            raise AssertionError("version mismatch: expected %r, got %r"
                                 % (expect_success, stats['version']))
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002251
2252
Bill Janssen6e027db2007-11-15 22:23:56 +00002253 class ThreadedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002254
@skip_if_broken_ubuntu_ssl
def test_echo(self):
    """Basic test of an SSL client connecting to a server"""
    if support.verbose:
        sys.stdout.write("\n")
    # One sub-test per protocol; the same context (with CERTFILE
    # loaded) is used for both ends of the connection.
    for protocol in PROTOCOLS:
        protocol_name = ssl._PROTOCOL_NAMES[protocol]
        with self.subTest(protocol=protocol_name):
            shared_context = ssl.SSLContext(protocol)
            shared_context.load_cert_chain(CERTFILE)
            server_params_test(shared_context, shared_context,
                               chatty=True, connectionchatty=True)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002266
def test_getpeercert(self):
    # Check getpeercert() on a client socket: it must raise ValueError
    # before the handshake and afterwards return a certificate carrying
    # the expected subject and a consistent validity period.
    if support.verbose:
        sys.stdout.write("\n")
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(CERTFILE)
    context.load_cert_chain(CERTFILE)
    server = ThreadedEchoServer(context=context, chatty=False)
    with server:
        # The client socket is managed by a with-block so that it is
        # closed even when one of the assertions below fails; otherwise
        # the connection would stay open while the server is being
        # joined.
        with context.wrap_socket(socket.socket(),
                                 do_handshake_on_connect=False) as s:
            s.connect((HOST, server.port))
            # getpeercert() raise ValueError while the handshake isn't
            # done.
            with self.assertRaises(ValueError):
                s.getpeercert()
            s.do_handshake()
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()
            if support.verbose:
                sys.stdout.write(pprint.pformat(cert) + '\n')
                sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
            if 'subject' not in cert:
                self.fail("No subject field in certificate: %s." %
                          pprint.pformat(cert))
            if ((('organizationName', 'Python Software Foundation'),)
                    not in cert['subject']):
                self.fail(
                    "Missing or invalid 'organizationName' field in certificate subject; "
                    "should be 'Python Software Foundation'.")
            self.assertIn('notBefore', cert)
            self.assertIn('notAfter', cert)
            before = ssl.cert_time_to_seconds(cert['notBefore'])
            after = ssl.cert_time_to_seconds(cert['notAfter'])
            self.assertLess(before, after)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002304
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_crl_check(self):
    # Exercise verify_flags: default flags, CRL leaf checking without a
    # CRL loaded, then CRL leaf checking with the CRL loaded.
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(SIGNING_CA)
    trusted_first = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(context.verify_flags,
                     ssl.VERIFY_DEFAULT | trusted_first)

    def connect_and_expect_cert():
        # Fresh server per attempt; the connection must succeed and
        # expose a peer certificate.
        with ThreadedEchoServer(context=server_context,
                                chatty=True) as server:
            with context.wrap_socket(socket.socket()) as s:
                s.connect((HOST, server.port))
                cert = s.getpeercert()
                self.assertTrue(cert, "Can't get peer certificate.")

    # VERIFY_DEFAULT should pass
    connect_and_expect_cert()

    # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
    context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF

    with ThreadedEchoServer(context=server_context, chatty=True) as server:
        with context.wrap_socket(socket.socket()) as s:
            with self.assertRaisesRegex(ssl.SSLError,
                                        "certificate verify failed"):
                s.connect((HOST, server.port))

    # now load a CRL file. The CRL file is signed by the CA.
    context.load_verify_locations(CRLFILE)

    connect_and_expect_cert()
2347
def test_check_hostname(self):
    # With check_hostname enabled: a matching server_hostname connects,
    # a mismatching one raises CertificateError, and omitting it makes
    # wrap_socket() raise ValueError.
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.verify_mode = ssl.CERT_REQUIRED
    context.check_hostname = True
    context.load_verify_locations(SIGNING_CA)

    def new_server():
        # Each scenario runs against its own server instance.
        return ThreadedEchoServer(context=server_context, chatty=True)

    # correct hostname should verify
    with new_server() as server:
        good = context.wrap_socket(socket.socket(),
                                   server_hostname="localhost")
        with good as s:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")

    # incorrect hostname should raise an exception
    with new_server() as server:
        bad = context.wrap_socket(socket.socket(),
                                  server_hostname="invalid")
        with bad as s:
            with self.assertRaisesRegex(ssl.CertificateError,
                                        "hostname 'invalid' doesn't match 'localhost'"):
                s.connect((HOST, server.port))

    # missing server_hostname arg should cause an exception, too
    with new_server() as server:
        with socket.socket() as s:
            with self.assertRaisesRegex(ValueError,
                                        "check_hostname requires server_hostname"):
                context.wrap_socket(s)
2385
def test_wrong_cert(self):
    """Connecting when the server rejects the client's certificate

    Launch a server with CERT_REQUIRED, and check that trying to
    connect to it with a wrong client certificate fails.
    """
    here = os.path.dirname(__file__) or os.curdir
    certfile = os.path.join(here, "wrongcert.pem")
    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_REQUIRED,
                                cacerts=CERTFILE, chatty=False,
                                connectionchatty=False)
    with server:
        with socket.socket() as sock:
            with ssl.wrap_socket(sock,
                                 certfile=certfile,
                                 ssl_version=ssl.PROTOCOL_TLSv1) as s:
                try:
                    # Expect either an SSL error about the server rejecting
                    # the connection, or a low-level connection reset (which
                    # sometimes happens on Windows)
                    s.connect((HOST, server.port))
                except ssl.SSLError as e:
                    if support.verbose:
                        sys.stdout.write("\nSSLError is %r\n" % e)
                except OSError as e:
                    # Only a connection reset is an accepted failure here.
                    if e.errno != errno.ECONNRESET:
                        raise
                    if support.verbose:
                        sys.stdout.write("\nsocket.error is %r\n" % e)
                else:
                    self.fail("Use of invalid cert should have failed!")
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002418
def test_rude_shutdown(self):
    """A brutal shutdown of an SSL server should raise an OSError
    in the client when attempting handshake.
    """
    listener_ready = threading.Event()
    listener_gone = threading.Event()

    s = socket.socket()
    port = support.bind_port(s, HOST)

    # `listener` runs in a thread.  It sits in an accept() until
    # the main thread connects.  Then it rudely closes the socket,
    # and sets Event `listener_gone` to let the main thread know
    # the socket is gone.
    def listener():
        s.listen()
        listener_ready.set()
        accepted, _peer = s.accept()
        accepted.close()
        s.close()
        listener_gone.set()

    def connector():
        listener_ready.wait()
        with socket.socket() as c:
            c.connect((HOST, port))
            # Wrap only after the peer has gone away, so the handshake
            # done by wrap_socket() has nobody to talk to.
            listener_gone.wait()
            try:
                ssl.wrap_socket(c)
            except OSError:
                pass
            else:
                self.fail('connecting to closed SSL socket should have failed')

    listener_thread = threading.Thread(target=listener)
    listener_thread.start()
    try:
        connector()
    finally:
        listener_thread.join()
Trent Nelson6b240cd2008-04-10 20:12:06 +00002459
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
                     "OpenSSL is compiled without SSLv2 support")
def test_protocol_sslv2(self):
    """Connecting to an SSLv2 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    sslv2 = ssl.PROTOCOL_SSLv2
    sslv23 = ssl.PROTOCOL_SSLv23
    # SSLv2 client, once per certificate requirement (None selects the
    # helper's default).
    for certreqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(sslv2, sslv2, True, certreqs)
    try_protocol_combo(sslv2, sslv23, False)
    if hasattr(ssl, 'PROTOCOL_SSLv3'):
        try_protocol_combo(sslv2, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(sslv2, ssl.PROTOCOL_TLSv1, False)
    # SSLv23 client with specific SSL options
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(sslv2, sslv23, False,
                           client_options=ssl.OP_NO_SSLv2)
    for disabled in (ssl.OP_NO_SSLv3, ssl.OP_NO_TLSv1):
        try_protocol_combo(sslv2, sslv23, False,
                           client_options=disabled)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002483
@skip_if_broken_ubuntu_ssl
def test_protocol_sslv23(self):
    """Connecting to an SSLv23 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    sslv23 = ssl.PROTOCOL_SSLv23
    tlsv1 = ssl.PROTOCOL_TLSv1
    has_sslv3 = hasattr(ssl, 'PROTOCOL_SSLv3')

    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try:
            try_protocol_combo(sslv23, ssl.PROTOCOL_SSLv2, True)
        except OSError as x:
            # this fails on some older versions of OpenSSL (0.9.7l, for instance)
            if support.verbose:
                sys.stdout.write(
                    " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
                    % str(x))

    # Same three client protocols for each certificate requirement
    # (None selects the helper's default).
    for certreqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        if has_sslv3:
            try_protocol_combo(sslv23, ssl.PROTOCOL_SSLv3, False, certreqs)
        try_protocol_combo(sslv23, sslv23, True, certreqs)
        try_protocol_combo(sslv23, tlsv1, 'TLSv1', certreqs)

    # Server with specific SSL options
    if has_sslv3:
        try_protocol_combo(sslv23, ssl.PROTOCOL_SSLv3, False,
                           server_options=ssl.OP_NO_SSLv3)
    # Will choose TLSv1
    try_protocol_combo(sslv23, sslv23, True,
                       server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
    try_protocol_combo(sslv23, tlsv1, False,
                       server_options=ssl.OP_NO_TLSv1)
2522
2523
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
                     "OpenSSL is compiled without SSLv3 support")
def test_protocol_sslv3(self):
    """Connecting to an SSLv3 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    sslv3 = ssl.PROTOCOL_SSLv3
    sslv23 = ssl.PROTOCOL_SSLv23
    # SSLv3 client, once per certificate requirement (None selects the
    # helper's default).
    for certreqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(sslv3, sslv3, 'SSLv3', certreqs)
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(sslv3, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(sslv3, sslv23, False,
                       client_options=ssl.OP_NO_SSLv3)
    try_protocol_combo(sslv3, ssl.PROTOCOL_TLSv1, False)
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(sslv3, sslv23,
                           False, client_options=ssl.OP_NO_SSLv2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002543
@skip_if_broken_ubuntu_ssl
def test_protocol_tlsv1(self):
    """Connecting to a TLSv1 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1 = ssl.PROTOCOL_TLSv1
    # TLSv1 client, once per certificate requirement (None selects the
    # helper's default).
    for certreqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(tlsv1, tlsv1, 'TLSv1', certreqs)
    # Older client protocols, tried only when this build has them.
    for legacy_name in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy_name):
            try_protocol_combo(tlsv1, getattr(ssl, legacy_name), False)
    try_protocol_combo(tlsv1, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_TLSv1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002558
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
                     "TLS version 1.1 not supported.")
def test_protocol_tlsv1_1(self):
    """Connecting to a TLSv1.1 server with various client options.

    Also exercises TLSv1.1 against SSLv23 and TLSv1 peers in both
    directions; each case is delegated to try_protocol_combo().
    """
    if support.verbose:
        sys.stdout.write("\n")
    tls11 = ssl.PROTOCOL_TLSv1_1
    try_protocol_combo(tls11, tls11, 'TLSv1.1')
    # Legacy client protocols are only tried when this build exposes them.
    for legacy_name in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy_name):
            try_protocol_combo(tls11, getattr(ssl, legacy_name), False)
    # An SSLv23 client that has TLSv1.1 switched off.
    try_protocol_combo(tls11, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_TLSv1_1)

    # Cross-version combinations.
    try_protocol_combo(ssl.PROTOCOL_SSLv23, tls11, 'TLSv1.1')
    try_protocol_combo(tls11, ssl.PROTOCOL_TLSv1, False)
    try_protocol_combo(ssl.PROTOCOL_TLSv1, tls11, False)
2578
2579
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
                     "TLS version 1.2 not supported.")
def test_protocol_tlsv1_2(self):
    """Connecting to a TLSv1.2 server with various client options.

    Also exercises TLSv1.2 against SSLv23, TLSv1 and TLSv1.1 peers in
    both directions; each case is delegated to try_protocol_combo().
    """
    if support.verbose:
        sys.stdout.write("\n")
    tls12 = ssl.PROTOCOL_TLSv1_2
    # The matching-protocol case runs with SSLv2/SSLv3 disabled on both ends.
    no_legacy_ssl = ssl.OP_NO_SSLv3 | ssl.OP_NO_SSLv2
    try_protocol_combo(tls12, tls12, 'TLSv1.2',
                       server_options=no_legacy_ssl,
                       client_options=no_legacy_ssl)
    # Legacy client protocols are only tried when this build exposes them.
    for legacy_name in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy_name):
            try_protocol_combo(tls12, getattr(ssl, legacy_name), False)
    # An SSLv23 client that has TLSv1.2 switched off.
    try_protocol_combo(tls12, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_TLSv1_2)

    # Cross-version combinations.
    try_protocol_combo(ssl.PROTOCOL_SSLv23, tls12, 'TLSv1.2')
    try_protocol_combo(tls12, ssl.PROTOCOL_TLSv1, False)
    try_protocol_combo(ssl.PROTOCOL_TLSv1, tls12, False)
    try_protocol_combo(tls12, ssl.PROTOCOL_TLSv1_1, False)
    try_protocol_combo(ssl.PROTOCOL_TLSv1_1, tls12, False)
2603
def test_starttls(self):
    """Switching from clear text to encrypted and back again.

    Sends a fixed script of messages to a STARTTLS-capable echo server.
    When the reply to b"STARTTLS" starts with b"ok" the plain socket is
    wrapped in TLS; when the reply to b"ENDTLS" starts with b"ok" it is
    unwrapped again.  The active socket is always closed, even if the
    exchange fails part-way through.
    """
    msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4",
            b"ENDTLS", b"msg 5", b"msg 6")

    server = ThreadedEchoServer(CERTFILE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                starttls_server=True,
                                chatty=True,
                                connectionchatty=True)
    # True while 'conn' (the TLS wrapper) is the active endpoint,
    # False while the plain socket 's' is.
    wrapped = False
    with server:
        s = socket.socket()
        try:
            s.setblocking(1)
            s.connect((HOST, server.port))
            if support.verbose:
                sys.stdout.write("\n")
            for indata in msgs:
                if support.verbose:
                    sys.stdout.write(
                        " client: sending %r...\n" % indata)
                if wrapped:
                    conn.write(indata)
                    outdata = conn.read()
                else:
                    s.send(indata)
                    outdata = s.recv(1024)
                msg = outdata.strip().lower()
                if indata == b"STARTTLS" and msg.startswith(b"ok"):
                    # STARTTLS ok, switch to secure mode
                    if support.verbose:
                        sys.stdout.write(
                            " client: read %r from server, starting TLS...\n"
                            % msg)
                    conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
                    wrapped = True
                elif indata == b"ENDTLS" and msg.startswith(b"ok"):
                    # ENDTLS ok, switch back to clear text
                    if support.verbose:
                        sys.stdout.write(
                            " client: read %r from server, ending TLS...\n"
                            % msg)
                    s = conn.unwrap()
                    wrapped = False
                else:
                    if support.verbose:
                        sys.stdout.write(
                            " client: read %r from server\n" % msg)
            if support.verbose:
                sys.stdout.write(" client: closing connection.\n")
            if wrapped:
                conn.write(b"over\n")
            else:
                s.send(b"over\n")
        finally:
            # Close whichever endpoint is active so a failure above does
            # not leak the socket.
            if wrapped:
                conn.close()
            else:
                s.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002661
def test_socketserver(self):
    """Using a SocketServer to create and manage SSL connections.

    Reads CERTFILE from disk, fetches the same file over HTTPS from the
    server returned by make_https_server(), and checks both copies match.
    """
    server = make_https_server(self, certfile=CERTFILE)
    # try to connect
    if support.verbose:
        sys.stdout.write('\n')
    with open(CERTFILE, 'rb') as f:
        d1 = f.read()
    # Placeholder must be bytes: it is compared against the bytes read
    # from disk if the response carries no usable content-length.
    d2 = b''
    # now fetch the same data from the HTTPS server
    url = 'https://localhost:%d/%s' % (
        server.port, os.path.split(CERTFILE)[1])
    context = ssl.create_default_context(cafile=CERTFILE)
    f = urllib.request.urlopen(url, context=context)
    try:
        dlen = f.info().get("content-length")
        if dlen and (int(dlen) > 0):
            d2 = f.read(int(dlen))
            if support.verbose:
                sys.stdout.write(
                    " client: read %d bytes from remote server '%s'\n"
                    % (len(d2), server))
    finally:
        f.close()
    self.assertEqual(d1, d2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002687
def test_asyncore_server(self):
    """Check the example asyncore integration.

    Sends one message through an SSL-wrapped socket to an
    AsyncoreEchoServer and expects the reply to equal the message
    lower-cased.
    """
    if support.verbose:
        sys.stdout.write("\n")

    # The payload must be bytes; an earlier str assignment to this name
    # was never used and has been dropped.
    indata = b"FOO\n"
    server = AsyncoreEchoServer(CERTFILE)
    with server:
        s = ssl.wrap_socket(socket.socket())
        s.connect(('127.0.0.1', server.port))
        if support.verbose:
            sys.stdout.write(
                " client: sending %r...\n" % indata)
        s.write(indata)
        outdata = s.read()
        if support.verbose:
            sys.stdout.write(" client: read %r\n" % outdata)
        if outdata != indata.lower():
            self.fail(
                "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                % (outdata[:20], len(outdata),
                   indata[:20].lower(), len(indata)))
        s.write(b"over\n")
        if support.verbose:
            sys.stdout.write(" client: closing connection.\n")
        s.close()
        if support.verbose:
            sys.stdout.write(" client: connection closed.\n")
Trent Nelson6b240cd2008-04-10 20:12:06 +00002718
def test_recv_send(self):
    """Test recv(), send() and friends.

    Each send/recv variant of the SSL socket is driven against an echo
    server.  Variants flagged as expected to succeed must return the
    echoed payload lower-cased; the others must raise a ValueError whose
    message starts with the method name.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = ssl.wrap_socket(socket.socket(),
                            server_side=False,
                            certfile=CERTFILE,
                            ca_certs=CERTFILE,
                            cert_reqs=ssl.CERT_NONE,
                            ssl_version=ssl.PROTOCOL_TLSv1)
        s.connect((HOST, server.port))
        # helper methods for standardising recv* method signatures
        def _recv_into():
            b = bytearray(b"\0"*100)
            count = s.recv_into(b)
            return b[:count]

        def _recvfrom_into():
            b = bytearray(b"\0"*100)
            count, addr = s.recvfrom_into(b)
            return b[:count]

        # (name, method, expect success?, *args, return value func)
        send_methods = [
            ('send', s.send, True, [], len),
            ('sendto', s.sendto, False, ["some.address"], len),
            ('sendall', s.sendall, True, [], lambda x: None),
        ]
        # (name, method, whether to expect success, *args)
        recv_methods = [
            ('recv', s.recv, True, []),
            ('recvfrom', s.recvfrom, False, ["some.address"]),
            ('recv_into', _recv_into, True, []),
            ('recvfrom_into', _recvfrom_into, False, []),
        ]
        data_prefix = "PREFIX_"

        for (meth_name, send_meth, expect_success, args,
                ret_val_meth) in send_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                ret = send_meth(indata, *args)
                msg = "sending with {}".format(meth_name)
                self.assertEqual(ret, ret_val_meth(indata), msg=msg)
                outdata = s.read()
                if outdata != indata.lower():
                    # "!r" is a conversion; ":r" would be an invalid
                    # format spec for bytes and raise TypeError here.
                    self.fail(
                        "While sending with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20].lower(), nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to send with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    # "!s" converts the exception to text; ":s" is not
                    # accepted by exception objects.
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )

        for meth_name, recv_meth, expect_success, args in recv_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                s.send(indata)
                outdata = recv_meth(*args)
                if outdata != indata.lower():
                    self.fail(
                        "While receiving with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20].lower(), nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to receive with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )
                # consume data
                s.read()

        # read(-1, buffer) is supported, even though read(-1) is not
        data = b"data"
        s.send(data)
        buffer = bytearray(len(data))
        self.assertEqual(s.read(-1, buffer), len(data))
        self.assertEqual(buffer, data)

        # Make sure sendmsg et al are disallowed to avoid
        # inadvertent disclosure of data and/or corruption
        # of the encrypted data stream
        self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
        self.assertRaises(NotImplementedError, s.recvmsg, 100)
        self.assertRaises(NotImplementedError,
                          s.recvmsg_into, bytearray(100))

        s.write(b"over\n")

        self.assertRaises(ValueError, s.recv, -1)
        self.assertRaises(ValueError, s.read, -1)

        s.close()
Bill Janssen58afe4c2008-09-08 16:45:19 +00002848
def test_recv_zero(self):
    # Zero-length reads on an SSL socket must return immediately with no
    # data, whether or not application data is pending.
    server = ThreadedEchoServer(CERTFILE)
    server.__enter__()
    # __exit__ takes (exc_type, exc_value, traceback); pass all three.
    self.addCleanup(server.__exit__, None, None, None)
    s = socket.create_connection((HOST, server.port))
    self.addCleanup(s.close)
    s = ssl.wrap_socket(s, suppress_ragged_eofs=False)
    self.addCleanup(s.close)

    # recv/read(0) should return no data
    s.send(b"data")
    self.assertEqual(s.recv(0), b"")
    self.assertEqual(s.read(0), b"")
    self.assertEqual(s.read(), b"data")

    # Should not block if the other end sends no data
    s.setblocking(False)
    self.assertEqual(s.recv(0), b"")
    self.assertEqual(s.recv_into(bytearray()), 0)
2868
def test_nonblocking_send(self):
    # Sending without pause on a non-blocking SSL socket must end in
    # SSLWantWriteError or SSLWantReadError.
    echo_server = ThreadedEchoServer(CERTFILE,
                                     certreqs=ssl.CERT_NONE,
                                     ssl_version=ssl.PROTOCOL_TLSv1,
                                     cacerts=CERTFILE,
                                     chatty=True,
                                     connectionchatty=False)
    with echo_server:
        client = ssl.wrap_socket(socket.socket(),
                                 server_side=False,
                                 certfile=CERTFILE,
                                 ca_certs=CERTFILE,
                                 cert_reqs=ssl.CERT_NONE,
                                 ssl_version=ssl.PROTOCOL_TLSv1)
        client.connect((HOST, echo_server.port))
        client.setblocking(False)

        payload = bytearray(8192)

        def send_forever():
            # Never returns normally: loops until send() raises.
            while True:
                client.send(payload)

        want_errors = (ssl.SSLWantWriteError, ssl.SSLWantReadError)
        with self.assertRaises(want_errors):
            send_forever()

        # Back to blocking mode before shutting the connection down.
        client.setblocking(True)
        client.close()
2898
def test_handshake_timeout(self):
    # Issue #5103: SSL handshake must respect the socket timeout
    listener = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(listener)
    listening = threading.Event()
    finish = False

    def serve():
        # Accept connections but never speak TLS.
        listener.listen()
        listening.set()
        accepted = []
        while not finish:
            readable, _, _ = select.select([listener], [], [], 0.1)
            if listener in readable:
                # Keep a reference so the connection is not closed by
                # garbage collection while the client is waiting.
                accepted.append(listener.accept()[0])
        for conn in accepted:
            conn.close()

    worker = threading.Thread(target=serve)
    worker.start()
    listening.wait()

    try:
        # Case 1: wrap an already-connected socket that has a timeout.
        try:
            c = socket.socket(socket.AF_INET)
            c.settimeout(0.2)
            c.connect((host, port))
            # Will attempt handshake and time out
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                ssl.wrap_socket(c)
        finally:
            c.close()
        # Case 2: wrap first, then connect with a timeout set.
        try:
            c = socket.socket(socket.AF_INET)
            c = ssl.wrap_socket(c)
            c.settimeout(0.2)
            # Will attempt handshake and time out
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                c.connect((host, port))
        finally:
            c.close()
    finally:
        # Tell serve() to leave its loop, then wait for the thread.
        finish = True
        worker.join()
        listener.close()
2947
def test_server_accept(self):
    # Issue #16357: accept() on a SSLSocket created through
    # SSLContext.wrap_socket().
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(CERTFILE)
    context.load_cert_chain(CERTFILE)
    listener = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(listener)
    listener = context.wrap_socket(listener, server_side=True)

    ready = threading.Event()
    remote = None
    peer = None

    def serve():
        nonlocal remote, peer
        listener.listen()
        # Signal readiness, then block in accept() and wait for the
        # connection to close.
        ready.set()
        remote, peer = listener.accept()
        remote.recv(1)

    worker = threading.Thread(target=serve)
    worker.start()
    # The client connects only once the server is listening.
    ready.wait()
    client = context.wrap_socket(socket.socket())
    client.connect((host, port))
    client_addr = client.getsockname()
    client.close()
    worker.join()
    remote.close()
    listener.close()
    # Sanity checks.
    self.assertIsInstance(remote, ssl.SSLSocket)
    self.assertEqual(peer, client_addr)
2985
def test_getpeercert_enotconn(self):
    # getpeercert() on a wrapped socket that was never connected must
    # raise OSError with errno ENOTCONN.
    ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    with ctx.wrap_socket(socket.socket()) as unconnected:
        with self.assertRaises(OSError) as caught:
            unconnected.getpeercert()
        self.assertEqual(caught.exception.errno, errno.ENOTCONN)
2992
def test_do_handshake_enotconn(self):
    # do_handshake() on a wrapped socket that was never connected must
    # raise OSError with errno ENOTCONN.
    ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    with ctx.wrap_socket(socket.socket()) as unconnected:
        with self.assertRaises(OSError) as caught:
            unconnected.do_handshake()
        self.assertEqual(caught.exception.errno, errno.ENOTCONN)
2999
def test_default_ciphers(self):
    # A client restricted to "DES" ciphers must fail to connect, and the
    # server must record a "no shared cipher" error.
    weak_client_ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    try:
        # Force a set of weak ciphers on our client context
        weak_client_ctx.set_ciphers("DES")
    except ssl.SSLError:
        self.skipTest("no DES cipher available")
    with ThreadedEchoServer(CERTFILE,
                            ssl_version=ssl.PROTOCOL_SSLv23,
                            chatty=False) as server:
        with weak_client_ctx.wrap_socket(socket.socket()) as client, \
                self.assertRaises(OSError):
            client.connect((HOST, server.port))
    # Inspected only after the server's "with" block has exited.
    first_error = str(server.conn_errors[0])
    self.assertIn("no shared cipher", first_error)
3014
def test_version_basic(self):
    """
    Basic tests for SSLSocket.version().
    More tests are done in the test_protocol_*() methods.

    version() must be None before connecting, 'TLSv1' while connected
    to a TLSv1 server, and None again after the socket's "with" block.
    """
    tls1_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    with ThreadedEchoServer(CERTFILE,
                            ssl_version=ssl.PROTOCOL_TLSv1,
                            chatty=False) as server:
        with tls1_ctx.wrap_socket(socket.socket()) as client:
            before_connect = client.version()
            self.assertIs(before_connect, None)
            client.connect((HOST, server.port))
            self.assertEqual(client.version(), 'TLSv1')
        after_close = client.version()
        self.assertIs(after_close, None)
3029
@unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
def test_default_ecdh_curve(self):
    # Issue #21015: elliptic curve-based Diffie Hellman key exchange
    # should be enabled by default on SSL contexts.
    shared_ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    shared_ctx.load_cert_chain(CERTFILE)
    # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
    # explicitly using the 'ECCdraft' cipher alias.  Otherwise,
    # our default cipher list should prefer ECDH-based ciphers
    # automatically.
    needs_explicit_ecdh = ssl.OPENSSL_VERSION_INFO < (1, 0, 0)
    if needs_explicit_ecdh:
        shared_ctx.set_ciphers("ECCdraft:ECDH")
    # The same context is used for both the server and the client.
    with ThreadedEchoServer(context=shared_ctx) as server:
        with shared_ctx.wrap_socket(socket.socket()) as client:
            client.connect((HOST, server.port))
            cipher_name = client.cipher()[0]
            self.assertIn("ECDH", cipher_name)
3046
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    """Test tls-unique channel binding.

    Opens two successive TLSv1 connections to the same echo server.
    For each one the local binding data must be present, 12 bytes long
    and match what the server reports for "CB tls-unique"; the data of
    the two connections must differ.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = ssl.wrap_socket(socket.socket(),
                            server_side=False,
                            certfile=CERTFILE,
                            ca_certs=CERTFILE,
                            cert_reqs=ssl.CERT_NONE,
                            ssl_version=ssl.PROTOCOL_TLSv1)
        s.connect((HOST, server.port))
        # get the data
        cb_data = s.get_channel_binding("tls-unique")
        if support.verbose:
            sys.stdout.write(" got channel binding data: {0!r}\n"
                             .format(cb_data))

        # check if it is sane
        self.assertIsNotNone(cb_data)
        self.assertEqual(len(cb_data), 12) # True for TLSv1

        # and compare with the peers version
        s.write(b"CB tls-unique\n")
        peer_data_repr = s.read().strip()
        self.assertEqual(peer_data_repr,
                         repr(cb_data).encode("us-ascii"))
        s.close()

        # now, again
        s = ssl.wrap_socket(socket.socket(),
                            server_side=False,
                            certfile=CERTFILE,
                            ca_certs=CERTFILE,
                            cert_reqs=ssl.CERT_NONE,
                            ssl_version=ssl.PROTOCOL_TLSv1)
        s.connect((HOST, server.port))
        new_cb_data = s.get_channel_binding("tls-unique")
        if support.verbose:
            sys.stdout.write(" got another channel binding data: {0!r}\n"
                             .format(new_cb_data))
        # is it really unique
        self.assertNotEqual(cb_data, new_cb_data)
        # Sanity-check the data of the *second* connection; the first
        # connection's data was already validated above.
        self.assertIsNotNone(new_cb_data)
        self.assertEqual(len(new_cb_data), 12) # True for TLSv1
        s.write(b"CB tls-unique\n")
        peer_data_repr = s.read().strip()
        self.assertEqual(peer_data_repr,
                         repr(new_cb_data).encode("us-ascii"))
        s.close()
Bill Janssen58afe4c2008-09-08 16:45:19 +00003106
def test_compression(self):
    # The compression reported by server_params_test() must be None,
    # 'ZLIB' or 'RLE'.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    ctx.load_cert_chain(CERTFILE)
    # The same context is used for both ends of the connection.
    stats = server_params_test(ctx, ctx,
                               chatty=True, connectionchatty=True)
    if support.verbose:
        report = " got compression: {!r}\n".format(stats['compression'])
        sys.stdout.write(report)
    allowed = {None, 'ZLIB', 'RLE'}
    self.assertIn(stats['compression'], allowed)
3115
@unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
                     "ssl.OP_NO_COMPRESSION needed for this test")
def test_compression_disabled(self):
    # With OP_NO_COMPRESSION set on the context, the reported
    # compression must be None.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    ctx.load_cert_chain(CERTFILE)
    ctx.options |= ssl.OP_NO_COMPRESSION
    # The same context is used for both ends of the connection.
    stats = server_params_test(ctx, ctx,
                               chatty=True, connectionchatty=True)
    negotiated = stats['compression']
    self.assertIs(negotiated, None)
3125
def test_dh_params(self):
    # Check we can get a connection with ephemeral Diffie-Hellman
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.load_cert_chain(CERTFILE)
    context.load_dh_params(DHFILE)
    context.set_ciphers("kEDH")
    stats = server_params_test(context, context,
                               chatty=True, connectionchatty=True)
    # First element of the cipher entry is the cipher name, whose
    # "-"-separated parts are inspected below.
    cipher = stats["cipher"][0]
    parts = cipher.split("-")
    if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
        # Report the whole cipher name, not just its first character.
        self.fail("Non-DH cipher: " + cipher)
3138
def test_selected_alpn_protocol(self):
    # selected_alpn_protocol() is None unless ALPN is used.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    ctx.load_cert_chain(CERTFILE)
    # Neither side configures ALPN; the same context serves both ends.
    stats = server_params_test(ctx, ctx,
                               chatty=True, connectionchatty=True)
    client_choice = stats['client_alpn_protocol']
    self.assertIs(client_choice, None)
3146
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
def test_selected_alpn_protocol_if_server_uses_alpn(self):
    # selected_alpn_protocol() is None unless ALPN is used by the client.
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    client_ctx.load_verify_locations(CERTFILE)
    # Only the server advertises ALPN protocols here.
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_ctx.load_cert_chain(CERTFILE)
    server_ctx.set_alpn_protocols(['foo', 'bar'])
    stats = server_params_test(client_ctx, server_ctx,
                               chatty=True, connectionchatty=True)
    client_choice = stats['client_alpn_protocol']
    self.assertIs(client_choice, None)
3158
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
def test_alpn_protocols(self):
    # For each client protocol list, check which ALPN protocol both
    # sides report after a TLSv1.2 connection.
    server_protocols = ['foo', 'bar', 'milkshake']
    # (protocols offered by the client, protocol expected to be selected)
    protocol_tests = [
        (['foo', 'bar'], 'foo'),
        (['bar', 'foo'], 'foo'),
        (['milkshake'], 'milkshake'),
        (['http/3.0', 'http/4.0'], None)
    ]
    for client_protocols, expected in protocol_tests:
        server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
        server_context.load_cert_chain(CERTFILE)
        server_context.set_alpn_protocols(server_protocols)
        client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
        client_context.load_cert_chain(CERTFILE)
        client_context.set_alpn_protocols(client_protocols)

        try:
            stats = server_params_test(client_context,
                                       server_context,
                                       chatty=True,
                                       connectionchatty=True)
        except ssl.SSLError as e:
            # Keep the error so the no-overlap case can inspect it.
            stats = e

        if expected is None and IS_OPENSSL_1_1:
            # OpenSSL 1.1.0 raises handshake error
            self.assertIsInstance(stats, ssl.SSLError)
            continue

        # The two remaining "%s" slots are filled per side below.
        template = "failed trying %s (s) and %s (c).\n" \
                   "was expecting %s, but got %%s from the %%s"
        msg = template % (str(server_protocols), str(client_protocols),
                          str(expected))
        client_result = stats['client_alpn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))
        if len(stats['server_alpn_protocols']):
            server_result = stats['server_alpn_protocols'][-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
Benjamin Petersoncca27322015-01-23 16:35:37 -05003199
def test_selected_npn_protocol(self):
    # selected_npn_protocol() is None unless NPN is used
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    ctx.load_cert_chain(CERTFILE)
    # Neither side configures NPN; the same context serves both ends.
    stats = server_params_test(ctx, ctx,
                               chatty=True, connectionchatty=True)
    client_choice = stats['client_npn_protocol']
    self.assertIs(client_choice, None)
3207
@unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
def test_npn_protocols(self):
    # For each client protocol list, check which NPN protocol both
    # sides report after a TLSv1 connection.
    server_protocols = ['http/1.1', 'spdy/2']
    # (protocols offered by the client, protocol expected to be selected)
    protocol_tests = [
        (['http/1.1', 'spdy/2'], 'http/1.1'),
        (['spdy/2', 'http/1.1'], 'http/1.1'),
        (['spdy/2', 'test'], 'spdy/2'),
        (['abc', 'def'], 'abc')
    ]
    for client_protocols, expected in protocol_tests:
        server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        server_context.load_cert_chain(CERTFILE)
        server_context.set_npn_protocols(server_protocols)
        client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        client_context.load_cert_chain(CERTFILE)
        client_context.set_npn_protocols(client_protocols)
        stats = server_params_test(client_context, server_context,
                                   chatty=True, connectionchatty=True)

        # The two remaining "%s" slots are filled per side below.
        template = "failed trying %s (s) and %s (c).\n" \
                   "was expecting %s, but got %%s from the %%s"
        msg = template % (str(server_protocols), str(client_protocols),
                          str(expected))
        client_result = stats['client_npn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))
        if len(stats['server_npn_protocols']):
            server_result = stats['server_npn_protocols'][-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
3236
def sni_contexts(self):
    # Build the three TLSv1 contexts shared by the SNI tests and return
    # them as a (server, other, client) tuple:
    #   * server: certificate chain loaded from SIGNED_CERTFILE
    #   * other:  certificate chain loaded from SIGNED_CERTFILE2
    #   * client: CERT_REQUIRED, verifying against SIGNING_CA
    def serving_context(cert_path):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_cert_chain(cert_path)
        return ctx

    default_server = serving_context(SIGNED_CERTFILE)
    alternate_server = serving_context(SIGNED_CERTFILE2)

    verifying_client = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    verifying_client.verify_mode = ssl.CERT_REQUIRED
    verifying_client.load_verify_locations(SIGNING_CA)

    return default_server, alternate_server, verifying_client
3246
def check_common_name(self, stats, name):
    # Assert that the subject of stats['peercert'] contains the entry
    # (('commonName', name),).
    expected_rdn = (('commonName', name),)
    subject = stats['peercert']['subject']
    self.assertIn(expected_rdn, subject)
3250
@needs_sni
def test_sni_callback(self):
    server_context, other_context, client_context = self.sni_contexts()
    seen = []

    def servername_cb(ssl_sock, server_name, initial_context):
        # Record every invocation; switch the socket over to other_context
        # only when a server name was supplied.
        seen.append((server_name, initial_context))
        if server_name is None:
            return
        ssl_sock.context = other_context
    server_context.set_servername_callback(servername_cb)

    def handshake(sni_name):
        return server_params_test(client_context, server_context,
                                  chatty=True,
                                  sni_name=sni_name)

    # With a server name: the callback receives it together with the
    # initial context, and the peer certificate is expected to carry the
    # 'fakehostname' common name, i.e. the certificate was changed for the
    # connection.
    stats = handshake('supermessage')
    self.assertEqual(seen, [("supermessage", server_context)])
    self.check_common_name(stats, 'fakehostname')

    # Without a server name: the callback is still called, with
    # server_name=None, and the 'localhost' certificate is expected.
    seen.clear()
    stats = handshake(None)
    self.assertEqual(seen, [(None, server_context)])
    self.check_common_name(stats, 'localhost')

    # With the callback disabled: nothing is recorded and the certificate
    # does not change even though a server name is sent.
    seen.clear()
    server_context.set_servername_callback(None)
    stats = handshake('notfunny')
    self.check_common_name(stats, 'localhost')
    self.assertEqual(seen, [])
3289
@needs_sni
def test_sni_callback_alert(self):
    # A TLS alert code returned from the servername callback is reflected
    # to the connecting client as an SSLError with the matching reason.
    server_context, _, client_context = self.sni_contexts()

    def cb_returning_alert(ssl_sock, server_name, initial_context):
        return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
    server_context.set_servername_callback(cb_returning_alert)

    with self.assertRaises(ssl.SSLError) as raised:
        server_params_test(client_context, server_context,
                           chatty=False,
                           sni_name='supermessage')
    self.assertEqual(raised.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
3304
@needs_sni
def test_sni_callback_raising(self):
    # An exception escaping the servername callback fails the connection
    # with a TLS handshake failure alert; the exception name is expected
    # to show up on stderr.
    server_context, _, client_context = self.sni_contexts()

    def cb_raising(ssl_sock, server_name, initial_context):
        1 / 0
    server_context.set_servername_callback(cb_raising)

    # Same enter/exit order as a combined ``with`` statement: stderr
    # capture is the inner manager and is released first.
    with self.assertRaises(ssl.SSLError) as raised:
        with support.captured_stderr() as captured:
            server_params_test(client_context, server_context,
                               chatty=False,
                               sni_name='supermessage')
    self.assertEqual(raised.exception.reason,
                     'SSLV3_ALERT_HANDSHAKE_FAILURE')
    self.assertIn("ZeroDivisionError", captured.getvalue())
3321
@needs_sni
def test_sni_callback_wrong_return_type(self):
    # A servername callback returning a value of the wrong type terminates
    # the TLS connection with an internal error alert; "TypeError" is
    # expected to show up on stderr.
    server_context, _, client_context = self.sni_contexts()

    def cb_wrong_return_type(ssl_sock, server_name, initial_context):
        return "foo"
    server_context.set_servername_callback(cb_wrong_return_type)

    # Same enter/exit order as a combined ``with`` statement: stderr
    # capture is the inner manager and is released first.
    with self.assertRaises(ssl.SSLError) as raised:
        with support.captured_stderr() as captured:
            server_params_test(client_context, server_context,
                               chatty=False,
                               sni_name='supermessage')
    self.assertEqual(raised.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
    self.assertIn("TypeError", captured.getvalue())
3339
def test_shared_ciphers(self):
    # Restrict the client and server cipher lists, then check that every
    # cipher in the first 'server_shared_ciphers' entry matches one of the
    # two spellings (alg1 / alg2) of the algorithm the server allows.
    server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_context.load_cert_chain(SIGNED_CERTFILE)
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    client_context.verify_mode = ssl.CERT_REQUIRED
    client_context.load_verify_locations(SIGNING_CA)

    # The cipher strings depend on the OpenSSL version in use.
    if ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
        client_ciphers, server_ciphers = "AES128:AES256", "AES256"
        alg1, alg2 = "AES256", "AES-256"
    else:
        client_ciphers, server_ciphers = "AES:3DES", "3DES"
        alg1, alg2 = "3DES", "DES-CBC3"
    client_context.set_ciphers(client_ciphers)
    server_context.set_ciphers(server_ciphers)

    stats = server_params_test(client_context, server_context)
    shared = stats['server_shared_ciphers'][0]
    self.assertGreater(len(shared), 0)
    for cipher_name, _tls_version, _bits in shared:
        # Accept the cipher when alg1 is one of its dash-separated parts
        # or alg2 occurs anywhere in its name.
        if alg1 in cipher_name.split("-") or alg2 in cipher_name:
            continue
        self.fail(cipher_name)
Benjamin Peterson4cb17812015-01-07 11:14:26 -06003363
def test_read_write_after_close_raises_valuerror(self):
    # read() and write() on a wrapped socket that has already been closed
    # must raise ValueError.
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(CERTFILE)
    context.load_cert_chain(CERTFILE)
    echo_server = ThreadedEchoServer(context=context, chatty=False)

    with echo_server:
        closed_sock = context.wrap_socket(socket.socket())
        closed_sock.connect((HOST, echo_server.port))
        closed_sock.close()

        with self.assertRaises(ValueError):
            closed_sock.read(1024)
        with self.assertRaises(ValueError):
            closed_sock.write(b'hello')
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003378
def test_sendfile(self):
    # Write a 512-byte payload to support.TESTFN, push it through
    # sendfile() on a wrapped socket connected to a ThreadedEchoServer,
    # and check that recv() returns the same bytes.
    payload = b"x" * 512
    with open(support.TESTFN, 'wb') as sink:
        sink.write(payload)
    self.addCleanup(support.unlink, support.TESTFN)

    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(CERTFILE)
    context.load_cert_chain(CERTFILE)
    echo_server = ThreadedEchoServer(context=context, chatty=False)

    with echo_server, context.wrap_socket(socket.socket()) as tls_sock:
        tls_sock.connect((HOST, echo_server.port))
        with open(support.TESTFN, 'rb') as source:
            tls_sock.sendfile(source)
            self.assertEqual(tls_sock.recv(1024), payload)
3395
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003396
def test_main(verbose=False):
    """Run the test_ssl test classes.

    When support.verbose is set, first print a banner with the OpenSSL
    version, the detected platform and a few ssl module constants.  Then
    check that every certificate fixture exists and run the selected test
    classes through support.run_unittest().

    The ``verbose`` parameter is accepted for compatibility with callers
    but is not read; verbosity is taken from ``support.verbose``.

    Raises support.TestFailed when a certificate fixture file is missing.
    """
    if support.verbose:
        import warnings
        # platform.linux_distribution() was deprecated in Python 3.5 and
        # removed in 3.8.  Referencing it unconditionally would raise
        # AttributeError on newer interpreters, so it is only probed when
        # present.  Insertion order keeps 'Linux' first, as before.
        plats = {}
        linux_distribution = getattr(platform, 'linux_distribution', None)
        if linux_distribution is not None:
            plats['Linux'] = linux_distribution
        plats['Mac'] = platform.mac_ver
        plats['Windows'] = platform.win32_ver
        with warnings.catch_warnings():
            # Silence the deprecation notice emitted by linux_distribution().
            warnings.filterwarnings(
                'ignore',
                r'dist\(\) and linux_distribution\(\) '
                'functions are deprecated .*',
                PendingDeprecationWarning,
            )
            for name, func in plats.items():
                plat = func()
                if plat and plat[0]:
                    plat = '%s %r' % (name, plat)
                    break
            else:
                # No probe returned a non-empty first field: fall back to
                # the generic platform string.
                plat = repr(platform.platform())
        print("test_ssl: testing with %r %r" %
              (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
        print(" under %s" % plat)
        print(" HAS_SNI = %r" % ssl.HAS_SNI)
        print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
        try:
            print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
        except AttributeError:
            # The constant is not available in this ssl module; skip it.
            pass

    # Fail early, with a clear message, if any certificate fixture is
    # missing from disk.
    for filename in [
        CERTFILE, BYTES_CERTFILE,
        ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
        SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
        BADCERT, BADKEY, EMPTYCERT]:
        if not os.path.exists(filename):
            raise support.TestFailed("Can't read certificate file %r" % filename)

    tests = [
        ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
        SimpleBackgroundTests,
    ]

    if support.is_resource_enabled('network'):
        tests.append(NetworkedTests)

    if _have_threads:
        thread_info = support.threading_setup()
        if thread_info:
            tests.append(ThreadedTests)

    try:
        support.run_unittest(*tests)
    finally:
        # thread_info is only bound when _have_threads is true, hence the
        # matching guard here.
        if _have_threads:
            support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00003455
# Run the suite when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()