blob: 0cf9cc2b2eb314bf0a9199bb54234ca20b549726 [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00005from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00006import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00007import select
Thomas Woutersed03b412007-08-28 21:37:11 +00008import time
Christian Heimes9424bb42013-06-17 15:32:57 +02009import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000010import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000011import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000012import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000013import pprint
Antoine Pitrou152efa22010-05-16 18:19:27 +000014import tempfile
Antoine Pitrou803e6d62010-10-13 10:36:15 +000015import urllib.request
Thomas Woutersed03b412007-08-28 21:37:11 +000016import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000017import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000018import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000019import platform
Antoine Pitrou23df4832010-08-04 17:14:06 +000020import functools
Thomas Woutersed03b412007-08-28 21:37:11 +000021
# The ssl module is obtained through the test-support helper instead of a
# plain import statement.
# NOTE(review): presumably this skips the whole test module when ssl cannot
# be imported -- confirm against test.support.import_module.
ssl = support.import_module("ssl")

# Sorted keys of the ssl module's private protocol-name table.
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
# Host used by the tests, taken from test.support.
HOST = support.HOST
Antoine Pitrou152efa22010-05-16 18:19:27 +000026
def data_file(*name):
    """Return the path of a test data file stored next to this module.

    The path components given in *name* are joined onto the directory
    that contains this file.
    """
    here = os.path.dirname(__file__)
    return os.path.join(here, *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000029
Antoine Pitrou81564092010-10-08 23:06:24 +000030# The custom key and certificate files used in test_ssl are generated
31# using Lib/test/make_ssl_certs.py.
32# Other certificates are simply fetched from the Internet servers they
33# are meant to authenticate.
34
# Paths of the certificate/key fixtures, each available as str and (where a
# BYTES_* twin exists) as a filesystem-encoded bytes path.
CERTFILE = data_file("keycert.pem")
BYTES_CERTFILE = os.fsencode(CERTFILE)
ONLYCERT = data_file("ssl_cert.pem")
ONLYKEY = data_file("ssl_key.pem")
BYTES_ONLYCERT = os.fsencode(ONLYCERT)
BYTES_ONLYKEY = os.fsencode(ONLYKEY)
# NOTE(review): KEY_PASSWORD is presumably the passphrase of the two
# "*.passwd.pem" files -- confirm against the tests that load them.
CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
KEY_PASSWORD = "somepass"
# CA directory and two individual files inside it.
CAPATH = data_file("capath")
BYTES_CAPATH = os.fsencode(CAPATH)
CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
CAFILE_CACERT = data_file("capath", "5ed36f99.0")


# empty CRL
CRLFILE = data_file("revocation.crl")

# Two keys and certs signed by the same CA (for SNI tests)
SIGNED_CERTFILE = data_file("keycert3.pem")
SIGNED_CERTFILE2 = data_file("keycert4.pem")
SIGNING_CA = data_file("pycacert.pem")

SVN_PYTHON_ORG_ROOT_CERT = data_file("https_svn_python_org_root.pem")

EMPTYCERT = data_file("nullcert.pem")
BADCERT = data_file("badcert.pem")
# Tests that load WRONGCERT expect the OS to report ENOENT for this path.
WRONGCERT = data_file("XXXnonexisting.pem")
BADKEY = data_file("badkey.pem")
NOKIACERT = data_file("nokia.pem")
NULLBYTECERT = data_file("nullbytecert.pem")

DHFILE = data_file("dh512.pem")
BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +000069
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010070
def handle_error(prefix):
    """Report the exception currently being handled.

    The traceback of the active exception is formatted and, when the test
    run is verbose, written to stdout preceded by *prefix*.
    """
    formatted_lines = traceback.format_exception(*sys.exc_info())
    exc_format = ' '.join(formatted_lines)
    if not support.verbose:
        return
    sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +000075
def can_clear_options():
    """Return True when the OpenSSL API version is 0.9.8m or higher."""
    minimum = (0, 9, 8, 13, 15)  # 0.9.8m
    return ssl._OPENSSL_API_VERSION >= minimum
Antoine Pitroub5218772010-05-21 09:56:06 +000079
def no_sslv2_implies_sslv3_hello():
    """Return True when the OpenSSL version in use is 0.9.7h or higher."""
    minimum = (0, 9, 7, 8, 15)  # 0.9.7h
    return ssl.OPENSSL_VERSION_INFO >= minimum
83
def have_verify_flags():
    """Return True when the OpenSSL version in use is 0.9.8 or higher."""
    minimum = (0, 9, 8, 0, 15)  # 0.9.8
    return ssl.OPENSSL_VERSION_INFO >= minimum
87
def utc_offset(): #NOTE: ignore issues like #1647654
    """Return the local UTC offset in seconds (local time = utc time + offset)."""
    dst_active = time.daylight and time.localtime().tm_isdst > 0
    # time.altzone / time.timezone count seconds west of UTC, hence the sign flip.
    seconds_west = time.altzone if dst_active else time.timezone
    return -seconds_west
93
def asn1time(cert_time):
    """Return *cert_time* as the linked OpenSSL is expected to print it.

    Some versions of OpenSSL ignore seconds, see #18207.  Only for API
    version 0.9.8i is the string rewritten (seconds zeroed); for every
    other version it is returned unchanged.
    """
    # 0.9.8.i
    if ssl._OPENSSL_API_VERSION != (0, 9, 8, 9, 15):
        return cert_time

    fmt = "%b %d %H:%M:%S %Y GMT"
    parsed = datetime.datetime.strptime(cert_time, fmt).replace(second=0)
    adjusted = parsed.strftime(fmt)
    # %d adds leading zero but ASN1_TIME_print() uses leading space
    if adjusted[4] == "0":
        adjusted = adjusted[:4] + " " + adjusted[5:]
    return adjusted
Thomas Woutersed03b412007-08-28 21:37:11 +0000107
Antoine Pitrou23df4832010-08-04 17:14:06 +0000108# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
def skip_if_broken_ubuntu_ssl(func):
    """Decorator skipping *func* on one specific patched OpenSSL build.

    When the ssl module has no PROTOCOL_SSLv2 attribute, *func* is returned
    untouched.  Otherwise the wrapper first tries to create an SSLv2
    context; if that raises SSLError on OpenSSL (0, 9, 8, 15, 15) with the
    ('debian', 'squeeze/sid', '') distribution tuple, the test is skipped.
    In every other case *func* is called normally.
    """
    if not hasattr(ssl, 'PROTOCOL_SSLv2'):
        return func

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            ssl.SSLContext(ssl.PROTOCOL_SSLv2)
        except ssl.SSLError:
            affected_openssl = ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15)
            # The distribution lookup only happens for the affected version.
            if (affected_openssl and
                    platform.linux_distribution() == ('debian', 'squeeze/sid', '')):
                raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
        return func(*args, **kwargs)

    return wrapper
Antoine Pitrou23df4832010-08-04 17:14:06 +0000123
# Decorator: skip the decorated test unless ssl.HAS_SNI is true.
needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
125
Antoine Pitrou23df4832010-08-04 17:14:06 +0000126
Antoine Pitrou152efa22010-05-16 18:19:27 +0000127class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000128
Antoine Pitrou480a1242010-04-28 21:37:09 +0000129 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000130 ssl.CERT_NONE
131 ssl.CERT_OPTIONAL
132 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100133 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100134 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100135 if ssl.HAS_ECDH:
136 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100137 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
138 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000139 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100140 self.assertIn(ssl.HAS_ECDH, {True, False})
Thomas Woutersed03b412007-08-28 21:37:11 +0000141
Antoine Pitrou172f0252014-04-18 20:33:08 +0200142 def test_str_for_enums(self):
143 # Make sure that the PROTOCOL_* constants have enum-like string
144 # reprs.
Victor Stinner648b8622014-12-12 12:23:59 +0100145 proto = ssl.PROTOCOL_SSLv23
146 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_SSLv23')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200147 ctx = ssl.SSLContext(proto)
148 self.assertIs(ctx.protocol, proto)
149
Antoine Pitrou480a1242010-04-28 21:37:09 +0000150 def test_random(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000151 v = ssl.RAND_status()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000152 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000153 sys.stdout.write("\n RAND_status is %d (%s)\n"
154 % (v, (v and "sufficient randomness") or
155 "insufficient randomness"))
Victor Stinner99c8b162011-05-24 12:05:19 +0200156
157 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
158 self.assertEqual(len(data), 16)
159 self.assertEqual(is_cryptographic, v == 1)
160 if v:
161 data = ssl.RAND_bytes(16)
162 self.assertEqual(len(data), 16)
Victor Stinner2e2baa92011-05-25 11:15:16 +0200163 else:
164 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
Victor Stinner99c8b162011-05-24 12:05:19 +0200165
Victor Stinner1e81a392013-12-19 16:47:04 +0100166 # negative num is invalid
167 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
168 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
169
Victor Stinnerbeeb5122014-11-28 13:28:25 +0100170 if hasattr(ssl, 'RAND_egd'):
171 self.assertRaises(TypeError, ssl.RAND_egd, 1)
172 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000173 ssl.RAND_add("this is a random string", 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000174
    @unittest.skipUnless(os.name == 'posix', 'requires posix')
    def test_random_fork(self):
        # After a fork, parent and child must not produce the same
        # pseudo-random bytes.  The child sends its 16 bytes to the parent
        # through a pipe and the parent compares them with its own.
        status = ssl.RAND_status()
        if not status:
            self.fail("OpenSSL's PRNG has insufficient randomness")

        rfd, wfd = os.pipe()
        pid = os.fork()
        if pid == 0:
            # Child: never return into the test runner; report success or
            # failure through the exit status only.
            try:
                os.close(rfd)
                child_random = ssl.RAND_pseudo_bytes(16)[0]
                self.assertEqual(len(child_random), 16)
                os.write(wfd, child_random)
                os.close(wfd)
            except BaseException:
                os._exit(1)
            else:
                os._exit(0)
        else:
            # Parent: close the unused write end, wait for the child and
            # require a zero wait status before reading its bytes.
            os.close(wfd)
            self.addCleanup(os.close, rfd)
            _, status = os.waitpid(pid, 0)
            self.assertEqual(status, 0)

            child_random = os.read(rfd, 16)
            self.assertEqual(len(child_random), 16)
            parent_random = ssl.RAND_pseudo_bytes(16)[0]
            self.assertEqual(len(parent_random), 16)

            self.assertNotEqual(child_random, parent_random)
206
    def test_parse_cert(self):
        # Decode two certificate fixtures and compare the resulting dicts
        # field by field with the values the fixtures are known to contain.
        # note that this uses an 'unofficial' function in _ssl.c,
        # provided solely for this test, to exercise the certificate
        # parsing code
        p = ssl._ssl._test_decode_cert(CERTFILE)
        if support.verbose:
            sys.stdout.write("\n" + pprint.pformat(p) + "\n")
        self.assertEqual(p['issuer'],
                         ((('countryName', 'XY'),),
                          (('localityName', 'Castle Anthrax'),),
                          (('organizationName', 'Python Software Foundation'),),
                          (('commonName', 'localhost'),))
                        )
        # Note the next three asserts will fail if the keys are regenerated
        self.assertEqual(p['notAfter'], asn1time('Oct 5 23:01:56 2020 GMT'))
        self.assertEqual(p['notBefore'], asn1time('Oct 8 23:01:56 2010 GMT'))
        self.assertEqual(p['serialNumber'], 'D7C7381919AFC24E')
        self.assertEqual(p['subject'],
                         ((('countryName', 'XY'),),
                          (('localityName', 'Castle Anthrax'),),
                          (('organizationName', 'Python Software Foundation'),),
                          (('commonName', 'localhost'),))
                        )
        self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),))
        # Issue #13034: the subjectAltName in some certificates
        # (notably projects.developer.nokia.com:443) wasn't parsed
        p = ssl._ssl._test_decode_cert(NOKIACERT)
        if support.verbose:
            sys.stdout.write("\n" + pprint.pformat(p) + "\n")
        self.assertEqual(p['subjectAltName'],
                         (('DNS', 'projects.developer.nokia.com'),
                          ('DNS', 'projects.forum.nokia.com'))
                        )
        # extra OCSP and AIA fields
        self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
        self.assertEqual(p['caIssuers'],
                         ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
        self.assertEqual(p['crlDistributionPoints'],
                         ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000246
    def test_parse_cert_CVE_2013_4238(self):
        # Decode a certificate whose names contain embedded NUL bytes and
        # check that the decoded strings keep everything after the NUL
        # instead of being truncated.
        p = ssl._ssl._test_decode_cert(NULLBYTECERT)
        if support.verbose:
            sys.stdout.write("\n" + pprint.pformat(p) + "\n")
        subject = ((('countryName', 'US'),),
                   (('stateOrProvinceName', 'Oregon'),),
                   (('localityName', 'Beaverton'),),
                   (('organizationName', 'Python Software Foundation'),),
                   (('organizationalUnitName', 'Python Core Development'),),
                   (('commonName', 'null.python.org\x00example.org'),),
                   (('emailAddress', 'python-dev@python.org'),))
        # Subject and issuer are expected to be identical for this fixture.
        self.assertEqual(p['subject'], subject)
        self.assertEqual(p['issuer'], subject)
        if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
            san = (('DNS', 'altnull.python.org\x00example.com'),
                   ('email', 'null@python.org\x00user@example.org'),
                   ('URI', 'http://null.python.org\x00http://example.org'),
                   ('IP Address', '192.0.2.1'),
                   ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
        else:
            # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
            san = (('DNS', 'altnull.python.org\x00example.com'),
                   ('email', 'null@python.org\x00user@example.org'),
                   ('URI', 'http://null.python.org\x00http://example.org'),
                   ('IP Address', '192.0.2.1'),
                   ('IP Address', '<invalid>'))

        self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200275
Antoine Pitrou480a1242010-04-28 21:37:09 +0000276 def test_DER_to_PEM(self):
277 with open(SVN_PYTHON_ORG_ROOT_CERT, 'r') as f:
278 pem = f.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000279 d1 = ssl.PEM_cert_to_DER_cert(pem)
280 p2 = ssl.DER_cert_to_PEM_cert(d1)
281 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitrou18c913e2010-04-27 10:59:39 +0000282 self.assertEqual(d1, d2)
Antoine Pitrou9bfbe612010-04-27 22:08:08 +0000283 if not p2.startswith(ssl.PEM_HEADER + '\n'):
284 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
285 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
286 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000287
    def test_openssl_version(self):
        # Sanity-check the three version constants exported by the ssl
        # module: the numeric form, the 5-tuple and the human-readable
        # string must have the right types and agree with each other.
        n = ssl.OPENSSL_VERSION_NUMBER
        t = ssl.OPENSSL_VERSION_INFO
        s = ssl.OPENSSL_VERSION
        self.assertIsInstance(n, int)
        self.assertIsInstance(t, tuple)
        self.assertIsInstance(s, str)
        # Some sanity checks follow
        # >= 0.9
        self.assertGreaterEqual(n, 0x900000)
        # < 3.0
        self.assertLess(n, 0x30000000)
        # The tuple must unpack into exactly five fields.
        major, minor, fix, patch, status = t
        self.assertGreaterEqual(major, 0)
        self.assertLess(major, 3)
        self.assertGreaterEqual(minor, 0)
        self.assertLess(minor, 256)
        self.assertGreaterEqual(fix, 0)
        self.assertLess(fix, 256)
        self.assertGreaterEqual(patch, 0)
        self.assertLessEqual(patch, 63)
        self.assertGreaterEqual(status, 0)
        self.assertLessEqual(status, 15)
        # Version string as returned by {Open,Libre}SSL, the format might change
        # (the tuple passed as second argument is only the failure message)
        if "LibreSSL" in s:
            self.assertTrue(s.startswith("LibreSSL {:d}.{:d}".format(major, minor)),
                            (s, t, hex(n)))
        else:
            self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
                            (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000318
    @support.cpython_only
    def test_refcycle(self):
        # Issue #7943: an SSL object doesn't create reference cycles with
        # itself.
        s = socket.socket(socket.AF_INET)
        ss = ssl.wrap_socket(s)
        # Only a weak reference is kept, so dropping the last strong
        # reference below must make the object disappear immediately.
        wr = weakref.ref(ss)
        # Deleting the still-open socket is expected to emit a
        # ResourceWarning, which is captured here.
        with support.check_warnings(("", ResourceWarning)):
            del ss
            self.assertEqual(wr(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000329
Antoine Pitroua468adc2010-09-14 14:43:44 +0000330 def test_wrapped_unconnected(self):
331 # Methods on an unconnected SSLSocket propagate the original
Andrew Svetlov0832af62012-12-18 23:10:48 +0200332 # OSError raise by the underlying socket object.
Antoine Pitroua468adc2010-09-14 14:43:44 +0000333 s = socket.socket(socket.AF_INET)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100334 with ssl.wrap_socket(s) as ss:
Antoine Pitroue9bb4732013-01-12 21:56:56 +0100335 self.assertRaises(OSError, ss.recv, 1)
336 self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
337 self.assertRaises(OSError, ss.recvfrom, 1)
338 self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
339 self.assertRaises(OSError, ss.send, b'x')
340 self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
Antoine Pitroua468adc2010-09-14 14:43:44 +0000341
Antoine Pitrou40f08742010-04-24 22:04:40 +0000342 def test_timeout(self):
343 # Issue #8524: when creating an SSL socket, the timeout of the
344 # original socket should be retained.
345 for timeout in (None, 0.0, 5.0):
346 s = socket.socket(socket.AF_INET)
347 s.settimeout(timeout)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100348 with ssl.wrap_socket(s) as ss:
349 self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000350
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000351 def test_errors(self):
352 sock = socket.socket()
Ezio Melottied3a7d22010-12-01 02:32:32 +0000353 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000354 "certfile must be specified",
355 ssl.wrap_socket, sock, keyfile=CERTFILE)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000356 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000357 "certfile must be specified for server-side operations",
358 ssl.wrap_socket, sock, server_side=True)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000359 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000360 "certfile must be specified for server-side operations",
361 ssl.wrap_socket, sock, server_side=True, certfile="")
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100362 with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
363 self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
364 s.connect, (HOST, 8080))
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200365 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000366 with socket.socket() as sock:
367 ssl.wrap_socket(sock, certfile=WRONGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000368 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200369 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000370 with socket.socket() as sock:
371 ssl.wrap_socket(sock, certfile=CERTFILE, keyfile=WRONGCERT)
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000372 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200373 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000374 with socket.socket() as sock:
375 ssl.wrap_socket(sock, certfile=WRONGCERT, keyfile=WRONGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000376 self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000377
    def test_match_hostname(self):
        # Table-style test of ssl.match_hostname(): every ok() call must
        # return without raising, every fail() call must raise
        # ssl.CertificateError.
        def ok(cert, hostname):
            ssl.match_hostname(cert, hostname)
        def fail(cert, hostname):
            self.assertRaises(ssl.CertificateError,
                              ssl.match_hostname, cert, hostname)

        # -- Hostname matching --

        cert = {'subject': ((('commonName', 'example.com'),),)}
        ok(cert, 'example.com')
        ok(cert, 'ExAmple.cOm')
        fail(cert, 'www.example.com')
        fail(cert, '.example.com')
        fail(cert, 'example.org')
        fail(cert, 'exampleXcom')

        cert = {'subject': ((('commonName', '*.a.com'),),)}
        ok(cert, 'foo.a.com')
        fail(cert, 'bar.foo.a.com')
        fail(cert, 'a.com')
        fail(cert, 'Xa.com')
        fail(cert, '.a.com')

        # only match one left-most wildcard
        cert = {'subject': ((('commonName', 'f*.com'),),)}
        ok(cert, 'foo.com')
        ok(cert, 'f.com')
        fail(cert, 'bar.com')
        fail(cert, 'foo.a.com')
        fail(cert, 'bar.foo.com')

        # NULL bytes are bad, CVE-2013-4073
        cert = {'subject': ((('commonName',
                              'null.python.org\x00example.org'),),)}
        ok(cert, 'null.python.org\x00example.org') # or raise an error?
        fail(cert, 'example.org')
        fail(cert, 'null.python.org')

        # error cases with wildcards
        cert = {'subject': ((('commonName', '*.*.a.com'),),)}
        fail(cert, 'bar.foo.a.com')
        fail(cert, 'a.com')
        fail(cert, 'Xa.com')
        fail(cert, '.a.com')

        cert = {'subject': ((('commonName', 'a.*.com'),),)}
        fail(cert, 'a.foo.com')
        fail(cert, 'a..com')
        fail(cert, 'a.com')

        # wildcard doesn't match IDNA prefix 'xn--'
        idna = 'püthon.python.org'.encode("idna").decode("ascii")
        cert = {'subject': ((('commonName', idna),),)}
        ok(cert, idna)
        cert = {'subject': ((('commonName', 'x*.python.org'),),)}
        fail(cert, idna)
        cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
        fail(cert, idna)

        # wildcard in first fragment and IDNA A-labels in subsequent
        # fragments are supported.
        idna = 'www*.pythön.org'.encode("idna").decode("ascii")
        cert = {'subject': ((('commonName', idna),),)}
        ok(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
        ok(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
        fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
        fail(cert, 'pythön.org'.encode("idna").decode("ascii"))

        # Slightly fake real-world example
        cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
                'subject': ((('commonName', 'linuxfrz.org'),),),
                'subjectAltName': (('DNS', 'linuxfr.org'),
                                   ('DNS', 'linuxfr.com'),
                                   ('othername', '<unsupported>'))}
        ok(cert, 'linuxfr.org')
        ok(cert, 'linuxfr.com')
        # Not a "DNS" entry
        fail(cert, '<unsupported>')
        # When there is a subjectAltName, commonName isn't used
        fail(cert, 'linuxfrz.org')

        # A pristine real-world example
        cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
                'subject': ((('countryName', 'US'),),
                            (('stateOrProvinceName', 'California'),),
                            (('localityName', 'Mountain View'),),
                            (('organizationName', 'Google Inc'),),
                            (('commonName', 'mail.google.com'),))}
        ok(cert, 'mail.google.com')
        fail(cert, 'gmail.com')
        # Only commonName is considered
        fail(cert, 'California')

        # -- IPv4 matching --
        cert = {'subject': ((('commonName', 'example.com'),),),
                'subjectAltName': (('DNS', 'example.com'),
                                   ('IP Address', '10.11.12.13'),
                                   ('IP Address', '14.15.16.17'))}
        ok(cert, '10.11.12.13')
        ok(cert, '14.15.16.17')
        fail(cert, '14.15.16.18')
        fail(cert, 'example.net')

        # -- IPv6 matching --
        cert = {'subject': ((('commonName', 'example.com'),),),
                'subjectAltName': (('DNS', 'example.com'),
                                   ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
                                   ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
        ok(cert, '2001::cafe')
        ok(cert, '2003::baba')
        fail(cert, '2003::bebe')
        fail(cert, 'example.net')

        # -- Miscellaneous --

        # Neither commonName nor subjectAltName
        cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
                'subject': ((('countryName', 'US'),),
                            (('stateOrProvinceName', 'California'),),
                            (('localityName', 'Mountain View'),),
                            (('organizationName', 'Google Inc'),))}
        fail(cert, 'mail.google.com')

        # No DNS entry in subjectAltName but a commonName
        cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
                'subject': ((('countryName', 'US'),),
                            (('stateOrProvinceName', 'California'),),
                            (('localityName', 'Mountain View'),),
                            (('commonName', 'mail.google.com'),)),
                'subjectAltName': (('othername', 'blabla'), )}
        ok(cert, 'mail.google.com')

        # No DNS entry subjectAltName and no commonName
        cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
                'subject': ((('countryName', 'US'),),
                            (('stateOrProvinceName', 'California'),),
                            (('localityName', 'Mountain View'),),
                            (('organizationName', 'Google Inc'),)),
                'subjectAltName': (('othername', 'blabla'),)}
        fail(cert, 'google.com')

        # Empty cert / no cert
        self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
        self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')

        # Issue #17980: avoid denials of service by refusing more than one
        # wildcard per fragment.
        cert = {'subject': ((('commonName', 'a*b.com'),),)}
        ok(cert, 'axxb.com')
        cert = {'subject': ((('commonName', 'a*b.co*'),),)}
        fail(cert, 'axxb.com')
        cert = {'subject': ((('commonName', 'a*b*.com'),),)}
        with self.assertRaises(ssl.CertificateError) as cm:
            ssl.match_hostname(cert, 'axxbxxc.com')
        # The error message must name the reason for the refusal.
        self.assertIn("too many wildcards", str(cm.exception))
534
Antoine Pitroud5323212010-10-22 18:19:07 +0000535 def test_server_side(self):
536 # server_hostname doesn't work for server sockets
537 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000538 with socket.socket() as sock:
539 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
540 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000541
Antoine Pitroud6494802011-07-21 01:11:30 +0200542 def test_unknown_channel_binding(self):
543 # should raise ValueError for unknown type
544 s = socket.socket(socket.AF_INET)
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200545 s.bind(('127.0.0.1', 0))
546 s.listen()
547 c = socket.socket(socket.AF_INET)
548 c.connect(s.getsockname())
549 with ssl.wrap_socket(c, do_handshake_on_connect=False) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100550 with self.assertRaises(ValueError):
551 ss.get_channel_binding("unknown-type")
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200552 s.close()
Antoine Pitroud6494802011-07-21 01:11:30 +0200553
554 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
555 "'tls-unique' channel binding not available")
556 def test_tls_unique_channel_binding(self):
557 # unconnected should return None for known type
558 s = socket.socket(socket.AF_INET)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100559 with ssl.wrap_socket(s) as ss:
560 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200561 # the same for server-side
562 s = socket.socket(socket.AF_INET)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100563 with ssl.wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
564 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200565
    def test_dealloc_warn(self):
        # Dropping the last reference to an unclosed SSL socket must emit a
        # ResourceWarning whose message contains the socket's repr.
        ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
        # Capture the repr first; the object is gone once the warning fires.
        r = repr(ss)
        with self.assertWarns(ResourceWarning) as cm:
            ss = None
            support.gc_collect()
        self.assertIn(r, str(cm.warning.args[0]))
573
Christian Heimes6d7ad132013-06-09 18:02:55 +0200574 def test_get_default_verify_paths(self):
575 paths = ssl.get_default_verify_paths()
576 self.assertEqual(len(paths), 6)
577 self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
578
579 with support.EnvironmentVarGuard() as env:
580 env["SSL_CERT_DIR"] = CAPATH
581 env["SSL_CERT_FILE"] = CERTFILE
582 paths = ssl.get_default_verify_paths()
583 self.assertEqual(paths.cafile, CERTFILE)
584 self.assertEqual(paths.capath, CAPATH)
585
    @unittest.skipUnless(sys.platform == "win32", "Windows specific")
    def test_enum_certificates(self):
        # Check the shape of what ssl.enum_certificates() returns for the
        # "CA" and "ROOT" stores and that the serverAuth OID shows up among
        # the collected trust settings.
        self.assertTrue(ssl.enum_certificates("CA"))
        self.assertTrue(ssl.enum_certificates("ROOT"))

        # The store name is mandatory and must not be empty.
        self.assertRaises(TypeError, ssl.enum_certificates)
        self.assertRaises(WindowsError, ssl.enum_certificates, "")

        trust_oids = set()
        for storename in ("CA", "ROOT"):
            store = ssl.enum_certificates(storename)
            self.assertIsInstance(store, list)
            for element in store:
                self.assertIsInstance(element, tuple)
                self.assertEqual(len(element), 3)
                cert, enc, trust = element
                self.assertIsInstance(cert, bytes)
                self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
                self.assertIsInstance(trust, (set, bool))
                # Only set-valued trust entries contribute OIDs.
                if isinstance(trust, set):
                    trust_oids.update(trust)

        serverAuth = "1.3.6.1.5.5.7.3.1"
        self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200610
Christian Heimes46bebee2013-06-09 19:03:31 +0200611 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Christian Heimes44109d72013-11-22 01:51:30 +0100612 def test_enum_crls(self):
613 self.assertTrue(ssl.enum_crls("CA"))
614 self.assertRaises(TypeError, ssl.enum_crls)
615 self.assertRaises(WindowsError, ssl.enum_crls, "")
Christian Heimes46bebee2013-06-09 19:03:31 +0200616
Christian Heimes44109d72013-11-22 01:51:30 +0100617 crls = ssl.enum_crls("CA")
618 self.assertIsInstance(crls, list)
619 for element in crls:
620 self.assertIsInstance(element, tuple)
621 self.assertEqual(len(element), 2)
622 self.assertIsInstance(element[0], bytes)
623 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200624
Christian Heimes46bebee2013-06-09 19:03:31 +0200625
    def test_asn1object(self):
        # ssl._ASN1Object must be constructible from a dotted OID, from a
        # NID and from a name, always comparing equal to the same 4-tuple
        # (nid, shortname, longname, oid).
        expected = (129, 'serverAuth', 'TLS Web Server Authentication',
                    '1.3.6.1.5.5.7.3.1')

        val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
        self.assertEqual(val, expected)
        self.assertEqual(val.nid, 129)
        self.assertEqual(val.shortname, 'serverAuth')
        self.assertEqual(val.longname, 'TLS Web Server Authentication')
        self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
        self.assertIsInstance(val, ssl._ASN1Object)
        # The plain constructor rejects a short name.
        self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')

        val = ssl._ASN1Object.fromnid(129)
        self.assertEqual(val, expected)
        self.assertIsInstance(val, ssl._ASN1Object)
        self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
        with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
            ssl._ASN1Object.fromnid(100000)
        # Sweep the low NID range: unknown NIDs may raise ValueError, known
        # ones must expose correctly typed attributes.
        for i in range(1000):
            try:
                obj = ssl._ASN1Object.fromnid(i)
            except ValueError:
                pass
            else:
                self.assertIsInstance(obj.nid, int)
                self.assertIsInstance(obj.shortname, str)
                self.assertIsInstance(obj.longname, str)
                self.assertIsInstance(obj.oid, (str, type(None)))

        # fromname() accepts the long name, the short name and the OID.
        val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
        self.assertEqual(val, expected)
        self.assertIsInstance(val, ssl._ASN1Object)
        self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
        self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
                         expected)
        # The lower-cased short name must be rejected.
        with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
            ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100664
Christian Heimes72d28502013-11-23 13:56:58 +0100665 def test_purpose_enum(self):
666 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
667 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
668 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
669 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
670 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
671 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
672 '1.3.6.1.5.5.7.3.1')
673
674 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
675 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
676 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
677 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
678 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
679 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
680 '1.3.6.1.5.5.7.3.2')
681
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100682 def test_unsupported_dtls(self):
683 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
684 self.addCleanup(s.close)
685 with self.assertRaises(NotImplementedError) as cx:
686 ssl.wrap_socket(s, cert_reqs=ssl.CERT_NONE)
687 self.assertEqual(str(cx.exception), "only stream sockets are supported")
688 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
689 with self.assertRaises(NotImplementedError) as cx:
690 ctx.wrap_socket(s)
691 self.assertEqual(str(cx.exception), "only stream sockets are supported")
692
def cert_time_ok(self, timestring, timestamp):
    # Helper: *timestring* must convert to exactly *timestamp*.
    seconds = ssl.cert_time_to_seconds(timestring)
    self.assertEqual(seconds, timestamp)
