blob: 879d6fd788d92b2a00840a303638f4ab8dab47bd [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00005from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00006import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00007import select
Thomas Woutersed03b412007-08-28 21:37:11 +00008import time
Christian Heimes9424bb42013-06-17 15:32:57 +02009import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000010import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000011import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000012import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000013import pprint
Antoine Pitrou152efa22010-05-16 18:19:27 +000014import tempfile
Antoine Pitrou803e6d62010-10-13 10:36:15 +000015import urllib.request
Thomas Woutersed03b412007-08-28 21:37:11 +000016import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000017import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000018import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000019import platform
Antoine Pitrou23df4832010-08-04 17:14:06 +000020import functools
Antoine Pitrou242db722013-05-01 20:52:07 +020021from unittest import mock
Thomas Woutersed03b412007-08-28 21:37:11 +000022
# The ssl module is loaded through test.support rather than a plain import
# (presumably so a build without ssl is reported by the test framework --
# confirm against support.import_module).
ssl = support.import_module("ssl")

# Sorted protocol identifiers taken from ssl._PROTOCOL_NAMES.
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
HOST = support.HOST
Antoine Pitrou152efa22010-05-16 18:19:27 +000027
def data_file(*name):
    """Return the path of a test data file located next to this module.

    *name* is one or more path components that are joined onto the
    directory containing this file.
    """
    return os.path.join(os.path.dirname(__file__), *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000030
# The custom key and certificate files used in test_ssl are generated
# using Lib/test/make_ssl_certs.py.
# Other certificates are simply fetched from the Internet servers they
# are meant to authenticate.

CERTFILE = data_file("keycert.pem")
BYTES_CERTFILE = os.fsencode(CERTFILE)
ONLYCERT = data_file("ssl_cert.pem")
ONLYKEY = data_file("ssl_key.pem")
BYTES_ONLYCERT = os.fsencode(ONLYCERT)
BYTES_ONLYKEY = os.fsencode(ONLYKEY)
CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
KEY_PASSWORD = "somepass"
CAPATH = data_file("capath")
BYTES_CAPATH = os.fsencode(CAPATH)
CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
CAFILE_CACERT = data_file("capath", "5ed36f99.0")


# empty CRL
CRLFILE = data_file("revocation.crl")

# Two keys and certs signed by the same CA (for SNI tests)
SIGNED_CERTFILE = data_file("keycert3.pem")
SIGNED_CERTFILE2 = data_file("keycert4.pem")
SIGNING_CA = data_file("pycacert.pem")

SVN_PYTHON_ORG_ROOT_CERT = data_file("https_svn_python_org_root.pem")

EMPTYCERT = data_file("nullcert.pem")
BADCERT = data_file("badcert.pem")
# Deliberately points at a file that does not exist.
WRONGCERT = data_file("XXXnonexisting.pem")
BADKEY = data_file("badkey.pem")
NOKIACERT = data_file("nokia.pem")
NULLBYTECERT = data_file("nullbytecert.pem")

DHFILE = data_file("dh512.pem")
BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +000070
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010071
def handle_error(prefix):
    """Write the traceback of the current exception to stdout.

    Nothing is written unless support.verbose is set.  *prefix* is
    prepended to the formatted traceback.
    """
    if support.verbose:
        # Only pay for formatting the traceback when it will be shown.
        exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
        sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +000076
def can_clear_options():
    """Return True if the OpenSSL API version is at least 0.9.8m."""
    # 0.9.8m or higher
    return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
Antoine Pitroub5218772010-05-21 09:56:06 +000080
def no_sslv2_implies_sslv3_hello():
    """Return True if the linked OpenSSL version is at least 0.9.7h."""
    # 0.9.7h or higher
    return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
84
def have_verify_flags():
    """Return True if the linked OpenSSL version is at least 0.9.8."""
    # 0.9.8 or higher
    return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
88
def utc_offset():  # NOTE: ignore issues like #1647654
    """Return the current local UTC offset in seconds.

    The DST offset (time.altzone) is used when the platform defines a
    DST zone and DST is currently in effect; otherwise time.timezone.
    """
    # local time = utc time + utc offset
    if time.daylight and time.localtime().tm_isdst > 0:
        return -time.altzone  # seconds
    return -time.timezone
94
def asn1time(cert_time):
    """Return *cert_time* adjusted to what the linked OpenSSL would print.

    For the one OpenSSL API version singled out below, the seconds field
    is zeroed and the day of month is space-padded; for every other
    version the string is returned unchanged.
    """
    # Some versions of OpenSSL ignore seconds, see #18207
    # 0.9.8.i
    if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
        fmt = "%b %d %H:%M:%S %Y GMT"
        dt = datetime.datetime.strptime(cert_time, fmt)
        dt = dt.replace(second=0)
        cert_time = dt.strftime(fmt)
        # %d adds leading zero but ASN1_TIME_print() uses leading space
        if cert_time[4] == "0":
            cert_time = cert_time[:4] + " " + cert_time[5:]

    return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000108
# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
def skip_if_broken_ubuntu_ssl(func):
    """Decorator that skips *func* on the patched OpenSSL build checked below.

    When the ssl module exposes PROTOCOL_SSLv2, the wrapper first tries to
    create an SSLv2 context.  If that raises ssl.SSLError and the OpenSSL
    version and distribution match the known-bad combination, the test is
    skipped; otherwise *func* runs normally.  Without PROTOCOL_SSLv2,
    *func* is returned unchanged.
    """
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        @functools.wraps(func)
        def f(*args, **kwargs):
            try:
                ssl.SSLContext(ssl.PROTOCOL_SSLv2)
            except ssl.SSLError:
                if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
                    platform.linux_distribution() == ('debian', 'squeeze/sid', '')):
                    raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
            return func(*args, **kwargs)
        return f
    else:
        return func
Antoine Pitrou23df4832010-08-04 17:14:06 +0000124
# Decorator for tests that can only run when ssl.HAS_SNI is true.
needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
126
Antoine Pitrou23df4832010-08-04 17:14:06 +0000127
Antoine Pitrou152efa22010-05-16 18:19:27 +0000128class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000129
Antoine Pitrou480a1242010-04-28 21:37:09 +0000130 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000131 ssl.CERT_NONE
132 ssl.CERT_OPTIONAL
133 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100134 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100135 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100136 if ssl.HAS_ECDH:
137 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100138 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
139 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000140 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100141 self.assertIn(ssl.HAS_ECDH, {True, False})
Thomas Woutersed03b412007-08-28 21:37:11 +0000142
Antoine Pitrou172f0252014-04-18 20:33:08 +0200143 def test_str_for_enums(self):
144 # Make sure that the PROTOCOL_* constants have enum-like string
145 # reprs.
Victor Stinner648b8622014-12-12 12:23:59 +0100146 proto = ssl.PROTOCOL_SSLv23
147 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_SSLv23')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200148 ctx = ssl.SSLContext(proto)
149 self.assertIs(ctx.protocol, proto)
150
Antoine Pitrou480a1242010-04-28 21:37:09 +0000151 def test_random(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000152 v = ssl.RAND_status()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000153 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000154 sys.stdout.write("\n RAND_status is %d (%s)\n"
155 % (v, (v and "sufficient randomness") or
156 "insufficient randomness"))
Victor Stinner99c8b162011-05-24 12:05:19 +0200157
158 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
159 self.assertEqual(len(data), 16)
160 self.assertEqual(is_cryptographic, v == 1)
161 if v:
162 data = ssl.RAND_bytes(16)
163 self.assertEqual(len(data), 16)
Victor Stinner2e2baa92011-05-25 11:15:16 +0200164 else:
165 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
Victor Stinner99c8b162011-05-24 12:05:19 +0200166
Victor Stinner1e81a392013-12-19 16:47:04 +0100167 # negative num is invalid
168 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
169 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
170
Victor Stinnerbeeb5122014-11-28 13:28:25 +0100171 if hasattr(ssl, 'RAND_egd'):
172 self.assertRaises(TypeError, ssl.RAND_egd, 1)
173 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000174 ssl.RAND_add("this is a random string", 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000175
Christian Heimesf77b4b22013-08-21 13:26:05 +0200176 @unittest.skipUnless(os.name == 'posix', 'requires posix')
177 def test_random_fork(self):
178 status = ssl.RAND_status()
179 if not status:
180 self.fail("OpenSSL's PRNG has insufficient randomness")
181
182 rfd, wfd = os.pipe()
183 pid = os.fork()
184 if pid == 0:
185 try:
186 os.close(rfd)
187 child_random = ssl.RAND_pseudo_bytes(16)[0]
188 self.assertEqual(len(child_random), 16)
189 os.write(wfd, child_random)
190 os.close(wfd)
191 except BaseException:
192 os._exit(1)
193 else:
194 os._exit(0)
195 else:
196 os.close(wfd)
197 self.addCleanup(os.close, rfd)
198 _, status = os.waitpid(pid, 0)
199 self.assertEqual(status, 0)
200
201 child_random = os.read(rfd, 16)
202 self.assertEqual(len(child_random), 16)
203 parent_random = ssl.RAND_pseudo_bytes(16)[0]
204 self.assertEqual(len(parent_random), 16)
205
206 self.assertNotEqual(child_random, parent_random)
207
Antoine Pitrou480a1242010-04-28 21:37:09 +0000208 def test_parse_cert(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000209 # note that this uses an 'unofficial' function in _ssl.c,
210 # provided solely for this test, to exercise the certificate
211 # parsing code
Antoine Pitroufb046912010-11-09 20:21:19 +0000212 p = ssl._ssl._test_decode_cert(CERTFILE)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000213 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000214 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200215 self.assertEqual(p['issuer'],
216 ((('countryName', 'XY'),),
217 (('localityName', 'Castle Anthrax'),),
218 (('organizationName', 'Python Software Foundation'),),
219 (('commonName', 'localhost'),))
220 )
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100221 # Note the next three asserts will fail if the keys are regenerated
Christian Heimes9424bb42013-06-17 15:32:57 +0200222 self.assertEqual(p['notAfter'], asn1time('Oct 5 23:01:56 2020 GMT'))
223 self.assertEqual(p['notBefore'], asn1time('Oct 8 23:01:56 2010 GMT'))
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200224 self.assertEqual(p['serialNumber'], 'D7C7381919AFC24E')
225 self.assertEqual(p['subject'],
226 ((('countryName', 'XY'),),
227 (('localityName', 'Castle Anthrax'),),
228 (('organizationName', 'Python Software Foundation'),),
229 (('commonName', 'localhost'),))
230 )
231 self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),))
232 # Issue #13034: the subjectAltName in some certificates
233 # (notably projects.developer.nokia.com:443) wasn't parsed
234 p = ssl._ssl._test_decode_cert(NOKIACERT)
235 if support.verbose:
236 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
237 self.assertEqual(p['subjectAltName'],
238 (('DNS', 'projects.developer.nokia.com'),
239 ('DNS', 'projects.forum.nokia.com'))
240 )
Christian Heimesbd3a7f92013-11-21 03:40:15 +0100241 # extra OCSP and AIA fields
242 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
243 self.assertEqual(p['caIssuers'],
244 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
245 self.assertEqual(p['crlDistributionPoints'],
246 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000247
Christian Heimes824f7f32013-08-17 00:54:47 +0200248 def test_parse_cert_CVE_2013_4238(self):
249 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
250 if support.verbose:
251 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
252 subject = ((('countryName', 'US'),),
253 (('stateOrProvinceName', 'Oregon'),),
254 (('localityName', 'Beaverton'),),
255 (('organizationName', 'Python Software Foundation'),),
256 (('organizationalUnitName', 'Python Core Development'),),
257 (('commonName', 'null.python.org\x00example.org'),),
258 (('emailAddress', 'python-dev@python.org'),))
259 self.assertEqual(p['subject'], subject)
260 self.assertEqual(p['issuer'], subject)
Christian Heimes157c9832013-08-25 14:12:41 +0200261 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
262 san = (('DNS', 'altnull.python.org\x00example.com'),
263 ('email', 'null@python.org\x00user@example.org'),
264 ('URI', 'http://null.python.org\x00http://example.org'),
265 ('IP Address', '192.0.2.1'),
266 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
267 else:
268 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
269 san = (('DNS', 'altnull.python.org\x00example.com'),
270 ('email', 'null@python.org\x00user@example.org'),
271 ('URI', 'http://null.python.org\x00http://example.org'),
272 ('IP Address', '192.0.2.1'),
273 ('IP Address', '<invalid>'))
274
275 self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200276
Antoine Pitrou480a1242010-04-28 21:37:09 +0000277 def test_DER_to_PEM(self):
278 with open(SVN_PYTHON_ORG_ROOT_CERT, 'r') as f:
279 pem = f.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000280 d1 = ssl.PEM_cert_to_DER_cert(pem)
281 p2 = ssl.DER_cert_to_PEM_cert(d1)
282 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitrou18c913e2010-04-27 10:59:39 +0000283 self.assertEqual(d1, d2)
Antoine Pitrou9bfbe612010-04-27 22:08:08 +0000284 if not p2.startswith(ssl.PEM_HEADER + '\n'):
285 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
286 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
287 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000288
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000289 def test_openssl_version(self):
290 n = ssl.OPENSSL_VERSION_NUMBER
291 t = ssl.OPENSSL_VERSION_INFO
292 s = ssl.OPENSSL_VERSION
293 self.assertIsInstance(n, int)
294 self.assertIsInstance(t, tuple)
295 self.assertIsInstance(s, str)
296 # Some sanity checks follow
297 # >= 0.9
298 self.assertGreaterEqual(n, 0x900000)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400299 # < 3.0
300 self.assertLess(n, 0x30000000)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000301 major, minor, fix, patch, status = t
302 self.assertGreaterEqual(major, 0)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400303 self.assertLess(major, 3)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000304 self.assertGreaterEqual(minor, 0)
305 self.assertLess(minor, 256)
306 self.assertGreaterEqual(fix, 0)
307 self.assertLess(fix, 256)
308 self.assertGreaterEqual(patch, 0)
309 self.assertLessEqual(patch, 26)
310 self.assertGreaterEqual(status, 0)
311 self.assertLessEqual(status, 15)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400312 # Version string as returned by {Open,Libre}SSL, the format might change
313 if "LibreSSL" in s:
314 self.assertTrue(s.startswith("LibreSSL {:d}.{:d}".format(major, minor)),
Victor Stinner789b8052015-01-06 11:51:06 +0100315 (s, t, hex(n)))
Antoine Pitroudfab9352014-07-21 18:35:01 -0400316 else:
317 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
Victor Stinner789b8052015-01-06 11:51:06 +0100318 (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000319
Antoine Pitrou9d543662010-04-23 23:10:32 +0000320 @support.cpython_only
321 def test_refcycle(self):
322 # Issue #7943: an SSL object doesn't create reference cycles with
323 # itself.
324 s = socket.socket(socket.AF_INET)
325 ss = ssl.wrap_socket(s)
326 wr = weakref.ref(ss)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100327 with support.check_warnings(("", ResourceWarning)):
328 del ss
329 self.assertEqual(wr(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000330
Antoine Pitroua468adc2010-09-14 14:43:44 +0000331 def test_wrapped_unconnected(self):
332 # Methods on an unconnected SSLSocket propagate the original
Andrew Svetlov0832af62012-12-18 23:10:48 +0200333 # OSError raise by the underlying socket object.
Antoine Pitroua468adc2010-09-14 14:43:44 +0000334 s = socket.socket(socket.AF_INET)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100335 with ssl.wrap_socket(s) as ss:
Antoine Pitroue9bb4732013-01-12 21:56:56 +0100336 self.assertRaises(OSError, ss.recv, 1)
337 self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
338 self.assertRaises(OSError, ss.recvfrom, 1)
339 self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
340 self.assertRaises(OSError, ss.send, b'x')
341 self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
Antoine Pitroua468adc2010-09-14 14:43:44 +0000342
Antoine Pitrou40f08742010-04-24 22:04:40 +0000343 def test_timeout(self):
344 # Issue #8524: when creating an SSL socket, the timeout of the
345 # original socket should be retained.
346 for timeout in (None, 0.0, 5.0):
347 s = socket.socket(socket.AF_INET)
348 s.settimeout(timeout)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100349 with ssl.wrap_socket(s) as ss:
350 self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000351
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000352 def test_errors(self):
353 sock = socket.socket()
Ezio Melottied3a7d22010-12-01 02:32:32 +0000354 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000355 "certfile must be specified",
356 ssl.wrap_socket, sock, keyfile=CERTFILE)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000357 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000358 "certfile must be specified for server-side operations",
359 ssl.wrap_socket, sock, server_side=True)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000360 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000361 "certfile must be specified for server-side operations",
362 ssl.wrap_socket, sock, server_side=True, certfile="")
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100363 with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
364 self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
365 s.connect, (HOST, 8080))
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200366 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000367 with socket.socket() as sock:
368 ssl.wrap_socket(sock, certfile=WRONGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000369 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200370 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000371 with socket.socket() as sock:
372 ssl.wrap_socket(sock, certfile=CERTFILE, keyfile=WRONGCERT)
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000373 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200374 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000375 with socket.socket() as sock:
376 ssl.wrap_socket(sock, certfile=WRONGCERT, keyfile=WRONGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000377 self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000378
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000379 def test_match_hostname(self):
380 def ok(cert, hostname):
381 ssl.match_hostname(cert, hostname)
382 def fail(cert, hostname):
383 self.assertRaises(ssl.CertificateError,
384 ssl.match_hostname, cert, hostname)
385
386 cert = {'subject': ((('commonName', 'example.com'),),)}
387 ok(cert, 'example.com')
388 ok(cert, 'ExAmple.cOm')
389 fail(cert, 'www.example.com')
390 fail(cert, '.example.com')
391 fail(cert, 'example.org')
392 fail(cert, 'exampleXcom')
393
394 cert = {'subject': ((('commonName', '*.a.com'),),)}
395 ok(cert, 'foo.a.com')
396 fail(cert, 'bar.foo.a.com')
397 fail(cert, 'a.com')
398 fail(cert, 'Xa.com')
399 fail(cert, '.a.com')
400
Georg Brandl72c98d32013-10-27 07:16:53 +0100401 # only match one left-most wildcard
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000402 cert = {'subject': ((('commonName', 'f*.com'),),)}
403 ok(cert, 'foo.com')
404 ok(cert, 'f.com')
405 fail(cert, 'bar.com')
406 fail(cert, 'foo.a.com')
407 fail(cert, 'bar.foo.com')
408
Christian Heimes824f7f32013-08-17 00:54:47 +0200409 # NULL bytes are bad, CVE-2013-4073
410 cert = {'subject': ((('commonName',
411 'null.python.org\x00example.org'),),)}
412 ok(cert, 'null.python.org\x00example.org') # or raise an error?
413 fail(cert, 'example.org')
414 fail(cert, 'null.python.org')
415
Georg Brandl72c98d32013-10-27 07:16:53 +0100416 # error cases with wildcards
417 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
418 fail(cert, 'bar.foo.a.com')
419 fail(cert, 'a.com')
420 fail(cert, 'Xa.com')
421 fail(cert, '.a.com')
422
423 cert = {'subject': ((('commonName', 'a.*.com'),),)}
424 fail(cert, 'a.foo.com')
425 fail(cert, 'a..com')
426 fail(cert, 'a.com')
427
428 # wildcard doesn't match IDNA prefix 'xn--'
429 idna = 'püthon.python.org'.encode("idna").decode("ascii")
430 cert = {'subject': ((('commonName', idna),),)}
431 ok(cert, idna)
432 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
433 fail(cert, idna)
434 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
435 fail(cert, idna)
436
437 # wildcard in first fragment and IDNA A-labels in sequent fragments
438 # are supported.
439 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
440 cert = {'subject': ((('commonName', idna),),)}
441 ok(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
442 ok(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
443 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
444 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
445
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000446 # Slightly fake real-world example
447 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
448 'subject': ((('commonName', 'linuxfrz.org'),),),
449 'subjectAltName': (('DNS', 'linuxfr.org'),
450 ('DNS', 'linuxfr.com'),
451 ('othername', '<unsupported>'))}
452 ok(cert, 'linuxfr.org')
453 ok(cert, 'linuxfr.com')
454 # Not a "DNS" entry
455 fail(cert, '<unsupported>')
456 # When there is a subjectAltName, commonName isn't used
457 fail(cert, 'linuxfrz.org')
458
459 # A pristine real-world example
460 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
461 'subject': ((('countryName', 'US'),),
462 (('stateOrProvinceName', 'California'),),
463 (('localityName', 'Mountain View'),),
464 (('organizationName', 'Google Inc'),),
465 (('commonName', 'mail.google.com'),))}
466 ok(cert, 'mail.google.com')
467 fail(cert, 'gmail.com')
468 # Only commonName is considered
469 fail(cert, 'California')
470
471 # Neither commonName nor subjectAltName
472 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
473 'subject': ((('countryName', 'US'),),
474 (('stateOrProvinceName', 'California'),),
475 (('localityName', 'Mountain View'),),
476 (('organizationName', 'Google Inc'),))}
477 fail(cert, 'mail.google.com')
478
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200479 # No DNS entry in subjectAltName but a commonName
480 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
481 'subject': ((('countryName', 'US'),),
482 (('stateOrProvinceName', 'California'),),
483 (('localityName', 'Mountain View'),),
484 (('commonName', 'mail.google.com'),)),
485 'subjectAltName': (('othername', 'blabla'), )}
486 ok(cert, 'mail.google.com')
487
488 # No DNS entry subjectAltName and no commonName
489 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
490 'subject': ((('countryName', 'US'),),
491 (('stateOrProvinceName', 'California'),),
492 (('localityName', 'Mountain View'),),
493 (('organizationName', 'Google Inc'),)),
494 'subjectAltName': (('othername', 'blabla'),)}
495 fail(cert, 'google.com')
496
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000497 # Empty cert / no cert
498 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
499 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
500
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200501 # Issue #17980: avoid denials of service by refusing more than one
502 # wildcard per fragment.
503 cert = {'subject': ((('commonName', 'a*b.com'),),)}
504 ok(cert, 'axxb.com')
505 cert = {'subject': ((('commonName', 'a*b.co*'),),)}
Georg Brandl72c98d32013-10-27 07:16:53 +0100506 fail(cert, 'axxb.com')
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200507 cert = {'subject': ((('commonName', 'a*b*.com'),),)}
508 with self.assertRaises(ssl.CertificateError) as cm:
509 ssl.match_hostname(cert, 'axxbxxc.com')
510 self.assertIn("too many wildcards", str(cm.exception))
511
Antoine Pitroud5323212010-10-22 18:19:07 +0000512 def test_server_side(self):
513 # server_hostname doesn't work for server sockets
514 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000515 with socket.socket() as sock:
516 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
517 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000518
Antoine Pitroud6494802011-07-21 01:11:30 +0200519 def test_unknown_channel_binding(self):
520 # should raise ValueError for unknown type
521 s = socket.socket(socket.AF_INET)
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200522 s.bind(('127.0.0.1', 0))
523 s.listen()
524 c = socket.socket(socket.AF_INET)
525 c.connect(s.getsockname())
526 with ssl.wrap_socket(c, do_handshake_on_connect=False) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100527 with self.assertRaises(ValueError):
528 ss.get_channel_binding("unknown-type")
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200529 s.close()
Antoine Pitroud6494802011-07-21 01:11:30 +0200530
531 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
532 "'tls-unique' channel binding not available")
533 def test_tls_unique_channel_binding(self):
534 # unconnected should return None for known type
535 s = socket.socket(socket.AF_INET)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100536 with ssl.wrap_socket(s) as ss:
537 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200538 # the same for server-side
539 s = socket.socket(socket.AF_INET)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100540 with ssl.wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
541 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200542
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600543 def test_dealloc_warn(self):
544 ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
545 r = repr(ss)
546 with self.assertWarns(ResourceWarning) as cm:
547 ss = None
548 support.gc_collect()
549 self.assertIn(r, str(cm.warning.args[0]))
550
Christian Heimes6d7ad132013-06-09 18:02:55 +0200551 def test_get_default_verify_paths(self):
552 paths = ssl.get_default_verify_paths()
553 self.assertEqual(len(paths), 6)
554 self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
555
556 with support.EnvironmentVarGuard() as env:
557 env["SSL_CERT_DIR"] = CAPATH
558 env["SSL_CERT_FILE"] = CERTFILE
559 paths = ssl.get_default_verify_paths()
560 self.assertEqual(paths.cafile, CERTFILE)
561 self.assertEqual(paths.capath, CAPATH)
562
Christian Heimes44109d72013-11-22 01:51:30 +0100563 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
564 def test_enum_certificates(self):
565 self.assertTrue(ssl.enum_certificates("CA"))
566 self.assertTrue(ssl.enum_certificates("ROOT"))
567
568 self.assertRaises(TypeError, ssl.enum_certificates)
569 self.assertRaises(WindowsError, ssl.enum_certificates, "")
570
Christian Heimesc2d65e12013-11-22 16:13:55 +0100571 trust_oids = set()
572 for storename in ("CA", "ROOT"):
573 store = ssl.enum_certificates(storename)
574 self.assertIsInstance(store, list)
575 for element in store:
576 self.assertIsInstance(element, tuple)
577 self.assertEqual(len(element), 3)
578 cert, enc, trust = element
579 self.assertIsInstance(cert, bytes)
580 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
581 self.assertIsInstance(trust, (set, bool))
582 if isinstance(trust, set):
583 trust_oids.update(trust)
Christian Heimes44109d72013-11-22 01:51:30 +0100584
585 serverAuth = "1.3.6.1.5.5.7.3.1"
Christian Heimesc2d65e12013-11-22 16:13:55 +0100586 self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200587
Christian Heimes46bebee2013-06-09 19:03:31 +0200588 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Christian Heimes44109d72013-11-22 01:51:30 +0100589 def test_enum_crls(self):
590 self.assertTrue(ssl.enum_crls("CA"))
591 self.assertRaises(TypeError, ssl.enum_crls)
592 self.assertRaises(WindowsError, ssl.enum_crls, "")
Christian Heimes46bebee2013-06-09 19:03:31 +0200593
Christian Heimes44109d72013-11-22 01:51:30 +0100594 crls = ssl.enum_crls("CA")
595 self.assertIsInstance(crls, list)
596 for element in crls:
597 self.assertIsInstance(element, tuple)
598 self.assertEqual(len(element), 2)
599 self.assertIsInstance(element[0], bytes)
600 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200601
Christian Heimes46bebee2013-06-09 19:03:31 +0200602
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100603 def test_asn1object(self):
604 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
605 '1.3.6.1.5.5.7.3.1')
606
607 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
608 self.assertEqual(val, expected)
609 self.assertEqual(val.nid, 129)
610 self.assertEqual(val.shortname, 'serverAuth')
611 self.assertEqual(val.longname, 'TLS Web Server Authentication')
612 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
613 self.assertIsInstance(val, ssl._ASN1Object)
614 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
615
616 val = ssl._ASN1Object.fromnid(129)
617 self.assertEqual(val, expected)
618 self.assertIsInstance(val, ssl._ASN1Object)
619 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100620 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
621 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100622 for i in range(1000):
623 try:
624 obj = ssl._ASN1Object.fromnid(i)
625 except ValueError:
626 pass
627 else:
628 self.assertIsInstance(obj.nid, int)
629 self.assertIsInstance(obj.shortname, str)
630 self.assertIsInstance(obj.longname, str)
631 self.assertIsInstance(obj.oid, (str, type(None)))
632
633 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
634 self.assertEqual(val, expected)
635 self.assertIsInstance(val, ssl._ASN1Object)
636 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
637 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
638 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100639 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
640 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100641
Christian Heimes72d28502013-11-23 13:56:58 +0100642 def test_purpose_enum(self):
643 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
644 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
645 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
646 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
647 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
648 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
649 '1.3.6.1.5.5.7.3.1')
650
651 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
652 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
653 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
654 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
655 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
656 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
657 '1.3.6.1.5.5.7.3.2')
658
def test_unsupported_dtls(self):
    # Wrapping a datagram socket must raise NotImplementedError, both via
    # the module-level ssl.wrap_socket() and via SSLContext.wrap_socket().
    expected_message = "only stream sockets are supported"
    dgram_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    self.addCleanup(dgram_sock.close)

    with self.assertRaises(NotImplementedError) as caught:
        ssl.wrap_socket(dgram_sock, cert_reqs=ssl.CERT_NONE)
    self.assertEqual(str(caught.exception), expected_message)

    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    with self.assertRaises(NotImplementedError) as caught:
        context.wrap_socket(dgram_sock)
    self.assertEqual(str(caught.exception), expected_message)
