blob: afec72afe4f00a178cbea7b19105798e42208cd6 [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00005from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00006import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00007import select
Thomas Woutersed03b412007-08-28 21:37:11 +00008import time
Christian Heimes9424bb42013-06-17 15:32:57 +02009import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000010import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000011import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000012import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000013import pprint
Antoine Pitrou152efa22010-05-16 18:19:27 +000014import tempfile
Antoine Pitrou803e6d62010-10-13 10:36:15 +000015import urllib.request
Thomas Woutersed03b412007-08-28 21:37:11 +000016import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000017import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000018import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000019import platform
Antoine Pitrou23df4832010-08-04 17:14:06 +000020import functools
Antoine Pitrou242db722013-05-01 20:52:07 +020021from unittest import mock
Thomas Woutersed03b412007-08-28 21:37:11 +000022
Antoine Pitrou05d936d2010-10-13 11:38:36 +000023ssl = support.import_module("ssl")
24
Antoine Pitrou2463e5f2013-03-28 22:24:43 +010025PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
Benjamin Petersonee8712c2008-05-20 21:35:26 +000026HOST = support.HOST
Antoine Pitrou152efa22010-05-16 18:19:27 +000027
Christian Heimesefff7062013-11-21 03:35:02 +010028def data_file(*name):
29 return os.path.join(os.path.dirname(__file__), *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000030
Antoine Pitrou81564092010-10-08 23:06:24 +000031# The custom key and certificate files used in test_ssl are generated
32# using Lib/test/make_ssl_certs.py.
33# Other certificates are simply fetched from the Internet servers they
34# are meant to authenticate.
35
Antoine Pitrou152efa22010-05-16 18:19:27 +000036CERTFILE = data_file("keycert.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000037BYTES_CERTFILE = os.fsencode(CERTFILE)
Antoine Pitrou152efa22010-05-16 18:19:27 +000038ONLYCERT = data_file("ssl_cert.pem")
39ONLYKEY = data_file("ssl_key.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000040BYTES_ONLYCERT = os.fsencode(ONLYCERT)
41BYTES_ONLYKEY = os.fsencode(ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +020042CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
43ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
44KEY_PASSWORD = "somepass"
Antoine Pitrou152efa22010-05-16 18:19:27 +000045CAPATH = data_file("capath")
Victor Stinner313a1202010-06-11 23:56:51 +000046BYTES_CAPATH = os.fsencode(CAPATH)
Christian Heimesefff7062013-11-21 03:35:02 +010047CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
48CAFILE_CACERT = data_file("capath", "5ed36f99.0")
49
Antoine Pitrou152efa22010-05-16 18:19:27 +000050
Christian Heimes22587792013-11-21 23:56:13 +010051# empty CRL
52CRLFILE = data_file("revocation.crl")
53
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010054# Two keys and certs signed by the same CA (for SNI tests)
55SIGNED_CERTFILE = data_file("keycert3.pem")
56SIGNED_CERTFILE2 = data_file("keycert4.pem")
57SIGNING_CA = data_file("pycacert.pem")
58
Antoine Pitrou152efa22010-05-16 18:19:27 +000059SVN_PYTHON_ORG_ROOT_CERT = data_file("https_svn_python_org_root.pem")
60
61EMPTYCERT = data_file("nullcert.pem")
62BADCERT = data_file("badcert.pem")
63WRONGCERT = data_file("XXXnonexisting.pem")
64BADKEY = data_file("badkey.pem")
Antoine Pitroud8c347a2011-10-01 19:20:25 +020065NOKIACERT = data_file("nokia.pem")
Christian Heimes824f7f32013-08-17 00:54:47 +020066NULLBYTECERT = data_file("nullbytecert.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +000067
Antoine Pitrou0e576f12011-12-22 10:03:38 +010068DHFILE = data_file("dh512.pem")
69BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +000070
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010071
Thomas Woutersed03b412007-08-28 21:37:11 +000072def handle_error(prefix):
73 exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
Benjamin Petersonee8712c2008-05-20 21:35:26 +000074 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +000075 sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +000076
Antoine Pitroub5218772010-05-21 09:56:06 +000077def can_clear_options():
78 # 0.9.8m or higher
Antoine Pitroub9ac25d2011-07-08 18:47:06 +020079 return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
Antoine Pitroub5218772010-05-21 09:56:06 +000080
81def no_sslv2_implies_sslv3_hello():
82 # 0.9.7h or higher
83 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
84
Christian Heimes2427b502013-11-23 11:24:32 +010085def have_verify_flags():
86 # 0.9.8 or higher
87 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
88
Christian Heimes9424bb42013-06-17 15:32:57 +020089def asn1time(cert_time):
90 # Some versions of OpenSSL ignore seconds, see #18207
91 # 0.9.8.i
92 if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
93 fmt = "%b %d %H:%M:%S %Y GMT"
94 dt = datetime.datetime.strptime(cert_time, fmt)
95 dt = dt.replace(second=0)
96 cert_time = dt.strftime(fmt)
97 # %d adds leading zero but ASN1_TIME_print() uses leading space
98 if cert_time[4] == "0":
99 cert_time = cert_time[:4] + " " + cert_time[5:]
100
101 return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000102
Antoine Pitrou23df4832010-08-04 17:14:06 +0000103# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
104def skip_if_broken_ubuntu_ssl(func):
Victor Stinner3de49192011-05-09 00:42:58 +0200105 if hasattr(ssl, 'PROTOCOL_SSLv2'):
106 @functools.wraps(func)
107 def f(*args, **kwargs):
108 try:
109 ssl.SSLContext(ssl.PROTOCOL_SSLv2)
110 except ssl.SSLError:
111 if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
112 platform.linux_distribution() == ('debian', 'squeeze/sid', '')):
113 raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
114 return func(*args, **kwargs)
115 return f
116 else:
117 return func
Antoine Pitrou23df4832010-08-04 17:14:06 +0000118
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100119needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
120
Antoine Pitrou23df4832010-08-04 17:14:06 +0000121
Antoine Pitrou152efa22010-05-16 18:19:27 +0000122class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000123
Antoine Pitrou480a1242010-04-28 21:37:09 +0000124 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000125 ssl.CERT_NONE
126 ssl.CERT_OPTIONAL
127 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100128 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100129 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100130 if ssl.HAS_ECDH:
131 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100132 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
133 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000134 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100135 self.assertIn(ssl.HAS_ECDH, {True, False})
Thomas Woutersed03b412007-08-28 21:37:11 +0000136
Antoine Pitrou480a1242010-04-28 21:37:09 +0000137 def test_random(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000138 v = ssl.RAND_status()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000139 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000140 sys.stdout.write("\n RAND_status is %d (%s)\n"
141 % (v, (v and "sufficient randomness") or
142 "insufficient randomness"))
Victor Stinner99c8b162011-05-24 12:05:19 +0200143
144 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
145 self.assertEqual(len(data), 16)
146 self.assertEqual(is_cryptographic, v == 1)
147 if v:
148 data = ssl.RAND_bytes(16)
149 self.assertEqual(len(data), 16)
Victor Stinner2e2baa92011-05-25 11:15:16 +0200150 else:
151 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
Victor Stinner99c8b162011-05-24 12:05:19 +0200152
Jesus Ceac8754a12012-09-11 02:00:58 +0200153 self.assertRaises(TypeError, ssl.RAND_egd, 1)
154 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000155 ssl.RAND_add("this is a random string", 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000156
Christian Heimesf77b4b22013-08-21 13:26:05 +0200157 @unittest.skipUnless(os.name == 'posix', 'requires posix')
158 def test_random_fork(self):
159 status = ssl.RAND_status()
160 if not status:
161 self.fail("OpenSSL's PRNG has insufficient randomness")
162
163 rfd, wfd = os.pipe()
164 pid = os.fork()
165 if pid == 0:
166 try:
167 os.close(rfd)
168 child_random = ssl.RAND_pseudo_bytes(16)[0]
169 self.assertEqual(len(child_random), 16)
170 os.write(wfd, child_random)
171 os.close(wfd)
172 except BaseException:
173 os._exit(1)
174 else:
175 os._exit(0)
176 else:
177 os.close(wfd)
178 self.addCleanup(os.close, rfd)
179 _, status = os.waitpid(pid, 0)
180 self.assertEqual(status, 0)
181
182 child_random = os.read(rfd, 16)
183 self.assertEqual(len(child_random), 16)
184 parent_random = ssl.RAND_pseudo_bytes(16)[0]
185 self.assertEqual(len(parent_random), 16)
186
187 self.assertNotEqual(child_random, parent_random)
188
Antoine Pitrou480a1242010-04-28 21:37:09 +0000189 def test_parse_cert(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000190 # note that this uses an 'unofficial' function in _ssl.c,
191 # provided solely for this test, to exercise the certificate
192 # parsing code
Antoine Pitroufb046912010-11-09 20:21:19 +0000193 p = ssl._ssl._test_decode_cert(CERTFILE)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000194 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000195 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200196 self.assertEqual(p['issuer'],
197 ((('countryName', 'XY'),),
198 (('localityName', 'Castle Anthrax'),),
199 (('organizationName', 'Python Software Foundation'),),
200 (('commonName', 'localhost'),))
201 )
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100202 # Note the next three asserts will fail if the keys are regenerated
Christian Heimes9424bb42013-06-17 15:32:57 +0200203 self.assertEqual(p['notAfter'], asn1time('Oct 5 23:01:56 2020 GMT'))
204 self.assertEqual(p['notBefore'], asn1time('Oct 8 23:01:56 2010 GMT'))
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200205 self.assertEqual(p['serialNumber'], 'D7C7381919AFC24E')
206 self.assertEqual(p['subject'],
207 ((('countryName', 'XY'),),
208 (('localityName', 'Castle Anthrax'),),
209 (('organizationName', 'Python Software Foundation'),),
210 (('commonName', 'localhost'),))
211 )
212 self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),))
213 # Issue #13034: the subjectAltName in some certificates
214 # (notably projects.developer.nokia.com:443) wasn't parsed
215 p = ssl._ssl._test_decode_cert(NOKIACERT)
216 if support.verbose:
217 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
218 self.assertEqual(p['subjectAltName'],
219 (('DNS', 'projects.developer.nokia.com'),
220 ('DNS', 'projects.forum.nokia.com'))
221 )
Christian Heimesbd3a7f92013-11-21 03:40:15 +0100222 # extra OCSP and AIA fields
223 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
224 self.assertEqual(p['caIssuers'],
225 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
226 self.assertEqual(p['crlDistributionPoints'],
227 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000228
Christian Heimes824f7f32013-08-17 00:54:47 +0200229 def test_parse_cert_CVE_2013_4238(self):
230 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
231 if support.verbose:
232 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
233 subject = ((('countryName', 'US'),),
234 (('stateOrProvinceName', 'Oregon'),),
235 (('localityName', 'Beaverton'),),
236 (('organizationName', 'Python Software Foundation'),),
237 (('organizationalUnitName', 'Python Core Development'),),
238 (('commonName', 'null.python.org\x00example.org'),),
239 (('emailAddress', 'python-dev@python.org'),))
240 self.assertEqual(p['subject'], subject)
241 self.assertEqual(p['issuer'], subject)
Christian Heimes157c9832013-08-25 14:12:41 +0200242 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
243 san = (('DNS', 'altnull.python.org\x00example.com'),
244 ('email', 'null@python.org\x00user@example.org'),
245 ('URI', 'http://null.python.org\x00http://example.org'),
246 ('IP Address', '192.0.2.1'),
247 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
248 else:
249 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
250 san = (('DNS', 'altnull.python.org\x00example.com'),
251 ('email', 'null@python.org\x00user@example.org'),
252 ('URI', 'http://null.python.org\x00http://example.org'),
253 ('IP Address', '192.0.2.1'),
254 ('IP Address', '<invalid>'))
255
256 self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200257
Antoine Pitrou480a1242010-04-28 21:37:09 +0000258 def test_DER_to_PEM(self):
259 with open(SVN_PYTHON_ORG_ROOT_CERT, 'r') as f:
260 pem = f.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000261 d1 = ssl.PEM_cert_to_DER_cert(pem)
262 p2 = ssl.DER_cert_to_PEM_cert(d1)
263 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitrou18c913e2010-04-27 10:59:39 +0000264 self.assertEqual(d1, d2)
Antoine Pitrou9bfbe612010-04-27 22:08:08 +0000265 if not p2.startswith(ssl.PEM_HEADER + '\n'):
266 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
267 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
268 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000269
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000270 def test_openssl_version(self):
271 n = ssl.OPENSSL_VERSION_NUMBER
272 t = ssl.OPENSSL_VERSION_INFO
273 s = ssl.OPENSSL_VERSION
274 self.assertIsInstance(n, int)
275 self.assertIsInstance(t, tuple)
276 self.assertIsInstance(s, str)
277 # Some sanity checks follow
278 # >= 0.9
279 self.assertGreaterEqual(n, 0x900000)
280 # < 2.0
281 self.assertLess(n, 0x20000000)
282 major, minor, fix, patch, status = t
283 self.assertGreaterEqual(major, 0)
284 self.assertLess(major, 2)
285 self.assertGreaterEqual(minor, 0)
286 self.assertLess(minor, 256)
287 self.assertGreaterEqual(fix, 0)
288 self.assertLess(fix, 256)
289 self.assertGreaterEqual(patch, 0)
290 self.assertLessEqual(patch, 26)
291 self.assertGreaterEqual(status, 0)
292 self.assertLessEqual(status, 15)
293 # Version string as returned by OpenSSL, the format might change
294 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
295 (s, t))
296
Antoine Pitrou9d543662010-04-23 23:10:32 +0000297 @support.cpython_only
298 def test_refcycle(self):
299 # Issue #7943: an SSL object doesn't create reference cycles with
300 # itself.
301 s = socket.socket(socket.AF_INET)
302 ss = ssl.wrap_socket(s)
303 wr = weakref.ref(ss)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100304 with support.check_warnings(("", ResourceWarning)):
305 del ss
306 self.assertEqual(wr(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000307
Antoine Pitroua468adc2010-09-14 14:43:44 +0000308 def test_wrapped_unconnected(self):
309 # Methods on an unconnected SSLSocket propagate the original
Andrew Svetlov0832af62012-12-18 23:10:48 +0200310 # OSError raise by the underlying socket object.
Antoine Pitroua468adc2010-09-14 14:43:44 +0000311 s = socket.socket(socket.AF_INET)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100312 with ssl.wrap_socket(s) as ss:
Antoine Pitroue9bb4732013-01-12 21:56:56 +0100313 self.assertRaises(OSError, ss.recv, 1)
314 self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
315 self.assertRaises(OSError, ss.recvfrom, 1)
316 self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
317 self.assertRaises(OSError, ss.send, b'x')
318 self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
Antoine Pitroua468adc2010-09-14 14:43:44 +0000319
Antoine Pitrou40f08742010-04-24 22:04:40 +0000320 def test_timeout(self):
321 # Issue #8524: when creating an SSL socket, the timeout of the
322 # original socket should be retained.
323 for timeout in (None, 0.0, 5.0):
324 s = socket.socket(socket.AF_INET)
325 s.settimeout(timeout)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100326 with ssl.wrap_socket(s) as ss:
327 self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000328
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000329 def test_errors(self):
330 sock = socket.socket()
Ezio Melottied3a7d22010-12-01 02:32:32 +0000331 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000332 "certfile must be specified",
333 ssl.wrap_socket, sock, keyfile=CERTFILE)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000334 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000335 "certfile must be specified for server-side operations",
336 ssl.wrap_socket, sock, server_side=True)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000337 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000338 "certfile must be specified for server-side operations",
339 ssl.wrap_socket, sock, server_side=True, certfile="")
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100340 with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
341 self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
342 s.connect, (HOST, 8080))
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200343 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000344 with socket.socket() as sock:
345 ssl.wrap_socket(sock, certfile=WRONGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000346 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200347 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000348 with socket.socket() as sock:
349 ssl.wrap_socket(sock, certfile=CERTFILE, keyfile=WRONGCERT)
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000350 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200351 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000352 with socket.socket() as sock:
353 ssl.wrap_socket(sock, certfile=WRONGCERT, keyfile=WRONGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000354 self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000355
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000356 def test_match_hostname(self):
357 def ok(cert, hostname):
358 ssl.match_hostname(cert, hostname)
359 def fail(cert, hostname):
360 self.assertRaises(ssl.CertificateError,
361 ssl.match_hostname, cert, hostname)
362
363 cert = {'subject': ((('commonName', 'example.com'),),)}
364 ok(cert, 'example.com')
365 ok(cert, 'ExAmple.cOm')
366 fail(cert, 'www.example.com')
367 fail(cert, '.example.com')
368 fail(cert, 'example.org')
369 fail(cert, 'exampleXcom')
370
371 cert = {'subject': ((('commonName', '*.a.com'),),)}
372 ok(cert, 'foo.a.com')
373 fail(cert, 'bar.foo.a.com')
374 fail(cert, 'a.com')
375 fail(cert, 'Xa.com')
376 fail(cert, '.a.com')
377
Georg Brandl72c98d32013-10-27 07:16:53 +0100378 # only match one left-most wildcard
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000379 cert = {'subject': ((('commonName', 'f*.com'),),)}
380 ok(cert, 'foo.com')
381 ok(cert, 'f.com')
382 fail(cert, 'bar.com')
383 fail(cert, 'foo.a.com')
384 fail(cert, 'bar.foo.com')
385
Christian Heimes824f7f32013-08-17 00:54:47 +0200386 # NULL bytes are bad, CVE-2013-4073
387 cert = {'subject': ((('commonName',
388 'null.python.org\x00example.org'),),)}
389 ok(cert, 'null.python.org\x00example.org') # or raise an error?
390 fail(cert, 'example.org')
391 fail(cert, 'null.python.org')
392
Georg Brandl72c98d32013-10-27 07:16:53 +0100393 # error cases with wildcards
394 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
395 fail(cert, 'bar.foo.a.com')
396 fail(cert, 'a.com')
397 fail(cert, 'Xa.com')
398 fail(cert, '.a.com')
399
400 cert = {'subject': ((('commonName', 'a.*.com'),),)}
401 fail(cert, 'a.foo.com')
402 fail(cert, 'a..com')
403 fail(cert, 'a.com')
404
405 # wildcard doesn't match IDNA prefix 'xn--'
406 idna = 'püthon.python.org'.encode("idna").decode("ascii")
407 cert = {'subject': ((('commonName', idna),),)}
408 ok(cert, idna)
409 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
410 fail(cert, idna)
411 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
412 fail(cert, idna)
413
414 # wildcard in first fragment and IDNA A-labels in sequent fragments
415 # are supported.
416 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
417 cert = {'subject': ((('commonName', idna),),)}
418 ok(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
419 ok(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
420 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
421 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
422
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000423 # Slightly fake real-world example
424 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
425 'subject': ((('commonName', 'linuxfrz.org'),),),
426 'subjectAltName': (('DNS', 'linuxfr.org'),
427 ('DNS', 'linuxfr.com'),
428 ('othername', '<unsupported>'))}
429 ok(cert, 'linuxfr.org')
430 ok(cert, 'linuxfr.com')
431 # Not a "DNS" entry
432 fail(cert, '<unsupported>')
433 # When there is a subjectAltName, commonName isn't used
434 fail(cert, 'linuxfrz.org')
435
436 # A pristine real-world example
437 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
438 'subject': ((('countryName', 'US'),),
439 (('stateOrProvinceName', 'California'),),
440 (('localityName', 'Mountain View'),),
441 (('organizationName', 'Google Inc'),),
442 (('commonName', 'mail.google.com'),))}
443 ok(cert, 'mail.google.com')
444 fail(cert, 'gmail.com')
445 # Only commonName is considered
446 fail(cert, 'California')
447
448 # Neither commonName nor subjectAltName
449 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
450 'subject': ((('countryName', 'US'),),
451 (('stateOrProvinceName', 'California'),),
452 (('localityName', 'Mountain View'),),
453 (('organizationName', 'Google Inc'),))}
454 fail(cert, 'mail.google.com')
455
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200456 # No DNS entry in subjectAltName but a commonName
457 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
458 'subject': ((('countryName', 'US'),),
459 (('stateOrProvinceName', 'California'),),
460 (('localityName', 'Mountain View'),),
461 (('commonName', 'mail.google.com'),)),
462 'subjectAltName': (('othername', 'blabla'), )}
463 ok(cert, 'mail.google.com')
464
465 # No DNS entry subjectAltName and no commonName
466 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
467 'subject': ((('countryName', 'US'),),
468 (('stateOrProvinceName', 'California'),),
469 (('localityName', 'Mountain View'),),
470 (('organizationName', 'Google Inc'),)),
471 'subjectAltName': (('othername', 'blabla'),)}
472 fail(cert, 'google.com')
473
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000474 # Empty cert / no cert
475 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
476 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
477
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200478 # Issue #17980: avoid denials of service by refusing more than one
479 # wildcard per fragment.
480 cert = {'subject': ((('commonName', 'a*b.com'),),)}
481 ok(cert, 'axxb.com')
482 cert = {'subject': ((('commonName', 'a*b.co*'),),)}
Georg Brandl72c98d32013-10-27 07:16:53 +0100483 fail(cert, 'axxb.com')
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200484 cert = {'subject': ((('commonName', 'a*b*.com'),),)}
485 with self.assertRaises(ssl.CertificateError) as cm:
486 ssl.match_hostname(cert, 'axxbxxc.com')
487 self.assertIn("too many wildcards", str(cm.exception))
488
Antoine Pitroud5323212010-10-22 18:19:07 +0000489 def test_server_side(self):
490 # server_hostname doesn't work for server sockets
491 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000492 with socket.socket() as sock:
493 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
494 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000495
Antoine Pitroud6494802011-07-21 01:11:30 +0200496 def test_unknown_channel_binding(self):
497 # should raise ValueError for unknown type
498 s = socket.socket(socket.AF_INET)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100499 with ssl.wrap_socket(s) as ss:
500 with self.assertRaises(ValueError):
501 ss.get_channel_binding("unknown-type")
Antoine Pitroud6494802011-07-21 01:11:30 +0200502
503 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
504 "'tls-unique' channel binding not available")
505 def test_tls_unique_channel_binding(self):
506 # unconnected should return None for known type
507 s = socket.socket(socket.AF_INET)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100508 with ssl.wrap_socket(s) as ss:
509 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200510 # the same for server-side
511 s = socket.socket(socket.AF_INET)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100512 with ssl.wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
513 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200514
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600515 def test_dealloc_warn(self):
516 ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
517 r = repr(ss)
518 with self.assertWarns(ResourceWarning) as cm:
519 ss = None
520 support.gc_collect()
521 self.assertIn(r, str(cm.warning.args[0]))
522
Christian Heimes6d7ad132013-06-09 18:02:55 +0200523 def test_get_default_verify_paths(self):
524 paths = ssl.get_default_verify_paths()
525 self.assertEqual(len(paths), 6)
526 self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
527
528 with support.EnvironmentVarGuard() as env:
529 env["SSL_CERT_DIR"] = CAPATH
530 env["SSL_CERT_FILE"] = CERTFILE
531 paths = ssl.get_default_verify_paths()
532 self.assertEqual(paths.cafile, CERTFILE)
533 self.assertEqual(paths.capath, CAPATH)
534
Christian Heimes44109d72013-11-22 01:51:30 +0100535 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
536 def test_enum_certificates(self):
537 self.assertTrue(ssl.enum_certificates("CA"))
538 self.assertTrue(ssl.enum_certificates("ROOT"))
539
540 self.assertRaises(TypeError, ssl.enum_certificates)
541 self.assertRaises(WindowsError, ssl.enum_certificates, "")
542
Christian Heimesc2d65e12013-11-22 16:13:55 +0100543 trust_oids = set()
544 for storename in ("CA", "ROOT"):
545 store = ssl.enum_certificates(storename)
546 self.assertIsInstance(store, list)
547 for element in store:
548 self.assertIsInstance(element, tuple)
549 self.assertEqual(len(element), 3)
550 cert, enc, trust = element
551 self.assertIsInstance(cert, bytes)
552 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
553 self.assertIsInstance(trust, (set, bool))
554 if isinstance(trust, set):
555 trust_oids.update(trust)
Christian Heimes44109d72013-11-22 01:51:30 +0100556
557 serverAuth = "1.3.6.1.5.5.7.3.1"
Christian Heimesc2d65e12013-11-22 16:13:55 +0100558 self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200559
Christian Heimes46bebee2013-06-09 19:03:31 +0200560 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Christian Heimes44109d72013-11-22 01:51:30 +0100561 def test_enum_crls(self):
562 self.assertTrue(ssl.enum_crls("CA"))
563 self.assertRaises(TypeError, ssl.enum_crls)
564 self.assertRaises(WindowsError, ssl.enum_crls, "")
Christian Heimes46bebee2013-06-09 19:03:31 +0200565
Christian Heimes44109d72013-11-22 01:51:30 +0100566 crls = ssl.enum_crls("CA")
567 self.assertIsInstance(crls, list)
568 for element in crls:
569 self.assertIsInstance(element, tuple)
570 self.assertEqual(len(element), 2)
571 self.assertIsInstance(element[0], bytes)
572 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200573
Christian Heimes46bebee2013-06-09 19:03:31 +0200574
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100575 def test_asn1object(self):
576 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
577 '1.3.6.1.5.5.7.3.1')
578
579 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
580 self.assertEqual(val, expected)
581 self.assertEqual(val.nid, 129)
582 self.assertEqual(val.shortname, 'serverAuth')
583 self.assertEqual(val.longname, 'TLS Web Server Authentication')
584 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
585 self.assertIsInstance(val, ssl._ASN1Object)
586 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
587
588 val = ssl._ASN1Object.fromnid(129)
589 self.assertEqual(val, expected)
590 self.assertIsInstance(val, ssl._ASN1Object)
591 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100592 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
593 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100594 for i in range(1000):
595 try:
596 obj = ssl._ASN1Object.fromnid(i)
597 except ValueError:
598 pass
599 else:
600 self.assertIsInstance(obj.nid, int)
601 self.assertIsInstance(obj.shortname, str)
602 self.assertIsInstance(obj.longname, str)
603 self.assertIsInstance(obj.oid, (str, type(None)))
604
605 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
606 self.assertEqual(val, expected)
607 self.assertIsInstance(val, ssl._ASN1Object)
608 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
609 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
610 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100611 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
612 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100613
Christian Heimes72d28502013-11-23 13:56:58 +0100614 def test_purpose_enum(self):
615 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
616 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
617 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
618 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
619 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
620 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
621 '1.3.6.1.5.5.7.3.1')
622
623 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
624 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
625 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
626 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
627 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
628 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
629 '1.3.6.1.5.5.7.3.2')
630
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100631
Antoine Pitrou152efa22010-05-16 18:19:27 +0000632class ContextTests(unittest.TestCase):
633
Antoine Pitrou23df4832010-08-04 17:14:06 +0000634 @skip_if_broken_ubuntu_ssl
Antoine Pitrou152efa22010-05-16 18:19:27 +0000635 def test_constructor(self):
Antoine Pitrou2463e5f2013-03-28 22:24:43 +0100636 for protocol in PROTOCOLS:
637 ssl.SSLContext(protocol)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000638 self.assertRaises(TypeError, ssl.SSLContext)
639 self.assertRaises(ValueError, ssl.SSLContext, -1)
640 self.assertRaises(ValueError, ssl.SSLContext, 42)
641
Antoine Pitrou23df4832010-08-04 17:14:06 +0000642 @skip_if_broken_ubuntu_ssl
Antoine Pitrou152efa22010-05-16 18:19:27 +0000643 def test_protocol(self):
644 for proto in PROTOCOLS:
645 ctx = ssl.SSLContext(proto)
646 self.assertEqual(ctx.protocol, proto)
647
648 def test_ciphers(self):
649 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
650 ctx.set_ciphers("ALL")
651 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +0000652 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +0000653 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000654
Antoine Pitrou23df4832010-08-04 17:14:06 +0000655 @skip_if_broken_ubuntu_ssl
Antoine Pitroub5218772010-05-21 09:56:06 +0000656 def test_options(self):
657 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
658 # OP_ALL is the default value
659 self.assertEqual(ssl.OP_ALL, ctx.options)
660 ctx.options |= ssl.OP_NO_SSLv2
661 self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2,
662 ctx.options)
663 ctx.options |= ssl.OP_NO_SSLv3
664 self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3,
665 ctx.options)
666 if can_clear_options():
667 ctx.options = (ctx.options & ~ssl.OP_NO_SSLv2) | ssl.OP_NO_TLSv1
668 self.assertEqual(ssl.OP_ALL | ssl.OP_NO_TLSv1 | ssl.OP_NO_SSLv3,
669 ctx.options)
670 ctx.options = 0
671 self.assertEqual(0, ctx.options)
672 else:
673 with self.assertRaises(ValueError):
674 ctx.options = 0
675
Christian Heimes22587792013-11-21 23:56:13 +0100676 def test_verify_mode(self):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000677 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
678 # Default value
679 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
680 ctx.verify_mode = ssl.CERT_OPTIONAL
681 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
682 ctx.verify_mode = ssl.CERT_REQUIRED
683 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
684 ctx.verify_mode = ssl.CERT_NONE
685 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
686 with self.assertRaises(TypeError):
687 ctx.verify_mode = None
688 with self.assertRaises(ValueError):
689 ctx.verify_mode = 42
690
Christian Heimes2427b502013-11-23 11:24:32 +0100691 @unittest.skipUnless(have_verify_flags(),
692 "verify_flags need OpenSSL > 0.9.8")
Christian Heimes22587792013-11-21 23:56:13 +0100693 def test_verify_flags(self):
694 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
695 # default value by OpenSSL
696 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
697 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
698 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
699 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
700 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
701 ctx.verify_flags = ssl.VERIFY_DEFAULT
702 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
703 # supports any value
704 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
705 self.assertEqual(ctx.verify_flags,
706 ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
707 with self.assertRaises(TypeError):
708 ctx.verify_flags = None
709
Antoine Pitrou152efa22010-05-16 18:19:27 +0000710 def test_load_cert_chain(self):
711 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
712 # Combined key and cert in a single file
713 ctx.load_cert_chain(CERTFILE)
714 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
715 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200716 with self.assertRaises(OSError) as cm:
Antoine Pitrou152efa22010-05-16 18:19:27 +0000717 ctx.load_cert_chain(WRONGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000718 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000719 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000720 ctx.load_cert_chain(BADCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000721 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000722 ctx.load_cert_chain(EMPTYCERT)
723 # Separate key and cert
Antoine Pitroud0919502010-05-17 10:30:00 +0000724 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000725 ctx.load_cert_chain(ONLYCERT, ONLYKEY)
726 ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
727 ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000728 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000729 ctx.load_cert_chain(ONLYCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000730 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000731 ctx.load_cert_chain(ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000732 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000733 ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
734 # Mismatching key and cert
Antoine Pitroud0919502010-05-17 10:30:00 +0000735 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000736 with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
Antoine Pitrou81564092010-10-08 23:06:24 +0000737 ctx.load_cert_chain(SVN_PYTHON_ORG_ROOT_CERT, ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +0200738 # Password protected key and cert
739 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
740 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
741 ctx.load_cert_chain(CERTFILE_PROTECTED,
742 password=bytearray(KEY_PASSWORD.encode()))
743 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
744 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
745 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
746 bytearray(KEY_PASSWORD.encode()))
747 with self.assertRaisesRegex(TypeError, "should be a string"):
748 ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
749 with self.assertRaises(ssl.SSLError):
750 ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
751 with self.assertRaisesRegex(ValueError, "cannot be longer"):
752 # openssl has a fixed limit on the password buffer.
753 # PEM_BUFSIZE is generally set to 1kb.
754 # Return a string larger than this.
755 ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
756 # Password callback
757 def getpass_unicode():
758 return KEY_PASSWORD
759 def getpass_bytes():
760 return KEY_PASSWORD.encode()
761 def getpass_bytearray():
762 return bytearray(KEY_PASSWORD.encode())
763 def getpass_badpass():
764 return "badpass"
765 def getpass_huge():
766 return b'a' * (1024 * 1024)
767 def getpass_bad_type():
768 return 9
769 def getpass_exception():
770 raise Exception('getpass error')
771 class GetPassCallable:
772 def __call__(self):
773 return KEY_PASSWORD
774 def getpass(self):
775 return KEY_PASSWORD
776 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
777 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
778 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
779 ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
780 ctx.load_cert_chain(CERTFILE_PROTECTED,
781 password=GetPassCallable().getpass)
782 with self.assertRaises(ssl.SSLError):
783 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
784 with self.assertRaisesRegex(ValueError, "cannot be longer"):
785 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
786 with self.assertRaisesRegex(TypeError, "must return a string"):
787 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
788 with self.assertRaisesRegex(Exception, "getpass error"):
789 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
790 # Make sure the password function isn't called if it isn't needed
791 ctx.load_cert_chain(CERTFILE, password=getpass_exception)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000792
793 def test_load_verify_locations(self):
794 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
795 ctx.load_verify_locations(CERTFILE)
796 ctx.load_verify_locations(cafile=CERTFILE, capath=None)
797 ctx.load_verify_locations(BYTES_CERTFILE)
798 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
799 self.assertRaises(TypeError, ctx.load_verify_locations)
Christian Heimesefff7062013-11-21 03:35:02 +0100800 self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200801 with self.assertRaises(OSError) as cm:
Antoine Pitrou152efa22010-05-16 18:19:27 +0000802 ctx.load_verify_locations(WRONGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000803 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000804 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000805 ctx.load_verify_locations(BADCERT)
806 ctx.load_verify_locations(CERTFILE, CAPATH)
807 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
808
Victor Stinner80f75e62011-01-29 11:31:20 +0000809 # Issue #10989: crash if the second argument type is invalid
810 self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
811
Christian Heimesefff7062013-11-21 03:35:02 +0100812 def test_load_verify_cadata(self):
813 # test cadata
814 with open(CAFILE_CACERT) as f:
815 cacert_pem = f.read()
816 cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
817 with open(CAFILE_NEURONIO) as f:
818 neuronio_pem = f.read()
819 neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
820
821 # test PEM
822 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
823 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
824 ctx.load_verify_locations(cadata=cacert_pem)
825 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
826 ctx.load_verify_locations(cadata=neuronio_pem)
827 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
828 # cert already in hash table
829 ctx.load_verify_locations(cadata=neuronio_pem)
830 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
831
832 # combined
833 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
834 combined = "\n".join((cacert_pem, neuronio_pem))
835 ctx.load_verify_locations(cadata=combined)
836 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
837
838 # with junk around the certs
839 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
840 combined = ["head", cacert_pem, "other", neuronio_pem, "again",
841 neuronio_pem, "tail"]
842 ctx.load_verify_locations(cadata="\n".join(combined))
843 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
844
845 # test DER
846 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
847 ctx.load_verify_locations(cadata=cacert_der)
848 ctx.load_verify_locations(cadata=neuronio_der)
849 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
850 # cert already in hash table
851 ctx.load_verify_locations(cadata=cacert_der)
852 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
853
854 # combined
855 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
856 combined = b"".join((cacert_der, neuronio_der))
857 ctx.load_verify_locations(cadata=combined)
858 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
859
860 # error cases
861 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
862 self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
863
864 with self.assertRaisesRegex(ssl.SSLError, "no start line"):
865 ctx.load_verify_locations(cadata="broken")
866 with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
867 ctx.load_verify_locations(cadata=b"broken")
868
869
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100870 def test_load_dh_params(self):
871 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
872 ctx.load_dh_params(DHFILE)
873 if os.name != 'nt':
874 ctx.load_dh_params(BYTES_DHFILE)
875 self.assertRaises(TypeError, ctx.load_dh_params)
876 self.assertRaises(TypeError, ctx.load_dh_params, None)
877 with self.assertRaises(FileNotFoundError) as cm:
878 ctx.load_dh_params(WRONGCERT)
879 self.assertEqual(cm.exception.errno, errno.ENOENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +0200880 with self.assertRaises(ssl.SSLError) as cm:
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100881 ctx.load_dh_params(CERTFILE)
882
Antoine Pitroueb585ad2010-10-22 18:24:20 +0000883 @skip_if_broken_ubuntu_ssl
Antoine Pitroub0182c82010-10-12 20:09:02 +0000884 def test_session_stats(self):
885 for proto in PROTOCOLS:
886 ctx = ssl.SSLContext(proto)
887 self.assertEqual(ctx.session_stats(), {
888 'number': 0,
889 'connect': 0,
890 'connect_good': 0,
891 'connect_renegotiate': 0,
892 'accept': 0,
893 'accept_good': 0,
894 'accept_renegotiate': 0,
895 'hits': 0,
896 'misses': 0,
897 'timeouts': 0,
898 'cache_full': 0,
899 })
900
Antoine Pitrou664c2d12010-11-17 20:29:42 +0000901 def test_set_default_verify_paths(self):
902 # There's not much we can do to test that it acts as expected,
903 # so just check it doesn't crash or raise an exception.
904 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
905 ctx.set_default_verify_paths()
906
Antoine Pitrou501da612011-12-21 09:27:41 +0100907 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +0100908 def test_set_ecdh_curve(self):
909 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
910 ctx.set_ecdh_curve("prime256v1")
911 ctx.set_ecdh_curve(b"prime256v1")
912 self.assertRaises(TypeError, ctx.set_ecdh_curve)
913 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
914 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
915 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
916
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100917 @needs_sni
918 def test_sni_callback(self):
919 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
920
921 # set_servername_callback expects a callable, or None
922 self.assertRaises(TypeError, ctx.set_servername_callback)
923 self.assertRaises(TypeError, ctx.set_servername_callback, 4)
924 self.assertRaises(TypeError, ctx.set_servername_callback, "")
925 self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
926
927 def dummycallback(sock, servername, ctx):
928 pass
929 ctx.set_servername_callback(None)
930 ctx.set_servername_callback(dummycallback)
931
932 @needs_sni
933 def test_sni_callback_refcycle(self):
934 # Reference cycles through the servername callback are detected
935 # and cleared.
936 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
937 def dummycallback(sock, servername, ctx, cycle=ctx):
938 pass
939 ctx.set_servername_callback(dummycallback)
940 wr = weakref.ref(ctx)
941 del ctx, dummycallback
942 gc.collect()
943 self.assertIs(wr(), None)
944
Christian Heimes9a5395a2013-06-17 15:44:12 +0200945 def test_cert_store_stats(self):
946 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
947 self.assertEqual(ctx.cert_store_stats(),
948 {'x509_ca': 0, 'crl': 0, 'x509': 0})
949 ctx.load_cert_chain(CERTFILE)
950 self.assertEqual(ctx.cert_store_stats(),
951 {'x509_ca': 0, 'crl': 0, 'x509': 0})
952 ctx.load_verify_locations(CERTFILE)
953 self.assertEqual(ctx.cert_store_stats(),
954 {'x509_ca': 0, 'crl': 0, 'x509': 1})
955 ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
956 self.assertEqual(ctx.cert_store_stats(),
957 {'x509_ca': 1, 'crl': 0, 'x509': 2})
958
959 def test_get_ca_certs(self):
960 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
961 self.assertEqual(ctx.get_ca_certs(), [])
962 # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
963 ctx.load_verify_locations(CERTFILE)
964 self.assertEqual(ctx.get_ca_certs(), [])
965 # but SVN_PYTHON_ORG_ROOT_CERT is a CA cert
966 ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
967 self.assertEqual(ctx.get_ca_certs(),
968 [{'issuer': ((('organizationName', 'Root CA'),),
969 (('organizationalUnitName', 'http://www.cacert.org'),),
970 (('commonName', 'CA Cert Signing Authority'),),
971 (('emailAddress', 'support@cacert.org'),)),
972 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
973 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
974 'serialNumber': '00',
Christian Heimesbd3a7f92013-11-21 03:40:15 +0100975 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
Christian Heimes9a5395a2013-06-17 15:44:12 +0200976 'subject': ((('organizationName', 'Root CA'),),
977 (('organizationalUnitName', 'http://www.cacert.org'),),
978 (('commonName', 'CA Cert Signing Authority'),),
979 (('emailAddress', 'support@cacert.org'),)),
980 'version': 3}])
981
982 with open(SVN_PYTHON_ORG_ROOT_CERT) as f:
983 pem = f.read()
984 der = ssl.PEM_cert_to_DER_cert(pem)
985 self.assertEqual(ctx.get_ca_certs(True), [der])
986
Christian Heimes72d28502013-11-23 13:56:58 +0100987 def test_load_default_certs(self):
988 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
989 ctx.load_default_certs()
990
991 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
992 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
993 ctx.load_default_certs()
994
995 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
996 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
997
998 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
999 self.assertRaises(TypeError, ctx.load_default_certs, None)
1000 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1001
Christian Heimes4c05b472013-11-23 15:58:30 +01001002 def test_create_default_context(self):
1003 ctx = ssl.create_default_context()
1004 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1005 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1006 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1007
1008 with open(SIGNING_CA) as f:
1009 cadata = f.read()
1010 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1011 cadata=cadata)
1012 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1013 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1014 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1015
1016 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
1017 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1018 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1019 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1020
Christian Heimes67986f92013-11-23 22:43:47 +01001021 def test__create_stdlib_context(self):
1022 ctx = ssl._create_stdlib_context()
1023 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1024 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1025 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1026
1027 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
1028 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1029 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1030 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1031
1032 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
1033 cert_reqs=ssl.CERT_REQUIRED)
1034 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1035 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1036 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1037
1038 ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
1039 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1040 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1041 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
Christian Heimes4c05b472013-11-23 15:58:30 +01001042
Antoine Pitrou152efa22010-05-16 18:19:27 +00001043
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001044class SSLErrorTests(unittest.TestCase):
1045
1046 def test_str(self):
1047 # The str() of a SSLError doesn't include the errno
1048 e = ssl.SSLError(1, "foo")
1049 self.assertEqual(str(e), "foo")
1050 self.assertEqual(e.errno, 1)
1051 # Same for a subclass
1052 e = ssl.SSLZeroReturnError(1, "foo")
1053 self.assertEqual(str(e), "foo")
1054 self.assertEqual(e.errno, 1)
1055
1056 def test_lib_reason(self):
1057 # Test the library and reason attributes
1058 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1059 with self.assertRaises(ssl.SSLError) as cm:
1060 ctx.load_dh_params(CERTFILE)
1061 self.assertEqual(cm.exception.library, 'PEM')
1062 self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1063 s = str(cm.exception)
1064 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1065
1066 def test_subclass(self):
1067 # Check that the appropriate SSLError subclass is raised
1068 # (this only tests one of them)
1069 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1070 with socket.socket() as s:
1071 s.bind(("127.0.0.1", 0))
1072 s.listen(5)
Antoine Pitroue1ceb502013-01-12 21:54:44 +01001073 c = socket.socket()
1074 c.connect(s.getsockname())
1075 c.setblocking(False)
1076 with ctx.wrap_socket(c, False, do_handshake_on_connect=False) as c:
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001077 with self.assertRaises(ssl.SSLWantReadError) as cm:
1078 c.do_handshake()
1079 s = str(cm.exception)
1080 self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1081 # For compatibility
1082 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
1083
1084
Bill Janssen6e027db2007-11-15 22:23:56 +00001085class NetworkedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001086
Antoine Pitrou480a1242010-04-28 21:37:09 +00001087 def test_connect(self):
Antoine Pitrou350c7222010-09-09 13:31:46 +00001088 with support.transient_internet("svn.python.org"):
1089 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1090 cert_reqs=ssl.CERT_NONE)
1091 try:
1092 s.connect(("svn.python.org", 443))
1093 self.assertEqual({}, s.getpeercert())
1094 finally:
1095 s.close()
1096
1097 # this should fail because we have no verification certs
1098 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1099 cert_reqs=ssl.CERT_REQUIRED)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001100 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1101 s.connect, ("svn.python.org", 443))
Antoine Pitrou152efa22010-05-16 18:19:27 +00001102 s.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001103
Antoine Pitrou350c7222010-09-09 13:31:46 +00001104 # this should succeed because we specify the root cert
1105 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1106 cert_reqs=ssl.CERT_REQUIRED,
1107 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
1108 try:
1109 s.connect(("svn.python.org", 443))
1110 self.assertTrue(s.getpeercert())
1111 finally:
1112 s.close()
Antoine Pitrou152efa22010-05-16 18:19:27 +00001113
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001114 def test_connect_ex(self):
1115 # Issue #11326: check connect_ex() implementation
1116 with support.transient_internet("svn.python.org"):
1117 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1118 cert_reqs=ssl.CERT_REQUIRED,
1119 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
1120 try:
1121 self.assertEqual(0, s.connect_ex(("svn.python.org", 443)))
1122 self.assertTrue(s.getpeercert())
1123 finally:
1124 s.close()
1125
1126 def test_non_blocking_connect_ex(self):
1127 # Issue #11326: non-blocking connect_ex() should allow handshake
1128 # to proceed after the socket gets ready.
1129 with support.transient_internet("svn.python.org"):
1130 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1131 cert_reqs=ssl.CERT_REQUIRED,
1132 ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
1133 do_handshake_on_connect=False)
1134 try:
1135 s.setblocking(False)
1136 rc = s.connect_ex(('svn.python.org', 443))
Antoine Pitrou8a14a0c2011-02-27 15:44:12 +00001137 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1138 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001139 # Wait for connect to finish
1140 select.select([], [s], [], 5.0)
1141 # Non-blocking handshake
1142 while True:
1143 try:
1144 s.do_handshake()
1145 break
Antoine Pitrou41032a62011-10-27 23:56:55 +02001146 except ssl.SSLWantReadError:
1147 select.select([s], [], [], 5.0)
1148 except ssl.SSLWantWriteError:
1149 select.select([], [s], [], 5.0)
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001150 # SSL established
1151 self.assertTrue(s.getpeercert())
1152 finally:
1153 s.close()
1154
Antoine Pitroub4410db2011-05-18 18:51:06 +02001155 def test_timeout_connect_ex(self):
1156 # Issue #12065: on a timeout, connect_ex() should return the original
1157 # errno (mimicking the behaviour of non-SSL sockets).
1158 with support.transient_internet("svn.python.org"):
1159 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1160 cert_reqs=ssl.CERT_REQUIRED,
1161 ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
1162 do_handshake_on_connect=False)
1163 try:
1164 s.settimeout(0.0000001)
1165 rc = s.connect_ex(('svn.python.org', 443))
1166 if rc == 0:
1167 self.skipTest("svn.python.org responded too quickly")
1168 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
1169 finally:
1170 s.close()
1171
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001172 def test_connect_ex_error(self):
Antoine Pitrouddb87ab2012-12-28 19:07:43 +01001173 with support.transient_internet("svn.python.org"):
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001174 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1175 cert_reqs=ssl.CERT_REQUIRED,
1176 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
1177 try:
1178 self.assertEqual(errno.ECONNREFUSED,
1179 s.connect_ex(("svn.python.org", 444)))
1180 finally:
1181 s.close()
1182
Antoine Pitrou152efa22010-05-16 18:19:27 +00001183 def test_connect_with_context(self):
Antoine Pitrou350c7222010-09-09 13:31:46 +00001184 with support.transient_internet("svn.python.org"):
1185 # Same as test_connect, but with a separately created context
1186 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1187 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1188 s.connect(("svn.python.org", 443))
1189 try:
1190 self.assertEqual({}, s.getpeercert())
1191 finally:
1192 s.close()
Antoine Pitroud5323212010-10-22 18:19:07 +00001193 # Same with a server hostname
1194 s = ctx.wrap_socket(socket.socket(socket.AF_INET),
1195 server_hostname="svn.python.org")
1196 if ssl.HAS_SNI:
1197 s.connect(("svn.python.org", 443))
1198 s.close()
1199 else:
1200 self.assertRaises(ValueError, s.connect, ("svn.python.org", 443))
Antoine Pitrou350c7222010-09-09 13:31:46 +00001201 # This should fail because we have no verification certs
1202 ctx.verify_mode = ssl.CERT_REQUIRED
1203 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
Ezio Melottied3a7d22010-12-01 02:32:32 +00001204 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
Antoine Pitrou350c7222010-09-09 13:31:46 +00001205 s.connect, ("svn.python.org", 443))
Antoine Pitrou152efa22010-05-16 18:19:27 +00001206 s.close()
Antoine Pitrou350c7222010-09-09 13:31:46 +00001207 # This should succeed because we specify the root cert
1208 ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
1209 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1210 s.connect(("svn.python.org", 443))
1211 try:
1212 cert = s.getpeercert()
1213 self.assertTrue(cert)
1214 finally:
1215 s.close()
Antoine Pitrou152efa22010-05-16 18:19:27 +00001216
1217 def test_connect_capath(self):
1218 # Verify server certificates using the `capath` argument
Antoine Pitrou467f28d2010-05-16 19:22:44 +00001219 # NOTE: the subject hashing algorithm has been changed between
1220 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
1221 # contain both versions of each certificate (same content, different
Antoine Pitroud7e4c1c2010-05-17 14:13:10 +00001222 # filename) for this test to be portable across OpenSSL releases.
Antoine Pitrou350c7222010-09-09 13:31:46 +00001223 with support.transient_internet("svn.python.org"):
1224 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1225 ctx.verify_mode = ssl.CERT_REQUIRED
1226 ctx.load_verify_locations(capath=CAPATH)
1227 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1228 s.connect(("svn.python.org", 443))
1229 try:
1230 cert = s.getpeercert()
1231 self.assertTrue(cert)
1232 finally:
1233 s.close()
1234 # Same with a bytes `capath` argument
1235 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1236 ctx.verify_mode = ssl.CERT_REQUIRED
1237 ctx.load_verify_locations(capath=BYTES_CAPATH)
1238 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1239 s.connect(("svn.python.org", 443))
1240 try:
1241 cert = s.getpeercert()
1242 self.assertTrue(cert)
1243 finally:
1244 s.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001245
Christian Heimesefff7062013-11-21 03:35:02 +01001246 def test_connect_cadata(self):
1247 with open(CAFILE_CACERT) as f:
1248 pem = f.read()
1249 der = ssl.PEM_cert_to_DER_cert(pem)
1250 with support.transient_internet("svn.python.org"):
1251 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1252 ctx.verify_mode = ssl.CERT_REQUIRED
1253 ctx.load_verify_locations(cadata=pem)
1254 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1255 s.connect(("svn.python.org", 443))
1256 cert = s.getpeercert()
1257 self.assertTrue(cert)
1258
1259 # same with DER
1260 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1261 ctx.verify_mode = ssl.CERT_REQUIRED
1262 ctx.load_verify_locations(cadata=der)
1263 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1264 s.connect(("svn.python.org", 443))
1265 cert = s.getpeercert()
1266 self.assertTrue(cert)
1267
Antoine Pitroue3220242010-04-24 11:13:53 +00001268 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
1269 def test_makefile_close(self):
1270 # Issue #5238: creating a file-like object with makefile() shouldn't
1271 # delay closing the underlying "real socket" (here tested with its
1272 # file descriptor, hence skipping the test under Windows).
Antoine Pitrou350c7222010-09-09 13:31:46 +00001273 with support.transient_internet("svn.python.org"):
1274 ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
1275 ss.connect(("svn.python.org", 443))
1276 fd = ss.fileno()
1277 f = ss.makefile()
1278 f.close()
1279 # The fd is still open
Antoine Pitroue3220242010-04-24 11:13:53 +00001280 os.read(fd, 0)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001281 # Closing the SSL socket should close the fd too
1282 ss.close()
1283 gc.collect()
1284 with self.assertRaises(OSError) as e:
1285 os.read(fd, 0)
1286 self.assertEqual(e.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001287
Antoine Pitrou480a1242010-04-28 21:37:09 +00001288 def test_non_blocking_handshake(self):
Antoine Pitrou350c7222010-09-09 13:31:46 +00001289 with support.transient_internet("svn.python.org"):
1290 s = socket.socket(socket.AF_INET)
1291 s.connect(("svn.python.org", 443))
1292 s.setblocking(False)
1293 s = ssl.wrap_socket(s,
1294 cert_reqs=ssl.CERT_NONE,
1295 do_handshake_on_connect=False)
1296 count = 0
1297 while True:
1298 try:
1299 count += 1
1300 s.do_handshake()
1301 break
Antoine Pitrou41032a62011-10-27 23:56:55 +02001302 except ssl.SSLWantReadError:
1303 select.select([s], [], [])
1304 except ssl.SSLWantWriteError:
1305 select.select([], [s], [])
Antoine Pitrou350c7222010-09-09 13:31:46 +00001306 s.close()
1307 if support.verbose:
1308 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001309
Antoine Pitrou480a1242010-04-28 21:37:09 +00001310 def test_get_server_certificate(self):
Antoine Pitrou15399c32011-04-28 19:23:55 +02001311 def _test_get_server_certificate(host, port, cert=None):
1312 with support.transient_internet(host):
1313 pem = ssl.get_server_certificate((host, port))
1314 if not pem:
1315 self.fail("No server certificate on %s:%s!" % (host, port))
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001316
Antoine Pitrou15399c32011-04-28 19:23:55 +02001317 try:
1318 pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
1319 except ssl.SSLError as x:
1320 #should fail
1321 if support.verbose:
1322 sys.stdout.write("%s\n" % x)
1323 else:
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001324 self.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
1325
Antoine Pitrou15399c32011-04-28 19:23:55 +02001326 pem = ssl.get_server_certificate((host, port), ca_certs=cert)
1327 if not pem:
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001328 self.fail("No server certificate on %s:%s!" % (host, port))
Antoine Pitrou350c7222010-09-09 13:31:46 +00001329 if support.verbose:
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001330 sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
Antoine Pitrou350c7222010-09-09 13:31:46 +00001331
Antoine Pitrou15399c32011-04-28 19:23:55 +02001332 _test_get_server_certificate('svn.python.org', 443, SVN_PYTHON_ORG_ROOT_CERT)
1333 if support.IPV6_ENABLED:
1334 _test_get_server_certificate('ipv6.google.com', 443)
Bill Janssen54cc54c2007-12-14 22:08:56 +00001335
Antoine Pitrouf4c7bad2010-08-15 23:02:22 +00001336 def test_ciphers(self):
1337 remote = ("svn.python.org", 443)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001338 with support.transient_internet(remote[0]):
Antoine Pitroue1ceb502013-01-12 21:54:44 +01001339 with ssl.wrap_socket(socket.socket(socket.AF_INET),
1340 cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
1341 s.connect(remote)
1342 with ssl.wrap_socket(socket.socket(socket.AF_INET),
1343 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
1344 s.connect(remote)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001345 # Error checking can happen at instantiation or when connecting
Ezio Melottied3a7d22010-12-01 02:32:32 +00001346 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitroud2eca372010-10-29 23:41:37 +00001347 with socket.socket(socket.AF_INET) as sock:
1348 s = ssl.wrap_socket(sock,
1349 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
1350 s.connect(remote)
Antoine Pitrouf4c7bad2010-08-15 23:02:22 +00001351
Antoine Pitroufec12ff2010-04-21 19:46:23 +00001352 def test_algorithms(self):
1353 # Issue #8484: all algorithms should be available when verifying a
1354 # certificate.
Antoine Pitrou29619b22010-04-22 18:43:31 +00001355 # SHA256 was added in OpenSSL 0.9.8
1356 if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15):
1357 self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION)
Antoine Pitrou16f6f832012-05-04 16:26:02 +02001358 # sha256.tbs-internet.com needs SNI to use the correct certificate
1359 if not ssl.HAS_SNI:
1360 self.skipTest("SNI needed for this test")
Victor Stinnerf332abb2011-01-08 03:16:05 +00001361 # https://sha2.hboeck.de/ was used until 2011-01-08 (no route to host)
1362 remote = ("sha256.tbs-internet.com", 443)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00001363 sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem")
Antoine Pitrou160fd932011-01-08 10:23:29 +00001364 with support.transient_internet("sha256.tbs-internet.com"):
Antoine Pitrou16f6f832012-05-04 16:26:02 +02001365 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1366 ctx.verify_mode = ssl.CERT_REQUIRED
1367 ctx.load_verify_locations(sha256_cert)
1368 s = ctx.wrap_socket(socket.socket(socket.AF_INET),
1369 server_hostname="sha256.tbs-internet.com")
Antoine Pitroufec12ff2010-04-21 19:46:23 +00001370 try:
1371 s.connect(remote)
1372 if support.verbose:
1373 sys.stdout.write("\nCipher with %r is %r\n" %
1374 (remote, s.cipher()))
1375 sys.stdout.write("Certificate is:\n%s\n" %
1376 pprint.pformat(s.getpeercert()))
1377 finally:
1378 s.close()
1379
Christian Heimes9a5395a2013-06-17 15:44:12 +02001380 def test_get_ca_certs_capath(self):
1381 # capath certs are loaded on request
1382 with support.transient_internet("svn.python.org"):
1383 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1384 ctx.verify_mode = ssl.CERT_REQUIRED
1385 ctx.load_verify_locations(capath=CAPATH)
1386 self.assertEqual(ctx.get_ca_certs(), [])
1387 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1388 s.connect(("svn.python.org", 443))
1389 try:
1390 cert = s.getpeercert()
1391 self.assertTrue(cert)
1392 finally:
1393 s.close()
1394 self.assertEqual(len(ctx.get_ca_certs()), 1)
1395
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001396try:
1397 import threading
1398except ImportError:
1399 _have_threads = False
1400else:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001401 _have_threads = True
1402
Antoine Pitrou803e6d62010-10-13 10:36:15 +00001403 from test.ssl_servers import make_https_server
1404
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001405 class ThreadedEchoServer(threading.Thread):
1406
1407 class ConnectionHandler(threading.Thread):
1408
1409 """A mildly complicated class, because we want it to work both
1410 with and without the SSL wrapper around the socket connection, so
1411 that we can test the STARTTLS functionality."""
1412
Bill Janssen6e027db2007-11-15 22:23:56 +00001413 def __init__(self, server, connsock, addr):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001414 self.server = server
1415 self.running = False
1416 self.sock = connsock
Bill Janssen6e027db2007-11-15 22:23:56 +00001417 self.addr = addr
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001418 self.sock.setblocking(1)
1419 self.sslconn = None
1420 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00001421 self.daemon = True
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001422
Antoine Pitrou480a1242010-04-28 21:37:09 +00001423 def wrap_conn(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001424 try:
Antoine Pitroub5218772010-05-21 09:56:06 +00001425 self.sslconn = self.server.context.wrap_socket(
1426 self.sock, server_side=True)
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01001427 self.server.selected_protocols.append(self.sslconn.selected_npn_protocol())
Nadeem Vawdaad246bf2013-03-03 22:44:22 +01001428 except (ssl.SSLError, ConnectionResetError) as e:
1429 # We treat ConnectionResetError as though it were an
1430 # SSLError - OpenSSL on Ubuntu abruptly closes the
1431 # connection when asked to use an unsupported protocol.
1432 #
Antoine Pitrou18c913e2010-04-27 10:59:39 +00001433 # XXX Various errors can have happened here, for example
1434 # a mismatching protocol version, an invalid certificate,
1435 # or a low-level bug. This should be made more discriminating.
Antoine Pitrou8f85f902012-01-03 22:46:48 +01001436 self.server.conn_errors.append(e)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001437 if self.server.chatty:
Bill Janssen6e027db2007-11-15 22:23:56 +00001438 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
Antoine Pitrou18c913e2010-04-27 10:59:39 +00001439 self.running = False
1440 self.server.stop()
Bill Janssen6e027db2007-11-15 22:23:56 +00001441 self.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001442 return False
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001443 else:
Antoine Pitroub5218772010-05-21 09:56:06 +00001444 if self.server.context.verify_mode == ssl.CERT_REQUIRED:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001445 cert = self.sslconn.getpeercert()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001446 if support.verbose and self.server.chatty:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001447 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
1448 cert_binary = self.sslconn.getpeercert(True)
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001449 if support.verbose and self.server.chatty:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001450 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
1451 cipher = self.sslconn.cipher()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001452 if support.verbose and self.server.chatty:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001453 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01001454 sys.stdout.write(" server: selected protocol is now "
1455 + str(self.sslconn.selected_npn_protocol()) + "\n")
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001456 return True
1457
1458 def read(self):
1459 if self.sslconn:
1460 return self.sslconn.read()
1461 else:
1462 return self.sock.recv(1024)
1463
1464 def write(self, bytes):
1465 if self.sslconn:
1466 return self.sslconn.write(bytes)
1467 else:
1468 return self.sock.send(bytes)
1469
1470 def close(self):
1471 if self.sslconn:
1472 self.sslconn.close()
1473 else:
1474 self.sock.close()
1475
Antoine Pitrou480a1242010-04-28 21:37:09 +00001476 def run(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001477 self.running = True
1478 if not self.server.starttls_server:
1479 if not self.wrap_conn():
1480 return
1481 while self.running:
1482 try:
1483 msg = self.read()
Antoine Pitrou480a1242010-04-28 21:37:09 +00001484 stripped = msg.strip()
1485 if not stripped:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001486 # eof, so quit this handler
1487 self.running = False
1488 self.close()
Antoine Pitrou480a1242010-04-28 21:37:09 +00001489 elif stripped == b'over':
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001490 if support.verbose and self.server.connectionchatty:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001491 sys.stdout.write(" server: client closed connection\n")
1492 self.close()
1493 return
Bill Janssen6e027db2007-11-15 22:23:56 +00001494 elif (self.server.starttls_server and
Antoine Pitrou764b8782010-04-28 22:57:15 +00001495 stripped == b'STARTTLS'):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001496 if support.verbose and self.server.connectionchatty:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001497 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
Antoine Pitrou480a1242010-04-28 21:37:09 +00001498 self.write(b"OK\n")
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001499 if not self.wrap_conn():
1500 return
Bill Janssen40a0f662008-08-12 16:56:25 +00001501 elif (self.server.starttls_server and self.sslconn
Antoine Pitrou764b8782010-04-28 22:57:15 +00001502 and stripped == b'ENDTLS'):
Bill Janssen40a0f662008-08-12 16:56:25 +00001503 if support.verbose and self.server.connectionchatty:
1504 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
Antoine Pitrou480a1242010-04-28 21:37:09 +00001505 self.write(b"OK\n")
Bill Janssen40a0f662008-08-12 16:56:25 +00001506 self.sock = self.sslconn.unwrap()
1507 self.sslconn = None
1508 if support.verbose and self.server.connectionchatty:
1509 sys.stdout.write(" server: connection is now unencrypted...\n")
Antoine Pitroud6494802011-07-21 01:11:30 +02001510 elif stripped == b'CB tls-unique':
1511 if support.verbose and self.server.connectionchatty:
1512 sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
1513 data = self.sslconn.get_channel_binding("tls-unique")
1514 self.write(repr(data).encode("us-ascii") + b"\n")
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001515 else:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001516 if (support.verbose and
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001517 self.server.connectionchatty):
1518 ctype = (self.sslconn and "encrypted") or "unencrypted"
Antoine Pitrou480a1242010-04-28 21:37:09 +00001519 sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
1520 % (msg, ctype, msg.lower(), ctype))
1521 self.write(msg.lower())
Andrew Svetlov0832af62012-12-18 23:10:48 +02001522 except OSError:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001523 if self.server.chatty:
1524 handle_error("Test server failure:\n")
1525 self.close()
1526 self.running = False
1527 # normally, we'd just stop here, but for the test
1528 # harness, we want to stop the server
1529 self.server.stop()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001530
Antoine Pitroub5218772010-05-21 09:56:06 +00001531 def __init__(self, certificate=None, ssl_version=None,
Antoine Pitrou18c913e2010-04-27 10:59:39 +00001532 certreqs=None, cacerts=None,
Antoine Pitrou2d9cb9c2010-04-17 17:40:45 +00001533 chatty=True, connectionchatty=False, starttls_server=False,
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01001534 npn_protocols=None, ciphers=None, context=None):
Antoine Pitroub5218772010-05-21 09:56:06 +00001535 if context:
1536 self.context = context
1537 else:
1538 self.context = ssl.SSLContext(ssl_version
1539 if ssl_version is not None
1540 else ssl.PROTOCOL_TLSv1)
1541 self.context.verify_mode = (certreqs if certreqs is not None
1542 else ssl.CERT_NONE)
1543 if cacerts:
1544 self.context.load_verify_locations(cacerts)
1545 if certificate:
1546 self.context.load_cert_chain(certificate)
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01001547 if npn_protocols:
1548 self.context.set_npn_protocols(npn_protocols)
Antoine Pitroub5218772010-05-21 09:56:06 +00001549 if ciphers:
1550 self.context.set_ciphers(ciphers)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001551 self.chatty = chatty
1552 self.connectionchatty = connectionchatty
1553 self.starttls_server = starttls_server
1554 self.sock = socket.socket()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001555 self.port = support.bind_port(self.sock)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001556 self.flag = None
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001557 self.active = False
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01001558 self.selected_protocols = []
Antoine Pitrou8f85f902012-01-03 22:46:48 +01001559 self.conn_errors = []
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001560 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00001561 self.daemon = True
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001562
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01001563 def __enter__(self):
1564 self.start(threading.Event())
1565 self.flag.wait()
Antoine Pitrou8f85f902012-01-03 22:46:48 +01001566 return self
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01001567
1568 def __exit__(self, *args):
1569 self.stop()
1570 self.join()
1571
Antoine Pitrou480a1242010-04-28 21:37:09 +00001572 def start(self, flag=None):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001573 self.flag = flag
1574 threading.Thread.start(self)
1575
Antoine Pitrou480a1242010-04-28 21:37:09 +00001576 def run(self):
Antoine Pitrouaf7c6022010-04-27 09:56:02 +00001577 self.sock.settimeout(0.05)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001578 self.sock.listen(5)
1579 self.active = True
1580 if self.flag:
1581 # signal an event
1582 self.flag.set()
1583 while self.active:
1584 try:
1585 newconn, connaddr = self.sock.accept()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001586 if support.verbose and self.chatty:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001587 sys.stdout.write(' server: new connection from '
Bill Janssen6e027db2007-11-15 22:23:56 +00001588 + repr(connaddr) + '\n')
1589 handler = self.ConnectionHandler(self, newconn, connaddr)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001590 handler.start()
Antoine Pitroueced82e2012-01-27 17:33:01 +01001591 handler.join()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001592 except socket.timeout:
1593 pass
1594 except KeyboardInterrupt:
1595 self.stop()
Bill Janssen6e027db2007-11-15 22:23:56 +00001596 self.sock.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001597
Antoine Pitrou480a1242010-04-28 21:37:09 +00001598 def stop(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001599 self.active = False
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001600
Bill Janssen54cc54c2007-12-14 22:08:56 +00001601 class AsyncoreEchoServer(threading.Thread):
1602
1603 # this one's based on asyncore.dispatcher
1604
1605 class EchoServer (asyncore.dispatcher):
1606
1607 class ConnectionHandler (asyncore.dispatcher_with_send):
1608
1609 def __init__(self, conn, certfile):
1610 self.socket = ssl.wrap_socket(conn, server_side=True,
1611 certfile=certfile,
1612 do_handshake_on_connect=False)
1613 asyncore.dispatcher_with_send.__init__(self, self.socket)
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00001614 self._ssl_accepting = True
1615 self._do_ssl_handshake()
Bill Janssen54cc54c2007-12-14 22:08:56 +00001616
1617 def readable(self):
1618 if isinstance(self.socket, ssl.SSLSocket):
1619 while self.socket.pending() > 0:
1620 self.handle_read_event()
1621 return True
1622
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00001623 def _do_ssl_handshake(self):
1624 try:
1625 self.socket.do_handshake()
Antoine Pitrou41032a62011-10-27 23:56:55 +02001626 except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
1627 return
1628 except ssl.SSLEOFError:
1629 return self.handle_close()
1630 except ssl.SSLError:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00001631 raise
Andrew Svetlov0832af62012-12-18 23:10:48 +02001632 except OSError as err:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00001633 if err.args[0] == errno.ECONNABORTED:
1634 return self.handle_close()
Bill Janssen54cc54c2007-12-14 22:08:56 +00001635 else:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00001636 self._ssl_accepting = False
1637
1638 def handle_read(self):
1639 if self._ssl_accepting:
1640 self._do_ssl_handshake()
1641 else:
1642 data = self.recv(1024)
1643 if support.verbose:
1644 sys.stdout.write(" server: read %s from client\n" % repr(data))
1645 if not data:
1646 self.close()
1647 else:
Antoine Pitrou480a1242010-04-28 21:37:09 +00001648 self.send(data.lower())
Bill Janssen54cc54c2007-12-14 22:08:56 +00001649
1650 def handle_close(self):
Bill Janssen2f5799b2008-06-29 00:08:12 +00001651 self.close()
Antoine Pitrou18c913e2010-04-27 10:59:39 +00001652 if support.verbose:
Bill Janssen54cc54c2007-12-14 22:08:56 +00001653 sys.stdout.write(" server: closed connection %s\n" % self.socket)
1654
1655 def handle_error(self):
1656 raise
1657
Antoine Pitrou773b5db2010-04-27 08:53:36 +00001658 def __init__(self, certfile):
Bill Janssen54cc54c2007-12-14 22:08:56 +00001659 self.certfile = certfile
Antoine Pitrou773b5db2010-04-27 08:53:36 +00001660 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1661 self.port = support.bind_port(sock, '')
1662 asyncore.dispatcher.__init__(self, sock)
Bill Janssen54cc54c2007-12-14 22:08:56 +00001663 self.listen(5)
1664
Giampaolo Rodolà977c7072010-10-04 21:08:36 +00001665 def handle_accepted(self, sock_obj, addr):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001666 if support.verbose:
Bill Janssen54cc54c2007-12-14 22:08:56 +00001667 sys.stdout.write(" server: new connection from %s:%s\n" %addr)
1668 self.ConnectionHandler(sock_obj, self.certfile)
1669
1670 def handle_error(self):
1671 raise
1672
Trent Nelson78520002008-04-10 20:54:35 +00001673 def __init__(self, certfile):
Bill Janssen54cc54c2007-12-14 22:08:56 +00001674 self.flag = None
1675 self.active = False
Antoine Pitrou773b5db2010-04-27 08:53:36 +00001676 self.server = self.EchoServer(certfile)
1677 self.port = self.server.port
Bill Janssen54cc54c2007-12-14 22:08:56 +00001678 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00001679 self.daemon = True
Bill Janssen54cc54c2007-12-14 22:08:56 +00001680
1681 def __str__(self):
1682 return "<%s %s>" % (self.__class__.__name__, self.server)
1683
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01001684 def __enter__(self):
1685 self.start(threading.Event())
1686 self.flag.wait()
Antoine Pitrou8f85f902012-01-03 22:46:48 +01001687 return self
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01001688
1689 def __exit__(self, *args):
1690 if support.verbose:
1691 sys.stdout.write(" cleanup: stopping server.\n")
1692 self.stop()
1693 if support.verbose:
1694 sys.stdout.write(" cleanup: joining server thread.\n")
1695 self.join()
1696 if support.verbose:
1697 sys.stdout.write(" cleanup: successfully joined.\n")
1698
Bill Janssen54cc54c2007-12-14 22:08:56 +00001699 def start (self, flag=None):
1700 self.flag = flag
1701 threading.Thread.start(self)
1702
Antoine Pitrou480a1242010-04-28 21:37:09 +00001703 def run(self):
Bill Janssen54cc54c2007-12-14 22:08:56 +00001704 self.active = True
1705 if self.flag:
1706 self.flag.set()
1707 while self.active:
1708 try:
1709 asyncore.loop(1)
1710 except:
1711 pass
1712
Antoine Pitrou480a1242010-04-28 21:37:09 +00001713 def stop(self):
Bill Janssen54cc54c2007-12-14 22:08:56 +00001714 self.active = False
1715 self.server.close()
1716
Antoine Pitrou480a1242010-04-28 21:37:09 +00001717 def bad_cert_test(certfile):
1718 """
1719 Launch a server with CERT_REQUIRED, and check that trying to
1720 connect to it with the given client certificate fails.
1721 """
Trent Nelson78520002008-04-10 20:54:35 +00001722 server = ThreadedEchoServer(CERTFILE,
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001723 certreqs=ssl.CERT_REQUIRED,
Bill Janssen6e027db2007-11-15 22:23:56 +00001724 cacerts=CERTFILE, chatty=False,
1725 connectionchatty=False)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01001726 with server:
Thomas Woutersed03b412007-08-28 21:37:11 +00001727 try:
Antoine Pitroud2eca372010-10-29 23:41:37 +00001728 with socket.socket() as sock:
1729 s = ssl.wrap_socket(sock,
1730 certfile=certfile,
1731 ssl_version=ssl.PROTOCOL_TLSv1)
1732 s.connect((HOST, server.port))
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001733 except ssl.SSLError as x:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001734 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00001735 sys.stdout.write("\nSSLError is %s\n" % x.args[1])
Andrew Svetlov0832af62012-12-18 23:10:48 +02001736 except OSError as x:
Antoine Pitrou480a1242010-04-28 21:37:09 +00001737 if support.verbose:
Andrew Svetlov0832af62012-12-18 23:10:48 +02001738 sys.stdout.write("\nOSError is %s\n" % x.args[1])
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001739 except OSError as x:
Giampaolo Rodolàcd9dfb92010-08-29 20:56:56 +00001740 if x.errno != errno.ENOENT:
1741 raise
Giampaolo Rodolà745ab382010-08-29 19:25:49 +00001742 if support.verbose:
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001743 sys.stdout.write("\OSError is %s\n" % str(x))
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001744 else:
Antoine Pitroud75b2a92010-05-06 14:15:10 +00001745 raise AssertionError("Use of invalid cert should have failed!")
Thomas Woutersed03b412007-08-28 21:37:11 +00001746
Antoine Pitroub5218772010-05-21 09:56:06 +00001747 def server_params_test(client_context, server_context, indata=b"FOO\n",
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001748 chatty=True, connectionchatty=False, sni_name=None):
Antoine Pitrou480a1242010-04-28 21:37:09 +00001749 """
1750 Launch a server, connect a client to it and try various reads
1751 and writes.
1752 """
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01001753 stats = {}
Antoine Pitroub5218772010-05-21 09:56:06 +00001754 server = ThreadedEchoServer(context=server_context,
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001755 chatty=chatty,
Bill Janssen6e027db2007-11-15 22:23:56 +00001756 connectionchatty=False)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01001757 with server:
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001758 with client_context.wrap_socket(socket.socket(),
1759 server_hostname=sni_name) as s:
Antoine Pitroueba63c42012-01-28 17:38:34 +01001760 s.connect((HOST, server.port))
1761 for arg in [indata, bytearray(indata), memoryview(indata)]:
1762 if connectionchatty:
1763 if support.verbose:
1764 sys.stdout.write(
1765 " client: sending %r...\n" % indata)
1766 s.write(arg)
1767 outdata = s.read()
1768 if connectionchatty:
1769 if support.verbose:
1770 sys.stdout.write(" client: read %r\n" % outdata)
1771 if outdata != indata.lower():
1772 raise AssertionError(
1773 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
1774 % (outdata[:20], len(outdata),
1775 indata[:20].lower(), len(indata)))
1776 s.write(b"over\n")
Antoine Pitrou7d7aede2009-11-25 18:55:32 +00001777 if connectionchatty:
1778 if support.verbose:
Antoine Pitroueba63c42012-01-28 17:38:34 +01001779 sys.stdout.write(" client: closing connection.\n")
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01001780 stats.update({
Antoine Pitrouce816a52012-01-28 17:40:23 +01001781 'compression': s.compression(),
1782 'cipher': s.cipher(),
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001783 'peercert': s.getpeercert(),
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01001784 'client_npn_protocol': s.selected_npn_protocol()
1785 })
Antoine Pitroueba63c42012-01-28 17:38:34 +01001786 s.close()
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01001787 stats['server_npn_protocols'] = server.selected_protocols
1788 return stats
Thomas Woutersed03b412007-08-28 21:37:11 +00001789
Antoine Pitroub5218772010-05-21 09:56:06 +00001790 def try_protocol_combo(server_protocol, client_protocol, expect_success,
1791 certsreqs=None, server_options=0, client_options=0):
Benjamin Peterson2a691a82008-03-31 01:51:45 +00001792 if certsreqs is None:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001793 certsreqs = ssl.CERT_NONE
Antoine Pitrou480a1242010-04-28 21:37:09 +00001794 certtype = {
1795 ssl.CERT_NONE: "CERT_NONE",
1796 ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
1797 ssl.CERT_REQUIRED: "CERT_REQUIRED",
1798 }[certsreqs]
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001799 if support.verbose:
Antoine Pitrou480a1242010-04-28 21:37:09 +00001800 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001801 sys.stdout.write(formatstr %
1802 (ssl.get_protocol_name(client_protocol),
1803 ssl.get_protocol_name(server_protocol),
1804 certtype))
Antoine Pitroub5218772010-05-21 09:56:06 +00001805 client_context = ssl.SSLContext(client_protocol)
1806 client_context.options = ssl.OP_ALL | client_options
1807 server_context = ssl.SSLContext(server_protocol)
1808 server_context.options = ssl.OP_ALL | server_options
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01001809
1810 # NOTE: we must enable "ALL" ciphers on the client, otherwise an
1811 # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
1812 # starting from OpenSSL 1.0.0 (see issue #8322).
1813 if client_context.protocol == ssl.PROTOCOL_SSLv23:
1814 client_context.set_ciphers("ALL")
1815
Antoine Pitroub5218772010-05-21 09:56:06 +00001816 for ctx in (client_context, server_context):
1817 ctx.verify_mode = certsreqs
Antoine Pitroub5218772010-05-21 09:56:06 +00001818 ctx.load_cert_chain(CERTFILE)
1819 ctx.load_verify_locations(CERTFILE)
1820 try:
1821 server_params_test(client_context, server_context,
1822 chatty=False, connectionchatty=False)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00001823 # Protocol mismatch can result in either an SSLError, or a
1824 # "Connection reset by peer" error.
1825 except ssl.SSLError:
Antoine Pitrou480a1242010-04-28 21:37:09 +00001826 if expect_success:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001827 raise
Andrew Svetlov0832af62012-12-18 23:10:48 +02001828 except OSError as e:
Antoine Pitrou480a1242010-04-28 21:37:09 +00001829 if expect_success or e.errno != errno.ECONNRESET:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00001830 raise
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001831 else:
Antoine Pitrou480a1242010-04-28 21:37:09 +00001832 if not expect_success:
Antoine Pitroud75b2a92010-05-06 14:15:10 +00001833 raise AssertionError(
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001834 "Client protocol %s succeeded with server protocol %s!"
1835 % (ssl.get_protocol_name(client_protocol),
1836 ssl.get_protocol_name(server_protocol)))
1837
1838
Bill Janssen6e027db2007-11-15 22:23:56 +00001839 class ThreadedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001840
Antoine Pitrou23df4832010-08-04 17:14:06 +00001841 @skip_if_broken_ubuntu_ssl
Antoine Pitrou480a1242010-04-28 21:37:09 +00001842 def test_echo(self):
1843 """Basic test of an SSL client connecting to a server"""
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001844 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001845 sys.stdout.write("\n")
Antoine Pitroub5218772010-05-21 09:56:06 +00001846 for protocol in PROTOCOLS:
Antoine Pitrou972d5bb2013-03-29 17:56:03 +01001847 with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
1848 context = ssl.SSLContext(protocol)
1849 context.load_cert_chain(CERTFILE)
1850 server_params_test(context, context,
1851 chatty=True, connectionchatty=True)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001852
Antoine Pitrou480a1242010-04-28 21:37:09 +00001853 def test_getpeercert(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001854 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001855 sys.stdout.write("\n")
Antoine Pitroub5218772010-05-21 09:56:06 +00001856 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1857 context.verify_mode = ssl.CERT_REQUIRED
1858 context.load_verify_locations(CERTFILE)
1859 context.load_cert_chain(CERTFILE)
1860 server = ThreadedEchoServer(context=context, chatty=False)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01001861 with server:
Antoine Pitrou20b85552013-09-29 19:50:53 +02001862 s = context.wrap_socket(socket.socket(),
1863 do_handshake_on_connect=False)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00001864 s.connect((HOST, server.port))
Antoine Pitrou20b85552013-09-29 19:50:53 +02001865 # getpeercert() raise ValueError while the handshake isn't
1866 # done.
1867 with self.assertRaises(ValueError):
1868 s.getpeercert()
1869 s.do_handshake()
Antoine Pitrou18c913e2010-04-27 10:59:39 +00001870 cert = s.getpeercert()
1871 self.assertTrue(cert, "Can't get peer certificate.")
1872 cipher = s.cipher()
1873 if support.verbose:
1874 sys.stdout.write(pprint.pformat(cert) + '\n')
1875 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
1876 if 'subject' not in cert:
1877 self.fail("No subject field in certificate: %s." %
1878 pprint.pformat(cert))
1879 if ((('organizationName', 'Python Software Foundation'),)
1880 not in cert['subject']):
1881 self.fail(
1882 "Missing or invalid 'organizationName' field in certificate subject; "
1883 "should be 'Python Software Foundation'.")
Antoine Pitroufb046912010-11-09 20:21:19 +00001884 self.assertIn('notBefore', cert)
1885 self.assertIn('notAfter', cert)
1886 before = ssl.cert_time_to_seconds(cert['notBefore'])
1887 after = ssl.cert_time_to_seconds(cert['notAfter'])
1888 self.assertLess(before, after)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00001889 s.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001890
Christian Heimes2427b502013-11-23 11:24:32 +01001891 @unittest.skipUnless(have_verify_flags(),
1892 "verify_flags need OpenSSL > 0.9.8")
Christian Heimes22587792013-11-21 23:56:13 +01001893 def test_crl_check(self):
1894 if support.verbose:
1895 sys.stdout.write("\n")
1896
1897 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1898 server_context.load_cert_chain(SIGNED_CERTFILE)
1899
1900 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1901 context.verify_mode = ssl.CERT_REQUIRED
1902 context.load_verify_locations(SIGNING_CA)
Christian Heimes32f0c7a2013-11-22 03:43:48 +01001903 self.assertEqual(context.verify_flags, ssl.VERIFY_DEFAULT)
Christian Heimes22587792013-11-21 23:56:13 +01001904
1905 # VERIFY_DEFAULT should pass
1906 server = ThreadedEchoServer(context=server_context, chatty=True)
1907 with server:
1908 with context.wrap_socket(socket.socket()) as s:
1909 s.connect((HOST, server.port))
1910 cert = s.getpeercert()
1911 self.assertTrue(cert, "Can't get peer certificate.")
1912
1913 # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
Christian Heimes32f0c7a2013-11-22 03:43:48 +01001914 context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
Christian Heimes22587792013-11-21 23:56:13 +01001915
1916 server = ThreadedEchoServer(context=server_context, chatty=True)
1917 with server:
1918 with context.wrap_socket(socket.socket()) as s:
1919 with self.assertRaisesRegex(ssl.SSLError,
1920 "certificate verify failed"):
1921 s.connect((HOST, server.port))
1922
1923 # now load a CRL file. The CRL file is signed by the CA.
1924 context.load_verify_locations(CRLFILE)
1925
1926 server = ThreadedEchoServer(context=server_context, chatty=True)
1927 with server:
1928 with context.wrap_socket(socket.socket()) as s:
1929 s.connect((HOST, server.port))
1930 cert = s.getpeercert()
1931 self.assertTrue(cert, "Can't get peer certificate.")
1932
Antoine Pitrou480a1242010-04-28 21:37:09 +00001933 def test_empty_cert(self):
1934 """Connecting with an empty cert file"""
1935 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
1936 "nullcert.pem"))
1937 def test_malformed_cert(self):
1938 """Connecting with a badly formatted certificate (syntax error)"""
1939 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
1940 "badcert.pem"))
1941 def test_nonexisting_cert(self):
1942 """Connecting with a non-existing cert file"""
1943 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
1944 "wrongcert.pem"))
1945 def test_malformed_key(self):
1946 """Connecting with a badly formatted key (syntax error)"""
1947 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
1948 "badkey.pem"))
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001949
Antoine Pitrou480a1242010-04-28 21:37:09 +00001950 def test_rude_shutdown(self):
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001951 """A brutal shutdown of an SSL server should raise an OSError
Antoine Pitrou480a1242010-04-28 21:37:09 +00001952 in the client when attempting handshake.
1953 """
Trent Nelson6b240cd2008-04-10 20:12:06 +00001954 listener_ready = threading.Event()
1955 listener_gone = threading.Event()
Antoine Pitrou480a1242010-04-28 21:37:09 +00001956
Antoine Pitrou773b5db2010-04-27 08:53:36 +00001957 s = socket.socket()
1958 port = support.bind_port(s, HOST)
Trent Nelson6b240cd2008-04-10 20:12:06 +00001959
Antoine Pitrou773b5db2010-04-27 08:53:36 +00001960 # `listener` runs in a thread. It sits in an accept() until
1961 # the main thread connects. Then it rudely closes the socket,
1962 # and sets Event `listener_gone` to let the main thread know
1963 # the socket is gone.
Trent Nelson6b240cd2008-04-10 20:12:06 +00001964 def listener():
Trent Nelson6b240cd2008-04-10 20:12:06 +00001965 s.listen(5)
1966 listener_ready.set()
Antoine Pitroud2eca372010-10-29 23:41:37 +00001967 newsock, addr = s.accept()
1968 newsock.close()
Antoine Pitrou773b5db2010-04-27 08:53:36 +00001969 s.close()
Trent Nelson6b240cd2008-04-10 20:12:06 +00001970 listener_gone.set()
1971
1972 def connector():
1973 listener_ready.wait()
Antoine Pitroud2eca372010-10-29 23:41:37 +00001974 with socket.socket() as c:
1975 c.connect((HOST, port))
1976 listener_gone.wait()
1977 try:
1978 ssl_sock = ssl.wrap_socket(c)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001979 except OSError:
Antoine Pitroud2eca372010-10-29 23:41:37 +00001980 pass
1981 else:
1982 self.fail('connecting to closed SSL socket should have failed')
Trent Nelson6b240cd2008-04-10 20:12:06 +00001983
1984 t = threading.Thread(target=listener)
1985 t.start()
Antoine Pitrou773b5db2010-04-27 08:53:36 +00001986 try:
1987 connector()
1988 finally:
1989 t.join()
Trent Nelson6b240cd2008-04-10 20:12:06 +00001990
Antoine Pitrou23df4832010-08-04 17:14:06 +00001991 @skip_if_broken_ubuntu_ssl
Victor Stinner3de49192011-05-09 00:42:58 +02001992 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
1993 "OpenSSL is compiled without SSLv2 support")
Antoine Pitrou480a1242010-04-28 21:37:09 +00001994 def test_protocol_sslv2(self):
1995 """Connecting to an SSLv2 server with various client options"""
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001996 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001997 sys.stdout.write("\n")
Antoine Pitrou480a1242010-04-28 21:37:09 +00001998 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
1999 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
2000 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
2001 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True)
2002 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
2003 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
Antoine Pitroub5218772010-05-21 09:56:06 +00002004 # SSLv23 client with specific SSL options
2005 if no_sslv2_implies_sslv3_hello():
2006 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
2007 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
2008 client_options=ssl.OP_NO_SSLv2)
2009 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True,
2010 client_options=ssl.OP_NO_SSLv3)
2011 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True,
2012 client_options=ssl.OP_NO_TLSv1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002013
Antoine Pitrou23df4832010-08-04 17:14:06 +00002014 @skip_if_broken_ubuntu_ssl
Antoine Pitrou480a1242010-04-28 21:37:09 +00002015 def test_protocol_sslv23(self):
2016 """Connecting to an SSLv23 server with various client options"""
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002017 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002018 sys.stdout.write("\n")
Victor Stinner3de49192011-05-09 00:42:58 +02002019 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2020 try:
2021 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True)
Andrew Svetlov0832af62012-12-18 23:10:48 +02002022 except OSError as x:
Victor Stinner3de49192011-05-09 00:42:58 +02002023 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
2024 if support.verbose:
2025 sys.stdout.write(
2026 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
2027 % str(x))
Antoine Pitrou480a1242010-04-28 21:37:09 +00002028 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True)
2029 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True)
2030 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002031
Antoine Pitrou480a1242010-04-28 21:37:09 +00002032 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL)
2033 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL)
2034 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002035
Antoine Pitrou480a1242010-04-28 21:37:09 +00002036 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED)
2037 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED)
2038 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002039
Antoine Pitroub5218772010-05-21 09:56:06 +00002040 # Server with specific SSL options
2041 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False,
2042 server_options=ssl.OP_NO_SSLv3)
2043 # Will choose TLSv1
2044 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True,
2045 server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
2046 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, False,
2047 server_options=ssl.OP_NO_TLSv1)
2048
2049
Antoine Pitrou23df4832010-08-04 17:14:06 +00002050 @skip_if_broken_ubuntu_ssl
Antoine Pitrou480a1242010-04-28 21:37:09 +00002051 def test_protocol_sslv3(self):
2052 """Connecting to an SSLv3 server with various client options"""
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002053 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002054 sys.stdout.write("\n")
Antoine Pitrou480a1242010-04-28 21:37:09 +00002055 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True)
2056 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL)
2057 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED)
Victor Stinner3de49192011-05-09 00:42:58 +02002058 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2059 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
Barry Warsaw46ae0ef2011-10-28 16:52:17 -04002060 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False,
2061 client_options=ssl.OP_NO_SSLv3)
Antoine Pitrou480a1242010-04-28 21:37:09 +00002062 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
Antoine Pitroub5218772010-05-21 09:56:06 +00002063 if no_sslv2_implies_sslv3_hello():
2064 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
2065 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, True,
2066 client_options=ssl.OP_NO_SSLv2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002067
Antoine Pitrou23df4832010-08-04 17:14:06 +00002068 @skip_if_broken_ubuntu_ssl
Antoine Pitrou480a1242010-04-28 21:37:09 +00002069 def test_protocol_tlsv1(self):
2070 """Connecting to a TLSv1 server with various client options"""
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002071 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002072 sys.stdout.write("\n")
Antoine Pitrou480a1242010-04-28 21:37:09 +00002073 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True)
2074 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL)
2075 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED)
Victor Stinner3de49192011-05-09 00:42:58 +02002076 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2077 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
Antoine Pitrou480a1242010-04-28 21:37:09 +00002078 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
Barry Warsaw46ae0ef2011-10-28 16:52:17 -04002079 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False,
2080 client_options=ssl.OP_NO_TLSv1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002081
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002082 @skip_if_broken_ubuntu_ssl
2083 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
2084 "TLS version 1.1 not supported.")
2085 def test_protocol_tlsv1_1(self):
2086 """Connecting to a TLSv1.1 server with various client options.
2087 Testing against older TLS versions."""
2088 if support.verbose:
2089 sys.stdout.write("\n")
2090 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, True)
2091 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2092 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False)
2093 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
2094 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv23, False,
2095 client_options=ssl.OP_NO_TLSv1_1)
2096
2097 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1_1, True)
2098 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False)
2099 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False)
2100
2101
2102 @skip_if_broken_ubuntu_ssl
2103 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
2104 "TLS version 1.2 not supported.")
2105 def test_protocol_tlsv1_2(self):
2106 """Connecting to a TLSv1.2 server with various client options.
2107 Testing against older TLS versions."""
2108 if support.verbose:
2109 sys.stdout.write("\n")
2110 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, True,
2111 server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
2112 client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
2113 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2114 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False)
2115 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
2116 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv23, False,
2117 client_options=ssl.OP_NO_TLSv1_2)
2118
2119 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1_2, True)
2120 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
2121 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
2122 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
2123 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
2124
Antoine Pitrou480a1242010-04-28 21:37:09 +00002125 def test_starttls(self):
2126 """Switching from clear text to encrypted and back again."""
2127 msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002128
Trent Nelson78520002008-04-10 20:54:35 +00002129 server = ThreadedEchoServer(CERTFILE,
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002130 ssl_version=ssl.PROTOCOL_TLSv1,
2131 starttls_server=True,
2132 chatty=True,
2133 connectionchatty=True)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002134 wrapped = False
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002135 with server:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002136 s = socket.socket()
2137 s.setblocking(1)
2138 s.connect((HOST, server.port))
2139 if support.verbose:
2140 sys.stdout.write("\n")
2141 for indata in msgs:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002142 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002143 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002144 " client: sending %r...\n" % indata)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002145 if wrapped:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002146 conn.write(indata)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002147 outdata = conn.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002148 else:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002149 s.send(indata)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002150 outdata = s.recv(1024)
Antoine Pitrou480a1242010-04-28 21:37:09 +00002151 msg = outdata.strip().lower()
2152 if indata == b"STARTTLS" and msg.startswith(b"ok"):
2153 # STARTTLS ok, switch to secure mode
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002154 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002155 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002156 " client: read %r from server, starting TLS...\n"
2157 % msg)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002158 conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
2159 wrapped = True
Antoine Pitrou480a1242010-04-28 21:37:09 +00002160 elif indata == b"ENDTLS" and msg.startswith(b"ok"):
2161 # ENDTLS ok, switch back to clear text
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002162 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002163 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002164 " client: read %r from server, ending TLS...\n"
2165 % msg)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002166 s = conn.unwrap()
2167 wrapped = False
2168 else:
2169 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002170 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002171 " client: read %r from server\n" % msg)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002172 if support.verbose:
2173 sys.stdout.write(" client: closing connection.\n")
2174 if wrapped:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002175 conn.write(b"over\n")
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002176 else:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002177 s.send(b"over\n")
Bill Janssen6e027db2007-11-15 22:23:56 +00002178 if wrapped:
2179 conn.close()
2180 else:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002181 s.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002182
Antoine Pitrou480a1242010-04-28 21:37:09 +00002183 def test_socketserver(self):
2184 """Using a SocketServer to create and manage SSL connections."""
Antoine Pitrouda232592013-02-05 21:20:51 +01002185 server = make_https_server(self, certfile=CERTFILE)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002186 # try to connect
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002187 if support.verbose:
2188 sys.stdout.write('\n')
2189 with open(CERTFILE, 'rb') as f:
2190 d1 = f.read()
2191 d2 = ''
2192 # now fetch the same data from the HTTPS server
2193 url = 'https://%s:%d/%s' % (
2194 HOST, server.port, os.path.split(CERTFILE)[1])
2195 f = urllib.request.urlopen(url)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002196 try:
Barry Warsaw820c1202008-06-12 04:06:45 +00002197 dlen = f.info().get("content-length")
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002198 if dlen and (int(dlen) > 0):
2199 d2 = f.read(int(dlen))
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002200 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002201 sys.stdout.write(
2202 " client: read %d bytes from remote server '%s'\n"
2203 % (len(d2), server))
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002204 finally:
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002205 f.close()
2206 self.assertEqual(d1, d2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002207
Antoine Pitrou480a1242010-04-28 21:37:09 +00002208 def test_asyncore_server(self):
2209 """Check the example asyncore integration."""
2210 indata = "TEST MESSAGE of mixed case\n"
Trent Nelson6b240cd2008-04-10 20:12:06 +00002211
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002212 if support.verbose:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002213 sys.stdout.write("\n")
2214
Antoine Pitrou480a1242010-04-28 21:37:09 +00002215 indata = b"FOO\n"
Trent Nelson78520002008-04-10 20:54:35 +00002216 server = AsyncoreEchoServer(CERTFILE)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002217 with server:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002218 s = ssl.wrap_socket(socket.socket())
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002219 s.connect(('127.0.0.1', server.port))
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002220 if support.verbose:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002221 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002222 " client: sending %r...\n" % indata)
2223 s.write(indata)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002224 outdata = s.read()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002225 if support.verbose:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002226 sys.stdout.write(" client: read %r\n" % outdata)
Trent Nelson6b240cd2008-04-10 20:12:06 +00002227 if outdata != indata.lower():
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002228 self.fail(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002229 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
2230 % (outdata[:20], len(outdata),
2231 indata[:20].lower(), len(indata)))
2232 s.write(b"over\n")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002233 if support.verbose:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002234 sys.stdout.write(" client: closing connection.\n")
2235 s.close()
Antoine Pitroued986362010-08-15 23:28:10 +00002236 if support.verbose:
2237 sys.stdout.write(" client: connection closed.\n")
Trent Nelson6b240cd2008-04-10 20:12:06 +00002238
Antoine Pitrou480a1242010-04-28 21:37:09 +00002239 def test_recv_send(self):
2240 """Test recv(), send() and friends."""
Bill Janssen58afe4c2008-09-08 16:45:19 +00002241 if support.verbose:
2242 sys.stdout.write("\n")
2243
2244 server = ThreadedEchoServer(CERTFILE,
2245 certreqs=ssl.CERT_NONE,
2246 ssl_version=ssl.PROTOCOL_TLSv1,
2247 cacerts=CERTFILE,
2248 chatty=True,
2249 connectionchatty=False)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002250 with server:
2251 s = ssl.wrap_socket(socket.socket(),
2252 server_side=False,
2253 certfile=CERTFILE,
2254 ca_certs=CERTFILE,
2255 cert_reqs=ssl.CERT_NONE,
2256 ssl_version=ssl.PROTOCOL_TLSv1)
2257 s.connect((HOST, server.port))
Bill Janssen58afe4c2008-09-08 16:45:19 +00002258 # helper methods for standardising recv* method signatures
2259 def _recv_into():
2260 b = bytearray(b"\0"*100)
2261 count = s.recv_into(b)
2262 return b[:count]
2263
2264 def _recvfrom_into():
2265 b = bytearray(b"\0"*100)
2266 count, addr = s.recvfrom_into(b)
2267 return b[:count]
2268
2269 # (name, method, whether to expect success, *args)
2270 send_methods = [
2271 ('send', s.send, True, []),
2272 ('sendto', s.sendto, False, ["some.address"]),
2273 ('sendall', s.sendall, True, []),
2274 ]
2275 recv_methods = [
2276 ('recv', s.recv, True, []),
2277 ('recvfrom', s.recvfrom, False, ["some.address"]),
2278 ('recv_into', _recv_into, True, []),
2279 ('recvfrom_into', _recvfrom_into, False, []),
2280 ]
2281 data_prefix = "PREFIX_"
2282
2283 for meth_name, send_meth, expect_success, args in send_methods:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002284 indata = (data_prefix + meth_name).encode('ascii')
Bill Janssen58afe4c2008-09-08 16:45:19 +00002285 try:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002286 send_meth(indata, *args)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002287 outdata = s.read()
Bill Janssen58afe4c2008-09-08 16:45:19 +00002288 if outdata != indata.lower():
Georg Brandl89fad142010-03-14 10:23:39 +00002289 self.fail(
Bill Janssen58afe4c2008-09-08 16:45:19 +00002290 "While sending with <<{name:s}>> bad data "
Antoine Pitrou480a1242010-04-28 21:37:09 +00002291 "<<{outdata:r}>> ({nout:d}) received; "
2292 "expected <<{indata:r}>> ({nin:d})\n".format(
2293 name=meth_name, outdata=outdata[:20],
Bill Janssen58afe4c2008-09-08 16:45:19 +00002294 nout=len(outdata),
Antoine Pitrou480a1242010-04-28 21:37:09 +00002295 indata=indata[:20], nin=len(indata)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002296 )
2297 )
2298 except ValueError as e:
2299 if expect_success:
Georg Brandl89fad142010-03-14 10:23:39 +00002300 self.fail(
Bill Janssen58afe4c2008-09-08 16:45:19 +00002301 "Failed to send with method <<{name:s}>>; "
2302 "expected to succeed.\n".format(name=meth_name)
2303 )
2304 if not str(e).startswith(meth_name):
Georg Brandl89fad142010-03-14 10:23:39 +00002305 self.fail(
Bill Janssen58afe4c2008-09-08 16:45:19 +00002306 "Method <<{name:s}>> failed with unexpected "
2307 "exception message: {exp:s}\n".format(
2308 name=meth_name, exp=e
2309 )
2310 )
2311
2312 for meth_name, recv_meth, expect_success, args in recv_methods:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002313 indata = (data_prefix + meth_name).encode('ascii')
Bill Janssen58afe4c2008-09-08 16:45:19 +00002314 try:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002315 s.send(indata)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002316 outdata = recv_meth(*args)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002317 if outdata != indata.lower():
Georg Brandl89fad142010-03-14 10:23:39 +00002318 self.fail(
Bill Janssen58afe4c2008-09-08 16:45:19 +00002319 "While receiving with <<{name:s}>> bad data "
Antoine Pitrou480a1242010-04-28 21:37:09 +00002320 "<<{outdata:r}>> ({nout:d}) received; "
2321 "expected <<{indata:r}>> ({nin:d})\n".format(
2322 name=meth_name, outdata=outdata[:20],
Bill Janssen58afe4c2008-09-08 16:45:19 +00002323 nout=len(outdata),
Antoine Pitrou480a1242010-04-28 21:37:09 +00002324 indata=indata[:20], nin=len(indata)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002325 )
2326 )
2327 except ValueError as e:
2328 if expect_success:
Georg Brandl89fad142010-03-14 10:23:39 +00002329 self.fail(
Bill Janssen58afe4c2008-09-08 16:45:19 +00002330 "Failed to receive with method <<{name:s}>>; "
2331 "expected to succeed.\n".format(name=meth_name)
2332 )
2333 if not str(e).startswith(meth_name):
Georg Brandl89fad142010-03-14 10:23:39 +00002334 self.fail(
Bill Janssen58afe4c2008-09-08 16:45:19 +00002335 "Method <<{name:s}>> failed with unexpected "
2336 "exception message: {exp:s}\n".format(
2337 name=meth_name, exp=e
2338 )
2339 )
2340 # consume data
2341 s.read()
2342
Nick Coghlan513886a2011-08-28 00:00:27 +10002343 # Make sure sendmsg et al are disallowed to avoid
2344 # inadvertent disclosure of data and/or corruption
2345 # of the encrypted data stream
2346 self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
2347 self.assertRaises(NotImplementedError, s.recvmsg, 100)
2348 self.assertRaises(NotImplementedError,
2349 s.recvmsg_into, bytearray(100))
2350
Antoine Pitrou480a1242010-04-28 21:37:09 +00002351 s.write(b"over\n")
Bill Janssen58afe4c2008-09-08 16:45:19 +00002352 s.close()
Bill Janssen58afe4c2008-09-08 16:45:19 +00002353
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002354 def test_handshake_timeout(self):
2355 # Issue #5103: SSL handshake must respect the socket timeout
2356 server = socket.socket(socket.AF_INET)
2357 host = "127.0.0.1"
2358 port = support.bind_port(server)
2359 started = threading.Event()
2360 finish = False
2361
2362 def serve():
2363 server.listen(5)
2364 started.set()
2365 conns = []
2366 while not finish:
2367 r, w, e = select.select([server], [], [], 0.1)
2368 if server in r:
2369 # Let the socket hang around rather than having
2370 # it closed by garbage collection.
2371 conns.append(server.accept()[0])
Antoine Pitroud2eca372010-10-29 23:41:37 +00002372 for sock in conns:
2373 sock.close()
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002374
2375 t = threading.Thread(target=serve)
2376 t.start()
2377 started.wait()
2378
2379 try:
Antoine Pitrou40f08742010-04-24 22:04:40 +00002380 try:
2381 c = socket.socket(socket.AF_INET)
2382 c.settimeout(0.2)
2383 c.connect((host, port))
2384 # Will attempt handshake and time out
Antoine Pitrouc4df7842010-12-03 19:59:41 +00002385 self.assertRaisesRegex(socket.timeout, "timed out",
Ezio Melottied3a7d22010-12-01 02:32:32 +00002386 ssl.wrap_socket, c)
Antoine Pitrou40f08742010-04-24 22:04:40 +00002387 finally:
2388 c.close()
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002389 try:
2390 c = socket.socket(socket.AF_INET)
2391 c = ssl.wrap_socket(c)
2392 c.settimeout(0.2)
2393 # Will attempt handshake and time out
Antoine Pitrouc4df7842010-12-03 19:59:41 +00002394 self.assertRaisesRegex(socket.timeout, "timed out",
Ezio Melottied3a7d22010-12-01 02:32:32 +00002395 c.connect, (host, port))
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002396 finally:
2397 c.close()
2398 finally:
2399 finish = True
2400 t.join()
2401 server.close()
2402
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002403 def test_server_accept(self):
2404 # Issue #16357: accept() on a SSLSocket created through
2405 # SSLContext.wrap_socket().
2406 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2407 context.verify_mode = ssl.CERT_REQUIRED
2408 context.load_verify_locations(CERTFILE)
2409 context.load_cert_chain(CERTFILE)
2410 server = socket.socket(socket.AF_INET)
2411 host = "127.0.0.1"
2412 port = support.bind_port(server)
2413 server = context.wrap_socket(server, server_side=True)
2414
2415 evt = threading.Event()
2416 remote = None
2417 peer = None
2418 def serve():
2419 nonlocal remote, peer
2420 server.listen(5)
2421 # Block on the accept and wait on the connection to close.
2422 evt.set()
2423 remote, peer = server.accept()
2424 remote.recv(1)
2425
2426 t = threading.Thread(target=serve)
2427 t.start()
2428 # Client wait until server setup and perform a connect.
2429 evt.wait()
2430 client = context.wrap_socket(socket.socket())
2431 client.connect((host, port))
2432 client_addr = client.getsockname()
2433 client.close()
2434 t.join()
Antoine Pitroue1ceb502013-01-12 21:54:44 +01002435 remote.close()
2436 server.close()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002437 # Sanity checks.
2438 self.assertIsInstance(remote, ssl.SSLSocket)
2439 self.assertEqual(peer, client_addr)
2440
Antoine Pitrou242db722013-05-01 20:52:07 +02002441 def test_getpeercert_enotconn(self):
2442 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2443 with context.wrap_socket(socket.socket()) as sock:
2444 with self.assertRaises(OSError) as cm:
2445 sock.getpeercert()
2446 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
2447
2448 def test_do_handshake_enotconn(self):
2449 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2450 with context.wrap_socket(socket.socket()) as sock:
2451 with self.assertRaises(OSError) as cm:
2452 sock.do_handshake()
2453 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
2454
Antoine Pitrou8f85f902012-01-03 22:46:48 +01002455 def test_default_ciphers(self):
2456 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2457 try:
2458 # Force a set of weak ciphers on our client context
2459 context.set_ciphers("DES")
2460 except ssl.SSLError:
2461 self.skipTest("no DES cipher available")
2462 with ThreadedEchoServer(CERTFILE,
2463 ssl_version=ssl.PROTOCOL_SSLv23,
2464 chatty=False) as server:
Antoine Pitroue1ceb502013-01-12 21:54:44 +01002465 with context.wrap_socket(socket.socket()) as s:
Andrew Svetlov0832af62012-12-18 23:10:48 +02002466 with self.assertRaises(OSError):
Antoine Pitrou8f85f902012-01-03 22:46:48 +01002467 s.connect((HOST, server.port))
2468 self.assertIn("no shared cipher", str(server.conn_errors[0]))
2469
Antoine Pitroud6494802011-07-21 01:11:30 +02002470 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
2471 "'tls-unique' channel binding not available")
2472 def test_tls_unique_channel_binding(self):
2473 """Test tls-unique channel binding."""
2474 if support.verbose:
2475 sys.stdout.write("\n")
2476
2477 server = ThreadedEchoServer(CERTFILE,
2478 certreqs=ssl.CERT_NONE,
2479 ssl_version=ssl.PROTOCOL_TLSv1,
2480 cacerts=CERTFILE,
2481 chatty=True,
2482 connectionchatty=False)
Antoine Pitrou6b15c902011-12-21 16:54:45 +01002483 with server:
2484 s = ssl.wrap_socket(socket.socket(),
2485 server_side=False,
2486 certfile=CERTFILE,
2487 ca_certs=CERTFILE,
2488 cert_reqs=ssl.CERT_NONE,
2489 ssl_version=ssl.PROTOCOL_TLSv1)
2490 s.connect((HOST, server.port))
Antoine Pitroud6494802011-07-21 01:11:30 +02002491 # get the data
2492 cb_data = s.get_channel_binding("tls-unique")
2493 if support.verbose:
2494 sys.stdout.write(" got channel binding data: {0!r}\n"
2495 .format(cb_data))
2496
2497 # check if it is sane
2498 self.assertIsNotNone(cb_data)
2499 self.assertEqual(len(cb_data), 12) # True for TLSv1
2500
2501 # and compare with the peers version
2502 s.write(b"CB tls-unique\n")
2503 peer_data_repr = s.read().strip()
2504 self.assertEqual(peer_data_repr,
2505 repr(cb_data).encode("us-ascii"))
2506 s.close()
2507
2508 # now, again
2509 s = ssl.wrap_socket(socket.socket(),
2510 server_side=False,
2511 certfile=CERTFILE,
2512 ca_certs=CERTFILE,
2513 cert_reqs=ssl.CERT_NONE,
2514 ssl_version=ssl.PROTOCOL_TLSv1)
2515 s.connect((HOST, server.port))
2516 new_cb_data = s.get_channel_binding("tls-unique")
2517 if support.verbose:
2518 sys.stdout.write(" got another channel binding data: {0!r}\n"
2519 .format(new_cb_data))
2520 # is it really unique
2521 self.assertNotEqual(cb_data, new_cb_data)
2522 self.assertIsNotNone(cb_data)
2523 self.assertEqual(len(cb_data), 12) # True for TLSv1
2524 s.write(b"CB tls-unique\n")
2525 peer_data_repr = s.read().strip()
2526 self.assertEqual(peer_data_repr,
2527 repr(new_cb_data).encode("us-ascii"))
2528 s.close()
Bill Janssen58afe4c2008-09-08 16:45:19 +00002529
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01002530 def test_compression(self):
2531 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2532 context.load_cert_chain(CERTFILE)
2533 stats = server_params_test(context, context,
2534 chatty=True, connectionchatty=True)
2535 if support.verbose:
2536 sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
2537 self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' })
2538
2539 @unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
2540 "ssl.OP_NO_COMPRESSION needed for this test")
2541 def test_compression_disabled(self):
2542 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2543 context.load_cert_chain(CERTFILE)
Antoine Pitrou8691bff2011-12-20 10:47:42 +01002544 context.options |= ssl.OP_NO_COMPRESSION
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01002545 stats = server_params_test(context, context,
2546 chatty=True, connectionchatty=True)
2547 self.assertIs(stats['compression'], None)
2548
Antoine Pitrou0e576f12011-12-22 10:03:38 +01002549 def test_dh_params(self):
2550 # Check we can get a connection with ephemeral Diffie-Hellman
2551 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2552 context.load_cert_chain(CERTFILE)
2553 context.load_dh_params(DHFILE)
2554 context.set_ciphers("kEDH")
2555 stats = server_params_test(context, context,
2556 chatty=True, connectionchatty=True)
2557 cipher = stats["cipher"][0]
2558 parts = cipher.split("-")
2559 if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
2560 self.fail("Non-DH cipher: " + cipher[0])
2561
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01002562 def test_selected_npn_protocol(self):
2563 # selected_npn_protocol() is None unless NPN is used
2564 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2565 context.load_cert_chain(CERTFILE)
2566 stats = server_params_test(context, context,
2567 chatty=True, connectionchatty=True)
2568 self.assertIs(stats['client_npn_protocol'], None)
2569
2570 @unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
2571 def test_npn_protocols(self):
2572 server_protocols = ['http/1.1', 'spdy/2']
2573 protocol_tests = [
2574 (['http/1.1', 'spdy/2'], 'http/1.1'),
2575 (['spdy/2', 'http/1.1'], 'http/1.1'),
2576 (['spdy/2', 'test'], 'spdy/2'),
2577 (['abc', 'def'], 'abc')
2578 ]
2579 for client_protocols, expected in protocol_tests:
2580 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2581 server_context.load_cert_chain(CERTFILE)
2582 server_context.set_npn_protocols(server_protocols)
2583 client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2584 client_context.load_cert_chain(CERTFILE)
2585 client_context.set_npn_protocols(client_protocols)
2586 stats = server_params_test(client_context, server_context,
2587 chatty=True, connectionchatty=True)
2588
2589 msg = "failed trying %s (s) and %s (c).\n" \
2590 "was expecting %s, but got %%s from the %%s" \
2591 % (str(server_protocols), str(client_protocols),
2592 str(expected))
2593 client_result = stats['client_npn_protocol']
2594 self.assertEqual(client_result, expected, msg % (client_result, "client"))
2595 server_result = stats['server_npn_protocols'][-1] \
2596 if len(stats['server_npn_protocols']) else 'nothing'
2597 self.assertEqual(server_result, expected, msg % (server_result, "server"))
2598
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01002599 def sni_contexts(self):
2600 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2601 server_context.load_cert_chain(SIGNED_CERTFILE)
2602 other_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2603 other_context.load_cert_chain(SIGNED_CERTFILE2)
2604 client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2605 client_context.verify_mode = ssl.CERT_REQUIRED
2606 client_context.load_verify_locations(SIGNING_CA)
2607 return server_context, other_context, client_context
2608
2609 def check_common_name(self, stats, name):
2610 cert = stats['peercert']
2611 self.assertIn((('commonName', name),), cert['subject'])
2612
2613 @needs_sni
2614 def test_sni_callback(self):
2615 calls = []
2616 server_context, other_context, client_context = self.sni_contexts()
2617
2618 def servername_cb(ssl_sock, server_name, initial_context):
2619 calls.append((server_name, initial_context))
Antoine Pitrou50b24d02013-04-11 20:48:42 +02002620 if server_name is not None:
2621 ssl_sock.context = other_context
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01002622 server_context.set_servername_callback(servername_cb)
2623
2624 stats = server_params_test(client_context, server_context,
2625 chatty=True,
2626 sni_name='supermessage')
2627 # The hostname was fetched properly, and the certificate was
2628 # changed for the connection.
2629 self.assertEqual(calls, [("supermessage", server_context)])
2630 # CERTFILE4 was selected
2631 self.check_common_name(stats, 'fakehostname')
2632
Antoine Pitrou50b24d02013-04-11 20:48:42 +02002633 calls = []
2634 # The callback is called with server_name=None
2635 stats = server_params_test(client_context, server_context,
2636 chatty=True,
2637 sni_name=None)
2638 self.assertEqual(calls, [(None, server_context)])
2639 self.check_common_name(stats, 'localhost')
2640
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01002641 # Check disabling the callback
2642 calls = []
2643 server_context.set_servername_callback(None)
2644
2645 stats = server_params_test(client_context, server_context,
2646 chatty=True,
2647 sni_name='notfunny')
2648 # Certificate didn't change
2649 self.check_common_name(stats, 'localhost')
2650 self.assertEqual(calls, [])
2651
2652 @needs_sni
2653 def test_sni_callback_alert(self):
2654 # Returning a TLS alert is reflected to the connecting client
2655 server_context, other_context, client_context = self.sni_contexts()
2656
2657 def cb_returning_alert(ssl_sock, server_name, initial_context):
2658 return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
2659 server_context.set_servername_callback(cb_returning_alert)
2660
2661 with self.assertRaises(ssl.SSLError) as cm:
2662 stats = server_params_test(client_context, server_context,
2663 chatty=False,
2664 sni_name='supermessage')
2665 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
2666
2667 @needs_sni
2668 def test_sni_callback_raising(self):
2669 # Raising fails the connection with a TLS handshake failure alert.
2670 server_context, other_context, client_context = self.sni_contexts()
2671
2672 def cb_raising(ssl_sock, server_name, initial_context):
2673 1/0
2674 server_context.set_servername_callback(cb_raising)
2675
2676 with self.assertRaises(ssl.SSLError) as cm, \
2677 support.captured_stderr() as stderr:
2678 stats = server_params_test(client_context, server_context,
2679 chatty=False,
2680 sni_name='supermessage')
2681 self.assertEqual(cm.exception.reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
2682 self.assertIn("ZeroDivisionError", stderr.getvalue())
2683
2684 @needs_sni
2685 def test_sni_callback_wrong_return_type(self):
2686 # Returning the wrong return type terminates the TLS connection
2687 # with an internal error alert.
2688 server_context, other_context, client_context = self.sni_contexts()
2689
2690 def cb_wrong_return_type(ssl_sock, server_name, initial_context):
2691 return "foo"
2692 server_context.set_servername_callback(cb_wrong_return_type)
2693
2694 with self.assertRaises(ssl.SSLError) as cm, \
2695 support.captured_stderr() as stderr:
2696 stats = server_params_test(client_context, server_context,
2697 chatty=False,
2698 sni_name='supermessage')
2699 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
2700 self.assertIn("TypeError", stderr.getvalue())
2701
Antoine Pitrou60a26e02013-07-20 19:35:16 +02002702 def test_read_write_after_close_raises_valuerror(self):
2703 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2704 context.verify_mode = ssl.CERT_REQUIRED
2705 context.load_verify_locations(CERTFILE)
2706 context.load_cert_chain(CERTFILE)
2707 server = ThreadedEchoServer(context=context, chatty=False)
2708
2709 with server:
2710 s = context.wrap_socket(socket.socket())
2711 s.connect((HOST, server.port))
2712 s.close()
2713
2714 self.assertRaises(ValueError, s.read, 1024)
Antoine Pitrou28940732013-07-20 19:36:15 +02002715 self.assertRaises(ValueError, s.write, b'hello')
Antoine Pitrou60a26e02013-07-20 19:35:16 +02002716
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01002717
Thomas Woutersed03b412007-08-28 21:37:11 +00002718def test_main(verbose=False):
Antoine Pitrou15cee622010-08-04 16:45:21 +00002719 if support.verbose:
2720 plats = {
2721 'Linux': platform.linux_distribution,
2722 'Mac': platform.mac_ver,
2723 'Windows': platform.win32_ver,
2724 }
2725 for name, func in plats.items():
2726 plat = func()
2727 if plat and plat[0]:
2728 plat = '%s %r' % (name, plat)
2729 break
2730 else:
2731 plat = repr(platform.platform())
2732 print("test_ssl: testing with %r %r" %
2733 (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
2734 print(" under %s" % plat)
Antoine Pitroud5323212010-10-22 18:19:07 +00002735 print(" HAS_SNI = %r" % ssl.HAS_SNI)
Antoine Pitrou609ef012013-03-29 18:09:06 +01002736 print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
2737 try:
2738 print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
2739 except AttributeError:
2740 pass
Antoine Pitrou15cee622010-08-04 16:45:21 +00002741
Antoine Pitrou152efa22010-05-16 18:19:27 +00002742 for filename in [
2743 CERTFILE, SVN_PYTHON_ORG_ROOT_CERT, BYTES_CERTFILE,
2744 ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01002745 SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
Antoine Pitrou152efa22010-05-16 18:19:27 +00002746 BADCERT, BADKEY, EMPTYCERT]:
2747 if not os.path.exists(filename):
2748 raise support.TestFailed("Can't read certificate file %r" % filename)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002749
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02002750 tests = [ContextTests, BasicSocketTests, SSLErrorTests]
Thomas Woutersed03b412007-08-28 21:37:11 +00002751
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002752 if support.is_resource_enabled('network'):
Bill Janssen6e027db2007-11-15 22:23:56 +00002753 tests.append(NetworkedTests)
Thomas Woutersed03b412007-08-28 21:37:11 +00002754
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002755 if _have_threads:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002756 thread_info = support.threading_setup()
Antoine Pitroudb5012a2013-01-12 22:00:09 +01002757 if thread_info:
Bill Janssen6e027db2007-11-15 22:23:56 +00002758 tests.append(ThreadedTests)
Thomas Woutersed03b412007-08-28 21:37:11 +00002759
Antoine Pitrou480a1242010-04-28 21:37:09 +00002760 try:
2761 support.run_unittest(*tests)
2762 finally:
2763 if _have_threads:
2764 support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00002765
2766if __name__ == "__main__":
2767 test_main()