695
def cert_time_fail(self, timestring):
    # Helper: converting *timestring* must raise ValueError.
    self.assertRaises(ValueError, ssl.cert_time_to_seconds, timestring)
699
@unittest.skipUnless(utc_offset(),
                     'local time needs to be different from UTC')
def test_cert_time_to_seconds_timezone(self):
    # Issue #19940: ssl.cert_time_to_seconds() returns wrong
    # results if local timezone is not UTC
    known_conversions = (
        ("May 9 00:00:00 2007 GMT", 1178668800.0),
        ("Jan 5 09:34:43 2018 GMT", 1515144883.0),
    )
    for timestring, timestamp in known_conversions:
        self.cert_time_ok(timestring, timestamp)
707
708 def test_cert_time_to_seconds(self):
709 timestring = "Jan 5 09:34:43 2018 GMT"
710 ts = 1515144883.0
711 self.cert_time_ok(timestring, ts)
712 # accept keyword parameter, assert its name
713 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
714 # accept both %e and %d (space or zero generated by strftime)
715 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
716 # case-insensitive
717 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
718 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
719 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
720 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
721 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
722 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
723 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
724 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
725
726 newyear_ts = 1230768000.0
727 # leap seconds
728 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
729 # same timestamp
730 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
731
732 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
733 # allow 60th second (even if it is not a leap second)
734 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
735 # allow 2nd leap second for compatibility with time.strptime()
736 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
737 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
738
739 # no special treatement for the special value:
740 # 99991231235959Z (rfc 5280)
741 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
742
@support.run_with_locale('LC_ALL', '')
def test_cert_time_to_seconds_locale(self):
    # `cert_time_to_seconds()` should be locale independent

    # Abbreviated month name for month 2 as formatted by time.strftime()
    # under the locale selected by the decorator.
    february_struct = (1, 2, 3, 4, 5, 6, 0, 0, 0)
    local_february = time.strftime('%b', february_struct)

    if local_february.lower() == 'feb':
        self.skipTest("locale-specific month name needs to be "
                      "different from C locale")

    # locale-independent
    self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
    self.cert_time_fail(local_february + " 9 00:00:00 2007 GMT")
