blob: f6afa267c564df388588859926b98be9023ed4aa [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00005from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00006import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00007import select
Thomas Woutersed03b412007-08-28 21:37:11 +00008import time
Christian Heimes9424bb42013-06-17 15:32:57 +02009import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000010import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000011import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000012import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000013import pprint
Antoine Pitrou152efa22010-05-16 18:19:27 +000014import tempfile
Antoine Pitrou803e6d62010-10-13 10:36:15 +000015import urllib.request
Thomas Woutersed03b412007-08-28 21:37:11 +000016import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000017import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000018import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000019import platform
Antoine Pitrou23df4832010-08-04 17:14:06 +000020import functools
Thomas Woutersed03b412007-08-28 21:37:11 +000021
# The ssl module is obtained through the test-support helper instead of a
# plain import.
# NOTE(review): presumably this skips the whole test module when ssl cannot
# be imported -- confirm against test.support.import_module.
ssl = support.import_module("ssl")

# Record whether the threading module is importable on this build.
try:
    import threading
except ImportError:
    _have_threads = False
else:
    _have_threads = True

# Sorted entries of the ssl module's private protocol-name table.
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
HOST = support.HOST
# LibreSSL is recognised by the prefix of its version string; "OpenSSL 1.1"
# means a version tuple of at least (1, 1, 0) that is not LibreSSL.
IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
IS_OPENSSL_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
35
Antoine Pitrou152efa22010-05-16 18:19:27 +000036
def data_file(*name):
    """Return the path of a test data file located next to this module.

    The positional arguments are path components; they are joined, in
    order, onto the directory that contains this file.
    """
    here = os.path.dirname(__file__)
    return os.path.join(here, *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000039
Antoine Pitrou81564092010-10-08 23:06:24 +000040# The custom key and certificate files used in test_ssl are generated
41# using Lib/test/make_ssl_certs.py.
42# Other certificates are simply fetched from the Internet servers they
43# are meant to authenticate.
44
# Paths of the certificate/key fixtures, resolved relative to this module by
# data_file().  Each BYTES_* name is the os.fsencode()'d form of the str path
# it is derived from.
CERTFILE = data_file("keycert.pem")
BYTES_CERTFILE = os.fsencode(CERTFILE)
ONLYCERT = data_file("ssl_cert.pem")
ONLYKEY = data_file("ssl_key.pem")
BYTES_ONLYCERT = os.fsencode(ONLYCERT)
BYTES_ONLYKEY = os.fsencode(ONLYKEY)
CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
# NOTE(review): presumably the passphrase of the *_PROTECTED files -- confirm.
KEY_PASSWORD = "somepass"
CAPATH = data_file("capath")
BYTES_CAPATH = os.fsencode(CAPATH)
CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
CAFILE_CACERT = data_file("capath", "5ed36f99.0")


# empty CRL
CRLFILE = data_file("revocation.crl")

# Two keys and certs signed by the same CA (for SNI tests)
SIGNED_CERTFILE = data_file("keycert3.pem")
SIGNED_CERTFILE2 = data_file("keycert4.pem")
# Same certificate as pycacert.pem, but without extra text in file
SIGNING_CA = data_file("capath", "ceff1710.0")

REMOTE_HOST = "self-signed.pythontest.net"

# Deliberately unusable inputs: the names indicate an empty certificate, a
# malformed certificate, a path that does not exist and a malformed key.
EMPTYCERT = data_file("nullcert.pem")
BADCERT = data_file("badcert.pem")
NONEXISTINGCERT = data_file("XXXnonexisting.pem")
BADKEY = data_file("badkey.pem")
NOKIACERT = data_file("nokia.pem")
NULLBYTECERT = data_file("nullbytecert.pem")

DHFILE = data_file("dh1024.pem")
BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +000080
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010081
def handle_error(prefix):
    """Write *prefix* followed by the current exception's traceback to stdout.

    The traceback is always formatted, but it is only written when
    ``support.verbose`` is set.
    """
    traceback_lines = traceback.format_exception(*sys.exc_info())
    exc_format = ' '.join(traceback_lines)
    if not support.verbose:
        return
    sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +000086
def can_clear_options():
    """Return True when the OpenSSL API version is 0.9.8m or higher."""
    minimum_api = (0, 9, 8, 13, 15)  # 0.9.8m
    return ssl._OPENSSL_API_VERSION >= minimum_api
Antoine Pitroub5218772010-05-21 09:56:06 +000090
def no_sslv2_implies_sslv3_hello():
    """Return True when the linked OpenSSL is 0.9.7h or higher."""
    minimum_version = (0, 9, 7, 8, 15)  # 0.9.7h
    return ssl.OPENSSL_VERSION_INFO >= minimum_version
94
def have_verify_flags():
    """Return True when the linked OpenSSL is 0.9.8 or higher."""
    minimum_version = (0, 9, 8, 0, 15)  # 0.9.8
    return ssl.OPENSSL_VERSION_INFO >= minimum_version
98
def utc_offset():  # NOTE: ignore issues like #1647654
    """Return the local UTC offset in seconds.

    The value satisfies ``local time = utc time + utc offset``; the DST
    variant (``time.altzone``) is used when DST is defined and in effect.
    """
    dst_in_effect = time.daylight and time.localtime().tm_isdst > 0
    seconds_west_of_utc = time.altzone if dst_in_effect else time.timezone
    return -seconds_west_of_utc
104
def asn1time(cert_time):
    """Return *cert_time* adjusted to what the linked OpenSSL will report.

    Some versions of OpenSSL ignore seconds, see #18207.  Only OpenSSL
    0.9.8i is special-cased: the seconds are zeroed there.  On every other
    version the string is returned unchanged.
    """
    # 0.9.8.i
    if ssl._OPENSSL_API_VERSION != (0, 9, 8, 9, 15):
        return cert_time

    fmt = "%b %d %H:%M:%S %Y GMT"
    parsed = datetime.datetime.strptime(cert_time, fmt)
    adjusted = parsed.replace(second=0).strftime(fmt)
    # %d adds leading zero but ASN1_TIME_print() uses leading space
    if adjusted[4] == "0":
        adjusted = adjusted[:4] + " " + adjusted[5:]
    return adjusted
Thomas Woutersed03b412007-08-28 21:37:11 +0000118
Antoine Pitrou23df4832010-08-04 17:14:06 +0000119# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
def skip_if_broken_ubuntu_ssl(func):
    """Decorator that skips *func* on a patched OpenSSL that rejects SSLv2.

    Issue #9415.  When the ssl module has no PROTOCOL_SSLv2 at all, *func*
    is returned untouched.  Otherwise the wrapper first tries to build an
    SSLv2 context; if that fails on OpenSSL (0, 9, 8, 15, 15) with the
    distribution reported as ('debian', 'squeeze/sid', ''), the test is
    skipped.  In every other case *func* runs normally.
    """
    if not hasattr(ssl, 'PROTOCOL_SSLv2'):
        return func

    @functools.wraps(func)
    def f(*args, **kwargs):
        try:
            ssl.SSLContext(ssl.PROTOCOL_SSLv2)
        except ssl.SSLError:
            # Nested ifs keep the distribution lookup lazy: it only runs
            # when the OpenSSL version already matches.
            if ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15):
                if platform.linux_distribution() == ('debian', 'squeeze/sid', ''):
                    raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
        return func(*args, **kwargs)

    return f
Antoine Pitrou23df4832010-08-04 17:14:06 +0000134
# Decorator: skip the decorated test unless ssl.HAS_SNI is true.
needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
136
Antoine Pitrou23df4832010-08-04 17:14:06 +0000137
Antoine Pitrou152efa22010-05-16 18:19:27 +0000138class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000139
Antoine Pitrou480a1242010-04-28 21:37:09 +0000140 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000141 ssl.CERT_NONE
142 ssl.CERT_OPTIONAL
143 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100144 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100145 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100146 if ssl.HAS_ECDH:
147 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100148 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
149 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000150 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100151 self.assertIn(ssl.HAS_ECDH, {True, False})
Thomas Woutersed03b412007-08-28 21:37:11 +0000152
Antoine Pitrou172f0252014-04-18 20:33:08 +0200153 def test_str_for_enums(self):
154 # Make sure that the PROTOCOL_* constants have enum-like string
155 # reprs.
Christian Heimes598894f2016-09-05 23:19:05 +0200156 proto = ssl.PROTOCOL_TLS
157 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200158 ctx = ssl.SSLContext(proto)
159 self.assertIs(ctx.protocol, proto)
160
def test_random(self):
    """Exercise the RAND_* helpers of the ssl module."""
    status = ssl.RAND_status()
    if support.verbose:
        if status:
            verdict = "sufficient randomness"
        else:
            verdict = "insufficient randomness"
        sys.stdout.write("\n RAND_status is %d (%s)\n" % (status, verdict))

    data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
    self.assertEqual(len(data), 16)
    self.assertEqual(is_cryptographic, status == 1)
    # RAND_bytes only succeeds when the PRNG reports enough entropy.
    if not status:
        self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
    else:
        data = ssl.RAND_bytes(16)
        self.assertEqual(len(data), 16)

    # negative num is invalid
    for generator in (ssl.RAND_bytes, ssl.RAND_pseudo_bytes):
        self.assertRaises(ValueError, generator, -5)

    if hasattr(ssl, 'RAND_egd'):
        for bad_args in ((1,), ('foo', 1)):
            self.assertRaises(TypeError, ssl.RAND_egd, *bad_args)
    # RAND_add is called with str, bytes and bytearray seeds.
    ssl.RAND_add("this is a random string", 75.0)
    ssl.RAND_add(b"this is a random bytes object", 75.0)
    ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000187
@unittest.skipUnless(os.name == 'posix', 'requires posix')
def test_random_fork(self):
    """Check that a forked child and its parent draw different random bytes.

    The child writes 16 pseudo-random bytes into a pipe and exits with
    status 0 (or 1 on any error); the parent reads them and compares them
    with 16 bytes it draws itself.
    """
    status = ssl.RAND_status()
    if not status:
        self.fail("OpenSSL's PRNG has insufficient randomness")

    rfd, wfd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child: never return into the test runner; always leave through
        # os._exit() so the exit status is the only result reported.
        try:
            os.close(rfd)
            child_random = ssl.RAND_pseudo_bytes(16)[0]
            self.assertEqual(len(child_random), 16)
            os.write(wfd, child_random)
            os.close(wfd)
        except BaseException:
            os._exit(1)
        else:
            os._exit(0)
    else:
        # Parent: close the unused write end, wait for the child, then read
        # what it produced.
        os.close(wfd)
        self.addCleanup(os.close, rfd)
        _, status = os.waitpid(pid, 0)
        self.assertEqual(status, 0)

        child_random = os.read(rfd, 16)
        self.assertEqual(len(child_random), 16)
        parent_random = ssl.RAND_pseudo_bytes(16)[0]
        self.assertEqual(len(parent_random), 16)

        self.assertNotEqual(child_random, parent_random)
218 self.assertNotEqual(child_random, parent_random)
219
def test_parse_cert(self):
    """Decode two known certificates and compare the parsed fields."""
    # note that this uses an 'unofficial' function in _ssl.c,
    # provided solely for this test, to exercise the certificate
    # parsing code
    p = ssl._ssl._test_decode_cert(CERTFILE)
    if support.verbose:
        sys.stdout.write("\n" + pprint.pformat(p) + "\n")
    # Issuer and subject are expected to carry the same name in this
    # certificate, so the expected value is spelled out only once.
    expected_name = (
        (('countryName', 'XY'),),
        (('localityName', 'Castle Anthrax'),),
        (('organizationName', 'Python Software Foundation'),),
        (('commonName', 'localhost'),),
    )
    self.assertEqual(p['issuer'], expected_name)
    # Note the next three asserts will fail if the keys are regenerated.
    # ASN1_TIME_print() pads a single-digit day of month with a leading
    # space, hence the two spaces between the month and the day.
    self.assertEqual(p['notAfter'], asn1time('Oct  5 23:01:56 2020 GMT'))
    self.assertEqual(p['notBefore'], asn1time('Oct  8 23:01:56 2010 GMT'))
    self.assertEqual(p['serialNumber'], 'D7C7381919AFC24E')
    self.assertEqual(p['subject'], expected_name)
    self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),))
    # Issue #13034: the subjectAltName in some certificates
    # (notably projects.developer.nokia.com:443) wasn't parsed
    p = ssl._ssl._test_decode_cert(NOKIACERT)
    if support.verbose:
        sys.stdout.write("\n" + pprint.pformat(p) + "\n")
    self.assertEqual(p['subjectAltName'],
                     (('DNS', 'projects.developer.nokia.com'),
                      ('DNS', 'projects.forum.nokia.com'))
                     )
    # extra OCSP and AIA fields
    self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
    self.assertEqual(p['caIssuers'],
                     ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
    self.assertEqual(p['crlDistributionPoints'],
                     ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000259
def test_parse_cert_CVE_2013_4238(self):
    """Embedded NUL bytes in certificate fields must be preserved."""
    decoded = ssl._ssl._test_decode_cert(NULLBYTECERT)
    if support.verbose:
        sys.stdout.write("\n" + pprint.pformat(decoded) + "\n")
    expected_name = ((('countryName', 'US'),),
                     (('stateOrProvinceName', 'Oregon'),),
                     (('localityName', 'Beaverton'),),
                     (('organizationName', 'Python Software Foundation'),),
                     (('organizationalUnitName', 'Python Core Development'),),
                     (('commonName', 'null.python.org\x00example.org'),),
                     (('emailAddress', 'python-dev@python.org'),))
    self.assertEqual(decoded['subject'], expected_name)
    self.assertEqual(decoded['issuer'], expected_name)

    # Only the last subjectAltName entry depends on the OpenSSL version.
    if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
        ipv6_entry = ('IP Address', '2001:DB8:0:0:0:0:0:1\n')
    else:
        # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
        ipv6_entry = ('IP Address', '<invalid>')
    expected_san = (
        ('DNS', 'altnull.python.org\x00example.com'),
        ('email', 'null@python.org\x00user@example.org'),
        ('URI', 'http://null.python.org\x00http://example.org'),
        ('IP Address', '192.0.2.1'),
        ipv6_entry,
    )

    self.assertEqual(decoded['subjectAltName'], expected_san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200288
def test_DER_to_PEM(self):
    """PEM -> DER -> PEM -> DER must round-trip and keep header/footer."""
    with open(CAFILE_CACERT, 'r') as pem_file:
        original_pem = pem_file.read()
    first_der = ssl.PEM_cert_to_DER_cert(original_pem)
    regenerated_pem = ssl.DER_cert_to_PEM_cert(first_der)
    second_der = ssl.PEM_cert_to_DER_cert(regenerated_pem)
    self.assertEqual(first_der, second_der)

    expected_start = ssl.PEM_HEADER + '\n'
    expected_end = '\n' + ssl.PEM_FOOTER + '\n'
    if not regenerated_pem.startswith(expected_start):
        self.fail("DER-to-PEM didn't include correct header:\n%r\n" % regenerated_pem)
    if not regenerated_pem.endswith(expected_end):
        self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % regenerated_pem)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000300
def test_openssl_version(self):
    """Sanity-check the three OPENSSL_VERSION* constants against each other.

    OpenSSL 3.x is accepted as well: its version number is laid out as
    0xMNN00PP0, so the third component of the dotted version arrives in
    the "patch" slot of OPENSSL_VERSION_INFO instead of the "fix" slot.
    """
    n = ssl.OPENSSL_VERSION_NUMBER
    t = ssl.OPENSSL_VERSION_INFO
    s = ssl.OPENSSL_VERSION
    self.assertIsInstance(n, int)
    self.assertIsInstance(t, tuple)
    self.assertIsInstance(s, str)
    # Some sanity checks follow
    # >= 0.9
    self.assertGreaterEqual(n, 0x900000)
    # < 4.0
    self.assertLess(n, 0x40000000)
    major, minor, fix, patch, status = t
    self.assertGreaterEqual(major, 0)
    self.assertLess(major, 4)
    self.assertGreaterEqual(minor, 0)
    self.assertLess(minor, 256)
    self.assertGreaterEqual(fix, 0)
    self.assertLess(fix, 256)
    self.assertGreaterEqual(patch, 0)
    self.assertLessEqual(patch, 63)
    self.assertGreaterEqual(status, 0)
    self.assertLessEqual(status, 15)
    # Version string as returned by {Open,Libre}SSL, the format might change
    if IS_LIBRESSL:
        expected_prefix = "LibreSSL {:d}".format(major)
    elif major >= 3:
        # 3.x: dotted version is major.minor.patch
        expected_prefix = "OpenSSL {:d}.{:d}.{:d}".format(major, minor, patch)
    else:
        expected_prefix = "OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)
    self.assertTrue(s.startswith(expected_prefix), (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000331
@support.cpython_only
def test_refcycle(self):
    """Issue #7943: an SSL object must not be part of a reference cycle.

    Dropping the only strong reference has to free the wrapper right
    away, which is observed through a weak reference.
    """
    plain_sock = socket.socket(socket.AF_INET)
    wrapped = ssl.wrap_socket(plain_sock)
    watcher = weakref.ref(wrapped)
    with support.check_warnings(("", ResourceWarning)):
        del wrapped
    self.assertEqual(watcher(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000342
Antoine Pitroua468adc2010-09-14 14:43:44 +0000343 def test_wrapped_unconnected(self):
344 # Methods on an unconnected SSLSocket propagate the original
Andrew Svetlov0832af62012-12-18 23:10:48 +0200345 # OSError raise by the underlying socket object.
Antoine Pitroua468adc2010-09-14 14:43:44 +0000346 s = socket.socket(socket.AF_INET)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100347 with ssl.wrap_socket(s) as ss:
Antoine Pitroue9bb4732013-01-12 21:56:56 +0100348 self.assertRaises(OSError, ss.recv, 1)
349 self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
350 self.assertRaises(OSError, ss.recvfrom, 1)
351 self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
352 self.assertRaises(OSError, ss.send, b'x')
353 self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
Antoine Pitroua468adc2010-09-14 14:43:44 +0000354
Antoine Pitrou40f08742010-04-24 22:04:40 +0000355 def test_timeout(self):
356 # Issue #8524: when creating an SSL socket, the timeout of the
357 # original socket should be retained.
358 for timeout in (None, 0.0, 5.0):
359 s = socket.socket(socket.AF_INET)
360 s.settimeout(timeout)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100361 with ssl.wrap_socket(s) as ss:
362 self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000363
def test_errors(self):
    """Check the errors raised by wrap_socket() for bad arguments."""
    sock = socket.socket()
    # A key file without a certificate file is rejected.
    with self.assertRaisesRegex(ValueError, "certfile must be specified"):
        ssl.wrap_socket(sock, keyfile=CERTFILE)
    # Server-side sockets need a non-empty certfile.
    server_message = "certfile must be specified for server-side operations"
    for extra in ({}, {"certfile": ""}):
        with self.assertRaisesRegex(ValueError, server_message):
            ssl.wrap_socket(sock, server_side=True, **extra)
    with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
        with self.assertRaisesRegex(ValueError,
                                    "can't connect in server-side mode"):
            s.connect((HOST, 8080))
    # A missing certificate and/or key file surfaces as OSError(ENOENT).
    missing_file_cases = (
        {"certfile": NONEXISTINGCERT},
        {"certfile": CERTFILE, "keyfile": NONEXISTINGCERT},
        {"certfile": NONEXISTINGCERT, "keyfile": NONEXISTINGCERT},
    )
    for kwargs in missing_file_cases:
        with self.assertRaises(OSError) as cm:
            with socket.socket() as sock:
                ssl.wrap_socket(sock, **kwargs)
        self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000392
def bad_cert_test(self, certfile):
    """Check that trying to use the given client certificate fails"""
    # Resolve the name next to this module, falling back to the current
    # directory when __file__ has no directory part.
    base_dir = os.path.dirname(__file__) or os.curdir
    cert_path = os.path.join(base_dir, certfile)
    sock = socket.socket()
    self.addCleanup(sock.close)
    with self.assertRaises(ssl.SSLError):
        ssl.wrap_socket(sock, certfile=cert_path,
                        ssl_version=ssl.PROTOCOL_TLSv1)
403
404 def test_empty_cert(self):
405 """Wrapping with an empty cert file"""
406 self.bad_cert_test("nullcert.pem")
407
408 def test_malformed_cert(self):
409 """Wrapping with a badly formatted certificate (syntax error)"""
410 self.bad_cert_test("badcert.pem")
411
412 def test_malformed_key(self):
413 """Wrapping with a badly formatted key (syntax error)"""
414 self.bad_cert_test("badkey.pem")
415
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000416 def test_match_hostname(self):
417 def ok(cert, hostname):
418 ssl.match_hostname(cert, hostname)
419 def fail(cert, hostname):
420 self.assertRaises(ssl.CertificateError,
421 ssl.match_hostname, cert, hostname)
422
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100423 # -- Hostname matching --
424
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000425 cert = {'subject': ((('commonName', 'example.com'),),)}
426 ok(cert, 'example.com')
427 ok(cert, 'ExAmple.cOm')
428 fail(cert, 'www.example.com')
429 fail(cert, '.example.com')
430 fail(cert, 'example.org')
431 fail(cert, 'exampleXcom')
432
433 cert = {'subject': ((('commonName', '*.a.com'),),)}
434 ok(cert, 'foo.a.com')
435 fail(cert, 'bar.foo.a.com')
436 fail(cert, 'a.com')
437 fail(cert, 'Xa.com')
438 fail(cert, '.a.com')
439
Georg Brandl72c98d32013-10-27 07:16:53 +0100440 # only match one left-most wildcard
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000441 cert = {'subject': ((('commonName', 'f*.com'),),)}
442 ok(cert, 'foo.com')
443 ok(cert, 'f.com')
444 fail(cert, 'bar.com')
445 fail(cert, 'foo.a.com')
446 fail(cert, 'bar.foo.com')
447
Christian Heimes824f7f32013-08-17 00:54:47 +0200448 # NULL bytes are bad, CVE-2013-4073
449 cert = {'subject': ((('commonName',
450 'null.python.org\x00example.org'),),)}
451 ok(cert, 'null.python.org\x00example.org') # or raise an error?
452 fail(cert, 'example.org')
453 fail(cert, 'null.python.org')
454
Georg Brandl72c98d32013-10-27 07:16:53 +0100455 # error cases with wildcards
456 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
457 fail(cert, 'bar.foo.a.com')
458 fail(cert, 'a.com')
459 fail(cert, 'Xa.com')
460 fail(cert, '.a.com')
461
462 cert = {'subject': ((('commonName', 'a.*.com'),),)}
463 fail(cert, 'a.foo.com')
464 fail(cert, 'a..com')
465 fail(cert, 'a.com')
466
467 # wildcard doesn't match IDNA prefix 'xn--'
468 idna = 'püthon.python.org'.encode("idna").decode("ascii")
469 cert = {'subject': ((('commonName', idna),),)}
470 ok(cert, idna)
471 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
472 fail(cert, idna)
473 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
474 fail(cert, idna)
475
476 # wildcard in first fragment and IDNA A-labels in sequent fragments
477 # are supported.
478 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
479 cert = {'subject': ((('commonName', idna),),)}
480 ok(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
481 ok(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
482 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
483 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
484
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000485 # Slightly fake real-world example
486 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
487 'subject': ((('commonName', 'linuxfrz.org'),),),
488 'subjectAltName': (('DNS', 'linuxfr.org'),
489 ('DNS', 'linuxfr.com'),
490 ('othername', '<unsupported>'))}
491 ok(cert, 'linuxfr.org')
492 ok(cert, 'linuxfr.com')
493 # Not a "DNS" entry
494 fail(cert, '<unsupported>')
495 # When there is a subjectAltName, commonName isn't used
496 fail(cert, 'linuxfrz.org')
497
498 # A pristine real-world example
499 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
500 'subject': ((('countryName', 'US'),),
501 (('stateOrProvinceName', 'California'),),
502 (('localityName', 'Mountain View'),),
503 (('organizationName', 'Google Inc'),),
504 (('commonName', 'mail.google.com'),))}
505 ok(cert, 'mail.google.com')
506 fail(cert, 'gmail.com')
507 # Only commonName is considered
508 fail(cert, 'California')
509
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100510 # -- IPv4 matching --
511 cert = {'subject': ((('commonName', 'example.com'),),),
512 'subjectAltName': (('DNS', 'example.com'),
513 ('IP Address', '10.11.12.13'),
514 ('IP Address', '14.15.16.17'))}
515 ok(cert, '10.11.12.13')
516 ok(cert, '14.15.16.17')
517 fail(cert, '14.15.16.18')
518 fail(cert, 'example.net')
519
520 # -- IPv6 matching --
521 cert = {'subject': ((('commonName', 'example.com'),),),
522 'subjectAltName': (('DNS', 'example.com'),
523 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
524 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
525 ok(cert, '2001::cafe')
526 ok(cert, '2003::baba')
527 fail(cert, '2003::bebe')
528 fail(cert, 'example.net')
529
530 # -- Miscellaneous --
531
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000532 # Neither commonName nor subjectAltName
533 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
534 'subject': ((('countryName', 'US'),),
535 (('stateOrProvinceName', 'California'),),
536 (('localityName', 'Mountain View'),),
537 (('organizationName', 'Google Inc'),))}
538 fail(cert, 'mail.google.com')
539
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200540 # No DNS entry in subjectAltName but a commonName
541 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
542 'subject': ((('countryName', 'US'),),
543 (('stateOrProvinceName', 'California'),),
544 (('localityName', 'Mountain View'),),
545 (('commonName', 'mail.google.com'),)),
546 'subjectAltName': (('othername', 'blabla'), )}
547 ok(cert, 'mail.google.com')
548
549 # No DNS entry subjectAltName and no commonName
550 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
551 'subject': ((('countryName', 'US'),),
552 (('stateOrProvinceName', 'California'),),
553 (('localityName', 'Mountain View'),),
554 (('organizationName', 'Google Inc'),)),
555 'subjectAltName': (('othername', 'blabla'),)}
556 fail(cert, 'google.com')
557
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000558 # Empty cert / no cert
559 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
560 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
561
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200562 # Issue #17980: avoid denials of service by refusing more than one
563 # wildcard per fragment.
564 cert = {'subject': ((('commonName', 'a*b.com'),),)}
565 ok(cert, 'axxb.com')
566 cert = {'subject': ((('commonName', 'a*b.co*'),),)}
Georg Brandl72c98d32013-10-27 07:16:53 +0100567 fail(cert, 'axxb.com')
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200568 cert = {'subject': ((('commonName', 'a*b*.com'),),)}
569 with self.assertRaises(ssl.CertificateError) as cm:
570 ssl.match_hostname(cert, 'axxbxxc.com')
571 self.assertIn("too many wildcards", str(cm.exception))
572
Antoine Pitroud5323212010-10-22 18:19:07 +0000573 def test_server_side(self):
574 # server_hostname doesn't work for server sockets
575 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000576 with socket.socket() as sock:
577 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
578 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000579
Antoine Pitroud6494802011-07-21 01:11:30 +0200580 def test_unknown_channel_binding(self):
581 # should raise ValueError for unknown type
582 s = socket.socket(socket.AF_INET)
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200583 s.bind(('127.0.0.1', 0))
584 s.listen()
585 c = socket.socket(socket.AF_INET)
586 c.connect(s.getsockname())
587 with ssl.wrap_socket(c, do_handshake_on_connect=False) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100588 with self.assertRaises(ValueError):
589 ss.get_channel_binding("unknown-type")
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200590 s.close()
Antoine Pitroud6494802011-07-21 01:11:30 +0200591
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    """An unconnected socket reports None for the 'tls-unique' binding."""
    wrap_variants = (
        # client side
        {},
        # the same for server-side
        {"server_side": True, "certfile": CERTFILE},
    )
    for wrap_kwargs in wrap_variants:
        plain_sock = socket.socket(socket.AF_INET)
        with ssl.wrap_socket(plain_sock, **wrap_kwargs) as ss:
            self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200603
def test_dealloc_warn(self):
    """Dropping an unclosed SSLSocket warns and names the socket.

    The ResourceWarning message has to contain the repr() the socket
    had while it was still alive.
    """
    wrapped = ssl.wrap_socket(socket.socket(socket.AF_INET))
    expected_repr = repr(wrapped)
    with self.assertWarns(ResourceWarning) as caught:
        # Drop the only reference, then force a collection.
        wrapped = None
        support.gc_collect()
    message = str(caught.warning.args[0])
    self.assertIn(expected_repr, message)
611
def test_get_default_verify_paths(self):
    """get_default_verify_paths() honours SSL_CERT_DIR and SSL_CERT_FILE."""
    defaults = ssl.get_default_verify_paths()
    self.assertEqual(len(defaults), 6)
    self.assertIsInstance(defaults, ssl.DefaultVerifyPaths)

    overrides = (("SSL_CERT_DIR", CAPATH), ("SSL_CERT_FILE", CERTFILE))
    with support.EnvironmentVarGuard() as env:
        for variable, value in overrides:
            env[variable] = value
        overridden = ssl.get_default_verify_paths()
        self.assertEqual(overridden.cafile, CERTFILE)
        self.assertEqual(overridden.capath, CAPATH)
623
Christian Heimes44109d72013-11-22 01:51:30 +0100624 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
625 def test_enum_certificates(self):
626 self.assertTrue(ssl.enum_certificates("CA"))
627 self.assertTrue(ssl.enum_certificates("ROOT"))
628
629 self.assertRaises(TypeError, ssl.enum_certificates)
630 self.assertRaises(WindowsError, ssl.enum_certificates, "")
631
Christian Heimesc2d65e12013-11-22 16:13:55 +0100632 trust_oids = set()
633 for storename in ("CA", "ROOT"):
634 store = ssl.enum_certificates(storename)
635 self.assertIsInstance(store, list)
636 for element in store:
637 self.assertIsInstance(element, tuple)
638 self.assertEqual(len(element), 3)
639 cert, enc, trust = element
640 self.assertIsInstance(cert, bytes)
641 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
642 self.assertIsInstance(trust, (set, bool))
643 if isinstance(trust, set):
644 trust_oids.update(trust)
Christian Heimes44109d72013-11-22 01:51:30 +0100645
646 serverAuth = "1.3.6.1.5.5.7.3.1"
Christian Heimesc2d65e12013-11-22 16:13:55 +0100647 self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200648
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
def test_enum_crls(self):
    # The "CA" store is expected to hold at least one CRL.
    self.assertTrue(ssl.enum_crls("CA"))
    # The store name is mandatory, and an empty name is rejected.
    with self.assertRaises(TypeError):
        ssl.enum_crls()
    with self.assertRaises(WindowsError):
        ssl.enum_crls("")

    known_encodings = {"x509_asn", "pkcs_7_asn"}
    revocation_lists = ssl.enum_crls("CA")
    self.assertIsInstance(revocation_lists, list)
    for entry in revocation_lists:
        # Every entry is a (crl_bytes, encoding) pair.
        self.assertIsInstance(entry, tuple)
        self.assertEqual(len(entry), 2)
        self.assertIsInstance(entry[0], bytes)
        self.assertIn(entry[1], known_encodings)
Christian Heimes46bebee2013-06-09 19:03:31 +0200662
Christian Heimes46bebee2013-06-09 19:03:31 +0200663
def test_asn1object(self):
    # Exercises the three ways of building an _ASN1Object (dotted OID,
    # NID, name) using the serverAuth extended-key-usage object.
    reference = (129, 'serverAuth', 'TLS Web Server Authentication',
                 '1.3.6.1.5.5.7.3.1')

    # From a dotted OID; a short name is not accepted by the constructor.
    obj = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
    self.assertEqual(obj, reference)
    self.assertEqual(obj.nid, 129)
    self.assertEqual(obj.shortname, 'serverAuth')
    self.assertEqual(obj.longname, 'TLS Web Server Authentication')
    self.assertEqual(obj.oid, '1.3.6.1.5.5.7.3.1')
    self.assertIsInstance(obj, ssl._ASN1Object)
    with self.assertRaises(ValueError):
        ssl._ASN1Object('serverAuth')

    # From a numeric identifier.
    obj = ssl._ASN1Object.fromnid(129)
    self.assertEqual(obj, reference)
    self.assertIsInstance(obj, ssl._ASN1Object)
    with self.assertRaises(ValueError):
        ssl._ASN1Object.fromnid(-1)
    with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
        ssl._ASN1Object.fromnid(100000)
    # Sweep the low NIDs: unknown ones raise ValueError, known ones must
    # expose correctly typed fields.
    for nid in range(1000):
        try:
            candidate = ssl._ASN1Object.fromnid(nid)
        except ValueError:
            continue
        self.assertIsInstance(candidate.nid, int)
        self.assertIsInstance(candidate.shortname, str)
        self.assertIsInstance(candidate.longname, str)
        self.assertIsInstance(candidate.oid, (str, type(None)))

    # From a name: long name, short name and dotted OID all resolve, but
    # the lookup is case-sensitive.
    obj = ssl._ASN1Object.fromname('TLS Web Server Authentication')
    self.assertEqual(obj, reference)
    self.assertIsInstance(obj, ssl._ASN1Object)
    self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), reference)
    self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
                     reference)
    with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
        ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100702
def test_purpose_enum(self):
    # Each Purpose member must be an _ASN1Object equal to the object
    # built from its OID, with the matching NID and short name.
    cases = (
        (ssl.Purpose.SERVER_AUTH, '1.3.6.1.5.5.7.3.1', 129, 'serverAuth'),
        (ssl.Purpose.CLIENT_AUTH, '1.3.6.1.5.5.7.3.2', 130, 'clientAuth'),
    )
    for purpose, oid, nid, shortname in cases:
        reference = ssl._ASN1Object(oid)
        self.assertIsInstance(purpose, ssl._ASN1Object)
        self.assertEqual(purpose, reference)
        self.assertEqual(purpose.nid, nid)
        self.assertEqual(purpose.shortname, shortname)
        self.assertEqual(purpose.oid, oid)
719
def test_unsupported_dtls(self):
    # Wrapping a datagram socket must be refused by both the module-level
    # helper and SSLContext.wrap_socket().
    datagram_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    self.addCleanup(datagram_sock.close)

    with self.assertRaises(NotImplementedError) as caught:
        ssl.wrap_socket(datagram_sock, cert_reqs=ssl.CERT_NONE)
    self.assertEqual(str(caught.exception),
                     "only stream sockets are supported")

    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    with self.assertRaises(NotImplementedError) as caught:
        context.wrap_socket(datagram_sock)
    self.assertEqual(str(caught.exception),
                     "only stream sockets are supported")
730
def cert_time_ok(self, timestring, timestamp):
    # Helper: assert that `timestring` converts to exactly `timestamp`.
    converted = ssl.cert_time_to_seconds(timestring)
    self.assertEqual(converted, timestamp)
733
def cert_time_fail(self, timestring):
    # Helper: assert that `timestring` is rejected with ValueError.
    self.assertRaises(ValueError, ssl.cert_time_to_seconds, timestring)
737
@unittest.skipUnless(utc_offset(),
                     'local time needs to be different from UTC')
def test_cert_time_to_seconds_timezone(self):
    # Issue #19940: ssl.cert_time_to_seconds() returns wrong
    # results if local timezone is not UTC
    known_conversions = (
        ("May 9 00:00:00 2007 GMT", 1178668800.0),
        ("Jan 5 09:34:43 2018 GMT", 1515144883.0),
    )
    for cert_time, seconds in known_conversions:
        self.cert_time_ok(cert_time, seconds)
745
def test_cert_time_to_seconds(self):
    cert_time = "Jan 5 09:34:43 2018 GMT"
    seconds = 1515144883.0
    self.cert_time_ok(cert_time, seconds)
    # accept keyword parameter, assert its name
    self.assertEqual(ssl.cert_time_to_seconds(cert_time=cert_time), seconds)
    # accept both %e and %d (space or zero generated by strftime)
    self.cert_time_ok("Jan 05 09:34:43 2018 GMT", seconds)
    # case-insensitive
    self.cert_time_ok("JaN 5 09:34:43 2018 GmT", seconds)

    rejected = (
        "Jan 5 09:34 2018 GMT",       # no seconds
        "Jan 5 09:34:43 2018",        # no GMT
        "Jan 5 09:34:43 2018 UTC",    # not GMT timezone
        "Jan 35 09:34:43 2018 GMT",   # invalid day
        "Jon 5 09:34:43 2018 GMT",    # invalid month
        "Jan 5 24:00:00 2018 GMT",    # invalid hour
        "Jan 5 09:60:43 2018 GMT",    # invalid minute
    )
    for malformed in rejected:
        self.cert_time_fail(malformed)

    # A leap second and the first second of the new year map to the
    # same timestamp.
    new_year = 1230768000.0
    self.cert_time_ok("Dec 31 23:59:60 2008 GMT", new_year)
    self.cert_time_ok("Jan 1 00:00:00 2009 GMT", new_year)

    self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
    # allow 60th second (even if it is not a leap second)
    self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
    # allow 2nd leap second for compatibility with time.strptime()
    self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
    self.cert_time_fail("Jan 5 09:34:62 2018 GMT")  # invalid seconds

    # no special treatment for the special value:
    #   99991231235959Z (rfc 5280)
    self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
780
@support.run_with_locale('LC_ALL', '')
def test_cert_time_to_seconds_locale(self):
    # `cert_time_to_seconds()` should be locale independent

    # Abbreviated name of February as rendered by the active locale.
    february = time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))

    # The check is only meaningful when the locale spells the month
    # differently from the C locale.
    if february.lower() == 'feb':
        self.skipTest("locale-specific month name needs to be "
                      "different from C locale")

    # The English abbreviation is accepted, the localized one is not.
    self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
    self.cert_time_fail(february + " 9 00:00:00 2007 GMT")