669
def cert_time_ok(self, timestring, timestamp):
    # Helper: assert that ssl.cert_time_to_seconds() converts
    # *timestring* to exactly *timestamp*.
    seconds = ssl.cert_time_to_seconds(timestring)
    self.assertEqual(seconds, timestamp)
672
def cert_time_fail(self, timestring):
    # Helper: assert that ssl.cert_time_to_seconds() rejects
    # *timestring* with ValueError.
    self.assertRaises(ValueError, ssl.cert_time_to_seconds, timestring)
676
@unittest.skipUnless(utc_offset(),
                     'local time needs to be different from UTC')
def test_cert_time_to_seconds_timezone(self):
    # Issue #19940: ssl.cert_time_to_seconds() returns wrong
    # results if local timezone is not UTC
    known_conversions = (
        ("May 9 00:00:00 2007 GMT", 1178668800.0),
        ("Jan 5 09:34:43 2018 GMT", 1515144883.0),
    )
    for cert_time, epoch_seconds in known_conversions:
        self.cert_time_ok(cert_time, epoch_seconds)
684
def test_cert_time_to_seconds(self):
    # Accepted and rejected spellings of a certificate time string.
    timestring = "Jan 5 09:34:43 2018 GMT"
    ts = 1515144883.0
    self.cert_time_ok(timestring, ts)
    # accept keyword parameter, assert its name
    self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)

    equivalent_spellings = (
        # accept both %e and %d (space or zero generated by strftime)
        "Jan 05 09:34:43 2018 GMT",
        # case-insensitive
        "JaN 5 09:34:43 2018 GmT",
    )
    for spelling in equivalent_spellings:
        self.cert_time_ok(spelling, ts)

    rejected_spellings = (
        "Jan 5 09:34 2018 GMT",      # no seconds
        "Jan 5 09:34:43 2018",       # no GMT
        "Jan 5 09:34:43 2018 UTC",   # not GMT timezone
        "Jan 35 09:34:43 2018 GMT",  # invalid day
        "Jon 5 09:34:43 2018 GMT",   # invalid month
        "Jan 5 24:00:00 2018 GMT",   # invalid hour
        "Jan 5 09:60:43 2018 GMT",   # invalid minute
    )
    for spelling in rejected_spellings:
        self.cert_time_fail(spelling)

    # A leap second and the following midnight map to the same timestamp.
    newyear_ts = 1230768000.0
    for spelling in ("Dec 31 23:59:60 2008 GMT",
                     "Jan 1 00:00:00 2009 GMT"):
        self.cert_time_ok(spelling, newyear_ts)

    seconds_field_cases = (
        ("Jan 5 09:34:59 2018 GMT", 1515144899),
        # allow 60th second (even if it is not a leap second)
        ("Jan 5 09:34:60 2018 GMT", 1515144900),
        # allow 2nd leap second for compatibility with time.strptime()
        ("Jan 5 09:34:61 2018 GMT", 1515144901),
    )
    for spelling, expected_seconds in seconds_field_cases:
        self.cert_time_ok(spelling, expected_seconds)
    self.cert_time_fail("Jan 5 09:34:62 2018 GMT")  # invalid seconds

    # no special treatment for the special value:
    #   99991231235959Z (rfc 5280)
    self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
719
@support.run_with_locale('LC_ALL', '')
def test_cert_time_to_seconds_locale(self):
    # `cert_time_to_seconds()` should be locale independent

    # Abbreviated name of month 2 as rendered by strftime under the
    # locale this test runs with.
    february_here = time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))

    if february_here.lower() == 'feb':
        self.skipTest("locale-specific month name needs to be "
                      "different from C locale")

    # locale-independent: the English abbreviation is accepted, the
    # localized one is rejected.
    self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
    self.cert_time_fail(february_here + " 9 00:00:00 2007 GMT")
734
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100735
Antoine Pitrou152efa22010-05-16 18:19:27 +0000736class ContextTests(unittest.TestCase):
737
@skip_if_broken_ubuntu_ssl
def test_constructor(self):
    # A context can be built for every entry in PROTOCOLS; a missing
    # argument raises TypeError and unknown protocol numbers ValueError.
    for known_protocol in PROTOCOLS:
        ssl.SSLContext(known_protocol)
    with self.assertRaises(TypeError):
        ssl.SSLContext()
    for bogus_protocol in (-1, 42):
        with self.assertRaises(ValueError):
            ssl.SSLContext(bogus_protocol)
745
@skip_if_broken_ubuntu_ssl
def test_protocol(self):
    # The `protocol` attribute reports the value the context was built with.
    for requested in PROTOCOLS:
        context = ssl.SSLContext(requested)
        self.assertEqual(context.protocol, requested)
751
def test_ciphers(self):
    # Valid cipher strings are accepted; a string matching no cipher
    # raises SSLError.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    for cipher_string in ("ALL", "DEFAULT"):
        context.set_ciphers(cipher_string)
    self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected",
                           context.set_ciphers, "^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000758
@skip_if_broken_ubuntu_ssl
def test_options(self):
    # Reading and updating SSLContext.options.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    # OP_ALL | OP_NO_SSLv2 is the default value
    default_options = ssl.OP_ALL | ssl.OP_NO_SSLv2
    self.assertEqual(default_options, context.options)
    context.options |= ssl.OP_NO_SSLv3
    self.assertEqual(default_options | ssl.OP_NO_SSLv3, context.options)

    if not can_clear_options():
        # Option bits cannot be cleared here: resetting must fail.
        with self.assertRaises(ValueError):
            context.options = 0
        return

    # Swap OP_NO_SSLv2 for OP_NO_TLSv1, then clear everything.
    context.options = (context.options & ~ssl.OP_NO_SSLv2) | ssl.OP_NO_TLSv1
    self.assertEqual(ssl.OP_ALL | ssl.OP_NO_TLSv1 | ssl.OP_NO_SSLv3,
                     context.options)
    context.options = 0
    self.assertEqual(0, context.options)
