blob: f72fb15cca4227d9405d5ece1245eb1451ce43c4 [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00005from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00006import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00007import select
Thomas Woutersed03b412007-08-28 21:37:11 +00008import time
Christian Heimes9424bb42013-06-17 15:32:57 +02009import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000010import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000011import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000012import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000013import pprint
Antoine Pitrou152efa22010-05-16 18:19:27 +000014import tempfile
Antoine Pitrou803e6d62010-10-13 10:36:15 +000015import urllib.request
Thomas Woutersed03b412007-08-28 21:37:11 +000016import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000017import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000018import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000019import platform
Antoine Pitrou23df4832010-08-04 17:14:06 +000020import functools
Antoine Pitrou242db722013-05-01 20:52:07 +020021from unittest import mock
Thomas Woutersed03b412007-08-28 21:37:11 +000022
# The ssl module is obtained through the test-support import helper rather
# than a plain ``import`` statement.
ssl = support.import_module("ssl")

# Sorted list built from the ssl module's private protocol-name table.
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
# Host name/address taken from the shared test-support module.
HOST = support.HOST
Antoine Pitrou152efa22010-05-16 18:19:27 +000027
def data_file(*name):
    # Build the path of a test data file: the path components given in
    # *name* are joined onto the directory that contains this module.
    here = os.path.dirname(__file__)
    return os.path.join(here, *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000030
Antoine Pitrou81564092010-10-08 23:06:24 +000031# The custom key and certificate files used in test_ssl are generated
32# using Lib/test/make_ssl_certs.py.
33# Other certificates are simply fetched from the Internet servers they
34# are meant to authenticate.
35
# Paths of the data files used below, all resolved via data_file().  Each
# BYTES_* name is the os.fsencode()d (bytes) form of the corresponding
# str path.
CERTFILE = data_file("keycert.pem")
BYTES_CERTFILE = os.fsencode(CERTFILE)
ONLYCERT = data_file("ssl_cert.pem")
ONLYKEY = data_file("ssl_key.pem")
BYTES_ONLYCERT = os.fsencode(ONLYCERT)
BYTES_ONLYKEY = os.fsencode(ONLYKEY)
# Password-protected files and the password that goes with them.
CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
KEY_PASSWORD = "somepass"
# CA directory and two individual files located inside it.
CAPATH = data_file("capath")
BYTES_CAPATH = os.fsencode(CAPATH)
CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
CAFILE_CACERT = data_file("capath", "5ed36f99.0")


# empty CRL
CRLFILE = data_file("revocation.crl")

# Two keys and certs signed by the same CA (for SNI tests)
SIGNED_CERTFILE = data_file("keycert3.pem")
SIGNED_CERTFILE2 = data_file("keycert4.pem")
SIGNING_CA = data_file("pycacert.pem")

SVN_PYTHON_ORG_ROOT_CERT = data_file("https_svn_python_org_root.pem")

EMPTYCERT = data_file("nullcert.pem")
BADCERT = data_file("badcert.pem")
# test_errors expects loading this path to fail with ENOENT.
WRONGCERT = data_file("XXXnonexisting.pem")
BADKEY = data_file("badkey.pem")
# Certificate parsed by test_parse_cert (subjectAltName, issue #13034).
NOKIACERT = data_file("nokia.pem")
# Certificate with embedded NUL bytes, parsed by
# test_parse_cert_CVE_2013_4238.
NULLBYTECERT = data_file("nullbytecert.pem")

# Diffie-Hellman parameters file.
DHFILE = data_file("dh512.pem")
BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +000070
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010071
def handle_error(prefix):
    # Report the exception currently being handled: its formatted traceback,
    # preceded by *prefix*, is written to stdout, but only in verbose mode.
    formatted = ' '.join(traceback.format_exception(*sys.exc_info()))
    if not support.verbose:
        return
    sys.stdout.write(prefix + formatted)
Thomas Woutersed03b412007-08-28 21:37:11 +000076
def can_clear_options():
    # True when the OpenSSL API version reported by the ssl module is
    # 0.9.8m or higher.
    minimum = (0, 9, 8, 13, 15)
    return ssl._OPENSSL_API_VERSION >= minimum
Antoine Pitroub5218772010-05-21 09:56:06 +000080
def no_sslv2_implies_sslv3_hello():
    # True when the OpenSSL version reported by the ssl module is
    # 0.9.7h or higher.
    minimum = (0, 9, 7, 8, 15)
    return ssl.OPENSSL_VERSION_INFO >= minimum
84
def have_verify_flags():
    # True when the OpenSSL version reported by the ssl module is
    # 0.9.8 or higher.
    minimum = (0, 9, 8, 0, 15)
    return ssl.OPENSSL_VERSION_INFO >= minimum
88
def utc_offset(): #NOTE: ignore issues like #1647654
    # Offset of local time from UTC, in seconds, such that
    #     local time = utc time + utc offset
    # time.altzone is used while DST is in effect, time.timezone otherwise;
    # both count seconds *west* of UTC, hence the sign flip.
    dst_in_effect = time.daylight and time.localtime().tm_isdst > 0
    seconds_west = time.altzone if dst_in_effect else time.timezone
    return -seconds_west
94
def asn1time(cert_time):
    # Adjust an expected certificate time string to what the linked OpenSSL
    # will actually print.  Some versions of OpenSSL ignore seconds, see
    # #18207; only 0.9.8.i is special-cased, every other version gets the
    # string back unchanged.
    if ssl._OPENSSL_API_VERSION != (0, 9, 8, 9, 15):
        return cert_time

    fmt = "%b %d %H:%M:%S %Y GMT"
    parsed = datetime.datetime.strptime(cert_time, fmt)
    cert_time = parsed.replace(second=0).strftime(fmt)
    # %d adds leading zero but ASN1_TIME_print() uses leading space
    if cert_time[4] == "0":
        cert_time = cert_time[:4] + " " + cert_time[5:]
    return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000108
Antoine Pitrou23df4832010-08-04 17:14:06 +0000109# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
def skip_if_broken_ubuntu_ssl(func):
    # Decorator.  When the ssl module has no PROTOCOL_SSLv2 at all, *func*
    # is returned untouched.  Otherwise the returned wrapper first tries to
    # create an SSLv2 context; if that raises SSLError on the specific
    # OpenSSL version / distribution pair checked below, the test is
    # skipped.  In every other case *func* runs normally.
    # NOTE(review): platform.linux_distribution() was removed in
    # Python 3.8 -- confirm which Python versions this file must support.
    if not hasattr(ssl, 'PROTOCOL_SSLv2'):
        return func

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            ssl.SSLContext(ssl.PROTOCOL_SSLv2)
        except ssl.SSLError:
            patched_version = ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15)
            if (patched_version and
                platform.linux_distribution() == ('debian', 'squeeze/sid', '')):
                raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
        return func(*args, **kwargs)
    return wrapper
Antoine Pitrou23df4832010-08-04 17:14:06 +0000124
# Decorator that skips the decorated test unless ssl.HAS_SNI is true.
needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
126
Antoine Pitrou23df4832010-08-04 17:14:06 +0000127
Antoine Pitrou152efa22010-05-16 18:19:27 +0000128class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000129
Antoine Pitrou480a1242010-04-28 21:37:09 +0000130 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000131 ssl.CERT_NONE
132 ssl.CERT_OPTIONAL
133 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100134 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100135 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100136 if ssl.HAS_ECDH:
137 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100138 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
139 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000140 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100141 self.assertIn(ssl.HAS_ECDH, {True, False})
Thomas Woutersed03b412007-08-28 21:37:11 +0000142
Antoine Pitrou172f0252014-04-18 20:33:08 +0200143 def test_str_for_enums(self):
144 # Make sure that the PROTOCOL_* constants have enum-like string
145 # reprs.
146 proto = ssl.PROTOCOL_SSLv3
147 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_SSLv3')
148 ctx = ssl.SSLContext(proto)
149 self.assertIs(ctx.protocol, proto)
150
Antoine Pitrou480a1242010-04-28 21:37:09 +0000151 def test_random(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000152 v = ssl.RAND_status()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000153 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000154 sys.stdout.write("\n RAND_status is %d (%s)\n"
155 % (v, (v and "sufficient randomness") or
156 "insufficient randomness"))
Victor Stinner99c8b162011-05-24 12:05:19 +0200157
158 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
159 self.assertEqual(len(data), 16)
160 self.assertEqual(is_cryptographic, v == 1)
161 if v:
162 data = ssl.RAND_bytes(16)
163 self.assertEqual(len(data), 16)
Victor Stinner2e2baa92011-05-25 11:15:16 +0200164 else:
165 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
Victor Stinner99c8b162011-05-24 12:05:19 +0200166
Victor Stinner1e81a392013-12-19 16:47:04 +0100167 # negative num is invalid
168 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
169 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
170
Jesus Ceac8754a12012-09-11 02:00:58 +0200171 self.assertRaises(TypeError, ssl.RAND_egd, 1)
172 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000173 ssl.RAND_add("this is a random string", 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000174
Christian Heimesf77b4b22013-08-21 13:26:05 +0200175 @unittest.skipUnless(os.name == 'posix', 'requires posix')
176 def test_random_fork(self):
177 status = ssl.RAND_status()
178 if not status:
179 self.fail("OpenSSL's PRNG has insufficient randomness")
180
181 rfd, wfd = os.pipe()
182 pid = os.fork()
183 if pid == 0:
184 try:
185 os.close(rfd)
186 child_random = ssl.RAND_pseudo_bytes(16)[0]
187 self.assertEqual(len(child_random), 16)
188 os.write(wfd, child_random)
189 os.close(wfd)
190 except BaseException:
191 os._exit(1)
192 else:
193 os._exit(0)
194 else:
195 os.close(wfd)
196 self.addCleanup(os.close, rfd)
197 _, status = os.waitpid(pid, 0)
198 self.assertEqual(status, 0)
199
200 child_random = os.read(rfd, 16)
201 self.assertEqual(len(child_random), 16)
202 parent_random = ssl.RAND_pseudo_bytes(16)[0]
203 self.assertEqual(len(parent_random), 16)
204
205 self.assertNotEqual(child_random, parent_random)
206
Antoine Pitrou480a1242010-04-28 21:37:09 +0000207 def test_parse_cert(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000208 # note that this uses an 'unofficial' function in _ssl.c,
209 # provided solely for this test, to exercise the certificate
210 # parsing code
Antoine Pitroufb046912010-11-09 20:21:19 +0000211 p = ssl._ssl._test_decode_cert(CERTFILE)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000212 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000213 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200214 self.assertEqual(p['issuer'],
215 ((('countryName', 'XY'),),
216 (('localityName', 'Castle Anthrax'),),
217 (('organizationName', 'Python Software Foundation'),),
218 (('commonName', 'localhost'),))
219 )
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100220 # Note the next three asserts will fail if the keys are regenerated
Christian Heimes9424bb42013-06-17 15:32:57 +0200221 self.assertEqual(p['notAfter'], asn1time('Oct 5 23:01:56 2020 GMT'))
222 self.assertEqual(p['notBefore'], asn1time('Oct 8 23:01:56 2010 GMT'))
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200223 self.assertEqual(p['serialNumber'], 'D7C7381919AFC24E')
224 self.assertEqual(p['subject'],
225 ((('countryName', 'XY'),),
226 (('localityName', 'Castle Anthrax'),),
227 (('organizationName', 'Python Software Foundation'),),
228 (('commonName', 'localhost'),))
229 )
230 self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),))
231 # Issue #13034: the subjectAltName in some certificates
232 # (notably projects.developer.nokia.com:443) wasn't parsed
233 p = ssl._ssl._test_decode_cert(NOKIACERT)
234 if support.verbose:
235 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
236 self.assertEqual(p['subjectAltName'],
237 (('DNS', 'projects.developer.nokia.com'),
238 ('DNS', 'projects.forum.nokia.com'))
239 )
Christian Heimesbd3a7f92013-11-21 03:40:15 +0100240 # extra OCSP and AIA fields
241 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
242 self.assertEqual(p['caIssuers'],
243 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
244 self.assertEqual(p['crlDistributionPoints'],
245 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000246
Christian Heimes824f7f32013-08-17 00:54:47 +0200247 def test_parse_cert_CVE_2013_4238(self):
248 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
249 if support.verbose:
250 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
251 subject = ((('countryName', 'US'),),
252 (('stateOrProvinceName', 'Oregon'),),
253 (('localityName', 'Beaverton'),),
254 (('organizationName', 'Python Software Foundation'),),
255 (('organizationalUnitName', 'Python Core Development'),),
256 (('commonName', 'null.python.org\x00example.org'),),
257 (('emailAddress', 'python-dev@python.org'),))
258 self.assertEqual(p['subject'], subject)
259 self.assertEqual(p['issuer'], subject)
Christian Heimes157c9832013-08-25 14:12:41 +0200260 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
261 san = (('DNS', 'altnull.python.org\x00example.com'),
262 ('email', 'null@python.org\x00user@example.org'),
263 ('URI', 'http://null.python.org\x00http://example.org'),
264 ('IP Address', '192.0.2.1'),
265 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
266 else:
267 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
268 san = (('DNS', 'altnull.python.org\x00example.com'),
269 ('email', 'null@python.org\x00user@example.org'),
270 ('URI', 'http://null.python.org\x00http://example.org'),
271 ('IP Address', '192.0.2.1'),
272 ('IP Address', '<invalid>'))
273
274 self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200275
Antoine Pitrou480a1242010-04-28 21:37:09 +0000276 def test_DER_to_PEM(self):
277 with open(SVN_PYTHON_ORG_ROOT_CERT, 'r') as f:
278 pem = f.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000279 d1 = ssl.PEM_cert_to_DER_cert(pem)
280 p2 = ssl.DER_cert_to_PEM_cert(d1)
281 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitrou18c913e2010-04-27 10:59:39 +0000282 self.assertEqual(d1, d2)
Antoine Pitrou9bfbe612010-04-27 22:08:08 +0000283 if not p2.startswith(ssl.PEM_HEADER + '\n'):
284 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
285 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
286 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000287
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000288 def test_openssl_version(self):
289 n = ssl.OPENSSL_VERSION_NUMBER
290 t = ssl.OPENSSL_VERSION_INFO
291 s = ssl.OPENSSL_VERSION
292 self.assertIsInstance(n, int)
293 self.assertIsInstance(t, tuple)
294 self.assertIsInstance(s, str)
295 # Some sanity checks follow
296 # >= 0.9
297 self.assertGreaterEqual(n, 0x900000)
298 # < 2.0
299 self.assertLess(n, 0x20000000)
300 major, minor, fix, patch, status = t
301 self.assertGreaterEqual(major, 0)
302 self.assertLess(major, 2)
303 self.assertGreaterEqual(minor, 0)
304 self.assertLess(minor, 256)
305 self.assertGreaterEqual(fix, 0)
306 self.assertLess(fix, 256)
307 self.assertGreaterEqual(patch, 0)
308 self.assertLessEqual(patch, 26)
309 self.assertGreaterEqual(status, 0)
310 self.assertLessEqual(status, 15)
311 # Version string as returned by OpenSSL, the format might change
312 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
313 (s, t))
314
Antoine Pitrou9d543662010-04-23 23:10:32 +0000315 @support.cpython_only
316 def test_refcycle(self):
317 # Issue #7943: an SSL object doesn't create reference cycles with
318 # itself.
319 s = socket.socket(socket.AF_INET)
320 ss = ssl.wrap_socket(s)
321 wr = weakref.ref(ss)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100322 with support.check_warnings(("", ResourceWarning)):
323 del ss
324 self.assertEqual(wr(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000325
Antoine Pitroua468adc2010-09-14 14:43:44 +0000326 def test_wrapped_unconnected(self):
327 # Methods on an unconnected SSLSocket propagate the original
Andrew Svetlov0832af62012-12-18 23:10:48 +0200328 # OSError raise by the underlying socket object.
Antoine Pitroua468adc2010-09-14 14:43:44 +0000329 s = socket.socket(socket.AF_INET)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100330 with ssl.wrap_socket(s) as ss:
Antoine Pitroue9bb4732013-01-12 21:56:56 +0100331 self.assertRaises(OSError, ss.recv, 1)
332 self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
333 self.assertRaises(OSError, ss.recvfrom, 1)
334 self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
335 self.assertRaises(OSError, ss.send, b'x')
336 self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
Antoine Pitroua468adc2010-09-14 14:43:44 +0000337
Antoine Pitrou40f08742010-04-24 22:04:40 +0000338 def test_timeout(self):
339 # Issue #8524: when creating an SSL socket, the timeout of the
340 # original socket should be retained.
341 for timeout in (None, 0.0, 5.0):
342 s = socket.socket(socket.AF_INET)
343 s.settimeout(timeout)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100344 with ssl.wrap_socket(s) as ss:
345 self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000346
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000347 def test_errors(self):
348 sock = socket.socket()
Ezio Melottied3a7d22010-12-01 02:32:32 +0000349 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000350 "certfile must be specified",
351 ssl.wrap_socket, sock, keyfile=CERTFILE)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000352 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000353 "certfile must be specified for server-side operations",
354 ssl.wrap_socket, sock, server_side=True)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000355 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000356 "certfile must be specified for server-side operations",
357 ssl.wrap_socket, sock, server_side=True, certfile="")
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100358 with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
359 self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
360 s.connect, (HOST, 8080))
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200361 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000362 with socket.socket() as sock:
363 ssl.wrap_socket(sock, certfile=WRONGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000364 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200365 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000366 with socket.socket() as sock:
367 ssl.wrap_socket(sock, certfile=CERTFILE, keyfile=WRONGCERT)
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000368 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200369 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000370 with socket.socket() as sock:
371 ssl.wrap_socket(sock, certfile=WRONGCERT, keyfile=WRONGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000372 self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000373
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000374 def test_match_hostname(self):
375 def ok(cert, hostname):
376 ssl.match_hostname(cert, hostname)
377 def fail(cert, hostname):
378 self.assertRaises(ssl.CertificateError,
379 ssl.match_hostname, cert, hostname)
380
381 cert = {'subject': ((('commonName', 'example.com'),),)}
382 ok(cert, 'example.com')
383 ok(cert, 'ExAmple.cOm')
384 fail(cert, 'www.example.com')
385 fail(cert, '.example.com')
386 fail(cert, 'example.org')
387 fail(cert, 'exampleXcom')
388
389 cert = {'subject': ((('commonName', '*.a.com'),),)}
390 ok(cert, 'foo.a.com')
391 fail(cert, 'bar.foo.a.com')
392 fail(cert, 'a.com')
393 fail(cert, 'Xa.com')
394 fail(cert, '.a.com')
395
Georg Brandl72c98d32013-10-27 07:16:53 +0100396 # only match one left-most wildcard
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000397 cert = {'subject': ((('commonName', 'f*.com'),),)}
398 ok(cert, 'foo.com')
399 ok(cert, 'f.com')
400 fail(cert, 'bar.com')
401 fail(cert, 'foo.a.com')
402 fail(cert, 'bar.foo.com')
403
Christian Heimes824f7f32013-08-17 00:54:47 +0200404 # NULL bytes are bad, CVE-2013-4073
405 cert = {'subject': ((('commonName',
406 'null.python.org\x00example.org'),),)}
407 ok(cert, 'null.python.org\x00example.org') # or raise an error?
408 fail(cert, 'example.org')
409 fail(cert, 'null.python.org')
410
Georg Brandl72c98d32013-10-27 07:16:53 +0100411 # error cases with wildcards
412 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
413 fail(cert, 'bar.foo.a.com')
414 fail(cert, 'a.com')
415 fail(cert, 'Xa.com')
416 fail(cert, '.a.com')
417
418 cert = {'subject': ((('commonName', 'a.*.com'),),)}
419 fail(cert, 'a.foo.com')
420 fail(cert, 'a..com')
421 fail(cert, 'a.com')
422
423 # wildcard doesn't match IDNA prefix 'xn--'
424 idna = 'püthon.python.org'.encode("idna").decode("ascii")
425 cert = {'subject': ((('commonName', idna),),)}
426 ok(cert, idna)
427 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
428 fail(cert, idna)
429 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
430 fail(cert, idna)
431
432 # wildcard in first fragment and IDNA A-labels in sequent fragments
433 # are supported.
434 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
435 cert = {'subject': ((('commonName', idna),),)}
436 ok(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
437 ok(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
438 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
439 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
440
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000441 # Slightly fake real-world example
442 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
443 'subject': ((('commonName', 'linuxfrz.org'),),),
444 'subjectAltName': (('DNS', 'linuxfr.org'),
445 ('DNS', 'linuxfr.com'),
446 ('othername', '<unsupported>'))}
447 ok(cert, 'linuxfr.org')
448 ok(cert, 'linuxfr.com')
449 # Not a "DNS" entry
450 fail(cert, '<unsupported>')
451 # When there is a subjectAltName, commonName isn't used
452 fail(cert, 'linuxfrz.org')
453
454 # A pristine real-world example
455 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
456 'subject': ((('countryName', 'US'),),
457 (('stateOrProvinceName', 'California'),),
458 (('localityName', 'Mountain View'),),
459 (('organizationName', 'Google Inc'),),
460 (('commonName', 'mail.google.com'),))}
461 ok(cert, 'mail.google.com')
462 fail(cert, 'gmail.com')
463 # Only commonName is considered
464 fail(cert, 'California')
465
466 # Neither commonName nor subjectAltName
467 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
468 'subject': ((('countryName', 'US'),),
469 (('stateOrProvinceName', 'California'),),
470 (('localityName', 'Mountain View'),),
471 (('organizationName', 'Google Inc'),))}
472 fail(cert, 'mail.google.com')
473
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200474 # No DNS entry in subjectAltName but a commonName
475 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
476 'subject': ((('countryName', 'US'),),
477 (('stateOrProvinceName', 'California'),),
478 (('localityName', 'Mountain View'),),
479 (('commonName', 'mail.google.com'),)),
480 'subjectAltName': (('othername', 'blabla'), )}
481 ok(cert, 'mail.google.com')
482
483 # No DNS entry subjectAltName and no commonName
484 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
485 'subject': ((('countryName', 'US'),),
486 (('stateOrProvinceName', 'California'),),
487 (('localityName', 'Mountain View'),),
488 (('organizationName', 'Google Inc'),)),
489 'subjectAltName': (('othername', 'blabla'),)}
490 fail(cert, 'google.com')
491
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000492 # Empty cert / no cert
493 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
494 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
495
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200496 # Issue #17980: avoid denials of service by refusing more than one
497 # wildcard per fragment.
498 cert = {'subject': ((('commonName', 'a*b.com'),),)}
499 ok(cert, 'axxb.com')
500 cert = {'subject': ((('commonName', 'a*b.co*'),),)}
Georg Brandl72c98d32013-10-27 07:16:53 +0100501 fail(cert, 'axxb.com')
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200502 cert = {'subject': ((('commonName', 'a*b*.com'),),)}
503 with self.assertRaises(ssl.CertificateError) as cm:
504 ssl.match_hostname(cert, 'axxbxxc.com')
505 self.assertIn("too many wildcards", str(cm.exception))
506
Antoine Pitroud5323212010-10-22 18:19:07 +0000507 def test_server_side(self):
508 # server_hostname doesn't work for server sockets
509 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000510 with socket.socket() as sock:
511 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
512 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000513
Antoine Pitroud6494802011-07-21 01:11:30 +0200514 def test_unknown_channel_binding(self):
515 # should raise ValueError for unknown type
516 s = socket.socket(socket.AF_INET)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100517 with ssl.wrap_socket(s) as ss:
518 with self.assertRaises(ValueError):
519 ss.get_channel_binding("unknown-type")
Antoine Pitroud6494802011-07-21 01:11:30 +0200520
521 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
522 "'tls-unique' channel binding not available")
523 def test_tls_unique_channel_binding(self):
524 # unconnected should return None for known type
525 s = socket.socket(socket.AF_INET)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100526 with ssl.wrap_socket(s) as ss:
527 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200528 # the same for server-side
529 s = socket.socket(socket.AF_INET)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100530 with ssl.wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
531 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200532
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600533 def test_dealloc_warn(self):
534 ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
535 r = repr(ss)
536 with self.assertWarns(ResourceWarning) as cm:
537 ss = None
538 support.gc_collect()
539 self.assertIn(r, str(cm.warning.args[0]))
540
Christian Heimes6d7ad132013-06-09 18:02:55 +0200541 def test_get_default_verify_paths(self):
542 paths = ssl.get_default_verify_paths()
543 self.assertEqual(len(paths), 6)
544 self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
545
546 with support.EnvironmentVarGuard() as env:
547 env["SSL_CERT_DIR"] = CAPATH
548 env["SSL_CERT_FILE"] = CERTFILE
549 paths = ssl.get_default_verify_paths()
550 self.assertEqual(paths.cafile, CERTFILE)
551 self.assertEqual(paths.capath, CAPATH)
552
Christian Heimes44109d72013-11-22 01:51:30 +0100553 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
554 def test_enum_certificates(self):
555 self.assertTrue(ssl.enum_certificates("CA"))
556 self.assertTrue(ssl.enum_certificates("ROOT"))
557
558 self.assertRaises(TypeError, ssl.enum_certificates)
559 self.assertRaises(WindowsError, ssl.enum_certificates, "")
560
Christian Heimesc2d65e12013-11-22 16:13:55 +0100561 trust_oids = set()
562 for storename in ("CA", "ROOT"):
563 store = ssl.enum_certificates(storename)
564 self.assertIsInstance(store, list)
565 for element in store:
566 self.assertIsInstance(element, tuple)
567 self.assertEqual(len(element), 3)
568 cert, enc, trust = element
569 self.assertIsInstance(cert, bytes)
570 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
571 self.assertIsInstance(trust, (set, bool))
572 if isinstance(trust, set):
573 trust_oids.update(trust)
Christian Heimes44109d72013-11-22 01:51:30 +0100574
575 serverAuth = "1.3.6.1.5.5.7.3.1"
Christian Heimesc2d65e12013-11-22 16:13:55 +0100576 self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200577
Christian Heimes46bebee2013-06-09 19:03:31 +0200578 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Christian Heimes44109d72013-11-22 01:51:30 +0100579 def test_enum_crls(self):
580 self.assertTrue(ssl.enum_crls("CA"))
581 self.assertRaises(TypeError, ssl.enum_crls)
582 self.assertRaises(WindowsError, ssl.enum_crls, "")
Christian Heimes46bebee2013-06-09 19:03:31 +0200583
Christian Heimes44109d72013-11-22 01:51:30 +0100584 crls = ssl.enum_crls("CA")
585 self.assertIsInstance(crls, list)
586 for element in crls:
587 self.assertIsInstance(element, tuple)
588 self.assertEqual(len(element), 2)
589 self.assertIsInstance(element[0], bytes)
590 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200591
Christian Heimes46bebee2013-06-09 19:03:31 +0200592
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100593 def test_asn1object(self):
594 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
595 '1.3.6.1.5.5.7.3.1')
596
597 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
598 self.assertEqual(val, expected)
599 self.assertEqual(val.nid, 129)
600 self.assertEqual(val.shortname, 'serverAuth')
601 self.assertEqual(val.longname, 'TLS Web Server Authentication')
602 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
603 self.assertIsInstance(val, ssl._ASN1Object)
604 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
605
606 val = ssl._ASN1Object.fromnid(129)
607 self.assertEqual(val, expected)
608 self.assertIsInstance(val, ssl._ASN1Object)
609 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100610 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
611 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100612 for i in range(1000):
613 try:
614 obj = ssl._ASN1Object.fromnid(i)
615 except ValueError:
616 pass
617 else:
618 self.assertIsInstance(obj.nid, int)
619 self.assertIsInstance(obj.shortname, str)
620 self.assertIsInstance(obj.longname, str)
621 self.assertIsInstance(obj.oid, (str, type(None)))
622
623 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
624 self.assertEqual(val, expected)
625 self.assertIsInstance(val, ssl._ASN1Object)
626 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
627 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
628 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100629 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
630 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100631
Christian Heimes72d28502013-11-23 13:56:58 +0100632 def test_purpose_enum(self):
633 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
634 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
635 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
636 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
637 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
638 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
639 '1.3.6.1.5.5.7.3.1')
640
641 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
642 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
643 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
644 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
645 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
646 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
647 '1.3.6.1.5.5.7.3.2')
648
def test_unsupported_dtls(self):
    # Wrapping a datagram socket is refused with NotImplementedError,
    # both by ssl.wrap_socket() and by SSLContext.wrap_socket().
    expected_message = "only stream sockets are supported"
    udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    self.addCleanup(udp_sock.close)

    with self.assertRaises(NotImplementedError) as caught:
        ssl.wrap_socket(udp_sock, cert_reqs=ssl.CERT_NONE)
    self.assertEqual(str(caught.exception), expected_message)

    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    with self.assertRaises(NotImplementedError) as caught:
        context.wrap_socket(udp_sock)
    self.assertEqual(str(caught.exception), expected_message)