795
def test_connect_ex_error(self):
    # connect_ex() against a bound-but-not-listening port must return an
    # error code rather than raise.
    placeholder = socket.socket(socket.AF_INET)
    self.addCleanup(placeholder.close)
    # Reserve port but don't listen
    port = support.bind_port(placeholder)

    client = ssl.wrap_socket(socket.socket(socket.AF_INET),
                             cert_reqs=ssl.CERT_REQUIRED)
    self.addCleanup(client.close)
    result = client.connect_ex((HOST, port))

    # Issue #19919: Windows machines or VMs hosted on Windows
    # machines sometimes return EWOULDBLOCK.
    acceptable = (
        errno.ECONNREFUSED,
        errno.EHOSTUNREACH,
        errno.ETIMEDOUT,
        errno.EWOULDBLOCK,
    )
    self.assertIn(result, acceptable)
811
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100812
Antoine Pitrou152efa22010-05-16 18:19:27 +0000813class ContextTests(unittest.TestCase):
814
@skip_if_broken_ubuntu_ssl
def test_constructor(self):
    # Every known protocol constant must be accepted.
    for proto in PROTOCOLS:
        ssl.SSLContext(proto)
    # Without an argument the context uses PROTOCOL_TLS.
    default_context = ssl.SSLContext()
    self.assertEqual(default_context.protocol, ssl.PROTOCOL_TLS)
    # Values that are not protocol constants are rejected.
    for bogus in (-1, 42):
        with self.assertRaises(ValueError):
            ssl.SSLContext(bogus)