777
def test_verify_mode(self):
    # verify_mode defaults to CERT_NONE, round-trips each valid mode, and
    # rejects values of the wrong type or outside the known set.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    # Default value
    self.assertEqual(context.verify_mode, ssl.CERT_NONE)
    for mode in (ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED, ssl.CERT_NONE):
        context.verify_mode = mode
        self.assertEqual(context.verify_mode, mode)
    self.assertRaises(TypeError, setattr, context, 'verify_mode', None)
    self.assertRaises(ValueError, setattr, context, 'verify_mode', 42)
792
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_verify_flags(self):
    # verify_flags starts at VERIFY_DEFAULT, round-trips single flags and
    # combinations, and rejects None.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    # default value by OpenSSL
    self.assertEqual(context.verify_flags, ssl.VERIFY_DEFAULT)
    round_trip_values = (
        ssl.VERIFY_CRL_CHECK_LEAF,
        ssl.VERIFY_CRL_CHECK_CHAIN,
        ssl.VERIFY_DEFAULT,
        # supports any value
        ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT,
    )
    for flags in round_trip_values:
        context.verify_flags = flags
        self.assertEqual(context.verify_flags, flags)
    self.assertRaises(TypeError, setattr, context, 'verify_flags', None)
811
def test_load_cert_chain(self):
    # Exercise SSLContext.load_cert_chain() with combined and separate
    # key/cert files, mismatching pairs, and every supported form of the
    # `password` argument (value or callable).
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    # Combined key and cert in a single file
    context.load_cert_chain(CERTFILE, keyfile=None)
    context.load_cert_chain(CERTFILE, keyfile=CERTFILE)
    with self.assertRaises(TypeError):
        context.load_cert_chain(keyfile=CERTFILE)
    with self.assertRaises(OSError) as caught:
        context.load_cert_chain(WRONGCERT)
    self.assertEqual(caught.exception.errno, errno.ENOENT)
    for unusable_file in (BADCERT, EMPTYCERT):
        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
            context.load_cert_chain(unusable_file)

    # Separate key and cert
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.load_cert_chain(ONLYCERT, ONLYKEY)
    context.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
    context.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
    # A lone cert, a lone key, or the two swapped are all rejected.
    for lone_file in (ONLYCERT, ONLYKEY):
        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
            context.load_cert_chain(lone_file)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        context.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)

    # Mismatching key and cert
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
        context.load_cert_chain(SVN_PYTHON_ORG_ROOT_CERT, ONLYKEY)

    # Password protected key and cert

    def password_variants():
        # The same password as str, bytes and a freshly built bytearray.
        return (KEY_PASSWORD,
                KEY_PASSWORD.encode(),
                bytearray(KEY_PASSWORD.encode()))

    for secret in password_variants():
        context.load_cert_chain(CERTFILE_PROTECTED, password=secret)
    for secret in password_variants():
        context.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, secret)
    with self.assertRaisesRegex(TypeError, "should be a string"):
        context.load_cert_chain(CERTFILE_PROTECTED, password=True)
    with self.assertRaises(ssl.SSLError):
        context.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        # openssl has a fixed limit on the password buffer.
        # PEM_BUFSIZE is generally set to 1kb.
        # Return a string larger than this.
        context.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)

    # Password callback
    def supply_text():
        return KEY_PASSWORD

    def supply_bytes():
        return KEY_PASSWORD.encode()

    def supply_bytearray():
        return bytearray(KEY_PASSWORD.encode())

    def supply_wrong():
        return "badpass"

    def supply_oversized():
        return b'a' * (1024 * 1024)

    def supply_wrong_type():
        return 9

    def supply_failure():
        raise Exception('getpass error')

    class PasswordProvider:
        # Usable both as a callable instance and through a bound method.
        def __call__(self):
            return KEY_PASSWORD

        def getpass(self):
            return KEY_PASSWORD

    working_callbacks = (
        supply_text,
        supply_bytes,
        supply_bytearray,
        PasswordProvider(),
        PasswordProvider().getpass,
    )
    for callback in working_callbacks:
        context.load_cert_chain(CERTFILE_PROTECTED, password=callback)

    with self.assertRaises(ssl.SSLError):
        context.load_cert_chain(CERTFILE_PROTECTED, password=supply_wrong)
    with self.assertRaisesRegex(ValueError, "cannot be longer"):
        context.load_cert_chain(CERTFILE_PROTECTED,
                                password=supply_oversized)
    with self.assertRaisesRegex(TypeError, "must return a string"):
        context.load_cert_chain(CERTFILE_PROTECTED,
                                password=supply_wrong_type)
    with self.assertRaisesRegex(Exception, "getpass error"):
        context.load_cert_chain(CERTFILE_PROTECTED, password=supply_failure)
    # Make sure the password function isn't called if it isn't needed
    context.load_cert_chain(CERTFILE, password=supply_failure)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000894
def test_load_verify_locations(self):
    # load_verify_locations() with str/bytes paths for cafile and capath,
    # plus its argument validation.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    for ca_file in (CERTFILE, BYTES_CERTFILE):
        context.load_verify_locations(ca_file)
        context.load_verify_locations(cafile=ca_file, capath=None)
    with self.assertRaises(TypeError):
        context.load_verify_locations()
    with self.assertRaises(TypeError):
        context.load_verify_locations(None, None, None)
    with self.assertRaises(OSError) as caught:
        context.load_verify_locations(WRONGCERT)
    self.assertEqual(caught.exception.errno, errno.ENOENT)
    with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
        context.load_verify_locations(BADCERT)
    context.load_verify_locations(CERTFILE, CAPATH)
    context.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)

    # Issue #10989: crash if the second argument type is invalid
    with self.assertRaises(TypeError):
        context.load_verify_locations(None, True)
913
def test_load_verify_cadata(self):
    # load_verify_locations(cadata=...) with PEM text and DER bytes,
    # single and concatenated, checking the CA count in the cert store.

    def read_pem_and_der(path):
        # Return the PEM text of *path* and its DER conversion.
        with open(path) as pem_file:
            pem_text = pem_file.read()
        return pem_text, ssl.PEM_cert_to_DER_cert(pem_text)

    def assert_ca_count(context, expected):
        self.assertEqual(context.cert_store_stats()["x509_ca"], expected)

    # test cadata
    cacert_pem, cacert_der = read_pem_and_der(CAFILE_CACERT)
    neuronio_pem, neuronio_der = read_pem_and_der(CAFILE_NEURONIO)

    # test PEM
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    assert_ca_count(context, 0)
    context.load_verify_locations(cadata=cacert_pem)
    assert_ca_count(context, 1)
    context.load_verify_locations(cadata=neuronio_pem)
    assert_ca_count(context, 2)
    # cert already in hash table
    context.load_verify_locations(cadata=neuronio_pem)
    assert_ca_count(context, 2)

    # combined
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    both_pem = "\n".join((cacert_pem, neuronio_pem))
    context.load_verify_locations(cadata=both_pem)
    assert_ca_count(context, 2)

    # with junk around the certs
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    noisy_parts = ["head", cacert_pem, "other", neuronio_pem, "again",
                   neuronio_pem, "tail"]
    context.load_verify_locations(cadata="\n".join(noisy_parts))
    assert_ca_count(context, 2)

    # test DER
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.load_verify_locations(cadata=cacert_der)
    context.load_verify_locations(cadata=neuronio_der)
    assert_ca_count(context, 2)
    # cert already in hash table
    context.load_verify_locations(cadata=cacert_der)
    assert_ca_count(context, 2)

    # combined
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    both_der = b"".join((cacert_der, neuronio_der))
    context.load_verify_locations(cadata=both_der)
    assert_ca_count(context, 2)

    # error cases
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    with self.assertRaises(TypeError):
        context.load_verify_locations(cadata=object)

    with self.assertRaisesRegex(ssl.SSLError, "no start line"):
        context.load_verify_locations(cadata="broken")
    with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
        context.load_verify_locations(cadata=b"broken")
970
971
def test_load_dh_params(self):
    # load_dh_params() accepts a DH parameter file and reports bad
    # arguments, missing files and non-DH content.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.load_dh_params(DHFILE)
    # The bytes path variant is not exercised on Windows.
    if os.name != 'nt':
        context.load_dh_params(BYTES_DHFILE)
    with self.assertRaises(TypeError):
        context.load_dh_params()
    with self.assertRaises(TypeError):
        context.load_dh_params(None)
    with self.assertRaises(FileNotFoundError) as caught:
        context.load_dh_params(WRONGCERT)
    self.assertEqual(caught.exception.errno, errno.ENOENT)
    # A certificate file holds no DH parameters.
    with self.assertRaises(ssl.SSLError):
        context.load_dh_params(CERTFILE)
984
@skip_if_broken_ubuntu_ssl
def test_session_stats(self):
    # A freshly created context reports zero for every session counter.
    counter_names = (
        'number',
        'connect',
        'connect_good',
        'connect_renegotiate',
        'accept',
        'accept_good',
        'accept_renegotiate',
        'hits',
        'misses',
        'timeouts',
        'cache_full',
    )
    all_zero = dict.fromkeys(counter_names, 0)
    for protocol in PROTOCOLS:
        context = ssl.SSLContext(protocol)
        self.assertEqual(context.session_stats(), all_zero)
1002
def test_set_default_verify_paths(self):
    # There's not much we can do to test that it acts as expected,
    # so just check it doesn't crash or raise an exception.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.set_default_verify_paths()
1008
@unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
def test_set_ecdh_curve(self):
    # set_ecdh_curve() takes a curve name as str or bytes; a missing or
    # None argument is a TypeError, an unknown name a ValueError.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    for curve_name in ("prime256v1", b"prime256v1"):
        context.set_ecdh_curve(curve_name)
    with self.assertRaises(TypeError):
        context.set_ecdh_curve()
    with self.assertRaises(TypeError):
        context.set_ecdh_curve(None)
    for unknown_name in ("foo", b"foo"):
        with self.assertRaises(ValueError):
            context.set_ecdh_curve(unknown_name)
1018
@needs_sni
def test_sni_callback(self):
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)

    # set_servername_callback expects a callable, or None
    with self.assertRaises(TypeError):
        context.set_servername_callback()
    for not_callable in (4, "", context):
        with self.assertRaises(TypeError):
            context.set_servername_callback(not_callable)

    def noop_callback(sock, servername, ctx):
        pass

    for accepted in (None, noop_callback):
        context.set_servername_callback(accepted)
1033
@needs_sni
def test_sni_callback_refcycle(self):
    # Reference cycles through the servername callback are detected
    # and cleared.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)

    # The default argument makes the callback reference the context,
    # which in turn references the callback once it is installed.
    def cyclic_callback(sock, servername, ctx, cycle=context):
        pass

    context.set_servername_callback(cyclic_callback)
    context_ref = weakref.ref(context)
    # Drop every local strong reference, then let the collector run.
    del context, cyclic_callback
    gc.collect()
    self.assertIs(context_ref(), None)
1046
def test_cert_store_stats(self):
    # cert_store_stats() counts only certificates loaded as verify
    # locations; load_cert_chain() leaves the counters untouched.

    def assert_stats(context, x509, x509_ca):
        self.assertEqual(context.cert_store_stats(),
                         {'x509_ca': x509_ca, 'crl': 0, 'x509': x509})

    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    assert_stats(context, x509=0, x509_ca=0)
    context.load_cert_chain(CERTFILE)
    assert_stats(context, x509=0, x509_ca=0)
    context.load_verify_locations(CERTFILE)
    assert_stats(context, x509=1, x509_ca=0)
    context.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
    assert_stats(context, x509=2, x509_ca=1)
1060
def test_get_ca_certs(self):
    # get_ca_certs() lists only CA certificates, as decoded dicts by
    # default or as DER bytes when called with True.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    self.assertEqual(context.get_ca_certs(), [])
    # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
    context.load_verify_locations(CERTFILE)
    self.assertEqual(context.get_ca_certs(), [])
    # but SVN_PYTHON_ORG_ROOT_CERT is a CA cert
    context.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)

    # Issuer and subject of the expected entry are the same name.
    root_ca_name = (
        (('organizationName', 'Root CA'),),
        (('organizationalUnitName', 'http://www.cacert.org'),),
        (('commonName', 'CA Cert Signing Authority'),),
        (('emailAddress', 'support@cacert.org'),),
    )
    expected_entry = {
        'issuer': root_ca_name,
        'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
        'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
        'serialNumber': '00',
        'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
        'subject': root_ca_name,
        'version': 3,
    }
    self.assertEqual(context.get_ca_certs(), [expected_entry])

    with open(SVN_PYTHON_ORG_ROOT_CERT) as pem_file:
        pem_text = pem_file.read()
    expected_der = ssl.PEM_cert_to_DER_cert(pem_text)
    self.assertEqual(context.get_ca_certs(True), [expected_der])
1088
def test_load_default_certs(self):
    # load_default_certs() works with no argument or an ssl.Purpose
    # member and rejects anything else.

    def fresh_context():
        return ssl.SSLContext(ssl.PROTOCOL_TLSv1)

    fresh_context().load_default_certs()

    # Explicit purpose followed by a second, default call on the same
    # context.
    context = fresh_context()
    context.load_default_certs(ssl.Purpose.SERVER_AUTH)
    context.load_default_certs()

    fresh_context().load_default_certs(ssl.Purpose.CLIENT_AUTH)

    context = fresh_context()
    for invalid_purpose in (None, 'SERVER_AUTH'):
        with self.assertRaises(TypeError):
            context.load_default_certs(invalid_purpose)
1103
@unittest.skipIf(sys.platform == "win32", "not-Windows specific")
def test_load_default_certs_env(self):
    # With SSL_CERT_DIR / SSL_CERT_FILE pointing at the test data,
    # load_default_certs() loads exactly one (non-CA) certificate.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    overrides = (("SSL_CERT_DIR", CAPATH), ("SSL_CERT_FILE", CERTFILE))
    with support.EnvironmentVarGuard() as env:
        for variable, value in overrides:
            env[variable] = value
        context.load_default_certs()
        expected_stats = {"crl": 0, "x509": 1, "x509_ca": 0}
        self.assertEqual(context.cert_store_stats(), expected_stats)
1112
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
def test_load_default_certs_env_windows(self):
    # Take the store statistics after a plain load_default_certs() as
    # baseline; with the environment overrides in place a second context
    # must report exactly one more x509 entry.
    baseline_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    baseline_context.load_default_certs()
    expected_stats = baseline_context.cert_store_stats()

    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    overrides = (("SSL_CERT_DIR", CAPATH), ("SSL_CERT_FILE", CERTFILE))
    with support.EnvironmentVarGuard() as env:
        for variable, value in overrides:
            env[variable] = value
        context.load_default_certs()
        expected_stats["x509"] += 1
        self.assertEqual(context.cert_store_stats(), expected_stats)