659
def cert_time_ok(self, timestring, timestamp):
    # Helper: `timestring` must convert to exactly `timestamp`.
    seconds = ssl.cert_time_to_seconds(timestring)
    self.assertEqual(seconds, timestamp)
662
def cert_time_fail(self, timestring):
    # Helper: converting `timestring` must raise ValueError.
    self.assertRaises(ValueError, ssl.cert_time_to_seconds, timestring)
666
@unittest.skipUnless(utc_offset(),
                     'local time needs to be different from UTC')
def test_cert_time_to_seconds_timezone(self):
    # Issue #19940: ssl.cert_time_to_seconds() returns wrong
    # results if local timezone is not UTC
    known_conversions = (
        ("May 9 00:00:00 2007 GMT", 1178668800.0),
        ("Jan 5 09:34:43 2018 GMT", 1515144883.0),
    )
    for timestring, timestamp in known_conversions:
        self.cert_time_ok(timestring, timestamp)
674
def test_cert_time_to_seconds(self):
    # Feed cert_time_to_seconds() well-formed and malformed time strings.
    reference = "Jan 5 09:34:43 2018 GMT"
    reference_ts = 1515144883.0
    self.cert_time_ok(reference, reference_ts)
    # The argument may be passed by keyword; this pins its name.
    self.assertEqual(ssl.cert_time_to_seconds(cert_time=reference),
                     reference_ts)

    # Other spellings of the reference instant.
    same_instant = (
        # both %e and %d day styles (space or zero from strftime)
        "Jan 05 09:34:43 2018 GMT",
        # case-insensitive
        "JaN 5 09:34:43 2018 GmT",
    )
    for spelling in same_instant:
        self.cert_time_ok(spelling, reference_ts)

    rejected = (
        "Jan 5 09:34 2018 GMT",      # no seconds
        "Jan 5 09:34:43 2018",       # no GMT
        "Jan 5 09:34:43 2018 UTC",   # not GMT timezone
        "Jan 35 09:34:43 2018 GMT",  # invalid day
        "Jon 5 09:34:43 2018 GMT",   # invalid month
        "Jan 5 24:00:00 2018 GMT",   # invalid hour
        "Jan 5 09:60:43 2018 GMT",   # invalid minute
    )
    for malformed in rejected:
        self.cert_time_fail(malformed)

    # A leap second and the first second of the following year map to
    # the same timestamp.
    newyear_ts = 1230768000.0
    for spelling in ("Dec 31 23:59:60 2008 GMT", "Jan 1 00:00:00 2009 GMT"):
        self.cert_time_ok(spelling, newyear_ts)

    # Second 60 is allowed even when it is not a real leap second, and
    # second 61 is allowed for compatibility with time.strptime().
    tolerated_seconds = (
        ("Jan 5 09:34:59 2018 GMT", 1515144899),
        ("Jan 5 09:34:60 2018 GMT", 1515144900),
        ("Jan 5 09:34:61 2018 GMT", 1515144901),
    )
    for spelling, expected in tolerated_seconds:
        self.cert_time_ok(spelling, expected)
    self.cert_time_fail("Jan 5 09:34:62 2018 GMT")  # invalid seconds

    # No special treatment for the special value
    # 99991231235959Z (RFC 5280).
    self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
