blob: f216bd2a2b276148a23f09dac2aa171068eb491b [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00005from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00006import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00007import select
Thomas Woutersed03b412007-08-28 21:37:11 +00008import time
Christian Heimes9424bb42013-06-17 15:32:57 +02009import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000010import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000011import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000012import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000013import pprint
Antoine Pitrou152efa22010-05-16 18:19:27 +000014import tempfile
Antoine Pitrou803e6d62010-10-13 10:36:15 +000015import urllib.request
Thomas Woutersed03b412007-08-28 21:37:11 +000016import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000017import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000018import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000019import platform
Antoine Pitrou23df4832010-08-04 17:14:06 +000020import functools
Antoine Pitrou242db722013-05-01 20:52:07 +020021from unittest import mock
Thomas Woutersed03b412007-08-28 21:37:11 +000022
Antoine Pitrou05d936d2010-10-13 11:38:36 +000023ssl = support.import_module("ssl")
24
Antoine Pitrou2463e5f2013-03-28 22:24:43 +010025PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
Benjamin Petersonee8712c2008-05-20 21:35:26 +000026HOST = support.HOST
Antoine Pitrou152efa22010-05-16 18:19:27 +000027
Christian Heimesefff7062013-11-21 03:35:02 +010028def data_file(*name):
29 return os.path.join(os.path.dirname(__file__), *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000030
Antoine Pitrou81564092010-10-08 23:06:24 +000031# The custom key and certificate files used in test_ssl are generated
32# using Lib/test/make_ssl_certs.py.
33# Other certificates are simply fetched from the Internet servers they
34# are meant to authenticate.
35
Antoine Pitrou152efa22010-05-16 18:19:27 +000036CERTFILE = data_file("keycert.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000037BYTES_CERTFILE = os.fsencode(CERTFILE)
Antoine Pitrou152efa22010-05-16 18:19:27 +000038ONLYCERT = data_file("ssl_cert.pem")
39ONLYKEY = data_file("ssl_key.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000040BYTES_ONLYCERT = os.fsencode(ONLYCERT)
41BYTES_ONLYKEY = os.fsencode(ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +020042CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
43ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
44KEY_PASSWORD = "somepass"
Antoine Pitrou152efa22010-05-16 18:19:27 +000045CAPATH = data_file("capath")
Victor Stinner313a1202010-06-11 23:56:51 +000046BYTES_CAPATH = os.fsencode(CAPATH)
Christian Heimesefff7062013-11-21 03:35:02 +010047CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
48CAFILE_CACERT = data_file("capath", "5ed36f99.0")
49
Antoine Pitrou152efa22010-05-16 18:19:27 +000050
Christian Heimes22587792013-11-21 23:56:13 +010051# empty CRL
52CRLFILE = data_file("revocation.crl")
53
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010054# Two keys and certs signed by the same CA (for SNI tests)
55SIGNED_CERTFILE = data_file("keycert3.pem")
56SIGNED_CERTFILE2 = data_file("keycert4.pem")
57SIGNING_CA = data_file("pycacert.pem")
58
Antoine Pitrou152efa22010-05-16 18:19:27 +000059SVN_PYTHON_ORG_ROOT_CERT = data_file("https_svn_python_org_root.pem")
60
61EMPTYCERT = data_file("nullcert.pem")
62BADCERT = data_file("badcert.pem")
63WRONGCERT = data_file("XXXnonexisting.pem")
64BADKEY = data_file("badkey.pem")
Antoine Pitroud8c347a2011-10-01 19:20:25 +020065NOKIACERT = data_file("nokia.pem")
Christian Heimes824f7f32013-08-17 00:54:47 +020066NULLBYTECERT = data_file("nullbytecert.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +000067
Antoine Pitrou0e576f12011-12-22 10:03:38 +010068DHFILE = data_file("dh512.pem")
69BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +000070
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010071
Thomas Woutersed03b412007-08-28 21:37:11 +000072def handle_error(prefix):
73 exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
Benjamin Petersonee8712c2008-05-20 21:35:26 +000074 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +000075 sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +000076
Antoine Pitroub5218772010-05-21 09:56:06 +000077def can_clear_options():
78 # 0.9.8m or higher
Antoine Pitroub9ac25d2011-07-08 18:47:06 +020079 return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
Antoine Pitroub5218772010-05-21 09:56:06 +000080
81def no_sslv2_implies_sslv3_hello():
82 # 0.9.7h or higher
83 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
84
Christian Heimes2427b502013-11-23 11:24:32 +010085def have_verify_flags():
86 # 0.9.8 or higher
87 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
88
Antoine Pitrouc695c952014-04-28 20:57:36 +020089def utc_offset(): #NOTE: ignore issues like #1647654
90 # local time = utc time + utc offset
91 if time.daylight and time.localtime().tm_isdst > 0:
92 return -time.altzone # seconds
93 return -time.timezone
94
Christian Heimes9424bb42013-06-17 15:32:57 +020095def asn1time(cert_time):
96 # Some versions of OpenSSL ignore seconds, see #18207
97 # 0.9.8.i
98 if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
99 fmt = "%b %d %H:%M:%S %Y GMT"
100 dt = datetime.datetime.strptime(cert_time, fmt)
101 dt = dt.replace(second=0)
102 cert_time = dt.strftime(fmt)
103 # %d adds leading zero but ASN1_TIME_print() uses leading space
104 if cert_time[4] == "0":
105 cert_time = cert_time[:4] + " " + cert_time[5:]
106
107 return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000108
Antoine Pitrou23df4832010-08-04 17:14:06 +0000109# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
110def skip_if_broken_ubuntu_ssl(func):
Victor Stinner3de49192011-05-09 00:42:58 +0200111 if hasattr(ssl, 'PROTOCOL_SSLv2'):
112 @functools.wraps(func)
113 def f(*args, **kwargs):
114 try:
115 ssl.SSLContext(ssl.PROTOCOL_SSLv2)
116 except ssl.SSLError:
117 if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
118 platform.linux_distribution() == ('debian', 'squeeze/sid', '')):
119 raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
120 return func(*args, **kwargs)
121 return f
122 else:
123 return func
Antoine Pitrou23df4832010-08-04 17:14:06 +0000124
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100125needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
126
Antoine Pitrou23df4832010-08-04 17:14:06 +0000127
Antoine Pitrou152efa22010-05-16 18:19:27 +0000128class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000129
Antoine Pitrou480a1242010-04-28 21:37:09 +0000130 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000131 ssl.CERT_NONE
132 ssl.CERT_OPTIONAL
133 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100134 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100135 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100136 if ssl.HAS_ECDH:
137 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100138 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
139 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000140 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100141 self.assertIn(ssl.HAS_ECDH, {True, False})
Thomas Woutersed03b412007-08-28 21:37:11 +0000142
Antoine Pitrou172f0252014-04-18 20:33:08 +0200143 def test_str_for_enums(self):
144 # Make sure that the PROTOCOL_* constants have enum-like string
145 # reprs.
146 proto = ssl.PROTOCOL_SSLv3
147 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_SSLv3')
148 ctx = ssl.SSLContext(proto)
149 self.assertIs(ctx.protocol, proto)
150
Antoine Pitrou480a1242010-04-28 21:37:09 +0000151 def test_random(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000152 v = ssl.RAND_status()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000153 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000154 sys.stdout.write("\n RAND_status is %d (%s)\n"
155 % (v, (v and "sufficient randomness") or
156 "insufficient randomness"))
Victor Stinner99c8b162011-05-24 12:05:19 +0200157
158 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
159 self.assertEqual(len(data), 16)
160 self.assertEqual(is_cryptographic, v == 1)
161 if v:
162 data = ssl.RAND_bytes(16)
163 self.assertEqual(len(data), 16)
Victor Stinner2e2baa92011-05-25 11:15:16 +0200164 else:
165 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
Victor Stinner99c8b162011-05-24 12:05:19 +0200166
Victor Stinner1e81a392013-12-19 16:47:04 +0100167 # negative num is invalid
168 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
169 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
170
Jesus Ceac8754a12012-09-11 02:00:58 +0200171 self.assertRaises(TypeError, ssl.RAND_egd, 1)
172 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000173 ssl.RAND_add("this is a random string", 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000174
Christian Heimesf77b4b22013-08-21 13:26:05 +0200175 @unittest.skipUnless(os.name == 'posix', 'requires posix')
176 def test_random_fork(self):
177 status = ssl.RAND_status()
178 if not status:
179 self.fail("OpenSSL's PRNG has insufficient randomness")
180
181 rfd, wfd = os.pipe()
182 pid = os.fork()
183 if pid == 0:
184 try:
185 os.close(rfd)
186 child_random = ssl.RAND_pseudo_bytes(16)[0]
187 self.assertEqual(len(child_random), 16)
188 os.write(wfd, child_random)
189 os.close(wfd)
190 except BaseException:
191 os._exit(1)
192 else:
193 os._exit(0)
194 else:
195 os.close(wfd)
196 self.addCleanup(os.close, rfd)
197 _, status = os.waitpid(pid, 0)
198 self.assertEqual(status, 0)
199
200 child_random = os.read(rfd, 16)
201 self.assertEqual(len(child_random), 16)
202 parent_random = ssl.RAND_pseudo_bytes(16)[0]
203 self.assertEqual(len(parent_random), 16)
204
205 self.assertNotEqual(child_random, parent_random)
206
Antoine Pitrou480a1242010-04-28 21:37:09 +0000207 def test_parse_cert(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000208 # note that this uses an 'unofficial' function in _ssl.c,
209 # provided solely for this test, to exercise the certificate
210 # parsing code
Antoine Pitroufb046912010-11-09 20:21:19 +0000211 p = ssl._ssl._test_decode_cert(CERTFILE)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000212 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000213 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200214 self.assertEqual(p['issuer'],
215 ((('countryName', 'XY'),),
216 (('localityName', 'Castle Anthrax'),),
217 (('organizationName', 'Python Software Foundation'),),
218 (('commonName', 'localhost'),))
219 )
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100220 # Note the next three asserts will fail if the keys are regenerated
Christian Heimes9424bb42013-06-17 15:32:57 +0200221 self.assertEqual(p['notAfter'], asn1time('Oct 5 23:01:56 2020 GMT'))
222 self.assertEqual(p['notBefore'], asn1time('Oct 8 23:01:56 2010 GMT'))
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200223 self.assertEqual(p['serialNumber'], 'D7C7381919AFC24E')
224 self.assertEqual(p['subject'],
225 ((('countryName', 'XY'),),
226 (('localityName', 'Castle Anthrax'),),
227 (('organizationName', 'Python Software Foundation'),),
228 (('commonName', 'localhost'),))
229 )
230 self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),))
231 # Issue #13034: the subjectAltName in some certificates
232 # (notably projects.developer.nokia.com:443) wasn't parsed
233 p = ssl._ssl._test_decode_cert(NOKIACERT)
234 if support.verbose:
235 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
236 self.assertEqual(p['subjectAltName'],
237 (('DNS', 'projects.developer.nokia.com'),
238 ('DNS', 'projects.forum.nokia.com'))
239 )
Christian Heimesbd3a7f92013-11-21 03:40:15 +0100240 # extra OCSP and AIA fields
241 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
242 self.assertEqual(p['caIssuers'],
243 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
244 self.assertEqual(p['crlDistributionPoints'],
245 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000246
Christian Heimes824f7f32013-08-17 00:54:47 +0200247 def test_parse_cert_CVE_2013_4238(self):
248 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
249 if support.verbose:
250 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
251 subject = ((('countryName', 'US'),),
252 (('stateOrProvinceName', 'Oregon'),),
253 (('localityName', 'Beaverton'),),
254 (('organizationName', 'Python Software Foundation'),),
255 (('organizationalUnitName', 'Python Core Development'),),
256 (('commonName', 'null.python.org\x00example.org'),),
257 (('emailAddress', 'python-dev@python.org'),))
258 self.assertEqual(p['subject'], subject)
259 self.assertEqual(p['issuer'], subject)
Christian Heimes157c9832013-08-25 14:12:41 +0200260 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
261 san = (('DNS', 'altnull.python.org\x00example.com'),
262 ('email', 'null@python.org\x00user@example.org'),
263 ('URI', 'http://null.python.org\x00http://example.org'),
264 ('IP Address', '192.0.2.1'),
265 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
266 else:
267 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
268 san = (('DNS', 'altnull.python.org\x00example.com'),
269 ('email', 'null@python.org\x00user@example.org'),
270 ('URI', 'http://null.python.org\x00http://example.org'),
271 ('IP Address', '192.0.2.1'),
272 ('IP Address', '<invalid>'))
273
274 self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200275
Antoine Pitrou480a1242010-04-28 21:37:09 +0000276 def test_DER_to_PEM(self):
277 with open(SVN_PYTHON_ORG_ROOT_CERT, 'r') as f:
278 pem = f.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000279 d1 = ssl.PEM_cert_to_DER_cert(pem)
280 p2 = ssl.DER_cert_to_PEM_cert(d1)
281 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitrou18c913e2010-04-27 10:59:39 +0000282 self.assertEqual(d1, d2)
Antoine Pitrou9bfbe612010-04-27 22:08:08 +0000283 if not p2.startswith(ssl.PEM_HEADER + '\n'):
284 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
285 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
286 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000287
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000288 def test_openssl_version(self):
289 n = ssl.OPENSSL_VERSION_NUMBER
290 t = ssl.OPENSSL_VERSION_INFO
291 s = ssl.OPENSSL_VERSION
292 self.assertIsInstance(n, int)
293 self.assertIsInstance(t, tuple)
294 self.assertIsInstance(s, str)
295 # Some sanity checks follow
296 # >= 0.9
297 self.assertGreaterEqual(n, 0x900000)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400298 # < 3.0
299 self.assertLess(n, 0x30000000)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000300 major, minor, fix, patch, status = t
301 self.assertGreaterEqual(major, 0)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400302 self.assertLess(major, 3)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000303 self.assertGreaterEqual(minor, 0)
304 self.assertLess(minor, 256)
305 self.assertGreaterEqual(fix, 0)
306 self.assertLess(fix, 256)
307 self.assertGreaterEqual(patch, 0)
308 self.assertLessEqual(patch, 26)
309 self.assertGreaterEqual(status, 0)
310 self.assertLessEqual(status, 15)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400311 # Version string as returned by {Open,Libre}SSL, the format might change
312 if "LibreSSL" in s:
313 self.assertTrue(s.startswith("LibreSSL {:d}.{:d}".format(major, minor)),
314 (s, t))
315 else:
316 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
317 (s, t))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000318
Antoine Pitrou9d543662010-04-23 23:10:32 +0000319 @support.cpython_only
320 def test_refcycle(self):
321 # Issue #7943: an SSL object doesn't create reference cycles with
322 # itself.
323 s = socket.socket(socket.AF_INET)
324 ss = ssl.wrap_socket(s)
325 wr = weakref.ref(ss)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100326 with support.check_warnings(("", ResourceWarning)):
327 del ss
328 self.assertEqual(wr(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000329
Antoine Pitroua468adc2010-09-14 14:43:44 +0000330 def test_wrapped_unconnected(self):
331 # Methods on an unconnected SSLSocket propagate the original
Andrew Svetlov0832af62012-12-18 23:10:48 +0200332 # OSError raise by the underlying socket object.
Antoine Pitroua468adc2010-09-14 14:43:44 +0000333 s = socket.socket(socket.AF_INET)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100334 with ssl.wrap_socket(s) as ss:
Antoine Pitroue9bb4732013-01-12 21:56:56 +0100335 self.assertRaises(OSError, ss.recv, 1)
336 self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
337 self.assertRaises(OSError, ss.recvfrom, 1)
338 self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
339 self.assertRaises(OSError, ss.send, b'x')
340 self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
Antoine Pitroua468adc2010-09-14 14:43:44 +0000341
Antoine Pitrou40f08742010-04-24 22:04:40 +0000342 def test_timeout(self):
343 # Issue #8524: when creating an SSL socket, the timeout of the
344 # original socket should be retained.
345 for timeout in (None, 0.0, 5.0):
346 s = socket.socket(socket.AF_INET)
347 s.settimeout(timeout)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100348 with ssl.wrap_socket(s) as ss:
349 self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000350
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000351 def test_errors(self):
352 sock = socket.socket()
Ezio Melottied3a7d22010-12-01 02:32:32 +0000353 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000354 "certfile must be specified",
355 ssl.wrap_socket, sock, keyfile=CERTFILE)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000356 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000357 "certfile must be specified for server-side operations",
358 ssl.wrap_socket, sock, server_side=True)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000359 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000360 "certfile must be specified for server-side operations",
361 ssl.wrap_socket, sock, server_side=True, certfile="")
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100362 with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
363 self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
364 s.connect, (HOST, 8080))
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200365 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000366 with socket.socket() as sock:
367 ssl.wrap_socket(sock, certfile=WRONGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000368 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200369 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000370 with socket.socket() as sock:
371 ssl.wrap_socket(sock, certfile=CERTFILE, keyfile=WRONGCERT)
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000372 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200373 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000374 with socket.socket() as sock:
375 ssl.wrap_socket(sock, certfile=WRONGCERT, keyfile=WRONGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000376 self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000377
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000378 def test_match_hostname(self):
379 def ok(cert, hostname):
380 ssl.match_hostname(cert, hostname)
381 def fail(cert, hostname):
382 self.assertRaises(ssl.CertificateError,
383 ssl.match_hostname, cert, hostname)
384
385 cert = {'subject': ((('commonName', 'example.com'),),)}
386 ok(cert, 'example.com')
387 ok(cert, 'ExAmple.cOm')
388 fail(cert, 'www.example.com')
389 fail(cert, '.example.com')
390 fail(cert, 'example.org')
391 fail(cert, 'exampleXcom')
392
393 cert = {'subject': ((('commonName', '*.a.com'),),)}
394 ok(cert, 'foo.a.com')
395 fail(cert, 'bar.foo.a.com')
396 fail(cert, 'a.com')
397 fail(cert, 'Xa.com')
398 fail(cert, '.a.com')
399
Georg Brandl72c98d32013-10-27 07:16:53 +0100400 # only match one left-most wildcard
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000401 cert = {'subject': ((('commonName', 'f*.com'),),)}
402 ok(cert, 'foo.com')
403 ok(cert, 'f.com')
404 fail(cert, 'bar.com')
405 fail(cert, 'foo.a.com')
406 fail(cert, 'bar.foo.com')
407
Christian Heimes824f7f32013-08-17 00:54:47 +0200408 # NULL bytes are bad, CVE-2013-4073
409 cert = {'subject': ((('commonName',
410 'null.python.org\x00example.org'),),)}
411 ok(cert, 'null.python.org\x00example.org') # or raise an error?
412 fail(cert, 'example.org')
413 fail(cert, 'null.python.org')
414
Georg Brandl72c98d32013-10-27 07:16:53 +0100415 # error cases with wildcards
416 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
417 fail(cert, 'bar.foo.a.com')
418 fail(cert, 'a.com')
419 fail(cert, 'Xa.com')
420 fail(cert, '.a.com')
421
422 cert = {'subject': ((('commonName', 'a.*.com'),),)}
423 fail(cert, 'a.foo.com')
424 fail(cert, 'a..com')
425 fail(cert, 'a.com')
426
427 # wildcard doesn't match IDNA prefix 'xn--'
428 idna = 'püthon.python.org'.encode("idna").decode("ascii")
429 cert = {'subject': ((('commonName', idna),),)}
430 ok(cert, idna)
431 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
432 fail(cert, idna)
433 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
434 fail(cert, idna)
435
436 # wildcard in first fragment and IDNA A-labels in sequent fragments
437 # are supported.
438 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
439 cert = {'subject': ((('commonName', idna),),)}
440 ok(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
441 ok(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
442 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
443 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
444
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000445 # Slightly fake real-world example
446 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
447 'subject': ((('commonName', 'linuxfrz.org'),),),
448 'subjectAltName': (('DNS', 'linuxfr.org'),
449 ('DNS', 'linuxfr.com'),
450 ('othername', '<unsupported>'))}
451 ok(cert, 'linuxfr.org')
452 ok(cert, 'linuxfr.com')
453 # Not a "DNS" entry
454 fail(cert, '<unsupported>')
455 # When there is a subjectAltName, commonName isn't used
456 fail(cert, 'linuxfrz.org')
457
458 # A pristine real-world example
459 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
460 'subject': ((('countryName', 'US'),),
461 (('stateOrProvinceName', 'California'),),
462 (('localityName', 'Mountain View'),),
463 (('organizationName', 'Google Inc'),),
464 (('commonName', 'mail.google.com'),))}
465 ok(cert, 'mail.google.com')
466 fail(cert, 'gmail.com')
467 # Only commonName is considered
468 fail(cert, 'California')
469
470 # Neither commonName nor subjectAltName
471 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
472 'subject': ((('countryName', 'US'),),
473 (('stateOrProvinceName', 'California'),),
474 (('localityName', 'Mountain View'),),
475 (('organizationName', 'Google Inc'),))}
476 fail(cert, 'mail.google.com')
477
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200478 # No DNS entry in subjectAltName but a commonName
479 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
480 'subject': ((('countryName', 'US'),),
481 (('stateOrProvinceName', 'California'),),
482 (('localityName', 'Mountain View'),),
483 (('commonName', 'mail.google.com'),)),
484 'subjectAltName': (('othername', 'blabla'), )}
485 ok(cert, 'mail.google.com')
486
487 # No DNS entry subjectAltName and no commonName
488 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
489 'subject': ((('countryName', 'US'),),
490 (('stateOrProvinceName', 'California'),),
491 (('localityName', 'Mountain View'),),
492 (('organizationName', 'Google Inc'),)),
493 'subjectAltName': (('othername', 'blabla'),)}
494 fail(cert, 'google.com')
495
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000496 # Empty cert / no cert
497 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
498 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
499
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200500 # Issue #17980: avoid denials of service by refusing more than one
501 # wildcard per fragment.
502 cert = {'subject': ((('commonName', 'a*b.com'),),)}
503 ok(cert, 'axxb.com')
504 cert = {'subject': ((('commonName', 'a*b.co*'),),)}
Georg Brandl72c98d32013-10-27 07:16:53 +0100505 fail(cert, 'axxb.com')
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200506 cert = {'subject': ((('commonName', 'a*b*.com'),),)}
507 with self.assertRaises(ssl.CertificateError) as cm:
508 ssl.match_hostname(cert, 'axxbxxc.com')
509 self.assertIn("too many wildcards", str(cm.exception))
510
Antoine Pitroud5323212010-10-22 18:19:07 +0000511 def test_server_side(self):
512 # server_hostname doesn't work for server sockets
513 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000514 with socket.socket() as sock:
515 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
516 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000517
Antoine Pitroud6494802011-07-21 01:11:30 +0200518 def test_unknown_channel_binding(self):
519 # should raise ValueError for unknown type
520 s = socket.socket(socket.AF_INET)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100521 with ssl.wrap_socket(s) as ss:
522 with self.assertRaises(ValueError):
523 ss.get_channel_binding("unknown-type")
Antoine Pitroud6494802011-07-21 01:11:30 +0200524
525 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
526 "'tls-unique' channel binding not available")
527 def test_tls_unique_channel_binding(self):
528 # unconnected should return None for known type
529 s = socket.socket(socket.AF_INET)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100530 with ssl.wrap_socket(s) as ss:
531 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200532 # the same for server-side
533 s = socket.socket(socket.AF_INET)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100534 with ssl.wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
535 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200536
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600537 def test_dealloc_warn(self):
538 ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
539 r = repr(ss)
540 with self.assertWarns(ResourceWarning) as cm:
541 ss = None
542 support.gc_collect()
543 self.assertIn(r, str(cm.warning.args[0]))
544
Christian Heimes6d7ad132013-06-09 18:02:55 +0200545 def test_get_default_verify_paths(self):
546 paths = ssl.get_default_verify_paths()
547 self.assertEqual(len(paths), 6)
548 self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
549
550 with support.EnvironmentVarGuard() as env:
551 env["SSL_CERT_DIR"] = CAPATH
552 env["SSL_CERT_FILE"] = CERTFILE
553 paths = ssl.get_default_verify_paths()
554 self.assertEqual(paths.cafile, CERTFILE)
555 self.assertEqual(paths.capath, CAPATH)
556
Christian Heimes44109d72013-11-22 01:51:30 +0100557 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
558 def test_enum_certificates(self):
559 self.assertTrue(ssl.enum_certificates("CA"))
560 self.assertTrue(ssl.enum_certificates("ROOT"))
561
562 self.assertRaises(TypeError, ssl.enum_certificates)
563 self.assertRaises(WindowsError, ssl.enum_certificates, "")
564
Christian Heimesc2d65e12013-11-22 16:13:55 +0100565 trust_oids = set()
566 for storename in ("CA", "ROOT"):
567 store = ssl.enum_certificates(storename)
568 self.assertIsInstance(store, list)
569 for element in store:
570 self.assertIsInstance(element, tuple)
571 self.assertEqual(len(element), 3)
572 cert, enc, trust = element
573 self.assertIsInstance(cert, bytes)
574 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
575 self.assertIsInstance(trust, (set, bool))
576 if isinstance(trust, set):
577 trust_oids.update(trust)
Christian Heimes44109d72013-11-22 01:51:30 +0100578
579 serverAuth = "1.3.6.1.5.5.7.3.1"
Christian Heimesc2d65e12013-11-22 16:13:55 +0100580 self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200581
Christian Heimes46bebee2013-06-09 19:03:31 +0200582 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Christian Heimes44109d72013-11-22 01:51:30 +0100583 def test_enum_crls(self):
584 self.assertTrue(ssl.enum_crls("CA"))
585 self.assertRaises(TypeError, ssl.enum_crls)
586 self.assertRaises(WindowsError, ssl.enum_crls, "")
Christian Heimes46bebee2013-06-09 19:03:31 +0200587
Christian Heimes44109d72013-11-22 01:51:30 +0100588 crls = ssl.enum_crls("CA")
589 self.assertIsInstance(crls, list)
590 for element in crls:
591 self.assertIsInstance(element, tuple)
592 self.assertEqual(len(element), 2)
593 self.assertIsInstance(element[0], bytes)
594 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200595
Christian Heimes46bebee2013-06-09 19:03:31 +0200596
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100597 def test_asn1object(self):
598 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
599 '1.3.6.1.5.5.7.3.1')
600
601 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
602 self.assertEqual(val, expected)
603 self.assertEqual(val.nid, 129)
604 self.assertEqual(val.shortname, 'serverAuth')
605 self.assertEqual(val.longname, 'TLS Web Server Authentication')
606 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
607 self.assertIsInstance(val, ssl._ASN1Object)
608 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
609
610 val = ssl._ASN1Object.fromnid(129)
611 self.assertEqual(val, expected)
612 self.assertIsInstance(val, ssl._ASN1Object)
613 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100614 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
615 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100616 for i in range(1000):
617 try:
618 obj = ssl._ASN1Object.fromnid(i)
619 except ValueError:
620 pass
621 else:
622 self.assertIsInstance(obj.nid, int)
623 self.assertIsInstance(obj.shortname, str)
624 self.assertIsInstance(obj.longname, str)
625 self.assertIsInstance(obj.oid, (str, type(None)))
626
627 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
628 self.assertEqual(val, expected)
629 self.assertIsInstance(val, ssl._ASN1Object)
630 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
631 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
632 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100633 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
634 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100635
Christian Heimes72d28502013-11-23 13:56:58 +0100636 def test_purpose_enum(self):
637 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
638 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
639 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
640 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
641 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
642 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
643 '1.3.6.1.5.5.7.3.1')
644
645 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
646 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
647 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
648 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
649 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
650 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
651 '1.3.6.1.5.5.7.3.2')
652
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100653 def test_unsupported_dtls(self):
654 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
655 self.addCleanup(s.close)
656 with self.assertRaises(NotImplementedError) as cx:
657 ssl.wrap_socket(s, cert_reqs=ssl.CERT_NONE)
658 self.assertEqual(str(cx.exception), "only stream sockets are supported")
659 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
660 with self.assertRaises(NotImplementedError) as cx:
661 ctx.wrap_socket(s)
662 self.assertEqual(str(cx.exception), "only stream sockets are supported")
663
Antoine Pitrouc695c952014-04-28 20:57:36 +0200664 def cert_time_ok(self, timestring, timestamp):
665 self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)
666
667 def cert_time_fail(self, timestring):
668 with self.assertRaises(ValueError):
669 ssl.cert_time_to_seconds(timestring)
670
671 @unittest.skipUnless(utc_offset(),
672 'local time needs to be different from UTC')
673 def test_cert_time_to_seconds_timezone(self):
674 # Issue #19940: ssl.cert_time_to_seconds() returns wrong
675 # results if local timezone is not UTC
676 self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0)
677 self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0)
678
679 def test_cert_time_to_seconds(self):
680 timestring = "Jan 5 09:34:43 2018 GMT"
681 ts = 1515144883.0
682 self.cert_time_ok(timestring, ts)
683 # accept keyword parameter, assert its name
684 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
685 # accept both %e and %d (space or zero generated by strftime)
686 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
687 # case-insensitive
688 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
689 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
690 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
691 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
692 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
693 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
694 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
695 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
696
697 newyear_ts = 1230768000.0
698 # leap seconds
699 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
700 # same timestamp
701 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
702
703 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
704 # allow 60th second (even if it is not a leap second)
705 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
706 # allow 2nd leap second for compatibility with time.strptime()
707 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
708 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
709
710 # no special treatement for the special value:
711 # 99991231235959Z (rfc 5280)
712 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
713
714 @support.run_with_locale('LC_ALL', '')
715 def test_cert_time_to_seconds_locale(self):
716 # `cert_time_to_seconds()` should be locale independent
717
718 def local_february_name():
719 return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
720
721 if local_february_name().lower() == 'feb':
722 self.skipTest("locale-specific month name needs to be "
723 "different from C locale")
724
725 # locale-independent
726 self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
727 self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
728
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100729
Antoine Pitrou152efa22010-05-16 18:19:27 +0000730class ContextTests(unittest.TestCase):
731
Antoine Pitrou23df4832010-08-04 17:14:06 +0000732 @skip_if_broken_ubuntu_ssl
Antoine Pitrou152efa22010-05-16 18:19:27 +0000733 def test_constructor(self):
Antoine Pitrou2463e5f2013-03-28 22:24:43 +0100734 for protocol in PROTOCOLS:
735 ssl.SSLContext(protocol)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000736 self.assertRaises(TypeError, ssl.SSLContext)
737 self.assertRaises(ValueError, ssl.SSLContext, -1)
738 self.assertRaises(ValueError, ssl.SSLContext, 42)
739
Antoine Pitrou23df4832010-08-04 17:14:06 +0000740 @skip_if_broken_ubuntu_ssl
Antoine Pitrou152efa22010-05-16 18:19:27 +0000741 def test_protocol(self):
742 for proto in PROTOCOLS:
743 ctx = ssl.SSLContext(proto)
744 self.assertEqual(ctx.protocol, proto)
745
746 def test_ciphers(self):
747 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
748 ctx.set_ciphers("ALL")
749 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +0000750 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +0000751 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000752
Antoine Pitrou23df4832010-08-04 17:14:06 +0000753 @skip_if_broken_ubuntu_ssl
Antoine Pitroub5218772010-05-21 09:56:06 +0000754 def test_options(self):
755 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
Antoine Pitroucd3d7ca2014-01-09 20:02:20 +0100756 # OP_ALL | OP_NO_SSLv2 is the default value
Antoine Pitroub5218772010-05-21 09:56:06 +0000757 self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2,
758 ctx.options)
759 ctx.options |= ssl.OP_NO_SSLv3
760 self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3,
761 ctx.options)
762 if can_clear_options():
763 ctx.options = (ctx.options & ~ssl.OP_NO_SSLv2) | ssl.OP_NO_TLSv1
764 self.assertEqual(ssl.OP_ALL | ssl.OP_NO_TLSv1 | ssl.OP_NO_SSLv3,
765 ctx.options)
766 ctx.options = 0
767 self.assertEqual(0, ctx.options)
768 else:
769 with self.assertRaises(ValueError):
770 ctx.options = 0
771
Christian Heimes22587792013-11-21 23:56:13 +0100772 def test_verify_mode(self):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000773 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
774 # Default value
775 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
776 ctx.verify_mode = ssl.CERT_OPTIONAL
777 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
778 ctx.verify_mode = ssl.CERT_REQUIRED
779 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
780 ctx.verify_mode = ssl.CERT_NONE
781 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
782 with self.assertRaises(TypeError):
783 ctx.verify_mode = None
784 with self.assertRaises(ValueError):
785 ctx.verify_mode = 42
786
Christian Heimes2427b502013-11-23 11:24:32 +0100787 @unittest.skipUnless(have_verify_flags(),
788 "verify_flags need OpenSSL > 0.9.8")
Christian Heimes22587792013-11-21 23:56:13 +0100789 def test_verify_flags(self):
790 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
791 # default value by OpenSSL
792 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
793 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
794 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
795 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
796 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
797 ctx.verify_flags = ssl.VERIFY_DEFAULT
798 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
799 # supports any value
800 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
801 self.assertEqual(ctx.verify_flags,
802 ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
803 with self.assertRaises(TypeError):
804 ctx.verify_flags = None
805
Antoine Pitrou152efa22010-05-16 18:19:27 +0000806 def test_load_cert_chain(self):
807 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
808 # Combined key and cert in a single file
809 ctx.load_cert_chain(CERTFILE)
810 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
811 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200812 with self.assertRaises(OSError) as cm:
Antoine Pitrou152efa22010-05-16 18:19:27 +0000813 ctx.load_cert_chain(WRONGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000814 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000815 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000816 ctx.load_cert_chain(BADCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000817 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000818 ctx.load_cert_chain(EMPTYCERT)
819 # Separate key and cert
Antoine Pitroud0919502010-05-17 10:30:00 +0000820 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000821 ctx.load_cert_chain(ONLYCERT, ONLYKEY)
822 ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
823 ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000824 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000825 ctx.load_cert_chain(ONLYCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000826 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000827 ctx.load_cert_chain(ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000828 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000829 ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
830 # Mismatching key and cert
Antoine Pitroud0919502010-05-17 10:30:00 +0000831 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000832 with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
Antoine Pitrou81564092010-10-08 23:06:24 +0000833 ctx.load_cert_chain(SVN_PYTHON_ORG_ROOT_CERT, ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +0200834 # Password protected key and cert
835 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
836 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
837 ctx.load_cert_chain(CERTFILE_PROTECTED,
838 password=bytearray(KEY_PASSWORD.encode()))
839 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
840 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
841 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
842 bytearray(KEY_PASSWORD.encode()))
843 with self.assertRaisesRegex(TypeError, "should be a string"):
844 ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
845 with self.assertRaises(ssl.SSLError):
846 ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
847 with self.assertRaisesRegex(ValueError, "cannot be longer"):
848 # openssl has a fixed limit on the password buffer.
849 # PEM_BUFSIZE is generally set to 1kb.
850 # Return a string larger than this.
851 ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
852 # Password callback
853 def getpass_unicode():
854 return KEY_PASSWORD
855 def getpass_bytes():
856 return KEY_PASSWORD.encode()
857 def getpass_bytearray():
858 return bytearray(KEY_PASSWORD.encode())
859 def getpass_badpass():
860 return "badpass"
861 def getpass_huge():
862 return b'a' * (1024 * 1024)
863 def getpass_bad_type():
864 return 9
865 def getpass_exception():
866 raise Exception('getpass error')
867 class GetPassCallable:
868 def __call__(self):
869 return KEY_PASSWORD
870 def getpass(self):
871 return KEY_PASSWORD
872 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
873 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
874 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
875 ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
876 ctx.load_cert_chain(CERTFILE_PROTECTED,
877 password=GetPassCallable().getpass)
878 with self.assertRaises(ssl.SSLError):
879 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
880 with self.assertRaisesRegex(ValueError, "cannot be longer"):
881 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
882 with self.assertRaisesRegex(TypeError, "must return a string"):
883 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
884 with self.assertRaisesRegex(Exception, "getpass error"):
885 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
886 # Make sure the password function isn't called if it isn't needed
887 ctx.load_cert_chain(CERTFILE, password=getpass_exception)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000888
889 def test_load_verify_locations(self):
890 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
891 ctx.load_verify_locations(CERTFILE)
892 ctx.load_verify_locations(cafile=CERTFILE, capath=None)
893 ctx.load_verify_locations(BYTES_CERTFILE)
894 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
895 self.assertRaises(TypeError, ctx.load_verify_locations)
Christian Heimesefff7062013-11-21 03:35:02 +0100896 self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200897 with self.assertRaises(OSError) as cm:
Antoine Pitrou152efa22010-05-16 18:19:27 +0000898 ctx.load_verify_locations(WRONGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000899 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000900 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000901 ctx.load_verify_locations(BADCERT)
902 ctx.load_verify_locations(CERTFILE, CAPATH)
903 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
904
Victor Stinner80f75e62011-01-29 11:31:20 +0000905 # Issue #10989: crash if the second argument type is invalid
906 self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
907
Christian Heimesefff7062013-11-21 03:35:02 +0100908 def test_load_verify_cadata(self):
909 # test cadata
910 with open(CAFILE_CACERT) as f:
911 cacert_pem = f.read()
912 cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
913 with open(CAFILE_NEURONIO) as f:
914 neuronio_pem = f.read()
915 neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
916
917 # test PEM
918 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
919 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
920 ctx.load_verify_locations(cadata=cacert_pem)
921 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
922 ctx.load_verify_locations(cadata=neuronio_pem)
923 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
924 # cert already in hash table
925 ctx.load_verify_locations(cadata=neuronio_pem)
926 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
927
928 # combined
929 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
930 combined = "\n".join((cacert_pem, neuronio_pem))
931 ctx.load_verify_locations(cadata=combined)
932 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
933
934 # with junk around the certs
935 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
936 combined = ["head", cacert_pem, "other", neuronio_pem, "again",
937 neuronio_pem, "tail"]
938 ctx.load_verify_locations(cadata="\n".join(combined))
939 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
940
941 # test DER
942 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
943 ctx.load_verify_locations(cadata=cacert_der)
944 ctx.load_verify_locations(cadata=neuronio_der)
945 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
946 # cert already in hash table
947 ctx.load_verify_locations(cadata=cacert_der)
948 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
949
950 # combined
951 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
952 combined = b"".join((cacert_der, neuronio_der))
953 ctx.load_verify_locations(cadata=combined)
954 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
955
956 # error cases
957 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
958 self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
959
960 with self.assertRaisesRegex(ssl.SSLError, "no start line"):
961 ctx.load_verify_locations(cadata="broken")
962 with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
963 ctx.load_verify_locations(cadata=b"broken")
964
965
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100966 def test_load_dh_params(self):
967 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
968 ctx.load_dh_params(DHFILE)
969 if os.name != 'nt':
970 ctx.load_dh_params(BYTES_DHFILE)
971 self.assertRaises(TypeError, ctx.load_dh_params)
972 self.assertRaises(TypeError, ctx.load_dh_params, None)
973 with self.assertRaises(FileNotFoundError) as cm:
974 ctx.load_dh_params(WRONGCERT)
975 self.assertEqual(cm.exception.errno, errno.ENOENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +0200976 with self.assertRaises(ssl.SSLError) as cm:
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100977 ctx.load_dh_params(CERTFILE)
978
Antoine Pitroueb585ad2010-10-22 18:24:20 +0000979 @skip_if_broken_ubuntu_ssl
Antoine Pitroub0182c82010-10-12 20:09:02 +0000980 def test_session_stats(self):
981 for proto in PROTOCOLS:
982 ctx = ssl.SSLContext(proto)
983 self.assertEqual(ctx.session_stats(), {
984 'number': 0,
985 'connect': 0,
986 'connect_good': 0,
987 'connect_renegotiate': 0,
988 'accept': 0,
989 'accept_good': 0,
990 'accept_renegotiate': 0,
991 'hits': 0,
992 'misses': 0,
993 'timeouts': 0,
994 'cache_full': 0,
995 })
996
Antoine Pitrou664c2d12010-11-17 20:29:42 +0000997 def test_set_default_verify_paths(self):
998 # There's not much we can do to test that it acts as expected,
999 # so just check it doesn't crash or raise an exception.
1000 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1001 ctx.set_default_verify_paths()
1002
Antoine Pitrou501da612011-12-21 09:27:41 +01001003 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001004 def test_set_ecdh_curve(self):
1005 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1006 ctx.set_ecdh_curve("prime256v1")
1007 ctx.set_ecdh_curve(b"prime256v1")
1008 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1009 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1010 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1011 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1012
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001013 @needs_sni
1014 def test_sni_callback(self):
1015 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1016
1017 # set_servername_callback expects a callable, or None
1018 self.assertRaises(TypeError, ctx.set_servername_callback)
1019 self.assertRaises(TypeError, ctx.set_servername_callback, 4)
1020 self.assertRaises(TypeError, ctx.set_servername_callback, "")
1021 self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
1022
1023 def dummycallback(sock, servername, ctx):
1024 pass
1025 ctx.set_servername_callback(None)
1026 ctx.set_servername_callback(dummycallback)
1027
1028 @needs_sni
1029 def test_sni_callback_refcycle(self):
1030 # Reference cycles through the servername callback are detected
1031 # and cleared.
1032 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1033 def dummycallback(sock, servername, ctx, cycle=ctx):
1034 pass
1035 ctx.set_servername_callback(dummycallback)
1036 wr = weakref.ref(ctx)
1037 del ctx, dummycallback
1038 gc.collect()
1039 self.assertIs(wr(), None)
1040
Christian Heimes9a5395a2013-06-17 15:44:12 +02001041 def test_cert_store_stats(self):
1042 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1043 self.assertEqual(ctx.cert_store_stats(),
1044 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1045 ctx.load_cert_chain(CERTFILE)
1046 self.assertEqual(ctx.cert_store_stats(),
1047 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1048 ctx.load_verify_locations(CERTFILE)
1049 self.assertEqual(ctx.cert_store_stats(),
1050 {'x509_ca': 0, 'crl': 0, 'x509': 1})
1051 ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
1052 self.assertEqual(ctx.cert_store_stats(),
1053 {'x509_ca': 1, 'crl': 0, 'x509': 2})
1054
1055 def test_get_ca_certs(self):
1056 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1057 self.assertEqual(ctx.get_ca_certs(), [])
1058 # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
1059 ctx.load_verify_locations(CERTFILE)
1060 self.assertEqual(ctx.get_ca_certs(), [])
1061 # but SVN_PYTHON_ORG_ROOT_CERT is a CA cert
1062 ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
1063 self.assertEqual(ctx.get_ca_certs(),
1064 [{'issuer': ((('organizationName', 'Root CA'),),
1065 (('organizationalUnitName', 'http://www.cacert.org'),),
1066 (('commonName', 'CA Cert Signing Authority'),),
1067 (('emailAddress', 'support@cacert.org'),)),
1068 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
1069 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
1070 'serialNumber': '00',
Christian Heimesbd3a7f92013-11-21 03:40:15 +01001071 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
Christian Heimes9a5395a2013-06-17 15:44:12 +02001072 'subject': ((('organizationName', 'Root CA'),),
1073 (('organizationalUnitName', 'http://www.cacert.org'),),
1074 (('commonName', 'CA Cert Signing Authority'),),
1075 (('emailAddress', 'support@cacert.org'),)),
1076 'version': 3}])
1077
1078 with open(SVN_PYTHON_ORG_ROOT_CERT) as f:
1079 pem = f.read()
1080 der = ssl.PEM_cert_to_DER_cert(pem)
1081 self.assertEqual(ctx.get_ca_certs(True), [der])
1082
Christian Heimes72d28502013-11-23 13:56:58 +01001083 def test_load_default_certs(self):
1084 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1085 ctx.load_default_certs()
1086
1087 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1088 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1089 ctx.load_default_certs()
1090
1091 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1092 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1093
1094 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1095 self.assertRaises(TypeError, ctx.load_default_certs, None)
1096 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1097
Christian Heimes4c05b472013-11-23 15:58:30 +01001098 def test_create_default_context(self):
1099 ctx = ssl.create_default_context()
Donald Stufft6a2ba942014-03-23 19:05:28 -04001100 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
Christian Heimes4c05b472013-11-23 15:58:30 +01001101 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001102 self.assertTrue(ctx.check_hostname)
Christian Heimes4c05b472013-11-23 15:58:30 +01001103 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
Donald Stufft6a2ba942014-03-23 19:05:28 -04001104 self.assertEqual(
1105 ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
1106 getattr(ssl, "OP_NO_COMPRESSION", 0),
1107 )
Christian Heimes4c05b472013-11-23 15:58:30 +01001108
1109 with open(SIGNING_CA) as f:
1110 cadata = f.read()
1111 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1112 cadata=cadata)
Donald Stufft6a2ba942014-03-23 19:05:28 -04001113 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
Christian Heimes4c05b472013-11-23 15:58:30 +01001114 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1115 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
Donald Stufft6a2ba942014-03-23 19:05:28 -04001116 self.assertEqual(
1117 ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
1118 getattr(ssl, "OP_NO_COMPRESSION", 0),
1119 )
Christian Heimes4c05b472013-11-23 15:58:30 +01001120
1121 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
Donald Stufft6a2ba942014-03-23 19:05:28 -04001122 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
Christian Heimes4c05b472013-11-23 15:58:30 +01001123 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1124 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
Donald Stufft6a2ba942014-03-23 19:05:28 -04001125 self.assertEqual(
1126 ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
1127 getattr(ssl, "OP_NO_COMPRESSION", 0),
1128 )
1129 self.assertEqual(
1130 ctx.options & getattr(ssl, "OP_SINGLE_DH_USE", 0),
1131 getattr(ssl, "OP_SINGLE_DH_USE", 0),
1132 )
1133 self.assertEqual(
1134 ctx.options & getattr(ssl, "OP_SINGLE_ECDH_USE", 0),
1135 getattr(ssl, "OP_SINGLE_ECDH_USE", 0),
1136 )
Christian Heimes4c05b472013-11-23 15:58:30 +01001137
Christian Heimes67986f92013-11-23 22:43:47 +01001138 def test__create_stdlib_context(self):
1139 ctx = ssl._create_stdlib_context()
1140 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1141 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001142 self.assertFalse(ctx.check_hostname)
Christian Heimes67986f92013-11-23 22:43:47 +01001143 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1144
1145 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
1146 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1147 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1148 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1149
1150 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
Christian Heimesa02c69a2013-12-02 20:59:28 +01001151 cert_reqs=ssl.CERT_REQUIRED,
1152 check_hostname=True)
Christian Heimes67986f92013-11-23 22:43:47 +01001153 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1154 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimesa02c69a2013-12-02 20:59:28 +01001155 self.assertTrue(ctx.check_hostname)
Christian Heimes67986f92013-11-23 22:43:47 +01001156 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1157
1158 ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
1159 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1160 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1161 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
Christian Heimes4c05b472013-11-23 15:58:30 +01001162
Christian Heimes1aa9a752013-12-02 02:41:19 +01001163 def test_check_hostname(self):
1164 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1165 self.assertFalse(ctx.check_hostname)
1166
1167 # Requires CERT_REQUIRED or CERT_OPTIONAL
1168 with self.assertRaises(ValueError):
1169 ctx.check_hostname = True
1170 ctx.verify_mode = ssl.CERT_REQUIRED
1171 self.assertFalse(ctx.check_hostname)
1172 ctx.check_hostname = True
1173 self.assertTrue(ctx.check_hostname)
1174
1175 ctx.verify_mode = ssl.CERT_OPTIONAL
1176 ctx.check_hostname = True
1177 self.assertTrue(ctx.check_hostname)
1178
1179 # Cannot set CERT_NONE with check_hostname enabled
1180 with self.assertRaises(ValueError):
1181 ctx.verify_mode = ssl.CERT_NONE
1182 ctx.check_hostname = False
1183 self.assertFalse(ctx.check_hostname)
1184
Antoine Pitrou152efa22010-05-16 18:19:27 +00001185
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001186class SSLErrorTests(unittest.TestCase):
1187
1188 def test_str(self):
1189 # The str() of a SSLError doesn't include the errno
1190 e = ssl.SSLError(1, "foo")
1191 self.assertEqual(str(e), "foo")
1192 self.assertEqual(e.errno, 1)
1193 # Same for a subclass
1194 e = ssl.SSLZeroReturnError(1, "foo")
1195 self.assertEqual(str(e), "foo")
1196 self.assertEqual(e.errno, 1)
1197
1198 def test_lib_reason(self):
1199 # Test the library and reason attributes
1200 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1201 with self.assertRaises(ssl.SSLError) as cm:
1202 ctx.load_dh_params(CERTFILE)
1203 self.assertEqual(cm.exception.library, 'PEM')
1204 self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1205 s = str(cm.exception)
1206 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1207
1208 def test_subclass(self):
1209 # Check that the appropriate SSLError subclass is raised
1210 # (this only tests one of them)
1211 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1212 with socket.socket() as s:
1213 s.bind(("127.0.0.1", 0))
Charles-François Natali6e204602014-07-23 19:28:13 +01001214 s.listen()
Antoine Pitroue1ceb502013-01-12 21:54:44 +01001215 c = socket.socket()
1216 c.connect(s.getsockname())
1217 c.setblocking(False)
1218 with ctx.wrap_socket(c, False, do_handshake_on_connect=False) as c:
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001219 with self.assertRaises(ssl.SSLWantReadError) as cm:
1220 c.do_handshake()
1221 s = str(cm.exception)
1222 self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1223 # For compatibility
1224 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
1225
1226
Bill Janssen6e027db2007-11-15 22:23:56 +00001227class NetworkedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001228
Antoine Pitrou480a1242010-04-28 21:37:09 +00001229 def test_connect(self):
Antoine Pitrou350c7222010-09-09 13:31:46 +00001230 with support.transient_internet("svn.python.org"):
1231 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1232 cert_reqs=ssl.CERT_NONE)
1233 try:
1234 s.connect(("svn.python.org", 443))
1235 self.assertEqual({}, s.getpeercert())
1236 finally:
1237 s.close()
1238
1239 # this should fail because we have no verification certs
1240 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1241 cert_reqs=ssl.CERT_REQUIRED)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001242 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1243 s.connect, ("svn.python.org", 443))
Antoine Pitrou152efa22010-05-16 18:19:27 +00001244 s.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001245
Antoine Pitrou350c7222010-09-09 13:31:46 +00001246 # this should succeed because we specify the root cert
1247 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1248 cert_reqs=ssl.CERT_REQUIRED,
1249 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
1250 try:
1251 s.connect(("svn.python.org", 443))
1252 self.assertTrue(s.getpeercert())
1253 finally:
1254 s.close()
Antoine Pitrou152efa22010-05-16 18:19:27 +00001255
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001256 def test_connect_ex(self):
1257 # Issue #11326: check connect_ex() implementation
1258 with support.transient_internet("svn.python.org"):
1259 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1260 cert_reqs=ssl.CERT_REQUIRED,
1261 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
1262 try:
1263 self.assertEqual(0, s.connect_ex(("svn.python.org", 443)))
1264 self.assertTrue(s.getpeercert())
1265 finally:
1266 s.close()
1267
1268 def test_non_blocking_connect_ex(self):
1269 # Issue #11326: non-blocking connect_ex() should allow handshake
1270 # to proceed after the socket gets ready.
1271 with support.transient_internet("svn.python.org"):
1272 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1273 cert_reqs=ssl.CERT_REQUIRED,
1274 ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
1275 do_handshake_on_connect=False)
1276 try:
1277 s.setblocking(False)
1278 rc = s.connect_ex(('svn.python.org', 443))
Antoine Pitrou8a14a0c2011-02-27 15:44:12 +00001279 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1280 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001281 # Wait for connect to finish
1282 select.select([], [s], [], 5.0)
1283 # Non-blocking handshake
1284 while True:
1285 try:
1286 s.do_handshake()
1287 break
Antoine Pitrou41032a62011-10-27 23:56:55 +02001288 except ssl.SSLWantReadError:
1289 select.select([s], [], [], 5.0)
1290 except ssl.SSLWantWriteError:
1291 select.select([], [s], [], 5.0)
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001292 # SSL established
1293 self.assertTrue(s.getpeercert())
1294 finally:
1295 s.close()
1296
Antoine Pitroub4410db2011-05-18 18:51:06 +02001297 def test_timeout_connect_ex(self):
1298 # Issue #12065: on a timeout, connect_ex() should return the original
1299 # errno (mimicking the behaviour of non-SSL sockets).
1300 with support.transient_internet("svn.python.org"):
1301 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1302 cert_reqs=ssl.CERT_REQUIRED,
1303 ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
1304 do_handshake_on_connect=False)
1305 try:
1306 s.settimeout(0.0000001)
1307 rc = s.connect_ex(('svn.python.org', 443))
1308 if rc == 0:
1309 self.skipTest("svn.python.org responded too quickly")
1310 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
1311 finally:
1312 s.close()
1313
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001314 def test_connect_ex_error(self):
Antoine Pitrouddb87ab2012-12-28 19:07:43 +01001315 with support.transient_internet("svn.python.org"):
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001316 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1317 cert_reqs=ssl.CERT_REQUIRED,
1318 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
1319 try:
Christian Heimesde570742013-12-16 21:15:44 +01001320 rc = s.connect_ex(("svn.python.org", 444))
1321 # Issue #19919: Windows machines or VMs hosted on Windows
1322 # machines sometimes return EWOULDBLOCK.
1323 self.assertIn(rc, (errno.ECONNREFUSED, errno.EWOULDBLOCK))
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001324 finally:
1325 s.close()
1326
Antoine Pitrou152efa22010-05-16 18:19:27 +00001327 def test_connect_with_context(self):
Antoine Pitrou350c7222010-09-09 13:31:46 +00001328 with support.transient_internet("svn.python.org"):
1329 # Same as test_connect, but with a separately created context
1330 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1331 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1332 s.connect(("svn.python.org", 443))
1333 try:
1334 self.assertEqual({}, s.getpeercert())
1335 finally:
1336 s.close()
Antoine Pitroud5323212010-10-22 18:19:07 +00001337 # Same with a server hostname
1338 s = ctx.wrap_socket(socket.socket(socket.AF_INET),
1339 server_hostname="svn.python.org")
1340 if ssl.HAS_SNI:
1341 s.connect(("svn.python.org", 443))
1342 s.close()
1343 else:
1344 self.assertRaises(ValueError, s.connect, ("svn.python.org", 443))
Antoine Pitrou350c7222010-09-09 13:31:46 +00001345 # This should fail because we have no verification certs
1346 ctx.verify_mode = ssl.CERT_REQUIRED
1347 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
Ezio Melottied3a7d22010-12-01 02:32:32 +00001348 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
Antoine Pitrou350c7222010-09-09 13:31:46 +00001349 s.connect, ("svn.python.org", 443))
Antoine Pitrou152efa22010-05-16 18:19:27 +00001350 s.close()
Antoine Pitrou350c7222010-09-09 13:31:46 +00001351 # This should succeed because we specify the root cert
1352 ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
1353 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1354 s.connect(("svn.python.org", 443))
1355 try:
1356 cert = s.getpeercert()
1357 self.assertTrue(cert)
1358 finally:
1359 s.close()
Antoine Pitrou152efa22010-05-16 18:19:27 +00001360
1361 def test_connect_capath(self):
1362 # Verify server certificates using the `capath` argument
Antoine Pitrou467f28d2010-05-16 19:22:44 +00001363 # NOTE: the subject hashing algorithm has been changed between
1364 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
1365 # contain both versions of each certificate (same content, different
Antoine Pitroud7e4c1c2010-05-17 14:13:10 +00001366 # filename) for this test to be portable across OpenSSL releases.
Antoine Pitrou350c7222010-09-09 13:31:46 +00001367 with support.transient_internet("svn.python.org"):
1368 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1369 ctx.verify_mode = ssl.CERT_REQUIRED
1370 ctx.load_verify_locations(capath=CAPATH)
1371 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1372 s.connect(("svn.python.org", 443))
1373 try:
1374 cert = s.getpeercert()
1375 self.assertTrue(cert)
1376 finally:
1377 s.close()
1378 # Same with a bytes `capath` argument
1379 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1380 ctx.verify_mode = ssl.CERT_REQUIRED
1381 ctx.load_verify_locations(capath=BYTES_CAPATH)
1382 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1383 s.connect(("svn.python.org", 443))
1384 try:
1385 cert = s.getpeercert()
1386 self.assertTrue(cert)
1387 finally:
1388 s.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001389
Christian Heimesefff7062013-11-21 03:35:02 +01001390 def test_connect_cadata(self):
1391 with open(CAFILE_CACERT) as f:
1392 pem = f.read()
1393 der = ssl.PEM_cert_to_DER_cert(pem)
1394 with support.transient_internet("svn.python.org"):
1395 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1396 ctx.verify_mode = ssl.CERT_REQUIRED
1397 ctx.load_verify_locations(cadata=pem)
1398 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1399 s.connect(("svn.python.org", 443))
1400 cert = s.getpeercert()
1401 self.assertTrue(cert)
1402
1403 # same with DER
1404 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1405 ctx.verify_mode = ssl.CERT_REQUIRED
1406 ctx.load_verify_locations(cadata=der)
1407 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1408 s.connect(("svn.python.org", 443))
1409 cert = s.getpeercert()
1410 self.assertTrue(cert)
1411
Antoine Pitroue3220242010-04-24 11:13:53 +00001412 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
1413 def test_makefile_close(self):
1414 # Issue #5238: creating a file-like object with makefile() shouldn't
1415 # delay closing the underlying "real socket" (here tested with its
1416 # file descriptor, hence skipping the test under Windows).
Antoine Pitrou350c7222010-09-09 13:31:46 +00001417 with support.transient_internet("svn.python.org"):
1418 ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
1419 ss.connect(("svn.python.org", 443))
1420 fd = ss.fileno()
1421 f = ss.makefile()
1422 f.close()
1423 # The fd is still open
Antoine Pitroue3220242010-04-24 11:13:53 +00001424 os.read(fd, 0)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001425 # Closing the SSL socket should close the fd too
1426 ss.close()
1427 gc.collect()
1428 with self.assertRaises(OSError) as e:
1429 os.read(fd, 0)
1430 self.assertEqual(e.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001431
Antoine Pitrou480a1242010-04-28 21:37:09 +00001432 def test_non_blocking_handshake(self):
Antoine Pitrou350c7222010-09-09 13:31:46 +00001433 with support.transient_internet("svn.python.org"):
1434 s = socket.socket(socket.AF_INET)
1435 s.connect(("svn.python.org", 443))
1436 s.setblocking(False)
1437 s = ssl.wrap_socket(s,
1438 cert_reqs=ssl.CERT_NONE,
1439 do_handshake_on_connect=False)
1440 count = 0
1441 while True:
1442 try:
1443 count += 1
1444 s.do_handshake()
1445 break
Antoine Pitrou41032a62011-10-27 23:56:55 +02001446 except ssl.SSLWantReadError:
1447 select.select([s], [], [])
1448 except ssl.SSLWantWriteError:
1449 select.select([], [s], [])
Antoine Pitrou350c7222010-09-09 13:31:46 +00001450 s.close()
1451 if support.verbose:
1452 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001453
Antoine Pitrou480a1242010-04-28 21:37:09 +00001454 def test_get_server_certificate(self):
Antoine Pitrou15399c32011-04-28 19:23:55 +02001455 def _test_get_server_certificate(host, port, cert=None):
1456 with support.transient_internet(host):
Antoine Pitrou94a5b662014-04-16 18:56:28 +02001457 pem = ssl.get_server_certificate((host, port))
Antoine Pitrou15399c32011-04-28 19:23:55 +02001458 if not pem:
1459 self.fail("No server certificate on %s:%s!" % (host, port))
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001460
Antoine Pitrou15399c32011-04-28 19:23:55 +02001461 try:
Benjamin Peterson10b93cc2014-03-12 18:10:57 -05001462 pem = ssl.get_server_certificate((host, port),
Benjamin Peterson10b93cc2014-03-12 18:10:57 -05001463 ca_certs=CERTFILE)
Antoine Pitrou15399c32011-04-28 19:23:55 +02001464 except ssl.SSLError as x:
1465 #should fail
1466 if support.verbose:
1467 sys.stdout.write("%s\n" % x)
1468 else:
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001469 self.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
1470
Benjamin Peterson10b93cc2014-03-12 18:10:57 -05001471 pem = ssl.get_server_certificate((host, port),
Benjamin Peterson10b93cc2014-03-12 18:10:57 -05001472 ca_certs=cert)
Antoine Pitrou15399c32011-04-28 19:23:55 +02001473 if not pem:
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001474 self.fail("No server certificate on %s:%s!" % (host, port))
Antoine Pitrou350c7222010-09-09 13:31:46 +00001475 if support.verbose:
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001476 sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
Antoine Pitrou350c7222010-09-09 13:31:46 +00001477
Antoine Pitrou15399c32011-04-28 19:23:55 +02001478 _test_get_server_certificate('svn.python.org', 443, SVN_PYTHON_ORG_ROOT_CERT)
1479 if support.IPV6_ENABLED:
1480 _test_get_server_certificate('ipv6.google.com', 443)
Bill Janssen54cc54c2007-12-14 22:08:56 +00001481
Antoine Pitrouf4c7bad2010-08-15 23:02:22 +00001482 def test_ciphers(self):
1483 remote = ("svn.python.org", 443)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001484 with support.transient_internet(remote[0]):
Antoine Pitroue1ceb502013-01-12 21:54:44 +01001485 with ssl.wrap_socket(socket.socket(socket.AF_INET),
1486 cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
1487 s.connect(remote)
1488 with ssl.wrap_socket(socket.socket(socket.AF_INET),
1489 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
1490 s.connect(remote)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001491 # Error checking can happen at instantiation or when connecting
Ezio Melottied3a7d22010-12-01 02:32:32 +00001492 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitroud2eca372010-10-29 23:41:37 +00001493 with socket.socket(socket.AF_INET) as sock:
1494 s = ssl.wrap_socket(sock,
1495 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
1496 s.connect(remote)
Antoine Pitrouf4c7bad2010-08-15 23:02:22 +00001497
Antoine Pitroufec12ff2010-04-21 19:46:23 +00001498 def test_algorithms(self):
1499 # Issue #8484: all algorithms should be available when verifying a
1500 # certificate.
Antoine Pitrou29619b22010-04-22 18:43:31 +00001501 # SHA256 was added in OpenSSL 0.9.8
1502 if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15):
1503 self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION)
Antoine Pitrou16f6f832012-05-04 16:26:02 +02001504 # sha256.tbs-internet.com needs SNI to use the correct certificate
1505 if not ssl.HAS_SNI:
1506 self.skipTest("SNI needed for this test")
Victor Stinnerf332abb2011-01-08 03:16:05 +00001507 # https://sha2.hboeck.de/ was used until 2011-01-08 (no route to host)
1508 remote = ("sha256.tbs-internet.com", 443)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00001509 sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem")
Antoine Pitrou160fd932011-01-08 10:23:29 +00001510 with support.transient_internet("sha256.tbs-internet.com"):
Antoine Pitrou16f6f832012-05-04 16:26:02 +02001511 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1512 ctx.verify_mode = ssl.CERT_REQUIRED
1513 ctx.load_verify_locations(sha256_cert)
1514 s = ctx.wrap_socket(socket.socket(socket.AF_INET),
1515 server_hostname="sha256.tbs-internet.com")
Antoine Pitroufec12ff2010-04-21 19:46:23 +00001516 try:
1517 s.connect(remote)
1518 if support.verbose:
1519 sys.stdout.write("\nCipher with %r is %r\n" %
1520 (remote, s.cipher()))
1521 sys.stdout.write("Certificate is:\n%s\n" %
1522 pprint.pformat(s.getpeercert()))
1523 finally:
1524 s.close()
1525
Christian Heimes9a5395a2013-06-17 15:44:12 +02001526 def test_get_ca_certs_capath(self):
1527 # capath certs are loaded on request
1528 with support.transient_internet("svn.python.org"):
1529 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1530 ctx.verify_mode = ssl.CERT_REQUIRED
1531 ctx.load_verify_locations(capath=CAPATH)
1532 self.assertEqual(ctx.get_ca_certs(), [])
1533 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1534 s.connect(("svn.python.org", 443))
1535 try:
1536 cert = s.getpeercert()
1537 self.assertTrue(cert)
1538 finally:
1539 s.close()
1540 self.assertEqual(len(ctx.get_ca_certs()), 1)
1541
Christian Heimes575596e2013-12-15 21:49:17 +01001542 @needs_sni
Christian Heimes8e7f3942013-12-05 07:41:08 +01001543 def test_context_setget(self):
1544 # Check that the context of a connected socket can be replaced.
1545 with support.transient_internet("svn.python.org"):
1546 ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1547 ctx2 = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1548 s = socket.socket(socket.AF_INET)
1549 with ctx1.wrap_socket(s) as ss:
1550 ss.connect(("svn.python.org", 443))
1551 self.assertIs(ss.context, ctx1)
1552 self.assertIs(ss._sslobj.context, ctx1)
1553 ss.context = ctx2
1554 self.assertIs(ss.context, ctx2)
1555 self.assertIs(ss._sslobj.context, ctx2)
1556
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001557try:
1558 import threading
1559except ImportError:
1560 _have_threads = False
1561else:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001562 _have_threads = True
1563
Antoine Pitrou803e6d62010-10-13 10:36:15 +00001564 from test.ssl_servers import make_https_server
1565
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001566 class ThreadedEchoServer(threading.Thread):
1567
1568 class ConnectionHandler(threading.Thread):
1569
1570 """A mildly complicated class, because we want it to work both
1571 with and without the SSL wrapper around the socket connection, so
1572 that we can test the STARTTLS functionality."""
1573
Bill Janssen6e027db2007-11-15 22:23:56 +00001574 def __init__(self, server, connsock, addr):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001575 self.server = server
1576 self.running = False
1577 self.sock = connsock
Bill Janssen6e027db2007-11-15 22:23:56 +00001578 self.addr = addr
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001579 self.sock.setblocking(1)
1580 self.sslconn = None
1581 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00001582 self.daemon = True
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001583
Antoine Pitrou480a1242010-04-28 21:37:09 +00001584 def wrap_conn(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001585 try:
Antoine Pitroub5218772010-05-21 09:56:06 +00001586 self.sslconn = self.server.context.wrap_socket(
1587 self.sock, server_side=True)
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01001588 self.server.selected_protocols.append(self.sslconn.selected_npn_protocol())
Nadeem Vawdaad246bf2013-03-03 22:44:22 +01001589 except (ssl.SSLError, ConnectionResetError) as e:
1590 # We treat ConnectionResetError as though it were an
1591 # SSLError - OpenSSL on Ubuntu abruptly closes the
1592 # connection when asked to use an unsupported protocol.
1593 #
Antoine Pitrou18c913e2010-04-27 10:59:39 +00001594 # XXX Various errors can have happened here, for example
1595 # a mismatching protocol version, an invalid certificate,
1596 # or a low-level bug. This should be made more discriminating.
Antoine Pitrou8f85f902012-01-03 22:46:48 +01001597 self.server.conn_errors.append(e)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001598 if self.server.chatty:
Bill Janssen6e027db2007-11-15 22:23:56 +00001599 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
Antoine Pitrou18c913e2010-04-27 10:59:39 +00001600 self.running = False
1601 self.server.stop()
Bill Janssen6e027db2007-11-15 22:23:56 +00001602 self.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001603 return False
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001604 else:
Antoine Pitroub5218772010-05-21 09:56:06 +00001605 if self.server.context.verify_mode == ssl.CERT_REQUIRED:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001606 cert = self.sslconn.getpeercert()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001607 if support.verbose and self.server.chatty:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001608 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
1609 cert_binary = self.sslconn.getpeercert(True)
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001610 if support.verbose and self.server.chatty:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001611 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
1612 cipher = self.sslconn.cipher()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001613 if support.verbose and self.server.chatty:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001614 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01001615 sys.stdout.write(" server: selected protocol is now "
1616 + str(self.sslconn.selected_npn_protocol()) + "\n")
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001617 return True
1618
1619 def read(self):
1620 if self.sslconn:
1621 return self.sslconn.read()
1622 else:
1623 return self.sock.recv(1024)
1624
1625 def write(self, bytes):
1626 if self.sslconn:
1627 return self.sslconn.write(bytes)
1628 else:
1629 return self.sock.send(bytes)
1630
1631 def close(self):
1632 if self.sslconn:
1633 self.sslconn.close()
1634 else:
1635 self.sock.close()
1636
Antoine Pitrou480a1242010-04-28 21:37:09 +00001637 def run(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001638 self.running = True
1639 if not self.server.starttls_server:
1640 if not self.wrap_conn():
1641 return
1642 while self.running:
1643 try:
1644 msg = self.read()
Antoine Pitrou480a1242010-04-28 21:37:09 +00001645 stripped = msg.strip()
1646 if not stripped:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001647 # eof, so quit this handler
1648 self.running = False
1649 self.close()
Antoine Pitrou480a1242010-04-28 21:37:09 +00001650 elif stripped == b'over':
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001651 if support.verbose and self.server.connectionchatty:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001652 sys.stdout.write(" server: client closed connection\n")
1653 self.close()
1654 return
Bill Janssen6e027db2007-11-15 22:23:56 +00001655 elif (self.server.starttls_server and
Antoine Pitrou764b8782010-04-28 22:57:15 +00001656 stripped == b'STARTTLS'):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001657 if support.verbose and self.server.connectionchatty:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001658 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
Antoine Pitrou480a1242010-04-28 21:37:09 +00001659 self.write(b"OK\n")
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001660 if not self.wrap_conn():
1661 return
Bill Janssen40a0f662008-08-12 16:56:25 +00001662 elif (self.server.starttls_server and self.sslconn
Antoine Pitrou764b8782010-04-28 22:57:15 +00001663 and stripped == b'ENDTLS'):
Bill Janssen40a0f662008-08-12 16:56:25 +00001664 if support.verbose and self.server.connectionchatty:
1665 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
Antoine Pitrou480a1242010-04-28 21:37:09 +00001666 self.write(b"OK\n")
Bill Janssen40a0f662008-08-12 16:56:25 +00001667 self.sock = self.sslconn.unwrap()
1668 self.sslconn = None
1669 if support.verbose and self.server.connectionchatty:
1670 sys.stdout.write(" server: connection is now unencrypted...\n")
Antoine Pitroud6494802011-07-21 01:11:30 +02001671 elif stripped == b'CB tls-unique':
1672 if support.verbose and self.server.connectionchatty:
1673 sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
1674 data = self.sslconn.get_channel_binding("tls-unique")
1675 self.write(repr(data).encode("us-ascii") + b"\n")
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001676 else:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001677 if (support.verbose and
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001678 self.server.connectionchatty):
1679 ctype = (self.sslconn and "encrypted") or "unencrypted"
Antoine Pitrou480a1242010-04-28 21:37:09 +00001680 sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
1681 % (msg, ctype, msg.lower(), ctype))
1682 self.write(msg.lower())
Andrew Svetlov0832af62012-12-18 23:10:48 +02001683 except OSError:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001684 if self.server.chatty:
1685 handle_error("Test server failure:\n")
1686 self.close()
1687 self.running = False
1688 # normally, we'd just stop here, but for the test
1689 # harness, we want to stop the server
1690 self.server.stop()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001691
Antoine Pitroub5218772010-05-21 09:56:06 +00001692 def __init__(self, certificate=None, ssl_version=None,
Antoine Pitrou18c913e2010-04-27 10:59:39 +00001693 certreqs=None, cacerts=None,
Antoine Pitrou2d9cb9c2010-04-17 17:40:45 +00001694 chatty=True, connectionchatty=False, starttls_server=False,
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01001695 npn_protocols=None, ciphers=None, context=None):
Antoine Pitroub5218772010-05-21 09:56:06 +00001696 if context:
1697 self.context = context
1698 else:
1699 self.context = ssl.SSLContext(ssl_version
1700 if ssl_version is not None
1701 else ssl.PROTOCOL_TLSv1)
1702 self.context.verify_mode = (certreqs if certreqs is not None
1703 else ssl.CERT_NONE)
1704 if cacerts:
1705 self.context.load_verify_locations(cacerts)
1706 if certificate:
1707 self.context.load_cert_chain(certificate)
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01001708 if npn_protocols:
1709 self.context.set_npn_protocols(npn_protocols)
Antoine Pitroub5218772010-05-21 09:56:06 +00001710 if ciphers:
1711 self.context.set_ciphers(ciphers)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001712 self.chatty = chatty
1713 self.connectionchatty = connectionchatty
1714 self.starttls_server = starttls_server
1715 self.sock = socket.socket()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001716 self.port = support.bind_port(self.sock)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001717 self.flag = None
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001718 self.active = False
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01001719 self.selected_protocols = []
Antoine Pitrou8f85f902012-01-03 22:46:48 +01001720 self.conn_errors = []
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001721 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00001722 self.daemon = True
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001723
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01001724 def __enter__(self):
1725 self.start(threading.Event())
1726 self.flag.wait()
Antoine Pitrou8f85f902012-01-03 22:46:48 +01001727 return self
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01001728
1729 def __exit__(self, *args):
1730 self.stop()
1731 self.join()
1732
Antoine Pitrou480a1242010-04-28 21:37:09 +00001733 def start(self, flag=None):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001734 self.flag = flag
1735 threading.Thread.start(self)
1736
Antoine Pitrou480a1242010-04-28 21:37:09 +00001737 def run(self):
Antoine Pitrouaf7c6022010-04-27 09:56:02 +00001738 self.sock.settimeout(0.05)
Charles-François Natali6e204602014-07-23 19:28:13 +01001739 self.sock.listen()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001740 self.active = True
1741 if self.flag:
1742 # signal an event
1743 self.flag.set()
1744 while self.active:
1745 try:
1746 newconn, connaddr = self.sock.accept()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001747 if support.verbose and self.chatty:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001748 sys.stdout.write(' server: new connection from '
Bill Janssen6e027db2007-11-15 22:23:56 +00001749 + repr(connaddr) + '\n')
1750 handler = self.ConnectionHandler(self, newconn, connaddr)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001751 handler.start()
Antoine Pitroueced82e2012-01-27 17:33:01 +01001752 handler.join()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001753 except socket.timeout:
1754 pass
1755 except KeyboardInterrupt:
1756 self.stop()
Bill Janssen6e027db2007-11-15 22:23:56 +00001757 self.sock.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001758
Antoine Pitrou480a1242010-04-28 21:37:09 +00001759 def stop(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001760 self.active = False
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001761
Bill Janssen54cc54c2007-12-14 22:08:56 +00001762 class AsyncoreEchoServer(threading.Thread):
1763
1764 # this one's based on asyncore.dispatcher
1765
1766 class EchoServer (asyncore.dispatcher):
1767
1768 class ConnectionHandler (asyncore.dispatcher_with_send):
1769
1770 def __init__(self, conn, certfile):
1771 self.socket = ssl.wrap_socket(conn, server_side=True,
1772 certfile=certfile,
1773 do_handshake_on_connect=False)
1774 asyncore.dispatcher_with_send.__init__(self, self.socket)
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00001775 self._ssl_accepting = True
1776 self._do_ssl_handshake()
Bill Janssen54cc54c2007-12-14 22:08:56 +00001777
1778 def readable(self):
1779 if isinstance(self.socket, ssl.SSLSocket):
1780 while self.socket.pending() > 0:
1781 self.handle_read_event()
1782 return True
1783
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00001784 def _do_ssl_handshake(self):
1785 try:
1786 self.socket.do_handshake()
Antoine Pitrou41032a62011-10-27 23:56:55 +02001787 except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
1788 return
1789 except ssl.SSLEOFError:
1790 return self.handle_close()
1791 except ssl.SSLError:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00001792 raise
Andrew Svetlov0832af62012-12-18 23:10:48 +02001793 except OSError as err:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00001794 if err.args[0] == errno.ECONNABORTED:
1795 return self.handle_close()
Bill Janssen54cc54c2007-12-14 22:08:56 +00001796 else:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00001797 self._ssl_accepting = False
1798
1799 def handle_read(self):
1800 if self._ssl_accepting:
1801 self._do_ssl_handshake()
1802 else:
1803 data = self.recv(1024)
1804 if support.verbose:
1805 sys.stdout.write(" server: read %s from client\n" % repr(data))
1806 if not data:
1807 self.close()
1808 else:
Antoine Pitrou480a1242010-04-28 21:37:09 +00001809 self.send(data.lower())
Bill Janssen54cc54c2007-12-14 22:08:56 +00001810
1811 def handle_close(self):
Bill Janssen2f5799b2008-06-29 00:08:12 +00001812 self.close()
Antoine Pitrou18c913e2010-04-27 10:59:39 +00001813 if support.verbose:
Bill Janssen54cc54c2007-12-14 22:08:56 +00001814 sys.stdout.write(" server: closed connection %s\n" % self.socket)
1815
1816 def handle_error(self):
1817 raise
1818
Antoine Pitrou773b5db2010-04-27 08:53:36 +00001819 def __init__(self, certfile):
Bill Janssen54cc54c2007-12-14 22:08:56 +00001820 self.certfile = certfile
Antoine Pitrou773b5db2010-04-27 08:53:36 +00001821 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1822 self.port = support.bind_port(sock, '')
1823 asyncore.dispatcher.__init__(self, sock)
Bill Janssen54cc54c2007-12-14 22:08:56 +00001824 self.listen(5)
1825
Giampaolo Rodolà977c7072010-10-04 21:08:36 +00001826 def handle_accepted(self, sock_obj, addr):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001827 if support.verbose:
Bill Janssen54cc54c2007-12-14 22:08:56 +00001828 sys.stdout.write(" server: new connection from %s:%s\n" %addr)
1829 self.ConnectionHandler(sock_obj, self.certfile)
1830
1831 def handle_error(self):
1832 raise
1833
Trent Nelson78520002008-04-10 20:54:35 +00001834 def __init__(self, certfile):
Bill Janssen54cc54c2007-12-14 22:08:56 +00001835 self.flag = None
1836 self.active = False
Antoine Pitrou773b5db2010-04-27 08:53:36 +00001837 self.server = self.EchoServer(certfile)
1838 self.port = self.server.port
Bill Janssen54cc54c2007-12-14 22:08:56 +00001839 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00001840 self.daemon = True
Bill Janssen54cc54c2007-12-14 22:08:56 +00001841
1842 def __str__(self):
1843 return "<%s %s>" % (self.__class__.__name__, self.server)
1844
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01001845 def __enter__(self):
1846 self.start(threading.Event())
1847 self.flag.wait()
Antoine Pitrou8f85f902012-01-03 22:46:48 +01001848 return self
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01001849
1850 def __exit__(self, *args):
1851 if support.verbose:
1852 sys.stdout.write(" cleanup: stopping server.\n")
1853 self.stop()
1854 if support.verbose:
1855 sys.stdout.write(" cleanup: joining server thread.\n")
1856 self.join()
1857 if support.verbose:
1858 sys.stdout.write(" cleanup: successfully joined.\n")
1859
Bill Janssen54cc54c2007-12-14 22:08:56 +00001860 def start (self, flag=None):
1861 self.flag = flag
1862 threading.Thread.start(self)
1863
Antoine Pitrou480a1242010-04-28 21:37:09 +00001864 def run(self):
Bill Janssen54cc54c2007-12-14 22:08:56 +00001865 self.active = True
1866 if self.flag:
1867 self.flag.set()
1868 while self.active:
1869 try:
1870 asyncore.loop(1)
1871 except:
1872 pass
1873
Antoine Pitrou480a1242010-04-28 21:37:09 +00001874 def stop(self):
Bill Janssen54cc54c2007-12-14 22:08:56 +00001875 self.active = False
1876 self.server.close()
1877
Antoine Pitrou480a1242010-04-28 21:37:09 +00001878 def bad_cert_test(certfile):
1879 """
1880 Launch a server with CERT_REQUIRED, and check that trying to
1881 connect to it with the given client certificate fails.
1882 """
Trent Nelson78520002008-04-10 20:54:35 +00001883 server = ThreadedEchoServer(CERTFILE,
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001884 certreqs=ssl.CERT_REQUIRED,
Bill Janssen6e027db2007-11-15 22:23:56 +00001885 cacerts=CERTFILE, chatty=False,
1886 connectionchatty=False)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01001887 with server:
Thomas Woutersed03b412007-08-28 21:37:11 +00001888 try:
Antoine Pitroud2eca372010-10-29 23:41:37 +00001889 with socket.socket() as sock:
1890 s = ssl.wrap_socket(sock,
1891 certfile=certfile,
1892 ssl_version=ssl.PROTOCOL_TLSv1)
1893 s.connect((HOST, server.port))
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001894 except ssl.SSLError as x:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001895 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00001896 sys.stdout.write("\nSSLError is %s\n" % x.args[1])
Andrew Svetlov0832af62012-12-18 23:10:48 +02001897 except OSError as x:
Antoine Pitrou480a1242010-04-28 21:37:09 +00001898 if support.verbose:
Andrew Svetlov0832af62012-12-18 23:10:48 +02001899 sys.stdout.write("\nOSError is %s\n" % x.args[1])
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001900 except OSError as x:
Giampaolo Rodolàcd9dfb92010-08-29 20:56:56 +00001901 if x.errno != errno.ENOENT:
1902 raise
Giampaolo Rodolà745ab382010-08-29 19:25:49 +00001903 if support.verbose:
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001904 sys.stdout.write("\OSError is %s\n" % str(x))
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001905 else:
Antoine Pitroud75b2a92010-05-06 14:15:10 +00001906 raise AssertionError("Use of invalid cert should have failed!")
Thomas Woutersed03b412007-08-28 21:37:11 +00001907
Antoine Pitroub5218772010-05-21 09:56:06 +00001908 def server_params_test(client_context, server_context, indata=b"FOO\n",
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001909 chatty=True, connectionchatty=False, sni_name=None):
Antoine Pitrou480a1242010-04-28 21:37:09 +00001910 """
1911 Launch a server, connect a client to it and try various reads
1912 and writes.
1913 """
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01001914 stats = {}
Antoine Pitroub5218772010-05-21 09:56:06 +00001915 server = ThreadedEchoServer(context=server_context,
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001916 chatty=chatty,
Bill Janssen6e027db2007-11-15 22:23:56 +00001917 connectionchatty=False)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01001918 with server:
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001919 with client_context.wrap_socket(socket.socket(),
1920 server_hostname=sni_name) as s:
Antoine Pitroueba63c42012-01-28 17:38:34 +01001921 s.connect((HOST, server.port))
1922 for arg in [indata, bytearray(indata), memoryview(indata)]:
1923 if connectionchatty:
1924 if support.verbose:
1925 sys.stdout.write(
1926 " client: sending %r...\n" % indata)
1927 s.write(arg)
1928 outdata = s.read()
1929 if connectionchatty:
1930 if support.verbose:
1931 sys.stdout.write(" client: read %r\n" % outdata)
1932 if outdata != indata.lower():
1933 raise AssertionError(
1934 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
1935 % (outdata[:20], len(outdata),
1936 indata[:20].lower(), len(indata)))
1937 s.write(b"over\n")
Antoine Pitrou7d7aede2009-11-25 18:55:32 +00001938 if connectionchatty:
1939 if support.verbose:
Antoine Pitroueba63c42012-01-28 17:38:34 +01001940 sys.stdout.write(" client: closing connection.\n")
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01001941 stats.update({
Antoine Pitrouce816a52012-01-28 17:40:23 +01001942 'compression': s.compression(),
1943 'cipher': s.cipher(),
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001944 'peercert': s.getpeercert(),
Antoine Pitrou47e40422014-09-04 21:00:10 +02001945 'client_npn_protocol': s.selected_npn_protocol(),
1946 'version': s.version(),
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01001947 })
Antoine Pitroueba63c42012-01-28 17:38:34 +01001948 s.close()
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01001949 stats['server_npn_protocols'] = server.selected_protocols
1950 return stats
Thomas Woutersed03b412007-08-28 21:37:11 +00001951
Antoine Pitroub5218772010-05-21 09:56:06 +00001952 def try_protocol_combo(server_protocol, client_protocol, expect_success,
1953 certsreqs=None, server_options=0, client_options=0):
Antoine Pitrou47e40422014-09-04 21:00:10 +02001954 """
1955 Try to SSL-connect using *client_protocol* to *server_protocol*.
1956 If *expect_success* is true, assert that the connection succeeds,
1957 if it's false, assert that the connection fails.
1958 Also, if *expect_success* is a string, assert that it is the protocol
1959 version actually used by the connection.
1960 """
Benjamin Peterson2a691a82008-03-31 01:51:45 +00001961 if certsreqs is None:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001962 certsreqs = ssl.CERT_NONE
Antoine Pitrou480a1242010-04-28 21:37:09 +00001963 certtype = {
1964 ssl.CERT_NONE: "CERT_NONE",
1965 ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
1966 ssl.CERT_REQUIRED: "CERT_REQUIRED",
1967 }[certsreqs]
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001968 if support.verbose:
Antoine Pitrou480a1242010-04-28 21:37:09 +00001969 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001970 sys.stdout.write(formatstr %
1971 (ssl.get_protocol_name(client_protocol),
1972 ssl.get_protocol_name(server_protocol),
1973 certtype))
Antoine Pitroub5218772010-05-21 09:56:06 +00001974 client_context = ssl.SSLContext(client_protocol)
Antoine Pitrou32c49152014-01-09 21:28:48 +01001975 client_context.options |= client_options
Antoine Pitroub5218772010-05-21 09:56:06 +00001976 server_context = ssl.SSLContext(server_protocol)
Antoine Pitrou32c49152014-01-09 21:28:48 +01001977 server_context.options |= server_options
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01001978
1979 # NOTE: we must enable "ALL" ciphers on the client, otherwise an
1980 # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
1981 # starting from OpenSSL 1.0.0 (see issue #8322).
1982 if client_context.protocol == ssl.PROTOCOL_SSLv23:
1983 client_context.set_ciphers("ALL")
1984
Antoine Pitroub5218772010-05-21 09:56:06 +00001985 for ctx in (client_context, server_context):
1986 ctx.verify_mode = certsreqs
Antoine Pitroub5218772010-05-21 09:56:06 +00001987 ctx.load_cert_chain(CERTFILE)
1988 ctx.load_verify_locations(CERTFILE)
1989 try:
Antoine Pitrou47e40422014-09-04 21:00:10 +02001990 stats = server_params_test(client_context, server_context,
1991 chatty=False, connectionchatty=False)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00001992 # Protocol mismatch can result in either an SSLError, or a
1993 # "Connection reset by peer" error.
1994 except ssl.SSLError:
Antoine Pitrou480a1242010-04-28 21:37:09 +00001995 if expect_success:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001996 raise
Andrew Svetlov0832af62012-12-18 23:10:48 +02001997 except OSError as e:
Antoine Pitrou480a1242010-04-28 21:37:09 +00001998 if expect_success or e.errno != errno.ECONNRESET:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00001999 raise
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002000 else:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002001 if not expect_success:
Antoine Pitroud75b2a92010-05-06 14:15:10 +00002002 raise AssertionError(
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002003 "Client protocol %s succeeded with server protocol %s!"
2004 % (ssl.get_protocol_name(client_protocol),
2005 ssl.get_protocol_name(server_protocol)))
Antoine Pitrou47e40422014-09-04 21:00:10 +02002006 elif (expect_success is not True
2007 and expect_success != stats['version']):
2008 raise AssertionError("version mismatch: expected %r, got %r"
2009 % (expect_success, stats['version']))
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002010
2011
Bill Janssen6e027db2007-11-15 22:23:56 +00002012 class ThreadedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002013
Antoine Pitrou23df4832010-08-04 17:14:06 +00002014 @skip_if_broken_ubuntu_ssl
Antoine Pitrou480a1242010-04-28 21:37:09 +00002015 def test_echo(self):
2016 """Basic test of an SSL client connecting to a server"""
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002017 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002018 sys.stdout.write("\n")
Antoine Pitroub5218772010-05-21 09:56:06 +00002019 for protocol in PROTOCOLS:
Antoine Pitrou972d5bb2013-03-29 17:56:03 +01002020 with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
2021 context = ssl.SSLContext(protocol)
2022 context.load_cert_chain(CERTFILE)
2023 server_params_test(context, context,
2024 chatty=True, connectionchatty=True)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002025
Antoine Pitrou480a1242010-04-28 21:37:09 +00002026 def test_getpeercert(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002027 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002028 sys.stdout.write("\n")
Antoine Pitroub5218772010-05-21 09:56:06 +00002029 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2030 context.verify_mode = ssl.CERT_REQUIRED
2031 context.load_verify_locations(CERTFILE)
2032 context.load_cert_chain(CERTFILE)
2033 server = ThreadedEchoServer(context=context, chatty=False)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002034 with server:
Antoine Pitrou20b85552013-09-29 19:50:53 +02002035 s = context.wrap_socket(socket.socket(),
2036 do_handshake_on_connect=False)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002037 s.connect((HOST, server.port))
Antoine Pitrou20b85552013-09-29 19:50:53 +02002038 # getpeercert() raise ValueError while the handshake isn't
2039 # done.
2040 with self.assertRaises(ValueError):
2041 s.getpeercert()
2042 s.do_handshake()
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002043 cert = s.getpeercert()
2044 self.assertTrue(cert, "Can't get peer certificate.")
2045 cipher = s.cipher()
2046 if support.verbose:
2047 sys.stdout.write(pprint.pformat(cert) + '\n')
2048 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
2049 if 'subject' not in cert:
2050 self.fail("No subject field in certificate: %s." %
2051 pprint.pformat(cert))
2052 if ((('organizationName', 'Python Software Foundation'),)
2053 not in cert['subject']):
2054 self.fail(
2055 "Missing or invalid 'organizationName' field in certificate subject; "
2056 "should be 'Python Software Foundation'.")
Antoine Pitroufb046912010-11-09 20:21:19 +00002057 self.assertIn('notBefore', cert)
2058 self.assertIn('notAfter', cert)
2059 before = ssl.cert_time_to_seconds(cert['notBefore'])
2060 after = ssl.cert_time_to_seconds(cert['notAfter'])
2061 self.assertLess(before, after)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002062 s.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002063
Christian Heimes2427b502013-11-23 11:24:32 +01002064 @unittest.skipUnless(have_verify_flags(),
2065 "verify_flags need OpenSSL > 0.9.8")
Christian Heimes22587792013-11-21 23:56:13 +01002066 def test_crl_check(self):
2067 if support.verbose:
2068 sys.stdout.write("\n")
2069
2070 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2071 server_context.load_cert_chain(SIGNED_CERTFILE)
2072
2073 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2074 context.verify_mode = ssl.CERT_REQUIRED
2075 context.load_verify_locations(SIGNING_CA)
Christian Heimes32f0c7a2013-11-22 03:43:48 +01002076 self.assertEqual(context.verify_flags, ssl.VERIFY_DEFAULT)
Christian Heimes22587792013-11-21 23:56:13 +01002077
2078 # VERIFY_DEFAULT should pass
2079 server = ThreadedEchoServer(context=server_context, chatty=True)
2080 with server:
2081 with context.wrap_socket(socket.socket()) as s:
2082 s.connect((HOST, server.port))
2083 cert = s.getpeercert()
2084 self.assertTrue(cert, "Can't get peer certificate.")
2085
2086 # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
Christian Heimes32f0c7a2013-11-22 03:43:48 +01002087 context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
Christian Heimes22587792013-11-21 23:56:13 +01002088
2089 server = ThreadedEchoServer(context=server_context, chatty=True)
2090 with server:
2091 with context.wrap_socket(socket.socket()) as s:
2092 with self.assertRaisesRegex(ssl.SSLError,
2093 "certificate verify failed"):
2094 s.connect((HOST, server.port))
2095
2096 # now load a CRL file. The CRL file is signed by the CA.
2097 context.load_verify_locations(CRLFILE)
2098
2099 server = ThreadedEchoServer(context=server_context, chatty=True)
2100 with server:
2101 with context.wrap_socket(socket.socket()) as s:
2102 s.connect((HOST, server.port))
2103 cert = s.getpeercert()
2104 self.assertTrue(cert, "Can't get peer certificate.")
2105
Christian Heimes575596e2013-12-15 21:49:17 +01002106 @needs_sni
Christian Heimes1aa9a752013-12-02 02:41:19 +01002107 def test_check_hostname(self):
2108 if support.verbose:
2109 sys.stdout.write("\n")
2110
2111 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2112 server_context.load_cert_chain(SIGNED_CERTFILE)
2113
2114 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2115 context.verify_mode = ssl.CERT_REQUIRED
2116 context.check_hostname = True
2117 context.load_verify_locations(SIGNING_CA)
2118
2119 # correct hostname should verify
2120 server = ThreadedEchoServer(context=server_context, chatty=True)
2121 with server:
2122 with context.wrap_socket(socket.socket(),
2123 server_hostname="localhost") as s:
2124 s.connect((HOST, server.port))
2125 cert = s.getpeercert()
2126 self.assertTrue(cert, "Can't get peer certificate.")
2127
2128 # incorrect hostname should raise an exception
2129 server = ThreadedEchoServer(context=server_context, chatty=True)
2130 with server:
2131 with context.wrap_socket(socket.socket(),
2132 server_hostname="invalid") as s:
2133 with self.assertRaisesRegex(ssl.CertificateError,
2134 "hostname 'invalid' doesn't match 'localhost'"):
2135 s.connect((HOST, server.port))
2136
2137 # missing server_hostname arg should cause an exception, too
2138 server = ThreadedEchoServer(context=server_context, chatty=True)
2139 with server:
2140 with socket.socket() as s:
2141 with self.assertRaisesRegex(ValueError,
2142 "check_hostname requires server_hostname"):
2143 context.wrap_socket(s)
2144
Antoine Pitrou480a1242010-04-28 21:37:09 +00002145 def test_empty_cert(self):
2146 """Connecting with an empty cert file"""
2147 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
2148 "nullcert.pem"))
2149 def test_malformed_cert(self):
2150 """Connecting with a badly formatted certificate (syntax error)"""
2151 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
2152 "badcert.pem"))
2153 def test_nonexisting_cert(self):
2154 """Connecting with a non-existing cert file"""
2155 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
2156 "wrongcert.pem"))
2157 def test_malformed_key(self):
2158 """Connecting with a badly formatted key (syntax error)"""
2159 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
2160 "badkey.pem"))
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002161
Antoine Pitrou480a1242010-04-28 21:37:09 +00002162 def test_rude_shutdown(self):
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002163 """A brutal shutdown of an SSL server should raise an OSError
Antoine Pitrou480a1242010-04-28 21:37:09 +00002164 in the client when attempting handshake.
2165 """
Trent Nelson6b240cd2008-04-10 20:12:06 +00002166 listener_ready = threading.Event()
2167 listener_gone = threading.Event()
Antoine Pitrou480a1242010-04-28 21:37:09 +00002168
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002169 s = socket.socket()
2170 port = support.bind_port(s, HOST)
Trent Nelson6b240cd2008-04-10 20:12:06 +00002171
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002172 # `listener` runs in a thread. It sits in an accept() until
2173 # the main thread connects. Then it rudely closes the socket,
2174 # and sets Event `listener_gone` to let the main thread know
2175 # the socket is gone.
Trent Nelson6b240cd2008-04-10 20:12:06 +00002176 def listener():
Charles-François Natali6e204602014-07-23 19:28:13 +01002177 s.listen()
Trent Nelson6b240cd2008-04-10 20:12:06 +00002178 listener_ready.set()
Antoine Pitroud2eca372010-10-29 23:41:37 +00002179 newsock, addr = s.accept()
2180 newsock.close()
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002181 s.close()
Trent Nelson6b240cd2008-04-10 20:12:06 +00002182 listener_gone.set()
2183
2184 def connector():
2185 listener_ready.wait()
Antoine Pitroud2eca372010-10-29 23:41:37 +00002186 with socket.socket() as c:
2187 c.connect((HOST, port))
2188 listener_gone.wait()
2189 try:
2190 ssl_sock = ssl.wrap_socket(c)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002191 except OSError:
Antoine Pitroud2eca372010-10-29 23:41:37 +00002192 pass
2193 else:
2194 self.fail('connecting to closed SSL socket should have failed')
Trent Nelson6b240cd2008-04-10 20:12:06 +00002195
2196 t = threading.Thread(target=listener)
2197 t.start()
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002198 try:
2199 connector()
2200 finally:
2201 t.join()
Trent Nelson6b240cd2008-04-10 20:12:06 +00002202
Antoine Pitrou23df4832010-08-04 17:14:06 +00002203 @skip_if_broken_ubuntu_ssl
Victor Stinner3de49192011-05-09 00:42:58 +02002204 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
2205 "OpenSSL is compiled without SSLv2 support")
Antoine Pitrou480a1242010-04-28 21:37:09 +00002206 def test_protocol_sslv2(self):
2207 """Connecting to an SSLv2 server with various client options"""
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002208 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002209 sys.stdout.write("\n")
Antoine Pitrou480a1242010-04-28 21:37:09 +00002210 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
2211 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
2212 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
Antoine Pitroucd3d7ca2014-01-09 20:02:20 +01002213 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False)
Antoine Pitrou480a1242010-04-28 21:37:09 +00002214 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
2215 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
Antoine Pitroub5218772010-05-21 09:56:06 +00002216 # SSLv23 client with specific SSL options
2217 if no_sslv2_implies_sslv3_hello():
2218 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
2219 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
2220 client_options=ssl.OP_NO_SSLv2)
Antoine Pitroucd3d7ca2014-01-09 20:02:20 +01002221 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
Antoine Pitroub5218772010-05-21 09:56:06 +00002222 client_options=ssl.OP_NO_SSLv3)
Antoine Pitroucd3d7ca2014-01-09 20:02:20 +01002223 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
Antoine Pitroub5218772010-05-21 09:56:06 +00002224 client_options=ssl.OP_NO_TLSv1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002225
Antoine Pitrou23df4832010-08-04 17:14:06 +00002226 @skip_if_broken_ubuntu_ssl
Antoine Pitrou480a1242010-04-28 21:37:09 +00002227 def test_protocol_sslv23(self):
2228 """Connecting to an SSLv23 server with various client options"""
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002229 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002230 sys.stdout.write("\n")
Victor Stinner3de49192011-05-09 00:42:58 +02002231 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2232 try:
2233 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True)
Andrew Svetlov0832af62012-12-18 23:10:48 +02002234 except OSError as x:
Victor Stinner3de49192011-05-09 00:42:58 +02002235 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
2236 if support.verbose:
2237 sys.stdout.write(
2238 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
2239 % str(x))
Antoine Pitrou47e40422014-09-04 21:00:10 +02002240 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, 'SSLv3')
Antoine Pitrou480a1242010-04-28 21:37:09 +00002241 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True)
Antoine Pitrou47e40422014-09-04 21:00:10 +02002242 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1')
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002243
Antoine Pitrou47e40422014-09-04 21:00:10 +02002244 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
Antoine Pitrou480a1242010-04-28 21:37:09 +00002245 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL)
Antoine Pitrou47e40422014-09-04 21:00:10 +02002246 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002247
Antoine Pitrou47e40422014-09-04 21:00:10 +02002248 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
Antoine Pitrou480a1242010-04-28 21:37:09 +00002249 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED)
Antoine Pitrou47e40422014-09-04 21:00:10 +02002250 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002251
Antoine Pitroub5218772010-05-21 09:56:06 +00002252 # Server with specific SSL options
2253 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False,
2254 server_options=ssl.OP_NO_SSLv3)
2255 # Will choose TLSv1
2256 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True,
2257 server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
2258 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, False,
2259 server_options=ssl.OP_NO_TLSv1)
2260
2261
Antoine Pitrou23df4832010-08-04 17:14:06 +00002262 @skip_if_broken_ubuntu_ssl
Antoine Pitrou480a1242010-04-28 21:37:09 +00002263 def test_protocol_sslv3(self):
2264 """Connecting to an SSLv3 server with various client options"""
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002265 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002266 sys.stdout.write("\n")
Antoine Pitrou47e40422014-09-04 21:00:10 +02002267 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3')
2268 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
2269 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
Victor Stinner3de49192011-05-09 00:42:58 +02002270 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2271 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
Barry Warsaw46ae0ef2011-10-28 16:52:17 -04002272 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False,
2273 client_options=ssl.OP_NO_SSLv3)
Antoine Pitrou480a1242010-04-28 21:37:09 +00002274 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
Antoine Pitroub5218772010-05-21 09:56:06 +00002275 if no_sslv2_implies_sslv3_hello():
2276 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Antoine Pitrou47e40422014-09-04 21:00:10 +02002277 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, 'SSLv3',
Antoine Pitroub5218772010-05-21 09:56:06 +00002278 client_options=ssl.OP_NO_SSLv2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002279
Antoine Pitrou23df4832010-08-04 17:14:06 +00002280 @skip_if_broken_ubuntu_ssl
Antoine Pitrou480a1242010-04-28 21:37:09 +00002281 def test_protocol_tlsv1(self):
2282 """Connecting to a TLSv1 server with various client options"""
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002283 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002284 sys.stdout.write("\n")
Antoine Pitrou47e40422014-09-04 21:00:10 +02002285 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1')
2286 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
2287 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Victor Stinner3de49192011-05-09 00:42:58 +02002288 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2289 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
Antoine Pitrou480a1242010-04-28 21:37:09 +00002290 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
Barry Warsaw46ae0ef2011-10-28 16:52:17 -04002291 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False,
2292 client_options=ssl.OP_NO_TLSv1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002293
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002294 @skip_if_broken_ubuntu_ssl
2295 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
2296 "TLS version 1.1 not supported.")
2297 def test_protocol_tlsv1_1(self):
2298 """Connecting to a TLSv1.1 server with various client options.
2299 Testing against older TLS versions."""
2300 if support.verbose:
2301 sys.stdout.write("\n")
Antoine Pitrou47e40422014-09-04 21:00:10 +02002302 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002303 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2304 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False)
2305 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
2306 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv23, False,
2307 client_options=ssl.OP_NO_TLSv1_1)
2308
Antoine Pitrou47e40422014-09-04 21:00:10 +02002309 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002310 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False)
2311 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False)
2312
2313
2314 @skip_if_broken_ubuntu_ssl
2315 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
2316 "TLS version 1.2 not supported.")
2317 def test_protocol_tlsv1_2(self):
2318 """Connecting to a TLSv1.2 server with various client options.
2319 Testing against older TLS versions."""
2320 if support.verbose:
2321 sys.stdout.write("\n")
Antoine Pitrou47e40422014-09-04 21:00:10 +02002322 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2',
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002323 server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
2324 client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
2325 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2326 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False)
2327 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
2328 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv23, False,
2329 client_options=ssl.OP_NO_TLSv1_2)
2330
Antoine Pitrou47e40422014-09-04 21:00:10 +02002331 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2')
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002332 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
2333 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
2334 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
2335 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
2336
Antoine Pitrou480a1242010-04-28 21:37:09 +00002337 def test_starttls(self):
2338 """Switching from clear text to encrypted and back again."""
2339 msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002340
Trent Nelson78520002008-04-10 20:54:35 +00002341 server = ThreadedEchoServer(CERTFILE,
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002342 ssl_version=ssl.PROTOCOL_TLSv1,
2343 starttls_server=True,
2344 chatty=True,
2345 connectionchatty=True)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002346 wrapped = False
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002347 with server:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002348 s = socket.socket()
2349 s.setblocking(1)
2350 s.connect((HOST, server.port))
2351 if support.verbose:
2352 sys.stdout.write("\n")
2353 for indata in msgs:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002354 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002355 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002356 " client: sending %r...\n" % indata)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002357 if wrapped:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002358 conn.write(indata)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002359 outdata = conn.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002360 else:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002361 s.send(indata)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002362 outdata = s.recv(1024)
Antoine Pitrou480a1242010-04-28 21:37:09 +00002363 msg = outdata.strip().lower()
2364 if indata == b"STARTTLS" and msg.startswith(b"ok"):
2365 # STARTTLS ok, switch to secure mode
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002366 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002367 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002368 " client: read %r from server, starting TLS...\n"
2369 % msg)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002370 conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
2371 wrapped = True
Antoine Pitrou480a1242010-04-28 21:37:09 +00002372 elif indata == b"ENDTLS" and msg.startswith(b"ok"):
2373 # ENDTLS ok, switch back to clear text
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002374 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002375 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002376 " client: read %r from server, ending TLS...\n"
2377 % msg)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002378 s = conn.unwrap()
2379 wrapped = False
2380 else:
2381 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002382 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002383 " client: read %r from server\n" % msg)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002384 if support.verbose:
2385 sys.stdout.write(" client: closing connection.\n")
2386 if wrapped:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002387 conn.write(b"over\n")
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002388 else:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002389 s.send(b"over\n")
Bill Janssen6e027db2007-11-15 22:23:56 +00002390 if wrapped:
2391 conn.close()
2392 else:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002393 s.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002394
Antoine Pitrou480a1242010-04-28 21:37:09 +00002395 def test_socketserver(self):
2396 """Using a SocketServer to create and manage SSL connections."""
Antoine Pitrouda232592013-02-05 21:20:51 +01002397 server = make_https_server(self, certfile=CERTFILE)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002398 # try to connect
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002399 if support.verbose:
2400 sys.stdout.write('\n')
2401 with open(CERTFILE, 'rb') as f:
2402 d1 = f.read()
2403 d2 = ''
2404 # now fetch the same data from the HTTPS server
2405 url = 'https://%s:%d/%s' % (
2406 HOST, server.port, os.path.split(CERTFILE)[1])
2407 f = urllib.request.urlopen(url)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002408 try:
Barry Warsaw820c1202008-06-12 04:06:45 +00002409 dlen = f.info().get("content-length")
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002410 if dlen and (int(dlen) > 0):
2411 d2 = f.read(int(dlen))
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002412 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002413 sys.stdout.write(
2414 " client: read %d bytes from remote server '%s'\n"
2415 % (len(d2), server))
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002416 finally:
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002417 f.close()
2418 self.assertEqual(d1, d2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002419
Antoine Pitrou480a1242010-04-28 21:37:09 +00002420 def test_asyncore_server(self):
2421 """Check the example asyncore integration."""
2422 indata = "TEST MESSAGE of mixed case\n"
Trent Nelson6b240cd2008-04-10 20:12:06 +00002423
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002424 if support.verbose:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002425 sys.stdout.write("\n")
2426
Antoine Pitrou480a1242010-04-28 21:37:09 +00002427 indata = b"FOO\n"
Trent Nelson78520002008-04-10 20:54:35 +00002428 server = AsyncoreEchoServer(CERTFILE)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002429 with server:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002430 s = ssl.wrap_socket(socket.socket())
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002431 s.connect(('127.0.0.1', server.port))
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002432 if support.verbose:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002433 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002434 " client: sending %r...\n" % indata)
2435 s.write(indata)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002436 outdata = s.read()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002437 if support.verbose:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002438 sys.stdout.write(" client: read %r\n" % outdata)
Trent Nelson6b240cd2008-04-10 20:12:06 +00002439 if outdata != indata.lower():
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002440 self.fail(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002441 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
2442 % (outdata[:20], len(outdata),
2443 indata[:20].lower(), len(indata)))
2444 s.write(b"over\n")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002445 if support.verbose:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002446 sys.stdout.write(" client: closing connection.\n")
2447 s.close()
Antoine Pitroued986362010-08-15 23:28:10 +00002448 if support.verbose:
2449 sys.stdout.write(" client: connection closed.\n")
Trent Nelson6b240cd2008-04-10 20:12:06 +00002450
Antoine Pitrou480a1242010-04-28 21:37:09 +00002451 def test_recv_send(self):
2452 """Test recv(), send() and friends."""
Bill Janssen58afe4c2008-09-08 16:45:19 +00002453 if support.verbose:
2454 sys.stdout.write("\n")
2455
2456 server = ThreadedEchoServer(CERTFILE,
2457 certreqs=ssl.CERT_NONE,
2458 ssl_version=ssl.PROTOCOL_TLSv1,
2459 cacerts=CERTFILE,
2460 chatty=True,
2461 connectionchatty=False)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002462 with server:
2463 s = ssl.wrap_socket(socket.socket(),
2464 server_side=False,
2465 certfile=CERTFILE,
2466 ca_certs=CERTFILE,
2467 cert_reqs=ssl.CERT_NONE,
2468 ssl_version=ssl.PROTOCOL_TLSv1)
2469 s.connect((HOST, server.port))
Bill Janssen58afe4c2008-09-08 16:45:19 +00002470 # helper methods for standardising recv* method signatures
2471 def _recv_into():
2472 b = bytearray(b"\0"*100)
2473 count = s.recv_into(b)
2474 return b[:count]
2475
2476 def _recvfrom_into():
2477 b = bytearray(b"\0"*100)
2478 count, addr = s.recvfrom_into(b)
2479 return b[:count]
2480
2481 # (name, method, whether to expect success, *args)
2482 send_methods = [
2483 ('send', s.send, True, []),
2484 ('sendto', s.sendto, False, ["some.address"]),
2485 ('sendall', s.sendall, True, []),
2486 ]
2487 recv_methods = [
2488 ('recv', s.recv, True, []),
2489 ('recvfrom', s.recvfrom, False, ["some.address"]),
2490 ('recv_into', _recv_into, True, []),
2491 ('recvfrom_into', _recvfrom_into, False, []),
2492 ]
2493 data_prefix = "PREFIX_"
2494
2495 for meth_name, send_meth, expect_success, args in send_methods:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002496 indata = (data_prefix + meth_name).encode('ascii')
Bill Janssen58afe4c2008-09-08 16:45:19 +00002497 try:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002498 send_meth(indata, *args)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002499 outdata = s.read()
Bill Janssen58afe4c2008-09-08 16:45:19 +00002500 if outdata != indata.lower():
Georg Brandl89fad142010-03-14 10:23:39 +00002501 self.fail(
Bill Janssen58afe4c2008-09-08 16:45:19 +00002502 "While sending with <<{name:s}>> bad data "
Antoine Pitrou480a1242010-04-28 21:37:09 +00002503 "<<{outdata:r}>> ({nout:d}) received; "
2504 "expected <<{indata:r}>> ({nin:d})\n".format(
2505 name=meth_name, outdata=outdata[:20],
Bill Janssen58afe4c2008-09-08 16:45:19 +00002506 nout=len(outdata),
Antoine Pitrou480a1242010-04-28 21:37:09 +00002507 indata=indata[:20], nin=len(indata)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002508 )
2509 )
2510 except ValueError as e:
2511 if expect_success:
Georg Brandl89fad142010-03-14 10:23:39 +00002512 self.fail(
Bill Janssen58afe4c2008-09-08 16:45:19 +00002513 "Failed to send with method <<{name:s}>>; "
2514 "expected to succeed.\n".format(name=meth_name)
2515 )
2516 if not str(e).startswith(meth_name):
Georg Brandl89fad142010-03-14 10:23:39 +00002517 self.fail(
Bill Janssen58afe4c2008-09-08 16:45:19 +00002518 "Method <<{name:s}>> failed with unexpected "
2519 "exception message: {exp:s}\n".format(
2520 name=meth_name, exp=e
2521 )
2522 )
2523
2524 for meth_name, recv_meth, expect_success, args in recv_methods:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002525 indata = (data_prefix + meth_name).encode('ascii')
Bill Janssen58afe4c2008-09-08 16:45:19 +00002526 try:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002527 s.send(indata)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002528 outdata = recv_meth(*args)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002529 if outdata != indata.lower():
Georg Brandl89fad142010-03-14 10:23:39 +00002530 self.fail(
Bill Janssen58afe4c2008-09-08 16:45:19 +00002531 "While receiving with <<{name:s}>> bad data "
Antoine Pitrou480a1242010-04-28 21:37:09 +00002532 "<<{outdata:r}>> ({nout:d}) received; "
2533 "expected <<{indata:r}>> ({nin:d})\n".format(
2534 name=meth_name, outdata=outdata[:20],
Bill Janssen58afe4c2008-09-08 16:45:19 +00002535 nout=len(outdata),
Antoine Pitrou480a1242010-04-28 21:37:09 +00002536 indata=indata[:20], nin=len(indata)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002537 )
2538 )
2539 except ValueError as e:
2540 if expect_success:
Georg Brandl89fad142010-03-14 10:23:39 +00002541 self.fail(
Bill Janssen58afe4c2008-09-08 16:45:19 +00002542 "Failed to receive with method <<{name:s}>>; "
2543 "expected to succeed.\n".format(name=meth_name)
2544 )
2545 if not str(e).startswith(meth_name):
Georg Brandl89fad142010-03-14 10:23:39 +00002546 self.fail(
Bill Janssen58afe4c2008-09-08 16:45:19 +00002547 "Method <<{name:s}>> failed with unexpected "
2548 "exception message: {exp:s}\n".format(
2549 name=meth_name, exp=e
2550 )
2551 )
2552 # consume data
2553 s.read()
2554
Nick Coghlan513886a2011-08-28 00:00:27 +10002555 # Make sure sendmsg et al are disallowed to avoid
2556 # inadvertent disclosure of data and/or corruption
2557 # of the encrypted data stream
2558 self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
2559 self.assertRaises(NotImplementedError, s.recvmsg, 100)
2560 self.assertRaises(NotImplementedError,
2561 s.recvmsg_into, bytearray(100))
2562
Antoine Pitrou480a1242010-04-28 21:37:09 +00002563 s.write(b"over\n")
Bill Janssen58afe4c2008-09-08 16:45:19 +00002564 s.close()
Bill Janssen58afe4c2008-09-08 16:45:19 +00002565
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002566 def test_nonblocking_send(self):
2567 server = ThreadedEchoServer(CERTFILE,
2568 certreqs=ssl.CERT_NONE,
2569 ssl_version=ssl.PROTOCOL_TLSv1,
2570 cacerts=CERTFILE,
2571 chatty=True,
2572 connectionchatty=False)
2573 with server:
2574 s = ssl.wrap_socket(socket.socket(),
2575 server_side=False,
2576 certfile=CERTFILE,
2577 ca_certs=CERTFILE,
2578 cert_reqs=ssl.CERT_NONE,
2579 ssl_version=ssl.PROTOCOL_TLSv1)
2580 s.connect((HOST, server.port))
2581 s.setblocking(False)
2582
2583 # If we keep sending data, at some point the buffers
2584 # will be full and the call will block
2585 buf = bytearray(8192)
2586 def fill_buffer():
2587 while True:
2588 s.send(buf)
2589 self.assertRaises((ssl.SSLWantWriteError,
2590 ssl.SSLWantReadError), fill_buffer)
2591
2592 # Now read all the output and discard it
2593 s.setblocking(True)
2594 s.close()
2595
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002596 def test_handshake_timeout(self):
2597 # Issue #5103: SSL handshake must respect the socket timeout
2598 server = socket.socket(socket.AF_INET)
2599 host = "127.0.0.1"
2600 port = support.bind_port(server)
2601 started = threading.Event()
2602 finish = False
2603
2604 def serve():
Charles-François Natali6e204602014-07-23 19:28:13 +01002605 server.listen()
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002606 started.set()
2607 conns = []
2608 while not finish:
2609 r, w, e = select.select([server], [], [], 0.1)
2610 if server in r:
2611 # Let the socket hang around rather than having
2612 # it closed by garbage collection.
2613 conns.append(server.accept()[0])
Antoine Pitroud2eca372010-10-29 23:41:37 +00002614 for sock in conns:
2615 sock.close()
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002616
2617 t = threading.Thread(target=serve)
2618 t.start()
2619 started.wait()
2620
2621 try:
Antoine Pitrou40f08742010-04-24 22:04:40 +00002622 try:
2623 c = socket.socket(socket.AF_INET)
2624 c.settimeout(0.2)
2625 c.connect((host, port))
2626 # Will attempt handshake and time out
Antoine Pitrouc4df7842010-12-03 19:59:41 +00002627 self.assertRaisesRegex(socket.timeout, "timed out",
Ezio Melottied3a7d22010-12-01 02:32:32 +00002628 ssl.wrap_socket, c)
Antoine Pitrou40f08742010-04-24 22:04:40 +00002629 finally:
2630 c.close()
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002631 try:
2632 c = socket.socket(socket.AF_INET)
2633 c = ssl.wrap_socket(c)
2634 c.settimeout(0.2)
2635 # Will attempt handshake and time out
Antoine Pitrouc4df7842010-12-03 19:59:41 +00002636 self.assertRaisesRegex(socket.timeout, "timed out",
Ezio Melottied3a7d22010-12-01 02:32:32 +00002637 c.connect, (host, port))
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002638 finally:
2639 c.close()
2640 finally:
2641 finish = True
2642 t.join()
2643 server.close()
2644
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002645 def test_server_accept(self):
2646 # Issue #16357: accept() on a SSLSocket created through
2647 # SSLContext.wrap_socket().
2648 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2649 context.verify_mode = ssl.CERT_REQUIRED
2650 context.load_verify_locations(CERTFILE)
2651 context.load_cert_chain(CERTFILE)
2652 server = socket.socket(socket.AF_INET)
2653 host = "127.0.0.1"
2654 port = support.bind_port(server)
2655 server = context.wrap_socket(server, server_side=True)
2656
2657 evt = threading.Event()
2658 remote = None
2659 peer = None
2660 def serve():
2661 nonlocal remote, peer
Charles-François Natali6e204602014-07-23 19:28:13 +01002662 server.listen()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002663 # Block on the accept and wait on the connection to close.
2664 evt.set()
2665 remote, peer = server.accept()
2666 remote.recv(1)
2667
2668 t = threading.Thread(target=serve)
2669 t.start()
2670 # Client wait until server setup and perform a connect.
2671 evt.wait()
2672 client = context.wrap_socket(socket.socket())
2673 client.connect((host, port))
2674 client_addr = client.getsockname()
2675 client.close()
2676 t.join()
Antoine Pitroue1ceb502013-01-12 21:54:44 +01002677 remote.close()
2678 server.close()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002679 # Sanity checks.
2680 self.assertIsInstance(remote, ssl.SSLSocket)
2681 self.assertEqual(peer, client_addr)
2682
Antoine Pitrou242db722013-05-01 20:52:07 +02002683 def test_getpeercert_enotconn(self):
2684 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2685 with context.wrap_socket(socket.socket()) as sock:
2686 with self.assertRaises(OSError) as cm:
2687 sock.getpeercert()
2688 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
2689
2690 def test_do_handshake_enotconn(self):
2691 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2692 with context.wrap_socket(socket.socket()) as sock:
2693 with self.assertRaises(OSError) as cm:
2694 sock.do_handshake()
2695 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
2696
Antoine Pitrou8f85f902012-01-03 22:46:48 +01002697 def test_default_ciphers(self):
2698 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2699 try:
2700 # Force a set of weak ciphers on our client context
2701 context.set_ciphers("DES")
2702 except ssl.SSLError:
2703 self.skipTest("no DES cipher available")
2704 with ThreadedEchoServer(CERTFILE,
2705 ssl_version=ssl.PROTOCOL_SSLv23,
2706 chatty=False) as server:
Antoine Pitroue1ceb502013-01-12 21:54:44 +01002707 with context.wrap_socket(socket.socket()) as s:
Andrew Svetlov0832af62012-12-18 23:10:48 +02002708 with self.assertRaises(OSError):
Antoine Pitrou8f85f902012-01-03 22:46:48 +01002709 s.connect((HOST, server.port))
2710 self.assertIn("no shared cipher", str(server.conn_errors[0]))
2711
Antoine Pitrou47e40422014-09-04 21:00:10 +02002712 def test_version_basic(self):
2713 """
2714 Basic tests for SSLSocket.version().
2715 More tests are done in the test_protocol_*() methods.
2716 """
2717 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2718 with ThreadedEchoServer(CERTFILE,
2719 ssl_version=ssl.PROTOCOL_TLSv1,
2720 chatty=False) as server:
2721 with context.wrap_socket(socket.socket()) as s:
2722 self.assertIs(s.version(), None)
2723 s.connect((HOST, server.port))
2724 self.assertEqual(s.version(), "TLSv1")
2725 self.assertIs(s.version(), None)
2726
Antoine Pitrou0bebbc32014-03-22 18:13:50 +01002727 @unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
2728 def test_default_ecdh_curve(self):
2729 # Issue #21015: elliptic curve-based Diffie Hellman key exchange
2730 # should be enabled by default on SSL contexts.
2731 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2732 context.load_cert_chain(CERTFILE)
Antoine Pitrouc0430612014-04-16 18:33:39 +02002733 # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
2734 # explicitly using the 'ECCdraft' cipher alias. Otherwise,
2735 # our default cipher list should prefer ECDH-based ciphers
2736 # automatically.
2737 if ssl.OPENSSL_VERSION_INFO < (1, 0, 0):
2738 context.set_ciphers("ECCdraft:ECDH")
Antoine Pitrou0bebbc32014-03-22 18:13:50 +01002739 with ThreadedEchoServer(context=context) as server:
2740 with context.wrap_socket(socket.socket()) as s:
2741 s.connect((HOST, server.port))
2742 self.assertIn("ECDH", s.cipher()[0])
2743
Antoine Pitroud6494802011-07-21 01:11:30 +02002744 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
2745 "'tls-unique' channel binding not available")
2746 def test_tls_unique_channel_binding(self):
2747 """Test tls-unique channel binding."""
2748 if support.verbose:
2749 sys.stdout.write("\n")
2750
2751 server = ThreadedEchoServer(CERTFILE,
2752 certreqs=ssl.CERT_NONE,
2753 ssl_version=ssl.PROTOCOL_TLSv1,
2754 cacerts=CERTFILE,
2755 chatty=True,
2756 connectionchatty=False)
Antoine Pitrou6b15c902011-12-21 16:54:45 +01002757 with server:
2758 s = ssl.wrap_socket(socket.socket(),
2759 server_side=False,
2760 certfile=CERTFILE,
2761 ca_certs=CERTFILE,
2762 cert_reqs=ssl.CERT_NONE,
2763 ssl_version=ssl.PROTOCOL_TLSv1)
2764 s.connect((HOST, server.port))
Antoine Pitroud6494802011-07-21 01:11:30 +02002765 # get the data
2766 cb_data = s.get_channel_binding("tls-unique")
2767 if support.verbose:
2768 sys.stdout.write(" got channel binding data: {0!r}\n"
2769 .format(cb_data))
2770
2771 # check if it is sane
2772 self.assertIsNotNone(cb_data)
2773 self.assertEqual(len(cb_data), 12) # True for TLSv1
2774
2775 # and compare with the peers version
2776 s.write(b"CB tls-unique\n")
2777 peer_data_repr = s.read().strip()
2778 self.assertEqual(peer_data_repr,
2779 repr(cb_data).encode("us-ascii"))
2780 s.close()
2781
2782 # now, again
2783 s = ssl.wrap_socket(socket.socket(),
2784 server_side=False,
2785 certfile=CERTFILE,
2786 ca_certs=CERTFILE,
2787 cert_reqs=ssl.CERT_NONE,
2788 ssl_version=ssl.PROTOCOL_TLSv1)
2789 s.connect((HOST, server.port))
2790 new_cb_data = s.get_channel_binding("tls-unique")
2791 if support.verbose:
2792 sys.stdout.write(" got another channel binding data: {0!r}\n"
2793 .format(new_cb_data))
2794 # is it really unique
2795 self.assertNotEqual(cb_data, new_cb_data)
2796 self.assertIsNotNone(cb_data)
2797 self.assertEqual(len(cb_data), 12) # True for TLSv1
2798 s.write(b"CB tls-unique\n")
2799 peer_data_repr = s.read().strip()
2800 self.assertEqual(peer_data_repr,
2801 repr(new_cb_data).encode("us-ascii"))
2802 s.close()
Bill Janssen58afe4c2008-09-08 16:45:19 +00002803
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01002804 def test_compression(self):
2805 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2806 context.load_cert_chain(CERTFILE)
2807 stats = server_params_test(context, context,
2808 chatty=True, connectionchatty=True)
2809 if support.verbose:
2810 sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
2811 self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' })
2812
2813 @unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
2814 "ssl.OP_NO_COMPRESSION needed for this test")
2815 def test_compression_disabled(self):
2816 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2817 context.load_cert_chain(CERTFILE)
Antoine Pitrou8691bff2011-12-20 10:47:42 +01002818 context.options |= ssl.OP_NO_COMPRESSION
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01002819 stats = server_params_test(context, context,
2820 chatty=True, connectionchatty=True)
2821 self.assertIs(stats['compression'], None)
2822
Antoine Pitrou0e576f12011-12-22 10:03:38 +01002823 def test_dh_params(self):
2824 # Check we can get a connection with ephemeral Diffie-Hellman
2825 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2826 context.load_cert_chain(CERTFILE)
2827 context.load_dh_params(DHFILE)
2828 context.set_ciphers("kEDH")
2829 stats = server_params_test(context, context,
2830 chatty=True, connectionchatty=True)
2831 cipher = stats["cipher"][0]
2832 parts = cipher.split("-")
2833 if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
2834 self.fail("Non-DH cipher: " + cipher[0])
2835
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01002836 def test_selected_npn_protocol(self):
2837 # selected_npn_protocol() is None unless NPN is used
2838 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2839 context.load_cert_chain(CERTFILE)
2840 stats = server_params_test(context, context,
2841 chatty=True, connectionchatty=True)
2842 self.assertIs(stats['client_npn_protocol'], None)
2843
2844 @unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
2845 def test_npn_protocols(self):
2846 server_protocols = ['http/1.1', 'spdy/2']
2847 protocol_tests = [
2848 (['http/1.1', 'spdy/2'], 'http/1.1'),
2849 (['spdy/2', 'http/1.1'], 'http/1.1'),
2850 (['spdy/2', 'test'], 'spdy/2'),
2851 (['abc', 'def'], 'abc')
2852 ]
2853 for client_protocols, expected in protocol_tests:
2854 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2855 server_context.load_cert_chain(CERTFILE)
2856 server_context.set_npn_protocols(server_protocols)
2857 client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2858 client_context.load_cert_chain(CERTFILE)
2859 client_context.set_npn_protocols(client_protocols)
2860 stats = server_params_test(client_context, server_context,
2861 chatty=True, connectionchatty=True)
2862
2863 msg = "failed trying %s (s) and %s (c).\n" \
2864 "was expecting %s, but got %%s from the %%s" \
2865 % (str(server_protocols), str(client_protocols),
2866 str(expected))
2867 client_result = stats['client_npn_protocol']
2868 self.assertEqual(client_result, expected, msg % (client_result, "client"))
2869 server_result = stats['server_npn_protocols'][-1] \
2870 if len(stats['server_npn_protocols']) else 'nothing'
2871 self.assertEqual(server_result, expected, msg % (server_result, "server"))
2872
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01002873 def sni_contexts(self):
2874 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2875 server_context.load_cert_chain(SIGNED_CERTFILE)
2876 other_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2877 other_context.load_cert_chain(SIGNED_CERTFILE2)
2878 client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2879 client_context.verify_mode = ssl.CERT_REQUIRED
2880 client_context.load_verify_locations(SIGNING_CA)
2881 return server_context, other_context, client_context
2882
2883 def check_common_name(self, stats, name):
2884 cert = stats['peercert']
2885 self.assertIn((('commonName', name),), cert['subject'])
2886
2887 @needs_sni
2888 def test_sni_callback(self):
2889 calls = []
2890 server_context, other_context, client_context = self.sni_contexts()
2891
2892 def servername_cb(ssl_sock, server_name, initial_context):
2893 calls.append((server_name, initial_context))
Antoine Pitrou50b24d02013-04-11 20:48:42 +02002894 if server_name is not None:
2895 ssl_sock.context = other_context
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01002896 server_context.set_servername_callback(servername_cb)
2897
2898 stats = server_params_test(client_context, server_context,
2899 chatty=True,
2900 sni_name='supermessage')
2901 # The hostname was fetched properly, and the certificate was
2902 # changed for the connection.
2903 self.assertEqual(calls, [("supermessage", server_context)])
2904 # CERTFILE4 was selected
2905 self.check_common_name(stats, 'fakehostname')
2906
Antoine Pitrou50b24d02013-04-11 20:48:42 +02002907 calls = []
2908 # The callback is called with server_name=None
2909 stats = server_params_test(client_context, server_context,
2910 chatty=True,
2911 sni_name=None)
2912 self.assertEqual(calls, [(None, server_context)])
2913 self.check_common_name(stats, 'localhost')
2914
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01002915 # Check disabling the callback
2916 calls = []
2917 server_context.set_servername_callback(None)
2918
2919 stats = server_params_test(client_context, server_context,
2920 chatty=True,
2921 sni_name='notfunny')
2922 # Certificate didn't change
2923 self.check_common_name(stats, 'localhost')
2924 self.assertEqual(calls, [])
2925
2926 @needs_sni
2927 def test_sni_callback_alert(self):
2928 # Returning a TLS alert is reflected to the connecting client
2929 server_context, other_context, client_context = self.sni_contexts()
2930
2931 def cb_returning_alert(ssl_sock, server_name, initial_context):
2932 return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
2933 server_context.set_servername_callback(cb_returning_alert)
2934
2935 with self.assertRaises(ssl.SSLError) as cm:
2936 stats = server_params_test(client_context, server_context,
2937 chatty=False,
2938 sni_name='supermessage')
2939 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
2940
2941 @needs_sni
2942 def test_sni_callback_raising(self):
2943 # Raising fails the connection with a TLS handshake failure alert.
2944 server_context, other_context, client_context = self.sni_contexts()
2945
2946 def cb_raising(ssl_sock, server_name, initial_context):
2947 1/0
2948 server_context.set_servername_callback(cb_raising)
2949
2950 with self.assertRaises(ssl.SSLError) as cm, \
2951 support.captured_stderr() as stderr:
2952 stats = server_params_test(client_context, server_context,
2953 chatty=False,
2954 sni_name='supermessage')
2955 self.assertEqual(cm.exception.reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
2956 self.assertIn("ZeroDivisionError", stderr.getvalue())
2957
2958 @needs_sni
2959 def test_sni_callback_wrong_return_type(self):
2960 # Returning the wrong return type terminates the TLS connection
2961 # with an internal error alert.
2962 server_context, other_context, client_context = self.sni_contexts()
2963
2964 def cb_wrong_return_type(ssl_sock, server_name, initial_context):
2965 return "foo"
2966 server_context.set_servername_callback(cb_wrong_return_type)
2967
2968 with self.assertRaises(ssl.SSLError) as cm, \
2969 support.captured_stderr() as stderr:
2970 stats = server_params_test(client_context, server_context,
2971 chatty=False,
2972 sni_name='supermessage')
2973 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
2974 self.assertIn("TypeError", stderr.getvalue())
2975
Antoine Pitrou60a26e02013-07-20 19:35:16 +02002976 def test_read_write_after_close_raises_valuerror(self):
2977 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2978 context.verify_mode = ssl.CERT_REQUIRED
2979 context.load_verify_locations(CERTFILE)
2980 context.load_cert_chain(CERTFILE)
2981 server = ThreadedEchoServer(context=context, chatty=False)
2982
2983 with server:
2984 s = context.wrap_socket(socket.socket())
2985 s.connect((HOST, server.port))
2986 s.close()
2987
2988 self.assertRaises(ValueError, s.read, 1024)
Antoine Pitrou28940732013-07-20 19:36:15 +02002989 self.assertRaises(ValueError, s.write, b'hello')
Antoine Pitrou60a26e02013-07-20 19:35:16 +02002990
Giampaolo Rodola'915d1412014-06-11 03:54:30 +02002991 def test_sendfile(self):
2992 TEST_DATA = b"x" * 512
2993 with open(support.TESTFN, 'wb') as f:
2994 f.write(TEST_DATA)
2995 self.addCleanup(support.unlink, support.TESTFN)
2996 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2997 context.verify_mode = ssl.CERT_REQUIRED
2998 context.load_verify_locations(CERTFILE)
2999 context.load_cert_chain(CERTFILE)
3000 server = ThreadedEchoServer(context=context, chatty=False)
3001 with server:
3002 with context.wrap_socket(socket.socket()) as s:
3003 s.connect((HOST, server.port))
3004 with open(support.TESTFN, 'rb') as file:
3005 s.sendfile(file)
3006 self.assertEqual(s.recv(1024), TEST_DATA)
3007
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003008
Thomas Woutersed03b412007-08-28 21:37:11 +00003009def test_main(verbose=False):
Antoine Pitrou15cee622010-08-04 16:45:21 +00003010 if support.verbose:
3011 plats = {
3012 'Linux': platform.linux_distribution,
3013 'Mac': platform.mac_ver,
3014 'Windows': platform.win32_ver,
3015 }
3016 for name, func in plats.items():
3017 plat = func()
3018 if plat and plat[0]:
3019 plat = '%s %r' % (name, plat)
3020 break
3021 else:
3022 plat = repr(platform.platform())
3023 print("test_ssl: testing with %r %r" %
3024 (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
3025 print(" under %s" % plat)
Antoine Pitroud5323212010-10-22 18:19:07 +00003026 print(" HAS_SNI = %r" % ssl.HAS_SNI)
Antoine Pitrou609ef012013-03-29 18:09:06 +01003027 print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
3028 try:
3029 print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
3030 except AttributeError:
3031 pass
Antoine Pitrou15cee622010-08-04 16:45:21 +00003032
Antoine Pitrou152efa22010-05-16 18:19:27 +00003033 for filename in [
3034 CERTFILE, SVN_PYTHON_ORG_ROOT_CERT, BYTES_CERTFILE,
3035 ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003036 SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
Antoine Pitrou152efa22010-05-16 18:19:27 +00003037 BADCERT, BADKEY, EMPTYCERT]:
3038 if not os.path.exists(filename):
3039 raise support.TestFailed("Can't read certificate file %r" % filename)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00003040
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02003041 tests = [ContextTests, BasicSocketTests, SSLErrorTests]
Thomas Woutersed03b412007-08-28 21:37:11 +00003042
Benjamin Petersonee8712c2008-05-20 21:35:26 +00003043 if support.is_resource_enabled('network'):
Bill Janssen6e027db2007-11-15 22:23:56 +00003044 tests.append(NetworkedTests)
Thomas Woutersed03b412007-08-28 21:37:11 +00003045
Thomas Wouters1b7f8912007-09-19 03:06:30 +00003046 if _have_threads:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00003047 thread_info = support.threading_setup()
Antoine Pitroudb5012a2013-01-12 22:00:09 +01003048 if thread_info:
Bill Janssen6e027db2007-11-15 22:23:56 +00003049 tests.append(ThreadedTests)
Thomas Woutersed03b412007-08-28 21:37:11 +00003050
Antoine Pitrou480a1242010-04-28 21:37:09 +00003051 try:
3052 support.run_unittest(*tests)
3053 finally:
3054 if _have_threads:
3055 support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00003056
3057if __name__ == "__main__":
3058 test_main()