1126
def test_create_default_context(self):
    # Check protocol, verification mode and option bits of the contexts
    # returned by ssl.create_default_context() for its default purpose,
    # for explicit CA material, and for Purpose.CLIENT_AUTH.

    def assert_optional_flags_set(context, *flag_names):
        # A flag missing from this ssl module is looked up as 0, which
        # makes the corresponding comparison trivially true.
        for flag_name in flag_names:
            flag = getattr(ssl, flag_name, 0)
            self.assertEqual(context.options & flag, flag)

    context = ssl.create_default_context()
    self.assertEqual(context.protocol, ssl.PROTOCOL_SSLv23)
    self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
    self.assertTrue(context.check_hostname)
    self.assertEqual(context.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
    assert_optional_flags_set(context, "OP_NO_COMPRESSION")

    with open(SIGNING_CA) as ca_file:
        cadata = ca_file.read()
    context = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
                                         cadata=cadata)
    self.assertEqual(context.protocol, ssl.PROTOCOL_SSLv23)
    self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
    self.assertEqual(context.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
    assert_optional_flags_set(context, "OP_NO_COMPRESSION")

    context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    self.assertEqual(context.protocol, ssl.PROTOCOL_SSLv23)
    self.assertEqual(context.verify_mode, ssl.CERT_NONE)
    self.assertEqual(context.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
    assert_optional_flags_set(context,
                              "OP_NO_COMPRESSION",
                              "OP_SINGLE_DH_USE",
                              "OP_SINGLE_ECDH_USE")
Christian Heimes4c05b472013-11-23 15:58:30 +01001166
def test__create_stdlib_context(self):
    # ssl._create_stdlib_context(): defaults, explicit protocol,
    # explicit verification settings, and an explicit purpose.

    def assert_basics(context, protocol, verify_mode):
        self.assertEqual(context.protocol, protocol)
        self.assertEqual(context.verify_mode, verify_mode)

    def assert_sslv2_disabled(context):
        self.assertEqual(context.options & ssl.OP_NO_SSLv2,
                         ssl.OP_NO_SSLv2)

    context = ssl._create_stdlib_context()
    assert_basics(context, ssl.PROTOCOL_SSLv23, ssl.CERT_NONE)
    self.assertFalse(context.check_hostname)
    assert_sslv2_disabled(context)

    context = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
    assert_basics(context, ssl.PROTOCOL_TLSv1, ssl.CERT_NONE)
    assert_sslv2_disabled(context)

    context = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
                                         cert_reqs=ssl.CERT_REQUIRED,
                                         check_hostname=True)
    assert_basics(context, ssl.PROTOCOL_TLSv1, ssl.CERT_REQUIRED)
    self.assertTrue(context.check_hostname)
    assert_sslv2_disabled(context)

    context = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
    assert_basics(context, ssl.PROTOCOL_SSLv23, ssl.CERT_NONE)
    assert_sslv2_disabled(context)
Christian Heimes4c05b472013-11-23 15:58:30 +01001191
def test_check_hostname(self):
    # check_hostname is off by default and is coupled to verify_mode in
    # both directions.
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    self.assertFalse(context.check_hostname)

    # Requires CERT_REQUIRED or CERT_OPTIONAL
    self.assertRaises(ValueError, setattr, context, 'check_hostname', True)
    context.verify_mode = ssl.CERT_REQUIRED
    # Raising the verify mode alone does not switch hostname checks on.
    self.assertFalse(context.check_hostname)
    context.check_hostname = True
    self.assertTrue(context.check_hostname)

    context.verify_mode = ssl.CERT_OPTIONAL
    context.check_hostname = True
    self.assertTrue(context.check_hostname)

    # Cannot set CERT_NONE with check_hostname enabled
    self.assertRaises(ValueError,
                      setattr, context, 'verify_mode', ssl.CERT_NONE)
    context.check_hostname = False
    self.assertFalse(context.check_hostname)
1213
Antoine Pitrou152efa22010-05-16 18:19:27 +00001214
class SSLErrorTests(unittest.TestCase):
    # Tests for ssl.SSLError and its subclasses: string form, the
    # library/reason attributes, and subclass selection.

    def test_str(self):
        # The str() of a SSLError doesn't include the errno;
        # same for a subclass.
        for error_type in (ssl.SSLError, ssl.SSLZeroReturnError):
            e = error_type(1, "foo")
            self.assertEqual(str(e), "foo")
            self.assertEqual(e.errno, 1)

    def test_lib_reason(self):
        # Test the library and reason attributes
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        with self.assertRaises(ssl.SSLError) as cm:
            ctx.load_dh_params(CERTFILE)
        self.assertEqual(cm.exception.library, 'PEM')
        self.assertEqual(cm.exception.reason, 'NO_START_LINE')
        message = str(cm.exception)
        self.assertTrue(
            message.startswith("[PEM: NO_START_LINE] no start line"),
            message)

    def test_subclass(self):
        # Check that the appropriate SSLError subclass is raised
        # (this only tests one of them)
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # Both plain sockets are managed by the with statement, so the
        # client socket is closed even if connect() or wrap_socket()
        # raises before the wrapped socket's own with block takes over.
        with socket.socket() as listener, socket.socket() as raw_client:
            listener.bind(("127.0.0.1", 0))
            listener.listen()
            raw_client.connect(listener.getsockname())
            raw_client.setblocking(False)
            with ctx.wrap_socket(raw_client, False,
                                 do_handshake_on_connect=False) as client:
                # Nothing accepts the connection, so the non-blocking
                # handshake cannot complete.
                with self.assertRaises(ssl.SSLWantReadError) as cm:
                    client.do_handshake()
                message = str(cm.exception)
                self.assertTrue(
                    message.startswith(
                        "The operation did not complete (read)"),
                    message)
                # For compatibility
                self.assertEqual(cm.exception.errno,
                                 ssl.SSL_ERROR_WANT_READ)
1254
1255
class MemoryBIOTests(unittest.TestCase):
    # Tests for ssl.MemoryBIO: read/write, EOF signalling, the pending
    # byte count, and accepted/rejected argument types.

    def test_read_write(self):
        buf = ssl.MemoryBIO()
        # One write is read back whole; a further read gives b''.
        buf.write(b'foo')
        self.assertEqual(buf.read(), b'foo')
        self.assertEqual(buf.read(), b'')
        # Consecutive writes are read back concatenated.
        for chunk in (b'foo', b'bar'):
            buf.write(chunk)
        self.assertEqual(buf.read(), b'foobar')
        self.assertEqual(buf.read(), b'')
        # Sized reads return at most the requested number of bytes.
        buf.write(b'baz')
        for size, expected in ((2, b'ba'), (1, b'z'), (1, b'')):
            self.assertEqual(buf.read(size), expected)

    def test_eof(self):
        buf = ssl.MemoryBIO()
        self.assertFalse(buf.eof)
        # Reading an empty BIO does not set eof.
        drained = buf.read()
        self.assertEqual(drained, b'')
        self.assertFalse(buf.eof)
        buf.write(b'foo')
        self.assertFalse(buf.eof)
        # After write_eof(), eof stays false while data is still buffered.
        buf.write_eof()
        self.assertFalse(buf.eof)
        first = buf.read(2)
        self.assertEqual(first, b'fo')
        self.assertFalse(buf.eof)
        # Once the last byte has been read, eof turns (and stays) true.
        last = buf.read(1)
        self.assertEqual(last, b'o')
        self.assertTrue(buf.eof)
        self.assertEqual(buf.read(), b'')
        self.assertTrue(buf.eof)

    def test_pending(self):
        buf = ssl.MemoryBIO()
        self.assertEqual(buf.pending, 0)
        buf.write(b'foo')
        self.assertEqual(buf.pending, 3)
        # Each one-byte read lowers the pending count by one ...
        for remaining in (2, 1, 0):
            buf.read(1)
            self.assertEqual(buf.pending, remaining)
        # ... and each one-byte write raises it by one.
        for buffered in (1, 2, 3):
            buf.write(b'x')
            self.assertEqual(buf.pending, buffered)
        buf.read()
        self.assertEqual(buf.pending, 0)

    def test_buffer_types(self):
        buf = ssl.MemoryBIO()
        payloads = (
            (b'foo', b'foo'),
            (bytearray(b'bar'), b'bar'),
            (memoryview(b'baz'), b'baz'),
        )
        for payload, expected in payloads:
            buf.write(payload)
            self.assertEqual(buf.read(), expected)

    def test_error_types(self):
        buf = ssl.MemoryBIO()
        for not_a_buffer in ('foo', None, True, 1):
            with self.assertRaises(TypeError):
                buf.write(not_a_buffer)
1317
1318
Bill Janssen6e027db2007-11-15 22:23:56 +00001319class NetworkedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001320
def test_connect(self):
    # Connect to svn.python.org:443 under three verification setups.
    # Every wrapped socket is managed by a with statement so that it is
    # closed even when an assertion in its section fails.
    with support.transient_internet("svn.python.org"):
        # No verification requested: getpeercert() gives an empty dict.
        with ssl.wrap_socket(socket.socket(socket.AF_INET),
                             cert_reqs=ssl.CERT_NONE) as s:
            s.connect(("svn.python.org", 443))
            self.assertEqual({}, s.getpeercert())

        # this should fail because we have no verification certs
        with ssl.wrap_socket(socket.socket(socket.AF_INET),
                             cert_reqs=ssl.CERT_REQUIRED) as s:
            self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
                                   s.connect, ("svn.python.org", 443))

        # this should succeed because we specify the root cert
        with ssl.wrap_socket(socket.socket(socket.AF_INET),
                             cert_reqs=ssl.CERT_REQUIRED,
                             ca_certs=SVN_PYTHON_ORG_ROOT_CERT) as s:
            s.connect(("svn.python.org", 443))
            self.assertTrue(s.getpeercert())