709
@support.run_with_locale('LC_ALL', '')
def test_cert_time_to_seconds_locale(self):
    # `cert_time_to_seconds()` should be locale independent

    # Abbreviated name of February as the active locale renders it.
    february = time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
    if february.lower() == 'feb':
        self.skipTest("locale-specific month name needs to be "
                      "different from C locale")

    # The C-locale name is accepted, the localized one is not.
    self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
    self.cert_time_fail(february + " 9 00:00:00 2007 GMT")
724
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100725
Antoine Pitrou152efa22010-05-16 18:19:27 +0000726class ContextTests(unittest.TestCase):
727
@skip_if_broken_ubuntu_ssl
def test_constructor(self):
    # Every known protocol constant yields a context ...
    for known_protocol in PROTOCOLS:
        ssl.SSLContext(known_protocol)
    # ... a missing protocol argument is a TypeError ...
    with self.assertRaises(TypeError):
        ssl.SSLContext()
    # ... and unknown protocol numbers are a ValueError.
    for unknown_protocol in (-1, 42):
        with self.assertRaises(ValueError):
            ssl.SSLContext(unknown_protocol)
735
@skip_if_broken_ubuntu_ssl
def test_protocol(self):
    # The `protocol` attribute echoes the constructor argument.
    for requested in PROTOCOLS:
        context = ssl.SSLContext(requested)
        reported = context.protocol
        self.assertEqual(reported, requested)
741
def test_ciphers(self):
    # Valid cipher strings are accepted; one that selects nothing
    # raises SSLError.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    for cipher_string in ("ALL", "DEFAULT"):
        context.set_ciphers(cipher_string)
    self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected",
                           context.set_ciphers, "^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000748
@skip_if_broken_ubuntu_ssl
def test_options(self):
    # Options can be read and extended; whether bits can be cleared
    # again depends on can_clear_options().
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    # OP_ALL | OP_NO_SSLv2 is the default value
    default_options = ssl.OP_ALL | ssl.OP_NO_SSLv2
    self.assertEqual(default_options, context.options)

    context.options = context.options | ssl.OP_NO_SSLv3
    self.assertEqual(default_options | ssl.OP_NO_SSLv3, context.options)

    if not can_clear_options():
        # Clearing is unsupported here, so resetting must be refused.
        with self.assertRaises(ValueError):
            context.options = 0
        return

    without_sslv2_bit = context.options & ~ssl.OP_NO_SSLv2
    context.options = without_sslv2_bit | ssl.OP_NO_TLSv1
    self.assertEqual(ssl.OP_ALL | ssl.OP_NO_TLSv1 | ssl.OP_NO_SSLv3,
                     context.options)
    context.options = 0
    self.assertEqual(0, context.options)
767
def test_verify_mode(self):
    # verify_mode defaults to CERT_NONE, round-trips the three valid
    # modes and rejects anything else.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    self.assertEqual(context.verify_mode, ssl.CERT_NONE)
    for mode in (ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED, ssl.CERT_NONE):
        context.verify_mode = mode
        self.assertEqual(context.verify_mode, mode)
    for invalid, error in ((None, TypeError), (42, ValueError)):
        with self.assertRaises(error):
            context.verify_mode = invalid
782
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_verify_flags(self):
    # verify_flags starts at VERIFY_DEFAULT, round-trips each value
    # assigned to it and rejects None.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    # default value by OpenSSL
    self.assertEqual(context.verify_flags, ssl.VERIFY_DEFAULT)
    round_trip_values = (
        ssl.VERIFY_CRL_CHECK_LEAF,
        ssl.VERIFY_CRL_CHECK_CHAIN,
        ssl.VERIFY_DEFAULT,
        # combined flags are supported as well
        ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT,
    )
    for flags in round_trip_values:
        context.verify_flags = flags
        self.assertEqual(context.verify_flags, flags)
    with self.assertRaises(TypeError):
        context.verify_flags = None
801
def test_load_cert_chain(self):
    # Exercise load_cert_chain() with a combined key/cert file, with
    # separate files, with a mismatching pair, and with protected keys
    # whose password is given directly or through a callable.
    pem_error = "PEM lib"

    # Combined key and cert in a single file
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.load_cert_chain(CERTFILE)
    context.load_cert_chain(CERTFILE, keyfile=CERTFILE)
    with self.assertRaises(TypeError):
        context.load_cert_chain(keyfile=CERTFILE)
    with self.assertRaises(OSError) as caught:
        context.load_cert_chain(WRONGCERT)
    self.assertEqual(caught.exception.errno, errno.ENOENT)
    for unusable in (BADCERT, EMPTYCERT):
        self.assertRaisesRegex(ssl.SSLError, pem_error,
                               context.load_cert_chain, unusable)

    # Separate key and cert
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.load_cert_chain(ONLYCERT, ONLYKEY)
    context.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
    context.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
    # Either half on its own, or the two swapped, is rejected.
    for lone_file in (ONLYCERT, ONLYKEY):
        self.assertRaisesRegex(ssl.SSLError, pem_error,
                               context.load_cert_chain, lone_file)
    self.assertRaisesRegex(ssl.SSLError, pem_error,
                           context.load_cert_chain,
                           certfile=ONLYKEY, keyfile=ONLYCERT)

    # Mismatching key and cert
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    self.assertRaisesRegex(ssl.SSLError, "key values mismatch",
                           context.load_cert_chain,
                           SVN_PYTHON_ORG_ROOT_CERT, ONLYKEY)

    # Password protected key and cert: the password is given as str,
    # bytes and bytearray, first by keyword and then positionally.
    for secret in (KEY_PASSWORD,
                   KEY_PASSWORD.encode(),
                   bytearray(KEY_PASSWORD.encode())):
        context.load_cert_chain(CERTFILE_PROTECTED, password=secret)
    for secret in (KEY_PASSWORD,
                   KEY_PASSWORD.encode(),
                   bytearray(KEY_PASSWORD.encode())):
        context.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, secret)
    with self.assertRaisesRegex(TypeError, "should be a string"):
        context.load_cert_chain(CERTFILE_PROTECTED, password=True)
    with self.assertRaises(ssl.SSLError):
        context.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        # openssl has a fixed limit on the password buffer.
        # PEM_BUFSIZE is generally set to 1kb.
        # Pass a password larger than this.
        context.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)

    # Password callback
    class PasswordSource:
        def __call__(self):
            return KEY_PASSWORD

        def getpass(self):
            return KEY_PASSWORD

    def failing_callback():
        raise Exception('getpass error')

    accepted_callbacks = (
        lambda: KEY_PASSWORD,
        lambda: KEY_PASSWORD.encode(),
        lambda: bytearray(KEY_PASSWORD.encode()),
        PasswordSource(),
        PasswordSource().getpass,
    )
    for callback in accepted_callbacks:
        context.load_cert_chain(CERTFILE_PROTECTED, password=callback)

    with self.assertRaises(ssl.SSLError):
        context.load_cert_chain(CERTFILE_PROTECTED,
                                password=lambda: "badpass")
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        context.load_cert_chain(CERTFILE_PROTECTED,
                                password=lambda: b'a' * (1024 * 1024))
    with self.assertRaisesRegex(TypeError, "must return a string"):
        context.load_cert_chain(CERTFILE_PROTECTED, password=lambda: 9)
    with self.assertRaisesRegex(Exception, "getpass error"):
        context.load_cert_chain(CERTFILE_PROTECTED,
                                password=failing_callback)
    # Make sure the password function isn't called if it isn't needed
    context.load_cert_chain(CERTFILE, password=failing_callback)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000884
def test_load_verify_locations(self):
    # Exercise load_verify_locations() with cafile/capath given as str
    # and bytes, and with invalid arguments.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    for cafile in (CERTFILE, BYTES_CERTFILE):
        context.load_verify_locations(cafile)
        context.load_verify_locations(cafile=cafile, capath=None)

    with self.assertRaises(TypeError):
        context.load_verify_locations()
    with self.assertRaises(TypeError):
        context.load_verify_locations(None, None, None)
    with self.assertRaises(OSError) as caught:
        context.load_verify_locations(WRONGCERT)
    self.assertEqual(caught.exception.errno, errno.ENOENT)
    self.assertRaisesRegex(ssl.SSLError, "PEM lib",
                           context.load_verify_locations, BADCERT)

    context.load_verify_locations(CERTFILE, CAPATH)
    context.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)

    # Issue #10989: crash if the second argument type is invalid
    with self.assertRaises(TypeError):
        context.load_verify_locations(None, True)
903
def test_load_verify_cadata(self):
    # Load CA certificates through the `cadata` argument, as PEM text
    # and as DER bytes, and check how many end up in the store.
    with open(CAFILE_CACERT) as stream:
        cacert_pem = stream.read()
    with open(CAFILE_NEURONIO) as stream:
        neuronio_pem = stream.read()
    cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
    neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)

    def new_context():
        return ssl.SSLContext(ssl.PROTOCOL_TLSv1)

    def assert_ca_count(context, expected):
        self.assertEqual(context.cert_store_stats()["x509_ca"], expected)

    # test PEM, one certificate per call
    context = new_context()
    assert_ca_count(context, 0)
    context.load_verify_locations(cadata=cacert_pem)
    assert_ca_count(context, 1)
    context.load_verify_locations(cadata=neuronio_pem)
    assert_ca_count(context, 2)
    # cert already in hash table
    context.load_verify_locations(cadata=neuronio_pem)
    assert_ca_count(context, 2)

    # both PEM certificates in one string
    context = new_context()
    context.load_verify_locations(
        cadata="\n".join((cacert_pem, neuronio_pem)))
    assert_ca_count(context, 2)

    # with junk around the certs
    context = new_context()
    noisy_parts = ["head", cacert_pem, "other", neuronio_pem, "again",
                   neuronio_pem, "tail"]
    context.load_verify_locations(cadata="\n".join(noisy_parts))
    assert_ca_count(context, 2)

    # test DER, one certificate per call
    context = new_context()
    context.load_verify_locations(cadata=cacert_der)
    context.load_verify_locations(cadata=neuronio_der)
    assert_ca_count(context, 2)
    # cert already in hash table
    context.load_verify_locations(cadata=cacert_der)
    assert_ca_count(context, 2)

    # both DER certificates concatenated
    context = new_context()
    context.load_verify_locations(
        cadata=b"".join((cacert_der, neuronio_der)))
    assert_ca_count(context, 2)

    # error cases
    context = new_context()
    with self.assertRaises(TypeError):
        context.load_verify_locations(cadata=object)
    self.assertRaisesRegex(ssl.SSLError, "no start line",
                           context.load_verify_locations, cadata="broken")
    self.assertRaisesRegex(ssl.SSLError, "not enough data",
                           context.load_verify_locations, cadata=b"broken")
960
961
def test_load_dh_params(self):
    # load_dh_params() accepts a DH parameter file and rejects missing
    # arguments, missing files and files without DH parameters.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.load_dh_params(DHFILE)
    # The bytes path is only tried when not running on Windows.
    if os.name != 'nt':
        context.load_dh_params(BYTES_DHFILE)
    with self.assertRaises(TypeError):
        context.load_dh_params()
    with self.assertRaises(TypeError):
        context.load_dh_params(None)
    with self.assertRaises(FileNotFoundError) as caught:
        context.load_dh_params(WRONGCERT)
    self.assertEqual(caught.exception.errno, errno.ENOENT)
    self.assertRaises(ssl.SSLError, context.load_dh_params, CERTFILE)
974
@skip_if_broken_ubuntu_ssl
def test_session_stats(self):
    # A fresh context reports zero for every session counter.
    counter_names = (
        'number',
        'connect',
        'connect_good',
        'connect_renegotiate',
        'accept',
        'accept_good',
        'accept_renegotiate',
        'hits',
        'misses',
        'timeouts',
        'cache_full',
    )
    for protocol in PROTOCOLS:
        context = ssl.SSLContext(protocol)
        all_zero = dict.fromkeys(counter_names, 0)
        self.assertEqual(context.session_stats(), all_zero)
992
def test_set_default_verify_paths(self):
    # The effect cannot really be verified from here, so this is only
    # a smoke test: the call must neither crash nor raise.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.set_default_verify_paths()
998
@unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
def test_set_ecdh_curve(self):
    # A known curve name is accepted as str or bytes; a missing or None
    # argument is a TypeError and an unknown name a ValueError.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    for curve in ("prime256v1", b"prime256v1"):
        context.set_ecdh_curve(curve)
    with self.assertRaises(TypeError):
        context.set_ecdh_curve()
    with self.assertRaises(TypeError):
        context.set_ecdh_curve(None)
    for unknown_curve in ("foo", b"foo"):
        with self.assertRaises(ValueError):
            context.set_ecdh_curve(unknown_curve)
1008
@needs_sni
def test_sni_callback(self):
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)

    # set_servername_callback expects a callable, or None
    with self.assertRaises(TypeError):
        context.set_servername_callback()
    for not_callable in (4, "", context):
        with self.assertRaises(TypeError):
            context.set_servername_callback(not_callable)

    context.set_servername_callback(None)
    context.set_servername_callback(lambda sock, servername, ctx: None)
1023
@needs_sni
def test_sni_callback_refcycle(self):
    # Reference cycles through the servername callback are detected
    # and cleared.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    # The default argument makes the callback refer back to the context
    # that stores it, which closes the cycle.
    callback = lambda sock, servername, ctx, cycle=context: None
    context.set_servername_callback(callback)
    context_ref = weakref.ref(context)
    # Drop every local reference so only the cycle keeps both alive.
    del callback, context
    gc.collect()
    self.assertIs(context_ref(), None)
