blob: 4e0e4a2185a7e7bdb15c40d580319fb7ccad53d5 [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00005from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00006import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00007import select
Thomas Woutersed03b412007-08-28 21:37:11 +00008import time
Christian Heimes9424bb42013-06-17 15:32:57 +02009import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000010import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000011import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000012import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000013import pprint
Antoine Pitrou152efa22010-05-16 18:19:27 +000014import tempfile
Antoine Pitrou803e6d62010-10-13 10:36:15 +000015import urllib.request
Thomas Woutersed03b412007-08-28 21:37:11 +000016import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000017import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000018import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000019import platform
Antoine Pitrou23df4832010-08-04 17:14:06 +000020import functools
Thomas Woutersed03b412007-08-28 21:37:11 +000021
# ssl is obtained through the test-support import helper instead of a plain
# import statement.
ssl = support.import_module("ssl")

# Record whether the threading module can be imported on this build.
try:
    import threading
except ImportError:
    _have_threads = False
else:
    _have_threads = True

# Sorted view of the ssl module's protocol-name table.
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
HOST = support.HOST
# True when the linked library's version string starts with 'LibreSSL'.
IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
# True for non-LibreSSL builds whose version info is at least (1, 1, 0).
IS_OPENSSL_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
35
Antoine Pitrou152efa22010-05-16 18:19:27 +000036
def data_file(*name):
    """Return the path of a data file stored next to this module.

    The given path components are joined, in order, onto the directory
    that contains this file.
    """
    module_dir = os.path.dirname(__file__)
    return os.path.join(module_dir, *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000039
# The custom key and certificate files used in test_ssl are generated
# using Lib/test/make_ssl_certs.py.
# Other certificates are simply fetched from the Internet servers they
# are meant to authenticate.

# Each BYTES_* name below is the os.fsencode()d form of the str path it is
# derived from.
CERTFILE = data_file("keycert.pem")
BYTES_CERTFILE = os.fsencode(CERTFILE)
ONLYCERT = data_file("ssl_cert.pem")
ONLYKEY = data_file("ssl_key.pem")
BYTES_ONLYCERT = os.fsencode(ONLYCERT)
BYTES_ONLYKEY = os.fsencode(ONLYKEY)
# NOTE(review): presumably the password-protected variants and their
# password -- confirm against make_ssl_certs.py.
CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
KEY_PASSWORD = "somepass"
CAPATH = data_file("capath")
BYTES_CAPATH = os.fsencode(CAPATH)
# Individual files inside the capath directory.
CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
CAFILE_CACERT = data_file("capath", "5ed36f99.0")


# empty CRL
CRLFILE = data_file("revocation.crl")

# Two keys and certs signed by the same CA (for SNI tests)
SIGNED_CERTFILE = data_file("keycert3.pem")
SIGNED_CERTFILE2 = data_file("keycert4.pem")
# Same certificate as pycacert.pem, but without extra text in file
SIGNING_CA = data_file("capath", "ceff1710.0")

REMOTE_HOST = "self-signed.pythontest.net"

EMPTYCERT = data_file("nullcert.pem")
BADCERT = data_file("badcert.pem")
NONEXISTINGCERT = data_file("XXXnonexisting.pem")
BADKEY = data_file("badkey.pem")
NOKIACERT = data_file("nokia.pem")
NULLBYTECERT = data_file("nullbytecert.pem")

DHFILE = data_file("dh1024.pem")
BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +000080
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010081
def handle_error(prefix):
    """Write *prefix* followed by the current traceback to stdout.

    The traceback of the exception currently being handled is always
    formatted, but it is only written when support.verbose is set.
    """
    exc_type, exc_value, exc_tb = sys.exc_info()
    traceback_lines = traceback.format_exception(exc_type, exc_value, exc_tb)
    exc_format = ' '.join(traceback_lines)
    if not support.verbose:
        return
    sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +000086
def can_clear_options():
    """Return True when the OpenSSL API version is 0.9.8m or higher."""
    minimum_api_version = (0, 9, 8, 13, 15)  # 0.9.8m
    return ssl._OPENSSL_API_VERSION >= minimum_api_version
Antoine Pitroub5218772010-05-21 09:56:06 +000090
def no_sslv2_implies_sslv3_hello():
    """Return True when the linked OpenSSL is version 0.9.7h or higher."""
    minimum_version = (0, 9, 7, 8, 15)  # 0.9.7h
    return ssl.OPENSSL_VERSION_INFO >= minimum_version
94
def have_verify_flags():
    """Return True when the linked OpenSSL is version 0.9.8 or higher."""
    minimum_version = (0, 9, 8, 0, 15)  # 0.9.8
    return ssl.OPENSSL_VERSION_INFO >= minimum_version
98
def utc_offset():  # NOTE: ignore issues like #1647654
    """Return the local UTC offset in seconds.

    The offset satisfies: local time = utc time + utc offset.  The
    daylight-saving value (time.altzone) is used only when the platform
    defines a DST zone and DST is currently in effect.
    """
    dst_in_effect = time.daylight and time.localtime().tm_isdst > 0
    seconds_west = time.altzone if dst_in_effect else time.timezone
    return -seconds_west
104
def asn1time(cert_time):
    """Adjust an expected certificate timestamp for the linked OpenSSL.

    On OpenSSL 0.9.8i the seconds field is zeroed (that version ignores
    seconds, see #18207) and a zero-padded day is rewritten with a leading
    space.  On every other version *cert_time* is returned unchanged.
    """
    # Only 0.9.8i needs any adjustment.
    if ssl._OPENSSL_API_VERSION != (0, 9, 8, 9, 15):
        return cert_time

    time_format = "%b %d %H:%M:%S %Y GMT"
    parsed = datetime.datetime.strptime(cert_time, time_format)
    adjusted = parsed.replace(second=0).strftime(time_format)
    # %d adds leading zero but ASN1_TIME_print() uses leading space
    if adjusted[4] == "0":
        adjusted = adjusted[:4] + " " + adjusted[5:]
    return adjusted
Thomas Woutersed03b412007-08-28 21:37:11 +0000118
# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
def skip_if_broken_ubuntu_ssl(func):
    """Decorator that skips *func* on a specific patched OpenSSL build.

    When the ssl module has no PROTOCOL_SSLv2, *func* is returned as is.
    Otherwise the wrapper tries to create an SSLv2 context before each
    call; if that raises SSLError on OpenSSL (0, 9, 8, 15, 15) with a
    distribution reported as ('debian', 'squeeze/sid', ''), the call is
    turned into a unittest.SkipTest.
    """
    if not hasattr(ssl, 'PROTOCOL_SSLv2'):
        return func

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            ssl.SSLContext(ssl.PROTOCOL_SSLv2)
        except ssl.SSLError:
            affected_version = ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15)
            # The distribution lookup only runs for the affected version.
            if (affected_version and
                    platform.linux_distribution() == ('debian', 'squeeze/sid', '')):
                raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
        return func(*args, **kwargs)
    return wrapper
Antoine Pitrou23df4832010-08-04 17:14:06 +0000134
# Decorator that skips a test unless ssl.HAS_SNI is true.
needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
136
Antoine Pitrou23df4832010-08-04 17:14:06 +0000137
Antoine Pitrou152efa22010-05-16 18:19:27 +0000138class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000139
Antoine Pitrou480a1242010-04-28 21:37:09 +0000140 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000141 ssl.CERT_NONE
142 ssl.CERT_OPTIONAL
143 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100144 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100145 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100146 if ssl.HAS_ECDH:
147 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100148 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
149 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000150 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100151 self.assertIn(ssl.HAS_ECDH, {True, False})
Thomas Woutersed03b412007-08-28 21:37:11 +0000152
Antoine Pitrou172f0252014-04-18 20:33:08 +0200153 def test_str_for_enums(self):
154 # Make sure that the PROTOCOL_* constants have enum-like string
155 # reprs.
Christian Heimes598894f2016-09-05 23:19:05 +0200156 proto = ssl.PROTOCOL_TLS
157 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200158 ctx = ssl.SSLContext(proto)
159 self.assertIs(ctx.protocol, proto)
160
def test_random(self):
    """Exercise the ssl RAND_* functions with valid and invalid arguments."""
    status = ssl.RAND_status()
    if support.verbose:
        if status:
            description = "sufficient randomness"
        else:
            description = "insufficient randomness"
        sys.stdout.write("\n RAND_status is %d (%s)\n" % (status, description))

    pseudo_bytes, is_cryptographic = ssl.RAND_pseudo_bytes(16)
    self.assertEqual(len(pseudo_bytes), 16)
    self.assertEqual(is_cryptographic, status == 1)
    if not status:
        self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
    else:
        strong_bytes = ssl.RAND_bytes(16)
        self.assertEqual(len(strong_bytes), 16)

    # negative num is invalid
    for generator in (ssl.RAND_bytes, ssl.RAND_pseudo_bytes):
        self.assertRaises(ValueError, generator, -5)

    if hasattr(ssl, 'RAND_egd'):
        for bad_arguments in ((1,), ('foo', 1)):
            self.assertRaises(TypeError, ssl.RAND_egd, *bad_arguments)
    # RAND_add is called with a str, a bytes and a bytearray in turn.
    entropy_samples = (
        "this is a random string",
        b"this is a random bytes object",
        bytearray(b"this is a random bytearray object"),
    )
    for sample in entropy_samples:
        ssl.RAND_add(sample, 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000187
@unittest.skipUnless(os.name == 'posix', 'requires posix')
def test_random_fork(self):
    """Check that a forked child and its parent draw different random bytes.

    The child writes 16 pseudo-random bytes into a pipe and exits with
    status 0 (or 1 on any exception); the parent reads them back and
    compares them with 16 bytes it generates itself.
    """
    status = ssl.RAND_status()
    if not status:
        self.fail("OpenSSL's PRNG has insufficient randomness")

    rfd, wfd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child process: always leave through os._exit() so that no
        # exception propagates out of this branch.
        try:
            os.close(rfd)
            child_random = ssl.RAND_pseudo_bytes(16)[0]
            self.assertEqual(len(child_random), 16)
            os.write(wfd, child_random)
            os.close(wfd)
        except BaseException:
            os._exit(1)
        else:
            os._exit(0)
    else:
        # Parent process: close the unused write end, wait for the child
        # and check that it exited with status 0.
        os.close(wfd)
        self.addCleanup(os.close, rfd)
        _, status = os.waitpid(pid, 0)
        self.assertEqual(status, 0)

        child_random = os.read(rfd, 16)
        self.assertEqual(len(child_random), 16)
        parent_random = ssl.RAND_pseudo_bytes(16)[0]
        self.assertEqual(len(parent_random), 16)

        self.assertNotEqual(child_random, parent_random)
218 self.assertNotEqual(child_random, parent_random)
219
def test_parse_cert(self):
    """Decode CERTFILE and NOKIACERT and check the parsed fields.

    The expected validity timestamps use a space-padded day of month,
    because ASN1_TIME_print() pads single-digit days with a leading space
    (see asn1time()).
    """
    # note that this uses an 'unofficial' function in _ssl.c,
    # provided solely for this test, to exercise the certificate
    # parsing code
    p = ssl._ssl._test_decode_cert(CERTFILE)
    if support.verbose:
        sys.stdout.write("\n" + pprint.pformat(p) + "\n")
    # The test expects the same distinguished name for issuer and subject.
    expected_name = ((('countryName', 'XY'),),
                     (('localityName', 'Castle Anthrax'),),
                     (('organizationName', 'Python Software Foundation'),),
                     (('commonName', 'localhost'),))
    self.assertEqual(p['issuer'], expected_name)
    # Note the next three asserts will fail if the keys are regenerated
    self.assertEqual(p['notAfter'], asn1time('Oct  5 23:01:56 2020 GMT'))
    self.assertEqual(p['notBefore'], asn1time('Oct  8 23:01:56 2010 GMT'))
    self.assertEqual(p['serialNumber'], 'D7C7381919AFC24E')
    self.assertEqual(p['subject'], expected_name)
    self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),))
    # Issue #13034: the subjectAltName in some certificates
    # (notably projects.developer.nokia.com:443) wasn't parsed
    p = ssl._ssl._test_decode_cert(NOKIACERT)
    if support.verbose:
        sys.stdout.write("\n" + pprint.pformat(p) + "\n")
    self.assertEqual(p['subjectAltName'],
                     (('DNS', 'projects.developer.nokia.com'),
                      ('DNS', 'projects.forum.nokia.com'))
                    )
    # extra OCSP and AIA fields
    self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
    self.assertEqual(p['caIssuers'],
                     ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
    self.assertEqual(p['crlDistributionPoints'],
                     ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000259
def test_parse_cert_CVE_2013_4238(self):
    """Decode NULLBYTECERT and check that embedded NUL bytes are preserved."""
    decoded = ssl._ssl._test_decode_cert(NULLBYTECERT)
    if support.verbose:
        sys.stdout.write("\n" + pprint.pformat(decoded) + "\n")
    expected_name = (
        (('countryName', 'US'),),
        (('stateOrProvinceName', 'Oregon'),),
        (('localityName', 'Beaverton'),),
        (('organizationName', 'Python Software Foundation'),),
        (('organizationalUnitName', 'Python Core Development'),),
        (('commonName', 'null.python.org\x00example.org'),),
        (('emailAddress', 'python-dev@python.org'),),
    )
    for field in ('subject', 'issuer'):
        self.assertEqual(decoded[field], expected_name)

    # Only the last subjectAltName entry depends on the OpenSSL version.
    if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
        last_address = '2001:DB8:0:0:0:0:0:1\n'
    else:
        # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
        last_address = '<invalid>'
    expected_san = (
        ('DNS', 'altnull.python.org\x00example.com'),
        ('email', 'null@python.org\x00user@example.org'),
        ('URI', 'http://null.python.org\x00http://example.org'),
        ('IP Address', '192.0.2.1'),
        ('IP Address', last_address),
    )

    self.assertEqual(decoded['subjectAltName'], expected_san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200288
def test_DER_to_PEM(self):
    """Round-trip a PEM certificate through DER and check the PEM framing."""
    with open(CAFILE_CACERT, 'r') as pem_file:
        source_pem = pem_file.read()
    first_der = ssl.PEM_cert_to_DER_cert(source_pem)
    regenerated_pem = ssl.DER_cert_to_PEM_cert(first_der)
    second_der = ssl.PEM_cert_to_DER_cert(regenerated_pem)
    self.assertEqual(first_der, second_der)
    expected_start = ssl.PEM_HEADER + '\n'
    expected_end = '\n' + ssl.PEM_FOOTER + '\n'
    if not regenerated_pem.startswith(expected_start):
        self.fail("DER-to-PEM didn't include correct header:\n%r\n"
                  % regenerated_pem)
    if not regenerated_pem.endswith(expected_end):
        self.fail("DER-to-PEM didn't include correct footer:\n%r\n"
                  % regenerated_pem)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000300
def test_openssl_version(self):
    """Sanity-check the three OPENSSL_VERSION* constants against each other."""
    number = ssl.OPENSSL_VERSION_NUMBER
    info = ssl.OPENSSL_VERSION_INFO
    text = ssl.OPENSSL_VERSION
    for value, expected_type in ((number, int), (info, tuple), (text, str)):
        self.assertIsInstance(value, expected_type)
    # Some sanity checks follow
    # >= 0.9
    self.assertGreaterEqual(number, 0x900000)
    # < 3.0
    self.assertLess(number, 0x30000000)
    major, minor, fix, patch, status = info
    self.assertGreaterEqual(major, 0)
    self.assertLess(major, 3)
    for component in (minor, fix):
        self.assertGreaterEqual(component, 0)
        self.assertLess(component, 256)
    self.assertGreaterEqual(patch, 0)
    self.assertLessEqual(patch, 63)
    self.assertGreaterEqual(status, 0)
    self.assertLessEqual(status, 15)
    # Version string as returned by {Open,Libre}SSL, the format might change
    if IS_LIBRESSL:
        expected_prefix = "LibreSSL {:d}".format(major)
    else:
        expected_prefix = "OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)
    self.assertTrue(text.startswith(expected_prefix),
                    (text, info, hex(number)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000331
@support.cpython_only
def test_refcycle(self):
    """Check that deleting a wrapped socket frees it right away."""
    # Issue #7943: an SSL object doesn't create reference cycles with
    # itself.
    s = socket.socket(socket.AF_INET)
    ss = ssl.wrap_socket(s)
    # Hold only a weak reference so the final check can tell whether the
    # wrapped socket is still alive.
    wr = weakref.ref(ss)
    with support.check_warnings(("", ResourceWarning)):
        del ss
    # The weak reference is expected to be dead after the del above.
    self.assertEqual(wr(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000342
def test_wrapped_unconnected(self):
    """Check that I/O methods on an unconnected SSLSocket raise OSError."""
    # Methods on an unconnected SSLSocket propagate the original
    # OSError raised by the underlying socket object.
    plain_sock = socket.socket(socket.AF_INET)
    with ssl.wrap_socket(plain_sock) as ss:
        attempts = (
            ('recv', (1,)),
            ('recv_into', (bytearray(b'x'),)),
            ('recvfrom', (1,)),
            ('recvfrom_into', (bytearray(b'x'), 1)),
            ('send', (b'x',)),
            ('sendto', (b'x', ('0.0.0.0', 0))),
        )
        for method_name, arguments in attempts:
            self.assertRaises(OSError, getattr(ss, method_name), *arguments)
Antoine Pitroua468adc2010-09-14 14:43:44 +0000354
Antoine Pitrou40f08742010-04-24 22:04:40 +0000355 def test_timeout(self):
356 # Issue #8524: when creating an SSL socket, the timeout of the
357 # original socket should be retained.
358 for timeout in (None, 0.0, 5.0):
359 s = socket.socket(socket.AF_INET)
360 s.settimeout(timeout)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100361 with ssl.wrap_socket(s) as ss:
362 self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000363
def test_errors(self):
    """Check the errors raised by ssl.wrap_socket() for bad arguments.

    Covers: a keyfile without a certfile, server-side mode without a
    certfile, connecting a server-side socket, and certificate/key paths
    that do not exist (expected to fail with ENOENT).
    """
    sock = socket.socket()
    # Make sure the socket is released even if one of the assertions
    # below fails before the "with" block takes ownership of it.
    self.addCleanup(sock.close)
    self.assertRaisesRegex(ValueError,
                    "certfile must be specified",
                    ssl.wrap_socket, sock, keyfile=CERTFILE)
    self.assertRaisesRegex(ValueError,
                    "certfile must be specified for server-side operations",
                    ssl.wrap_socket, sock, server_side=True)
    self.assertRaisesRegex(ValueError,
                    "certfile must be specified for server-side operations",
                    ssl.wrap_socket, sock, server_side=True, certfile="")
    with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
        self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
                               s.connect, (HOST, 8080))
    # Each combination names at least one file that does not exist.
    missing_file_cases = (
        {'certfile': NONEXISTINGCERT},
        {'certfile': CERTFILE, 'keyfile': NONEXISTINGCERT},
        {'certfile': NONEXISTINGCERT, 'keyfile': NONEXISTINGCERT},
    )
    for wrap_kwargs in missing_file_cases:
        with self.assertRaises(OSError) as cm:
            with socket.socket() as client:
                ssl.wrap_socket(client, **wrap_kwargs)
        self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000392
def bad_cert_test(self, certfile):
    """Check that trying to use the given client certificate fails"""
    # Resolve the file name against this module's directory, falling back
    # to the current directory when that directory name is empty.
    base_dir = os.path.dirname(__file__) or os.curdir
    cert_path = os.path.join(base_dir, certfile)
    plain_sock = socket.socket()
    self.addCleanup(plain_sock.close)
    with self.assertRaises(ssl.SSLError):
        ssl.wrap_socket(plain_sock,
                        certfile=cert_path,
                        ssl_version=ssl.PROTOCOL_TLSv1)
403
404 def test_empty_cert(self):
405 """Wrapping with an empty cert file"""
406 self.bad_cert_test("nullcert.pem")
407
408 def test_malformed_cert(self):
409 """Wrapping with a badly formatted certificate (syntax error)"""
410 self.bad_cert_test("badcert.pem")
411
412 def test_malformed_key(self):
413 """Wrapping with a badly formatted key (syntax error)"""
414 self.bad_cert_test("badkey.pem")
415
def test_match_hostname(self):
    """Exercise ssl.match_hostname() with hand-built certificate dicts.

    ok() asserts that a hostname is accepted (no exception is raised);
    fail() asserts that ssl.CertificateError is raised.
    """
    def ok(cert, hostname):
        ssl.match_hostname(cert, hostname)
    def fail(cert, hostname):
        self.assertRaises(ssl.CertificateError,
                          ssl.match_hostname, cert, hostname)

    # -- Hostname matching --

    cert = {'subject': ((('commonName', 'example.com'),),)}
    ok(cert, 'example.com')
    ok(cert, 'ExAmple.cOm')
    fail(cert, 'www.example.com')
    fail(cert, '.example.com')
    fail(cert, 'example.org')
    fail(cert, 'exampleXcom')

    cert = {'subject': ((('commonName', '*.a.com'),),)}
    ok(cert, 'foo.a.com')
    fail(cert, 'bar.foo.a.com')
    fail(cert, 'a.com')
    fail(cert, 'Xa.com')
    fail(cert, '.a.com')

    # only match one left-most wildcard
    cert = {'subject': ((('commonName', 'f*.com'),),)}
    ok(cert, 'foo.com')
    ok(cert, 'f.com')
    fail(cert, 'bar.com')
    fail(cert, 'foo.a.com')
    fail(cert, 'bar.foo.com')

    # NULL bytes are bad, CVE-2013-4073
    cert = {'subject': ((('commonName',
                          'null.python.org\x00example.org'),),)}
    ok(cert, 'null.python.org\x00example.org') # or raise an error?
    fail(cert, 'example.org')
    fail(cert, 'null.python.org')

    # error cases with wildcards
    cert = {'subject': ((('commonName', '*.*.a.com'),),)}
    fail(cert, 'bar.foo.a.com')
    fail(cert, 'a.com')
    fail(cert, 'Xa.com')
    fail(cert, '.a.com')

    cert = {'subject': ((('commonName', 'a.*.com'),),)}
    fail(cert, 'a.foo.com')
    fail(cert, 'a..com')
    fail(cert, 'a.com')

    # wildcard doesn't match IDNA prefix 'xn--'
    idna = 'püthon.python.org'.encode("idna").decode("ascii")
    cert = {'subject': ((('commonName', idna),),)}
    ok(cert, idna)
    cert = {'subject': ((('commonName', 'x*.python.org'),),)}
    fail(cert, idna)
    cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
    fail(cert, idna)

    # wildcard in first fragment and IDNA A-labels in subsequent fragments
    # are supported.
    idna = 'www*.pythön.org'.encode("idna").decode("ascii")
    cert = {'subject': ((('commonName', idna),),)}
    ok(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
    ok(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
    fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
    fail(cert, 'pythön.org'.encode("idna").decode("ascii"))

    # Slightly fake real-world example
    cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
            'subject': ((('commonName', 'linuxfrz.org'),),),
            'subjectAltName': (('DNS', 'linuxfr.org'),
                               ('DNS', 'linuxfr.com'),
                               ('othername', '<unsupported>'))}
    ok(cert, 'linuxfr.org')
    ok(cert, 'linuxfr.com')
    # Not a "DNS" entry
    fail(cert, '<unsupported>')
    # When there is a subjectAltName, commonName isn't used
    fail(cert, 'linuxfrz.org')

    # A pristine real-world example
    cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
            'subject': ((('countryName', 'US'),),
                        (('stateOrProvinceName', 'California'),),
                        (('localityName', 'Mountain View'),),
                        (('organizationName', 'Google Inc'),),
                        (('commonName', 'mail.google.com'),))}
    ok(cert, 'mail.google.com')
    fail(cert, 'gmail.com')
    # Only commonName is considered
    fail(cert, 'California')

    # -- IPv4 matching --
    cert = {'subject': ((('commonName', 'example.com'),),),
            'subjectAltName': (('DNS', 'example.com'),
                               ('IP Address', '10.11.12.13'),
                               ('IP Address', '14.15.16.17'))}
    ok(cert, '10.11.12.13')
    ok(cert, '14.15.16.17')
    fail(cert, '14.15.16.18')
    fail(cert, 'example.net')

    # -- IPv6 matching --
    # The expected addresses carry a trailing newline in these entries.
    cert = {'subject': ((('commonName', 'example.com'),),),
            'subjectAltName': (('DNS', 'example.com'),
                               ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
                               ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
    ok(cert, '2001::cafe')
    ok(cert, '2003::baba')
    fail(cert, '2003::bebe')
    fail(cert, 'example.net')

    # -- Miscellaneous --

    # Neither commonName nor subjectAltName
    cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
            'subject': ((('countryName', 'US'),),
                        (('stateOrProvinceName', 'California'),),
                        (('localityName', 'Mountain View'),),
                        (('organizationName', 'Google Inc'),))}
    fail(cert, 'mail.google.com')

    # No DNS entry in subjectAltName but a commonName
    cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
            'subject': ((('countryName', 'US'),),
                        (('stateOrProvinceName', 'California'),),
                        (('localityName', 'Mountain View'),),
                        (('commonName', 'mail.google.com'),)),
            'subjectAltName': (('othername', 'blabla'), )}
    ok(cert, 'mail.google.com')

    # No DNS entry subjectAltName and no commonName
    cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
            'subject': ((('countryName', 'US'),),
                        (('stateOrProvinceName', 'California'),),
                        (('localityName', 'Mountain View'),),
                        (('organizationName', 'Google Inc'),)),
            'subjectAltName': (('othername', 'blabla'),)}
    fail(cert, 'google.com')

    # Empty cert / no cert
    self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
    self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')

    # Issue #17980: avoid denials of service by refusing more than one
    # wildcard per fragment.
    cert = {'subject': ((('commonName', 'a*b.com'),),)}
    ok(cert, 'axxb.com')
    cert = {'subject': ((('commonName', 'a*b.co*'),),)}
    fail(cert, 'axxb.com')
    cert = {'subject': ((('commonName', 'a*b*.com'),),)}
    with self.assertRaises(ssl.CertificateError) as cm:
        ssl.match_hostname(cert, 'axxbxxc.com')
    self.assertIn("too many wildcards", str(cm.exception))
572
Antoine Pitroud5323212010-10-22 18:19:07 +0000573 def test_server_side(self):
574 # server_hostname doesn't work for server sockets
575 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000576 with socket.socket() as sock:
577 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
578 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000579
def test_unknown_channel_binding(self):
    """Check that an unknown channel binding type raises ValueError.

    A client socket is connected to a local listening socket and wrapped
    without performing a handshake before the binding is requested.
    """
    # should raise ValueError for unknown type
    s = socket.socket(socket.AF_INET)
    # Register the close up front so the listener is released even when
    # a later step raises or an assertion fails.
    self.addCleanup(s.close)
    s.bind(('127.0.0.1', 0))
    s.listen()
    c = socket.socket(socket.AF_INET)
    # Covers the case where connect() fails before the socket is wrapped.
    self.addCleanup(c.close)
    c.connect(s.getsockname())
    with ssl.wrap_socket(c, do_handshake_on_connect=False) as ss:
        with self.assertRaises(ValueError):
            ss.get_channel_binding("unknown-type")
Antoine Pitroud6494802011-07-21 01:11:30 +0200591
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    """Check that 'tls-unique' is None on unconnected wrapped sockets."""
    # unconnected should return None for known type;
    # the same for server-side
    wrap_variants = (
        {},
        {'server_side': True, 'certfile': CERTFILE},
    )
    for wrap_kwargs in wrap_variants:
        plain_sock = socket.socket(socket.AF_INET)
        with ssl.wrap_socket(plain_sock, **wrap_kwargs) as ss:
            self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200603
def test_dealloc_warn(self):
    """Check that dropping an unclosed wrapped socket emits a ResourceWarning.

    The first argument of the warning is expected to contain the repr of
    the wrapped socket.
    """
    ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
    # Capture the repr now; the name is rebound to None below.
    r = repr(ss)
    with self.assertWarns(ResourceWarning) as cm:
        # Release this reference and force a collection so the warning is
        # raised inside the assertWarns block.
        ss = None
        support.gc_collect()
    self.assertIn(r, str(cm.warning.args[0]))
611
def test_get_default_verify_paths(self):
    """Check the result of get_default_verify_paths() and its env overrides."""
    default_paths = ssl.get_default_verify_paths()
    self.assertEqual(len(default_paths), 6)
    self.assertIsInstance(default_paths, ssl.DefaultVerifyPaths)

    # With both variables set, the reported cafile and capath are expected
    # to be the values put into the environment.
    with support.EnvironmentVarGuard() as env:
        env["SSL_CERT_DIR"] = CAPATH
        env["SSL_CERT_FILE"] = CERTFILE
        overridden_paths = ssl.get_default_verify_paths()
        observed_cafile = overridden_paths.cafile
        self.assertEqual(observed_cafile, CERTFILE)
        observed_capath = overridden_paths.capath
        self.assertEqual(observed_capath, CAPATH)
623
Christian Heimes44109d72013-11-22 01:51:30 +0100624 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
625 def test_enum_certificates(self):
626 self.assertTrue(ssl.enum_certificates("CA"))
627 self.assertTrue(ssl.enum_certificates("ROOT"))
628
629 self.assertRaises(TypeError, ssl.enum_certificates)
630 self.assertRaises(WindowsError, ssl.enum_certificates, "")
631
Christian Heimesc2d65e12013-11-22 16:13:55 +0100632 trust_oids = set()
633 for storename in ("CA", "ROOT"):
634 store = ssl.enum_certificates(storename)
635 self.assertIsInstance(store, list)
636 for element in store:
637 self.assertIsInstance(element, tuple)
638 self.assertEqual(len(element), 3)
639 cert, enc, trust = element
640 self.assertIsInstance(cert, bytes)
641 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
642 self.assertIsInstance(trust, (set, bool))
643 if isinstance(trust, set):
644 trust_oids.update(trust)
Christian Heimes44109d72013-11-22 01:51:30 +0100645
646 serverAuth = "1.3.6.1.5.5.7.3.1"
Christian Heimesc2d65e12013-11-22 16:13:55 +0100647 self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200648
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
def test_enum_crls(self):
    # The "CA" store must yield a non-empty result; the store name is
    # mandatory and an empty name is rejected.
    self.assertTrue(ssl.enum_crls("CA"))
    with self.assertRaises(TypeError):
        ssl.enum_crls()
    with self.assertRaises(WindowsError):
        ssl.enum_crls("")

    # Each entry is a (crl bytes, encoding name) pair.
    known_encodings = {"x509_asn", "pkcs_7_asn"}
    entries = ssl.enum_crls("CA")
    self.assertIsInstance(entries, list)
    for entry in entries:
        self.assertIsInstance(entry, tuple)
        self.assertEqual(len(entry), 2)
        crl_bytes, encoding = entry
        self.assertIsInstance(crl_bytes, bytes)
        self.assertIn(encoding, known_encodings)
Christian Heimes46bebee2013-06-09 19:03:31 +0200662
Christian Heimes46bebee2013-06-09 19:03:31 +0200663
def test_asn1object(self):
    # An ssl._ASN1Object compares equal to a
    # (nid, shortname, longname, oid) tuple.  Check the three ways of
    # obtaining the serverAuth object and their error paths.
    asn1 = ssl._ASN1Object
    server_auth_oid = '1.3.6.1.5.5.7.3.1'
    expected = (129, 'serverAuth', 'TLS Web Server Authentication',
                server_auth_oid)

    # Constructor: accepts the dotted OID, rejects the short name.
    by_oid = asn1(server_auth_oid)
    self.assertEqual(by_oid, expected)
    self.assertEqual(by_oid.nid, 129)
    self.assertEqual(by_oid.shortname, 'serverAuth')
    self.assertEqual(by_oid.longname, 'TLS Web Server Authentication')
    self.assertEqual(by_oid.oid, server_auth_oid)
    self.assertIsInstance(by_oid, asn1)
    with self.assertRaises(ValueError):
        asn1('serverAuth')

    # fromnid(): lookup by numeric identifier.
    by_nid = asn1.fromnid(129)
    self.assertEqual(by_nid, expected)
    self.assertIsInstance(by_nid, asn1)
    with self.assertRaises(ValueError):
        asn1.fromnid(-1)
    with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
        asn1.fromnid(100000)
    # Any NID below 1000 that resolves must carry well-typed fields;
    # NIDs that do not resolve raise ValueError and are ignored.
    for nid in range(1000):
        try:
            candidate = asn1.fromnid(nid)
        except ValueError:
            continue
        self.assertIsInstance(candidate.nid, int)
        self.assertIsInstance(candidate.shortname, str)
        self.assertIsInstance(candidate.longname, str)
        self.assertIsInstance(candidate.oid, (str, type(None)))

    # fromname(): accepts long name, short name or dotted OID, but the
    # lower-cased short name is not recognised.
    by_name = asn1.fromname('TLS Web Server Authentication')
    self.assertEqual(by_name, expected)
    self.assertIsInstance(by_name, asn1)
    self.assertEqual(asn1.fromname('serverAuth'), expected)
    self.assertEqual(asn1.fromname(server_auth_oid), expected)
    with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
        asn1.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100702
def test_purpose_enum(self):
    # Each ssl.Purpose member is an _ASN1Object equal to the object built
    # from its dotted OID, and exposes the matching nid / shortname / oid.
    table = (
        (ssl.Purpose.SERVER_AUTH, '1.3.6.1.5.5.7.3.1', 129, 'serverAuth'),
        (ssl.Purpose.CLIENT_AUTH, '1.3.6.1.5.5.7.3.2', 130, 'clientAuth'),
    )
    for purpose, dotted_oid, nid, shortname in table:
        reference = ssl._ASN1Object(dotted_oid)
        self.assertIsInstance(purpose, ssl._ASN1Object)
        self.assertEqual(purpose, reference)
        self.assertEqual(purpose.nid, nid)
        self.assertEqual(purpose.shortname, shortname)
        self.assertEqual(purpose.oid, dotted_oid)
719
def test_unsupported_dtls(self):
    # A datagram socket cannot be wrapped, neither through the
    # module-level helper nor through an SSLContext; both paths raise
    # NotImplementedError with the same message.
    expected_message = "only stream sockets are supported"
    udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    self.addCleanup(udp_sock.close)

    with self.assertRaises(NotImplementedError) as caught:
        ssl.wrap_socket(udp_sock, cert_reqs=ssl.CERT_NONE)
    self.assertEqual(str(caught.exception), expected_message)

    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    with self.assertRaises(NotImplementedError) as caught:
        context.wrap_socket(udp_sock)
    self.assertEqual(str(caught.exception), expected_message)
730
def cert_time_ok(self, timestring, timestamp):
    # Helper: assert that ssl.cert_time_to_seconds() converts
    # `timestring` to exactly `timestamp`.
    seconds = ssl.cert_time_to_seconds(timestring)
    self.assertEqual(seconds, timestamp)
733
def cert_time_fail(self, timestring):
    # Helper: assert that ssl.cert_time_to_seconds() rejects
    # `timestring` with ValueError.
    self.assertRaises(ValueError, ssl.cert_time_to_seconds, timestring)
737
@unittest.skipUnless(utc_offset(),
                     'local time needs to be different from UTC')
def test_cert_time_to_seconds_timezone(self):
    # Issue #19940: ssl.cert_time_to_seconds() returns wrong
    # results if local timezone is not UTC
    known_conversions = (
        ("May 9 00:00:00 2007 GMT", 1178668800.0),
        ("Jan 5 09:34:43 2018 GMT", 1515144883.0),
    )
    for cert_time, expected_seconds in known_conversions:
        self.cert_time_ok(cert_time, expected_seconds)
745
def test_cert_time_to_seconds(self):
    # Walk ssl.cert_time_to_seconds() through accepted spellings,
    # malformed inputs, the 59..62 range of the seconds field and the
    # largest four-digit year.
    reference = "Jan 5 09:34:43 2018 GMT"
    reference_ts = 1515144883.0
    self.cert_time_ok(reference, reference_ts)
    # accept keyword parameter, assert its name
    self.assertEqual(ssl.cert_time_to_seconds(cert_time=reference),
                     reference_ts)
    # accept both %e and %d (space or zero generated by strftime)
    self.cert_time_ok("Jan 05 09:34:43 2018 GMT", reference_ts)
    # case-insensitive
    self.cert_time_ok("JaN 5 09:34:43 2018 GmT", reference_ts)

    malformed_inputs = (
        "Jan 5 09:34 2018 GMT",      # no seconds
        "Jan 5 09:34:43 2018",       # no GMT
        "Jan 5 09:34:43 2018 UTC",   # not GMT timezone
        "Jan 35 09:34:43 2018 GMT",  # invalid day
        "Jon 5 09:34:43 2018 GMT",   # invalid month
        "Jan 5 24:00:00 2018 GMT",   # invalid hour
        "Jan 5 09:60:43 2018 GMT",   # invalid minute
    )
    for malformed in malformed_inputs:
        self.cert_time_fail(malformed)

    # A leap second at the end of 2008 maps to the same timestamp as
    # the first second of 2009.
    newyear_ts = 1230768000.0
    for cert_time in ("Dec 31 23:59:60 2008 GMT",
                      "Jan 1 00:00:00 2009 GMT"):
        self.cert_time_ok(cert_time, newyear_ts)

    seconds_field_cases = (
        ("Jan 5 09:34:59 2018 GMT", 1515144899),
        # allow 60th second (even if it is not a leap second)
        ("Jan 5 09:34:60 2018 GMT", 1515144900),
        # allow 2nd leap second for compatibility with time.strptime()
        ("Jan 5 09:34:61 2018 GMT", 1515144901),
    )
    for cert_time, expected_seconds in seconds_field_cases:
        self.cert_time_ok(cert_time, expected_seconds)
    self.cert_time_fail("Jan 5 09:34:62 2018 GMT")  # invalid seconds

    # no special treatment for the special value:
    #   99991231235959Z (rfc 5280)
    self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
780
@support.run_with_locale('LC_ALL', '')
def test_cert_time_to_seconds_locale(self):
    # `cert_time_to_seconds()` should be locale independent

    # Abbreviated name of month 2 as rendered under the active locale.
    february_struct = (1, 2, 3, 4, 5, 6, 0, 0, 0)
    local_february = time.strftime('%b', february_struct)

    if local_february.lower() == 'feb':
        self.skipTest("locale-specific month name needs to be "
                      "different from C locale")

    # The C-locale month name is accepted regardless of the locale...
    self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
    # ...while the locale-specific one is rejected.
    self.cert_time_fail(local_february + " 9 00:00:00 2007 GMT")
795
def test_connect_ex_error(self):
    # connect_ex() towards a bound-but-not-listening port must return
    # one of the expected errno values.
    placeholder = socket.socket(socket.AF_INET)
    self.addCleanup(placeholder.close)
    # Reserve port but don't listen
    port = support.bind_port(placeholder)

    client = ssl.wrap_socket(socket.socket(socket.AF_INET),
                             cert_reqs=ssl.CERT_REQUIRED)
    self.addCleanup(client.close)

    result = client.connect_ex((HOST, port))
    # Issue #19919: Windows machines or VMs hosted on Windows
    # machines sometimes return EWOULDBLOCK.
    acceptable = (
        errno.ECONNREFUSED,
        errno.EHOSTUNREACH,
        errno.ETIMEDOUT,
        errno.EWOULDBLOCK,
    )
    self.assertIn(result, acceptable)
811
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100812
Antoine Pitrou152efa22010-05-16 18:19:27 +0000813class ContextTests(unittest.TestCase):
814
Antoine Pitrou23df4832010-08-04 17:14:06 +0000815 @skip_if_broken_ubuntu_ssl
Antoine Pitrou152efa22010-05-16 18:19:27 +0000816 def test_constructor(self):
Antoine Pitrou2463e5f2013-03-28 22:24:43 +0100817 for protocol in PROTOCOLS:
818 ssl.SSLContext(protocol)
Christian Heimes598894f2016-09-05 23:19:05 +0200819 ctx = ssl.SSLContext()
820 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000821 self.assertRaises(ValueError, ssl.SSLContext, -1)
822 self.assertRaises(ValueError, ssl.SSLContext, 42)
823
Antoine Pitrou23df4832010-08-04 17:14:06 +0000824 @skip_if_broken_ubuntu_ssl
Antoine Pitrou152efa22010-05-16 18:19:27 +0000825 def test_protocol(self):
826 for proto in PROTOCOLS:
827 ctx = ssl.SSLContext(proto)
828 self.assertEqual(ctx.protocol, proto)
829
830 def test_ciphers(self):
831 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
832 ctx.set_ciphers("ALL")
833 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +0000834 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +0000835 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000836
Christian Heimes25bfcd52016-09-06 00:04:45 +0200837 @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
838 def test_get_ciphers(self):
839 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
Christian Heimesea9b2dc2016-09-06 10:45:44 +0200840 ctx.set_ciphers('AESGCM')
Christian Heimes25bfcd52016-09-06 00:04:45 +0200841 names = set(d['name'] for d in ctx.get_ciphers())
Christian Heimes582282b2016-09-06 11:27:25 +0200842 self.assertIn('AES256-GCM-SHA384', names)
843 self.assertIn('AES128-GCM-SHA256', names)
Christian Heimes25bfcd52016-09-06 00:04:45 +0200844
Antoine Pitrou23df4832010-08-04 17:14:06 +0000845 @skip_if_broken_ubuntu_ssl
Antoine Pitroub5218772010-05-21 09:56:06 +0000846 def test_options(self):
847 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -0800848 # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
Christian Heimes598894f2016-09-05 23:19:05 +0200849 default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
850 if not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0):
851 default |= ssl.OP_NO_COMPRESSION
852 self.assertEqual(default, ctx.options)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -0800853 ctx.options |= ssl.OP_NO_TLSv1
Christian Heimes598894f2016-09-05 23:19:05 +0200854 self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +0000855 if can_clear_options():
Christian Heimes598894f2016-09-05 23:19:05 +0200856 ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
857 self.assertEqual(default, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +0000858 ctx.options = 0
Matthias Klosef7c56242016-06-12 23:40:00 -0700859 # Ubuntu has OP_NO_SSLv3 forced on by default
860 self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
Antoine Pitroub5218772010-05-21 09:56:06 +0000861 else:
862 with self.assertRaises(ValueError):
863 ctx.options = 0
864
Christian Heimes22587792013-11-21 23:56:13 +0100865 def test_verify_mode(self):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000866 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
867 # Default value
868 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
869 ctx.verify_mode = ssl.CERT_OPTIONAL
870 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
871 ctx.verify_mode = ssl.CERT_REQUIRED
872 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
873 ctx.verify_mode = ssl.CERT_NONE
874 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
875 with self.assertRaises(TypeError):
876 ctx.verify_mode = None
877 with self.assertRaises(ValueError):
878 ctx.verify_mode = 42
879
Christian Heimes2427b502013-11-23 11:24:32 +0100880 @unittest.skipUnless(have_verify_flags(),
881 "verify_flags need OpenSSL > 0.9.8")
Christian Heimes22587792013-11-21 23:56:13 +0100882 def test_verify_flags(self):
883 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
Benjamin Peterson990fcaa2015-03-04 22:49:41 -0500884 # default value
885 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
886 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | tf)
Christian Heimes22587792013-11-21 23:56:13 +0100887 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
888 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
889 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
890 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
891 ctx.verify_flags = ssl.VERIFY_DEFAULT
892 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
893 # supports any value
894 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
895 self.assertEqual(ctx.verify_flags,
896 ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
897 with self.assertRaises(TypeError):
898 ctx.verify_flags = None
899
Antoine Pitrou152efa22010-05-16 18:19:27 +0000900 def test_load_cert_chain(self):
901 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
902 # Combined key and cert in a single file
Benjamin Peterson1ea070e2014-11-03 21:05:01 -0500903 ctx.load_cert_chain(CERTFILE, keyfile=None)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000904 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
905 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200906 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +0000907 ctx.load_cert_chain(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000908 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000909 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000910 ctx.load_cert_chain(BADCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000911 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000912 ctx.load_cert_chain(EMPTYCERT)
913 # Separate key and cert
Antoine Pitroud0919502010-05-17 10:30:00 +0000914 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000915 ctx.load_cert_chain(ONLYCERT, ONLYKEY)
916 ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
917 ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000918 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000919 ctx.load_cert_chain(ONLYCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000920 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000921 ctx.load_cert_chain(ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000922 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000923 ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
924 # Mismatching key and cert
Antoine Pitroud0919502010-05-17 10:30:00 +0000925 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000926 with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
Martin Panter3d81d932016-01-14 09:36:00 +0000927 ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +0200928 # Password protected key and cert
929 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
930 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
931 ctx.load_cert_chain(CERTFILE_PROTECTED,
932 password=bytearray(KEY_PASSWORD.encode()))
933 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
934 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
935 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
936 bytearray(KEY_PASSWORD.encode()))
937 with self.assertRaisesRegex(TypeError, "should be a string"):
938 ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
939 with self.assertRaises(ssl.SSLError):
940 ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
941 with self.assertRaisesRegex(ValueError, "cannot be longer"):
942 # openssl has a fixed limit on the password buffer.
943 # PEM_BUFSIZE is generally set to 1kb.
944 # Return a string larger than this.
945 ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
946 # Password callback
947 def getpass_unicode():
948 return KEY_PASSWORD
949 def getpass_bytes():
950 return KEY_PASSWORD.encode()
951 def getpass_bytearray():
952 return bytearray(KEY_PASSWORD.encode())
953 def getpass_badpass():
954 return "badpass"
955 def getpass_huge():
956 return b'a' * (1024 * 1024)
957 def getpass_bad_type():
958 return 9
959 def getpass_exception():
960 raise Exception('getpass error')
961 class GetPassCallable:
962 def __call__(self):
963 return KEY_PASSWORD
964 def getpass(self):
965 return KEY_PASSWORD
966 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
967 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
968 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
969 ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
970 ctx.load_cert_chain(CERTFILE_PROTECTED,
971 password=GetPassCallable().getpass)
972 with self.assertRaises(ssl.SSLError):
973 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
974 with self.assertRaisesRegex(ValueError, "cannot be longer"):
975 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
976 with self.assertRaisesRegex(TypeError, "must return a string"):
977 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
978 with self.assertRaisesRegex(Exception, "getpass error"):
979 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
980 # Make sure the password function isn't called if it isn't needed
981 ctx.load_cert_chain(CERTFILE, password=getpass_exception)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000982
983 def test_load_verify_locations(self):
984 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
985 ctx.load_verify_locations(CERTFILE)
986 ctx.load_verify_locations(cafile=CERTFILE, capath=None)
987 ctx.load_verify_locations(BYTES_CERTFILE)
988 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
989 self.assertRaises(TypeError, ctx.load_verify_locations)
Christian Heimesefff7062013-11-21 03:35:02 +0100990 self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200991 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +0000992 ctx.load_verify_locations(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000993 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000994 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000995 ctx.load_verify_locations(BADCERT)
996 ctx.load_verify_locations(CERTFILE, CAPATH)
997 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
998
Victor Stinner80f75e62011-01-29 11:31:20 +0000999 # Issue #10989: crash if the second argument type is invalid
1000 self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
1001
Christian Heimesefff7062013-11-21 03:35:02 +01001002 def test_load_verify_cadata(self):
1003 # test cadata
1004 with open(CAFILE_CACERT) as f:
1005 cacert_pem = f.read()
1006 cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
1007 with open(CAFILE_NEURONIO) as f:
1008 neuronio_pem = f.read()
1009 neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
1010
1011 # test PEM
1012 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1013 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
1014 ctx.load_verify_locations(cadata=cacert_pem)
1015 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
1016 ctx.load_verify_locations(cadata=neuronio_pem)
1017 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1018 # cert already in hash table
1019 ctx.load_verify_locations(cadata=neuronio_pem)
1020 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1021
1022 # combined
1023 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1024 combined = "\n".join((cacert_pem, neuronio_pem))
1025 ctx.load_verify_locations(cadata=combined)
1026 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1027
1028 # with junk around the certs
1029 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1030 combined = ["head", cacert_pem, "other", neuronio_pem, "again",
1031 neuronio_pem, "tail"]
1032 ctx.load_verify_locations(cadata="\n".join(combined))
1033 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1034
1035 # test DER
1036 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1037 ctx.load_verify_locations(cadata=cacert_der)
1038 ctx.load_verify_locations(cadata=neuronio_der)
1039 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1040 # cert already in hash table
1041 ctx.load_verify_locations(cadata=cacert_der)
1042 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1043
1044 # combined
1045 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1046 combined = b"".join((cacert_der, neuronio_der))
1047 ctx.load_verify_locations(cadata=combined)
1048 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1049
1050 # error cases
1051 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1052 self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
1053
1054 with self.assertRaisesRegex(ssl.SSLError, "no start line"):
1055 ctx.load_verify_locations(cadata="broken")
1056 with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
1057 ctx.load_verify_locations(cadata=b"broken")
1058
1059
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001060 def test_load_dh_params(self):
1061 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1062 ctx.load_dh_params(DHFILE)
1063 if os.name != 'nt':
1064 ctx.load_dh_params(BYTES_DHFILE)
1065 self.assertRaises(TypeError, ctx.load_dh_params)
1066 self.assertRaises(TypeError, ctx.load_dh_params, None)
1067 with self.assertRaises(FileNotFoundError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001068 ctx.load_dh_params(NONEXISTINGCERT)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001069 self.assertEqual(cm.exception.errno, errno.ENOENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001070 with self.assertRaises(ssl.SSLError) as cm:
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001071 ctx.load_dh_params(CERTFILE)
1072
Antoine Pitroueb585ad2010-10-22 18:24:20 +00001073 @skip_if_broken_ubuntu_ssl
Antoine Pitroub0182c82010-10-12 20:09:02 +00001074 def test_session_stats(self):
1075 for proto in PROTOCOLS:
1076 ctx = ssl.SSLContext(proto)
1077 self.assertEqual(ctx.session_stats(), {
1078 'number': 0,
1079 'connect': 0,
1080 'connect_good': 0,
1081 'connect_renegotiate': 0,
1082 'accept': 0,
1083 'accept_good': 0,
1084 'accept_renegotiate': 0,
1085 'hits': 0,
1086 'misses': 0,
1087 'timeouts': 0,
1088 'cache_full': 0,
1089 })
1090
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001091 def test_set_default_verify_paths(self):
1092 # There's not much we can do to test that it acts as expected,
1093 # so just check it doesn't crash or raise an exception.
1094 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1095 ctx.set_default_verify_paths()
1096
Antoine Pitrou501da612011-12-21 09:27:41 +01001097 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001098 def test_set_ecdh_curve(self):
1099 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1100 ctx.set_ecdh_curve("prime256v1")
1101 ctx.set_ecdh_curve(b"prime256v1")
1102 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1103 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1104 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1105 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1106
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001107 @needs_sni
1108 def test_sni_callback(self):
1109 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1110
1111 # set_servername_callback expects a callable, or None
1112 self.assertRaises(TypeError, ctx.set_servername_callback)
1113 self.assertRaises(TypeError, ctx.set_servername_callback, 4)
1114 self.assertRaises(TypeError, ctx.set_servername_callback, "")
1115 self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
1116
1117 def dummycallback(sock, servername, ctx):
1118 pass
1119 ctx.set_servername_callback(None)
1120 ctx.set_servername_callback(dummycallback)
1121
1122 @needs_sni
1123 def test_sni_callback_refcycle(self):
1124 # Reference cycles through the servername callback are detected
1125 # and cleared.
1126 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1127 def dummycallback(sock, servername, ctx, cycle=ctx):
1128 pass
1129 ctx.set_servername_callback(dummycallback)
1130 wr = weakref.ref(ctx)
1131 del ctx, dummycallback
1132 gc.collect()
1133 self.assertIs(wr(), None)
1134
Christian Heimes9a5395a2013-06-17 15:44:12 +02001135 def test_cert_store_stats(self):
1136 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1137 self.assertEqual(ctx.cert_store_stats(),
1138 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1139 ctx.load_cert_chain(CERTFILE)
1140 self.assertEqual(ctx.cert_store_stats(),
1141 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1142 ctx.load_verify_locations(CERTFILE)
1143 self.assertEqual(ctx.cert_store_stats(),
1144 {'x509_ca': 0, 'crl': 0, 'x509': 1})
Martin Panterb55f8b72016-01-14 12:53:56 +00001145 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001146 self.assertEqual(ctx.cert_store_stats(),
1147 {'x509_ca': 1, 'crl': 0, 'x509': 2})
1148
1149 def test_get_ca_certs(self):
1150 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1151 self.assertEqual(ctx.get_ca_certs(), [])
1152 # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
1153 ctx.load_verify_locations(CERTFILE)
1154 self.assertEqual(ctx.get_ca_certs(), [])
Martin Panterb55f8b72016-01-14 12:53:56 +00001155 # but CAFILE_CACERT is a CA cert
1156 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001157 self.assertEqual(ctx.get_ca_certs(),
1158 [{'issuer': ((('organizationName', 'Root CA'),),
1159 (('organizationalUnitName', 'http://www.cacert.org'),),
1160 (('commonName', 'CA Cert Signing Authority'),),
1161 (('emailAddress', 'support@cacert.org'),)),
1162 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
1163 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
1164 'serialNumber': '00',
Christian Heimesbd3a7f92013-11-21 03:40:15 +01001165 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
Christian Heimes9a5395a2013-06-17 15:44:12 +02001166 'subject': ((('organizationName', 'Root CA'),),
1167 (('organizationalUnitName', 'http://www.cacert.org'),),
1168 (('commonName', 'CA Cert Signing Authority'),),
1169 (('emailAddress', 'support@cacert.org'),)),
1170 'version': 3}])
1171
Martin Panterb55f8b72016-01-14 12:53:56 +00001172 with open(CAFILE_CACERT) as f:
Christian Heimes9a5395a2013-06-17 15:44:12 +02001173 pem = f.read()
1174 der = ssl.PEM_cert_to_DER_cert(pem)
1175 self.assertEqual(ctx.get_ca_certs(True), [der])
1176
Christian Heimes72d28502013-11-23 13:56:58 +01001177 def test_load_default_certs(self):
1178 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1179 ctx.load_default_certs()
1180
1181 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1182 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1183 ctx.load_default_certs()
1184
1185 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1186 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1187
1188 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1189 self.assertRaises(TypeError, ctx.load_default_certs, None)
1190 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1191
Benjamin Peterson91244e02014-10-03 18:17:15 -04001192 @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
Christian Heimes598894f2016-09-05 23:19:05 +02001193 @unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001194 def test_load_default_certs_env(self):
1195 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1196 with support.EnvironmentVarGuard() as env:
1197 env["SSL_CERT_DIR"] = CAPATH
1198 env["SSL_CERT_FILE"] = CERTFILE
1199 ctx.load_default_certs()
1200 self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0})
1201
Benjamin Peterson91244e02014-10-03 18:17:15 -04001202 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
1203 def test_load_default_certs_env_windows(self):
1204 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1205 ctx.load_default_certs()
1206 stats = ctx.cert_store_stats()
1207
1208 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1209 with support.EnvironmentVarGuard() as env:
1210 env["SSL_CERT_DIR"] = CAPATH
1211 env["SSL_CERT_FILE"] = CERTFILE
1212 ctx.load_default_certs()
1213 stats["x509"] += 1
1214 self.assertEqual(ctx.cert_store_stats(), stats)
1215
Christian Heimes4c05b472013-11-23 15:58:30 +01001216 def test_create_default_context(self):
1217 ctx = ssl.create_default_context()
Donald Stufft6a2ba942014-03-23 19:05:28 -04001218 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
Christian Heimes4c05b472013-11-23 15:58:30 +01001219 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001220 self.assertTrue(ctx.check_hostname)
Christian Heimes4c05b472013-11-23 15:58:30 +01001221 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
Donald Stufft6a2ba942014-03-23 19:05:28 -04001222 self.assertEqual(
1223 ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
1224 getattr(ssl, "OP_NO_COMPRESSION", 0),
1225 )
Christian Heimes4c05b472013-11-23 15:58:30 +01001226
1227 with open(SIGNING_CA) as f:
1228 cadata = f.read()
1229 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1230 cadata=cadata)
Donald Stufft6a2ba942014-03-23 19:05:28 -04001231 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
Christian Heimes4c05b472013-11-23 15:58:30 +01001232 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1233 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
Donald Stufft6a2ba942014-03-23 19:05:28 -04001234 self.assertEqual(
1235 ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
1236 getattr(ssl, "OP_NO_COMPRESSION", 0),
1237 )
Christian Heimes4c05b472013-11-23 15:58:30 +01001238
1239 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
Donald Stufft6a2ba942014-03-23 19:05:28 -04001240 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
Christian Heimes4c05b472013-11-23 15:58:30 +01001241 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1242 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
Donald Stufft6a2ba942014-03-23 19:05:28 -04001243 self.assertEqual(
1244 ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
1245 getattr(ssl, "OP_NO_COMPRESSION", 0),
1246 )
1247 self.assertEqual(
1248 ctx.options & getattr(ssl, "OP_SINGLE_DH_USE", 0),
1249 getattr(ssl, "OP_SINGLE_DH_USE", 0),
1250 )
1251 self.assertEqual(
1252 ctx.options & getattr(ssl, "OP_SINGLE_ECDH_USE", 0),
1253 getattr(ssl, "OP_SINGLE_ECDH_USE", 0),
1254 )
Christian Heimes4c05b472013-11-23 15:58:30 +01001255
def test__create_stdlib_context(self):
    # Exercise ssl._create_stdlib_context() with its defaults and with
    # explicit protocol / cert_reqs / check_hostname / purpose arguments.
    factory = ssl._create_stdlib_context

    # Defaults: SSLv23 negotiation, no verification, no hostname check.
    default_ctx = factory()
    self.assertEqual(default_ctx.protocol, ssl.PROTOCOL_SSLv23)
    self.assertEqual(default_ctx.verify_mode, ssl.CERT_NONE)
    self.assertFalse(default_ctx.check_hostname)
    self.assertEqual(default_ctx.options & ssl.OP_NO_SSLv2,
                     ssl.OP_NO_SSLv2)

    # Explicit protocol only.
    tls_ctx = factory(ssl.PROTOCOL_TLSv1)
    self.assertEqual(tls_ctx.protocol, ssl.PROTOCOL_TLSv1)
    self.assertEqual(tls_ctx.verify_mode, ssl.CERT_NONE)
    self.assertEqual(tls_ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)

    # Explicit protocol with verification and hostname checking enabled.
    strict_ctx = factory(ssl.PROTOCOL_TLSv1,
                         cert_reqs=ssl.CERT_REQUIRED,
                         check_hostname=True)
    self.assertEqual(strict_ctx.protocol, ssl.PROTOCOL_TLSv1)
    self.assertEqual(strict_ctx.verify_mode, ssl.CERT_REQUIRED)
    self.assertTrue(strict_ctx.check_hostname)
    self.assertEqual(strict_ctx.options & ssl.OP_NO_SSLv2,
                     ssl.OP_NO_SSLv2)

    # Server-side purpose.
    server_ctx = factory(purpose=ssl.Purpose.CLIENT_AUTH)
    self.assertEqual(server_ctx.protocol, ssl.PROTOCOL_SSLv23)
    self.assertEqual(server_ctx.verify_mode, ssl.CERT_NONE)
    self.assertEqual(server_ctx.options & ssl.OP_NO_SSLv2,
                     ssl.OP_NO_SSLv2)
Christian Heimes4c05b472013-11-23 15:58:30 +01001280
def test_check_hostname(self):
    # check_hostname and verify_mode constrain each other; walk through
    # the allowed and the rejected transitions in order.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    self.assertFalse(context.check_hostname)

    # Requires CERT_REQUIRED or CERT_OPTIONAL
    self.assertRaises(ValueError, setattr, context, "check_hostname", True)
    context.verify_mode = ssl.CERT_REQUIRED
    self.assertFalse(context.check_hostname)
    context.check_hostname = True
    self.assertTrue(context.check_hostname)

    context.verify_mode = ssl.CERT_OPTIONAL
    context.check_hostname = True
    self.assertTrue(context.check_hostname)

    # Cannot set CERT_NONE with check_hostname enabled
    self.assertRaises(ValueError, setattr, context, "verify_mode",
                      ssl.CERT_NONE)
    context.check_hostname = False
    self.assertFalse(context.check_hostname)
1302
Antoine Pitrou152efa22010-05-16 18:19:27 +00001303
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001304class SSLErrorTests(unittest.TestCase):
1305
1306 def test_str(self):
1307 # The str() of a SSLError doesn't include the errno
1308 e = ssl.SSLError(1, "foo")
1309 self.assertEqual(str(e), "foo")
1310 self.assertEqual(e.errno, 1)
1311 # Same for a subclass
1312 e = ssl.SSLZeroReturnError(1, "foo")
1313 self.assertEqual(str(e), "foo")
1314 self.assertEqual(e.errno, 1)
1315
1316 def test_lib_reason(self):
1317 # Test the library and reason attributes
1318 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1319 with self.assertRaises(ssl.SSLError) as cm:
1320 ctx.load_dh_params(CERTFILE)
1321 self.assertEqual(cm.exception.library, 'PEM')
1322 self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1323 s = str(cm.exception)
1324 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1325
1326 def test_subclass(self):
1327 # Check that the appropriate SSLError subclass is raised
1328 # (this only tests one of them)
1329 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1330 with socket.socket() as s:
1331 s.bind(("127.0.0.1", 0))
Charles-François Natali6e204602014-07-23 19:28:13 +01001332 s.listen()
Antoine Pitroue1ceb502013-01-12 21:54:44 +01001333 c = socket.socket()
1334 c.connect(s.getsockname())
1335 c.setblocking(False)
1336 with ctx.wrap_socket(c, False, do_handshake_on_connect=False) as c:
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001337 with self.assertRaises(ssl.SSLWantReadError) as cm:
1338 c.do_handshake()
1339 s = str(cm.exception)
1340 self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1341 # For compatibility
1342 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
1343
1344
class MemoryBIOTests(unittest.TestCase):
    # Tests for ssl.MemoryBIO: buffering, EOF signalling, the pending
    # counter and the argument types write() accepts.

    def test_read_write(self):
        buf = ssl.MemoryBIO()
        # A single write is returned by one read, after which it is empty.
        buf.write(b'foo')
        self.assertEqual(buf.read(), b'foo')
        self.assertEqual(buf.read(), b'')
        # Consecutive writes are concatenated.
        for chunk in (b'foo', b'bar'):
            buf.write(chunk)
        self.assertEqual(buf.read(), b'foobar')
        self.assertEqual(buf.read(), b'')
        # Sized reads consume the buffer piecewise.
        buf.write(b'baz')
        self.assertEqual(buf.read(2), b'ba')
        self.assertEqual(buf.read(1), b'z')
        self.assertEqual(buf.read(1), b'')

    def test_eof(self):
        buf = ssl.MemoryBIO()
        self.assertFalse(buf.eof)
        self.assertEqual(buf.read(), b'')
        self.assertFalse(buf.eof)
        buf.write(b'foo')
        self.assertFalse(buf.eof)
        # eof only becomes true once the buffered data has been drained.
        buf.write_eof()
        self.assertFalse(buf.eof)
        self.assertEqual(buf.read(2), b'fo')
        self.assertFalse(buf.eof)
        self.assertEqual(buf.read(1), b'o')
        self.assertTrue(buf.eof)
        self.assertEqual(buf.read(), b'')
        self.assertTrue(buf.eof)

    def test_pending(self):
        buf = ssl.MemoryBIO()
        self.assertEqual(buf.pending, 0)
        buf.write(b'foo')
        self.assertEqual(buf.pending, 3)
        # Reading one byte at a time decrements the counter.
        for remaining in (2, 1, 0):
            buf.read(1)
            self.assertEqual(buf.pending, remaining)
        # Writing one byte at a time increments it.
        for expected in (1, 2, 3):
            buf.write(b'x')
            self.assertEqual(buf.pending, expected)
        buf.read()
        self.assertEqual(buf.pending, 0)

    def test_buffer_types(self):
        buf = ssl.MemoryBIO()
        buf.write(b'foo')
        self.assertEqual(buf.read(), b'foo')
        buf.write(bytearray(b'bar'))
        self.assertEqual(buf.read(), b'bar')
        buf.write(memoryview(b'baz'))
        self.assertEqual(buf.read(), b'baz')

    def test_error_types(self):
        buf = ssl.MemoryBIO()
        for bad_value in ('foo', None, True, 1):
            with self.assertRaises(TypeError):
                buf.write(bad_value)
1406
1407
Martin Panter3840b2a2016-03-27 01:53:46 +00001408@unittest.skipUnless(_have_threads, "Needs threading module")
1409class SimpleBackgroundTests(unittest.TestCase):
1410
1411 """Tests that connect to a simple server running in the background"""
1412
def setUp(self):
    # Start a fresh echo server for every test; its __exit__ is
    # registered as a cleanup so it is run after the test.
    echo_server = ThreadedEchoServer(SIGNED_CERTFILE)
    self.server_addr = (HOST, echo_server.port)
    echo_server.__enter__()
    self.addCleanup(echo_server.__exit__, None, None, None)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001418
def test_connect(self):
    # Without verification the peer certificate is reported as empty.
    unverified = ssl.wrap_socket(socket.socket(socket.AF_INET),
                                 cert_reqs=ssl.CERT_NONE)
    with unverified as conn:
        conn.connect(self.server_addr)
        self.assertEqual({}, conn.getpeercert())

    # this should succeed because we specify the root cert
    verified = ssl.wrap_socket(socket.socket(socket.AF_INET),
                               cert_reqs=ssl.CERT_REQUIRED,
                               ca_certs=SIGNING_CA)
    with verified as conn:
        conn.connect(self.server_addr)
        self.assertTrue(conn.getpeercert())
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001431
def test_connect_fail(self):
    # This should fail because we have no verification certs. Connection
    # failure crashes ThreadedEchoServer, so run this in an independent
    # test method.
    client = ssl.wrap_socket(socket.socket(socket.AF_INET),
                             cert_reqs=ssl.CERT_REQUIRED)
    self.addCleanup(client.close)
    with self.assertRaisesRegex(ssl.SSLError, "certificate verify failed"):
        client.connect(self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001441
def test_connect_ex(self):
    # Issue #11326: check connect_ex() implementation
    client = ssl.wrap_socket(socket.socket(socket.AF_INET),
                             cert_reqs=ssl.CERT_REQUIRED,
                             ca_certs=SIGNING_CA)
    self.addCleanup(client.close)
    status = client.connect_ex(self.server_addr)
    self.assertEqual(0, status)
    self.assertTrue(client.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001450
def test_non_blocking_connect_ex(self):
    # Issue #11326: non-blocking connect_ex() should allow handshake
    # to proceed after the socket gets ready.
    client = ssl.wrap_socket(socket.socket(socket.AF_INET),
                             cert_reqs=ssl.CERT_REQUIRED,
                             ca_certs=SIGNING_CA,
                             do_handshake_on_connect=False)
    self.addCleanup(client.close)
    client.setblocking(False)
    status = client.connect_ex(self.server_addr)
    # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
    self.assertIn(status, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
    # Wait for connect to finish
    select.select([], [client], [], 5.0)
    # Non-blocking handshake: retry until it stops asking for more I/O.
    handshake_done = False
    while not handshake_done:
        try:
            client.do_handshake()
        except ssl.SSLWantReadError:
            select.select([client], [], [], 5.0)
        except ssl.SSLWantWriteError:
            select.select([], [client], [], 5.0)
        else:
            handshake_done = True
    # SSL established
    self.assertTrue(client.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001476
def test_connect_with_context(self):
    # Same as test_connect, but with a separately created context
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    with context.wrap_socket(socket.socket(socket.AF_INET)) as conn:
        conn.connect(self.server_addr)
        self.assertEqual({}, conn.getpeercert())
    # Same with a server hostname
    with context.wrap_socket(socket.socket(socket.AF_INET),
                             server_hostname="dummy") as conn:
        conn.connect(self.server_addr)
    context.verify_mode = ssl.CERT_REQUIRED
    # This should succeed because we specify the root cert
    context.load_verify_locations(SIGNING_CA)
    with context.wrap_socket(socket.socket(socket.AF_INET)) as conn:
        conn.connect(self.server_addr)
        peer_cert = conn.getpeercert()
        self.assertTrue(peer_cert)
1494
def test_connect_with_context_fail(self):
    # This should fail because we have no verification certs. Connection
    # failure crashes ThreadedEchoServer, so run this in an independent
    # test method.
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    client = context.wrap_socket(socket.socket(socket.AF_INET))
    self.addCleanup(client.close)
    with self.assertRaisesRegex(ssl.SSLError, "certificate verify failed"):
        client.connect(self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001505
def test_connect_capath(self):
    # Verify server certificates using the `capath` argument
    # NOTE: the subject hashing algorithm has been changed between
    # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
    # contain both versions of each certificate (same content, different
    # filename) for this test to be portable across OpenSSL releases.
    #
    # The check is run once with a str path and once with a bytes path.
    for capath in (CAPATH, BYTES_CAPATH):
        context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(capath=capath)
        with context.wrap_socket(socket.socket(socket.AF_INET)) as conn:
            conn.connect(self.server_addr)
            peer_cert = conn.getpeercert()
            self.assertTrue(peer_cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001527
def test_connect_cadata(self):
    # Verify the server against CA data passed in memory, first as PEM
    # text and then as the DER bytes converted from it.
    with open(SIGNING_CA) as ca_file:
        pem = ca_file.read()
    der = ssl.PEM_cert_to_DER_cert(pem)
    for cadata in (pem, der):
        context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(cadata=cadata)
        with context.wrap_socket(socket.socket(socket.AF_INET)) as conn:
            conn.connect(self.server_addr)
            peer_cert = conn.getpeercert()
            self.assertTrue(peer_cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001548
@unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
def test_makefile_close(self):
    # Issue #5238: creating a file-like object with makefile() shouldn't
    # delay closing the underlying "real socket" (here tested with its
    # file descriptor, hence skipping the test under Windows).
    tls_sock = ssl.wrap_socket(socket.socket(socket.AF_INET))
    tls_sock.connect(self.server_addr)
    descriptor = tls_sock.fileno()
    stream = tls_sock.makefile()
    stream.close()
    # The fd is still open
    os.read(descriptor, 0)
    # Closing the SSL socket should close the fd too
    tls_sock.close()
    gc.collect()
    with self.assertRaises(OSError) as caught:
        os.read(descriptor, 0)
    self.assertEqual(caught.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001567
def test_non_blocking_handshake(self):
    # Connect a plain socket, make it non-blocking, wrap it and drive
    # the handshake by hand, counting the do_handshake() calls needed.
    raw_sock = socket.socket(socket.AF_INET)
    raw_sock.connect(self.server_addr)
    raw_sock.setblocking(False)
    client = ssl.wrap_socket(raw_sock,
                             cert_reqs=ssl.CERT_NONE,
                             do_handshake_on_connect=False)
    self.addCleanup(client.close)
    attempts = 0
    while True:
        attempts += 1
        try:
            client.do_handshake()
        except ssl.SSLWantReadError:
            select.select([client], [], [])
        except ssl.SSLWantWriteError:
            select.select([], [client], [])
        else:
            break
    if support.verbose:
        sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % attempts)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001588
def test_get_server_certificate(self):
    # Fetch the local server's certificate, verifying against the CA.
    host, port = self.server_addr
    _test_get_server_certificate(self, host, port, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001591
def test_get_server_certificate_fail(self):
    # Connection failure crashes ThreadedEchoServer, so run this in an
    # independent test method
    host, port = self.server_addr
    _test_get_server_certificate_fail(self, host, port)
Bill Janssen54cc54c2007-12-14 22:08:56 +00001596
def test_ciphers(self):
    # Valid cipher strings must allow a connection.
    for cipher_spec in ("ALL", "DEFAULT"):
        with ssl.wrap_socket(socket.socket(socket.AF_INET),
                             cert_reqs=ssl.CERT_NONE,
                             ciphers=cipher_spec) as conn:
            conn.connect(self.server_addr)
    # Error checking can happen at instantiation or when connecting
    with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
        with socket.socket(socket.AF_INET) as raw_sock:
            conn = ssl.wrap_socket(raw_sock,
                                   cert_reqs=ssl.CERT_NONE,
                                   ciphers="^$:,;?*'dorothyx")
            conn.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00001610
def test_get_ca_certs_capath(self):
    # capath certs are loaded on request
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(capath=CAPATH)
    # Nothing is listed before a connection has been verified.
    self.assertEqual(context.get_ca_certs(), [])
    with context.wrap_socket(socket.socket(socket.AF_INET)) as conn:
        conn.connect(self.server_addr)
        peer_cert = conn.getpeercert()
        self.assertTrue(peer_cert)
    loaded = context.get_ca_certs()
    self.assertEqual(len(loaded), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001622
@needs_sni
def test_context_setget(self):
    # Check that the context of a connected socket can be replaced.
    original_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    replacement_ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    raw_sock = socket.socket(socket.AF_INET)
    with original_ctx.wrap_socket(raw_sock) as conn:
        conn.connect(self.server_addr)
        # Both the wrapper and the underlying SSL object report the
        # context, before and after the swap.
        self.assertIs(conn.context, original_ctx)
        self.assertIs(conn._sslobj.context, original_ctx)
        conn.context = replacement_ctx
        self.assertIs(conn.context, replacement_ctx)
        self.assertIs(conn._sslobj.context, replacement_ctx)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001636
def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
    """Drive func(*args) to completion over a pair of memory BIOs.

    A simple IO loop.  func(*args) is called repeatedly; whenever it
    raises an SSLError whose errno is SSL_ERROR_WANT_READ or
    SSL_ERROR_WANT_WRITE, data is moved between *sock* and the BIOs and
    the call is retried.  Any other SSLError is re-raised.

    The only keyword argument consulted is ``timeout`` (seconds,
    default 10); it is not forwarded to *func*.  When the deadline has
    passed at the start of an iteration the test is failed instead of
    looping forever.  A blocking sock.recv() is not interrupted by it.

    Returns whatever the final, successful func(*args) call returned.
    """
    timeout = kwargs.get('timeout', 10)
    deadline = time.monotonic() + timeout
    count = 0
    while True:
        if time.monotonic() > deadline:
            self.fail("%s() did not complete within %r seconds"
                      % (func.__name__, timeout))
        # Named ssl_errno so that it does not shadow the errno module.
        ssl_errno = None
        count += 1
        try:
            ret = func(*args)
        except ssl.SSLError as e:
            if e.errno not in (ssl.SSL_ERROR_WANT_READ,
                               ssl.SSL_ERROR_WANT_WRITE):
                raise
            ssl_errno = e.errno
        # Get any data from the outgoing BIO irrespective of any error, and
        # send it to the socket.
        buf = outgoing.read()
        sock.sendall(buf)
        # If there's no error, we're done. For WANT_READ, we need to get
        # data from the socket and put it in the incoming BIO.
        if ssl_errno is None:
            break
        elif ssl_errno == ssl.SSL_ERROR_WANT_READ:
            buf = sock.recv(32768)
            if buf:
                incoming.write(buf)
            else:
                incoming.write_eof()
    if support.verbose:
        sys.stdout.write("Needed %d calls to complete %s().\n"
                         % (count, func.__name__))
    return ret
1670
def test_bio_handshake(self):
    # Handshake through memory BIOs and check what the SSL object
    # reports before and after the handshake, then shut it down.
    raw_sock = socket.socket(socket.AF_INET)
    self.addCleanup(raw_sock.close)
    raw_sock.connect(self.server_addr)
    bio_in = ssl.MemoryBIO()
    bio_out = ssl.MemoryBIO()
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(SIGNING_CA)
    context.check_hostname = True
    tls = context.wrap_bio(bio_in, bio_out, False, 'localhost')
    has_tls_unique = 'tls-unique' in ssl.CHANNEL_BINDING_TYPES

    # Before the handshake.
    self.assertIs(tls._sslobj.owner, tls)
    self.assertIsNone(tls.cipher())
    self.assertIsNotNone(tls.shared_ciphers())
    self.assertRaises(ValueError, tls.getpeercert)
    if has_tls_unique:
        self.assertIsNone(tls.get_channel_binding('tls-unique'))

    self.ssl_io_loop(raw_sock, bio_in, bio_out, tls.do_handshake)

    # After the handshake.
    self.assertTrue(tls.cipher())
    self.assertIsNotNone(tls.shared_ciphers())
    self.assertTrue(tls.getpeercert())
    if has_tls_unique:
        self.assertTrue(tls.get_channel_binding('tls-unique'))

    try:
        self.ssl_io_loop(raw_sock, bio_in, bio_out, tls.unwrap)
    except ssl.SSLSyscallError:
        # If the server shuts down the TCP connection without sending a
        # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
        pass
    self.assertRaises(ssl.SSLError, tls.write, b'foo')
1701
def test_bio_read_write_data(self):
    # Send a request through memory BIOs and check the lower-cased echo.
    raw_sock = socket.socket(socket.AF_INET)
    self.addCleanup(raw_sock.close)
    raw_sock.connect(self.server_addr)
    bio_in = ssl.MemoryBIO()
    bio_out = ssl.MemoryBIO()
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_NONE
    tls = context.wrap_bio(bio_in, bio_out, False)
    self.ssl_io_loop(raw_sock, bio_in, bio_out, tls.do_handshake)
    request = b'FOO\n'
    self.ssl_io_loop(raw_sock, bio_in, bio_out, tls.write, request)
    reply = self.ssl_io_loop(raw_sock, bio_in, bio_out, tls.read, 1024)
    self.assertEqual(reply, b'foo\n')
    self.ssl_io_loop(raw_sock, bio_in, bio_out, tls.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001717
1718
class NetworkedTests(unittest.TestCase):
    # Tests that talk to remote hosts; each one runs inside
    # support.transient_internet().

    def test_timeout_connect_ex(self):
        # Issue #12065: on a timeout, connect_ex() should return the original
        # errno (mimicking the behaviour of non-SSL sockets).
        with support.transient_internet(REMOTE_HOST):
            client = ssl.wrap_socket(socket.socket(socket.AF_INET),
                                     cert_reqs=ssl.CERT_REQUIRED,
                                     do_handshake_on_connect=False)
            self.addCleanup(client.close)
            client.settimeout(0.0000001)
            status = client.connect_ex((REMOTE_HOST, 443))
            if status == 0:
                self.skipTest("REMOTE_HOST responded too quickly")
            self.assertIn(status, (errno.EAGAIN, errno.EWOULDBLOCK))

    @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
    def test_get_server_certificate_ipv6(self):
        with support.transient_internet('ipv6.google.com'):
            _test_get_server_certificate(self, 'ipv6.google.com', 443)
            _test_get_server_certificate_fail(self, 'ipv6.google.com', 443)

    def test_algorithms(self):
        # Issue #8484: all algorithms should be available when verifying a
        # certificate.
        # SHA256 was added in OpenSSL 0.9.8
        if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15):
            self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION)
        # sha256.tbs-internet.com needs SNI to use the correct certificate
        if not ssl.HAS_SNI:
            self.skipTest("SNI needed for this test")
        # https://sha2.hboeck.de/ was used until 2011-01-08 (no route to host)
        remote = ("sha256.tbs-internet.com", 443)
        sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem")
        with support.transient_internet("sha256.tbs-internet.com"):
            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
            context.verify_mode = ssl.CERT_REQUIRED
            context.load_verify_locations(sha256_cert)
            client = context.wrap_socket(
                socket.socket(socket.AF_INET),
                server_hostname="sha256.tbs-internet.com")
            # Leaving the block closes the socket whether or not the
            # connection attempt succeeded.
            with client:
                client.connect(remote)
                if support.verbose:
                    sys.stdout.write("\nCipher with %r is %r\n" %
                                     (remote, client.cipher()))
                    sys.stdout.write("Certificate is:\n%s\n" %
                                     pprint.pformat(client.getpeercert()))
1767 s.close()
1768
1769
def _test_get_server_certificate(test, host, port, cert=None):
    """Fetch the certificate of host:port, unverified and then verified.

    The second fetch passes *cert* as ca_certs.  *test* is failed if
    either fetch returns an empty value.
    """
    address = (host, port)

    if not ssl.get_server_certificate(address):
        test.fail("No server certificate on %s:%s!" % address)

    pem = ssl.get_server_certificate(address, ca_certs=cert)
    if not pem:
        test.fail("No server certificate on %s:%s!" % address)
    if support.verbose:
        sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n"
                         % (host, port, pem))
1780
1781def _test_get_server_certificate_fail(test, host, port):
1782 try:
1783 pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
1784 except ssl.SSLError as x:
1785 #should fail
1786 if support.verbose:
1787 sys.stdout.write("%s\n" % x)
1788 else:
1789 test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
1790
1791
1792if _have_threads:
Antoine Pitrou803e6d62010-10-13 10:36:15 +00001793 from test.ssl_servers import make_https_server
1794
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001795 class ThreadedEchoServer(threading.Thread):
1796
1797 class ConnectionHandler(threading.Thread):
1798
1799 """A mildly complicated class, because we want it to work both
1800 with and without the SSL wrapper around the socket connection, so
1801 that we can test the STARTTLS functionality."""
1802
def __init__(self, server, connsock, addr):
    """Set up a daemon handler thread for one accepted connection.

    *server* is the owning server object, *connsock* the accepted
    socket (switched to blocking mode here) and *addr* the peer address.
    """
    threading.Thread.__init__(self)
    self.daemon = True
    self.server = server
    self.addr = addr
    self.sock = connsock
    self.sock.setblocking(1)
    # No TLS layer until wrap_conn() succeeds.
    self.sslconn = None
    self.running = False
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001812
def wrap_conn(self):
    """Wrap the connection in TLS using the server's context.

    Returns True once the socket is wrapped.  On ssl.SSLError or
    ConnectionResetError the exception is appended to
    server.conn_errors, the server is stopped, the connection is
    closed and False is returned.
    """
    server = self.server
    try:
        self.sslconn = server.context.wrap_socket(self.sock,
                                                  server_side=True)
        server.selected_npn_protocols.append(
            self.sslconn.selected_npn_protocol())
        server.selected_alpn_protocols.append(
            self.sslconn.selected_alpn_protocol())
    except (ssl.SSLError, ConnectionResetError) as exc:
        # We treat ConnectionResetError as though it were an
        # SSLError - OpenSSL on Ubuntu abruptly closes the
        # connection when asked to use an unsupported protocol.
        #
        # XXX Various errors can have happened here, for example
        # a mismatching protocol version, an invalid certificate,
        # or a low-level bug. This should be made more discriminating.
        server.conn_errors.append(exc)
        if server.chatty:
            handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
        self.running = False
        server.stop()
        self.close()
        return False

    server.shared_ciphers.append(self.sslconn.shared_ciphers())
    log = support.verbose and server.chatty
    if server.context.verify_mode == ssl.CERT_REQUIRED:
        cert = self.sslconn.getpeercert()
        if log:
            sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
        cert_binary = self.sslconn.getpeercert(True)
        if log:
            sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
    cipher = self.sslconn.cipher()
    if log:
        sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
        sys.stdout.write(" server: selected protocol is now "
                         + str(self.sslconn.selected_npn_protocol()) + "\n")
    return True
1849
def read(self):
    """Read from the TLS layer if there is one, else from the raw socket."""
    if not self.sslconn:
        return self.sock.recv(1024)
    return self.sslconn.read()
1855
def write(self, bytes):
    """Send *bytes* through the TLS layer if there is one, else the raw socket.

    Returns whatever the underlying write()/send() call returns.
    """
    if not self.sslconn:
        return self.sock.send(bytes)
    return self.sslconn.write(bytes)
1861
def close(self):
    """Close the TLS connection if there is one, else the raw socket."""
    target = self.sslconn if self.sslconn else self.sock
    target.close()
1867
def run(self):
    """Serve one client connection until EOF, 'over' or an OSError.

    Unless the server is a STARTTLS server, the connection is wrapped
    in TLS first.  Each message read is then dispatched on its stripped
    content:

    * empty (EOF): shut TLS down if it is active, close and stop;
    * b'over': close and return;
    * b'STARTTLS' / b'ENDTLS' (STARTTLS servers only): reply b"OK\\n"
      and switch the TLS layer on or off;
    * b'CB tls-unique': reply with the repr() of the channel binding;
    * anything else: echo the message back lower-cased.

    An OSError closes the connection and stops the whole server.
    """
    self.running = True
    if not self.server.starttls_server:
        if not self.wrap_conn():
            return
    while self.running:
        try:
            msg = self.read()
            stripped = msg.strip()
            if not stripped:
                # eof, so quit this handler
                self.running = False
                # A STARTTLS server may see EOF before TLS was ever
                # started; there is then nothing to unwrap.
                if self.sslconn:
                    try:
                        self.sock = self.sslconn.unwrap()
                    except OSError:
                        # Many tests shut the TCP connection down
                        # without an SSL shutdown. This causes
                        # unwrap() to raise OSError with errno=0!
                        pass
                    else:
                        self.sslconn = None
                self.close()
            elif stripped == b'over':
                if support.verbose and self.server.connectionchatty:
                    sys.stdout.write(" server: client closed connection\n")
                self.close()
                return
            elif (self.server.starttls_server and
                  stripped == b'STARTTLS'):
                if support.verbose and self.server.connectionchatty:
                    sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
                self.write(b"OK\n")
                if not self.wrap_conn():
                    return
            elif (self.server.starttls_server and self.sslconn
                  and stripped == b'ENDTLS'):
                if support.verbose and self.server.connectionchatty:
                    sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
                self.write(b"OK\n")
                self.sock = self.sslconn.unwrap()
                self.sslconn = None
                if support.verbose and self.server.connectionchatty:
                    sys.stdout.write(" server: connection is now unencrypted...\n")
            elif stripped == b'CB tls-unique':
                if support.verbose and self.server.connectionchatty:
                    sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
                data = self.sslconn.get_channel_binding("tls-unique")
                self.write(repr(data).encode("us-ascii") + b"\n")
            else:
                if (support.verbose and
                        self.server.connectionchatty):
                    ctype = (self.sslconn and "encrypted") or "unencrypted"
                    sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
                                     % (msg, ctype, msg.lower(), ctype))
                self.write(msg.lower())
        except OSError:
            if self.server.chatty:
                handle_error("Test server failure:\n")
            self.close()
            self.running = False
            # normally, we'd just stop here, but for the test
            # harness, we want to stop the server
            self.server.stop()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001931
def __init__(self, certificate=None, ssl_version=None,
             certreqs=None, cacerts=None,
             chatty=True, connectionchatty=False, starttls_server=False,
             npn_protocols=None, alpn_protocols=None,
             ciphers=None, context=None):
    """Create the server thread and bind its listening socket.

    If *context* is given it is used as is; otherwise an SSLContext is
    built from *ssl_version* (default PROTOCOL_TLSv1), *certreqs*
    (default CERT_NONE), *cacerts* and *certificate*.  In both cases
    *npn_protocols*, *alpn_protocols* and *ciphers* are applied to the
    context when given.
    """
    if context:
        self.context = context
    else:
        protocol = ssl.PROTOCOL_TLSv1 if ssl_version is None else ssl_version
        self.context = ssl.SSLContext(protocol)
        self.context.verify_mode = (ssl.CERT_NONE if certreqs is None
                                    else certreqs)
        if cacerts:
            self.context.load_verify_locations(cacerts)
        if certificate:
            self.context.load_cert_chain(certificate)
    if npn_protocols:
        self.context.set_npn_protocols(npn_protocols)
    if alpn_protocols:
        self.context.set_alpn_protocols(alpn_protocols)
    if ciphers:
        self.context.set_ciphers(ciphers)

    self.chatty = chatty
    self.connectionchatty = connectionchatty
    self.starttls_server = starttls_server

    # Listening socket, bound here to a port picked by bind_port().
    self.sock = socket.socket()
    self.port = support.bind_port(self.sock)
    self.flag = None
    self.active = False

    # Per-connection results appended by the connection handlers.
    self.selected_npn_protocols = []
    self.selected_alpn_protocols = []
    self.shared_ciphers = []
    self.conn_errors = []

    threading.Thread.__init__(self)
    self.daemon = True
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001968
def __enter__(self):
    """Start the server thread, wait on its event, and return the server."""
    ready = threading.Event()
    self.start(ready)
    # start() stored the event in self.flag; block until it is set.
    self.flag.wait()
    return self
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01001973
def __exit__(self, *args):
    """Request the server to stop, then wait for its thread to finish.

    Exception details passed by the ``with`` statement are ignored and
    nothing is returned, so exceptions from the body propagate.
    """
    self.stop()
    self.join()
1977
def start(self, flag=None):
    """Remember *flag* on the instance and start the thread.

    *flag* is stored as ``self.flag`` before the thread is launched, so
    that run() can see it.
    """
    self.flag = flag
    threading.Thread.start(self)
1981
def run(self):
    """Accept connections one at a time until the server is stopped.

    The listening socket uses a short timeout so that the loop can
    re-check ``self.active`` regularly.  Each accepted connection is
    handed to a ConnectionHandler, which is started and joined before
    the next accept().  The listening socket is closed on exit.
    """
    listener = self.sock
    listener.settimeout(0.05)
    listener.listen()
    self.active = True
    if self.flag:
        # signal an event
        self.flag.set()
    while self.active:
        try:
            client_sock, client_addr = self.sock.accept()
            if support.verbose and self.chatty:
                sys.stdout.write(' server: new connection from '
                                 + repr(client_addr) + '\n')
            worker = self.ConnectionHandler(self, client_sock, client_addr)
            worker.start()
            worker.join()
        except socket.timeout:
            # Nothing to accept yet; loop around and re-check self.active.
            pass
        except KeyboardInterrupt:
            self.stop()
    self.sock.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002003
def stop(self):
    """Clear ``self.active`` so the accept loop in run() terminates."""
    self.active = False
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002006
class AsyncoreEchoServer(threading.Thread):
    """Echo server thread that drives an asyncore event loop.

    Can be used as a context manager: entering starts the thread and
    waits for its event, leaving stops the loop and joins the thread.
    """

    # this one's based on asyncore.dispatcher

    class EchoServer(asyncore.dispatcher):
        """Listening dispatcher creating one handler per accepted client."""

        class ConnectionHandler(asyncore.dispatcher_with_send):
            """Handle one client: do the handshake, then echo lower-cased data."""

            def __init__(self, conn, certfile):
                # The handshake is not done on wrap; it is driven by
                # _do_ssl_handshake() instead.
                self.socket = ssl.wrap_socket(conn, server_side=True,
                                              certfile=certfile,
                                              do_handshake_on_connect=False)
                asyncore.dispatcher_with_send.__init__(self, self.socket)
                self._ssl_accepting = True
                self._do_ssl_handshake()

            def readable(self):
                """Drain data already buffered by the SSL layer; always True."""
                if isinstance(self.socket, ssl.SSLSocket):
                    while self.socket.pending() > 0:
                        self.handle_read_event()
                return True

            def _do_ssl_handshake(self):
                """Attempt the handshake; mark it finished when it succeeds."""
                try:
                    self.socket.do_handshake()
                except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
                    # Handshake not complete yet; try again later.
                    return
                except ssl.SSLEOFError:
                    return self.handle_close()
                except ssl.SSLError:
                    # Re-raised explicitly so that the OSError clause
                    # below does not swallow other SSL errors.
                    raise
                except OSError as err:
                    if err.args[0] == errno.ECONNABORTED:
                        return self.handle_close()
                else:
                    self._ssl_accepting = False

            def handle_read(self):
                """Continue the handshake, or echo received data lower-cased."""
                if self._ssl_accepting:
                    self._do_ssl_handshake()
                    return
                data = self.recv(1024)
                if support.verbose:
                    sys.stdout.write(" server: read %s from client\n" % repr(data))
                if data:
                    self.send(data.lower())
                else:
                    # Empty read: close our side too.
                    self.close()

            def handle_close(self):
                """Close the connection, reporting it in verbose mode."""
                self.close()
                if support.verbose:
                    sys.stdout.write(" server: closed connection %s\n" % self.socket)

            def handle_error(self):
                # Propagate the exception currently being handled.
                raise

        def __init__(self, certfile):
            self.certfile = certfile
            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.port = support.bind_port(listener, '')
            asyncore.dispatcher.__init__(self, listener)
            self.listen(5)

        def handle_accepted(self, sock_obj, addr):
            """Create a ConnectionHandler for a newly accepted client."""
            if support.verbose:
                sys.stdout.write(" server: new connection from %s:%s\n" %addr)
            self.ConnectionHandler(sock_obj, self.certfile)

        def handle_error(self):
            # Propagate the exception currently being handled.
            raise

    def __init__(self, certfile):
        self.flag = None
        self.active = False
        self.server = self.EchoServer(certfile)
        self.port = self.server.port
        threading.Thread.__init__(self)
        self.daemon = True

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.server)

    def __enter__(self):
        """Start the thread, wait on its event, and return the server."""
        ready = threading.Event()
        self.start(ready)
        self.flag.wait()
        return self

    def __exit__(self, *args):
        """Stop the loop and join the thread, tracing each step if verbose."""
        verbose = support.verbose
        if verbose:
            sys.stdout.write(" cleanup: stopping server.\n")
        self.stop()
        if verbose:
            sys.stdout.write(" cleanup: joining server thread.\n")
        self.join()
        if verbose:
            sys.stdout.write(" cleanup: successfully joined.\n")

    def start(self, flag=None):
        """Store *flag* as ``self.flag`` and start the thread."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        """Run asyncore.loop() repeatedly until stop() clears ``active``."""
        self.active = True
        if self.flag:
            self.flag.set()
        while self.active:
            try:
                asyncore.loop(1)
            except:
                # Anything raised by the loop is ignored here; the
                # loop is simply re-entered while the server is active.
                pass

    def stop(self):
        """Mark the server inactive and close the listening dispatcher."""
        self.active = False
        self.server.close()
2122
def server_params_test(client_context, server_context, indata=b"FOO\n",
                       chatty=True, connectionchatty=False, sni_name=None):
    """
    Start a ThreadedEchoServer using *server_context*, connect a client
    wrapped with *client_context* and exchange *indata* with it as bytes,
    bytearray and memoryview.

    Each reply must equal ``indata.lower()``, otherwise AssertionError is
    raised.  Returns a dict of connection statistics gathered from the
    client socket and from the server object.
    """
    stats = {}
    server = ThreadedEchoServer(context=server_context,
                                chatty=chatty,
                                connectionchatty=False)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=sni_name) as s:
            s.connect((HOST, server.port))
            payloads = [indata, bytearray(indata), memoryview(indata)]
            for arg in payloads:
                if connectionchatty and support.verbose:
                    sys.stdout.write(
                        " client: sending %r...\n" % indata)
                s.write(arg)
                outdata = s.read()
                if connectionchatty and support.verbose:
                    sys.stdout.write(" client: read %r\n" % outdata)
                if outdata != indata.lower():
                    raise AssertionError(
                        "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                        % (outdata[:20], len(outdata),
                           indata[:20].lower(), len(indata)))
            # Tell the echo server that this client is done.
            s.write(b"over\n")
            if connectionchatty and support.verbose:
                sys.stdout.write(" client: closing connection.\n")
            client_stats = {
                'compression': s.compression(),
                'cipher': s.cipher(),
                'peercert': s.getpeercert(),
                'client_alpn_protocol': s.selected_alpn_protocol(),
                'client_npn_protocol': s.selected_npn_protocol(),
                'version': s.version(),
            }
            stats.update(client_stats)
            s.close()
        stats['server_alpn_protocols'] = server.selected_alpn_protocols
        stats['server_npn_protocols'] = server.selected_npn_protocols
        stats['server_shared_ciphers'] = server.shared_ciphers
    return stats
Thomas Woutersed03b412007-08-28 21:37:11 +00002169
def try_protocol_combo(server_protocol, client_protocol, expect_success,
                       certsreqs=None, server_options=0, client_options=0):
    """
    Attempt an SSL connection from a *client_protocol* client to a
    *server_protocol* server.

    A true *expect_success* means the connection must succeed, a false
    one means it must fail.  When *expect_success* is a string, it must
    also equal the protocol version reported for the connection.
    """
    if certsreqs is None:
        certsreqs = ssl.CERT_NONE
    certtype = {
        ssl.CERT_NONE: "CERT_NONE",
        ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
        ssl.CERT_REQUIRED: "CERT_REQUIRED",
    }[certsreqs]
    if support.verbose:
        # Combinations expected to fail are shown in braces.
        formatstr = " %s->%s %s\n" if expect_success else " {%s->%s} %s\n"
        details = (ssl.get_protocol_name(client_protocol),
                   ssl.get_protocol_name(server_protocol),
                   certtype)
        sys.stdout.write(formatstr % details)

    client_context = ssl.SSLContext(client_protocol)
    client_context.options |= client_options
    server_context = ssl.SSLContext(server_protocol)
    server_context.options |= server_options

    # NOTE: we must enable "ALL" ciphers on the client, otherwise an
    # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
    # starting from OpenSSL 1.0.0 (see issue #8322).
    if client_context.protocol == ssl.PROTOCOL_SSLv23:
        client_context.set_ciphers("ALL")

    for ctx in (client_context, server_context):
        ctx.verify_mode = certsreqs
        ctx.load_cert_chain(CERTFILE)
        ctx.load_verify_locations(CERTFILE)

    try:
        stats = server_params_test(client_context, server_context,
                                   chatty=False, connectionchatty=False)
    # Protocol mismatch can result in either an SSLError, or a
    # "Connection reset by peer" error.
    except ssl.SSLError:
        if expect_success:
            raise
    except OSError as e:
        if expect_success or e.errno != errno.ECONNRESET:
            raise
    else:
        if not expect_success:
            raise AssertionError(
                "Client protocol %s succeeded with server protocol %s!"
                % (ssl.get_protocol_name(client_protocol),
                   ssl.get_protocol_name(server_protocol)))
        # A string expectation names the exact negotiated version.
        if expect_success is not True and expect_success != stats['version']:
            raise AssertionError("version mismatch: expected %r, got %r"
                                 % (expect_success, stats['version']))
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002228
2229
Bill Janssen6e027db2007-11-15 22:23:56 +00002230 class ThreadedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002231
@skip_if_broken_ubuntu_ssl
def test_echo(self):
    """Basic test of an SSL client connecting to a server"""
    if support.verbose:
        sys.stdout.write("\n")
    # One sub-test per protocol, using the same context on both sides.
    for protocol in PROTOCOLS:
        protocol_name = ssl._PROTOCOL_NAMES[protocol]
        with self.subTest(protocol=protocol_name):
            shared_context = ssl.SSLContext(protocol)
            shared_context.load_cert_chain(CERTFILE)
            server_params_test(shared_context, shared_context,
                               chatty=True, connectionchatty=True)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002243
def test_getpeercert(self):
    # Check getpeercert() before and after the handshake, and sanity
    # check the contents of the certificate returned by the server.
    if support.verbose:
        sys.stdout.write("\n")
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(CERTFILE)
    context.load_cert_chain(CERTFILE)
    server = ThreadedEchoServer(context=context, chatty=False)
    with server:
        # The client socket is managed by a ``with`` block so that it is
        # closed even when one of the checks below fails.
        with context.wrap_socket(socket.socket(),
                                 do_handshake_on_connect=False) as s:
            s.connect((HOST, server.port))
            # getpeercert() raise ValueError while the handshake isn't
            # done.
            with self.assertRaises(ValueError):
                s.getpeercert()
            s.do_handshake()
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()
            if support.verbose:
                sys.stdout.write(pprint.pformat(cert) + '\n')
                sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
            if 'subject' not in cert:
                self.fail("No subject field in certificate: %s." %
                          pprint.pformat(cert))
            if ((('organizationName', 'Python Software Foundation'),)
                    not in cert['subject']):
                self.fail(
                    "Missing or invalid 'organizationName' field in certificate subject; "
                    "should be 'Python Software Foundation'.")
            self.assertIn('notBefore', cert)
            self.assertIn('notAfter', cert)
            # The validity period must be a non-empty interval.
            before = ssl.cert_time_to_seconds(cert['notBefore'])
            after = ssl.cert_time_to_seconds(cert['notAfter'])
            self.assertLess(before, after)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002281
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_crl_check(self):
    # Exercise certificate verification with and without CRL checking.
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(SIGNING_CA)
    # VERIFY_X509_TRUSTED_FIRST may be absent from the ssl module; fall
    # back to 0 in that case.
    trusted_first = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(context.verify_flags,
                     ssl.VERIFY_DEFAULT | trusted_first)

    # VERIFY_DEFAULT should pass
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with context.wrap_socket(socket.socket()) as s:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")

    # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
    context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF

    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with context.wrap_socket(socket.socket()) as s:
            with self.assertRaisesRegex(ssl.SSLError,
                                        "certificate verify failed"):
                s.connect((HOST, server.port))

    # now load a CRL file. The CRL file is signed by the CA.
    context.load_verify_locations(CRLFILE)

    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with context.wrap_socket(socket.socket()) as s:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
2324
def test_check_hostname(self):
    # Exercise check_hostname with a matching, a non-matching and a
    # missing server_hostname.
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.verify_mode = ssl.CERT_REQUIRED
    context.check_hostname = True
    context.load_verify_locations(SIGNING_CA)

    # correct hostname should verify
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        good_client = context.wrap_socket(socket.socket(),
                                          server_hostname="localhost")
        with good_client as s:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")

    # incorrect hostname should raise an exception
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        bad_client = context.wrap_socket(socket.socket(),
                                         server_hostname="invalid")
        with bad_client as s:
            with self.assertRaisesRegex(
                    ssl.CertificateError,
                    "hostname 'invalid' doesn't match 'localhost'"):
                s.connect((HOST, server.port))

    # missing server_hostname arg should cause an exception, too
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with socket.socket() as s:
            with self.assertRaisesRegex(
                    ValueError,
                    "check_hostname requires server_hostname"):
                context.wrap_socket(s)
2362
def test_wrong_cert(self):
    """Connecting when the server rejects the client's certificate

    Launch a server with CERT_REQUIRED, and check that trying to
    connect to it with a wrong client certificate fails.
    """
    base_dir = os.path.dirname(__file__) or os.curdir
    certfile = os.path.join(base_dir, "wrongcert.pem")
    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_REQUIRED,
                                cacerts=CERTFILE, chatty=False,
                                connectionchatty=False)
    with server:
        with socket.socket() as sock:
            with ssl.wrap_socket(sock,
                                 certfile=certfile,
                                 ssl_version=ssl.PROTOCOL_TLSv1) as s:
                try:
                    # Expect either an SSL error about the server rejecting
                    # the connection, or a low-level connection reset (which
                    # sometimes happens on Windows)
                    s.connect((HOST, server.port))
                except ssl.SSLError as e:
                    if support.verbose:
                        sys.stdout.write("\nSSLError is %r\n" % e)
                except OSError as e:
                    if e.errno != errno.ECONNRESET:
                        raise
                    if support.verbose:
                        sys.stdout.write("\nsocket.error is %r\n" % e)
                else:
                    self.fail("Use of invalid cert should have failed!")
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002395
def test_rude_shutdown(self):
    """A brutal shutdown of an SSL server should raise an OSError
    in the client when attempting handshake.
    """
    listener_ready = threading.Event()
    listener_gone = threading.Event()

    s = socket.socket()
    port = support.bind_port(s, HOST)

    # `listener` runs in a thread. It sits in an accept() until
    # the main thread connects. Then it rudely closes the socket,
    # and sets Event `listener_gone` to let the main thread know
    # the socket is gone.
    def listener():
        s.listen()
        listener_ready.set()
        accepted, _addr = s.accept()
        accepted.close()
        s.close()
        listener_gone.set()

    # `connector` runs in the main thread: it connects a plain socket,
    # waits for the listener to go away, then tries to wrap the socket.
    def connector():
        listener_ready.wait()
        with socket.socket() as c:
            c.connect((HOST, port))
            listener_gone.wait()
            try:
                ssl_sock = ssl.wrap_socket(c)
            except OSError:
                pass
            else:
                self.fail('connecting to closed SSL socket should have failed')

    listener_thread = threading.Thread(target=listener)
    listener_thread.start()
    try:
        connector()
    finally:
        listener_thread.join()
Trent Nelson6b240cd2008-04-10 20:12:06 +00002436
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
                     "OpenSSL is compiled without SSLv2 support")
def test_protocol_sslv2(self):
    """Connecting to an SSLv2 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    sslv2 = ssl.PROTOCOL_SSLv2
    sslv23 = ssl.PROTOCOL_SSLv23
    # SSLv2 client: default verification first, then the stricter modes.
    try_protocol_combo(sslv2, sslv2, True)
    for certreqs in (ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(sslv2, sslv2, True, certreqs)
    # Every other client protocol is expected to fail.
    try_protocol_combo(sslv2, sslv23, False)
    if hasattr(ssl, 'PROTOCOL_SSLv3'):
        try_protocol_combo(sslv2, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(sslv2, ssl.PROTOCOL_TLSv1, False)
    # SSLv23 client with specific SSL options
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(sslv2, sslv23, False,
                           client_options=ssl.OP_NO_SSLv2)
    for disabled in (ssl.OP_NO_SSLv3, ssl.OP_NO_TLSv1):
        try_protocol_combo(sslv2, sslv23, False,
                           client_options=disabled)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002460
@skip_if_broken_ubuntu_ssl
def test_protocol_sslv23(self):
    """Connecting to an SSLv23 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    sslv23 = ssl.PROTOCOL_SSLv23
    have_sslv3 = hasattr(ssl, 'PROTOCOL_SSLv3')
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try:
            try_protocol_combo(sslv23, ssl.PROTOCOL_SSLv2, True)
        except OSError as x:
            # this fails on some older versions of OpenSSL (0.9.7l, for instance)
            if support.verbose:
                sys.stdout.write(
                    " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
                    % str(x))

    # Same three client protocols at each verification level; None is
    # try_protocol_combo's own default for its certsreqs parameter.
    for certreqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        if have_sslv3:
            try_protocol_combo(sslv23, ssl.PROTOCOL_SSLv3, False, certreqs)
        try_protocol_combo(sslv23, sslv23, True, certreqs)
        try_protocol_combo(sslv23, ssl.PROTOCOL_TLSv1, 'TLSv1', certreqs)

    # Server with specific SSL options
    if have_sslv3:
        try_protocol_combo(sslv23, ssl.PROTOCOL_SSLv3, False,
                           server_options=ssl.OP_NO_SSLv3)
    # Will choose TLSv1
    try_protocol_combo(sslv23, sslv23, True,
                       server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
    try_protocol_combo(sslv23, ssl.PROTOCOL_TLSv1, False,
                       server_options=ssl.OP_NO_TLSv1)
2499
2500
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
                     "OpenSSL is compiled without SSLv3 support")
def test_protocol_sslv3(self):
    """Connecting to an SSLv3 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    sslv3 = ssl.PROTOCOL_SSLv3
    sslv23 = ssl.PROTOCOL_SSLv23
    # SSLv3 client at each verification level; None is
    # try_protocol_combo's own default for its certsreqs parameter.
    for certreqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(sslv3, sslv3, 'SSLv3', certreqs)
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(sslv3, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(sslv3, sslv23, False,
                       client_options=ssl.OP_NO_SSLv3)
    try_protocol_combo(sslv3, ssl.PROTOCOL_TLSv1, False)
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(sslv3, sslv23,
                           False, client_options=ssl.OP_NO_SSLv2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002520
@skip_if_broken_ubuntu_ssl
def test_protocol_tlsv1(self):
    """Connecting to a TLSv1 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1 = ssl.PROTOCOL_TLSv1
    # TLSv1 client at each verification level; None is
    # try_protocol_combo's own default for its certsreqs parameter.
    for certreqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(tlsv1, tlsv1, 'TLSv1', certreqs)
    # Older protocols are only tried when the ssl module exposes them.
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(tlsv1, ssl.PROTOCOL_SSLv2, False)
    if hasattr(ssl, 'PROTOCOL_SSLv3'):
        try_protocol_combo(tlsv1, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(tlsv1, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_TLSv1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002535
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
                     "TLS version 1.1 not supported.")
def test_protocol_tlsv1_1(self):
    """Connecting to a TLSv1.1 server with various client options.
    Testing against older TLS versions."""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1 = ssl.PROTOCOL_TLSv1
    tlsv1_1 = ssl.PROTOCOL_TLSv1_1
    sslv23 = ssl.PROTOCOL_SSLv23

    try_protocol_combo(tlsv1_1, tlsv1_1, 'TLSv1.1')
    # Older protocols are only tried when the ssl module exposes them.
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(tlsv1_1, ssl.PROTOCOL_SSLv2, False)
    if hasattr(ssl, 'PROTOCOL_SSLv3'):
        try_protocol_combo(tlsv1_1, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(tlsv1_1, sslv23, False,
                       client_options=ssl.OP_NO_TLSv1_1)

    try_protocol_combo(sslv23, tlsv1_1, 'TLSv1.1')
    # TLSv1 and TLSv1.1 must not interoperate in either direction.
    try_protocol_combo(tlsv1_1, tlsv1, False)
    try_protocol_combo(tlsv1, tlsv1_1, False)
2555
2556
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
                     "TLS version 1.2 not supported.")
def test_protocol_tlsv1_2(self):
    """Exercise a TLSv1.2 server against various client configurations,
    including clients pinned to other protocol versions."""
    if support.verbose:
        sys.stdout.write("\n")
    tls12 = ssl.PROTOCOL_TLSv1_2
    no_legacy_ssl = ssl.OP_NO_SSLv3 | ssl.OP_NO_SSLv2
    try_protocol_combo(tls12, tls12, 'TLSv1.2',
                       server_options=no_legacy_ssl,
                       client_options=no_legacy_ssl)
    # Legacy SSL client protocols are only tried when available.
    for legacy_name in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy_name):
            try_protocol_combo(tls12, getattr(ssl, legacy_name), False)
    # An SSLv23 client with TLSv1.2 explicitly disabled.
    try_protocol_combo(tls12, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_TLSv1_2)

    try_protocol_combo(ssl.PROTOCOL_SSLv23, tls12, 'TLSv1.2')
    # (server protocol, client protocol) pairs pinned to different TLS
    # versions.  Attributes are looked up by name right before each call.
    mismatched = (
        ('PROTOCOL_TLSv1_2', 'PROTOCOL_TLSv1'),
        ('PROTOCOL_TLSv1', 'PROTOCOL_TLSv1_2'),
        ('PROTOCOL_TLSv1_2', 'PROTOCOL_TLSv1_1'),
        ('PROTOCOL_TLSv1_1', 'PROTOCOL_TLSv1_2'),
    )
    for server_name, client_name in mismatched:
        try_protocol_combo(getattr(ssl, server_name),
                           getattr(ssl, client_name), False)
2580
def test_starttls(self):
    """Upgrade a clear-text connection to TLS and downgrade it again."""
    def report(template, value):
        # Progress output is only formatted and written in verbose mode.
        if support.verbose:
            sys.stdout.write(template % value)

    commands = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4",
                b"ENDTLS", b"msg 5", b"msg 6")
    server = ThreadedEchoServer(CERTFILE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                starttls_server=True,
                                chatty=True,
                                connectionchatty=True)
    # True while traffic must go through the TLS-wrapped socket.
    secure = False
    with server:
        plain = socket.socket()
        plain.setblocking(1)
        plain.connect((HOST, server.port))
        if support.verbose:
            sys.stdout.write("\n")
        for indata in commands:
            report(" client: sending %r...\n", indata)
            if secure:
                tls.write(indata)
                outdata = tls.read()
            else:
                plain.send(indata)
                outdata = plain.recv(1024)
            msg = outdata.strip().lower()
            acknowledged = msg.startswith(b"ok")
            if indata == b"STARTTLS" and acknowledged:
                # Server agreed: continue over TLS.
                report(" client: read %r from server, starting TLS...\n",
                       msg)
                tls = ssl.wrap_socket(plain, ssl_version=ssl.PROTOCOL_TLSv1)
                secure = True
            elif indata == b"ENDTLS" and acknowledged:
                # Server agreed: drop back to clear text.
                report(" client: read %r from server, ending TLS...\n",
                       msg)
                plain = tls.unwrap()
                secure = False
            else:
                report(" client: read %r from server\n", msg)
        if support.verbose:
            sys.stdout.write(" client: closing connection.\n")
        # Say goodbye and close whichever socket is currently active.
        if secure:
            tls.write(b"over\n")
            tls.close()
        else:
            plain.send(b"over\n")
            plain.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002638
def test_socketserver(self):
    """Using a SocketServer to create and manage SSL connections."""
    server = make_https_server(self, certfile=CERTFILE)
    # try to connect
    if support.verbose:
        sys.stdout.write('\n')
    with open(CERTFILE, 'rb') as f:
        d1 = f.read()
    # Must be bytes like d1: comparing bytes with str in the final
    # assertion would raise BytesWarning under -bb instead of reporting
    # a clean test failure when no body was read.
    d2 = b''
    # now fetch the same data from the HTTPS server
    url = 'https://localhost:%d/%s' % (
        server.port, os.path.split(CERTFILE)[1])
    context = ssl.create_default_context(cafile=CERTFILE)
    # The response object is a context manager; it is closed on exit
    # even if reading fails.
    with urllib.request.urlopen(url, context=context) as response:
        dlen = response.info().get("content-length")
        if dlen and (int(dlen) > 0):
            d2 = response.read(int(dlen))
            if support.verbose:
                sys.stdout.write(
                    " client: read %d bytes from remote server '%s'\n"
                    % (len(d2), server))
    self.assertEqual(d1, d2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002664
def test_asyncore_server(self):
    """Check the example asyncore integration."""
    if support.verbose:
        sys.stdout.write("\n")

    # Payload sent to the server; the reply is compared against its
    # lower-cased form below.
    indata = b"FOO\n"
    server = AsyncoreEchoServer(CERTFILE)
    with server:
        s = ssl.wrap_socket(socket.socket())
        s.connect(('127.0.0.1', server.port))
        if support.verbose:
            sys.stdout.write(
                " client: sending %r...\n" % indata)
        s.write(indata)
        outdata = s.read()
        if support.verbose:
            sys.stdout.write(" client: read %r\n" % outdata)
        if outdata != indata.lower():
            self.fail(
                "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                % (outdata[:20], len(outdata),
                   indata[:20].lower(), len(indata)))
        s.write(b"over\n")
        if support.verbose:
            sys.stdout.write(" client: closing connection.\n")
        s.close()
        if support.verbose:
            sys.stdout.write(" client: connection closed.\n")
Trent Nelson6b240cd2008-04-10 20:12:06 +00002695
def test_recv_send(self):
    """Test recv(), send() and friends."""
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = ssl.wrap_socket(socket.socket(),
                            server_side=False,
                            certfile=CERTFILE,
                            ca_certs=CERTFILE,
                            cert_reqs=ssl.CERT_NONE,
                            ssl_version=ssl.PROTOCOL_TLSv1)
        s.connect((HOST, server.port))

        # helper methods for standardising recv* method signatures
        def _recv_into():
            b = bytearray(b"\0"*100)
            count = s.recv_into(b)
            return b[:count]

        def _recvfrom_into():
            b = bytearray(b"\0"*100)
            count, addr = s.recvfrom_into(b)
            return b[:count]

        # (name, method, expect success?, *args, return value func)
        send_methods = [
            ('send', s.send, True, [], len),
            ('sendto', s.sendto, False, ["some.address"], len),
            ('sendall', s.sendall, True, [], lambda x: None),
        ]
        # (name, method, whether to expect success, *args)
        recv_methods = [
            ('recv', s.recv, True, []),
            ('recvfrom', s.recvfrom, False, ["some.address"]),
            ('recv_into', _recv_into, True, []),
            ('recvfrom_into', _recvfrom_into, False, []),
        ]
        data_prefix = "PREFIX_"

        # NOTE: the failure messages below use the "!r" / "!s"
        # conversions.  A ":r" or ":s" format spec is rejected by bytes
        # and exception objects with a TypeError, which would hide the
        # real failure.
        for (meth_name, send_meth, expect_success, args,
                ret_val_meth) in send_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            expected = indata.lower()
            try:
                ret = send_meth(indata, *args)
                msg = "sending with {}".format(meth_name)
                self.assertEqual(ret, ret_val_meth(indata), msg=msg)
                outdata = s.read()
                if outdata != expected:
                    self.fail(
                        "While sending with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=expected[:20], nin=len(expected)
                        )
                    )
            except ValueError as e:
                # Methods flagged as unsupported must fail with a
                # ValueError whose message starts with the method name.
                if expect_success:
                    self.fail(
                        "Failed to send with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )

        for meth_name, recv_meth, expect_success, args in recv_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            expected = indata.lower()
            try:
                s.send(indata)
                outdata = recv_meth(*args)
                if outdata != expected:
                    self.fail(
                        "While receiving with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=expected[:20], nin=len(expected)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to receive with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )
                # consume data: the reply to the send() above is still
                # pending because the receive call was rejected
                s.read()

        # read(-1, buffer) is supported, even though read(-1) is not
        data = b"data"
        s.send(data)
        buffer = bytearray(len(data))
        self.assertEqual(s.read(-1, buffer), len(data))
        self.assertEqual(buffer, data)

        # Make sure sendmsg et al are disallowed to avoid
        # inadvertent disclosure of data and/or corruption
        # of the encrypted data stream
        self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
        self.assertRaises(NotImplementedError, s.recvmsg, 100)
        self.assertRaises(NotImplementedError,
                          s.recvmsg_into, bytearray(100))

        s.write(b"over\n")

        self.assertRaises(ValueError, s.recv, -1)
        self.assertRaises(ValueError, s.read, -1)

        s.close()
Bill Janssen58afe4c2008-09-08 16:45:19 +00002825
def test_recv_zero(self):
    # Zero-length reads on an SSL socket return no data and never block.
    server = ThreadedEchoServer(CERTFILE)
    server.__enter__()
    # __exit__ takes (exc_type, exc_value, traceback); pass all three,
    # exactly as a "with" statement would on a clean exit.
    self.addCleanup(server.__exit__, None, None, None)
    s = socket.create_connection((HOST, server.port))
    self.addCleanup(s.close)
    s = ssl.wrap_socket(s, suppress_ragged_eofs=False)
    self.addCleanup(s.close)

    # recv/read(0) should return no data
    s.send(b"data")
    self.assertEqual(s.recv(0), b"")
    self.assertEqual(s.read(0), b"")
    self.assertEqual(s.read(), b"data")

    # Should not block if the other end sends no data
    s.setblocking(False)
    self.assertEqual(s.recv(0), b"")
    self.assertEqual(s.recv_into(bytearray()), 0)
2845
def test_nonblocking_send(self):
    # A non-blocking SSL socket must raise SSLWantWriteError or
    # SSLWantReadError once send() can no longer make progress.
    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        client = ssl.wrap_socket(socket.socket(),
                                 server_side=False,
                                 certfile=CERTFILE,
                                 ca_certs=CERTFILE,
                                 cert_reqs=ssl.CERT_NONE,
                                 ssl_version=ssl.PROTOCOL_TLSv1)
        client.connect((HOST, server.port))
        client.setblocking(False)

        # Keep pushing data until the buffers are full; at that point
        # the send cannot complete and must raise.
        chunk = bytearray(8192)
        with self.assertRaises((ssl.SSLWantWriteError,
                                ssl.SSLWantReadError)):
            while True:
                client.send(chunk)

        # Restore blocking mode before closing the connection.
        client.setblocking(True)
        client.close()
2875
def test_handshake_timeout(self):
    # Issue #5103: SSL handshake must respect the socket timeout
    host = "127.0.0.1"
    server = socket.socket(socket.AF_INET)
    port = support.bind_port(server)
    ready = threading.Event()
    # Read by serve() through the closure; set to True in the outer
    # "finally" below to stop the accept loop.
    finish = False

    def serve():
        # Accept connections but never speak SSL, so any client
        # handshake stalls until its timeout expires.
        server.listen()
        ready.set()
        accepted = []
        while not finish:
            readable, _, _ = select.select([server], [], [], 0.1)
            if server in readable:
                # Keep a reference so the connection stays open
                # instead of being closed by garbage collection.
                accepted.append(server.accept()[0])
        for conn in accepted:
            conn.close()

    worker = threading.Thread(target=serve)
    worker.start()
    ready.wait()

    try:
        # Wrapping an already connected socket: the handshake runs
        # inside wrap_socket() and must time out.
        try:
            c = socket.socket(socket.AF_INET)
            c.settimeout(0.2)
            c.connect((host, port))
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                ssl.wrap_socket(c)
        finally:
            c.close()
        # Wrapping first and connecting afterwards: the handshake runs
        # inside connect() and must time out.
        try:
            c = socket.socket(socket.AF_INET)
            c = ssl.wrap_socket(c)
            c.settimeout(0.2)
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                c.connect((host, port))
        finally:
            c.close()
    finally:
        finish = True
        worker.join()
        server.close()
2924
def test_server_accept(self):
    # Issue #16357: accept() on a SSLSocket created through
    # SSLContext.wrap_socket().
    host = "127.0.0.1"
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(CERTFILE)
    context.load_cert_chain(CERTFILE)
    server = socket.socket(socket.AF_INET)
    port = support.bind_port(server)
    server = context.wrap_socket(server, server_side=True)

    listening = threading.Event()
    # Filled in by the server thread once a connection is accepted.
    remote = None
    peer = None

    def serve():
        nonlocal remote, peer
        server.listen()
        # Signal readiness, then block in accept() and wait for the
        # client side to close the connection.
        listening.set()
        remote, peer = server.accept()
        remote.recv(1)

    worker = threading.Thread(target=serve)
    worker.start()
    # The client connects only after the server is listening.
    listening.wait()
    client = context.wrap_socket(socket.socket())
    client.connect((host, port))
    client_addr = client.getsockname()
    client.close()
    worker.join()
    remote.close()
    server.close()
    # Sanity checks.
    self.assertIsInstance(remote, ssl.SSLSocket)
    self.assertEqual(peer, client_addr)
2962
def test_getpeercert_enotconn(self):
    # getpeercert() on a socket that was never connected must raise
    # OSError with errno ENOTCONN.
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    with context.wrap_socket(socket.socket()) as sock, \
            self.assertRaises(OSError) as cm:
        sock.getpeercert()
    self.assertEqual(cm.exception.errno, errno.ENOTCONN)
2969
def test_do_handshake_enotconn(self):
    # do_handshake() on a socket that was never connected must raise
    # OSError with errno ENOTCONN.
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    with context.wrap_socket(socket.socket()) as sock, \
            self.assertRaises(OSError) as cm:
        sock.do_handshake()
    self.assertEqual(cm.exception.errno, errno.ENOTCONN)
2976
def test_default_ciphers(self):
    # A client restricted to DES ciphers must fail to connect to a
    # server using its default cipher list.
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    try:
        # Restrict the client context to the weak DES ciphers.
        context.set_ciphers("DES")
    except ssl.SSLError:
        self.skipTest("no DES cipher available")
    with ThreadedEchoServer(CERTFILE,
                            ssl_version=ssl.PROTOCOL_SSLv23,
                            chatty=False) as server:
        with context.wrap_socket(socket.socket()) as s, \
                self.assertRaises(OSError):
            s.connect((HOST, server.port))
    # The first error recorded by the server names the cause.
    first_error = str(server.conn_errors[0])
    self.assertIn("no shared cipher", first_error)
2991
def test_version_basic(self):
    """
    Sanity checks for SSLSocket.version(): None before connecting,
    'TLSv1' while connected, None again after the socket is closed.
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    with ThreadedEchoServer(CERTFILE,
                            ssl_version=ssl.PROTOCOL_TLSv1,
                            chatty=False) as server:
        with context.wrap_socket(socket.socket()) as client:
            self.assertIsNone(client.version())
            client.connect((HOST, server.port))
            self.assertEqual(client.version(), 'TLSv1')
        # The inner "with" has closed the socket by now.
        self.assertIsNone(client.version())
3006
@unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
def test_default_ecdh_curve(self):
    # Issue #21015: elliptic curve-based Diffie Hellman key exchange
    # should be enabled by default on SSL contexts.
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.load_cert_chain(CERTFILE)
    # OpenSSL releases older than 1.0.0 only offer ECDH ciphers when
    # they are requested through the 'ECCdraft' alias; newer ones are
    # expected to prefer them with the default cipher list.
    needs_explicit_ecdh = ssl.OPENSSL_VERSION_INFO < (1, 0, 0)
    if needs_explicit_ecdh:
        context.set_ciphers("ECCdraft:ECDH")
    with ThreadedEchoServer(context=context) as server, \
            context.wrap_socket(socket.socket()) as s:
        s.connect((HOST, server.port))
        cipher_name = s.cipher()[0]
        self.assertIn("ECDH", cipher_name)
3023
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    """Test tls-unique channel binding."""
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)

    def fetch_binding(verbose_template):
        # Open a fresh TLSv1 connection, validate its tls-unique data
        # against the server's view and return that data.
        s = ssl.wrap_socket(socket.socket(),
                            server_side=False,
                            certfile=CERTFILE,
                            ca_certs=CERTFILE,
                            cert_reqs=ssl.CERT_NONE,
                            ssl_version=ssl.PROTOCOL_TLSv1)
        try:
            s.connect((HOST, server.port))
            # get the data
            cb_data = s.get_channel_binding("tls-unique")
            if support.verbose:
                sys.stdout.write(verbose_template.format(cb_data))

            # check if it is sane
            self.assertIsNotNone(cb_data)
            self.assertEqual(len(cb_data), 12)  # True for TLSv1

            # and compare with the peers version
            s.write(b"CB tls-unique\n")
            peer_data_repr = s.read().strip()
            self.assertEqual(peer_data_repr,
                             repr(cb_data).encode("us-ascii"))
        finally:
            # Close the connection even when an assertion fails.
            s.close()
        return cb_data

    with server:
        cb_data = fetch_binding(" got channel binding data: {0!r}\n")
        # now, again; the sanity checks now apply to the *new* data
        # (the original re-checked the first connection's data here)
        new_cb_data = fetch_binding(
            " got another channel binding data: {0!r}\n")
        # is it really unique
        self.assertNotEqual(cb_data, new_cb_data)
Bill Janssen58afe4c2008-09-08 16:45:19 +00003083
def test_compression(self):
    # The reported compression method is either None or one of the
    # known method names.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.load_cert_chain(CERTFILE)
    stats = server_params_test(context, context,
                               chatty=True, connectionchatty=True)
    negotiated = stats['compression']
    if support.verbose:
        sys.stdout.write(" got compression: {!r}\n".format(negotiated))
    self.assertIn(negotiated, { None, 'ZLIB', 'RLE' })
3092
@unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
                     "ssl.OP_NO_COMPRESSION needed for this test")
def test_compression_disabled(self):
    # With OP_NO_COMPRESSION set on the shared context, no compression
    # method may be reported.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.load_cert_chain(CERTFILE)
    context.options |= ssl.OP_NO_COMPRESSION
    stats = server_params_test(context, context,
                               chatty=True, connectionchatty=True)
    negotiated = stats['compression']
    self.assertIsNone(negotiated)
3102
def test_dh_params(self):
    # Check we can get a connection with ephemeral Diffie-Hellman
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.load_cert_chain(CERTFILE)
    context.load_dh_params(DHFILE)
    context.set_ciphers("kEDH")
    stats = server_params_test(context, context,
                               chatty=True, connectionchatty=True)
    # stats["cipher"][0] is the cipher name, e.g. dash-separated parts.
    cipher = stats["cipher"][0]
    parts = cipher.split("-")
    if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
        # Report the full cipher name (not just its first character).
        self.fail("Non-DH cipher: " + cipher)
3115
def test_selected_alpn_protocol(self):
    # selected_alpn_protocol() is None unless ALPN is used.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.load_cert_chain(CERTFILE)
    stats = server_params_test(context, context,
                               chatty=True, connectionchatty=True)
    client_choice = stats['client_alpn_protocol']
    self.assertIsNone(client_choice)
3123
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
def test_selected_alpn_protocol_if_server_uses_alpn(self):
    # selected_alpn_protocol() is None unless ALPN is used by the client.
    # Only the server context advertises ALPN protocols here.
    server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_context.load_cert_chain(CERTFILE)
    server_context.set_alpn_protocols(['foo', 'bar'])
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    client_context.load_verify_locations(CERTFILE)
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True)
    client_choice = stats['client_alpn_protocol']
    self.assertIsNone(client_choice)
3135
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
def test_alpn_protocols(self):
    # Check the ALPN protocol selected for several client offers against
    # a fixed list of server protocols.
    server_protocols = ['foo', 'bar', 'milkshake']
    # (protocols offered by the client, protocol expected to be selected)
    protocol_tests = [
        (['foo', 'bar'], 'foo'),
        (['bar', 'foo'], 'foo'),
        (['milkshake'], 'milkshake'),
        (['http/3.0', 'http/4.0'], None)
    ]
    for client_protocols, expected in protocol_tests:
        server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
        server_context.load_cert_chain(CERTFILE)
        server_context.set_alpn_protocols(server_protocols)
        client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
        client_context.load_cert_chain(CERTFILE)
        client_context.set_alpn_protocols(client_protocols)

        # Keep a handshake error as the "result" so it can be inspected
        # below instead of aborting the test immediately.
        try:
            stats = server_params_test(client_context,
                                       server_context,
                                       chatty=True,
                                       connectionchatty=True)
        except ssl.SSLError as exc:
            stats = exc

        if expected is None and IS_OPENSSL_1_1:
            # OpenSSL 1.1.0 raises handshake error
            self.assertIsInstance(stats, ssl.SSLError)
            continue

        msg = ("failed trying %s (s) and %s (c).\n"
               "was expecting %s, but got %%s from the %%s"
               % (str(server_protocols), str(client_protocols),
                  str(expected)))
        client_result = stats['client_alpn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))
        # The last entry recorded on the server side, or a placeholder
        # when nothing was recorded.
        if len(stats['server_alpn_protocols']):
            server_result = stats['server_alpn_protocols'][-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
Benjamin Petersoncca27322015-01-23 16:35:37 -05003176
def test_selected_npn_protocol(self):
    # selected_npn_protocol() is None unless NPN is used
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.load_cert_chain(CERTFILE)
    stats = server_params_test(context, context,
                               chatty=True, connectionchatty=True)
    client_choice = stats['client_npn_protocol']
    self.assertIsNone(client_choice)
3184
@unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
def test_npn_protocols(self):
    # Check the NPN protocol selected for several client offers against
    # a fixed list of server protocols.
    server_protocols = ['http/1.1', 'spdy/2']
    # (protocols offered by the client, protocol expected to be selected)
    protocol_tests = [
        (['http/1.1', 'spdy/2'], 'http/1.1'),
        (['spdy/2', 'http/1.1'], 'http/1.1'),
        (['spdy/2', 'test'], 'spdy/2'),
        (['abc', 'def'], 'abc')
    ]
    for client_protocols, expected in protocol_tests:
        server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        server_context.load_cert_chain(CERTFILE)
        server_context.set_npn_protocols(server_protocols)
        client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        client_context.load_cert_chain(CERTFILE)
        client_context.set_npn_protocols(client_protocols)
        stats = server_params_test(client_context, server_context,
                                   chatty=True, connectionchatty=True)

        msg = ("failed trying %s (s) and %s (c).\n"
               "was expecting %s, but got %%s from the %%s"
               % (str(server_protocols), str(client_protocols),
                  str(expected)))
        client_result = stats['client_npn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))
        # The last entry recorded on the server side, or a placeholder
        # when nothing was recorded.
        if len(stats['server_npn_protocols']):
            server_result = stats['server_npn_protocols'][-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
3213
def sni_contexts(self):
    """Build the three TLSv1 contexts shared by the SNI tests.

    Returns a ``(server_context, other_context, client_context)`` tuple:
    the first two have SIGNED_CERTFILE and SIGNED_CERTFILE2 loaded as
    their certificate chain respectively; the client context requires a
    peer certificate and trusts SIGNING_CA.
    """
    serving = []
    for certfile in (SIGNED_CERTFILE, SIGNED_CERTFILE2):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_cert_chain(certfile)
        serving.append(ctx)
    server_context, other_context = serving

    client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    client_context.verify_mode = ssl.CERT_REQUIRED
    client_context.load_verify_locations(SIGNING_CA)
    return server_context, other_context, client_context
3223
def check_common_name(self, stats, name):
    """Assert that the peer certificate in *stats* has commonName *name*.

    *stats* must map 'peercert' to a certificate dict whose 'subject'
    entry is a sequence of relative distinguished names.
    """
    expected_rdn = (('commonName', name),)
    subject = stats['peercert']['subject']
    self.assertIn(expected_rdn, subject)
3227
@needs_sni
def test_sni_callback(self):
    """Exercise the servername callback: context swap, no SNI, and removal."""
    server_context, other_context, client_context = self.sni_contexts()
    calls = []

    def servername_cb(ssl_sock, server_name, initial_context):
        # Record every invocation; switch to the other context only when
        # the client actually sent a server name.
        calls.append((server_name, initial_context))
        if server_name is None:
            return
        ssl_sock.context = other_context

    server_context.set_servername_callback(servername_cb)

    # With a server name: the callback sees it together with the initial
    # context, and the peer certificate is the one presented after the
    # callback switched the socket over to other_context.
    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name='supermessage')
    self.assertEqual(calls, [("supermessage", server_context)])
    self.check_common_name(stats, 'fakehostname')

    # Without a server name: the callback still runs, receiving None, and
    # the certificate stays the one with commonName 'localhost'.
    calls.clear()
    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name=None)
    self.assertEqual(calls, [(None, server_context)])
    self.check_common_name(stats, 'localhost')

    # With the callback removed: nothing is recorded and the certificate
    # does not change, even though a server name is sent.
    calls.clear()
    server_context.set_servername_callback(None)
    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name='notfunny')
    self.check_common_name(stats, 'localhost')
    self.assertEqual(calls, [])
3266
@needs_sni
def test_sni_callback_alert(self):
    """A TLS alert returned by the callback is reflected to the client."""
    server_context, _, client_context = self.sni_contexts()

    def cb_returning_alert(ssl_sock, server_name, initial_context):
        return ssl.ALERT_DESCRIPTION_ACCESS_DENIED

    server_context.set_servername_callback(cb_returning_alert)

    with self.assertRaises(ssl.SSLError) as cm:
        server_params_test(client_context, server_context,
                           chatty=False,
                           sni_name='supermessage')

    reason = cm.exception.reason
    self.assertEqual(reason, 'TLSV1_ALERT_ACCESS_DENIED')
3281
@needs_sni
def test_sni_callback_raising(self):
    """An exception in the callback fails the handshake and is reported.

    The client is expected to see a handshake-failure alert, and the name
    of the exception raised by the callback must appear on stderr.
    """
    server_context, _, client_context = self.sni_contexts()

    def cb_raising(ssl_sock, server_name, initial_context):
        # Deliberately trigger ZeroDivisionError inside the callback.
        1/0

    server_context.set_servername_callback(cb_raising)

    with self.assertRaises(ssl.SSLError) as cm:
        with support.captured_stderr() as stderr:
            server_params_test(client_context, server_context,
                               chatty=False,
                               sni_name='supermessage')

    self.assertEqual(cm.exception.reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
    self.assertIn("ZeroDivisionError", stderr.getvalue())
3298
@needs_sni
def test_sni_callback_wrong_return_type(self):
    """A callback returning a non-alert value aborts with an internal error.

    The client is expected to see an internal-error alert, and a TypeError
    must be reported on stderr.
    """
    server_context, _, client_context = self.sni_contexts()

    def cb_wrong_return_type(ssl_sock, server_name, initial_context):
        # A string is not an acceptable return value for the callback.
        return "foo"

    server_context.set_servername_callback(cb_wrong_return_type)

    with self.assertRaises(ssl.SSLError) as cm:
        with support.captured_stderr() as stderr:
            server_params_test(client_context, server_context,
                               chatty=False,
                               sni_name='supermessage')

    self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
    self.assertIn("TypeError", stderr.getvalue())
3316
def test_shared_ciphers(self):
    """Every cipher the server reports as shared belongs to its own set.

    The client offers a broader cipher selection than the server accepts;
    each shared cipher name must then match the server-side algorithm,
    either as a dash-separated component (alg1) or as a substring (alg2).
    """
    server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    client_context.verify_mode = ssl.CERT_REQUIRED
    client_context.load_verify_locations(SIGNING_CA)

    # Pick the cipher strings and the two spellings to look for, depending
    # on the OpenSSL version in use.
    if ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
        client_ciphers, server_ciphers = "AES128:AES256", "AES256"
        alg1, alg2 = "AES256", "AES-256"
    else:
        client_ciphers, server_ciphers = "AES:3DES", "3DES"
        alg1, alg2 = "3DES", "DES-CBC3"
    client_context.set_ciphers(client_ciphers)
    server_context.set_ciphers(server_ciphers)

    stats = server_params_test(client_context, server_context)
    ciphers = stats['server_shared_ciphers'][0]
    self.assertGreater(len(ciphers), 0)
    for name, tls_version, bits in ciphers:
        if alg1 in name.split("-") or alg2 in name:
            continue
        self.fail(name)
Benjamin Peterson4cb17812015-01-07 11:14:26 -06003340
def test_read_write_after_close_raises_valuerror(self):
    """read() and write() on a closed SSL socket raise ValueError."""
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(CERTFILE)
    context.load_cert_chain(CERTFILE)
    server = ThreadedEchoServer(context=context, chatty=False)

    with server:
        s = context.wrap_socket(socket.socket())
        s.connect((HOST, server.port))
        s.close()

        # Both directions must be rejected once the socket is closed.
        with self.assertRaises(ValueError):
            s.read(1024)
        with self.assertRaises(ValueError):
            s.write(b'hello')
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003355
def test_sendfile(self):
    """Data sent with sendfile() over an SSL socket is echoed back intact."""
    payload = b"x" * 512
    with open(support.TESTFN, 'wb') as out:
        out.write(payload)
    self.addCleanup(support.unlink, support.TESTFN)

    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(CERTFILE)
    context.load_cert_chain(CERTFILE)

    server = ThreadedEchoServer(context=context, chatty=False)
    with server, context.wrap_socket(socket.socket()) as s:
        s.connect((HOST, server.port))
        with open(support.TESTFN, 'rb') as file:
            s.sendfile(file)
        # The echo server is expected to send the file contents back.
        self.assertEqual(s.recv(1024), payload)
3372
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003373
def test_main(verbose=False):
    """Run the SSL test classes, printing build details in verbose mode.

    The *verbose* argument is accepted but not consulted; the body reads
    ``support.verbose`` instead.

    Raises support.TestFailed if any of the certificate/key fixture files
    listed below does not exist.
    """
    if support.verbose:
        import warnings
        plats = {
            'Linux': platform.linux_distribution,
            'Mac': platform.mac_ver,
            'Windows': platform.win32_ver,
        }
        with warnings.catch_warnings():
            # The message argument is a regular expression.  It is written
            # as a raw string so that "\(" and "\)" are not treated as
            # (invalid) string escape sequences.
            warnings.filterwarnings(
                'ignore',
                r'dist\(\) and linux_distribution\(\) '
                r'functions are deprecated .*',
                PendingDeprecationWarning,
            )
            # Use the first platform probe that returns a non-empty first
            # field; otherwise fall back to the generic platform string.
            for name, func in plats.items():
                plat = func()
                if plat and plat[0]:
                    plat = '%s %r' % (name, plat)
                    break
            else:
                plat = repr(platform.platform())
        print("test_ssl: testing with %r %r" %
              (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
        print(" under %s" % plat)
        print(" HAS_SNI = %r" % ssl.HAS_SNI)
        print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
        try:
            print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
        except AttributeError:
            # The ssl module in use does not define OP_NO_TLSv1_1.
            pass

    # Fail early, with a clear message, if a fixture file is missing.
    for filename in [
            CERTFILE, BYTES_CERTFILE,
            ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
            SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
            BADCERT, BADKEY, EMPTYCERT]:
        if not os.path.exists(filename):
            raise support.TestFailed("Can't read certificate file %r" % filename)

    tests = [
        ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
        SimpleBackgroundTests,
    ]

    if support.is_resource_enabled('network'):
        tests.append(NetworkedTests)

    if _have_threads:
        thread_info = support.threading_setup()
        if thread_info:
            tests.append(ThreadedTests)

    try:
        support.run_unittest(*tests)
    finally:
        # thread_info is only bound when threading is available.
        if _have_threads:
            support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00003432
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()