823
@skip_if_broken_ubuntu_ssl
def test_protocol(self):
    # The `protocol` attribute echoes the constructor argument.
    for requested in PROTOCOLS:
        context = ssl.SSLContext(requested)
        self.assertEqual(context.protocol, requested)
829
def test_ciphers(self):
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    # Well-known cipher list aliases are accepted.
    for cipher_spec in ("ALL", "DEFAULT"):
        context.set_ciphers(cipher_spec)
    # A spec that selects nothing raises SSLError.
    with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
        context.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000836
@skip_if_broken_ubuntu_ssl
def test_options(self):
    # Checks the default `options` bitmask of a fresh context and that
    # option bits can be set and, where OpenSSL allows it, cleared.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
    default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
    # Use the module-level flag rather than re-deriving the
    # "OpenSSL >= 1.1.0 and not LibreSSL" condition inline.
    if IS_OPENSSL_1_1:
        default |= ssl.OP_NO_COMPRESSION
    self.assertEqual(default, ctx.options)
    ctx.options |= ssl.OP_NO_TLSv1
    self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
    if can_clear_options():
        ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
        self.assertEqual(default, ctx.options)
        ctx.options = 0
        # Ubuntu has OP_NO_SSLv3 forced on by default
        self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
    else:
        # Without clear support, dropping option bits must be refused.
        with self.assertRaises(ValueError):
            ctx.options = 0
856
def test_verify_mode(self):
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    # Default value
    self.assertEqual(context.verify_mode, ssl.CERT_NONE)
    # Each valid mode round-trips through the attribute.
    for mode in (ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED, ssl.CERT_NONE):
        context.verify_mode = mode
        self.assertEqual(context.verify_mode, mode)
    # Wrong type and out-of-range values are rejected.
    with self.assertRaises(TypeError):
        context.verify_mode = None
    with self.assertRaises(ValueError):
        context.verify_mode = 42
871
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_verify_flags(self):
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    # default value (VERIFY_X509_TRUSTED_FIRST is included when the ssl
    # module exposes it)
    trusted_first = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(context.verify_flags,
                     ssl.VERIFY_DEFAULT | trusted_first)
    # Each assigned value, including a combination of flags, must
    # round-trip through the attribute.
    round_trip_values = (
        ssl.VERIFY_CRL_CHECK_LEAF,
        ssl.VERIFY_CRL_CHECK_CHAIN,
        ssl.VERIFY_DEFAULT,
        # supports any value
        ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT,
    )
    for flags in round_trip_values:
        context.verify_flags = flags
        self.assertEqual(context.verify_flags, flags)
    with self.assertRaises(TypeError):
        context.verify_flags = None
891
def test_load_cert_chain(self):
    # Covers load_cert_chain() with combined and separate key/cert files,
    # mismatched pairs, and every supported way of supplying a password.

    def expect_pem_error(target, *args, **kwargs):
        # Assert that loading fails with an SSLError from the PEM library.
        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
            target.load_cert_chain(*args, **kwargs)

    def password_forms():
        # Fresh str, bytes and bytearray renderings of the key password.
        return (KEY_PASSWORD,
                KEY_PASSWORD.encode(),
                bytearray(KEY_PASSWORD.encode()))

    # Combined key and cert in a single file
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.load_cert_chain(CERTFILE, keyfile=None)
    context.load_cert_chain(CERTFILE, keyfile=CERTFILE)
    with self.assertRaises(TypeError):
        context.load_cert_chain(keyfile=CERTFILE)
    with self.assertRaises(OSError) as caught:
        context.load_cert_chain(NONEXISTINGCERT)
    self.assertEqual(caught.exception.errno, errno.ENOENT)
    expect_pem_error(context, BADCERT)
    expect_pem_error(context, EMPTYCERT)

    # Separate key and cert
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.load_cert_chain(ONLYCERT, ONLYKEY)
    context.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
    context.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
    expect_pem_error(context, ONLYCERT)
    expect_pem_error(context, ONLYKEY)
    expect_pem_error(context, certfile=ONLYKEY, keyfile=ONLYCERT)

    # Mismatching key and cert
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
        context.load_cert_chain(CAFILE_CACERT, ONLYKEY)

    # Password protected key and cert
    for secret in password_forms():
        context.load_cert_chain(CERTFILE_PROTECTED, password=secret)
    for secret in password_forms():
        context.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, secret)
    with self.assertRaisesRegex(TypeError, "should be a string"):
        context.load_cert_chain(CERTFILE_PROTECTED, password=True)
    with self.assertRaises(ssl.SSLError):
        context.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        # openssl has a fixed limit on the password buffer.
        # PEM_BUFSIZE is generally set to 1kb.
        # Return a string larger than this.
        context.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)

    # Password callback
    def supply_text():
        return KEY_PASSWORD

    def supply_bytes():
        return KEY_PASSWORD.encode()

    def supply_bytearray():
        return bytearray(KEY_PASSWORD.encode())

    def supply_wrong():
        return "badpass"

    def supply_oversized():
        return b'a' * (1024 * 1024)

    def supply_wrong_type():
        return 9

    def supply_failure():
        raise Exception('getpass error')

    class PasswordProvider:
        # Usable both as a callable object and through a bound method.
        def __call__(self):
            return KEY_PASSWORD

        def getpass(self):
            return KEY_PASSWORD

    working_providers = (
        supply_text,
        supply_bytes,
        supply_bytearray,
        PasswordProvider(),
        PasswordProvider().getpass,
    )
    for provider in working_providers:
        context.load_cert_chain(CERTFILE_PROTECTED, password=provider)

    with self.assertRaises(ssl.SSLError):
        context.load_cert_chain(CERTFILE_PROTECTED, password=supply_wrong)
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        context.load_cert_chain(CERTFILE_PROTECTED,
                                password=supply_oversized)
    with self.assertRaisesRegex(TypeError, "must return a string"):
        context.load_cert_chain(CERTFILE_PROTECTED,
                                password=supply_wrong_type)
    with self.assertRaisesRegex(Exception, "getpass error"):
        context.load_cert_chain(CERTFILE_PROTECTED, password=supply_failure)
    # Make sure the password function isn't called if it isn't needed
    context.load_cert_chain(CERTFILE, password=supply_failure)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000974
def test_load_verify_locations(self):
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    # str and bytes paths, positional and keyword, are all accepted.
    for ca_file in (CERTFILE, BYTES_CERTFILE):
        context.load_verify_locations(ca_file)
        context.load_verify_locations(cafile=ca_file, capath=None)
    # At least one real location has to be supplied.
    with self.assertRaises(TypeError):
        context.load_verify_locations()
    with self.assertRaises(TypeError):
        context.load_verify_locations(None, None, None)
    with self.assertRaises(OSError) as caught:
        context.load_verify_locations(NONEXISTINGCERT)
    self.assertEqual(caught.exception.errno, errno.ENOENT)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        context.load_verify_locations(BADCERT)
    context.load_verify_locations(CERTFILE, CAPATH)
    context.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)

    # Issue #10989: crash if the second argument type is invalid
    with self.assertRaises(TypeError):
        context.load_verify_locations(None, True)
993
def test_load_verify_cadata(self):
    # test cadata

    def fresh_context():
        return ssl.SSLContext(ssl.PROTOCOL_TLSv1)

    def ca_count(context):
        # Number of CA certificates currently in the context's store.
        return context.cert_store_stats()["x509_ca"]

    with open(CAFILE_CACERT) as stream:
        cacert_pem = stream.read()
    cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
    with open(CAFILE_NEURONIO) as stream:
        neuronio_pem = stream.read()
    neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)

    # test PEM
    context = fresh_context()
    self.assertEqual(ca_count(context), 0)
    context.load_verify_locations(cadata=cacert_pem)
    self.assertEqual(ca_count(context), 1)
    context.load_verify_locations(cadata=neuronio_pem)
    self.assertEqual(ca_count(context), 2)
    # cert already in hash table
    context.load_verify_locations(cadata=neuronio_pem)
    self.assertEqual(ca_count(context), 2)

    # combined
    context = fresh_context()
    pem_bundle = "\n".join((cacert_pem, neuronio_pem))
    context.load_verify_locations(cadata=pem_bundle)
    self.assertEqual(ca_count(context), 2)

    # with junk around the certs
    context = fresh_context()
    noisy_parts = ["head", cacert_pem, "other", neuronio_pem, "again",
                   neuronio_pem, "tail"]
    context.load_verify_locations(cadata="\n".join(noisy_parts))
    self.assertEqual(ca_count(context), 2)

    # test DER
    context = fresh_context()
    context.load_verify_locations(cadata=cacert_der)
    context.load_verify_locations(cadata=neuronio_der)
    self.assertEqual(ca_count(context), 2)
    # cert already in hash table
    context.load_verify_locations(cadata=cacert_der)
    self.assertEqual(ca_count(context), 2)

    # combined
    context = fresh_context()
    der_bundle = b"".join((cacert_der, neuronio_der))
    context.load_verify_locations(cadata=der_bundle)
    self.assertEqual(ca_count(context), 2)

    # error cases
    context = fresh_context()
    with self.assertRaises(TypeError):
        context.load_verify_locations(cadata=object)

    with self.assertRaisesRegex(ssl.SSLError, "no start line"):
        context.load_verify_locations(cadata="broken")
    with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
        context.load_verify_locations(cadata=b"broken")
1050
1051
def test_load_dh_params(self):
    # Checks load_dh_params() with valid files, missing arguments, a
    # missing file and a file that holds no DH parameters.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    ctx.load_dh_params(DHFILE)
    # The bytes path is only exercised off Windows.
    if os.name != 'nt':
        ctx.load_dh_params(BYTES_DHFILE)
    self.assertRaises(TypeError, ctx.load_dh_params)
    self.assertRaises(TypeError, ctx.load_dh_params, None)
    with self.assertRaises(FileNotFoundError) as cm:
        ctx.load_dh_params(NONEXISTINGCERT)
    self.assertEqual(cm.exception.errno, errno.ENOENT)
    # A certificate file is not a DH parameter file; only the exception
    # type is checked, so the context manager is not bound to a name.
    with self.assertRaises(ssl.SSLError):
        ctx.load_dh_params(CERTFILE)
1064
@skip_if_broken_ubuntu_ssl
def test_session_stats(self):
    # A brand-new context must report every session counter as zero.
    counter_names = (
        'number',
        'connect',
        'connect_good',
        'connect_renegotiate',
        'accept',
        'accept_good',
        'accept_renegotiate',
        'hits',
        'misses',
        'timeouts',
        'cache_full',
    )
    all_zero = dict.fromkeys(counter_names, 0)
    for proto in PROTOCOLS:
        context = ssl.SSLContext(proto)
        self.assertEqual(context.session_stats(), all_zero)
1082
def test_set_default_verify_paths(self):
    # There's not much we can do to test that it acts as expected,
    # so just check it doesn't crash or raise an exception.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.set_default_verify_paths()
1088
@unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
def test_set_ecdh_curve(self):
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    # A known curve is accepted as str or bytes.
    for curve in ("prime256v1", b"prime256v1"):
        context.set_ecdh_curve(curve)
    # The curve name is mandatory and cannot be None.
    with self.assertRaises(TypeError):
        context.set_ecdh_curve()
    with self.assertRaises(TypeError):
        context.set_ecdh_curve(None)
    # Unknown curve names are rejected, as str or bytes.
    for unknown in ("foo", b"foo"):
        with self.assertRaises(ValueError):
            context.set_ecdh_curve(unknown)
1098
@needs_sni
def test_sni_callback(self):
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)

    # set_servername_callback expects a callable, or None
    with self.assertRaises(TypeError):
        context.set_servername_callback()
    for not_callable in (4, "", context):
        with self.assertRaises(TypeError):
            context.set_servername_callback(not_callable)

    def noop_callback(sock, servername, ctx):
        pass

    context.set_servername_callback(None)
    context.set_servername_callback(noop_callback)
1113
@needs_sni
def test_sni_callback_refcycle(self):
    # Reference cycles through the servername callback are detected
    # and cleared.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)

    # The default argument makes the callback hold a reference to the
    # context, which in turn holds the callback: a cycle.
    def cyclic_callback(sock, servername, ctx, cycle=context):
        pass

    context.set_servername_callback(cyclic_callback)
    context_ref = weakref.ref(context)
    del context, cyclic_callback
    gc.collect()
    self.assertIs(context_ref(), None)
1126
def test_cert_store_stats(self):
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    empty_store = {'x509_ca': 0, 'crl': 0, 'x509': 0}
    self.assertEqual(context.cert_store_stats(), empty_store)
    # Loading the context's own certificate chain leaves the store as is.
    context.load_cert_chain(CERTFILE)
    self.assertEqual(context.cert_store_stats(), empty_store)
    # CERTFILE is counted as a plain certificate, not as a CA.
    context.load_verify_locations(CERTFILE)
    self.assertEqual(context.cert_store_stats(),
                     {'x509_ca': 0, 'crl': 0, 'x509': 1})
    # CAFILE_CACERT is counted as a certificate and as a CA.
    context.load_verify_locations(CAFILE_CACERT)
    self.assertEqual(context.cert_store_stats(),
                     {'x509_ca': 1, 'crl': 0, 'x509': 2})