757
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100758
class ContextTests(unittest.TestCase):
    # Tests for ssl.SSLContext: construction, option/verification
    # attributes, certificate and key loading, and the context factories.

    @skip_if_broken_ubuntu_ssl
    def test_constructor(self):
        # Every known protocol constant is accepted.
        for proto in PROTOCOLS:
            ssl.SSLContext(proto)
        # The protocol argument is mandatory and must be a known value.
        with self.assertRaises(TypeError):
            ssl.SSLContext()
        for bogus in (-1, 42):
            with self.assertRaises(ValueError):
                ssl.SSLContext(bogus)

    @skip_if_broken_ubuntu_ssl
    def test_protocol(self):
        # The protocol attribute reflects the constructor argument.
        for proto in PROTOCOLS:
            self.assertEqual(ssl.SSLContext(proto).protocol, proto)

    def test_ciphers(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        for spec in ("ALL", "DEFAULT"):
            ctx.set_ciphers(spec)
        with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
            ctx.set_ciphers("^$:,;?*'dorothyx")

    @skip_if_broken_ubuntu_ssl
    def test_options(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # OP_ALL | OP_NO_SSLv2 is the default value
        default_options = ssl.OP_ALL | ssl.OP_NO_SSLv2
        self.assertEqual(default_options, ctx.options)
        ctx.options |= ssl.OP_NO_SSLv3
        self.assertEqual(default_options | ssl.OP_NO_SSLv3, ctx.options)
        if not can_clear_options():
            # Options cannot be cleared here: resetting them is an error.
            with self.assertRaises(ValueError):
                ctx.options = 0
            return
        ctx.options = (ctx.options & ~ssl.OP_NO_SSLv2) | ssl.OP_NO_TLSv1
        self.assertEqual(ssl.OP_ALL | ssl.OP_NO_TLSv1 | ssl.OP_NO_SSLv3,
                         ctx.options)
        ctx.options = 0
        self.assertEqual(0, ctx.options)

    def test_verify_mode(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # Default value
        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
        # Each valid mode round-trips through the attribute.
        for mode in (ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED, ssl.CERT_NONE):
            ctx.verify_mode = mode
            self.assertEqual(ctx.verify_mode, mode)
        with self.assertRaises(TypeError):
            ctx.verify_mode = None
        with self.assertRaises(ValueError):
            ctx.verify_mode = 42

    @unittest.skipUnless(have_verify_flags(),
                         "verify_flags need OpenSSL > 0.9.8")
    def test_verify_flags(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # default value
        trusted_first = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
        self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | trusted_first)
        round_trips = (
            ssl.VERIFY_CRL_CHECK_LEAF,
            ssl.VERIFY_CRL_CHECK_CHAIN,
            ssl.VERIFY_DEFAULT,
            # supports any value
            ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT,
        )
        for flags in round_trips:
            ctx.verify_flags = flags
            self.assertEqual(ctx.verify_flags, flags)
        with self.assertRaises(TypeError):
            ctx.verify_flags = None

    def test_load_cert_chain(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # Combined key and cert in a single file
        ctx.load_cert_chain(CERTFILE, keyfile=None)
        ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
        with self.assertRaises(TypeError):
            ctx.load_cert_chain(keyfile=CERTFILE)
        with self.assertRaises(OSError) as cm:
            ctx.load_cert_chain(WRONGCERT)
        self.assertEqual(cm.exception.errno, errno.ENOENT)
        for broken in (BADCERT, EMPTYCERT):
            with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
                ctx.load_cert_chain(broken)

        # Separate key and cert
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_cert_chain(ONLYCERT, ONLYKEY)
        ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
        ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
            ctx.load_cert_chain(ONLYCERT)
        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
            ctx.load_cert_chain(ONLYKEY)
        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
            ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)

        # Mismatching key and cert
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
            ctx.load_cert_chain(SVN_PYTHON_ORG_ROOT_CERT, ONLYKEY)

        # Password protected key and cert: the password may be given as
        # str, bytes or bytearray, by keyword or positionally.
        password_forms = (
            KEY_PASSWORD,
            KEY_PASSWORD.encode(),
            bytearray(KEY_PASSWORD.encode()),
        )
        for password in password_forms:
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=password)
        for password in password_forms:
            ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, password)
        with self.assertRaisesRegex(TypeError, "should be a string"):
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
        with self.assertRaises(ssl.SSLError):
            ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
        with self.assertRaisesRegex(ValueError, "cannot be longer"):
            # openssl has a fixed limit on the password buffer.
            # PEM_BUFSIZE is generally set to 1kb.
            # Return a string larger than this.
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)

        # Password callback
        def getpass_unicode():
            return KEY_PASSWORD

        def getpass_bytes():
            return KEY_PASSWORD.encode()

        def getpass_bytearray():
            return bytearray(KEY_PASSWORD.encode())

        def getpass_badpass():
            return "badpass"

        def getpass_huge():
            return b'a' * (1024 * 1024)

        def getpass_bad_type():
            return 9

        def getpass_exception():
            raise Exception('getpass error')

        class GetPassCallable:
            def __call__(self):
                return KEY_PASSWORD

            def getpass(self):
                return KEY_PASSWORD

        working_callbacks = (
            getpass_unicode,
            getpass_bytes,
            getpass_bytearray,
            GetPassCallable(),
            GetPassCallable().getpass,
        )
        for callback in working_callbacks:
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=callback)
        with self.assertRaises(ssl.SSLError):
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
        with self.assertRaisesRegex(ValueError, "cannot be longer"):
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
        with self.assertRaisesRegex(TypeError, "must return a string"):
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
        with self.assertRaisesRegex(Exception, "getpass error"):
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
        # Make sure the password function isn't called if it isn't needed
        ctx.load_cert_chain(CERTFILE, password=getpass_exception)

    def test_load_verify_locations(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # str and bytes paths, positionally and by keyword
        for cafile in (CERTFILE, BYTES_CERTFILE):
            ctx.load_verify_locations(cafile)
            ctx.load_verify_locations(cafile=cafile, capath=None)
        with self.assertRaises(TypeError):
            ctx.load_verify_locations()
        with self.assertRaises(TypeError):
            ctx.load_verify_locations(None, None, None)
        with self.assertRaises(OSError) as cm:
            ctx.load_verify_locations(WRONGCERT)
        self.assertEqual(cm.exception.errno, errno.ENOENT)
        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
            ctx.load_verify_locations(BADCERT)
        ctx.load_verify_locations(CERTFILE, CAPATH)
        ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)

        # Issue #10989: crash if the second argument type is invalid
        with self.assertRaises(TypeError):
            ctx.load_verify_locations(None, True)

    def test_load_verify_cadata(self):
        # test cadata
        with open(CAFILE_CACERT) as f:
            cacert_pem = f.read()
        cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
        with open(CAFILE_NEURONIO) as f:
            neuronio_pem = f.read()
        neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)

        def ca_count(context):
            # number of CA certificates reported by the context's store
            return context.cert_store_stats()["x509_ca"]

        # test PEM
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertEqual(ca_count(ctx), 0)
        ctx.load_verify_locations(cadata=cacert_pem)
        self.assertEqual(ca_count(ctx), 1)
        ctx.load_verify_locations(cadata=neuronio_pem)
        self.assertEqual(ca_count(ctx), 2)
        # cert already in hash table
        ctx.load_verify_locations(cadata=neuronio_pem)
        self.assertEqual(ca_count(ctx), 2)

        # combined
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        combined = "\n".join((cacert_pem, neuronio_pem))
        ctx.load_verify_locations(cadata=combined)
        self.assertEqual(ca_count(ctx), 2)

        # with junk around the certs
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        pieces = ["head", cacert_pem, "other", neuronio_pem, "again",
                  neuronio_pem, "tail"]
        ctx.load_verify_locations(cadata="\n".join(pieces))
        self.assertEqual(ca_count(ctx), 2)

        # test DER
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_verify_locations(cadata=cacert_der)
        ctx.load_verify_locations(cadata=neuronio_der)
        self.assertEqual(ca_count(ctx), 2)
        # cert already in hash table
        ctx.load_verify_locations(cadata=cacert_der)
        self.assertEqual(ca_count(ctx), 2)

        # combined
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        combined = b"".join((cacert_der, neuronio_der))
        ctx.load_verify_locations(cadata=combined)
        self.assertEqual(ca_count(ctx), 2)

        # error cases
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        with self.assertRaises(TypeError):
            ctx.load_verify_locations(cadata=object)

        with self.assertRaisesRegex(ssl.SSLError, "no start line"):
            ctx.load_verify_locations(cadata="broken")
        with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
            ctx.load_verify_locations(cadata=b"broken")

    def test_load_dh_params(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_dh_params(DHFILE)
        if os.name != 'nt':
            ctx.load_dh_params(BYTES_DHFILE)
        with self.assertRaises(TypeError):
            ctx.load_dh_params()
        with self.assertRaises(TypeError):
            ctx.load_dh_params(None)
        with self.assertRaises(FileNotFoundError) as cm:
            ctx.load_dh_params(WRONGCERT)
        self.assertEqual(cm.exception.errno, errno.ENOENT)
        # Loading CERTFILE as DH parameters must fail with an SSLError.
        with self.assertRaises(ssl.SSLError):
            ctx.load_dh_params(CERTFILE)

    @skip_if_broken_ubuntu_ssl
    def test_session_stats(self):
        # A fresh context reports zero for every session counter.
        counters = ('number', 'connect', 'connect_good',
                    'connect_renegotiate', 'accept', 'accept_good',
                    'accept_renegotiate', 'hits', 'misses', 'timeouts',
                    'cache_full')
        pristine = dict.fromkeys(counters, 0)
        for proto in PROTOCOLS:
            self.assertEqual(ssl.SSLContext(proto).session_stats(), pristine)

    def test_set_default_verify_paths(self):
        # There's not much we can do to test that it acts as expected,
        # so just check it doesn't crash or raise an exception.
        ssl.SSLContext(ssl.PROTOCOL_TLSv1).set_default_verify_paths()

    @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
    def test_set_ecdh_curve(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # The curve name may be str or bytes.
        for curve in ("prime256v1", b"prime256v1"):
            ctx.set_ecdh_curve(curve)
        with self.assertRaises(TypeError):
            ctx.set_ecdh_curve()
        with self.assertRaises(TypeError):
            ctx.set_ecdh_curve(None)
        for unknown in ("foo", b"foo"):
            with self.assertRaises(ValueError):
                ctx.set_ecdh_curve(unknown)

    @needs_sni
    def test_sni_callback(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)

        # set_servername_callback expects a callable, or None
        with self.assertRaises(TypeError):
            ctx.set_servername_callback()
        for not_callable in (4, "", ctx):
            with self.assertRaises(TypeError):
                ctx.set_servername_callback(not_callable)

        def dummycallback(sock, servername, ctx):
            pass

        ctx.set_servername_callback(None)
        ctx.set_servername_callback(dummycallback)

    @needs_sni
    def test_sni_callback_refcycle(self):
        # Reference cycles through the servername callback are detected
        # and cleared.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)

        # The default argument makes the callback reference the context
        # that in turn stores the callback.
        def dummycallback(sock, servername, ctx, cycle=ctx):
            pass

        ctx.set_servername_callback(dummycallback)
        wr = weakref.ref(ctx)
        del ctx, dummycallback
        gc.collect()
        self.assertIs(wr(), None)

    def test_cert_store_stats(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        empty_store = {'x509_ca': 0, 'crl': 0, 'x509': 0}
        self.assertEqual(ctx.cert_store_stats(), empty_store)
        # load_cert_chain() leaves the store counters untouched
        ctx.load_cert_chain(CERTFILE)
        self.assertEqual(ctx.cert_store_stats(), empty_store)
        ctx.load_verify_locations(CERTFILE)
        self.assertEqual(ctx.cert_store_stats(),
                         {'x509_ca': 0, 'crl': 0, 'x509': 1})
        ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
        self.assertEqual(ctx.cert_store_stats(),
                         {'x509_ca': 1, 'crl': 0, 'x509': 2})

    def test_get_ca_certs(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertEqual(ctx.get_ca_certs(), [])
        # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
        ctx.load_verify_locations(CERTFILE)
        self.assertEqual(ctx.get_ca_certs(), [])
        # but SVN_PYTHON_ORG_ROOT_CERT is a CA cert
        ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
        # The expected issuer and subject are the same name.
        root_name = (
            (('organizationName', 'Root CA'),),
            (('organizationalUnitName', 'http://www.cacert.org'),),
            (('commonName', 'CA Cert Signing Authority'),),
            (('emailAddress', 'support@cacert.org'),),
        )
        expected_cert = {
            'issuer': root_name,
            'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
            'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
            'serialNumber': '00',
            'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
            'subject': root_name,
            'version': 3,
        }
        self.assertEqual(ctx.get_ca_certs(), [expected_cert])

        with open(SVN_PYTHON_ORG_ROOT_CERT) as f:
            pem = f.read()
        der = ssl.PEM_cert_to_DER_cert(pem)
        self.assertEqual(ctx.get_ca_certs(True), [der])

    def test_load_default_certs(self):
        # No purpose argument.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_default_certs()

        # Explicit SERVER_AUTH followed by a second call with no argument.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
        ctx.load_default_certs()

        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)

        # None and a plain string are rejected as purpose.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        for bad_purpose in (None, 'SERVER_AUTH'):
            with self.assertRaises(TypeError):
                ctx.load_default_certs(bad_purpose)

    @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
    def test_load_default_certs_env(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        with support.EnvironmentVarGuard() as env:
            env["SSL_CERT_DIR"] = CAPATH
            env["SSL_CERT_FILE"] = CERTFILE
            ctx.load_default_certs()
            expected = {"crl": 0, "x509": 1, "x509_ca": 0}
            self.assertEqual(ctx.cert_store_stats(), expected)

    @unittest.skipUnless(sys.platform == "win32", "Windows specific")
    def test_load_default_certs_env_windows(self):
        # Baseline: store statistics without the environment variables.
        baseline_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        baseline_ctx.load_default_certs()
        stats = baseline_ctx.cert_store_stats()

        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        with support.EnvironmentVarGuard() as env:
            env["SSL_CERT_DIR"] = CAPATH
            env["SSL_CERT_FILE"] = CERTFILE
            ctx.load_default_certs()
            # exactly one more certificate than the baseline is expected
            stats["x509"] += 1
            self.assertEqual(ctx.cert_store_stats(), stats)

    def test_create_default_context(self):
        def assert_option_set(context, name):
            # Options missing from this ssl module fall back to 0, which
            # makes the check trivially true.
            flag = getattr(ssl, name, 0)
            self.assertEqual(context.options & flag, flag)

        ctx = ssl.create_default_context()
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
        self.assertTrue(ctx.check_hostname)
        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
        assert_option_set(ctx, "OP_NO_COMPRESSION")

        with open(SIGNING_CA) as f:
            cadata = f.read()
        ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
                                         cadata=cadata)
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
        assert_option_set(ctx, "OP_NO_COMPRESSION")

        ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
        for option_name in ("OP_NO_COMPRESSION", "OP_SINGLE_DH_USE",
                            "OP_SINGLE_ECDH_USE"):
            assert_option_set(ctx, option_name)

    def test__create_stdlib_context(self):
        no_sslv2 = ssl.OP_NO_SSLv2

        ctx = ssl._create_stdlib_context()
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
        self.assertFalse(ctx.check_hostname)
        self.assertEqual(ctx.options & no_sslv2, no_sslv2)

        ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
        self.assertEqual(ctx.options & no_sslv2, no_sslv2)

        ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
                                         cert_reqs=ssl.CERT_REQUIRED,
                                         check_hostname=True)
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
        self.assertTrue(ctx.check_hostname)
        self.assertEqual(ctx.options & no_sslv2, no_sslv2)

        ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
        self.assertEqual(ctx.options & no_sslv2, no_sslv2)

    def test_check_hostname(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertFalse(ctx.check_hostname)

        # Requires CERT_REQUIRED or CERT_OPTIONAL
        with self.assertRaises(ValueError):
            ctx.check_hostname = True
        ctx.verify_mode = ssl.CERT_REQUIRED
        # raising the verify mode does not switch hostname checking on
        self.assertFalse(ctx.check_hostname)
        ctx.check_hostname = True
        self.assertTrue(ctx.check_hostname)

        ctx.verify_mode = ssl.CERT_OPTIONAL
        ctx.check_hostname = True
        self.assertTrue(ctx.check_hostname)

        # Cannot set CERT_NONE with check_hostname enabled
        with self.assertRaises(ValueError):
            ctx.verify_mode = ssl.CERT_NONE
        ctx.check_hostname = False
        self.assertFalse(ctx.check_hostname)
1237
Antoine Pitrou152efa22010-05-16 18:19:27 +00001238
class SSLErrorTests(unittest.TestCase):
    # Tests for the attributes and string form of ssl.SSLError and its
    # subclasses.

    def test_str(self):
        # The str() of a SSLError doesn't include the errno;
        # same for a subclass
        for exc_type in (ssl.SSLError, ssl.SSLZeroReturnError):
            err = exc_type(1, "foo")
            self.assertEqual(str(err), "foo")
            self.assertEqual(err.errno, 1)

    def test_lib_reason(self):
        # Test the library and reason attributes
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        with self.assertRaises(ssl.SSLError) as cm:
            ctx.load_dh_params(CERTFILE)
        error = cm.exception
        self.assertEqual(error.library, 'PEM')
        self.assertEqual(error.reason, 'NO_START_LINE')
        text = str(error)
        self.assertTrue(
            text.startswith("[PEM: NO_START_LINE] no start line"), text)

    def test_subclass(self):
        # Check that the appropriate SSLError subclass is raised
        # (this only tests one of them)
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        with socket.socket() as listener:
            listener.bind(("127.0.0.1", 0))
            listener.listen()
            client = socket.socket()
            client.connect(listener.getsockname())
            client.setblocking(False)
            # The listener never accepts or replies, so the non-blocking
            # handshake is expected to report that it wants to read.
            with ctx.wrap_socket(client, False,
                                 do_handshake_on_connect=False) as tls:
                with self.assertRaises(ssl.SSLWantReadError) as cm:
                    tls.do_handshake()
                text = str(cm.exception)
                self.assertTrue(
                    text.startswith("The operation did not complete (read)"),
                    text)
                # For compatibility
                self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
1278
1279
class MemoryBIOTests(unittest.TestCase):
    # Tests for ssl.MemoryBIO: reading, writing, EOF signalling, the
    # pending counter and accepted argument types.

    def test_read_write(self):
        bio = ssl.MemoryBIO()
        # A single write is read back once, then the buffer is empty.
        bio.write(b'foo')
        self.assertEqual(bio.read(), b'foo')
        self.assertEqual(bio.read(), b'')
        # Consecutive writes are concatenated.
        for chunk in (b'foo', b'bar'):
            bio.write(chunk)
        self.assertEqual(bio.read(), b'foobar')
        self.assertEqual(bio.read(), b'')
        # Sized reads consume the buffer piecewise.
        bio.write(b'baz')
        for size, expected in ((2, b'ba'), (1, b'z'), (1, b'')):
            self.assertEqual(bio.read(size), expected)

    def test_eof(self):
        bio = ssl.MemoryBIO()
        self.assertFalse(bio.eof)
        self.assertEqual(bio.read(), b'')
        self.assertFalse(bio.eof)
        bio.write(b'foo')
        self.assertFalse(bio.eof)
        bio.write_eof()
        # eof stays false while written data is still unread
        self.assertFalse(bio.eof)
        self.assertEqual(bio.read(2), b'fo')
        self.assertFalse(bio.eof)
        self.assertEqual(bio.read(1), b'o')
        self.assertTrue(bio.eof)
        self.assertEqual(bio.read(), b'')
        self.assertTrue(bio.eof)

    def test_pending(self):
        bio = ssl.MemoryBIO()
        self.assertEqual(bio.pending, 0)
        bio.write(b'foo')
        self.assertEqual(bio.pending, 3)
        # Reading one byte at a time counts down ...
        for remaining in (2, 1, 0):
            bio.read(1)
            self.assertEqual(bio.pending, remaining)
        # ... and writing one byte at a time counts up.
        for buffered in (1, 2, 3):
            bio.write(b'x')
            self.assertEqual(bio.pending, buffered)
        bio.read()
        self.assertEqual(bio.pending, 0)

    def test_buffer_types(self):
        # bytes, bytearray and memoryview are all accepted by write().
        bio = ssl.MemoryBIO()
        payloads = (
            (b'foo', b'foo'),
            (bytearray(b'bar'), b'bar'),
            (memoryview(b'baz'), b'baz'),
        )
        for payload, expected in payloads:
            bio.write(payload)
            self.assertEqual(bio.read(), expected)

    def test_error_types(self):
        # write() rejects str, None, bool and int arguments.
        bio = ssl.MemoryBIO()
        for invalid in ('foo', None, True, 1):
            with self.assertRaises(TypeError):
                bio.write(invalid)
1341
1342
class NetworkedTests(unittest.TestCase):
    """Client-side SSL tests that connect to real hosts on the Internet.

    Every test body runs inside support.transient_internet(<host>)
    (presumably so that transient network problems are not reported as
    test failures -- see test.support to confirm).

    All SSL sockets are managed with ``with`` blocks (or try/finally) so
    that they are closed even when connect(), the handshake or an
    assertion fails.
    """

    def test_connect(self):
        with support.transient_internet("svn.python.org"):
            # Without verification the peer certificate dict is empty.
            with ssl.wrap_socket(socket.socket(socket.AF_INET),
                                 cert_reqs=ssl.CERT_NONE) as s:
                s.connect(("svn.python.org", 443))
                self.assertEqual({}, s.getpeercert())

            # this should fail because we have no verification certs
            with ssl.wrap_socket(socket.socket(socket.AF_INET),
                                 cert_reqs=ssl.CERT_REQUIRED) as s:
                self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
                                       s.connect, ("svn.python.org", 443))

            # this should succeed because we specify the root cert
            with ssl.wrap_socket(socket.socket(socket.AF_INET),
                                 cert_reqs=ssl.CERT_REQUIRED,
                                 ca_certs=SVN_PYTHON_ORG_ROOT_CERT) as s:
                s.connect(("svn.python.org", 443))
                self.assertTrue(s.getpeercert())

    def test_connect_ex(self):
        # Issue #11326: check connect_ex() implementation
        with support.transient_internet("svn.python.org"):
            with ssl.wrap_socket(socket.socket(socket.AF_INET),
                                 cert_reqs=ssl.CERT_REQUIRED,
                                 ca_certs=SVN_PYTHON_ORG_ROOT_CERT) as s:
                self.assertEqual(0, s.connect_ex(("svn.python.org", 443)))
                self.assertTrue(s.getpeercert())

    def test_non_blocking_connect_ex(self):
        # Issue #11326: non-blocking connect_ex() should allow handshake
        # to proceed after the socket gets ready.
        with support.transient_internet("svn.python.org"):
            with ssl.wrap_socket(socket.socket(socket.AF_INET),
                                 cert_reqs=ssl.CERT_REQUIRED,
                                 ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
                                 do_handshake_on_connect=False) as s:
                s.setblocking(False)
                rc = s.connect_ex(('svn.python.org', 443))
                # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
                self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
                # Wait for connect to finish
                select.select([], [s], [], 5.0)
                # Non-blocking handshake
                while True:
                    try:
                        s.do_handshake()
                        break
                    except ssl.SSLWantReadError:
                        select.select([s], [], [], 5.0)
                    except ssl.SSLWantWriteError:
                        select.select([], [s], [], 5.0)
                # SSL established
                self.assertTrue(s.getpeercert())

    def test_timeout_connect_ex(self):
        # Issue #12065: on a timeout, connect_ex() should return the original
        # errno (mimicking the behaviour of non-SSL sockets).
        with support.transient_internet("svn.python.org"):
            with ssl.wrap_socket(socket.socket(socket.AF_INET),
                                 cert_reqs=ssl.CERT_REQUIRED,
                                 ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
                                 do_handshake_on_connect=False) as s:
                s.settimeout(0.0000001)
                rc = s.connect_ex(('svn.python.org', 443))
                if rc == 0:
                    self.skipTest("svn.python.org responded too quickly")
                self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))

    def test_connect_ex_error(self):
        # connect_ex() to a port that is not expected to accept
        # connections must return an errno instead of raising.
        with support.transient_internet("svn.python.org"):
            with ssl.wrap_socket(socket.socket(socket.AF_INET),
                                 cert_reqs=ssl.CERT_REQUIRED,
                                 ca_certs=SVN_PYTHON_ORG_ROOT_CERT) as s:
                rc = s.connect_ex(("svn.python.org", 444))
                # Issue #19919: Windows machines or VMs hosted on Windows
                # machines sometimes return EWOULDBLOCK.
                self.assertIn(rc, (errno.ECONNREFUSED, errno.EWOULDBLOCK))

    def test_connect_with_context(self):
        with support.transient_internet("svn.python.org"):
            # Same as test_connect, but with a separately created context
            ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
            with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
                s.connect(("svn.python.org", 443))
                self.assertEqual({}, s.getpeercert())
            # Same with a server hostname
            with ctx.wrap_socket(socket.socket(socket.AF_INET),
                                 server_hostname="svn.python.org") as s:
                s.connect(("svn.python.org", 443))
            # This should fail because we have no verification certs
            ctx.verify_mode = ssl.CERT_REQUIRED
            with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
                self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
                                       s.connect, ("svn.python.org", 443))
            # This should succeed because we specify the root cert
            ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
            with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
                s.connect(("svn.python.org", 443))
                cert = s.getpeercert()
                self.assertTrue(cert)

    def test_connect_capath(self):
        # Verify server certificates using the `capath` argument
        # NOTE: the subject hashing algorithm has been changed between
        # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
        # contain both versions of each certificate (same content, different
        # filename) for this test to be portable across OpenSSL releases.
        with support.transient_internet("svn.python.org"):
            ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
            ctx.verify_mode = ssl.CERT_REQUIRED
            ctx.load_verify_locations(capath=CAPATH)
            with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
                s.connect(("svn.python.org", 443))
                cert = s.getpeercert()
                self.assertTrue(cert)
            # Same with a bytes `capath` argument
            ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
            ctx.verify_mode = ssl.CERT_REQUIRED
            ctx.load_verify_locations(capath=BYTES_CAPATH)
            with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
                s.connect(("svn.python.org", 443))
                cert = s.getpeercert()
                self.assertTrue(cert)

    def test_connect_cadata(self):
        # Verify the server certificate with CA data passed in memory,
        # first as PEM text and then as DER bytes.
        with open(CAFILE_CACERT) as f:
            pem = f.read()
        der = ssl.PEM_cert_to_DER_cert(pem)
        with support.transient_internet("svn.python.org"):
            ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
            ctx.verify_mode = ssl.CERT_REQUIRED
            ctx.load_verify_locations(cadata=pem)
            with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
                s.connect(("svn.python.org", 443))
                cert = s.getpeercert()
                self.assertTrue(cert)

            # same with DER
            ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
            ctx.verify_mode = ssl.CERT_REQUIRED
            ctx.load_verify_locations(cadata=der)
            with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
                s.connect(("svn.python.org", 443))
                cert = s.getpeercert()
                self.assertTrue(cert)

    @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
    def test_makefile_close(self):
        # Issue #5238: creating a file-like object with makefile() shouldn't
        # delay closing the underlying "real socket" (here tested with its
        # file descriptor, hence skipping the test under Windows).
        with support.transient_internet("svn.python.org"):
            ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
            try:
                ss.connect(("svn.python.org", 443))
                fd = ss.fileno()
                f = ss.makefile()
                f.close()
                # The fd is still open
                os.read(fd, 0)
                # Closing the SSL socket should close the fd too
                ss.close()
                gc.collect()
                with self.assertRaises(OSError) as e:
                    os.read(fd, 0)
                self.assertEqual(e.exception.errno, errno.EBADF)
            finally:
                # Only matters when something above failed before the
                # explicit close(); a second close() is otherwise redundant.
                ss.close()

    def test_non_blocking_handshake(self):
        with support.transient_internet("svn.python.org"):
            # Both the plain and the wrapped socket are released even if
            # connect(), wrap_socket() or the handshake raises.
            with socket.socket(socket.AF_INET) as sock:
                sock.connect(("svn.python.org", 443))
                sock.setblocking(False)
                with ssl.wrap_socket(sock,
                                     cert_reqs=ssl.CERT_NONE,
                                     do_handshake_on_connect=False) as s:
                    count = 0
                    while True:
                        try:
                            count += 1
                            s.do_handshake()
                            break
                        except ssl.SSLWantReadError:
                            select.select([s], [], [])
                        except ssl.SSLWantWriteError:
                            select.select([], [s], [])
            if support.verbose:
                sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)

    def test_get_server_certificate(self):
        def _test_get_server_certificate(host, port, cert=None):
            # Fetch the certificate three ways: unverified, verified
            # against an unrelated CA file (must fail), and verified
            # against `cert` (must succeed).
            with support.transient_internet(host):
                pem = ssl.get_server_certificate((host, port))
                if not pem:
                    self.fail("No server certificate on %s:%s!" % (host, port))

                try:
                    pem = ssl.get_server_certificate((host, port),
                                                     ca_certs=CERTFILE)
                except ssl.SSLError as x:
                    #should fail
                    if support.verbose:
                        sys.stdout.write("%s\n" % x)
                else:
                    self.fail("Got server certificate %s for %s:%s!" % (pem, host, port))

                pem = ssl.get_server_certificate((host, port),
                                                 ca_certs=cert)
                if not pem:
                    self.fail("No server certificate on %s:%s!" % (host, port))
                if support.verbose:
                    sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))

        _test_get_server_certificate('svn.python.org', 443, SVN_PYTHON_ORG_ROOT_CERT)
        if support.IPV6_ENABLED:
            _test_get_server_certificate('ipv6.google.com', 443)

    def test_ciphers(self):
        remote = ("svn.python.org", 443)
        with support.transient_internet(remote[0]):
            with ssl.wrap_socket(socket.socket(socket.AF_INET),
                                 cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
                s.connect(remote)
            with ssl.wrap_socket(socket.socket(socket.AF_INET),
                                 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
                s.connect(remote)
            # Error checking can happen at instantiation or when connecting
            with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
                with socket.socket(socket.AF_INET) as sock:
                    s = ssl.wrap_socket(sock,
                                        cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
                    s.connect(remote)

    def test_algorithms(self):
        # Issue #8484: all algorithms should be available when verifying a
        # certificate.
        # SHA256 was added in OpenSSL 0.9.8
        if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15):
            self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION)
        # sha256.tbs-internet.com needs SNI to use the correct certificate
        if not ssl.HAS_SNI:
            self.skipTest("SNI needed for this test")
        # https://sha2.hboeck.de/ was used until 2011-01-08 (no route to host)
        remote = ("sha256.tbs-internet.com", 443)
        sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem")
        with support.transient_internet("sha256.tbs-internet.com"):
            ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
            ctx.verify_mode = ssl.CERT_REQUIRED
            ctx.load_verify_locations(sha256_cert)
            with ctx.wrap_socket(socket.socket(socket.AF_INET),
                                 server_hostname="sha256.tbs-internet.com") as s:
                s.connect(remote)
                if support.verbose:
                    sys.stdout.write("\nCipher with %r is %r\n" %
                                     (remote, s.cipher()))
                    sys.stdout.write("Certificate is:\n%s\n" %
                                     pprint.pformat(s.getpeercert()))

    def test_get_ca_certs_capath(self):
        # capath certs are loaded on request
        with support.transient_internet("svn.python.org"):
            ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
            ctx.verify_mode = ssl.CERT_REQUIRED
            ctx.load_verify_locations(capath=CAPATH)
            self.assertEqual(ctx.get_ca_certs(), [])
            with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
                s.connect(("svn.python.org", 443))
                cert = s.getpeercert()
                self.assertTrue(cert)
            self.assertEqual(len(ctx.get_ca_certs()), 1)

    @needs_sni
    def test_context_setget(self):
        # Check that the context of a connected socket can be replaced.
        with support.transient_internet("svn.python.org"):
            ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
            ctx2 = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
            s = socket.socket(socket.AF_INET)
            with ctx1.wrap_socket(s) as ss:
                ss.connect(("svn.python.org", 443))
                self.assertIs(ss.context, ctx1)
                self.assertIs(ss._sslobj.context, ctx1)
                ss.context = ctx2
                self.assertIs(ss.context, ctx2)
                self.assertIs(ss._sslobj.context, ctx2)
1669
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001670
class NetworkedBIOTests(unittest.TestCase):
    """Tests driving an SSL object over MemoryBIOs against a real server.

    The test owns the TCP socket and shuttles bytes between it and the
    incoming/outgoing BIOs by hand through ssl_io_loop().
    """

    def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
        """Call func(*args) repeatedly until it completes; return its result.

        A simple IO loop. Depending on the error we get (WANT_READ or
        WANT_WRITE), move data between the socket and the BIOs and retry.

        sock     -- connected socket used for the actual network IO
        incoming -- BIO that receives the bytes read from sock
        outgoing -- BIO whose pending bytes are sent to sock
        func     -- operation to drive (e.g. do_handshake, read, write)
        args     -- positional arguments passed to func on every attempt

        The only keyword read from kwargs is 'timeout' (seconds, default
        10); kwargs are not forwarded to func.  The test is failed if func
        has not completed when the timeout expires.  The deadline is only
        checked between attempts, so a blocking recv() can still overrun it.
        Any SSLError other than WANT_READ, WANT_WRITE or SYSCALL is re-raised.
        """
        timeout = kwargs.get('timeout', 10)
        deadline = time.monotonic() + timeout
        count = 0
        while True:
            # Without this check a func that keeps reporting WANT_WRITE or
            # SYSCALL would make the loop spin forever.
            if time.monotonic() > deadline:
                self.fail("%s() did not complete within %s seconds"
                          % (func.__name__, timeout))
            ssl_errno = None
            count += 1
            try:
                ret = func(*args)
            except ssl.SSLError as e:
                # Note that we get a spurious -1/SSL_ERROR_SYSCALL for
                # non-blocking IO. The SSL_shutdown manpage hints at this.
                # It *should* be safe to just ignore SSL_ERROR_SYSCALL because
                # with a Memory BIO there's no syscalls (for IO at least).
                if e.errno not in (ssl.SSL_ERROR_WANT_READ,
                                   ssl.SSL_ERROR_WANT_WRITE,
                                   ssl.SSL_ERROR_SYSCALL):
                    raise
                ssl_errno = e.errno
            # Get any data from the outgoing BIO irrespective of any error, and
            # send it to the socket.
            buf = outgoing.read()
            sock.sendall(buf)
            # If there's no error, we're done. For WANT_READ, we need to get
            # data from the socket and put it in the incoming BIO.
            if ssl_errno is None:
                break
            elif ssl_errno == ssl.SSL_ERROR_WANT_READ:
                buf = sock.recv(32768)
                if buf:
                    incoming.write(buf)
                else:
                    # Empty recv(): the peer closed the connection.
                    incoming.write_eof()
        if support.verbose:
            sys.stdout.write("Needed %d calls to complete %s().\n"
                             % (count, func.__name__))
        return ret

    def test_handshake(self):
        # Handshake with certificate and hostname verification, checking
        # the SSL object's state before and after, then shut down cleanly.
        with support.transient_internet("svn.python.org"):
            # The socket is closed even if an assertion below fails.
            with socket.socket(socket.AF_INET) as sock:
                sock.connect(("svn.python.org", 443))
                incoming = ssl.MemoryBIO()
                outgoing = ssl.MemoryBIO()
                ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
                ctx.verify_mode = ssl.CERT_REQUIRED
                ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
                ctx.check_hostname = True
                sslobj = ctx.wrap_bio(incoming, outgoing, False, 'svn.python.org')
                self.assertIs(sslobj._sslobj.owner, sslobj)
                # Before the handshake nothing has been negotiated yet.
                self.assertIsNone(sslobj.cipher())
                self.assertIsNone(sslobj.shared_ciphers())
                self.assertRaises(ValueError, sslobj.getpeercert)
                if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
                    self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
                self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
                self.assertTrue(sslobj.cipher())
                self.assertIsNone(sslobj.shared_ciphers())
                self.assertTrue(sslobj.getpeercert())
                if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
                    self.assertTrue(sslobj.get_channel_binding('tls-unique'))
                self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
                # After unwrap() the SSL object can no longer send data.
                self.assertRaises(ssl.SSLError, sslobj.write, b'foo')

    def test_read_write_data(self):
        # Send a minimal HTTP request through the BIO-backed SSL object
        # and check that the reply looks like an HTTP response.
        with support.transient_internet("svn.python.org"):
            # The socket is closed even if an assertion below fails.
            with socket.socket(socket.AF_INET) as sock:
                sock.connect(("svn.python.org", 443))
                incoming = ssl.MemoryBIO()
                outgoing = ssl.MemoryBIO()
                ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
                ctx.verify_mode = ssl.CERT_NONE
                sslobj = ctx.wrap_bio(incoming, outgoing, False)
                self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
                req = b'GET / HTTP/1.0\r\n\r\n'
                self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
                buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
                self.assertEqual(buf[:5], b'HTTP/')
                self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
1755
1756
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001757try:
1758 import threading
1759except ImportError:
1760 _have_threads = False
1761else:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001762 _have_threads = True
1763
Antoine Pitrou803e6d62010-10-13 10:36:15 +00001764 from test.ssl_servers import make_https_server
1765
class ThreadedEchoServer(threading.Thread):
    """Echo server running in a background thread, used as the test peer.

    The server accepts one connection at a time on a port obtained from
    support.bind_port() and hands it to a ConnectionHandler thread, which
    it joins before accepting the next connection.  Results of each
    connection attempt are recorded on the server in the lists
    selected_npn_protocols, selected_alpn_protocols, shared_ciphers and
    conn_errors.

    The server can be used as a context manager: entering starts the
    thread and waits until it is listening, leaving stops and joins it.
    """

    class ConnectionHandler(threading.Thread):

        """A mildly complicated class, because we want it to work both
        with and without the SSL wrapper around the socket connection, so
        that we can test the STARTTLS functionality."""

        def __init__(self, server, connsock, addr):
            # server   -- the owning ThreadedEchoServer
            # connsock -- the accepted (not yet wrapped) socket
            # addr     -- peer address, only used in diagnostics
            self.server = server
            self.running = False
            self.sock = connsock
            self.addr = addr
            self.sock.setblocking(1)
            # Set by wrap_conn(); None while the connection is unencrypted.
            self.sslconn = None
            threading.Thread.__init__(self)
            self.daemon = True

        def wrap_conn(self):
            """Wrap self.sock using the server's SSL context.

            Return True on success.  On SSLError or ConnectionResetError
            the error is recorded in server.conn_errors, the server is
            stopped, the connection is closed and False is returned.
            """
            try:
                self.sslconn = self.server.context.wrap_socket(
                    self.sock, server_side=True)
                self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
                self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
            except (ssl.SSLError, ConnectionResetError) as e:
                # We treat ConnectionResetError as though it were an
                # SSLError - OpenSSL on Ubuntu abruptly closes the
                # connection when asked to use an unsupported protocol.
                #
                # XXX Various errors can have happened here, for example
                # a mismatching protocol version, an invalid certificate,
                # or a low-level bug. This should be made more discriminating.
                self.server.conn_errors.append(e)
                if self.server.chatty:
                    handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
                self.running = False
                self.server.stop()
                self.close()
                return False
            else:
                self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
                if self.server.context.verify_mode == ssl.CERT_REQUIRED:
                    cert = self.sslconn.getpeercert()
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
                    cert_binary = self.sslconn.getpeercert(True)
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
                cipher = self.sslconn.cipher()
                if support.verbose and self.server.chatty:
                    sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
                    sys.stdout.write(" server: selected protocol is now "
                            + str(self.sslconn.selected_npn_protocol()) + "\n")
                return True

        def read(self):
            # Read from the SSL layer when active, else from the raw socket.
            if self.sslconn:
                return self.sslconn.read()
            else:
                return self.sock.recv(1024)

        def write(self, bytes):
            # Write through the SSL layer when active, else to the raw socket.
            if self.sslconn:
                return self.sslconn.write(bytes)
            else:
                return self.sock.send(bytes)

        def close(self):
            # Close whichever object currently represents the connection.
            if self.sslconn:
                self.sslconn.close()
            else:
                self.sock.close()

        def run(self):
            """Serve the connection until EOF, b'over' or an OSError.

            Recognised messages (compared after stripping whitespace):
            b'over' closes the connection; b'STARTTLS' and b'ENDTLS'
            (only when the server was created with starttls_server) turn
            encryption on and off; b'CB tls-unique' sends back the
            channel binding data.  Anything else is echoed lower-cased.
            """
            self.running = True
            if not self.server.starttls_server:
                # Plain SSL server: wrap immediately.
                if not self.wrap_conn():
                    return
            while self.running:
                try:
                    msg = self.read()
                    stripped = msg.strip()
                    if not stripped:
                        # eof, so quit this handler
                        self.running = False
                        self.close()
                    elif stripped == b'over':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: client closed connection\n")
                        self.close()
                        return
                    elif (self.server.starttls_server and
                          stripped == b'STARTTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        if not self.wrap_conn():
                            return
                    elif (self.server.starttls_server and self.sslconn
                          and stripped == b'ENDTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        # Continue on the plain socket returned by unwrap().
                        self.sock = self.sslconn.unwrap()
                        self.sslconn = None
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: connection is now unencrypted...\n")
                    elif stripped == b'CB tls-unique':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
                        data = self.sslconn.get_channel_binding("tls-unique")
                        self.write(repr(data).encode("us-ascii") + b"\n")
                    else:
                        if (support.verbose and
                            self.server.connectionchatty):
                            ctype = (self.sslconn and "encrypted") or "unencrypted"
                            sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
                                             % (msg, ctype, msg.lower(), ctype))
                        self.write(msg.lower())
                except OSError:
                    if self.server.chatty:
                        handle_error("Test server failure:\n")
                    self.close()
                    self.running = False
                    # normally, we'd just stop here, but for the test
                    # harness, we want to stop the server
                    self.server.stop()

    def __init__(self, certificate=None, ssl_version=None,
                 certreqs=None, cacerts=None,
                 chatty=True, connectionchatty=False, starttls_server=False,
                 npn_protocols=None, alpn_protocols=None,
                 ciphers=None, context=None):
        """Create the server and bind its listening socket.

        If `context` is given it is used as is (ssl_version, certreqs,
        cacerts and certificate are then ignored); otherwise a context
        is built from those arguments, defaulting to PROTOCOL_TLSv1 and
        CERT_NONE.  npn_protocols, alpn_protocols and ciphers, when
        given, are applied to the context in either case.

        chatty / connectionchatty control diagnostic output;
        starttls_server makes handlers start unencrypted and wait for a
        STARTTLS message.
        """
        if context:
            self.context = context
        else:
            self.context = ssl.SSLContext(ssl_version
                                          if ssl_version is not None
                                          else ssl.PROTOCOL_TLSv1)
            self.context.verify_mode = (certreqs if certreqs is not None
                                        else ssl.CERT_NONE)
            if cacerts:
                self.context.load_verify_locations(cacerts)
            if certificate:
                self.context.load_cert_chain(certificate)
        if npn_protocols:
            self.context.set_npn_protocols(npn_protocols)
        if alpn_protocols:
            self.context.set_alpn_protocols(alpn_protocols)
        if ciphers:
            self.context.set_ciphers(ciphers)
        self.chatty = chatty
        self.connectionchatty = connectionchatty
        self.starttls_server = starttls_server
        self.sock = socket.socket()
        self.port = support.bind_port(self.sock)
        # Optional event set by run() once the server is listening.
        self.flag = None
        self.active = False
        # Per-connection results appended by ConnectionHandler.wrap_conn().
        self.selected_npn_protocols = []
        self.selected_alpn_protocols = []
        self.shared_ciphers = []
        self.conn_errors = []
        threading.Thread.__init__(self)
        self.daemon = True

    def __enter__(self):
        # Start the thread and block until run() signals it is listening.
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        self.stop()
        self.join()

    def start(self, flag=None):
        """Start the server thread; `flag`, if given, is set once listening."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        """Accept and serve connections, one at a time, until stop()."""
        try:
            # The short accept() timeout lets the loop notice stop().
            self.sock.settimeout(0.05)
            self.sock.listen()
            self.active = True
            if self.flag:
                # signal an event
                self.flag.set()
            while self.active:
                try:
                    newconn, connaddr = self.sock.accept()
                    if support.verbose and self.chatty:
                        sys.stdout.write(' server: new connection from '
                                         + repr(connaddr) + '\n')
                    handler = self.ConnectionHandler(self, newconn, connaddr)
                    handler.start()
                    # Serve connections strictly one after the other.
                    handler.join()
                except socket.timeout:
                    pass
                except KeyboardInterrupt:
                    self.stop()
        finally:
            # Release the listening socket even if the loop was left
            # through an unexpected exception.
            self.sock.close()

    def stop(self):
        # Ask the accept loop in run() to exit after its current iteration.
        self.active = False
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001967 self.active = False
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001968
class AsyncoreEchoServer(threading.Thread):
    """Echo server driven by an asyncore loop running in its own thread.

    Usable as a context manager: entering starts the thread and waits
    for it to signal that it is running; leaving stops the loop, closes
    the listening dispatcher and joins the thread.
    """

    # this one's based on asyncore.dispatcher

    class EchoServer (asyncore.dispatcher):
        """Listening dispatcher creating one ConnectionHandler per client."""

        class ConnectionHandler (asyncore.dispatcher_with_send):
            """Server side of one connection.

            Completes the SSL handshake from the asyncore loop, then
            sends back whatever it reads, converted to lower case.
            """

            def __init__(self, conn, certfile):
                # The handshake is not performed while wrapping; it is
                # attempted below and retried from handle_read().
                wrapped = ssl.wrap_socket(conn, server_side=True,
                                          certfile=certfile,
                                          do_handshake_on_connect=False)
                self.socket = wrapped
                asyncore.dispatcher_with_send.__init__(self, wrapped)
                self._ssl_accepting = True
                self._do_ssl_handshake()

            def readable(self):
                # Serve whatever the SSL layer reports as pending
                # before the loop polls the socket again.
                if isinstance(self.socket, ssl.SSLSocket):
                    while self.socket.pending() > 0:
                        self.handle_read_event()
                return True

            def _do_ssl_handshake(self):
                """Try to advance the handshake by one step."""
                try:
                    self.socket.do_handshake()
                except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
                    # Not finished yet: stay in "accepting" state.
                    return
                except ssl.SSLEOFError:
                    return self.handle_close()
                except ssl.SSLError:
                    # Must precede the OSError clause so that other
                    # SSL errors propagate unchanged.
                    raise
                except OSError as exc:
                    if exc.args[0] == errno.ECONNABORTED:
                        return self.handle_close()
                else:
                    self._ssl_accepting = False

            def handle_read(self):
                if self._ssl_accepting:
                    self._do_ssl_handshake()
                    return
                data = self.recv(1024)
                if support.verbose:
                    sys.stdout.write(" server: read %s from client\n" % repr(data))
                if data:
                    self.send(data.lower())
                else:
                    self.close()

            def handle_close(self):
                self.close()
                if support.verbose:
                    sys.stdout.write(" server: closed connection %s\n" % self.socket)

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, certfile):
            self.certfile = certfile
            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.port = support.bind_port(listener, '')
            asyncore.dispatcher.__init__(self, listener)
            self.listen(5)

        def handle_accepted(self, sock_obj, addr):
            if support.verbose:
                sys.stdout.write(" server: new connection from %s:%s\n" %addr)
            self.ConnectionHandler(sock_obj, self.certfile)

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise

    def __init__(self, certfile):
        self.flag = None
        self.active = False
        echo_server = self.EchoServer(certfile)
        self.server = echo_server
        self.port = echo_server.port
        threading.Thread.__init__(self)
        self.daemon = True

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.server)

    def __enter__(self):
        # run() sets the event right before entering its loop.
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        # Each step is announced (in verbose mode) before it runs; the
        # last entry is only a message.
        steps = (
            (" cleanup: stopping server.\n", self.stop),
            (" cleanup: joining server thread.\n", self.join),
            (" cleanup: successfully joined.\n", None),
        )
        for message, action in steps:
            if support.verbose:
                sys.stdout.write(message)
            if action is not None:
                action()

    def start (self, flag=None):
        """Store *flag* (set by run() once it is active) and start the thread."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        self.active = True
        if self.flag:
            self.flag.set()
        while self.active:
            try:
                asyncore.loop(1)
            except:
                # Errors are deliberately ignored here: the loop is
                # simply restarted until stop() clears self.active.
                pass

    def stop(self):
        self.active = False
        self.server.close()
2084
def bad_cert_test(certfile):
    """
    Launch a server with CERT_REQUIRED, and check that trying to
    connect to it with the given client certificate fails.

    *certfile* is the path of the client certificate to present.  Any
    ssl.SSLError or other OSError raised while wrapping or connecting
    counts as the expected failure; AssertionError is raised when the
    connection succeeds.
    """
    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_REQUIRED,
                                cacerts=CERTFILE, chatty=False,
                                connectionchatty=False)
    with server:
        try:
            with socket.socket() as sock:
                # Close the wrapped socket as well; it was previously
                # left open after a failed (or successful) connect().
                with ssl.wrap_socket(sock,
                                     certfile=certfile,
                                     ssl_version=ssl.PROTOCOL_TLSv1) as s:
                    s.connect((HOST, server.port))
        except ssl.SSLError as x:
            if support.verbose:
                # %r instead of x.args[1]: not every exception carries
                # two arguments.
                sys.stdout.write("\nSSLError is %r\n" % x)
        except OSError as x:
            # A second, unreachable ``except OSError`` clause used to
            # follow this one; every OSError already ended up here.
            if support.verbose:
                sys.stdout.write("\nOSError is %r\n" % x)
        else:
            raise AssertionError("Use of invalid cert should have failed!")
Thomas Woutersed03b412007-08-28 21:37:11 +00002114
def server_params_test(client_context, server_context, indata=b"FOO\n",
                       chatty=True, connectionchatty=False, sni_name=None):
    """
    Start a ThreadedEchoServer using *server_context*, connect to it
    with a socket wrapped by *client_context* and exchange *indata*
    as bytes, bytearray and memoryview, checking that each reply is
    the lower-cased input.

    Return a dict describing the connection as seen by the client
    (compression, cipher, peer certificate, ALPN/NPN protocol, version)
    plus the protocol and cipher attributes read from the server object.
    """
    stats = {}
    server = ThreadedEchoServer(context=server_context,
                                chatty=chatty,
                                connectionchatty=False)
    with server:
        client_sock = client_context.wrap_socket(socket.socket(),
                                                 server_hostname=sni_name)
        with client_sock as s:
            s.connect((HOST, server.port))
            payloads = [indata, bytearray(indata), memoryview(indata)]
            for arg in payloads:
                if connectionchatty and support.verbose:
                    sys.stdout.write(
                        " client: sending %r...\n" % indata)
                s.write(arg)
                outdata = s.read()
                if connectionchatty and support.verbose:
                    sys.stdout.write(" client: read %r\n" % outdata)
                if outdata != indata.lower():
                    details = (outdata[:20], len(outdata),
                               indata[:20].lower(), len(indata))
                    raise AssertionError(
                        "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                        % details)
            # Tell the echo server that this client is done.
            s.write(b"over\n")
            if connectionchatty and support.verbose:
                sys.stdout.write(" client: closing connection.\n")
            client_side = {
                'compression': s.compression(),
                'cipher': s.cipher(),
                'peercert': s.getpeercert(),
                'client_alpn_protocol': s.selected_alpn_protocol(),
                'client_npn_protocol': s.selected_npn_protocol(),
                'version': s.version(),
            }
            stats.update(client_side)
            s.close()
        stats['server_alpn_protocols'] = server.selected_alpn_protocols
        stats['server_npn_protocols'] = server.selected_npn_protocols
        stats['server_shared_ciphers'] = server.shared_ciphers
    return stats
Thomas Woutersed03b412007-08-28 21:37:11 +00002161
def try_protocol_combo(server_protocol, client_protocol, expect_success,
                       certsreqs=None, server_options=0, client_options=0):
    """
    Attempt an SSL connection from a *client_protocol* client to a
    *server_protocol* server.

    A true *expect_success* means the connection must succeed, a false
    one that it must fail.  When *expect_success* is a string, it must
    additionally equal the protocol version reported for the
    connection.  AssertionError is raised when the outcome differs.
    """
    if certsreqs is None:
        certsreqs = ssl.CERT_NONE
    cert_names = {
        ssl.CERT_NONE: "CERT_NONE",
        ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
        ssl.CERT_REQUIRED: "CERT_REQUIRED",
    }
    certtype = cert_names[certsreqs]
    if support.verbose:
        # Combinations expected to fail are shown between braces.
        template = " %s->%s %s\n" if expect_success else " {%s->%s} %s\n"
        sys.stdout.write(template %
                         (ssl.get_protocol_name(client_protocol),
                          ssl.get_protocol_name(server_protocol),
                          certtype))
    client_context = ssl.SSLContext(client_protocol)
    client_context.options |= client_options
    server_context = ssl.SSLContext(server_protocol)
    server_context.options |= server_options

    # NOTE: we must enable "ALL" ciphers on the client, otherwise an
    # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
    # starting from OpenSSL 1.0.0 (see issue #8322).
    if client_context.protocol == ssl.PROTOCOL_SSLv23:
        client_context.set_ciphers("ALL")

    for ctx in (client_context, server_context):
        ctx.verify_mode = certsreqs
        ctx.load_cert_chain(CERTFILE)
        ctx.load_verify_locations(CERTFILE)

    # Protocol mismatch can result in either an SSLError, or a
    # "Connection reset by peer" error.
    try:
        stats = server_params_test(client_context, server_context,
                                   chatty=False, connectionchatty=False)
    except ssl.SSLError:
        if expect_success:
            raise
        return
    except OSError as e:
        if expect_success or e.errno != errno.ECONNRESET:
            raise
        return

    # From here on the connection succeeded.
    if not expect_success:
        raise AssertionError(
            "Client protocol %s succeeded with server protocol %s!"
            % (ssl.get_protocol_name(client_protocol),
               ssl.get_protocol_name(server_protocol)))
    if expect_success is not True and expect_success != stats['version']:
        raise AssertionError("version mismatch: expected %r, got %r"
                             % (expect_success, stats['version']))
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002220
2221
Bill Janssen6e027db2007-11-15 22:23:56 +00002222 class ThreadedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002223
Antoine Pitrou23df4832010-08-04 17:14:06 +00002224 @skip_if_broken_ubuntu_ssl
def test_echo(self):
    """Basic test of an SSL client connecting to a server"""
    if support.verbose:
        sys.stdout.write("\n")
    # One sub-test per protocol; the same context serves both ends.
    for proto in PROTOCOLS:
        proto_name = ssl._PROTOCOL_NAMES[proto]
        with self.subTest(protocol=proto_name):
            ctx = ssl.SSLContext(proto)
            ctx.load_cert_chain(CERTFILE)
            server_params_test(ctx, ctx,
                               chatty=True, connectionchatty=True)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002235
def test_getpeercert(self):
    # getpeercert() must raise ValueError before the handshake and
    # return the server certificate (subject, validity dates) after it.
    if support.verbose:
        sys.stdout.write("\n")
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(CERTFILE)
    context.load_cert_chain(CERTFILE)
    server = ThreadedEchoServer(context=context, chatty=False)
    with server:
        # The ``with`` block closes the client socket even when one of
        # the checks below fails (it used to be closed only on success).
        with context.wrap_socket(socket.socket(),
                                 do_handshake_on_connect=False) as s:
            s.connect((HOST, server.port))
            # getpeercert() raise ValueError while the handshake isn't
            # done.
            with self.assertRaises(ValueError):
                s.getpeercert()
            s.do_handshake()
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()
            if support.verbose:
                sys.stdout.write(pprint.pformat(cert) + '\n')
                sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
            if 'subject' not in cert:
                self.fail("No subject field in certificate: %s." %
                          pprint.pformat(cert))
            if ((('organizationName', 'Python Software Foundation'),)
                    not in cert['subject']):
                self.fail(
                    "Missing or invalid 'organizationName' field in certificate subject; "
                    "should be 'Python Software Foundation'.")
            self.assertIn('notBefore', cert)
            self.assertIn('notAfter', cert)
            before = ssl.cert_time_to_seconds(cert['notBefore'])
            after = ssl.cert_time_to_seconds(cert['notAfter'])
            self.assertLess(before, after)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002273
Christian Heimes2427b502013-11-23 11:24:32 +01002274 @unittest.skipUnless(have_verify_flags(),
2275 "verify_flags need OpenSSL > 0.9.8")
def test_crl_check(self):
    # Exercise certificate verification with and without CRL checking.
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(SIGNING_CA)
    tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(context.verify_flags, ssl.VERIFY_DEFAULT | tf)

    def connect_and_check_cert():
        # Fresh server, successful handshake, non-empty peer cert.
        server = ThreadedEchoServer(context=server_context, chatty=True)
        with server:
            with context.wrap_socket(socket.socket()) as s:
                s.connect((HOST, server.port))
                cert = s.getpeercert()
                self.assertTrue(cert, "Can't get peer certificate.")

    # VERIFY_DEFAULT should pass
    connect_and_check_cert()

    # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
    context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF

    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with context.wrap_socket(socket.socket()) as s:
            with self.assertRaisesRegex(ssl.SSLError,
                                        "certificate verify failed"):
                s.connect((HOST, server.port))

    # now load a CRL file. The CRL file is signed by the CA.
    context.load_verify_locations(CRLFILE)
    connect_and_check_cert()
2316
def test_check_hostname(self):
    # Exercise check_hostname with a matching, a non-matching and a
    # missing server_hostname.
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.verify_mode = ssl.CERT_REQUIRED
    context.check_hostname = True
    context.load_verify_locations(SIGNING_CA)

    # correct hostname should verify
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        good = context.wrap_socket(socket.socket(),
                                   server_hostname="localhost")
        with good as s:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")

    # incorrect hostname should raise an exception
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        bad = context.wrap_socket(socket.socket(),
                                  server_hostname="invalid")
        with bad as s:
            expected_msg = "hostname 'invalid' doesn't match 'localhost'"
            with self.assertRaisesRegex(ssl.CertificateError, expected_msg):
                s.connect((HOST, server.port))

    # missing server_hostname arg should cause an exception, too
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with socket.socket() as s:
            expected_msg = "check_hostname requires server_hostname"
            with self.assertRaisesRegex(ValueError, expected_msg):
                context.wrap_socket(s)
2354
def test_empty_cert(self):
    """Connecting with an empty cert file"""
    # The certificate lives next to this test module.
    here = os.path.dirname(__file__) or os.curdir
    bad_cert_test(os.path.join(here, "nullcert.pem"))
def test_malformed_cert(self):
    """Connecting with a badly formatted certificate (syntax error)"""
    # The certificate lives next to this test module.
    here = os.path.dirname(__file__) or os.curdir
    bad_cert_test(os.path.join(here, "badcert.pem"))
def test_nonexisting_cert(self):
    """Connecting with a non-existing cert file"""
    # The path is built next to this test module.
    here = os.path.dirname(__file__) or os.curdir
    bad_cert_test(os.path.join(here, "wrongcert.pem"))
def test_malformed_key(self):
    """Connecting with a badly formatted key (syntax error)"""
    # The key file lives next to this test module.
    here = os.path.dirname(__file__) or os.curdir
    bad_cert_test(os.path.join(here, "badkey.pem"))
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002371
def test_rude_shutdown(self):
    """A brutal shutdown of an SSL server should raise an OSError
    in the client when attempting handshake.
    """
    listener_ready = threading.Event()
    listener_gone = threading.Event()

    server_sock = socket.socket()
    port = support.bind_port(server_sock, HOST)

    # `listener` runs in a thread.  It sits in an accept() until
    # the main thread connects.  Then it rudely closes the socket,
    # and sets Event `listener_gone` to let the main thread know
    # the socket is gone.
    def listener():
        server_sock.listen()
        listener_ready.set()
        conn, _ = server_sock.accept()
        conn.close()
        server_sock.close()
        listener_gone.set()

    def connector():
        listener_ready.wait()
        with socket.socket() as client:
            client.connect((HOST, port))
            # Only attempt the handshake once the peer is gone.
            listener_gone.wait()
            try:
                ssl.wrap_socket(client)
            except OSError:
                pass
            else:
                self.fail('connecting to closed SSL socket should have failed')

    listener_thread = threading.Thread(target=listener)
    listener_thread.start()
    try:
        connector()
    finally:
        listener_thread.join()
Trent Nelson6b240cd2008-04-10 20:12:06 +00002412
Antoine Pitrou23df4832010-08-04 17:14:06 +00002413 @skip_if_broken_ubuntu_ssl
Victor Stinner3de49192011-05-09 00:42:58 +02002414 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
2415 "OpenSSL is compiled without SSLv2 support")
def test_protocol_sslv2(self):
    """Connecting to an SSLv2 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    sslv2 = ssl.PROTOCOL_SSLv2
    sslv23 = ssl.PROTOCOL_SSLv23
    # None selects try_protocol_combo's default certificate requirement.
    for reqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(sslv2, sslv2, True, reqs)
    try_protocol_combo(sslv2, sslv23, False)
    if hasattr(ssl, 'PROTOCOL_SSLv3'):
        try_protocol_combo(sslv2, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(sslv2, ssl.PROTOCOL_TLSv1, False)
    # SSLv23 client with specific SSL options
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(sslv2, sslv23, False,
                           client_options=ssl.OP_NO_SSLv2)
    for option in (ssl.OP_NO_SSLv3, ssl.OP_NO_TLSv1):
        try_protocol_combo(sslv2, sslv23, False,
                           client_options=option)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002436
Antoine Pitrou23df4832010-08-04 17:14:06 +00002437 @skip_if_broken_ubuntu_ssl
def test_protocol_sslv23(self):
    """Connecting to an SSLv23 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    sslv23 = ssl.PROTOCOL_SSLv23
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try:
            try_protocol_combo(sslv23, ssl.PROTOCOL_SSLv2, True)
        except OSError as x:
            # this fails on some older versions of OpenSSL (0.9.7l, for instance)
            if support.verbose:
                sys.stdout.write(
                    " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
                    % str(x))

    # Same three clients for each certificate requirement; None selects
    # try_protocol_combo's default.
    for reqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        if hasattr(ssl, 'PROTOCOL_SSLv3'):
            try_protocol_combo(sslv23, ssl.PROTOCOL_SSLv3, 'SSLv3', reqs)
        try_protocol_combo(sslv23, sslv23, True, reqs)
        try_protocol_combo(sslv23, ssl.PROTOCOL_TLSv1, 'TLSv1', reqs)

    # Server with specific SSL options
    if hasattr(ssl, 'PROTOCOL_SSLv3'):
        try_protocol_combo(sslv23, ssl.PROTOCOL_SSLv3, False,
                           server_options=ssl.OP_NO_SSLv3)
    # Will choose TLSv1
    no_legacy_ssl = ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3
    try_protocol_combo(sslv23, sslv23, True,
                       server_options=no_legacy_ssl)
    try_protocol_combo(sslv23, ssl.PROTOCOL_TLSv1, False,
                       server_options=ssl.OP_NO_TLSv1)
2475
2476
Antoine Pitrou23df4832010-08-04 17:14:06 +00002477 @skip_if_broken_ubuntu_ssl
Benjamin Petersone32467c2014-12-05 21:59:35 -05002478 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
2479 "OpenSSL is compiled without SSLv3 support")
def test_protocol_sslv3(self):
    """Connecting to an SSLv3 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    sslv3 = ssl.PROTOCOL_SSLv3
    # None selects try_protocol_combo's default certificate requirement.
    for reqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(sslv3, sslv3, 'SSLv3', reqs)
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(sslv3, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(sslv3, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_SSLv3)
    try_protocol_combo(sslv3, ssl.PROTOCOL_TLSv1, False)
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(sslv3, ssl.PROTOCOL_SSLv23, 'SSLv3',
                           client_options=ssl.OP_NO_SSLv2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002496
Antoine Pitrou23df4832010-08-04 17:14:06 +00002497 @skip_if_broken_ubuntu_ssl
def test_protocol_tlsv1(self):
    """Connecting to a TLSv1 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1 = ssl.PROTOCOL_TLSv1
    # None selects try_protocol_combo's default certificate requirement.
    for reqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(tlsv1, tlsv1, 'TLSv1', reqs)
    # Older protocols, when available, must be refused.
    for legacy in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy):
            try_protocol_combo(tlsv1, getattr(ssl, legacy), False)
    try_protocol_combo(tlsv1, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_TLSv1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002511
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002512 @skip_if_broken_ubuntu_ssl
2513 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
2514 "TLS version 1.1 not supported.")
def test_protocol_tlsv1_1(self):
    """Connecting to a TLSv1.1 server with various client options.
       Testing against older TLS versions."""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1_1 = ssl.PROTOCOL_TLSv1_1
    try_protocol_combo(tlsv1_1, tlsv1_1, 'TLSv1.1')
    # Older SSL protocols, when available, must be refused.
    for legacy in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy):
            try_protocol_combo(tlsv1_1, getattr(ssl, legacy), False)
    try_protocol_combo(tlsv1_1, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_TLSv1_1)

    try_protocol_combo(ssl.PROTOCOL_SSLv23, tlsv1_1, 'TLSv1.1')
    # TLSv1 and TLSv1.1 must not interoperate, in either direction.
    tlsv1 = ssl.PROTOCOL_TLSv1
    try_protocol_combo(tlsv1_1, tlsv1, False)
    try_protocol_combo(tlsv1, tlsv1_1, False)