1036
def test_cert_store_stats(self):
    # cert_store_stats() counts what load_verify_locations() added.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    empty_store = {'x509_ca': 0, 'crl': 0, 'x509': 0}
    self.assertEqual(context.cert_store_stats(), empty_store)
    # Loading the context's own cert chain leaves the counts untouched.
    context.load_cert_chain(CERTFILE)
    self.assertEqual(context.cert_store_stats(), empty_store)

    # Each step loads one more file and states the totals expected
    # afterwards.
    steps = (
        (CERTFILE, {'x509_ca': 0, 'crl': 0, 'x509': 1}),
        (SVN_PYTHON_ORG_ROOT_CERT, {'x509_ca': 1, 'crl': 0, 'x509': 2}),
    )
    for cafile, expected in steps:
        context.load_verify_locations(cafile)
        self.assertEqual(context.cert_store_stats(), expected)
1050
def test_get_ca_certs(self):
    # get_ca_certs() lists the loaded CA certificates, decoded by
    # default and as DER bytes when called with True.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    self.assertEqual(context.get_ca_certs(), [])
    # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
    context.load_verify_locations(CERTFILE)
    self.assertEqual(context.get_ca_certs(), [])
    # but SVN_PYTHON_ORG_ROOT_CERT is a CA cert
    context.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)

    # Issuer and subject are expected to be identical for this cert.
    root_name = (
        (('organizationName', 'Root CA'),),
        (('organizationalUnitName', 'http://www.cacert.org'),),
        (('commonName', 'CA Cert Signing Authority'),),
        (('emailAddress', 'support@cacert.org'),),
    )
    expected_cert = {
        'issuer': root_name,
        'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
        'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
        'serialNumber': '00',
        'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
        'subject': root_name,
        'version': 3,
    }
    self.assertEqual(context.get_ca_certs(), [expected_cert])

    with open(SVN_PYTHON_ORG_ROOT_CERT) as stream:
        root_pem = stream.read()
    root_der = ssl.PEM_cert_to_DER_cert(root_pem)
    self.assertEqual(context.get_ca_certs(True), [root_der])
1078
def test_load_default_certs(self):
    # load_default_certs() works with no argument or with a Purpose
    # member, and rejects anything else.
    ssl.SSLContext(ssl.PROTOCOL_TLSv1).load_default_certs()

    # An explicit purpose followed by the default, on one context.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.load_default_certs(ssl.Purpose.SERVER_AUTH)
    context.load_default_certs()

    ssl.SSLContext(ssl.PROTOCOL_TLSv1).load_default_certs(
        ssl.Purpose.CLIENT_AUTH)

    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    for not_a_purpose in (None, 'SERVER_AUTH'):
        with self.assertRaises(TypeError):
            context.load_default_certs(not_a_purpose)
1093
def test_create_default_context(self):
    # Check protocol, verify mode and option bits of the contexts that
    # create_default_context() builds for both purposes.

    def assert_optional_flags_set(context, *flag_names):
        # A flag this ssl module does not define falls back to 0, which
        # makes its check trivially true.
        for flag_name in flag_names:
            flag = getattr(ssl, flag_name, 0)
            self.assertEqual(context.options & flag, flag)

    # Defaults
    context = ssl.create_default_context()
    self.assertEqual(context.protocol, ssl.PROTOCOL_SSLv23)
    self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
    self.assertTrue(context.check_hostname)
    self.assertEqual(context.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
    assert_optional_flags_set(context, "OP_NO_COMPRESSION")

    # Explicit CA material through all three arguments
    with open(SIGNING_CA) as stream:
        cadata = stream.read()
    context = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
                                         cadata=cadata)
    self.assertEqual(context.protocol, ssl.PROTOCOL_SSLv23)
    self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
    self.assertEqual(context.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
    assert_optional_flags_set(context, "OP_NO_COMPRESSION")

    # CLIENT_AUTH purpose
    context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    self.assertEqual(context.protocol, ssl.PROTOCOL_SSLv23)
    self.assertEqual(context.verify_mode, ssl.CERT_NONE)
    self.assertEqual(context.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
    assert_optional_flags_set(context,
                              "OP_NO_COMPRESSION",
                              "OP_SINGLE_DH_USE",
                              "OP_SINGLE_ECDH_USE")
Christian Heimes4c05b472013-11-23 15:58:30 +01001133
def test__create_stdlib_context(self):
    # Check the contexts built by the private _create_stdlib_context()
    # helper for several argument combinations.

    def check(context, protocol, verify_mode, hostname_checked=None):
        # `hostname_checked` is None when check_hostname is not asserted.
        self.assertEqual(context.protocol, protocol)
        self.assertEqual(context.verify_mode, verify_mode)
        if hostname_checked is True:
            self.assertTrue(context.check_hostname)
        elif hostname_checked is False:
            self.assertFalse(context.check_hostname)
        self.assertEqual(context.options & ssl.OP_NO_SSLv2,
                         ssl.OP_NO_SSLv2)

    check(ssl._create_stdlib_context(),
          ssl.PROTOCOL_SSLv23, ssl.CERT_NONE, hostname_checked=False)

    check(ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1),
          ssl.PROTOCOL_TLSv1, ssl.CERT_NONE)

    check(ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
                                     cert_reqs=ssl.CERT_REQUIRED,
                                     check_hostname=True),
          ssl.PROTOCOL_TLSv1, ssl.CERT_REQUIRED, hostname_checked=True)

    check(ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH),
          ssl.PROTOCOL_SSLv23, ssl.CERT_NONE)
Christian Heimes4c05b472013-11-23 15:58:30 +01001158
def test_check_hostname(self):
    # check_hostname and verify_mode constrain each other.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    self.assertFalse(context.check_hostname)

    # Requires CERT_REQUIRED or CERT_OPTIONAL
    self.assertRaises(ValueError, setattr, context, 'check_hostname', True)
    context.verify_mode = ssl.CERT_REQUIRED
    # Changing the verify mode does not switch hostname checking on.
    self.assertFalse(context.check_hostname)
    context.check_hostname = True
    self.assertTrue(context.check_hostname)

    context.verify_mode = ssl.CERT_OPTIONAL
    context.check_hostname = True
    self.assertTrue(context.check_hostname)

    # Cannot set CERT_NONE with check_hostname enabled
    self.assertRaises(ValueError,
                      setattr, context, 'verify_mode', ssl.CERT_NONE)
    context.check_hostname = False
    self.assertFalse(context.check_hostname)
1180
Antoine Pitrou152efa22010-05-16 18:19:27 +00001181
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001182class SSLErrorTests(unittest.TestCase):
1183
def test_str(self):
    # The str() of a SSLError doesn't include the errno, and the same
    # holds for a subclass.
    for error_type in (ssl.SSLError, ssl.SSLZeroReturnError):
        error = error_type(1, "foo")
        self.assertEqual(str(error), "foo")
        self.assertEqual(error.errno, 1)
1193
def test_lib_reason(self):
    # Test the library and reason attributes
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    # CERTFILE is given where DH parameters are expected, which fails.
    with self.assertRaises(ssl.SSLError) as caught:
        context.load_dh_params(CERTFILE)
    error = caught.exception
    self.assertEqual(error.library, 'PEM')
    self.assertEqual(error.reason, 'NO_START_LINE')
    message = str(error)
    self.assertTrue(
        message.startswith("[PEM: NO_START_LINE] no start line"), message)
1203
def test_subclass(self):
    # Check that the appropriate SSLError subclass is raised
    # (this only tests one of them)
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    with socket.socket() as listener:
        listener.bind(("127.0.0.1", 0))
        listener.listen(5)
        client = socket.socket()
        client.connect(listener.getsockname())
        client.setblocking(False)
        # This test never accepts the connection on the listener side,
        # so the non-blocking handshake is expected to want more input.
        with context.wrap_socket(
                client, False, do_handshake_on_connect=False) as tls_client:
            with self.assertRaises(ssl.SSLWantReadError) as caught:
                tls_client.do_handshake()
            error = caught.exception
            message = str(error)
            self.assertTrue(
                message.startswith("The operation did not complete (read)"),
                message)
            # For compatibility
            self.assertEqual(error.errno, ssl.SSL_ERROR_WANT_READ)
1221
1222
Bill Janssen6e027db2007-11-15 22:23:56 +00001223class NetworkedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001224
def test_connect(self):
    # Connect to svn.python.org three times: without verification,
    # with verification but no CA certificates (must fail), and with
    # verification against SVN_PYTHON_ORG_ROOT_CERT.
    #
    # Every socket is used as a context manager so that it is closed
    # even when an assertion fails; previously the socket of the
    # "verify failed" step leaked in that case.
    server = ("svn.python.org", 443)
    with support.transient_internet("svn.python.org"):
        with ssl.wrap_socket(socket.socket(socket.AF_INET),
                             cert_reqs=ssl.CERT_NONE) as s:
            s.connect(server)
            self.assertEqual({}, s.getpeercert())

        # this should fail because we have no verification certs
        with ssl.wrap_socket(socket.socket(socket.AF_INET),
                             cert_reqs=ssl.CERT_REQUIRED) as s:
            self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
                                   s.connect, server)

        # this should succeed because we specify the root cert
        with ssl.wrap_socket(socket.socket(socket.AF_INET),
                             cert_reqs=ssl.CERT_REQUIRED,
                             ca_certs=SVN_PYTHON_ORG_ROOT_CERT) as s:
            s.connect(server)
            self.assertTrue(s.getpeercert())
Antoine Pitrou152efa22010-05-16 18:19:27 +00001251
def test_connect_ex(self):
    # Issue #11326: check connect_ex() implementation
    with support.transient_internet("svn.python.org"):
        with ssl.wrap_socket(socket.socket(socket.AF_INET),
                             cert_reqs=ssl.CERT_REQUIRED,
                             ca_certs=SVN_PYTHON_ORG_ROOT_CERT) as tls_sock:
            status = tls_sock.connect_ex(("svn.python.org", 443))
            self.assertEqual(0, status)
            self.assertTrue(tls_sock.getpeercert())
1263
def test_non_blocking_connect_ex(self):
    # Issue #11326: non-blocking connect_ex() should allow handshake
    # to proceed after the socket gets ready.
    with support.transient_internet("svn.python.org"):
        with ssl.wrap_socket(socket.socket(socket.AF_INET),
                             cert_reqs=ssl.CERT_REQUIRED,
                             ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
                             do_handshake_on_connect=False) as tls_sock:
            tls_sock.setblocking(False)
            status = tls_sock.connect_ex(('svn.python.org', 443))
            # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
            self.assertIn(status,
                          (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
            # Wait for connect to finish
            select.select([], [tls_sock], [], 5.0)
            # Non-blocking handshake: retry after waiting for the
            # direction the SSL layer asks for.
            handshake_done = False
            while not handshake_done:
                try:
                    tls_sock.do_handshake()
                except ssl.SSLWantReadError:
                    select.select([tls_sock], [], [], 5.0)
                except ssl.SSLWantWriteError:
                    select.select([], [tls_sock], [], 5.0)
                else:
                    handshake_done = True
            # SSL established
            self.assertTrue(tls_sock.getpeercert())
1292
def test_timeout_connect_ex(self):
    # Issue #12065: on a timeout, connect_ex() should return the original
    # errno (mimicking the behaviour of non-SSL sockets).
    with support.transient_internet("svn.python.org"):
        with ssl.wrap_socket(socket.socket(socket.AF_INET),
                             cert_reqs=ssl.CERT_REQUIRED,
                             ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
                             do_handshake_on_connect=False) as tls_sock:
            # A timeout this small is meant to expire before the
            # connection can be established.
            tls_sock.settimeout(0.0000001)
            status = tls_sock.connect_ex(('svn.python.org', 443))
            if status == 0:
                self.skipTest("svn.python.org responded too quickly")
            self.assertIn(status, (errno.EAGAIN, errno.EWOULDBLOCK))
1309
def test_connect_ex_error(self):
    # connect_ex() reports a failed connection through its return code.
    with support.transient_internet("svn.python.org"):
        with ssl.wrap_socket(socket.socket(socket.AF_INET),
                             cert_reqs=ssl.CERT_REQUIRED,
                             ca_certs=SVN_PYTHON_ORG_ROOT_CERT) as tls_sock:
            # Port 444 rather than 443: the connection is expected to
            # be refused.
            status = tls_sock.connect_ex(("svn.python.org", 444))
            # Issue #19919: Windows machines or VMs hosted on Windows
            # machines sometimes return EWOULDBLOCK.
            self.assertIn(status,
                          (errno.ECONNREFUSED, errno.EWOULDBLOCK))
1322
def test_connect_with_context(self):
    # Same as test_connect, but with a separately created context.
    #
    # Every wrapped socket is used as a context manager so that it is
    # closed on all paths. Previously a failing connect(), a failing
    # assertion, or a build without SNI left a socket open.
    server = ("svn.python.org", 443)
    with support.transient_internet("svn.python.org"):
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
            s.connect(server)
            self.assertEqual({}, s.getpeercert())

        # Same with a server hostname
        with ctx.wrap_socket(socket.socket(socket.AF_INET),
                             server_hostname="svn.python.org") as s:
            if ssl.HAS_SNI:
                s.connect(server)
            else:
                # Without SNI support the connection attempt must be
                # refused with ValueError.
                self.assertRaises(ValueError, s.connect, server)

        # This should fail because we have no verification certs
        ctx.verify_mode = ssl.CERT_REQUIRED
        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
            self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
                                   s.connect, server)

        # This should succeed because we specify the root cert
        ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
            s.connect(server)
            cert = s.getpeercert()
            self.assertTrue(cert)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001356
def test_connect_capath(self):
    """Verify server certificates using the `capath` argument.

    NOTE: the subject hashing algorithm has been changed between
    OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
    contain both versions of each certificate (same content, different
    filename) for this test to be portable across OpenSSL releases.
    """
    with support.transient_internet("svn.python.org"):
        # Run the same check with a str and then a bytes `capath` argument.
        for capath in (CAPATH, BYTES_CAPATH):
            ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
            ctx.verify_mode = ssl.CERT_REQUIRED
            ctx.load_verify_locations(capath=capath)
            # The context manager closes the socket even if connect() fails.
            with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
                s.connect(("svn.python.org", 443))
                cert = s.getpeercert()
                self.assertTrue(cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001385
def test_connect_cadata(self):
    """Verify the server certificate with `cadata` given as PEM, then DER."""
    with open(CAFILE_CACERT) as cafile:
        pem = cafile.read()
    der = ssl.PEM_cert_to_DER_cert(pem)
    with support.transient_internet("svn.python.org"):
        # A fresh context is built for each encoding of the CA data.
        for cadata in (pem, der):
            ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
            ctx.verify_mode = ssl.CERT_REQUIRED
            ctx.load_verify_locations(cadata=cadata)
            with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
                s.connect(("svn.python.org", 443))
                peer_cert = s.getpeercert()
                self.assertTrue(peer_cert)
1407
@unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
def test_makefile_close(self):
    """Closing a makefile() object must not keep the real socket open.

    Issue #5238: creating a file-like object with makefile() shouldn't
    delay closing the underlying "real socket" (here tested with its
    file descriptor, hence skipping the test under Windows).
    """
    with support.transient_internet("svn.python.org"):
        ssl_sock = ssl.wrap_socket(socket.socket(socket.AF_INET))
        ssl_sock.connect(("svn.python.org", 443))
        fd = ssl_sock.fileno()
        file_obj = ssl_sock.makefile()
        file_obj.close()
        # The fd is still open
        os.read(fd, 0)
        # Closing the SSL socket should close the fd too
        ssl_sock.close()
        gc.collect()
        with self.assertRaises(OSError) as caught:
            os.read(fd, 0)
        self.assertEqual(caught.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001427
def test_non_blocking_handshake(self):
    """Complete an SSL handshake on a non-blocking socket using select()."""
    with support.transient_internet("svn.python.org"):
        plain = socket.socket(socket.AF_INET)
        plain.connect(("svn.python.org", 443))
        plain.setblocking(False)
        s = ssl.wrap_socket(plain,
                            cert_reqs=ssl.CERT_NONE,
                            do_handshake_on_connect=False)
        attempts = 0
        established = False
        while not established:
            attempts += 1
            try:
                s.do_handshake()
            except ssl.SSLWantReadError:
                # Wait until the socket is readable, then retry.
                select.select([s], [], [])
            except ssl.SSLWantWriteError:
                # Wait until the socket is writable, then retry.
                select.select([], [s], [])
            else:
                established = True
        s.close()
        if support.verbose:
            sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % attempts)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001449
def test_get_server_certificate(self):
    """Fetch server certificates with ssl.get_server_certificate()."""

    def check_host(host, port, cert=None):
        # Three fetches: without CA certs, with CERTFILE (expected to be
        # rejected), and with the CA file passed in by the caller.
        addr = (host, port)
        with support.transient_internet(host):
            pem = ssl.get_server_certificate(addr)
            if not pem:
                self.fail("No server certificate on %s:%s!" % addr)

            try:
                pem = ssl.get_server_certificate(addr, ca_certs=CERTFILE)
            except ssl.SSLError as x:
                #should fail
                if support.verbose:
                    sys.stdout.write("%s\n" % x)
            else:
                self.fail("Got server certificate %s for %s:%s!" % ((pem,) + addr))

            pem = ssl.get_server_certificate(addr, ca_certs=cert)
            if not pem:
                self.fail("No server certificate on %s:%s!" % addr)
            if support.verbose:
                sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (addr + (pem,)))

    check_host('svn.python.org', 443, SVN_PYTHON_ORG_ROOT_CERT)
    if support.IPV6_ENABLED:
        check_host('ipv6.google.com', 443)
Bill Janssen54cc54c2007-12-14 22:08:56 +00001477
def test_ciphers(self):
    """Connect with valid cipher strings, then check a bogus one is rejected."""
    remote = ("svn.python.org", 443)
    with support.transient_internet(remote[0]):
        for cipher_spec in ("ALL", "DEFAULT"):
            with ssl.wrap_socket(socket.socket(socket.AF_INET),
                                 cert_reqs=ssl.CERT_NONE,
                                 ciphers=cipher_spec) as s:
                s.connect(remote)
        # Error checking can happen at instantiation or when connecting
        with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
            with socket.socket(socket.AF_INET) as sock:
                s = ssl.wrap_socket(sock,
                                    cert_reqs=ssl.CERT_NONE,
                                    ciphers="^$:,;?*'dorothyx")
                s.connect(remote)
Antoine Pitrouf4c7bad2010-08-15 23:02:22 +00001493
def test_algorithms(self):
    """Connect to a server whose certificate chain needs SHA256.

    Issue #8484: all algorithms should be available when verifying a
    certificate.
    """
    # SHA256 was added in OpenSSL 0.9.8
    if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15):
        self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION)
    # sha256.tbs-internet.com needs SNI to use the correct certificate
    if not ssl.HAS_SNI:
        self.skipTest("SNI needed for this test")
    # https://sha2.hboeck.de/ was used until 2011-01-08 (no route to host)
    hostname = "sha256.tbs-internet.com"
    remote = (hostname, 443)
    sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem")
    with support.transient_internet(hostname):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.load_verify_locations(sha256_cert)
        with ctx.wrap_socket(socket.socket(socket.AF_INET),
                             server_hostname=hostname) as s:
            s.connect(remote)
            if support.verbose:
                sys.stdout.write("\nCipher with %r is %r\n" %
                                 (remote, s.cipher()))
                sys.stdout.write("Certificate is:\n%s\n" %
                                 pprint.pformat(s.getpeercert()))