1140
def test_get_ca_certs(self):
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    self.assertEqual(context.get_ca_certs(), [])
    # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
    context.load_verify_locations(CERTFILE)
    self.assertEqual(context.get_ca_certs(), [])
    # but CAFILE_CACERT is a CA cert
    context.load_verify_locations(CAFILE_CACERT)

    # The expected entry uses the same name for issuer and subject.
    root_ca_name = (
        (('organizationName', 'Root CA'),),
        (('organizationalUnitName', 'http://www.cacert.org'),),
        (('commonName', 'CA Cert Signing Authority'),),
        (('emailAddress', 'support@cacert.org'),),
    )
    expected_entry = {
        'issuer': root_ca_name,
        'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
        'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
        'serialNumber': '00',
        'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
        'subject': root_ca_name,
        'version': 3,
    }
    self.assertEqual(context.get_ca_certs(), [expected_entry])

    # With binary_form=True the DER encoding is returned instead.
    with open(CAFILE_CACERT) as stream:
        pem_text = stream.read()
    der_bytes = ssl.PEM_cert_to_DER_cert(pem_text)
    self.assertEqual(context.get_ca_certs(True), [der_bytes])
1168
def test_load_default_certs(self):
    # No explicit purpose.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.load_default_certs()

    # Explicit SERVER_AUTH followed by a second, default load.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.load_default_certs(ssl.Purpose.SERVER_AUTH)
    context.load_default_certs()

    # Explicit CLIENT_AUTH.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.load_default_certs(ssl.Purpose.CLIENT_AUTH)

    # Anything other than a Purpose value is a TypeError.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    for not_a_purpose in (None, 'SERVER_AUTH'):
        with self.assertRaises(TypeError):
            context.load_default_certs(not_a_purpose)
1183
@unittest.skipIf(sys.platform == "win32", "not-Windows specific")
@unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
def test_load_default_certs_env(self):
    # With SSL_CERT_DIR / SSL_CERT_FILE pointing at the test fixtures,
    # load_default_certs() is expected to pick up exactly one cert.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    with support.EnvironmentVarGuard() as environ:
        environ["SSL_CERT_DIR"] = CAPATH
        environ["SSL_CERT_FILE"] = CERTFILE
        context.load_default_certs()
        expected_stats = {"crl": 0, "x509": 1, "x509_ca": 0}
        self.assertEqual(context.cert_store_stats(), expected_stats)
1193
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
def test_load_default_certs_env_windows(self):
    # Baseline: store statistics after a default load with no overrides.
    baseline_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    baseline_context.load_default_certs()
    expected_stats = baseline_context.cert_store_stats()

    # With the environment overrides, one extra certificate is expected
    # on top of the baseline.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    with support.EnvironmentVarGuard() as environ:
        environ["SSL_CERT_DIR"] = CAPATH
        environ["SSL_CERT_FILE"] = CERTFILE
        context.load_default_certs()
        expected_stats["x509"] += 1
        self.assertEqual(context.cert_store_stats(), expected_stats)
1207
def test_create_default_context(self):
    def check_optional_flag(context, flag_name):
        # Assert the named option bit is set; flags missing from this
        # ssl build count as 0 and pass trivially.
        flag = getattr(ssl, flag_name, 0)
        self.assertEqual(context.options & flag, flag)

    # Default purpose (server authentication).
    context = ssl.create_default_context()
    self.assertEqual(context.protocol, ssl.PROTOCOL_SSLv23)
    self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
    self.assertTrue(context.check_hostname)
    self.assertEqual(context.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
    check_optional_flag(context, "OP_NO_COMPRESSION")

    # Explicit CA file, CA directory and in-memory CA data together.
    with open(SIGNING_CA) as stream:
        ca_pem = stream.read()
    context = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
                                         cadata=ca_pem)
    self.assertEqual(context.protocol, ssl.PROTOCOL_SSLv23)
    self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
    self.assertEqual(context.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
    check_optional_flag(context, "OP_NO_COMPRESSION")

    # Server-side context (client authentication purpose).
    context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    self.assertEqual(context.protocol, ssl.PROTOCOL_SSLv23)
    self.assertEqual(context.verify_mode, ssl.CERT_NONE)
    self.assertEqual(context.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
    for flag_name in ("OP_NO_COMPRESSION",
                      "OP_SINGLE_DH_USE",
                      "OP_SINGLE_ECDH_USE"):
        check_optional_flag(context, flag_name)
Christian Heimes4c05b472013-11-23 15:58:30 +01001247
def test__create_stdlib_context(self):
    def check_no_sslv2(context):
        # Every stdlib context is expected to have SSLv2 disabled.
        self.assertEqual(context.options & ssl.OP_NO_SSLv2,
                         ssl.OP_NO_SSLv2)

    # Defaults: SSLv23, no verification, no hostname check.
    context = ssl._create_stdlib_context()
    self.assertEqual(context.protocol, ssl.PROTOCOL_SSLv23)
    self.assertEqual(context.verify_mode, ssl.CERT_NONE)
    self.assertFalse(context.check_hostname)
    check_no_sslv2(context)

    # Explicit protocol.
    context = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
    self.assertEqual(context.protocol, ssl.PROTOCOL_TLSv1)
    self.assertEqual(context.verify_mode, ssl.CERT_NONE)
    check_no_sslv2(context)

    # Explicit protocol plus verification settings.
    context = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
                                         cert_reqs=ssl.CERT_REQUIRED,
                                         check_hostname=True)
    self.assertEqual(context.protocol, ssl.PROTOCOL_TLSv1)
    self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
    self.assertTrue(context.check_hostname)
    check_no_sslv2(context)

    # Explicit purpose.
    context = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
    self.assertEqual(context.protocol, ssl.PROTOCOL_SSLv23)
    self.assertEqual(context.verify_mode, ssl.CERT_NONE)
    check_no_sslv2(context)
Christian Heimes4c05b472013-11-23 15:58:30 +01001272
def test_check_hostname(self):
    # Exercises the coupling between check_hostname and verify_mode.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    self.assertFalse(context.check_hostname)

    # Requires CERT_REQUIRED or CERT_OPTIONAL
    with self.assertRaises(ValueError):
        context.check_hostname = True
    # Raising verify_mode alone does not enable the hostname check.
    context.verify_mode = ssl.CERT_REQUIRED
    self.assertFalse(context.check_hostname)
    context.check_hostname = True
    self.assertTrue(context.check_hostname)

    # CERT_OPTIONAL is compatible with the hostname check too.
    context.verify_mode = ssl.CERT_OPTIONAL
    context.check_hostname = True
    self.assertTrue(context.check_hostname)

    # Cannot set CERT_NONE with check_hostname enabled
    with self.assertRaises(ValueError):
        context.verify_mode = ssl.CERT_NONE
    context.check_hostname = False
    self.assertFalse(context.check_hostname)
1294
Antoine Pitrou152efa22010-05-16 18:19:27 +00001295
class SSLErrorTests(unittest.TestCase):

    def test_str(self):
        # The str() of a SSLError doesn't include the errno, and the same
        # holds for a subclass; the errno remains readable as an attribute.
        for exc_type in (ssl.SSLError, ssl.SSLZeroReturnError):
            err = exc_type(1, "foo")
            self.assertEqual(str(err), "foo")
            self.assertEqual(err.errno, 1)

    def test_lib_reason(self):
        # Test the library and reason attributes
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # Loading CERTFILE as DH parameters is expected to be rejected by
        # the PEM layer.
        with self.assertRaises(ssl.SSLError) as caught:
            context.load_dh_params(CERTFILE)
        error = caught.exception
        self.assertEqual(error.library, 'PEM')
        self.assertEqual(error.reason, 'NO_START_LINE')
        text = str(error)
        self.assertTrue(
            text.startswith("[PEM: NO_START_LINE] no start line"), text)

    def test_subclass(self):
        # Check that the appropriate SSLError subclass is raised
        # (this only tests one of them)
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        with socket.socket() as listener:
            listener.bind(("127.0.0.1", 0))
            listener.listen()
            # The listener never accept()s or replies, so a non-blocking
            # client handshake has nothing to read yet.
            client = socket.socket()
            client.connect(listener.getsockname())
            client.setblocking(False)
            with context.wrap_socket(client, server_side=False,
                                     do_handshake_on_connect=False) as tls:
                with self.assertRaises(ssl.SSLWantReadError) as caught:
                    tls.do_handshake()
                text = str(caught.exception)
                self.assertTrue(
                    text.startswith("The operation did not complete (read)"),
                    text)
                # For compatibility
                self.assertEqual(caught.exception.errno,
                                 ssl.SSL_ERROR_WANT_READ)
1335
1336
class MemoryBIOTests(unittest.TestCase):

    def test_read_write(self):
        # Data is read back in the order it was written, and a drained BIO
        # reads as b''.
        bio = ssl.MemoryBIO()
        bio.write(b'foo')
        self.assertEqual(bio.read(), b'foo')
        self.assertEqual(bio.read(), b'')
        # Consecutive writes are concatenated.
        for chunk in (b'foo', b'bar'):
            bio.write(chunk)
        self.assertEqual(bio.read(), b'foobar')
        self.assertEqual(bio.read(), b'')
        # Sized reads hand out at most the requested number of bytes.
        bio.write(b'baz')
        for size, expected in ((2, b'ba'), (1, b'z'), (1, b'')):
            self.assertEqual(bio.read(size), expected)

    def test_eof(self):
        # eof becomes true only once write_eof() has been called *and* all
        # buffered data has been read.
        bio = ssl.MemoryBIO()
        self.assertFalse(bio.eof)
        self.assertEqual(bio.read(), b'')
        self.assertFalse(bio.eof)
        bio.write(b'foo')
        self.assertFalse(bio.eof)
        bio.write_eof()
        # Data is still pending, so no EOF yet.
        self.assertFalse(bio.eof)
        self.assertEqual(bio.read(2), b'fo')
        self.assertFalse(bio.eof)
        self.assertEqual(bio.read(1), b'o')
        self.assertTrue(bio.eof)
        # Reading past EOF keeps returning b'' and keeps eof set.
        self.assertEqual(bio.read(), b'')
        self.assertTrue(bio.eof)

    def test_pending(self):
        # pending tracks the number of buffered bytes.
        bio = ssl.MemoryBIO()
        self.assertEqual(bio.pending, 0)
        bio.write(b'foo')
        self.assertEqual(bio.pending, 3)
        # Drain one byte at a time ...
        for remaining in (2, 1, 0):
            bio.read(1)
            self.assertEqual(bio.pending, remaining)
        # ... then fill up again one byte at a time.
        for buffered in (1, 2, 3):
            bio.write(b'x')
            self.assertEqual(bio.pending, buffered)
        bio.read()
        self.assertEqual(bio.pending, 0)

    def test_buffer_types(self):
        # write() takes bytes, bytearray and memoryview alike.
        bio = ssl.MemoryBIO()
        samples = (
            (b'foo', b'foo'),
            (bytearray(b'bar'), b'bar'),
            (memoryview(b'baz'), b'baz'),
        )
        for data, expected in samples:
            bio.write(data)
            self.assertEqual(bio.read(), expected)

    def test_error_types(self):
        # write() refuses anything that is not bytes-like.
        bio = ssl.MemoryBIO()
        for bad_value in ('foo', None, True, 1):
            self.assertRaises(TypeError, bio.write, bad_value)
1398
1399
@unittest.skipUnless(_have_threads, "Needs threading module")
class SimpleBackgroundTests(unittest.TestCase):

    """Tests that connect to a simple server running in the background"""

    def setUp(self):
        # Every test talks to its own ThreadedEchoServer, created with
        # SIGNED_CERTFILE as its certificate.  The server is entered by hand
        # (instead of a with statement) so that it outlives setUp(); the
        # matching __exit__() is registered as a cleanup.
        server = ThreadedEchoServer(SIGNED_CERTFILE)
        self.server_addr = (HOST, server.port)
        server.__enter__()
        self.addCleanup(server.__exit__, None, None, None)

    def test_connect(self):
        # With CERT_NONE the handshake succeeds and getpeercert() has no
        # details to report.
        with ssl.wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_NONE) as s:
            s.connect(self.server_addr)
            self.assertEqual({}, s.getpeercert())

        # this should succeed because we specify the root cert
        with ssl.wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED,
                            ca_certs=SIGNING_CA) as s:
            s.connect(self.server_addr)
            self.assertTrue(s.getpeercert())

    def test_connect_fail(self):
        # This should fail because we have no verification certs. Connection
        # failure crashes ThreadedEchoServer, so run this in an independent
        # test method.
        s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED)
        self.addCleanup(s.close)
        self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
                               s.connect, self.server_addr)

    def test_connect_ex(self):
        # Issue #11326: check connect_ex() implementation
        s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED,
                            ca_certs=SIGNING_CA)
        self.addCleanup(s.close)
        self.assertEqual(0, s.connect_ex(self.server_addr))
        self.assertTrue(s.getpeercert())

    def test_non_blocking_connect_ex(self):
        # Issue #11326: non-blocking connect_ex() should allow handshake
        # to proceed after the socket gets ready.
        s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED,
                            ca_certs=SIGNING_CA,
                            do_handshake_on_connect=False)
        self.addCleanup(s.close)
        s.setblocking(False)
        rc = s.connect_ex(self.server_addr)
        # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
        self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
        # Wait for connect to finish
        select.select([], [s], [], 5.0)
        # Non-blocking handshake: retry whenever OpenSSL reports that it
        # needs the socket to become readable or writable first.
        while True:
            try:
                s.do_handshake()
                break
            except ssl.SSLWantReadError:
                select.select([s], [], [], 5.0)
            except ssl.SSLWantWriteError:
                select.select([], [s], [], 5.0)
        # SSL established
        self.assertTrue(s.getpeercert())

    def test_connect_with_context(self):
        # Same as test_connect, but with a separately created context
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
            s.connect(self.server_addr)
            self.assertEqual({}, s.getpeercert())
        # Same with a server hostname
        with ctx.wrap_socket(socket.socket(socket.AF_INET),
                            server_hostname="dummy") as s:
            s.connect(self.server_addr)
        ctx.verify_mode = ssl.CERT_REQUIRED
        # This should succeed because we specify the root cert
        ctx.load_verify_locations(SIGNING_CA)
        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
            s.connect(self.server_addr)
            cert = s.getpeercert()
            self.assertTrue(cert)

    def test_connect_with_context_fail(self):
        # This should fail because we have no verification certs. Connection
        # failure crashes ThreadedEchoServer, so run this in an independent
        # test method.
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        ctx.verify_mode = ssl.CERT_REQUIRED
        s = ctx.wrap_socket(socket.socket(socket.AF_INET))
        self.addCleanup(s.close)
        self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
                               s.connect, self.server_addr)

    def test_connect_capath(self):
        # Verify server certificates using the `capath` argument
        # NOTE: the subject hashing algorithm has been changed between
        # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
        # contain both versions of each certificate (same content, different
        # filename) for this test to be portable across OpenSSL releases.
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.load_verify_locations(capath=CAPATH)
        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
            s.connect(self.server_addr)
            cert = s.getpeercert()
            self.assertTrue(cert)
        # Same with a bytes `capath` argument
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.load_verify_locations(capath=BYTES_CAPATH)
        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
            s.connect(self.server_addr)
            cert = s.getpeercert()
            self.assertTrue(cert)

    def test_connect_cadata(self):
        # Verify the server certificate against CA data passed in memory,
        # first in PEM form and then converted to DER.
        with open(SIGNING_CA) as f:
            pem = f.read()
        der = ssl.PEM_cert_to_DER_cert(pem)
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.load_verify_locations(cadata=pem)
        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
            s.connect(self.server_addr)
            cert = s.getpeercert()
            self.assertTrue(cert)

        # same with DER
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.load_verify_locations(cadata=der)
        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
            s.connect(self.server_addr)
            cert = s.getpeercert()
            self.assertTrue(cert)

    @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
    def test_makefile_close(self):
        # Issue #5238: creating a file-like object with makefile() shouldn't
        # delay closing the underlying "real socket" (here tested with its
        # file descriptor, hence skipping the test under Windows).
        ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
        ss.connect(self.server_addr)
        fd = ss.fileno()
        f = ss.makefile()
        f.close()
        # The fd is still open
        os.read(fd, 0)
        # Closing the SSL socket should close the fd too
        ss.close()
        gc.collect()
        with self.assertRaises(OSError) as e:
            os.read(fd, 0)
        self.assertEqual(e.exception.errno, errno.EBADF)

    def test_non_blocking_handshake(self):
        # Connect in blocking mode, then switch to non-blocking and drive the
        # handshake by hand, waiting with select() each time OpenSSL asks
        # for more I/O.
        s = socket.socket(socket.AF_INET)
        s.connect(self.server_addr)
        s.setblocking(False)
        s = ssl.wrap_socket(s,
                            cert_reqs=ssl.CERT_NONE,
                            do_handshake_on_connect=False)
        self.addCleanup(s.close)
        count = 0
        while True:
            try:
                count += 1
                s.do_handshake()
                break
            except ssl.SSLWantReadError:
                select.select([s], [], [])
            except ssl.SSLWantWriteError:
                select.select([], [s], [])
        if support.verbose:
            sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)

    def test_get_server_certificate(self):
        # Fetch the server certificate, unverified and verified against
        # SIGNING_CA.
        _test_get_server_certificate(self, *self.server_addr, cert=SIGNING_CA)

    def test_get_server_certificate_fail(self):
        # Connection failure crashes ThreadedEchoServer, so run this in an
        # independent test method
        _test_get_server_certificate_fail(self, *self.server_addr)

    def test_ciphers(self):
        # Well-formed cipher strings are accepted ...
        with ssl.wrap_socket(socket.socket(socket.AF_INET),
                             cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
            s.connect(self.server_addr)
        with ssl.wrap_socket(socket.socket(socket.AF_INET),
                             cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
            s.connect(self.server_addr)
        # ... while a nonsensical one is rejected.
        # Error checking can happen at instantiation or when connecting
        with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
            with socket.socket(socket.AF_INET) as sock:
                s = ssl.wrap_socket(sock,
                                    cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
                s.connect(self.server_addr)

    def test_get_ca_certs_capath(self):
        # capath certs are loaded on request
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.load_verify_locations(capath=CAPATH)
        # Nothing has been loaded before the first verification ...
        self.assertEqual(ctx.get_ca_certs(), [])
        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
            s.connect(self.server_addr)
            cert = s.getpeercert()
            self.assertTrue(cert)
        # ... and exactly one CA certificate afterwards.
        self.assertEqual(len(ctx.get_ca_certs()), 1)

    @needs_sni
    def test_context_setget(self):
        # Check that the context of a connected socket can be replaced.
        ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx2 = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        s = socket.socket(socket.AF_INET)
        with ctx1.wrap_socket(s) as ss:
            ss.connect(self.server_addr)
            self.assertIs(ss.context, ctx1)
            self.assertIs(ss._sslobj.context, ctx1)
            ss.context = ctx2
            self.assertIs(ss.context, ctx2)
            self.assertIs(ss._sslobj.context, ctx2)

    def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
        # A simple IO loop. Call func(*args) depending on the error we get
        # (WANT_READ or WANT_WRITE) move data between the socket and the BIOs.
        #
        # sock     -- connected socket carrying the TLS records
        # incoming -- MemoryBIO fed with the bytes received from sock
        # outgoing -- MemoryBIO whose contents are sent out through sock
        # func     -- operation to complete (e.g. sslobj.do_handshake); it is
        #             called as func(*args) until it stops raising
        #             WANT_READ / WANT_WRITE
        #
        # The only keyword argument looked at is 'timeout' (seconds, default
        # 10); keyword arguments are not passed on to func.  The deadline is
        # checked once per iteration, so it stops an operation that never
        # completes but cannot interrupt a sock.recv() that is already
        # blocking.
        #
        # Returns whatever func() returned.  Any other SSLError raised by
        # func() propagates to the caller.
        timeout = kwargs.get('timeout', 10)
        deadline = time.monotonic() + timeout
        count = 0
        while True:
            if time.monotonic() > deadline:
                self.fail("timeout")
            # Not called 'errno', so that the errno module stays reachable.
            want = None
            count += 1
            try:
                ret = func(*args)
            except ssl.SSLError as e:
                if e.errno not in (ssl.SSL_ERROR_WANT_READ,
                                   ssl.SSL_ERROR_WANT_WRITE):
                    raise
                want = e.errno
            # Get any data from the outgoing BIO irrespective of any error, and
            # send it to the socket.
            buf = outgoing.read()
            sock.sendall(buf)
            # If there's no error, we're done. For WANT_READ, we need to get
            # data from the socket and put it in the incoming BIO.
            if want is None:
                break
            elif want == ssl.SSL_ERROR_WANT_READ:
                buf = sock.recv(32768)
                if buf:
                    incoming.write(buf)
                else:
                    # The peer closed the connection: pass the EOF on.
                    incoming.write_eof()
        if support.verbose:
            sys.stdout.write("Needed %d calls to complete %s().\n"
                             % (count, func.__name__))
        return ret

    def test_bio_handshake(self):
        # Handshake through a pair of memory BIOs, checking what the SSL
        # object reports before and after the handshake.
        sock = socket.socket(socket.AF_INET)
        self.addCleanup(sock.close)
        sock.connect(self.server_addr)
        incoming = ssl.MemoryBIO()
        outgoing = ssl.MemoryBIO()
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.load_verify_locations(SIGNING_CA)
        ctx.check_hostname = True
        sslobj = ctx.wrap_bio(incoming, outgoing, False, 'localhost')
        self.assertIs(sslobj._sslobj.owner, sslobj)
        # Before the handshake there is no cipher, no peer certificate and
        # no channel binding data.
        self.assertIsNone(sslobj.cipher())
        self.assertIsNotNone(sslobj.shared_ciphers())
        self.assertRaises(ValueError, sslobj.getpeercert)
        if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
            self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
        self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
        self.assertTrue(sslobj.cipher())
        self.assertIsNotNone(sslobj.shared_ciphers())
        self.assertTrue(sslobj.getpeercert())
        if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
            self.assertTrue(sslobj.get_channel_binding('tls-unique'))
        try:
            self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
        except ssl.SSLSyscallError:
            # If the server shuts down the TCP connection without sending a
            # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
            pass
        # Once unwrapped, the object can no longer be written to.
        self.assertRaises(ssl.SSLError, sslobj.write, b'foo')

    def test_bio_read_write_data(self):
        # Exchange application data through memory BIOs: the echo server
        # answers with the lower-cased request.
        sock = socket.socket(socket.AF_INET)
        self.addCleanup(sock.close)
        sock.connect(self.server_addr)
        incoming = ssl.MemoryBIO()
        outgoing = ssl.MemoryBIO()
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        ctx.verify_mode = ssl.CERT_NONE
        sslobj = ctx.wrap_bio(incoming, outgoing, False)
        self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
        req = b'FOO\n'
        self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
        buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
        self.assertEqual(buf, b'foo\n')
        self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001709
1710
class NetworkedTests(unittest.TestCase):

    def test_timeout_connect_ex(self):
        # Issue #12065: on a timeout, connect_ex() should return the original
        # errno (mimicking the behaviour of non-SSL sockets).
        with support.transient_internet(REMOTE_HOST):
            raw_sock = socket.socket(socket.AF_INET)
            client = ssl.wrap_socket(raw_sock,
                                     cert_reqs=ssl.CERT_REQUIRED,
                                     do_handshake_on_connect=False)
            self.addCleanup(client.close)
            # A timeout this small is meant to expire before the connection
            # can be established.
            client.settimeout(0.0000001)
            result = client.connect_ex((REMOTE_HOST, 443))
            if result == 0:
                self.skipTest("REMOTE_HOST responded too quickly")
            self.assertIn(result, (errno.EAGAIN, errno.EWOULDBLOCK))

    @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
    def test_get_server_certificate_ipv6(self):
        # Run both certificate retrieval helpers against an IPv6 host.
        ipv6_host = 'ipv6.google.com'
        with support.transient_internet(ipv6_host):
            _test_get_server_certificate(self, ipv6_host, 443)
            _test_get_server_certificate_fail(self, ipv6_host, 443)

    def test_algorithms(self):
        # Issue #8484: all algorithms should be available when verifying a
        # certificate.
        # SHA256 was added in OpenSSL 0.9.8
        if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15):
            self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION)
        # sha256.tbs-internet.com needs SNI to use the correct certificate
        if not ssl.HAS_SNI:
            self.skipTest("SNI needed for this test")
        # https://sha2.hboeck.de/ was used until 2011-01-08 (no route to host)
        remote = ("sha256.tbs-internet.com", 443)
        here = os.path.dirname(__file__)
        sha256_cert = os.path.join(here, "sha256.pem")
        with support.transient_internet("sha256.tbs-internet.com"):
            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
            context.verify_mode = ssl.CERT_REQUIRED
            context.load_verify_locations(sha256_cert)
            # Leaving the with block closes the socket whether or not the
            # connection attempt succeeds.
            with context.wrap_socket(
                    socket.socket(socket.AF_INET),
                    server_hostname="sha256.tbs-internet.com") as client:
                client.connect(remote)
                if support.verbose:
                    sys.stdout.write("\nCipher with %r is %r\n" %
                                     (remote, client.cipher()))
                    sys.stdout.write("Certificate is:\n%s\n" %
                                     pprint.pformat(client.getpeercert()))
1760
1761
def _test_get_server_certificate(test, host, port, cert=None):
    """Fetch the certificate of host:port twice and fail *test* if none comes back.

    The first request passes no CA file; the second passes *cert* as
    ``ca_certs`` to ssl.get_server_certificate().  In verbose mode the
    certificate from the second request is written to stdout.
    """
    address = (host, port)

    unverified = ssl.get_server_certificate(address)
    if not unverified:
        test.fail("No server certificate on %s:%s!" % (host, port))

    verified = ssl.get_server_certificate(address, ca_certs=cert)
    if not verified:
        test.fail("No server certificate on %s:%s!" % (host, port))
    if support.verbose:
        sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n"
                         % (host, port, verified))