2531
2532
2533 @skip_if_broken_ubuntu_ssl
2534 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
2535 "TLS version 1.2 not supported.")
def test_protocol_tlsv1_2(self):
    """Connecting to a TLSv1.2 server with various client options.
       Testing against older TLS versions."""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1_2 = ssl.PROTOCOL_TLSv1_2
    no_legacy_ssl = ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2
    try_protocol_combo(tlsv1_2, tlsv1_2, 'TLSv1.2',
                       server_options=no_legacy_ssl,
                       client_options=no_legacy_ssl)
    # Older SSL protocols, when available, must be refused.
    for legacy in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy):
            try_protocol_combo(tlsv1_2, getattr(ssl, legacy), False)
    try_protocol_combo(tlsv1_2, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_TLSv1_2)

    try_protocol_combo(ssl.PROTOCOL_SSLv23, tlsv1_2, 'TLSv1.2')
    # TLSv1.2 must not interoperate with TLSv1 or TLSv1.1, in either
    # direction.
    for older in (ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1):
        try_protocol_combo(tlsv1_2, older, False)
        try_protocol_combo(older, tlsv1_2, False)
2555 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
2556
def test_starttls(self):
    """Upgrade a clear-text connection to TLS and downgrade it again."""
    def trace(template, *values):
        # Progress output is only formatted and written in verbose runs.
        if support.verbose:
            sys.stdout.write(template % values)

    exchange = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4",
                b"ENDTLS", b"msg 5", b"msg 6")
    server = ThreadedEchoServer(CERTFILE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                starttls_server=True,
                                chatty=True,
                                connectionchatty=True)
    # While `wrapped` is true, traffic goes through `conn` (the TLS
    # wrapper); otherwise it goes through the plain socket `s`.
    wrapped = False
    with server:
        s = socket.socket()
        s.setblocking(1)
        s.connect((HOST, server.port))
        trace("\n")
        for indata in exchange:
            trace(" client: sending %r...\n", indata)
            if wrapped:
                conn.write(indata)
                outdata = conn.read()
            else:
                s.send(indata)
                outdata = s.recv(1024)
            msg = outdata.strip().lower()
            acknowledged = msg.startswith(b"ok")
            if acknowledged and indata == b"STARTTLS":
                # The server agreed to STARTTLS: continue over TLS.
                trace(" client: read %r from server, starting TLS...\n", msg)
                conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
                wrapped = True
            elif acknowledged and indata == b"ENDTLS":
                # The server agreed to ENDTLS: fall back to clear text.
                trace(" client: read %r from server, ending TLS...\n", msg)
                s = conn.unwrap()
                wrapped = False
            else:
                trace(" client: read %r from server\n", msg)
        trace(" client: closing connection.\n")
        # Say goodbye and close whichever channel is currently active.
        if wrapped:
            conn.write(b"over\n")
            conn.close()
        else:
            s.send(b"over\n")
            s.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002614