Antoine Pitrou152efa22010-05-16 18:19:27 +00001347
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001348 def test_connect_ex(self):
1349 # Issue #11326: check connect_ex() implementation
1350 with support.transient_internet("svn.python.org"):
1351 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1352 cert_reqs=ssl.CERT_REQUIRED,
1353 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
1354 try:
1355 self.assertEqual(0, s.connect_ex(("svn.python.org", 443)))
1356 self.assertTrue(s.getpeercert())
1357 finally:
1358 s.close()
1359
1360 def test_non_blocking_connect_ex(self):
1361 # Issue #11326: non-blocking connect_ex() should allow handshake
1362 # to proceed after the socket gets ready.
1363 with support.transient_internet("svn.python.org"):
1364 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1365 cert_reqs=ssl.CERT_REQUIRED,
1366 ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
1367 do_handshake_on_connect=False)
1368 try:
1369 s.setblocking(False)
1370 rc = s.connect_ex(('svn.python.org', 443))
Antoine Pitrou8a14a0c2011-02-27 15:44:12 +00001371 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1372 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001373 # Wait for connect to finish
1374 select.select([], [s], [], 5.0)
1375 # Non-blocking handshake
1376 while True:
1377 try:
1378 s.do_handshake()
1379 break
Antoine Pitrou41032a62011-10-27 23:56:55 +02001380 except ssl.SSLWantReadError:
1381 select.select([s], [], [], 5.0)
1382 except ssl.SSLWantWriteError:
1383 select.select([], [s], [], 5.0)
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001384 # SSL established
1385 self.assertTrue(s.getpeercert())
1386 finally:
1387 s.close()
1388
Antoine Pitroub4410db2011-05-18 18:51:06 +02001389 def test_timeout_connect_ex(self):
1390 # Issue #12065: on a timeout, connect_ex() should return the original
1391 # errno (mimicking the behaviour of non-SSL sockets).
1392 with support.transient_internet("svn.python.org"):
1393 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1394 cert_reqs=ssl.CERT_REQUIRED,
1395 ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
1396 do_handshake_on_connect=False)
1397 try:
1398 s.settimeout(0.0000001)
1399 rc = s.connect_ex(('svn.python.org', 443))
1400 if rc == 0:
1401 self.skipTest("svn.python.org responded too quickly")
1402 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
1403 finally:
1404 s.close()
1405
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001406 def test_connect_ex_error(self):
Antoine Pitrouddb87ab2012-12-28 19:07:43 +01001407 with support.transient_internet("svn.python.org"):
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001408 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1409 cert_reqs=ssl.CERT_REQUIRED,
1410 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
1411 try:
Christian Heimesde570742013-12-16 21:15:44 +01001412 rc = s.connect_ex(("svn.python.org", 444))
1413 # Issue #19919: Windows machines or VMs hosted on Windows
1414 # machines sometimes return EWOULDBLOCK.
1415 self.assertIn(rc, (errno.ECONNREFUSED, errno.EWOULDBLOCK))
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001416 finally:
1417 s.close()
1418
Antoine Pitrou152efa22010-05-16 18:19:27 +00001419 def test_connect_with_context(self):
Antoine Pitrou350c7222010-09-09 13:31:46 +00001420 with support.transient_internet("svn.python.org"):
1421 # Same as test_connect, but with a separately created context
1422 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1423 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1424 s.connect(("svn.python.org", 443))
1425 try:
1426 self.assertEqual({}, s.getpeercert())
1427 finally:
1428 s.close()
Antoine Pitroud5323212010-10-22 18:19:07 +00001429 # Same with a server hostname
1430 s = ctx.wrap_socket(socket.socket(socket.AF_INET),
1431 server_hostname="svn.python.org")
Benjamin Peterson7243b572014-11-23 17:04:34 -06001432 s.connect(("svn.python.org", 443))
1433 s.close()
Antoine Pitrou350c7222010-09-09 13:31:46 +00001434 # This should fail because we have no verification certs
1435 ctx.verify_mode = ssl.CERT_REQUIRED
1436 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
Ezio Melottied3a7d22010-12-01 02:32:32 +00001437 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
Antoine Pitrou350c7222010-09-09 13:31:46 +00001438 s.connect, ("svn.python.org", 443))
Antoine Pitrou152efa22010-05-16 18:19:27 +00001439 s.close()
Antoine Pitrou350c7222010-09-09 13:31:46 +00001440 # This should succeed because we specify the root cert
1441 ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
1442 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1443 s.connect(("svn.python.org", 443))
1444 try:
1445 cert = s.getpeercert()
1446 self.assertTrue(cert)
1447 finally:
1448 s.close()
Antoine Pitrou152efa22010-05-16 18:19:27 +00001449
1450 def test_connect_capath(self):
1451 # Verify server certificates using the `capath` argument
Antoine Pitrou467f28d2010-05-16 19:22:44 +00001452 # NOTE: the subject hashing algorithm has been changed between
1453 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
1454 # contain both versions of each certificate (same content, different
Antoine Pitroud7e4c1c2010-05-17 14:13:10 +00001455 # filename) for this test to be portable across OpenSSL releases.
Antoine Pitrou350c7222010-09-09 13:31:46 +00001456 with support.transient_internet("svn.python.org"):
1457 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1458 ctx.verify_mode = ssl.CERT_REQUIRED
1459 ctx.load_verify_locations(capath=CAPATH)
1460 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1461 s.connect(("svn.python.org", 443))
1462 try:
1463 cert = s.getpeercert()
1464 self.assertTrue(cert)
1465 finally:
1466 s.close()
1467 # Same with a bytes `capath` argument
1468 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1469 ctx.verify_mode = ssl.CERT_REQUIRED
1470 ctx.load_verify_locations(capath=BYTES_CAPATH)
1471 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1472 s.connect(("svn.python.org", 443))
1473 try:
1474 cert = s.getpeercert()
1475 self.assertTrue(cert)
1476 finally:
1477 s.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001478
Christian Heimesefff7062013-11-21 03:35:02 +01001479 def test_connect_cadata(self):
1480 with open(CAFILE_CACERT) as f:
1481 pem = f.read()
1482 der = ssl.PEM_cert_to_DER_cert(pem)
1483 with support.transient_internet("svn.python.org"):
1484 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1485 ctx.verify_mode = ssl.CERT_REQUIRED
1486 ctx.load_verify_locations(cadata=pem)
1487 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1488 s.connect(("svn.python.org", 443))
1489 cert = s.getpeercert()
1490 self.assertTrue(cert)
1491
1492 # same with DER
1493 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1494 ctx.verify_mode = ssl.CERT_REQUIRED
1495 ctx.load_verify_locations(cadata=der)
1496 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1497 s.connect(("svn.python.org", 443))
1498 cert = s.getpeercert()
1499 self.assertTrue(cert)
1500
Antoine Pitroue3220242010-04-24 11:13:53 +00001501 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
1502 def test_makefile_close(self):
1503 # Issue #5238: creating a file-like object with makefile() shouldn't
1504 # delay closing the underlying "real socket" (here tested with its
1505 # file descriptor, hence skipping the test under Windows).
Antoine Pitrou350c7222010-09-09 13:31:46 +00001506 with support.transient_internet("svn.python.org"):
1507 ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
1508 ss.connect(("svn.python.org", 443))
1509 fd = ss.fileno()
1510 f = ss.makefile()
1511 f.close()
1512 # The fd is still open
Antoine Pitroue3220242010-04-24 11:13:53 +00001513 os.read(fd, 0)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001514 # Closing the SSL socket should close the fd too
1515 ss.close()
1516 gc.collect()
1517 with self.assertRaises(OSError) as e:
1518 os.read(fd, 0)
1519 self.assertEqual(e.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001520
Antoine Pitrou480a1242010-04-28 21:37:09 +00001521 def test_non_blocking_handshake(self):
Antoine Pitrou350c7222010-09-09 13:31:46 +00001522 with support.transient_internet("svn.python.org"):
1523 s = socket.socket(socket.AF_INET)
1524 s.connect(("svn.python.org", 443))
1525 s.setblocking(False)
1526 s = ssl.wrap_socket(s,
1527 cert_reqs=ssl.CERT_NONE,
1528 do_handshake_on_connect=False)
1529 count = 0
1530 while True:
1531 try:
1532 count += 1
1533 s.do_handshake()
1534 break
Antoine Pitrou41032a62011-10-27 23:56:55 +02001535 except ssl.SSLWantReadError:
1536 select.select([s], [], [])
1537 except ssl.SSLWantWriteError:
1538 select.select([], [s], [])
Antoine Pitrou350c7222010-09-09 13:31:46 +00001539 s.close()
1540 if support.verbose:
1541 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001542
Antoine Pitrou480a1242010-04-28 21:37:09 +00001543 def test_get_server_certificate(self):
Antoine Pitrou15399c32011-04-28 19:23:55 +02001544 def _test_get_server_certificate(host, port, cert=None):
1545 with support.transient_internet(host):
Antoine Pitrou94a5b662014-04-16 18:56:28 +02001546 pem = ssl.get_server_certificate((host, port))
Antoine Pitrou15399c32011-04-28 19:23:55 +02001547 if not pem:
1548 self.fail("No server certificate on %s:%s!" % (host, port))
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001549
Antoine Pitrou15399c32011-04-28 19:23:55 +02001550 try:
Benjamin Peterson10b93cc2014-03-12 18:10:57 -05001551 pem = ssl.get_server_certificate((host, port),
Benjamin Peterson10b93cc2014-03-12 18:10:57 -05001552 ca_certs=CERTFILE)
Antoine Pitrou15399c32011-04-28 19:23:55 +02001553 except ssl.SSLError as x:
1554 #should fail
1555 if support.verbose:
1556 sys.stdout.write("%s\n" % x)
1557 else:
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001558 self.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
1559
Benjamin Peterson10b93cc2014-03-12 18:10:57 -05001560 pem = ssl.get_server_certificate((host, port),
Benjamin Peterson10b93cc2014-03-12 18:10:57 -05001561 ca_certs=cert)
Antoine Pitrou15399c32011-04-28 19:23:55 +02001562 if not pem:
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001563 self.fail("No server certificate on %s:%s!" % (host, port))
Antoine Pitrou350c7222010-09-09 13:31:46 +00001564 if support.verbose:
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001565 sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
Antoine Pitrou350c7222010-09-09 13:31:46 +00001566
Antoine Pitrou15399c32011-04-28 19:23:55 +02001567 _test_get_server_certificate('svn.python.org', 443, SVN_PYTHON_ORG_ROOT_CERT)
1568 if support.IPV6_ENABLED:
1569 _test_get_server_certificate('ipv6.google.com', 443)
Bill Janssen54cc54c2007-12-14 22:08:56 +00001570
Antoine Pitrouf4c7bad2010-08-15 23:02:22 +00001571 def test_ciphers(self):
1572 remote = ("svn.python.org", 443)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001573 with support.transient_internet(remote[0]):
Antoine Pitroue1ceb502013-01-12 21:54:44 +01001574 with ssl.wrap_socket(socket.socket(socket.AF_INET),
1575 cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
1576 s.connect(remote)
1577 with ssl.wrap_socket(socket.socket(socket.AF_INET),
1578 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
1579 s.connect(remote)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001580 # Error checking can happen at instantiation or when connecting
Ezio Melottied3a7d22010-12-01 02:32:32 +00001581 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitroud2eca372010-10-29 23:41:37 +00001582 with socket.socket(socket.AF_INET) as sock:
1583 s = ssl.wrap_socket(sock,
1584 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
1585 s.connect(remote)
Antoine Pitrouf4c7bad2010-08-15 23:02:22 +00001586
Antoine Pitroufec12ff2010-04-21 19:46:23 +00001587 def test_algorithms(self):
1588 # Issue #8484: all algorithms should be available when verifying a
1589 # certificate.
Antoine Pitrou29619b22010-04-22 18:43:31 +00001590 # SHA256 was added in OpenSSL 0.9.8
1591 if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15):
1592 self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION)
Antoine Pitrou16f6f832012-05-04 16:26:02 +02001593 # sha256.tbs-internet.com needs SNI to use the correct certificate
1594 if not ssl.HAS_SNI:
1595 self.skipTest("SNI needed for this test")
Victor Stinnerf332abb2011-01-08 03:16:05 +00001596 # https://sha2.hboeck.de/ was used until 2011-01-08 (no route to host)
1597 remote = ("sha256.tbs-internet.com", 443)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00001598 sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem")
Antoine Pitrou160fd932011-01-08 10:23:29 +00001599 with support.transient_internet("sha256.tbs-internet.com"):
Antoine Pitrou16f6f832012-05-04 16:26:02 +02001600 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1601 ctx.verify_mode = ssl.CERT_REQUIRED
1602 ctx.load_verify_locations(sha256_cert)
1603 s = ctx.wrap_socket(socket.socket(socket.AF_INET),
1604 server_hostname="sha256.tbs-internet.com")
Antoine Pitroufec12ff2010-04-21 19:46:23 +00001605 try:
1606 s.connect(remote)
1607 if support.verbose:
1608 sys.stdout.write("\nCipher with %r is %r\n" %
1609 (remote, s.cipher()))
1610 sys.stdout.write("Certificate is:\n%s\n" %
1611 pprint.pformat(s.getpeercert()))
1612 finally:
1613 s.close()
1614
Christian Heimes9a5395a2013-06-17 15:44:12 +02001615 def test_get_ca_certs_capath(self):
1616 # capath certs are loaded on request
1617 with support.transient_internet("svn.python.org"):
1618 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1619 ctx.verify_mode = ssl.CERT_REQUIRED
1620 ctx.load_verify_locations(capath=CAPATH)
1621 self.assertEqual(ctx.get_ca_certs(), [])
1622 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1623 s.connect(("svn.python.org", 443))
1624 try:
1625 cert = s.getpeercert()
1626 self.assertTrue(cert)
1627 finally:
1628 s.close()
1629 self.assertEqual(len(ctx.get_ca_certs()), 1)
1630
Christian Heimes575596e2013-12-15 21:49:17 +01001631 @needs_sni
Christian Heimes8e7f3942013-12-05 07:41:08 +01001632 def test_context_setget(self):
1633 # Check that the context of a connected socket can be replaced.
1634 with support.transient_internet("svn.python.org"):
1635 ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1636 ctx2 = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1637 s = socket.socket(socket.AF_INET)
1638 with ctx1.wrap_socket(s) as ss:
1639 ss.connect(("svn.python.org", 443))
1640 self.assertIs(ss.context, ctx1)
1641 self.assertIs(ss._sslobj.context, ctx1)
1642 ss.context = ctx2
1643 self.assertIs(ss.context, ctx2)
1644 self.assertIs(ss._sslobj.context, ctx2)
1645
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001646
class NetworkedBIOTests(unittest.TestCase):
    """Tests driving an SSL object over memory BIOs against a real server."""

    def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
        """Run ``func(*args)`` to completion, shuttling data between BIOs and socket.

        *func* is retried for as long as it raises an ``ssl.SSLError`` whose
        ``errno`` is WANT_READ, WANT_WRITE or SYSCALL.  After every attempt
        whatever is in *outgoing* is sent on *sock*; on WANT_READ, data
        received from *sock* is written to *incoming* (or EOF is signalled
        when the peer closed the connection).

        Only the ``timeout`` keyword (seconds, default 10) is consumed from
        *kwargs*; keyword arguments are not forwarded to *func*.  The test
        fails once the deadline has passed at the start of an iteration.
        The deadline cannot interrupt a ``sock.recv()`` call that is
        already blocked.

        Returns the value returned by the successful ``func(*args)`` call.
        Any other ``ssl.SSLError`` is propagated unchanged.
        """
        timeout = kwargs.get('timeout', 10)
        deadline = time.monotonic() + timeout
        count = 0
        while True:
            if time.monotonic() > deadline:
                self.fail("timeout: %s() did not complete within %s seconds"
                          % (func.__name__, timeout))
            # Named `ssl_errno` so that the `errno` module is not shadowed.
            ssl_errno = None
            count += 1
            try:
                ret = func(*args)
            except ssl.SSLError as e:
                # Note that we get a spurious -1/SSL_ERROR_SYSCALL for
                # non-blocking IO. The SSL_shutdown manpage hints at this.
                # It *should* be safe to just ignore SYS_ERROR_SYSCALL because
                # with a Memory BIO there's no syscalls (for IO at least).
                if e.errno not in (ssl.SSL_ERROR_WANT_READ,
                                   ssl.SSL_ERROR_WANT_WRITE,
                                   ssl.SSL_ERROR_SYSCALL):
                    raise
                ssl_errno = e.errno
            # Get any data from the outgoing BIO irrespective of any error, and
            # send it to the socket.
            buf = outgoing.read()
            sock.sendall(buf)
            # If there's no error, we're done. For WANT_READ, we need to get
            # data from the socket and put it in the incoming BIO.
            if ssl_errno is None:
                break
            elif ssl_errno == ssl.SSL_ERROR_WANT_READ:
                buf = sock.recv(32768)
                if buf:
                    incoming.write(buf)
                else:
                    incoming.write_eof()
        if support.verbose:
            sys.stdout.write("Needed %d calls to complete %s().\n"
                             % (count, func.__name__))
        return ret

    def test_handshake(self):
        with support.transient_internet("svn.python.org"):
            # `with` closes the socket even if an assertion below fails.
            with socket.socket(socket.AF_INET) as sock:
                sock.connect(("svn.python.org", 443))
                incoming = ssl.MemoryBIO()
                outgoing = ssl.MemoryBIO()
                ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
                ctx.verify_mode = ssl.CERT_REQUIRED
                ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
                ctx.check_hostname = True
                sslobj = ctx.wrap_bio(incoming, outgoing, False, 'svn.python.org')
                # Before the handshake no session information is available.
                self.assertIs(sslobj._sslobj.owner, sslobj)
                self.assertIsNone(sslobj.cipher())
                self.assertIsNone(sslobj.shared_ciphers())
                self.assertRaises(ValueError, sslobj.getpeercert)
                if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
                    self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
                self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
                self.assertTrue(sslobj.cipher())
                self.assertIsNone(sslobj.shared_ciphers())
                self.assertTrue(sslobj.getpeercert())
                if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
                    self.assertTrue(sslobj.get_channel_binding('tls-unique'))
                self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
                # Writing after unwrap() must be rejected.
                self.assertRaises(ssl.SSLError, sslobj.write, b'foo')

    def test_read_write_data(self):
        with support.transient_internet("svn.python.org"):
            # `with` closes the socket even if an assertion below fails.
            with socket.socket(socket.AF_INET) as sock:
                sock.connect(("svn.python.org", 443))
                incoming = ssl.MemoryBIO()
                outgoing = ssl.MemoryBIO()
                ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
                ctx.verify_mode = ssl.CERT_NONE
                sslobj = ctx.wrap_bio(incoming, outgoing, False)
                self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
                req = b'GET / HTTP/1.0\r\n\r\n'
                self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
                buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
                self.assertEqual(buf[:5], b'HTTP/')
                self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
1731
1732
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001733try:
1734 import threading
1735except ImportError:
1736 _have_threads = False
1737else:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001738 _have_threads = True
1739
Antoine Pitrou803e6d62010-10-13 10:36:15 +00001740 from test.ssl_servers import make_https_server
1741
class ThreadedEchoServer(threading.Thread):
    # Server thread for the tests: it accepts connections on a locally
    # bound socket and serves each one with a ConnectionHandler thread,
    # which sends back the lower-cased version of what it receives.

    class ConnectionHandler(threading.Thread):

        """Worker thread serving a single client connection.

        The connection may be plain or wrapped in SSL, and may switch
        between the two while it is open, so that the STARTTLS
        functionality can be tested."""

        def __init__(self, server, connsock, addr):
            self.server = server
            self.addr = addr
            self.running = False
            self.sslconn = None
            self.sock = connsock
            self.sock.setblocking(1)
            threading.Thread.__init__(self)
            self.daemon = True

        def wrap_conn(self):
            # Wrap the connection in SSL.  Returns True on success; on a
            # failed handshake records the error on the server, stops the
            # server, closes the connection and returns False.
            server = self.server
            try:
                self.sslconn = conn = server.context.wrap_socket(
                    self.sock, server_side=True)
                server.selected_npn_protocols.append(conn.selected_npn_protocol())
                server.selected_alpn_protocols.append(conn.selected_alpn_protocol())
            except (ssl.SSLError, ConnectionResetError) as exc:
                # We treat ConnectionResetError as though it were an
                # SSLError - OpenSSL on Ubuntu abruptly closes the
                # connection when asked to use an unsupported protocol.
                #
                # XXX Various errors can have happened here, for example
                # a mismatching protocol version, an invalid certificate,
                # or a low-level bug. This should be made more discriminating.
                server.conn_errors.append(exc)
                if server.chatty:
                    handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
                self.running = False
                server.stop()
                self.close()
                return False

            server.shared_ciphers.append(conn.shared_ciphers())
            chatty = support.verbose and server.chatty
            if server.context.verify_mode == ssl.CERT_REQUIRED:
                cert = conn.getpeercert()
                if chatty:
                    sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
                cert_binary = conn.getpeercert(True)
                if chatty:
                    sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
            cipher = conn.cipher()
            if chatty:
                sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
                sys.stdout.write(" server: selected protocol is now "
                                 + str(conn.selected_npn_protocol()) + "\n")
            return True

        def read(self):
            # Read from the SSL layer when there is one, else the raw socket.
            if not self.sslconn:
                return self.sock.recv(1024)
            return self.sslconn.read()

        def write(self, bytes):
            # Write through the SSL layer when there is one, else the raw socket.
            if not self.sslconn:
                return self.sock.send(bytes)
            return self.sslconn.write(bytes)

        def close(self):
            # Close whichever object currently owns the connection.
            target = self.sslconn if self.sslconn else self.sock
            target.close()

        def _serve_one(self, msg):
            # Act on one message read from the client.  Returns False when
            # the handler thread must exit immediately, True otherwise.
            server = self.server
            chatty = support.verbose and server.connectionchatty
            stripped = msg.strip()
            if not stripped:
                # eof, so quit this handler
                self.running = False
                self.close()
            elif stripped == b'over':
                if chatty:
                    sys.stdout.write(" server: client closed connection\n")
                self.close()
                return False
            elif server.starttls_server and stripped == b'STARTTLS':
                if chatty:
                    sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
                self.write(b"OK\n")
                if not self.wrap_conn():
                    return False
            elif (server.starttls_server and self.sslconn
                  and stripped == b'ENDTLS'):
                if chatty:
                    sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
                self.write(b"OK\n")
                # Continue on the socket object returned by unwrap().
                self.sock = self.sslconn.unwrap()
                self.sslconn = None
                if chatty:
                    sys.stdout.write(" server: connection is now unencrypted...\n")
            elif stripped == b'CB tls-unique':
                if chatty:
                    sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
                data = self.sslconn.get_channel_binding("tls-unique")
                self.write(repr(data).encode("us-ascii") + b"\n")
            else:
                # Default behaviour: echo the message back in lower case.
                if chatty:
                    ctype = "encrypted" if self.sslconn else "unencrypted"
                    sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
                                     % (msg, ctype, msg.lower(), ctype))
                self.write(msg.lower())
            return True

        def run(self):
            self.running = True
            # Unless this is a STARTTLS server, the connection starts out
            # wrapped in SSL.
            if not self.server.starttls_server and not self.wrap_conn():
                return
            while self.running:
                try:
                    if not self._serve_one(self.read()):
                        return
                except OSError:
                    if self.server.chatty:
                        handle_error("Test server failure:\n")
                    self.close()
                    self.running = False
                    # normally, we'd just stop here, but for the test
                    # harness, we want to stop the server
                    self.server.stop()

    def __init__(self, certificate=None, ssl_version=None,
                 certreqs=None, cacerts=None,
                 chatty=True, connectionchatty=False, starttls_server=False,
                 npn_protocols=None, alpn_protocols=None,
                 ciphers=None, context=None):
        if context:
            self.context = context
        else:
            # NOTE(review): indentation is not visible in this view; the
            # optional settings below are assumed to apply only to the
            # context built here, not to a caller-supplied one - confirm.
            if ssl_version is None:
                ssl_version = ssl.PROTOCOL_TLSv1
            self.context = ctx = ssl.SSLContext(ssl_version)
            ctx.verify_mode = ssl.CERT_NONE if certreqs is None else certreqs
            if cacerts:
                ctx.load_verify_locations(cacerts)
            if certificate:
                ctx.load_cert_chain(certificate)
            if npn_protocols:
                ctx.set_npn_protocols(npn_protocols)
            if alpn_protocols:
                ctx.set_alpn_protocols(alpn_protocols)
            if ciphers:
                ctx.set_ciphers(ciphers)
        self.chatty = chatty
        self.connectionchatty = connectionchatty
        self.starttls_server = starttls_server
        self.sock = socket.socket()
        self.port = support.bind_port(self.sock)
        self.flag = None
        self.active = False
        # Per-connection results appended by ConnectionHandler.wrap_conn().
        self.selected_npn_protocols = []
        self.selected_alpn_protocols = []
        self.shared_ciphers = []
        self.conn_errors = []
        threading.Thread.__init__(self)
        self.daemon = True

    def __enter__(self):
        # Start the thread and wait until run() has set the event.
        ready = threading.Event()
        self.start(ready)
        ready.wait()
        return self

    def __exit__(self, *args):
        self.stop()
        self.join()

    def start(self, flag=None):
        # `flag`, when given, is set by run() after listen() has been called.
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        listener = self.sock
        # Short accept() timeout so that `self.active` is re-checked often.
        listener.settimeout(0.05)
        listener.listen()
        self.active = True
        if self.flag:
            # signal an event
            self.flag.set()
        while self.active:
            try:
                conn, peer = listener.accept()
                if support.verbose and self.chatty:
                    sys.stdout.write(' server: new connection from '
                                     + repr(peer) + '\n')
                worker = self.ConnectionHandler(self, conn, peer)
                worker.start()
                # Connections are served one at a time.
                worker.join()
            except socket.timeout:
                pass
            except KeyboardInterrupt:
                self.stop()
        listener.close()

    def stop(self):
        # Ask the accept loop in run() to exit.
        self.active = False
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001944
Bill Janssen54cc54c2007-12-14 22:08:56 +00001945 class AsyncoreEchoServer(threading.Thread):
1946
1947 # this one's based on asyncore.dispatcher
1948
class EchoServer (asyncore.dispatcher):
    # Listening dispatcher: every accepted connection is handed to a
    # ConnectionHandler, which sends back lower-cased data over SSL.

    class ConnectionHandler (asyncore.dispatcher_with_send):

        def __init__(self, conn, certfile):
            # Wrap without handshaking; the handshake is driven by
            # _do_ssl_handshake(), first here and then from handle_read().
            self.socket = ssl.wrap_socket(conn, server_side=True,
                                          certfile=certfile,
                                          do_handshake_on_connect=False)
            asyncore.dispatcher_with_send.__init__(self, self.socket)
            self._ssl_accepting = True
            self._do_ssl_handshake()

        def readable(self):
            # Deliver data still pending inside the SSL socket before
            # reporting the channel as readable.
            if isinstance(self.socket, ssl.SSLSocket):
                while self.socket.pending() > 0:
                    self.handle_read_event()
            return True

        def _do_ssl_handshake(self):
            # Try to complete the handshake; on success clear
            # `_ssl_accepting`.
            try:
                self.socket.do_handshake()
            except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
                # Not finished yet; try again on the next read event.
                return
            except ssl.SSLEOFError:
                return self.handle_close()
            except ssl.SSLError:
                raise
            except OSError as err:
                if err.args[0] == errno.ECONNABORTED:
                    return self.handle_close()
                # NOTE(review): any other OSError is dropped here and
                # `_ssl_accepting` stays True - confirm this is intended.
                return
            self._ssl_accepting = False

        def handle_read(self):
            if self._ssl_accepting:
                self._do_ssl_handshake()
                return
            payload = self.recv(1024)
            if support.verbose:
                sys.stdout.write(" server: read %s from client\n" % repr(payload))
            if payload:
                self.send(payload.lower())
            else:
                # Empty read: close the channel.
                self.close()

        def handle_close(self):
            self.close()
            if support.verbose:
                sys.stdout.write(" server: closed connection %s\n" % self.socket)

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise

    def __init__(self, certfile):
        self.certfile = certfile
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.port = support.bind_port(listener, '')
        asyncore.dispatcher.__init__(self, listener)
        self.listen(5)

    def handle_accepted(self, sock_obj, addr):
        if support.verbose:
            sys.stdout.write(" server: new connection from %s:%s\n" %addr)
        # The handler is not stored here.
        self.ConnectionHandler(sock_obj, self.certfile)

    def handle_error(self):
        # Re-raise the exception currently being handled.
        raise
2016
Trent Nelson78520002008-04-10 20:54:35 +00002017 def __init__(self, certfile):
Bill Janssen54cc54c2007-12-14 22:08:56 +00002018 self.flag = None
2019 self.active = False
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002020 self.server = self.EchoServer(certfile)
2021 self.port = self.server.port
Bill Janssen54cc54c2007-12-14 22:08:56 +00002022 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00002023 self.daemon = True
Bill Janssen54cc54c2007-12-14 22:08:56 +00002024
2025 def __str__(self):
2026 return "<%s %s>" % (self.__class__.__name__, self.server)
2027
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002028 def __enter__(self):
2029 self.start(threading.Event())
2030 self.flag.wait()
Antoine Pitrou8f85f902012-01-03 22:46:48 +01002031 return self
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002032
2033 def __exit__(self, *args):
2034 if support.verbose:
2035 sys.stdout.write(" cleanup: stopping server.\n")
2036 self.stop()
2037 if support.verbose:
2038 sys.stdout.write(" cleanup: joining server thread.\n")
2039 self.join()
2040 if support.verbose:
2041 sys.stdout.write(" cleanup: successfully joined.\n")
2042
Bill Janssen54cc54c2007-12-14 22:08:56 +00002043 def start (self, flag=None):
2044 self.flag = flag
2045 threading.Thread.start(self)
2046
Antoine Pitrou480a1242010-04-28 21:37:09 +00002047 def run(self):
Bill Janssen54cc54c2007-12-14 22:08:56 +00002048 self.active = True
2049 if self.flag:
2050 self.flag.set()
2051 while self.active:
2052 try:
2053 asyncore.loop(1)
2054 except:
2055 pass
2056
Antoine Pitrou480a1242010-04-28 21:37:09 +00002057 def stop(self):
Bill Janssen54cc54c2007-12-14 22:08:56 +00002058 self.active = False
2059 self.server.close()
2060
def bad_cert_test(certfile):
    """
    Launch a server with CERT_REQUIRED, and check that trying to
    connect to it with the given client certificate fails.

    *certfile* is the path of the client certificate file to present.
    The attempt counts as failed when wrapping the socket or connecting
    raises ssl.SSLError or any other OSError; if neither is raised an
    AssertionError is raised instead.
    """
    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_REQUIRED,
                                cacerts=CERTFILE, chatty=False,
                                connectionchatty=False)
    with server:
        try:
            # The outer "with" covers a failure inside wrap_socket() (e.g.
            # unreadable certfile); the inner one closes the wrapped socket
            # when connect() fails.
            with socket.socket() as sock:
                with ssl.wrap_socket(sock,
                                     certfile=certfile,
                                     ssl_version=ssl.PROTOCOL_TLSv1) as s:
                    s.connect((HOST, server.port))
        except ssl.SSLError as x:
            # ssl.SSLError is listed first because it is an OSError subclass.
            if support.verbose:
                # Format the exception itself: indexing args[1] would raise
                # IndexError for exceptions built with a single argument.
                sys.stdout.write("\nSSLError is %s\n" % str(x))
        except OSError as x:
            # A single handler: the former second "except OSError" clause
            # could never run because this one already matched.
            if support.verbose:
                sys.stdout.write("\nOSError is %s\n" % str(x))
        else:
            raise AssertionError("Use of invalid cert should have failed!")