1772
def _test_get_server_certificate_fail(test, host, port):
    """Check that fetching host:port's certificate with CERTFILE as CA fails.

    ssl.get_server_certificate() is expected to raise ssl.SSLError; the
    error is echoed to stdout in verbose mode.  If a certificate is
    returned instead, *test* is failed.
    """
    try:
        pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
    except ssl.SSLError as exc:
        # should fail
        if support.verbose:
            sys.stdout.write("%s\n" % exc)
        return
    test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
1782
1783
1784if _have_threads:
Antoine Pitrou803e6d62010-10-13 10:36:15 +00001785 from test.ssl_servers import make_https_server
1786
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001787 class ThreadedEchoServer(threading.Thread):
1788
1789 class ConnectionHandler(threading.Thread):
1790
1791 """A mildly complicated class, because we want it to work both
1792 with and without the SSL wrapper around the socket connection, so
1793 that we can test the STARTTLS functionality."""
1794
Bill Janssen6e027db2007-11-15 22:23:56 +00001795 def __init__(self, server, connsock, addr):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001796 self.server = server
1797 self.running = False
1798 self.sock = connsock
Bill Janssen6e027db2007-11-15 22:23:56 +00001799 self.addr = addr
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001800 self.sock.setblocking(1)
1801 self.sslconn = None
1802 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00001803 self.daemon = True
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001804
Antoine Pitrou480a1242010-04-28 21:37:09 +00001805 def wrap_conn(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001806 try:
Antoine Pitroub5218772010-05-21 09:56:06 +00001807 self.sslconn = self.server.context.wrap_socket(
1808 self.sock, server_side=True)
Benjamin Petersoncca27322015-01-23 16:35:37 -05001809 self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
1810 self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
Nadeem Vawdaad246bf2013-03-03 22:44:22 +01001811 except (ssl.SSLError, ConnectionResetError) as e:
1812 # We treat ConnectionResetError as though it were an
1813 # SSLError - OpenSSL on Ubuntu abruptly closes the
1814 # connection when asked to use an unsupported protocol.
1815 #
Antoine Pitrou18c913e2010-04-27 10:59:39 +00001816 # XXX Various errors can have happened here, for example
1817 # a mismatching protocol version, an invalid certificate,
1818 # or a low-level bug. This should be made more discriminating.
Antoine Pitrou8f85f902012-01-03 22:46:48 +01001819 self.server.conn_errors.append(e)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001820 if self.server.chatty:
Bill Janssen6e027db2007-11-15 22:23:56 +00001821 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
Antoine Pitrou18c913e2010-04-27 10:59:39 +00001822 self.running = False
1823 self.server.stop()
Bill Janssen6e027db2007-11-15 22:23:56 +00001824 self.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001825 return False
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001826 else:
Benjamin Peterson4cb17812015-01-07 11:14:26 -06001827 self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
Antoine Pitroub5218772010-05-21 09:56:06 +00001828 if self.server.context.verify_mode == ssl.CERT_REQUIRED:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001829 cert = self.sslconn.getpeercert()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001830 if support.verbose and self.server.chatty:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001831 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
1832 cert_binary = self.sslconn.getpeercert(True)
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001833 if support.verbose and self.server.chatty:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001834 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
1835 cipher = self.sslconn.cipher()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001836 if support.verbose and self.server.chatty:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001837 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01001838 sys.stdout.write(" server: selected protocol is now "
1839 + str(self.sslconn.selected_npn_protocol()) + "\n")
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001840 return True
1841
1842 def read(self):
1843 if self.sslconn:
1844 return self.sslconn.read()
1845 else:
1846 return self.sock.recv(1024)
1847
1848 def write(self, bytes):
1849 if self.sslconn:
1850 return self.sslconn.write(bytes)
1851 else:
1852 return self.sock.send(bytes)
1853
1854 def close(self):
1855 if self.sslconn:
1856 self.sslconn.close()
1857 else:
1858 self.sock.close()
1859
Antoine Pitrou480a1242010-04-28 21:37:09 +00001860 def run(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001861 self.running = True
1862 if not self.server.starttls_server:
1863 if not self.wrap_conn():
1864 return
1865 while self.running:
1866 try:
1867 msg = self.read()
Antoine Pitrou480a1242010-04-28 21:37:09 +00001868 stripped = msg.strip()
1869 if not stripped:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001870 # eof, so quit this handler
1871 self.running = False
Martin Panter3840b2a2016-03-27 01:53:46 +00001872 try:
1873 self.sock = self.sslconn.unwrap()
1874 except OSError:
1875 # Many tests shut the TCP connection down
1876 # without an SSL shutdown. This causes
1877 # unwrap() to raise OSError with errno=0!
1878 pass
1879 else:
1880 self.sslconn = None
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001881 self.close()
Antoine Pitrou480a1242010-04-28 21:37:09 +00001882 elif stripped == b'over':
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001883 if support.verbose and self.server.connectionchatty:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001884 sys.stdout.write(" server: client closed connection\n")
1885 self.close()
1886 return
Bill Janssen6e027db2007-11-15 22:23:56 +00001887 elif (self.server.starttls_server and
Antoine Pitrou764b8782010-04-28 22:57:15 +00001888 stripped == b'STARTTLS'):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001889 if support.verbose and self.server.connectionchatty:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001890 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
Antoine Pitrou480a1242010-04-28 21:37:09 +00001891 self.write(b"OK\n")
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001892 if not self.wrap_conn():
1893 return
Bill Janssen40a0f662008-08-12 16:56:25 +00001894 elif (self.server.starttls_server and self.sslconn
Antoine Pitrou764b8782010-04-28 22:57:15 +00001895 and stripped == b'ENDTLS'):
Bill Janssen40a0f662008-08-12 16:56:25 +00001896 if support.verbose and self.server.connectionchatty:
1897 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
Antoine Pitrou480a1242010-04-28 21:37:09 +00001898 self.write(b"OK\n")
Bill Janssen40a0f662008-08-12 16:56:25 +00001899 self.sock = self.sslconn.unwrap()
1900 self.sslconn = None
1901 if support.verbose and self.server.connectionchatty:
1902 sys.stdout.write(" server: connection is now unencrypted...\n")
Antoine Pitroud6494802011-07-21 01:11:30 +02001903 elif stripped == b'CB tls-unique':
1904 if support.verbose and self.server.connectionchatty:
1905 sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
1906 data = self.sslconn.get_channel_binding("tls-unique")
1907 self.write(repr(data).encode("us-ascii") + b"\n")
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001908 else:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001909 if (support.verbose and
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001910 self.server.connectionchatty):
1911 ctype = (self.sslconn and "encrypted") or "unencrypted"
Antoine Pitrou480a1242010-04-28 21:37:09 +00001912 sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
1913 % (msg, ctype, msg.lower(), ctype))
1914 self.write(msg.lower())
Andrew Svetlov0832af62012-12-18 23:10:48 +02001915 except OSError:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001916 if self.server.chatty:
1917 handle_error("Test server failure:\n")
1918 self.close()
1919 self.running = False
1920 # normally, we'd just stop here, but for the test
1921 # harness, we want to stop the server
1922 self.server.stop()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001923
Antoine Pitroub5218772010-05-21 09:56:06 +00001924 def __init__(self, certificate=None, ssl_version=None,
Antoine Pitrou18c913e2010-04-27 10:59:39 +00001925 certreqs=None, cacerts=None,
Antoine Pitrou2d9cb9c2010-04-17 17:40:45 +00001926 chatty=True, connectionchatty=False, starttls_server=False,
Benjamin Petersoncca27322015-01-23 16:35:37 -05001927 npn_protocols=None, alpn_protocols=None,
1928 ciphers=None, context=None):
Antoine Pitroub5218772010-05-21 09:56:06 +00001929 if context:
1930 self.context = context
1931 else:
1932 self.context = ssl.SSLContext(ssl_version
1933 if ssl_version is not None
1934 else ssl.PROTOCOL_TLSv1)
1935 self.context.verify_mode = (certreqs if certreqs is not None
1936 else ssl.CERT_NONE)
1937 if cacerts:
1938 self.context.load_verify_locations(cacerts)
1939 if certificate:
1940 self.context.load_cert_chain(certificate)
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01001941 if npn_protocols:
1942 self.context.set_npn_protocols(npn_protocols)
Benjamin Petersoncca27322015-01-23 16:35:37 -05001943 if alpn_protocols:
1944 self.context.set_alpn_protocols(alpn_protocols)
Antoine Pitroub5218772010-05-21 09:56:06 +00001945 if ciphers:
1946 self.context.set_ciphers(ciphers)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001947 self.chatty = chatty
1948 self.connectionchatty = connectionchatty
1949 self.starttls_server = starttls_server
1950 self.sock = socket.socket()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001951 self.port = support.bind_port(self.sock)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001952 self.flag = None
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001953 self.active = False
Benjamin Petersoncca27322015-01-23 16:35:37 -05001954 self.selected_npn_protocols = []
1955 self.selected_alpn_protocols = []
Benjamin Peterson4cb17812015-01-07 11:14:26 -06001956 self.shared_ciphers = []
Antoine Pitrou8f85f902012-01-03 22:46:48 +01001957 self.conn_errors = []
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001958 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00001959 self.daemon = True
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001960
def __enter__(self):
    """Start the server thread, wait for its ready event, return self."""
    ready = threading.Event()
    self.start(ready)
    # start() stores the event on self.flag; block until it is set.
    self.flag.wait()
    return self
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01001965
def __exit__(self, *args):
    """Ask the server loop to stop, then wait for the thread to finish.

    The exception details passed in *args* are ignored and nothing is
    returned, so an exception raised inside the ``with`` body propagates.
    """
    self.stop()
    self.join()
1969
def start(self, flag=None):
    """Remember *flag* on ``self.flag`` and start the thread.

    run() sets the flag (when it is truthy) once the socket is listening.
    """
    self.flag = flag
    threading.Thread.start(self)
1973
def run(self):
    """Accept and serve connections, one at a time, until stopped.

    The listening socket gets a 0.05 second timeout so that the loop
    re-checks ``self.active`` regularly.  Each accepted connection is
    handed to a ``ConnectionHandler`` which is started and joined before
    the next accept.  ``self.flag`` (when truthy) is set once the socket
    is listening.

    The listening socket is always closed on the way out, including when
    an unexpected exception escapes the loop.
    """
    try:
        self.sock.settimeout(0.05)
        self.sock.listen()
        self.active = True
        if self.flag:
            # signal an event
            self.flag.set()
        while self.active:
            try:
                newconn, connaddr = self.sock.accept()
                if support.verbose and self.chatty:
                    sys.stdout.write(' server: new connection from '
                                     + repr(connaddr) + '\n')
                handler = self.ConnectionHandler(self, newconn, connaddr)
                handler.start()
                handler.join()
            except socket.timeout:
                # Nothing to accept yet; go round and re-check self.active.
                pass
            except KeyboardInterrupt:
                self.stop()
    finally:
        # Previously only reached on a clean loop exit, which leaked the
        # socket when accept() or the handler raised something unexpected.
        self.sock.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001995
def stop(self):
    """Clear ``self.active`` so that the accept loop in run() ends."""
    self.active = False
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001998
class AsyncoreEchoServer(threading.Thread):
    """Thread that runs an asyncore loop around an SSL echo server.

    Data received from a client is sent back lower-cased.  The object can
    be used as a context manager: entering starts the thread and waits for
    its ready event, leaving stops the loop, closes the listening
    dispatcher and joins the thread.
    """

    # this one's based on asyncore.dispatcher

    class EchoServer(asyncore.dispatcher):
        """Listening dispatcher creating a ConnectionHandler per client."""

        class ConnectionHandler(asyncore.dispatcher_with_send):
            """Server side of a single SSL connection."""

            def __init__(self, conn, certfile):
                # The handshake is not performed by wrap_socket(); it is
                # attempted below and retried from handle_read().
                self.socket = ssl.wrap_socket(conn, server_side=True,
                                              certfile=certfile,
                                              do_handshake_on_connect=False)
                asyncore.dispatcher_with_send.__init__(self, self.socket)
                self._ssl_accepting = True
                self._do_ssl_handshake()

            def readable(self):
                """Handle reads while the SSL socket reports pending bytes.

                Always returns True.
                """
                if isinstance(self.socket, ssl.SSLSocket):
                    while self.socket.pending() > 0:
                        self.handle_read_event()
                return True

            def _do_ssl_handshake(self):
                """Try the handshake; clear ``_ssl_accepting`` on success."""
                try:
                    self.socket.do_handshake()
                except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
                    # Not finished yet; a later read event retries.
                    return
                except ssl.SSLEOFError:
                    return self.handle_close()
                except ssl.SSLError:
                    raise
                except OSError as err:
                    # Only an aborted connection closes the handler; other
                    # OSErrors leave the handler in the accepting state.
                    if err.args[0] == errno.ECONNABORTED:
                        return self.handle_close()
                else:
                    self._ssl_accepting = False

            def handle_read(self):
                """Continue the handshake, or echo received data lower-cased."""
                if self._ssl_accepting:
                    self._do_ssl_handshake()
                    return
                data = self.recv(1024)
                if support.verbose:
                    sys.stdout.write(" server: read %s from client\n" % repr(data))
                if data:
                    self.send(data.lower())
                else:
                    self.close()

            def handle_close(self):
                """Close the connection, reporting it in verbose mode."""
                self.close()
                if support.verbose:
                    sys.stdout.write(" server: closed connection %s\n" % self.socket)

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, certfile):
            """Bind a TCP socket via support.bind_port() and listen on it."""
            self.certfile = certfile
            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.port = support.bind_port(listener, '')
            asyncore.dispatcher.__init__(self, listener)
            self.listen(5)

        def handle_accepted(self, sock_obj, addr):
            """Wrap a newly accepted socket in a ConnectionHandler."""
            if support.verbose:
                sys.stdout.write(" server: new connection from %s:%s\n" %addr)
            self.ConnectionHandler(sock_obj, self.certfile)

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise

    def __init__(self, certfile):
        """Create the listening EchoServer and initialise the daemon thread."""
        self.flag = None
        self.active = False
        self.server = self.EchoServer(certfile)
        self.port = self.server.port
        threading.Thread.__init__(self)
        self.daemon = True

    def __str__(self):
        cls_name = self.__class__.__name__
        return "<%s %s>" % (cls_name, self.server)

    def __enter__(self):
        """Start the thread, wait for its ready event and return self."""
        ready = threading.Event()
        self.start(ready)
        self.flag.wait()
        return self

    def __exit__(self, *args):
        """Stop the loop and join the thread, narrating in verbose mode."""
        if support.verbose:
            sys.stdout.write(" cleanup: stopping server.\n")
        self.stop()
        if support.verbose:
            sys.stdout.write(" cleanup: joining server thread.\n")
        self.join()
        if support.verbose:
            sys.stdout.write(" cleanup: successfully joined.\n")

    def start (self, flag=None):
        """Remember *flag* on ``self.flag`` and start the thread."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        """Run ``asyncore.loop(1)`` repeatedly until stop() is called."""
        self.active = True
        if self.flag:
            self.flag.set()
        while self.active:
            try:
                asyncore.loop(1)
            except:
                # Deliberately best-effort: every exception raised by the
                # loop is swallowed and the loop is simply re-entered.
                pass

    def stop(self):
        """Clear the run flag and close the listening dispatcher."""
        self.active = False
        self.server.close()
2114
def server_params_test(client_context, server_context, indata=b"FOO\n",
                       chatty=True, connectionchatty=False, sni_name=None):
    """
    Launch a server, connect a client to it and try various reads
    and writes.

    *indata* is written three times (as bytes, bytearray and memoryview)
    and each reply must equal ``indata.lower()``, otherwise AssertionError
    is raised.  Returns a dict with the client-side connection details
    (compression, cipher, peer certificate, selected ALPN/NPN protocol,
    version) plus the protocol and cipher lists recorded by the server.
    """
    server = ThreadedEchoServer(context=server_context,
                                chatty=chatty,
                                connectionchatty=False)
    expected = indata.lower()
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=sni_name) as s:
            s.connect((HOST, server.port))
            for payload in (indata, bytearray(indata), memoryview(indata)):
                if connectionchatty and support.verbose:
                    sys.stdout.write(
                        " client: sending %r...\n" % indata)
                s.write(payload)
                outdata = s.read()
                if connectionchatty and support.verbose:
                    sys.stdout.write(" client: read %r\n" % outdata)
                if outdata != expected:
                    raise AssertionError(
                        "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                        % (outdata[:20], len(outdata),
                           indata[:20].lower(), len(indata)))
            s.write(b"over\n")
            if connectionchatty and support.verbose:
                sys.stdout.write(" client: closing connection.\n")
            # Collected before closing, while the connection is still open.
            stats = {
                'compression': s.compression(),
                'cipher': s.cipher(),
                'peercert': s.getpeercert(),
                'client_alpn_protocol': s.selected_alpn_protocol(),
                'client_npn_protocol': s.selected_npn_protocol(),
                'version': s.version(),
            }
            s.close()
        stats['server_alpn_protocols'] = server.selected_alpn_protocols
        stats['server_npn_protocols'] = server.selected_npn_protocols
        stats['server_shared_ciphers'] = server.shared_ciphers
    return stats
Thomas Woutersed03b412007-08-28 21:37:11 +00002161
def try_protocol_combo(server_protocol, client_protocol, expect_success,
                       certsreqs=None, server_options=0, client_options=0):
    """
    Try to SSL-connect using *client_protocol* to *server_protocol*.
    If *expect_success* is true, assert that the connection succeeds,
    if it's false, assert that the connection fails.
    Also, if *expect_success* is a string, assert that it is the protocol
    version actually used by the connection.
    """
    if certsreqs is None:
        certsreqs = ssl.CERT_NONE
    certtype_names = {
        ssl.CERT_NONE: "CERT_NONE",
        ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
        ssl.CERT_REQUIRED: "CERT_REQUIRED",
    }
    # Looked up unconditionally, so an unknown mode raises KeyError even
    # when not running verbosely.
    certtype = certtype_names[certsreqs]
    if support.verbose:
        # Braces mark combinations that are expected to fail.
        formatstr = " %s->%s %s\n" if expect_success else " {%s->%s} %s\n"
        client_name = ssl.get_protocol_name(client_protocol)
        server_name = ssl.get_protocol_name(server_protocol)
        sys.stdout.write(formatstr % (client_name, server_name, certtype))

    client_context = ssl.SSLContext(client_protocol)
    client_context.options |= client_options
    server_context = ssl.SSLContext(server_protocol)
    server_context.options |= server_options

    # NOTE: we must enable "ALL" ciphers on the client, otherwise an
    # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
    # starting from OpenSSL 1.0.0 (see issue #8322).
    if client_context.protocol == ssl.PROTOCOL_SSLv23:
        client_context.set_ciphers("ALL")

    for ctx in (client_context, server_context):
        ctx.verify_mode = certsreqs
        ctx.load_cert_chain(CERTFILE)
        ctx.load_verify_locations(CERTFILE)

    try:
        stats = server_params_test(client_context, server_context,
                                   chatty=False, connectionchatty=False)
    # Protocol mismatch can result in either an SSLError, or a
    # "Connection reset by peer" error.
    except ssl.SSLError:
        if expect_success:
            raise
    except OSError as e:
        if expect_success or e.errno != errno.ECONNRESET:
            raise
    else:
        if not expect_success:
            raise AssertionError(
                "Client protocol %s succeeded with server protocol %s!"
                % (ssl.get_protocol_name(client_protocol),
                   ssl.get_protocol_name(server_protocol)))
        # A non-True truthy value names the version that must be negotiated.
        if expect_success is not True and expect_success != stats['version']:
            raise AssertionError("version mismatch: expected %r, got %r"
                                 % (expect_success, stats['version']))
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002220
2221
Bill Janssen6e027db2007-11-15 22:23:56 +00002222 class ThreadedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002223
@skip_if_broken_ubuntu_ssl
def test_echo(self):
    """Basic test of an SSL client connecting to a server"""
    if support.verbose:
        sys.stdout.write("\n")
    # One sub-test per protocol; the same context serves both ends.
    for proto in PROTOCOLS:
        proto_name = ssl._PROTOCOL_NAMES[proto]
        with self.subTest(protocol=proto_name):
            ctx = ssl.SSLContext(proto)
            ctx.load_cert_chain(CERTFILE)
            server_params_test(ctx, ctx,
                               chatty=True, connectionchatty=True)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002235
def test_getpeercert(self):
    # Connect to an echo server with certificate verification required
    # and check the contents of the peer certificate that is returned.
    if support.verbose:
        sys.stdout.write("\n")
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(CERTFILE)
    context.load_cert_chain(CERTFILE)
    server = ThreadedEchoServer(context=context, chatty=False)
    with server:
        # Using the socket as a context manager closes it even when one of
        # the assertions below fails (it used to be closed only on success).
        with context.wrap_socket(socket.socket(),
                                 do_handshake_on_connect=False) as s:
            s.connect((HOST, server.port))
            # getpeercert() raise ValueError while the handshake isn't
            # done.
            with self.assertRaises(ValueError):
                s.getpeercert()
            s.do_handshake()
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()
            if support.verbose:
                sys.stdout.write(pprint.pformat(cert) + '\n')
                sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
            if 'subject' not in cert:
                self.fail("No subject field in certificate: %s." %
                          pprint.pformat(cert))
            if ((('organizationName', 'Python Software Foundation'),)
                    not in cert['subject']):
                self.fail(
                    "Missing or invalid 'organizationName' field in certificate subject; "
                    "should be 'Python Software Foundation'.")
            self.assertIn('notBefore', cert)
            self.assertIn('notAfter', cert)
            # The validity period must start before it ends.
            before = ssl.cert_time_to_seconds(cert['notBefore'])
            after = ssl.cert_time_to_seconds(cert['notAfter'])
            self.assertLess(before, after)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002273
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_crl_check(self):
    # Exercise certificate verification with and without CRL checking.
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(SIGNING_CA)
    # VERIFY_X509_TRUSTED_FIRST is only part of the expected flags when
    # the ssl module defines it.
    trusted_first = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(context.verify_flags,
                     ssl.VERIFY_DEFAULT | trusted_first)

    # VERIFY_DEFAULT should pass
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, \
            context.wrap_socket(socket.socket()) as s:
        s.connect((HOST, server.port))
        cert = s.getpeercert()
        self.assertTrue(cert, "Can't get peer certificate.")

    # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
    context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF

    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, \
            context.wrap_socket(socket.socket()) as s, \
            self.assertRaisesRegex(ssl.SSLError,
                                   "certificate verify failed"):
        s.connect((HOST, server.port))

    # now load a CRL file. The CRL file is signed by the CA.
    context.load_verify_locations(CRLFILE)

    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, \
            context.wrap_socket(socket.socket()) as s:
        s.connect((HOST, server.port))
        cert = s.getpeercert()
        self.assertTrue(cert, "Can't get peer certificate.")
2316
def test_check_hostname(self):
    # Exercise check_hostname with a matching, a mismatching and a
    # missing server_hostname.
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.verify_mode = ssl.CERT_REQUIRED
    context.check_hostname = True
    context.load_verify_locations(SIGNING_CA)

    # correct hostname should verify
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, \
            context.wrap_socket(socket.socket(),
                                server_hostname="localhost") as s:
        s.connect((HOST, server.port))
        cert = s.getpeercert()
        self.assertTrue(cert, "Can't get peer certificate.")

    # incorrect hostname should raise an exception
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, \
            context.wrap_socket(socket.socket(),
                                server_hostname="invalid") as s, \
            self.assertRaisesRegex(ssl.CertificateError,
                                   "hostname 'invalid' doesn't match 'localhost'"):
        s.connect((HOST, server.port))

    # missing server_hostname arg should cause an exception, too
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, \
            socket.socket() as s, \
            self.assertRaisesRegex(ValueError,
                                   "check_hostname requires server_hostname"):
        context.wrap_socket(s)
2354
def test_wrong_cert(self):
    """Connecting when the server rejects the client's certificate

    Launch a server with CERT_REQUIRED, and check that trying to
    connect to it with a wrong client certificate fails.
    """
    cert_dir = os.path.dirname(__file__) or os.curdir
    certfile = os.path.join(cert_dir, "wrongcert.pem")
    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_REQUIRED,
                                cacerts=CERTFILE, chatty=False,
                                connectionchatty=False)
    with server:
        with socket.socket() as sock:
            with ssl.wrap_socket(sock,
                                 certfile=certfile,
                                 ssl_version=ssl.PROTOCOL_TLSv1) as s:
                try:
                    # Expect either an SSL error about the server rejecting
                    # the connection, or a low-level connection reset (which
                    # sometimes happens on Windows)
                    s.connect((HOST, server.port))
                except ssl.SSLError as e:
                    if support.verbose:
                        sys.stdout.write("\nSSLError is %r\n" % e)
                except OSError as e:
                    # Any OSError other than a connection reset is a
                    # genuine failure.
                    if e.errno != errno.ECONNRESET:
                        raise
                    if support.verbose:
                        sys.stdout.write("\nsocket.error is %r\n" % e)
                else:
                    self.fail("Use of invalid cert should have failed!")
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002387
def test_rude_shutdown(self):
    """A brutal shutdown of an SSL server should raise an OSError
    in the client when attempting handshake.
    """
    listener_ready = threading.Event()
    listener_gone = threading.Event()

    s = socket.socket()
    port = support.bind_port(s, HOST)

    # `listener` runs in a thread. It sits in an accept() until
    # the main thread connects. Then it rudely closes the socket,
    # and sets Event `listener_gone` to let the main thread know
    # the socket is gone.
    def listener():
        s.listen()
        listener_ready.set()
        accepted, _addr = s.accept()
        accepted.close()
        s.close()
        listener_gone.set()

    # `connector` runs in the main thread: it connects once the listener
    # is ready and attempts the SSL wrap only after the listener is gone.
    def connector():
        listener_ready.wait()
        with socket.socket() as c:
            c.connect((HOST, port))
            listener_gone.wait()
            try:
                ssl_sock = ssl.wrap_socket(c)
            except OSError:
                pass
            else:
                self.fail('connecting to closed SSL socket should have failed')

    listener_thread = threading.Thread(target=listener)
    listener_thread.start()
    try:
        connector()
    finally:
        listener_thread.join()
Trent Nelson6b240cd2008-04-10 20:12:06 +00002428
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
                     "OpenSSL is compiled without SSLv2 support")
def test_protocol_sslv2(self):
    """Connecting to an SSLv2 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    # SSLv2 client, with the default and each explicit certificate mode.
    for extra in ((), (ssl.CERT_OPTIONAL,), (ssl.CERT_REQUIRED,)):
        try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True,
                           *extra)
    try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False)
    if hasattr(ssl, 'PROTOCOL_SSLv3'):
        try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
    # SSLv23 client with specific SSL options
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
                           client_options=ssl.OP_NO_SSLv2)
    for option in (ssl.OP_NO_SSLv3, ssl.OP_NO_TLSv1):
        try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
                           client_options=option)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002452
