blob: 15d6b821a20bf134305d84c8101db2044054f133 [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00005from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00006import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00007import select
Thomas Woutersed03b412007-08-28 21:37:11 +00008import time
Christian Heimes9424bb42013-06-17 15:32:57 +02009import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000010import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000011import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000012import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000013import pprint
Antoine Pitrou152efa22010-05-16 18:19:27 +000014import tempfile
Antoine Pitrou803e6d62010-10-13 10:36:15 +000015import urllib.request
Thomas Woutersed03b412007-08-28 21:37:11 +000016import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000017import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000018import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000019import platform
Antoine Pitrou23df4832010-08-04 17:14:06 +000020import functools
Thomas Woutersed03b412007-08-28 21:37:11 +000021
Antoine Pitrou05d936d2010-10-13 11:38:36 +000022ssl = support.import_module("ssl")
23
Antoine Pitrou2463e5f2013-03-28 22:24:43 +010024PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
Benjamin Petersonee8712c2008-05-20 21:35:26 +000025HOST = support.HOST
Antoine Pitrou152efa22010-05-16 18:19:27 +000026
Christian Heimesefff7062013-11-21 03:35:02 +010027def data_file(*name):
28 return os.path.join(os.path.dirname(__file__), *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000029
Antoine Pitrou81564092010-10-08 23:06:24 +000030# The custom key and certificate files used in test_ssl are generated
31# using Lib/test/make_ssl_certs.py.
32# Other certificates are simply fetched from the Internet servers they
33# are meant to authenticate.
34
Antoine Pitrou152efa22010-05-16 18:19:27 +000035CERTFILE = data_file("keycert.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000036BYTES_CERTFILE = os.fsencode(CERTFILE)
Antoine Pitrou152efa22010-05-16 18:19:27 +000037ONLYCERT = data_file("ssl_cert.pem")
38ONLYKEY = data_file("ssl_key.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000039BYTES_ONLYCERT = os.fsencode(ONLYCERT)
40BYTES_ONLYKEY = os.fsencode(ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +020041CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
42ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
43KEY_PASSWORD = "somepass"
Antoine Pitrou152efa22010-05-16 18:19:27 +000044CAPATH = data_file("capath")
Victor Stinner313a1202010-06-11 23:56:51 +000045BYTES_CAPATH = os.fsencode(CAPATH)
Christian Heimesefff7062013-11-21 03:35:02 +010046CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
47CAFILE_CACERT = data_file("capath", "5ed36f99.0")
48
Antoine Pitrou152efa22010-05-16 18:19:27 +000049
Christian Heimes22587792013-11-21 23:56:13 +010050# empty CRL
51CRLFILE = data_file("revocation.crl")
52
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010053# Two keys and certs signed by the same CA (for SNI tests)
54SIGNED_CERTFILE = data_file("keycert3.pem")
55SIGNED_CERTFILE2 = data_file("keycert4.pem")
56SIGNING_CA = data_file("pycacert.pem")
57
Antoine Pitrou152efa22010-05-16 18:19:27 +000058SVN_PYTHON_ORG_ROOT_CERT = data_file("https_svn_python_org_root.pem")
59
60EMPTYCERT = data_file("nullcert.pem")
61BADCERT = data_file("badcert.pem")
62WRONGCERT = data_file("XXXnonexisting.pem")
63BADKEY = data_file("badkey.pem")
Antoine Pitroud8c347a2011-10-01 19:20:25 +020064NOKIACERT = data_file("nokia.pem")
Christian Heimes824f7f32013-08-17 00:54:47 +020065NULLBYTECERT = data_file("nullbytecert.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +000066
Antoine Pitrou0e576f12011-12-22 10:03:38 +010067DHFILE = data_file("dh512.pem")
68BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +000069
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010070
Thomas Woutersed03b412007-08-28 21:37:11 +000071def handle_error(prefix):
72 exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
Benjamin Petersonee8712c2008-05-20 21:35:26 +000073 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +000074 sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +000075
Antoine Pitroub5218772010-05-21 09:56:06 +000076def can_clear_options():
77 # 0.9.8m or higher
Antoine Pitroub9ac25d2011-07-08 18:47:06 +020078 return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
Antoine Pitroub5218772010-05-21 09:56:06 +000079
80def no_sslv2_implies_sslv3_hello():
81 # 0.9.7h or higher
82 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
83
Christian Heimes2427b502013-11-23 11:24:32 +010084def have_verify_flags():
85 # 0.9.8 or higher
86 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
87
Antoine Pitrouc695c952014-04-28 20:57:36 +020088def utc_offset(): #NOTE: ignore issues like #1647654
89 # local time = utc time + utc offset
90 if time.daylight and time.localtime().tm_isdst > 0:
91 return -time.altzone # seconds
92 return -time.timezone
93
Christian Heimes9424bb42013-06-17 15:32:57 +020094def asn1time(cert_time):
95 # Some versions of OpenSSL ignore seconds, see #18207
96 # 0.9.8.i
97 if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
98 fmt = "%b %d %H:%M:%S %Y GMT"
99 dt = datetime.datetime.strptime(cert_time, fmt)
100 dt = dt.replace(second=0)
101 cert_time = dt.strftime(fmt)
102 # %d adds leading zero but ASN1_TIME_print() uses leading space
103 if cert_time[4] == "0":
104 cert_time = cert_time[:4] + " " + cert_time[5:]
105
106 return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000107
Antoine Pitrou23df4832010-08-04 17:14:06 +0000108# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
109def skip_if_broken_ubuntu_ssl(func):
Victor Stinner3de49192011-05-09 00:42:58 +0200110 if hasattr(ssl, 'PROTOCOL_SSLv2'):
111 @functools.wraps(func)
112 def f(*args, **kwargs):
113 try:
114 ssl.SSLContext(ssl.PROTOCOL_SSLv2)
115 except ssl.SSLError:
116 if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
117 platform.linux_distribution() == ('debian', 'squeeze/sid', '')):
118 raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
119 return func(*args, **kwargs)
120 return f
121 else:
122 return func
Antoine Pitrou23df4832010-08-04 17:14:06 +0000123
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100124needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
125
Antoine Pitrou23df4832010-08-04 17:14:06 +0000126
Antoine Pitrou152efa22010-05-16 18:19:27 +0000127class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000128
Antoine Pitrou480a1242010-04-28 21:37:09 +0000129 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000130 ssl.CERT_NONE
131 ssl.CERT_OPTIONAL
132 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100133 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100134 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100135 if ssl.HAS_ECDH:
136 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100137 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
138 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000139 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100140 self.assertIn(ssl.HAS_ECDH, {True, False})
Thomas Woutersed03b412007-08-28 21:37:11 +0000141
Antoine Pitrou172f0252014-04-18 20:33:08 +0200142 def test_str_for_enums(self):
143 # Make sure that the PROTOCOL_* constants have enum-like string
144 # reprs.
Victor Stinner648b8622014-12-12 12:23:59 +0100145 proto = ssl.PROTOCOL_SSLv23
146 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_SSLv23')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200147 ctx = ssl.SSLContext(proto)
148 self.assertIs(ctx.protocol, proto)
149
Antoine Pitrou480a1242010-04-28 21:37:09 +0000150 def test_random(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000151 v = ssl.RAND_status()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000152 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000153 sys.stdout.write("\n RAND_status is %d (%s)\n"
154 % (v, (v and "sufficient randomness") or
155 "insufficient randomness"))
Victor Stinner99c8b162011-05-24 12:05:19 +0200156
157 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
158 self.assertEqual(len(data), 16)
159 self.assertEqual(is_cryptographic, v == 1)
160 if v:
161 data = ssl.RAND_bytes(16)
162 self.assertEqual(len(data), 16)
Victor Stinner2e2baa92011-05-25 11:15:16 +0200163 else:
164 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
Victor Stinner99c8b162011-05-24 12:05:19 +0200165
Victor Stinner1e81a392013-12-19 16:47:04 +0100166 # negative num is invalid
167 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
168 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
169
Victor Stinnerbeeb5122014-11-28 13:28:25 +0100170 if hasattr(ssl, 'RAND_egd'):
171 self.assertRaises(TypeError, ssl.RAND_egd, 1)
172 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000173 ssl.RAND_add("this is a random string", 75.0)
Serhiy Storchaka8490f5a2015-03-20 09:00:36 +0200174 ssl.RAND_add(b"this is a random bytes object", 75.0)
175 ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000176
Christian Heimesf77b4b22013-08-21 13:26:05 +0200177 @unittest.skipUnless(os.name == 'posix', 'requires posix')
178 def test_random_fork(self):
179 status = ssl.RAND_status()
180 if not status:
181 self.fail("OpenSSL's PRNG has insufficient randomness")
182
183 rfd, wfd = os.pipe()
184 pid = os.fork()
185 if pid == 0:
186 try:
187 os.close(rfd)
188 child_random = ssl.RAND_pseudo_bytes(16)[0]
189 self.assertEqual(len(child_random), 16)
190 os.write(wfd, child_random)
191 os.close(wfd)
192 except BaseException:
193 os._exit(1)
194 else:
195 os._exit(0)
196 else:
197 os.close(wfd)
198 self.addCleanup(os.close, rfd)
199 _, status = os.waitpid(pid, 0)
200 self.assertEqual(status, 0)
201
202 child_random = os.read(rfd, 16)
203 self.assertEqual(len(child_random), 16)
204 parent_random = ssl.RAND_pseudo_bytes(16)[0]
205 self.assertEqual(len(parent_random), 16)
206
207 self.assertNotEqual(child_random, parent_random)
208
Antoine Pitrou480a1242010-04-28 21:37:09 +0000209 def test_parse_cert(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000210 # note that this uses an 'unofficial' function in _ssl.c,
211 # provided solely for this test, to exercise the certificate
212 # parsing code
Antoine Pitroufb046912010-11-09 20:21:19 +0000213 p = ssl._ssl._test_decode_cert(CERTFILE)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000214 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000215 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200216 self.assertEqual(p['issuer'],
217 ((('countryName', 'XY'),),
218 (('localityName', 'Castle Anthrax'),),
219 (('organizationName', 'Python Software Foundation'),),
220 (('commonName', 'localhost'),))
221 )
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100222 # Note the next three asserts will fail if the keys are regenerated
Christian Heimes9424bb42013-06-17 15:32:57 +0200223 self.assertEqual(p['notAfter'], asn1time('Oct 5 23:01:56 2020 GMT'))
224 self.assertEqual(p['notBefore'], asn1time('Oct 8 23:01:56 2010 GMT'))
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200225 self.assertEqual(p['serialNumber'], 'D7C7381919AFC24E')
226 self.assertEqual(p['subject'],
227 ((('countryName', 'XY'),),
228 (('localityName', 'Castle Anthrax'),),
229 (('organizationName', 'Python Software Foundation'),),
230 (('commonName', 'localhost'),))
231 )
232 self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),))
233 # Issue #13034: the subjectAltName in some certificates
234 # (notably projects.developer.nokia.com:443) wasn't parsed
235 p = ssl._ssl._test_decode_cert(NOKIACERT)
236 if support.verbose:
237 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
238 self.assertEqual(p['subjectAltName'],
239 (('DNS', 'projects.developer.nokia.com'),
240 ('DNS', 'projects.forum.nokia.com'))
241 )
Christian Heimesbd3a7f92013-11-21 03:40:15 +0100242 # extra OCSP and AIA fields
243 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
244 self.assertEqual(p['caIssuers'],
245 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
246 self.assertEqual(p['crlDistributionPoints'],
247 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000248
Christian Heimes824f7f32013-08-17 00:54:47 +0200249 def test_parse_cert_CVE_2013_4238(self):
250 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
251 if support.verbose:
252 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
253 subject = ((('countryName', 'US'),),
254 (('stateOrProvinceName', 'Oregon'),),
255 (('localityName', 'Beaverton'),),
256 (('organizationName', 'Python Software Foundation'),),
257 (('organizationalUnitName', 'Python Core Development'),),
258 (('commonName', 'null.python.org\x00example.org'),),
259 (('emailAddress', 'python-dev@python.org'),))
260 self.assertEqual(p['subject'], subject)
261 self.assertEqual(p['issuer'], subject)
Christian Heimes157c9832013-08-25 14:12:41 +0200262 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
263 san = (('DNS', 'altnull.python.org\x00example.com'),
264 ('email', 'null@python.org\x00user@example.org'),
265 ('URI', 'http://null.python.org\x00http://example.org'),
266 ('IP Address', '192.0.2.1'),
267 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
268 else:
269 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
270 san = (('DNS', 'altnull.python.org\x00example.com'),
271 ('email', 'null@python.org\x00user@example.org'),
272 ('URI', 'http://null.python.org\x00http://example.org'),
273 ('IP Address', '192.0.2.1'),
274 ('IP Address', '<invalid>'))
275
276 self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200277
Antoine Pitrou480a1242010-04-28 21:37:09 +0000278 def test_DER_to_PEM(self):
279 with open(SVN_PYTHON_ORG_ROOT_CERT, 'r') as f:
280 pem = f.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000281 d1 = ssl.PEM_cert_to_DER_cert(pem)
282 p2 = ssl.DER_cert_to_PEM_cert(d1)
283 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitrou18c913e2010-04-27 10:59:39 +0000284 self.assertEqual(d1, d2)
Antoine Pitrou9bfbe612010-04-27 22:08:08 +0000285 if not p2.startswith(ssl.PEM_HEADER + '\n'):
286 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
287 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
288 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000289
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000290 def test_openssl_version(self):
291 n = ssl.OPENSSL_VERSION_NUMBER
292 t = ssl.OPENSSL_VERSION_INFO
293 s = ssl.OPENSSL_VERSION
294 self.assertIsInstance(n, int)
295 self.assertIsInstance(t, tuple)
296 self.assertIsInstance(s, str)
297 # Some sanity checks follow
298 # >= 0.9
299 self.assertGreaterEqual(n, 0x900000)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400300 # < 3.0
301 self.assertLess(n, 0x30000000)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000302 major, minor, fix, patch, status = t
303 self.assertGreaterEqual(major, 0)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400304 self.assertLess(major, 3)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000305 self.assertGreaterEqual(minor, 0)
306 self.assertLess(minor, 256)
307 self.assertGreaterEqual(fix, 0)
308 self.assertLess(fix, 256)
309 self.assertGreaterEqual(patch, 0)
Ned Deily05784a72015-02-05 17:20:13 +1100310 self.assertLessEqual(patch, 63)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000311 self.assertGreaterEqual(status, 0)
312 self.assertLessEqual(status, 15)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400313 # Version string as returned by {Open,Libre}SSL, the format might change
314 if "LibreSSL" in s:
315 self.assertTrue(s.startswith("LibreSSL {:d}.{:d}".format(major, minor)),
Victor Stinner789b8052015-01-06 11:51:06 +0100316 (s, t, hex(n)))
Antoine Pitroudfab9352014-07-21 18:35:01 -0400317 else:
318 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
Victor Stinner789b8052015-01-06 11:51:06 +0100319 (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000320
Antoine Pitrou9d543662010-04-23 23:10:32 +0000321 @support.cpython_only
322 def test_refcycle(self):
323 # Issue #7943: an SSL object doesn't create reference cycles with
324 # itself.
325 s = socket.socket(socket.AF_INET)
326 ss = ssl.wrap_socket(s)
327 wr = weakref.ref(ss)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100328 with support.check_warnings(("", ResourceWarning)):
329 del ss
330 self.assertEqual(wr(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000331
Antoine Pitroua468adc2010-09-14 14:43:44 +0000332 def test_wrapped_unconnected(self):
333 # Methods on an unconnected SSLSocket propagate the original
Andrew Svetlov0832af62012-12-18 23:10:48 +0200334 # OSError raise by the underlying socket object.
Antoine Pitroua468adc2010-09-14 14:43:44 +0000335 s = socket.socket(socket.AF_INET)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100336 with ssl.wrap_socket(s) as ss:
Antoine Pitroue9bb4732013-01-12 21:56:56 +0100337 self.assertRaises(OSError, ss.recv, 1)
338 self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
339 self.assertRaises(OSError, ss.recvfrom, 1)
340 self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
341 self.assertRaises(OSError, ss.send, b'x')
342 self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
Antoine Pitroua468adc2010-09-14 14:43:44 +0000343
Antoine Pitrou40f08742010-04-24 22:04:40 +0000344 def test_timeout(self):
345 # Issue #8524: when creating an SSL socket, the timeout of the
346 # original socket should be retained.
347 for timeout in (None, 0.0, 5.0):
348 s = socket.socket(socket.AF_INET)
349 s.settimeout(timeout)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100350 with ssl.wrap_socket(s) as ss:
351 self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000352
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000353 def test_errors(self):
354 sock = socket.socket()
Ezio Melottied3a7d22010-12-01 02:32:32 +0000355 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000356 "certfile must be specified",
357 ssl.wrap_socket, sock, keyfile=CERTFILE)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000358 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000359 "certfile must be specified for server-side operations",
360 ssl.wrap_socket, sock, server_side=True)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000361 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000362 "certfile must be specified for server-side operations",
363 ssl.wrap_socket, sock, server_side=True, certfile="")
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100364 with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
365 self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
366 s.connect, (HOST, 8080))
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200367 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000368 with socket.socket() as sock:
369 ssl.wrap_socket(sock, certfile=WRONGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000370 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200371 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000372 with socket.socket() as sock:
373 ssl.wrap_socket(sock, certfile=CERTFILE, keyfile=WRONGCERT)
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000374 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200375 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000376 with socket.socket() as sock:
377 ssl.wrap_socket(sock, certfile=WRONGCERT, keyfile=WRONGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000378 self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000379
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000380 def test_match_hostname(self):
381 def ok(cert, hostname):
382 ssl.match_hostname(cert, hostname)
383 def fail(cert, hostname):
384 self.assertRaises(ssl.CertificateError,
385 ssl.match_hostname, cert, hostname)
386
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100387 # -- Hostname matching --
388
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000389 cert = {'subject': ((('commonName', 'example.com'),),)}
390 ok(cert, 'example.com')
391 ok(cert, 'ExAmple.cOm')
392 fail(cert, 'www.example.com')
393 fail(cert, '.example.com')
394 fail(cert, 'example.org')
395 fail(cert, 'exampleXcom')
396
397 cert = {'subject': ((('commonName', '*.a.com'),),)}
398 ok(cert, 'foo.a.com')
399 fail(cert, 'bar.foo.a.com')
400 fail(cert, 'a.com')
401 fail(cert, 'Xa.com')
402 fail(cert, '.a.com')
403
Georg Brandl72c98d32013-10-27 07:16:53 +0100404 # only match one left-most wildcard
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000405 cert = {'subject': ((('commonName', 'f*.com'),),)}
406 ok(cert, 'foo.com')
407 ok(cert, 'f.com')
408 fail(cert, 'bar.com')
409 fail(cert, 'foo.a.com')
410 fail(cert, 'bar.foo.com')
411
Christian Heimes824f7f32013-08-17 00:54:47 +0200412 # NULL bytes are bad, CVE-2013-4073
413 cert = {'subject': ((('commonName',
414 'null.python.org\x00example.org'),),)}
415 ok(cert, 'null.python.org\x00example.org') # or raise an error?
416 fail(cert, 'example.org')
417 fail(cert, 'null.python.org')
418
Georg Brandl72c98d32013-10-27 07:16:53 +0100419 # error cases with wildcards
420 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
421 fail(cert, 'bar.foo.a.com')
422 fail(cert, 'a.com')
423 fail(cert, 'Xa.com')
424 fail(cert, '.a.com')
425
426 cert = {'subject': ((('commonName', 'a.*.com'),),)}
427 fail(cert, 'a.foo.com')
428 fail(cert, 'a..com')
429 fail(cert, 'a.com')
430
431 # wildcard doesn't match IDNA prefix 'xn--'
432 idna = 'püthon.python.org'.encode("idna").decode("ascii")
433 cert = {'subject': ((('commonName', idna),),)}
434 ok(cert, idna)
435 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
436 fail(cert, idna)
437 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
438 fail(cert, idna)
439
440 # wildcard in first fragment and IDNA A-labels in sequent fragments
441 # are supported.
442 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
443 cert = {'subject': ((('commonName', idna),),)}
444 ok(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
445 ok(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
446 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
447 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
448
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000449 # Slightly fake real-world example
450 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
451 'subject': ((('commonName', 'linuxfrz.org'),),),
452 'subjectAltName': (('DNS', 'linuxfr.org'),
453 ('DNS', 'linuxfr.com'),
454 ('othername', '<unsupported>'))}
455 ok(cert, 'linuxfr.org')
456 ok(cert, 'linuxfr.com')
457 # Not a "DNS" entry
458 fail(cert, '<unsupported>')
459 # When there is a subjectAltName, commonName isn't used
460 fail(cert, 'linuxfrz.org')
461
462 # A pristine real-world example
463 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
464 'subject': ((('countryName', 'US'),),
465 (('stateOrProvinceName', 'California'),),
466 (('localityName', 'Mountain View'),),
467 (('organizationName', 'Google Inc'),),
468 (('commonName', 'mail.google.com'),))}
469 ok(cert, 'mail.google.com')
470 fail(cert, 'gmail.com')
471 # Only commonName is considered
472 fail(cert, 'California')
473
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100474 # -- IPv4 matching --
475 cert = {'subject': ((('commonName', 'example.com'),),),
476 'subjectAltName': (('DNS', 'example.com'),
477 ('IP Address', '10.11.12.13'),
478 ('IP Address', '14.15.16.17'))}
479 ok(cert, '10.11.12.13')
480 ok(cert, '14.15.16.17')
481 fail(cert, '14.15.16.18')
482 fail(cert, 'example.net')
483
484 # -- IPv6 matching --
485 cert = {'subject': ((('commonName', 'example.com'),),),
486 'subjectAltName': (('DNS', 'example.com'),
487 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
488 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
489 ok(cert, '2001::cafe')
490 ok(cert, '2003::baba')
491 fail(cert, '2003::bebe')
492 fail(cert, 'example.net')
493
494 # -- Miscellaneous --
495
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000496 # Neither commonName nor subjectAltName
497 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
498 'subject': ((('countryName', 'US'),),
499 (('stateOrProvinceName', 'California'),),
500 (('localityName', 'Mountain View'),),
501 (('organizationName', 'Google Inc'),))}
502 fail(cert, 'mail.google.com')
503
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200504 # No DNS entry in subjectAltName but a commonName
505 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
506 'subject': ((('countryName', 'US'),),
507 (('stateOrProvinceName', 'California'),),
508 (('localityName', 'Mountain View'),),
509 (('commonName', 'mail.google.com'),)),
510 'subjectAltName': (('othername', 'blabla'), )}
511 ok(cert, 'mail.google.com')
512
513 # No DNS entry subjectAltName and no commonName
514 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
515 'subject': ((('countryName', 'US'),),
516 (('stateOrProvinceName', 'California'),),
517 (('localityName', 'Mountain View'),),
518 (('organizationName', 'Google Inc'),)),
519 'subjectAltName': (('othername', 'blabla'),)}
520 fail(cert, 'google.com')
521
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000522 # Empty cert / no cert
523 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
524 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
525
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200526 # Issue #17980: avoid denials of service by refusing more than one
527 # wildcard per fragment.
528 cert = {'subject': ((('commonName', 'a*b.com'),),)}
529 ok(cert, 'axxb.com')
530 cert = {'subject': ((('commonName', 'a*b.co*'),),)}
Georg Brandl72c98d32013-10-27 07:16:53 +0100531 fail(cert, 'axxb.com')
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200532 cert = {'subject': ((('commonName', 'a*b*.com'),),)}
533 with self.assertRaises(ssl.CertificateError) as cm:
534 ssl.match_hostname(cert, 'axxbxxc.com')
535 self.assertIn("too many wildcards", str(cm.exception))
536
Antoine Pitroud5323212010-10-22 18:19:07 +0000537 def test_server_side(self):
538 # server_hostname doesn't work for server sockets
539 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000540 with socket.socket() as sock:
541 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
542 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000543
Antoine Pitroud6494802011-07-21 01:11:30 +0200544 def test_unknown_channel_binding(self):
545 # should raise ValueError for unknown type
546 s = socket.socket(socket.AF_INET)
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200547 s.bind(('127.0.0.1', 0))
548 s.listen()
549 c = socket.socket(socket.AF_INET)
550 c.connect(s.getsockname())
551 with ssl.wrap_socket(c, do_handshake_on_connect=False) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100552 with self.assertRaises(ValueError):
553 ss.get_channel_binding("unknown-type")
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200554 s.close()
Antoine Pitroud6494802011-07-21 01:11:30 +0200555
556 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
557 "'tls-unique' channel binding not available")
558 def test_tls_unique_channel_binding(self):
559 # unconnected should return None for known type
560 s = socket.socket(socket.AF_INET)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100561 with ssl.wrap_socket(s) as ss:
562 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200563 # the same for server-side
564 s = socket.socket(socket.AF_INET)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100565 with ssl.wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
566 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200567
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600568 def test_dealloc_warn(self):
569 ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
570 r = repr(ss)
571 with self.assertWarns(ResourceWarning) as cm:
572 ss = None
573 support.gc_collect()
574 self.assertIn(r, str(cm.warning.args[0]))
575
Christian Heimes6d7ad132013-06-09 18:02:55 +0200576 def test_get_default_verify_paths(self):
577 paths = ssl.get_default_verify_paths()
578 self.assertEqual(len(paths), 6)
579 self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
580
581 with support.EnvironmentVarGuard() as env:
582 env["SSL_CERT_DIR"] = CAPATH
583 env["SSL_CERT_FILE"] = CERTFILE
584 paths = ssl.get_default_verify_paths()
585 self.assertEqual(paths.cafile, CERTFILE)
586 self.assertEqual(paths.capath, CAPATH)
587
Christian Heimes44109d72013-11-22 01:51:30 +0100588 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
589 def test_enum_certificates(self):
590 self.assertTrue(ssl.enum_certificates("CA"))
591 self.assertTrue(ssl.enum_certificates("ROOT"))
592
593 self.assertRaises(TypeError, ssl.enum_certificates)
594 self.assertRaises(WindowsError, ssl.enum_certificates, "")
595
Christian Heimesc2d65e12013-11-22 16:13:55 +0100596 trust_oids = set()
597 for storename in ("CA", "ROOT"):
598 store = ssl.enum_certificates(storename)
599 self.assertIsInstance(store, list)
600 for element in store:
601 self.assertIsInstance(element, tuple)
602 self.assertEqual(len(element), 3)
603 cert, enc, trust = element
604 self.assertIsInstance(cert, bytes)
605 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
606 self.assertIsInstance(trust, (set, bool))
607 if isinstance(trust, set):
608 trust_oids.update(trust)
Christian Heimes44109d72013-11-22 01:51:30 +0100609
610 serverAuth = "1.3.6.1.5.5.7.3.1"
Christian Heimesc2d65e12013-11-22 16:13:55 +0100611 self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200612
Christian Heimes46bebee2013-06-09 19:03:31 +0200613 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Christian Heimes44109d72013-11-22 01:51:30 +0100614 def test_enum_crls(self):
615 self.assertTrue(ssl.enum_crls("CA"))
616 self.assertRaises(TypeError, ssl.enum_crls)
617 self.assertRaises(WindowsError, ssl.enum_crls, "")
Christian Heimes46bebee2013-06-09 19:03:31 +0200618
Christian Heimes44109d72013-11-22 01:51:30 +0100619 crls = ssl.enum_crls("CA")
620 self.assertIsInstance(crls, list)
621 for element in crls:
622 self.assertIsInstance(element, tuple)
623 self.assertEqual(len(element), 2)
624 self.assertIsInstance(element[0], bytes)
625 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200626
Christian Heimes46bebee2013-06-09 19:03:31 +0200627
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100628 def test_asn1object(self):
629 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
630 '1.3.6.1.5.5.7.3.1')
631
632 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
633 self.assertEqual(val, expected)
634 self.assertEqual(val.nid, 129)
635 self.assertEqual(val.shortname, 'serverAuth')
636 self.assertEqual(val.longname, 'TLS Web Server Authentication')
637 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
638 self.assertIsInstance(val, ssl._ASN1Object)
639 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
640
641 val = ssl._ASN1Object.fromnid(129)
642 self.assertEqual(val, expected)
643 self.assertIsInstance(val, ssl._ASN1Object)
644 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100645 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
646 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100647 for i in range(1000):
648 try:
649 obj = ssl._ASN1Object.fromnid(i)
650 except ValueError:
651 pass
652 else:
653 self.assertIsInstance(obj.nid, int)
654 self.assertIsInstance(obj.shortname, str)
655 self.assertIsInstance(obj.longname, str)
656 self.assertIsInstance(obj.oid, (str, type(None)))
657
658 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
659 self.assertEqual(val, expected)
660 self.assertIsInstance(val, ssl._ASN1Object)
661 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
662 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
663 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100664 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
665 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100666
Christian Heimes72d28502013-11-23 13:56:58 +0100667 def test_purpose_enum(self):
668 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
669 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
670 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
671 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
672 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
673 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
674 '1.3.6.1.5.5.7.3.1')
675
676 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
677 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
678 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
679 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
680 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
681 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
682 '1.3.6.1.5.5.7.3.2')
683
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100684 def test_unsupported_dtls(self):
685 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
686 self.addCleanup(s.close)
687 with self.assertRaises(NotImplementedError) as cx:
688 ssl.wrap_socket(s, cert_reqs=ssl.CERT_NONE)
689 self.assertEqual(str(cx.exception), "only stream sockets are supported")
690 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
691 with self.assertRaises(NotImplementedError) as cx:
692 ctx.wrap_socket(s)
693 self.assertEqual(str(cx.exception), "only stream sockets are supported")
694
Antoine Pitrouc695c952014-04-28 20:57:36 +0200695 def cert_time_ok(self, timestring, timestamp):
696 self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)
697
698 def cert_time_fail(self, timestring):
699 with self.assertRaises(ValueError):
700 ssl.cert_time_to_seconds(timestring)
701
702 @unittest.skipUnless(utc_offset(),
703 'local time needs to be different from UTC')
704 def test_cert_time_to_seconds_timezone(self):
705 # Issue #19940: ssl.cert_time_to_seconds() returns wrong
706 # results if local timezone is not UTC
707 self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0)
708 self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0)
709
710 def test_cert_time_to_seconds(self):
711 timestring = "Jan 5 09:34:43 2018 GMT"
712 ts = 1515144883.0
713 self.cert_time_ok(timestring, ts)
714 # accept keyword parameter, assert its name
715 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
716 # accept both %e and %d (space or zero generated by strftime)
717 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
718 # case-insensitive
719 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
720 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
721 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
722 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
723 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
724 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
725 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
726 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
727
728 newyear_ts = 1230768000.0
729 # leap seconds
730 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
731 # same timestamp
732 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
733
734 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
735 # allow 60th second (even if it is not a leap second)
736 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
737 # allow 2nd leap second for compatibility with time.strptime()
738 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
739 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
740
741 # no special treatement for the special value:
742 # 99991231235959Z (rfc 5280)
743 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
744
745 @support.run_with_locale('LC_ALL', '')
746 def test_cert_time_to_seconds_locale(self):
747 # `cert_time_to_seconds()` should be locale independent
748
749 def local_february_name():
750 return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
751
752 if local_february_name().lower() == 'feb':
753 self.skipTest("locale-specific month name needs to be "
754 "different from C locale")
755
756 # locale-independent
757 self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
758 self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
759
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100760
Antoine Pitrou152efa22010-05-16 18:19:27 +0000761class ContextTests(unittest.TestCase):
762
Antoine Pitrou23df4832010-08-04 17:14:06 +0000763 @skip_if_broken_ubuntu_ssl
Antoine Pitrou152efa22010-05-16 18:19:27 +0000764 def test_constructor(self):
Antoine Pitrou2463e5f2013-03-28 22:24:43 +0100765 for protocol in PROTOCOLS:
766 ssl.SSLContext(protocol)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000767 self.assertRaises(TypeError, ssl.SSLContext)
768 self.assertRaises(ValueError, ssl.SSLContext, -1)
769 self.assertRaises(ValueError, ssl.SSLContext, 42)
770
Antoine Pitrou23df4832010-08-04 17:14:06 +0000771 @skip_if_broken_ubuntu_ssl
Antoine Pitrou152efa22010-05-16 18:19:27 +0000772 def test_protocol(self):
773 for proto in PROTOCOLS:
774 ctx = ssl.SSLContext(proto)
775 self.assertEqual(ctx.protocol, proto)
776
777 def test_ciphers(self):
778 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
779 ctx.set_ciphers("ALL")
780 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +0000781 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +0000782 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000783
Antoine Pitrou23df4832010-08-04 17:14:06 +0000784 @skip_if_broken_ubuntu_ssl
Antoine Pitroub5218772010-05-21 09:56:06 +0000785 def test_options(self):
786 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
Antoine Pitroucd3d7ca2014-01-09 20:02:20 +0100787 # OP_ALL | OP_NO_SSLv2 is the default value
Antoine Pitroub5218772010-05-21 09:56:06 +0000788 self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2,
789 ctx.options)
790 ctx.options |= ssl.OP_NO_SSLv3
791 self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3,
792 ctx.options)
793 if can_clear_options():
794 ctx.options = (ctx.options & ~ssl.OP_NO_SSLv2) | ssl.OP_NO_TLSv1
795 self.assertEqual(ssl.OP_ALL | ssl.OP_NO_TLSv1 | ssl.OP_NO_SSLv3,
796 ctx.options)
797 ctx.options = 0
798 self.assertEqual(0, ctx.options)
799 else:
800 with self.assertRaises(ValueError):
801 ctx.options = 0
802
Christian Heimes22587792013-11-21 23:56:13 +0100803 def test_verify_mode(self):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000804 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
805 # Default value
806 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
807 ctx.verify_mode = ssl.CERT_OPTIONAL
808 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
809 ctx.verify_mode = ssl.CERT_REQUIRED
810 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
811 ctx.verify_mode = ssl.CERT_NONE
812 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
813 with self.assertRaises(TypeError):
814 ctx.verify_mode = None
815 with self.assertRaises(ValueError):
816 ctx.verify_mode = 42
817
Christian Heimes2427b502013-11-23 11:24:32 +0100818 @unittest.skipUnless(have_verify_flags(),
819 "verify_flags need OpenSSL > 0.9.8")
Christian Heimes22587792013-11-21 23:56:13 +0100820 def test_verify_flags(self):
821 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
Benjamin Peterson990fcaa2015-03-04 22:49:41 -0500822 # default value
823 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
824 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | tf)
Christian Heimes22587792013-11-21 23:56:13 +0100825 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
826 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
827 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
828 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
829 ctx.verify_flags = ssl.VERIFY_DEFAULT
830 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
831 # supports any value
832 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
833 self.assertEqual(ctx.verify_flags,
834 ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
835 with self.assertRaises(TypeError):
836 ctx.verify_flags = None
837
Antoine Pitrou152efa22010-05-16 18:19:27 +0000838 def test_load_cert_chain(self):
839 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
840 # Combined key and cert in a single file
Benjamin Peterson1ea070e2014-11-03 21:05:01 -0500841 ctx.load_cert_chain(CERTFILE, keyfile=None)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000842 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
843 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200844 with self.assertRaises(OSError) as cm:
Antoine Pitrou152efa22010-05-16 18:19:27 +0000845 ctx.load_cert_chain(WRONGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000846 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000847 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000848 ctx.load_cert_chain(BADCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000849 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000850 ctx.load_cert_chain(EMPTYCERT)
851 # Separate key and cert
Antoine Pitroud0919502010-05-17 10:30:00 +0000852 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000853 ctx.load_cert_chain(ONLYCERT, ONLYKEY)
854 ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
855 ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000856 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000857 ctx.load_cert_chain(ONLYCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000858 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000859 ctx.load_cert_chain(ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000860 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000861 ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
862 # Mismatching key and cert
Antoine Pitroud0919502010-05-17 10:30:00 +0000863 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000864 with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
Antoine Pitrou81564092010-10-08 23:06:24 +0000865 ctx.load_cert_chain(SVN_PYTHON_ORG_ROOT_CERT, ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +0200866 # Password protected key and cert
867 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
868 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
869 ctx.load_cert_chain(CERTFILE_PROTECTED,
870 password=bytearray(KEY_PASSWORD.encode()))
871 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
872 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
873 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
874 bytearray(KEY_PASSWORD.encode()))
875 with self.assertRaisesRegex(TypeError, "should be a string"):
876 ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
877 with self.assertRaises(ssl.SSLError):
878 ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
879 with self.assertRaisesRegex(ValueError, "cannot be longer"):
880 # openssl has a fixed limit on the password buffer.
881 # PEM_BUFSIZE is generally set to 1kb.
882 # Return a string larger than this.
883 ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
884 # Password callback
885 def getpass_unicode():
886 return KEY_PASSWORD
887 def getpass_bytes():
888 return KEY_PASSWORD.encode()
889 def getpass_bytearray():
890 return bytearray(KEY_PASSWORD.encode())
891 def getpass_badpass():
892 return "badpass"
893 def getpass_huge():
894 return b'a' * (1024 * 1024)
895 def getpass_bad_type():
896 return 9
897 def getpass_exception():
898 raise Exception('getpass error')
899 class GetPassCallable:
900 def __call__(self):
901 return KEY_PASSWORD
902 def getpass(self):
903 return KEY_PASSWORD
904 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
905 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
906 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
907 ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
908 ctx.load_cert_chain(CERTFILE_PROTECTED,
909 password=GetPassCallable().getpass)
910 with self.assertRaises(ssl.SSLError):
911 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
912 with self.assertRaisesRegex(ValueError, "cannot be longer"):
913 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
914 with self.assertRaisesRegex(TypeError, "must return a string"):
915 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
916 with self.assertRaisesRegex(Exception, "getpass error"):
917 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
918 # Make sure the password function isn't called if it isn't needed
919 ctx.load_cert_chain(CERTFILE, password=getpass_exception)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000920
921 def test_load_verify_locations(self):
922 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
923 ctx.load_verify_locations(CERTFILE)
924 ctx.load_verify_locations(cafile=CERTFILE, capath=None)
925 ctx.load_verify_locations(BYTES_CERTFILE)
926 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
927 self.assertRaises(TypeError, ctx.load_verify_locations)
Christian Heimesefff7062013-11-21 03:35:02 +0100928 self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200929 with self.assertRaises(OSError) as cm:
Antoine Pitrou152efa22010-05-16 18:19:27 +0000930 ctx.load_verify_locations(WRONGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000931 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000932 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000933 ctx.load_verify_locations(BADCERT)
934 ctx.load_verify_locations(CERTFILE, CAPATH)
935 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
936
Victor Stinner80f75e62011-01-29 11:31:20 +0000937 # Issue #10989: crash if the second argument type is invalid
938 self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
939
Christian Heimesefff7062013-11-21 03:35:02 +0100940 def test_load_verify_cadata(self):
941 # test cadata
942 with open(CAFILE_CACERT) as f:
943 cacert_pem = f.read()
944 cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
945 with open(CAFILE_NEURONIO) as f:
946 neuronio_pem = f.read()
947 neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
948
949 # test PEM
950 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
951 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
952 ctx.load_verify_locations(cadata=cacert_pem)
953 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
954 ctx.load_verify_locations(cadata=neuronio_pem)
955 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
956 # cert already in hash table
957 ctx.load_verify_locations(cadata=neuronio_pem)
958 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
959
960 # combined
961 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
962 combined = "\n".join((cacert_pem, neuronio_pem))
963 ctx.load_verify_locations(cadata=combined)
964 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
965
966 # with junk around the certs
967 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
968 combined = ["head", cacert_pem, "other", neuronio_pem, "again",
969 neuronio_pem, "tail"]
970 ctx.load_verify_locations(cadata="\n".join(combined))
971 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
972
973 # test DER
974 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
975 ctx.load_verify_locations(cadata=cacert_der)
976 ctx.load_verify_locations(cadata=neuronio_der)
977 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
978 # cert already in hash table
979 ctx.load_verify_locations(cadata=cacert_der)
980 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
981
982 # combined
983 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
984 combined = b"".join((cacert_der, neuronio_der))
985 ctx.load_verify_locations(cadata=combined)
986 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
987
988 # error cases
989 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
990 self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
991
992 with self.assertRaisesRegex(ssl.SSLError, "no start line"):
993 ctx.load_verify_locations(cadata="broken")
994 with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
995 ctx.load_verify_locations(cadata=b"broken")
996
997
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100998 def test_load_dh_params(self):
999 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1000 ctx.load_dh_params(DHFILE)
1001 if os.name != 'nt':
1002 ctx.load_dh_params(BYTES_DHFILE)
1003 self.assertRaises(TypeError, ctx.load_dh_params)
1004 self.assertRaises(TypeError, ctx.load_dh_params, None)
1005 with self.assertRaises(FileNotFoundError) as cm:
1006 ctx.load_dh_params(WRONGCERT)
1007 self.assertEqual(cm.exception.errno, errno.ENOENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001008 with self.assertRaises(ssl.SSLError) as cm:
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001009 ctx.load_dh_params(CERTFILE)
1010
Antoine Pitroueb585ad2010-10-22 18:24:20 +00001011 @skip_if_broken_ubuntu_ssl
Antoine Pitroub0182c82010-10-12 20:09:02 +00001012 def test_session_stats(self):
1013 for proto in PROTOCOLS:
1014 ctx = ssl.SSLContext(proto)
1015 self.assertEqual(ctx.session_stats(), {
1016 'number': 0,
1017 'connect': 0,
1018 'connect_good': 0,
1019 'connect_renegotiate': 0,
1020 'accept': 0,
1021 'accept_good': 0,
1022 'accept_renegotiate': 0,
1023 'hits': 0,
1024 'misses': 0,
1025 'timeouts': 0,
1026 'cache_full': 0,
1027 })
1028
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001029 def test_set_default_verify_paths(self):
1030 # There's not much we can do to test that it acts as expected,
1031 # so just check it doesn't crash or raise an exception.
1032 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1033 ctx.set_default_verify_paths()
1034
Antoine Pitrou501da612011-12-21 09:27:41 +01001035 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001036 def test_set_ecdh_curve(self):
1037 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1038 ctx.set_ecdh_curve("prime256v1")
1039 ctx.set_ecdh_curve(b"prime256v1")
1040 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1041 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1042 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1043 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1044
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001045 @needs_sni
1046 def test_sni_callback(self):
1047 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1048
1049 # set_servername_callback expects a callable, or None
1050 self.assertRaises(TypeError, ctx.set_servername_callback)
1051 self.assertRaises(TypeError, ctx.set_servername_callback, 4)
1052 self.assertRaises(TypeError, ctx.set_servername_callback, "")
1053 self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
1054
1055 def dummycallback(sock, servername, ctx):
1056 pass
1057 ctx.set_servername_callback(None)
1058 ctx.set_servername_callback(dummycallback)
1059
1060 @needs_sni
1061 def test_sni_callback_refcycle(self):
1062 # Reference cycles through the servername callback are detected
1063 # and cleared.
1064 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1065 def dummycallback(sock, servername, ctx, cycle=ctx):
1066 pass
1067 ctx.set_servername_callback(dummycallback)
1068 wr = weakref.ref(ctx)
1069 del ctx, dummycallback
1070 gc.collect()
1071 self.assertIs(wr(), None)
1072
Christian Heimes9a5395a2013-06-17 15:44:12 +02001073 def test_cert_store_stats(self):
1074 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1075 self.assertEqual(ctx.cert_store_stats(),
1076 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1077 ctx.load_cert_chain(CERTFILE)
1078 self.assertEqual(ctx.cert_store_stats(),
1079 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1080 ctx.load_verify_locations(CERTFILE)
1081 self.assertEqual(ctx.cert_store_stats(),
1082 {'x509_ca': 0, 'crl': 0, 'x509': 1})
1083 ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
1084 self.assertEqual(ctx.cert_store_stats(),
1085 {'x509_ca': 1, 'crl': 0, 'x509': 2})
1086
1087 def test_get_ca_certs(self):
1088 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1089 self.assertEqual(ctx.get_ca_certs(), [])
1090 # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
1091 ctx.load_verify_locations(CERTFILE)
1092 self.assertEqual(ctx.get_ca_certs(), [])
1093 # but SVN_PYTHON_ORG_ROOT_CERT is a CA cert
1094 ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
1095 self.assertEqual(ctx.get_ca_certs(),
1096 [{'issuer': ((('organizationName', 'Root CA'),),
1097 (('organizationalUnitName', 'http://www.cacert.org'),),
1098 (('commonName', 'CA Cert Signing Authority'),),
1099 (('emailAddress', 'support@cacert.org'),)),
1100 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
1101 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
1102 'serialNumber': '00',
Christian Heimesbd3a7f92013-11-21 03:40:15 +01001103 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
Christian Heimes9a5395a2013-06-17 15:44:12 +02001104 'subject': ((('organizationName', 'Root CA'),),
1105 (('organizationalUnitName', 'http://www.cacert.org'),),
1106 (('commonName', 'CA Cert Signing Authority'),),
1107 (('emailAddress', 'support@cacert.org'),)),
1108 'version': 3}])
1109
1110 with open(SVN_PYTHON_ORG_ROOT_CERT) as f:
1111 pem = f.read()
1112 der = ssl.PEM_cert_to_DER_cert(pem)
1113 self.assertEqual(ctx.get_ca_certs(True), [der])
1114
Christian Heimes72d28502013-11-23 13:56:58 +01001115 def test_load_default_certs(self):
1116 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1117 ctx.load_default_certs()
1118
1119 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1120 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1121 ctx.load_default_certs()
1122
1123 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1124 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1125
1126 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1127 self.assertRaises(TypeError, ctx.load_default_certs, None)
1128 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1129
Benjamin Peterson91244e02014-10-03 18:17:15 -04001130 @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001131 def test_load_default_certs_env(self):
1132 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1133 with support.EnvironmentVarGuard() as env:
1134 env["SSL_CERT_DIR"] = CAPATH
1135 env["SSL_CERT_FILE"] = CERTFILE
1136 ctx.load_default_certs()
1137 self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0})
1138
Benjamin Peterson91244e02014-10-03 18:17:15 -04001139 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
1140 def test_load_default_certs_env_windows(self):
1141 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1142 ctx.load_default_certs()
1143 stats = ctx.cert_store_stats()
1144
1145 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1146 with support.EnvironmentVarGuard() as env:
1147 env["SSL_CERT_DIR"] = CAPATH
1148 env["SSL_CERT_FILE"] = CERTFILE
1149 ctx.load_default_certs()
1150 stats["x509"] += 1
1151 self.assertEqual(ctx.cert_store_stats(), stats)
1152
Christian Heimes4c05b472013-11-23 15:58:30 +01001153 def test_create_default_context(self):
1154 ctx = ssl.create_default_context()
Donald Stufft6a2ba942014-03-23 19:05:28 -04001155 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
Christian Heimes4c05b472013-11-23 15:58:30 +01001156 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001157 self.assertTrue(ctx.check_hostname)
Christian Heimes4c05b472013-11-23 15:58:30 +01001158 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
Donald Stufft6a2ba942014-03-23 19:05:28 -04001159 self.assertEqual(
1160 ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
1161 getattr(ssl, "OP_NO_COMPRESSION", 0),
1162 )
Christian Heimes4c05b472013-11-23 15:58:30 +01001163
1164 with open(SIGNING_CA) as f:
1165 cadata = f.read()
1166 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1167 cadata=cadata)
Donald Stufft6a2ba942014-03-23 19:05:28 -04001168 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
Christian Heimes4c05b472013-11-23 15:58:30 +01001169 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1170 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
Donald Stufft6a2ba942014-03-23 19:05:28 -04001171 self.assertEqual(
1172 ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
1173 getattr(ssl, "OP_NO_COMPRESSION", 0),
1174 )
Christian Heimes4c05b472013-11-23 15:58:30 +01001175
1176 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
Donald Stufft6a2ba942014-03-23 19:05:28 -04001177 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
Christian Heimes4c05b472013-11-23 15:58:30 +01001178 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1179 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
Donald Stufft6a2ba942014-03-23 19:05:28 -04001180 self.assertEqual(
1181 ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
1182 getattr(ssl, "OP_NO_COMPRESSION", 0),
1183 )
1184 self.assertEqual(
1185 ctx.options & getattr(ssl, "OP_SINGLE_DH_USE", 0),
1186 getattr(ssl, "OP_SINGLE_DH_USE", 0),
1187 )
1188 self.assertEqual(
1189 ctx.options & getattr(ssl, "OP_SINGLE_ECDH_USE", 0),
1190 getattr(ssl, "OP_SINGLE_ECDH_USE", 0),
1191 )
Christian Heimes4c05b472013-11-23 15:58:30 +01001192
Christian Heimes67986f92013-11-23 22:43:47 +01001193 def test__create_stdlib_context(self):
1194 ctx = ssl._create_stdlib_context()
1195 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1196 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001197 self.assertFalse(ctx.check_hostname)
Christian Heimes67986f92013-11-23 22:43:47 +01001198 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1199
1200 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
1201 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1202 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1203 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1204
1205 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
Christian Heimesa02c69a2013-12-02 20:59:28 +01001206 cert_reqs=ssl.CERT_REQUIRED,
1207 check_hostname=True)
Christian Heimes67986f92013-11-23 22:43:47 +01001208 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1209 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimesa02c69a2013-12-02 20:59:28 +01001210 self.assertTrue(ctx.check_hostname)
Christian Heimes67986f92013-11-23 22:43:47 +01001211 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1212
1213 ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
1214 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1215 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1216 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
Christian Heimes4c05b472013-11-23 15:58:30 +01001217
Christian Heimes1aa9a752013-12-02 02:41:19 +01001218 def test_check_hostname(self):
1219 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1220 self.assertFalse(ctx.check_hostname)
1221
1222 # Requires CERT_REQUIRED or CERT_OPTIONAL
1223 with self.assertRaises(ValueError):
1224 ctx.check_hostname = True
1225 ctx.verify_mode = ssl.CERT_REQUIRED
1226 self.assertFalse(ctx.check_hostname)
1227 ctx.check_hostname = True
1228 self.assertTrue(ctx.check_hostname)
1229
1230 ctx.verify_mode = ssl.CERT_OPTIONAL
1231 ctx.check_hostname = True
1232 self.assertTrue(ctx.check_hostname)
1233
1234 # Cannot set CERT_NONE with check_hostname enabled
1235 with self.assertRaises(ValueError):
1236 ctx.verify_mode = ssl.CERT_NONE
1237 ctx.check_hostname = False
1238 self.assertFalse(ctx.check_hostname)
1239
Antoine Pitrou152efa22010-05-16 18:19:27 +00001240
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001241class SSLErrorTests(unittest.TestCase):
1242
1243 def test_str(self):
1244 # The str() of a SSLError doesn't include the errno
1245 e = ssl.SSLError(1, "foo")
1246 self.assertEqual(str(e), "foo")
1247 self.assertEqual(e.errno, 1)
1248 # Same for a subclass
1249 e = ssl.SSLZeroReturnError(1, "foo")
1250 self.assertEqual(str(e), "foo")
1251 self.assertEqual(e.errno, 1)
1252
1253 def test_lib_reason(self):
1254 # Test the library and reason attributes
1255 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1256 with self.assertRaises(ssl.SSLError) as cm:
1257 ctx.load_dh_params(CERTFILE)
1258 self.assertEqual(cm.exception.library, 'PEM')
1259 self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1260 s = str(cm.exception)
1261 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1262
1263 def test_subclass(self):
1264 # Check that the appropriate SSLError subclass is raised
1265 # (this only tests one of them)
1266 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1267 with socket.socket() as s:
1268 s.bind(("127.0.0.1", 0))
Charles-François Natali6e204602014-07-23 19:28:13 +01001269 s.listen()
Antoine Pitroue1ceb502013-01-12 21:54:44 +01001270 c = socket.socket()
1271 c.connect(s.getsockname())
1272 c.setblocking(False)
1273 with ctx.wrap_socket(c, False, do_handshake_on_connect=False) as c:
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001274 with self.assertRaises(ssl.SSLWantReadError) as cm:
1275 c.do_handshake()
1276 s = str(cm.exception)
1277 self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1278 # For compatibility
1279 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
1280
1281
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001282class MemoryBIOTests(unittest.TestCase):
1283
1284 def test_read_write(self):
1285 bio = ssl.MemoryBIO()
1286 bio.write(b'foo')
1287 self.assertEqual(bio.read(), b'foo')
1288 self.assertEqual(bio.read(), b'')
1289 bio.write(b'foo')
1290 bio.write(b'bar')
1291 self.assertEqual(bio.read(), b'foobar')
1292 self.assertEqual(bio.read(), b'')
1293 bio.write(b'baz')
1294 self.assertEqual(bio.read(2), b'ba')
1295 self.assertEqual(bio.read(1), b'z')
1296 self.assertEqual(bio.read(1), b'')
1297
1298 def test_eof(self):
1299 bio = ssl.MemoryBIO()
1300 self.assertFalse(bio.eof)
1301 self.assertEqual(bio.read(), b'')
1302 self.assertFalse(bio.eof)
1303 bio.write(b'foo')
1304 self.assertFalse(bio.eof)
1305 bio.write_eof()
1306 self.assertFalse(bio.eof)
1307 self.assertEqual(bio.read(2), b'fo')
1308 self.assertFalse(bio.eof)
1309 self.assertEqual(bio.read(1), b'o')
1310 self.assertTrue(bio.eof)
1311 self.assertEqual(bio.read(), b'')
1312 self.assertTrue(bio.eof)
1313
1314 def test_pending(self):
1315 bio = ssl.MemoryBIO()
1316 self.assertEqual(bio.pending, 0)
1317 bio.write(b'foo')
1318 self.assertEqual(bio.pending, 3)
1319 for i in range(3):
1320 bio.read(1)
1321 self.assertEqual(bio.pending, 3-i-1)
1322 for i in range(3):
1323 bio.write(b'x')
1324 self.assertEqual(bio.pending, i+1)
1325 bio.read()
1326 self.assertEqual(bio.pending, 0)
1327
1328 def test_buffer_types(self):
1329 bio = ssl.MemoryBIO()
1330 bio.write(b'foo')
1331 self.assertEqual(bio.read(), b'foo')
1332 bio.write(bytearray(b'bar'))
1333 self.assertEqual(bio.read(), b'bar')
1334 bio.write(memoryview(b'baz'))
1335 self.assertEqual(bio.read(), b'baz')
1336
1337 def test_error_types(self):
1338 bio = ssl.MemoryBIO()
1339 self.assertRaises(TypeError, bio.write, 'foo')
1340 self.assertRaises(TypeError, bio.write, None)
1341 self.assertRaises(TypeError, bio.write, True)
1342 self.assertRaises(TypeError, bio.write, 1)
1343
1344
Bill Janssen6e027db2007-11-15 22:23:56 +00001345class NetworkedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001346
Antoine Pitrou480a1242010-04-28 21:37:09 +00001347 def test_connect(self):
Antoine Pitrou350c7222010-09-09 13:31:46 +00001348 with support.transient_internet("svn.python.org"):
1349 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1350 cert_reqs=ssl.CERT_NONE)
1351 try:
1352 s.connect(("svn.python.org", 443))
1353 self.assertEqual({}, s.getpeercert())
1354 finally:
1355 s.close()
1356
1357 # this should fail because we have no verification certs
1358 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1359 cert_reqs=ssl.CERT_REQUIRED)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001360 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1361 s.connect, ("svn.python.org", 443))
Antoine Pitrou152efa22010-05-16 18:19:27 +00001362 s.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001363
Antoine Pitrou350c7222010-09-09 13:31:46 +00001364 # this should succeed because we specify the root cert
1365 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1366 cert_reqs=ssl.CERT_REQUIRED,
1367 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
1368 try:
1369 s.connect(("svn.python.org", 443))
1370 self.assertTrue(s.getpeercert())
1371 finally:
1372 s.close()
Antoine Pitrou152efa22010-05-16 18:19:27 +00001373
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001374 def test_connect_ex(self):
1375 # Issue #11326: check connect_ex() implementation
1376 with support.transient_internet("svn.python.org"):
1377 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1378 cert_reqs=ssl.CERT_REQUIRED,
1379 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
1380 try:
1381 self.assertEqual(0, s.connect_ex(("svn.python.org", 443)))
1382 self.assertTrue(s.getpeercert())
1383 finally:
1384 s.close()
1385
1386 def test_non_blocking_connect_ex(self):
1387 # Issue #11326: non-blocking connect_ex() should allow handshake
1388 # to proceed after the socket gets ready.
1389 with support.transient_internet("svn.python.org"):
1390 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1391 cert_reqs=ssl.CERT_REQUIRED,
1392 ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
1393 do_handshake_on_connect=False)
1394 try:
1395 s.setblocking(False)
1396 rc = s.connect_ex(('svn.python.org', 443))
Antoine Pitrou8a14a0c2011-02-27 15:44:12 +00001397 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1398 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001399 # Wait for connect to finish
1400 select.select([], [s], [], 5.0)
1401 # Non-blocking handshake
1402 while True:
1403 try:
1404 s.do_handshake()
1405 break
Antoine Pitrou41032a62011-10-27 23:56:55 +02001406 except ssl.SSLWantReadError:
1407 select.select([s], [], [], 5.0)
1408 except ssl.SSLWantWriteError:
1409 select.select([], [s], [], 5.0)
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001410 # SSL established
1411 self.assertTrue(s.getpeercert())
1412 finally:
1413 s.close()
1414
Antoine Pitroub4410db2011-05-18 18:51:06 +02001415 def test_timeout_connect_ex(self):
1416 # Issue #12065: on a timeout, connect_ex() should return the original
1417 # errno (mimicking the behaviour of non-SSL sockets).
1418 with support.transient_internet("svn.python.org"):
1419 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1420 cert_reqs=ssl.CERT_REQUIRED,
1421 ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
1422 do_handshake_on_connect=False)
1423 try:
1424 s.settimeout(0.0000001)
1425 rc = s.connect_ex(('svn.python.org', 443))
1426 if rc == 0:
1427 self.skipTest("svn.python.org responded too quickly")
1428 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
1429 finally:
1430 s.close()
1431
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001432 def test_connect_ex_error(self):
Antoine Pitrouddb87ab2012-12-28 19:07:43 +01001433 with support.transient_internet("svn.python.org"):
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001434 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1435 cert_reqs=ssl.CERT_REQUIRED,
1436 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
1437 try:
Christian Heimesde570742013-12-16 21:15:44 +01001438 rc = s.connect_ex(("svn.python.org", 444))
1439 # Issue #19919: Windows machines or VMs hosted on Windows
1440 # machines sometimes return EWOULDBLOCK.
1441 self.assertIn(rc, (errno.ECONNREFUSED, errno.EWOULDBLOCK))
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001442 finally:
1443 s.close()
1444
Antoine Pitrou152efa22010-05-16 18:19:27 +00001445 def test_connect_with_context(self):
Antoine Pitrou350c7222010-09-09 13:31:46 +00001446 with support.transient_internet("svn.python.org"):
1447 # Same as test_connect, but with a separately created context
1448 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1449 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1450 s.connect(("svn.python.org", 443))
1451 try:
1452 self.assertEqual({}, s.getpeercert())
1453 finally:
1454 s.close()
Antoine Pitroud5323212010-10-22 18:19:07 +00001455 # Same with a server hostname
1456 s = ctx.wrap_socket(socket.socket(socket.AF_INET),
1457 server_hostname="svn.python.org")
Benjamin Peterson7243b572014-11-23 17:04:34 -06001458 s.connect(("svn.python.org", 443))
1459 s.close()
Antoine Pitrou350c7222010-09-09 13:31:46 +00001460 # This should fail because we have no verification certs
1461 ctx.verify_mode = ssl.CERT_REQUIRED
1462 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
Ezio Melottied3a7d22010-12-01 02:32:32 +00001463 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
Antoine Pitrou350c7222010-09-09 13:31:46 +00001464 s.connect, ("svn.python.org", 443))
Antoine Pitrou152efa22010-05-16 18:19:27 +00001465 s.close()
Antoine Pitrou350c7222010-09-09 13:31:46 +00001466 # This should succeed because we specify the root cert
1467 ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
1468 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1469 s.connect(("svn.python.org", 443))
1470 try:
1471 cert = s.getpeercert()
1472 self.assertTrue(cert)
1473 finally:
1474 s.close()
Antoine Pitrou152efa22010-05-16 18:19:27 +00001475
1476 def test_connect_capath(self):
1477 # Verify server certificates using the `capath` argument
Antoine Pitrou467f28d2010-05-16 19:22:44 +00001478 # NOTE: the subject hashing algorithm has been changed between
1479 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
1480 # contain both versions of each certificate (same content, different
Antoine Pitroud7e4c1c2010-05-17 14:13:10 +00001481 # filename) for this test to be portable across OpenSSL releases.
Antoine Pitrou350c7222010-09-09 13:31:46 +00001482 with support.transient_internet("svn.python.org"):
1483 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1484 ctx.verify_mode = ssl.CERT_REQUIRED
1485 ctx.load_verify_locations(capath=CAPATH)
1486 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1487 s.connect(("svn.python.org", 443))
1488 try:
1489 cert = s.getpeercert()
1490 self.assertTrue(cert)
1491 finally:
1492 s.close()
1493 # Same with a bytes `capath` argument
1494 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1495 ctx.verify_mode = ssl.CERT_REQUIRED
1496 ctx.load_verify_locations(capath=BYTES_CAPATH)
1497 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1498 s.connect(("svn.python.org", 443))
1499 try:
1500 cert = s.getpeercert()
1501 self.assertTrue(cert)
1502 finally:
1503 s.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001504
Christian Heimesefff7062013-11-21 03:35:02 +01001505 def test_connect_cadata(self):
1506 with open(CAFILE_CACERT) as f:
1507 pem = f.read()
1508 der = ssl.PEM_cert_to_DER_cert(pem)
1509 with support.transient_internet("svn.python.org"):
1510 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1511 ctx.verify_mode = ssl.CERT_REQUIRED
1512 ctx.load_verify_locations(cadata=pem)
1513 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1514 s.connect(("svn.python.org", 443))
1515 cert = s.getpeercert()
1516 self.assertTrue(cert)
1517
1518 # same with DER
1519 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1520 ctx.verify_mode = ssl.CERT_REQUIRED
1521 ctx.load_verify_locations(cadata=der)
1522 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1523 s.connect(("svn.python.org", 443))
1524 cert = s.getpeercert()
1525 self.assertTrue(cert)
1526
Antoine Pitroue3220242010-04-24 11:13:53 +00001527 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
1528 def test_makefile_close(self):
1529 # Issue #5238: creating a file-like object with makefile() shouldn't
1530 # delay closing the underlying "real socket" (here tested with its
1531 # file descriptor, hence skipping the test under Windows).
Antoine Pitrou350c7222010-09-09 13:31:46 +00001532 with support.transient_internet("svn.python.org"):
1533 ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
1534 ss.connect(("svn.python.org", 443))
1535 fd = ss.fileno()
1536 f = ss.makefile()
1537 f.close()
1538 # The fd is still open
Antoine Pitroue3220242010-04-24 11:13:53 +00001539 os.read(fd, 0)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001540 # Closing the SSL socket should close the fd too
1541 ss.close()
1542 gc.collect()
1543 with self.assertRaises(OSError) as e:
1544 os.read(fd, 0)
1545 self.assertEqual(e.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001546
Antoine Pitrou480a1242010-04-28 21:37:09 +00001547 def test_non_blocking_handshake(self):
Antoine Pitrou350c7222010-09-09 13:31:46 +00001548 with support.transient_internet("svn.python.org"):
1549 s = socket.socket(socket.AF_INET)
1550 s.connect(("svn.python.org", 443))
1551 s.setblocking(False)
1552 s = ssl.wrap_socket(s,
1553 cert_reqs=ssl.CERT_NONE,
1554 do_handshake_on_connect=False)
1555 count = 0
1556 while True:
1557 try:
1558 count += 1
1559 s.do_handshake()
1560 break
Antoine Pitrou41032a62011-10-27 23:56:55 +02001561 except ssl.SSLWantReadError:
1562 select.select([s], [], [])
1563 except ssl.SSLWantWriteError:
1564 select.select([], [s], [])
Antoine Pitrou350c7222010-09-09 13:31:46 +00001565 s.close()
1566 if support.verbose:
1567 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001568
Antoine Pitrou480a1242010-04-28 21:37:09 +00001569 def test_get_server_certificate(self):
Antoine Pitrou15399c32011-04-28 19:23:55 +02001570 def _test_get_server_certificate(host, port, cert=None):
1571 with support.transient_internet(host):
Antoine Pitrou94a5b662014-04-16 18:56:28 +02001572 pem = ssl.get_server_certificate((host, port))
Antoine Pitrou15399c32011-04-28 19:23:55 +02001573 if not pem:
1574 self.fail("No server certificate on %s:%s!" % (host, port))
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001575
Antoine Pitrou15399c32011-04-28 19:23:55 +02001576 try:
Benjamin Peterson10b93cc2014-03-12 18:10:57 -05001577 pem = ssl.get_server_certificate((host, port),
Benjamin Peterson10b93cc2014-03-12 18:10:57 -05001578 ca_certs=CERTFILE)
Antoine Pitrou15399c32011-04-28 19:23:55 +02001579 except ssl.SSLError as x:
1580 #should fail
1581 if support.verbose:
1582 sys.stdout.write("%s\n" % x)
1583 else:
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001584 self.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
1585
Benjamin Peterson10b93cc2014-03-12 18:10:57 -05001586 pem = ssl.get_server_certificate((host, port),
Benjamin Peterson10b93cc2014-03-12 18:10:57 -05001587 ca_certs=cert)
Antoine Pitrou15399c32011-04-28 19:23:55 +02001588 if not pem:
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001589 self.fail("No server certificate on %s:%s!" % (host, port))
Antoine Pitrou350c7222010-09-09 13:31:46 +00001590 if support.verbose:
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001591 sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
Antoine Pitrou350c7222010-09-09 13:31:46 +00001592
Antoine Pitrou15399c32011-04-28 19:23:55 +02001593 _test_get_server_certificate('svn.python.org', 443, SVN_PYTHON_ORG_ROOT_CERT)
1594 if support.IPV6_ENABLED:
1595 _test_get_server_certificate('ipv6.google.com', 443)
Bill Janssen54cc54c2007-12-14 22:08:56 +00001596
Antoine Pitrouf4c7bad2010-08-15 23:02:22 +00001597 def test_ciphers(self):
1598 remote = ("svn.python.org", 443)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001599 with support.transient_internet(remote[0]):
Antoine Pitroue1ceb502013-01-12 21:54:44 +01001600 with ssl.wrap_socket(socket.socket(socket.AF_INET),
1601 cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
1602 s.connect(remote)
1603 with ssl.wrap_socket(socket.socket(socket.AF_INET),
1604 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
1605 s.connect(remote)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001606 # Error checking can happen at instantiation or when connecting
Ezio Melottied3a7d22010-12-01 02:32:32 +00001607 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitroud2eca372010-10-29 23:41:37 +00001608 with socket.socket(socket.AF_INET) as sock:
1609 s = ssl.wrap_socket(sock,
1610 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
1611 s.connect(remote)
Antoine Pitrouf4c7bad2010-08-15 23:02:22 +00001612
Antoine Pitroufec12ff2010-04-21 19:46:23 +00001613 def test_algorithms(self):
1614 # Issue #8484: all algorithms should be available when verifying a
1615 # certificate.
Antoine Pitrou29619b22010-04-22 18:43:31 +00001616 # SHA256 was added in OpenSSL 0.9.8
1617 if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15):
1618 self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION)
Antoine Pitrou16f6f832012-05-04 16:26:02 +02001619 # sha256.tbs-internet.com needs SNI to use the correct certificate
1620 if not ssl.HAS_SNI:
1621 self.skipTest("SNI needed for this test")
Victor Stinnerf332abb2011-01-08 03:16:05 +00001622 # https://sha2.hboeck.de/ was used until 2011-01-08 (no route to host)
1623 remote = ("sha256.tbs-internet.com", 443)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00001624 sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem")
Antoine Pitrou160fd932011-01-08 10:23:29 +00001625 with support.transient_internet("sha256.tbs-internet.com"):
Antoine Pitrou16f6f832012-05-04 16:26:02 +02001626 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1627 ctx.verify_mode = ssl.CERT_REQUIRED
1628 ctx.load_verify_locations(sha256_cert)
1629 s = ctx.wrap_socket(socket.socket(socket.AF_INET),
1630 server_hostname="sha256.tbs-internet.com")
Antoine Pitroufec12ff2010-04-21 19:46:23 +00001631 try:
1632 s.connect(remote)
1633 if support.verbose:
1634 sys.stdout.write("\nCipher with %r is %r\n" %
1635 (remote, s.cipher()))
1636 sys.stdout.write("Certificate is:\n%s\n" %
1637 pprint.pformat(s.getpeercert()))
1638 finally:
1639 s.close()
1640
Christian Heimes9a5395a2013-06-17 15:44:12 +02001641 def test_get_ca_certs_capath(self):
1642 # capath certs are loaded on request
1643 with support.transient_internet("svn.python.org"):
1644 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1645 ctx.verify_mode = ssl.CERT_REQUIRED
1646 ctx.load_verify_locations(capath=CAPATH)
1647 self.assertEqual(ctx.get_ca_certs(), [])
1648 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1649 s.connect(("svn.python.org", 443))
1650 try:
1651 cert = s.getpeercert()
1652 self.assertTrue(cert)
1653 finally:
1654 s.close()
1655 self.assertEqual(len(ctx.get_ca_certs()), 1)
1656
Christian Heimes575596e2013-12-15 21:49:17 +01001657 @needs_sni
Christian Heimes8e7f3942013-12-05 07:41:08 +01001658 def test_context_setget(self):
1659 # Check that the context of a connected socket can be replaced.
1660 with support.transient_internet("svn.python.org"):
1661 ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1662 ctx2 = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1663 s = socket.socket(socket.AF_INET)
1664 with ctx1.wrap_socket(s) as ss:
1665 ss.connect(("svn.python.org", 443))
1666 self.assertIs(ss.context, ctx1)
1667 self.assertIs(ss._sslobj.context, ctx1)
1668 ss.context = ctx2
1669 self.assertIs(ss.context, ctx2)
1670 self.assertIs(ss._sslobj.context, ctx2)
1671
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001672
1673class NetworkedBIOTests(unittest.TestCase):
1674
1675 def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
1676 # A simple IO loop. Call func(*args) depending on the error we get
1677 # (WANT_READ or WANT_WRITE) move data between the socket and the BIOs.
1678 timeout = kwargs.get('timeout', 10)
1679 count = 0
1680 while True:
1681 errno = None
1682 count += 1
1683 try:
1684 ret = func(*args)
1685 except ssl.SSLError as e:
1686 # Note that we get a spurious -1/SSL_ERROR_SYSCALL for
1687 # non-blocking IO. The SSL_shutdown manpage hints at this.
1688 # It *should* be safe to just ignore SYS_ERROR_SYSCALL because
1689 # with a Memory BIO there's no syscalls (for IO at least).
1690 if e.errno not in (ssl.SSL_ERROR_WANT_READ,
1691 ssl.SSL_ERROR_WANT_WRITE,
1692 ssl.SSL_ERROR_SYSCALL):
1693 raise
1694 errno = e.errno
1695 # Get any data from the outgoing BIO irrespective of any error, and
1696 # send it to the socket.
1697 buf = outgoing.read()
1698 sock.sendall(buf)
1699 # If there's no error, we're done. For WANT_READ, we need to get
1700 # data from the socket and put it in the incoming BIO.
1701 if errno is None:
1702 break
1703 elif errno == ssl.SSL_ERROR_WANT_READ:
1704 buf = sock.recv(32768)
1705 if buf:
1706 incoming.write(buf)
1707 else:
1708 incoming.write_eof()
1709 if support.verbose:
1710 sys.stdout.write("Needed %d calls to complete %s().\n"
1711 % (count, func.__name__))
1712 return ret
1713
1714 def test_handshake(self):
1715 with support.transient_internet("svn.python.org"):
1716 sock = socket.socket(socket.AF_INET)
1717 sock.connect(("svn.python.org", 443))
1718 incoming = ssl.MemoryBIO()
1719 outgoing = ssl.MemoryBIO()
1720 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1721 ctx.verify_mode = ssl.CERT_REQUIRED
1722 ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
Benjamin Petersonf9284ae2014-11-23 17:06:39 -06001723 ctx.check_hostname = True
1724 sslobj = ctx.wrap_bio(incoming, outgoing, False, 'svn.python.org')
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001725 self.assertIs(sslobj._sslobj.owner, sslobj)
1726 self.assertIsNone(sslobj.cipher())
Benjamin Peterson4cb17812015-01-07 11:14:26 -06001727 self.assertIsNone(sslobj.shared_ciphers())
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001728 self.assertRaises(ValueError, sslobj.getpeercert)
1729 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
1730 self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
1731 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
1732 self.assertTrue(sslobj.cipher())
Benjamin Peterson4cb17812015-01-07 11:14:26 -06001733 self.assertIsNone(sslobj.shared_ciphers())
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001734 self.assertTrue(sslobj.getpeercert())
1735 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
1736 self.assertTrue(sslobj.get_channel_binding('tls-unique'))
1737 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
1738 self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
1739 sock.close()
1740
1741 def test_read_write_data(self):
1742 with support.transient_internet("svn.python.org"):
1743 sock = socket.socket(socket.AF_INET)
1744 sock.connect(("svn.python.org", 443))
1745 incoming = ssl.MemoryBIO()
1746 outgoing = ssl.MemoryBIO()
1747 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1748 ctx.verify_mode = ssl.CERT_NONE
1749 sslobj = ctx.wrap_bio(incoming, outgoing, False)
1750 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
1751 req = b'GET / HTTP/1.0\r\n\r\n'
1752 self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
1753 buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
1754 self.assertEqual(buf[:5], b'HTTP/')
1755 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
1756 sock.close()
1757
1758
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001759try:
1760 import threading
1761except ImportError:
1762 _have_threads = False
1763else:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001764 _have_threads = True
1765
Antoine Pitrou803e6d62010-10-13 10:36:15 +00001766 from test.ssl_servers import make_https_server
1767
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001768 class ThreadedEchoServer(threading.Thread):
1769
1770 class ConnectionHandler(threading.Thread):
1771
1772 """A mildly complicated class, because we want it to work both
1773 with and without the SSL wrapper around the socket connection, so
1774 that we can test the STARTTLS functionality."""
1775
Bill Janssen6e027db2007-11-15 22:23:56 +00001776 def __init__(self, server, connsock, addr):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001777 self.server = server
1778 self.running = False
1779 self.sock = connsock
Bill Janssen6e027db2007-11-15 22:23:56 +00001780 self.addr = addr
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001781 self.sock.setblocking(1)
1782 self.sslconn = None
1783 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00001784 self.daemon = True
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001785
Antoine Pitrou480a1242010-04-28 21:37:09 +00001786 def wrap_conn(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001787 try:
Antoine Pitroub5218772010-05-21 09:56:06 +00001788 self.sslconn = self.server.context.wrap_socket(
1789 self.sock, server_side=True)
Benjamin Petersoncca27322015-01-23 16:35:37 -05001790 self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
1791 self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
Nadeem Vawdaad246bf2013-03-03 22:44:22 +01001792 except (ssl.SSLError, ConnectionResetError) as e:
1793 # We treat ConnectionResetError as though it were an
1794 # SSLError - OpenSSL on Ubuntu abruptly closes the
1795 # connection when asked to use an unsupported protocol.
1796 #
Antoine Pitrou18c913e2010-04-27 10:59:39 +00001797 # XXX Various errors can have happened here, for example
1798 # a mismatching protocol version, an invalid certificate,
1799 # or a low-level bug. This should be made more discriminating.
Antoine Pitrou8f85f902012-01-03 22:46:48 +01001800 self.server.conn_errors.append(e)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001801 if self.server.chatty:
Bill Janssen6e027db2007-11-15 22:23:56 +00001802 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
Antoine Pitrou18c913e2010-04-27 10:59:39 +00001803 self.running = False
1804 self.server.stop()
Bill Janssen6e027db2007-11-15 22:23:56 +00001805 self.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001806 return False
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001807 else:
Benjamin Peterson4cb17812015-01-07 11:14:26 -06001808 self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
Antoine Pitroub5218772010-05-21 09:56:06 +00001809 if self.server.context.verify_mode == ssl.CERT_REQUIRED:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001810 cert = self.sslconn.getpeercert()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001811 if support.verbose and self.server.chatty:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001812 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
1813 cert_binary = self.sslconn.getpeercert(True)
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001814 if support.verbose and self.server.chatty:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001815 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
1816 cipher = self.sslconn.cipher()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001817 if support.verbose and self.server.chatty:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001818 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01001819 sys.stdout.write(" server: selected protocol is now "
1820 + str(self.sslconn.selected_npn_protocol()) + "\n")
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001821 return True
1822
1823 def read(self):
1824 if self.sslconn:
1825 return self.sslconn.read()
1826 else:
1827 return self.sock.recv(1024)
1828
1829 def write(self, bytes):
1830 if self.sslconn:
1831 return self.sslconn.write(bytes)
1832 else:
1833 return self.sock.send(bytes)
1834
1835 def close(self):
1836 if self.sslconn:
1837 self.sslconn.close()
1838 else:
1839 self.sock.close()
1840
Antoine Pitrou480a1242010-04-28 21:37:09 +00001841 def run(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001842 self.running = True
1843 if not self.server.starttls_server:
1844 if not self.wrap_conn():
1845 return
1846 while self.running:
1847 try:
1848 msg = self.read()
Antoine Pitrou480a1242010-04-28 21:37:09 +00001849 stripped = msg.strip()
1850 if not stripped:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001851 # eof, so quit this handler
1852 self.running = False
1853 self.close()
Antoine Pitrou480a1242010-04-28 21:37:09 +00001854 elif stripped == b'over':
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001855 if support.verbose and self.server.connectionchatty:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001856 sys.stdout.write(" server: client closed connection\n")
1857 self.close()
1858 return
Bill Janssen6e027db2007-11-15 22:23:56 +00001859 elif (self.server.starttls_server and
Antoine Pitrou764b8782010-04-28 22:57:15 +00001860 stripped == b'STARTTLS'):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001861 if support.verbose and self.server.connectionchatty:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001862 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
Antoine Pitrou480a1242010-04-28 21:37:09 +00001863 self.write(b"OK\n")
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001864 if not self.wrap_conn():
1865 return
Bill Janssen40a0f662008-08-12 16:56:25 +00001866 elif (self.server.starttls_server and self.sslconn
Antoine Pitrou764b8782010-04-28 22:57:15 +00001867 and stripped == b'ENDTLS'):
Bill Janssen40a0f662008-08-12 16:56:25 +00001868 if support.verbose and self.server.connectionchatty:
1869 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
Antoine Pitrou480a1242010-04-28 21:37:09 +00001870 self.write(b"OK\n")
Bill Janssen40a0f662008-08-12 16:56:25 +00001871 self.sock = self.sslconn.unwrap()
1872 self.sslconn = None
1873 if support.verbose and self.server.connectionchatty:
1874 sys.stdout.write(" server: connection is now unencrypted...\n")
Antoine Pitroud6494802011-07-21 01:11:30 +02001875 elif stripped == b'CB tls-unique':
1876 if support.verbose and self.server.connectionchatty:
1877 sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
1878 data = self.sslconn.get_channel_binding("tls-unique")
1879 self.write(repr(data).encode("us-ascii") + b"\n")
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001880 else:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001881 if (support.verbose and
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001882 self.server.connectionchatty):
1883 ctype = (self.sslconn and "encrypted") or "unencrypted"
Antoine Pitrou480a1242010-04-28 21:37:09 +00001884 sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
1885 % (msg, ctype, msg.lower(), ctype))
1886 self.write(msg.lower())
Andrew Svetlov0832af62012-12-18 23:10:48 +02001887 except OSError:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001888 if self.server.chatty:
1889 handle_error("Test server failure:\n")
1890 self.close()
1891 self.running = False
1892 # normally, we'd just stop here, but for the test
1893 # harness, we want to stop the server
1894 self.server.stop()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001895
Antoine Pitroub5218772010-05-21 09:56:06 +00001896 def __init__(self, certificate=None, ssl_version=None,
Antoine Pitrou18c913e2010-04-27 10:59:39 +00001897 certreqs=None, cacerts=None,
Antoine Pitrou2d9cb9c2010-04-17 17:40:45 +00001898 chatty=True, connectionchatty=False, starttls_server=False,
Benjamin Petersoncca27322015-01-23 16:35:37 -05001899 npn_protocols=None, alpn_protocols=None,
1900 ciphers=None, context=None):
Antoine Pitroub5218772010-05-21 09:56:06 +00001901 if context:
1902 self.context = context
1903 else:
1904 self.context = ssl.SSLContext(ssl_version
1905 if ssl_version is not None
1906 else ssl.PROTOCOL_TLSv1)
1907 self.context.verify_mode = (certreqs if certreqs is not None
1908 else ssl.CERT_NONE)
1909 if cacerts:
1910 self.context.load_verify_locations(cacerts)
1911 if certificate:
1912 self.context.load_cert_chain(certificate)
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01001913 if npn_protocols:
1914 self.context.set_npn_protocols(npn_protocols)
Benjamin Petersoncca27322015-01-23 16:35:37 -05001915 if alpn_protocols:
1916 self.context.set_alpn_protocols(alpn_protocols)
Antoine Pitroub5218772010-05-21 09:56:06 +00001917 if ciphers:
1918 self.context.set_ciphers(ciphers)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001919 self.chatty = chatty
1920 self.connectionchatty = connectionchatty
1921 self.starttls_server = starttls_server
1922 self.sock = socket.socket()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001923 self.port = support.bind_port(self.sock)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001924 self.flag = None
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001925 self.active = False
Benjamin Petersoncca27322015-01-23 16:35:37 -05001926 self.selected_npn_protocols = []
1927 self.selected_alpn_protocols = []
Benjamin Peterson4cb17812015-01-07 11:14:26 -06001928 self.shared_ciphers = []
Antoine Pitrou8f85f902012-01-03 22:46:48 +01001929 self.conn_errors = []
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001930 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00001931 self.daemon = True
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001932
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01001933 def __enter__(self):
1934 self.start(threading.Event())
1935 self.flag.wait()
Antoine Pitrou8f85f902012-01-03 22:46:48 +01001936 return self
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01001937
1938 def __exit__(self, *args):
1939 self.stop()
1940 self.join()
1941
Antoine Pitrou480a1242010-04-28 21:37:09 +00001942 def start(self, flag=None):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001943 self.flag = flag
1944 threading.Thread.start(self)
1945
Antoine Pitrou480a1242010-04-28 21:37:09 +00001946 def run(self):
Antoine Pitrouaf7c6022010-04-27 09:56:02 +00001947 self.sock.settimeout(0.05)
Charles-François Natali6e204602014-07-23 19:28:13 +01001948 self.sock.listen()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001949 self.active = True
1950 if self.flag:
1951 # signal an event
1952 self.flag.set()
1953 while self.active:
1954 try:
1955 newconn, connaddr = self.sock.accept()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001956 if support.verbose and self.chatty:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001957 sys.stdout.write(' server: new connection from '
Bill Janssen6e027db2007-11-15 22:23:56 +00001958 + repr(connaddr) + '\n')
1959 handler = self.ConnectionHandler(self, newconn, connaddr)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001960 handler.start()
Antoine Pitroueced82e2012-01-27 17:33:01 +01001961 handler.join()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001962 except socket.timeout:
1963 pass
1964 except KeyboardInterrupt:
1965 self.stop()
Bill Janssen6e027db2007-11-15 22:23:56 +00001966 self.sock.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001967
Antoine Pitrou480a1242010-04-28 21:37:09 +00001968 def stop(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001969 self.active = False
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001970
Bill Janssen54cc54c2007-12-14 22:08:56 +00001971 class AsyncoreEchoServer(threading.Thread):
1972
1973 # this one's based on asyncore.dispatcher
1974
1975 class EchoServer (asyncore.dispatcher):
1976
1977 class ConnectionHandler (asyncore.dispatcher_with_send):
1978
1979 def __init__(self, conn, certfile):
1980 self.socket = ssl.wrap_socket(conn, server_side=True,
1981 certfile=certfile,
1982 do_handshake_on_connect=False)
1983 asyncore.dispatcher_with_send.__init__(self, self.socket)
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00001984 self._ssl_accepting = True
1985 self._do_ssl_handshake()
Bill Janssen54cc54c2007-12-14 22:08:56 +00001986
1987 def readable(self):
1988 if isinstance(self.socket, ssl.SSLSocket):
1989 while self.socket.pending() > 0:
1990 self.handle_read_event()
1991 return True
1992
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00001993 def _do_ssl_handshake(self):
1994 try:
1995 self.socket.do_handshake()
Antoine Pitrou41032a62011-10-27 23:56:55 +02001996 except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
1997 return
1998 except ssl.SSLEOFError:
1999 return self.handle_close()
2000 except ssl.SSLError:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002001 raise
Andrew Svetlov0832af62012-12-18 23:10:48 +02002002 except OSError as err:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002003 if err.args[0] == errno.ECONNABORTED:
2004 return self.handle_close()
Bill Janssen54cc54c2007-12-14 22:08:56 +00002005 else:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002006 self._ssl_accepting = False
2007
2008 def handle_read(self):
2009 if self._ssl_accepting:
2010 self._do_ssl_handshake()
2011 else:
2012 data = self.recv(1024)
2013 if support.verbose:
2014 sys.stdout.write(" server: read %s from client\n" % repr(data))
2015 if not data:
2016 self.close()
2017 else:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002018 self.send(data.lower())
Bill Janssen54cc54c2007-12-14 22:08:56 +00002019
2020 def handle_close(self):
Bill Janssen2f5799b2008-06-29 00:08:12 +00002021 self.close()
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002022 if support.verbose:
Bill Janssen54cc54c2007-12-14 22:08:56 +00002023 sys.stdout.write(" server: closed connection %s\n" % self.socket)
2024
2025 def handle_error(self):
2026 raise
2027
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002028 def __init__(self, certfile):
Bill Janssen54cc54c2007-12-14 22:08:56 +00002029 self.certfile = certfile
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002030 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2031 self.port = support.bind_port(sock, '')
2032 asyncore.dispatcher.__init__(self, sock)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002033 self.listen(5)
2034
Giampaolo Rodolà977c7072010-10-04 21:08:36 +00002035 def handle_accepted(self, sock_obj, addr):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002036 if support.verbose:
Bill Janssen54cc54c2007-12-14 22:08:56 +00002037 sys.stdout.write(" server: new connection from %s:%s\n" %addr)
2038 self.ConnectionHandler(sock_obj, self.certfile)
2039
2040 def handle_error(self):
2041 raise
2042
Trent Nelson78520002008-04-10 20:54:35 +00002043 def __init__(self, certfile):
Bill Janssen54cc54c2007-12-14 22:08:56 +00002044 self.flag = None
2045 self.active = False
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002046 self.server = self.EchoServer(certfile)
2047 self.port = self.server.port
Bill Janssen54cc54c2007-12-14 22:08:56 +00002048 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00002049 self.daemon = True
Bill Janssen54cc54c2007-12-14 22:08:56 +00002050
2051 def __str__(self):
2052 return "<%s %s>" % (self.__class__.__name__, self.server)
2053
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002054 def __enter__(self):
2055 self.start(threading.Event())
2056 self.flag.wait()
Antoine Pitrou8f85f902012-01-03 22:46:48 +01002057 return self
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002058
2059 def __exit__(self, *args):
2060 if support.verbose:
2061 sys.stdout.write(" cleanup: stopping server.\n")
2062 self.stop()
2063 if support.verbose:
2064 sys.stdout.write(" cleanup: joining server thread.\n")
2065 self.join()
2066 if support.verbose:
2067 sys.stdout.write(" cleanup: successfully joined.\n")
2068
Bill Janssen54cc54c2007-12-14 22:08:56 +00002069 def start (self, flag=None):
2070 self.flag = flag
2071 threading.Thread.start(self)
2072
Antoine Pitrou480a1242010-04-28 21:37:09 +00002073 def run(self):
Bill Janssen54cc54c2007-12-14 22:08:56 +00002074 self.active = True
2075 if self.flag:
2076 self.flag.set()
2077 while self.active:
2078 try:
2079 asyncore.loop(1)
2080 except:
2081 pass
2082
Antoine Pitrou480a1242010-04-28 21:37:09 +00002083 def stop(self):
Bill Janssen54cc54c2007-12-14 22:08:56 +00002084 self.active = False
2085 self.server.close()
2086
Antoine Pitrou480a1242010-04-28 21:37:09 +00002087 def bad_cert_test(certfile):
2088 """
2089 Launch a server with CERT_REQUIRED, and check that trying to
2090 connect to it with the given client certificate fails.
2091 """
Trent Nelson78520002008-04-10 20:54:35 +00002092 server = ThreadedEchoServer(CERTFILE,
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002093 certreqs=ssl.CERT_REQUIRED,
Bill Janssen6e027db2007-11-15 22:23:56 +00002094 cacerts=CERTFILE, chatty=False,
2095 connectionchatty=False)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002096 with server:
Thomas Woutersed03b412007-08-28 21:37:11 +00002097 try:
Antoine Pitroud2eca372010-10-29 23:41:37 +00002098 with socket.socket() as sock:
2099 s = ssl.wrap_socket(sock,
2100 certfile=certfile,
2101 ssl_version=ssl.PROTOCOL_TLSv1)
2102 s.connect((HOST, server.port))
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002103 except ssl.SSLError as x:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002104 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002105 sys.stdout.write("\nSSLError is %s\n" % x.args[1])
Andrew Svetlov0832af62012-12-18 23:10:48 +02002106 except OSError as x:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002107 if support.verbose:
Andrew Svetlov0832af62012-12-18 23:10:48 +02002108 sys.stdout.write("\nOSError is %s\n" % x.args[1])
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002109 except OSError as x:
Giampaolo Rodolàcd9dfb92010-08-29 20:56:56 +00002110 if x.errno != errno.ENOENT:
2111 raise
Giampaolo Rodolà745ab382010-08-29 19:25:49 +00002112 if support.verbose:
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002113 sys.stdout.write("\OSError is %s\n" % str(x))
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002114 else:
Antoine Pitroud75b2a92010-05-06 14:15:10 +00002115 raise AssertionError("Use of invalid cert should have failed!")
Thomas Woutersed03b412007-08-28 21:37:11 +00002116
Antoine Pitroub5218772010-05-21 09:56:06 +00002117 def server_params_test(client_context, server_context, indata=b"FOO\n",
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01002118 chatty=True, connectionchatty=False, sni_name=None):
Antoine Pitrou480a1242010-04-28 21:37:09 +00002119 """
2120 Launch a server, connect a client to it and try various reads
2121 and writes.
2122 """
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01002123 stats = {}
Antoine Pitroub5218772010-05-21 09:56:06 +00002124 server = ThreadedEchoServer(context=server_context,
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002125 chatty=chatty,
Bill Janssen6e027db2007-11-15 22:23:56 +00002126 connectionchatty=False)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002127 with server:
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01002128 with client_context.wrap_socket(socket.socket(),
2129 server_hostname=sni_name) as s:
Antoine Pitroueba63c42012-01-28 17:38:34 +01002130 s.connect((HOST, server.port))
2131 for arg in [indata, bytearray(indata), memoryview(indata)]:
2132 if connectionchatty:
2133 if support.verbose:
2134 sys.stdout.write(
2135 " client: sending %r...\n" % indata)
2136 s.write(arg)
2137 outdata = s.read()
2138 if connectionchatty:
2139 if support.verbose:
2140 sys.stdout.write(" client: read %r\n" % outdata)
2141 if outdata != indata.lower():
2142 raise AssertionError(
2143 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
2144 % (outdata[:20], len(outdata),
2145 indata[:20].lower(), len(indata)))
2146 s.write(b"over\n")
Antoine Pitrou7d7aede2009-11-25 18:55:32 +00002147 if connectionchatty:
2148 if support.verbose:
Antoine Pitroueba63c42012-01-28 17:38:34 +01002149 sys.stdout.write(" client: closing connection.\n")
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01002150 stats.update({
Antoine Pitrouce816a52012-01-28 17:40:23 +01002151 'compression': s.compression(),
2152 'cipher': s.cipher(),
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01002153 'peercert': s.getpeercert(),
Benjamin Petersoncca27322015-01-23 16:35:37 -05002154 'client_alpn_protocol': s.selected_alpn_protocol(),
Antoine Pitrou47e40422014-09-04 21:00:10 +02002155 'client_npn_protocol': s.selected_npn_protocol(),
2156 'version': s.version(),
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01002157 })
Antoine Pitroueba63c42012-01-28 17:38:34 +01002158 s.close()
Benjamin Petersoncca27322015-01-23 16:35:37 -05002159 stats['server_alpn_protocols'] = server.selected_alpn_protocols
2160 stats['server_npn_protocols'] = server.selected_npn_protocols
Benjamin Peterson4cb17812015-01-07 11:14:26 -06002161 stats['server_shared_ciphers'] = server.shared_ciphers
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01002162 return stats
Thomas Woutersed03b412007-08-28 21:37:11 +00002163
Antoine Pitroub5218772010-05-21 09:56:06 +00002164 def try_protocol_combo(server_protocol, client_protocol, expect_success,
2165 certsreqs=None, server_options=0, client_options=0):
Antoine Pitrou47e40422014-09-04 21:00:10 +02002166 """
2167 Try to SSL-connect using *client_protocol* to *server_protocol*.
2168 If *expect_success* is true, assert that the connection succeeds,
2169 if it's false, assert that the connection fails.
2170 Also, if *expect_success* is a string, assert that it is the protocol
2171 version actually used by the connection.
2172 """
Benjamin Peterson2a691a82008-03-31 01:51:45 +00002173 if certsreqs is None:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002174 certsreqs = ssl.CERT_NONE
Antoine Pitrou480a1242010-04-28 21:37:09 +00002175 certtype = {
2176 ssl.CERT_NONE: "CERT_NONE",
2177 ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
2178 ssl.CERT_REQUIRED: "CERT_REQUIRED",
2179 }[certsreqs]
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002180 if support.verbose:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002181 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002182 sys.stdout.write(formatstr %
2183 (ssl.get_protocol_name(client_protocol),
2184 ssl.get_protocol_name(server_protocol),
2185 certtype))
Antoine Pitroub5218772010-05-21 09:56:06 +00002186 client_context = ssl.SSLContext(client_protocol)
Antoine Pitrou32c49152014-01-09 21:28:48 +01002187 client_context.options |= client_options
Antoine Pitroub5218772010-05-21 09:56:06 +00002188 server_context = ssl.SSLContext(server_protocol)
Antoine Pitrou32c49152014-01-09 21:28:48 +01002189 server_context.options |= server_options
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002190
2191 # NOTE: we must enable "ALL" ciphers on the client, otherwise an
2192 # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
2193 # starting from OpenSSL 1.0.0 (see issue #8322).
2194 if client_context.protocol == ssl.PROTOCOL_SSLv23:
2195 client_context.set_ciphers("ALL")
2196
Antoine Pitroub5218772010-05-21 09:56:06 +00002197 for ctx in (client_context, server_context):
2198 ctx.verify_mode = certsreqs
Antoine Pitroub5218772010-05-21 09:56:06 +00002199 ctx.load_cert_chain(CERTFILE)
2200 ctx.load_verify_locations(CERTFILE)
2201 try:
Antoine Pitrou47e40422014-09-04 21:00:10 +02002202 stats = server_params_test(client_context, server_context,
2203 chatty=False, connectionchatty=False)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002204 # Protocol mismatch can result in either an SSLError, or a
2205 # "Connection reset by peer" error.
2206 except ssl.SSLError:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002207 if expect_success:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002208 raise
Andrew Svetlov0832af62012-12-18 23:10:48 +02002209 except OSError as e:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002210 if expect_success or e.errno != errno.ECONNRESET:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002211 raise
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002212 else:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002213 if not expect_success:
Antoine Pitroud75b2a92010-05-06 14:15:10 +00002214 raise AssertionError(
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002215 "Client protocol %s succeeded with server protocol %s!"
2216 % (ssl.get_protocol_name(client_protocol),
2217 ssl.get_protocol_name(server_protocol)))
Antoine Pitrou47e40422014-09-04 21:00:10 +02002218 elif (expect_success is not True
2219 and expect_success != stats['version']):
2220 raise AssertionError("version mismatch: expected %r, got %r"
2221 % (expect_success, stats['version']))
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002222
2223
Bill Janssen6e027db2007-11-15 22:23:56 +00002224 class ThreadedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002225
Antoine Pitrou23df4832010-08-04 17:14:06 +00002226 @skip_if_broken_ubuntu_ssl
Antoine Pitrou480a1242010-04-28 21:37:09 +00002227 def test_echo(self):
2228 """Basic test of an SSL client connecting to a server"""
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002229 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002230 sys.stdout.write("\n")
Antoine Pitroub5218772010-05-21 09:56:06 +00002231 for protocol in PROTOCOLS:
Antoine Pitrou972d5bb2013-03-29 17:56:03 +01002232 with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
2233 context = ssl.SSLContext(protocol)
2234 context.load_cert_chain(CERTFILE)
2235 server_params_test(context, context,
2236 chatty=True, connectionchatty=True)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002237
Antoine Pitrou480a1242010-04-28 21:37:09 +00002238 def test_getpeercert(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002239 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002240 sys.stdout.write("\n")
Antoine Pitroub5218772010-05-21 09:56:06 +00002241 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2242 context.verify_mode = ssl.CERT_REQUIRED
2243 context.load_verify_locations(CERTFILE)
2244 context.load_cert_chain(CERTFILE)
2245 server = ThreadedEchoServer(context=context, chatty=False)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002246 with server:
Antoine Pitrou20b85552013-09-29 19:50:53 +02002247 s = context.wrap_socket(socket.socket(),
2248 do_handshake_on_connect=False)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002249 s.connect((HOST, server.port))
Antoine Pitrou20b85552013-09-29 19:50:53 +02002250 # getpeercert() raise ValueError while the handshake isn't
2251 # done.
2252 with self.assertRaises(ValueError):
2253 s.getpeercert()
2254 s.do_handshake()
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002255 cert = s.getpeercert()
2256 self.assertTrue(cert, "Can't get peer certificate.")
2257 cipher = s.cipher()
2258 if support.verbose:
2259 sys.stdout.write(pprint.pformat(cert) + '\n')
2260 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
2261 if 'subject' not in cert:
2262 self.fail("No subject field in certificate: %s." %
2263 pprint.pformat(cert))
2264 if ((('organizationName', 'Python Software Foundation'),)
2265 not in cert['subject']):
2266 self.fail(
2267 "Missing or invalid 'organizationName' field in certificate subject; "
2268 "should be 'Python Software Foundation'.")
Antoine Pitroufb046912010-11-09 20:21:19 +00002269 self.assertIn('notBefore', cert)
2270 self.assertIn('notAfter', cert)
2271 before = ssl.cert_time_to_seconds(cert['notBefore'])
2272 after = ssl.cert_time_to_seconds(cert['notAfter'])
2273 self.assertLess(before, after)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002274 s.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002275
Christian Heimes2427b502013-11-23 11:24:32 +01002276 @unittest.skipUnless(have_verify_flags(),
2277 "verify_flags need OpenSSL > 0.9.8")
Christian Heimes22587792013-11-21 23:56:13 +01002278 def test_crl_check(self):
2279 if support.verbose:
2280 sys.stdout.write("\n")
2281
2282 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2283 server_context.load_cert_chain(SIGNED_CERTFILE)
2284
2285 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2286 context.verify_mode = ssl.CERT_REQUIRED
2287 context.load_verify_locations(SIGNING_CA)
Benjamin Petersonc3d9c5c2015-03-04 23:18:48 -05002288 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
2289 self.assertEqual(context.verify_flags, ssl.VERIFY_DEFAULT | tf)
Christian Heimes22587792013-11-21 23:56:13 +01002290
2291 # VERIFY_DEFAULT should pass
2292 server = ThreadedEchoServer(context=server_context, chatty=True)
2293 with server:
2294 with context.wrap_socket(socket.socket()) as s:
2295 s.connect((HOST, server.port))
2296 cert = s.getpeercert()
2297 self.assertTrue(cert, "Can't get peer certificate.")
2298
2299 # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
Christian Heimes32f0c7a2013-11-22 03:43:48 +01002300 context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
Christian Heimes22587792013-11-21 23:56:13 +01002301
2302 server = ThreadedEchoServer(context=server_context, chatty=True)
2303 with server:
2304 with context.wrap_socket(socket.socket()) as s:
2305 with self.assertRaisesRegex(ssl.SSLError,
2306 "certificate verify failed"):
2307 s.connect((HOST, server.port))
2308
2309 # now load a CRL file. The CRL file is signed by the CA.
2310 context.load_verify_locations(CRLFILE)
2311
2312 server = ThreadedEchoServer(context=server_context, chatty=True)
2313 with server:
2314 with context.wrap_socket(socket.socket()) as s:
2315 s.connect((HOST, server.port))
2316 cert = s.getpeercert()
2317 self.assertTrue(cert, "Can't get peer certificate.")
2318
Christian Heimes1aa9a752013-12-02 02:41:19 +01002319 def test_check_hostname(self):
2320 if support.verbose:
2321 sys.stdout.write("\n")
2322
2323 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2324 server_context.load_cert_chain(SIGNED_CERTFILE)
2325
2326 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2327 context.verify_mode = ssl.CERT_REQUIRED
2328 context.check_hostname = True
2329 context.load_verify_locations(SIGNING_CA)
2330
2331 # correct hostname should verify
2332 server = ThreadedEchoServer(context=server_context, chatty=True)
2333 with server:
2334 with context.wrap_socket(socket.socket(),
2335 server_hostname="localhost") as s:
2336 s.connect((HOST, server.port))
2337 cert = s.getpeercert()
2338 self.assertTrue(cert, "Can't get peer certificate.")
2339
2340 # incorrect hostname should raise an exception
2341 server = ThreadedEchoServer(context=server_context, chatty=True)
2342 with server:
2343 with context.wrap_socket(socket.socket(),
2344 server_hostname="invalid") as s:
2345 with self.assertRaisesRegex(ssl.CertificateError,
2346 "hostname 'invalid' doesn't match 'localhost'"):
2347 s.connect((HOST, server.port))
2348
2349 # missing server_hostname arg should cause an exception, too
2350 server = ThreadedEchoServer(context=server_context, chatty=True)
2351 with server:
2352 with socket.socket() as s:
2353 with self.assertRaisesRegex(ValueError,
2354 "check_hostname requires server_hostname"):
2355 context.wrap_socket(s)
2356
Antoine Pitrou480a1242010-04-28 21:37:09 +00002357 def test_empty_cert(self):
2358 """Connecting with an empty cert file"""
2359 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
2360 "nullcert.pem"))
2361 def test_malformed_cert(self):
2362 """Connecting with a badly formatted certificate (syntax error)"""
2363 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
2364 "badcert.pem"))
2365 def test_nonexisting_cert(self):
2366 """Connecting with a non-existing cert file"""
2367 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
2368 "wrongcert.pem"))
2369 def test_malformed_key(self):
2370 """Connecting with a badly formatted key (syntax error)"""
2371 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
2372 "badkey.pem"))
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002373
Antoine Pitrou480a1242010-04-28 21:37:09 +00002374 def test_rude_shutdown(self):
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002375 """A brutal shutdown of an SSL server should raise an OSError
Antoine Pitrou480a1242010-04-28 21:37:09 +00002376 in the client when attempting handshake.
2377 """
Trent Nelson6b240cd2008-04-10 20:12:06 +00002378 listener_ready = threading.Event()
2379 listener_gone = threading.Event()
Antoine Pitrou480a1242010-04-28 21:37:09 +00002380
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002381 s = socket.socket()
2382 port = support.bind_port(s, HOST)
Trent Nelson6b240cd2008-04-10 20:12:06 +00002383
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002384 # `listener` runs in a thread. It sits in an accept() until
2385 # the main thread connects. Then it rudely closes the socket,
2386 # and sets Event `listener_gone` to let the main thread know
2387 # the socket is gone.
Trent Nelson6b240cd2008-04-10 20:12:06 +00002388 def listener():
Charles-François Natali6e204602014-07-23 19:28:13 +01002389 s.listen()
Trent Nelson6b240cd2008-04-10 20:12:06 +00002390 listener_ready.set()
Antoine Pitroud2eca372010-10-29 23:41:37 +00002391 newsock, addr = s.accept()
2392 newsock.close()
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002393 s.close()
Trent Nelson6b240cd2008-04-10 20:12:06 +00002394 listener_gone.set()
2395
2396 def connector():
2397 listener_ready.wait()
Antoine Pitroud2eca372010-10-29 23:41:37 +00002398 with socket.socket() as c:
2399 c.connect((HOST, port))
2400 listener_gone.wait()
2401 try:
2402 ssl_sock = ssl.wrap_socket(c)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002403 except OSError:
Antoine Pitroud2eca372010-10-29 23:41:37 +00002404 pass
2405 else:
2406 self.fail('connecting to closed SSL socket should have failed')
Trent Nelson6b240cd2008-04-10 20:12:06 +00002407
2408 t = threading.Thread(target=listener)
2409 t.start()
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002410 try:
2411 connector()
2412 finally:
2413 t.join()
Trent Nelson6b240cd2008-04-10 20:12:06 +00002414
Antoine Pitrou23df4832010-08-04 17:14:06 +00002415 @skip_if_broken_ubuntu_ssl
Victor Stinner3de49192011-05-09 00:42:58 +02002416 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
2417 "OpenSSL is compiled without SSLv2 support")
Antoine Pitrou480a1242010-04-28 21:37:09 +00002418 def test_protocol_sslv2(self):
2419 """Connecting to an SSLv2 server with various client options"""
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002420 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002421 sys.stdout.write("\n")
Antoine Pitrou480a1242010-04-28 21:37:09 +00002422 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
2423 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
2424 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
Antoine Pitroucd3d7ca2014-01-09 20:02:20 +01002425 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False)
Victor Stinner648b8622014-12-12 12:23:59 +01002426 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2427 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
Antoine Pitrou480a1242010-04-28 21:37:09 +00002428 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
Antoine Pitroub5218772010-05-21 09:56:06 +00002429 # SSLv23 client with specific SSL options
2430 if no_sslv2_implies_sslv3_hello():
2431 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
2432 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
2433 client_options=ssl.OP_NO_SSLv2)
Antoine Pitroucd3d7ca2014-01-09 20:02:20 +01002434 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
Antoine Pitroub5218772010-05-21 09:56:06 +00002435 client_options=ssl.OP_NO_SSLv3)
Antoine Pitroucd3d7ca2014-01-09 20:02:20 +01002436 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
Antoine Pitroub5218772010-05-21 09:56:06 +00002437 client_options=ssl.OP_NO_TLSv1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002438
Antoine Pitrou23df4832010-08-04 17:14:06 +00002439 @skip_if_broken_ubuntu_ssl
Antoine Pitrou480a1242010-04-28 21:37:09 +00002440 def test_protocol_sslv23(self):
2441 """Connecting to an SSLv23 server with various client options"""
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002442 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002443 sys.stdout.write("\n")
Victor Stinner3de49192011-05-09 00:42:58 +02002444 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2445 try:
2446 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True)
Andrew Svetlov0832af62012-12-18 23:10:48 +02002447 except OSError as x:
Victor Stinner3de49192011-05-09 00:42:58 +02002448 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
2449 if support.verbose:
2450 sys.stdout.write(
2451 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
2452 % str(x))
Benjamin Petersone32467c2014-12-05 21:59:35 -05002453 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Benjamin Peterson22293df2014-12-05 22:11:33 -05002454 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, 'SSLv3')
Antoine Pitrou480a1242010-04-28 21:37:09 +00002455 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True)
Antoine Pitrou47e40422014-09-04 21:00:10 +02002456 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1')
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002457
Benjamin Petersone32467c2014-12-05 21:59:35 -05002458 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Benjamin Peterson22293df2014-12-05 22:11:33 -05002459 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
Antoine Pitrou480a1242010-04-28 21:37:09 +00002460 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL)
Antoine Pitrou47e40422014-09-04 21:00:10 +02002461 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002462
Benjamin Petersone32467c2014-12-05 21:59:35 -05002463 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Benjamin Peterson22293df2014-12-05 22:11:33 -05002464 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
Antoine Pitrou480a1242010-04-28 21:37:09 +00002465 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED)
Antoine Pitrou47e40422014-09-04 21:00:10 +02002466 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002467
Antoine Pitroub5218772010-05-21 09:56:06 +00002468 # Server with specific SSL options
Benjamin Petersone32467c2014-12-05 21:59:35 -05002469 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2470 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False,
Antoine Pitroub5218772010-05-21 09:56:06 +00002471 server_options=ssl.OP_NO_SSLv3)
2472 # Will choose TLSv1
2473 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True,
2474 server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
2475 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, False,
2476 server_options=ssl.OP_NO_TLSv1)
2477
2478
Antoine Pitrou23df4832010-08-04 17:14:06 +00002479 @skip_if_broken_ubuntu_ssl
Benjamin Petersone32467c2014-12-05 21:59:35 -05002480 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
2481 "OpenSSL is compiled without SSLv3 support")
Antoine Pitrou480a1242010-04-28 21:37:09 +00002482 def test_protocol_sslv3(self):
2483 """Connecting to an SSLv3 server with various client options"""
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002484 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002485 sys.stdout.write("\n")
Antoine Pitrou47e40422014-09-04 21:00:10 +02002486 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3')
2487 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
2488 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
Victor Stinner3de49192011-05-09 00:42:58 +02002489 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2490 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
Barry Warsaw46ae0ef2011-10-28 16:52:17 -04002491 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False,
2492 client_options=ssl.OP_NO_SSLv3)
Antoine Pitrou480a1242010-04-28 21:37:09 +00002493 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
Antoine Pitroub5218772010-05-21 09:56:06 +00002494 if no_sslv2_implies_sslv3_hello():
2495 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Antoine Pitrou47e40422014-09-04 21:00:10 +02002496 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, 'SSLv3',
Antoine Pitroub5218772010-05-21 09:56:06 +00002497 client_options=ssl.OP_NO_SSLv2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002498
Antoine Pitrou23df4832010-08-04 17:14:06 +00002499 @skip_if_broken_ubuntu_ssl
Antoine Pitrou480a1242010-04-28 21:37:09 +00002500 def test_protocol_tlsv1(self):
2501 """Connecting to a TLSv1 server with various client options"""
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002502 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002503 sys.stdout.write("\n")
Antoine Pitrou47e40422014-09-04 21:00:10 +02002504 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1')
2505 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
2506 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Victor Stinner3de49192011-05-09 00:42:58 +02002507 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2508 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
Benjamin Petersone32467c2014-12-05 21:59:35 -05002509 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2510 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
Barry Warsaw46ae0ef2011-10-28 16:52:17 -04002511 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False,
2512 client_options=ssl.OP_NO_TLSv1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002513
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002514 @skip_if_broken_ubuntu_ssl
2515 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
2516 "TLS version 1.1 not supported.")
2517 def test_protocol_tlsv1_1(self):
2518 """Connecting to a TLSv1.1 server with various client options.
2519 Testing against older TLS versions."""
2520 if support.verbose:
2521 sys.stdout.write("\n")
Antoine Pitrou47e40422014-09-04 21:00:10 +02002522 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002523 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2524 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False)
Benjamin Petersone32467c2014-12-05 21:59:35 -05002525 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2526 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002527 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv23, False,
2528 client_options=ssl.OP_NO_TLSv1_1)
2529
Antoine Pitrou47e40422014-09-04 21:00:10 +02002530 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002531 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False)
2532 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False)
2533
2534
2535 @skip_if_broken_ubuntu_ssl
2536 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
2537 "TLS version 1.2 not supported.")
2538 def test_protocol_tlsv1_2(self):
2539 """Connecting to a TLSv1.2 server with various client options.
2540 Testing against older TLS versions."""
2541 if support.verbose:
2542 sys.stdout.write("\n")
Antoine Pitrou47e40422014-09-04 21:00:10 +02002543 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2',
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002544 server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
2545 client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
2546 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2547 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False)
Benjamin Petersone32467c2014-12-05 21:59:35 -05002548 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2549 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002550 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv23, False,
2551 client_options=ssl.OP_NO_TLSv1_2)
2552
Antoine Pitrou47e40422014-09-04 21:00:10 +02002553 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2')
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002554 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
2555 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
2556 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
2557 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
2558
Antoine Pitrou480a1242010-04-28 21:37:09 +00002559 def test_starttls(self):
2560 """Switching from clear text to encrypted and back again."""
2561 msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002562
Trent Nelson78520002008-04-10 20:54:35 +00002563 server = ThreadedEchoServer(CERTFILE,
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002564 ssl_version=ssl.PROTOCOL_TLSv1,
2565 starttls_server=True,
2566 chatty=True,
2567 connectionchatty=True)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002568 wrapped = False
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002569 with server:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002570 s = socket.socket()
2571 s.setblocking(1)
2572 s.connect((HOST, server.port))
2573 if support.verbose:
2574 sys.stdout.write("\n")
2575 for indata in msgs:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002576 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002577 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002578 " client: sending %r...\n" % indata)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002579 if wrapped:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002580 conn.write(indata)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002581 outdata = conn.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002582 else:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002583 s.send(indata)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002584 outdata = s.recv(1024)
Antoine Pitrou480a1242010-04-28 21:37:09 +00002585 msg = outdata.strip().lower()
2586 if indata == b"STARTTLS" and msg.startswith(b"ok"):
2587 # STARTTLS ok, switch to secure mode
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002588 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002589 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002590 " client: read %r from server, starting TLS...\n"
2591 % msg)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002592 conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
2593 wrapped = True
Antoine Pitrou480a1242010-04-28 21:37:09 +00002594 elif indata == b"ENDTLS" and msg.startswith(b"ok"):
2595 # ENDTLS ok, switch back to clear text
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002596 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002597 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002598 " client: read %r from server, ending TLS...\n"
2599 % msg)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002600 s = conn.unwrap()
2601 wrapped = False
2602 else:
2603 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002604 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002605 " client: read %r from server\n" % msg)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002606 if support.verbose:
2607 sys.stdout.write(" client: closing connection.\n")
2608 if wrapped:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002609 conn.write(b"over\n")
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002610 else:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002611 s.send(b"over\n")
Bill Janssen6e027db2007-11-15 22:23:56 +00002612 if wrapped:
2613 conn.close()
2614 else:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002615 s.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002616
Antoine Pitrou480a1242010-04-28 21:37:09 +00002617 def test_socketserver(self):
2618 """Using a SocketServer to create and manage SSL connections."""
Antoine Pitrouda232592013-02-05 21:20:51 +01002619 server = make_https_server(self, certfile=CERTFILE)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002620 # try to connect
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002621 if support.verbose:
2622 sys.stdout.write('\n')
2623 with open(CERTFILE, 'rb') as f:
2624 d1 = f.read()
2625 d2 = ''
2626 # now fetch the same data from the HTTPS server
Benjamin Peterson4ffb0752014-11-03 14:29:33 -05002627 url = 'https://localhost:%d/%s' % (
2628 server.port, os.path.split(CERTFILE)[1])
2629 context = ssl.create_default_context(cafile=CERTFILE)
2630 f = urllib.request.urlopen(url, context=context)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002631 try:
Barry Warsaw820c1202008-06-12 04:06:45 +00002632 dlen = f.info().get("content-length")
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002633 if dlen and (int(dlen) > 0):
2634 d2 = f.read(int(dlen))
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002635 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002636 sys.stdout.write(
2637 " client: read %d bytes from remote server '%s'\n"
2638 % (len(d2), server))
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002639 finally:
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002640 f.close()
2641 self.assertEqual(d1, d2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002642
Antoine Pitrou480a1242010-04-28 21:37:09 +00002643 def test_asyncore_server(self):
2644 """Check the example asyncore integration."""
2645 indata = "TEST MESSAGE of mixed case\n"
Trent Nelson6b240cd2008-04-10 20:12:06 +00002646
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002647 if support.verbose:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002648 sys.stdout.write("\n")
2649
Antoine Pitrou480a1242010-04-28 21:37:09 +00002650 indata = b"FOO\n"
Trent Nelson78520002008-04-10 20:54:35 +00002651 server = AsyncoreEchoServer(CERTFILE)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002652 with server:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002653 s = ssl.wrap_socket(socket.socket())
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002654 s.connect(('127.0.0.1', server.port))
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002655 if support.verbose:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002656 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002657 " client: sending %r...\n" % indata)
2658 s.write(indata)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002659 outdata = s.read()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002660 if support.verbose:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002661 sys.stdout.write(" client: read %r\n" % outdata)
Trent Nelson6b240cd2008-04-10 20:12:06 +00002662 if outdata != indata.lower():
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002663 self.fail(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002664 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
2665 % (outdata[:20], len(outdata),
2666 indata[:20].lower(), len(indata)))
2667 s.write(b"over\n")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002668 if support.verbose:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002669 sys.stdout.write(" client: closing connection.\n")
2670 s.close()
Antoine Pitroued986362010-08-15 23:28:10 +00002671 if support.verbose:
2672 sys.stdout.write(" client: connection closed.\n")
Trent Nelson6b240cd2008-04-10 20:12:06 +00002673
Antoine Pitrou480a1242010-04-28 21:37:09 +00002674 def test_recv_send(self):
2675 """Test recv(), send() and friends."""
Bill Janssen58afe4c2008-09-08 16:45:19 +00002676 if support.verbose:
2677 sys.stdout.write("\n")
2678
2679 server = ThreadedEchoServer(CERTFILE,
2680 certreqs=ssl.CERT_NONE,
2681 ssl_version=ssl.PROTOCOL_TLSv1,
2682 cacerts=CERTFILE,
2683 chatty=True,
2684 connectionchatty=False)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002685 with server:
2686 s = ssl.wrap_socket(socket.socket(),
2687 server_side=False,
2688 certfile=CERTFILE,
2689 ca_certs=CERTFILE,
2690 cert_reqs=ssl.CERT_NONE,
2691 ssl_version=ssl.PROTOCOL_TLSv1)
2692 s.connect((HOST, server.port))
Bill Janssen58afe4c2008-09-08 16:45:19 +00002693 # helper methods for standardising recv* method signatures
2694 def _recv_into():
2695 b = bytearray(b"\0"*100)
2696 count = s.recv_into(b)
2697 return b[:count]
2698
2699 def _recvfrom_into():
2700 b = bytearray(b"\0"*100)
2701 count, addr = s.recvfrom_into(b)
2702 return b[:count]
2703
2704 # (name, method, whether to expect success, *args)
2705 send_methods = [
2706 ('send', s.send, True, []),
2707 ('sendto', s.sendto, False, ["some.address"]),
2708 ('sendall', s.sendall, True, []),
2709 ]
2710 recv_methods = [
2711 ('recv', s.recv, True, []),
2712 ('recvfrom', s.recvfrom, False, ["some.address"]),
2713 ('recv_into', _recv_into, True, []),
2714 ('recvfrom_into', _recvfrom_into, False, []),
2715 ]
2716 data_prefix = "PREFIX_"
2717
2718 for meth_name, send_meth, expect_success, args in send_methods:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002719 indata = (data_prefix + meth_name).encode('ascii')
Bill Janssen58afe4c2008-09-08 16:45:19 +00002720 try:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002721 send_meth(indata, *args)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002722 outdata = s.read()
Bill Janssen58afe4c2008-09-08 16:45:19 +00002723 if outdata != indata.lower():
Georg Brandl89fad142010-03-14 10:23:39 +00002724 self.fail(
Bill Janssen58afe4c2008-09-08 16:45:19 +00002725 "While sending with <<{name:s}>> bad data "
Antoine Pitrou480a1242010-04-28 21:37:09 +00002726 "<<{outdata:r}>> ({nout:d}) received; "
2727 "expected <<{indata:r}>> ({nin:d})\n".format(
2728 name=meth_name, outdata=outdata[:20],
Bill Janssen58afe4c2008-09-08 16:45:19 +00002729 nout=len(outdata),
Antoine Pitrou480a1242010-04-28 21:37:09 +00002730 indata=indata[:20], nin=len(indata)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002731 )
2732 )
2733 except ValueError as e:
2734 if expect_success:
Georg Brandl89fad142010-03-14 10:23:39 +00002735 self.fail(
Bill Janssen58afe4c2008-09-08 16:45:19 +00002736 "Failed to send with method <<{name:s}>>; "
2737 "expected to succeed.\n".format(name=meth_name)
2738 )
2739 if not str(e).startswith(meth_name):
Georg Brandl89fad142010-03-14 10:23:39 +00002740 self.fail(
Bill Janssen58afe4c2008-09-08 16:45:19 +00002741 "Method <<{name:s}>> failed with unexpected "
2742 "exception message: {exp:s}\n".format(
2743 name=meth_name, exp=e
2744 )
2745 )
2746
2747 for meth_name, recv_meth, expect_success, args in recv_methods:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002748 indata = (data_prefix + meth_name).encode('ascii')
Bill Janssen58afe4c2008-09-08 16:45:19 +00002749 try:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002750 s.send(indata)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002751 outdata = recv_meth(*args)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002752 if outdata != indata.lower():
Georg Brandl89fad142010-03-14 10:23:39 +00002753 self.fail(
Bill Janssen58afe4c2008-09-08 16:45:19 +00002754 "While receiving with <<{name:s}>> bad data "
Antoine Pitrou480a1242010-04-28 21:37:09 +00002755 "<<{outdata:r}>> ({nout:d}) received; "
2756 "expected <<{indata:r}>> ({nin:d})\n".format(
2757 name=meth_name, outdata=outdata[:20],
Bill Janssen58afe4c2008-09-08 16:45:19 +00002758 nout=len(outdata),
Antoine Pitrou480a1242010-04-28 21:37:09 +00002759 indata=indata[:20], nin=len(indata)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002760 )
2761 )
2762 except ValueError as e:
2763 if expect_success:
Georg Brandl89fad142010-03-14 10:23:39 +00002764 self.fail(
Bill Janssen58afe4c2008-09-08 16:45:19 +00002765 "Failed to receive with method <<{name:s}>>; "
2766 "expected to succeed.\n".format(name=meth_name)
2767 )
2768 if not str(e).startswith(meth_name):
Georg Brandl89fad142010-03-14 10:23:39 +00002769 self.fail(
Bill Janssen58afe4c2008-09-08 16:45:19 +00002770 "Method <<{name:s}>> failed with unexpected "
2771 "exception message: {exp:s}\n".format(
2772 name=meth_name, exp=e
2773 )
2774 )
2775 # consume data
2776 s.read()
2777
Nick Coghlan513886a2011-08-28 00:00:27 +10002778 # Make sure sendmsg et al are disallowed to avoid
2779 # inadvertent disclosure of data and/or corruption
2780 # of the encrypted data stream
2781 self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
2782 self.assertRaises(NotImplementedError, s.recvmsg, 100)
2783 self.assertRaises(NotImplementedError,
2784 s.recvmsg_into, bytearray(100))
2785
Antoine Pitrou480a1242010-04-28 21:37:09 +00002786 s.write(b"over\n")
Bill Janssen58afe4c2008-09-08 16:45:19 +00002787 s.close()
Bill Janssen58afe4c2008-09-08 16:45:19 +00002788
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002789 def test_nonblocking_send(self):
2790 server = ThreadedEchoServer(CERTFILE,
2791 certreqs=ssl.CERT_NONE,
2792 ssl_version=ssl.PROTOCOL_TLSv1,
2793 cacerts=CERTFILE,
2794 chatty=True,
2795 connectionchatty=False)
2796 with server:
2797 s = ssl.wrap_socket(socket.socket(),
2798 server_side=False,
2799 certfile=CERTFILE,
2800 ca_certs=CERTFILE,
2801 cert_reqs=ssl.CERT_NONE,
2802 ssl_version=ssl.PROTOCOL_TLSv1)
2803 s.connect((HOST, server.port))
2804 s.setblocking(False)
2805
2806 # If we keep sending data, at some point the buffers
2807 # will be full and the call will block
2808 buf = bytearray(8192)
2809 def fill_buffer():
2810 while True:
2811 s.send(buf)
2812 self.assertRaises((ssl.SSLWantWriteError,
2813 ssl.SSLWantReadError), fill_buffer)
2814
2815 # Now read all the output and discard it
2816 s.setblocking(True)
2817 s.close()
2818
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002819 def test_handshake_timeout(self):
2820 # Issue #5103: SSL handshake must respect the socket timeout
2821 server = socket.socket(socket.AF_INET)
2822 host = "127.0.0.1"
2823 port = support.bind_port(server)
2824 started = threading.Event()
2825 finish = False
2826
2827 def serve():
Charles-François Natali6e204602014-07-23 19:28:13 +01002828 server.listen()
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002829 started.set()
2830 conns = []
2831 while not finish:
2832 r, w, e = select.select([server], [], [], 0.1)
2833 if server in r:
2834 # Let the socket hang around rather than having
2835 # it closed by garbage collection.
2836 conns.append(server.accept()[0])
Antoine Pitroud2eca372010-10-29 23:41:37 +00002837 for sock in conns:
2838 sock.close()
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002839
2840 t = threading.Thread(target=serve)
2841 t.start()
2842 started.wait()
2843
2844 try:
Antoine Pitrou40f08742010-04-24 22:04:40 +00002845 try:
2846 c = socket.socket(socket.AF_INET)
2847 c.settimeout(0.2)
2848 c.connect((host, port))
2849 # Will attempt handshake and time out
Antoine Pitrouc4df7842010-12-03 19:59:41 +00002850 self.assertRaisesRegex(socket.timeout, "timed out",
Ezio Melottied3a7d22010-12-01 02:32:32 +00002851 ssl.wrap_socket, c)
Antoine Pitrou40f08742010-04-24 22:04:40 +00002852 finally:
2853 c.close()
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002854 try:
2855 c = socket.socket(socket.AF_INET)
2856 c = ssl.wrap_socket(c)
2857 c.settimeout(0.2)
2858 # Will attempt handshake and time out
Antoine Pitrouc4df7842010-12-03 19:59:41 +00002859 self.assertRaisesRegex(socket.timeout, "timed out",
Ezio Melottied3a7d22010-12-01 02:32:32 +00002860 c.connect, (host, port))
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002861 finally:
2862 c.close()
2863 finally:
2864 finish = True
2865 t.join()
2866 server.close()
2867
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002868 def test_server_accept(self):
2869 # Issue #16357: accept() on a SSLSocket created through
2870 # SSLContext.wrap_socket().
2871 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2872 context.verify_mode = ssl.CERT_REQUIRED
2873 context.load_verify_locations(CERTFILE)
2874 context.load_cert_chain(CERTFILE)
2875 server = socket.socket(socket.AF_INET)
2876 host = "127.0.0.1"
2877 port = support.bind_port(server)
2878 server = context.wrap_socket(server, server_side=True)
2879
2880 evt = threading.Event()
2881 remote = None
2882 peer = None
2883 def serve():
2884 nonlocal remote, peer
Charles-François Natali6e204602014-07-23 19:28:13 +01002885 server.listen()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002886 # Block on the accept and wait on the connection to close.
2887 evt.set()
2888 remote, peer = server.accept()
2889 remote.recv(1)
2890
2891 t = threading.Thread(target=serve)
2892 t.start()
2893 # Client wait until server setup and perform a connect.
2894 evt.wait()
2895 client = context.wrap_socket(socket.socket())
2896 client.connect((host, port))
2897 client_addr = client.getsockname()
2898 client.close()
2899 t.join()
Antoine Pitroue1ceb502013-01-12 21:54:44 +01002900 remote.close()
2901 server.close()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002902 # Sanity checks.
2903 self.assertIsInstance(remote, ssl.SSLSocket)
2904 self.assertEqual(peer, client_addr)
2905
Antoine Pitrou242db722013-05-01 20:52:07 +02002906 def test_getpeercert_enotconn(self):
2907 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2908 with context.wrap_socket(socket.socket()) as sock:
2909 with self.assertRaises(OSError) as cm:
2910 sock.getpeercert()
2911 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
2912
2913 def test_do_handshake_enotconn(self):
2914 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2915 with context.wrap_socket(socket.socket()) as sock:
2916 with self.assertRaises(OSError) as cm:
2917 sock.do_handshake()
2918 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
2919
Antoine Pitrou8f85f902012-01-03 22:46:48 +01002920 def test_default_ciphers(self):
2921 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2922 try:
2923 # Force a set of weak ciphers on our client context
2924 context.set_ciphers("DES")
2925 except ssl.SSLError:
2926 self.skipTest("no DES cipher available")
2927 with ThreadedEchoServer(CERTFILE,
2928 ssl_version=ssl.PROTOCOL_SSLv23,
2929 chatty=False) as server:
Antoine Pitroue1ceb502013-01-12 21:54:44 +01002930 with context.wrap_socket(socket.socket()) as s:
Andrew Svetlov0832af62012-12-18 23:10:48 +02002931 with self.assertRaises(OSError):
Antoine Pitrou8f85f902012-01-03 22:46:48 +01002932 s.connect((HOST, server.port))
2933 self.assertIn("no shared cipher", str(server.conn_errors[0]))
2934
Antoine Pitrou47e40422014-09-04 21:00:10 +02002935 def test_version_basic(self):
2936 """
2937 Basic tests for SSLSocket.version().
2938 More tests are done in the test_protocol_*() methods.
2939 """
2940 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2941 with ThreadedEchoServer(CERTFILE,
2942 ssl_version=ssl.PROTOCOL_TLSv1,
2943 chatty=False) as server:
2944 with context.wrap_socket(socket.socket()) as s:
2945 self.assertIs(s.version(), None)
2946 s.connect((HOST, server.port))
2947 self.assertEqual(s.version(), "TLSv1")
2948 self.assertIs(s.version(), None)
2949
Antoine Pitrou0bebbc32014-03-22 18:13:50 +01002950 @unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
2951 def test_default_ecdh_curve(self):
2952 # Issue #21015: elliptic curve-based Diffie Hellman key exchange
2953 # should be enabled by default on SSL contexts.
2954 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2955 context.load_cert_chain(CERTFILE)
Antoine Pitrouc0430612014-04-16 18:33:39 +02002956 # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
2957 # explicitly using the 'ECCdraft' cipher alias. Otherwise,
2958 # our default cipher list should prefer ECDH-based ciphers
2959 # automatically.
2960 if ssl.OPENSSL_VERSION_INFO < (1, 0, 0):
2961 context.set_ciphers("ECCdraft:ECDH")
Antoine Pitrou0bebbc32014-03-22 18:13:50 +01002962 with ThreadedEchoServer(context=context) as server:
2963 with context.wrap_socket(socket.socket()) as s:
2964 s.connect((HOST, server.port))
2965 self.assertIn("ECDH", s.cipher()[0])
2966
Antoine Pitroud6494802011-07-21 01:11:30 +02002967 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
2968 "'tls-unique' channel binding not available")
2969 def test_tls_unique_channel_binding(self):
2970 """Test tls-unique channel binding."""
2971 if support.verbose:
2972 sys.stdout.write("\n")
2973
2974 server = ThreadedEchoServer(CERTFILE,
2975 certreqs=ssl.CERT_NONE,
2976 ssl_version=ssl.PROTOCOL_TLSv1,
2977 cacerts=CERTFILE,
2978 chatty=True,
2979 connectionchatty=False)
Antoine Pitrou6b15c902011-12-21 16:54:45 +01002980 with server:
2981 s = ssl.wrap_socket(socket.socket(),
2982 server_side=False,
2983 certfile=CERTFILE,
2984 ca_certs=CERTFILE,
2985 cert_reqs=ssl.CERT_NONE,
2986 ssl_version=ssl.PROTOCOL_TLSv1)
2987 s.connect((HOST, server.port))
Antoine Pitroud6494802011-07-21 01:11:30 +02002988 # get the data
2989 cb_data = s.get_channel_binding("tls-unique")
2990 if support.verbose:
2991 sys.stdout.write(" got channel binding data: {0!r}\n"
2992 .format(cb_data))
2993
2994 # check if it is sane
2995 self.assertIsNotNone(cb_data)
2996 self.assertEqual(len(cb_data), 12) # True for TLSv1
2997
2998 # and compare with the peers version
2999 s.write(b"CB tls-unique\n")
3000 peer_data_repr = s.read().strip()
3001 self.assertEqual(peer_data_repr,
3002 repr(cb_data).encode("us-ascii"))
3003 s.close()
3004
3005 # now, again
3006 s = ssl.wrap_socket(socket.socket(),
3007 server_side=False,
3008 certfile=CERTFILE,
3009 ca_certs=CERTFILE,
3010 cert_reqs=ssl.CERT_NONE,
3011 ssl_version=ssl.PROTOCOL_TLSv1)
3012 s.connect((HOST, server.port))
3013 new_cb_data = s.get_channel_binding("tls-unique")
3014 if support.verbose:
3015 sys.stdout.write(" got another channel binding data: {0!r}\n"
3016 .format(new_cb_data))
3017 # is it really unique
3018 self.assertNotEqual(cb_data, new_cb_data)
3019 self.assertIsNotNone(cb_data)
3020 self.assertEqual(len(cb_data), 12) # True for TLSv1
3021 s.write(b"CB tls-unique\n")
3022 peer_data_repr = s.read().strip()
3023 self.assertEqual(peer_data_repr,
3024 repr(new_cb_data).encode("us-ascii"))
3025 s.close()
Bill Janssen58afe4c2008-09-08 16:45:19 +00003026
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003027 def test_compression(self):
3028 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3029 context.load_cert_chain(CERTFILE)
3030 stats = server_params_test(context, context,
3031 chatty=True, connectionchatty=True)
3032 if support.verbose:
3033 sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
3034 self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' })
3035
3036 @unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
3037 "ssl.OP_NO_COMPRESSION needed for this test")
3038 def test_compression_disabled(self):
3039 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3040 context.load_cert_chain(CERTFILE)
Antoine Pitrou8691bff2011-12-20 10:47:42 +01003041 context.options |= ssl.OP_NO_COMPRESSION
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003042 stats = server_params_test(context, context,
3043 chatty=True, connectionchatty=True)
3044 self.assertIs(stats['compression'], None)
3045
Antoine Pitrou0e576f12011-12-22 10:03:38 +01003046 def test_dh_params(self):
3047 # Check we can get a connection with ephemeral Diffie-Hellman
3048 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3049 context.load_cert_chain(CERTFILE)
3050 context.load_dh_params(DHFILE)
3051 context.set_ciphers("kEDH")
3052 stats = server_params_test(context, context,
3053 chatty=True, connectionchatty=True)
3054 cipher = stats["cipher"][0]
3055 parts = cipher.split("-")
3056 if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
3057 self.fail("Non-DH cipher: " + cipher[0])
3058
Benjamin Petersoncca27322015-01-23 16:35:37 -05003059 def test_selected_alpn_protocol(self):
3060 # selected_alpn_protocol() is None unless ALPN is used.
3061 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3062 context.load_cert_chain(CERTFILE)
3063 stats = server_params_test(context, context,
3064 chatty=True, connectionchatty=True)
3065 self.assertIs(stats['client_alpn_protocol'], None)
3066
3067 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
3068 def test_selected_alpn_protocol_if_server_uses_alpn(self):
3069 # selected_alpn_protocol() is None unless ALPN is used by the client.
3070 client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3071 client_context.load_verify_locations(CERTFILE)
3072 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3073 server_context.load_cert_chain(CERTFILE)
3074 server_context.set_alpn_protocols(['foo', 'bar'])
3075 stats = server_params_test(client_context, server_context,
3076 chatty=True, connectionchatty=True)
3077 self.assertIs(stats['client_alpn_protocol'], None)
3078
3079 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
3080 def test_alpn_protocols(self):
3081 server_protocols = ['foo', 'bar', 'milkshake']
3082 protocol_tests = [
3083 (['foo', 'bar'], 'foo'),
Benjamin Peterson88615022015-01-23 17:30:26 -05003084 (['bar', 'foo'], 'foo'),
Benjamin Petersoncca27322015-01-23 16:35:37 -05003085 (['milkshake'], 'milkshake'),
Benjamin Peterson88615022015-01-23 17:30:26 -05003086 (['http/3.0', 'http/4.0'], None)
Benjamin Petersoncca27322015-01-23 16:35:37 -05003087 ]
3088 for client_protocols, expected in protocol_tests:
3089 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3090 server_context.load_cert_chain(CERTFILE)
3091 server_context.set_alpn_protocols(server_protocols)
3092 client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3093 client_context.load_cert_chain(CERTFILE)
3094 client_context.set_alpn_protocols(client_protocols)
3095 stats = server_params_test(client_context, server_context,
3096 chatty=True, connectionchatty=True)
3097
3098 msg = "failed trying %s (s) and %s (c).\n" \
3099 "was expecting %s, but got %%s from the %%s" \
3100 % (str(server_protocols), str(client_protocols),
3101 str(expected))
3102 client_result = stats['client_alpn_protocol']
3103 self.assertEqual(client_result, expected, msg % (client_result, "client"))
3104 server_result = stats['server_alpn_protocols'][-1] \
3105 if len(stats['server_alpn_protocols']) else 'nothing'
3106 self.assertEqual(server_result, expected, msg % (server_result, "server"))
3107
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01003108 def test_selected_npn_protocol(self):
3109 # selected_npn_protocol() is None unless NPN is used
3110 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3111 context.load_cert_chain(CERTFILE)
3112 stats = server_params_test(context, context,
3113 chatty=True, connectionchatty=True)
3114 self.assertIs(stats['client_npn_protocol'], None)
3115
3116 @unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
3117 def test_npn_protocols(self):
3118 server_protocols = ['http/1.1', 'spdy/2']
3119 protocol_tests = [
3120 (['http/1.1', 'spdy/2'], 'http/1.1'),
3121 (['spdy/2', 'http/1.1'], 'http/1.1'),
3122 (['spdy/2', 'test'], 'spdy/2'),
3123 (['abc', 'def'], 'abc')
3124 ]
3125 for client_protocols, expected in protocol_tests:
3126 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3127 server_context.load_cert_chain(CERTFILE)
3128 server_context.set_npn_protocols(server_protocols)
3129 client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3130 client_context.load_cert_chain(CERTFILE)
3131 client_context.set_npn_protocols(client_protocols)
3132 stats = server_params_test(client_context, server_context,
3133 chatty=True, connectionchatty=True)
3134
3135 msg = "failed trying %s (s) and %s (c).\n" \
3136 "was expecting %s, but got %%s from the %%s" \
3137 % (str(server_protocols), str(client_protocols),
3138 str(expected))
3139 client_result = stats['client_npn_protocol']
3140 self.assertEqual(client_result, expected, msg % (client_result, "client"))
3141 server_result = stats['server_npn_protocols'][-1] \
3142 if len(stats['server_npn_protocols']) else 'nothing'
3143 self.assertEqual(server_result, expected, msg % (server_result, "server"))
3144
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003145 def sni_contexts(self):
3146 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3147 server_context.load_cert_chain(SIGNED_CERTFILE)
3148 other_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3149 other_context.load_cert_chain(SIGNED_CERTFILE2)
3150 client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3151 client_context.verify_mode = ssl.CERT_REQUIRED
3152 client_context.load_verify_locations(SIGNING_CA)
3153 return server_context, other_context, client_context
3154
3155 def check_common_name(self, stats, name):
3156 cert = stats['peercert']
3157 self.assertIn((('commonName', name),), cert['subject'])
3158
3159 @needs_sni
3160 def test_sni_callback(self):
3161 calls = []
3162 server_context, other_context, client_context = self.sni_contexts()
3163
3164 def servername_cb(ssl_sock, server_name, initial_context):
3165 calls.append((server_name, initial_context))
Antoine Pitrou50b24d02013-04-11 20:48:42 +02003166 if server_name is not None:
3167 ssl_sock.context = other_context
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003168 server_context.set_servername_callback(servername_cb)
3169
3170 stats = server_params_test(client_context, server_context,
3171 chatty=True,
3172 sni_name='supermessage')
3173 # The hostname was fetched properly, and the certificate was
3174 # changed for the connection.
3175 self.assertEqual(calls, [("supermessage", server_context)])
3176 # CERTFILE4 was selected
3177 self.check_common_name(stats, 'fakehostname')
3178
Antoine Pitrou50b24d02013-04-11 20:48:42 +02003179 calls = []
3180 # The callback is called with server_name=None
3181 stats = server_params_test(client_context, server_context,
3182 chatty=True,
3183 sni_name=None)
3184 self.assertEqual(calls, [(None, server_context)])
3185 self.check_common_name(stats, 'localhost')
3186
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003187 # Check disabling the callback
3188 calls = []
3189 server_context.set_servername_callback(None)
3190
3191 stats = server_params_test(client_context, server_context,
3192 chatty=True,
3193 sni_name='notfunny')
3194 # Certificate didn't change
3195 self.check_common_name(stats, 'localhost')
3196 self.assertEqual(calls, [])
3197
3198 @needs_sni
3199 def test_sni_callback_alert(self):
3200 # Returning a TLS alert is reflected to the connecting client
3201 server_context, other_context, client_context = self.sni_contexts()
3202
3203 def cb_returning_alert(ssl_sock, server_name, initial_context):
3204 return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
3205 server_context.set_servername_callback(cb_returning_alert)
3206
3207 with self.assertRaises(ssl.SSLError) as cm:
3208 stats = server_params_test(client_context, server_context,
3209 chatty=False,
3210 sni_name='supermessage')
3211 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
3212
3213 @needs_sni
3214 def test_sni_callback_raising(self):
3215 # Raising fails the connection with a TLS handshake failure alert.
3216 server_context, other_context, client_context = self.sni_contexts()
3217
3218 def cb_raising(ssl_sock, server_name, initial_context):
3219 1/0
3220 server_context.set_servername_callback(cb_raising)
3221
3222 with self.assertRaises(ssl.SSLError) as cm, \
3223 support.captured_stderr() as stderr:
3224 stats = server_params_test(client_context, server_context,
3225 chatty=False,
3226 sni_name='supermessage')
3227 self.assertEqual(cm.exception.reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
3228 self.assertIn("ZeroDivisionError", stderr.getvalue())
3229
3230 @needs_sni
3231 def test_sni_callback_wrong_return_type(self):
3232 # Returning the wrong return type terminates the TLS connection
3233 # with an internal error alert.
3234 server_context, other_context, client_context = self.sni_contexts()
3235
3236 def cb_wrong_return_type(ssl_sock, server_name, initial_context):
3237 return "foo"
3238 server_context.set_servername_callback(cb_wrong_return_type)
3239
3240 with self.assertRaises(ssl.SSLError) as cm, \
3241 support.captured_stderr() as stderr:
3242 stats = server_params_test(client_context, server_context,
3243 chatty=False,
3244 sni_name='supermessage')
3245 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
3246 self.assertIn("TypeError", stderr.getvalue())
3247
Benjamin Peterson4cb17812015-01-07 11:14:26 -06003248 def test_shared_ciphers(self):
Benjamin Petersonaacd5242015-01-07 11:42:38 -06003249 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
Benjamin Peterson15042922015-01-07 22:12:43 -06003250 server_context.load_cert_chain(SIGNED_CERTFILE)
3251 client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3252 client_context.verify_mode = ssl.CERT_REQUIRED
3253 client_context.load_verify_locations(SIGNING_CA)
Benjamin Peterson23ef9fa2015-01-07 21:21:34 -06003254 client_context.set_ciphers("RC4")
Benjamin Petersone6838e02015-01-07 20:52:40 -06003255 server_context.set_ciphers("AES:RC4")
Benjamin Peterson4cb17812015-01-07 11:14:26 -06003256 stats = server_params_test(client_context, server_context)
3257 ciphers = stats['server_shared_ciphers'][0]
3258 self.assertGreater(len(ciphers), 0)
3259 for name, tls_version, bits in ciphers:
Benjamin Peterson23ef9fa2015-01-07 21:21:34 -06003260 self.assertIn("RC4", name.split("-"))
Benjamin Peterson4cb17812015-01-07 11:14:26 -06003261
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003262 def test_read_write_after_close_raises_valuerror(self):
3263 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
3264 context.verify_mode = ssl.CERT_REQUIRED
3265 context.load_verify_locations(CERTFILE)
3266 context.load_cert_chain(CERTFILE)
3267 server = ThreadedEchoServer(context=context, chatty=False)
3268
3269 with server:
3270 s = context.wrap_socket(socket.socket())
3271 s.connect((HOST, server.port))
3272 s.close()
3273
3274 self.assertRaises(ValueError, s.read, 1024)
Antoine Pitrou28940732013-07-20 19:36:15 +02003275 self.assertRaises(ValueError, s.write, b'hello')
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003276
Giampaolo Rodola'915d1412014-06-11 03:54:30 +02003277 def test_sendfile(self):
3278 TEST_DATA = b"x" * 512
3279 with open(support.TESTFN, 'wb') as f:
3280 f.write(TEST_DATA)
3281 self.addCleanup(support.unlink, support.TESTFN)
3282 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
3283 context.verify_mode = ssl.CERT_REQUIRED
3284 context.load_verify_locations(CERTFILE)
3285 context.load_cert_chain(CERTFILE)
3286 server = ThreadedEchoServer(context=context, chatty=False)
3287 with server:
3288 with context.wrap_socket(socket.socket()) as s:
3289 s.connect((HOST, server.port))
3290 with open(support.TESTFN, 'rb') as file:
3291 s.sendfile(file)
3292 self.assertEqual(s.recv(1024), TEST_DATA)
3293
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003294
Thomas Woutersed03b412007-08-28 21:37:11 +00003295def test_main(verbose=False):
Antoine Pitrou15cee622010-08-04 16:45:21 +00003296 if support.verbose:
3297 plats = {
3298 'Linux': platform.linux_distribution,
3299 'Mac': platform.mac_ver,
3300 'Windows': platform.win32_ver,
3301 }
3302 for name, func in plats.items():
3303 plat = func()
3304 if plat and plat[0]:
3305 plat = '%s %r' % (name, plat)
3306 break
3307 else:
3308 plat = repr(platform.platform())
3309 print("test_ssl: testing with %r %r" %
3310 (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
3311 print(" under %s" % plat)
Antoine Pitroud5323212010-10-22 18:19:07 +00003312 print(" HAS_SNI = %r" % ssl.HAS_SNI)
Antoine Pitrou609ef012013-03-29 18:09:06 +01003313 print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
3314 try:
3315 print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
3316 except AttributeError:
3317 pass
Antoine Pitrou15cee622010-08-04 16:45:21 +00003318
Antoine Pitrou152efa22010-05-16 18:19:27 +00003319 for filename in [
3320 CERTFILE, SVN_PYTHON_ORG_ROOT_CERT, BYTES_CERTFILE,
3321 ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003322 SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
Antoine Pitrou152efa22010-05-16 18:19:27 +00003323 BADCERT, BADKEY, EMPTYCERT]:
3324 if not os.path.exists(filename):
3325 raise support.TestFailed("Can't read certificate file %r" % filename)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00003326
Antoine Pitroub1fdf472014-10-05 20:41:53 +02003327 tests = [ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests]
Thomas Woutersed03b412007-08-28 21:37:11 +00003328
Benjamin Petersonee8712c2008-05-20 21:35:26 +00003329 if support.is_resource_enabled('network'):
Bill Janssen6e027db2007-11-15 22:23:56 +00003330 tests.append(NetworkedTests)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02003331 tests.append(NetworkedBIOTests)
Thomas Woutersed03b412007-08-28 21:37:11 +00003332
Thomas Wouters1b7f8912007-09-19 03:06:30 +00003333 if _have_threads:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00003334 thread_info = support.threading_setup()
Antoine Pitroudb5012a2013-01-12 22:00:09 +01003335 if thread_info:
Bill Janssen6e027db2007-11-15 22:23:56 +00003336 tests.append(ThreadedTests)
Thomas Woutersed03b412007-08-28 21:37:11 +00003337
Antoine Pitrou480a1242010-04-28 21:37:09 +00003338 try:
3339 support.run_unittest(*tests)
3340 finally:
3341 if _have_threads:
3342 support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00003343
3344if __name__ == "__main__":
3345 test_main()