Thomas Woutersed03b412007-08-28 21:37:11 +00002090
def server_params_test(client_context, server_context, indata=b"FOO\n",
                       chatty=True, connectionchatty=False, sni_name=None):
    """
    Launch a server, connect a client to it and try various reads
    and writes.

    *indata* is written as bytes, bytearray and memoryview; each reply
    must equal ``indata.lower()``, otherwise AssertionError is raised.
    Returns a dict with details of the client connection and of what the
    server recorded (ALPN/NPN protocols, shared ciphers).
    """
    def trace(text):
        # Client-side tracing needs both connectionchatty and verbose mode.
        if connectionchatty and support.verbose:
            sys.stdout.write(text)

    server = ThreadedEchoServer(context=server_context,
                                chatty=chatty,
                                connectionchatty=False)
    with server, client_context.wrap_socket(socket.socket(),
                                            server_hostname=sni_name) as s:
        s.connect((HOST, server.port))
        payloads = [indata, bytearray(indata), memoryview(indata)]
        for arg in payloads:
            trace(" client: sending %r...\n" % indata)
            s.write(arg)
            outdata = s.read()
            trace(" client: read %r\n" % outdata)
            if outdata != indata.lower():
                raise AssertionError(
                    "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                    % (outdata[:20], len(outdata),
                       indata[:20].lower(), len(indata)))
        s.write(b"over\n")
        trace(" client: closing connection.\n")
        # Collect the connection details before the socket is closed.
        stats = {
            'compression': s.compression(),
            'cipher': s.cipher(),
            'peercert': s.getpeercert(),
            'client_alpn_protocol': s.selected_alpn_protocol(),
            'client_npn_protocol': s.selected_npn_protocol(),
            'version': s.version(),
        }
        s.close()
    # Server-side results are read once the server has been stopped.
    stats['server_alpn_protocols'] = server.selected_alpn_protocols
    stats['server_npn_protocols'] = server.selected_npn_protocols
    stats['server_shared_ciphers'] = server.shared_ciphers
    return stats
Thomas Woutersed03b412007-08-28 21:37:11 +00002137
def try_protocol_combo(server_protocol, client_protocol, expect_success,
                       certsreqs=None, server_options=0, client_options=0):
    """
    Try to SSL-connect using *client_protocol* to *server_protocol*.
    If *expect_success* is true, assert that the connection succeeds,
    if it's false, assert that the connection fails.
    Also, if *expect_success* is a string, assert that it is the protocol
    version actually used by the connection.

    *certsreqs* defaults to ssl.CERT_NONE; *server_options* and
    *client_options* are OR-ed into the respective context options.
    """
    if certsreqs is None:
        certsreqs = ssl.CERT_NONE
    certtype_names = {
        ssl.CERT_NONE: "CERT_NONE",
        ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
        ssl.CERT_REQUIRED: "CERT_REQUIRED",
    }
    certtype = certtype_names[certsreqs]
    if support.verbose:
        # Combinations expected to fail are shown inside braces.
        formatstr = " %s->%s %s\n" if expect_success else " {%s->%s} %s\n"
        client_name = ssl.get_protocol_name(client_protocol)
        server_name = ssl.get_protocol_name(server_protocol)
        sys.stdout.write(formatstr % (client_name, server_name, certtype))

    client_context = ssl.SSLContext(client_protocol)
    client_context.options |= client_options
    server_context = ssl.SSLContext(server_protocol)
    server_context.options |= server_options

    # NOTE: we must enable "ALL" ciphers on the client, otherwise an
    # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
    # starting from OpenSSL 1.0.0 (see issue #8322).
    if client_context.protocol == ssl.PROTOCOL_SSLv23:
        client_context.set_ciphers("ALL")

    for ctx in (client_context, server_context):
        ctx.verify_mode = certsreqs
        ctx.load_cert_chain(CERTFILE)
        ctx.load_verify_locations(CERTFILE)

    try:
        stats = server_params_test(client_context, server_context,
                                   chatty=False, connectionchatty=False)
    # Protocol mismatch can result in either an SSLError, or a
    # "Connection reset by peer" error.
    except ssl.SSLError:
        if expect_success:
            raise
        return
    except OSError as e:
        if expect_success or e.errno != errno.ECONNRESET:
            raise
        return

    # The connection went through: check that this was expected.
    if not expect_success:
        raise AssertionError(
            "Client protocol %s succeeded with server protocol %s!"
            % (ssl.get_protocol_name(client_protocol),
               ssl.get_protocol_name(server_protocol)))
    if expect_success is not True and expect_success != stats['version']:
        raise AssertionError("version mismatch: expected %r, got %r"
                             % (expect_success, stats['version']))
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002196
2197
Bill Janssen6e027db2007-11-15 22:23:56 +00002198 class ThreadedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002199
@skip_if_broken_ubuntu_ssl
def test_echo(self):
    """Basic test of an SSL client connecting to a server"""
    if support.verbose:
        sys.stdout.write("\n")
    for protocol in PROTOCOLS:
        protocol_name = ssl._PROTOCOL_NAMES[protocol]
        with self.subTest(protocol=protocol_name):
            # One context is used for both the client and the server side.
            ctx = ssl.SSLContext(protocol)
            ctx.load_cert_chain(CERTFILE)
            server_params_test(ctx, ctx,
                               chatty=True, connectionchatty=True)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002211
def test_getpeercert(self):
    """Check getpeercert() before and after the handshake.

    Before the handshake it must raise ValueError; afterwards the
    certificate must have the expected subject and a 'notBefore' date
    earlier than its 'notAfter' date.
    """
    if support.verbose:
        sys.stdout.write("\n")
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(CERTFILE)
    context.load_cert_chain(CERTFILE)
    server = ThreadedEchoServer(context=context, chatty=False)
    with server:
        # The "with" block closes the client socket even when one of the
        # checks below fails.
        with context.wrap_socket(socket.socket(),
                                 do_handshake_on_connect=False) as s:
            s.connect((HOST, server.port))
            # getpeercert() raise ValueError while the handshake isn't
            # done.
            with self.assertRaises(ValueError):
                s.getpeercert()
            s.do_handshake()
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()
            if support.verbose:
                sys.stdout.write(pprint.pformat(cert) + '\n')
                sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
            if 'subject' not in cert:
                self.fail("No subject field in certificate: %s." %
                          pprint.pformat(cert))
            if ((('organizationName', 'Python Software Foundation'),)
                    not in cert['subject']):
                self.fail(
                    "Missing or invalid 'organizationName' field in certificate subject; "
                    "should be 'Python Software Foundation'.")
            self.assertIn('notBefore', cert)
            self.assertIn('notAfter', cert)
            before = ssl.cert_time_to_seconds(cert['notBefore'])
            after = ssl.cert_time_to_seconds(cert['notAfter'])
            self.assertLess(before, after)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002249
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_crl_check(self):
    """Check VERIFY_CRL_CHECK_LEAF with and without a loaded CRL file."""
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(SIGNING_CA)
    self.assertEqual(context.verify_flags, ssl.VERIFY_DEFAULT)

    def make_server():
        # Every phase below uses its own server instance.
        return ThreadedEchoServer(context=server_context, chatty=True)

    def assert_connects():
        server = make_server()
        with server:
            with context.wrap_socket(socket.socket()) as s:
                s.connect((HOST, server.port))
                cert = s.getpeercert()
                self.assertTrue(cert, "Can't get peer certificate.")

    # VERIFY_DEFAULT should pass
    assert_connects()

    # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
    context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF

    server = make_server()
    with server:
        with context.wrap_socket(socket.socket()) as s:
            with self.assertRaisesRegex(ssl.SSLError,
                                        "certificate verify failed"):
                s.connect((HOST, server.port))

    # now load a CRL file. The CRL file is signed by the CA.
    context.load_verify_locations(CRLFILE)

    assert_connects()
2291
def test_check_hostname(self):
    """Check how check_hostname reacts to good, bad and missing hostnames."""
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.verify_mode = ssl.CERT_REQUIRED
    context.check_hostname = True
    context.load_verify_locations(SIGNING_CA)

    def make_server():
        # Every scenario below uses its own server instance.
        return ThreadedEchoServer(context=server_context, chatty=True)

    # correct hostname should verify
    server = make_server()
    with server:
        client = context.wrap_socket(socket.socket(),
                                     server_hostname="localhost")
        with client as s:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")

    # incorrect hostname should raise an exception
    server = make_server()
    with server:
        client = context.wrap_socket(socket.socket(),
                                     server_hostname="invalid")
        with client as s:
            with self.assertRaisesRegex(ssl.CertificateError,
                                        "hostname 'invalid' doesn't match 'localhost'"):
                s.connect((HOST, server.port))

    # missing server_hostname arg should cause an exception, too
    server = make_server()
    with server:
        with socket.socket() as s:
            with self.assertRaisesRegex(ValueError,
                                        "check_hostname requires server_hostname"):
                context.wrap_socket(s)
2329
def test_empty_cert(self):
    """Connecting with an empty cert file"""
    # Directory of this test module, or the current one if it is empty.
    here = os.path.dirname(__file__) or os.curdir
    cert_path = os.path.join(here, "nullcert.pem")
    bad_cert_test(cert_path)
def test_malformed_cert(self):
    """Connecting with a badly formatted certificate (syntax error)"""
    # Directory of this test module, or the current one if it is empty.
    here = os.path.dirname(__file__) or os.curdir
    cert_path = os.path.join(here, "badcert.pem")
    bad_cert_test(cert_path)
def test_nonexisting_cert(self):
    """Connecting with a non-existing cert file"""
    # Directory of this test module, or the current one if it is empty.
    here = os.path.dirname(__file__) or os.curdir
    cert_path = os.path.join(here, "wrongcert.pem")
    bad_cert_test(cert_path)
def test_malformed_key(self):
    """Connecting with a badly formatted key (syntax error)"""
    # Directory of this test module, or the current one if it is empty.
    here = os.path.dirname(__file__) or os.curdir
    cert_path = os.path.join(here, "badkey.pem")
    bad_cert_test(cert_path)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002346
def test_rude_shutdown(self):
    """A brutal shutdown of an SSL server should raise an OSError
    in the client when attempting handshake.
    """
    ready = threading.Event()
    gone = threading.Event()

    s = socket.socket()
    port = support.bind_port(s, HOST)

    # `listener` runs in a thread.  It sits in an accept() until
    # the main thread connects.  Then it rudely closes the socket,
    # and sets Event `gone` to let the main thread know
    # the socket is gone.
    def listener():
        s.listen()
        ready.set()
        accepted = s.accept()
        newsock = accepted[0]
        newsock.close()
        s.close()
        gone.set()

    def connector():
        ready.wait()
        with socket.socket() as c:
            c.connect((HOST, port))
            # Only try to wrap once the server side has been closed.
            gone.wait()
            try:
                ssl_sock = ssl.wrap_socket(c)
            except OSError:
                pass
            else:
                self.fail('connecting to closed SSL socket should have failed')

    worker = threading.Thread(target=listener)
    worker.start()
    try:
        connector()
    finally:
        worker.join()
Trent Nelson6b240cd2008-04-10 20:12:06 +00002387
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
                     "OpenSSL is compiled without SSLv2 support")