@skip_if_broken_ubuntu_ssl
def test_protocol_sslv23(self):
    """Connecting to an SSLv23 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try:
            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True)
        except OSError as x:
            # this fails on some older versions of OpenSSL (0.9.7l, for instance)
            if support.verbose:
                sys.stdout.write(
                    " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
                    % str(x))

    # The same three client protocols, first with the default certificate
    # mode and then with each explicit one.
    for extra in ((), (ssl.CERT_OPTIONAL,), (ssl.CERT_REQUIRED,)):
        if hasattr(ssl, 'PROTOCOL_SSLv3'):
            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3,
                               False, *extra)
        try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23,
                           True, *extra)
        try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1,
                           'TLSv1', *extra)

    # Server with specific SSL options
    if hasattr(ssl, 'PROTOCOL_SSLv3'):
        try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False,
                           server_options=ssl.OP_NO_SSLv3)
    # Will choose TLSv1
    try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True,
                       server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
    try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, False,
                       server_options=ssl.OP_NO_TLSv1)
2491
2492
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
                     "OpenSSL is compiled without SSLv3 support")
def test_protocol_sslv3(self):
    """Connecting to an SSLv3 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    # SSLv3 client, with the default and each explicit certificate mode.
    for extra in ((), (ssl.CERT_OPTIONAL,), (ssl.CERT_REQUIRED,)):
        try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3',
                           *extra)
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_SSLv3)
    try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23,
                           False, client_options=ssl.OP_NO_SSLv2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002512