1521
def test_get_ca_certs_capath(self):
    """Check that certificates from `capath` are loaded on request.

    get_ca_certs() is expected to be empty right after
    load_verify_locations(capath=...) and to contain one certificate
    once a connection has been verified.
    """
    with support.transient_internet("svn.python.org"):
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.load_verify_locations(capath=CAPATH)
        self.assertEqual(ctx.get_ca_certs(), [])
        # The context manager closes the socket even if connect() raises.
        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
            s.connect(("svn.python.org", 443))
            cert = s.getpeercert()
            self.assertTrue(cert)
        self.assertEqual(len(ctx.get_ca_certs()), 1)
1537
@needs_sni
def test_context_setget(self):
    """Check that the context of a connected socket can be replaced."""
    with support.transient_internet("svn.python.org"):
        initial_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        replacement_ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        raw_sock = socket.socket(socket.AF_INET)
        with initial_ctx.wrap_socket(raw_sock) as ss:
            ss.connect(("svn.python.org", 443))
            # Both the Python-level socket and the low-level SSL object
            # must report the context they were created with...
            self.assertIs(ss.context, initial_ctx)
            self.assertIs(ss._sslobj.context, initial_ctx)
            # ...and the new one after reassignment.
            ss.context = replacement_ctx
            self.assertIs(ss.context, replacement_ctx)
            self.assertIs(ss._sslobj.context, replacement_ctx)