def test_protocol_sslv2(self):
    """Connecting to an SSLv2 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    sslv2 = ssl.PROTOCOL_SSLv2
    sslv23 = ssl.PROTOCOL_SSLv23
    # None selects try_protocol_combo's default certificate requirement.
    for certsreqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(sslv2, sslv2, True, certsreqs)
    try_protocol_combo(sslv2, sslv23, False)
    if hasattr(ssl, 'PROTOCOL_SSLv3'):
        try_protocol_combo(sslv2, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(sslv2, ssl.PROTOCOL_TLSv1, False)
    # SSLv23 client with specific SSL options
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(sslv2, sslv23, False,
                           client_options=ssl.OP_NO_SSLv2)
    for option in (ssl.OP_NO_SSLv3, ssl.OP_NO_TLSv1):
        try_protocol_combo(sslv2, sslv23, False,
                           client_options=option)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002411
@skip_if_broken_ubuntu_ssl
def test_protocol_sslv23(self):
    """Connecting to an SSLv23 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    sslv23 = ssl.PROTOCOL_SSLv23
    tlsv1 = ssl.PROTOCOL_TLSv1
    has_sslv3 = hasattr(ssl, 'PROTOCOL_SSLv3')
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try:
            try_protocol_combo(sslv23, ssl.PROTOCOL_SSLv2, True)
        except OSError as x:
            # this fails on some older versions of OpenSSL (0.9.7l, for instance)
            if support.verbose:
                sys.stdout.write(
                    " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
                    % str(x))

    # The same three client protocols are tried for each certificate
    # requirement; None selects try_protocol_combo's default.
    for certsreqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        if has_sslv3:
            try_protocol_combo(sslv23, ssl.PROTOCOL_SSLv3, 'SSLv3', certsreqs)
        try_protocol_combo(sslv23, sslv23, True, certsreqs)
        try_protocol_combo(sslv23, tlsv1, 'TLSv1', certsreqs)

    # Server with specific SSL options
    if has_sslv3:
        try_protocol_combo(sslv23, ssl.PROTOCOL_SSLv3, False,
                           server_options=ssl.OP_NO_SSLv3)
    # Will choose TLSv1
    try_protocol_combo(sslv23, sslv23, True,
                       server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
    try_protocol_combo(sslv23, tlsv1, False,
                       server_options=ssl.OP_NO_TLSv1)
2450
2451
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
                     "OpenSSL is compiled without SSLv3 support")
def test_protocol_sslv3(self):
    """Connecting to an SSLv3 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    sslv3 = ssl.PROTOCOL_SSLv3
    sslv23 = ssl.PROTOCOL_SSLv23
    # None selects try_protocol_combo's default certificate requirement.
    for certsreqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(sslv3, sslv3, 'SSLv3', certsreqs)
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(sslv3, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(sslv3, sslv23, False,
                       client_options=ssl.OP_NO_SSLv3)
    try_protocol_combo(sslv3, ssl.PROTOCOL_TLSv1, False)
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(sslv3, sslv23, 'SSLv3',
                           client_options=ssl.OP_NO_SSLv2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002471
@skip_if_broken_ubuntu_ssl
def test_protocol_tlsv1(self):
    """Connecting to a TLSv1 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1 = ssl.PROTOCOL_TLSv1
    # None selects try_protocol_combo's default certificate requirement.
    for certsreqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(tlsv1, tlsv1, 'TLSv1', certsreqs)
    # Older client protocols are only tried when this build has them.
    for old_name in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, old_name):
            try_protocol_combo(tlsv1, getattr(ssl, old_name), False)
    try_protocol_combo(tlsv1, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_TLSv1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002486
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
                     "TLS version 1.1 not supported.")
def test_protocol_tlsv1_1(self):
    """Connecting to a TLSv1.1 server with various client options.
       Testing against older TLS versions."""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1_1 = ssl.PROTOCOL_TLSv1_1
    tlsv1 = ssl.PROTOCOL_TLSv1
    sslv23 = ssl.PROTOCOL_SSLv23
    try_protocol_combo(tlsv1_1, tlsv1_1, 'TLSv1.1')
    # Older client protocols are only tried when this build has them.
    for old_name in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, old_name):
            try_protocol_combo(tlsv1_1, getattr(ssl, old_name), False)
    try_protocol_combo(tlsv1_1, sslv23, False,
                       client_options=ssl.OP_NO_TLSv1_1)

    try_protocol_combo(sslv23, tlsv1_1, 'TLSv1.1')
    try_protocol_combo(tlsv1_1, tlsv1, False)
    try_protocol_combo(tlsv1, tlsv1_1, False)
2506
2507
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
                     "TLS version 1.2 not supported.")
def test_protocol_tlsv1_2(self):
    """Connecting to a TLSv1.2 server with various client options.
       Testing against older TLS versions."""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1_2 = ssl.PROTOCOL_TLSv1_2
    tlsv1_1 = ssl.PROTOCOL_TLSv1_1
    tlsv1 = ssl.PROTOCOL_TLSv1
    sslv23 = ssl.PROTOCOL_SSLv23
    # Both ends are created with the SSLv3 and SSLv2 options disabled.
    no_old_ssl = ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2
    try_protocol_combo(tlsv1_2, tlsv1_2, 'TLSv1.2',
                       server_options=no_old_ssl,
                       client_options=no_old_ssl,)
    # Older client protocols are only tried when this build has them.
    for old_name in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, old_name):
            try_protocol_combo(tlsv1_2, getattr(ssl, old_name), False)
    try_protocol_combo(tlsv1_2, sslv23, False,
                       client_options=ssl.OP_NO_TLSv1_2)

    try_protocol_combo(sslv23, tlsv1_2, 'TLSv1.2')
    # Each pair of distinct TLS versions must fail, in both directions.
    try_protocol_combo(tlsv1_2, tlsv1, False)
    try_protocol_combo(tlsv1, tlsv1_2, False)
    try_protocol_combo(tlsv1_2, tlsv1_1, False)
    try_protocol_combo(tlsv1_1, tlsv1_2, False)
2531
def test_starttls(self):
    """Switch a plain connection to TLS and back again mid-conversation."""
    def report(text):
        # Progress output is only wanted in verbose test runs.
        if support.verbose:
            sys.stdout.write(text)

    script = (
        b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3",
        b"msg 4", b"ENDTLS", b"msg 5", b"msg 6",
    )
    server = ThreadedEchoServer(CERTFILE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                starttls_server=True,
                                chatty=True,
                                connectionchatty=True)
    # True while traffic goes through the TLS wrapper ``conn`` rather
    # than the raw socket ``s``.
    wrapped = False
    with server:
        s = socket.socket()
        s.setblocking(1)
        s.connect((HOST, server.port))
        report("\n")
        for indata in script:
            report(" client: sending %r...\n" % indata)
            if wrapped:
                conn.write(indata)
                outdata = conn.read()
            else:
                s.send(indata)
                outdata = s.recv(1024)
            msg = outdata.strip().lower()
            acknowledged = msg.startswith(b"ok")
            if indata == b"STARTTLS" and acknowledged:
                # STARTTLS accepted: upgrade the raw socket.
                report(" client: read %r from server, starting TLS...\n"
                       % msg)
                conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
                wrapped = True
            elif indata == b"ENDTLS" and acknowledged:
                # ENDTLS accepted: drop back to the clear-text socket.
                report(" client: read %r from server, ending TLS...\n"
                       % msg)
                s = conn.unwrap()
                wrapped = False
            else:
                report(" client: read %r from server\n" % msg)
        report(" client: closing connection.\n")
        # Say goodbye and close over whichever channel is active.
        if wrapped:
            conn.write(b"over\n")
            conn.close()
        else:
            s.send(b"over\n")
            s.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002589
def test_socketserver(self):
    """Using a SocketServer to create and manage SSL connections.

    Reads CERTFILE from disk, requests a file of the same base name from
    the helper HTTPS server through urllib, and checks that both byte
    strings are equal.
    """
    server = make_https_server(self, certfile=CERTFILE)
    # try to connect
    if support.verbose:
        sys.stdout.write('\n')
    with open(CERTFILE, 'rb') as f:
        d1 = f.read()
    # Must be bytes, like d1: if the response carries no usable
    # content-length the comparison below should fail on content,
    # not on a str/bytes type mismatch.
    d2 = b''
    # now fetch the same data from the HTTPS server
    url = 'https://localhost:%d/%s' % (
        server.port, os.path.split(CERTFILE)[1])
    context = ssl.create_default_context(cafile=CERTFILE)
    f = urllib.request.urlopen(url, context=context)
    try:
        dlen = f.info().get("content-length")
        if dlen and (int(dlen) > 0):
            d2 = f.read(int(dlen))
            if support.verbose:
                sys.stdout.write(
                    " client: read %d bytes from remote server '%s'\n"
                    % (len(d2), server))
    finally:
        f.close()
    self.assertEqual(d1, d2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002615
def test_asyncore_server(self):
    """Check the example asyncore integration.

    Sends one message to an AsyncoreEchoServer over SSL and expects the
    lower-cased message back.
    """
    if support.verbose:
        sys.stdout.write("\n")

    # (A dead str assignment to ``indata`` used to precede this line; it
    # was overwritten before any use and has been removed.)
    indata = b"FOO\n"
    server = AsyncoreEchoServer(CERTFILE)
    with server:
        s = ssl.wrap_socket(socket.socket())
        s.connect(('127.0.0.1', server.port))
        if support.verbose:
            sys.stdout.write(
                " client: sending %r...\n" % indata)
        s.write(indata)
        outdata = s.read()
        if support.verbose:
            sys.stdout.write(" client: read %r\n" % outdata)
        if outdata != indata.lower():
            self.fail(
                "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                % (outdata[:20], len(outdata),
                   indata[:20].lower(), len(indata)))
        s.write(b"over\n")
        if support.verbose:
            sys.stdout.write(" client: closing connection.\n")
        s.close()
        if support.verbose:
            sys.stdout.write(" client: connection closed.\n")
Trent Nelson6b240cd2008-04-10 20:12:06 +00002646
def test_recv_send(self):
    """Test recv(), send() and friends.

    Each send/recv flavour of the SSL socket is driven against an echo
    server.  Methods flagged as expected to fail must raise ValueError
    with a message that starts with the method name.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = ssl.wrap_socket(socket.socket(),
                            server_side=False,
                            certfile=CERTFILE,
                            ca_certs=CERTFILE,
                            cert_reqs=ssl.CERT_NONE,
                            ssl_version=ssl.PROTOCOL_TLSv1)
        s.connect((HOST, server.port))
        # helper methods for standardising recv* method signatures
        def _recv_into():
            b = bytearray(b"\0"*100)
            count = s.recv_into(b)
            return b[:count]

        def _recvfrom_into():
            b = bytearray(b"\0"*100)
            count, addr = s.recvfrom_into(b)
            return b[:count]

        # (name, method, whether to expect success, *args)
        send_methods = [
            ('send', s.send, True, []),
            ('sendto', s.sendto, False, ["some.address"]),
            ('sendall', s.sendall, True, []),
        ]
        recv_methods = [
            ('recv', s.recv, True, []),
            ('recvfrom', s.recvfrom, False, ["some.address"]),
            ('recv_into', _recv_into, True, []),
            ('recvfrom_into', _recvfrom_into, False, []),
        ]
        data_prefix = "PREFIX_"

        # NOTE: the failure messages below use the ``!r`` / ``!s``
        # conversions.  A format *spec* (``:r`` / ``:s``) is rejected by
        # bytes and exception objects with TypeError, which would hide
        # the real failure behind a formatting error.
        for meth_name, send_meth, expect_success, args in send_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                send_meth(indata, *args)
                outdata = s.read()
                if outdata != indata.lower():
                    self.fail(
                        "While sending with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20], nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to send with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )

        for meth_name, recv_meth, expect_success, args in recv_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                s.send(indata)
                outdata = recv_meth(*args)
                if outdata != indata.lower():
                    self.fail(
                        "While receiving with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20], nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to receive with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )
                # consume data: the echo of the message sent above is
                # still pending because the receive call was rejected
                s.read()

        # Make sure sendmsg et al are disallowed to avoid
        # inadvertent disclosure of data and/or corruption
        # of the encrypted data stream
        self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
        self.assertRaises(NotImplementedError, s.recvmsg, 100)
        self.assertRaises(NotImplementedError,
                          s.recvmsg_into, bytearray(100))

        s.write(b"over\n")
        s.close()
Bill Janssen58afe4c2008-09-08 16:45:19 +00002761
def test_nonblocking_send(self):
    """A non-blocking SSL send must raise a want-read/write error when full."""
    echo_server = ThreadedEchoServer(CERTFILE,
                                     certreqs=ssl.CERT_NONE,
                                     ssl_version=ssl.PROTOCOL_TLSv1,
                                     cacerts=CERTFILE,
                                     chatty=True,
                                     connectionchatty=False)
    with echo_server:
        s = ssl.wrap_socket(socket.socket(),
                            server_side=False,
                            certfile=CERTFILE,
                            ca_certs=CERTFILE,
                            cert_reqs=ssl.CERT_NONE,
                            ssl_version=ssl.PROTOCOL_TLSv1)
        s.connect((HOST, echo_server.port))
        s.setblocking(False)

        # Keep pushing data: sooner or later the buffers fill up and the
        # non-blocking send cannot proceed.
        payload = bytearray(8192)

        def send_forever():
            while True:
                s.send(payload)

        expected_errors = (ssl.SSLWantWriteError, ssl.SSLWantReadError)
        with self.assertRaises(expected_errors):
            send_forever()

        # Back to blocking mode before shutting the connection down.
        s.setblocking(True)
        s.close()
2791
def test_handshake_timeout(self):
    """Issue #5103: the SSL handshake must respect the socket timeout."""
    server = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(server)
    started = threading.Event()
    # Rebound to True in the outer ``finally``; serve() polls it.
    finish = False

    def serve():
        server.listen()
        started.set()
        accepted = []
        while not finish:
            readable, _, _ = select.select([server], [], [], 0.1)
            if server in readable:
                # Keep the accepted socket referenced so it is not
                # closed by garbage collection while the client waits.
                accepted.append(server.accept()[0])
        for sock in accepted:
            sock.close()

    worker = threading.Thread(target=serve)
    worker.start()
    started.wait()

    try:
        # Case 1: wrap an already-connected socket.
        try:
            c = socket.socket(socket.AF_INET)
            c.settimeout(0.2)
            c.connect((host, port))
            # Will attempt handshake and time out
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                ssl.wrap_socket(c)
        finally:
            c.close()
        # Case 2: wrap first, then connect.
        try:
            c = socket.socket(socket.AF_INET)
            c = ssl.wrap_socket(c)
            c.settimeout(0.2)
            # Will attempt handshake and time out
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                c.connect((host, port))
        finally:
            c.close()
    finally:
        finish = True
        worker.join()
        server.close()
2840
def test_server_accept(self):
    """Issue #16357: accept() on an SSLSocket from SSLContext.wrap_socket()."""
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(CERTFILE)
    context.load_cert_chain(CERTFILE)
    server = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(server)
    server = context.wrap_socket(server, server_side=True)

    listening = threading.Event()
    remote = None
    peer = None

    def serve():
        nonlocal remote, peer
        server.listen()
        # Signal readiness, then block in accept() until the client
        # connects and in recv() until it closes the connection.
        listening.set()
        remote, peer = server.accept()
        remote.recv(1)

    worker = threading.Thread(target=serve)
    worker.start()
    # The client waits for the server to be set up before connecting.
    listening.wait()
    client = context.wrap_socket(socket.socket())
    client.connect((host, port))
    client_addr = client.getsockname()
    client.close()
    worker.join()
    remote.close()
    server.close()
    # Sanity checks.
    self.assertIsInstance(remote, ssl.SSLSocket)
    self.assertEqual(peer, client_addr)
2878
def test_getpeercert_enotconn(self):
    """getpeercert() on an unconnected SSL socket fails with ENOTCONN."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    with ctx.wrap_socket(socket.socket()) as unconnected:
        with self.assertRaises(OSError) as caught:
            unconnected.getpeercert()
        self.assertEqual(caught.exception.errno, errno.ENOTCONN)
2885
def test_do_handshake_enotconn(self):
    """do_handshake() on an unconnected SSL socket fails with ENOTCONN."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    with ctx.wrap_socket(socket.socket()) as unconnected:
        with self.assertRaises(OSError) as caught:
            unconnected.do_handshake()
        self.assertEqual(caught.exception.errno, errno.ENOTCONN)
2892
def test_default_ciphers(self):
    """A client restricted to DES ciphers must be refused by the server."""
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    try:
        # Force a set of weak ciphers on our client context
        client_ctx.set_ciphers("DES")
    except ssl.SSLError:
        self.skipTest("no DES cipher available")
    echo_server = ThreadedEchoServer(CERTFILE,
                                     ssl_version=ssl.PROTOCOL_SSLv23,
                                     chatty=False)
    with echo_server as server:
        with client_ctx.wrap_socket(socket.socket()) as s:
            with self.assertRaises(OSError):
                s.connect((HOST, server.port))
    # Checked once the server context has exited, so the server-side
    # error list is no longer being written to.
    first_error = str(server.conn_errors[0])
    self.assertIn("no shared cipher", first_error)