def test_socketserver(self):
    """Using a SocketServer to create and manage SSL connections.

    The certificate file is read from disk and then fetched again
    through the HTTPS server returned by make_https_server(); both
    copies must be identical.
    """
    server = make_https_server(self, certfile=CERTFILE)
    # try to connect
    if support.verbose:
        sys.stdout.write('\n')
    with open(CERTFILE, 'rb') as f:
        d1 = f.read()
    # Keep the default as bytes: it is compared against the bytes read
    # from disk if the response carries no usable Content-Length.
    d2 = b''
    # now fetch the same data from the HTTPS server
    url = 'https://localhost:%d/%s' % (
        server.port, os.path.split(CERTFILE)[1])
    context = ssl.create_default_context(cafile=CERTFILE)
    # The response object is a context manager, so it is closed even
    # when reading fails.
    with urllib.request.urlopen(url, context=context) as response:
        dlen = response.info().get("content-length")
        if dlen and (int(dlen) > 0):
            d2 = response.read(int(dlen))
            if support.verbose:
                sys.stdout.write(
                    " client: read %d bytes from remote server '%s'\n"
                    % (len(d2), server))
    self.assertEqual(d1, d2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002640
def test_asyncore_server(self):
    """Check the example asyncore integration.

    A single message is sent to an AsyncoreEchoServer over a wrapped
    socket; the reply must be the lower-cased message.
    """
    if support.verbose:
        sys.stdout.write("\n")

    # (The str value previously assigned to indata here was never used:
    # it was overwritten by this bytes payload before any read.)
    indata = b"FOO\n"
    server = AsyncoreEchoServer(CERTFILE)
    with server:
        s = ssl.wrap_socket(socket.socket())
        s.connect(('127.0.0.1', server.port))
        if support.verbose:
            sys.stdout.write(
                " client: sending %r...\n" % indata)
        s.write(indata)
        outdata = s.read()
        if support.verbose:
            sys.stdout.write(" client: read %r\n" % outdata)
        if outdata != indata.lower():
            self.fail(
                "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                % (outdata[:20], len(outdata),
                   indata[:20].lower(), len(indata)))
        # Tell the server the conversation is finished before closing.
        s.write(b"over\n")
        if support.verbose:
            sys.stdout.write(" client: closing connection.\n")
        s.close()
        if support.verbose:
            sys.stdout.write(" client: connection closed.\n")