1552
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001553try:
1554 import threading
1555except ImportError:
1556 _have_threads = False
1557else:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001558 _have_threads = True
1559
Antoine Pitrou803e6d62010-10-13 10:36:15 +00001560 from test.ssl_servers import make_https_server
1561
class ThreadedEchoServer(threading.Thread):
    """Echo server running in its own thread, used by the tests.

    run() accepts connections on a locally bound socket and hands each
    one to a ConnectionHandler thread, which sends back the lower-cased
    form of whatever it reads.  The server can be used as a context
    manager: entering starts the thread and waits until it is listening,
    exiting stops and joins it.
    """

    class ConnectionHandler(threading.Thread):

        """A mildly complicated class, because we want it to work both
        with and without the SSL wrapper around the socket connection, so
        that we can test the STARTTLS functionality."""

        def __init__(self, server, connsock, addr):
            """Store the owning server, the accepted socket and peer address."""
            self.server = server
            self.running = False
            self.sock = connsock
            self.addr = addr
            self.sock.setblocking(1)
            # Set by wrap_conn() once the connection is SSL-wrapped.
            self.sslconn = None
            threading.Thread.__init__(self)
            self.daemon = True

        def wrap_conn(self):
            """Wrap the connection with the server's SSL context.

            Returns True on success.  On SSLError or ConnectionResetError
            the error is recorded in ``server.conn_errors``, the server is
            stopped, the connection is closed and False is returned.
            """
            try:
                self.sslconn = self.server.context.wrap_socket(
                    self.sock, server_side=True)
                self.server.selected_protocols.append(self.sslconn.selected_npn_protocol())
            except (ssl.SSLError, ConnectionResetError) as e:
                # We treat ConnectionResetError as though it were an
                # SSLError - OpenSSL on Ubuntu abruptly closes the
                # connection when asked to use an unsupported protocol.
                #
                # XXX Various errors can have happened here, for example
                # a mismatching protocol version, an invalid certificate,
                # or a low-level bug. This should be made more discriminating.
                self.server.conn_errors.append(e)
                if self.server.chatty:
                    handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
                self.running = False
                self.server.stop()
                self.close()
                return False
            else:
                # Peer certificate details are only fetched when the
                # server context requires a client certificate.
                if self.server.context.verify_mode == ssl.CERT_REQUIRED:
                    cert = self.sslconn.getpeercert()
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
                    cert_binary = self.sslconn.getpeercert(True)
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
                cipher = self.sslconn.cipher()
                if support.verbose and self.server.chatty:
                    sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
                    sys.stdout.write(" server: selected protocol is now "
                            + str(self.sslconn.selected_npn_protocol()) + "\n")
                return True

        def read(self):
            """Read from the SSL connection if wrapped, else from the plain socket."""
            if self.sslconn:
                return self.sslconn.read()
            else:
                return self.sock.recv(1024)

        def write(self, bytes):
            """Write to the SSL connection if wrapped, else to the plain socket."""
            if self.sslconn:
                return self.sslconn.write(bytes)
            else:
                return self.sock.send(bytes)

        def close(self):
            """Close the SSL connection if wrapped, else the plain socket."""
            if self.sslconn:
                self.sslconn.close()
            else:
                self.sock.close()

        def run(self):
            """Serve the connection until EOF, b'over', or an OSError.

            Unless the server is in STARTTLS mode the connection is
            SSL-wrapped first.  Recognised messages (after stripping) are
            b'over', b'STARTTLS', b'ENDTLS' and b'CB tls-unique'; anything
            else is echoed back lower-cased.
            """
            self.running = True
            if not self.server.starttls_server:
                if not self.wrap_conn():
                    return
            while self.running:
                try:
                    msg = self.read()
                    stripped = msg.strip()
                    if not stripped:
                        # eof, so quit this handler
                        self.running = False
                        self.close()
                    elif stripped == b'over':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: client closed connection\n")
                        self.close()
                        return
                    elif (self.server.starttls_server and
                          stripped == b'STARTTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        if not self.wrap_conn():
                            return
                    elif (self.server.starttls_server and self.sslconn
                          and stripped == b'ENDTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        # Go back to the plain socket returned by unwrap().
                        self.sock = self.sslconn.unwrap()
                        self.sslconn = None
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: connection is now unencrypted...\n")
                    elif stripped == b'CB tls-unique':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
                        # NOTE(review): self.sslconn is used unconditionally
                        # here; confirm callers only send this while wrapped.
                        data = self.sslconn.get_channel_binding("tls-unique")
                        self.write(repr(data).encode("us-ascii") + b"\n")
                    else:
                        if (support.verbose and
                            self.server.connectionchatty):
                            ctype = (self.sslconn and "encrypted") or "unencrypted"
                            sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
                                             % (msg, ctype, msg.lower(), ctype))
                        self.write(msg.lower())
                except OSError:
                    if self.server.chatty:
                        handle_error("Test server failure:\n")
                    self.close()
                    self.running = False
                    # normally, we'd just stop here, but for the test
                    # harness, we want to stop the server
                    self.server.stop()

    def __init__(self, certificate=None, ssl_version=None,
                 certreqs=None, cacerts=None,
                 chatty=True, connectionchatty=False, starttls_server=False,
                 npn_protocols=None, ciphers=None, context=None):
        """Set up the SSL context and bind the listening socket.

        If *context* is given it is used as is; otherwise a new SSLContext
        is built from *ssl_version* (default ssl.PROTOCOL_TLSv1), *certreqs*
        (default ssl.CERT_NONE), *cacerts* and *certificate*.
        """
        if context:
            self.context = context
        else:
            self.context = ssl.SSLContext(ssl_version
                                          if ssl_version is not None
                                          else ssl.PROTOCOL_TLSv1)
            self.context.verify_mode = (certreqs if certreqs is not None
                                        else ssl.CERT_NONE)
            if cacerts:
                self.context.load_verify_locations(cacerts)
            if certificate:
                self.context.load_cert_chain(certificate)
        # NOTE(review): presumably these two apply to a caller-supplied
        # context as well (i.e. sit outside the else branch) - confirm.
        if npn_protocols:
            self.context.set_npn_protocols(npn_protocols)
        if ciphers:
            self.context.set_ciphers(ciphers)
        self.chatty = chatty
        self.connectionchatty = connectionchatty
        self.starttls_server = starttls_server
        self.sock = socket.socket()
        self.port = support.bind_port(self.sock)
        # Optional event set by run() once the server is listening.
        self.flag = None
        self.active = False
        # Filled in by ConnectionHandler.wrap_conn().
        self.selected_protocols = []
        self.conn_errors = []
        threading.Thread.__init__(self)
        self.daemon = True

    def __enter__(self):
        """Start the server thread and wait until run() signals readiness."""
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        """Stop the accept loop and wait for the server thread to finish."""
        self.stop()
        self.join()

    def start(self, flag=None):
        """Start the thread; *flag*, if given, is set once run() is listening."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        """Accept connections until stop() is called, then close the socket."""
        # Short accept() timeout so the loop re-checks self.active.
        self.sock.settimeout(0.05)
        self.sock.listen(5)
        self.active = True
        if self.flag:
            # signal an event
            self.flag.set()
        while self.active:
            try:
                newconn, connaddr = self.sock.accept()
                if support.verbose and self.chatty:
                    sys.stdout.write(' server: new connection from '
                                     + repr(connaddr) + '\n')
                handler = self.ConnectionHandler(self, newconn, connaddr)
                handler.start()
                # Joining here means connections are handled one at a time.
                handler.join()
            except socket.timeout:
                pass
            except KeyboardInterrupt:
                self.stop()
        self.sock.close()

    def stop(self):
        """Ask the accept loop in run() to terminate."""
        self.active = False
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001757
class AsyncoreEchoServer(threading.Thread):
    """Echo server thread driving an asyncore-based SSL server.

    The thread repeatedly runs asyncore.loop() until stop() is called.
    It can be used as a context manager: entering starts the thread and
    waits until it is running, exiting stops and joins it.
    """

    # this one's based on asyncore.dispatcher

    class EchoServer(asyncore.dispatcher):
        """Listening dispatcher creating a ConnectionHandler per connection."""

        class ConnectionHandler(asyncore.dispatcher_with_send):
            """Per-connection dispatcher: handshake first, then echo lower-cased data."""

            def __init__(self, conn, certfile):
                self.socket = ssl.wrap_socket(conn, server_side=True,
                                              certfile=certfile,
                                              do_handshake_on_connect=False)
                asyncore.dispatcher_with_send.__init__(self, self.socket)
                # True until do_handshake() has completed.
                self._ssl_accepting = True
                self._do_ssl_handshake()

            def readable(self):
                # Handle data pending inside the SSL object before
                # reporting the dispatcher as readable.
                if isinstance(self.socket, ssl.SSLSocket):
                    while self.socket.pending() > 0:
                        self.handle_read_event()
                return True

            def _do_ssl_handshake(self):
                """Try to advance the handshake; clear _ssl_accepting when done."""
                try:
                    self.socket.do_handshake()
                except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
                    # Not finished yet; retried on the next read event.
                    return
                except ssl.SSLEOFError:
                    return self.handle_close()
                except ssl.SSLError:
                    raise
                except OSError as err:
                    # NOTE(review): OSErrors other than ECONNABORTED are
                    # ignored here - confirm this is intended.
                    if err.args[0] == errno.ECONNABORTED:
                        return self.handle_close()
                else:
                    self._ssl_accepting = False

            def handle_read(self):
                if self._ssl_accepting:
                    self._do_ssl_handshake()
                else:
                    data = self.recv(1024)
                    if support.verbose:
                        sys.stdout.write(" server: read %s from client\n" % repr(data))
                    if not data:
                        self.close()
                    else:
                        self.send(data.lower())

            def handle_close(self):
                self.close()
                if support.verbose:
                    sys.stdout.write(" server: closed connection %s\n" % self.socket)

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, certfile):
            self.certfile = certfile
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.port = support.bind_port(sock, '')
            asyncore.dispatcher.__init__(self, sock)
            self.listen(5)

        def handle_accepted(self, sock_obj, addr):
            if support.verbose:
                sys.stdout.write(" server: new connection from %s:%s\n" % addr)
            self.ConnectionHandler(sock_obj, self.certfile)

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise

    def __init__(self, certfile):
        """Create the listening EchoServer using *certfile* and record its port."""
        # Optional event set by run() once the loop is about to start.
        self.flag = None
        self.active = False
        self.server = self.EchoServer(certfile)
        self.port = self.server.port
        threading.Thread.__init__(self)
        self.daemon = True

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.server)

    def __enter__(self):
        """Start the server thread and wait until run() signals readiness."""
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        """Stop the server and join its thread, logging each step if verbose."""
        if support.verbose:
            sys.stdout.write(" cleanup: stopping server.\n")
        self.stop()
        if support.verbose:
            sys.stdout.write(" cleanup: joining server thread.\n")
        self.join()
        if support.verbose:
            sys.stdout.write(" cleanup: successfully joined.\n")

    def start(self, flag=None):
        """Start the thread; *flag*, if given, is set when run() begins."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        """Run asyncore.loop() repeatedly until stop() clears self.active."""
        self.active = True
        if self.flag:
            self.flag.set()
        while self.active:
            try:
                asyncore.loop(1)
            except Exception:
                # Best effort: errors re-raised by the handle_error()
                # methods above must not kill the loop.  Only Exception
                # is caught, so SystemExit and KeyboardInterrupt are no
                # longer swallowed.
                pass

    def stop(self):
        """Clear self.active and close the listening dispatcher."""
        self.active = False
        self.server.close()
1873
def bad_cert_test(certfile):
    """
    Launch a server with CERT_REQUIRED, and check that trying to
    connect to it with the given client certificate fails.

    Any ssl.SSLError or other OSError raised while wrapping or
    connecting counts as the expected failure.  AssertionError is raised
    if the connection succeeds.
    """
    def detail(exc):
        # Prefer the message slot of (errno, message) style args, but do
        # not assume it is present.
        return exc.args[1] if len(exc.args) > 1 else exc

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_REQUIRED,
                                cacerts=CERTFILE, chatty=False,
                                connectionchatty=False)
    with server:
        try:
            with socket.socket() as sock:
                s = ssl.wrap_socket(sock,
                                    certfile=certfile,
                                    ssl_version=ssl.PROTOCOL_TLSv1)
                s.connect((HOST, server.port))
        except ssl.SSLError as x:
            if support.verbose:
                sys.stdout.write("\nSSLError is %s\n" % detail(x))
        except OSError as x:
            # A single handler covers every other OSError.  The former
            # second "except OSError" clause (with its ENOENT check) could
            # never run, because this clause already matched first.
            if support.verbose:
                sys.stdout.write("\nOSError is %s\n" % detail(x))
        else:
            raise AssertionError("Use of invalid cert should have failed!")
Thomas Woutersed03b412007-08-28 21:37:11 +00001903
def server_params_test(client_context, server_context, indata=b"FOO\n",
                       chatty=True, connectionchatty=False, sni_name=None):
    """
    Launch a server, connect a client to it and try various reads
    and writes.

    *indata* is sent as bytes, bytearray and memoryview in turn, and the
    reply must each time be its lower-cased form.  Returns a dict with
    the client connection's compression, cipher, peer certificate and
    selected NPN protocol, plus the protocols recorded by the server.
    """
    server = ThreadedEchoServer(context=server_context,
                                chatty=chatty,
                                connectionchatty=False)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=sni_name) as s:
            s.connect((HOST, server.port))
            payloads = [indata, bytearray(indata), memoryview(indata)]
            for payload in payloads:
                if connectionchatty and support.verbose:
                    sys.stdout.write(
                        " client: sending %r...\n" % indata)
                s.write(payload)
                reply = s.read()
                if connectionchatty and support.verbose:
                    sys.stdout.write(" client: read %r\n" % reply)
                if reply != indata.lower():
                    raise AssertionError(
                        "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                        % (reply[:20], len(reply),
                           indata[:20].lower(), len(indata)))
            # Tell the server's connection handler to close its side.
            s.write(b"over\n")
            if connectionchatty and support.verbose:
                sys.stdout.write(" client: closing connection.\n")
            stats = {
                'compression': s.compression(),
                'cipher': s.cipher(),
                'peercert': s.getpeercert(),
                'client_npn_protocol': s.selected_npn_protocol(),
            }
            s.close()
        stats['server_npn_protocols'] = server.selected_protocols
    return stats
Thomas Woutersed03b412007-08-28 21:37:11 +00001946
def try_protocol_combo(server_protocol, client_protocol, expect_success,
                       certsreqs=None, server_options=0, client_options=0):
    """Run one client/server exchange and compare it with the expectation.

    Builds one SSLContext per side from the given protocol constants,
    ORs the extra option bits into each, applies `certsreqs` (CERT_NONE
    when omitted) as the verify mode of both, and hands the two contexts
    to server_params_test().

    When `expect_success` is true, any error propagates.  When it is
    false, an ssl.SSLError or an OSError with errno ECONNRESET is
    swallowed, and an AssertionError is raised if the exchange worked.
    """
    if certsreqs is None:
        certsreqs = ssl.CERT_NONE
    cert_mode_names = {
        ssl.CERT_NONE: "CERT_NONE",
        ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
        ssl.CERT_REQUIRED: "CERT_REQUIRED",
    }
    # Looked up unconditionally: an unknown mode raises KeyError here.
    certtype = cert_mode_names[certsreqs]
    if support.verbose:
        # Combinations expected to fail are shown inside braces.
        if expect_success:
            formatstr = " %s->%s %s\n"
        else:
            formatstr = " {%s->%s} %s\n"
        client_name = ssl.get_protocol_name(client_protocol)
        server_name = ssl.get_protocol_name(server_protocol)
        sys.stdout.write(formatstr % (client_name, server_name, certtype))

    client_context = ssl.SSLContext(client_protocol)
    client_context.options |= client_options
    server_context = ssl.SSLContext(server_protocol)
    server_context.options |= server_options

    # NOTE: we must enable "ALL" ciphers on the client, otherwise an
    # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
    # starting from OpenSSL 1.0.0 (see issue #8322).
    if client_context.protocol == ssl.PROTOCOL_SSLv23:
        client_context.set_ciphers("ALL")

    for side_context in (client_context, server_context):
        side_context.verify_mode = certsreqs
        side_context.load_cert_chain(CERTFILE)
        side_context.load_verify_locations(CERTFILE)

    try:
        server_params_test(client_context, server_context,
                           chatty=False, connectionchatty=False)
    # Protocol mismatch can result in either an SSLError, or a
    # "Connection reset by peer" error.
    except ssl.SSLError:
        if expect_success:
            raise
    except OSError as exc:
        tolerated = (not expect_success) and exc.errno == errno.ECONNRESET
        if not tolerated:
            raise
    else:
        if not expect_success:
            raise AssertionError(
                "Client protocol %s succeeded with server protocol %s!"
                % (ssl.get_protocol_name(client_protocol),
                   ssl.get_protocol_name(server_protocol)))
1994
1995
Bill Janssen6e027db2007-11-15 22:23:56 +00001996 class ThreadedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001997
@skip_if_broken_ubuntu_ssl
def test_echo(self):
    """Basic test of an SSL client connecting to a server"""
    if support.verbose:
        sys.stdout.write("\n")
    # One sub-test per entry of PROTOCOLS; the same context object is
    # passed as both arguments of server_params_test().
    for proto in PROTOCOLS:
        proto_name = ssl._PROTOCOL_NAMES[proto]
        with self.subTest(protocol=proto_name):
            shared_context = ssl.SSLContext(proto)
            shared_context.load_cert_chain(CERTFILE)
            server_params_test(shared_context, shared_context,
                               chatty=True, connectionchatty=True)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002009
def test_getpeercert(self):
    # Connect to an echo server with the handshake deferred, check that
    # getpeercert() refuses to answer before the handshake, then
    # sanity-check the certificate dictionary returned afterwards.
    if support.verbose:
        sys.stdout.write("\n")
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(CERTFILE)
    context.load_cert_chain(CERTFILE)
    server = ThreadedEchoServer(context=context, chatty=False)
    with server:
        # The with-block closes the client socket even when one of the
        # checks below fails, so a failing run does not leak it.
        with context.wrap_socket(socket.socket(),
                                 do_handshake_on_connect=False) as s:
            s.connect((HOST, server.port))
            # getpeercert() raise ValueError while the handshake isn't
            # done.
            with self.assertRaises(ValueError):
                s.getpeercert()
            s.do_handshake()
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()
            if support.verbose:
                sys.stdout.write(pprint.pformat(cert) + '\n')
                sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
            if 'subject' not in cert:
                self.fail("No subject field in certificate: %s." %
                          pprint.pformat(cert))
            if ((('organizationName', 'Python Software Foundation'),)
                    not in cert['subject']):
                self.fail(
                    "Missing or invalid 'organizationName' field in certificate subject; "
                    "should be 'Python Software Foundation'.")
            self.assertIn('notBefore', cert)
            self.assertIn('notAfter', cert)
            # The validity window must be ordered: notBefore < notAfter.
            before = ssl.cert_time_to_seconds(cert['notBefore'])
            after = ssl.cert_time_to_seconds(cert['notAfter'])
            self.assertLess(before, after)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002047
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_crl_check(self):
    # Three connections with the same client context: default verify
    # flags, CRL leaf checking with no CRL loaded, and CRL leaf checking
    # after CRLFILE has been loaded.
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    client_context.verify_mode = ssl.CERT_REQUIRED
    client_context.load_verify_locations(SIGNING_CA)
    self.assertEqual(client_context.verify_flags, ssl.VERIFY_DEFAULT)

    def expect_verified_connection():
        # Start a fresh echo server, connect, and require a peer cert.
        echo_server = ThreadedEchoServer(context=server_context,
                                         chatty=True)
        with echo_server:
            with client_context.wrap_socket(socket.socket()) as conn:
                conn.connect((HOST, echo_server.port))
                peer_cert = conn.getpeercert()
                self.assertTrue(peer_cert, "Can't get peer certificate.")

    # VERIFY_DEFAULT should pass
    expect_verified_connection()

    # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
    client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF

    echo_server = ThreadedEchoServer(context=server_context, chatty=True)
    with echo_server:
        with client_context.wrap_socket(socket.socket()) as conn:
            with self.assertRaisesRegex(ssl.SSLError,
                                        "certificate verify failed"):
                conn.connect((HOST, echo_server.port))

    # now load a CRL file. The CRL file is signed by the CA.
    client_context.load_verify_locations(CRLFILE)

    expect_verified_connection()
2089
@needs_sni
def test_check_hostname(self):
    # With check_hostname enabled on the client context: a matching
    # server_hostname connects, a wrong one raises CertificateError, and
    # omitting it makes wrap_socket() raise ValueError.
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    client_context.verify_mode = ssl.CERT_REQUIRED
    client_context.check_hostname = True
    client_context.load_verify_locations(SIGNING_CA)

    # correct hostname should verify
    echo_server = ThreadedEchoServer(context=server_context, chatty=True)
    with echo_server:
        good_sock = client_context.wrap_socket(socket.socket(),
                                               server_hostname="localhost")
        with good_sock as conn:
            conn.connect((HOST, echo_server.port))
            peer_cert = conn.getpeercert()
            self.assertTrue(peer_cert, "Can't get peer certificate.")

    # incorrect hostname should raise an exception
    echo_server = ThreadedEchoServer(context=server_context, chatty=True)
    with echo_server:
        bad_sock = client_context.wrap_socket(socket.socket(),
                                              server_hostname="invalid")
        with bad_sock as conn:
            with self.assertRaisesRegex(
                    ssl.CertificateError,
                    "hostname 'invalid' doesn't match 'localhost'"):
                conn.connect((HOST, echo_server.port))

    # missing server_hostname arg should cause an exception, too
    echo_server = ThreadedEchoServer(context=server_context, chatty=True)
    with echo_server:
        with socket.socket() as plain_sock:
            with self.assertRaisesRegex(
                    ValueError,
                    "check_hostname requires server_hostname"):
                client_context.wrap_socket(plain_sock)
2128
def test_empty_cert(self):
    """Connecting with an empty cert file"""
    # The certificate sits next to this file; fall back to the current
    # directory when __file__ has no directory part.
    cert_dir = os.path.dirname(__file__) or os.curdir
    cert_path = os.path.join(cert_dir, "nullcert.pem")
    bad_cert_test(cert_path)
def test_malformed_cert(self):
    """Connecting with a badly formatted certificate (syntax error)"""
    # The certificate sits next to this file; fall back to the current
    # directory when __file__ has no directory part.
    cert_dir = os.path.dirname(__file__) or os.curdir
    cert_path = os.path.join(cert_dir, "badcert.pem")
    bad_cert_test(cert_path)
def test_nonexisting_cert(self):
    """Connecting with a non-existing cert file"""
    # The path is built next to this file; fall back to the current
    # directory when __file__ has no directory part.
    cert_dir = os.path.dirname(__file__) or os.curdir
    cert_path = os.path.join(cert_dir, "wrongcert.pem")
    bad_cert_test(cert_path)
def test_malformed_key(self):
    """Connecting with a badly formatted key (syntax error)"""
    # The key file sits next to this file; fall back to the current
    # directory when __file__ has no directory part.
    cert_dir = os.path.dirname(__file__) or os.curdir
    cert_path = os.path.join(cert_dir, "badkey.pem")
    bad_cert_test(cert_path)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002145
def test_rude_shutdown(self):
    """A brutal shutdown of an SSL server should raise an OSError
    in the client when attempting handshake.
    """
    ready = threading.Event()
    gone = threading.Event()

    listening_sock = socket.socket()
    port = support.bind_port(listening_sock, HOST)

    # `listener` runs in a thread.  It sits in an accept() until
    # the main thread connects.  Then it rudely closes the socket,
    # and sets Event `gone` to let the main thread know
    # the socket is gone.
    def listener():
        listening_sock.listen(5)
        ready.set()
        accepted, _peer_addr = listening_sock.accept()
        accepted.close()
        listening_sock.close()
        gone.set()

    # `connector` runs in the main thread: connect in clear text, wait
    # until the other end has gone away, then attempt to wrap the socket.
    def connector():
        ready.wait()
        with socket.socket() as client:
            client.connect((HOST, port))
            gone.wait()
            try:
                ssl_sock = ssl.wrap_socket(client)
            except OSError:
                pass
            else:
                self.fail('connecting to closed SSL socket should have failed')

    listener_thread = threading.Thread(target=listener)
    listener_thread.start()
    try:
        connector()
    finally:
        # Always reap the listener thread, even if connector() failed.
        listener_thread.join()
Trent Nelson6b240cd2008-04-10 20:12:06 +00002186
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
                     "OpenSSL is compiled without SSLv2 support")
def test_protocol_sslv2(self):
    """Connecting to an SSLv2 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    sslv2 = ssl.PROTOCOL_SSLv2
    sslv23 = ssl.PROTOCOL_SSLv23

    # Same protocol on both sides works for every certificate mode
    # (None selects try_protocol_combo's default).
    for cert_mode in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(sslv2, sslv2, True, cert_mode)

    # Any other client protocol is expected to fail.
    for client_protocol in (sslv23, ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1):
        try_protocol_combo(sslv2, client_protocol, False)

    # SSLv23 client with specific SSL options
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(sslv2, sslv23, False,
                           client_options=ssl.OP_NO_SSLv2)
    for disabled in (ssl.OP_NO_SSLv3, ssl.OP_NO_TLSv1):
        try_protocol_combo(sslv2, sslv23, False, client_options=disabled)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002209
@skip_if_broken_ubuntu_ssl
def test_protocol_sslv23(self):
    """Connecting to an SSLv23 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    sslv23 = ssl.PROTOCOL_SSLv23

    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try:
            try_protocol_combo(sslv23, ssl.PROTOCOL_SSLv2, True)
        except OSError as err:
            # this fails on some older versions of OpenSSL (0.9.7l, for instance)
            if support.verbose:
                sys.stdout.write(
                    " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
                    % str(err))

    # Every client protocol must work, for every certificate mode
    # (None selects try_protocol_combo's default).
    client_protocols = (ssl.PROTOCOL_SSLv3, sslv23, ssl.PROTOCOL_TLSv1)
    for cert_mode in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        for client_protocol in client_protocols:
            try_protocol_combo(sslv23, client_protocol, True, cert_mode)

    # Server with specific SSL options
    try_protocol_combo(sslv23, ssl.PROTOCOL_SSLv3, False,
                       server_options=ssl.OP_NO_SSLv3)
    # Will choose TLSv1
    try_protocol_combo(sslv23, sslv23, True,
                       server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
    try_protocol_combo(sslv23, ssl.PROTOCOL_TLSv1, False,
                       server_options=ssl.OP_NO_TLSv1)
2244
2245
@skip_if_broken_ubuntu_ssl
def test_protocol_sslv3(self):
    """Connecting to an SSLv3 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    sslv3 = ssl.PROTOCOL_SSLv3

    # Same protocol on both sides works for every certificate mode
    # (None selects try_protocol_combo's default).
    for cert_mode in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(sslv3, sslv3, True, cert_mode)

    # SSLv2 is only referenced when this build exposes it.
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(sslv3, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(sslv3, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_SSLv3)
    try_protocol_combo(sslv3, ssl.PROTOCOL_TLSv1, False)
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(sslv3, ssl.PROTOCOL_SSLv23, True,
                           client_options=ssl.OP_NO_SSLv2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002263
@skip_if_broken_ubuntu_ssl
def test_protocol_tlsv1(self):
    """Connecting to a TLSv1 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1 = ssl.PROTOCOL_TLSv1

    # Same protocol on both sides works for every certificate mode
    # (None selects try_protocol_combo's default).
    for cert_mode in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(tlsv1, tlsv1, True, cert_mode)

    # SSLv2 is only referenced when this build exposes it.
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(tlsv1, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(tlsv1, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(tlsv1, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_TLSv1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002277
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
                     "TLS version 1.1 not supported.")
def test_protocol_tlsv1_1(self):
    """Connecting to a TLSv1.1 server with various client options.
       Testing against older TLS versions."""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1_1 = ssl.PROTOCOL_TLSv1_1

    try_protocol_combo(tlsv1_1, tlsv1_1, True)
    # SSLv2 is only referenced when this build exposes it.
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(tlsv1_1, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(tlsv1_1, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(tlsv1_1, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_TLSv1_1)

    # (server, client, expected outcome) for the remaining pairings.
    remaining = (
        (ssl.PROTOCOL_SSLv23, tlsv1_1, True),
        (tlsv1_1, ssl.PROTOCOL_TLSv1, False),
        (ssl.PROTOCOL_TLSv1, tlsv1_1, False),
    )
    for server_protocol, client_protocol, should_work in remaining:
        try_protocol_combo(server_protocol, client_protocol, should_work)
2296
2297
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
                     "TLS version 1.2 not supported.")
def test_protocol_tlsv1_2(self):
    """Connecting to a TLSv1.2 server with various client options.
       Testing against older TLS versions."""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1_2 = ssl.PROTOCOL_TLSv1_2

    # Both sides additionally switch off SSLv2 and SSLv3.
    no_old_ssl = ssl.OP_NO_SSLv3 | ssl.OP_NO_SSLv2
    try_protocol_combo(tlsv1_2, tlsv1_2, True,
                       server_options=no_old_ssl,
                       client_options=no_old_ssl)
    # SSLv2 is only referenced when this build exposes it.
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(tlsv1_2, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(tlsv1_2, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(tlsv1_2, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_TLSv1_2)

    # (server, client, expected outcome) for the remaining pairings.
    remaining = (
        (ssl.PROTOCOL_SSLv23, tlsv1_2, True),
        (tlsv1_2, ssl.PROTOCOL_TLSv1, False),
        (ssl.PROTOCOL_TLSv1, tlsv1_2, False),
        (tlsv1_2, ssl.PROTOCOL_TLSv1_1, False),
        (ssl.PROTOCOL_TLSv1_1, tlsv1_2, False),
    )
    for server_protocol, client_protocol, should_work in remaining:
        try_protocol_combo(server_protocol, client_protocol, should_work)
2320
def test_starttls(self):
    """Switching from clear text to encrypted and back again."""
    msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3",
            b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")

    server = ThreadedEchoServer(CERTFILE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                starttls_server=True,
                                chatty=True,
                                connectionchatty=True)
    # `wrapped` says which of the two sockets is live: the plain socket
    # `s`, or the TLS wrapper `conn` created after STARTTLS.
    wrapped = False
    with server:
        s = socket.socket()
        s.setblocking(1)
        s.connect((HOST, server.port))
        if support.verbose:
            sys.stdout.write("\n")
        for indata in msgs:
            if support.verbose:
                sys.stdout.write(
                    " client: sending %r...\n" % indata)
            if wrapped:
                conn.write(indata)
                outdata = conn.read()
            else:
                s.send(indata)
                outdata = s.recv(1024)
            msg = outdata.strip().lower()
            acknowledged = msg.startswith(b"ok")
            if acknowledged and indata == b"STARTTLS":
                # STARTTLS ok, switch to secure mode
                if support.verbose:
                    sys.stdout.write(
                        " client: read %r from server, starting TLS...\n"
                        % msg)
                conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
                wrapped = True
            elif acknowledged and indata == b"ENDTLS":
                # ENDTLS ok, switch back to clear text
                if support.verbose:
                    sys.stdout.write(
                        " client: read %r from server, ending TLS...\n"
                        % msg)
                s = conn.unwrap()
                wrapped = False
            elif support.verbose:
                sys.stdout.write(
                    " client: read %r from server\n" % msg)
        if support.verbose:
            sys.stdout.write(" client: closing connection.\n")
        # Say goodbye on, and then close, whichever socket is live.
        if wrapped:
            conn.write(b"over\n")
            conn.close()
        else:
            s.send(b"over\n")
            s.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002378
def test_socketserver(self):
    """Using a SocketServer to create and manage SSL connections."""
    server = make_https_server(self, certfile=CERTFILE)
    # try to connect
    if support.verbose:
        sys.stdout.write('\n')
    with open(CERTFILE, 'rb') as f:
        d1 = f.read()
    # Bytes, like d1: if nothing is read below, the final comparison is
    # then bytes against bytes rather than bytes against str.
    d2 = b''
    # now fetch the same data from the HTTPS server
    url = 'https://%s:%d/%s' % (
        HOST, server.port, os.path.split(CERTFILE)[1])
    # The with-block closes the response whether or not reading works.
    with urllib.request.urlopen(url) as response:
        dlen = response.info().get("content-length")
        nbytes = int(dlen) if dlen else 0
        if nbytes > 0:
            d2 = response.read(nbytes)
            if support.verbose:
                sys.stdout.write(
                    " client: read %d bytes from remote server '%s'\n"
                    % (len(d2), server))
    self.assertEqual(d1, d2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002403
def test_asyncore_server(self):
    """Check the example asyncore integration."""
    if support.verbose:
        sys.stdout.write("\n")

    indata = b"FOO\n"
    server = AsyncoreEchoServer(CERTFILE)
    with server:
        # The with-block closes the client socket even when a check
        # below fails.
        with ssl.wrap_socket(socket.socket()) as s:
            s.connect(('127.0.0.1', server.port))
            if support.verbose:
                sys.stdout.write(
                    " client: sending %r...\n" % indata)
            s.write(indata)
            outdata = s.read()
            if support.verbose:
                sys.stdout.write(" client: read %r\n" % outdata)
            # The reply is expected to be the lower-cased request.
            if outdata != indata.lower():
                self.fail(
                    "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                    % (outdata[:20], len(outdata),
                       indata[:20].lower(), len(indata)))
            s.write(b"over\n")
            if support.verbose:
                sys.stdout.write(" client: closing connection.\n")
        if support.verbose:
            sys.stdout.write(" client: connection closed.\n")
Trent Nelson6b240cd2008-04-10 20:12:06 +00002434
def test_recv_send(self):
    """Test recv(), send() and friends."""
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = ssl.wrap_socket(socket.socket(),
                            server_side=False,
                            certfile=CERTFILE,
                            ca_certs=CERTFILE,
                            cert_reqs=ssl.CERT_NONE,
                            ssl_version=ssl.PROTOCOL_TLSv1)
        s.connect((HOST, server.port))
        # helper methods for standardising recv* method signatures
        def _recv_into():
            b = bytearray(b"\0"*100)
            count = s.recv_into(b)
            return b[:count]

        def _recvfrom_into():
            b = bytearray(b"\0"*100)
            count, addr = s.recvfrom_into(b)
            return b[:count]

        # (name, method, whether to expect success, *args)
        send_methods = [
            ('send', s.send, True, []),
            ('sendto', s.sendto, False, ["some.address"]),
            ('sendall', s.sendall, True, []),
        ]
        recv_methods = [
            ('recv', s.recv, True, []),
            ('recvfrom', s.recvfrom, False, ["some.address"]),
            ('recv_into', _recv_into, True, []),
            ('recvfrom_into', _recvfrom_into, False, []),
        ]
        data_prefix = "PREFIX_"

        # NOTE: the failure messages below use the conversions "!r" and
        # "!s".  A format spec such as ":r" is not accepted by bytes,
        # and exceptions accept no format spec at all, so with those the
        # message itself could not be built.

        # Each payload is sent with the method under test and the reply
        # read back; it must equal the lower-cased payload.
        for meth_name, send_meth, expect_success, args in send_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                send_meth(indata, *args)
                outdata = s.read()
                if outdata != indata.lower():
                    self.fail(
                        "While sending with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20], nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to send with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                # The ValueError text must start with the method name.
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )

        # Here the payload always goes out with send(); the method under
        # test is used to read the reply.
        for meth_name, recv_meth, expect_success, args in recv_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                s.send(indata)
                outdata = recv_meth(*args)
                if outdata != indata.lower():
                    self.fail(
                        "While receiving with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20], nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to receive with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )
                # consume data
                s.read()

        # Make sure sendmsg et al are disallowed to avoid
        # inadvertent disclosure of data and/or corruption
        # of the encrypted data stream
        self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
        self.assertRaises(NotImplementedError, s.recvmsg, 100)
        self.assertRaises(NotImplementedError,
                          s.recvmsg_into, bytearray(100))

        s.write(b"over\n")
        s.close()
Bill Janssen58afe4c2008-09-08 16:45:19 +00002549
def test_nonblocking_send(self):
    """Sending forever on a non-blocking SSL socket must raise a want error.

    The client never reads what it sends, so send() is expected to raise
    SSLWantWriteError or SSLWantReadError instead of blocking.
    """
    echo_server = ThreadedEchoServer(CERTFILE,
                                     certreqs=ssl.CERT_NONE,
                                     ssl_version=ssl.PROTOCOL_TLSv1,
                                     cacerts=CERTFILE,
                                     chatty=True,
                                     connectionchatty=False)
    with echo_server:
        s = ssl.wrap_socket(socket.socket(),
                            server_side=False,
                            certfile=CERTFILE,
                            ca_certs=CERTFILE,
                            cert_reqs=ssl.CERT_NONE,
                            ssl_version=ssl.PROTOCOL_TLSv1)
        s.connect((HOST, echo_server.port))
        s.setblocking(False)

        # Keep pushing data without ever reading: at some point the
        # buffers will be full and the call would have to block.
        chunk = bytearray(8192)
        with self.assertRaises((ssl.SSLWantWriteError,
                                ssl.SSLWantReadError)):
            while True:
                s.send(chunk)

        # Go back to blocking mode before closing the connection.
        s.setblocking(True)
        s.close()
2579
def test_handshake_timeout(self):
    """Issue #5103: the SSL handshake must respect the socket timeout.

    A plain TCP listener accepts connections but never answers the TLS
    handshake, so both ways of starting a handshake (wrapping a connected
    socket, and connecting a wrapped socket) must time out.
    """
    listener = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(listener)
    started = threading.Event()
    finish = False

    def serve():
        # Accept connections until asked to stop; never speak TLS.
        listener.listen(5)
        started.set()
        accepted = []
        while not finish:
            readable, _, _ = select.select([listener], [], [], 0.1)
            if listener in readable:
                # Keep a reference so the socket hangs around rather
                # than being closed by garbage collection.
                conn, _ = listener.accept()
                accepted.append(conn)
        for conn in accepted:
            conn.close()

    worker = threading.Thread(target=serve)
    worker.start()
    started.wait()

    try:
        # First flavour: wrap an already-connected socket.
        try:
            c = socket.socket(socket.AF_INET)
            c.settimeout(0.2)
            c.connect((host, port))
            # Wrapping attempts the handshake, which must time out.
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                ssl.wrap_socket(c)
        finally:
            c.close()
        # Second flavour: connect a socket that is already wrapped.
        try:
            c = socket.socket(socket.AF_INET)
            c = ssl.wrap_socket(c)
            c.settimeout(0.2)
            # Connecting attempts the handshake, which must time out.
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                c.connect((host, port))
        finally:
            c.close()
    finally:
        # serve() reads this closure variable on each loop iteration.
        finish = True
        worker.join()
        listener.close()
2628
def test_server_accept(self):
    """Issue #16357: accept() on an SSLSocket from SSLContext.wrap_socket().

    The accepted object must be an SSLSocket and the reported peer
    address must match the client's own socket name.
    """
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(CERTFILE)
    context.load_cert_chain(CERTFILE)
    listener = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(listener)
    listener = context.wrap_socket(listener, server_side=True)

    ready = threading.Event()
    accepted_sock = None
    accepted_addr = None

    def serve():
        nonlocal accepted_sock, accepted_addr
        listener.listen(5)
        # Signal readiness, then block on accept() and wait for the
        # connection to be closed by the client.
        ready.set()
        accepted_sock, accepted_addr = listener.accept()
        accepted_sock.recv(1)

    worker = threading.Thread(target=serve)
    worker.start()
    # The client waits until the server is set up, then connects.
    ready.wait()
    client = context.wrap_socket(socket.socket())
    client.connect((host, port))
    client_addr = client.getsockname()
    client.close()
    worker.join()
    accepted_sock.close()
    listener.close()
    # Sanity checks.
    self.assertIsInstance(accepted_sock, ssl.SSLSocket)
    self.assertEqual(accepted_addr, client_addr)
2666
def test_getpeercert_enotconn(self):
    """getpeercert() on a never-connected SSL socket raises ENOTCONN."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    unconnected = ctx.wrap_socket(socket.socket())
    with unconnected as sock:
        with self.assertRaises(OSError) as caught:
            sock.getpeercert()
        self.assertEqual(caught.exception.errno, errno.ENOTCONN)
2673
def test_do_handshake_enotconn(self):
    """do_handshake() on a never-connected SSL socket raises ENOTCONN."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    unconnected = ctx.wrap_socket(socket.socket())
    with unconnected as sock:
        with self.assertRaises(OSError) as caught:
            sock.do_handshake()
        self.assertEqual(caught.exception.errno, errno.ENOTCONN)
2680
def test_default_ciphers(self):
    """A client limited to DES ciphers must fail to connect.

    The server is expected to record a "no shared cipher" error for the
    rejected connection.
    """
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    try:
        # Force a set of weak ciphers on our client context.
        client_ctx.set_ciphers("DES")
    except ssl.SSLError:
        self.skipTest("no DES cipher available")
    echo_server = ThreadedEchoServer(CERTFILE,
                                     ssl_version=ssl.PROTOCOL_SSLv23,
                                     chatty=False)
    with echo_server as server:
        with client_ctx.wrap_socket(socket.socket()) as s:
            with self.assertRaises(OSError):
                s.connect((HOST, server.port))
    # Inspect the server-side error only once the server has been stopped.
    first_error = str(server.conn_errors[0])
    self.assertIn("no shared cipher", first_error)
2695
@unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
def test_default_ecdh_curve(self):
    """Issue #21015: ECDH key exchange should be enabled by default.

    The same context is used for server and client; the negotiated
    cipher name must mention ECDH.
    """
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.load_cert_chain(CERTFILE)
    # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled explicitly
    # through the 'ECCdraft' cipher alias.  On newer versions the default
    # cipher list should already prefer ECDH-based ciphers.
    needs_explicit_ecdh = ssl.OPENSSL_VERSION_INFO < (1, 0, 0)
    if needs_explicit_ecdh:
        context.set_ciphers("ECCdraft:ECDH")
    with ThreadedEchoServer(context=context) as server, \
         context.wrap_socket(socket.socket()) as s:
        s.connect((HOST, server.port))
        cipher_name = s.cipher()[0]
        self.assertIn("ECDH", cipher_name)
2712
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    """Test tls-unique channel binding.

    Two successive TLSv1 connections are made to the same echo server.
    For each one the client's 'tls-unique' data is sanity-checked and
    compared with what the server sends back for the "CB tls-unique"
    command (expected to be repr() of the server-side value).  The two
    connections must also produce different binding data.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = ssl.wrap_socket(socket.socket(),
                            server_side=False,
                            certfile=CERTFILE,
                            ca_certs=CERTFILE,
                            cert_reqs=ssl.CERT_NONE,
                            ssl_version=ssl.PROTOCOL_TLSv1)
        s.connect((HOST, server.port))
        # get the data
        cb_data = s.get_channel_binding("tls-unique")
        if support.verbose:
            sys.stdout.write(" got channel binding data: {0!r}\n"
                             .format(cb_data))

        # check if it is sane
        self.assertIsNotNone(cb_data)
        self.assertEqual(len(cb_data), 12)  # True for TLSv1

        # and compare with the peer's version
        s.write(b"CB tls-unique\n")
        peer_data_repr = s.read().strip()
        self.assertEqual(peer_data_repr,
                         repr(cb_data).encode("us-ascii"))
        s.close()

        # now, again, over a fresh connection
        s = ssl.wrap_socket(socket.socket(),
                            server_side=False,
                            certfile=CERTFILE,
                            ca_certs=CERTFILE,
                            cert_reqs=ssl.CERT_NONE,
                            ssl_version=ssl.PROTOCOL_TLSv1)
        s.connect((HOST, server.port))
        new_cb_data = s.get_channel_binding("tls-unique")
        if support.verbose:
            sys.stdout.write(" got another channel binding data: {0!r}\n"
                             .format(new_cb_data))
        # is it really unique
        self.assertNotEqual(cb_data, new_cb_data)
        # Sanity-check the *new* value; the first one was already
        # validated above, so re-checking it would prove nothing.
        self.assertIsNotNone(new_cb_data)
        self.assertEqual(len(new_cb_data), 12)  # True for TLSv1
        s.write(b"CB tls-unique\n")
        peer_data_repr = s.read().strip()
        self.assertEqual(peer_data_repr,
                         repr(new_cb_data).encode("us-ascii"))
        s.close()
Bill Janssen58afe4c2008-09-08 16:45:19 +00002772
def test_compression(self):
    """The reported compression method is None, 'ZLIB' or 'RLE'."""
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.load_cert_chain(CERTFILE)
    # The same context plays both the client and the server role.
    stats = server_params_test(context, context,
                               chatty=True, connectionchatty=True)
    if support.verbose:
        report = " got compression: {!r}\n".format(stats['compression'])
        sys.stdout.write(report)
    allowed = { None, 'ZLIB', 'RLE' }
    self.assertIn(stats['compression'], allowed)
2781
@unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
                     "ssl.OP_NO_COMPRESSION needed for this test")
def test_compression_disabled(self):
    """With OP_NO_COMPRESSION set, no compression method is reported."""
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.load_cert_chain(CERTFILE)
    context.options |= ssl.OP_NO_COMPRESSION
    # The same context plays both the client and the server role.
    stats = server_params_test(context, context,
                               chatty=True, connectionchatty=True)
    negotiated = stats['compression']
    self.assertIs(negotiated, None)
2791
def test_dh_params(self):
    """Check we can get a connection with ephemeral Diffie-Hellman.

    DH parameters are loaded from DHFILE and the cipher list is limited
    to "kEDH"; the negotiated cipher name must then contain an ADH, EDH
    or DHE component.
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.load_cert_chain(CERTFILE)
    context.load_dh_params(DHFILE)
    context.set_ciphers("kEDH")
    stats = server_params_test(context, context,
                               chatty=True, connectionchatty=True)
    # stats["cipher"] is indexed for its first element, the cipher name.
    cipher = stats["cipher"][0]
    parts = cipher.split("-")
    if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
        # Report the full cipher name: `cipher` is already a string, so
        # indexing it again would only show its first character.
        self.fail("Non-DH cipher: " + cipher)
2804
def test_selected_npn_protocol(self):
    """selected_npn_protocol() is None unless NPN is used."""
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.load_cert_chain(CERTFILE)
    # No NPN protocols are configured on the context.
    stats = server_params_test(context, context,
                               chatty=True, connectionchatty=True)
    selected = stats['client_npn_protocol']
    self.assertIs(selected, None)
2812
@unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
def test_npn_protocols(self):
    """Check the NPN protocol selected for several client offers.

    Each entry of ``protocol_tests`` pairs the list advertised by the
    client with the protocol both sides are expected to report.
    """
    server_protocols = ['http/1.1', 'spdy/2']
    protocol_tests = [
        (['http/1.1', 'spdy/2'], 'http/1.1'),
        (['spdy/2', 'http/1.1'], 'http/1.1'),
        (['spdy/2', 'test'], 'spdy/2'),
        (['abc', 'def'], 'abc')
    ]

    def npn_context(protocols):
        # Fresh TLSv1 context with our certificate and an NPN list.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_cert_chain(CERTFILE)
        ctx.set_npn_protocols(protocols)
        return ctx

    for client_protocols, expected in protocol_tests:
        server_context = npn_context(server_protocols)
        client_context = npn_context(client_protocols)
        stats = server_params_test(client_context, server_context,
                                   chatty=True, connectionchatty=True)

        # The two remaining placeholders are filled in per side below.
        msg = "failed trying %s (s) and %s (c).\n" \
              "was expecting %s, but got %%s from the %%s" \
                  % (str(server_protocols), str(client_protocols),
                     str(expected))
        client_result = stats['client_npn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))
        seen_by_server = stats['server_npn_protocols']
        if len(seen_by_server):
            server_result = seen_by_server[-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
2841
def sni_contexts(self):
    """Build the three TLSv1 contexts used by the SNI tests.

    Returns a ``(server_context, other_context, client_context)`` tuple:
    the server context carries SIGNED_CERTFILE, the alternative context
    carries SIGNED_CERTFILE2, and the client context requires a peer
    certificate verified against SIGNING_CA.
    """
    def tls_context():
        return ssl.SSLContext(ssl.PROTOCOL_TLSv1)

    primary = tls_context()
    primary.load_cert_chain(SIGNED_CERTFILE)
    alternative = tls_context()
    alternative.load_cert_chain(SIGNED_CERTFILE2)
    verifying_client = tls_context()
    verifying_client.verify_mode = ssl.CERT_REQUIRED
    verifying_client.load_verify_locations(SIGNING_CA)
    return primary, alternative, verifying_client
2851
def check_common_name(self, stats, name):
    """Assert that the peer certificate in *stats* has commonName *name*.

    ``stats['peercert']['subject']`` must contain the one-element RDN
    ``(('commonName', name),)``.
    """
    subject = stats['peercert']['subject']
    expected_rdn = (('commonName', name),)
    self.assertIn(expected_rdn, subject)
2855
@needs_sni
def test_sni_callback(self):
    """Exercise the servername callback: switching, None name, disabling."""
    seen = []
    server_context, other_context, client_context = self.sni_contexts()

    def servername_cb(ssl_sock, server_name, initial_context):
        # Record every invocation; switch context only when a name is sent.
        seen.append((server_name, initial_context))
        if server_name is not None:
            ssl_sock.context = other_context
    server_context.set_servername_callback(servername_cb)

    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name='supermessage')
    # The hostname was fetched properly, and the certificate was
    # changed for the connection.
    self.assertEqual(seen, [("supermessage", server_context)])
    # The callback swapped in other_context, so the peer certificate is
    # expected to be the one that context loaded.
    self.check_common_name(stats, 'fakehostname')

    seen.clear()
    # Without SNI the callback is still called, with server_name=None,
    # and the original certificate is kept.
    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name=None)
    self.assertEqual(seen, [(None, server_context)])
    self.check_common_name(stats, 'localhost')

    # Check disabling the callback
    seen.clear()
    server_context.set_servername_callback(None)

    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name='notfunny')
    # Certificate didn't change
    self.check_common_name(stats, 'localhost')
    self.assertEqual(seen, [])
2894
@needs_sni
def test_sni_callback_alert(self):
    """A TLS alert returned by the callback is reflected to the client."""
    server_context, _other_context, client_context = self.sni_contexts()

    def cb_returning_alert(ssl_sock, server_name, initial_context):
        return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
    server_context.set_servername_callback(cb_returning_alert)

    with self.assertRaises(ssl.SSLError) as caught:
        server_params_test(client_context, server_context,
                           chatty=False,
                           sni_name='supermessage')
    self.assertEqual(caught.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
2909
@needs_sni
def test_sni_callback_raising(self):
    """An exception in the callback fails the handshake.

    The client must see a handshake-failure alert and the exception name
    must show up on stderr.
    """
    server_context, _other_context, client_context = self.sni_contexts()

    def cb_raising(ssl_sock, server_name, initial_context):
        # Deliberately raise ZeroDivisionError.
        1 / 0
    server_context.set_servername_callback(cb_raising)

    with self.assertRaises(ssl.SSLError) as caught, \
         support.captured_stderr() as captured:
        server_params_test(client_context, server_context,
                           chatty=False,
                           sni_name='supermessage')
    self.assertEqual(caught.exception.reason,
                     'SSLV3_ALERT_HANDSHAKE_FAILURE')
    self.assertIn("ZeroDivisionError", captured.getvalue())
2926
@needs_sni
def test_sni_callback_wrong_return_type(self):
    """A callback returning the wrong type ends the connection.

    The client must see an internal-error alert and "TypeError" must
    show up on stderr.
    """
    server_context, _other_context, client_context = self.sni_contexts()

    def cb_wrong_return_type(ssl_sock, server_name, initial_context):
        return "foo"
    server_context.set_servername_callback(cb_wrong_return_type)

    with self.assertRaises(ssl.SSLError) as caught, \
         support.captured_stderr() as captured:
        server_params_test(client_context, server_context,
                           chatty=False,
                           sni_name='supermessage')
    self.assertEqual(caught.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
    self.assertIn("TypeError", captured.getvalue())
2944
def test_read_write_after_close_raises_valuerror(self):
    """read() and write() on a closed SSL socket raise ValueError."""
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(CERTFILE)
    context.load_cert_chain(CERTFILE)
    echo_server = ThreadedEchoServer(context=context, chatty=False)

    with echo_server:
        s = context.wrap_socket(socket.socket())
        s.connect((HOST, echo_server.port))
        s.close()

        # Both directions must be refused once the socket is closed.
        with self.assertRaises(ValueError):
            s.read(1024)
        with self.assertRaises(ValueError):
            s.write(b'hello')
Antoine Pitrou60a26e02013-07-20 19:35:16 +02002959
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01002960
def test_main(verbose=False):
    """Run the test_ssl suites.

    In verbose mode, first print platform and OpenSSL details.  Then check
    that every certificate fixture exists, pick the test classes that can
    run (networked tests need the 'network' resource, threaded tests need
    thread support) and run them, cleaning up threads afterwards.
    """
    if support.verbose:
        probes = {
            'Linux': platform.linux_distribution,
            'Mac': platform.mac_ver,
            'Windows': platform.win32_ver,
        }
        for label, probe in probes.items():
            plat = probe()
            # The first probe returning a non-empty leading field wins.
            if plat and plat[0]:
                plat = '%s %r' % (label, plat)
                break
        else:
            plat = repr(platform.platform())
        print("test_ssl: testing with %r %r" %
            (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
        print(" under %s" % plat)
        print(" HAS_SNI = %r" % ssl.HAS_SNI)
        print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
        try:
            print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
        except AttributeError:
            # Not every ssl build defines OP_NO_TLSv1_1.
            pass

    required_files = [
        CERTFILE, SVN_PYTHON_ORG_ROOT_CERT, BYTES_CERTFILE,
        ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
        SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
        BADCERT, BADKEY, EMPTYCERT]
    for filename in required_files:
        if not os.path.exists(filename):
            raise support.TestFailed("Can't read certificate file %r" % filename)

    tests = [ContextTests, BasicSocketTests, SSLErrorTests]

    if support.is_resource_enabled('network'):
        tests.append(NetworkedTests)

    if _have_threads:
        thread_info = support.threading_setup()
        if thread_info:
            tests.append(ThreadedTests)

    try:
        support.run_unittest(*tests)
    finally:
        # thread_info only exists when threads are available.
        if _have_threads:
            support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00003008
# Run the whole suite when this module is executed as a script.
if __name__ == "__main__":
    test_main()