blob: 6cd5454ab4d06fdfff0b295b35b90938476aeec9 [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00005from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00006import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00007import select
Thomas Woutersed03b412007-08-28 21:37:11 +00008import time
Christian Heimes9424bb42013-06-17 15:32:57 +02009import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000010import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000011import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000012import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000013import pprint
Antoine Pitrou152efa22010-05-16 18:19:27 +000014import tempfile
Antoine Pitrou803e6d62010-10-13 10:36:15 +000015import urllib.request
Thomas Woutersed03b412007-08-28 21:37:11 +000016import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000017import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000018import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000019import platform
Antoine Pitrou23df4832010-08-04 17:14:06 +000020import functools
Thomas Woutersed03b412007-08-28 21:37:11 +000021
Antoine Pitrou05d936d2010-10-13 11:38:36 +000022ssl = support.import_module("ssl")
23
Martin Panter3840b2a2016-03-27 01:53:46 +000024try:
25 import threading
26except ImportError:
27 _have_threads = False
28else:
29 _have_threads = True
30
Antoine Pitrou2463e5f2013-03-28 22:24:43 +010031PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
Benjamin Petersonee8712c2008-05-20 21:35:26 +000032HOST = support.HOST
Antoine Pitrou152efa22010-05-16 18:19:27 +000033
Christian Heimesefff7062013-11-21 03:35:02 +010034def data_file(*name):
35 return os.path.join(os.path.dirname(__file__), *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000036
Antoine Pitrou81564092010-10-08 23:06:24 +000037# The custom key and certificate files used in test_ssl are generated
38# using Lib/test/make_ssl_certs.py.
39# Other certificates are simply fetched from the Internet servers they
40# are meant to authenticate.
41
Antoine Pitrou152efa22010-05-16 18:19:27 +000042CERTFILE = data_file("keycert.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000043BYTES_CERTFILE = os.fsencode(CERTFILE)
Antoine Pitrou152efa22010-05-16 18:19:27 +000044ONLYCERT = data_file("ssl_cert.pem")
45ONLYKEY = data_file("ssl_key.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000046BYTES_ONLYCERT = os.fsencode(ONLYCERT)
47BYTES_ONLYKEY = os.fsencode(ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +020048CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
49ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
50KEY_PASSWORD = "somepass"
Antoine Pitrou152efa22010-05-16 18:19:27 +000051CAPATH = data_file("capath")
Victor Stinner313a1202010-06-11 23:56:51 +000052BYTES_CAPATH = os.fsencode(CAPATH)
Christian Heimesefff7062013-11-21 03:35:02 +010053CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
54CAFILE_CACERT = data_file("capath", "5ed36f99.0")
55
Antoine Pitrou152efa22010-05-16 18:19:27 +000056
Christian Heimes22587792013-11-21 23:56:13 +010057# empty CRL
58CRLFILE = data_file("revocation.crl")
59
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010060# Two keys and certs signed by the same CA (for SNI tests)
61SIGNED_CERTFILE = data_file("keycert3.pem")
62SIGNED_CERTFILE2 = data_file("keycert4.pem")
Martin Panter3840b2a2016-03-27 01:53:46 +000063# Same certificate as pycacert.pem, but without extra text in file
64SIGNING_CA = data_file("capath", "ceff1710.0")
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010065
Martin Panter3d81d932016-01-14 09:36:00 +000066REMOTE_HOST = "self-signed.pythontest.net"
Antoine Pitrou152efa22010-05-16 18:19:27 +000067
68EMPTYCERT = data_file("nullcert.pem")
69BADCERT = data_file("badcert.pem")
Martin Panter407b62f2016-01-30 03:41:43 +000070NONEXISTINGCERT = data_file("XXXnonexisting.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +000071BADKEY = data_file("badkey.pem")
Antoine Pitroud8c347a2011-10-01 19:20:25 +020072NOKIACERT = data_file("nokia.pem")
Christian Heimes824f7f32013-08-17 00:54:47 +020073NULLBYTECERT = data_file("nullbytecert.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +000074
Benjamin Petersona7eaf562015-04-02 00:04:06 -040075DHFILE = data_file("dh1024.pem")
Antoine Pitrou0e576f12011-12-22 10:03:38 +010076BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +000077
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010078
Thomas Woutersed03b412007-08-28 21:37:11 +000079def handle_error(prefix):
80 exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
Benjamin Petersonee8712c2008-05-20 21:35:26 +000081 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +000082 sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +000083
Antoine Pitroub5218772010-05-21 09:56:06 +000084def can_clear_options():
85 # 0.9.8m or higher
Antoine Pitroub9ac25d2011-07-08 18:47:06 +020086 return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
Antoine Pitroub5218772010-05-21 09:56:06 +000087
88def no_sslv2_implies_sslv3_hello():
89 # 0.9.7h or higher
90 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
91
Christian Heimes2427b502013-11-23 11:24:32 +010092def have_verify_flags():
93 # 0.9.8 or higher
94 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
95
Antoine Pitrouc695c952014-04-28 20:57:36 +020096def utc_offset(): #NOTE: ignore issues like #1647654
97 # local time = utc time + utc offset
98 if time.daylight and time.localtime().tm_isdst > 0:
99 return -time.altzone # seconds
100 return -time.timezone
101
Christian Heimes9424bb42013-06-17 15:32:57 +0200102def asn1time(cert_time):
103 # Some versions of OpenSSL ignore seconds, see #18207
104 # 0.9.8.i
105 if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
106 fmt = "%b %d %H:%M:%S %Y GMT"
107 dt = datetime.datetime.strptime(cert_time, fmt)
108 dt = dt.replace(second=0)
109 cert_time = dt.strftime(fmt)
110 # %d adds leading zero but ASN1_TIME_print() uses leading space
111 if cert_time[4] == "0":
112 cert_time = cert_time[:4] + " " + cert_time[5:]
113
114 return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000115
Antoine Pitrou23df4832010-08-04 17:14:06 +0000116# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
117def skip_if_broken_ubuntu_ssl(func):
Victor Stinner3de49192011-05-09 00:42:58 +0200118 if hasattr(ssl, 'PROTOCOL_SSLv2'):
119 @functools.wraps(func)
120 def f(*args, **kwargs):
121 try:
122 ssl.SSLContext(ssl.PROTOCOL_SSLv2)
123 except ssl.SSLError:
124 if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
125 platform.linux_distribution() == ('debian', 'squeeze/sid', '')):
126 raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
127 return func(*args, **kwargs)
128 return f
129 else:
130 return func
Antoine Pitrou23df4832010-08-04 17:14:06 +0000131
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100132needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
133
Antoine Pitrou23df4832010-08-04 17:14:06 +0000134
Antoine Pitrou152efa22010-05-16 18:19:27 +0000135class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000136
Antoine Pitrou480a1242010-04-28 21:37:09 +0000137 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000138 ssl.CERT_NONE
139 ssl.CERT_OPTIONAL
140 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100141 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100142 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100143 if ssl.HAS_ECDH:
144 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100145 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
146 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000147 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100148 self.assertIn(ssl.HAS_ECDH, {True, False})
Thomas Woutersed03b412007-08-28 21:37:11 +0000149
Antoine Pitrou172f0252014-04-18 20:33:08 +0200150 def test_str_for_enums(self):
151 # Make sure that the PROTOCOL_* constants have enum-like string
152 # reprs.
Victor Stinner648b8622014-12-12 12:23:59 +0100153 proto = ssl.PROTOCOL_SSLv23
154 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_SSLv23')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200155 ctx = ssl.SSLContext(proto)
156 self.assertIs(ctx.protocol, proto)
157
Antoine Pitrou480a1242010-04-28 21:37:09 +0000158 def test_random(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000159 v = ssl.RAND_status()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000160 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000161 sys.stdout.write("\n RAND_status is %d (%s)\n"
162 % (v, (v and "sufficient randomness") or
163 "insufficient randomness"))
Victor Stinner99c8b162011-05-24 12:05:19 +0200164
165 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
166 self.assertEqual(len(data), 16)
167 self.assertEqual(is_cryptographic, v == 1)
168 if v:
169 data = ssl.RAND_bytes(16)
170 self.assertEqual(len(data), 16)
Victor Stinner2e2baa92011-05-25 11:15:16 +0200171 else:
172 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
Victor Stinner99c8b162011-05-24 12:05:19 +0200173
Victor Stinner1e81a392013-12-19 16:47:04 +0100174 # negative num is invalid
175 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
176 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
177
Victor Stinnerbeeb5122014-11-28 13:28:25 +0100178 if hasattr(ssl, 'RAND_egd'):
179 self.assertRaises(TypeError, ssl.RAND_egd, 1)
180 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000181 ssl.RAND_add("this is a random string", 75.0)
Serhiy Storchaka8490f5a2015-03-20 09:00:36 +0200182 ssl.RAND_add(b"this is a random bytes object", 75.0)
183 ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000184
Christian Heimesf77b4b22013-08-21 13:26:05 +0200185 @unittest.skipUnless(os.name == 'posix', 'requires posix')
186 def test_random_fork(self):
187 status = ssl.RAND_status()
188 if not status:
189 self.fail("OpenSSL's PRNG has insufficient randomness")
190
191 rfd, wfd = os.pipe()
192 pid = os.fork()
193 if pid == 0:
194 try:
195 os.close(rfd)
196 child_random = ssl.RAND_pseudo_bytes(16)[0]
197 self.assertEqual(len(child_random), 16)
198 os.write(wfd, child_random)
199 os.close(wfd)
200 except BaseException:
201 os._exit(1)
202 else:
203 os._exit(0)
204 else:
205 os.close(wfd)
206 self.addCleanup(os.close, rfd)
207 _, status = os.waitpid(pid, 0)
208 self.assertEqual(status, 0)
209
210 child_random = os.read(rfd, 16)
211 self.assertEqual(len(child_random), 16)
212 parent_random = ssl.RAND_pseudo_bytes(16)[0]
213 self.assertEqual(len(parent_random), 16)
214
215 self.assertNotEqual(child_random, parent_random)
216
Antoine Pitrou480a1242010-04-28 21:37:09 +0000217 def test_parse_cert(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000218 # note that this uses an 'unofficial' function in _ssl.c,
219 # provided solely for this test, to exercise the certificate
220 # parsing code
Antoine Pitroufb046912010-11-09 20:21:19 +0000221 p = ssl._ssl._test_decode_cert(CERTFILE)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000222 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000223 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200224 self.assertEqual(p['issuer'],
225 ((('countryName', 'XY'),),
226 (('localityName', 'Castle Anthrax'),),
227 (('organizationName', 'Python Software Foundation'),),
228 (('commonName', 'localhost'),))
229 )
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100230 # Note the next three asserts will fail if the keys are regenerated
Christian Heimes9424bb42013-06-17 15:32:57 +0200231 self.assertEqual(p['notAfter'], asn1time('Oct 5 23:01:56 2020 GMT'))
232 self.assertEqual(p['notBefore'], asn1time('Oct 8 23:01:56 2010 GMT'))
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200233 self.assertEqual(p['serialNumber'], 'D7C7381919AFC24E')
234 self.assertEqual(p['subject'],
235 ((('countryName', 'XY'),),
236 (('localityName', 'Castle Anthrax'),),
237 (('organizationName', 'Python Software Foundation'),),
238 (('commonName', 'localhost'),))
239 )
240 self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),))
241 # Issue #13034: the subjectAltName in some certificates
242 # (notably projects.developer.nokia.com:443) wasn't parsed
243 p = ssl._ssl._test_decode_cert(NOKIACERT)
244 if support.verbose:
245 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
246 self.assertEqual(p['subjectAltName'],
247 (('DNS', 'projects.developer.nokia.com'),
248 ('DNS', 'projects.forum.nokia.com'))
249 )
Christian Heimesbd3a7f92013-11-21 03:40:15 +0100250 # extra OCSP and AIA fields
251 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
252 self.assertEqual(p['caIssuers'],
253 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
254 self.assertEqual(p['crlDistributionPoints'],
255 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000256
Christian Heimes824f7f32013-08-17 00:54:47 +0200257 def test_parse_cert_CVE_2013_4238(self):
258 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
259 if support.verbose:
260 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
261 subject = ((('countryName', 'US'),),
262 (('stateOrProvinceName', 'Oregon'),),
263 (('localityName', 'Beaverton'),),
264 (('organizationName', 'Python Software Foundation'),),
265 (('organizationalUnitName', 'Python Core Development'),),
266 (('commonName', 'null.python.org\x00example.org'),),
267 (('emailAddress', 'python-dev@python.org'),))
268 self.assertEqual(p['subject'], subject)
269 self.assertEqual(p['issuer'], subject)
Christian Heimes157c9832013-08-25 14:12:41 +0200270 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
271 san = (('DNS', 'altnull.python.org\x00example.com'),
272 ('email', 'null@python.org\x00user@example.org'),
273 ('URI', 'http://null.python.org\x00http://example.org'),
274 ('IP Address', '192.0.2.1'),
275 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
276 else:
277 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
278 san = (('DNS', 'altnull.python.org\x00example.com'),
279 ('email', 'null@python.org\x00user@example.org'),
280 ('URI', 'http://null.python.org\x00http://example.org'),
281 ('IP Address', '192.0.2.1'),
282 ('IP Address', '<invalid>'))
283
284 self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200285
Antoine Pitrou480a1242010-04-28 21:37:09 +0000286 def test_DER_to_PEM(self):
Martin Panter3d81d932016-01-14 09:36:00 +0000287 with open(CAFILE_CACERT, 'r') as f:
Antoine Pitrou480a1242010-04-28 21:37:09 +0000288 pem = f.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000289 d1 = ssl.PEM_cert_to_DER_cert(pem)
290 p2 = ssl.DER_cert_to_PEM_cert(d1)
291 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitrou18c913e2010-04-27 10:59:39 +0000292 self.assertEqual(d1, d2)
Antoine Pitrou9bfbe612010-04-27 22:08:08 +0000293 if not p2.startswith(ssl.PEM_HEADER + '\n'):
294 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
295 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
296 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000297
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000298 def test_openssl_version(self):
299 n = ssl.OPENSSL_VERSION_NUMBER
300 t = ssl.OPENSSL_VERSION_INFO
301 s = ssl.OPENSSL_VERSION
302 self.assertIsInstance(n, int)
303 self.assertIsInstance(t, tuple)
304 self.assertIsInstance(s, str)
305 # Some sanity checks follow
306 # >= 0.9
307 self.assertGreaterEqual(n, 0x900000)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400308 # < 3.0
309 self.assertLess(n, 0x30000000)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000310 major, minor, fix, patch, status = t
311 self.assertGreaterEqual(major, 0)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400312 self.assertLess(major, 3)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000313 self.assertGreaterEqual(minor, 0)
314 self.assertLess(minor, 256)
315 self.assertGreaterEqual(fix, 0)
316 self.assertLess(fix, 256)
317 self.assertGreaterEqual(patch, 0)
Ned Deily05784a72015-02-05 17:20:13 +1100318 self.assertLessEqual(patch, 63)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000319 self.assertGreaterEqual(status, 0)
320 self.assertLessEqual(status, 15)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400321 # Version string as returned by {Open,Libre}SSL, the format might change
322 if "LibreSSL" in s:
323 self.assertTrue(s.startswith("LibreSSL {:d}.{:d}".format(major, minor)),
Victor Stinner789b8052015-01-06 11:51:06 +0100324 (s, t, hex(n)))
Antoine Pitroudfab9352014-07-21 18:35:01 -0400325 else:
326 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
Victor Stinner789b8052015-01-06 11:51:06 +0100327 (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000328
Antoine Pitrou9d543662010-04-23 23:10:32 +0000329 @support.cpython_only
330 def test_refcycle(self):
331 # Issue #7943: an SSL object doesn't create reference cycles with
332 # itself.
333 s = socket.socket(socket.AF_INET)
334 ss = ssl.wrap_socket(s)
335 wr = weakref.ref(ss)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100336 with support.check_warnings(("", ResourceWarning)):
337 del ss
Victor Stinnere0b75b72016-03-21 17:26:04 +0100338 self.assertEqual(wr(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000339
Antoine Pitroua468adc2010-09-14 14:43:44 +0000340 def test_wrapped_unconnected(self):
341 # Methods on an unconnected SSLSocket propagate the original
Andrew Svetlov0832af62012-12-18 23:10:48 +0200342 # OSError raise by the underlying socket object.
Antoine Pitroua468adc2010-09-14 14:43:44 +0000343 s = socket.socket(socket.AF_INET)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100344 with ssl.wrap_socket(s) as ss:
Antoine Pitroue9bb4732013-01-12 21:56:56 +0100345 self.assertRaises(OSError, ss.recv, 1)
346 self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
347 self.assertRaises(OSError, ss.recvfrom, 1)
348 self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
349 self.assertRaises(OSError, ss.send, b'x')
350 self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
Antoine Pitroua468adc2010-09-14 14:43:44 +0000351
Antoine Pitrou40f08742010-04-24 22:04:40 +0000352 def test_timeout(self):
353 # Issue #8524: when creating an SSL socket, the timeout of the
354 # original socket should be retained.
355 for timeout in (None, 0.0, 5.0):
356 s = socket.socket(socket.AF_INET)
357 s.settimeout(timeout)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100358 with ssl.wrap_socket(s) as ss:
359 self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000360
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000361 def test_errors(self):
362 sock = socket.socket()
Ezio Melottied3a7d22010-12-01 02:32:32 +0000363 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000364 "certfile must be specified",
365 ssl.wrap_socket, sock, keyfile=CERTFILE)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000366 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000367 "certfile must be specified for server-side operations",
368 ssl.wrap_socket, sock, server_side=True)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000369 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000370 "certfile must be specified for server-side operations",
371 ssl.wrap_socket, sock, server_side=True, certfile="")
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100372 with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
373 self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
374 s.connect, (HOST, 8080))
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200375 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000376 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000377 ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000378 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200379 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000380 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000381 ssl.wrap_socket(sock,
382 certfile=CERTFILE, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000383 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200384 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000385 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000386 ssl.wrap_socket(sock,
387 certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000388 self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000389
Martin Panter3464ea22016-02-01 21:58:11 +0000390 def bad_cert_test(self, certfile):
391 """Check that trying to use the given client certificate fails"""
392 certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
393 certfile)
394 sock = socket.socket()
395 self.addCleanup(sock.close)
396 with self.assertRaises(ssl.SSLError):
397 ssl.wrap_socket(sock,
398 certfile=certfile,
399 ssl_version=ssl.PROTOCOL_TLSv1)
400
401 def test_empty_cert(self):
402 """Wrapping with an empty cert file"""
403 self.bad_cert_test("nullcert.pem")
404
405 def test_malformed_cert(self):
406 """Wrapping with a badly formatted certificate (syntax error)"""
407 self.bad_cert_test("badcert.pem")
408
409 def test_malformed_key(self):
410 """Wrapping with a badly formatted key (syntax error)"""
411 self.bad_cert_test("badkey.pem")
412
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000413 def test_match_hostname(self):
414 def ok(cert, hostname):
415 ssl.match_hostname(cert, hostname)
416 def fail(cert, hostname):
417 self.assertRaises(ssl.CertificateError,
418 ssl.match_hostname, cert, hostname)
419
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100420 # -- Hostname matching --
421
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000422 cert = {'subject': ((('commonName', 'example.com'),),)}
423 ok(cert, 'example.com')
424 ok(cert, 'ExAmple.cOm')
425 fail(cert, 'www.example.com')
426 fail(cert, '.example.com')
427 fail(cert, 'example.org')
428 fail(cert, 'exampleXcom')
429
430 cert = {'subject': ((('commonName', '*.a.com'),),)}
431 ok(cert, 'foo.a.com')
432 fail(cert, 'bar.foo.a.com')
433 fail(cert, 'a.com')
434 fail(cert, 'Xa.com')
435 fail(cert, '.a.com')
436
Georg Brandl72c98d32013-10-27 07:16:53 +0100437 # only match one left-most wildcard
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000438 cert = {'subject': ((('commonName', 'f*.com'),),)}
439 ok(cert, 'foo.com')
440 ok(cert, 'f.com')
441 fail(cert, 'bar.com')
442 fail(cert, 'foo.a.com')
443 fail(cert, 'bar.foo.com')
444
Christian Heimes824f7f32013-08-17 00:54:47 +0200445 # NULL bytes are bad, CVE-2013-4073
446 cert = {'subject': ((('commonName',
447 'null.python.org\x00example.org'),),)}
448 ok(cert, 'null.python.org\x00example.org') # or raise an error?
449 fail(cert, 'example.org')
450 fail(cert, 'null.python.org')
451
Georg Brandl72c98d32013-10-27 07:16:53 +0100452 # error cases with wildcards
453 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
454 fail(cert, 'bar.foo.a.com')
455 fail(cert, 'a.com')
456 fail(cert, 'Xa.com')
457 fail(cert, '.a.com')
458
459 cert = {'subject': ((('commonName', 'a.*.com'),),)}
460 fail(cert, 'a.foo.com')
461 fail(cert, 'a..com')
462 fail(cert, 'a.com')
463
464 # wildcard doesn't match IDNA prefix 'xn--'
465 idna = 'püthon.python.org'.encode("idna").decode("ascii")
466 cert = {'subject': ((('commonName', idna),),)}
467 ok(cert, idna)
468 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
469 fail(cert, idna)
470 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
471 fail(cert, idna)
472
473 # wildcard in first fragment and IDNA A-labels in sequent fragments
474 # are supported.
475 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
476 cert = {'subject': ((('commonName', idna),),)}
477 ok(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
478 ok(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
479 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
480 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
481
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000482 # Slightly fake real-world example
483 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
484 'subject': ((('commonName', 'linuxfrz.org'),),),
485 'subjectAltName': (('DNS', 'linuxfr.org'),
486 ('DNS', 'linuxfr.com'),
487 ('othername', '<unsupported>'))}
488 ok(cert, 'linuxfr.org')
489 ok(cert, 'linuxfr.com')
490 # Not a "DNS" entry
491 fail(cert, '<unsupported>')
492 # When there is a subjectAltName, commonName isn't used
493 fail(cert, 'linuxfrz.org')
494
495 # A pristine real-world example
496 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
497 'subject': ((('countryName', 'US'),),
498 (('stateOrProvinceName', 'California'),),
499 (('localityName', 'Mountain View'),),
500 (('organizationName', 'Google Inc'),),
501 (('commonName', 'mail.google.com'),))}
502 ok(cert, 'mail.google.com')
503 fail(cert, 'gmail.com')
504 # Only commonName is considered
505 fail(cert, 'California')
506
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100507 # -- IPv4 matching --
508 cert = {'subject': ((('commonName', 'example.com'),),),
509 'subjectAltName': (('DNS', 'example.com'),
510 ('IP Address', '10.11.12.13'),
511 ('IP Address', '14.15.16.17'))}
512 ok(cert, '10.11.12.13')
513 ok(cert, '14.15.16.17')
514 fail(cert, '14.15.16.18')
515 fail(cert, 'example.net')
516
517 # -- IPv6 matching --
518 cert = {'subject': ((('commonName', 'example.com'),),),
519 'subjectAltName': (('DNS', 'example.com'),
520 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
521 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
522 ok(cert, '2001::cafe')
523 ok(cert, '2003::baba')
524 fail(cert, '2003::bebe')
525 fail(cert, 'example.net')
526
527 # -- Miscellaneous --
528
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000529 # Neither commonName nor subjectAltName
530 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
531 'subject': ((('countryName', 'US'),),
532 (('stateOrProvinceName', 'California'),),
533 (('localityName', 'Mountain View'),),
534 (('organizationName', 'Google Inc'),))}
535 fail(cert, 'mail.google.com')
536
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200537 # No DNS entry in subjectAltName but a commonName
538 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
539 'subject': ((('countryName', 'US'),),
540 (('stateOrProvinceName', 'California'),),
541 (('localityName', 'Mountain View'),),
542 (('commonName', 'mail.google.com'),)),
543 'subjectAltName': (('othername', 'blabla'), )}
544 ok(cert, 'mail.google.com')
545
546 # No DNS entry subjectAltName and no commonName
547 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
548 'subject': ((('countryName', 'US'),),
549 (('stateOrProvinceName', 'California'),),
550 (('localityName', 'Mountain View'),),
551 (('organizationName', 'Google Inc'),)),
552 'subjectAltName': (('othername', 'blabla'),)}
553 fail(cert, 'google.com')
554
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000555 # Empty cert / no cert
556 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
557 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
558
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200559 # Issue #17980: avoid denials of service by refusing more than one
560 # wildcard per fragment.
561 cert = {'subject': ((('commonName', 'a*b.com'),),)}
562 ok(cert, 'axxb.com')
563 cert = {'subject': ((('commonName', 'a*b.co*'),),)}
Georg Brandl72c98d32013-10-27 07:16:53 +0100564 fail(cert, 'axxb.com')
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200565 cert = {'subject': ((('commonName', 'a*b*.com'),),)}
566 with self.assertRaises(ssl.CertificateError) as cm:
567 ssl.match_hostname(cert, 'axxbxxc.com')
568 self.assertIn("too many wildcards", str(cm.exception))
569
Antoine Pitroud5323212010-10-22 18:19:07 +0000570 def test_server_side(self):
571 # server_hostname doesn't work for server sockets
572 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000573 with socket.socket() as sock:
574 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
575 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000576
Antoine Pitroud6494802011-07-21 01:11:30 +0200577 def test_unknown_channel_binding(self):
578 # should raise ValueError for unknown type
579 s = socket.socket(socket.AF_INET)
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200580 s.bind(('127.0.0.1', 0))
581 s.listen()
582 c = socket.socket(socket.AF_INET)
583 c.connect(s.getsockname())
584 with ssl.wrap_socket(c, do_handshake_on_connect=False) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100585 with self.assertRaises(ValueError):
586 ss.get_channel_binding("unknown-type")
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200587 s.close()
Antoine Pitroud6494802011-07-21 01:11:30 +0200588
589 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
590 "'tls-unique' channel binding not available")
591 def test_tls_unique_channel_binding(self):
592 # unconnected should return None for known type
593 s = socket.socket(socket.AF_INET)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100594 with ssl.wrap_socket(s) as ss:
595 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200596 # the same for server-side
597 s = socket.socket(socket.AF_INET)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100598 with ssl.wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
599 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200600
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600601 def test_dealloc_warn(self):
602 ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
603 r = repr(ss)
604 with self.assertWarns(ResourceWarning) as cm:
605 ss = None
606 support.gc_collect()
607 self.assertIn(r, str(cm.warning.args[0]))
608
Christian Heimes6d7ad132013-06-09 18:02:55 +0200609 def test_get_default_verify_paths(self):
610 paths = ssl.get_default_verify_paths()
611 self.assertEqual(len(paths), 6)
612 self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
613
614 with support.EnvironmentVarGuard() as env:
615 env["SSL_CERT_DIR"] = CAPATH
616 env["SSL_CERT_FILE"] = CERTFILE
617 paths = ssl.get_default_verify_paths()
618 self.assertEqual(paths.cafile, CERTFILE)
619 self.assertEqual(paths.capath, CAPATH)
620
Christian Heimes44109d72013-11-22 01:51:30 +0100621 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
622 def test_enum_certificates(self):
623 self.assertTrue(ssl.enum_certificates("CA"))
624 self.assertTrue(ssl.enum_certificates("ROOT"))
625
626 self.assertRaises(TypeError, ssl.enum_certificates)
627 self.assertRaises(WindowsError, ssl.enum_certificates, "")
628
Christian Heimesc2d65e12013-11-22 16:13:55 +0100629 trust_oids = set()
630 for storename in ("CA", "ROOT"):
631 store = ssl.enum_certificates(storename)
632 self.assertIsInstance(store, list)
633 for element in store:
634 self.assertIsInstance(element, tuple)
635 self.assertEqual(len(element), 3)
636 cert, enc, trust = element
637 self.assertIsInstance(cert, bytes)
638 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
639 self.assertIsInstance(trust, (set, bool))
640 if isinstance(trust, set):
641 trust_oids.update(trust)
Christian Heimes44109d72013-11-22 01:51:30 +0100642
643 serverAuth = "1.3.6.1.5.5.7.3.1"
Christian Heimesc2d65e12013-11-22 16:13:55 +0100644 self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200645
Christian Heimes46bebee2013-06-09 19:03:31 +0200646 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Christian Heimes44109d72013-11-22 01:51:30 +0100647 def test_enum_crls(self):
648 self.assertTrue(ssl.enum_crls("CA"))
649 self.assertRaises(TypeError, ssl.enum_crls)
650 self.assertRaises(WindowsError, ssl.enum_crls, "")
Christian Heimes46bebee2013-06-09 19:03:31 +0200651
Christian Heimes44109d72013-11-22 01:51:30 +0100652 crls = ssl.enum_crls("CA")
653 self.assertIsInstance(crls, list)
654 for element in crls:
655 self.assertIsInstance(element, tuple)
656 self.assertEqual(len(element), 2)
657 self.assertIsInstance(element[0], bytes)
658 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200659
Christian Heimes46bebee2013-06-09 19:03:31 +0200660
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100661 def test_asn1object(self):
662 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
663 '1.3.6.1.5.5.7.3.1')
664
665 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
666 self.assertEqual(val, expected)
667 self.assertEqual(val.nid, 129)
668 self.assertEqual(val.shortname, 'serverAuth')
669 self.assertEqual(val.longname, 'TLS Web Server Authentication')
670 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
671 self.assertIsInstance(val, ssl._ASN1Object)
672 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
673
674 val = ssl._ASN1Object.fromnid(129)
675 self.assertEqual(val, expected)
676 self.assertIsInstance(val, ssl._ASN1Object)
677 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100678 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
679 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100680 for i in range(1000):
681 try:
682 obj = ssl._ASN1Object.fromnid(i)
683 except ValueError:
684 pass
685 else:
686 self.assertIsInstance(obj.nid, int)
687 self.assertIsInstance(obj.shortname, str)
688 self.assertIsInstance(obj.longname, str)
689 self.assertIsInstance(obj.oid, (str, type(None)))
690
691 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
692 self.assertEqual(val, expected)
693 self.assertIsInstance(val, ssl._ASN1Object)
694 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
695 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
696 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100697 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
698 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100699
Christian Heimes72d28502013-11-23 13:56:58 +0100700 def test_purpose_enum(self):
701 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
702 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
703 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
704 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
705 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
706 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
707 '1.3.6.1.5.5.7.3.1')
708
709 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
710 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
711 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
712 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
713 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
714 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
715 '1.3.6.1.5.5.7.3.2')
716
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100717 def test_unsupported_dtls(self):
718 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
719 self.addCleanup(s.close)
720 with self.assertRaises(NotImplementedError) as cx:
721 ssl.wrap_socket(s, cert_reqs=ssl.CERT_NONE)
722 self.assertEqual(str(cx.exception), "only stream sockets are supported")
723 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
724 with self.assertRaises(NotImplementedError) as cx:
725 ctx.wrap_socket(s)
726 self.assertEqual(str(cx.exception), "only stream sockets are supported")
727
Antoine Pitrouc695c952014-04-28 20:57:36 +0200728 def cert_time_ok(self, timestring, timestamp):
729 self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)
730
731 def cert_time_fail(self, timestring):
732 with self.assertRaises(ValueError):
733 ssl.cert_time_to_seconds(timestring)
734
735 @unittest.skipUnless(utc_offset(),
736 'local time needs to be different from UTC')
737 def test_cert_time_to_seconds_timezone(self):
738 # Issue #19940: ssl.cert_time_to_seconds() returns wrong
739 # results if local timezone is not UTC
740 self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0)
741 self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0)
742
743 def test_cert_time_to_seconds(self):
744 timestring = "Jan 5 09:34:43 2018 GMT"
745 ts = 1515144883.0
746 self.cert_time_ok(timestring, ts)
747 # accept keyword parameter, assert its name
748 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
749 # accept both %e and %d (space or zero generated by strftime)
750 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
751 # case-insensitive
752 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
753 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
754 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
755 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
756 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
757 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
758 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
759 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
760
761 newyear_ts = 1230768000.0
762 # leap seconds
763 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
764 # same timestamp
765 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
766
767 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
768 # allow 60th second (even if it is not a leap second)
769 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
770 # allow 2nd leap second for compatibility with time.strptime()
771 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
772 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
773
774 # no special treatement for the special value:
775 # 99991231235959Z (rfc 5280)
776 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
777
778 @support.run_with_locale('LC_ALL', '')
779 def test_cert_time_to_seconds_locale(self):
780 # `cert_time_to_seconds()` should be locale independent
781
782 def local_february_name():
783 return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
784
785 if local_february_name().lower() == 'feb':
786 self.skipTest("locale-specific month name needs to be "
787 "different from C locale")
788
789 # locale-independent
790 self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
791 self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
792
Martin Panter3840b2a2016-03-27 01:53:46 +0000793 def test_connect_ex_error(self):
794 server = socket.socket(socket.AF_INET)
795 self.addCleanup(server.close)
796 port = support.bind_port(server) # Reserve port but don't listen
797 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
798 cert_reqs=ssl.CERT_REQUIRED)
799 self.addCleanup(s.close)
800 rc = s.connect_ex((HOST, port))
801 # Issue #19919: Windows machines or VMs hosted on Windows
802 # machines sometimes return EWOULDBLOCK.
803 errors = (
804 errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
805 errno.EWOULDBLOCK,
806 )
807 self.assertIn(rc, errors)
808
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100809
Antoine Pitrou152efa22010-05-16 18:19:27 +0000810class ContextTests(unittest.TestCase):
811
Antoine Pitrou23df4832010-08-04 17:14:06 +0000812 @skip_if_broken_ubuntu_ssl
Antoine Pitrou152efa22010-05-16 18:19:27 +0000813 def test_constructor(self):
Antoine Pitrou2463e5f2013-03-28 22:24:43 +0100814 for protocol in PROTOCOLS:
815 ssl.SSLContext(protocol)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000816 self.assertRaises(TypeError, ssl.SSLContext)
817 self.assertRaises(ValueError, ssl.SSLContext, -1)
818 self.assertRaises(ValueError, ssl.SSLContext, 42)
819
Antoine Pitrou23df4832010-08-04 17:14:06 +0000820 @skip_if_broken_ubuntu_ssl
Antoine Pitrou152efa22010-05-16 18:19:27 +0000821 def test_protocol(self):
822 for proto in PROTOCOLS:
823 ctx = ssl.SSLContext(proto)
824 self.assertEqual(ctx.protocol, proto)
825
826 def test_ciphers(self):
827 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
828 ctx.set_ciphers("ALL")
829 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +0000830 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +0000831 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000832
Antoine Pitrou23df4832010-08-04 17:14:06 +0000833 @skip_if_broken_ubuntu_ssl
Antoine Pitroub5218772010-05-21 09:56:06 +0000834 def test_options(self):
835 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -0800836 # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
Antoine Pitroub5218772010-05-21 09:56:06 +0000837 self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3,
838 ctx.options)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -0800839 ctx.options |= ssl.OP_NO_TLSv1
840 self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3 | ssl.OP_NO_TLSv1,
841 ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +0000842 if can_clear_options():
843 ctx.options = (ctx.options & ~ssl.OP_NO_SSLv2) | ssl.OP_NO_TLSv1
844 self.assertEqual(ssl.OP_ALL | ssl.OP_NO_TLSv1 | ssl.OP_NO_SSLv3,
845 ctx.options)
846 ctx.options = 0
Matthias Klosef7c56242016-06-12 23:40:00 -0700847 # Ubuntu has OP_NO_SSLv3 forced on by default
848 self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
Antoine Pitroub5218772010-05-21 09:56:06 +0000849 else:
850 with self.assertRaises(ValueError):
851 ctx.options = 0
852
Christian Heimes22587792013-11-21 23:56:13 +0100853 def test_verify_mode(self):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000854 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
855 # Default value
856 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
857 ctx.verify_mode = ssl.CERT_OPTIONAL
858 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
859 ctx.verify_mode = ssl.CERT_REQUIRED
860 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
861 ctx.verify_mode = ssl.CERT_NONE
862 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
863 with self.assertRaises(TypeError):
864 ctx.verify_mode = None
865 with self.assertRaises(ValueError):
866 ctx.verify_mode = 42
867
Christian Heimes2427b502013-11-23 11:24:32 +0100868 @unittest.skipUnless(have_verify_flags(),
869 "verify_flags need OpenSSL > 0.9.8")
Christian Heimes22587792013-11-21 23:56:13 +0100870 def test_verify_flags(self):
871 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
Benjamin Peterson990fcaa2015-03-04 22:49:41 -0500872 # default value
873 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
874 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | tf)
Christian Heimes22587792013-11-21 23:56:13 +0100875 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
876 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
877 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
878 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
879 ctx.verify_flags = ssl.VERIFY_DEFAULT
880 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
881 # supports any value
882 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
883 self.assertEqual(ctx.verify_flags,
884 ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
885 with self.assertRaises(TypeError):
886 ctx.verify_flags = None
887
Antoine Pitrou152efa22010-05-16 18:19:27 +0000888 def test_load_cert_chain(self):
889 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
890 # Combined key and cert in a single file
Benjamin Peterson1ea070e2014-11-03 21:05:01 -0500891 ctx.load_cert_chain(CERTFILE, keyfile=None)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000892 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
893 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200894 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +0000895 ctx.load_cert_chain(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000896 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000897 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000898 ctx.load_cert_chain(BADCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000899 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000900 ctx.load_cert_chain(EMPTYCERT)
901 # Separate key and cert
Antoine Pitroud0919502010-05-17 10:30:00 +0000902 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000903 ctx.load_cert_chain(ONLYCERT, ONLYKEY)
904 ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
905 ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000906 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000907 ctx.load_cert_chain(ONLYCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000908 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000909 ctx.load_cert_chain(ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000910 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000911 ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
912 # Mismatching key and cert
Antoine Pitroud0919502010-05-17 10:30:00 +0000913 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000914 with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
Martin Panter3d81d932016-01-14 09:36:00 +0000915 ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +0200916 # Password protected key and cert
917 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
918 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
919 ctx.load_cert_chain(CERTFILE_PROTECTED,
920 password=bytearray(KEY_PASSWORD.encode()))
921 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
922 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
923 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
924 bytearray(KEY_PASSWORD.encode()))
925 with self.assertRaisesRegex(TypeError, "should be a string"):
926 ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
927 with self.assertRaises(ssl.SSLError):
928 ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
929 with self.assertRaisesRegex(ValueError, "cannot be longer"):
930 # openssl has a fixed limit on the password buffer.
931 # PEM_BUFSIZE is generally set to 1kb.
932 # Return a string larger than this.
933 ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
934 # Password callback
935 def getpass_unicode():
936 return KEY_PASSWORD
937 def getpass_bytes():
938 return KEY_PASSWORD.encode()
939 def getpass_bytearray():
940 return bytearray(KEY_PASSWORD.encode())
941 def getpass_badpass():
942 return "badpass"
943 def getpass_huge():
944 return b'a' * (1024 * 1024)
945 def getpass_bad_type():
946 return 9
947 def getpass_exception():
948 raise Exception('getpass error')
949 class GetPassCallable:
950 def __call__(self):
951 return KEY_PASSWORD
952 def getpass(self):
953 return KEY_PASSWORD
954 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
955 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
956 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
957 ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
958 ctx.load_cert_chain(CERTFILE_PROTECTED,
959 password=GetPassCallable().getpass)
960 with self.assertRaises(ssl.SSLError):
961 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
962 with self.assertRaisesRegex(ValueError, "cannot be longer"):
963 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
964 with self.assertRaisesRegex(TypeError, "must return a string"):
965 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
966 with self.assertRaisesRegex(Exception, "getpass error"):
967 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
968 # Make sure the password function isn't called if it isn't needed
969 ctx.load_cert_chain(CERTFILE, password=getpass_exception)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000970
971 def test_load_verify_locations(self):
972 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
973 ctx.load_verify_locations(CERTFILE)
974 ctx.load_verify_locations(cafile=CERTFILE, capath=None)
975 ctx.load_verify_locations(BYTES_CERTFILE)
976 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
977 self.assertRaises(TypeError, ctx.load_verify_locations)
Christian Heimesefff7062013-11-21 03:35:02 +0100978 self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200979 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +0000980 ctx.load_verify_locations(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000981 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000982 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +0000983 ctx.load_verify_locations(BADCERT)
984 ctx.load_verify_locations(CERTFILE, CAPATH)
985 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
986
Victor Stinner80f75e62011-01-29 11:31:20 +0000987 # Issue #10989: crash if the second argument type is invalid
988 self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
989
Christian Heimesefff7062013-11-21 03:35:02 +0100990 def test_load_verify_cadata(self):
991 # test cadata
992 with open(CAFILE_CACERT) as f:
993 cacert_pem = f.read()
994 cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
995 with open(CAFILE_NEURONIO) as f:
996 neuronio_pem = f.read()
997 neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
998
999 # test PEM
1000 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1001 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
1002 ctx.load_verify_locations(cadata=cacert_pem)
1003 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
1004 ctx.load_verify_locations(cadata=neuronio_pem)
1005 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1006 # cert already in hash table
1007 ctx.load_verify_locations(cadata=neuronio_pem)
1008 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1009
1010 # combined
1011 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1012 combined = "\n".join((cacert_pem, neuronio_pem))
1013 ctx.load_verify_locations(cadata=combined)
1014 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1015
1016 # with junk around the certs
1017 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1018 combined = ["head", cacert_pem, "other", neuronio_pem, "again",
1019 neuronio_pem, "tail"]
1020 ctx.load_verify_locations(cadata="\n".join(combined))
1021 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1022
1023 # test DER
1024 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1025 ctx.load_verify_locations(cadata=cacert_der)
1026 ctx.load_verify_locations(cadata=neuronio_der)
1027 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1028 # cert already in hash table
1029 ctx.load_verify_locations(cadata=cacert_der)
1030 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1031
1032 # combined
1033 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1034 combined = b"".join((cacert_der, neuronio_der))
1035 ctx.load_verify_locations(cadata=combined)
1036 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1037
1038 # error cases
1039 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1040 self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
1041
1042 with self.assertRaisesRegex(ssl.SSLError, "no start line"):
1043 ctx.load_verify_locations(cadata="broken")
1044 with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
1045 ctx.load_verify_locations(cadata=b"broken")
1046
1047
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001048 def test_load_dh_params(self):
1049 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1050 ctx.load_dh_params(DHFILE)
1051 if os.name != 'nt':
1052 ctx.load_dh_params(BYTES_DHFILE)
1053 self.assertRaises(TypeError, ctx.load_dh_params)
1054 self.assertRaises(TypeError, ctx.load_dh_params, None)
1055 with self.assertRaises(FileNotFoundError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001056 ctx.load_dh_params(NONEXISTINGCERT)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001057 self.assertEqual(cm.exception.errno, errno.ENOENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001058 with self.assertRaises(ssl.SSLError) as cm:
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001059 ctx.load_dh_params(CERTFILE)
1060
Antoine Pitroueb585ad2010-10-22 18:24:20 +00001061 @skip_if_broken_ubuntu_ssl
Antoine Pitroub0182c82010-10-12 20:09:02 +00001062 def test_session_stats(self):
1063 for proto in PROTOCOLS:
1064 ctx = ssl.SSLContext(proto)
1065 self.assertEqual(ctx.session_stats(), {
1066 'number': 0,
1067 'connect': 0,
1068 'connect_good': 0,
1069 'connect_renegotiate': 0,
1070 'accept': 0,
1071 'accept_good': 0,
1072 'accept_renegotiate': 0,
1073 'hits': 0,
1074 'misses': 0,
1075 'timeouts': 0,
1076 'cache_full': 0,
1077 })
1078
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001079 def test_set_default_verify_paths(self):
1080 # There's not much we can do to test that it acts as expected,
1081 # so just check it doesn't crash or raise an exception.
1082 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1083 ctx.set_default_verify_paths()
1084
Antoine Pitrou501da612011-12-21 09:27:41 +01001085 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001086 def test_set_ecdh_curve(self):
1087 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1088 ctx.set_ecdh_curve("prime256v1")
1089 ctx.set_ecdh_curve(b"prime256v1")
1090 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1091 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1092 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1093 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1094
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001095 @needs_sni
1096 def test_sni_callback(self):
1097 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1098
1099 # set_servername_callback expects a callable, or None
1100 self.assertRaises(TypeError, ctx.set_servername_callback)
1101 self.assertRaises(TypeError, ctx.set_servername_callback, 4)
1102 self.assertRaises(TypeError, ctx.set_servername_callback, "")
1103 self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
1104
1105 def dummycallback(sock, servername, ctx):
1106 pass
1107 ctx.set_servername_callback(None)
1108 ctx.set_servername_callback(dummycallback)
1109
1110 @needs_sni
1111 def test_sni_callback_refcycle(self):
1112 # Reference cycles through the servername callback are detected
1113 # and cleared.
1114 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1115 def dummycallback(sock, servername, ctx, cycle=ctx):
1116 pass
1117 ctx.set_servername_callback(dummycallback)
1118 wr = weakref.ref(ctx)
1119 del ctx, dummycallback
1120 gc.collect()
1121 self.assertIs(wr(), None)
1122
Christian Heimes9a5395a2013-06-17 15:44:12 +02001123 def test_cert_store_stats(self):
1124 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1125 self.assertEqual(ctx.cert_store_stats(),
1126 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1127 ctx.load_cert_chain(CERTFILE)
1128 self.assertEqual(ctx.cert_store_stats(),
1129 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1130 ctx.load_verify_locations(CERTFILE)
1131 self.assertEqual(ctx.cert_store_stats(),
1132 {'x509_ca': 0, 'crl': 0, 'x509': 1})
Martin Panterb55f8b72016-01-14 12:53:56 +00001133 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001134 self.assertEqual(ctx.cert_store_stats(),
1135 {'x509_ca': 1, 'crl': 0, 'x509': 2})
1136
1137 def test_get_ca_certs(self):
1138 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1139 self.assertEqual(ctx.get_ca_certs(), [])
1140 # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
1141 ctx.load_verify_locations(CERTFILE)
1142 self.assertEqual(ctx.get_ca_certs(), [])
Martin Panterb55f8b72016-01-14 12:53:56 +00001143 # but CAFILE_CACERT is a CA cert
1144 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001145 self.assertEqual(ctx.get_ca_certs(),
1146 [{'issuer': ((('organizationName', 'Root CA'),),
1147 (('organizationalUnitName', 'http://www.cacert.org'),),
1148 (('commonName', 'CA Cert Signing Authority'),),
1149 (('emailAddress', 'support@cacert.org'),)),
1150 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
1151 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
1152 'serialNumber': '00',
Christian Heimesbd3a7f92013-11-21 03:40:15 +01001153 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
Christian Heimes9a5395a2013-06-17 15:44:12 +02001154 'subject': ((('organizationName', 'Root CA'),),
1155 (('organizationalUnitName', 'http://www.cacert.org'),),
1156 (('commonName', 'CA Cert Signing Authority'),),
1157 (('emailAddress', 'support@cacert.org'),)),
1158 'version': 3}])
1159
Martin Panterb55f8b72016-01-14 12:53:56 +00001160 with open(CAFILE_CACERT) as f:
Christian Heimes9a5395a2013-06-17 15:44:12 +02001161 pem = f.read()
1162 der = ssl.PEM_cert_to_DER_cert(pem)
1163 self.assertEqual(ctx.get_ca_certs(True), [der])
1164
Christian Heimes72d28502013-11-23 13:56:58 +01001165 def test_load_default_certs(self):
1166 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1167 ctx.load_default_certs()
1168
1169 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1170 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1171 ctx.load_default_certs()
1172
1173 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1174 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1175
1176 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1177 self.assertRaises(TypeError, ctx.load_default_certs, None)
1178 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1179
Benjamin Peterson91244e02014-10-03 18:17:15 -04001180 @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001181 def test_load_default_certs_env(self):
1182 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1183 with support.EnvironmentVarGuard() as env:
1184 env["SSL_CERT_DIR"] = CAPATH
1185 env["SSL_CERT_FILE"] = CERTFILE
1186 ctx.load_default_certs()
1187 self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0})
1188
Benjamin Peterson91244e02014-10-03 18:17:15 -04001189 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
1190 def test_load_default_certs_env_windows(self):
1191 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1192 ctx.load_default_certs()
1193 stats = ctx.cert_store_stats()
1194
1195 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1196 with support.EnvironmentVarGuard() as env:
1197 env["SSL_CERT_DIR"] = CAPATH
1198 env["SSL_CERT_FILE"] = CERTFILE
1199 ctx.load_default_certs()
1200 stats["x509"] += 1
1201 self.assertEqual(ctx.cert_store_stats(), stats)
1202
Christian Heimes4c05b472013-11-23 15:58:30 +01001203 def test_create_default_context(self):
1204 ctx = ssl.create_default_context()
Donald Stufft6a2ba942014-03-23 19:05:28 -04001205 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
Christian Heimes4c05b472013-11-23 15:58:30 +01001206 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001207 self.assertTrue(ctx.check_hostname)
Christian Heimes4c05b472013-11-23 15:58:30 +01001208 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
Donald Stufft6a2ba942014-03-23 19:05:28 -04001209 self.assertEqual(
1210 ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
1211 getattr(ssl, "OP_NO_COMPRESSION", 0),
1212 )
Christian Heimes4c05b472013-11-23 15:58:30 +01001213
1214 with open(SIGNING_CA) as f:
1215 cadata = f.read()
1216 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1217 cadata=cadata)
Donald Stufft6a2ba942014-03-23 19:05:28 -04001218 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
Christian Heimes4c05b472013-11-23 15:58:30 +01001219 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1220 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
Donald Stufft6a2ba942014-03-23 19:05:28 -04001221 self.assertEqual(
1222 ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
1223 getattr(ssl, "OP_NO_COMPRESSION", 0),
1224 )
Christian Heimes4c05b472013-11-23 15:58:30 +01001225
1226 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
Donald Stufft6a2ba942014-03-23 19:05:28 -04001227 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
Christian Heimes4c05b472013-11-23 15:58:30 +01001228 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1229 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
Donald Stufft6a2ba942014-03-23 19:05:28 -04001230 self.assertEqual(
1231 ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
1232 getattr(ssl, "OP_NO_COMPRESSION", 0),
1233 )
1234 self.assertEqual(
1235 ctx.options & getattr(ssl, "OP_SINGLE_DH_USE", 0),
1236 getattr(ssl, "OP_SINGLE_DH_USE", 0),
1237 )
1238 self.assertEqual(
1239 ctx.options & getattr(ssl, "OP_SINGLE_ECDH_USE", 0),
1240 getattr(ssl, "OP_SINGLE_ECDH_USE", 0),
1241 )
Christian Heimes4c05b472013-11-23 15:58:30 +01001242
Christian Heimes67986f92013-11-23 22:43:47 +01001243 def test__create_stdlib_context(self):
1244 ctx = ssl._create_stdlib_context()
1245 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1246 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001247 self.assertFalse(ctx.check_hostname)
Christian Heimes67986f92013-11-23 22:43:47 +01001248 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1249
1250 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
1251 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1252 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1253 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1254
1255 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
Christian Heimesa02c69a2013-12-02 20:59:28 +01001256 cert_reqs=ssl.CERT_REQUIRED,
1257 check_hostname=True)
Christian Heimes67986f92013-11-23 22:43:47 +01001258 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1259 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimesa02c69a2013-12-02 20:59:28 +01001260 self.assertTrue(ctx.check_hostname)
Christian Heimes67986f92013-11-23 22:43:47 +01001261 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1262
1263 ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
1264 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1265 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1266 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
Christian Heimes4c05b472013-11-23 15:58:30 +01001267
Christian Heimes1aa9a752013-12-02 02:41:19 +01001268 def test_check_hostname(self):
1269 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1270 self.assertFalse(ctx.check_hostname)
1271
1272 # Requires CERT_REQUIRED or CERT_OPTIONAL
1273 with self.assertRaises(ValueError):
1274 ctx.check_hostname = True
1275 ctx.verify_mode = ssl.CERT_REQUIRED
1276 self.assertFalse(ctx.check_hostname)
1277 ctx.check_hostname = True
1278 self.assertTrue(ctx.check_hostname)
1279
1280 ctx.verify_mode = ssl.CERT_OPTIONAL
1281 ctx.check_hostname = True
1282 self.assertTrue(ctx.check_hostname)
1283
1284 # Cannot set CERT_NONE with check_hostname enabled
1285 with self.assertRaises(ValueError):
1286 ctx.verify_mode = ssl.CERT_NONE
1287 ctx.check_hostname = False
1288 self.assertFalse(ctx.check_hostname)
1289
Antoine Pitrou152efa22010-05-16 18:19:27 +00001290
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001291class SSLErrorTests(unittest.TestCase):
1292
1293 def test_str(self):
1294 # The str() of a SSLError doesn't include the errno
1295 e = ssl.SSLError(1, "foo")
1296 self.assertEqual(str(e), "foo")
1297 self.assertEqual(e.errno, 1)
1298 # Same for a subclass
1299 e = ssl.SSLZeroReturnError(1, "foo")
1300 self.assertEqual(str(e), "foo")
1301 self.assertEqual(e.errno, 1)
1302
1303 def test_lib_reason(self):
1304 # Test the library and reason attributes
1305 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1306 with self.assertRaises(ssl.SSLError) as cm:
1307 ctx.load_dh_params(CERTFILE)
1308 self.assertEqual(cm.exception.library, 'PEM')
1309 self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1310 s = str(cm.exception)
1311 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1312
1313 def test_subclass(self):
1314 # Check that the appropriate SSLError subclass is raised
1315 # (this only tests one of them)
1316 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1317 with socket.socket() as s:
1318 s.bind(("127.0.0.1", 0))
Charles-François Natali6e204602014-07-23 19:28:13 +01001319 s.listen()
Antoine Pitroue1ceb502013-01-12 21:54:44 +01001320 c = socket.socket()
1321 c.connect(s.getsockname())
1322 c.setblocking(False)
1323 with ctx.wrap_socket(c, False, do_handshake_on_connect=False) as c:
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001324 with self.assertRaises(ssl.SSLWantReadError) as cm:
1325 c.do_handshake()
1326 s = str(cm.exception)
1327 self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1328 # For compatibility
1329 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
1330
1331
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001332class MemoryBIOTests(unittest.TestCase):
1333
1334 def test_read_write(self):
1335 bio = ssl.MemoryBIO()
1336 bio.write(b'foo')
1337 self.assertEqual(bio.read(), b'foo')
1338 self.assertEqual(bio.read(), b'')
1339 bio.write(b'foo')
1340 bio.write(b'bar')
1341 self.assertEqual(bio.read(), b'foobar')
1342 self.assertEqual(bio.read(), b'')
1343 bio.write(b'baz')
1344 self.assertEqual(bio.read(2), b'ba')
1345 self.assertEqual(bio.read(1), b'z')
1346 self.assertEqual(bio.read(1), b'')
1347
1348 def test_eof(self):
1349 bio = ssl.MemoryBIO()
1350 self.assertFalse(bio.eof)
1351 self.assertEqual(bio.read(), b'')
1352 self.assertFalse(bio.eof)
1353 bio.write(b'foo')
1354 self.assertFalse(bio.eof)
1355 bio.write_eof()
1356 self.assertFalse(bio.eof)
1357 self.assertEqual(bio.read(2), b'fo')
1358 self.assertFalse(bio.eof)
1359 self.assertEqual(bio.read(1), b'o')
1360 self.assertTrue(bio.eof)
1361 self.assertEqual(bio.read(), b'')
1362 self.assertTrue(bio.eof)
1363
1364 def test_pending(self):
1365 bio = ssl.MemoryBIO()
1366 self.assertEqual(bio.pending, 0)
1367 bio.write(b'foo')
1368 self.assertEqual(bio.pending, 3)
1369 for i in range(3):
1370 bio.read(1)
1371 self.assertEqual(bio.pending, 3-i-1)
1372 for i in range(3):
1373 bio.write(b'x')
1374 self.assertEqual(bio.pending, i+1)
1375 bio.read()
1376 self.assertEqual(bio.pending, 0)
1377
1378 def test_buffer_types(self):
1379 bio = ssl.MemoryBIO()
1380 bio.write(b'foo')
1381 self.assertEqual(bio.read(), b'foo')
1382 bio.write(bytearray(b'bar'))
1383 self.assertEqual(bio.read(), b'bar')
1384 bio.write(memoryview(b'baz'))
1385 self.assertEqual(bio.read(), b'baz')
1386
1387 def test_error_types(self):
1388 bio = ssl.MemoryBIO()
1389 self.assertRaises(TypeError, bio.write, 'foo')
1390 self.assertRaises(TypeError, bio.write, None)
1391 self.assertRaises(TypeError, bio.write, True)
1392 self.assertRaises(TypeError, bio.write, 1)
1393
1394
Martin Panter3840b2a2016-03-27 01:53:46 +00001395@unittest.skipUnless(_have_threads, "Needs threading module")
1396class SimpleBackgroundTests(unittest.TestCase):
1397
1398 """Tests that connect to a simple server running in the background"""
1399
1400 def setUp(self):
1401 server = ThreadedEchoServer(SIGNED_CERTFILE)
1402 self.server_addr = (HOST, server.port)
1403 server.__enter__()
1404 self.addCleanup(server.__exit__, None, None, None)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001405
Antoine Pitrou480a1242010-04-28 21:37:09 +00001406 def test_connect(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001407 with ssl.wrap_socket(socket.socket(socket.AF_INET),
1408 cert_reqs=ssl.CERT_NONE) as s:
1409 s.connect(self.server_addr)
1410 self.assertEqual({}, s.getpeercert())
Antoine Pitrou350c7222010-09-09 13:31:46 +00001411
Martin Panter3840b2a2016-03-27 01:53:46 +00001412 # this should succeed because we specify the root cert
1413 with ssl.wrap_socket(socket.socket(socket.AF_INET),
1414 cert_reqs=ssl.CERT_REQUIRED,
1415 ca_certs=SIGNING_CA) as s:
1416 s.connect(self.server_addr)
1417 self.assertTrue(s.getpeercert())
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001418
Martin Panter3840b2a2016-03-27 01:53:46 +00001419 def test_connect_fail(self):
1420 # This should fail because we have no verification certs. Connection
1421 # failure crashes ThreadedEchoServer, so run this in an independent
1422 # test method.
1423 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1424 cert_reqs=ssl.CERT_REQUIRED)
1425 self.addCleanup(s.close)
1426 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1427 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001428
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001429 def test_connect_ex(self):
1430 # Issue #11326: check connect_ex() implementation
Martin Panter3840b2a2016-03-27 01:53:46 +00001431 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1432 cert_reqs=ssl.CERT_REQUIRED,
1433 ca_certs=SIGNING_CA)
1434 self.addCleanup(s.close)
1435 self.assertEqual(0, s.connect_ex(self.server_addr))
1436 self.assertTrue(s.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001437
1438 def test_non_blocking_connect_ex(self):
1439 # Issue #11326: non-blocking connect_ex() should allow handshake
1440 # to proceed after the socket gets ready.
Martin Panter3840b2a2016-03-27 01:53:46 +00001441 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1442 cert_reqs=ssl.CERT_REQUIRED,
1443 ca_certs=SIGNING_CA,
1444 do_handshake_on_connect=False)
1445 self.addCleanup(s.close)
1446 s.setblocking(False)
1447 rc = s.connect_ex(self.server_addr)
1448 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1449 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
1450 # Wait for connect to finish
1451 select.select([], [s], [], 5.0)
1452 # Non-blocking handshake
1453 while True:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001454 try:
Martin Panter3840b2a2016-03-27 01:53:46 +00001455 s.do_handshake()
1456 break
1457 except ssl.SSLWantReadError:
1458 select.select([s], [], [], 5.0)
1459 except ssl.SSLWantWriteError:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001460 select.select([], [s], [], 5.0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001461 # SSL established
1462 self.assertTrue(s.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001463
Antoine Pitrou152efa22010-05-16 18:19:27 +00001464 def test_connect_with_context(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001465 # Same as test_connect, but with a separately created context
1466 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1467 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1468 s.connect(self.server_addr)
1469 self.assertEqual({}, s.getpeercert())
1470 # Same with a server hostname
1471 with ctx.wrap_socket(socket.socket(socket.AF_INET),
1472 server_hostname="dummy") as s:
1473 s.connect(self.server_addr)
1474 ctx.verify_mode = ssl.CERT_REQUIRED
1475 # This should succeed because we specify the root cert
1476 ctx.load_verify_locations(SIGNING_CA)
1477 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1478 s.connect(self.server_addr)
1479 cert = s.getpeercert()
1480 self.assertTrue(cert)
1481
1482 def test_connect_with_context_fail(self):
1483 # This should fail because we have no verification certs. Connection
1484 # failure crashes ThreadedEchoServer, so run this in an independent
1485 # test method.
1486 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1487 ctx.verify_mode = ssl.CERT_REQUIRED
1488 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1489 self.addCleanup(s.close)
1490 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1491 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001492
1493 def test_connect_capath(self):
1494 # Verify server certificates using the `capath` argument
Antoine Pitrou467f28d2010-05-16 19:22:44 +00001495 # NOTE: the subject hashing algorithm has been changed between
1496 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
1497 # contain both versions of each certificate (same content, different
Antoine Pitroud7e4c1c2010-05-17 14:13:10 +00001498 # filename) for this test to be portable across OpenSSL releases.
Martin Panter3840b2a2016-03-27 01:53:46 +00001499 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1500 ctx.verify_mode = ssl.CERT_REQUIRED
1501 ctx.load_verify_locations(capath=CAPATH)
1502 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1503 s.connect(self.server_addr)
1504 cert = s.getpeercert()
1505 self.assertTrue(cert)
1506 # Same with a bytes `capath` argument
1507 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1508 ctx.verify_mode = ssl.CERT_REQUIRED
1509 ctx.load_verify_locations(capath=BYTES_CAPATH)
1510 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1511 s.connect(self.server_addr)
1512 cert = s.getpeercert()
1513 self.assertTrue(cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001514
Christian Heimesefff7062013-11-21 03:35:02 +01001515 def test_connect_cadata(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001516 with open(SIGNING_CA) as f:
Christian Heimesefff7062013-11-21 03:35:02 +01001517 pem = f.read()
1518 der = ssl.PEM_cert_to_DER_cert(pem)
Martin Panter3840b2a2016-03-27 01:53:46 +00001519 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1520 ctx.verify_mode = ssl.CERT_REQUIRED
1521 ctx.load_verify_locations(cadata=pem)
1522 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1523 s.connect(self.server_addr)
1524 cert = s.getpeercert()
1525 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001526
Martin Panter3840b2a2016-03-27 01:53:46 +00001527 # same with DER
1528 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1529 ctx.verify_mode = ssl.CERT_REQUIRED
1530 ctx.load_verify_locations(cadata=der)
1531 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1532 s.connect(self.server_addr)
1533 cert = s.getpeercert()
1534 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001535
Antoine Pitroue3220242010-04-24 11:13:53 +00001536 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
1537 def test_makefile_close(self):
1538 # Issue #5238: creating a file-like object with makefile() shouldn't
1539 # delay closing the underlying "real socket" (here tested with its
1540 # file descriptor, hence skipping the test under Windows).
Martin Panter3840b2a2016-03-27 01:53:46 +00001541 ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
1542 ss.connect(self.server_addr)
1543 fd = ss.fileno()
1544 f = ss.makefile()
1545 f.close()
1546 # The fd is still open
1547 os.read(fd, 0)
1548 # Closing the SSL socket should close the fd too
1549 ss.close()
1550 gc.collect()
1551 with self.assertRaises(OSError) as e:
Antoine Pitroue3220242010-04-24 11:13:53 +00001552 os.read(fd, 0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001553 self.assertEqual(e.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001554
Antoine Pitrou480a1242010-04-28 21:37:09 +00001555 def test_non_blocking_handshake(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001556 s = socket.socket(socket.AF_INET)
1557 s.connect(self.server_addr)
1558 s.setblocking(False)
1559 s = ssl.wrap_socket(s,
1560 cert_reqs=ssl.CERT_NONE,
1561 do_handshake_on_connect=False)
1562 self.addCleanup(s.close)
1563 count = 0
1564 while True:
1565 try:
1566 count += 1
1567 s.do_handshake()
1568 break
1569 except ssl.SSLWantReadError:
1570 select.select([s], [], [])
1571 except ssl.SSLWantWriteError:
1572 select.select([], [s], [])
1573 if support.verbose:
1574 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001575
Antoine Pitrou480a1242010-04-28 21:37:09 +00001576 def test_get_server_certificate(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001577 _test_get_server_certificate(self, *self.server_addr, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001578
Martin Panter3840b2a2016-03-27 01:53:46 +00001579 def test_get_server_certificate_fail(self):
1580 # Connection failure crashes ThreadedEchoServer, so run this in an
1581 # independent test method
1582 _test_get_server_certificate_fail(self, *self.server_addr)
Bill Janssen54cc54c2007-12-14 22:08:56 +00001583
Antoine Pitrouf4c7bad2010-08-15 23:02:22 +00001584 def test_ciphers(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001585 with ssl.wrap_socket(socket.socket(socket.AF_INET),
1586 cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
1587 s.connect(self.server_addr)
1588 with ssl.wrap_socket(socket.socket(socket.AF_INET),
1589 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
1590 s.connect(self.server_addr)
1591 # Error checking can happen at instantiation or when connecting
1592 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
1593 with socket.socket(socket.AF_INET) as sock:
1594 s = ssl.wrap_socket(sock,
1595 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
1596 s.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00001597
Christian Heimes9a5395a2013-06-17 15:44:12 +02001598 def test_get_ca_certs_capath(self):
1599 # capath certs are loaded on request
Martin Panter3840b2a2016-03-27 01:53:46 +00001600 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1601 ctx.verify_mode = ssl.CERT_REQUIRED
1602 ctx.load_verify_locations(capath=CAPATH)
1603 self.assertEqual(ctx.get_ca_certs(), [])
1604 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1605 s.connect(self.server_addr)
1606 cert = s.getpeercert()
1607 self.assertTrue(cert)
1608 self.assertEqual(len(ctx.get_ca_certs()), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001609
Christian Heimes575596e2013-12-15 21:49:17 +01001610 @needs_sni
Christian Heimes8e7f3942013-12-05 07:41:08 +01001611 def test_context_setget(self):
1612 # Check that the context of a connected socket can be replaced.
Martin Panter3840b2a2016-03-27 01:53:46 +00001613 ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1614 ctx2 = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1615 s = socket.socket(socket.AF_INET)
1616 with ctx1.wrap_socket(s) as ss:
1617 ss.connect(self.server_addr)
1618 self.assertIs(ss.context, ctx1)
1619 self.assertIs(ss._sslobj.context, ctx1)
1620 ss.context = ctx2
1621 self.assertIs(ss.context, ctx2)
1622 self.assertIs(ss._sslobj.context, ctx2)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001623
1624 def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
1625 # A simple IO loop. Call func(*args) depending on the error we get
1626 # (WANT_READ or WANT_WRITE) move data between the socket and the BIOs.
1627 timeout = kwargs.get('timeout', 10)
1628 count = 0
1629 while True:
1630 errno = None
1631 count += 1
1632 try:
1633 ret = func(*args)
1634 except ssl.SSLError as e:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001635 if e.errno not in (ssl.SSL_ERROR_WANT_READ,
Martin Panter40b97ec2016-01-14 13:05:46 +00001636 ssl.SSL_ERROR_WANT_WRITE):
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001637 raise
1638 errno = e.errno
1639 # Get any data from the outgoing BIO irrespective of any error, and
1640 # send it to the socket.
1641 buf = outgoing.read()
1642 sock.sendall(buf)
1643 # If there's no error, we're done. For WANT_READ, we need to get
1644 # data from the socket and put it in the incoming BIO.
1645 if errno is None:
1646 break
1647 elif errno == ssl.SSL_ERROR_WANT_READ:
1648 buf = sock.recv(32768)
1649 if buf:
1650 incoming.write(buf)
1651 else:
1652 incoming.write_eof()
1653 if support.verbose:
1654 sys.stdout.write("Needed %d calls to complete %s().\n"
1655 % (count, func.__name__))
1656 return ret
1657
Martin Panter3840b2a2016-03-27 01:53:46 +00001658 def test_bio_handshake(self):
1659 sock = socket.socket(socket.AF_INET)
1660 self.addCleanup(sock.close)
1661 sock.connect(self.server_addr)
1662 incoming = ssl.MemoryBIO()
1663 outgoing = ssl.MemoryBIO()
1664 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1665 ctx.verify_mode = ssl.CERT_REQUIRED
1666 ctx.load_verify_locations(SIGNING_CA)
1667 ctx.check_hostname = True
1668 sslobj = ctx.wrap_bio(incoming, outgoing, False, 'localhost')
1669 self.assertIs(sslobj._sslobj.owner, sslobj)
1670 self.assertIsNone(sslobj.cipher())
1671 self.assertIsNone(sslobj.shared_ciphers())
1672 self.assertRaises(ValueError, sslobj.getpeercert)
1673 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
1674 self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
1675 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
1676 self.assertTrue(sslobj.cipher())
1677 self.assertIsNone(sslobj.shared_ciphers())
1678 self.assertTrue(sslobj.getpeercert())
1679 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
1680 self.assertTrue(sslobj.get_channel_binding('tls-unique'))
1681 try:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001682 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Martin Panter3840b2a2016-03-27 01:53:46 +00001683 except ssl.SSLSyscallError:
1684 # If the server shuts down the TCP connection without sending a
1685 # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
1686 pass
1687 self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
1688
1689 def test_bio_read_write_data(self):
1690 sock = socket.socket(socket.AF_INET)
1691 self.addCleanup(sock.close)
1692 sock.connect(self.server_addr)
1693 incoming = ssl.MemoryBIO()
1694 outgoing = ssl.MemoryBIO()
1695 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1696 ctx.verify_mode = ssl.CERT_NONE
1697 sslobj = ctx.wrap_bio(incoming, outgoing, False)
1698 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
1699 req = b'FOO\n'
1700 self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
1701 buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
1702 self.assertEqual(buf, b'foo\n')
1703 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001704
1705
Martin Panter3840b2a2016-03-27 01:53:46 +00001706class NetworkedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001707
Martin Panter3840b2a2016-03-27 01:53:46 +00001708 def test_timeout_connect_ex(self):
1709 # Issue #12065: on a timeout, connect_ex() should return the original
1710 # errno (mimicking the behaviour of non-SSL sockets).
1711 with support.transient_internet(REMOTE_HOST):
1712 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1713 cert_reqs=ssl.CERT_REQUIRED,
1714 do_handshake_on_connect=False)
1715 self.addCleanup(s.close)
1716 s.settimeout(0.0000001)
1717 rc = s.connect_ex((REMOTE_HOST, 443))
1718 if rc == 0:
1719 self.skipTest("REMOTE_HOST responded too quickly")
1720 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
1721
1722 @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
1723 def test_get_server_certificate_ipv6(self):
1724 with support.transient_internet('ipv6.google.com'):
1725 _test_get_server_certificate(self, 'ipv6.google.com', 443)
1726 _test_get_server_certificate_fail(self, 'ipv6.google.com', 443)
1727
1728 def test_algorithms(self):
1729 # Issue #8484: all algorithms should be available when verifying a
1730 # certificate.
1731 # SHA256 was added in OpenSSL 0.9.8
1732 if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15):
1733 self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION)
1734 # sha256.tbs-internet.com needs SNI to use the correct certificate
1735 if not ssl.HAS_SNI:
1736 self.skipTest("SNI needed for this test")
1737 # https://sha2.hboeck.de/ was used until 2011-01-08 (no route to host)
1738 remote = ("sha256.tbs-internet.com", 443)
1739 sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem")
1740 with support.transient_internet("sha256.tbs-internet.com"):
1741 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1742 ctx.verify_mode = ssl.CERT_REQUIRED
1743 ctx.load_verify_locations(sha256_cert)
1744 s = ctx.wrap_socket(socket.socket(socket.AF_INET),
1745 server_hostname="sha256.tbs-internet.com")
1746 try:
1747 s.connect(remote)
1748 if support.verbose:
1749 sys.stdout.write("\nCipher with %r is %r\n" %
1750 (remote, s.cipher()))
1751 sys.stdout.write("Certificate is:\n%s\n" %
1752 pprint.pformat(s.getpeercert()))
1753 finally:
1754 s.close()
1755
1756
1757def _test_get_server_certificate(test, host, port, cert=None):
1758 pem = ssl.get_server_certificate((host, port))
1759 if not pem:
1760 test.fail("No server certificate on %s:%s!" % (host, port))
1761
1762 pem = ssl.get_server_certificate((host, port), ca_certs=cert)
1763 if not pem:
1764 test.fail("No server certificate on %s:%s!" % (host, port))
1765 if support.verbose:
1766 sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
1767
1768def _test_get_server_certificate_fail(test, host, port):
1769 try:
1770 pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
1771 except ssl.SSLError as x:
1772 #should fail
1773 if support.verbose:
1774 sys.stdout.write("%s\n" % x)
1775 else:
1776 test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
1777
1778
1779if _have_threads:
Antoine Pitrou803e6d62010-10-13 10:36:15 +00001780 from test.ssl_servers import make_https_server
1781
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001782 class ThreadedEchoServer(threading.Thread):
1783
1784 class ConnectionHandler(threading.Thread):
1785
1786 """A mildly complicated class, because we want it to work both
1787 with and without the SSL wrapper around the socket connection, so
1788 that we can test the STARTTLS functionality."""
1789
Bill Janssen6e027db2007-11-15 22:23:56 +00001790 def __init__(self, server, connsock, addr):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001791 self.server = server
1792 self.running = False
1793 self.sock = connsock
Bill Janssen6e027db2007-11-15 22:23:56 +00001794 self.addr = addr
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001795 self.sock.setblocking(1)
1796 self.sslconn = None
1797 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00001798 self.daemon = True
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001799
Antoine Pitrou480a1242010-04-28 21:37:09 +00001800 def wrap_conn(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001801 try:
Antoine Pitroub5218772010-05-21 09:56:06 +00001802 self.sslconn = self.server.context.wrap_socket(
1803 self.sock, server_side=True)
Benjamin Petersoncca27322015-01-23 16:35:37 -05001804 self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
1805 self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
Nadeem Vawdaad246bf2013-03-03 22:44:22 +01001806 except (ssl.SSLError, ConnectionResetError) as e:
1807 # We treat ConnectionResetError as though it were an
1808 # SSLError - OpenSSL on Ubuntu abruptly closes the
1809 # connection when asked to use an unsupported protocol.
1810 #
Antoine Pitrou18c913e2010-04-27 10:59:39 +00001811 # XXX Various errors can have happened here, for example
1812 # a mismatching protocol version, an invalid certificate,
1813 # or a low-level bug. This should be made more discriminating.
Antoine Pitrou8f85f902012-01-03 22:46:48 +01001814 self.server.conn_errors.append(e)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001815 if self.server.chatty:
Bill Janssen6e027db2007-11-15 22:23:56 +00001816 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
Antoine Pitrou18c913e2010-04-27 10:59:39 +00001817 self.running = False
1818 self.server.stop()
Bill Janssen6e027db2007-11-15 22:23:56 +00001819 self.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001820 return False
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001821 else:
Benjamin Peterson4cb17812015-01-07 11:14:26 -06001822 self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
Antoine Pitroub5218772010-05-21 09:56:06 +00001823 if self.server.context.verify_mode == ssl.CERT_REQUIRED:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001824 cert = self.sslconn.getpeercert()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001825 if support.verbose and self.server.chatty:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001826 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
1827 cert_binary = self.sslconn.getpeercert(True)
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001828 if support.verbose and self.server.chatty:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001829 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
1830 cipher = self.sslconn.cipher()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001831 if support.verbose and self.server.chatty:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001832 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01001833 sys.stdout.write(" server: selected protocol is now "
1834 + str(self.sslconn.selected_npn_protocol()) + "\n")
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001835 return True
1836
1837 def read(self):
1838 if self.sslconn:
1839 return self.sslconn.read()
1840 else:
1841 return self.sock.recv(1024)
1842
1843 def write(self, bytes):
1844 if self.sslconn:
1845 return self.sslconn.write(bytes)
1846 else:
1847 return self.sock.send(bytes)
1848
1849 def close(self):
1850 if self.sslconn:
1851 self.sslconn.close()
1852 else:
1853 self.sock.close()
1854
Antoine Pitrou480a1242010-04-28 21:37:09 +00001855 def run(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001856 self.running = True
1857 if not self.server.starttls_server:
1858 if not self.wrap_conn():
1859 return
1860 while self.running:
1861 try:
1862 msg = self.read()
Antoine Pitrou480a1242010-04-28 21:37:09 +00001863 stripped = msg.strip()
1864 if not stripped:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001865 # eof, so quit this handler
1866 self.running = False
Martin Panter3840b2a2016-03-27 01:53:46 +00001867 try:
1868 self.sock = self.sslconn.unwrap()
1869 except OSError:
1870 # Many tests shut the TCP connection down
1871 # without an SSL shutdown. This causes
1872 # unwrap() to raise OSError with errno=0!
1873 pass
1874 else:
1875 self.sslconn = None
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001876 self.close()
Antoine Pitrou480a1242010-04-28 21:37:09 +00001877 elif stripped == b'over':
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001878 if support.verbose and self.server.connectionchatty:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001879 sys.stdout.write(" server: client closed connection\n")
1880 self.close()
1881 return
Bill Janssen6e027db2007-11-15 22:23:56 +00001882 elif (self.server.starttls_server and
Antoine Pitrou764b8782010-04-28 22:57:15 +00001883 stripped == b'STARTTLS'):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001884 if support.verbose and self.server.connectionchatty:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001885 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
Antoine Pitrou480a1242010-04-28 21:37:09 +00001886 self.write(b"OK\n")
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001887 if not self.wrap_conn():
1888 return
Bill Janssen40a0f662008-08-12 16:56:25 +00001889 elif (self.server.starttls_server and self.sslconn
Antoine Pitrou764b8782010-04-28 22:57:15 +00001890 and stripped == b'ENDTLS'):
Bill Janssen40a0f662008-08-12 16:56:25 +00001891 if support.verbose and self.server.connectionchatty:
1892 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
Antoine Pitrou480a1242010-04-28 21:37:09 +00001893 self.write(b"OK\n")
Bill Janssen40a0f662008-08-12 16:56:25 +00001894 self.sock = self.sslconn.unwrap()
1895 self.sslconn = None
1896 if support.verbose and self.server.connectionchatty:
1897 sys.stdout.write(" server: connection is now unencrypted...\n")
Antoine Pitroud6494802011-07-21 01:11:30 +02001898 elif stripped == b'CB tls-unique':
1899 if support.verbose and self.server.connectionchatty:
1900 sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
1901 data = self.sslconn.get_channel_binding("tls-unique")
1902 self.write(repr(data).encode("us-ascii") + b"\n")
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001903 else:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001904 if (support.verbose and
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001905 self.server.connectionchatty):
1906 ctype = (self.sslconn and "encrypted") or "unencrypted"
Antoine Pitrou480a1242010-04-28 21:37:09 +00001907 sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
1908 % (msg, ctype, msg.lower(), ctype))
1909 self.write(msg.lower())
Andrew Svetlov0832af62012-12-18 23:10:48 +02001910 except OSError:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001911 if self.server.chatty:
1912 handle_error("Test server failure:\n")
1913 self.close()
1914 self.running = False
1915 # normally, we'd just stop here, but for the test
1916 # harness, we want to stop the server
1917 self.server.stop()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001918
Antoine Pitroub5218772010-05-21 09:56:06 +00001919 def __init__(self, certificate=None, ssl_version=None,
Antoine Pitrou18c913e2010-04-27 10:59:39 +00001920 certreqs=None, cacerts=None,
Antoine Pitrou2d9cb9c2010-04-17 17:40:45 +00001921 chatty=True, connectionchatty=False, starttls_server=False,
Benjamin Petersoncca27322015-01-23 16:35:37 -05001922 npn_protocols=None, alpn_protocols=None,
1923 ciphers=None, context=None):
Antoine Pitroub5218772010-05-21 09:56:06 +00001924 if context:
1925 self.context = context
1926 else:
1927 self.context = ssl.SSLContext(ssl_version
1928 if ssl_version is not None
1929 else ssl.PROTOCOL_TLSv1)
1930 self.context.verify_mode = (certreqs if certreqs is not None
1931 else ssl.CERT_NONE)
1932 if cacerts:
1933 self.context.load_verify_locations(cacerts)
1934 if certificate:
1935 self.context.load_cert_chain(certificate)
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01001936 if npn_protocols:
1937 self.context.set_npn_protocols(npn_protocols)
Benjamin Petersoncca27322015-01-23 16:35:37 -05001938 if alpn_protocols:
1939 self.context.set_alpn_protocols(alpn_protocols)
Antoine Pitroub5218772010-05-21 09:56:06 +00001940 if ciphers:
1941 self.context.set_ciphers(ciphers)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001942 self.chatty = chatty
1943 self.connectionchatty = connectionchatty
1944 self.starttls_server = starttls_server
1945 self.sock = socket.socket()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001946 self.port = support.bind_port(self.sock)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001947 self.flag = None
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001948 self.active = False
Benjamin Petersoncca27322015-01-23 16:35:37 -05001949 self.selected_npn_protocols = []
1950 self.selected_alpn_protocols = []
Benjamin Peterson4cb17812015-01-07 11:14:26 -06001951 self.shared_ciphers = []
Antoine Pitrou8f85f902012-01-03 22:46:48 +01001952 self.conn_errors = []
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001953 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00001954 self.daemon = True
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001955
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01001956 def __enter__(self):
1957 self.start(threading.Event())
1958 self.flag.wait()
Antoine Pitrou8f85f902012-01-03 22:46:48 +01001959 return self
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01001960
1961 def __exit__(self, *args):
1962 self.stop()
1963 self.join()
1964
Antoine Pitrou480a1242010-04-28 21:37:09 +00001965 def start(self, flag=None):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001966 self.flag = flag
1967 threading.Thread.start(self)
1968
Antoine Pitrou480a1242010-04-28 21:37:09 +00001969 def run(self):
Antoine Pitrouaf7c6022010-04-27 09:56:02 +00001970 self.sock.settimeout(0.05)
Charles-François Natali6e204602014-07-23 19:28:13 +01001971 self.sock.listen()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001972 self.active = True
1973 if self.flag:
1974 # signal an event
1975 self.flag.set()
1976 while self.active:
1977 try:
1978 newconn, connaddr = self.sock.accept()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001979 if support.verbose and self.chatty:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001980 sys.stdout.write(' server: new connection from '
Bill Janssen6e027db2007-11-15 22:23:56 +00001981 + repr(connaddr) + '\n')
1982 handler = self.ConnectionHandler(self, newconn, connaddr)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001983 handler.start()
Antoine Pitroueced82e2012-01-27 17:33:01 +01001984 handler.join()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001985 except socket.timeout:
1986 pass
1987 except KeyboardInterrupt:
1988 self.stop()
Bill Janssen6e027db2007-11-15 22:23:56 +00001989 self.sock.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001990
Antoine Pitrou480a1242010-04-28 21:37:09 +00001991 def stop(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001992 self.active = False
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001993
Bill Janssen54cc54c2007-12-14 22:08:56 +00001994 class AsyncoreEchoServer(threading.Thread):
1995
1996 # this one's based on asyncore.dispatcher
1997
1998 class EchoServer (asyncore.dispatcher):
1999
2000 class ConnectionHandler (asyncore.dispatcher_with_send):
2001
2002 def __init__(self, conn, certfile):
2003 self.socket = ssl.wrap_socket(conn, server_side=True,
2004 certfile=certfile,
2005 do_handshake_on_connect=False)
2006 asyncore.dispatcher_with_send.__init__(self, self.socket)
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002007 self._ssl_accepting = True
2008 self._do_ssl_handshake()
Bill Janssen54cc54c2007-12-14 22:08:56 +00002009
2010 def readable(self):
2011 if isinstance(self.socket, ssl.SSLSocket):
2012 while self.socket.pending() > 0:
2013 self.handle_read_event()
2014 return True
2015
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002016 def _do_ssl_handshake(self):
2017 try:
2018 self.socket.do_handshake()
Antoine Pitrou41032a62011-10-27 23:56:55 +02002019 except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
2020 return
2021 except ssl.SSLEOFError:
2022 return self.handle_close()
2023 except ssl.SSLError:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002024 raise
Andrew Svetlov0832af62012-12-18 23:10:48 +02002025 except OSError as err:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002026 if err.args[0] == errno.ECONNABORTED:
2027 return self.handle_close()
Bill Janssen54cc54c2007-12-14 22:08:56 +00002028 else:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002029 self._ssl_accepting = False
2030
2031 def handle_read(self):
2032 if self._ssl_accepting:
2033 self._do_ssl_handshake()
2034 else:
2035 data = self.recv(1024)
2036 if support.verbose:
2037 sys.stdout.write(" server: read %s from client\n" % repr(data))
2038 if not data:
2039 self.close()
2040 else:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002041 self.send(data.lower())
Bill Janssen54cc54c2007-12-14 22:08:56 +00002042
2043 def handle_close(self):
Bill Janssen2f5799b2008-06-29 00:08:12 +00002044 self.close()
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002045 if support.verbose:
Bill Janssen54cc54c2007-12-14 22:08:56 +00002046 sys.stdout.write(" server: closed connection %s\n" % self.socket)
2047
2048 def handle_error(self):
2049 raise
2050
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002051 def __init__(self, certfile):
Bill Janssen54cc54c2007-12-14 22:08:56 +00002052 self.certfile = certfile
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002053 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2054 self.port = support.bind_port(sock, '')
2055 asyncore.dispatcher.__init__(self, sock)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002056 self.listen(5)
2057
Giampaolo Rodolà977c7072010-10-04 21:08:36 +00002058 def handle_accepted(self, sock_obj, addr):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002059 if support.verbose:
Bill Janssen54cc54c2007-12-14 22:08:56 +00002060 sys.stdout.write(" server: new connection from %s:%s\n" %addr)
2061 self.ConnectionHandler(sock_obj, self.certfile)
2062
2063 def handle_error(self):
2064 raise
2065
Trent Nelson78520002008-04-10 20:54:35 +00002066 def __init__(self, certfile):
Bill Janssen54cc54c2007-12-14 22:08:56 +00002067 self.flag = None
2068 self.active = False
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002069 self.server = self.EchoServer(certfile)
2070 self.port = self.server.port
Bill Janssen54cc54c2007-12-14 22:08:56 +00002071 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00002072 self.daemon = True
Bill Janssen54cc54c2007-12-14 22:08:56 +00002073
2074 def __str__(self):
2075 return "<%s %s>" % (self.__class__.__name__, self.server)
2076
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002077 def __enter__(self):
2078 self.start(threading.Event())
2079 self.flag.wait()
Antoine Pitrou8f85f902012-01-03 22:46:48 +01002080 return self
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002081
2082 def __exit__(self, *args):
2083 if support.verbose:
2084 sys.stdout.write(" cleanup: stopping server.\n")
2085 self.stop()
2086 if support.verbose:
2087 sys.stdout.write(" cleanup: joining server thread.\n")
2088 self.join()
2089 if support.verbose:
2090 sys.stdout.write(" cleanup: successfully joined.\n")
2091
Bill Janssen54cc54c2007-12-14 22:08:56 +00002092 def start (self, flag=None):
2093 self.flag = flag
2094 threading.Thread.start(self)
2095
Antoine Pitrou480a1242010-04-28 21:37:09 +00002096 def run(self):
Bill Janssen54cc54c2007-12-14 22:08:56 +00002097 self.active = True
2098 if self.flag:
2099 self.flag.set()
2100 while self.active:
2101 try:
2102 asyncore.loop(1)
2103 except:
2104 pass
2105
Antoine Pitrou480a1242010-04-28 21:37:09 +00002106 def stop(self):
Bill Janssen54cc54c2007-12-14 22:08:56 +00002107 self.active = False
2108 self.server.close()
2109
Antoine Pitroub5218772010-05-21 09:56:06 +00002110 def server_params_test(client_context, server_context, indata=b"FOO\n",
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01002111 chatty=True, connectionchatty=False, sni_name=None):
Antoine Pitrou480a1242010-04-28 21:37:09 +00002112 """
2113 Launch a server, connect a client to it and try various reads
2114 and writes.
2115 """
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01002116 stats = {}
Antoine Pitroub5218772010-05-21 09:56:06 +00002117 server = ThreadedEchoServer(context=server_context,
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002118 chatty=chatty,
Bill Janssen6e027db2007-11-15 22:23:56 +00002119 connectionchatty=False)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002120 with server:
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01002121 with client_context.wrap_socket(socket.socket(),
2122 server_hostname=sni_name) as s:
Antoine Pitroueba63c42012-01-28 17:38:34 +01002123 s.connect((HOST, server.port))
2124 for arg in [indata, bytearray(indata), memoryview(indata)]:
2125 if connectionchatty:
2126 if support.verbose:
2127 sys.stdout.write(
2128 " client: sending %r...\n" % indata)
2129 s.write(arg)
2130 outdata = s.read()
2131 if connectionchatty:
2132 if support.verbose:
2133 sys.stdout.write(" client: read %r\n" % outdata)
2134 if outdata != indata.lower():
2135 raise AssertionError(
2136 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
2137 % (outdata[:20], len(outdata),
2138 indata[:20].lower(), len(indata)))
2139 s.write(b"over\n")
Antoine Pitrou7d7aede2009-11-25 18:55:32 +00002140 if connectionchatty:
2141 if support.verbose:
Antoine Pitroueba63c42012-01-28 17:38:34 +01002142 sys.stdout.write(" client: closing connection.\n")
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01002143 stats.update({
Antoine Pitrouce816a52012-01-28 17:40:23 +01002144 'compression': s.compression(),
2145 'cipher': s.cipher(),
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01002146 'peercert': s.getpeercert(),
Benjamin Petersoncca27322015-01-23 16:35:37 -05002147 'client_alpn_protocol': s.selected_alpn_protocol(),
Antoine Pitrou47e40422014-09-04 21:00:10 +02002148 'client_npn_protocol': s.selected_npn_protocol(),
2149 'version': s.version(),
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01002150 })
Antoine Pitroueba63c42012-01-28 17:38:34 +01002151 s.close()
Benjamin Petersoncca27322015-01-23 16:35:37 -05002152 stats['server_alpn_protocols'] = server.selected_alpn_protocols
2153 stats['server_npn_protocols'] = server.selected_npn_protocols
Benjamin Peterson4cb17812015-01-07 11:14:26 -06002154 stats['server_shared_ciphers'] = server.shared_ciphers
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01002155 return stats
Thomas Woutersed03b412007-08-28 21:37:11 +00002156
Antoine Pitroub5218772010-05-21 09:56:06 +00002157 def try_protocol_combo(server_protocol, client_protocol, expect_success,
2158 certsreqs=None, server_options=0, client_options=0):
Antoine Pitrou47e40422014-09-04 21:00:10 +02002159 """
2160 Try to SSL-connect using *client_protocol* to *server_protocol*.
2161 If *expect_success* is true, assert that the connection succeeds,
2162 if it's false, assert that the connection fails.
2163 Also, if *expect_success* is a string, assert that it is the protocol
2164 version actually used by the connection.
2165 """
Benjamin Peterson2a691a82008-03-31 01:51:45 +00002166 if certsreqs is None:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002167 certsreqs = ssl.CERT_NONE
Antoine Pitrou480a1242010-04-28 21:37:09 +00002168 certtype = {
2169 ssl.CERT_NONE: "CERT_NONE",
2170 ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
2171 ssl.CERT_REQUIRED: "CERT_REQUIRED",
2172 }[certsreqs]
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002173 if support.verbose:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002174 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002175 sys.stdout.write(formatstr %
2176 (ssl.get_protocol_name(client_protocol),
2177 ssl.get_protocol_name(server_protocol),
2178 certtype))
Antoine Pitroub5218772010-05-21 09:56:06 +00002179 client_context = ssl.SSLContext(client_protocol)
Antoine Pitrou32c49152014-01-09 21:28:48 +01002180 client_context.options |= client_options
Antoine Pitroub5218772010-05-21 09:56:06 +00002181 server_context = ssl.SSLContext(server_protocol)
Antoine Pitrou32c49152014-01-09 21:28:48 +01002182 server_context.options |= server_options
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002183
2184 # NOTE: we must enable "ALL" ciphers on the client, otherwise an
2185 # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
2186 # starting from OpenSSL 1.0.0 (see issue #8322).
2187 if client_context.protocol == ssl.PROTOCOL_SSLv23:
2188 client_context.set_ciphers("ALL")
2189
Antoine Pitroub5218772010-05-21 09:56:06 +00002190 for ctx in (client_context, server_context):
2191 ctx.verify_mode = certsreqs
Antoine Pitroub5218772010-05-21 09:56:06 +00002192 ctx.load_cert_chain(CERTFILE)
2193 ctx.load_verify_locations(CERTFILE)
2194 try:
Antoine Pitrou47e40422014-09-04 21:00:10 +02002195 stats = server_params_test(client_context, server_context,
2196 chatty=False, connectionchatty=False)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002197 # Protocol mismatch can result in either an SSLError, or a
2198 # "Connection reset by peer" error.
2199 except ssl.SSLError:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002200 if expect_success:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002201 raise
Andrew Svetlov0832af62012-12-18 23:10:48 +02002202 except OSError as e:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002203 if expect_success or e.errno != errno.ECONNRESET:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002204 raise
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002205 else:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002206 if not expect_success:
Antoine Pitroud75b2a92010-05-06 14:15:10 +00002207 raise AssertionError(
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002208 "Client protocol %s succeeded with server protocol %s!"
2209 % (ssl.get_protocol_name(client_protocol),
2210 ssl.get_protocol_name(server_protocol)))
Antoine Pitrou47e40422014-09-04 21:00:10 +02002211 elif (expect_success is not True
2212 and expect_success != stats['version']):
2213 raise AssertionError("version mismatch: expected %r, got %r"
2214 % (expect_success, stats['version']))
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002215
2216
Bill Janssen6e027db2007-11-15 22:23:56 +00002217 class ThreadedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002218
Antoine Pitrou23df4832010-08-04 17:14:06 +00002219 @skip_if_broken_ubuntu_ssl
Antoine Pitrou480a1242010-04-28 21:37:09 +00002220 def test_echo(self):
2221 """Basic test of an SSL client connecting to a server"""
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002222 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002223 sys.stdout.write("\n")
Antoine Pitroub5218772010-05-21 09:56:06 +00002224 for protocol in PROTOCOLS:
Antoine Pitrou972d5bb2013-03-29 17:56:03 +01002225 with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
2226 context = ssl.SSLContext(protocol)
2227 context.load_cert_chain(CERTFILE)
2228 server_params_test(context, context,
2229 chatty=True, connectionchatty=True)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002230
Antoine Pitrou480a1242010-04-28 21:37:09 +00002231 def test_getpeercert(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002232 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002233 sys.stdout.write("\n")
Antoine Pitroub5218772010-05-21 09:56:06 +00002234 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2235 context.verify_mode = ssl.CERT_REQUIRED
2236 context.load_verify_locations(CERTFILE)
2237 context.load_cert_chain(CERTFILE)
2238 server = ThreadedEchoServer(context=context, chatty=False)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002239 with server:
Antoine Pitrou20b85552013-09-29 19:50:53 +02002240 s = context.wrap_socket(socket.socket(),
2241 do_handshake_on_connect=False)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002242 s.connect((HOST, server.port))
Antoine Pitrou20b85552013-09-29 19:50:53 +02002243 # getpeercert() raise ValueError while the handshake isn't
2244 # done.
2245 with self.assertRaises(ValueError):
2246 s.getpeercert()
2247 s.do_handshake()
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002248 cert = s.getpeercert()
2249 self.assertTrue(cert, "Can't get peer certificate.")
2250 cipher = s.cipher()
2251 if support.verbose:
2252 sys.stdout.write(pprint.pformat(cert) + '\n')
2253 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
2254 if 'subject' not in cert:
2255 self.fail("No subject field in certificate: %s." %
2256 pprint.pformat(cert))
2257 if ((('organizationName', 'Python Software Foundation'),)
2258 not in cert['subject']):
2259 self.fail(
2260 "Missing or invalid 'organizationName' field in certificate subject; "
2261 "should be 'Python Software Foundation'.")
Antoine Pitroufb046912010-11-09 20:21:19 +00002262 self.assertIn('notBefore', cert)
2263 self.assertIn('notAfter', cert)
2264 before = ssl.cert_time_to_seconds(cert['notBefore'])
2265 after = ssl.cert_time_to_seconds(cert['notAfter'])
2266 self.assertLess(before, after)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002267 s.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002268
Christian Heimes2427b502013-11-23 11:24:32 +01002269 @unittest.skipUnless(have_verify_flags(),
2270 "verify_flags need OpenSSL > 0.9.8")
Christian Heimes22587792013-11-21 23:56:13 +01002271 def test_crl_check(self):
2272 if support.verbose:
2273 sys.stdout.write("\n")
2274
2275 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2276 server_context.load_cert_chain(SIGNED_CERTFILE)
2277
2278 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2279 context.verify_mode = ssl.CERT_REQUIRED
2280 context.load_verify_locations(SIGNING_CA)
Benjamin Petersonc3d9c5c2015-03-04 23:18:48 -05002281 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
2282 self.assertEqual(context.verify_flags, ssl.VERIFY_DEFAULT | tf)
Christian Heimes22587792013-11-21 23:56:13 +01002283
2284 # VERIFY_DEFAULT should pass
2285 server = ThreadedEchoServer(context=server_context, chatty=True)
2286 with server:
2287 with context.wrap_socket(socket.socket()) as s:
2288 s.connect((HOST, server.port))
2289 cert = s.getpeercert()
2290 self.assertTrue(cert, "Can't get peer certificate.")
2291
2292 # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
Christian Heimes32f0c7a2013-11-22 03:43:48 +01002293 context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
Christian Heimes22587792013-11-21 23:56:13 +01002294
2295 server = ThreadedEchoServer(context=server_context, chatty=True)
2296 with server:
2297 with context.wrap_socket(socket.socket()) as s:
2298 with self.assertRaisesRegex(ssl.SSLError,
2299 "certificate verify failed"):
2300 s.connect((HOST, server.port))
2301
2302 # now load a CRL file. The CRL file is signed by the CA.
2303 context.load_verify_locations(CRLFILE)
2304
2305 server = ThreadedEchoServer(context=server_context, chatty=True)
2306 with server:
2307 with context.wrap_socket(socket.socket()) as s:
2308 s.connect((HOST, server.port))
2309 cert = s.getpeercert()
2310 self.assertTrue(cert, "Can't get peer certificate.")
2311
Christian Heimes1aa9a752013-12-02 02:41:19 +01002312 def test_check_hostname(self):
2313 if support.verbose:
2314 sys.stdout.write("\n")
2315
2316 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2317 server_context.load_cert_chain(SIGNED_CERTFILE)
2318
2319 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2320 context.verify_mode = ssl.CERT_REQUIRED
2321 context.check_hostname = True
2322 context.load_verify_locations(SIGNING_CA)
2323
2324 # correct hostname should verify
2325 server = ThreadedEchoServer(context=server_context, chatty=True)
2326 with server:
2327 with context.wrap_socket(socket.socket(),
2328 server_hostname="localhost") as s:
2329 s.connect((HOST, server.port))
2330 cert = s.getpeercert()
2331 self.assertTrue(cert, "Can't get peer certificate.")
2332
2333 # incorrect hostname should raise an exception
2334 server = ThreadedEchoServer(context=server_context, chatty=True)
2335 with server:
2336 with context.wrap_socket(socket.socket(),
2337 server_hostname="invalid") as s:
2338 with self.assertRaisesRegex(ssl.CertificateError,
2339 "hostname 'invalid' doesn't match 'localhost'"):
2340 s.connect((HOST, server.port))
2341
2342 # missing server_hostname arg should cause an exception, too
2343 server = ThreadedEchoServer(context=server_context, chatty=True)
2344 with server:
2345 with socket.socket() as s:
2346 with self.assertRaisesRegex(ValueError,
2347 "check_hostname requires server_hostname"):
2348 context.wrap_socket(s)
2349
Martin Panter407b62f2016-01-30 03:41:43 +00002350 def test_wrong_cert(self):
Martin Panter3464ea22016-02-01 21:58:11 +00002351 """Connecting when the server rejects the client's certificate
2352
2353 Launch a server with CERT_REQUIRED, and check that trying to
2354 connect to it with a wrong client certificate fails.
2355 """
2356 certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
2357 "wrongcert.pem")
2358 server = ThreadedEchoServer(CERTFILE,
2359 certreqs=ssl.CERT_REQUIRED,
2360 cacerts=CERTFILE, chatty=False,
2361 connectionchatty=False)
2362 with server, \
2363 socket.socket() as sock, \
2364 ssl.wrap_socket(sock,
2365 certfile=certfile,
2366 ssl_version=ssl.PROTOCOL_TLSv1) as s:
2367 try:
2368 # Expect either an SSL error about the server rejecting
2369 # the connection, or a low-level connection reset (which
2370 # sometimes happens on Windows)
2371 s.connect((HOST, server.port))
2372 except ssl.SSLError as e:
2373 if support.verbose:
2374 sys.stdout.write("\nSSLError is %r\n" % e)
2375 except OSError as e:
2376 if e.errno != errno.ECONNRESET:
2377 raise
2378 if support.verbose:
2379 sys.stdout.write("\nsocket.error is %r\n" % e)
2380 else:
2381 self.fail("Use of invalid cert should have failed!")
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002382
Antoine Pitrou480a1242010-04-28 21:37:09 +00002383 def test_rude_shutdown(self):
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002384 """A brutal shutdown of an SSL server should raise an OSError
Antoine Pitrou480a1242010-04-28 21:37:09 +00002385 in the client when attempting handshake.
2386 """
Trent Nelson6b240cd2008-04-10 20:12:06 +00002387 listener_ready = threading.Event()
2388 listener_gone = threading.Event()
Antoine Pitrou480a1242010-04-28 21:37:09 +00002389
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002390 s = socket.socket()
2391 port = support.bind_port(s, HOST)
Trent Nelson6b240cd2008-04-10 20:12:06 +00002392
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002393 # `listener` runs in a thread. It sits in an accept() until
2394 # the main thread connects. Then it rudely closes the socket,
2395 # and sets Event `listener_gone` to let the main thread know
2396 # the socket is gone.
Trent Nelson6b240cd2008-04-10 20:12:06 +00002397 def listener():
Charles-François Natali6e204602014-07-23 19:28:13 +01002398 s.listen()
Trent Nelson6b240cd2008-04-10 20:12:06 +00002399 listener_ready.set()
Antoine Pitroud2eca372010-10-29 23:41:37 +00002400 newsock, addr = s.accept()
2401 newsock.close()
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002402 s.close()
Trent Nelson6b240cd2008-04-10 20:12:06 +00002403 listener_gone.set()
2404
2405 def connector():
2406 listener_ready.wait()
Antoine Pitroud2eca372010-10-29 23:41:37 +00002407 with socket.socket() as c:
2408 c.connect((HOST, port))
2409 listener_gone.wait()
2410 try:
2411 ssl_sock = ssl.wrap_socket(c)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002412 except OSError:
Antoine Pitroud2eca372010-10-29 23:41:37 +00002413 pass
2414 else:
2415 self.fail('connecting to closed SSL socket should have failed')
Trent Nelson6b240cd2008-04-10 20:12:06 +00002416
2417 t = threading.Thread(target=listener)
2418 t.start()
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002419 try:
2420 connector()
2421 finally:
2422 t.join()
Trent Nelson6b240cd2008-04-10 20:12:06 +00002423
Antoine Pitrou23df4832010-08-04 17:14:06 +00002424 @skip_if_broken_ubuntu_ssl
Victor Stinner3de49192011-05-09 00:42:58 +02002425 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
2426 "OpenSSL is compiled without SSLv2 support")
Antoine Pitrou480a1242010-04-28 21:37:09 +00002427 def test_protocol_sslv2(self):
2428 """Connecting to an SSLv2 server with various client options"""
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002429 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002430 sys.stdout.write("\n")
Antoine Pitrou480a1242010-04-28 21:37:09 +00002431 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
2432 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
2433 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
Antoine Pitroucd3d7ca2014-01-09 20:02:20 +01002434 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False)
Victor Stinner648b8622014-12-12 12:23:59 +01002435 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2436 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
Antoine Pitrou480a1242010-04-28 21:37:09 +00002437 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
Antoine Pitroub5218772010-05-21 09:56:06 +00002438 # SSLv23 client with specific SSL options
2439 if no_sslv2_implies_sslv3_hello():
2440 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
2441 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
2442 client_options=ssl.OP_NO_SSLv2)
Antoine Pitroucd3d7ca2014-01-09 20:02:20 +01002443 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
Antoine Pitroub5218772010-05-21 09:56:06 +00002444 client_options=ssl.OP_NO_SSLv3)
Antoine Pitroucd3d7ca2014-01-09 20:02:20 +01002445 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
Antoine Pitroub5218772010-05-21 09:56:06 +00002446 client_options=ssl.OP_NO_TLSv1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002447
Antoine Pitrou23df4832010-08-04 17:14:06 +00002448 @skip_if_broken_ubuntu_ssl
Antoine Pitrou480a1242010-04-28 21:37:09 +00002449 def test_protocol_sslv23(self):
2450 """Connecting to an SSLv23 server with various client options"""
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002451 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002452 sys.stdout.write("\n")
Victor Stinner3de49192011-05-09 00:42:58 +02002453 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2454 try:
2455 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True)
Andrew Svetlov0832af62012-12-18 23:10:48 +02002456 except OSError as x:
Victor Stinner3de49192011-05-09 00:42:58 +02002457 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
2458 if support.verbose:
2459 sys.stdout.write(
2460 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
2461 % str(x))
Benjamin Petersone32467c2014-12-05 21:59:35 -05002462 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08002463 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False)
Antoine Pitrou480a1242010-04-28 21:37:09 +00002464 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True)
Antoine Pitrou47e40422014-09-04 21:00:10 +02002465 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1')
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002466
Benjamin Petersone32467c2014-12-05 21:59:35 -05002467 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08002468 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False, ssl.CERT_OPTIONAL)
Antoine Pitrou480a1242010-04-28 21:37:09 +00002469 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL)
Antoine Pitrou47e40422014-09-04 21:00:10 +02002470 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002471
Benjamin Petersone32467c2014-12-05 21:59:35 -05002472 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08002473 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False, ssl.CERT_REQUIRED)
Antoine Pitrou480a1242010-04-28 21:37:09 +00002474 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED)
Antoine Pitrou47e40422014-09-04 21:00:10 +02002475 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002476
Antoine Pitroub5218772010-05-21 09:56:06 +00002477 # Server with specific SSL options
Benjamin Petersone32467c2014-12-05 21:59:35 -05002478 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2479 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False,
Antoine Pitroub5218772010-05-21 09:56:06 +00002480 server_options=ssl.OP_NO_SSLv3)
2481 # Will choose TLSv1
2482 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True,
2483 server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
2484 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, False,
2485 server_options=ssl.OP_NO_TLSv1)
2486
2487
Antoine Pitrou23df4832010-08-04 17:14:06 +00002488 @skip_if_broken_ubuntu_ssl
Benjamin Petersone32467c2014-12-05 21:59:35 -05002489 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
2490 "OpenSSL is compiled without SSLv3 support")
Antoine Pitrou480a1242010-04-28 21:37:09 +00002491 def test_protocol_sslv3(self):
2492 """Connecting to an SSLv3 server with various client options"""
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002493 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002494 sys.stdout.write("\n")
Antoine Pitrou47e40422014-09-04 21:00:10 +02002495 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3')
2496 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
2497 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
Victor Stinner3de49192011-05-09 00:42:58 +02002498 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2499 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
Barry Warsaw46ae0ef2011-10-28 16:52:17 -04002500 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False,
2501 client_options=ssl.OP_NO_SSLv3)
Antoine Pitrou480a1242010-04-28 21:37:09 +00002502 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
Antoine Pitroub5218772010-05-21 09:56:06 +00002503 if no_sslv2_implies_sslv3_hello():
2504 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Benjamin Petersona9dcdab2015-11-11 22:38:41 -08002505 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23,
2506 False, client_options=ssl.OP_NO_SSLv2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002507
Antoine Pitrou23df4832010-08-04 17:14:06 +00002508 @skip_if_broken_ubuntu_ssl
Antoine Pitrou480a1242010-04-28 21:37:09 +00002509 def test_protocol_tlsv1(self):
2510 """Connecting to a TLSv1 server with various client options"""
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002511 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002512 sys.stdout.write("\n")
Antoine Pitrou47e40422014-09-04 21:00:10 +02002513 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1')
2514 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
2515 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Victor Stinner3de49192011-05-09 00:42:58 +02002516 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2517 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
Benjamin Petersone32467c2014-12-05 21:59:35 -05002518 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2519 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
Barry Warsaw46ae0ef2011-10-28 16:52:17 -04002520 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False,
2521 client_options=ssl.OP_NO_TLSv1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002522
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002523 @skip_if_broken_ubuntu_ssl
2524 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
2525 "TLS version 1.1 not supported.")
2526 def test_protocol_tlsv1_1(self):
2527 """Connecting to a TLSv1.1 server with various client options.
2528 Testing against older TLS versions."""
2529 if support.verbose:
2530 sys.stdout.write("\n")
Antoine Pitrou47e40422014-09-04 21:00:10 +02002531 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002532 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2533 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False)
Benjamin Petersone32467c2014-12-05 21:59:35 -05002534 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2535 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002536 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv23, False,
2537 client_options=ssl.OP_NO_TLSv1_1)
2538
Antoine Pitrou47e40422014-09-04 21:00:10 +02002539 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002540 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False)
2541 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False)
2542
2543
2544 @skip_if_broken_ubuntu_ssl
2545 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
2546 "TLS version 1.2 not supported.")
2547 def test_protocol_tlsv1_2(self):
2548 """Connecting to a TLSv1.2 server with various client options.
2549 Testing against older TLS versions."""
2550 if support.verbose:
2551 sys.stdout.write("\n")
Antoine Pitrou47e40422014-09-04 21:00:10 +02002552 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2',
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002553 server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
2554 client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
2555 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2556 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False)
Benjamin Petersone32467c2014-12-05 21:59:35 -05002557 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2558 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002559 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv23, False,
2560 client_options=ssl.OP_NO_TLSv1_2)
2561
Antoine Pitrou47e40422014-09-04 21:00:10 +02002562 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2')
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002563 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
2564 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
2565 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
2566 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
2567
Antoine Pitrou480a1242010-04-28 21:37:09 +00002568 def test_starttls(self):
2569 """Switching from clear text to encrypted and back again."""
2570 msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002571
Trent Nelson78520002008-04-10 20:54:35 +00002572 server = ThreadedEchoServer(CERTFILE,
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002573 ssl_version=ssl.PROTOCOL_TLSv1,
2574 starttls_server=True,
2575 chatty=True,
2576 connectionchatty=True)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002577 wrapped = False
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002578 with server:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002579 s = socket.socket()
2580 s.setblocking(1)
2581 s.connect((HOST, server.port))
2582 if support.verbose:
2583 sys.stdout.write("\n")
2584 for indata in msgs:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002585 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002586 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002587 " client: sending %r...\n" % indata)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002588 if wrapped:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002589 conn.write(indata)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002590 outdata = conn.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002591 else:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002592 s.send(indata)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002593 outdata = s.recv(1024)
Antoine Pitrou480a1242010-04-28 21:37:09 +00002594 msg = outdata.strip().lower()
2595 if indata == b"STARTTLS" and msg.startswith(b"ok"):
2596 # STARTTLS ok, switch to secure mode
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002597 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002598 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002599 " client: read %r from server, starting TLS...\n"
2600 % msg)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002601 conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
2602 wrapped = True
Antoine Pitrou480a1242010-04-28 21:37:09 +00002603 elif indata == b"ENDTLS" and msg.startswith(b"ok"):
2604 # ENDTLS ok, switch back to clear text
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002605 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002606 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002607 " client: read %r from server, ending TLS...\n"
2608 % msg)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002609 s = conn.unwrap()
2610 wrapped = False
2611 else:
2612 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002613 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002614 " client: read %r from server\n" % msg)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002615 if support.verbose:
2616 sys.stdout.write(" client: closing connection.\n")
2617 if wrapped:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002618 conn.write(b"over\n")
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002619 else:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002620 s.send(b"over\n")
Bill Janssen6e027db2007-11-15 22:23:56 +00002621 if wrapped:
2622 conn.close()
2623 else:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002624 s.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002625
Antoine Pitrou480a1242010-04-28 21:37:09 +00002626 def test_socketserver(self):
2627 """Using a SocketServer to create and manage SSL connections."""
Antoine Pitrouda232592013-02-05 21:20:51 +01002628 server = make_https_server(self, certfile=CERTFILE)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002629 # try to connect
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002630 if support.verbose:
2631 sys.stdout.write('\n')
2632 with open(CERTFILE, 'rb') as f:
2633 d1 = f.read()
2634 d2 = ''
2635 # now fetch the same data from the HTTPS server
Benjamin Peterson4ffb0752014-11-03 14:29:33 -05002636 url = 'https://localhost:%d/%s' % (
2637 server.port, os.path.split(CERTFILE)[1])
2638 context = ssl.create_default_context(cafile=CERTFILE)
2639 f = urllib.request.urlopen(url, context=context)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002640 try:
Barry Warsaw820c1202008-06-12 04:06:45 +00002641 dlen = f.info().get("content-length")
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002642 if dlen and (int(dlen) > 0):
2643 d2 = f.read(int(dlen))
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002644 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002645 sys.stdout.write(
2646 " client: read %d bytes from remote server '%s'\n"
2647 % (len(d2), server))
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002648 finally:
Antoine Pitrou803e6d62010-10-13 10:36:15 +00002649 f.close()
2650 self.assertEqual(d1, d2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002651
Antoine Pitrou480a1242010-04-28 21:37:09 +00002652 def test_asyncore_server(self):
2653 """Check the example asyncore integration."""
2654 indata = "TEST MESSAGE of mixed case\n"
Trent Nelson6b240cd2008-04-10 20:12:06 +00002655
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002656 if support.verbose:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002657 sys.stdout.write("\n")
2658
Antoine Pitrou480a1242010-04-28 21:37:09 +00002659 indata = b"FOO\n"
Trent Nelson78520002008-04-10 20:54:35 +00002660 server = AsyncoreEchoServer(CERTFILE)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002661 with server:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002662 s = ssl.wrap_socket(socket.socket())
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002663 s.connect(('127.0.0.1', server.port))
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002664 if support.verbose:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002665 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002666 " client: sending %r...\n" % indata)
2667 s.write(indata)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002668 outdata = s.read()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002669 if support.verbose:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002670 sys.stdout.write(" client: read %r\n" % outdata)
Trent Nelson6b240cd2008-04-10 20:12:06 +00002671 if outdata != indata.lower():
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002672 self.fail(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002673 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
2674 % (outdata[:20], len(outdata),
2675 indata[:20].lower(), len(indata)))
2676 s.write(b"over\n")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002677 if support.verbose:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002678 sys.stdout.write(" client: closing connection.\n")
2679 s.close()
Antoine Pitroued986362010-08-15 23:28:10 +00002680 if support.verbose:
2681 sys.stdout.write(" client: connection closed.\n")
Trent Nelson6b240cd2008-04-10 20:12:06 +00002682
Antoine Pitrou480a1242010-04-28 21:37:09 +00002683 def test_recv_send(self):
2684 """Test recv(), send() and friends."""
Bill Janssen58afe4c2008-09-08 16:45:19 +00002685 if support.verbose:
2686 sys.stdout.write("\n")
2687
2688 server = ThreadedEchoServer(CERTFILE,
2689 certreqs=ssl.CERT_NONE,
2690 ssl_version=ssl.PROTOCOL_TLSv1,
2691 cacerts=CERTFILE,
2692 chatty=True,
2693 connectionchatty=False)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002694 with server:
2695 s = ssl.wrap_socket(socket.socket(),
2696 server_side=False,
2697 certfile=CERTFILE,
2698 ca_certs=CERTFILE,
2699 cert_reqs=ssl.CERT_NONE,
2700 ssl_version=ssl.PROTOCOL_TLSv1)
2701 s.connect((HOST, server.port))
Bill Janssen58afe4c2008-09-08 16:45:19 +00002702 # helper methods for standardising recv* method signatures
2703 def _recv_into():
2704 b = bytearray(b"\0"*100)
2705 count = s.recv_into(b)
2706 return b[:count]
2707
2708 def _recvfrom_into():
2709 b = bytearray(b"\0"*100)
2710 count, addr = s.recvfrom_into(b)
2711 return b[:count]
2712
Martin Panter519f9122016-04-03 02:12:54 +00002713 # (name, method, expect success?, *args, return value func)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002714 send_methods = [
Martin Panter519f9122016-04-03 02:12:54 +00002715 ('send', s.send, True, [], len),
2716 ('sendto', s.sendto, False, ["some.address"], len),
2717 ('sendall', s.sendall, True, [], lambda x: None),
Bill Janssen58afe4c2008-09-08 16:45:19 +00002718 ]
Martin Panter519f9122016-04-03 02:12:54 +00002719 # (name, method, whether to expect success, *args)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002720 recv_methods = [
2721 ('recv', s.recv, True, []),
2722 ('recvfrom', s.recvfrom, False, ["some.address"]),
2723 ('recv_into', _recv_into, True, []),
2724 ('recvfrom_into', _recvfrom_into, False, []),
2725 ]
2726 data_prefix = "PREFIX_"
2727
Martin Panter519f9122016-04-03 02:12:54 +00002728 for (meth_name, send_meth, expect_success, args,
2729 ret_val_meth) in send_methods:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002730 indata = (data_prefix + meth_name).encode('ascii')
Bill Janssen58afe4c2008-09-08 16:45:19 +00002731 try:
Martin Panter519f9122016-04-03 02:12:54 +00002732 ret = send_meth(indata, *args)
2733 msg = "sending with {}".format(meth_name)
2734 self.assertEqual(ret, ret_val_meth(indata), msg=msg)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002735 outdata = s.read()
Bill Janssen58afe4c2008-09-08 16:45:19 +00002736 if outdata != indata.lower():
Georg Brandl89fad142010-03-14 10:23:39 +00002737 self.fail(
Bill Janssen58afe4c2008-09-08 16:45:19 +00002738 "While sending with <<{name:s}>> bad data "
Antoine Pitrou480a1242010-04-28 21:37:09 +00002739 "<<{outdata:r}>> ({nout:d}) received; "
2740 "expected <<{indata:r}>> ({nin:d})\n".format(
2741 name=meth_name, outdata=outdata[:20],
Bill Janssen58afe4c2008-09-08 16:45:19 +00002742 nout=len(outdata),
Antoine Pitrou480a1242010-04-28 21:37:09 +00002743 indata=indata[:20], nin=len(indata)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002744 )
2745 )
2746 except ValueError as e:
2747 if expect_success:
Georg Brandl89fad142010-03-14 10:23:39 +00002748 self.fail(
Bill Janssen58afe4c2008-09-08 16:45:19 +00002749 "Failed to send with method <<{name:s}>>; "
2750 "expected to succeed.\n".format(name=meth_name)
2751 )
2752 if not str(e).startswith(meth_name):
Georg Brandl89fad142010-03-14 10:23:39 +00002753 self.fail(
Bill Janssen58afe4c2008-09-08 16:45:19 +00002754 "Method <<{name:s}>> failed with unexpected "
2755 "exception message: {exp:s}\n".format(
2756 name=meth_name, exp=e
2757 )
2758 )
2759
2760 for meth_name, recv_meth, expect_success, args in recv_methods:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002761 indata = (data_prefix + meth_name).encode('ascii')
Bill Janssen58afe4c2008-09-08 16:45:19 +00002762 try:
Antoine Pitrou480a1242010-04-28 21:37:09 +00002763 s.send(indata)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002764 outdata = recv_meth(*args)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002765 if outdata != indata.lower():
Georg Brandl89fad142010-03-14 10:23:39 +00002766 self.fail(
Bill Janssen58afe4c2008-09-08 16:45:19 +00002767 "While receiving with <<{name:s}>> bad data "
Antoine Pitrou480a1242010-04-28 21:37:09 +00002768 "<<{outdata:r}>> ({nout:d}) received; "
2769 "expected <<{indata:r}>> ({nin:d})\n".format(
2770 name=meth_name, outdata=outdata[:20],
Bill Janssen58afe4c2008-09-08 16:45:19 +00002771 nout=len(outdata),
Antoine Pitrou480a1242010-04-28 21:37:09 +00002772 indata=indata[:20], nin=len(indata)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002773 )
2774 )
2775 except ValueError as e:
2776 if expect_success:
Georg Brandl89fad142010-03-14 10:23:39 +00002777 self.fail(
Bill Janssen58afe4c2008-09-08 16:45:19 +00002778 "Failed to receive with method <<{name:s}>>; "
2779 "expected to succeed.\n".format(name=meth_name)
2780 )
2781 if not str(e).startswith(meth_name):
Georg Brandl89fad142010-03-14 10:23:39 +00002782 self.fail(
Bill Janssen58afe4c2008-09-08 16:45:19 +00002783 "Method <<{name:s}>> failed with unexpected "
2784 "exception message: {exp:s}\n".format(
2785 name=meth_name, exp=e
2786 )
2787 )
2788 # consume data
2789 s.read()
2790
Martin Panterf6b1d662016-03-28 00:22:09 +00002791 # read(-1, buffer) is supported, even though read(-1) is not
Martin Panterbed7f1a2016-07-11 00:17:13 +00002792 data = b"data"
Martin Panter5503d472016-03-27 05:35:19 +00002793 s.send(data)
2794 buffer = bytearray(len(data))
2795 self.assertEqual(s.read(-1, buffer), len(data))
2796 self.assertEqual(buffer, data)
2797
Nick Coghlan513886a2011-08-28 00:00:27 +10002798 # Make sure sendmsg et al are disallowed to avoid
2799 # inadvertent disclosure of data and/or corruption
2800 # of the encrypted data stream
2801 self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
2802 self.assertRaises(NotImplementedError, s.recvmsg, 100)
2803 self.assertRaises(NotImplementedError,
2804 s.recvmsg_into, bytearray(100))
2805
Antoine Pitrou480a1242010-04-28 21:37:09 +00002806 s.write(b"over\n")
Martin Panter5503d472016-03-27 05:35:19 +00002807
2808 self.assertRaises(ValueError, s.recv, -1)
2809 self.assertRaises(ValueError, s.read, -1)
2810
Bill Janssen58afe4c2008-09-08 16:45:19 +00002811 s.close()
Bill Janssen58afe4c2008-09-08 16:45:19 +00002812
Martin Panterbed7f1a2016-07-11 00:17:13 +00002813 def test_recv_zero(self):
2814 server = ThreadedEchoServer(CERTFILE)
2815 server.__enter__()
2816 self.addCleanup(server.__exit__, None, None)
2817 s = socket.create_connection((HOST, server.port))
2818 self.addCleanup(s.close)
2819 s = ssl.wrap_socket(s, suppress_ragged_eofs=False)
2820 self.addCleanup(s.close)
2821
2822 # recv/read(0) should return no data
2823 s.send(b"data")
2824 self.assertEqual(s.recv(0), b"")
2825 self.assertEqual(s.read(0), b"")
2826 self.assertEqual(s.read(), b"data")
2827
2828 # Should not block if the other end sends no data
2829 s.setblocking(False)
2830 self.assertEqual(s.recv(0), b"")
2831 self.assertEqual(s.recv_into(bytearray()), 0)
2832
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002833 def test_nonblocking_send(self):
2834 server = ThreadedEchoServer(CERTFILE,
2835 certreqs=ssl.CERT_NONE,
2836 ssl_version=ssl.PROTOCOL_TLSv1,
2837 cacerts=CERTFILE,
2838 chatty=True,
2839 connectionchatty=False)
2840 with server:
2841 s = ssl.wrap_socket(socket.socket(),
2842 server_side=False,
2843 certfile=CERTFILE,
2844 ca_certs=CERTFILE,
2845 cert_reqs=ssl.CERT_NONE,
2846 ssl_version=ssl.PROTOCOL_TLSv1)
2847 s.connect((HOST, server.port))
2848 s.setblocking(False)
2849
2850 # If we keep sending data, at some point the buffers
2851 # will be full and the call will block
2852 buf = bytearray(8192)
2853 def fill_buffer():
2854 while True:
2855 s.send(buf)
2856 self.assertRaises((ssl.SSLWantWriteError,
2857 ssl.SSLWantReadError), fill_buffer)
2858
2859 # Now read all the output and discard it
2860 s.setblocking(True)
2861 s.close()
2862
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002863 def test_handshake_timeout(self):
2864 # Issue #5103: SSL handshake must respect the socket timeout
2865 server = socket.socket(socket.AF_INET)
2866 host = "127.0.0.1"
2867 port = support.bind_port(server)
2868 started = threading.Event()
2869 finish = False
2870
2871 def serve():
Charles-François Natali6e204602014-07-23 19:28:13 +01002872 server.listen()
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002873 started.set()
2874 conns = []
2875 while not finish:
2876 r, w, e = select.select([server], [], [], 0.1)
2877 if server in r:
2878 # Let the socket hang around rather than having
2879 # it closed by garbage collection.
2880 conns.append(server.accept()[0])
Antoine Pitroud2eca372010-10-29 23:41:37 +00002881 for sock in conns:
2882 sock.close()
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002883
2884 t = threading.Thread(target=serve)
2885 t.start()
2886 started.wait()
2887
2888 try:
Antoine Pitrou40f08742010-04-24 22:04:40 +00002889 try:
2890 c = socket.socket(socket.AF_INET)
2891 c.settimeout(0.2)
2892 c.connect((host, port))
2893 # Will attempt handshake and time out
Antoine Pitrouc4df7842010-12-03 19:59:41 +00002894 self.assertRaisesRegex(socket.timeout, "timed out",
Ezio Melottied3a7d22010-12-01 02:32:32 +00002895 ssl.wrap_socket, c)
Antoine Pitrou40f08742010-04-24 22:04:40 +00002896 finally:
2897 c.close()
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002898 try:
2899 c = socket.socket(socket.AF_INET)
2900 c = ssl.wrap_socket(c)
2901 c.settimeout(0.2)
2902 # Will attempt handshake and time out
Antoine Pitrouc4df7842010-12-03 19:59:41 +00002903 self.assertRaisesRegex(socket.timeout, "timed out",
Ezio Melottied3a7d22010-12-01 02:32:32 +00002904 c.connect, (host, port))
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002905 finally:
2906 c.close()
2907 finally:
2908 finish = True
2909 t.join()
2910 server.close()
2911
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002912 def test_server_accept(self):
2913 # Issue #16357: accept() on a SSLSocket created through
2914 # SSLContext.wrap_socket().
2915 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2916 context.verify_mode = ssl.CERT_REQUIRED
2917 context.load_verify_locations(CERTFILE)
2918 context.load_cert_chain(CERTFILE)
2919 server = socket.socket(socket.AF_INET)
2920 host = "127.0.0.1"
2921 port = support.bind_port(server)
2922 server = context.wrap_socket(server, server_side=True)
2923
2924 evt = threading.Event()
2925 remote = None
2926 peer = None
2927 def serve():
2928 nonlocal remote, peer
Charles-François Natali6e204602014-07-23 19:28:13 +01002929 server.listen()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002930 # Block on the accept and wait on the connection to close.
2931 evt.set()
2932 remote, peer = server.accept()
2933 remote.recv(1)
2934
2935 t = threading.Thread(target=serve)
2936 t.start()
2937 # Client wait until server setup and perform a connect.
2938 evt.wait()
2939 client = context.wrap_socket(socket.socket())
2940 client.connect((host, port))
2941 client_addr = client.getsockname()
2942 client.close()
2943 t.join()
Antoine Pitroue1ceb502013-01-12 21:54:44 +01002944 remote.close()
2945 server.close()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002946 # Sanity checks.
2947 self.assertIsInstance(remote, ssl.SSLSocket)
2948 self.assertEqual(peer, client_addr)
2949
Antoine Pitrou242db722013-05-01 20:52:07 +02002950 def test_getpeercert_enotconn(self):
2951 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2952 with context.wrap_socket(socket.socket()) as sock:
2953 with self.assertRaises(OSError) as cm:
2954 sock.getpeercert()
2955 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
2956
2957 def test_do_handshake_enotconn(self):
2958 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2959 with context.wrap_socket(socket.socket()) as sock:
2960 with self.assertRaises(OSError) as cm:
2961 sock.do_handshake()
2962 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
2963
Antoine Pitrou8f85f902012-01-03 22:46:48 +01002964 def test_default_ciphers(self):
2965 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2966 try:
2967 # Force a set of weak ciphers on our client context
2968 context.set_ciphers("DES")
2969 except ssl.SSLError:
2970 self.skipTest("no DES cipher available")
2971 with ThreadedEchoServer(CERTFILE,
2972 ssl_version=ssl.PROTOCOL_SSLv23,
2973 chatty=False) as server:
Antoine Pitroue1ceb502013-01-12 21:54:44 +01002974 with context.wrap_socket(socket.socket()) as s:
Andrew Svetlov0832af62012-12-18 23:10:48 +02002975 with self.assertRaises(OSError):
Antoine Pitrou8f85f902012-01-03 22:46:48 +01002976 s.connect((HOST, server.port))
2977 self.assertIn("no shared cipher", str(server.conn_errors[0]))
2978
Antoine Pitrou47e40422014-09-04 21:00:10 +02002979 def test_version_basic(self):
2980 """
2981 Basic tests for SSLSocket.version().
2982 More tests are done in the test_protocol_*() methods.
2983 """
2984 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2985 with ThreadedEchoServer(CERTFILE,
2986 ssl_version=ssl.PROTOCOL_TLSv1,
2987 chatty=False) as server:
2988 with context.wrap_socket(socket.socket()) as s:
2989 self.assertIs(s.version(), None)
2990 s.connect((HOST, server.port))
2991 self.assertEqual(s.version(), "TLSv1")
2992 self.assertIs(s.version(), None)
2993
Antoine Pitrou0bebbc32014-03-22 18:13:50 +01002994 @unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
2995 def test_default_ecdh_curve(self):
2996 # Issue #21015: elliptic curve-based Diffie Hellman key exchange
2997 # should be enabled by default on SSL contexts.
2998 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2999 context.load_cert_chain(CERTFILE)
Antoine Pitrouc0430612014-04-16 18:33:39 +02003000 # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
3001 # explicitly using the 'ECCdraft' cipher alias. Otherwise,
3002 # our default cipher list should prefer ECDH-based ciphers
3003 # automatically.
3004 if ssl.OPENSSL_VERSION_INFO < (1, 0, 0):
3005 context.set_ciphers("ECCdraft:ECDH")
Antoine Pitrou0bebbc32014-03-22 18:13:50 +01003006 with ThreadedEchoServer(context=context) as server:
3007 with context.wrap_socket(socket.socket()) as s:
3008 s.connect((HOST, server.port))
3009 self.assertIn("ECDH", s.cipher()[0])
3010
Antoine Pitroud6494802011-07-21 01:11:30 +02003011 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
3012 "'tls-unique' channel binding not available")
3013 def test_tls_unique_channel_binding(self):
3014 """Test tls-unique channel binding."""
3015 if support.verbose:
3016 sys.stdout.write("\n")
3017
3018 server = ThreadedEchoServer(CERTFILE,
3019 certreqs=ssl.CERT_NONE,
3020 ssl_version=ssl.PROTOCOL_TLSv1,
3021 cacerts=CERTFILE,
3022 chatty=True,
3023 connectionchatty=False)
Antoine Pitrou6b15c902011-12-21 16:54:45 +01003024 with server:
3025 s = ssl.wrap_socket(socket.socket(),
3026 server_side=False,
3027 certfile=CERTFILE,
3028 ca_certs=CERTFILE,
3029 cert_reqs=ssl.CERT_NONE,
3030 ssl_version=ssl.PROTOCOL_TLSv1)
3031 s.connect((HOST, server.port))
Antoine Pitroud6494802011-07-21 01:11:30 +02003032 # get the data
3033 cb_data = s.get_channel_binding("tls-unique")
3034 if support.verbose:
3035 sys.stdout.write(" got channel binding data: {0!r}\n"
3036 .format(cb_data))
3037
3038 # check if it is sane
3039 self.assertIsNotNone(cb_data)
3040 self.assertEqual(len(cb_data), 12) # True for TLSv1
3041
3042 # and compare with the peers version
3043 s.write(b"CB tls-unique\n")
3044 peer_data_repr = s.read().strip()
3045 self.assertEqual(peer_data_repr,
3046 repr(cb_data).encode("us-ascii"))
3047 s.close()
3048
3049 # now, again
3050 s = ssl.wrap_socket(socket.socket(),
3051 server_side=False,
3052 certfile=CERTFILE,
3053 ca_certs=CERTFILE,
3054 cert_reqs=ssl.CERT_NONE,
3055 ssl_version=ssl.PROTOCOL_TLSv1)
3056 s.connect((HOST, server.port))
3057 new_cb_data = s.get_channel_binding("tls-unique")
3058 if support.verbose:
3059 sys.stdout.write(" got another channel binding data: {0!r}\n"
3060 .format(new_cb_data))
3061 # is it really unique
3062 self.assertNotEqual(cb_data, new_cb_data)
3063 self.assertIsNotNone(cb_data)
3064 self.assertEqual(len(cb_data), 12) # True for TLSv1
3065 s.write(b"CB tls-unique\n")
3066 peer_data_repr = s.read().strip()
3067 self.assertEqual(peer_data_repr,
3068 repr(new_cb_data).encode("us-ascii"))
3069 s.close()
Bill Janssen58afe4c2008-09-08 16:45:19 +00003070
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003071 def test_compression(self):
3072 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3073 context.load_cert_chain(CERTFILE)
3074 stats = server_params_test(context, context,
3075 chatty=True, connectionchatty=True)
3076 if support.verbose:
3077 sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
3078 self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' })
3079
3080 @unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
3081 "ssl.OP_NO_COMPRESSION needed for this test")
3082 def test_compression_disabled(self):
3083 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3084 context.load_cert_chain(CERTFILE)
Antoine Pitrou8691bff2011-12-20 10:47:42 +01003085 context.options |= ssl.OP_NO_COMPRESSION
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003086 stats = server_params_test(context, context,
3087 chatty=True, connectionchatty=True)
3088 self.assertIs(stats['compression'], None)
3089
Antoine Pitrou0e576f12011-12-22 10:03:38 +01003090 def test_dh_params(self):
3091 # Check we can get a connection with ephemeral Diffie-Hellman
3092 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3093 context.load_cert_chain(CERTFILE)
3094 context.load_dh_params(DHFILE)
3095 context.set_ciphers("kEDH")
3096 stats = server_params_test(context, context,
3097 chatty=True, connectionchatty=True)
3098 cipher = stats["cipher"][0]
3099 parts = cipher.split("-")
3100 if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
3101 self.fail("Non-DH cipher: " + cipher[0])
3102
Benjamin Petersoncca27322015-01-23 16:35:37 -05003103 def test_selected_alpn_protocol(self):
3104 # selected_alpn_protocol() is None unless ALPN is used.
3105 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3106 context.load_cert_chain(CERTFILE)
3107 stats = server_params_test(context, context,
3108 chatty=True, connectionchatty=True)
3109 self.assertIs(stats['client_alpn_protocol'], None)
3110
3111 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
3112 def test_selected_alpn_protocol_if_server_uses_alpn(self):
3113 # selected_alpn_protocol() is None unless ALPN is used by the client.
3114 client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3115 client_context.load_verify_locations(CERTFILE)
3116 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3117 server_context.load_cert_chain(CERTFILE)
3118 server_context.set_alpn_protocols(['foo', 'bar'])
3119 stats = server_params_test(client_context, server_context,
3120 chatty=True, connectionchatty=True)
3121 self.assertIs(stats['client_alpn_protocol'], None)
3122
3123 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
3124 def test_alpn_protocols(self):
3125 server_protocols = ['foo', 'bar', 'milkshake']
3126 protocol_tests = [
3127 (['foo', 'bar'], 'foo'),
Benjamin Peterson88615022015-01-23 17:30:26 -05003128 (['bar', 'foo'], 'foo'),
Benjamin Petersoncca27322015-01-23 16:35:37 -05003129 (['milkshake'], 'milkshake'),
Benjamin Peterson88615022015-01-23 17:30:26 -05003130 (['http/3.0', 'http/4.0'], None)
Benjamin Petersoncca27322015-01-23 16:35:37 -05003131 ]
3132 for client_protocols, expected in protocol_tests:
3133 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3134 server_context.load_cert_chain(CERTFILE)
3135 server_context.set_alpn_protocols(server_protocols)
3136 client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3137 client_context.load_cert_chain(CERTFILE)
3138 client_context.set_alpn_protocols(client_protocols)
3139 stats = server_params_test(client_context, server_context,
3140 chatty=True, connectionchatty=True)
3141
3142 msg = "failed trying %s (s) and %s (c).\n" \
3143 "was expecting %s, but got %%s from the %%s" \
3144 % (str(server_protocols), str(client_protocols),
3145 str(expected))
3146 client_result = stats['client_alpn_protocol']
3147 self.assertEqual(client_result, expected, msg % (client_result, "client"))
3148 server_result = stats['server_alpn_protocols'][-1] \
3149 if len(stats['server_alpn_protocols']) else 'nothing'
3150 self.assertEqual(server_result, expected, msg % (server_result, "server"))
3151
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01003152 def test_selected_npn_protocol(self):
3153 # selected_npn_protocol() is None unless NPN is used
3154 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3155 context.load_cert_chain(CERTFILE)
3156 stats = server_params_test(context, context,
3157 chatty=True, connectionchatty=True)
3158 self.assertIs(stats['client_npn_protocol'], None)
3159
3160 @unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
3161 def test_npn_protocols(self):
3162 server_protocols = ['http/1.1', 'spdy/2']
3163 protocol_tests = [
3164 (['http/1.1', 'spdy/2'], 'http/1.1'),
3165 (['spdy/2', 'http/1.1'], 'http/1.1'),
3166 (['spdy/2', 'test'], 'spdy/2'),
3167 (['abc', 'def'], 'abc')
3168 ]
3169 for client_protocols, expected in protocol_tests:
3170 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3171 server_context.load_cert_chain(CERTFILE)
3172 server_context.set_npn_protocols(server_protocols)
3173 client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3174 client_context.load_cert_chain(CERTFILE)
3175 client_context.set_npn_protocols(client_protocols)
3176 stats = server_params_test(client_context, server_context,
3177 chatty=True, connectionchatty=True)
3178
3179 msg = "failed trying %s (s) and %s (c).\n" \
3180 "was expecting %s, but got %%s from the %%s" \
3181 % (str(server_protocols), str(client_protocols),
3182 str(expected))
3183 client_result = stats['client_npn_protocol']
3184 self.assertEqual(client_result, expected, msg % (client_result, "client"))
3185 server_result = stats['server_npn_protocols'][-1] \
3186 if len(stats['server_npn_protocols']) else 'nothing'
3187 self.assertEqual(server_result, expected, msg % (server_result, "server"))
3188
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003189 def sni_contexts(self):
3190 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3191 server_context.load_cert_chain(SIGNED_CERTFILE)
3192 other_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3193 other_context.load_cert_chain(SIGNED_CERTFILE2)
3194 client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3195 client_context.verify_mode = ssl.CERT_REQUIRED
3196 client_context.load_verify_locations(SIGNING_CA)
3197 return server_context, other_context, client_context
3198
3199 def check_common_name(self, stats, name):
3200 cert = stats['peercert']
3201 self.assertIn((('commonName', name),), cert['subject'])
3202
3203 @needs_sni
3204 def test_sni_callback(self):
3205 calls = []
3206 server_context, other_context, client_context = self.sni_contexts()
3207
3208 def servername_cb(ssl_sock, server_name, initial_context):
3209 calls.append((server_name, initial_context))
Antoine Pitrou50b24d02013-04-11 20:48:42 +02003210 if server_name is not None:
3211 ssl_sock.context = other_context
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003212 server_context.set_servername_callback(servername_cb)
3213
3214 stats = server_params_test(client_context, server_context,
3215 chatty=True,
3216 sni_name='supermessage')
3217 # The hostname was fetched properly, and the certificate was
3218 # changed for the connection.
3219 self.assertEqual(calls, [("supermessage", server_context)])
3220 # CERTFILE4 was selected
3221 self.check_common_name(stats, 'fakehostname')
3222
Antoine Pitrou50b24d02013-04-11 20:48:42 +02003223 calls = []
3224 # The callback is called with server_name=None
3225 stats = server_params_test(client_context, server_context,
3226 chatty=True,
3227 sni_name=None)
3228 self.assertEqual(calls, [(None, server_context)])
3229 self.check_common_name(stats, 'localhost')
3230
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003231 # Check disabling the callback
3232 calls = []
3233 server_context.set_servername_callback(None)
3234
3235 stats = server_params_test(client_context, server_context,
3236 chatty=True,
3237 sni_name='notfunny')
3238 # Certificate didn't change
3239 self.check_common_name(stats, 'localhost')
3240 self.assertEqual(calls, [])
3241
3242 @needs_sni
3243 def test_sni_callback_alert(self):
3244 # Returning a TLS alert is reflected to the connecting client
3245 server_context, other_context, client_context = self.sni_contexts()
3246
3247 def cb_returning_alert(ssl_sock, server_name, initial_context):
3248 return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
3249 server_context.set_servername_callback(cb_returning_alert)
3250
3251 with self.assertRaises(ssl.SSLError) as cm:
3252 stats = server_params_test(client_context, server_context,
3253 chatty=False,
3254 sni_name='supermessage')
3255 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
3256
3257 @needs_sni
3258 def test_sni_callback_raising(self):
3259 # Raising fails the connection with a TLS handshake failure alert.
3260 server_context, other_context, client_context = self.sni_contexts()
3261
3262 def cb_raising(ssl_sock, server_name, initial_context):
3263 1/0
3264 server_context.set_servername_callback(cb_raising)
3265
3266 with self.assertRaises(ssl.SSLError) as cm, \
3267 support.captured_stderr() as stderr:
3268 stats = server_params_test(client_context, server_context,
3269 chatty=False,
3270 sni_name='supermessage')
3271 self.assertEqual(cm.exception.reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
3272 self.assertIn("ZeroDivisionError", stderr.getvalue())
3273
3274 @needs_sni
3275 def test_sni_callback_wrong_return_type(self):
3276 # Returning the wrong return type terminates the TLS connection
3277 # with an internal error alert.
3278 server_context, other_context, client_context = self.sni_contexts()
3279
3280 def cb_wrong_return_type(ssl_sock, server_name, initial_context):
3281 return "foo"
3282 server_context.set_servername_callback(cb_wrong_return_type)
3283
3284 with self.assertRaises(ssl.SSLError) as cm, \
3285 support.captured_stderr() as stderr:
3286 stats = server_params_test(client_context, server_context,
3287 chatty=False,
3288 sni_name='supermessage')
3289 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
3290 self.assertIn("TypeError", stderr.getvalue())
3291
Benjamin Peterson4cb17812015-01-07 11:14:26 -06003292 def test_shared_ciphers(self):
Benjamin Petersonaacd5242015-01-07 11:42:38 -06003293 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
Benjamin Peterson15042922015-01-07 22:12:43 -06003294 server_context.load_cert_chain(SIGNED_CERTFILE)
3295 client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3296 client_context.verify_mode = ssl.CERT_REQUIRED
3297 client_context.load_verify_locations(SIGNING_CA)
Benjamin Peterson23ef9fa2015-01-07 21:21:34 -06003298 client_context.set_ciphers("RC4")
Benjamin Petersone6838e02015-01-07 20:52:40 -06003299 server_context.set_ciphers("AES:RC4")
Benjamin Peterson4cb17812015-01-07 11:14:26 -06003300 stats = server_params_test(client_context, server_context)
3301 ciphers = stats['server_shared_ciphers'][0]
3302 self.assertGreater(len(ciphers), 0)
3303 for name, tls_version, bits in ciphers:
Benjamin Peterson23ef9fa2015-01-07 21:21:34 -06003304 self.assertIn("RC4", name.split("-"))
Benjamin Peterson4cb17812015-01-07 11:14:26 -06003305
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003306 def test_read_write_after_close_raises_valuerror(self):
3307 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
3308 context.verify_mode = ssl.CERT_REQUIRED
3309 context.load_verify_locations(CERTFILE)
3310 context.load_cert_chain(CERTFILE)
3311 server = ThreadedEchoServer(context=context, chatty=False)
3312
3313 with server:
3314 s = context.wrap_socket(socket.socket())
3315 s.connect((HOST, server.port))
3316 s.close()
3317
3318 self.assertRaises(ValueError, s.read, 1024)
Antoine Pitrou28940732013-07-20 19:36:15 +02003319 self.assertRaises(ValueError, s.write, b'hello')
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003320
Giampaolo Rodola'915d1412014-06-11 03:54:30 +02003321 def test_sendfile(self):
3322 TEST_DATA = b"x" * 512
3323 with open(support.TESTFN, 'wb') as f:
3324 f.write(TEST_DATA)
3325 self.addCleanup(support.unlink, support.TESTFN)
3326 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
3327 context.verify_mode = ssl.CERT_REQUIRED
3328 context.load_verify_locations(CERTFILE)
3329 context.load_cert_chain(CERTFILE)
3330 server = ThreadedEchoServer(context=context, chatty=False)
3331 with server:
3332 with context.wrap_socket(socket.socket()) as s:
3333 s.connect((HOST, server.port))
3334 with open(support.TESTFN, 'rb') as file:
3335 s.sendfile(file)
3336 self.assertEqual(s.recv(1024), TEST_DATA)
3337
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003338
Thomas Woutersed03b412007-08-28 21:37:11 +00003339def test_main(verbose=False):
Antoine Pitrou15cee622010-08-04 16:45:21 +00003340 if support.verbose:
Berker Peksag9e7990a2015-05-16 23:21:26 +03003341 import warnings
Antoine Pitrou15cee622010-08-04 16:45:21 +00003342 plats = {
3343 'Linux': platform.linux_distribution,
3344 'Mac': platform.mac_ver,
3345 'Windows': platform.win32_ver,
3346 }
Berker Peksag9e7990a2015-05-16 23:21:26 +03003347 with warnings.catch_warnings():
3348 warnings.filterwarnings(
3349 'ignore',
3350 'dist\(\) and linux_distribution\(\) '
3351 'functions are deprecated .*',
3352 PendingDeprecationWarning,
3353 )
3354 for name, func in plats.items():
3355 plat = func()
3356 if plat and plat[0]:
3357 plat = '%s %r' % (name, plat)
3358 break
3359 else:
3360 plat = repr(platform.platform())
Antoine Pitrou15cee622010-08-04 16:45:21 +00003361 print("test_ssl: testing with %r %r" %
3362 (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
3363 print(" under %s" % plat)
Antoine Pitroud5323212010-10-22 18:19:07 +00003364 print(" HAS_SNI = %r" % ssl.HAS_SNI)
Antoine Pitrou609ef012013-03-29 18:09:06 +01003365 print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
3366 try:
3367 print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
3368 except AttributeError:
3369 pass
Antoine Pitrou15cee622010-08-04 16:45:21 +00003370
Antoine Pitrou152efa22010-05-16 18:19:27 +00003371 for filename in [
Martin Panter3840b2a2016-03-27 01:53:46 +00003372 CERTFILE, BYTES_CERTFILE,
Antoine Pitrou152efa22010-05-16 18:19:27 +00003373 ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003374 SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
Antoine Pitrou152efa22010-05-16 18:19:27 +00003375 BADCERT, BADKEY, EMPTYCERT]:
3376 if not os.path.exists(filename):
3377 raise support.TestFailed("Can't read certificate file %r" % filename)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00003378
Martin Panter3840b2a2016-03-27 01:53:46 +00003379 tests = [
3380 ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
3381 SimpleBackgroundTests,
3382 ]
Thomas Woutersed03b412007-08-28 21:37:11 +00003383
Benjamin Petersonee8712c2008-05-20 21:35:26 +00003384 if support.is_resource_enabled('network'):
Bill Janssen6e027db2007-11-15 22:23:56 +00003385 tests.append(NetworkedTests)
Thomas Woutersed03b412007-08-28 21:37:11 +00003386
Thomas Wouters1b7f8912007-09-19 03:06:30 +00003387 if _have_threads:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00003388 thread_info = support.threading_setup()
Antoine Pitroudb5012a2013-01-12 22:00:09 +01003389 if thread_info:
Bill Janssen6e027db2007-11-15 22:23:56 +00003390 tests.append(ThreadedTests)
Thomas Woutersed03b412007-08-28 21:37:11 +00003391
Antoine Pitrou480a1242010-04-28 21:37:09 +00003392 try:
3393 support.run_unittest(*tests)
3394 finally:
3395 if _have_threads:
3396 support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00003397
3398if __name__ == "__main__":
3399 test_main()