Trent Nelson6b240cd2008-04-10 20:12:06 +00002671
def test_recv_send(self):
    """Test recv(), send() and friends.

    Every send-style and recv-style method of the wrapped socket is
    driven through a table.  Methods flagged as expected to succeed
    must round-trip a payload (the reply is compared with the
    lower-cased payload); the others must raise a ValueError whose
    message starts with the method name.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = ssl.wrap_socket(socket.socket(),
                            server_side=False,
                            certfile=CERTFILE,
                            ca_certs=CERTFILE,
                            cert_reqs=ssl.CERT_NONE,
                            ssl_version=ssl.PROTOCOL_TLSv1)
        s.connect((HOST, server.port))
        # helper methods for standardising recv* method signatures
        def _recv_into():
            b = bytearray(b"\0"*100)
            count = s.recv_into(b)
            return b[:count]

        def _recvfrom_into():
            b = bytearray(b"\0"*100)
            count, addr = s.recvfrom_into(b)
            return b[:count]

        # (name, method, whether to expect success, *args)
        send_methods = [
            ('send', s.send, True, []),
            ('sendto', s.sendto, False, ["some.address"]),
            ('sendall', s.sendall, True, []),
        ]
        recv_methods = [
            ('recv', s.recv, True, []),
            ('recvfrom', s.recvfrom, False, ["some.address"]),
            ('recv_into', _recv_into, True, []),
            ('recvfrom_into', _recvfrom_into, False, []),
        ]
        data_prefix = "PREFIX_"

        # NOTE: the messages below use the !r / !s conversions.  The
        # former ":r" / ":s" format specs are rejected by bytes and
        # exception objects (TypeError), which would have hidden the
        # real failure behind a formatting error.
        for meth_name, send_meth, expect_success, args in send_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                send_meth(indata, *args)
                outdata = s.read()
                if outdata != indata.lower():
                    self.fail(
                        "While sending with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20], nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to send with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )

        for meth_name, recv_meth, expect_success, args in recv_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                s.send(indata)
                outdata = recv_meth(*args)
                if outdata != indata.lower():
                    self.fail(
                        "While receiving with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20], nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to receive with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )
            # consume data
            s.read()

        # Make sure sendmsg et al are disallowed to avoid
        # inadvertent disclosure of data and/or corruption
        # of the encrypted data stream
        self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
        self.assertRaises(NotImplementedError, s.recvmsg, 100)
        self.assertRaises(NotImplementedError,
                          s.recvmsg_into, bytearray(100))

        s.write(b"over\n")
        s.close()
Bill Janssen58afe4c2008-09-08 16:45:19 +00002786
def test_nonblocking_send(self):
    # A non-blocking TLS socket must raise SSLWantWriteError or
    # SSLWantReadError once send() can no longer make progress.
    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = ssl.wrap_socket(socket.socket(),
                            server_side=False,
                            certfile=CERTFILE,
                            ca_certs=CERTFILE,
                            cert_reqs=ssl.CERT_NONE,
                            ssl_version=ssl.PROTOCOL_TLSv1)
        s.connect((HOST, server.port))
        s.setblocking(False)

        # Keep pushing data: eventually the buffers fill up and the
        # call, which would otherwise block, has to raise instead.
        payload = bytearray(8192)
        would_block = (ssl.SSLWantWriteError, ssl.SSLWantReadError)
        with self.assertRaises(would_block):
            while True:
                s.send(payload)

        # Switch back to blocking mode before closing the socket.
        s.setblocking(True)
        s.close()
2816
def test_handshake_timeout(self):
    # Issue #5103: the SSL handshake has to honour the socket timeout.
    host = "127.0.0.1"
    server = socket.socket(socket.AF_INET)
    port = support.bind_port(server)
    started = threading.Event()
    finish = False

    def serve():
        # Accept connections without ever answering on them, until the
        # enclosing test sets `finish`.
        server.listen()
        started.set()
        accepted = []
        while not finish:
            readable = select.select([server], [], [], 0.1)[0]
            if server in readable:
                # Hold on to the accepted socket so that garbage
                # collection does not close it behind our back.
                accepted.append(server.accept()[0])
        for conn in accepted:
            conn.close()

    t = threading.Thread(target=serve)
    t.start()
    started.wait()

    try:
        # First variant: wrap a socket that is already connected.
        try:
            c = socket.socket(socket.AF_INET)
            c.settimeout(0.2)
            c.connect((host, port))
            # The handshake is attempted and must time out.
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                ssl.wrap_socket(c)
        finally:
            c.close()
        # Second variant: wrap first, then connect.
        try:
            c = socket.socket(socket.AF_INET)
            c = ssl.wrap_socket(c)
            c.settimeout(0.2)
            # The handshake is attempted and must time out.
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                c.connect((host, port))
        finally:
            c.close()
    finally:
        # Let the serving thread leave its loop, then clean up.
        finish = True
        t.join()
        server.close()
2865
def test_server_accept(self):
    # Issue #16357: accept() on an SSLSocket that was created through
    # SSLContext.wrap_socket().
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(CERTFILE)
    context.load_cert_chain(CERTFILE)
    host = "127.0.0.1"
    plain_listener = socket.socket(socket.AF_INET)
    port = support.bind_port(plain_listener)
    server = context.wrap_socket(plain_listener, server_side=True)

    listening = threading.Event()
    # Filled in by the serving thread with [accepted socket, peer address].
    accepted = [None, None]

    def serve():
        server.listen()
        # Signal readiness, then block in accept() and wait for the
        # connection to be closed by the client.
        listening.set()
        accepted[:] = server.accept()
        accepted[0].recv(1)

    t = threading.Thread(target=serve)
    t.start()
    # The client waits for the server to be set up, then connects.
    listening.wait()
    client = context.wrap_socket(socket.socket())
    client.connect((host, port))
    client_addr = client.getsockname()
    client.close()
    t.join()
    remote, peer = accepted
    remote.close()
    server.close()
    # Sanity checks.
    self.assertIsInstance(remote, ssl.SSLSocket)
    self.assertEqual(peer, client_addr)
2903
def test_getpeercert_enotconn(self):
    # getpeercert() on a wrapped socket that was never connected must
    # fail with OSError carrying errno ENOTCONN.
    ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    with ctx.wrap_socket(socket.socket()) as unconnected, \
            self.assertRaises(OSError) as caught:
        unconnected.getpeercert()
    self.assertEqual(caught.exception.errno, errno.ENOTCONN)
2910
def test_do_handshake_enotconn(self):
    # do_handshake() on a wrapped socket that was never connected must
    # fail with OSError carrying errno ENOTCONN.
    ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    with ctx.wrap_socket(socket.socket()) as unconnected, \
            self.assertRaises(OSError) as caught:
        unconnected.do_handshake()
    self.assertEqual(caught.exception.errno, errno.ENOTCONN)
2917
def test_default_ciphers(self):
    # A client limited to the "DES" cipher string must be refused by a
    # default-configured server with a "no shared cipher" error.
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    try:
        # Restrict our client context to a set of weak ciphers.
        context.set_ciphers("DES")
    except ssl.SSLError:
        self.skipTest("no DES cipher available")
    server = ThreadedEchoServer(CERTFILE,
                                ssl_version=ssl.PROTOCOL_SSLv23,
                                chatty=False)
    with server:
        with context.wrap_socket(socket.socket()) as s:
            self.assertRaises(OSError, s.connect, (HOST, server.port))
    # Inspected only after the server context manager has exited.
    first_error = server.conn_errors[0]
    self.assertIn("no shared cipher", str(first_error))
2932
def test_version_basic(self):
    """
    Basic checks of SSLSocket.version().
    The test_protocol_*() methods cover it more thoroughly.
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server = ThreadedEchoServer(CERTFILE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                chatty=False)
    with server:
        with context.wrap_socket(socket.socket()) as s:
            # Nothing has been negotiated before connecting.
            before_connect = s.version()
            self.assertIs(before_connect, None)
            s.connect((HOST, server.port))
            self.assertEqual(s.version(), "TLSv1")
        # Once the socket has been closed there is no version either.
        after_close = s.version()
        self.assertIs(after_close, None)
