blob: aa17317782c6791bf79fbcfe132e677f5e022e45 [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00005from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00006import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00007import select
Thomas Woutersed03b412007-08-28 21:37:11 +00008import time
Christian Heimes9424bb42013-06-17 15:32:57 +02009import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000010import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000011import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000012import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000013import pprint
Antoine Pitrou152efa22010-05-16 18:19:27 +000014import tempfile
Antoine Pitrou803e6d62010-10-13 10:36:15 +000015import urllib.request
Thomas Woutersed03b412007-08-28 21:37:11 +000016import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000017import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000018import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000019import platform
Antoine Pitrou23df4832010-08-04 17:14:06 +000020import functools
Antoine Pitrou242db722013-05-01 20:52:07 +020021from unittest import mock
Thomas Woutersed03b412007-08-28 21:37:11 +000022
Antoine Pitrou05d936d2010-10-13 11:38:36 +000023ssl = support.import_module("ssl")
24
Antoine Pitrou2463e5f2013-03-28 22:24:43 +010025PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
Benjamin Petersonee8712c2008-05-20 21:35:26 +000026HOST = support.HOST
Antoine Pitrou152efa22010-05-16 18:19:27 +000027
Christian Heimesefff7062013-11-21 03:35:02 +010028def data_file(*name):
29 return os.path.join(os.path.dirname(__file__), *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000030
Antoine Pitrou81564092010-10-08 23:06:24 +000031# The custom key and certificate files used in test_ssl are generated
32# using Lib/test/make_ssl_certs.py.
33# Other certificates are simply fetched from the Internet servers they
34# are meant to authenticate.
35
Antoine Pitrou152efa22010-05-16 18:19:27 +000036CERTFILE = data_file("keycert.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000037BYTES_CERTFILE = os.fsencode(CERTFILE)
Antoine Pitrou152efa22010-05-16 18:19:27 +000038ONLYCERT = data_file("ssl_cert.pem")
39ONLYKEY = data_file("ssl_key.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000040BYTES_ONLYCERT = os.fsencode(ONLYCERT)
41BYTES_ONLYKEY = os.fsencode(ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +020042CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
43ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
44KEY_PASSWORD = "somepass"
Antoine Pitrou152efa22010-05-16 18:19:27 +000045CAPATH = data_file("capath")
Victor Stinner313a1202010-06-11 23:56:51 +000046BYTES_CAPATH = os.fsencode(CAPATH)
Christian Heimesefff7062013-11-21 03:35:02 +010047CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
48CAFILE_CACERT = data_file("capath", "5ed36f99.0")
49
Antoine Pitrou152efa22010-05-16 18:19:27 +000050
Christian Heimes22587792013-11-21 23:56:13 +010051# empty CRL
52CRLFILE = data_file("revocation.crl")
53
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010054# Two keys and certs signed by the same CA (for SNI tests)
55SIGNED_CERTFILE = data_file("keycert3.pem")
56SIGNED_CERTFILE2 = data_file("keycert4.pem")
57SIGNING_CA = data_file("pycacert.pem")
58
Antoine Pitrou152efa22010-05-16 18:19:27 +000059SVN_PYTHON_ORG_ROOT_CERT = data_file("https_svn_python_org_root.pem")
60
61EMPTYCERT = data_file("nullcert.pem")
62BADCERT = data_file("badcert.pem")
63WRONGCERT = data_file("XXXnonexisting.pem")
64BADKEY = data_file("badkey.pem")
Antoine Pitroud8c347a2011-10-01 19:20:25 +020065NOKIACERT = data_file("nokia.pem")
Christian Heimes824f7f32013-08-17 00:54:47 +020066NULLBYTECERT = data_file("nullbytecert.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +000067
Antoine Pitrou0e576f12011-12-22 10:03:38 +010068DHFILE = data_file("dh512.pem")
69BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +000070
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010071
Thomas Woutersed03b412007-08-28 21:37:11 +000072def handle_error(prefix):
73 exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
Benjamin Petersonee8712c2008-05-20 21:35:26 +000074 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +000075 sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +000076
Antoine Pitroub5218772010-05-21 09:56:06 +000077def can_clear_options():
78 # 0.9.8m or higher
Antoine Pitroub9ac25d2011-07-08 18:47:06 +020079 return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
Antoine Pitroub5218772010-05-21 09:56:06 +000080
81def no_sslv2_implies_sslv3_hello():
82 # 0.9.7h or higher
83 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
84
Christian Heimes2427b502013-11-23 11:24:32 +010085def have_verify_flags():
86 # 0.9.8 or higher
87 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
88
Antoine Pitrouc695c952014-04-28 20:57:36 +020089def utc_offset(): #NOTE: ignore issues like #1647654
90 # local time = utc time + utc offset
91 if time.daylight and time.localtime().tm_isdst > 0:
92 return -time.altzone # seconds
93 return -time.timezone
94
Christian Heimes9424bb42013-06-17 15:32:57 +020095def asn1time(cert_time):
96 # Some versions of OpenSSL ignore seconds, see #18207
97 # 0.9.8.i
98 if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
99 fmt = "%b %d %H:%M:%S %Y GMT"
100 dt = datetime.datetime.strptime(cert_time, fmt)
101 dt = dt.replace(second=0)
102 cert_time = dt.strftime(fmt)
103 # %d adds leading zero but ASN1_TIME_print() uses leading space
104 if cert_time[4] == "0":
105 cert_time = cert_time[:4] + " " + cert_time[5:]
106
107 return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000108
Antoine Pitrou23df4832010-08-04 17:14:06 +0000109# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
110def skip_if_broken_ubuntu_ssl(func):
Victor Stinner3de49192011-05-09 00:42:58 +0200111 if hasattr(ssl, 'PROTOCOL_SSLv2'):
112 @functools.wraps(func)
113 def f(*args, **kwargs):
114 try:
115 ssl.SSLContext(ssl.PROTOCOL_SSLv2)
116 except ssl.SSLError:
117 if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
118 platform.linux_distribution() == ('debian', 'squeeze/sid', '')):
119 raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
120 return func(*args, **kwargs)
121 return f
122 else:
123 return func
Antoine Pitrou23df4832010-08-04 17:14:06 +0000124
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100125needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
126
Antoine Pitrou23df4832010-08-04 17:14:06 +0000127
Antoine Pitrou152efa22010-05-16 18:19:27 +0000128class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000129
Antoine Pitrou480a1242010-04-28 21:37:09 +0000130 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000131 ssl.CERT_NONE
132 ssl.CERT_OPTIONAL
133 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100134 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100135 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100136 if ssl.HAS_ECDH:
137 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100138 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
139 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000140 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100141 self.assertIn(ssl.HAS_ECDH, {True, False})
Thomas Woutersed03b412007-08-28 21:37:11 +0000142
Antoine Pitrou172f0252014-04-18 20:33:08 +0200143 def test_str_for_enums(self):
144 # Make sure that the PROTOCOL_* constants have enum-like string
145 # reprs.
146 proto = ssl.PROTOCOL_SSLv3
147 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_SSLv3')
148 ctx = ssl.SSLContext(proto)
149 self.assertIs(ctx.protocol, proto)
150
Antoine Pitrou480a1242010-04-28 21:37:09 +0000151 def test_random(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000152 v = ssl.RAND_status()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000153 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000154 sys.stdout.write("\n RAND_status is %d (%s)\n"
155 % (v, (v and "sufficient randomness") or
156 "insufficient randomness"))
Victor Stinner99c8b162011-05-24 12:05:19 +0200157
158 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
159 self.assertEqual(len(data), 16)
160 self.assertEqual(is_cryptographic, v == 1)
161 if v:
162 data = ssl.RAND_bytes(16)
163 self.assertEqual(len(data), 16)
Victor Stinner2e2baa92011-05-25 11:15:16 +0200164 else:
165 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
Victor Stinner99c8b162011-05-24 12:05:19 +0200166
Victor Stinner1e81a392013-12-19 16:47:04 +0100167 # negative num is invalid
168 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
169 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
170
Jesus Ceac8754a12012-09-11 02:00:58 +0200171 self.assertRaises(TypeError, ssl.RAND_egd, 1)
172 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000173 ssl.RAND_add("this is a random string", 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000174
Christian Heimesf77b4b22013-08-21 13:26:05 +0200175 @unittest.skipUnless(os.name == 'posix', 'requires posix')
176 def test_random_fork(self):
177 status = ssl.RAND_status()
178 if not status:
179 self.fail("OpenSSL's PRNG has insufficient randomness")
180
181 rfd, wfd = os.pipe()
182 pid = os.fork()
183 if pid == 0:
184 try:
185 os.close(rfd)
186 child_random = ssl.RAND_pseudo_bytes(16)[0]
187 self.assertEqual(len(child_random), 16)
188 os.write(wfd, child_random)
189 os.close(wfd)
190 except BaseException:
191 os._exit(1)
192 else:
193 os._exit(0)
194 else:
195 os.close(wfd)
196 self.addCleanup(os.close, rfd)
197 _, status = os.waitpid(pid, 0)
198 self.assertEqual(status, 0)
199
200 child_random = os.read(rfd, 16)
201 self.assertEqual(len(child_random), 16)
202 parent_random = ssl.RAND_pseudo_bytes(16)[0]
203 self.assertEqual(len(parent_random), 16)
204
205 self.assertNotEqual(child_random, parent_random)
206
Antoine Pitrou480a1242010-04-28 21:37:09 +0000207 def test_parse_cert(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000208 # note that this uses an 'unofficial' function in _ssl.c,
209 # provided solely for this test, to exercise the certificate
210 # parsing code
Antoine Pitroufb046912010-11-09 20:21:19 +0000211 p = ssl._ssl._test_decode_cert(CERTFILE)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000212 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000213 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200214 self.assertEqual(p['issuer'],
215 ((('countryName', 'XY'),),
216 (('localityName', 'Castle Anthrax'),),
217 (('organizationName', 'Python Software Foundation'),),
218 (('commonName', 'localhost'),))
219 )
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100220 # Note the next three asserts will fail if the keys are regenerated
Christian Heimes9424bb42013-06-17 15:32:57 +0200221 self.assertEqual(p['notAfter'], asn1time('Oct 5 23:01:56 2020 GMT'))
222 self.assertEqual(p['notBefore'], asn1time('Oct 8 23:01:56 2010 GMT'))
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200223 self.assertEqual(p['serialNumber'], 'D7C7381919AFC24E')
224 self.assertEqual(p['subject'],
225 ((('countryName', 'XY'),),
226 (('localityName', 'Castle Anthrax'),),
227 (('organizationName', 'Python Software Foundation'),),
228 (('commonName', 'localhost'),))
229 )
230 self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),))
231 # Issue #13034: the subjectAltName in some certificates
232 # (notably projects.developer.nokia.com:443) wasn't parsed
233 p = ssl._ssl._test_decode_cert(NOKIACERT)
234 if support.verbose:
235 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
236 self.assertEqual(p['subjectAltName'],
237 (('DNS', 'projects.developer.nokia.com'),
238 ('DNS', 'projects.forum.nokia.com'))
239 )
Christian Heimesbd3a7f92013-11-21 03:40:15 +0100240 # extra OCSP and AIA fields
241 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
242 self.assertEqual(p['caIssuers'],
243 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
244 self.assertEqual(p['crlDistributionPoints'],
245 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000246
Christian Heimes824f7f32013-08-17 00:54:47 +0200247 def test_parse_cert_CVE_2013_4238(self):
248 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
249 if support.verbose:
250 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
251 subject = ((('countryName', 'US'),),
252 (('stateOrProvinceName', 'Oregon'),),
253 (('localityName', 'Beaverton'),),
254 (('organizationName', 'Python Software Foundation'),),
255 (('organizationalUnitName', 'Python Core Development'),),
256 (('commonName', 'null.python.org\x00example.org'),),
257 (('emailAddress', 'python-dev@python.org'),))
258 self.assertEqual(p['subject'], subject)
259 self.assertEqual(p['issuer'], subject)
Christian Heimes157c9832013-08-25 14:12:41 +0200260 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
261 san = (('DNS', 'altnull.python.org\x00example.com'),
262 ('email', 'null@python.org\x00user@example.org'),
263 ('URI', 'http://null.python.org\x00http://example.org'),
264 ('IP Address', '192.0.2.1'),
265 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
266 else:
267 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
268 san = (('DNS', 'altnull.python.org\x00example.com'),
269 ('email', 'null@python.org\x00user@example.org'),
270 ('URI', 'http://null.python.org\x00http://example.org'),
271 ('IP Address', '192.0.2.1'),
272 ('IP Address', '<invalid>'))
273
274 self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200275
Antoine Pitrou480a1242010-04-28 21:37:09 +0000276 def test_DER_to_PEM(self):
277 with open(SVN_PYTHON_ORG_ROOT_CERT, 'r') as f:
278 pem = f.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000279 d1 = ssl.PEM_cert_to_DER_cert(pem)
280 p2 = ssl.DER_cert_to_PEM_cert(d1)
281 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitrou18c913e2010-04-27 10:59:39 +0000282 self.assertEqual(d1, d2)
Antoine Pitrou9bfbe612010-04-27 22:08:08 +0000283 if not p2.startswith(ssl.PEM_HEADER + '\n'):
284 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
285 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
286 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000287
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000288 def test_openssl_version(self):
289 n = ssl.OPENSSL_VERSION_NUMBER
290 t = ssl.OPENSSL_VERSION_INFO
291 s = ssl.OPENSSL_VERSION
292 self.assertIsInstance(n, int)
293 self.assertIsInstance(t, tuple)
294 self.assertIsInstance(s, str)
295 # Some sanity checks follow
296 # >= 0.9
297 self.assertGreaterEqual(n, 0x900000)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400298 # < 3.0
299 self.assertLess(n, 0x30000000)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000300 major, minor, fix, patch, status = t
301 self.assertGreaterEqual(major, 0)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400302 self.assertLess(major, 3)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000303 self.assertGreaterEqual(minor, 0)
304 self.assertLess(minor, 256)
305 self.assertGreaterEqual(fix, 0)
306 self.assertLess(fix, 256)
307 self.assertGreaterEqual(patch, 0)
308 self.assertLessEqual(patch, 26)
309 self.assertGreaterEqual(status, 0)
310 self.assertLessEqual(status, 15)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400311 # Version string as returned by {Open,Libre}SSL, the format might change
312 if "LibreSSL" in s:
313 self.assertTrue(s.startswith("LibreSSL {:d}.{:d}".format(major, minor)),
314 (s, t))
315 else:
316 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
317 (s, t))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000318
Antoine Pitrou9d543662010-04-23 23:10:32 +0000319 @support.cpython_only
320 def test_refcycle(self):
321 # Issue #7943: an SSL object doesn't create reference cycles with
322 # itself.
323 s = socket.socket(socket.AF_INET)
324 ss = ssl.wrap_socket(s)
325 wr = weakref.ref(ss)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100326 with support.check_warnings(("", ResourceWarning)):
327 del ss
328 self.assertEqual(wr(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000329
Antoine Pitroua468adc2010-09-14 14:43:44 +0000330 def test_wrapped_unconnected(self):
331 # Methods on an unconnected SSLSocket propagate the original
Andrew Svetlov0832af62012-12-18 23:10:48 +0200332 # OSError raise by the underlying socket object.
Antoine Pitroua468adc2010-09-14 14:43:44 +0000333 s = socket.socket(socket.AF_INET)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100334 with ssl.wrap_socket(s) as ss:
Antoine Pitroue9bb4732013-01-12 21:56:56 +0100335 self.assertRaises(OSError, ss.recv, 1)
336 self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
337 self.assertRaises(OSError, ss.recvfrom, 1)
338 self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
339 self.assertRaises(OSError, ss.send, b'x')
340 self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
Antoine Pitroua468adc2010-09-14 14:43:44 +0000341
Antoine Pitrou40f08742010-04-24 22:04:40 +0000342 def test_timeout(self):
343 # Issue #8524: when creating an SSL socket, the timeout of the
344 # original socket should be retained.
345 for timeout in (None, 0.0, 5.0):
346 s = socket.socket(socket.AF_INET)
347 s.settimeout(timeout)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100348 with ssl.wrap_socket(s) as ss:
349 self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000350
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000351 def test_errors(self):
352 sock = socket.socket()
Ezio Melottied3a7d22010-12-01 02:32:32 +0000353 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000354 "certfile must be specified",
355 ssl.wrap_socket, sock, keyfile=CERTFILE)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000356 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000357 "certfile must be specified for server-side operations",
358 ssl.wrap_socket, sock, server_side=True)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000359 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000360 "certfile must be specified for server-side operations",
361 ssl.wrap_socket, sock, server_side=True, certfile="")
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100362 with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
363 self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
364 s.connect, (HOST, 8080))
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200365 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000366 with socket.socket() as sock:
367 ssl.wrap_socket(sock, certfile=WRONGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000368 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200369 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000370 with socket.socket() as sock:
371 ssl.wrap_socket(sock, certfile=CERTFILE, keyfile=WRONGCERT)
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000372 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200373 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000374 with socket.socket() as sock:
375 ssl.wrap_socket(sock, certfile=WRONGCERT, keyfile=WRONGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000376 self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000377
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000378 def test_match_hostname(self):
379 def ok(cert, hostname):
380 ssl.match_hostname(cert, hostname)
381 def fail(cert, hostname):
382 self.assertRaises(ssl.CertificateError,
383 ssl.match_hostname, cert, hostname)
384
385 cert = {'subject': ((('commonName', 'example.com'),),)}
386 ok(cert, 'example.com')
387 ok(cert, 'ExAmple.cOm')
388 fail(cert, 'www.example.com')
389 fail(cert, '.example.com')
390 fail(cert, 'example.org')
391 fail(cert, 'exampleXcom')
392
393 cert = {'subject': ((('commonName', '*.a.com'),),)}
394 ok(cert, 'foo.a.com')
395 fail(cert, 'bar.foo.a.com')
396 fail(cert, 'a.com')
397 fail(cert, 'Xa.com')
398 fail(cert, '.a.com')
399
Georg Brandl72c98d32013-10-27 07:16:53 +0100400 # only match one left-most wildcard
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000401 cert = {'subject': ((('commonName', 'f*.com'),),)}
402 ok(cert, 'foo.com')
403 ok(cert, 'f.com')
404 fail(cert, 'bar.com')
405 fail(cert, 'foo.a.com')
406 fail(cert, 'bar.foo.com')
407
Christian Heimes824f7f32013-08-17 00:54:47 +0200408 # NULL bytes are bad, CVE-2013-4073
409 cert = {'subject': ((('commonName',
410 'null.python.org\x00example.org'),),)}
411 ok(cert, 'null.python.org\x00example.org') # or raise an error?
412 fail(cert, 'example.org')
413 fail(cert, 'null.python.org')
414
Georg Brandl72c98d32013-10-27 07:16:53 +0100415 # error cases with wildcards
416 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
417 fail(cert, 'bar.foo.a.com')
418 fail(cert, 'a.com')
419 fail(cert, 'Xa.com')
420 fail(cert, '.a.com')
421
422 cert = {'subject': ((('commonName', 'a.*.com'),),)}
423 fail(cert, 'a.foo.com')
424 fail(cert, 'a..com')
425 fail(cert, 'a.com')
426
427 # wildcard doesn't match IDNA prefix 'xn--'
428 idna = 'püthon.python.org'.encode("idna").decode("ascii")
429 cert = {'subject': ((('commonName', idna),),)}
430 ok(cert, idna)
431 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
432 fail(cert, idna)
433 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
434 fail(cert, idna)
435
436 # wildcard in first fragment and IDNA A-labels in sequent fragments
437 # are supported.
438 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
439 cert = {'subject': ((('commonName', idna),),)}
440 ok(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
441 ok(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
442 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
443 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
444
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000445 # Slightly fake real-world example
446 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
447 'subject': ((('commonName', 'linuxfrz.org'),),),
448 'subjectAltName': (('DNS', 'linuxfr.org'),
449 ('DNS', 'linuxfr.com'),
450 ('othername', '<unsupported>'))}
451 ok(cert, 'linuxfr.org')
452 ok(cert, 'linuxfr.com')
453 # Not a "DNS" entry
454 fail(cert, '<unsupported>')
455 # When there is a subjectAltName, commonName isn't used
456 fail(cert, 'linuxfrz.org')
457
458 # A pristine real-world example
459 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
460 'subject': ((('countryName', 'US'),),
461 (('stateOrProvinceName', 'California'),),
462 (('localityName', 'Mountain View'),),
463 (('organizationName', 'Google Inc'),),
464 (('commonName', 'mail.google.com'),))}
465 ok(cert, 'mail.google.com')
466 fail(cert, 'gmail.com')
467 # Only commonName is considered
468 fail(cert, 'California')
469
470 # Neither commonName nor subjectAltName
471 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
472 'subject': ((('countryName', 'US'),),
473 (('stateOrProvinceName', 'California'),),
474 (('localityName', 'Mountain View'),),
475 (('organizationName', 'Google Inc'),))}
476 fail(cert, 'mail.google.com')
477
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200478 # No DNS entry in subjectAltName but a commonName
479 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
480 'subject': ((('countryName', 'US'),),
481 (('stateOrProvinceName', 'California'),),
482 (('localityName', 'Mountain View'),),
483 (('commonName', 'mail.google.com'),)),
484 'subjectAltName': (('othername', 'blabla'), )}
485 ok(cert, 'mail.google.com')
486
487 # No DNS entry subjectAltName and no commonName
488 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
489 'subject': ((('countryName', 'US'),),
490 (('stateOrProvinceName', 'California'),),
491 (('localityName', 'Mountain View'),),
492 (('organizationName', 'Google Inc'),)),
493 'subjectAltName': (('othername', 'blabla'),)}
494 fail(cert, 'google.com')
495
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000496 # Empty cert / no cert
497 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
498 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
499
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200500 # Issue #17980: avoid denials of service by refusing more than one
501 # wildcard per fragment.
502 cert = {'subject': ((('commonName', 'a*b.com'),),)}
503 ok(cert, 'axxb.com')
504 cert = {'subject': ((('commonName', 'a*b.co*'),),)}
Georg Brandl72c98d32013-10-27 07:16:53 +0100505 fail(cert, 'axxb.com')
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200506 cert = {'subject': ((('commonName', 'a*b*.com'),),)}
507 with self.assertRaises(ssl.CertificateError) as cm:
508 ssl.match_hostname(cert, 'axxbxxc.com')
509 self.assertIn("too many wildcards", str(cm.exception))
510
Antoine Pitroud5323212010-10-22 18:19:07 +0000511 def test_server_side(self):
512 # server_hostname doesn't work for server sockets
513 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000514 with socket.socket() as sock:
515 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
516 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000517
Antoine Pitroud6494802011-07-21 01:11:30 +0200518 def test_unknown_channel_binding(self):
519 # should raise ValueError for unknown type
520 s = socket.socket(socket.AF_INET)
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200521 s.bind(('127.0.0.1', 0))
522 s.listen()
523 c = socket.socket(socket.AF_INET)
524 c.connect(s.getsockname())
525 with ssl.wrap_socket(c, do_handshake_on_connect=False) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100526 with self.assertRaises(ValueError):
527 ss.get_channel_binding("unknown-type")
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200528 s.close()
Antoine Pitroud6494802011-07-21 01:11:30 +0200529
530 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
531 "'tls-unique' channel binding not available")
532 def test_tls_unique_channel_binding(self):
533 # unconnected should return None for known type
534 s = socket.socket(socket.AF_INET)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100535 with ssl.wrap_socket(s) as ss:
536 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200537 # the same for server-side
538 s = socket.socket(socket.AF_INET)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100539 with ssl.wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
540 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200541
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600542 def test_dealloc_warn(self):
543 ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
544 r = repr(ss)
545 with self.assertWarns(ResourceWarning) as cm:
546 ss = None
547 support.gc_collect()
548 self.assertIn(r, str(cm.warning.args[0]))
549
Christian Heimes6d7ad132013-06-09 18:02:55 +0200550 def test_get_default_verify_paths(self):
551 paths = ssl.get_default_verify_paths()
552 self.assertEqual(len(paths), 6)
553 self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
554
555 with support.EnvironmentVarGuard() as env:
556 env["SSL_CERT_DIR"] = CAPATH
557 env["SSL_CERT_FILE"] = CERTFILE
558 paths = ssl.get_default_verify_paths()
559 self.assertEqual(paths.cafile, CERTFILE)
560 self.assertEqual(paths.capath, CAPATH)
561
Christian Heimes44109d72013-11-22 01:51:30 +0100562 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
563 def test_enum_certificates(self):
564 self.assertTrue(ssl.enum_certificates("CA"))
565 self.assertTrue(ssl.enum_certificates("ROOT"))
566
567 self.assertRaises(TypeError, ssl.enum_certificates)
568 self.assertRaises(WindowsError, ssl.enum_certificates, "")
569
Christian Heimesc2d65e12013-11-22 16:13:55 +0100570 trust_oids = set()
571 for storename in ("CA", "ROOT"):
572 store = ssl.enum_certificates(storename)
573 self.assertIsInstance(store, list)
574 for element in store:
575 self.assertIsInstance(element, tuple)
576 self.assertEqual(len(element), 3)
577 cert, enc, trust = element
578 self.assertIsInstance(cert, bytes)
579 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
580 self.assertIsInstance(trust, (set, bool))
581 if isinstance(trust, set):
582 trust_oids.update(trust)
Christian Heimes44109d72013-11-22 01:51:30 +0100583
584 serverAuth = "1.3.6.1.5.5.7.3.1"
Christian Heimesc2d65e12013-11-22 16:13:55 +0100585 self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200586
Christian Heimes46bebee2013-06-09 19:03:31 +0200587 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Christian Heimes44109d72013-11-22 01:51:30 +0100588 def test_enum_crls(self):
589 self.assertTrue(ssl.enum_crls("CA"))
590 self.assertRaises(TypeError, ssl.enum_crls)
591 self.assertRaises(WindowsError, ssl.enum_crls, "")
Christian Heimes46bebee2013-06-09 19:03:31 +0200592
Christian Heimes44109d72013-11-22 01:51:30 +0100593 crls = ssl.enum_crls("CA")
594 self.assertIsInstance(crls, list)
595 for element in crls:
596 self.assertIsInstance(element, tuple)
597 self.assertEqual(len(element), 2)
598 self.assertIsInstance(element[0], bytes)
599 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200600
Christian Heimes46bebee2013-06-09 19:03:31 +0200601
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100602 def test_asn1object(self):
603 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
604 '1.3.6.1.5.5.7.3.1')
605
606 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
607 self.assertEqual(val, expected)
608 self.assertEqual(val.nid, 129)
609 self.assertEqual(val.shortname, 'serverAuth')
610 self.assertEqual(val.longname, 'TLS Web Server Authentication')
611 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
612 self.assertIsInstance(val, ssl._ASN1Object)
613 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
614
615 val = ssl._ASN1Object.fromnid(129)
616 self.assertEqual(val, expected)
617 self.assertIsInstance(val, ssl._ASN1Object)
618 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100619 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
620 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100621 for i in range(1000):
622 try:
623 obj = ssl._ASN1Object.fromnid(i)
624 except ValueError:
625 pass
626 else:
627 self.assertIsInstance(obj.nid, int)
628 self.assertIsInstance(obj.shortname, str)
629 self.assertIsInstance(obj.longname, str)
630 self.assertIsInstance(obj.oid, (str, type(None)))
631
632 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
633 self.assertEqual(val, expected)
634 self.assertIsInstance(val, ssl._ASN1Object)
635 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
636 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
637 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100638 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
639 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100640
Christian Heimes72d28502013-11-23 13:56:58 +0100641 def test_purpose_enum(self):
642 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
643 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
644 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
645 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
646 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
647 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
648 '1.3.6.1.5.5.7.3.1')
649
650 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
651 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
652 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
653 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
654 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
655 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
656 '1.3.6.1.5.5.7.3.2')
657
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100658 def test_unsupported_dtls(self):
659 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
660 self.addCleanup(s.close)
661 with self.assertRaises(NotImplementedError) as cx:
662 ssl.wrap_socket(s, cert_reqs=ssl.CERT_NONE)
663 self.assertEqual(str(cx.exception), "only stream sockets are supported")
664 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
665 with self.assertRaises(NotImplementedError) as cx:
666 ctx.wrap_socket(s)
667 self.assertEqual(str(cx.exception), "only stream sockets are supported")
668
Antoine Pitrouc695c952014-04-28 20:57:36 +0200669 def cert_time_ok(self, timestring, timestamp):
670 self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)
671
672 def cert_time_fail(self, timestring):
673 with self.assertRaises(ValueError):
674 ssl.cert_time_to_seconds(timestring)
675
676 @unittest.skipUnless(utc_offset(),
677 'local time needs to be different from UTC')
678 def test_cert_time_to_seconds_timezone(self):
679 # Issue #19940: ssl.cert_time_to_seconds() returns wrong
680 # results if local timezone is not UTC
681 self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0)
682 self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0)
683
684 def test_cert_time_to_seconds(self):
685 timestring = "Jan 5 09:34:43 2018 GMT"
686 ts = 1515144883.0
687 self.cert_time_ok(timestring, ts)
688 # accept keyword parameter, assert its name
689 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
690 # accept both %e and %d (space or zero generated by strftime)
691 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
692 # case-insensitive
693 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
694 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
695 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
696 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
697 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
698 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
699 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
700 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
701
702 newyear_ts = 1230768000.0
703 # leap seconds
704 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
705 # same timestamp
706 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
707
708 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
709 # allow 60th second (even if it is not a leap second)
710 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
711 # allow 2nd leap second for compatibility with time.strptime()
712 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
713 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
714
715 # no special treatement for the special value:
716 # 99991231235959Z (rfc 5280)
717 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
718
719 @support.run_with_locale('LC_ALL', '')
720 def test_cert_time_to_seconds_locale(self):
721 # `cert_time_to_seconds()` should be locale independent
722
723 def local_february_name():
724 return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
725
726 if local_february_name().lower() == 'feb':
727 self.skipTest("locale-specific month name needs to be "
728 "different from C locale")
729
730 # locale-independent
731 self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
732 self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
733
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100734
Antoine Pitrou152efa22010-05-16 18:19:27 +0000735class ContextTests(unittest.TestCase):
736
Antoine Pitrou23df4832010-08-04 17:14:06 +0000737 @skip_if_broken_ubuntu_ssl
Antoine Pitrou152efa22010-05-16 18:19:27 +0000738 def test_constructor(self):
Antoine Pitrou2463e5f2013-03-28 22:24:43 +0100739 for protocol in PROTOCOLS:
740 ssl.SSLContext(protocol)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000741 self.assertRaises(TypeError, ssl.SSLContext)
742 self.assertRaises(ValueError, ssl.SSLContext, -1)
743 self.assertRaises(ValueError, ssl.SSLContext, 42)
744
Antoine Pitrou23df4832010-08-04 17:14:06 +0000745 @skip_if_broken_ubuntu_ssl
Antoine Pitrou152efa22010-05-16 18:19:27 +0000746 def test_protocol(self):
747 for proto in PROTOCOLS:
748 ctx = ssl.SSLContext(proto)
749 self.assertEqual(ctx.protocol, proto)
750
751 def test_ciphers(self):
752 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
753 ctx.set_ciphers("ALL")
754 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +0000755 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +0000756 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000757
Antoine Pitrou23df4832010-08-04 17:14:06 +0000758 @skip_if_broken_ubuntu_ssl
Antoine Pitroub5218772010-05-21 09:56:06 +0000759 def test_options(self):
760 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
Antoine Pitroucd3d7ca2014-01-09 20:02:20 +0100761 # OP_ALL | OP_NO_SSLv2 is the default value
Antoine Pitroub5218772010-05-21 09:56:06 +0000762 self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2,
763 ctx.options)
764 ctx.options |= ssl.OP_NO_SSLv3
765 self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3,
766 ctx.options)
767 if can_clear_options():
768 ctx.options = (ctx.options & ~ssl.OP_NO_SSLv2) | ssl.OP_NO_TLSv1
769 self.assertEqual(ssl.OP_ALL | ssl.OP_NO_TLSv1 | ssl.OP_NO_SSLv3,
770 ctx.options)
771 ctx.options = 0
772 self.assertEqual(0, ctx.options)
773 else:
774 with self.assertRaises(ValueError):
775 ctx.options = 0
776
Christian Heimes22587792013-11-21 23:56:13 +0100777 def test_verify_mode(self):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000778 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
779 # Default value
780 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
781 ctx.verify_mode = ssl.CERT_OPTIONAL
782 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
783 ctx.verify_mode = ssl.CERT_REQUIRED
784 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
785 ctx.verify_mode = ssl.CERT_NONE
786 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
787 with self.assertRaises(TypeError):
788 ctx.verify_mode = None
789 with self.assertRaises(ValueError):
790 ctx.verify_mode = 42
791
Christian Heimes2427b502013-11-23 11:24:32 +0100792 @unittest.skipUnless(have_verify_flags(),
793 "verify_flags need OpenSSL > 0.9.8")
Christian Heimes22587792013-11-21 23:56:13 +0100794 def test_verify_flags(self):
795 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
796 # default value by OpenSSL
797 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
798 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
799 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
800 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
801 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
802 ctx.verify_flags = ssl.VERIFY_DEFAULT
803 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
804 # supports any value
805 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
806 self.assertEqual(ctx.verify_flags,
807 ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
808 with self.assertRaises(TypeError):
809 ctx.verify_flags = None
810
Antoine Pitrou152efa22010-05-16 18:19:27 +0000811 def test_load_cert_chain(self):
812 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
813 # Combined key and cert in a single file
Benjamin Peterson1ea070e2014-11-03 21:05:01 -0500814 ctx.load_cert_chain(CERTFILE, keyfile=None)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000815 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
816 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200817 with self.assertRaises(OSError) as cm:
Antoine Pitrou152efa22010-05-16 18:19:27 +0000818 ctx.load_cert_chain(WRONGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000819 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000820 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000821 ctx.load_cert_chain(BADCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000822 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000823 ctx.load_cert_chain(EMPTYCERT)
824 # Separate key and cert
Antoine Pitroud0919502010-05-17 10:30:00 +0000825 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000826 ctx.load_cert_chain(ONLYCERT, ONLYKEY)
827 ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
828 ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000829 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000830 ctx.load_cert_chain(ONLYCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000831 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000832 ctx.load_cert_chain(ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000833 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000834 ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
835 # Mismatching key and cert
Antoine Pitroud0919502010-05-17 10:30:00 +0000836 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000837 with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
Antoine Pitrou81564092010-10-08 23:06:24 +0000838 ctx.load_cert_chain(SVN_PYTHON_ORG_ROOT_CERT, ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +0200839 # Password protected key and cert
840 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
841 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
842 ctx.load_cert_chain(CERTFILE_PROTECTED,
843 password=bytearray(KEY_PASSWORD.encode()))
844 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
845 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
846 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
847 bytearray(KEY_PASSWORD.encode()))
848 with self.assertRaisesRegex(TypeError, "should be a string"):
849 ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
850 with self.assertRaises(ssl.SSLError):
851 ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
852 with self.assertRaisesRegex(ValueError, "cannot be longer"):
853 # openssl has a fixed limit on the password buffer.
854 # PEM_BUFSIZE is generally set to 1kb.
855 # Return a string larger than this.
856 ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
857 # Password callback
858 def getpass_unicode():
859 return KEY_PASSWORD
860 def getpass_bytes():
861 return KEY_PASSWORD.encode()
862 def getpass_bytearray():
863 return bytearray(KEY_PASSWORD.encode())
864 def getpass_badpass():
865 return "badpass"
866 def getpass_huge():
867 return b'a' * (1024 * 1024)
868 def getpass_bad_type():
869 return 9
870 def getpass_exception():
871 raise Exception('getpass error')
872 class GetPassCallable:
873 def __call__(self):
874 return KEY_PASSWORD
875 def getpass(self):
876 return KEY_PASSWORD
877 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
878 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
879 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
880 ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
881 ctx.load_cert_chain(CERTFILE_PROTECTED,
882 password=GetPassCallable().getpass)
883 with self.assertRaises(ssl.SSLError):
884 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
885 with self.assertRaisesRegex(ValueError, "cannot be longer"):
886 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
887 with self.assertRaisesRegex(TypeError, "must return a string"):
888 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
889 with self.assertRaisesRegex(Exception, "getpass error"):
890 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
891 # Make sure the password function isn't called if it isn't needed
892 ctx.load_cert_chain(CERTFILE, password=getpass_exception)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000893
894 def test_load_verify_locations(self):
895 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
896 ctx.load_verify_locations(CERTFILE)
897 ctx.load_verify_locations(cafile=CERTFILE, capath=None)
898 ctx.load_verify_locations(BYTES_CERTFILE)
899 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
900 self.assertRaises(TypeError, ctx.load_verify_locations)
Christian Heimesefff7062013-11-21 03:35:02 +0100901 self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200902 with self.assertRaises(OSError) as cm:
Antoine Pitrou152efa22010-05-16 18:19:27 +0000903 ctx.load_verify_locations(WRONGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000904 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000905 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000906 ctx.load_verify_locations(BADCERT)
907 ctx.load_verify_locations(CERTFILE, CAPATH)
908 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
909
Victor Stinner80f75e62011-01-29 11:31:20 +0000910 # Issue #10989: crash if the second argument type is invalid
911 self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
912
Christian Heimesefff7062013-11-21 03:35:02 +0100913 def test_load_verify_cadata(self):
914 # test cadata
915 with open(CAFILE_CACERT) as f:
916 cacert_pem = f.read()
917 cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
918 with open(CAFILE_NEURONIO) as f:
919 neuronio_pem = f.read()
920 neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
921
922 # test PEM
923 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
924 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
925 ctx.load_verify_locations(cadata=cacert_pem)
926 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
927 ctx.load_verify_locations(cadata=neuronio_pem)
928 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
929 # cert already in hash table
930 ctx.load_verify_locations(cadata=neuronio_pem)
931 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
932
933 # combined
934 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
935 combined = "\n".join((cacert_pem, neuronio_pem))
936 ctx.load_verify_locations(cadata=combined)
937 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
938
939 # with junk around the certs
940 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
941 combined = ["head", cacert_pem, "other", neuronio_pem, "again",
942 neuronio_pem, "tail"]
943 ctx.load_verify_locations(cadata="\n".join(combined))
944 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
945
946 # test DER
947 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
948 ctx.load_verify_locations(cadata=cacert_der)
949 ctx.load_verify_locations(cadata=neuronio_der)
950 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
951 # cert already in hash table
952 ctx.load_verify_locations(cadata=cacert_der)
953 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
954
955 # combined
956 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
957 combined = b"".join((cacert_der, neuronio_der))
958 ctx.load_verify_locations(cadata=combined)
959 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
960
961 # error cases
962 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
963 self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
964
965 with self.assertRaisesRegex(ssl.SSLError, "no start line"):
966 ctx.load_verify_locations(cadata="broken")
967 with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
968 ctx.load_verify_locations(cadata=b"broken")
969
970
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100971 def test_load_dh_params(self):
972 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
973 ctx.load_dh_params(DHFILE)
974 if os.name != 'nt':
975 ctx.load_dh_params(BYTES_DHFILE)
976 self.assertRaises(TypeError, ctx.load_dh_params)
977 self.assertRaises(TypeError, ctx.load_dh_params, None)
978 with self.assertRaises(FileNotFoundError) as cm:
979 ctx.load_dh_params(WRONGCERT)
980 self.assertEqual(cm.exception.errno, errno.ENOENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +0200981 with self.assertRaises(ssl.SSLError) as cm:
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100982 ctx.load_dh_params(CERTFILE)
983
Antoine Pitroueb585ad2010-10-22 18:24:20 +0000984 @skip_if_broken_ubuntu_ssl
Antoine Pitroub0182c82010-10-12 20:09:02 +0000985 def test_session_stats(self):
986 for proto in PROTOCOLS:
987 ctx = ssl.SSLContext(proto)
988 self.assertEqual(ctx.session_stats(), {
989 'number': 0,
990 'connect': 0,
991 'connect_good': 0,
992 'connect_renegotiate': 0,
993 'accept': 0,
994 'accept_good': 0,
995 'accept_renegotiate': 0,
996 'hits': 0,
997 'misses': 0,
998 'timeouts': 0,
999 'cache_full': 0,
1000 })
1001
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001002 def test_set_default_verify_paths(self):
1003 # There's not much we can do to test that it acts as expected,
1004 # so just check it doesn't crash or raise an exception.
1005 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1006 ctx.set_default_verify_paths()
1007
Antoine Pitrou501da612011-12-21 09:27:41 +01001008 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001009 def test_set_ecdh_curve(self):
1010 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1011 ctx.set_ecdh_curve("prime256v1")
1012 ctx.set_ecdh_curve(b"prime256v1")
1013 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1014 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1015 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1016 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1017
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001018 @needs_sni
1019 def test_sni_callback(self):
1020 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1021
1022 # set_servername_callback expects a callable, or None
1023 self.assertRaises(TypeError, ctx.set_servername_callback)
1024 self.assertRaises(TypeError, ctx.set_servername_callback, 4)
1025 self.assertRaises(TypeError, ctx.set_servername_callback, "")
1026 self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
1027
1028 def dummycallback(sock, servername, ctx):
1029 pass
1030 ctx.set_servername_callback(None)
1031 ctx.set_servername_callback(dummycallback)
1032
1033 @needs_sni
1034 def test_sni_callback_refcycle(self):
1035 # Reference cycles through the servername callback are detected
1036 # and cleared.
1037 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1038 def dummycallback(sock, servername, ctx, cycle=ctx):
1039 pass
1040 ctx.set_servername_callback(dummycallback)
1041 wr = weakref.ref(ctx)
1042 del ctx, dummycallback
1043 gc.collect()
1044 self.assertIs(wr(), None)
1045
Christian Heimes9a5395a2013-06-17 15:44:12 +02001046 def test_cert_store_stats(self):
1047 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1048 self.assertEqual(ctx.cert_store_stats(),
1049 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1050 ctx.load_cert_chain(CERTFILE)
1051 self.assertEqual(ctx.cert_store_stats(),
1052 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1053 ctx.load_verify_locations(CERTFILE)
1054 self.assertEqual(ctx.cert_store_stats(),
1055 {'x509_ca': 0, 'crl': 0, 'x509': 1})
1056 ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
1057 self.assertEqual(ctx.cert_store_stats(),
1058 {'x509_ca': 1, 'crl': 0, 'x509': 2})
1059
1060 def test_get_ca_certs(self):
1061 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1062 self.assertEqual(ctx.get_ca_certs(), [])
1063 # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
1064 ctx.load_verify_locations(CERTFILE)
1065 self.assertEqual(ctx.get_ca_certs(), [])
1066 # but SVN_PYTHON_ORG_ROOT_CERT is a CA cert
1067 ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
1068 self.assertEqual(ctx.get_ca_certs(),
1069 [{'issuer': ((('organizationName', 'Root CA'),),
1070 (('organizationalUnitName', 'http://www.cacert.org'),),
1071 (('commonName', 'CA Cert Signing Authority'),),
1072 (('emailAddress', 'support@cacert.org'),)),
1073 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
1074 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
1075 'serialNumber': '00',
Christian Heimesbd3a7f92013-11-21 03:40:15 +01001076 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
Christian Heimes9a5395a2013-06-17 15:44:12 +02001077 'subject': ((('organizationName', 'Root CA'),),
1078 (('organizationalUnitName', 'http://www.cacert.org'),),
1079 (('commonName', 'CA Cert Signing Authority'),),
1080 (('emailAddress', 'support@cacert.org'),)),
1081 'version': 3}])
1082
1083 with open(SVN_PYTHON_ORG_ROOT_CERT) as f:
1084 pem = f.read()
1085 der = ssl.PEM_cert_to_DER_cert(pem)
1086 self.assertEqual(ctx.get_ca_certs(True), [der])
1087
Christian Heimes72d28502013-11-23 13:56:58 +01001088 def test_load_default_certs(self):
1089 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1090 ctx.load_default_certs()
1091
1092 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1093 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1094 ctx.load_default_certs()
1095
1096 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1097 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1098
1099 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1100 self.assertRaises(TypeError, ctx.load_default_certs, None)
1101 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1102
Benjamin Peterson91244e02014-10-03 18:17:15 -04001103 @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001104 def test_load_default_certs_env(self):
1105 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1106 with support.EnvironmentVarGuard() as env:
1107 env["SSL_CERT_DIR"] = CAPATH
1108 env["SSL_CERT_FILE"] = CERTFILE
1109 ctx.load_default_certs()
1110 self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0})
1111
Benjamin Peterson91244e02014-10-03 18:17:15 -04001112 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
1113 def test_load_default_certs_env_windows(self):
1114 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1115 ctx.load_default_certs()
1116 stats = ctx.cert_store_stats()
1117
1118 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1119 with support.EnvironmentVarGuard() as env:
1120 env["SSL_CERT_DIR"] = CAPATH
1121 env["SSL_CERT_FILE"] = CERTFILE
1122 ctx.load_default_certs()
1123 stats["x509"] += 1
1124 self.assertEqual(ctx.cert_store_stats(), stats)
1125
Christian Heimes4c05b472013-11-23 15:58:30 +01001126 def test_create_default_context(self):
1127 ctx = ssl.create_default_context()
Donald Stufft6a2ba942014-03-23 19:05:28 -04001128 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
Christian Heimes4c05b472013-11-23 15:58:30 +01001129 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001130 self.assertTrue(ctx.check_hostname)
Christian Heimes4c05b472013-11-23 15:58:30 +01001131 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
Donald Stufft6a2ba942014-03-23 19:05:28 -04001132 self.assertEqual(
1133 ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
1134 getattr(ssl, "OP_NO_COMPRESSION", 0),
1135 )
Christian Heimes4c05b472013-11-23 15:58:30 +01001136
1137 with open(SIGNING_CA) as f:
1138 cadata = f.read()
1139 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1140 cadata=cadata)
Donald Stufft6a2ba942014-03-23 19:05:28 -04001141 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
Christian Heimes4c05b472013-11-23 15:58:30 +01001142 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1143 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
Donald Stufft6a2ba942014-03-23 19:05:28 -04001144 self.assertEqual(
1145 ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
1146 getattr(ssl, "OP_NO_COMPRESSION", 0),
1147 )
Christian Heimes4c05b472013-11-23 15:58:30 +01001148
1149 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
Donald Stufft6a2ba942014-03-23 19:05:28 -04001150 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
Christian Heimes4c05b472013-11-23 15:58:30 +01001151 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1152 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
Donald Stufft6a2ba942014-03-23 19:05:28 -04001153 self.assertEqual(
1154 ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
1155 getattr(ssl, "OP_NO_COMPRESSION", 0),
1156 )
1157 self.assertEqual(
1158 ctx.options & getattr(ssl, "OP_SINGLE_DH_USE", 0),
1159 getattr(ssl, "OP_SINGLE_DH_USE", 0),
1160 )
1161 self.assertEqual(
1162 ctx.options & getattr(ssl, "OP_SINGLE_ECDH_USE", 0),
1163 getattr(ssl, "OP_SINGLE_ECDH_USE", 0),
1164 )
Christian Heimes4c05b472013-11-23 15:58:30 +01001165
Christian Heimes67986f92013-11-23 22:43:47 +01001166 def test__create_stdlib_context(self):
1167 ctx = ssl._create_stdlib_context()
1168 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1169 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001170 self.assertFalse(ctx.check_hostname)
Christian Heimes67986f92013-11-23 22:43:47 +01001171 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1172
1173 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
1174 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1175 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1176 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1177
1178 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
Christian Heimesa02c69a2013-12-02 20:59:28 +01001179 cert_reqs=ssl.CERT_REQUIRED,
1180 check_hostname=True)
Christian Heimes67986f92013-11-23 22:43:47 +01001181 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1182 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimesa02c69a2013-12-02 20:59:28 +01001183 self.assertTrue(ctx.check_hostname)
Christian Heimes67986f92013-11-23 22:43:47 +01001184 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1185
1186 ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
1187 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1188 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1189 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
Christian Heimes4c05b472013-11-23 15:58:30 +01001190
Christian Heimes1aa9a752013-12-02 02:41:19 +01001191 def test_check_hostname(self):
1192 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1193 self.assertFalse(ctx.check_hostname)
1194
1195 # Requires CERT_REQUIRED or CERT_OPTIONAL
1196 with self.assertRaises(ValueError):
1197 ctx.check_hostname = True
1198 ctx.verify_mode = ssl.CERT_REQUIRED
1199 self.assertFalse(ctx.check_hostname)
1200 ctx.check_hostname = True
1201 self.assertTrue(ctx.check_hostname)
1202
1203 ctx.verify_mode = ssl.CERT_OPTIONAL
1204 ctx.check_hostname = True
1205 self.assertTrue(ctx.check_hostname)
1206
1207 # Cannot set CERT_NONE with check_hostname enabled
1208 with self.assertRaises(ValueError):
1209 ctx.verify_mode = ssl.CERT_NONE
1210 ctx.check_hostname = False
1211 self.assertFalse(ctx.check_hostname)
1212
Antoine Pitrou152efa22010-05-16 18:19:27 +00001213
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001214class SSLErrorTests(unittest.TestCase):
1215
1216 def test_str(self):
1217 # The str() of a SSLError doesn't include the errno
1218 e = ssl.SSLError(1, "foo")
1219 self.assertEqual(str(e), "foo")
1220 self.assertEqual(e.errno, 1)
1221 # Same for a subclass
1222 e = ssl.SSLZeroReturnError(1, "foo")
1223 self.assertEqual(str(e), "foo")
1224 self.assertEqual(e.errno, 1)
1225
1226 def test_lib_reason(self):
1227 # Test the library and reason attributes
1228 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1229 with self.assertRaises(ssl.SSLError) as cm:
1230 ctx.load_dh_params(CERTFILE)
1231 self.assertEqual(cm.exception.library, 'PEM')
1232 self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1233 s = str(cm.exception)
1234 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1235
1236 def test_subclass(self):
1237 # Check that the appropriate SSLError subclass is raised
1238 # (this only tests one of them)
1239 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1240 with socket.socket() as s:
1241 s.bind(("127.0.0.1", 0))
Charles-François Natali6e204602014-07-23 19:28:13 +01001242 s.listen()
Antoine Pitroue1ceb502013-01-12 21:54:44 +01001243 c = socket.socket()
1244 c.connect(s.getsockname())
1245 c.setblocking(False)
1246 with ctx.wrap_socket(c, False, do_handshake_on_connect=False) as c:
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001247 with self.assertRaises(ssl.SSLWantReadError) as cm:
1248 c.do_handshake()
1249 s = str(cm.exception)
1250 self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1251 # For compatibility
1252 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
1253
1254
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001255class MemoryBIOTests(unittest.TestCase):
1256
1257 def test_read_write(self):
1258 bio = ssl.MemoryBIO()
1259 bio.write(b'foo')
1260 self.assertEqual(bio.read(), b'foo')
1261 self.assertEqual(bio.read(), b'')
1262 bio.write(b'foo')
1263 bio.write(b'bar')
1264 self.assertEqual(bio.read(), b'foobar')
1265 self.assertEqual(bio.read(), b'')
1266 bio.write(b'baz')
1267 self.assertEqual(bio.read(2), b'ba')
1268 self.assertEqual(bio.read(1), b'z')
1269 self.assertEqual(bio.read(1), b'')
1270
1271 def test_eof(self):
1272 bio = ssl.MemoryBIO()
1273 self.assertFalse(bio.eof)
1274 self.assertEqual(bio.read(), b'')
1275 self.assertFalse(bio.eof)
1276 bio.write(b'foo')
1277 self.assertFalse(bio.eof)
1278 bio.write_eof()
1279 self.assertFalse(bio.eof)
1280 self.assertEqual(bio.read(2), b'fo')
1281 self.assertFalse(bio.eof)
1282 self.assertEqual(bio.read(1), b'o')
1283 self.assertTrue(bio.eof)
1284 self.assertEqual(bio.read(), b'')
1285 self.assertTrue(bio.eof)
1286
1287 def test_pending(self):
1288 bio = ssl.MemoryBIO()
1289 self.assertEqual(bio.pending, 0)
1290 bio.write(b'foo')
1291 self.assertEqual(bio.pending, 3)
1292 for i in range(3):
1293 bio.read(1)
1294 self.assertEqual(bio.pending, 3-i-1)
1295 for i in range(3):
1296 bio.write(b'x')
1297 self.assertEqual(bio.pending, i+1)
1298 bio.read()
1299 self.assertEqual(bio.pending, 0)
1300
1301 def test_buffer_types(self):
1302 bio = ssl.MemoryBIO()
1303 bio.write(b'foo')
1304 self.assertEqual(bio.read(), b'foo')
1305 bio.write(bytearray(b'bar'))
1306 self.assertEqual(bio.read(), b'bar')
1307 bio.write(memoryview(b'baz'))
1308 self.assertEqual(bio.read(), b'baz')
1309
1310 def test_error_types(self):
1311 bio = ssl.MemoryBIO()
1312 self.assertRaises(TypeError, bio.write, 'foo')
1313 self.assertRaises(TypeError, bio.write, None)
1314 self.assertRaises(TypeError, bio.write, True)
1315 self.assertRaises(TypeError, bio.write, 1)
1316
1317
Bill Janssen6e027db2007-11-15 22:23:56 +00001318class NetworkedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001319
Antoine Pitrou480a1242010-04-28 21:37:09 +00001320 def test_connect(self):
Antoine Pitrou350c7222010-09-09 13:31:46 +00001321 with support.transient_internet("svn.python.org"):
1322 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1323 cert_reqs=ssl.CERT_NONE)
1324 try:
1325 s.connect(("svn.python.org", 443))
1326 self.assertEqual({}, s.getpeercert())
1327 finally:
1328 s.close()
1329
1330 # this should fail because we have no verification certs
1331 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1332 cert_reqs=ssl.CERT_REQUIRED)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001333 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1334 s.connect, ("svn.python.org", 443))
Antoine Pitrou152efa22010-05-16 18:19:27 +00001335 s.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001336
Antoine Pitrou350c7222010-09-09 13:31:46 +00001337 # this should succeed because we specify the root cert
1338 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1339 cert_reqs=ssl.CERT_REQUIRED,
1340 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
1341 try:
1342 s.connect(("svn.python.org", 443))
1343 self.assertTrue(s.getpeercert())
1344 finally:
1345 s.close()
Antoine Pitrou152efa22010-05-16 18:19:27 +00001346
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001347 def test_connect_ex(self):
1348 # Issue #11326: check connect_ex() implementation
1349 with support.transient_internet("svn.python.org"):
1350 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1351 cert_reqs=ssl.CERT_REQUIRED,
1352 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
1353 try:
1354 self.assertEqual(0, s.connect_ex(("svn.python.org", 443)))
1355 self.assertTrue(s.getpeercert())
1356 finally:
1357 s.close()
1358
1359 def test_non_blocking_connect_ex(self):
1360 # Issue #11326: non-blocking connect_ex() should allow handshake
1361 # to proceed after the socket gets ready.
1362 with support.transient_internet("svn.python.org"):
1363 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1364 cert_reqs=ssl.CERT_REQUIRED,
1365 ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
1366 do_handshake_on_connect=False)
1367 try:
1368 s.setblocking(False)
1369 rc = s.connect_ex(('svn.python.org', 443))
Antoine Pitrou8a14a0c2011-02-27 15:44:12 +00001370 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1371 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001372 # Wait for connect to finish
1373 select.select([], [s], [], 5.0)
1374 # Non-blocking handshake
1375 while True:
1376 try:
1377 s.do_handshake()
1378 break
Antoine Pitrou41032a62011-10-27 23:56:55 +02001379 except ssl.SSLWantReadError:
1380 select.select([s], [], [], 5.0)
1381 except ssl.SSLWantWriteError:
1382 select.select([], [s], [], 5.0)
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001383 # SSL established
1384 self.assertTrue(s.getpeercert())
1385 finally:
1386 s.close()
1387
Antoine Pitroub4410db2011-05-18 18:51:06 +02001388 def test_timeout_connect_ex(self):
1389 # Issue #12065: on a timeout, connect_ex() should return the original
1390 # errno (mimicking the behaviour of non-SSL sockets).
1391 with support.transient_internet("svn.python.org"):
1392 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1393 cert_reqs=ssl.CERT_REQUIRED,
1394 ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
1395 do_handshake_on_connect=False)
1396 try:
1397 s.settimeout(0.0000001)
1398 rc = s.connect_ex(('svn.python.org', 443))
1399 if rc == 0:
1400 self.skipTest("svn.python.org responded too quickly")
1401 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
1402 finally:
1403 s.close()
1404
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001405 def test_connect_ex_error(self):
Antoine Pitrouddb87ab2012-12-28 19:07:43 +01001406 with support.transient_internet("svn.python.org"):
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001407 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1408 cert_reqs=ssl.CERT_REQUIRED,
1409 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
1410 try:
Christian Heimesde570742013-12-16 21:15:44 +01001411 rc = s.connect_ex(("svn.python.org", 444))
1412 # Issue #19919: Windows machines or VMs hosted on Windows
1413 # machines sometimes return EWOULDBLOCK.
1414 self.assertIn(rc, (errno.ECONNREFUSED, errno.EWOULDBLOCK))
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001415 finally:
1416 s.close()
1417
Antoine Pitrou152efa22010-05-16 18:19:27 +00001418 def test_connect_with_context(self):
Antoine Pitrou350c7222010-09-09 13:31:46 +00001419 with support.transient_internet("svn.python.org"):
1420 # Same as test_connect, but with a separately created context
1421 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1422 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1423 s.connect(("svn.python.org", 443))
1424 try:
1425 self.assertEqual({}, s.getpeercert())
1426 finally:
1427 s.close()
Antoine Pitroud5323212010-10-22 18:19:07 +00001428 # Same with a server hostname
1429 s = ctx.wrap_socket(socket.socket(socket.AF_INET),
1430 server_hostname="svn.python.org")
1431 if ssl.HAS_SNI:
1432 s.connect(("svn.python.org", 443))
1433 s.close()
1434 else:
1435 self.assertRaises(ValueError, s.connect, ("svn.python.org", 443))
Antoine Pitrou350c7222010-09-09 13:31:46 +00001436 # This should fail because we have no verification certs
1437 ctx.verify_mode = ssl.CERT_REQUIRED
1438 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
Ezio Melottied3a7d22010-12-01 02:32:32 +00001439 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
Antoine Pitrou350c7222010-09-09 13:31:46 +00001440 s.connect, ("svn.python.org", 443))
Antoine Pitrou152efa22010-05-16 18:19:27 +00001441 s.close()
Antoine Pitrou350c7222010-09-09 13:31:46 +00001442 # This should succeed because we specify the root cert
1443 ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
1444 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1445 s.connect(("svn.python.org", 443))
1446 try:
1447 cert = s.getpeercert()
1448 self.assertTrue(cert)
1449 finally:
1450 s.close()
Antoine Pitrou152efa22010-05-16 18:19:27 +00001451
1452 def test_connect_capath(self):
1453 # Verify server certificates using the `capath` argument
Antoine Pitrou467f28d2010-05-16 19:22:44 +00001454 # NOTE: the subject hashing algorithm has been changed between
1455 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
1456 # contain both versions of each certificate (same content, different
Antoine Pitroud7e4c1c2010-05-17 14:13:10 +00001457 # filename) for this test to be portable across OpenSSL releases.
Antoine Pitrou350c7222010-09-09 13:31:46 +00001458 with support.transient_internet("svn.python.org"):
1459 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1460 ctx.verify_mode = ssl.CERT_REQUIRED
1461 ctx.load_verify_locations(capath=CAPATH)
1462 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1463 s.connect(("svn.python.org", 443))
1464 try:
1465 cert = s.getpeercert()
1466 self.assertTrue(cert)
1467 finally:
1468 s.close()
1469 # Same with a bytes `capath` argument
1470 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1471 ctx.verify_mode = ssl.CERT_REQUIRED
1472 ctx.load_verify_locations(capath=BYTES_CAPATH)
1473 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1474 s.connect(("svn.python.org", 443))
1475 try:
1476 cert = s.getpeercert()
1477 self.assertTrue(cert)
1478 finally:
1479 s.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001480
Christian Heimesefff7062013-11-21 03:35:02 +01001481 def test_connect_cadata(self):
1482 with open(CAFILE_CACERT) as f:
1483 pem = f.read()
1484 der = ssl.PEM_cert_to_DER_cert(pem)
1485 with support.transient_internet("svn.python.org"):
1486 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1487 ctx.verify_mode = ssl.CERT_REQUIRED
1488 ctx.load_verify_locations(cadata=pem)
1489 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1490 s.connect(("svn.python.org", 443))
1491 cert = s.getpeercert()
1492 self.assertTrue(cert)
1493
1494 # same with DER
1495 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1496 ctx.verify_mode = ssl.CERT_REQUIRED
1497 ctx.load_verify_locations(cadata=der)
1498 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1499 s.connect(("svn.python.org", 443))
1500 cert = s.getpeercert()
1501 self.assertTrue(cert)
1502
Antoine Pitroue3220242010-04-24 11:13:53 +00001503 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
1504 def test_makefile_close(self):
1505 # Issue #5238: creating a file-like object with makefile() shouldn't
1506 # delay closing the underlying "real socket" (here tested with its
1507 # file descriptor, hence skipping the test under Windows).
Antoine Pitrou350c7222010-09-09 13:31:46 +00001508 with support.transient_internet("svn.python.org"):
1509 ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
1510 ss.connect(("svn.python.org", 443))
1511 fd = ss.fileno()
1512 f = ss.makefile()
1513 f.close()
1514 # The fd is still open
Antoine Pitroue3220242010-04-24 11:13:53 +00001515 os.read(fd, 0)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001516 # Closing the SSL socket should close the fd too
1517 ss.close()
1518 gc.collect()
1519 with self.assertRaises(OSError) as e:
1520 os.read(fd, 0)
1521 self.assertEqual(e.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001522
Antoine Pitrou480a1242010-04-28 21:37:09 +00001523 def test_non_blocking_handshake(self):
Antoine Pitrou350c7222010-09-09 13:31:46 +00001524 with support.transient_internet("svn.python.org"):
1525 s = socket.socket(socket.AF_INET)
1526 s.connect(("svn.python.org", 443))
1527 s.setblocking(False)
1528 s = ssl.wrap_socket(s,
1529 cert_reqs=ssl.CERT_NONE,
1530 do_handshake_on_connect=False)
1531 count = 0
1532 while True:
1533 try:
1534 count += 1
1535 s.do_handshake()
1536 break
Antoine Pitrou41032a62011-10-27 23:56:55 +02001537 except ssl.SSLWantReadError:
1538 select.select([s], [], [])
1539 except ssl.SSLWantWriteError:
1540 select.select([], [s], [])
Antoine Pitrou350c7222010-09-09 13:31:46 +00001541 s.close()
1542 if support.verbose:
1543 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001544
Antoine Pitrou480a1242010-04-28 21:37:09 +00001545 def test_get_server_certificate(self):
Antoine Pitrou15399c32011-04-28 19:23:55 +02001546 def _test_get_server_certificate(host, port, cert=None):
1547 with support.transient_internet(host):
Antoine Pitrou94a5b662014-04-16 18:56:28 +02001548 pem = ssl.get_server_certificate((host, port))
Antoine Pitrou15399c32011-04-28 19:23:55 +02001549 if not pem:
1550 self.fail("No server certificate on %s:%s!" % (host, port))
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001551
Antoine Pitrou15399c32011-04-28 19:23:55 +02001552 try:
Benjamin Peterson10b93cc2014-03-12 18:10:57 -05001553 pem = ssl.get_server_certificate((host, port),
Benjamin Peterson10b93cc2014-03-12 18:10:57 -05001554 ca_certs=CERTFILE)
Antoine Pitrou15399c32011-04-28 19:23:55 +02001555 except ssl.SSLError as x:
1556 #should fail
1557 if support.verbose:
1558 sys.stdout.write("%s\n" % x)
1559 else:
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001560 self.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
1561
Benjamin Peterson10b93cc2014-03-12 18:10:57 -05001562 pem = ssl.get_server_certificate((host, port),
Benjamin Peterson10b93cc2014-03-12 18:10:57 -05001563 ca_certs=cert)
Antoine Pitrou15399c32011-04-28 19:23:55 +02001564 if not pem:
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001565 self.fail("No server certificate on %s:%s!" % (host, port))
Antoine Pitrou350c7222010-09-09 13:31:46 +00001566 if support.verbose:
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001567 sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
Antoine Pitrou350c7222010-09-09 13:31:46 +00001568
Antoine Pitrou15399c32011-04-28 19:23:55 +02001569 _test_get_server_certificate('svn.python.org', 443, SVN_PYTHON_ORG_ROOT_CERT)
1570 if support.IPV6_ENABLED:
1571 _test_get_server_certificate('ipv6.google.com', 443)
Bill Janssen54cc54c2007-12-14 22:08:56 +00001572
Antoine Pitrouf4c7bad2010-08-15 23:02:22 +00001573 def test_ciphers(self):
1574 remote = ("svn.python.org", 443)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001575 with support.transient_internet(remote[0]):
Antoine Pitroue1ceb502013-01-12 21:54:44 +01001576 with ssl.wrap_socket(socket.socket(socket.AF_INET),
1577 cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
1578 s.connect(remote)
1579 with ssl.wrap_socket(socket.socket(socket.AF_INET),
1580 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
1581 s.connect(remote)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001582 # Error checking can happen at instantiation or when connecting
Ezio Melottied3a7d22010-12-01 02:32:32 +00001583 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitroud2eca372010-10-29 23:41:37 +00001584 with socket.socket(socket.AF_INET) as sock:
1585 s = ssl.wrap_socket(sock,
1586 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
1587 s.connect(remote)
Antoine Pitrouf4c7bad2010-08-15 23:02:22 +00001588
Antoine Pitroufec12ff2010-04-21 19:46:23 +00001589 def test_algorithms(self):
1590 # Issue #8484: all algorithms should be available when verifying a
1591 # certificate.
Antoine Pitrou29619b22010-04-22 18:43:31 +00001592 # SHA256 was added in OpenSSL 0.9.8
1593 if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15):
1594 self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION)
Antoine Pitrou16f6f832012-05-04 16:26:02 +02001595 # sha256.tbs-internet.com needs SNI to use the correct certificate
1596 if not ssl.HAS_SNI:
1597 self.skipTest("SNI needed for this test")
Victor Stinnerf332abb2011-01-08 03:16:05 +00001598 # https://sha2.hboeck.de/ was used until 2011-01-08 (no route to host)
1599 remote = ("sha256.tbs-internet.com", 443)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00001600 sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem")
Antoine Pitrou160fd932011-01-08 10:23:29 +00001601 with support.transient_internet("sha256.tbs-internet.com"):
Antoine Pitrou16f6f832012-05-04 16:26:02 +02001602 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1603 ctx.verify_mode = ssl.CERT_REQUIRED
1604 ctx.load_verify_locations(sha256_cert)
1605 s = ctx.wrap_socket(socket.socket(socket.AF_INET),
1606 server_hostname="sha256.tbs-internet.com")
Antoine Pitroufec12ff2010-04-21 19:46:23 +00001607 try:
1608 s.connect(remote)
1609 if support.verbose:
1610 sys.stdout.write("\nCipher with %r is %r\n" %
1611 (remote, s.cipher()))
1612 sys.stdout.write("Certificate is:\n%s\n" %
1613 pprint.pformat(s.getpeercert()))
1614 finally:
1615 s.close()
1616
Christian Heimes9a5395a2013-06-17 15:44:12 +02001617 def test_get_ca_certs_capath(self):
1618 # capath certs are loaded on request
1619 with support.transient_internet("svn.python.org"):
1620 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1621 ctx.verify_mode = ssl.CERT_REQUIRED
1622 ctx.load_verify_locations(capath=CAPATH)
1623 self.assertEqual(ctx.get_ca_certs(), [])
1624 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1625 s.connect(("svn.python.org", 443))
1626 try:
1627 cert = s.getpeercert()
1628 self.assertTrue(cert)
1629 finally:
1630 s.close()
1631 self.assertEqual(len(ctx.get_ca_certs()), 1)
1632
Christian Heimes575596e2013-12-15 21:49:17 +01001633 @needs_sni
Christian Heimes8e7f3942013-12-05 07:41:08 +01001634 def test_context_setget(self):
1635 # Check that the context of a connected socket can be replaced.
1636 with support.transient_internet("svn.python.org"):
1637 ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1638 ctx2 = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1639 s = socket.socket(socket.AF_INET)
1640 with ctx1.wrap_socket(s) as ss:
1641 ss.connect(("svn.python.org", 443))
1642 self.assertIs(ss.context, ctx1)
1643 self.assertIs(ss._sslobj.context, ctx1)
1644 ss.context = ctx2
1645 self.assertIs(ss.context, ctx2)
1646 self.assertIs(ss._sslobj.context, ctx2)
1647
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001648
1649class NetworkedBIOTests(unittest.TestCase):
1650
1651 def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
1652 # A simple IO loop. Call func(*args) depending on the error we get
1653 # (WANT_READ or WANT_WRITE) move data between the socket and the BIOs.
1654 timeout = kwargs.get('timeout', 10)
1655 count = 0
1656 while True:
1657 errno = None
1658 count += 1
1659 try:
1660 ret = func(*args)
1661 except ssl.SSLError as e:
1662 # Note that we get a spurious -1/SSL_ERROR_SYSCALL for
1663 # non-blocking IO. The SSL_shutdown manpage hints at this.
1664 # It *should* be safe to just ignore SYS_ERROR_SYSCALL because
1665 # with a Memory BIO there's no syscalls (for IO at least).
1666 if e.errno not in (ssl.SSL_ERROR_WANT_READ,
1667 ssl.SSL_ERROR_WANT_WRITE,
1668 ssl.SSL_ERROR_SYSCALL):
1669 raise
1670 errno = e.errno
1671 # Get any data from the outgoing BIO irrespective of any error, and
1672 # send it to the socket.
1673 buf = outgoing.read()
1674 sock.sendall(buf)
1675 # If there's no error, we're done. For WANT_READ, we need to get
1676 # data from the socket and put it in the incoming BIO.
1677 if errno is None:
1678 break
1679 elif errno == ssl.SSL_ERROR_WANT_READ:
1680 buf = sock.recv(32768)
1681 if buf:
1682 incoming.write(buf)
1683 else:
1684 incoming.write_eof()
1685 if support.verbose:
1686 sys.stdout.write("Needed %d calls to complete %s().\n"
1687 % (count, func.__name__))
1688 return ret
1689
1690 def test_handshake(self):
1691 with support.transient_internet("svn.python.org"):
1692 sock = socket.socket(socket.AF_INET)
1693 sock.connect(("svn.python.org", 443))
1694 incoming = ssl.MemoryBIO()
1695 outgoing = ssl.MemoryBIO()
1696 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1697 ctx.verify_mode = ssl.CERT_REQUIRED
1698 ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
1699 if ssl.HAS_SNI:
1700 ctx.check_hostname = True
1701 sslobj = ctx.wrap_bio(incoming, outgoing, False, 'svn.python.org')
1702 else:
1703 ctx.check_hostname = False
1704 sslobj = ctx.wrap_bio(incoming, outgoing, False)
1705 self.assertIs(sslobj._sslobj.owner, sslobj)
1706 self.assertIsNone(sslobj.cipher())
1707 self.assertRaises(ValueError, sslobj.getpeercert)
1708 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
1709 self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
1710 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
1711 self.assertTrue(sslobj.cipher())
1712 self.assertTrue(sslobj.getpeercert())
1713 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
1714 self.assertTrue(sslobj.get_channel_binding('tls-unique'))
1715 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
1716 self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
1717 sock.close()
1718
1719 def test_read_write_data(self):
1720 with support.transient_internet("svn.python.org"):
1721 sock = socket.socket(socket.AF_INET)
1722 sock.connect(("svn.python.org", 443))
1723 incoming = ssl.MemoryBIO()
1724 outgoing = ssl.MemoryBIO()
1725 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1726 ctx.verify_mode = ssl.CERT_NONE
1727 sslobj = ctx.wrap_bio(incoming, outgoing, False)
1728 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
1729 req = b'GET / HTTP/1.0\r\n\r\n'
1730 self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
1731 buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
1732 self.assertEqual(buf[:5], b'HTTP/')
1733 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
1734 sock.close()
1735
1736
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001737try:
1738 import threading
1739except ImportError:
1740 _have_threads = False
1741else:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001742 _have_threads = True
1743
Antoine Pitrou803e6d62010-10-13 10:36:15 +00001744 from test.ssl_servers import make_https_server
1745
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001746 class ThreadedEchoServer(threading.Thread):
1747
1748 class ConnectionHandler(threading.Thread):
1749
1750 """A mildly complicated class, because we want it to work both
1751 with and without the SSL wrapper around the socket connection, so
1752 that we can test the STARTTLS functionality."""
1753
Bill Janssen6e027db2007-11-15 22:23:56 +00001754 def __init__(self, server, connsock, addr):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001755 self.server = server
1756 self.running = False
1757 self.sock = connsock
Bill Janssen6e027db2007-11-15 22:23:56 +00001758 self.addr = addr
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001759 self.sock.setblocking(1)
1760 self.sslconn = None
1761 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00001762 self.daemon = True
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001763
Antoine Pitrou480a1242010-04-28 21:37:09 +00001764 def wrap_conn(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001765 try:
Antoine Pitroub5218772010-05-21 09:56:06 +00001766 self.sslconn = self.server.context.wrap_socket(
1767 self.sock, server_side=True)
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01001768 self.server.selected_protocols.append(self.sslconn.selected_npn_protocol())
Nadeem Vawdaad246bf2013-03-03 22:44:22 +01001769 except (ssl.SSLError, ConnectionResetError) as e:
1770 # We treat ConnectionResetError as though it were an
1771 # SSLError - OpenSSL on Ubuntu abruptly closes the
1772 # connection when asked to use an unsupported protocol.
1773 #
Antoine Pitrou18c913e2010-04-27 10:59:39 +00001774 # XXX Various errors can have happened here, for example
1775 # a mismatching protocol version, an invalid certificate,
1776 # or a low-level bug. This should be made more discriminating.
Antoine Pitrou8f85f902012-01-03 22:46:48 +01001777 self.server.conn_errors.append(e)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001778 if self.server.chatty:
Bill Janssen6e027db2007-11-15 22:23:56 +00001779 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
Antoine Pitrou18c913e2010-04-27 10:59:39 +00001780 self.running = False
1781 self.server.stop()
Bill Janssen6e027db2007-11-15 22:23:56 +00001782 self.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001783 return False
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001784 else:
Antoine Pitroub5218772010-05-21 09:56:06 +00001785 if self.server.context.verify_mode == ssl.CERT_REQUIRED:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001786 cert = self.sslconn.getpeercert()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001787 if support.verbose and self.server.chatty:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001788 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
1789 cert_binary = self.sslconn.getpeercert(True)
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001790 if support.verbose and self.server.chatty:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001791 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
1792 cipher = self.sslconn.cipher()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001793 if support.verbose and self.server.chatty:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001794 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01001795 sys.stdout.write(" server: selected protocol is now "
1796 + str(self.sslconn.selected_npn_protocol()) + "\n")
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001797 return True
1798
1799 def read(self):
1800 if self.sslconn:
1801 return self.sslconn.read()
1802 else:
1803 return self.sock.recv(1024)
1804
1805 def write(self, bytes):
1806 if self.sslconn:
1807 return self.sslconn.write(bytes)
1808 else:
1809 return self.sock.send(bytes)
1810
1811 def close(self):
1812 if self.sslconn:
1813 self.sslconn.close()
1814 else:
1815 self.sock.close()
1816
Antoine Pitrou480a1242010-04-28 21:37:09 +00001817 def run(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001818 self.running = True
1819 if not self.server.starttls_server:
1820 if not self.wrap_conn():
1821 return
1822 while self.running:
1823 try:
1824 msg = self.read()
Antoine Pitrou480a1242010-04-28 21:37:09 +00001825 stripped = msg.strip()
1826 if not stripped:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001827 # eof, so quit this handler
1828 self.running = False
1829 self.close()
Antoine Pitrou480a1242010-04-28 21:37:09 +00001830 elif stripped == b'over':
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001831 if support.verbose and self.server.connectionchatty:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001832 sys.stdout.write(" server: client closed connection\n")
1833 self.close()
1834 return
Bill Janssen6e027db2007-11-15 22:23:56 +00001835 elif (self.server.starttls_server and
Antoine Pitrou764b8782010-04-28 22:57:15 +00001836 stripped == b'STARTTLS'):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001837 if support.verbose and self.server.connectionchatty:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001838 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
Antoine Pitrou480a1242010-04-28 21:37:09 +00001839 self.write(b"OK\n")
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001840 if not self.wrap_conn():
1841 return
Bill Janssen40a0f662008-08-12 16:56:25 +00001842 elif (self.server.starttls_server and self.sslconn
Antoine Pitrou764b8782010-04-28 22:57:15 +00001843 and stripped == b'ENDTLS'):
Bill Janssen40a0f662008-08-12 16:56:25 +00001844 if support.verbose and self.server.connectionchatty:
1845 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
Antoine Pitrou480a1242010-04-28 21:37:09 +00001846 self.write(b"OK\n")
Bill Janssen40a0f662008-08-12 16:56:25 +00001847 self.sock = self.sslconn.unwrap()
1848 self.sslconn = None
1849 if support.verbose and self.server.connectionchatty:
1850 sys.stdout.write(" server: connection is now unencrypted...\n")
Antoine Pitroud6494802011-07-21 01:11:30 +02001851 elif stripped == b'CB tls-unique':
1852 if support.verbose and self.server.connectionchatty:
1853 sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
1854 data = self.sslconn.get_channel_binding("tls-unique")
1855 self.write(repr(data).encode("us-ascii") + b"\n")
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001856 else:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001857 if (support.verbose and
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001858 self.server.connectionchatty):
1859 ctype = (self.sslconn and "encrypted") or "unencrypted"
Antoine Pitrou480a1242010-04-28 21:37:09 +00001860 sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
1861 % (msg, ctype, msg.lower(), ctype))
1862 self.write(msg.lower())
Andrew Svetlov0832af62012-12-18 23:10:48 +02001863 except OSError:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001864 if self.server.chatty:
1865 handle_error("Test server failure:\n")
1866 self.close()
1867 self.running = False
1868 # normally, we'd just stop here, but for the test
1869 # harness, we want to stop the server
1870 self.server.stop()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001871
Antoine Pitroub5218772010-05-21 09:56:06 +00001872 def __init__(self, certificate=None, ssl_version=None,
Antoine Pitrou18c913e2010-04-27 10:59:39 +00001873 certreqs=None, cacerts=None,
Antoine Pitrou2d9cb9c2010-04-17 17:40:45 +00001874 chatty=True, connectionchatty=False, starttls_server=False,
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01001875 npn_protocols=None, ciphers=None, context=None):
Antoine Pitroub5218772010-05-21 09:56:06 +00001876 if context:
1877 self.context = context
1878 else:
1879 self.context = ssl.SSLContext(ssl_version
1880 if ssl_version is not None
1881 else ssl.PROTOCOL_TLSv1)
1882 self.context.verify_mode = (certreqs if certreqs is not None
1883 else ssl.CERT_NONE)
1884 if cacerts:
1885 self.context.load_verify_locations(cacerts)
1886 if certificate:
1887 self.context.load_cert_chain(certificate)
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01001888 if npn_protocols:
1889 self.context.set_npn_protocols(npn_protocols)
Antoine Pitroub5218772010-05-21 09:56:06 +00001890 if ciphers:
1891 self.context.set_ciphers(ciphers)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001892 self.chatty = chatty
1893 self.connectionchatty = connectionchatty
1894 self.starttls_server = starttls_server
1895 self.sock = socket.socket()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001896 self.port = support.bind_port(self.sock)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001897 self.flag = None
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001898 self.active = False
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01001899 self.selected_protocols = []
Antoine Pitrou8f85f902012-01-03 22:46:48 +01001900 self.conn_errors = []
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001901 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00001902 self.daemon = True
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001903
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01001904 def __enter__(self):
1905 self.start(threading.Event())
1906 self.flag.wait()
Antoine Pitrou8f85f902012-01-03 22:46:48 +01001907 return self
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01001908
1909 def __exit__(self, *args):
1910 self.stop()
1911 self.join()
1912
Antoine Pitrou480a1242010-04-28 21:37:09 +00001913 def start(self, flag=None):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001914 self.flag = flag
1915 threading.Thread.start(self)
1916
Antoine Pitrou480a1242010-04-28 21:37:09 +00001917 def run(self):
Antoine Pitrouaf7c6022010-04-27 09:56:02 +00001918 self.sock.settimeout(0.05)
Charles-François Natali6e204602014-07-23 19:28:13 +01001919 self.sock.listen()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001920 self.active = True
1921 if self.flag:
1922 # signal an event
1923 self.flag.set()
1924 while self.active:
1925 try:
1926 newconn, connaddr = self.sock.accept()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001927 if support.verbose and self.chatty:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001928 sys.stdout.write(' server: new connection from '
Bill Janssen6e027db2007-11-15 22:23:56 +00001929 + repr(connaddr) + '\n')
1930 handler = self.ConnectionHandler(self, newconn, connaddr)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001931 handler.start()
Antoine Pitroueced82e2012-01-27 17:33:01 +01001932 handler.join()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001933 except socket.timeout:
1934 pass
1935 except KeyboardInterrupt:
1936 self.stop()
Bill Janssen6e027db2007-11-15 22:23:56 +00001937 self.sock.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001938
Antoine Pitrou480a1242010-04-28 21:37:09 +00001939 def stop(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001940 self.active = False
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001941
Bill Janssen54cc54c2007-12-14 22:08:56 +00001942 class AsyncoreEchoServer(threading.Thread):
1943
1944 # this one's based on asyncore.dispatcher
1945
1946 class EchoServer (asyncore.dispatcher):
1947
1948 class ConnectionHandler (asyncore.dispatcher_with_send):
1949
1950 def __init__(self, conn, certfile):
1951 self.socket = ssl.wrap_socket(conn, server_side=True,
1952 certfile=certfile,
1953 do_handshake_on_connect=False)
1954 asyncore.dispatcher_with_send.__init__(self, self.socket)
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00001955 self._ssl_accepting = True
1956 self._do_ssl_handshake()
Bill Janssen54cc54c2007-12-14 22:08:56 +00001957
1958 def readable(self):
1959 if isinstance(self.socket, ssl.SSLSocket):
1960 while self.socket.pending() > 0:
1961 self.handle_read_event()
1962 return True
1963
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00001964 def _do_ssl_handshake(self):
1965 try:
1966 self.socket.do_handshake()
Antoine Pitrou41032a62011-10-27 23:56:55 +02001967 except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
1968 return
1969 except ssl.SSLEOFError:
1970 return self.handle_close()
1971 except ssl.SSLError:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00001972 raise
Andrew Svetlov0832af62012-12-18 23:10:48 +02001973 except OSError as err:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00001974 if err.args[0] == errno.ECONNABORTED:
1975 return self.handle_close()
Bill Janssen54cc54c2007-12-14 22:08:56 +00001976 else:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00001977 self._ssl_accepting = False
1978
1979 def handle_read(self):
1980 if self._ssl_accepting:
1981 self._do_ssl_handshake()
1982 else:
1983 data = self.recv(1024)
1984 if support.verbose:
1985 sys.stdout.write(" server: read %s from client\n" % repr(data))
1986 if not data:
1987 self.close()
1988 else:
Antoine Pitrou480a1242010-04-28 21:37:09 +00001989 self.send(data.lower())
Bill Janssen54cc54c2007-12-14 22:08:56 +00001990
1991 def handle_close(self):
Bill Janssen2f5799b2008-06-29 00:08:12 +00001992 self.close()
Antoine Pitrou18c913e2010-04-27 10:59:39 +00001993 if support.verbose:
Bill Janssen54cc54c2007-12-14 22:08:56 +00001994 sys.stdout.write(" server: closed connection %s\n" % self.socket)
1995
1996 def handle_error(self):
1997 raise
1998
Antoine Pitrou773b5db2010-04-27 08:53:36 +00001999 def __init__(self, certfile):
Bill Janssen54cc54c2007-12-14 22:08:56 +00002000 self.certfile = certfile
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002001 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2002 self.port = support.bind_port(sock, '')
2003 asyncore.dispatcher.__init__(self, sock)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002004 self.listen(5)
2005
Giampaolo Rodolà977c7072010-10-04 21:08:36 +00002006 def handle_accepted(self, sock_obj, addr):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002007 if support.verbose:
Bill Janssen54cc54c2007-12-14 22:08:56 +00002008 sys.stdout.write(" server: new connection from %s:%s\n" %addr)
2009 self.ConnectionHandler(sock_obj, self.certfile)
2010
2011 def handle_error(self):
2012 raise
2013
Trent Nelson78520002008-04-10 20:54:35 +00002014 def __init__(self, certfile):
Bill Janssen54cc54c2007-12-14 22:08:56 +00002015 self.flag = None
2016 self.active = False
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002017 self.server = self.EchoServer(certfile)
2018 self.port = self.server.port
Bill Janssen54cc54c2007-12-14 22:08:56 +00002019 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00002020 self.daemon = True
Bill Janssen54cc54c2007-12-14 22:08:56 +00002021
2022 def __str__(self):
2023 return "<%s %s>" % (self.__class__.__name__, self.server)
2024
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002025 def __enter__(self):
2026 self.start(threading.Event())
2027 self.flag.wait()
Antoine Pitrou8f85f902012-01-03 22:46:48 +01002028 return self
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002029
2030 def __exit__(self, *args):
2031 if support.verbose:
2032 sys.stdout.write(" cleanup: stopping server.\n")
2033 self.stop()
2034 if support.verbose:
2035 sys.stdout.write(" cleanup: joining server thread.\n")
2036 self.join()
2037 if support.verbose:
2038 sys.stdout.write(" cleanup: successfully joined.\n")
2039
Bill Janssen54cc54c2007-12-14 22:08:56 +00002040 def start (self, flag=None):
2041 self.flag = flag
2042 threading.Thread.start(self)
2043
Antoine Pitrou480a1242010-04-28 21:37:09 +00002044 def run(self):
Bill Janssen54cc54c2007-12-14 22:08:56 +00002045 self.active = True
2046 if self.flag:
2047 self.flag.set()
2048 while self.active:
2049 try:
2050 asyncore.loop(1)
2051 except:
2052 pass
2053
Antoine Pitrou480a1242010-04-28 21:37:09 +00002054 def stop(self):
Bill Janssen54cc54c2007-12-14 22:08:56 +00002055 self.active = False
2056 self.server.close()
2057
Antoine Pitrou480a1242010-04-28 21:37:09 +00002058 def bad_cert_test(certfile):
2059 """
2060 Launch a server with CERT_REQUIRED, and check that trying to
2061 connect to it with the given client certificate fails.
2062 """
Trent Nelson78520002008-04-10 20:54:35 +00002063 server = ThreadedEchoServer(CERTFILE,
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002064 certreqs=ssl.CERT_REQUIRED,
Bill Janssen6e027db2007-11-15 22:23:56 +00002065 cacerts=CERTFILE, chatty=False,
2066 connectionchatty=False)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002067 with server:
Thomas Woutersed03b412007-08-28 21:37:11 +00002068 try:
Antoine Pitroud2eca372010-10-29 23:41:37 +00002069 with socket.socket() as sock:
2070 s = ssl.wrap_socket(sock,
2071 certfile=certfile,
2072 ssl_version=ssl.PROTOCOL_TLSv1)
2073 s.connect((HOST, server.port))
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002074 except ssl.SSLError as x:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002075 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002076 sys.stdout.write("\nSSLError is %s\n" % x.args[1])
Andrew Svetlov0832af62012-12-18 23:10:48 +02002077 except OSError as x:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002078 if support.verbose:
Andrew Svetlov0832af62012-12-18 23:10:48 +02002079 sys.stdout.write("\nOSError is %s\n" % x.args[1])
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002080 except OSError as x:
Giampaolo Rodolàcd9dfb92010-08-29 20:56:56 +00002081 if x.errno != errno.ENOENT:
2082 raise
Giampaolo Rodolà745ab382010-08-29 19:25:49 +00002083 if support.verbose:
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002084 sys.stdout.write("\OSError is %s\n" % str(x))
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002085 else:
Antoine Pitroud75b2a92010-05-06 14:15:10 +00002086 raise AssertionError("Use of invalid cert should have failed!")
Thomas Woutersed03b412007-08-28 21:37:11 +00002087
Antoine Pitroub5218772010-05-21 09:56:06 +00002088 def server_params_test(client_context, server_context, indata=b"FOO\n",
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01002089 chatty=True, connectionchatty=False, sni_name=None):
Antoine Pitrou480a1242010-04-28 21:37:09 +00002090 """
2091 Launch a server, connect a client to it and try various reads
2092 and writes.
2093 """
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01002094 stats = {}
Antoine Pitroub5218772010-05-21 09:56:06 +00002095 server = ThreadedEchoServer(context=server_context,
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002096 chatty=chatty,
Bill Janssen6e027db2007-11-15 22:23:56 +00002097 connectionchatty=False)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002098 with server:
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01002099 with client_context.wrap_socket(socket.socket(),
2100 server_hostname=sni_name) as s:
Antoine Pitroueba63c42012-01-28 17:38:34 +01002101 s.connect((HOST, server.port))
2102 for arg in [indata, bytearray(indata), memoryview(indata)]:
2103 if connectionchatty:
2104 if support.verbose:
2105 sys.stdout.write(
2106 " client: sending %r...\n" % indata)
2107 s.write(arg)
2108 outdata = s.read()
2109 if connectionchatty:
2110 if support.verbose:
2111 sys.stdout.write(" client: read %r\n" % outdata)
2112 if outdata != indata.lower():
2113 raise AssertionError(
2114 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
2115 % (outdata[:20], len(outdata),
2116 indata[:20].lower(), len(indata)))
2117 s.write(b"over\n")
Antoine Pitrou7d7aede2009-11-25 18:55:32 +00002118 if connectionchatty:
2119 if support.verbose:
Antoine Pitroueba63c42012-01-28 17:38:34 +01002120 sys.stdout.write(" client: closing connection.\n")
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01002121 stats.update({
Antoine Pitrouce816a52012-01-28 17:40:23 +01002122 'compression': s.compression(),
2123 'cipher': s.cipher(),
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01002124 'peercert': s.getpeercert(),
Antoine Pitrou47e40422014-09-04 21:00:10 +02002125 'client_npn_protocol': s.selected_npn_protocol(),
2126 'version': s.version(),
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01002127 })
Antoine Pitroueba63c42012-01-28 17:38:34 +01002128 s.close()
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01002129 stats['server_npn_protocols'] = server.selected_protocols
2130 return stats
Thomas Woutersed03b412007-08-28 21:37:11 +00002131
Antoine Pitroub5218772010-05-21 09:56:06 +00002132 def try_protocol_combo(server_protocol, client_protocol, expect_success,
2133 certsreqs=None, server_options=0, client_options=0):
Antoine Pitrou47e40422014-09-04 21:00:10 +02002134 """
2135 Try to SSL-connect using *client_protocol* to *server_protocol*.
2136 If *expect_success* is true, assert that the connection succeeds,
2137 if it's false, assert that the connection fails.
2138 Also, if *expect_success* is a string, assert that it is the protocol
2139 version actually used by the connection.
2140 """
Benjamin Peterson2a691a82008-03-31 01:51:45 +00002141 if certsreqs is None:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002142 certsreqs = ssl.CERT_NONE
Antoine Pitrou480a1242010-04-28 21:37:09 +00002143 certtype = {
2144 ssl.CERT_NONE: "CERT_NONE",
2145 ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
2146 ssl.CERT_REQUIRED: "CERT_REQUIRED",
2147 }[certsreqs]
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002148 if support.verbose:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002149 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002150 sys.stdout.write(formatstr %
2151 (ssl.get_protocol_name(client_protocol),
2152 ssl.get_protocol_name(server_protocol),
2153 certtype))
Antoine Pitroub5218772010-05-21 09:56:06 +00002154 client_context = ssl.SSLContext(client_protocol)
Antoine Pitrou32c49152014-01-09 21:28:48 +01002155 client_context.options |= client_options
Antoine Pitroub5218772010-05-21 09:56:06 +00002156 server_context = ssl.SSLContext(server_protocol)
Antoine Pitrou32c49152014-01-09 21:28:48 +01002157 server_context.options |= server_options
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002158
2159 # NOTE: we must enable "ALL" ciphers on the client, otherwise an
2160 # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
2161 # starting from OpenSSL 1.0.0 (see issue #8322).
2162 if client_context.protocol == ssl.PROTOCOL_SSLv23:
2163 client_context.set_ciphers("ALL")
2164
Antoine Pitroub5218772010-05-21 09:56:06 +00002165 for ctx in (client_context, server_context):
2166 ctx.verify_mode = certsreqs
Antoine Pitroub5218772010-05-21 09:56:06 +00002167 ctx.load_cert_chain(CERTFILE)
2168 ctx.load_verify_locations(CERTFILE)
2169 try:
Antoine Pitrou47e40422014-09-04 21:00:10 +02002170 stats = server_params_test(client_context, server_context,
2171 chatty=False, connectionchatty=False)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002172 # Protocol mismatch can result in either an SSLError, or a
2173 # "Connection reset by peer" error.
2174 except ssl.SSLError:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002175 if expect_success:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002176 raise
Andrew Svetlov0832af62012-12-18 23:10:48 +02002177 except OSError as e:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002178 if expect_success or e.errno != errno.ECONNRESET:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002179 raise
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002180 else:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002181 if not expect_success:
Antoine Pitroud75b2a92010-05-06 14:15:10 +00002182 raise AssertionError(
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002183 "Client protocol %s succeeded with server protocol %s!"
2184 % (ssl.get_protocol_name(client_protocol),
2185 ssl.get_protocol_name(server_protocol)))
Antoine Pitrou47e40422014-09-04 21:00:10 +02002186 elif (expect_success is not True
2187 and expect_success != stats['version']):
2188 raise AssertionError("version mismatch: expected %r, got %r"
2189 % (expect_success, stats['version']))
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002190
2191
Bill Janssen6e027db2007-11-15 22:23:56 +00002192 class ThreadedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002193
Antoine Pitrou23df4832010-08-04 17:14:06 +00002194 @skip_if_broken_ubuntu_ssl
Antoine Pitrou480a1242010-04-28 21:37:09 +00002195 def test_echo(self):
2196 """Basic test of an SSL client connecting to a server"""
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002197 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002198 sys.stdout.write("\n")
Antoine Pitroub5218772010-05-21 09:56:06 +00002199 for protocol in PROTOCOLS:
Antoine Pitrou972d5bb2013-03-29 17:56:03 +01002200 with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
2201 context = ssl.SSLContext(protocol)
2202 context.load_cert_chain(CERTFILE)
2203 server_params_test(context, context,
2204 chatty=True, connectionchatty=True)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002205
Antoine Pitrou480a1242010-04-28 21:37:09 +00002206 def test_getpeercert(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002207 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002208 sys.stdout.write("\n")
Antoine Pitroub5218772010-05-21 09:56:06 +00002209 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2210 context.verify_mode = ssl.CERT_REQUIRED
2211 context.load_verify_locations(CERTFILE)
2212 context.load_cert_chain(CERTFILE)
2213 server = ThreadedEchoServer(context=context, chatty=False)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002214 with server:
Antoine Pitrou20b85552013-09-29 19:50:53 +02002215 s = context.wrap_socket(socket.socket(),
2216 do_handshake_on_connect=False)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002217 s.connect((HOST, server.port))
Antoine Pitrou20b85552013-09-29 19:50:53 +02002218 # getpeercert() raise ValueError while the handshake isn't
2219 # done.
2220 with self.assertRaises(ValueError):
2221 s.getpeercert()
2222 s.do_handshake()
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002223 cert = s.getpeercert()
2224 self.assertTrue(cert, "Can't get peer certificate.")
2225 cipher = s.cipher()
2226 if support.verbose:
2227 sys.stdout.write(pprint.pformat(cert) + '\n')
2228 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
2229 if 'subject' not in cert:
2230 self.fail("No subject field in certificate: %s." %
2231 pprint.pformat(cert))
2232 if ((('organizationName', 'Python Software Foundation'),)
2233 not in cert['subject']):
2234 self.fail(
2235 "Missing or invalid 'organizationName' field in certificate subject; "
2236 "should be 'Python Software Foundation'.")
Antoine Pitroufb046912010-11-09 20:21:19 +00002237 self.assertIn('notBefore', cert)
2238 self.assertIn('notAfter', cert)
2239 before = ssl.cert_time_to_seconds(cert['notBefore'])
2240 after = ssl.cert_time_to_seconds(cert['notAfter'])
2241 self.assertLess(before, after)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002242 s.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002243
Christian Heimes2427b502013-11-23 11:24:32 +01002244 @unittest.skipUnless(have_verify_flags(),
2245 "verify_flags need OpenSSL > 0.9.8")
Christian Heimes22587792013-11-21 23:56:13 +01002246 def test_crl_check(self):
2247 if support.verbose:
2248 sys.stdout.write("\n")
2249
2250 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2251 server_context.load_cert_chain(SIGNED_CERTFILE)
2252
2253 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2254 context.verify_mode = ssl.CERT_REQUIRED
2255 context.load_verify_locations(SIGNING_CA)
Christian Heimes32f0c7a2013-11-22 03:43:48 +01002256 self.assertEqual(context.verify_flags, ssl.VERIFY_DEFAULT)
Christian Heimes22587792013-11-21 23:56:13 +01002257
2258 # VERIFY_DEFAULT should pass
2259 server = ThreadedEchoServer(context=server_context, chatty=True)
2260 with server:
2261 with context.wrap_socket(socket.socket()) as s:
2262 s.connect((HOST, server.port))
2263 cert = s.getpeercert()
2264 self.assertTrue(cert, "Can't get peer certificate.")
2265
2266 # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
Christian Heimes32f0c7a2013-11-22 03:43:48 +01002267 context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
Christian Heimes22587792013-11-21 23:56:13 +01002268
2269 server = ThreadedEchoServer(context=server_context, chatty=True)
2270 with server:
2271 with context.wrap_socket(socket.socket()) as s:
2272 with self.assertRaisesRegex(ssl.SSLError,
2273 "certificate verify failed"):
2274 s.connect((HOST, server.port))
2275
2276 # now load a CRL file. The CRL file is signed by the CA.
2277 context.load_verify_locations(CRLFILE)
2278
2279 server = ThreadedEchoServer(context=server_context, chatty=True)
2280 with server:
2281 with context.wrap_socket(socket.socket()) as s:
2282 s.connect((HOST, server.port))
2283 cert = s.getpeercert()
2284 self.assertTrue(cert, "Can't get peer certificate.")
2285
Christian Heimes575596e2013-12-15 21:49:17 +01002286 @needs_sni
Christian Heimes1aa9a752013-12-02 02:41:19 +01002287 def test_check_hostname(self):
2288 if support.verbose:
2289 sys.stdout.write("\n")
2290
2291 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2292 server_context.load_cert_chain(SIGNED_CERTFILE)
2293
2294 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2295 context.verify_mode = ssl.CERT_REQUIRED
2296 context.check_hostname = True
2297 context.load_verify_locations(SIGNING_CA)
2298
2299 # correct hostname should verify
2300 server = ThreadedEchoServer(context=server_context, chatty=True)
2301 with server:
2302 with context.wrap_socket(socket.socket(),
2303 server_hostname="localhost") as s:
2304 s.connect((HOST, server.port))
2305 cert = s.getpeercert()
2306 self.assertTrue(cert, "Can't get peer certificate.")
2307
2308 # incorrect hostname should raise an exception
2309 server = ThreadedEchoServer(context=server_context, chatty=True)
2310 with server:
2311 with context.wrap_socket(socket.socket(),
2312 server_hostname="invalid") as s:
2313 with self.assertRaisesRegex(ssl.CertificateError,
2314 "hostname 'invalid' doesn't match 'localhost'"):
2315 s.connect((HOST, server.port))
2316
2317 # missing server_hostname arg should cause an exception, too
2318 server = ThreadedEchoServer(context=server_context, chatty=True)
2319 with server:
2320 with socket.socket() as s:
2321 with self.assertRaisesRegex(ValueError,
2322 "check_hostname requires server_hostname"):
2323 context.wrap_socket(s)
2324
Antoine Pitrou480a1242010-04-28 21:37:09 +00002325 def test_empty_cert(self):
2326 """Connecting with an empty cert file"""
2327 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
2328 "nullcert.pem"))
2329 def test_malformed_cert(self):
2330 """Connecting with a badly formatted certificate (syntax error)"""
2331 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
2332 "badcert.pem"))
2333 def test_nonexisting_cert(self):
2334 """Connecting with a non-existing cert file"""
2335 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
2336 "wrongcert.pem"))
2337 def test_malformed_key(self):
2338 """Connecting with a badly formatted key (syntax error)"""
2339 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
2340 "badkey.pem"))
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002341
Antoine Pitrou480a1242010-04-28 21:37:09 +00002342 def test_rude_shutdown(self):
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002343 """A brutal shutdown of an SSL server should raise an OSError
Antoine Pitrou480a1242010-04-28 21:37:09 +00002344 in the client when attempting handshake.
2345 """
Trent Nelson6b240cd2008-04-10 20:12:06 +00002346 listener_ready = threading.Event()
2347 listener_gone = threading.Event()
Antoine Pitrou480a1242010-04-28 21:37:09 +00002348
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002349 s = socket.socket()
2350 port = support.bind_port(s, HOST)
Trent Nelson6b240cd2008-04-10 20:12:06 +00002351
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002352 # `listener` runs in a thread. It sits in an accept() until
2353 # the main thread connects. Then it rudely closes the socket,
2354 # and sets Event `listener_gone` to let the main thread know
2355 # the socket is gone.
Trent Nelson6b240cd2008-04-10 20:12:06 +00002356 def listener():
Charles-François Natali6e204602014-07-23 19:28:13 +01002357 s.listen()
Trent Nelson6b240cd2008-04-10 20:12:06 +00002358 listener_ready.set()
Antoine Pitroud2eca372010-10-29 23:41:37 +00002359 newsock, addr = s.accept()
2360 newsock.close()
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002361 s.close()
Trent Nelson6b240cd2008-04-10 20:12:06 +00002362 listener_gone.set()
2363
2364 def connector():
2365 listener_ready.wait()
Antoine Pitroud2eca372010-10-29 23:41:37 +00002366 with socket.socket() as c:
2367 c.connect((HOST, port))
2368 listener_gone.wait()
2369 try:
2370 ssl_sock = ssl.wrap_socket(c)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002371 except OSError:
Antoine Pitroud2eca372010-10-29 23:41:37 +00002372 pass
2373 else:
2374 self.fail('connecting to closed SSL socket should have failed')
Trent Nelson6b240cd2008-04-10 20:12:06 +00002375
2376 t = threading.Thread(target=listener)
2377 t.start()
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002378 try:
2379 connector()
2380 finally:
2381 t.join()
Trent Nelson6b240cd2008-04-10 20:12:06 +00002382
Antoine Pitrou23df4832010-08-04 17:14:06 +00002383 @skip_if_broken_ubuntu_ssl
Victor Stinner3de49192011-05-09 00:42:58 +02002384 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
2385 "OpenSSL is compiled without SSLv2 support")
Antoine Pitrou480a1242010-04-28 21:37:09 +00002386 def test_protocol_sslv2(self):
2387 """Connecting to an SSLv2 server with various client options"""
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002388 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002389 sys.stdout.write("\n")
Antoine Pitrou480a1242010-04-28 21:37:09 +00002390 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
2391 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
2392 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
Antoine Pitroucd3d7ca2014-01-09 20:02:20 +01002393 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False)
Antoine Pitrou480a1242010-04-28 21:37:09 +00002394 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
2395 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
Antoine Pitroub5218772010-05-21 09:56:06 +00002396 # SSLv23 client with specific SSL options
2397 if no_sslv2_implies_sslv3_hello():
2398 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
2399 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
2400 client_options=ssl.OP_NO_SSLv2)
Antoine Pitroucd3d7ca2014-01-09 20:02:20 +01002401 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
Antoine Pitroub5218772010-05-21 09:56:06 +00002402 client_options=ssl.OP_NO_SSLv3)
Antoine Pitroucd3d7ca2014-01-09 20:02:20 +01002403 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
Antoine Pitroub5218772010-05-21 09:56:06 +00002404 client_options=ssl.OP_NO_TLSv1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002405
Antoine Pitrou23df4832010-08-04 17:14:06 +00002406 @skip_if_broken_ubuntu_ssl
Antoine Pitrou480a1242010-04-28 21:37:09 +00002407 def test_protocol_sslv23(self):
2408 """Connecting to an SSLv23 server with various client options"""
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002409 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002410 sys.stdout.write("\n")
Victor Stinner3de49192011-05-09 00:42:58 +02002411 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2412 try:
2413 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True)
Andrew Svetlov0832af62012-12-18 23:10:48 +02002414 except OSError as x:
Victor Stinner3de49192011-05-09 00:42:58 +02002415 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
2416 if support.verbose:
2417 sys.stdout.write(
2418 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
2419 % str(x))
Antoine Pitrou47e40422014-09-04 21:00:10 +02002420 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, 'SSLv3')
Antoine Pitrou480a1242010-04-28 21:37:09 +00002421 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True)
Antoine Pitrou47e40422014-09-04 21:00:10 +02002422 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1')
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002423
Antoine Pitrou47e40422014-09-04 21:00:10 +02002424 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
Antoine Pitrou480a1242010-04-28 21:37:09 +00002425 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL)
Antoine Pitrou47e40422014-09-04 21:00:10 +02002426 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002427
Antoine Pitrou47e40422014-09-04 21:00:10 +02002428 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
Antoine Pitrou480a1242010-04-28 21:37:09 +00002429 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED)
Antoine Pitrou47e40422014-09-04 21:00:10 +02002430 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002431
Antoine Pitroub5218772010-05-21 09:56:06 +00002432 # Server with specific SSL options
2433 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False,
2434 server_options=ssl.OP_NO_SSLv3)
2435 # Will choose TLSv1
2436 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True,
2437 server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
2438 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, False,
2439 server_options=ssl.OP_NO_TLSv1)
2440
2441
Antoine Pitrou23df4832010-08-04 17:14:06 +00002442 @skip_if_broken_ubuntu_ssl
Antoine Pitrou480a1242010-04-28 21:37:09 +00002443 def test_protocol_sslv3(self):
2444 """Connecting to an SSLv3 server with various client options"""
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002445 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002446 sys.stdout.write("\n")
Antoine Pitrou47e40422014-09-04 21:00:10 +02002447 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3')
2448 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
2449 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
Victor Stinner3de49192011-05-09 00:42:58 +02002450 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2451 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
Barry Warsaw46ae0ef2011-10-28 16:52:17 -04002452 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False,
2453 client_options=ssl.OP_NO_SSLv3)
Antoine Pitrou480a1242010-04-28 21:37:09 +00002454 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
Antoine Pitroub5218772010-05-21 09:56:06 +00002455 if no_sslv2_implies_sslv3_hello():
2456 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Antoine Pitrou47e40422014-09-04 21:00:10 +02002457 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, 'SSLv3',
Antoine Pitroub5218772010-05-21 09:56:06 +00002458 client_options=ssl.OP_NO_SSLv2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002459
Antoine Pitrou23df4832010-08-04 17:14:06 +00002460 @skip_if_broken_ubuntu_ssl
Antoine Pitrou480a1242010-04-28 21:37:09 +00002461 def test_protocol_tlsv1(self):
2462 """Connecting to a TLSv1 server with various client options"""
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002463 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002464 sys.stdout.write("\n")
Antoine Pitrou47e40422014-09-04 21:00:10 +02002465 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1')
2466 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
2467 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Victor Stinner3de49192011-05-09 00:42:58 +02002468 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2469 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
Antoine Pitrou480a1242010-04-28 21:37:09 +00002470 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
Barry Warsaw46ae0ef2011-10-28 16:52:17 -04002471 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False,
2472 client_options=ssl.OP_NO_TLSv1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002473
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002474 @skip_if_broken_ubuntu_ssl
2475 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
2476 "TLS version 1.1 not supported.")
2477 def test_protocol_tlsv1_1(self):
2478 """Connecting to a TLSv1.1 server with various client options.
2479 Testing against older TLS versions."""
2480 if support.verbose:
2481 sys.stdout.write("\n")
Antoine Pitrou47e40422014-09-04 21:00:10 +02002482 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002483 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2484 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False)
2485 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
2486 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv23, False,
2487 client_options=ssl.OP_NO_TLSv1_1)
2488
Antoine Pitrou47e40422014-09-04 21:00:10 +02002489 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002490 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False)
2491 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False)
2492
2493
2494 @skip_if_broken_ubuntu_ssl
2495 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
2496 "TLS version 1.2 not supported.")
2497 def test_protocol_tlsv1_2(self):
2498 """Connecting to a TLSv1.2 server with various client options.
2499 Testing against older TLS versions."""
2500 if support.verbose:
2501 sys.stdout.write("\n")
Antoine Pitrou47e40422014-09-04 21:00:10 +02002502 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2',
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002503 server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
2504 client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
2505 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2506 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False)
2507 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
2508 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv23, False,
2509 client_options=ssl.OP_NO_TLSv1_2)
2510
Antoine Pitrou47e40422014-09-04 21:00:10 +02002511 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2')
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002512 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
2513 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
2514 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
2515 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
2516
Antoine Pitrou480a1242010-04-28 21:37:09 +00002517 def test_starttls(self):
2518 """Switching from clear text to encrypted and back again."""
2519 msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002520
Trent Nelson78520002008-04-10 20:54:35 +00002521 server = ThreadedEchoServer(CERTFILE,
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002522 ssl_version=ssl.PROTOCOL_TLSv1,
2523 starttls_server=True,
2524 chatty=True,
2525 connectionchatty=True)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002526 wrapped = False
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002527 with server:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002528 s = socket.socket()
2529 s.setblocking(1)
2530 s.connect((HOST, server.port))
2531 if support.verbose:
2532 sys.stdout.write("\n")
2533 for indata in msgs:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002534 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002535 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002536 " client: sending %r...\n" % indata)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002537 if wrapped:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002538 conn.write(indata)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002539 outdata = conn.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002540 else:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002541 s.send(indata)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002542 outdata = s.recv(1024)
Antoine Pitrou480a1242010-04-28 21:37:09 +00002543 msg = outdata.strip().lower()
2544 if indata == b"STARTTLS" and msg.startswith(b"ok"):
2545 # STARTTLS ok, switch to secure mode
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002546 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002547 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002548 " client: read %r from server, starting TLS...\n"
2549 % msg)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002550 conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
2551 wrapped = True
Antoine Pitrou480a1242010-04-28 21:37:09 +00002552 elif indata == b"ENDTLS" and msg.startswith(b"ok"):
2553 # ENDTLS ok, switch back to clear text
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002554 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002555 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002556 " client: read %r from server, ending TLS...\n"
2557 % msg)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002558 s = conn.unwrap()
2559 wrapped = False
2560 else:
2561 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002562 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002563 " client: read %r from server\n" % msg)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002564 if support.verbose:
2565 sys.stdout.write(" client: closing connection.\n")
2566 if wrapped:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002567 conn.write(b"over\n")
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002568 else:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002569 s.send(b"over\n")
Bill Janssen6e027db2007-11-15 22:23:56 +00002570 if wrapped:
2571 conn.close()
2572 else:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002573 s.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002574
Antoine Pitrou480a1242010-04-28 21:37:09 +00002575 def test_socketserver(self):
2576 """Using a SocketServer to create and manage SSL connections."""
Antoine Pitrouda232592013-02-05 21:20:51 +01002577 server = make_https_server(self, certfile=CERTFILE)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002578 # try to connect
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002579 if support.verbose:
2580 sys.stdout.write('\n')
2581 with open(CERTFILE, 'rb') as f:
2582 d1 = f.read()
2583 d2 = ''
2584 # now fetch the same data from the HTTPS server
Benjamin Peterson4ffb0752014-11-03 14:29:33 -05002585 url = 'https://localhost:%d/%s' % (
2586 server.port, os.path.split(CERTFILE)[1])
2587 context = ssl.create_default_context(cafile=CERTFILE)
2588 f = urllib.request.urlopen(url, context=context)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002589 try:
Barry Warsaw820c1202008-06-12 04:06:45 +00002590 dlen = f.info().get("content-length")
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002591 if dlen and (int(dlen) > 0):
2592 d2 = f.read(int(dlen))
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002593 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002594 sys.stdout.write(
2595 " client: read %d bytes from remote server '%s'\n"
2596 % (len(d2), server))
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002597 finally:
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002598 f.close()
2599 self.assertEqual(d1, d2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002600
Antoine Pitrou480a1242010-04-28 21:37:09 +00002601 def test_asyncore_server(self):
2602 """Check the example asyncore integration."""
2603 indata = "TEST MESSAGE of mixed case\n"
Trent Nelson6b240cd2008-04-10 20:12:06 +00002604
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002605 if support.verbose:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002606 sys.stdout.write("\n")
2607
Antoine Pitrou480a1242010-04-28 21:37:09 +00002608 indata = b"FOO\n"
Trent Nelson78520002008-04-10 20:54:35 +00002609 server = AsyncoreEchoServer(CERTFILE)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002610 with server:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002611 s = ssl.wrap_socket(socket.socket())
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002612 s.connect(('127.0.0.1', server.port))
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002613 if support.verbose:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002614 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002615 " client: sending %r...\n" % indata)
2616 s.write(indata)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002617 outdata = s.read()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002618 if support.verbose:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002619 sys.stdout.write(" client: read %r\n" % outdata)
Trent Nelson6b240cd2008-04-10 20:12:06 +00002620 if outdata != indata.lower():
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002621 self.fail(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002622 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
2623 % (outdata[:20], len(outdata),
2624 indata[:20].lower(), len(indata)))
2625 s.write(b"over\n")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002626 if support.verbose:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002627 sys.stdout.write(" client: closing connection.\n")
2628 s.close()
Antoine Pitroued986362010-08-15 23:28:10 +00002629 if support.verbose:
2630 sys.stdout.write(" client: connection closed.\n")
Trent Nelson6b240cd2008-04-10 20:12:06 +00002631
Antoine Pitrou480a1242010-04-28 21:37:09 +00002632 def test_recv_send(self):
2633 """Test recv(), send() and friends."""
Bill Janssen58afe4c2008-09-08 16:45:19 +00002634 if support.verbose:
2635 sys.stdout.write("\n")
2636
2637 server = ThreadedEchoServer(CERTFILE,
2638 certreqs=ssl.CERT_NONE,
2639 ssl_version=ssl.PROTOCOL_TLSv1,
2640 cacerts=CERTFILE,
2641 chatty=True,
2642 connectionchatty=False)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002643 with server:
2644 s = ssl.wrap_socket(socket.socket(),
2645 server_side=False,
2646 certfile=CERTFILE,
2647 ca_certs=CERTFILE,
2648 cert_reqs=ssl.CERT_NONE,
2649 ssl_version=ssl.PROTOCOL_TLSv1)
2650 s.connect((HOST, server.port))
Bill Janssen58afe4c2008-09-08 16:45:19 +00002651 # helper methods for standardising recv* method signatures
2652 def _recv_into():
2653 b = bytearray(b"\0"*100)
2654 count = s.recv_into(b)
2655 return b[:count]
2656
2657 def _recvfrom_into():
2658 b = bytearray(b"\0"*100)
2659 count, addr = s.recvfrom_into(b)
2660 return b[:count]
2661
2662 # (name, method, whether to expect success, *args)
2663 send_methods = [
2664 ('send', s.send, True, []),
2665 ('sendto', s.sendto, False, ["some.address"]),
2666 ('sendall', s.sendall, True, []),
2667 ]
2668 recv_methods = [
2669 ('recv', s.recv, True, []),
2670 ('recvfrom', s.recvfrom, False, ["some.address"]),
2671 ('recv_into', _recv_into, True, []),
2672 ('recvfrom_into', _recvfrom_into, False, []),
2673 ]
2674 data_prefix = "PREFIX_"
2675
2676 for meth_name, send_meth, expect_success, args in send_methods:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002677 indata = (data_prefix + meth_name).encode('ascii')
Bill Janssen58afe4c2008-09-08 16:45:19 +00002678 try:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002679 send_meth(indata, *args)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002680 outdata = s.read()
Bill Janssen58afe4c2008-09-08 16:45:19 +00002681 if outdata != indata.lower():
Georg Brandl89fad142010-03-14 10:23:39 +00002682 self.fail(
Bill Janssen58afe4c2008-09-08 16:45:19 +00002683 "While sending with <<{name:s}>> bad data "
Antoine Pitrou480a1242010-04-28 21:37:09 +00002684 "<<{outdata:r}>> ({nout:d}) received; "
2685 "expected <<{indata:r}>> ({nin:d})\n".format(
2686 name=meth_name, outdata=outdata[:20],
Bill Janssen58afe4c2008-09-08 16:45:19 +00002687 nout=len(outdata),
Antoine Pitrou480a1242010-04-28 21:37:09 +00002688 indata=indata[:20], nin=len(indata)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002689 )
2690 )
2691 except ValueError as e:
2692 if expect_success:
Georg Brandl89fad142010-03-14 10:23:39 +00002693 self.fail(
Bill Janssen58afe4c2008-09-08 16:45:19 +00002694 "Failed to send with method <<{name:s}>>; "
2695 "expected to succeed.\n".format(name=meth_name)
2696 )
2697 if not str(e).startswith(meth_name):
Georg Brandl89fad142010-03-14 10:23:39 +00002698 self.fail(
Bill Janssen58afe4c2008-09-08 16:45:19 +00002699 "Method <<{name:s}>> failed with unexpected "
2700 "exception message: {exp:s}\n".format(
2701 name=meth_name, exp=e
2702 )
2703 )
2704
2705 for meth_name, recv_meth, expect_success, args in recv_methods:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002706 indata = (data_prefix + meth_name).encode('ascii')
Bill Janssen58afe4c2008-09-08 16:45:19 +00002707 try:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002708 s.send(indata)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002709 outdata = recv_meth(*args)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002710 if outdata != indata.lower():
Georg Brandl89fad142010-03-14 10:23:39 +00002711 self.fail(
Bill Janssen58afe4c2008-09-08 16:45:19 +00002712 "While receiving with <<{name:s}>> bad data "
Antoine Pitrou480a1242010-04-28 21:37:09 +00002713 "<<{outdata:r}>> ({nout:d}) received; "
2714 "expected <<{indata:r}>> ({nin:d})\n".format(
2715 name=meth_name, outdata=outdata[:20],
Bill Janssen58afe4c2008-09-08 16:45:19 +00002716 nout=len(outdata),
Antoine Pitrou480a1242010-04-28 21:37:09 +00002717 indata=indata[:20], nin=len(indata)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002718 )
2719 )
2720 except ValueError as e:
2721 if expect_success:
Georg Brandl89fad142010-03-14 10:23:39 +00002722 self.fail(
Bill Janssen58afe4c2008-09-08 16:45:19 +00002723 "Failed to receive with method <<{name:s}>>; "
2724 "expected to succeed.\n".format(name=meth_name)
2725 )
2726 if not str(e).startswith(meth_name):
Georg Brandl89fad142010-03-14 10:23:39 +00002727 self.fail(
Bill Janssen58afe4c2008-09-08 16:45:19 +00002728 "Method <<{name:s}>> failed with unexpected "
2729 "exception message: {exp:s}\n".format(
2730 name=meth_name, exp=e
2731 )
2732 )
2733 # consume data
2734 s.read()
2735
Nick Coghlan513886a2011-08-28 00:00:27 +10002736 # Make sure sendmsg et al are disallowed to avoid
2737 # inadvertent disclosure of data and/or corruption
2738 # of the encrypted data stream
2739 self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
2740 self.assertRaises(NotImplementedError, s.recvmsg, 100)
2741 self.assertRaises(NotImplementedError,
2742 s.recvmsg_into, bytearray(100))
2743
Antoine Pitrou480a1242010-04-28 21:37:09 +00002744 s.write(b"over\n")
Bill Janssen58afe4c2008-09-08 16:45:19 +00002745 s.close()
Bill Janssen58afe4c2008-09-08 16:45:19 +00002746
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002747 def test_nonblocking_send(self):
2748 server = ThreadedEchoServer(CERTFILE,
2749 certreqs=ssl.CERT_NONE,
2750 ssl_version=ssl.PROTOCOL_TLSv1,
2751 cacerts=CERTFILE,
2752 chatty=True,
2753 connectionchatty=False)
2754 with server:
2755 s = ssl.wrap_socket(socket.socket(),
2756 server_side=False,
2757 certfile=CERTFILE,
2758 ca_certs=CERTFILE,
2759 cert_reqs=ssl.CERT_NONE,
2760 ssl_version=ssl.PROTOCOL_TLSv1)
2761 s.connect((HOST, server.port))
2762 s.setblocking(False)
2763
2764 # If we keep sending data, at some point the buffers
2765 # will be full and the call will block
2766 buf = bytearray(8192)
2767 def fill_buffer():
2768 while True:
2769 s.send(buf)
2770 self.assertRaises((ssl.SSLWantWriteError,
2771 ssl.SSLWantReadError), fill_buffer)
2772
2773 # Now read all the output and discard it
2774 s.setblocking(True)
2775 s.close()
2776
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002777 def test_handshake_timeout(self):
2778 # Issue #5103: SSL handshake must respect the socket timeout
2779 server = socket.socket(socket.AF_INET)
2780 host = "127.0.0.1"
2781 port = support.bind_port(server)
2782 started = threading.Event()
2783 finish = False
2784
2785 def serve():
Charles-François Natali6e204602014-07-23 19:28:13 +01002786 server.listen()
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002787 started.set()
2788 conns = []
2789 while not finish:
2790 r, w, e = select.select([server], [], [], 0.1)
2791 if server in r:
2792 # Let the socket hang around rather than having
2793 # it closed by garbage collection.
2794 conns.append(server.accept()[0])
Antoine Pitroud2eca372010-10-29 23:41:37 +00002795 for sock in conns:
2796 sock.close()
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002797
2798 t = threading.Thread(target=serve)
2799 t.start()
2800 started.wait()
2801
2802 try:
Antoine Pitrou40f08742010-04-24 22:04:40 +00002803 try:
2804 c = socket.socket(socket.AF_INET)
2805 c.settimeout(0.2)
2806 c.connect((host, port))
2807 # Will attempt handshake and time out
Antoine Pitrouc4df7842010-12-03 19:59:41 +00002808 self.assertRaisesRegex(socket.timeout, "timed out",
Ezio Melottied3a7d22010-12-01 02:32:32 +00002809 ssl.wrap_socket, c)
Antoine Pitrou40f08742010-04-24 22:04:40 +00002810 finally:
2811 c.close()
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002812 try:
2813 c = socket.socket(socket.AF_INET)
2814 c = ssl.wrap_socket(c)
2815 c.settimeout(0.2)
2816 # Will attempt handshake and time out
Antoine Pitrouc4df7842010-12-03 19:59:41 +00002817 self.assertRaisesRegex(socket.timeout, "timed out",
Ezio Melottied3a7d22010-12-01 02:32:32 +00002818 c.connect, (host, port))
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002819 finally:
2820 c.close()
2821 finally:
2822 finish = True
2823 t.join()
2824 server.close()
2825
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002826 def test_server_accept(self):
2827 # Issue #16357: accept() on a SSLSocket created through
2828 # SSLContext.wrap_socket().
2829 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2830 context.verify_mode = ssl.CERT_REQUIRED
2831 context.load_verify_locations(CERTFILE)
2832 context.load_cert_chain(CERTFILE)
2833 server = socket.socket(socket.AF_INET)
2834 host = "127.0.0.1"
2835 port = support.bind_port(server)
2836 server = context.wrap_socket(server, server_side=True)
2837
2838 evt = threading.Event()
2839 remote = None
2840 peer = None
2841 def serve():
2842 nonlocal remote, peer
Charles-François Natali6e204602014-07-23 19:28:13 +01002843 server.listen()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002844 # Block on the accept and wait on the connection to close.
2845 evt.set()
2846 remote, peer = server.accept()
2847 remote.recv(1)
2848
2849 t = threading.Thread(target=serve)
2850 t.start()
2851 # Client wait until server setup and perform a connect.
2852 evt.wait()
2853 client = context.wrap_socket(socket.socket())
2854 client.connect((host, port))
2855 client_addr = client.getsockname()
2856 client.close()
2857 t.join()
Antoine Pitroue1ceb502013-01-12 21:54:44 +01002858 remote.close()
2859 server.close()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002860 # Sanity checks.
2861 self.assertIsInstance(remote, ssl.SSLSocket)
2862 self.assertEqual(peer, client_addr)
2863
Antoine Pitrou242db722013-05-01 20:52:07 +02002864 def test_getpeercert_enotconn(self):
2865 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2866 with context.wrap_socket(socket.socket()) as sock:
2867 with self.assertRaises(OSError) as cm:
2868 sock.getpeercert()
2869 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
2870
2871 def test_do_handshake_enotconn(self):
2872 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2873 with context.wrap_socket(socket.socket()) as sock:
2874 with self.assertRaises(OSError) as cm:
2875 sock.do_handshake()
2876 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
2877
Antoine Pitrou8f85f902012-01-03 22:46:48 +01002878 def test_default_ciphers(self):
2879 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2880 try:
2881 # Force a set of weak ciphers on our client context
2882 context.set_ciphers("DES")
2883 except ssl.SSLError:
2884 self.skipTest("no DES cipher available")
2885 with ThreadedEchoServer(CERTFILE,
2886 ssl_version=ssl.PROTOCOL_SSLv23,
2887 chatty=False) as server:
Antoine Pitroue1ceb502013-01-12 21:54:44 +01002888 with context.wrap_socket(socket.socket()) as s:
Andrew Svetlov0832af62012-12-18 23:10:48 +02002889 with self.assertRaises(OSError):
Antoine Pitrou8f85f902012-01-03 22:46:48 +01002890 s.connect((HOST, server.port))
2891 self.assertIn("no shared cipher", str(server.conn_errors[0]))
2892
Antoine Pitrou47e40422014-09-04 21:00:10 +02002893 def test_version_basic(self):
2894 """
2895 Basic tests for SSLSocket.version().
2896 More tests are done in the test_protocol_*() methods.
2897 """
2898 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2899 with ThreadedEchoServer(CERTFILE,
2900 ssl_version=ssl.PROTOCOL_TLSv1,
2901 chatty=False) as server:
2902 with context.wrap_socket(socket.socket()) as s:
2903 self.assertIs(s.version(), None)
2904 s.connect((HOST, server.port))
2905 self.assertEqual(s.version(), "TLSv1")
2906 self.assertIs(s.version(), None)
2907
Antoine Pitrou0bebbc32014-03-22 18:13:50 +01002908 @unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
2909 def test_default_ecdh_curve(self):
2910 # Issue #21015: elliptic curve-based Diffie Hellman key exchange
2911 # should be enabled by default on SSL contexts.
2912 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2913 context.load_cert_chain(CERTFILE)
Antoine Pitrouc0430612014-04-16 18:33:39 +02002914 # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
2915 # explicitly using the 'ECCdraft' cipher alias. Otherwise,
2916 # our default cipher list should prefer ECDH-based ciphers
2917 # automatically.
2918 if ssl.OPENSSL_VERSION_INFO < (1, 0, 0):
2919 context.set_ciphers("ECCdraft:ECDH")
Antoine Pitrou0bebbc32014-03-22 18:13:50 +01002920 with ThreadedEchoServer(context=context) as server:
2921 with context.wrap_socket(socket.socket()) as s:
2922 s.connect((HOST, server.port))
2923 self.assertIn("ECDH", s.cipher()[0])
2924
Antoine Pitroud6494802011-07-21 01:11:30 +02002925 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
2926 "'tls-unique' channel binding not available")
2927 def test_tls_unique_channel_binding(self):
2928 """Test tls-unique channel binding."""
2929 if support.verbose:
2930 sys.stdout.write("\n")
2931
2932 server = ThreadedEchoServer(CERTFILE,
2933 certreqs=ssl.CERT_NONE,
2934 ssl_version=ssl.PROTOCOL_TLSv1,
2935 cacerts=CERTFILE,
2936 chatty=True,
2937 connectionchatty=False)
Antoine Pitrou6b15c902011-12-21 16:54:45 +01002938 with server:
2939 s = ssl.wrap_socket(socket.socket(),
2940 server_side=False,
2941 certfile=CERTFILE,
2942 ca_certs=CERTFILE,
2943 cert_reqs=ssl.CERT_NONE,
2944 ssl_version=ssl.PROTOCOL_TLSv1)
2945 s.connect((HOST, server.port))
Antoine Pitroud6494802011-07-21 01:11:30 +02002946 # get the data
2947 cb_data = s.get_channel_binding("tls-unique")
2948 if support.verbose:
2949 sys.stdout.write(" got channel binding data: {0!r}\n"
2950 .format(cb_data))
2951
2952 # check if it is sane
2953 self.assertIsNotNone(cb_data)
2954 self.assertEqual(len(cb_data), 12) # True for TLSv1
2955
2956 # and compare with the peers version
2957 s.write(b"CB tls-unique\n")
2958 peer_data_repr = s.read().strip()
2959 self.assertEqual(peer_data_repr,
2960 repr(cb_data).encode("us-ascii"))
2961 s.close()
2962
2963 # now, again
2964 s = ssl.wrap_socket(socket.socket(),
2965 server_side=False,
2966 certfile=CERTFILE,
2967 ca_certs=CERTFILE,
2968 cert_reqs=ssl.CERT_NONE,
2969 ssl_version=ssl.PROTOCOL_TLSv1)
2970 s.connect((HOST, server.port))
2971 new_cb_data = s.get_channel_binding("tls-unique")
2972 if support.verbose:
2973 sys.stdout.write(" got another channel binding data: {0!r}\n"
2974 .format(new_cb_data))
2975 # is it really unique
2976 self.assertNotEqual(cb_data, new_cb_data)
2977 self.assertIsNotNone(cb_data)
2978 self.assertEqual(len(cb_data), 12) # True for TLSv1
2979 s.write(b"CB tls-unique\n")
2980 peer_data_repr = s.read().strip()
2981 self.assertEqual(peer_data_repr,
2982 repr(new_cb_data).encode("us-ascii"))
2983 s.close()
Bill Janssen58afe4c2008-09-08 16:45:19 +00002984
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01002985 def test_compression(self):
2986 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2987 context.load_cert_chain(CERTFILE)
2988 stats = server_params_test(context, context,
2989 chatty=True, connectionchatty=True)
2990 if support.verbose:
2991 sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
2992 self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' })
2993
2994 @unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
2995 "ssl.OP_NO_COMPRESSION needed for this test")
2996 def test_compression_disabled(self):
2997 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2998 context.load_cert_chain(CERTFILE)
Antoine Pitrou8691bff2011-12-20 10:47:42 +01002999 context.options |= ssl.OP_NO_COMPRESSION
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003000 stats = server_params_test(context, context,
3001 chatty=True, connectionchatty=True)
3002 self.assertIs(stats['compression'], None)
3003
Antoine Pitrou0e576f12011-12-22 10:03:38 +01003004 def test_dh_params(self):
3005 # Check we can get a connection with ephemeral Diffie-Hellman
3006 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3007 context.load_cert_chain(CERTFILE)
3008 context.load_dh_params(DHFILE)
3009 context.set_ciphers("kEDH")
3010 stats = server_params_test(context, context,
3011 chatty=True, connectionchatty=True)
3012 cipher = stats["cipher"][0]
3013 parts = cipher.split("-")
3014 if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
3015 self.fail("Non-DH cipher: " + cipher[0])
3016
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01003017 def test_selected_npn_protocol(self):
3018 # selected_npn_protocol() is None unless NPN is used
3019 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3020 context.load_cert_chain(CERTFILE)
3021 stats = server_params_test(context, context,
3022 chatty=True, connectionchatty=True)
3023 self.assertIs(stats['client_npn_protocol'], None)
3024
3025 @unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
3026 def test_npn_protocols(self):
3027 server_protocols = ['http/1.1', 'spdy/2']
3028 protocol_tests = [
3029 (['http/1.1', 'spdy/2'], 'http/1.1'),
3030 (['spdy/2', 'http/1.1'], 'http/1.1'),
3031 (['spdy/2', 'test'], 'spdy/2'),
3032 (['abc', 'def'], 'abc')
3033 ]
3034 for client_protocols, expected in protocol_tests:
3035 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3036 server_context.load_cert_chain(CERTFILE)
3037 server_context.set_npn_protocols(server_protocols)
3038 client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3039 client_context.load_cert_chain(CERTFILE)
3040 client_context.set_npn_protocols(client_protocols)
3041 stats = server_params_test(client_context, server_context,
3042 chatty=True, connectionchatty=True)
3043
3044 msg = "failed trying %s (s) and %s (c).\n" \
3045 "was expecting %s, but got %%s from the %%s" \
3046 % (str(server_protocols), str(client_protocols),
3047 str(expected))
3048 client_result = stats['client_npn_protocol']
3049 self.assertEqual(client_result, expected, msg % (client_result, "client"))
3050 server_result = stats['server_npn_protocols'][-1] \
3051 if len(stats['server_npn_protocols']) else 'nothing'
3052 self.assertEqual(server_result, expected, msg % (server_result, "server"))
3053
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003054 def sni_contexts(self):
3055 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3056 server_context.load_cert_chain(SIGNED_CERTFILE)
3057 other_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3058 other_context.load_cert_chain(SIGNED_CERTFILE2)
3059 client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3060 client_context.verify_mode = ssl.CERT_REQUIRED
3061 client_context.load_verify_locations(SIGNING_CA)
3062 return server_context, other_context, client_context
3063
3064 def check_common_name(self, stats, name):
3065 cert = stats['peercert']
3066 self.assertIn((('commonName', name),), cert['subject'])
3067
3068 @needs_sni
3069 def test_sni_callback(self):
3070 calls = []
3071 server_context, other_context, client_context = self.sni_contexts()
3072
3073 def servername_cb(ssl_sock, server_name, initial_context):
3074 calls.append((server_name, initial_context))
Antoine Pitrou50b24d02013-04-11 20:48:42 +02003075 if server_name is not None:
3076 ssl_sock.context = other_context
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003077 server_context.set_servername_callback(servername_cb)
3078
3079 stats = server_params_test(client_context, server_context,
3080 chatty=True,
3081 sni_name='supermessage')
3082 # The hostname was fetched properly, and the certificate was
3083 # changed for the connection.
3084 self.assertEqual(calls, [("supermessage", server_context)])
3085 # CERTFILE4 was selected
3086 self.check_common_name(stats, 'fakehostname')
3087
Antoine Pitrou50b24d02013-04-11 20:48:42 +02003088 calls = []
3089 # The callback is called with server_name=None
3090 stats = server_params_test(client_context, server_context,
3091 chatty=True,
3092 sni_name=None)
3093 self.assertEqual(calls, [(None, server_context)])
3094 self.check_common_name(stats, 'localhost')
3095
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003096 # Check disabling the callback
3097 calls = []
3098 server_context.set_servername_callback(None)
3099
3100 stats = server_params_test(client_context, server_context,
3101 chatty=True,
3102 sni_name='notfunny')
3103 # Certificate didn't change
3104 self.check_common_name(stats, 'localhost')
3105 self.assertEqual(calls, [])
3106
3107 @needs_sni
3108 def test_sni_callback_alert(self):
3109 # Returning a TLS alert is reflected to the connecting client
3110 server_context, other_context, client_context = self.sni_contexts()
3111
3112 def cb_returning_alert(ssl_sock, server_name, initial_context):
3113 return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
3114 server_context.set_servername_callback(cb_returning_alert)
3115
3116 with self.assertRaises(ssl.SSLError) as cm:
3117 stats = server_params_test(client_context, server_context,
3118 chatty=False,
3119 sni_name='supermessage')
3120 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
3121
3122 @needs_sni
3123 def test_sni_callback_raising(self):
3124 # Raising fails the connection with a TLS handshake failure alert.
3125 server_context, other_context, client_context = self.sni_contexts()
3126
3127 def cb_raising(ssl_sock, server_name, initial_context):
3128 1/0
3129 server_context.set_servername_callback(cb_raising)
3130
3131 with self.assertRaises(ssl.SSLError) as cm, \
3132 support.captured_stderr() as stderr:
3133 stats = server_params_test(client_context, server_context,
3134 chatty=False,
3135 sni_name='supermessage')
3136 self.assertEqual(cm.exception.reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
3137 self.assertIn("ZeroDivisionError", stderr.getvalue())
3138
3139 @needs_sni
3140 def test_sni_callback_wrong_return_type(self):
3141 # Returning the wrong return type terminates the TLS connection
3142 # with an internal error alert.
3143 server_context, other_context, client_context = self.sni_contexts()
3144
3145 def cb_wrong_return_type(ssl_sock, server_name, initial_context):
3146 return "foo"
3147 server_context.set_servername_callback(cb_wrong_return_type)
3148
3149 with self.assertRaises(ssl.SSLError) as cm, \
3150 support.captured_stderr() as stderr:
3151 stats = server_params_test(client_context, server_context,
3152 chatty=False,
3153 sni_name='supermessage')
3154 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
3155 self.assertIn("TypeError", stderr.getvalue())
3156
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003157 def test_read_write_after_close_raises_valuerror(self):
3158 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
3159 context.verify_mode = ssl.CERT_REQUIRED
3160 context.load_verify_locations(CERTFILE)
3161 context.load_cert_chain(CERTFILE)
3162 server = ThreadedEchoServer(context=context, chatty=False)
3163
3164 with server:
3165 s = context.wrap_socket(socket.socket())
3166 s.connect((HOST, server.port))
3167 s.close()
3168
3169 self.assertRaises(ValueError, s.read, 1024)
Antoine Pitrou28940732013-07-20 19:36:15 +02003170 self.assertRaises(ValueError, s.write, b'hello')
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003171
Giampaolo Rodola'915d1412014-06-11 03:54:30 +02003172 def test_sendfile(self):
3173 TEST_DATA = b"x" * 512
3174 with open(support.TESTFN, 'wb') as f:
3175 f.write(TEST_DATA)
3176 self.addCleanup(support.unlink, support.TESTFN)
3177 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
3178 context.verify_mode = ssl.CERT_REQUIRED
3179 context.load_verify_locations(CERTFILE)
3180 context.load_cert_chain(CERTFILE)
3181 server = ThreadedEchoServer(context=context, chatty=False)
3182 with server:
3183 with context.wrap_socket(socket.socket()) as s:
3184 s.connect((HOST, server.port))
3185 with open(support.TESTFN, 'rb') as file:
3186 s.sendfile(file)
3187 self.assertEqual(s.recv(1024), TEST_DATA)
3188
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003189
Thomas Woutersed03b412007-08-28 21:37:11 +00003190def test_main(verbose=False):
Antoine Pitrou15cee622010-08-04 16:45:21 +00003191 if support.verbose:
3192 plats = {
3193 'Linux': platform.linux_distribution,
3194 'Mac': platform.mac_ver,
3195 'Windows': platform.win32_ver,
3196 }
3197 for name, func in plats.items():
3198 plat = func()
3199 if plat and plat[0]:
3200 plat = '%s %r' % (name, plat)
3201 break
3202 else:
3203 plat = repr(platform.platform())
3204 print("test_ssl: testing with %r %r" %
3205 (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
3206 print(" under %s" % plat)
Antoine Pitroud5323212010-10-22 18:19:07 +00003207 print(" HAS_SNI = %r" % ssl.HAS_SNI)
Antoine Pitrou609ef012013-03-29 18:09:06 +01003208 print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
3209 try:
3210 print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
3211 except AttributeError:
3212 pass
Antoine Pitrou15cee622010-08-04 16:45:21 +00003213
Antoine Pitrou152efa22010-05-16 18:19:27 +00003214 for filename in [
3215 CERTFILE, SVN_PYTHON_ORG_ROOT_CERT, BYTES_CERTFILE,
3216 ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003217 SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
Antoine Pitrou152efa22010-05-16 18:19:27 +00003218 BADCERT, BADKEY, EMPTYCERT]:
3219 if not os.path.exists(filename):
3220 raise support.TestFailed("Can't read certificate file %r" % filename)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00003221
Antoine Pitroub1fdf472014-10-05 20:41:53 +02003222 tests = [ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests]
Thomas Woutersed03b412007-08-28 21:37:11 +00003223
Benjamin Petersonee8712c2008-05-20 21:35:26 +00003224 if support.is_resource_enabled('network'):
Bill Janssen6e027db2007-11-15 22:23:56 +00003225 tests.append(NetworkedTests)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02003226 tests.append(NetworkedBIOTests)
Thomas Woutersed03b412007-08-28 21:37:11 +00003227
Thomas Wouters1b7f8912007-09-19 03:06:30 +00003228 if _have_threads:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00003229 thread_info = support.threading_setup()
Antoine Pitroudb5012a2013-01-12 22:00:09 +01003230 if thread_info:
Bill Janssen6e027db2007-11-15 22:23:56 +00003231 tests.append(ThreadedTests)
Thomas Woutersed03b412007-08-28 21:37:11 +00003232
Antoine Pitrou480a1242010-04-28 21:37:09 +00003233 try:
3234 support.run_unittest(*tests)
3235 finally:
3236 if _have_threads:
3237 support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00003238
3239if __name__ == "__main__":
3240 test_main()