@skip_if_broken_ubuntu_ssl
def test_protocol_tlsv1(self):
    """Connecting to a TLSv1 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    # TLSv1 client, with the default and each explicit certificate mode.
    for extra in ((), (ssl.CERT_OPTIONAL,), (ssl.CERT_REQUIRED,)):
        try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1',
                           *extra)
    # Older client protocols are only tried when this build defines them.
    for legacy in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy):
            try_protocol_combo(ssl.PROTOCOL_TLSv1, getattr(ssl, legacy),
                               False)
    try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_TLSv1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002527
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
                     "TLS version 1.1 not supported.")
def test_protocol_tlsv1_1(self):
    """Connecting to a TLSv1.1 server with various client options.
       Testing against older TLS versions."""
    if support.verbose:
        sys.stdout.write("\n")
    try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
    # Older client protocols are only tried when this build defines them.
    for legacy in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy):
            try_protocol_combo(ssl.PROTOCOL_TLSv1_1, getattr(ssl, legacy),
                               False)
    try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_TLSv1_1)

    try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
    # TLSv1 and TLSv1.1 must not interoperate in either direction.
    try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False)
    try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False)
2547
2548
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
                     "TLS version 1.2 not supported.")
def test_protocol_tlsv1_2(self):
    """Connecting to a TLSv1.2 server with various client options.
       Testing against older TLS versions."""
    if support.verbose:
        sys.stdout.write("\n")
    no_old_ssl = ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2
    try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2',
                       server_options=no_old_ssl,
                       client_options=no_old_ssl,)
    # Older client protocols are only tried when this build defines them.
    for legacy in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy):
            try_protocol_combo(ssl.PROTOCOL_TLSv1_2, getattr(ssl, legacy),
                               False)
    try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_TLSv1_2)

    try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2')
    # TLSv1.2 must not interoperate with TLSv1 or TLSv1.1, in either
    # direction.
    try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
    try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
    try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
    try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
2572
def test_starttls(self):
    """Switching from clear text to encrypted and back again.

    Sends a fixed sequence of messages over one connection.  When the
    reply to b"STARTTLS" starts with b"ok" the socket is wrapped in
    TLS; when the reply to b"ENDTLS" starts with b"ok" it is unwrapped
    again.  The connection is always closed, even if the conversation
    fails part-way through.
    """
    msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4",
            b"ENDTLS", b"msg 5", b"msg 6")

    server = ThreadedEchoServer(CERTFILE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                starttls_server=True,
                                chatty=True,
                                connectionchatty=True)
    wrapped = False
    conn = None
    with server:
        s = socket.socket()
        try:
            s.setblocking(1)
            s.connect((HOST, server.port))
            if support.verbose:
                sys.stdout.write("\n")
            for indata in msgs:
                if support.verbose:
                    sys.stdout.write(
                        " client: sending %r...\n" % indata)
                if wrapped:
                    conn.write(indata)
                    outdata = conn.read()
                else:
                    s.send(indata)
                    outdata = s.recv(1024)
                msg = outdata.strip().lower()
                if indata == b"STARTTLS" and msg.startswith(b"ok"):
                    # STARTTLS ok, switch to secure mode
                    if support.verbose:
                        sys.stdout.write(
                            " client: read %r from server, starting TLS...\n"
                            % msg)
                    conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
                    wrapped = True
                elif indata == b"ENDTLS" and msg.startswith(b"ok"):
                    # ENDTLS ok, switch back to clear text
                    if support.verbose:
                        sys.stdout.write(
                            " client: read %r from server, ending TLS...\n"
                            % msg)
                    s = conn.unwrap()
                    wrapped = False
                else:
                    if support.verbose:
                        sys.stdout.write(
                            " client: read %r from server\n" % msg)
            if support.verbose:
                sys.stdout.write(" client: closing connection.\n")
            if wrapped:
                conn.write(b"over\n")
            else:
                s.send(b"over\n")
        finally:
            # Close whichever object currently carries the connection so
            # a failure above does not leak the client socket.
            if wrapped:
                conn.close()
            else:
                s.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002630
def test_socketserver(self):
    """Using a SocketServer to create and manage SSL connections.

    Reads CERTFILE from disk, fetches the file of the same name over
    HTTPS from the server returned by make_https_server(), and checks
    that both byte strings are equal.
    """
    server = make_https_server(self, certfile=CERTFILE)
    # try to connect
    if support.verbose:
        sys.stdout.write('\n')
    with open(CERTFILE, 'rb') as f:
        d1 = f.read()
    # Bytes, like d1, so a missing body is reported as a plain content
    # mismatch rather than a str/bytes type mismatch.
    d2 = b''
    # now fetch the same data from the HTTPS server
    url = 'https://localhost:%d/%s' % (
        server.port, os.path.split(CERTFILE)[1])
    context = ssl.create_default_context(cafile=CERTFILE)
    with urllib.request.urlopen(url, context=context) as f:
        dlen = f.info().get("content-length")
        if dlen and (int(dlen) > 0):
            d2 = f.read(int(dlen))
            if support.verbose:
                sys.stdout.write(
                    " client: read %d bytes from remote server '%s'\n"
                    % (len(d2), server))
    self.assertEqual(d1, d2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002656
def test_asyncore_server(self):
    """Check the example asyncore integration.

    Sends one message to an AsyncoreEchoServer over TLS and expects
    the reply to be the lower-cased message.
    """
    if support.verbose:
        sys.stdout.write("\n")

    indata = b"FOO\n"
    server = AsyncoreEchoServer(CERTFILE)
    with server:
        s = ssl.wrap_socket(socket.socket())
        s.connect(('127.0.0.1', server.port))
        if support.verbose:
            sys.stdout.write(
                " client: sending %r...\n" % indata)
        s.write(indata)
        outdata = s.read()
        if support.verbose:
            sys.stdout.write(" client: read %r\n" % outdata)
        if outdata != indata.lower():
            self.fail(
                "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                % (outdata[:20], len(outdata),
                   indata[:20].lower(), len(indata)))
        s.write(b"over\n")
        if support.verbose:
            sys.stdout.write(" client: closing connection.\n")
        s.close()
        if support.verbose:
            sys.stdout.write(" client: connection closed.\n")
Trent Nelson6b240cd2008-04-10 20:12:06 +00002687
def test_recv_send(self):
    """Test recv(), send() and friends.

    Each send/recv flavour of the wrapped socket is exercised against
    an echo server.  Flavours flagged as expected to fail must raise a
    ValueError whose message starts with the method name.  The test
    also checks read(-1, buffer), that sendmsg()/recvmsg()/
    recvmsg_into() raise NotImplementedError, and that recv(-1) and
    read(-1) raise ValueError.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = ssl.wrap_socket(socket.socket(),
                            server_side=False,
                            certfile=CERTFILE,
                            ca_certs=CERTFILE,
                            cert_reqs=ssl.CERT_NONE,
                            ssl_version=ssl.PROTOCOL_TLSv1)
        s.connect((HOST, server.port))

        # helper methods for standardising recv* method signatures
        def _recv_into():
            b = bytearray(b"\0"*100)
            count = s.recv_into(b)
            return b[:count]

        def _recvfrom_into():
            b = bytearray(b"\0"*100)
            count, addr = s.recvfrom_into(b)
            return b[:count]

        # (name, method, expect success?, *args, return value func)
        send_methods = [
            ('send', s.send, True, [], len),
            ('sendto', s.sendto, False, ["some.address"], len),
            ('sendall', s.sendall, True, [], lambda x: None),
        ]
        # (name, method, whether to expect success, *args)
        recv_methods = [
            ('recv', s.recv, True, []),
            ('recvfrom', s.recvfrom, False, ["some.address"]),
            ('recv_into', _recv_into, True, []),
            ('recvfrom_into', _recvfrom_into, False, []),
        ]
        data_prefix = "PREFIX_"

        for (meth_name, send_meth, expect_success, args,
                ret_val_meth) in send_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                ret = send_meth(indata, *args)
                msg = "sending with {}".format(meth_name)
                self.assertEqual(ret, ret_val_meth(indata), msg=msg)
                outdata = s.read()
                if outdata != indata.lower():
                    # "!r" is a conversion; ":r" would be an (invalid)
                    # format spec and raise TypeError for bytes.
                    self.fail(
                        "While sending with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20], nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to send with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    # "!s" converts the exception with str(); ":s" is
                    # rejected by exception objects.
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )

        for meth_name, recv_meth, expect_success, args in recv_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                s.send(indata)
                outdata = recv_meth(*args)
                if outdata != indata.lower():
                    self.fail(
                        "While receiving with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20], nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to receive with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )
                # consume data
                s.read()

        # read(-1, buffer) is supported, even though read(-1) is not
        data = b"data"
        s.send(data)
        buffer = bytearray(len(data))
        self.assertEqual(s.read(-1, buffer), len(data))
        self.assertEqual(buffer, data)

        # Make sure sendmsg et al are disallowed to avoid
        # inadvertent disclosure of data and/or corruption
        # of the encrypted data stream
        self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
        self.assertRaises(NotImplementedError, s.recvmsg, 100)
        self.assertRaises(NotImplementedError,
                          s.recvmsg_into, bytearray(100))

        s.write(b"over\n")

        self.assertRaises(ValueError, s.recv, -1)
        self.assertRaises(ValueError, s.read, -1)

        s.close()
Bill Janssen58afe4c2008-09-08 16:45:19 +00002817
def test_recv_zero(self):
    """recv(0)/read(0) return no data and do not block."""
    server = ThreadedEchoServer(CERTFILE)
    server.__enter__()
    # __exit__ takes (exc_type, exc_value, traceback); pass all three,
    # exactly as a "with" statement would on a clean exit.
    self.addCleanup(server.__exit__, None, None, None)
    s = socket.create_connection((HOST, server.port))
    self.addCleanup(s.close)
    s = ssl.wrap_socket(s, suppress_ragged_eofs=False)
    self.addCleanup(s.close)

    # recv/read(0) should return no data
    s.send(b"data")
    self.assertEqual(s.recv(0), b"")
    self.assertEqual(s.read(0), b"")
    self.assertEqual(s.read(), b"data")

    # Should not block if the other end sends no data
    s.setblocking(False)
    self.assertEqual(s.recv(0), b"")
    self.assertEqual(s.recv_into(bytearray()), 0)
2837
def test_nonblocking_send(self):
    """send() on a non-blocking SSL socket eventually raises a
    want-read/want-write error instead of blocking."""
    echo_server = ThreadedEchoServer(CERTFILE,
                                     certreqs=ssl.CERT_NONE,
                                     ssl_version=ssl.PROTOCOL_TLSv1,
                                     cacerts=CERTFILE,
                                     chatty=True,
                                     connectionchatty=False)
    with echo_server:
        client = ssl.wrap_socket(socket.socket(),
                                 server_side=False,
                                 certfile=CERTFILE,
                                 ca_certs=CERTFILE,
                                 cert_reqs=ssl.CERT_NONE,
                                 ssl_version=ssl.PROTOCOL_TLSv1)
        client.connect((HOST, echo_server.port))
        client.setblocking(False)

        # Keep pushing data: once the buffers are full, a non-blocking
        # send can no longer complete.
        payload = bytearray(8192)

        def send_forever():
            while True:
                client.send(payload)

        with self.assertRaises((ssl.SSLWantWriteError,
                                ssl.SSLWantReadError)):
            send_forever()

        # Back to blocking mode before closing the connection.
        client.setblocking(True)
        client.close()
2867
def test_handshake_timeout(self):
    """The SSL handshake must respect the socket timeout."""
    # Issue #5103: SSL handshake must respect the socket timeout
    listener = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(listener)
    ready = threading.Event()
    stop = threading.Event()

    def serve():
        # Accept TCP connections but never speak TLS, so every client
        # handshake has to run into its timeout.
        listener.listen()
        ready.set()
        accepted = []
        while not stop.is_set():
            readable, _, _ = select.select([listener], [], [], 0.1)
            if listener in readable:
                # Let the socket hang around rather than having
                # it closed by garbage collection.
                accepted.append(listener.accept()[0])
        for peer in accepted:
            peer.close()

    worker = threading.Thread(target=serve)
    worker.start()
    ready.wait()

    try:
        # Case 1: wrap an already-connected socket.
        try:
            c = socket.socket(socket.AF_INET)
            c.settimeout(0.2)
            c.connect((host, port))
            # Will attempt handshake and time out
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                ssl.wrap_socket(c)
        finally:
            c.close()
        # Case 2: connect a socket that is already wrapped.
        try:
            c = socket.socket(socket.AF_INET)
            c = ssl.wrap_socket(c)
            c.settimeout(0.2)
            # Will attempt handshake and time out
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                c.connect((host, port))
        finally:
            c.close()
    finally:
        stop.set()
        worker.join()
        listener.close()
2916
def test_server_accept(self):
    """accept() on an SSLSocket made by SSLContext.wrap_socket()."""
    # Issue #16357: accept() on a SSLSocket created through
    # SSLContext.wrap_socket().
    ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    ctx.verify_mode = ssl.CERT_REQUIRED
    ctx.load_verify_locations(CERTFILE)
    ctx.load_cert_chain(CERTFILE)
    listener = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(listener)
    listener = ctx.wrap_socket(listener, server_side=True)

    listening = threading.Event()
    # Filled in by the server thread once a connection is accepted.
    accepted = {'remote': None, 'peer': None}

    def serve():
        listener.listen()
        # Block on the accept and wait on the connection to close.
        listening.set()
        accepted['remote'], accepted['peer'] = listener.accept()
        accepted['remote'].recv(1)

    worker = threading.Thread(target=serve)
    worker.start()
    # Client wait until server setup and perform a connect.
    listening.wait()
    client = ctx.wrap_socket(socket.socket())
    client.connect((host, port))
    client_addr = client.getsockname()
    client.close()
    worker.join()
    accepted['remote'].close()
    listener.close()
    # Sanity checks.
    self.assertIsInstance(accepted['remote'], ssl.SSLSocket)
    self.assertEqual(accepted['peer'], client_addr)
2954
Antoine Pitrou242db722013-05-01 20:52:07 +02002955 def test_getpeercert_enotconn(self):
2956 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2957 with context.wrap_socket(socket.socket()) as sock:
2958 with self.assertRaises(OSError) as cm:
2959 sock.getpeercert()
2960 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
2961
2962 def test_do_handshake_enotconn(self):
2963 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2964 with context.wrap_socket(socket.socket()) as sock:
2965 with self.assertRaises(OSError) as cm:
2966 sock.do_handshake()
2967 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
2968
def test_default_ciphers(self):
    """A client restricted to DES ciphers cannot connect to the server.

    Skipped when the "DES" cipher string cannot be set at all.
    """
    weak_client = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    try:
        # Force a set of weak ciphers on our client context
        weak_client.set_ciphers("DES")
    except ssl.SSLError:
        self.skipTest("no DES cipher available")
    with ThreadedEchoServer(CERTFILE,
                            ssl_version=ssl.PROTOCOL_SSLv23,
                            chatty=False) as server, \
            weak_client.wrap_socket(socket.socket()) as s:
        with self.assertRaises(OSError):
            s.connect((HOST, server.port))
    # The server side must have recorded why the handshake failed.
    first_error = str(server.conn_errors[0])
    self.assertIn("no shared cipher", first_error)
2983
def test_version_basic(self):
    """
    Basic tests for SSLSocket.version().

    version() must be None before connecting, 'TLSv1' while connected
    to a TLSv1 server, and None again once the socket is closed.
    More tests are done in the test_protocol_*() methods.
    """
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    with ThreadedEchoServer(CERTFILE,
                            ssl_version=ssl.PROTOCOL_TLSv1,
                            chatty=False) as server:
        with client_ctx.wrap_socket(socket.socket()) as s:
            before_connect = s.version()
            self.assertIs(before_connect, None)
            s.connect((HOST, server.port))
            negotiated = s.version()
            self.assertEqual(negotiated, 'TLSv1')
        after_close = s.version()
        self.assertIs(after_close, None)
2998
@unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
def test_default_ecdh_curve(self):
    """ECDH key exchange is enabled by default on SSL contexts."""
    # Issue #21015: elliptic curve-based Diffie Hellman key exchange
    # should be enabled by default on SSL contexts.
    ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    ctx.load_cert_chain(CERTFILE)
    # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
    # explicitly using the 'ECCdraft' cipher alias. Otherwise,
    # our default cipher list should prefer ECDH-based ciphers
    # automatically.
    needs_explicit_ecdh = ssl.OPENSSL_VERSION_INFO < (1, 0, 0)
    if needs_explicit_ecdh:
        ctx.set_ciphers("ECCdraft:ECDH")
    with ThreadedEchoServer(context=ctx) as server, \
            ctx.wrap_socket(socket.socket()) as s:
        s.connect((HOST, server.port))
        cipher_name = s.cipher()[0]
        self.assertIn("ECDH", cipher_name)
3015
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    """Test tls-unique channel binding.

    Opens two consecutive TLSv1 connections to the echo server.  For
    each one the binding data must be present, 12 bytes long and equal
    to what the server reports for the same connection; the data of
    the two connections must differ.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)

    def fetch_and_check_binding(verbose_fmt):
        # Connect, validate this connection's own binding data against
        # the peer's view of it, and return the data.
        s = ssl.wrap_socket(socket.socket(),
                            server_side=False,
                            certfile=CERTFILE,
                            ca_certs=CERTFILE,
                            cert_reqs=ssl.CERT_NONE,
                            ssl_version=ssl.PROTOCOL_TLSv1)
        try:
            s.connect((HOST, server.port))
            # get the data
            binding = s.get_channel_binding("tls-unique")
            if support.verbose:
                sys.stdout.write(verbose_fmt.format(binding))

            # check if it is sane
            self.assertIsNotNone(binding)
            self.assertEqual(len(binding), 12)  # True for TLSv1

            # and compare with the peers version
            s.write(b"CB tls-unique\n")
            peer_data_repr = s.read().strip()
            self.assertEqual(peer_data_repr,
                             repr(binding).encode("us-ascii"))
        finally:
            s.close()
        return binding

    with server:
        cb_data = fetch_and_check_binding(
            " got channel binding data: {0!r}\n")
        # now, again
        new_cb_data = fetch_and_check_binding(
            " got another channel binding data: {0!r}\n")
        # is it really unique
        self.assertNotEqual(cb_data, new_cb_data)
Bill Janssen58afe4c2008-09-08 16:45:19 +00003075
def test_compression(self):
    """The reported compression is None, 'ZLIB' or 'RLE'."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    ctx.load_cert_chain(CERTFILE)
    stats = server_params_test(ctx, ctx,
                               chatty=True, connectionchatty=True)
    if support.verbose:
        sys.stdout.write(
            " got compression: {!r}\n".format(stats['compression']))
    allowed = {None, 'ZLIB', 'RLE'}
    self.assertIn(stats['compression'], allowed)
3084
@unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
                     "ssl.OP_NO_COMPRESSION needed for this test")