2907
def test_version_basic(self):
    """
    Basic tests for SSLSocket.version().
    More tests are done in the test_protocol_*() methods.
    """
    tls_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    echo_server = ThreadedEchoServer(CERTFILE,
                                     ssl_version=ssl.PROTOCOL_TLSv1,
                                     chatty=False)
    with echo_server as server:
        with tls_ctx.wrap_socket(socket.socket()) as s:
            # No version before the handshake has happened...
            self.assertIs(s.version(), None)
            s.connect((HOST, server.port))
            self.assertEqual(s.version(), "TLSv1")
        # ...and none again once the socket has been closed.
        self.assertIs(s.version(), None)
2922
@unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
def test_default_ecdh_curve(self):
    """Issue #21015: ECDH key exchange should be enabled by default."""
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.load_cert_chain(CERTFILE)
    # Before OpenSSL 1.0.0 the ECDH ciphers must be switched on
    # explicitly through the 'ECCdraft' alias; newer versions are
    # expected to prefer ECDH-based ciphers in the default list.
    needs_explicit_ecdh = ssl.OPENSSL_VERSION_INFO < (1, 0, 0)
    if needs_explicit_ecdh:
        context.set_ciphers("ECCdraft:ECDH")
    with ThreadedEchoServer(context=context) as server:
        with context.wrap_socket(socket.socket()) as s:
            s.connect((HOST, server.port))
            cipher_name = s.cipher()[0]
            self.assertIn("ECDH", cipher_name)
2939
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    """Test tls-unique channel binding.

    Two consecutive connections are made to the same echo server.  For
    each one the client-side binding data is sanity-checked and compared
    with the value reported by the peer; the two connections must yield
    different binding data.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = ssl.wrap_socket(socket.socket(),
                            server_side=False,
                            certfile=CERTFILE,
                            ca_certs=CERTFILE,
                            cert_reqs=ssl.CERT_NONE,
                            ssl_version=ssl.PROTOCOL_TLSv1)
        s.connect((HOST, server.port))
        # get the data
        cb_data = s.get_channel_binding("tls-unique")
        if support.verbose:
            sys.stdout.write(" got channel binding data: {0!r}\n"
                             .format(cb_data))

        # check if it is sane
        self.assertIsNotNone(cb_data)
        self.assertEqual(len(cb_data), 12) # True for TLSv1

        # and compare with the peers version
        s.write(b"CB tls-unique\n")
        peer_data_repr = s.read().strip()
        self.assertEqual(peer_data_repr,
                         repr(cb_data).encode("us-ascii"))
        s.close()

        # now, again
        s = ssl.wrap_socket(socket.socket(),
                            server_side=False,
                            certfile=CERTFILE,
                            ca_certs=CERTFILE,
                            cert_reqs=ssl.CERT_NONE,
                            ssl_version=ssl.PROTOCOL_TLSv1)
        s.connect((HOST, server.port))
        new_cb_data = s.get_channel_binding("tls-unique")
        if support.verbose:
            sys.stdout.write(" got another channel binding data: {0!r}\n"
                             .format(new_cb_data))
        # Sanity-check the *new* binding data; re-checking cb_data here
        # would leave the second connection's value unvalidated.
        self.assertIsNotNone(new_cb_data)
        self.assertEqual(len(new_cb_data), 12) # True for TLSv1
        # is it really unique
        self.assertNotEqual(cb_data, new_cb_data)
        s.write(b"CB tls-unique\n")
        peer_data_repr = s.read().strip()
        self.assertEqual(peer_data_repr,
                         repr(new_cb_data).encode("us-ascii"))
        s.close()
Bill Janssen58afe4c2008-09-08 16:45:19 +00002999
def test_compression(self):
    """The negotiated compression is None or one of the known methods."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    ctx.load_cert_chain(CERTFILE)
    stats = server_params_test(ctx, ctx,
                               chatty=True, connectionchatty=True)
    negotiated = stats['compression']
    if support.verbose:
        sys.stdout.write(" got compression: {!r}\n".format(negotiated))
    self.assertIn(negotiated, { None, 'ZLIB', 'RLE' })
3008
@unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
                     "ssl.OP_NO_COMPRESSION needed for this test")
def test_compression_disabled(self):
    """With OP_NO_COMPRESSION set, no compression is negotiated."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    ctx.load_cert_chain(CERTFILE)
    ctx.options |= ssl.OP_NO_COMPRESSION
    stats = server_params_test(ctx, ctx,
                               chatty=True, connectionchatty=True)
    negotiated = stats['compression']
    self.assertIs(negotiated, None)
3018
def test_dh_params(self):
    """Check we can get a connection with ephemeral Diffie-Hellman.

    The context loads DH parameters and is restricted to the "kEDH"
    cipher selection, so the negotiated cipher name must contain an
    ADH, EDH or DHE component.
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.load_cert_chain(CERTFILE)
    context.load_dh_params(DHFILE)
    context.set_ciphers("kEDH")
    stats = server_params_test(context, context,
                               chatty=True, connectionchatty=True)
    # First element of the cipher tuple is the cipher name.
    cipher = stats["cipher"][0]
    parts = cipher.split("-")
    if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
        # Report the whole name; indexing it again would print only
        # its first character.
        self.fail("Non-DH cipher: " + cipher)
3031
def test_selected_alpn_protocol(self):
    """selected_alpn_protocol() is None unless ALPN is used."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    ctx.load_cert_chain(CERTFILE)
    stats = server_params_test(ctx, ctx,
                               chatty=True, connectionchatty=True)
    client_choice = stats['client_alpn_protocol']
    self.assertIs(client_choice, None)
3039
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
def test_selected_alpn_protocol_if_server_uses_alpn(self):
    """selected_alpn_protocol() is None unless the client uses ALPN too."""
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_ctx.load_cert_chain(CERTFILE)
    server_ctx.set_alpn_protocols(['foo', 'bar'])
    # The client context deliberately advertises no ALPN protocols.
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    client_ctx.load_verify_locations(CERTFILE)
    stats = server_params_test(client_ctx, server_ctx,
                               chatty=True, connectionchatty=True)
    client_choice = stats['client_alpn_protocol']
    self.assertIs(client_choice, None)
3051
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
def test_alpn_protocols(self):
    """Check the ALPN protocol agreed on for several client offers."""
    server_protocols = ['foo', 'bar', 'milkshake']
    # (protocols offered by the client, protocol expected on both sides)
    cases = [
        (['foo', 'bar'], 'foo'),
        (['bar', 'foo'], 'foo'),
        (['milkshake'], 'milkshake'),
        (['http/3.0', 'http/4.0'], None)
    ]
    for client_protocols, expected in cases:
        server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        server_context.load_cert_chain(CERTFILE)
        server_context.set_alpn_protocols(server_protocols)
        client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        client_context.load_cert_chain(CERTFILE)
        client_context.set_alpn_protocols(client_protocols)
        stats = server_params_test(client_context, server_context,
                                   chatty=True, connectionchatty=True)

        # The two escaped placeholders are filled in per side below.
        msg = ("failed trying %s (s) and %s (c).\n"
               "was expecting %s, but got %%s from the %%s"
               % (str(server_protocols), str(client_protocols),
                  str(expected)))
        client_result = stats['client_alpn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))
        seen_by_server = stats['server_alpn_protocols']
        if len(seen_by_server):
            server_result = seen_by_server[-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
3080
def test_selected_npn_protocol(self):
    """selected_npn_protocol() is None unless NPN is used."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    ctx.load_cert_chain(CERTFILE)
    stats = server_params_test(ctx, ctx,
                               chatty=True, connectionchatty=True)
    client_choice = stats['client_npn_protocol']
    self.assertIs(client_choice, None)
3088
@unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
def test_npn_protocols(self):
    """Check the NPN protocol agreed on for several client offers."""
    server_protocols = ['http/1.1', 'spdy/2']
    # (protocols offered by the client, protocol expected on both sides)
    cases = [
        (['http/1.1', 'spdy/2'], 'http/1.1'),
        (['spdy/2', 'http/1.1'], 'http/1.1'),
        (['spdy/2', 'test'], 'spdy/2'),
        (['abc', 'def'], 'abc')
    ]
    for client_protocols, expected in cases:
        server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        server_context.load_cert_chain(CERTFILE)
        server_context.set_npn_protocols(server_protocols)
        client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        client_context.load_cert_chain(CERTFILE)
        client_context.set_npn_protocols(client_protocols)
        stats = server_params_test(client_context, server_context,
                                   chatty=True, connectionchatty=True)

        # The two escaped placeholders are filled in per side below.
        msg = ("failed trying %s (s) and %s (c).\n"
               "was expecting %s, but got %%s from the %%s"
               % (str(server_protocols), str(client_protocols),
                  str(expected)))
        client_result = stats['client_npn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))
        seen_by_server = stats['server_npn_protocols']
        if len(seen_by_server):
            server_result = seen_by_server[-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
3117
def sni_contexts(self):
    """Return (server_context, other_context, client_context) for SNI tests.

    The two server-side contexts load different certificate chains; the
    client context requires a certificate and trusts SIGNING_CA.
    """
    def server_side(certfile):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_cert_chain(certfile)
        return ctx

    server_context = server_side(SIGNED_CERTFILE)
    other_context = server_side(SIGNED_CERTFILE2)
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    client_context.verify_mode = ssl.CERT_REQUIRED
    client_context.load_verify_locations(SIGNING_CA)
    return server_context, other_context, client_context
3127
def check_common_name(self, stats, name):
    """Assert the peer certificate in *stats* has commonName *name*."""
    subject = stats['peercert']['subject']
    expected_rdn = (('commonName', name),)
    self.assertIn(expected_rdn, subject)
3131
@needs_sni
def test_sni_callback(self):
    """The servername callback sees the SNI name and can swap contexts."""
    calls = []
    server_context, other_context, client_context = self.sni_contexts()

    def servername_cb(ssl_sock, server_name, initial_context):
        calls.append((server_name, initial_context))
        if server_name is None:
            return
        ssl_sock.context = other_context

    server_context.set_servername_callback(servername_cb)

    # 1) A server name is sent: the callback switches the context.
    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name='supermessage')
    # The hostname was fetched properly, and the certificate was
    # changed for the connection.
    self.assertEqual(calls, [("supermessage", server_context)])
    # The certificate of other_context (SIGNED_CERTFILE2) was selected
    self.check_common_name(stats, 'fakehostname')

    # 2) No server name: the callback is called with server_name=None
    calls = []
    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name=None)
    self.assertEqual(calls, [(None, server_context)])
    self.check_common_name(stats, 'localhost')

    # 3) Callback disabled: it is never invoked.
    calls = []
    server_context.set_servername_callback(None)

    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name='notfunny')
    # Certificate didn't change
    self.check_common_name(stats, 'localhost')
    self.assertEqual(calls, [])
3170
@needs_sni
def test_sni_callback_alert(self):
    """A TLS alert returned by the callback reaches the connecting client."""
    server_context, other_context, client_context = self.sni_contexts()

    def cb_returning_alert(ssl_sock, server_name, initial_context):
        return ssl.ALERT_DESCRIPTION_ACCESS_DENIED

    server_context.set_servername_callback(cb_returning_alert)

    with self.assertRaises(ssl.SSLError) as caught:
        server_params_test(client_context, server_context,
                           chatty=False,
                           sni_name='supermessage')
    self.assertEqual(caught.exception.reason,
                     'TLSV1_ALERT_ACCESS_DENIED')
3185
@needs_sni
def test_sni_callback_raising(self):
    """A raising callback fails the connection with a handshake alert."""
    server_context, other_context, client_context = self.sni_contexts()

    def cb_raising(ssl_sock, server_name, initial_context):
        1/0

    server_context.set_servername_callback(cb_raising)

    # stderr is captured so the error text written for the callback's
    # exception can be inspected.
    with self.assertRaises(ssl.SSLError) as caught, \
         support.captured_stderr() as stderr:
        server_params_test(client_context, server_context,
                           chatty=False,
                           sni_name='supermessage')
    self.assertEqual(caught.exception.reason,
                     'SSLV3_ALERT_HANDSHAKE_FAILURE')
    self.assertIn("ZeroDivisionError", stderr.getvalue())
3202
@needs_sni
def test_sni_callback_wrong_return_type(self):
    """A wrongly typed callback result ends in an internal error alert."""
    server_context, other_context, client_context = self.sni_contexts()

    def cb_wrong_return_type(ssl_sock, server_name, initial_context):
        return "foo"

    server_context.set_servername_callback(cb_wrong_return_type)

    # stderr is captured so the TypeError text can be inspected.
    with self.assertRaises(ssl.SSLError) as caught, \
         support.captured_stderr() as stderr:
        server_params_test(client_context, server_context,
                           chatty=False,
                           sni_name='supermessage')
    self.assertEqual(caught.exception.reason,
                     'TLSV1_ALERT_INTERNAL_ERROR')
    self.assertIn("TypeError", stderr.getvalue())
3220
def test_shared_ciphers(self):
    """The ciphers shared with an RC4-only client are all RC4 ciphers."""
    server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_context.load_cert_chain(SIGNED_CERTFILE)
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    client_context.verify_mode = ssl.CERT_REQUIRED
    client_context.load_verify_locations(SIGNING_CA)
    # The client only accepts RC4; the server offers AES as well.
    client_context.set_ciphers("RC4")
    server_context.set_ciphers("AES:RC4")
    stats = server_params_test(client_context, server_context)
    shared = stats['server_shared_ciphers'][0]
    self.assertGreater(len(shared), 0)
    for cipher_name, _tls_version, _bits in shared:
        name_parts = cipher_name.split("-")
        self.assertIn("RC4", name_parts)
Benjamin Peterson4cb17812015-01-07 11:14:26 -06003234
def test_read_write_after_close_raises_valuerror(self):
    """read() and write() on a closed SSL socket raise ValueError."""
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(CERTFILE)
    context.load_cert_chain(CERTFILE)
    echo_server = ThreadedEchoServer(context=context, chatty=False)

    with echo_server:
        s = context.wrap_socket(socket.socket())
        s.connect((HOST, echo_server.port))
        s.close()

        with self.assertRaises(ValueError):
            s.read(1024)
        with self.assertRaises(ValueError):
            s.write(b'hello')
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003249
def test_sendfile(self):
    # Write a small payload to a temporary file, push it through the
    # wrapped socket with sendfile(), and check that the same bytes are
    # received back from the server.
    payload = b"x" * 512
    with open(support.TESTFN, 'wb') as out:
        out.write(payload)
    self.addCleanup(support.unlink, support.TESTFN)

    ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    ctx.verify_mode = ssl.CERT_REQUIRED
    ctx.load_verify_locations(CERTFILE)
    ctx.load_cert_chain(CERTFILE)
    echo_server = ThreadedEchoServer(context=ctx, chatty=False)

    with echo_server, ctx.wrap_socket(socket.socket()) as client:
        client.connect((HOST, echo_server.port))
        with open(support.TESTFN, 'rb') as src:
            client.sendfile(src)
        received = client.recv(1024)
        self.assertEqual(received, payload)
3266
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003267
def test_main(verbose=False):
    """Run the test_ssl test classes through support.run_unittest().

    When support.verbose is set, a short banner describing the platform
    and the OpenSSL build is printed first.  Every certificate/key file
    listed below must exist, otherwise support.TestFailed is raised
    before any test runs.
    """
    if support.verbose:
        probes = {
            'Linux': platform.linux_distribution,
            'Mac': platform.mac_ver,
            'Windows': platform.win32_ver,
        }
        # Use the first probe that reports a non-empty leading field;
        # fall back to the generic platform string otherwise.
        description = None
        for label, probe in probes.items():
            info = probe()
            if info and info[0]:
                description = '%s %r' % (label, info)
                break
        if description is None:
            description = repr(platform.platform())

        openssl_build = (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO)
        print("test_ssl: testing with %r %r" % openssl_build)
        print(" under %s" % description)
        print(" HAS_SNI = %r" % ssl.HAS_SNI)
        print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
        # OP_NO_TLSv1_1 may be missing from the ssl module; skip the line
        # in that case.
        try:
            print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
        except AttributeError:
            pass

    required_files = [
        CERTFILE, SVN_PYTHON_ORG_ROOT_CERT, BYTES_CERTFILE,
        ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
        SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
        BADCERT, BADKEY, EMPTYCERT,
    ]
    for path in required_files:
        if os.path.exists(path):
            continue
        raise support.TestFailed("Can't read certificate file %r" % path)

    suites = [ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests]

    if support.is_resource_enabled('network'):
        suites += [NetworkedTests, NetworkedBIOTests]

    if _have_threads:
        thread_state = support.threading_setup()
        if thread_state:
            suites.append(ThreadedTests)

    try:
        support.run_unittest(*suites)
    finally:
        # Hand the value obtained from threading_setup() back to
        # threading_cleanup(), even when the run raised.
        if _have_threads:
            support.threading_cleanup(*thread_state)
Thomas Woutersed03b412007-08-28 21:37:11 +00003316
# Script entry point: run test_main() when this file is executed directly.
if __name__ == "__main__":
    test_main()