2947
@unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
def test_default_ecdh_curve(self):
    # Issue #21015: SSL contexts should have elliptic curve-based
    # Diffie Hellman key exchange enabled by default.
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.load_cert_chain(CERTFILE)
    # OpenSSL releases older than 1.0.0 need ECDH ciphers switched on
    # explicitly through the 'ECCdraft' cipher alias.  Later releases
    # should already prefer ECDH-based ciphers with our default list.
    needs_explicit_ecdh = ssl.OPENSSL_VERSION_INFO < (1, 0, 0)
    if needs_explicit_ecdh:
        context.set_ciphers("ECCdraft:ECDH")
    with ThreadedEchoServer(context=context) as server, \
            context.wrap_socket(socket.socket()) as s:
        s.connect((HOST, server.port))
        cipher_name = s.cipher()[0]
        self.assertIn("ECDH", cipher_name)
2964
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    """Test tls-unique channel binding.

    Two successive TLSv1 connections are made to the same server.  For
    each one the client-side binding data must be non-None, 12 bytes
    long and equal to what the server sends back in reply to the
    "CB tls-unique" request; the two connections must yield different
    data.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        def connect_client():
            # Open a fresh TLSv1 client connection to the test server.
            sock = ssl.wrap_socket(socket.socket(),
                                   server_side=False,
                                   certfile=CERTFILE,
                                   ca_certs=CERTFILE,
                                   cert_reqs=ssl.CERT_NONE,
                                   ssl_version=ssl.PROTOCOL_TLSv1)
            sock.connect((HOST, server.port))
            return sock

        s = connect_client()
        # get the data
        cb_data = s.get_channel_binding("tls-unique")
        if support.verbose:
            sys.stdout.write(" got channel binding data: {0!r}\n"
                             .format(cb_data))

        # check if it is sane
        self.assertIsNotNone(cb_data)
        self.assertEqual(len(cb_data), 12) # True for TLSv1

        # and compare with the peers version
        s.write(b"CB tls-unique\n")
        peer_data_repr = s.read().strip()
        self.assertEqual(peer_data_repr,
                         repr(cb_data).encode("us-ascii"))
        s.close()

        # now, again
        s = connect_client()
        new_cb_data = s.get_channel_binding("tls-unique")
        if support.verbose:
            sys.stdout.write(" got another channel binding data: {0!r}\n"
                             .format(new_cb_data))
        # is it really unique
        self.assertNotEqual(cb_data, new_cb_data)
        # Sanity-check the *new* binding data; the first connection's
        # data has already been checked above.
        self.assertIsNotNone(new_cb_data)
        self.assertEqual(len(new_cb_data), 12) # True for TLSv1
        s.write(b"CB tls-unique\n")
        peer_data_repr = s.read().strip()
        self.assertEqual(peer_data_repr,
                         repr(new_cb_data).encode("us-ascii"))
        s.close()
Bill Janssen58afe4c2008-09-08 16:45:19 +00003024
def test_compression(self):
    # The reported compression must be None or one of the known
    # method names.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    ctx.load_cert_chain(CERTFILE)
    stats = server_params_test(ctx, ctx,
                               chatty=True, connectionchatty=True)
    if support.verbose:
        sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
    acceptable = {None, 'ZLIB', 'RLE'}
    self.assertIn(stats['compression'], acceptable)
3033
@unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
                     "ssl.OP_NO_COMPRESSION needed for this test")
def test_compression_disabled(self):
    # With OP_NO_COMPRESSION set on the context, no compression method
    # may be reported for the connection.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    ctx.load_cert_chain(CERTFILE)
    ctx.options |= ssl.OP_NO_COMPRESSION
    stats = server_params_test(ctx, ctx,
                               chatty=True, connectionchatty=True)
    negotiated = stats['compression']
    self.assertIs(negotiated, None)
3043
def test_dh_params(self):
    # Check we can get a connection with ephemeral Diffie-Hellman:
    # with DH parameters loaded and the cipher list restricted to
    # "kEDH", the negotiated cipher name must contain an ADH, EDH or
    # DHE component.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.load_cert_chain(CERTFILE)
    context.load_dh_params(DHFILE)
    context.set_ciphers("kEDH")
    stats = server_params_test(context, context,
                               chatty=True, connectionchatty=True)
    cipher = stats["cipher"][0]
    parts = cipher.split("-")
    if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
        # Report the whole cipher name; indexing it again would only
        # show its first character.
        self.fail("Non-DH cipher: " + cipher)
3056
def test_selected_alpn_protocol(self):
    # Without ALPN in use, selected_alpn_protocol() yields None.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    ctx.load_cert_chain(CERTFILE)
    stats = server_params_test(ctx, ctx,
                               chatty=True, connectionchatty=True)
    selected = stats['client_alpn_protocol']
    self.assertIs(selected, None)
3064
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
def test_selected_alpn_protocol_if_server_uses_alpn(self):
    # selected_alpn_protocol() stays None when only the server sets
    # ALPN protocols and the client does not.
    server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_context.load_cert_chain(CERTFILE)
    server_context.set_alpn_protocols(['foo', 'bar'])
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    client_context.load_verify_locations(CERTFILE)
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True)
    selected = stats['client_alpn_protocol']
    self.assertIs(selected, None)
3076
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
def test_alpn_protocols(self):
    # Each entry pairs the protocols set on the client context with the
    # value both sides are expected to report afterwards.
    server_protocols = ['foo', 'bar', 'milkshake']
    protocol_tests = [
        (['foo', 'bar'], 'foo'),
        (['bar', 'foo'], 'foo'),
        (['milkshake'], 'milkshake'),
        (['http/3.0', 'http/4.0'], None)
    ]

    def alpn_context(protocols):
        # TLSv1 context carrying the test certificate and ALPN list.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_cert_chain(CERTFILE)
        ctx.set_alpn_protocols(protocols)
        return ctx

    for client_protocols, expected in protocol_tests:
        server_context = alpn_context(server_protocols)
        client_context = alpn_context(client_protocols)
        stats = server_params_test(client_context, server_context,
                                   chatty=True, connectionchatty=True)

        # The two escaped placeholders are filled in per side below.
        msg = ("failed trying %s (s) and %s (c).\n"
               "was expecting %s, but got %%s from the %%s"
               % (server_protocols, client_protocols, expected))
        client_result = stats['client_alpn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))
        recorded = stats['server_alpn_protocols']
        if len(recorded) > 0:
            server_result = recorded[-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
3105
def test_selected_npn_protocol(self):
    # Without NPN in use, selected_npn_protocol() yields None.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    ctx.load_cert_chain(CERTFILE)
    stats = server_params_test(ctx, ctx,
                               chatty=True, connectionchatty=True)
    selected = stats['client_npn_protocol']
    self.assertIs(selected, None)
3113
@unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
def test_npn_protocols(self):
    # Each entry pairs the protocols set on the client context with the
    # value both sides are expected to report afterwards.
    server_protocols = ['http/1.1', 'spdy/2']
    protocol_tests = [
        (['http/1.1', 'spdy/2'], 'http/1.1'),
        (['spdy/2', 'http/1.1'], 'http/1.1'),
        (['spdy/2', 'test'], 'spdy/2'),
        (['abc', 'def'], 'abc')
    ]

    def npn_context(protocols):
        # TLSv1 context carrying the test certificate and NPN list.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_cert_chain(CERTFILE)
        ctx.set_npn_protocols(protocols)
        return ctx

    for client_protocols, expected in protocol_tests:
        server_context = npn_context(server_protocols)
        client_context = npn_context(client_protocols)
        stats = server_params_test(client_context, server_context,
                                   chatty=True, connectionchatty=True)

        # The two escaped placeholders are filled in per side below.
        msg = ("failed trying %s (s) and %s (c).\n"
               "was expecting %s, but got %%s from the %%s"
               % (server_protocols, client_protocols, expected))
        client_result = stats['client_npn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))
        recorded = stats['server_npn_protocols']
        if len(recorded) > 0:
            server_result = recorded[-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
3142
def sni_contexts(self):
    # Build the contexts used by the SNI tests and return them as
    # (server_context, other_context, client_context).
    def context_with_cert(certfile):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_cert_chain(certfile)
        return ctx

    server_context = context_with_cert(SIGNED_CERTFILE)
    other_context = context_with_cert(SIGNED_CERTFILE2)
    # The client requires a certificate verified against SIGNING_CA.
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    client_context.verify_mode = ssl.CERT_REQUIRED
    client_context.load_verify_locations(SIGNING_CA)
    return server_context, other_context, client_context
3152
def check_common_name(self, stats, name):
    # Assert that the subject of stats['peercert'] contains a
    # commonName entry equal to `name`.
    wanted_rdn = (('commonName', name),)
    subject = stats['peercert']['subject']
    self.assertIn(wanted_rdn, subject)
3156
@needs_sni
def test_sni_callback(self):
    server_context, other_context, client_context = self.sni_contexts()
    calls = []

    def servername_cb(ssl_sock, server_name, initial_context):
        # Record every invocation; switch to the other context only
        # when the client actually sent a server name.
        calls.append((server_name, initial_context))
        if server_name is not None:
            ssl_sock.context = other_context
    server_context.set_servername_callback(servername_cb)

    # 1. A server name is sent: the callback sees it and the
    #    connection is served with other_context's certificate.
    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name='supermessage')
    self.assertEqual(calls, [("supermessage", server_context)])
    self.check_common_name(stats, 'fakehostname')

    # 2. No server name: the callback still runs, with
    #    server_name=None, and the certificate is unchanged.
    calls = []
    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name=None)
    self.assertEqual(calls, [(None, server_context)])
    self.check_common_name(stats, 'localhost')

    # 3. Callback disabled: nothing is recorded and the certificate
    #    is unchanged.
    calls = []
    server_context.set_servername_callback(None)

    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name='notfunny')
    self.check_common_name(stats, 'localhost')
    self.assertEqual(calls, [])
3195
@needs_sni
def test_sni_callback_alert(self):
    # A TLS alert returned by the callback is reflected to the
    # connecting client.
    server_context, other_context, client_context = self.sni_contexts()

    def cb_returning_alert(ssl_sock, server_name, initial_context):
        return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
    server_context.set_servername_callback(cb_returning_alert)

    with self.assertRaises(ssl.SSLError) as caught:
        server_params_test(client_context, server_context,
                           chatty=False,
                           sni_name='supermessage')
    reason = caught.exception.reason
    self.assertEqual(reason, 'TLSV1_ALERT_ACCESS_DENIED')
3210
@needs_sni
def test_sni_callback_raising(self):
    # An exception raised by the callback fails the connection with a
    # TLS handshake failure alert.
    server_context, other_context, client_context = self.sni_contexts()

    def cb_raising(ssl_sock, server_name, initial_context):
        1/0
    server_context.set_servername_callback(cb_raising)

    with self.assertRaises(ssl.SSLError) as caught:
        with support.captured_stderr() as captured:
            server_params_test(client_context, server_context,
                               chatty=False,
                               sni_name='supermessage')
    self.assertEqual(caught.exception.reason,
                     'SSLV3_ALERT_HANDSHAKE_FAILURE')
    # The callback's exception must show up on stderr.
    self.assertIn("ZeroDivisionError", captured.getvalue())
3227
@needs_sni
def test_sni_callback_wrong_return_type(self):
    # A callback returning a value of the wrong type terminates the
    # TLS connection with an internal error alert.
    server_context, other_context, client_context = self.sni_contexts()

    def cb_wrong_return_type(ssl_sock, server_name, initial_context):
        return "foo"
    server_context.set_servername_callback(cb_wrong_return_type)

    with self.assertRaises(ssl.SSLError) as caught:
        with support.captured_stderr() as captured:
            server_params_test(client_context, server_context,
                               chatty=False,
                               sni_name='supermessage')
    self.assertEqual(caught.exception.reason,
                     'TLSV1_ALERT_INTERNAL_ERROR')
    # The resulting TypeError must show up on stderr.
    self.assertIn("TypeError", captured.getvalue())
3245
def test_shared_ciphers(self):
    # The client is limited to "RC4" while the server allows
    # "AES:RC4"; every cipher the server reports as shared must
    # therefore have an RC4 component in its name.
    server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_context.load_cert_chain(SIGNED_CERTFILE)
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    client_context.verify_mode = ssl.CERT_REQUIRED
    client_context.load_verify_locations(SIGNING_CA)
    client_context.set_ciphers("RC4")
    server_context.set_ciphers("AES:RC4")
    stats = server_params_test(client_context, server_context)
    shared = stats['server_shared_ciphers'][0]
    self.assertGreater(len(shared), 0)
    for cipher_name, _tls_version, _bits in shared:
        name_parts = cipher_name.split("-")
        self.assertIn("RC4", name_parts)
Benjamin Peterson4cb17812015-01-07 11:14:26 -06003259
def test_read_write_after_close_raises_valuerror(self):
    # read() and write() on an SSL socket that has already been closed
    # must raise ValueError.
    ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    ctx.verify_mode = ssl.CERT_REQUIRED
    ctx.load_verify_locations(CERTFILE)
    ctx.load_cert_chain(CERTFILE)
    echo_server = ThreadedEchoServer(context=ctx, chatty=False)

    with echo_server:
        sock = ctx.wrap_socket(socket.socket())
        sock.connect((HOST, echo_server.port))
        sock.close()

        with self.assertRaises(ValueError):
            sock.read(1024)
        with self.assertRaises(ValueError):
            sock.write(b'hello')
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003274
def test_sendfile(self):
    # Send a small on-disk file through sendfile() on an SSL socket and
    # check that the echo server returns exactly the same bytes.
    payload = b"x" * 512
    with open(support.TESTFN, 'wb') as out:
        out.write(payload)
    self.addCleanup(support.unlink, support.TESTFN)

    ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    ctx.verify_mode = ssl.CERT_REQUIRED
    ctx.load_verify_locations(CERTFILE)
    ctx.load_cert_chain(CERTFILE)
    echo_server = ThreadedEchoServer(context=ctx, chatty=False)

    # The server is entered first, then the client socket is wrapped.
    with echo_server, ctx.wrap_socket(socket.socket()) as sock:
        sock.connect((HOST, echo_server.port))
        with open(support.TESTFN, 'rb') as src:
            sock.sendfile(src)
        self.assertEqual(sock.recv(1024), payload)
3291
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003292
def test_main(verbose=False):
    """Collect the applicable SSL test classes and run them.

    When ``support.verbose`` is set, a short banner describing the platform
    and the linked OpenSSL is printed first.  The certificate/key fixture
    files are then checked for existence, and ``support.TestFailed`` is
    raised for the first one that is missing.  Network tests are added only
    when the 'network' resource is enabled; threaded tests only when
    ``_have_threads`` is true and ``support.threading_setup()`` returns a
    truthy value.

    The *verbose* parameter is accepted but not read here; verbosity is
    taken from ``support.verbose``.
    """
    if support.verbose:
        version_probes = {
            'Linux': platform.linux_distribution,
            'Mac': platform.mac_ver,
            'Windows': platform.win32_ver,
        }
        # Use the first probe that yields a non-empty leading field; fall
        # back to the generic platform string when none does.
        for label, probe in version_probes.items():
            info = probe()
            if info and info[0]:
                plat = '%s %r' % (label, info)
                break
        else:
            plat = repr(platform.platform())
        print("test_ssl: testing with %r %r" %
              (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
        print(" under %s" % plat)
        print(" HAS_SNI = %r" % ssl.HAS_SNI)
        print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
        # OP_NO_TLSv1_1 may be absent from the ssl module; skip it then.
        if hasattr(ssl, 'OP_NO_TLSv1_1'):
            print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)

    required_files = [
        CERTFILE, SVN_PYTHON_ORG_ROOT_CERT, BYTES_CERTFILE,
        ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
        SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
        BADCERT, BADKEY, EMPTYCERT,
    ]
    # Stop at the first missing fixture, in list order.
    missing = next(
        (path for path in required_files if not os.path.exists(path)),
        None)
    if missing is not None:
        raise support.TestFailed("Can't read certificate file %r" % missing)

    tests = [ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests]

    if support.is_resource_enabled('network'):
        tests += [NetworkedTests, NetworkedBIOTests]

    if _have_threads:
        thread_info = support.threading_setup()
        if thread_info:
            tests.append(ThreadedTests)

    try:
        support.run_unittest(*tests)
    finally:
        # thread_info is bound only when _have_threads is true, hence the
        # identical guard here.
        if _have_threads:
            support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00003341
# Run the whole suite when this module is executed as a script.
if __name__ == "__main__":
    test_main()