def test_compression_disabled(self):
    """With OP_NO_COMPRESSION set, no compression is reported."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    ctx.load_cert_chain(CERTFILE)
    ctx.options |= ssl.OP_NO_COMPRESSION
    stats = server_params_test(ctx, ctx,
                               chatty=True, connectionchatty=True)
    compression = stats['compression']
    self.assertIs(compression, None)
3094
def test_dh_params(self):
    """A connection can be made with ephemeral Diffie-Hellman.

    The context is restricted to "kEDH" ciphers; the test fails unless
    the negotiated cipher name has an ADH, EDH or DHE component.
    """
    # Check we can get a connection with ephemeral Diffie-Hellman
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.load_cert_chain(CERTFILE)
    context.load_dh_params(DHFILE)
    context.set_ciphers("kEDH")
    stats = server_params_test(context, context,
                               chatty=True, connectionchatty=True)
    cipher = stats["cipher"][0]
    parts = cipher.split("-")
    if not any(tag in parts for tag in ("ADH", "EDH", "DHE")):
        # Report the whole cipher name, not just its first character.
        self.fail("Non-DH cipher: " + cipher)
3107
def test_selected_alpn_protocol(self):
    """Without ALPN configured the client reports no ALPN protocol."""
    # selected_alpn_protocol() is None unless ALPN is used.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    ctx.load_cert_chain(CERTFILE)
    stats = server_params_test(ctx, ctx,
                               chatty=True, connectionchatty=True)
    selected = stats['client_alpn_protocol']
    self.assertIs(selected, None)
3115
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
def test_selected_alpn_protocol_if_server_uses_alpn(self):
    """ALPN offered only by the server yields no client ALPN protocol."""
    # selected_alpn_protocol() is None unless ALPN is used by the client.
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    client_ctx.load_verify_locations(CERTFILE)
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_ctx.load_cert_chain(CERTFILE)
    server_ctx.set_alpn_protocols(['foo', 'bar'])
    stats = server_params_test(client_ctx, server_ctx,
                               chatty=True, connectionchatty=True)
    selected = stats['client_alpn_protocol']
    self.assertIs(selected, None)
3127
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
def test_alpn_protocols(self):
    """ALPN negotiation selects the expected protocol on both sides.

    Each entry of protocol_tests is (client protocols, expected
    selection).  An expected value of None means no common protocol;
    with OpenSSL 1.1.0 that case must fail the handshake with an
    SSLError.  Any other SSLError is re-raised unchanged.
    """
    server_protocols = ['foo', 'bar', 'milkshake']
    protocol_tests = [
        (['foo', 'bar'], 'foo'),
        (['bar', 'foo'], 'foo'),
        (['milkshake'], 'milkshake'),
        (['http/3.0', 'http/4.0'], None)
    ]
    for client_protocols, expected in protocol_tests:
        server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
        server_context.load_cert_chain(CERTFILE)
        server_context.set_alpn_protocols(server_protocols)
        client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
        client_context.load_cert_chain(CERTFILE)
        client_context.set_alpn_protocols(client_protocols)

        handshake_error = None
        try:
            stats = server_params_test(client_context,
                                       server_context,
                                       chatty=True,
                                       connectionchatty=True)
        except ssl.SSLError as e:
            handshake_error = e

        if expected is None and IS_OPENSSL_1_1:
            # OpenSSL 1.1.0 raises handshake error
            self.assertIsInstance(handshake_error, ssl.SSLError)
            continue
        if handshake_error is not None:
            # An unexpected handshake failure: surface the real error
            # instead of failing later on a missing stats dict.
            raise handshake_error

        msg = "failed trying %s (s) and %s (c).\n" \
              "was expecting %s, but got %%s from the %%s" \
                  % (str(server_protocols), str(client_protocols),
                     str(expected))
        client_result = stats['client_alpn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))
        server_seen = stats['server_alpn_protocols']
        server_result = server_seen[-1] if server_seen else 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
Benjamin Petersoncca27322015-01-23 16:35:37 -05003168
def test_selected_npn_protocol(self):
    """Without NPN configured the client reports no NPN protocol."""
    # selected_npn_protocol() is None unless NPN is used
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    ctx.load_cert_chain(CERTFILE)
    stats = server_params_test(ctx, ctx,
                               chatty=True, connectionchatty=True)
    selected = stats['client_npn_protocol']
    self.assertIs(selected, None)
3176
@unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
def test_npn_protocols(self):
    """NPN negotiation selects the expected protocol on both sides.

    Each case is (client protocols, expected selection); the server
    always offers the same protocol list.
    """
    server_protocols = ['http/1.1', 'spdy/2']
    cases = [
        (['http/1.1', 'spdy/2'], 'http/1.1'),
        (['spdy/2', 'http/1.1'], 'http/1.1'),
        (['spdy/2', 'test'], 'spdy/2'),
        (['abc', 'def'], 'abc')
    ]
    for client_protocols, expected in cases:
        server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        server_ctx.load_cert_chain(CERTFILE)
        server_ctx.set_npn_protocols(server_protocols)
        client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        client_ctx.load_cert_chain(CERTFILE)
        client_ctx.set_npn_protocols(client_protocols)
        stats = server_params_test(client_ctx, server_ctx,
                                   chatty=True, connectionchatty=True)

        # The two remaining placeholders are filled per assertion with
        # the observed result and the side it came from.
        msg = ("failed trying %s (s) and %s (c).\n"
               "was expecting %s, but got %%s from the %%s"
               % (server_protocols, client_protocols, expected))
        client_result = stats['client_npn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))
        server_seen = stats['server_npn_protocols']
        if server_seen:
            server_result = server_seen[-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
3205
def sni_contexts(self):
    """Return (server_context, other_context, client_context).

    The server context serves SIGNED_CERTFILE, the other context
    serves SIGNED_CERTFILE2, and the client context requires a
    certificate that verifies against SIGNING_CA.
    """
    def serving_context(certfile):
        # TLSv1 context that presents the given certificate chain.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_cert_chain(certfile)
        return ctx

    server_context = serving_context(SIGNED_CERTFILE)
    other_context = serving_context(SIGNED_CERTFILE2)
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    client_context.verify_mode = ssl.CERT_REQUIRED
    client_context.load_verify_locations(SIGNING_CA)
    return server_context, other_context, client_context
3215
def check_common_name(self, stats, name):
    """Assert that stats['peercert'] has a commonName equal to *name*.

    The check looks for the entry (('commonName', name),) among the
    items of the certificate's 'subject'.
    """
    expected_entry = (('commonName', name),)
    subject = stats['peercert']['subject']
    self.assertIn(expected_entry, subject)
3219
@needs_sni
def test_sni_callback(self):
    """The servername callback sees the SNI name and can swap contexts.

    Three handshakes are made: with an SNI name, without one, and with
    the callback removed again.
    """
    seen = []
    server_context, other_context, client_context = self.sni_contexts()

    def servername_cb(ssl_sock, server_name, initial_context):
        # Record every invocation; switch contexts only when the
        # client actually sent a server name.
        seen.append((server_name, initial_context))
        if server_name is not None:
            ssl_sock.context = other_context
    server_context.set_servername_callback(servername_cb)

    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name='supermessage')
    # The hostname was fetched properly, and the certificate was
    # changed for the connection.
    self.assertEqual(seen, [("supermessage", server_context)])
    # The certificate of other_context was served
    self.check_common_name(stats, 'fakehostname')

    seen.clear()
    # The callback is called with server_name=None
    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name=None)
    self.assertEqual(seen, [(None, server_context)])
    self.check_common_name(stats, 'localhost')

    # Check disabling the callback
    seen.clear()
    server_context.set_servername_callback(None)

    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name='notfunny')
    # Certificate didn't change
    self.check_common_name(stats, 'localhost')
    self.assertEqual(seen, [])
3258
3259 @needs_sni
3260 def test_sni_callback_alert(self):
3261 # Returning a TLS alert is reflected to the connecting client
3262 server_context, other_context, client_context = self.sni_contexts()
3263
3264 def cb_returning_alert(ssl_sock, server_name, initial_context):
3265 return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
3266 server_context.set_servername_callback(cb_returning_alert)
3267
3268 with self.assertRaises(ssl.SSLError) as cm:
3269 stats = server_params_test(client_context, server_context,
3270 chatty=False,
3271 sni_name='supermessage')
3272 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
3273
3274 @needs_sni
3275 def test_sni_callback_raising(self):
3276 # Raising fails the connection with a TLS handshake failure alert.
3277 server_context, other_context, client_context = self.sni_contexts()
3278
3279 def cb_raising(ssl_sock, server_name, initial_context):
3280 1/0
3281 server_context.set_servername_callback(cb_raising)
3282
3283 with self.assertRaises(ssl.SSLError) as cm, \
3284 support.captured_stderr() as stderr:
3285 stats = server_params_test(client_context, server_context,
3286 chatty=False,
3287 sni_name='supermessage')
3288 self.assertEqual(cm.exception.reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
3289 self.assertIn("ZeroDivisionError", stderr.getvalue())
3290
3291 @needs_sni
3292 def test_sni_callback_wrong_return_type(self):
3293 # Returning the wrong return type terminates the TLS connection
3294 # with an internal error alert.
3295 server_context, other_context, client_context = self.sni_contexts()
3296
3297 def cb_wrong_return_type(ssl_sock, server_name, initial_context):
3298 return "foo"
3299 server_context.set_servername_callback(cb_wrong_return_type)
3300
3301 with self.assertRaises(ssl.SSLError) as cm, \
3302 support.captured_stderr() as stderr:
3303 stats = server_params_test(client_context, server_context,
3304 chatty=False,
3305 sni_name='supermessage')
3306 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
3307 self.assertIn("TypeError", stderr.getvalue())
3308
Benjamin Peterson4cb17812015-01-07 11:14:26 -06003309 def test_shared_ciphers(self):
Benjamin Petersonaacd5242015-01-07 11:42:38 -06003310 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
Benjamin Peterson15042922015-01-07 22:12:43 -06003311 server_context.load_cert_chain(SIGNED_CERTFILE)
3312 client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3313 client_context.verify_mode = ssl.CERT_REQUIRED
3314 client_context.load_verify_locations(SIGNING_CA)
Christian Heimes598894f2016-09-05 23:19:05 +02003315 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
3316 client_context.set_ciphers("AES128:AES256")
3317 server_context.set_ciphers("AES256")
3318 alg1 = "AES256"
3319 alg2 = "AES-256"
3320 else:
3321 client_context.set_ciphers("AES:3DES")
3322 server_context.set_ciphers("3DES")
3323 alg1 = "3DES"
3324 alg2 = "DES-CBC3"
3325
Benjamin Peterson4cb17812015-01-07 11:14:26 -06003326 stats = server_params_test(client_context, server_context)
3327 ciphers = stats['server_shared_ciphers'][0]
3328 self.assertGreater(len(ciphers), 0)
3329 for name, tls_version, bits in ciphers:
Christian Heimes598894f2016-09-05 23:19:05 +02003330 if not alg1 in name.split("-") and alg2 not in name:
3331 self.fail(name)
Benjamin Peterson4cb17812015-01-07 11:14:26 -06003332
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003333 def test_read_write_after_close_raises_valuerror(self):
3334 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
3335 context.verify_mode = ssl.CERT_REQUIRED
3336 context.load_verify_locations(CERTFILE)
3337 context.load_cert_chain(CERTFILE)
3338 server = ThreadedEchoServer(context=context, chatty=False)
3339
3340 with server:
3341 s = context.wrap_socket(socket.socket())
3342 s.connect((HOST, server.port))
3343 s.close()
3344
3345 self.assertRaises(ValueError, s.read, 1024)
Antoine Pitrou28940732013-07-20 19:36:15 +02003346 self.assertRaises(ValueError, s.write, b'hello')
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003347
Giampaolo Rodola'915d1412014-06-11 03:54:30 +02003348 def test_sendfile(self):
3349 TEST_DATA = b"x" * 512
3350 with open(support.TESTFN, 'wb') as f:
3351 f.write(TEST_DATA)
3352 self.addCleanup(support.unlink, support.TESTFN)
3353 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
3354 context.verify_mode = ssl.CERT_REQUIRED
3355 context.load_verify_locations(CERTFILE)
3356 context.load_cert_chain(CERTFILE)
3357 server = ThreadedEchoServer(context=context, chatty=False)
3358 with server:
3359 with context.wrap_socket(socket.socket()) as s:
3360 s.connect((HOST, server.port))
3361 with open(support.TESTFN, 'rb') as file:
3362 s.sendfile(file)
3363 self.assertEqual(s.recv(1024), TEST_DATA)
3364
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003365
def test_main(verbose=False):
    """Check the test fixtures, then run the SSL test classes.

    *verbose* is accepted but not consulted; the diagnostic banner is
    controlled by ``support.verbose``.

    Raises support.TestFailed if any of the certificate files the tests
    rely on does not exist.
    """
    if support.verbose:
        import warnings
        plats = {
            'Linux': platform.linux_distribution,
            'Mac': platform.mac_ver,
            'Windows': platform.win32_ver,
        }
        with warnings.catch_warnings():
            # The message argument is a regular expression, so it is
            # written as a raw string: "\(" in a plain literal is an
            # invalid escape sequence.
            warnings.filterwarnings(
                'ignore',
                r'dist\(\) and linux_distribution\(\) '
                r'functions are deprecated .*',
                PendingDeprecationWarning,
            )
            # Use the first platform probe that returns a non-empty
            # leading field; fall back to platform.platform() otherwise.
            for name, func in plats.items():
                plat = func()
                if plat and plat[0]:
                    plat = '%s %r' % (name, plat)
                    break
            else:
                plat = repr(platform.platform())
        print("test_ssl: testing with %r %r" %
            (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
        print("          under %s" % plat)
        print("          HAS_SNI = %r" % ssl.HAS_SNI)
        print("          OP_ALL = 0x%8x" % ssl.OP_ALL)
        try:
            print("          OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
        except AttributeError:
            # The constant is not defined by every build of the ssl module.
            pass

    # Fail early, with a clear message, if a fixture file is missing.
    for filename in [
        CERTFILE, BYTES_CERTFILE,
        ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
        SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
        BADCERT, BADKEY, EMPTYCERT]:
        if not os.path.exists(filename):
            raise support.TestFailed("Can't read certificate file %r" % filename)

    tests = [
        ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
        SimpleBackgroundTests,
    ]

    if support.is_resource_enabled('network'):
        tests.append(NetworkedTests)

    if _have_threads:
        thread_info = support.threading_setup()
        if thread_info:
            tests.append(ThreadedTests)

    try:
        support.run_unittest(*tests)
    finally:
        # thread_info is only bound when threading is available.
        if _have_threads:
            support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00003424
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()