blob: 7824e9c847dc06e5ed1fda1e4afe318292e387ed [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00005from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00006import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00007import select
Thomas Woutersed03b412007-08-28 21:37:11 +00008import time
Christian Heimes9424bb42013-06-17 15:32:57 +02009import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000010import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000011import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000012import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000013import pprint
Antoine Pitrou152efa22010-05-16 18:19:27 +000014import tempfile
Antoine Pitrou803e6d62010-10-13 10:36:15 +000015import urllib.request
Thomas Woutersed03b412007-08-28 21:37:11 +000016import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000017import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000018import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000019import platform
Antoine Pitrou23df4832010-08-04 17:14:06 +000020import functools
Thomas Woutersed03b412007-08-28 21:37:11 +000021
# Obtain the ssl module through the test-support import helper instead of a
# plain import statement.
ssl = support.import_module("ssl")

# Remember whether the threading module can be imported on this build.
_have_threads = True
try:
    import threading
except ImportError:
    _have_threads = False

# Sorted result of iterating ssl._PROTOCOL_NAMES.
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
# Host name supplied by the test-support package.
HOST = support.HOST
Antoine Pitrou152efa22010-05-16 18:19:27 +000033
def data_file(*name):
    """Return the path of a data file stored next to this module.

    The positional *name* components are joined, in order, onto the
    directory that contains this file.
    """
    module_dir = os.path.dirname(__file__)
    return os.path.join(module_dir, *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000036
# The custom key and certificate files used in test_ssl are generated
# using Lib/test/make_ssl_certs.py.
# Other certificates are simply fetched from the Internet servers they
# are meant to authenticate.

# Combined key+certificate file, plus the separate certificate and key files.
CERTFILE = data_file("keycert.pem")
ONLYCERT = data_file("ssl_cert.pem")
ONLYKEY = data_file("ssl_key.pem")

# The same three paths encoded with the filesystem encoding.
BYTES_CERTFILE = os.fsencode(CERTFILE)
BYTES_ONLYCERT = os.fsencode(ONLYCERT)
BYTES_ONLYKEY = os.fsencode(ONLYKEY)

# Password-protected variants and the password that goes with them.
CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
KEY_PASSWORD = "somepass"

# CA directory (str and bytes form) and two individual files inside it.
CAPATH = data_file("capath")
BYTES_CAPATH = os.fsencode(CAPATH)
CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
CAFILE_CACERT = data_file("capath", "5ed36f99.0")


# empty CRL
CRLFILE = data_file("revocation.crl")

# Two keys and certs signed by the same CA (for SNI tests)
SIGNED_CERTFILE = data_file("keycert3.pem")
SIGNED_CERTFILE2 = data_file("keycert4.pem")
# Same certificate as pycacert.pem, but without extra text in file
SIGNING_CA = data_file("capath", "ceff1710.0")

REMOTE_HOST = "self-signed.pythontest.net"

# Files used by the error-path and certificate-parsing tests.
EMPTYCERT = data_file("nullcert.pem")
BADCERT = data_file("badcert.pem")
NONEXISTINGCERT = data_file("XXXnonexisting.pem")
BADKEY = data_file("badkey.pem")
NOKIACERT = data_file("nokia.pem")
NULLBYTECERT = data_file("nullbytecert.pem")

# Diffie-Hellman parameter file, as str and as filesystem-encoded bytes.
DHFILE = data_file("dh1024.pem")
BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +000077
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010078
def handle_error(prefix):
    """Write the current exception's traceback to stdout in verbose mode.

    The traceback of the exception reported by sys.exc_info() is formatted
    and written after *prefix*; nothing is written unless support.verbose
    is true.
    """
    traceback_lines = traceback.format_exception(*sys.exc_info())
    exc_format = ' '.join(traceback_lines)
    if not support.verbose:
        return
    sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +000083
def can_clear_options():
    """Return True when ssl._OPENSSL_API_VERSION is 0.9.8m or higher."""
    # 0.9.8m or higher
    minimum_version = (0, 9, 8, 13, 15)
    return ssl._OPENSSL_API_VERSION >= minimum_version
Antoine Pitroub5218772010-05-21 09:56:06 +000087
def no_sslv2_implies_sslv3_hello():
    """Return True when ssl.OPENSSL_VERSION_INFO is 0.9.7h or higher."""
    # 0.9.7h or higher
    minimum_version = (0, 9, 7, 8, 15)
    return ssl.OPENSSL_VERSION_INFO >= minimum_version
91
def have_verify_flags():
    """Return True when ssl.OPENSSL_VERSION_INFO is 0.9.8 or higher."""
    # 0.9.8 or higher
    minimum_version = (0, 9, 8, 0, 15)
    return ssl.OPENSSL_VERSION_INFO >= minimum_version
95
def utc_offset(): #NOTE: ignore issues like #1647654
    """Return the local UTC offset in seconds.

    local time = utc time + utc offset.  The negated time.altzone is used
    while daylight saving time is in effect, the negated time.timezone
    otherwise.
    """
    dst_in_effect = time.daylight and time.localtime().tm_isdst > 0
    seconds_west_of_utc = time.altzone if dst_in_effect else time.timezone
    return -seconds_west_of_utc
101
def asn1time(cert_time):
    """Return *cert_time* adjusted to what this OpenSSL build reports.

    On the one OpenSSL API version singled out below the seconds are
    forced to zero and a zero-padded day is rewritten with a leading
    space; on every other version *cert_time* is returned unchanged.
    """
    # Some versions of OpenSSL ignore seconds, see #18207
    # 0.9.8.i
    if ssl._OPENSSL_API_VERSION != (0, 9, 8, 9, 15):
        return cert_time

    fmt = "%b %d %H:%M:%S %Y GMT"
    parsed = datetime.datetime.strptime(cert_time, fmt).replace(second=0)
    cert_time = parsed.strftime(fmt)
    # %d adds leading zero but ASN1_TIME_print() uses leading space
    if cert_time[4] == "0":
        cert_time = cert_time[:4] + " " + cert_time[5:]
    return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000115
# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
def skip_if_broken_ubuntu_ssl(func):
    """Decorator that skips *func* on the patched Ubuntu OpenSSL build.

    When the ssl module has no PROTOCOL_SSLv2 attribute, *func* is
    returned unchanged.  Otherwise the returned wrapper first tries to
    create an SSLv2 context; if that raises ssl.SSLError while
    OPENSSL_VERSION_INFO is (0, 9, 8, 15, 15) and the platform reports
    ('debian', 'squeeze/sid', ''), unittest.SkipTest is raised.  In every
    other case *func* is called with the original arguments and its
    result returned.
    """
    if not hasattr(ssl, 'PROTOCOL_SSLv2'):
        return func

    @functools.wraps(func)
    def f(*args, **kwargs):
        try:
            ssl.SSLContext(ssl.PROTOCOL_SSLv2)
        except ssl.SSLError:
            # platform.linux_distribution() is deprecated since Python 3.5
            # and absent from 3.8 on; without it the distribution cannot
            # be identified, so the test is simply run.
            linux_distribution = getattr(platform, 'linux_distribution', None)
            if (linux_distribution is not None and
                ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
                linux_distribution() == ('debian', 'squeeze/sid', '')):
                raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
        return func(*args, **kwargs)
    return f
Antoine Pitrou23df4832010-08-04 17:14:06 +0000131
# Decorator for tests that can only run when ssl.HAS_SNI is true.
needs_sni = unittest.skipUnless(
    ssl.HAS_SNI,
    "SNI support needed for this test",
)
133
Antoine Pitrou23df4832010-08-04 17:14:06 +0000134
Antoine Pitrou152efa22010-05-16 18:19:27 +0000135class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000136
Antoine Pitrou480a1242010-04-28 21:37:09 +0000137 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000138 ssl.CERT_NONE
139 ssl.CERT_OPTIONAL
140 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100141 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100142 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100143 if ssl.HAS_ECDH:
144 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100145 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
146 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000147 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100148 self.assertIn(ssl.HAS_ECDH, {True, False})
Thomas Woutersed03b412007-08-28 21:37:11 +0000149
def test_str_for_enums(self):
    # Make sure that the PROTOCOL_* constants have enum-like string
    # reprs, and that a context reports the very protocol object it was
    # created with.
    expected_text = '_SSLMethod.PROTOCOL_SSLv23'
    proto = ssl.PROTOCOL_SSLv23
    self.assertEqual(str(proto), expected_text)
    context = ssl.SSLContext(proto)
    self.assertIs(context.protocol, proto)
157
def test_random(self):
    # Exercise the RAND_* functions exposed by the ssl module.
    status = ssl.RAND_status()
    if support.verbose:
        if status:
            verdict = "sufficient randomness"
        else:
            verdict = "insufficient randomness"
        sys.stdout.write("\n RAND_status is %d (%s)\n" % (status, verdict))

    pseudo_bytes, is_cryptographic = ssl.RAND_pseudo_bytes(16)
    self.assertEqual(len(pseudo_bytes), 16)
    self.assertEqual(is_cryptographic, status == 1)
    if not status:
        self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
    else:
        strong_bytes = ssl.RAND_bytes(16)
        self.assertEqual(len(strong_bytes), 16)

    # negative num is invalid
    for generator in (ssl.RAND_bytes, ssl.RAND_pseudo_bytes):
        self.assertRaises(ValueError, generator, -5)

    # RAND_egd is only checked when this build provides it.
    if hasattr(ssl, 'RAND_egd'):
        for bad_arguments in ((1,), ('foo', 1)):
            self.assertRaises(TypeError, ssl.RAND_egd, *bad_arguments)

    # RAND_add is fed a str, a bytes and a bytearray.
    for entropy in ("this is a random string",
                    b"this is a random bytes object",
                    bytearray(b"this is a random bytearray object")):
        ssl.RAND_add(entropy, 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000184
@unittest.skipUnless(os.name == 'posix', 'requires posix')
def test_random_fork(self):
    # A forked child and its parent must not draw the same 16
    # pseudo-random bytes.
    if not ssl.RAND_status():
        self.fail("OpenSSL's PRNG has insufficient randomness")

    read_end, write_end = os.pipe()
    child_pid = os.fork()
    if child_pid == 0:
        # Child: send the bytes through the pipe, then leave through
        # os._exit() -- status 0 on success, 1 on any exception.
        try:
            os.close(read_end)
            child_random = ssl.RAND_pseudo_bytes(16)[0]
            self.assertEqual(len(child_random), 16)
            os.write(write_end, child_random)
            os.close(write_end)
        except BaseException:
            os._exit(1)
        else:
            os._exit(0)

    # Parent: the child always leaves through os._exit() above.
    os.close(write_end)
    self.addCleanup(os.close, read_end)
    exit_status = os.waitpid(child_pid, 0)[1]
    self.assertEqual(exit_status, 0)

    child_random = os.read(read_end, 16)
    self.assertEqual(len(child_random), 16)
    parent_random = ssl.RAND_pseudo_bytes(16)[0]
    self.assertEqual(len(parent_random), 16)

    self.assertNotEqual(child_random, parent_random)
216
def test_parse_cert(self):
    # note that this uses an 'unofficial' function in _ssl.c,
    # provided solely for this test, to exercise the certificate
    # parsing code
    p = ssl._ssl._test_decode_cert(CERTFILE)
    if support.verbose:
        sys.stdout.write("\n" + pprint.pformat(p) + "\n")
    # Issuer and subject of this certificate carry the same name.
    expected_name = ((('countryName', 'XY'),),
                     (('localityName', 'Castle Anthrax'),),
                     (('organizationName', 'Python Software Foundation'),),
                     (('commonName', 'localhost'),))
    self.assertEqual(p['issuer'], expected_name)
    # Note the next three asserts will fail if the keys are regenerated
    # ASN1_TIME_print() pads a single-digit day with a leading space, so
    # the expected strings carry two spaces between month and day.
    self.assertEqual(p['notAfter'], asn1time('Oct  5 23:01:56 2020 GMT'))
    self.assertEqual(p['notBefore'], asn1time('Oct  8 23:01:56 2010 GMT'))
    self.assertEqual(p['serialNumber'], 'D7C7381919AFC24E')
    self.assertEqual(p['subject'], expected_name)
    self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),))
    # Issue #13034: the subjectAltName in some certificates
    # (notably projects.developer.nokia.com:443) wasn't parsed
    p = ssl._ssl._test_decode_cert(NOKIACERT)
    if support.verbose:
        sys.stdout.write("\n" + pprint.pformat(p) + "\n")
    self.assertEqual(p['subjectAltName'],
                     (('DNS', 'projects.developer.nokia.com'),
                      ('DNS', 'projects.forum.nokia.com'))
                    )
    # extra OCSP and AIA fields
    self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
    self.assertEqual(p['caIssuers'],
                     ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
    self.assertEqual(p['crlDistributionPoints'],
                     ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000256
def test_parse_cert_CVE_2013_4238(self):
    # The decoded names are expected to keep their embedded NUL
    # characters in full.
    decoded = ssl._ssl._test_decode_cert(NULLBYTECERT)
    if support.verbose:
        sys.stdout.write("\n" + pprint.pformat(decoded) + "\n")
    subject = ((('countryName', 'US'),),
               (('stateOrProvinceName', 'Oregon'),),
               (('localityName', 'Beaverton'),),
               (('organizationName', 'Python Software Foundation'),),
               (('organizationalUnitName', 'Python Core Development'),),
               (('commonName', 'null.python.org\x00example.org'),),
               (('emailAddress', 'python-dev@python.org'),))
    for field in ('subject', 'issuer'):
        self.assertEqual(decoded[field], subject)

    # Only the final (IPv6) entry depends on the OpenSSL version.
    if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
        ipv6_entry = '2001:DB8:0:0:0:0:0:1\n'
    else:
        # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
        ipv6_entry = '<invalid>'
    san = (('DNS', 'altnull.python.org\x00example.com'),
           ('email', 'null@python.org\x00user@example.org'),
           ('URI', 'http://null.python.org\x00http://example.org'),
           ('IP Address', '192.0.2.1'),
           ('IP Address', ipv6_entry))

    self.assertEqual(decoded['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200285
def test_DER_to_PEM(self):
    # Converting PEM -> DER -> PEM -> DER must give the same DER twice,
    # and the regenerated PEM must be framed by the PEM header and footer.
    with open(CAFILE_CACERT, 'r') as pem_file:
        pem = pem_file.read()
    first_der = ssl.PEM_cert_to_DER_cert(pem)
    regenerated_pem = ssl.DER_cert_to_PEM_cert(first_der)
    second_der = ssl.PEM_cert_to_DER_cert(regenerated_pem)
    self.assertEqual(first_der, second_der)

    has_header = regenerated_pem.startswith(ssl.PEM_HEADER + '\n')
    if not has_header:
        self.fail("DER-to-PEM didn't include correct header:\n%r\n"
                  % regenerated_pem)
    has_footer = regenerated_pem.endswith('\n' + ssl.PEM_FOOTER + '\n')
    if not has_footer:
        self.fail("DER-to-PEM didn't include correct footer:\n%r\n"
                  % regenerated_pem)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000297
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000298 def test_openssl_version(self):
299 n = ssl.OPENSSL_VERSION_NUMBER
300 t = ssl.OPENSSL_VERSION_INFO
301 s = ssl.OPENSSL_VERSION
302 self.assertIsInstance(n, int)
303 self.assertIsInstance(t, tuple)
304 self.assertIsInstance(s, str)
305 # Some sanity checks follow
306 # >= 0.9
307 self.assertGreaterEqual(n, 0x900000)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400308 # < 3.0
309 self.assertLess(n, 0x30000000)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000310 major, minor, fix, patch, status = t
311 self.assertGreaterEqual(major, 0)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400312 self.assertLess(major, 3)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000313 self.assertGreaterEqual(minor, 0)
314 self.assertLess(minor, 256)
315 self.assertGreaterEqual(fix, 0)
316 self.assertLess(fix, 256)
317 self.assertGreaterEqual(patch, 0)
Ned Deily05784a72015-02-05 17:20:13 +1100318 self.assertLessEqual(patch, 63)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000319 self.assertGreaterEqual(status, 0)
320 self.assertLessEqual(status, 15)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400321 # Version string as returned by {Open,Libre}SSL, the format might change
322 if "LibreSSL" in s:
323 self.assertTrue(s.startswith("LibreSSL {:d}.{:d}".format(major, minor)),
Victor Stinner789b8052015-01-06 11:51:06 +0100324 (s, t, hex(n)))
Antoine Pitroudfab9352014-07-21 18:35:01 -0400325 else:
326 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
Victor Stinner789b8052015-01-06 11:51:06 +0100327 (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000328
@support.cpython_only
def test_refcycle(self):
    # Issue #7943: an SSL object doesn't create reference cycles with
    # itself.
    plain_sock = socket.socket(socket.AF_INET)
    wrapped = ssl.wrap_socket(plain_sock)
    probe = weakref.ref(wrapped)
    # Dropping the only strong reference is done under check_warnings()
    # with a ResourceWarning filter.
    with support.check_warnings(("", ResourceWarning)):
        del wrapped
    # The weak reference must be dead by now.
    self.assertEqual(probe(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000339
def test_wrapped_unconnected(self):
    # Methods on an unconnected SSLSocket propagate the original
    # OSError raised by the underlying socket object.
    plain_sock = socket.socket(socket.AF_INET)
    with ssl.wrap_socket(plain_sock) as ss:
        # (bound method, positional arguments) pairs, tried in order.
        attempts = (
            (ss.recv, (1,)),
            (ss.recv_into, (bytearray(b'x'),)),
            (ss.recvfrom, (1,)),
            (ss.recvfrom_into, (bytearray(b'x'), 1)),
            (ss.send, (b'x',)),
            (ss.sendto, (b'x', ('0.0.0.0', 0))),
        )
        for method, arguments in attempts:
            self.assertRaises(OSError, method, *arguments)
Antoine Pitroua468adc2010-09-14 14:43:44 +0000351
def test_timeout(self):
    # Issue #8524: when creating an SSL socket, the timeout of the
    # original socket should be retained.
    for timeout in (None, 0.0, 5.0):
        plain_sock = socket.socket(socket.AF_INET)
        plain_sock.settimeout(timeout)
        with ssl.wrap_socket(plain_sock) as ss:
            retained = ss.gettimeout()
            self.assertEqual(timeout, retained)
Antoine Pitrou40f08742010-04-24 22:04:40 +0000360
def test_errors(self):
    # Invalid argument combinations and missing files given to
    # ssl.wrap_socket().
    sock = socket.socket()
    with self.assertRaisesRegex(ValueError, "certfile must be specified"):
        ssl.wrap_socket(sock, keyfile=CERTFILE)

    # Server-side wrapping without a certfile, then with an empty one.
    server_side_message = "certfile must be specified for server-side operations"
    for extra_kwargs in ({}, {'certfile': ""}):
        with self.assertRaisesRegex(ValueError, server_side_message):
            ssl.wrap_socket(sock, server_side=True, **extra_kwargs)

    with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
        with self.assertRaisesRegex(ValueError,
                                    "can't connect in server-side mode"):
            s.connect((HOST, 8080))

    # Every combination involving a nonexistent file must fail with ENOENT.
    missing_file_kwargs = (
        {'certfile': NONEXISTINGCERT},
        {'certfile': CERTFILE, 'keyfile': NONEXISTINGCERT},
        {'certfile': NONEXISTINGCERT, 'keyfile': NONEXISTINGCERT},
    )
    for wrap_kwargs in missing_file_kwargs:
        with self.assertRaises(OSError) as cm:
            with socket.socket() as sock:
                ssl.wrap_socket(sock, **wrap_kwargs)
        self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000389
def bad_cert_test(self, certfile):
    """Check that trying to use the given client certificate fails

    *certfile* is a file name resolved against the directory holding this
    module (or the current directory when that is empty).  Wrapping a
    fresh socket with it must raise ssl.SSLError.
    """
    base_dir = os.path.dirname(__file__) or os.curdir
    certfile = os.path.join(base_dir, certfile)
    sock = socket.socket()
    self.addCleanup(sock.close)
    wrap_kwargs = {'certfile': certfile, 'ssl_version': ssl.PROTOCOL_TLSv1}
    with self.assertRaises(ssl.SSLError):
        ssl.wrap_socket(sock, **wrap_kwargs)
400
def test_empty_cert(self):
    """Wrapping with an empty cert file"""
    empty_cert_name = "nullcert.pem"
    self.bad_cert_test(empty_cert_name)
404
def test_malformed_cert(self):
    """Wrapping with a badly formatted certificate (syntax error)"""
    malformed_cert_name = "badcert.pem"
    self.bad_cert_test(malformed_cert_name)
408
def test_malformed_key(self):
    """Wrapping with a badly formatted key (syntax error)"""
    malformed_key_name = "badkey.pem"
    self.bad_cert_test(malformed_key_name)
412
def test_match_hostname(self):
    # Walk ssl.match_hostname() through certificate/hostname pairs that
    # must be accepted or rejected.  Certificates are plain dicts.
    def accept(cert, *hostnames):
        # A match returns silently; any exception fails the test.
        for hostname in hostnames:
            ssl.match_hostname(cert, hostname)

    def reject(cert, *hostnames):
        for hostname in hostnames:
            with self.assertRaises(ssl.CertificateError):
                ssl.match_hostname(cert, hostname)

    def cn_only(common_name):
        # Certificate whose subject holds nothing but a commonName.
        return {'subject': ((('commonName', common_name),),)}

    def alabel(name):
        # IDNA-encode *name* and hand it back as an ASCII str.
        return name.encode("idna").decode("ascii")

    # -- Hostname matching --

    cert = cn_only('example.com')
    accept(cert, 'example.com', 'ExAmple.cOm')
    reject(cert, 'www.example.com', '.example.com', 'example.org',
           'exampleXcom')

    cert = cn_only('*.a.com')
    accept(cert, 'foo.a.com')
    reject(cert, 'bar.foo.a.com', 'a.com', 'Xa.com', '.a.com')

    # only match one left-most wildcard
    cert = cn_only('f*.com')
    accept(cert, 'foo.com', 'f.com')
    reject(cert, 'bar.com', 'foo.a.com', 'bar.foo.com')

    # NULL bytes are bad, CVE-2013-4073
    cert = cn_only('null.python.org\x00example.org')
    accept(cert, 'null.python.org\x00example.org') # or raise an error?
    reject(cert, 'example.org', 'null.python.org')

    # error cases with wildcards
    cert = cn_only('*.*.a.com')
    reject(cert, 'bar.foo.a.com', 'a.com', 'Xa.com', '.a.com')

    cert = cn_only('a.*.com')
    reject(cert, 'a.foo.com', 'a..com', 'a.com')

    # wildcard doesn't match IDNA prefix 'xn--'
    idna = alabel('püthon.python.org')
    accept(cn_only(idna), idna)
    reject(cn_only('x*.python.org'), idna)
    reject(cn_only('xn--p*.python.org'), idna)

    # wildcard in first fragment and IDNA A-labels in sequent fragments
    # are supported.
    cert = cn_only(alabel('www*.pythön.org'))
    accept(cert, alabel('www.pythön.org'), alabel('www1.pythön.org'))
    reject(cert, alabel('ftp.pythön.org'), alabel('pythön.org'))

    # Slightly fake real-world example
    cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
            'subject': ((('commonName', 'linuxfrz.org'),),),
            'subjectAltName': (('DNS', 'linuxfr.org'),
                               ('DNS', 'linuxfr.com'),
                               ('othername', '<unsupported>'))}
    accept(cert, 'linuxfr.org', 'linuxfr.com')
    # Not a "DNS" entry
    reject(cert, '<unsupported>')
    # When there is a subjectAltName, commonName isn't used
    reject(cert, 'linuxfrz.org')

    # Subject building blocks shared by the remaining certificates.
    location = ((('countryName', 'US'),),
                (('stateOrProvinceName', 'California'),),
                (('localityName', 'Mountain View'),))
    organization = (('organizationName', 'Google Inc'),)
    mail_cn = (('commonName', 'mail.google.com'),)

    # A pristine real-world example
    cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
            'subject': location + (organization, mail_cn)}
    accept(cert, 'mail.google.com')
    reject(cert, 'gmail.com')
    # Only commonName is considered
    reject(cert, 'California')

    # -- IPv4 matching --
    cert = {'subject': ((('commonName', 'example.com'),),),
            'subjectAltName': (('DNS', 'example.com'),
                               ('IP Address', '10.11.12.13'),
                               ('IP Address', '14.15.16.17'))}
    accept(cert, '10.11.12.13', '14.15.16.17')
    reject(cert, '14.15.16.18', 'example.net')

    # -- IPv6 matching --
    cert = {'subject': ((('commonName', 'example.com'),),),
            'subjectAltName': (('DNS', 'example.com'),
                               ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
                               ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
    accept(cert, '2001::cafe', '2003::baba')
    reject(cert, '2003::bebe', 'example.net')

    # -- Miscellaneous --

    # Neither commonName nor subjectAltName
    cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
            'subject': location + (organization,)}
    reject(cert, 'mail.google.com')

    # No DNS entry in subjectAltName but a commonName
    cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
            'subject': location + (mail_cn,),
            'subjectAltName': (('othername', 'blabla'), )}
    accept(cert, 'mail.google.com')

    # No DNS entry subjectAltName and no commonName
    cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
            'subject': location + (organization,),
            'subjectAltName': (('othername', 'blabla'),)}
    reject(cert, 'google.com')

    # Empty cert / no cert
    for missing_cert in (None, {}):
        with self.assertRaises(ValueError):
            ssl.match_hostname(missing_cert, 'example.com')

    # Issue #17980: avoid denials of service by refusing more than one
    # wildcard per fragment.
    accept(cn_only('a*b.com'), 'axxb.com')
    reject(cn_only('a*b.co*'), 'axxb.com')
    cert = cn_only('a*b*.com')
    with self.assertRaises(ssl.CertificateError) as cm:
        ssl.match_hostname(cert, 'axxbxxc.com')
    self.assertIn("too many wildcards", str(cm.exception))
569
def test_server_side(self):
    # server_hostname doesn't work for server sockets
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    with socket.socket() as sock:
        # The second positional argument (True) asks for a server-side
        # wrap; combined with server_hostname it must raise ValueError.
        with self.assertRaises(ValueError):
            context.wrap_socket(sock, True, server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000576
def test_unknown_channel_binding(self):
    # should raise ValueError for unknown type
    # The listening socket is managed by a with block and the client
    # socket by a cleanup, so that neither leaks when bind/connect/wrap
    # fails part-way through.
    with socket.socket(socket.AF_INET) as s:
        s.bind(('127.0.0.1', 0))
        s.listen()
        c = socket.socket(socket.AF_INET)
        self.addCleanup(c.close)
        c.connect(s.getsockname())
        # No handshake is performed: only the argument check of
        # get_channel_binding() is exercised.
        with ssl.wrap_socket(c, do_handshake_on_connect=False) as ss:
            with self.assertRaises(ValueError):
                ss.get_channel_binding("unknown-type")
Antoine Pitroud6494802011-07-21 01:11:30 +0200588
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    # unconnected should return None for known type -- checked first for
    # a client-side wrap, then the same for server-side
    wrap_variants = (
        {},
        {'server_side': True, 'certfile': CERTFILE},
    )
    for wrap_kwargs in wrap_variants:
        plain_sock = socket.socket(socket.AF_INET)
        with ssl.wrap_socket(plain_sock, **wrap_kwargs) as ss:
            self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200600
def test_dealloc_warn(self):
    # Dropping the last reference to a wrapped socket must emit a
    # ResourceWarning whose message contains the socket's repr.
    wrapped = ssl.wrap_socket(socket.socket(socket.AF_INET))
    wrapped_repr = repr(wrapped)
    with self.assertWarns(ResourceWarning) as caught:
        del wrapped
        support.gc_collect()
    warning_text = str(caught.warning.args[0])
    self.assertIn(wrapped_repr, warning_text)
608
def test_get_default_verify_paths(self):
    # The result is a six-field ssl.DefaultVerifyPaths ...
    default_paths = ssl.get_default_verify_paths()
    self.assertEqual(len(default_paths), 6)
    self.assertIsInstance(default_paths, ssl.DefaultVerifyPaths)

    # ... whose cafile/capath follow SSL_CERT_FILE and SSL_CERT_DIR.
    overrides = (("SSL_CERT_DIR", CAPATH), ("SSL_CERT_FILE", CERTFILE))
    with support.EnvironmentVarGuard() as env:
        for variable, value in overrides:
            env[variable] = value
        overridden_paths = ssl.get_default_verify_paths()
        self.assertEqual(overridden_paths.cafile, CERTFILE)
        self.assertEqual(overridden_paths.capath, CAPATH)
620
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
def test_enum_certificates(self):
    # Both stores must return something truthy.
    for store_name in ("CA", "ROOT"):
        self.assertTrue(ssl.enum_certificates(store_name))

    self.assertRaises(TypeError, ssl.enum_certificates)
    self.assertRaises(WindowsError, ssl.enum_certificates, "")

    # Each store is a list of (cert bytes, encoding name, trust) tuples;
    # trust is either a set (collected below) or a bool.
    known_encodings = {"x509_asn", "pkcs_7_asn"}
    trust_oids = set()
    for store_name in ("CA", "ROOT"):
        store = ssl.enum_certificates(store_name)
        self.assertIsInstance(store, list)
        for entry in store:
            self.assertIsInstance(entry, tuple)
            self.assertEqual(len(entry), 3)
            cert, encoding, trust = entry
            self.assertIsInstance(cert, bytes)
            self.assertIn(encoding, known_encodings)
            self.assertIsInstance(trust, (set, bool))
            if isinstance(trust, set):
                trust_oids |= trust

    server_auth_oid = "1.3.6.1.5.5.7.3.1"
    self.assertIn(server_auth_oid, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200645
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
def test_enum_crls(self):
    self.assertTrue(ssl.enum_crls("CA"))
    self.assertRaises(TypeError, ssl.enum_crls)
    self.assertRaises(WindowsError, ssl.enum_crls, "")

    # The "CA" store is a list of (CRL bytes, encoding name) pairs.
    known_encodings = {"x509_asn", "pkcs_7_asn"}
    ca_crls = ssl.enum_crls("CA")
    self.assertIsInstance(ca_crls, list)
    for entry in ca_crls:
        self.assertIsInstance(entry, tuple)
        self.assertEqual(len(entry), 2)
        crl_data, encoding = entry
        self.assertIsInstance(crl_data, bytes)
        self.assertIn(encoding, known_encodings)
Christian Heimes46bebee2013-06-09 19:03:31 +0200659
Christian Heimes46bebee2013-06-09 19:03:31 +0200660
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100661 def test_asn1object(self):
662 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
663 '1.3.6.1.5.5.7.3.1')
664
665 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
666 self.assertEqual(val, expected)
667 self.assertEqual(val.nid, 129)
668 self.assertEqual(val.shortname, 'serverAuth')
669 self.assertEqual(val.longname, 'TLS Web Server Authentication')
670 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
671 self.assertIsInstance(val, ssl._ASN1Object)
672 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
673
674 val = ssl._ASN1Object.fromnid(129)
675 self.assertEqual(val, expected)
676 self.assertIsInstance(val, ssl._ASN1Object)
677 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100678 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
679 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100680 for i in range(1000):
681 try:
682 obj = ssl._ASN1Object.fromnid(i)
683 except ValueError:
684 pass
685 else:
686 self.assertIsInstance(obj.nid, int)
687 self.assertIsInstance(obj.shortname, str)
688 self.assertIsInstance(obj.longname, str)
689 self.assertIsInstance(obj.oid, (str, type(None)))
690
691 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
692 self.assertEqual(val, expected)
693 self.assertIsInstance(val, ssl._ASN1Object)
694 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
695 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
696 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100697 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
698 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100699
Christian Heimes72d28502013-11-23 13:56:58 +0100700 def test_purpose_enum(self):
701 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
702 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
703 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
704 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
705 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
706 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
707 '1.3.6.1.5.5.7.3.1')
708
709 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
710 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
711 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
712 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
713 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
714 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
715 '1.3.6.1.5.5.7.3.2')
716
def test_unsupported_dtls(self):
    # Wrapping a datagram socket must be refused with the same message by
    # both the module-level function and SSLContext.wrap_socket().
    udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    self.addCleanup(udp_sock.close)
    expected_message = "only stream sockets are supported"

    with self.assertRaises(NotImplementedError) as raised:
        ssl.wrap_socket(udp_sock, cert_reqs=ssl.CERT_NONE)
    self.assertEqual(str(raised.exception), expected_message)

    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    with self.assertRaises(NotImplementedError) as raised:
        context.wrap_socket(udp_sock)
    self.assertEqual(str(raised.exception), expected_message)
727
def cert_time_ok(self, timestring, timestamp):
    # Helper: *timestring* must convert to exactly *timestamp*.
    seconds = ssl.cert_time_to_seconds(timestring)
    self.assertEqual(seconds, timestamp)
730
def cert_time_fail(self, timestring):
    # Helper: converting *timestring* must raise ValueError.
    self.assertRaises(ValueError, ssl.cert_time_to_seconds, timestring)
734
@unittest.skipUnless(utc_offset(),
                     'local time needs to be different from UTC')
def test_cert_time_to_seconds_timezone(self):
    # Issue #19940: ssl.cert_time_to_seconds() returns wrong
    # results if local timezone is not UTC
    expectations = (
        ("May 9 00:00:00 2007 GMT", 1178668800.0),
        ("Jan 5 09:34:43 2018 GMT", 1515144883.0),
    )
    for timestring, timestamp in expectations:
        self.cert_time_ok(timestring, timestamp)
742
743 def test_cert_time_to_seconds(self):
744 timestring = "Jan 5 09:34:43 2018 GMT"
745 ts = 1515144883.0
746 self.cert_time_ok(timestring, ts)
747 # accept keyword parameter, assert its name
748 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
749 # accept both %e and %d (space or zero generated by strftime)
750 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
751 # case-insensitive
752 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
753 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
754 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
755 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
756 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
757 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
758 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
759 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
760
761 newyear_ts = 1230768000.0
762 # leap seconds
763 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
764 # same timestamp
765 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
766
767 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
768 # allow 60th second (even if it is not a leap second)
769 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
770 # allow 2nd leap second for compatibility with time.strptime()
771 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
772 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
773
774 # no special treatement for the special value:
775 # 99991231235959Z (rfc 5280)
776 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
777
@support.run_with_locale('LC_ALL', '')
def test_cert_time_to_seconds_locale(self):
    # `cert_time_to_seconds()` should be locale independent

    # Abbreviated name of month 2 as formatted under the active locale.
    local_february = time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))

    if local_february.lower() == 'feb':
        self.skipTest("locale-specific month name needs to be "
                      "different from C locale")

    # The C-locale abbreviation is accepted, the localized one is rejected.
    self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
    self.cert_time_fail(local_february + " 9 00:00:00 2007 GMT")
792
def test_connect_ex_error(self):
    # connect_ex() against a bound but non-listening port must return one
    # of the expected error codes.
    placeholder = socket.socket(socket.AF_INET)
    self.addCleanup(placeholder.close)
    port = support.bind_port(placeholder)  # Reserve port but don't listen

    client = ssl.wrap_socket(socket.socket(socket.AF_INET),
                             cert_reqs=ssl.CERT_REQUIRED)
    self.addCleanup(client.close)
    result = client.connect_ex((HOST, port))

    # Issue #19919: Windows machines or VMs hosted on Windows
    # machines sometimes return EWOULDBLOCK.
    acceptable = (
        errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
        errno.EWOULDBLOCK,
    )
    self.assertIn(result, acceptable)
808
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100809
class ContextTests(unittest.TestCase):
    # Tests for ssl.SSLContext construction, its attributes and its
    # certificate/key/DH loading methods, plus the context factory helpers.

    @skip_if_broken_ubuntu_ssl
    def test_constructor(self):
        # Every known protocol constant is accepted; a missing or unknown
        # protocol is rejected.
        for protocol in PROTOCOLS:
            ssl.SSLContext(protocol)
        self.assertRaises(TypeError, ssl.SSLContext)
        self.assertRaises(ValueError, ssl.SSLContext, -1)
        self.assertRaises(ValueError, ssl.SSLContext, 42)

    @skip_if_broken_ubuntu_ssl
    def test_protocol(self):
        # The protocol attribute echoes the constructor argument.
        for proto in PROTOCOLS:
            ctx = ssl.SSLContext(proto)
            self.assertEqual(ctx.protocol, proto)

    def test_ciphers(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.set_ciphers("ALL")
        ctx.set_ciphers("DEFAULT")
        # A cipher string that selects nothing must be reported as an error.
        with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
            ctx.set_ciphers("^$:,;?*'dorothyx")

    @skip_if_broken_ubuntu_ssl
    def test_options(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
        self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3,
                         ctx.options)
        ctx.options |= ssl.OP_NO_TLSv1
        self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3 | ssl.OP_NO_TLSv1,
                         ctx.options)
        if can_clear_options():
            ctx.options = (ctx.options & ~ssl.OP_NO_SSLv2) | ssl.OP_NO_TLSv1
            self.assertEqual(ssl.OP_ALL | ssl.OP_NO_TLSv1 | ssl.OP_NO_SSLv3,
                             ctx.options)
            ctx.options = 0
            # Ubuntu has OP_NO_SSLv3 forced on by default
            self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
        else:
            # Clearing options must be refused when it is not supported.
            with self.assertRaises(ValueError):
                ctx.options = 0

    def test_verify_mode(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # Default value
        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
        ctx.verify_mode = ssl.CERT_OPTIONAL
        self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
        ctx.verify_mode = ssl.CERT_REQUIRED
        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
        ctx.verify_mode = ssl.CERT_NONE
        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
        # Wrong type and unknown value are distinguished.
        with self.assertRaises(TypeError):
            ctx.verify_mode = None
        with self.assertRaises(ValueError):
            ctx.verify_mode = 42

    @unittest.skipUnless(have_verify_flags(),
                         "verify_flags need OpenSSL > 0.9.8")
    def test_verify_flags(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # default value
        tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
        self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | tf)
        ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
        self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
        ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
        self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
        ctx.verify_flags = ssl.VERIFY_DEFAULT
        self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
        # supports any value
        ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
        self.assertEqual(ctx.verify_flags,
                         ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
        with self.assertRaises(TypeError):
            ctx.verify_flags = None

    def test_load_cert_chain(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # Combined key and cert in a single file
        ctx.load_cert_chain(CERTFILE, keyfile=None)
        ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
        self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
        with self.assertRaises(OSError) as cm:
            ctx.load_cert_chain(NONEXISTINGCERT)
        self.assertEqual(cm.exception.errno, errno.ENOENT)
        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
            ctx.load_cert_chain(BADCERT)
        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
            ctx.load_cert_chain(EMPTYCERT)
        # Separate key and cert
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_cert_chain(ONLYCERT, ONLYKEY)
        ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
        ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
            ctx.load_cert_chain(ONLYCERT)
        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
            ctx.load_cert_chain(ONLYKEY)
        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
            ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
        # Mismatching key and cert
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
            ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)
        # Password protected key and cert
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
        ctx.load_cert_chain(CERTFILE_PROTECTED,
                            password=bytearray(KEY_PASSWORD.encode()))
        ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
        ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
        ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
                            bytearray(KEY_PASSWORD.encode()))
        with self.assertRaisesRegex(TypeError, "should be a string"):
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
        with self.assertRaises(ssl.SSLError):
            ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
        with self.assertRaisesRegex(ValueError, "cannot be longer"):
            # openssl has a fixed limit on the password buffer.
            # PEM_BUFSIZE is generally set to 1kb.
            # Return a string larger than this.
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
        # Password callback
        def getpass_unicode():
            return KEY_PASSWORD
        def getpass_bytes():
            return KEY_PASSWORD.encode()
        def getpass_bytearray():
            return bytearray(KEY_PASSWORD.encode())
        def getpass_badpass():
            return "badpass"
        def getpass_huge():
            return b'a' * (1024 * 1024)
        def getpass_bad_type():
            return 9
        def getpass_exception():
            raise Exception('getpass error')
        class GetPassCallable:
            def __call__(self):
                return KEY_PASSWORD
            def getpass(self):
                return KEY_PASSWORD
        # Plain functions, callable instances and bound methods are all
        # accepted as password callbacks.
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
        ctx.load_cert_chain(CERTFILE_PROTECTED,
                            password=GetPassCallable().getpass)
        with self.assertRaises(ssl.SSLError):
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
        with self.assertRaisesRegex(ValueError, "cannot be longer"):
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
        with self.assertRaisesRegex(TypeError, "must return a string"):
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
        with self.assertRaisesRegex(Exception, "getpass error"):
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
        # Make sure the password function isn't called if it isn't needed
        ctx.load_cert_chain(CERTFILE, password=getpass_exception)

    def test_load_verify_locations(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # str and bytes paths, positional and keyword forms.
        ctx.load_verify_locations(CERTFILE)
        ctx.load_verify_locations(cafile=CERTFILE, capath=None)
        ctx.load_verify_locations(BYTES_CERTFILE)
        ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
        # At least one location must be given.
        self.assertRaises(TypeError, ctx.load_verify_locations)
        self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
        with self.assertRaises(OSError) as cm:
            ctx.load_verify_locations(NONEXISTINGCERT)
        self.assertEqual(cm.exception.errno, errno.ENOENT)
        with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
            ctx.load_verify_locations(BADCERT)
        ctx.load_verify_locations(CERTFILE, CAPATH)
        ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)

        # Issue #10989: crash if the second argument type is invalid
        self.assertRaises(TypeError, ctx.load_verify_locations, None, True)

    def test_load_verify_cadata(self):
        # test cadata
        # PEM is ASCII text (it is passed as cadata= below); decode it
        # explicitly instead of relying on the locale's default encoding.
        with open(CAFILE_CACERT, encoding="ascii") as f:
            cacert_pem = f.read()
        cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
        with open(CAFILE_NEURONIO, encoding="ascii") as f:
            neuronio_pem = f.read()
        neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)

        # test PEM
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
        ctx.load_verify_locations(cadata=cacert_pem)
        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
        ctx.load_verify_locations(cadata=neuronio_pem)
        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
        # cert already in hash table
        ctx.load_verify_locations(cadata=neuronio_pem)
        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)

        # combined
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        combined = "\n".join((cacert_pem, neuronio_pem))
        ctx.load_verify_locations(cadata=combined)
        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)

        # with junk around the certs
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        combined = ["head", cacert_pem, "other", neuronio_pem, "again",
                    neuronio_pem, "tail"]
        ctx.load_verify_locations(cadata="\n".join(combined))
        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)

        # test DER
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_verify_locations(cadata=cacert_der)
        ctx.load_verify_locations(cadata=neuronio_der)
        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
        # cert already in hash table
        ctx.load_verify_locations(cadata=cacert_der)
        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)

        # combined
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        combined = b"".join((cacert_der, neuronio_der))
        ctx.load_verify_locations(cadata=combined)
        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)

        # error cases
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)

        with self.assertRaisesRegex(ssl.SSLError, "no start line"):
            ctx.load_verify_locations(cadata="broken")
        with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
            ctx.load_verify_locations(cadata=b"broken")

    def test_load_dh_params(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_dh_params(DHFILE)
        # The bytes path variant is only exercised off Windows.
        if os.name != 'nt':
            ctx.load_dh_params(BYTES_DHFILE)
        self.assertRaises(TypeError, ctx.load_dh_params)
        self.assertRaises(TypeError, ctx.load_dh_params, None)
        with self.assertRaises(FileNotFoundError) as cm:
            ctx.load_dh_params(NONEXISTINGCERT)
        self.assertEqual(cm.exception.errno, errno.ENOENT)
        # A certificate file is not a DH parameter file; only the exception
        # type is checked here, so the exception is not captured.
        with self.assertRaises(ssl.SSLError):
            ctx.load_dh_params(CERTFILE)

    @skip_if_broken_ubuntu_ssl
    def test_session_stats(self):
        # A fresh context reports all-zero session statistics.
        for proto in PROTOCOLS:
            ctx = ssl.SSLContext(proto)
            self.assertEqual(ctx.session_stats(), {
                'number': 0,
                'connect': 0,
                'connect_good': 0,
                'connect_renegotiate': 0,
                'accept': 0,
                'accept_good': 0,
                'accept_renegotiate': 0,
                'hits': 0,
                'misses': 0,
                'timeouts': 0,
                'cache_full': 0,
            })

    def test_set_default_verify_paths(self):
        # There's not much we can do to test that it acts as expected,
        # so just check it doesn't crash or raise an exception.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.set_default_verify_paths()

    @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
    def test_set_ecdh_curve(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # Curve names are accepted as str or bytes.
        ctx.set_ecdh_curve("prime256v1")
        ctx.set_ecdh_curve(b"prime256v1")
        self.assertRaises(TypeError, ctx.set_ecdh_curve)
        self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
        self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
        self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")

    @needs_sni
    def test_sni_callback(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)

        # set_servername_callback expects a callable, or None
        self.assertRaises(TypeError, ctx.set_servername_callback)
        self.assertRaises(TypeError, ctx.set_servername_callback, 4)
        self.assertRaises(TypeError, ctx.set_servername_callback, "")
        self.assertRaises(TypeError, ctx.set_servername_callback, ctx)

        def dummycallback(sock, servername, ctx):
            pass
        ctx.set_servername_callback(None)
        ctx.set_servername_callback(dummycallback)

    @needs_sni
    def test_sni_callback_refcycle(self):
        # Reference cycles through the servername callback are detected
        # and cleared.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # The default argument makes the callback reference the context,
        # which in turn references the callback.
        def dummycallback(sock, servername, ctx, cycle=ctx):
            pass
        ctx.set_servername_callback(dummycallback)
        wr = weakref.ref(ctx)
        del ctx, dummycallback
        gc.collect()
        self.assertIs(wr(), None)

    def test_cert_store_stats(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertEqual(ctx.cert_store_stats(),
                         {'x509_ca': 0, 'crl': 0, 'x509': 0})
        # Loading the context's own cert chain does not touch the store.
        ctx.load_cert_chain(CERTFILE)
        self.assertEqual(ctx.cert_store_stats(),
                         {'x509_ca': 0, 'crl': 0, 'x509': 0})
        ctx.load_verify_locations(CERTFILE)
        self.assertEqual(ctx.cert_store_stats(),
                         {'x509_ca': 0, 'crl': 0, 'x509': 1})
        ctx.load_verify_locations(CAFILE_CACERT)
        self.assertEqual(ctx.cert_store_stats(),
                         {'x509_ca': 1, 'crl': 0, 'x509': 2})

    def test_get_ca_certs(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertEqual(ctx.get_ca_certs(), [])
        # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
        ctx.load_verify_locations(CERTFILE)
        self.assertEqual(ctx.get_ca_certs(), [])
        # but CAFILE_CACERT is a CA cert
        ctx.load_verify_locations(CAFILE_CACERT)
        self.assertEqual(ctx.get_ca_certs(),
            [{'issuer': ((('organizationName', 'Root CA'),),
                         (('organizationalUnitName', 'http://www.cacert.org'),),
                         (('commonName', 'CA Cert Signing Authority'),),
                         (('emailAddress', 'support@cacert.org'),)),
              'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
              'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
              'serialNumber': '00',
              'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
              'subject': ((('organizationName', 'Root CA'),),
                          (('organizationalUnitName', 'http://www.cacert.org'),),
                          (('commonName', 'CA Cert Signing Authority'),),
                          (('emailAddress', 'support@cacert.org'),)),
              'version': 3}])

        # With a true argument the certificates come back DER-encoded.
        # PEM is ASCII text; decode it explicitly instead of relying on the
        # locale's default encoding.
        with open(CAFILE_CACERT, encoding="ascii") as f:
            pem = f.read()
        der = ssl.PEM_cert_to_DER_cert(pem)
        self.assertEqual(ctx.get_ca_certs(True), [der])

    def test_load_default_certs(self):
        # Smoke test: accepted call forms must not raise.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_default_certs()

        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
        ctx.load_default_certs()

        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)

        # The purpose must be a Purpose member, not None or its name.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertRaises(TypeError, ctx.load_default_certs, None)
        self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')

    @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
    def test_load_default_certs_env(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        with support.EnvironmentVarGuard() as env:
            env["SSL_CERT_DIR"] = CAPATH
            env["SSL_CERT_FILE"] = CERTFILE
            ctx.load_default_certs()
            self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0})

    @unittest.skipUnless(sys.platform == "win32", "Windows specific")
    def test_load_default_certs_env_windows(self):
        # Baseline: what the default stores contribute on their own.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_default_certs()
        stats = ctx.cert_store_stats()

        # With the environment overrides exactly one more cert is expected.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        with support.EnvironmentVarGuard() as env:
            env["SSL_CERT_DIR"] = CAPATH
            env["SSL_CERT_FILE"] = CERTFILE
            ctx.load_default_certs()
            stats["x509"] += 1
            self.assertEqual(ctx.cert_store_stats(), stats)

    def test_create_default_context(self):
        def assert_option_enabled(context, option_name):
            # Option constants missing from this build fall back to 0,
            # which makes the comparison trivially true.
            flag = getattr(ssl, option_name, 0)
            self.assertEqual(context.options & flag, flag)

        # Default purpose: certificate and hostname verification are on.
        ctx = ssl.create_default_context()
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
        self.assertTrue(ctx.check_hostname)
        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
        assert_option_enabled(ctx, "OP_NO_COMPRESSION")

        # Explicit CA locations (file, directory and in-memory data).
        # PEM is ASCII text (it is passed as cadata= below); decode it
        # explicitly instead of relying on the locale's default encoding.
        with open(SIGNING_CA, encoding="ascii") as f:
            cadata = f.read()
        ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
                                         cadata=cadata)
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
        assert_option_enabled(ctx, "OP_NO_COMPRESSION")

        # CLIENT_AUTH purpose: no verification, single-use DH/ECDH keys.
        ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
        assert_option_enabled(ctx, "OP_NO_COMPRESSION")
        assert_option_enabled(ctx, "OP_SINGLE_DH_USE")
        assert_option_enabled(ctx, "OP_SINGLE_ECDH_USE")

    def test__create_stdlib_context(self):
        # Defaults: SSLv23, no certificate or hostname verification.
        ctx = ssl._create_stdlib_context()
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
        self.assertFalse(ctx.check_hostname)
        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)

        ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)

        ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
                                         cert_reqs=ssl.CERT_REQUIRED,
                                         check_hostname=True)
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
        self.assertTrue(ctx.check_hostname)
        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)

        ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)

    def test_check_hostname(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertFalse(ctx.check_hostname)

        # Requires CERT_REQUIRED or CERT_OPTIONAL
        with self.assertRaises(ValueError):
            ctx.check_hostname = True
        ctx.verify_mode = ssl.CERT_REQUIRED
        self.assertFalse(ctx.check_hostname)
        ctx.check_hostname = True
        self.assertTrue(ctx.check_hostname)

        ctx.verify_mode = ssl.CERT_OPTIONAL
        ctx.check_hostname = True
        self.assertTrue(ctx.check_hostname)

        # Cannot set CERT_NONE with check_hostname enabled
        with self.assertRaises(ValueError):
            ctx.verify_mode = ssl.CERT_NONE
        ctx.check_hostname = False
        self.assertFalse(ctx.check_hostname)
1289
Antoine Pitrou152efa22010-05-16 18:19:27 +00001290
class SSLErrorTests(unittest.TestCase):
    # Tests for the string form, attributes and subclasses of ssl.SSLError.

    def test_str(self):
        # The str() of a SSLError doesn't include the errno
        e = ssl.SSLError(1, "foo")
        self.assertEqual(str(e), "foo")
        self.assertEqual(e.errno, 1)
        # Same for a subclass
        e = ssl.SSLZeroReturnError(1, "foo")
        self.assertEqual(str(e), "foo")
        self.assertEqual(e.errno, 1)

    def test_lib_reason(self):
        # Test the library and reason attributes
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # Loading a certificate file as DH parameters triggers the error.
        with self.assertRaises(ssl.SSLError) as cm:
            ctx.load_dh_params(CERTFILE)
        self.assertEqual(cm.exception.library, 'PEM')
        self.assertEqual(cm.exception.reason, 'NO_START_LINE')
        s = str(cm.exception)
        self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)

    def test_subclass(self):
        # Check that the appropriate SSLError subclass is raised
        # (this only tests one of them)
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        with socket.socket() as listener:
            listener.bind(("127.0.0.1", 0))
            listener.listen()
            client = socket.socket()
            # Make sure the raw socket is released even if connect() or
            # wrap_socket() raises before the with-block below owns it.
            self.addCleanup(client.close)
            client.connect(listener.getsockname())
            # Nothing accepts the connection, so a non-blocking handshake
            # has to report that it is waiting for data.
            client.setblocking(False)
            with ctx.wrap_socket(client, False,
                                 do_handshake_on_connect=False) as tls_client:
                with self.assertRaises(ssl.SSLWantReadError) as cm:
                    tls_client.do_handshake()
                message = str(cm.exception)
                self.assertTrue(message.startswith("The operation did not complete (read)"), message)
                # For compatibility
                self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
1330
1331
class MemoryBIOTests(unittest.TestCase):

    def test_read_write(self):
        # Bytes come back out in the order they went in, and only once
        mem = ssl.MemoryBIO()
        mem.write(b'foo')
        self.assertEqual(mem.read(), b'foo')
        self.assertEqual(mem.read(), b'')
        # Consecutive writes are concatenated
        for piece in (b'foo', b'bar'):
            mem.write(piece)
        self.assertEqual(mem.read(), b'foobar')
        self.assertEqual(mem.read(), b'')
        # Sized reads consume the buffer piecewise
        mem.write(b'baz')
        for size, expected in ((2, b'ba'), (1, b'z'), (1, b'')):
            self.assertEqual(mem.read(size), expected)

    def test_eof(self):
        # eof only becomes true once write_eof() was called *and* the
        # buffered data has been drained
        mem = ssl.MemoryBIO()
        self.assertFalse(mem.eof)
        self.assertEqual(mem.read(), b'')
        self.assertFalse(mem.eof)
        mem.write(b'foo')
        self.assertFalse(mem.eof)
        mem.write_eof()
        self.assertFalse(mem.eof)
        self.assertEqual(mem.read(2), b'fo')
        self.assertFalse(mem.eof)
        self.assertEqual(mem.read(1), b'o')
        self.assertTrue(mem.eof)
        self.assertEqual(mem.read(), b'')
        self.assertTrue(mem.eof)

    def test_pending(self):
        # pending tracks the number of buffered bytes
        mem = ssl.MemoryBIO()
        self.assertEqual(mem.pending, 0)
        mem.write(b'foo')
        self.assertEqual(mem.pending, 3)
        # Drain one byte at a time
        for left in (2, 1, 0):
            mem.read(1)
            self.assertEqual(mem.pending, left)
        # Refill one byte at a time
        for filled in (1, 2, 3):
            mem.write(b'x')
            self.assertEqual(mem.pending, filled)
        mem.read()
        self.assertEqual(mem.pending, 0)

    def test_buffer_types(self):
        # write() accepts bytes, bytearray and memoryview alike
        mem = ssl.MemoryBIO()
        payloads = (
            (b'foo', b'foo'),
            (bytearray(b'bar'), b'bar'),
            (memoryview(b'baz'), b'baz'),
        )
        for payload, expected in payloads:
            mem.write(payload)
            self.assertEqual(mem.read(), expected)

    def test_error_types(self):
        # write() rejects anything that is not a bytes-like object
        mem = ssl.MemoryBIO()
        for bad_value in ('foo', None, True, 1):
            self.assertRaises(TypeError, mem.write, bad_value)
1393
1394
@unittest.skipUnless(_have_threads, "Needs threading module")
class SimpleBackgroundTests(unittest.TestCase):

    """Tests that connect to a simple server running in the background"""

    def setUp(self):
        # Start a fresh echo server for every test and make sure it is
        # stopped and joined again afterwards.
        server = ThreadedEchoServer(SIGNED_CERTFILE)
        self.server_addr = (HOST, server.port)
        server.__enter__()
        self.addCleanup(server.__exit__, None, None, None)

    def test_connect(self):
        with ssl.wrap_socket(socket.socket(socket.AF_INET),
                             cert_reqs=ssl.CERT_NONE) as s:
            s.connect(self.server_addr)
            self.assertEqual({}, s.getpeercert())

        # this should succeed because we specify the root cert
        with ssl.wrap_socket(socket.socket(socket.AF_INET),
                             cert_reqs=ssl.CERT_REQUIRED,
                             ca_certs=SIGNING_CA) as s:
            s.connect(self.server_addr)
            self.assertTrue(s.getpeercert())

    def test_connect_fail(self):
        # This should fail because we have no verification certs. Connection
        # failure crashes ThreadedEchoServer, so run this in an independent
        # test method.
        s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED)
        self.addCleanup(s.close)
        self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
                               s.connect, self.server_addr)

    def test_connect_ex(self):
        # Issue #11326: check connect_ex() implementation
        s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED,
                            ca_certs=SIGNING_CA)
        self.addCleanup(s.close)
        self.assertEqual(0, s.connect_ex(self.server_addr))
        self.assertTrue(s.getpeercert())

    def test_non_blocking_connect_ex(self):
        # Issue #11326: non-blocking connect_ex() should allow handshake
        # to proceed after the socket gets ready.
        s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED,
                            ca_certs=SIGNING_CA,
                            do_handshake_on_connect=False)
        self.addCleanup(s.close)
        s.setblocking(False)
        rc = s.connect_ex(self.server_addr)
        # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
        self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
        # Wait for connect to finish
        select.select([], [s], [], 5.0)
        # Non-blocking handshake
        while True:
            try:
                s.do_handshake()
                break
            except ssl.SSLWantReadError:
                select.select([s], [], [], 5.0)
            except ssl.SSLWantWriteError:
                select.select([], [s], [], 5.0)
        # SSL established
        self.assertTrue(s.getpeercert())

    def test_connect_with_context(self):
        # Same as test_connect, but with a separately created context
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
            s.connect(self.server_addr)
            self.assertEqual({}, s.getpeercert())
        # Same with a server hostname
        with ctx.wrap_socket(socket.socket(socket.AF_INET),
                             server_hostname="dummy") as s:
            s.connect(self.server_addr)
        ctx.verify_mode = ssl.CERT_REQUIRED
        # This should succeed because we specify the root cert
        ctx.load_verify_locations(SIGNING_CA)
        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
            s.connect(self.server_addr)
            cert = s.getpeercert()
            self.assertTrue(cert)

    def test_connect_with_context_fail(self):
        # This should fail because we have no verification certs. Connection
        # failure crashes ThreadedEchoServer, so run this in an independent
        # test method.
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        ctx.verify_mode = ssl.CERT_REQUIRED
        s = ctx.wrap_socket(socket.socket(socket.AF_INET))
        self.addCleanup(s.close)
        self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
                               s.connect, self.server_addr)

    def test_connect_capath(self):
        # Verify server certificates using the `capath` argument
        # NOTE: the subject hashing algorithm has been changed between
        # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
        # contain both versions of each certificate (same content, different
        # filename) for this test to be portable across OpenSSL releases.
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.load_verify_locations(capath=CAPATH)
        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
            s.connect(self.server_addr)
            cert = s.getpeercert()
            self.assertTrue(cert)
        # Same with a bytes `capath` argument
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.load_verify_locations(capath=BYTES_CAPATH)
        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
            s.connect(self.server_addr)
            cert = s.getpeercert()
            self.assertTrue(cert)

    def test_connect_cadata(self):
        # Verify the server certificate using in-memory CA data, first in
        # PEM form and then converted to DER.
        with open(SIGNING_CA) as f:
            pem = f.read()
        der = ssl.PEM_cert_to_DER_cert(pem)
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.load_verify_locations(cadata=pem)
        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
            s.connect(self.server_addr)
            cert = s.getpeercert()
            self.assertTrue(cert)

        # same with DER
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.load_verify_locations(cadata=der)
        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
            s.connect(self.server_addr)
            cert = s.getpeercert()
            self.assertTrue(cert)

    @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
    def test_makefile_close(self):
        # Issue #5238: creating a file-like object with makefile() shouldn't
        # delay closing the underlying "real socket" (here tested with its
        # file descriptor, hence skipping the test under Windows).
        ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
        ss.connect(self.server_addr)
        fd = ss.fileno()
        f = ss.makefile()
        f.close()
        # The fd is still open
        os.read(fd, 0)
        # Closing the SSL socket should close the fd too
        ss.close()
        gc.collect()
        with self.assertRaises(OSError) as e:
            os.read(fd, 0)
        self.assertEqual(e.exception.errno, errno.EBADF)

    def test_non_blocking_handshake(self):
        # Connect in blocking mode, then drive the TLS handshake by hand on
        # the non-blocking socket, waiting with select() whenever OpenSSL
        # asks for more I/O.
        s = socket.socket(socket.AF_INET)
        s.connect(self.server_addr)
        s.setblocking(False)
        s = ssl.wrap_socket(s,
                            cert_reqs=ssl.CERT_NONE,
                            do_handshake_on_connect=False)
        self.addCleanup(s.close)
        count = 0
        while True:
            try:
                count += 1
                s.do_handshake()
                break
            except ssl.SSLWantReadError:
                select.select([s], [], [])
            except ssl.SSLWantWriteError:
                select.select([], [s], [])
        if support.verbose:
            sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)

    def test_get_server_certificate(self):
        _test_get_server_certificate(self, *self.server_addr, cert=SIGNING_CA)

    def test_get_server_certificate_fail(self):
        # Connection failure crashes ThreadedEchoServer, so run this in an
        # independent test method
        _test_get_server_certificate_fail(self, *self.server_addr)

    def test_ciphers(self):
        with ssl.wrap_socket(socket.socket(socket.AF_INET),
                             cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
            s.connect(self.server_addr)
        with ssl.wrap_socket(socket.socket(socket.AF_INET),
                             cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
            s.connect(self.server_addr)
        # Error checking can happen at instantiation or when connecting
        with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
            with socket.socket(socket.AF_INET) as sock:
                s = ssl.wrap_socket(sock,
                                    cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
                s.connect(self.server_addr)

    def test_get_ca_certs_capath(self):
        # capath certs are loaded on request
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.load_verify_locations(capath=CAPATH)
        self.assertEqual(ctx.get_ca_certs(), [])
        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
            s.connect(self.server_addr)
            cert = s.getpeercert()
            self.assertTrue(cert)
        self.assertEqual(len(ctx.get_ca_certs()), 1)

    @needs_sni
    def test_context_setget(self):
        # Check that the context of a connected socket can be replaced.
        ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx2 = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        s = socket.socket(socket.AF_INET)
        with ctx1.wrap_socket(s) as ss:
            ss.connect(self.server_addr)
            self.assertIs(ss.context, ctx1)
            self.assertIs(ss._sslobj.context, ctx1)
            ss.context = ctx2
            self.assertIs(ss.context, ctx2)
            self.assertIs(ss._sslobj.context, ctx2)

    def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
        """Run func(*args) to completion, shuttling data between sock and BIOs.

        func is called repeatedly.  Whenever it raises an SSLError whose
        errno is SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE, everything in
        the `outgoing` BIO is sent to `sock`; for WANT_READ, data received
        from `sock` is then fed into the `incoming` BIO (or EOF is signalled
        on it when the peer closed the connection) before retrying.  Any
        other SSLError propagates to the caller.

        The only keyword argument consulted is `timeout` (seconds, default
        10); keyword arguments are never forwarded to func.  The deadline is
        checked at the start of every iteration, so it bounds the retry loop
        but cannot interrupt a blocking sock.recv() that is already running.
        If it expires the test fails.

        Returns whatever the successful call to func returned.
        """
        timeout = kwargs.get('timeout', 10)
        deadline = time.monotonic() + timeout
        count = 0
        while True:
            if time.monotonic() > deadline:
                self.fail("timeout: %s() did not complete within %s seconds"
                          % (func.__name__, timeout))
            # Named ssl_errno so that the errno module stays reachable
            ssl_errno = None
            count += 1
            try:
                ret = func(*args)
            except ssl.SSLError as e:
                if e.errno not in (ssl.SSL_ERROR_WANT_READ,
                                   ssl.SSL_ERROR_WANT_WRITE):
                    raise
                ssl_errno = e.errno
            # Get any data from the outgoing BIO irrespective of any error, and
            # send it to the socket.
            buf = outgoing.read()
            sock.sendall(buf)
            # If there's no error, we're done. For WANT_READ, we need to get
            # data from the socket and put it in the incoming BIO.
            if ssl_errno is None:
                break
            elif ssl_errno == ssl.SSL_ERROR_WANT_READ:
                buf = sock.recv(32768)
                if buf:
                    incoming.write(buf)
                else:
                    incoming.write_eof()
        if support.verbose:
            sys.stdout.write("Needed %d calls to complete %s().\n"
                             % (count, func.__name__))
        return ret

    def test_bio_handshake(self):
        # Perform a verified handshake through a pair of memory BIOs and
        # check the state of the SSLObject before and after it.
        sock = socket.socket(socket.AF_INET)
        self.addCleanup(sock.close)
        sock.connect(self.server_addr)
        incoming = ssl.MemoryBIO()
        outgoing = ssl.MemoryBIO()
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.load_verify_locations(SIGNING_CA)
        ctx.check_hostname = True
        sslobj = ctx.wrap_bio(incoming, outgoing, False, 'localhost')
        self.assertIs(sslobj._sslobj.owner, sslobj)
        self.assertIsNone(sslobj.cipher())
        self.assertIsNone(sslobj.shared_ciphers())
        self.assertRaises(ValueError, sslobj.getpeercert)
        if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
            self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
        self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
        self.assertTrue(sslobj.cipher())
        self.assertIsNone(sslobj.shared_ciphers())
        self.assertTrue(sslobj.getpeercert())
        if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
            self.assertTrue(sslobj.get_channel_binding('tls-unique'))
        try:
            self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
        except ssl.SSLSyscallError:
            # If the server shuts down the TCP connection without sending a
            # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
            pass
        self.assertRaises(ssl.SSLError, sslobj.write, b'foo')

    def test_bio_read_write_data(self):
        # Send a request through memory BIOs and check that the server's
        # reply is the lower-cased request.
        sock = socket.socket(socket.AF_INET)
        self.addCleanup(sock.close)
        sock.connect(self.server_addr)
        incoming = ssl.MemoryBIO()
        outgoing = ssl.MemoryBIO()
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        ctx.verify_mode = ssl.CERT_NONE
        sslobj = ctx.wrap_bio(incoming, outgoing, False)
        self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
        req = b'FOO\n'
        self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
        buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
        self.assertEqual(buf, b'foo\n')
        self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001704
1705
class NetworkedTests(unittest.TestCase):

    def test_timeout_connect_ex(self):
        # Issue #12065: when the connection attempt times out, connect_ex()
        # must hand back the original errno, just as non-SSL sockets do.
        with support.transient_internet(REMOTE_HOST):
            raw_sock = socket.socket(socket.AF_INET)
            wrapped = ssl.wrap_socket(raw_sock,
                                      cert_reqs=ssl.CERT_REQUIRED,
                                      do_handshake_on_connect=False)
            self.addCleanup(wrapped.close)
            wrapped.settimeout(0.0000001)
            result = wrapped.connect_ex((REMOTE_HOST, 443))
            if result == 0:
                self.skipTest("REMOTE_HOST responded too quickly")
            self.assertIn(result, (errno.EAGAIN, errno.EWOULDBLOCK))

    @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
    def test_get_server_certificate_ipv6(self):
        # Run both certificate-retrieval helpers against an IPv6-only host
        with support.transient_internet('ipv6.google.com'):
            for check in (_test_get_server_certificate,
                          _test_get_server_certificate_fail):
                check(self, 'ipv6.google.com', 443)

    def test_algorithms(self):
        # Issue #8484: every algorithm must be usable while verifying a
        # certificate.
        # SHA256 only exists from OpenSSL 0.9.8 onwards
        if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15):
            self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION)
        # Without SNI, sha256.tbs-internet.com does not serve the right
        # certificate
        if not ssl.HAS_SNI:
            self.skipTest("SNI needed for this test")
        # https://sha2.hboeck.de/ was used until 2011-01-08 (no route to host)
        hostname = "sha256.tbs-internet.com"
        remote = (hostname, 443)
        here = os.path.dirname(__file__)
        sha256_cert = os.path.join(here, "sha256.pem")
        with support.transient_internet("sha256.tbs-internet.com"):
            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
            context.verify_mode = ssl.CERT_REQUIRED
            context.load_verify_locations(sha256_cert)
            # Leaving the with block closes the wrapped socket
            with context.wrap_socket(socket.socket(socket.AF_INET),
                                     server_hostname="sha256.tbs-internet.com") as conn:
                conn.connect(remote)
                if support.verbose:
                    cipher_line = "\nCipher with %r is %r\n" % (remote, conn.cipher())
                    sys.stdout.write(cipher_line)
                    cert_line = ("Certificate is:\n%s\n" %
                                 pprint.pformat(conn.getpeercert()))
                    sys.stdout.write(cert_line)
1755
1756
def _test_get_server_certificate(test, host, port, cert=None):
    """Fetch the certificate of host:port twice and fail `test` if empty.

    The first request passes no ca_certs; the second passes `cert` as
    ca_certs.  In verbose mode the certificate from the second request is
    written to stdout.
    """
    address = (host, port)
    for extra in ({}, {'ca_certs': cert}):
        pem = ssl.get_server_certificate(address, **extra)
        if not pem:
            test.fail("No server certificate on %s:%s!" % address)
    if support.verbose:
        report = "\nVerified certificate for %s:%s is\n%s\n" % (host, port, pem)
        sys.stdout.write(report)
1767
def _test_get_server_certificate_fail(test, host, port):
    """Fail `test` unless fetching the certificate raises ssl.SSLError.

    The request passes CERTFILE as ca_certs.  In verbose mode the
    resulting error is written to stdout.
    """
    address = (host, port)
    try:
        pem = ssl.get_server_certificate(address, ca_certs=CERTFILE)
    except ssl.SSLError as exc:
        # This is the expected outcome
        if support.verbose:
            sys.stdout.write("%s\n" % exc)
        return
    test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
1777
1778
1779if _have_threads:
Antoine Pitrou803e6d62010-10-13 10:36:15 +00001780 from test.ssl_servers import make_https_server
1781
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001782 class ThreadedEchoServer(threading.Thread):
1783
1784 class ConnectionHandler(threading.Thread):
1785
1786 """A mildly complicated class, because we want it to work both
1787 with and without the SSL wrapper around the socket connection, so
1788 that we can test the STARTTLS functionality."""
1789
Bill Janssen6e027db2007-11-15 22:23:56 +00001790 def __init__(self, server, connsock, addr):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001791 self.server = server
1792 self.running = False
1793 self.sock = connsock
Bill Janssen6e027db2007-11-15 22:23:56 +00001794 self.addr = addr
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001795 self.sock.setblocking(1)
1796 self.sslconn = None
1797 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00001798 self.daemon = True
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001799
Antoine Pitrou480a1242010-04-28 21:37:09 +00001800 def wrap_conn(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001801 try:
Antoine Pitroub5218772010-05-21 09:56:06 +00001802 self.sslconn = self.server.context.wrap_socket(
1803 self.sock, server_side=True)
Benjamin Petersoncca27322015-01-23 16:35:37 -05001804 self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
1805 self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
Nadeem Vawdaad246bf2013-03-03 22:44:22 +01001806 except (ssl.SSLError, ConnectionResetError) as e:
1807 # We treat ConnectionResetError as though it were an
1808 # SSLError - OpenSSL on Ubuntu abruptly closes the
1809 # connection when asked to use an unsupported protocol.
1810 #
Antoine Pitrou18c913e2010-04-27 10:59:39 +00001811 # XXX Various errors can have happened here, for example
1812 # a mismatching protocol version, an invalid certificate,
1813 # or a low-level bug. This should be made more discriminating.
Antoine Pitrou8f85f902012-01-03 22:46:48 +01001814 self.server.conn_errors.append(e)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001815 if self.server.chatty:
Bill Janssen6e027db2007-11-15 22:23:56 +00001816 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
Antoine Pitrou18c913e2010-04-27 10:59:39 +00001817 self.running = False
1818 self.server.stop()
Bill Janssen6e027db2007-11-15 22:23:56 +00001819 self.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001820 return False
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001821 else:
Benjamin Peterson4cb17812015-01-07 11:14:26 -06001822 self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
Antoine Pitroub5218772010-05-21 09:56:06 +00001823 if self.server.context.verify_mode == ssl.CERT_REQUIRED:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001824 cert = self.sslconn.getpeercert()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001825 if support.verbose and self.server.chatty:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001826 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
1827 cert_binary = self.sslconn.getpeercert(True)
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001828 if support.verbose and self.server.chatty:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001829 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
1830 cipher = self.sslconn.cipher()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001831 if support.verbose and self.server.chatty:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001832 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01001833 sys.stdout.write(" server: selected protocol is now "
1834 + str(self.sslconn.selected_npn_protocol()) + "\n")
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001835 return True
1836
1837 def read(self):
1838 if self.sslconn:
1839 return self.sslconn.read()
1840 else:
1841 return self.sock.recv(1024)
1842
1843 def write(self, bytes):
1844 if self.sslconn:
1845 return self.sslconn.write(bytes)
1846 else:
1847 return self.sock.send(bytes)
1848
1849 def close(self):
1850 if self.sslconn:
1851 self.sslconn.close()
1852 else:
1853 self.sock.close()
1854
Antoine Pitrou480a1242010-04-28 21:37:09 +00001855 def run(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001856 self.running = True
1857 if not self.server.starttls_server:
1858 if not self.wrap_conn():
1859 return
1860 while self.running:
1861 try:
1862 msg = self.read()
Antoine Pitrou480a1242010-04-28 21:37:09 +00001863 stripped = msg.strip()
1864 if not stripped:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001865 # eof, so quit this handler
1866 self.running = False
Martin Panter3840b2a2016-03-27 01:53:46 +00001867 try:
1868 self.sock = self.sslconn.unwrap()
1869 except OSError:
1870 # Many tests shut the TCP connection down
1871 # without an SSL shutdown. This causes
1872 # unwrap() to raise OSError with errno=0!
1873 pass
1874 else:
1875 self.sslconn = None
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001876 self.close()
Antoine Pitrou480a1242010-04-28 21:37:09 +00001877 elif stripped == b'over':
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001878 if support.verbose and self.server.connectionchatty:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001879 sys.stdout.write(" server: client closed connection\n")
1880 self.close()
1881 return
Bill Janssen6e027db2007-11-15 22:23:56 +00001882 elif (self.server.starttls_server and
Antoine Pitrou764b8782010-04-28 22:57:15 +00001883 stripped == b'STARTTLS'):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001884 if support.verbose and self.server.connectionchatty:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001885 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
Antoine Pitrou480a1242010-04-28 21:37:09 +00001886 self.write(b"OK\n")
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001887 if not self.wrap_conn():
1888 return
Bill Janssen40a0f662008-08-12 16:56:25 +00001889 elif (self.server.starttls_server and self.sslconn
Antoine Pitrou764b8782010-04-28 22:57:15 +00001890 and stripped == b'ENDTLS'):
Bill Janssen40a0f662008-08-12 16:56:25 +00001891 if support.verbose and self.server.connectionchatty:
1892 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
Antoine Pitrou480a1242010-04-28 21:37:09 +00001893 self.write(b"OK\n")
Bill Janssen40a0f662008-08-12 16:56:25 +00001894 self.sock = self.sslconn.unwrap()
1895 self.sslconn = None
1896 if support.verbose and self.server.connectionchatty:
1897 sys.stdout.write(" server: connection is now unencrypted...\n")
Antoine Pitroud6494802011-07-21 01:11:30 +02001898 elif stripped == b'CB tls-unique':
1899 if support.verbose and self.server.connectionchatty:
1900 sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
1901 data = self.sslconn.get_channel_binding("tls-unique")
1902 self.write(repr(data).encode("us-ascii") + b"\n")
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001903 else:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001904 if (support.verbose and
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001905 self.server.connectionchatty):
1906 ctype = (self.sslconn and "encrypted") or "unencrypted"
Antoine Pitrou480a1242010-04-28 21:37:09 +00001907 sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
1908 % (msg, ctype, msg.lower(), ctype))
1909 self.write(msg.lower())
Andrew Svetlov0832af62012-12-18 23:10:48 +02001910 except OSError:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001911 if self.server.chatty:
1912 handle_error("Test server failure:\n")
1913 self.close()
1914 self.running = False
1915 # normally, we'd just stop here, but for the test
1916 # harness, we want to stop the server
1917 self.server.stop()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001918
Antoine Pitroub5218772010-05-21 09:56:06 +00001919 def __init__(self, certificate=None, ssl_version=None,
Antoine Pitrou18c913e2010-04-27 10:59:39 +00001920 certreqs=None, cacerts=None,
Antoine Pitrou2d9cb9c2010-04-17 17:40:45 +00001921 chatty=True, connectionchatty=False, starttls_server=False,
Benjamin Petersoncca27322015-01-23 16:35:37 -05001922 npn_protocols=None, alpn_protocols=None,
1923 ciphers=None, context=None):
Antoine Pitroub5218772010-05-21 09:56:06 +00001924 if context:
1925 self.context = context
1926 else:
1927 self.context = ssl.SSLContext(ssl_version
1928 if ssl_version is not None
1929 else ssl.PROTOCOL_TLSv1)
1930 self.context.verify_mode = (certreqs if certreqs is not None
1931 else ssl.CERT_NONE)
1932 if cacerts:
1933 self.context.load_verify_locations(cacerts)
1934 if certificate:
1935 self.context.load_cert_chain(certificate)
Antoine Pitroud5d17eb2012-03-22 00:23:03 +01001936 if npn_protocols:
1937 self.context.set_npn_protocols(npn_protocols)
Benjamin Petersoncca27322015-01-23 16:35:37 -05001938 if alpn_protocols:
1939 self.context.set_alpn_protocols(alpn_protocols)
Antoine Pitroub5218772010-05-21 09:56:06 +00001940 if ciphers:
1941 self.context.set_ciphers(ciphers)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001942 self.chatty = chatty
1943 self.connectionchatty = connectionchatty
1944 self.starttls_server = starttls_server
1945 self.sock = socket.socket()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001946 self.port = support.bind_port(self.sock)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001947 self.flag = None
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001948 self.active = False
Benjamin Petersoncca27322015-01-23 16:35:37 -05001949 self.selected_npn_protocols = []
1950 self.selected_alpn_protocols = []
Benjamin Peterson4cb17812015-01-07 11:14:26 -06001951 self.shared_ciphers = []
Antoine Pitrou8f85f902012-01-03 22:46:48 +01001952 self.conn_errors = []
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001953 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00001954 self.daemon = True
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001955
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01001956 def __enter__(self):
1957 self.start(threading.Event())
1958 self.flag.wait()
Antoine Pitrou8f85f902012-01-03 22:46:48 +01001959 return self
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01001960
1961 def __exit__(self, *args):
1962 self.stop()
1963 self.join()
1964
Antoine Pitrou480a1242010-04-28 21:37:09 +00001965 def start(self, flag=None):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001966 self.flag = flag
1967 threading.Thread.start(self)
1968
def run(self):
    """Accept and serve connections until stop() clears ``self.active``.

    The listening socket uses a short timeout so the loop can re-check
    ``self.active`` regularly.  Each accepted connection is handed to a
    ConnectionHandler thread which is joined before the next accept, so
    connections are served one at a time.

    The listening socket is always closed when this method returns, even
    if an unexpected exception escapes the loop.
    """
    self.sock.settimeout(0.05)
    self.sock.listen()
    self.active = True
    if self.flag:
        # signal an event
        self.flag.set()
    try:
        while self.active:
            try:
                newconn, connaddr = self.sock.accept()
                if support.verbose and self.chatty:
                    sys.stdout.write(' server: new connection from '
                                     + repr(connaddr) + '\n')
                handler = self.ConnectionHandler(self, newconn, connaddr)
                handler.start()
                handler.join()
            except socket.timeout:
                # Nothing to accept yet; loop around and re-check self.active.
                pass
            except KeyboardInterrupt:
                self.stop()
    finally:
        # Don't leak the listening socket if accept() or handler setup
        # raised something other than a timeout.
        self.sock.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001990
def stop(self):
    """Request termination: run() exits its loop once it sees the flag."""
    self.active = False
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001993
class AsyncoreEchoServer(threading.Thread):
    """Thread running an asyncore-based SSL server that echoes lowercased data.

    Usable as a context manager: entering starts the thread and waits until
    it is running; leaving stops the loop, closes the listener and joins.
    """

    # this one's based on asyncore.dispatcher

    class EchoServer(asyncore.dispatcher):
        """Listening dispatcher; wraps every accepted socket in a ConnectionHandler."""

        class ConnectionHandler(asyncore.dispatcher_with_send):
            """Server-side SSL connection whose handshake is driven by the event loop."""

            def __init__(self, conn, certfile):
                # The handshake is deferred so it can be completed
                # incrementally from handle_read().
                wrapped = ssl.wrap_socket(conn, server_side=True,
                                          certfile=certfile,
                                          do_handshake_on_connect=False)
                self.socket = wrapped
                asyncore.dispatcher_with_send.__init__(self, self.socket)
                self._ssl_accepting = True
                self._do_ssl_handshake()

            def readable(self):
                # Drain data already buffered inside the SSL object before
                # reporting readiness to the loop.
                if isinstance(self.socket, ssl.SSLSocket):
                    while self.socket.pending() > 0:
                        self.handle_read_event()
                return True

            def _do_ssl_handshake(self):
                """Advance the handshake; clear ``_ssl_accepting`` once it completes."""
                try:
                    self.socket.do_handshake()
                except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
                    # Not finished yet; try again on the next read event.
                    return
                except ssl.SSLEOFError:
                    return self.handle_close()
                except ssl.SSLError:
                    # Listed before OSError so other SSL failures propagate.
                    raise
                except OSError as err:
                    if err.args[0] == errno.ECONNABORTED:
                        return self.handle_close()
                    # Any other OSError is ignored; the handshake stays pending.
                else:
                    self._ssl_accepting = False

            def handle_read(self):
                if self._ssl_accepting:
                    self._do_ssl_handshake()
                    return
                data = self.recv(1024)
                if support.verbose:
                    sys.stdout.write(" server: read %s from client\n" % repr(data))
                if data:
                    self.send(data.lower())
                else:
                    self.close()

            def handle_close(self):
                self.close()
                if support.verbose:
                    sys.stdout.write(" server: closed connection %s\n" % self.socket)

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, certfile):
            self.certfile = certfile
            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.port = support.bind_port(listener, '')
            asyncore.dispatcher.__init__(self, listener)
            self.listen(5)

        def handle_accepted(self, sock_obj, addr):
            if support.verbose:
                sys.stdout.write(" server: new connection from %s:%s\n" %addr)
            self.ConnectionHandler(sock_obj, self.certfile)

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise

    def __init__(self, certfile):
        self.flag = None
        self.active = False
        self.server = self.EchoServer(certfile)
        self.port = self.server.port
        threading.Thread.__init__(self)
        self.daemon = True

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.server)

    def __enter__(self):
        ready = threading.Event()
        self.start(ready)
        self.flag.wait()
        return self

    def __exit__(self, *args):
        def note(message):
            # support.verbose is consulted for each message.
            if support.verbose:
                sys.stdout.write(message)

        note(" cleanup: stopping server.\n")
        self.stop()
        note(" cleanup: joining server thread.\n")
        self.join()
        note(" cleanup: successfully joined.\n")

    def start (self, flag=None):
        """Remember *flag* (set by run() once the thread is active) and start."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        self.active = True
        if self.flag:
            self.flag.set()
        while self.active:
            try:
                asyncore.loop(1)
            except:
                # Best effort: whatever the loop raises is swallowed and
                # the loop is restarted while the server is active.
                pass

    def stop(self):
        """Clear the active flag and close the listening dispatcher."""
        self.active = False
        self.server.close()
2109
def server_params_test(client_context, server_context, indata=b"FOO\n",
                       chatty=True, connectionchatty=False, sni_name=None):
    """
    Launch a server, connect a client to it and try various reads
    and writes.

    *indata* is sent as bytes, bytearray and memoryview; each reply must
    equal ``indata.lower()`` or AssertionError is raised.  Returns a dict
    of connection details collected from the client socket and the server.
    """
    def client_note(message):
        # Client-side tracing needs both connectionchatty and verbose mode.
        if connectionchatty and support.verbose:
            sys.stdout.write(message)

    stats = {}
    # The server is always created with connectionchatty=False; the
    # connectionchatty argument only controls client-side output here.
    server = ThreadedEchoServer(context=server_context,
                                chatty=chatty,
                                connectionchatty=False)
    with server:
        with client_context.wrap_socket(socket.socket(),
                                        server_hostname=sni_name) as s:
            s.connect((HOST, server.port))
            payloads = (indata, bytearray(indata), memoryview(indata))
            for arg in payloads:
                client_note(" client: sending %r...\n" % indata)
                s.write(arg)
                outdata = s.read()
                client_note(" client: read %r\n" % outdata)
                if outdata != indata.lower():
                    raise AssertionError(
                        "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                        % (outdata[:20], len(outdata),
                           indata[:20].lower(), len(indata)))
            s.write(b"over\n")
            client_note(" client: closing connection.\n")
            stats['compression'] = s.compression()
            stats['cipher'] = s.cipher()
            stats['peercert'] = s.getpeercert()
            stats['client_alpn_protocol'] = s.selected_alpn_protocol()
            stats['client_npn_protocol'] = s.selected_npn_protocol()
            stats['version'] = s.version()
            s.close()
        # Server-side results are read while the server is still running.
        stats['server_alpn_protocols'] = server.selected_alpn_protocols
        stats['server_npn_protocols'] = server.selected_npn_protocols
        stats['server_shared_ciphers'] = server.shared_ciphers
    return stats
Thomas Woutersed03b412007-08-28 21:37:11 +00002156
def try_protocol_combo(server_protocol, client_protocol, expect_success,
                       certsreqs=None, server_options=0, client_options=0):
    """
    Try to SSL-connect using *client_protocol* to *server_protocol*.
    If *expect_success* is true, assert that the connection succeeds,
    if it's false, assert that the connection fails.
    Also, if *expect_success* is a string, assert that it is the protocol
    version actually used by the connection.
    """
    if certsreqs is None:
        certsreqs = ssl.CERT_NONE
    # Looked up unconditionally, so an unknown mode raises KeyError early.
    certtype = {
        ssl.CERT_NONE: "CERT_NONE",
        ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
        ssl.CERT_REQUIRED: "CERT_REQUIRED",
    }[certsreqs]

    def protocol_names():
        return (ssl.get_protocol_name(client_protocol),
                ssl.get_protocol_name(server_protocol))

    if support.verbose:
        # Combinations expected to fail are printed in braces.
        formatstr = " %s->%s %s\n" if expect_success else " {%s->%s} %s\n"
        sys.stdout.write(formatstr % (protocol_names() + (certtype,)))

    client_context = ssl.SSLContext(client_protocol)
    client_context.options |= client_options
    server_context = ssl.SSLContext(server_protocol)
    server_context.options |= server_options

    # NOTE: we must enable "ALL" ciphers on the client, otherwise an
    # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
    # starting from OpenSSL 1.0.0 (see issue #8322).
    if client_context.protocol == ssl.PROTOCOL_SSLv23:
        client_context.set_ciphers("ALL")

    for ctx in (client_context, server_context):
        ctx.verify_mode = certsreqs
        ctx.load_cert_chain(CERTFILE)
        ctx.load_verify_locations(CERTFILE)

    try:
        stats = server_params_test(client_context, server_context,
                                   chatty=False, connectionchatty=False)
    # Protocol mismatch can result in either an SSLError, or a
    # "Connection reset by peer" error.
    except ssl.SSLError:
        if expect_success:
            raise
    except OSError as exc:
        tolerated = (not expect_success) and exc.errno == errno.ECONNRESET
        if not tolerated:
            raise
    else:
        if not expect_success:
            raise AssertionError(
                "Client protocol %s succeeded with server protocol %s!"
                % protocol_names())
        if expect_success is not True and expect_success != stats['version']:
            raise AssertionError("version mismatch: expected %r, got %r"
                                 % (expect_success, stats['version']))
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002215
2216
Bill Janssen6e027db2007-11-15 22:23:56 +00002217 class ThreadedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002218
@skip_if_broken_ubuntu_ssl
def test_echo(self):
    """Basic test of an SSL client connecting to a server"""
    if support.verbose:
        sys.stdout.write("\n")
    for protocol in PROTOCOLS:
        protocol_name = ssl._PROTOCOL_NAMES[protocol]
        with self.subTest(protocol=protocol_name):
            # The same context serves as both client and server side.
            ctx = ssl.SSLContext(protocol)
            ctx.load_cert_chain(CERTFILE)
            server_params_test(ctx, ctx,
                               chatty=True, connectionchatty=True)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002230
def test_getpeercert(self):
    # Check getpeercert() before and after the handshake, and sanity-check
    # the returned certificate dict (subject, notBefore, notAfter).
    if support.verbose:
        sys.stdout.write("\n")
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(CERTFILE)
    context.load_cert_chain(CERTFILE)
    server = ThreadedEchoServer(context=context, chatty=False)
    with server:
        # The with block closes the client socket even when an assertion
        # below fails, so a failing run does not leak the connection.
        with context.wrap_socket(socket.socket(),
                                 do_handshake_on_connect=False) as s:
            s.connect((HOST, server.port))
            # getpeercert() raise ValueError while the handshake isn't
            # done.
            with self.assertRaises(ValueError):
                s.getpeercert()
            s.do_handshake()
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()
            if support.verbose:
                sys.stdout.write(pprint.pformat(cert) + '\n')
                sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
            if 'subject' not in cert:
                self.fail("No subject field in certificate: %s." %
                          pprint.pformat(cert))
            if ((('organizationName', 'Python Software Foundation'),)
                    not in cert['subject']):
                self.fail(
                    "Missing or invalid 'organizationName' field in certificate subject; "
                    "should be 'Python Software Foundation'.")
            self.assertIn('notBefore', cert)
            self.assertIn('notAfter', cert)
            before = ssl.cert_time_to_seconds(cert['notBefore'])
            after = ssl.cert_time_to_seconds(cert['notAfter'])
            self.assertLess(before, after)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002268
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_crl_check(self):
    # Exercise certificate verification with and without CRL checking:
    # default flags pass, CRL_CHECK_LEAF without a CRL fails, and loading
    # the CRL makes it pass again.
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(SIGNING_CA)
    # VERIFY_X509_TRUSTED_FIRST is only counted when the ssl module has it.
    trusted_first = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(context.verify_flags,
                     ssl.VERIFY_DEFAULT | trusted_first)

    # VERIFY_DEFAULT should pass
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, context.wrap_socket(socket.socket()) as s:
        s.connect((HOST, server.port))
        cert = s.getpeercert()
        self.assertTrue(cert, "Can't get peer certificate.")

    # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
    context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF

    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, context.wrap_socket(socket.socket()) as s:
        with self.assertRaisesRegex(ssl.SSLError,
                                    "certificate verify failed"):
            s.connect((HOST, server.port))

    # now load a CRL file. The CRL file is signed by the CA.
    context.load_verify_locations(CRLFILE)

    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, context.wrap_socket(socket.socket()) as s:
        s.connect((HOST, server.port))
        cert = s.getpeercert()
        self.assertTrue(cert, "Can't get peer certificate.")
2311
def test_check_hostname(self):
    # With check_hostname enabled: a matching server_hostname verifies, a
    # mismatching one raises CertificateError, and omitting it raises
    # ValueError from wrap_socket().
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.verify_mode = ssl.CERT_REQUIRED
    context.check_hostname = True
    context.load_verify_locations(SIGNING_CA)

    # correct hostname should verify
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, context.wrap_socket(socket.socket(),
                                     server_hostname="localhost") as s:
        s.connect((HOST, server.port))
        cert = s.getpeercert()
        self.assertTrue(cert, "Can't get peer certificate.")

    # incorrect hostname should raise an exception
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, context.wrap_socket(socket.socket(),
                                     server_hostname="invalid") as s:
        with self.assertRaisesRegex(
                ssl.CertificateError,
                "hostname 'invalid' doesn't match 'localhost'"):
            s.connect((HOST, server.port))

    # missing server_hostname arg should cause an exception, too
    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server, socket.socket() as s:
        with self.assertRaisesRegex(
                ValueError,
                "check_hostname requires server_hostname"):
            context.wrap_socket(s)
2349
def test_wrong_cert(self):
    """Connecting when the server rejects the client's certificate

    Launch a server with CERT_REQUIRED, and check that trying to
    connect to it with a wrong client certificate fails.
    """
    base_dir = os.path.dirname(__file__) or os.curdir
    wrong_cert = os.path.join(base_dir, "wrongcert.pem")
    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_REQUIRED,
                                cacerts=CERTFILE, chatty=False,
                                connectionchatty=False)
    with server:
        with socket.socket() as sock:
            with ssl.wrap_socket(sock,
                                 certfile=wrong_cert,
                                 ssl_version=ssl.PROTOCOL_TLSv1) as s:
                try:
                    # Expect either an SSL error about the server rejecting
                    # the connection, or a low-level connection reset (which
                    # sometimes happens on Windows)
                    s.connect((HOST, server.port))
                except ssl.SSLError as e:
                    if support.verbose:
                        sys.stdout.write("\nSSLError is %r\n" % e)
                except OSError as e:
                    # Only a connection reset counts as an acceptable failure.
                    if e.errno != errno.ECONNRESET:
                        raise
                    if support.verbose:
                        sys.stdout.write("\nsocket.error is %r\n" % e)
                else:
                    self.fail("Use of invalid cert should have failed!")
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002382
def test_rude_shutdown(self):
    """A brutal shutdown of an SSL server should raise an OSError
    in the client when attempting handshake.
    """
    listener_ready = threading.Event()
    listener_gone = threading.Event()

    s = socket.socket()
    port = support.bind_port(s, HOST)

    # `accept_then_vanish` runs in a thread. It sits in an accept() until
    # the main thread connects. Then it rudely closes the socket,
    # and sets Event `listener_gone` to let the main thread know
    # the socket is gone.
    def accept_then_vanish():
        s.listen()
        listener_ready.set()
        newsock, addr = s.accept()
        newsock.close()
        s.close()
        listener_gone.set()

    # Runs in the main thread: connect in clear text, wait for the peer to
    # disappear, then attempt the SSL handshake, which must fail.
    def connect_after_close():
        listener_ready.wait()
        with socket.socket() as c:
            c.connect((HOST, port))
            listener_gone.wait()
            try:
                wrapped = ssl.wrap_socket(c)
            except OSError:
                pass
            else:
                self.fail('connecting to closed SSL socket should have failed')

    worker = threading.Thread(target=accept_then_vanish)
    worker.start()
    try:
        connect_after_close()
    finally:
        worker.join()
Trent Nelson6b240cd2008-04-10 20:12:06 +00002423
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
                     "OpenSSL is compiled without SSLv2 support")
def test_protocol_sslv2(self):
    """Connecting to an SSLv2 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    # Safe to read here: the skipUnless decorator guarantees the attribute.
    sslv2 = ssl.PROTOCOL_SSLv2
    # None selects try_protocol_combo's default certificate requirement.
    for certsreqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(sslv2, sslv2, True, certsreqs)
    try_protocol_combo(sslv2, ssl.PROTOCOL_SSLv23, False)
    if hasattr(ssl, 'PROTOCOL_SSLv3'):
        try_protocol_combo(sslv2, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(sslv2, ssl.PROTOCOL_TLSv1, False)
    # SSLv23 client with specific SSL options
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(sslv2, ssl.PROTOCOL_SSLv23, False,
                           client_options=ssl.OP_NO_SSLv2)
    for disabled in (ssl.OP_NO_SSLv3, ssl.OP_NO_TLSv1):
        try_protocol_combo(sslv2, ssl.PROTOCOL_SSLv23, False,
                           client_options=disabled)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002447
@skip_if_broken_ubuntu_ssl
def test_protocol_sslv23(self):
    """Connecting to an SSLv23 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    sslv23 = ssl.PROTOCOL_SSLv23
    has_sslv3 = hasattr(ssl, 'PROTOCOL_SSLv3')
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try:
            try_protocol_combo(sslv23, ssl.PROTOCOL_SSLv2, True)
        except OSError as x:
            # this fails on some older versions of OpenSSL (0.9.7l, for instance)
            if support.verbose:
                sys.stdout.write(
                    " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
                    % str(x))

    # Same three client protocols for each certificate requirement; None
    # selects try_protocol_combo's default.
    for certsreqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        if has_sslv3:
            try_protocol_combo(sslv23, ssl.PROTOCOL_SSLv3, False, certsreqs)
        try_protocol_combo(sslv23, sslv23, True, certsreqs)
        try_protocol_combo(sslv23, ssl.PROTOCOL_TLSv1, 'TLSv1', certsreqs)

    # Server with specific SSL options
    if has_sslv3:
        try_protocol_combo(sslv23, ssl.PROTOCOL_SSLv3, False,
                           server_options=ssl.OP_NO_SSLv3)
    # Will choose TLSv1
    try_protocol_combo(sslv23, sslv23, True,
                       server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
    try_protocol_combo(sslv23, ssl.PROTOCOL_TLSv1, False,
                       server_options=ssl.OP_NO_TLSv1)
2486
2487
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
                     "OpenSSL is compiled without SSLv3 support")
def test_protocol_sslv3(self):
    """Connecting to an SSLv3 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    # Safe to read here: the skipUnless decorator guarantees the attribute.
    sslv3 = ssl.PROTOCOL_SSLv3
    # None selects try_protocol_combo's default certificate requirement.
    for certsreqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(sslv3, sslv3, 'SSLv3', certsreqs)
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(sslv3, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(sslv3, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_SSLv3)
    try_protocol_combo(sslv3, ssl.PROTOCOL_TLSv1, False)
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(sslv3, ssl.PROTOCOL_SSLv23,
                           False, client_options=ssl.OP_NO_SSLv2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002507
@skip_if_broken_ubuntu_ssl
def test_protocol_tlsv1(self):
    """Connecting to a TLSv1 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1 = ssl.PROTOCOL_TLSv1
    # None selects try_protocol_combo's default certificate requirement.
    for certsreqs in (None, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(tlsv1, tlsv1, 'TLSv1', certsreqs)
    # Legacy client protocols are only tried when this build exposes them.
    for legacy in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy):
            try_protocol_combo(tlsv1, getattr(ssl, legacy), False)
    try_protocol_combo(tlsv1, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_TLSv1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002522
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
                     "TLS version 1.1 not supported.")
def test_protocol_tlsv1_1(self):
    """Connecting to a TLSv1.1 server with various client options.
       Testing against older TLS versions."""
    if support.verbose:
        sys.stdout.write("\n")
    # Safe to read here: the skipUnless decorator guarantees the attribute.
    tlsv1_1 = ssl.PROTOCOL_TLSv1_1
    try_protocol_combo(tlsv1_1, tlsv1_1, 'TLSv1.1')
    # Legacy client protocols are only tried when this build exposes them.
    for legacy in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy):
            try_protocol_combo(tlsv1_1, getattr(ssl, legacy), False)
    try_protocol_combo(tlsv1_1, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_TLSv1_1)

    try_protocol_combo(ssl.PROTOCOL_SSLv23, tlsv1_1, 'TLSv1.1')
    # TLSv1 and TLSv1.1 must not interoperate in either direction.
    try_protocol_combo(tlsv1_1, ssl.PROTOCOL_TLSv1, False)
    try_protocol_combo(ssl.PROTOCOL_TLSv1, tlsv1_1, False)
2542
2543
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
                     "TLS version 1.2 not supported.")
def test_protocol_tlsv1_2(self):
    """Connecting to a TLSv1.2 server with various client options.
       Testing against older TLS versions."""
    if support.verbose:
        sys.stdout.write("\n")
    # Safe to read here: the skipUnless decorator guarantees the attribute.
    tlsv1_2 = ssl.PROTOCOL_TLSv1_2
    no_old_ssl = ssl.OP_NO_SSLv3 | ssl.OP_NO_SSLv2
    try_protocol_combo(tlsv1_2, tlsv1_2, 'TLSv1.2',
                       server_options=no_old_ssl,
                       client_options=no_old_ssl)
    # Legacy client protocols are only tried when this build exposes them.
    for legacy in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy):
            try_protocol_combo(tlsv1_2, getattr(ssl, legacy), False)
    try_protocol_combo(tlsv1_2, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_TLSv1_2)

    try_protocol_combo(ssl.PROTOCOL_SSLv23, tlsv1_2, 'TLSv1.2')
    # TLSv1.2 must not interoperate with TLSv1 or TLSv1.1 in either direction.
    try_protocol_combo(tlsv1_2, ssl.PROTOCOL_TLSv1, False)
    try_protocol_combo(ssl.PROTOCOL_TLSv1, tlsv1_2, False)
    try_protocol_combo(tlsv1_2, ssl.PROTOCOL_TLSv1_1, False)
    try_protocol_combo(ssl.PROTOCOL_TLSv1_1, tlsv1_2, False)
2567
def test_starttls(self):
    """Switching from clear text to encrypted and back again."""
    messages = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4",
                b"ENDTLS", b"msg 5", b"msg 6")

    def note(template, *args):
        # Emit progress output only in verbose mode; formatting is
        # skipped entirely otherwise.
        if support.verbose:
            sys.stdout.write(template % args if args else template)

    server = ThreadedEchoServer(CERTFILE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                starttls_server=True,
                                chatty=True,
                                connectionchatty=True)
    # The TLS-wrapped socket while encryption is active, otherwise None.
    tls = None
    with server:
        plain = socket.socket()
        plain.setblocking(1)
        plain.connect((HOST, server.port))
        note("\n")
        for indata in messages:
            note(" client: sending %r...\n", indata)
            if tls is not None:
                tls.write(indata)
                outdata = tls.read()
            else:
                plain.send(indata)
                outdata = plain.recv(1024)
            reply = outdata.strip().lower()
            acknowledged = reply.startswith(b"ok")
            if indata == b"STARTTLS" and acknowledged:
                # STARTTLS ok, switch to secure mode
                note(" client: read %r from server, starting TLS...\n",
                     reply)
                tls = ssl.wrap_socket(plain, ssl_version=ssl.PROTOCOL_TLSv1)
            elif indata == b"ENDTLS" and acknowledged:
                # ENDTLS ok, switch back to clear text
                note(" client: read %r from server, ending TLS...\n",
                     reply)
                plain = tls.unwrap()
                tls = None
            else:
                note(" client: read %r from server\n", reply)
        note(" client: closing connection.\n")
        # Say goodbye and close on whichever channel is currently active.
        if tls is not None:
            tls.write(b"over\n")
            tls.close()
        else:
            plain.send(b"over\n")
            plain.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002625
def test_socketserver(self):
    """Using a SocketServer to create and manage SSL connections."""
    server = make_https_server(self, certfile=CERTFILE)
    # try to connect
    if support.verbose:
        sys.stdout.write('\n')
    with open(CERTFILE, 'rb') as f:
        d1 = f.read()
    # Start from empty *bytes*: d1 was read in binary mode, so the final
    # comparison must be bytes-vs-bytes even when no body is read.
    d2 = b''
    # now fetch the same data from the HTTPS server
    url = 'https://localhost:%d/%s' % (
        server.port, os.path.split(CERTFILE)[1])
    context = ssl.create_default_context(cafile=CERTFILE)
    # The response object is a context manager; it is closed on exit
    # whether or not reading succeeds.
    with urllib.request.urlopen(url, context=context) as response:
        dlen = response.info().get("content-length")
        if dlen and (int(dlen) > 0):
            d2 = response.read(int(dlen))
            if support.verbose:
                sys.stdout.write(
                    " client: read %d bytes from remote server '%s'\n"
                    % (len(d2), server))
    self.assertEqual(d1, d2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002651
def test_asyncore_server(self):
    """Check the example asyncore integration."""
    if support.verbose:
        sys.stdout.write("\n")

    # The echoed reply is compared against the lower-cased payload.
    # (An earlier str assignment to indata was dead code: it was
    # overwritten before any use, so it has been dropped.)
    indata = b"FOO\n"
    server = AsyncoreEchoServer(CERTFILE)
    with server:
        s = ssl.wrap_socket(socket.socket())
        s.connect(('127.0.0.1', server.port))
        if support.verbose:
            sys.stdout.write(
                " client: sending %r...\n" % indata)
        s.write(indata)
        outdata = s.read()
        if support.verbose:
            sys.stdout.write(" client: read %r\n" % outdata)
        if outdata != indata.lower():
            self.fail(
                "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                % (outdata[:20], len(outdata),
                   indata[:20].lower(), len(indata)))
        s.write(b"over\n")
        if support.verbose:
            sys.stdout.write(" client: closing connection.\n")
        s.close()
        if support.verbose:
            sys.stdout.write(" client: connection closed.\n")
Trent Nelson6b240cd2008-04-10 20:12:06 +00002682
def test_recv_send(self):
    """Test recv(), send() and friends."""
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = ssl.wrap_socket(socket.socket(),
                            server_side=False,
                            certfile=CERTFILE,
                            ca_certs=CERTFILE,
                            cert_reqs=ssl.CERT_NONE,
                            ssl_version=ssl.PROTOCOL_TLSv1)
        s.connect((HOST, server.port))

        # helper methods for standardising recv* method signatures
        def _recv_into():
            b = bytearray(b"\0"*100)
            count = s.recv_into(b)
            return b[:count]

        def _recvfrom_into():
            b = bytearray(b"\0"*100)
            count, addr = s.recvfrom_into(b)
            return b[:count]

        # (name, method, expect success?, *args, return value func)
        send_methods = [
            ('send', s.send, True, [], len),
            ('sendto', s.sendto, False, ["some.address"], len),
            ('sendall', s.sendall, True, [], lambda x: None),
        ]
        # (name, method, whether to expect success, *args)
        recv_methods = [
            ('recv', s.recv, True, []),
            ('recvfrom', s.recvfrom, False, ["some.address"]),
            ('recv_into', _recv_into, True, []),
            ('recvfrom_into', _recvfrom_into, False, []),
        ]
        data_prefix = "PREFIX_"

        # NOTE: the failure messages below use the "!r" / "!s"
        # *conversions*.  A ":r" or ":s" format *spec* on a bytes or
        # exception object raises TypeError, which would mask the real
        # failure instead of reporting it.
        for (meth_name, send_meth, expect_success, args,
                ret_val_meth) in send_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                ret = send_meth(indata, *args)
                msg = "sending with {}".format(meth_name)
                self.assertEqual(ret, ret_val_meth(indata), msg=msg)
                outdata = s.read()
                if outdata != indata.lower():
                    self.fail(
                        "While sending with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20], nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to send with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                # Methods expected to fail must name themselves at the
                # start of the error message.
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )

        for meth_name, recv_meth, expect_success, args in recv_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                s.send(indata)
                outdata = recv_meth(*args)
                if outdata != indata.lower():
                    self.fail(
                        "While receiving with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20], nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to receive with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )
                # consume data
                s.read()

        data = b"data"

        # read(-1, buffer) is supported, even though read(-1) is not
        s.send(data)
        buffer = bytearray(len(data))
        self.assertEqual(s.read(-1, buffer), len(data))
        self.assertEqual(buffer, data)

        # recv/read(0) should return no data
        s.send(data)
        self.assertEqual(s.recv(0), b"")
        self.assertEqual(s.read(0), b"")
        self.assertEqual(s.read(), data)

        # Make sure sendmsg et al are disallowed to avoid
        # inadvertent disclosure of data and/or corruption
        # of the encrypted data stream
        self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
        self.assertRaises(NotImplementedError, s.recvmsg, 100)
        self.assertRaises(NotImplementedError,
                          s.recvmsg_into, bytearray(100))

        s.write(b"over\n")

        self.assertRaises(ValueError, s.recv, -1)
        self.assertRaises(ValueError, s.read, -1)

        s.close()
Bill Janssen58afe4c2008-09-08 16:45:19 +00002819
def test_nonblocking_send(self):
    # A non-blocking SSL socket must raise a "want read/write" error,
    # rather than block, once it cannot accept more outgoing data.
    echo_server = ThreadedEchoServer(CERTFILE,
                                     certreqs=ssl.CERT_NONE,
                                     ssl_version=ssl.PROTOCOL_TLSv1,
                                     cacerts=CERTFILE,
                                     chatty=True,
                                     connectionchatty=False)
    with echo_server:
        s = ssl.wrap_socket(socket.socket(),
                            server_side=False,
                            certfile=CERTFILE,
                            ca_certs=CERTFILE,
                            cert_reqs=ssl.CERT_NONE,
                            ssl_version=ssl.PROTOCOL_TLSv1)
        s.connect((HOST, echo_server.port))
        s.setblocking(False)

        # If we keep sending data, at some point the buffers
        # will be full and the call will block
        chunk = bytearray(8192)
        would_block = (ssl.SSLWantWriteError, ssl.SSLWantReadError)
        with self.assertRaises(would_block):
            while True:
                s.send(chunk)

        # Switch back to blocking mode before closing the socket.
        s.setblocking(True)
        s.close()
2849
def test_handshake_timeout(self):
    # Issue #5103: SSL handshake must respect the socket timeout
    listener = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(listener)
    listening = threading.Event()
    # Read by serve() through its closure; set in the final cleanup to
    # make the accept loop exit.
    finish = False

    def serve():
        # Accept TCP connections but never perform an SSL handshake,
        # so that every client handshake attempt has to time out.
        listener.listen()
        listening.set()
        held = []
        while not finish:
            readable = select.select([listener], [], [], 0.1)[0]
            if listener in readable:
                # Let the socket hang around rather than having
                # it closed by garbage collection.
                held.append(listener.accept()[0])
        for sock in held:
            sock.close()

    worker = threading.Thread(target=serve)
    worker.start()
    listening.wait()

    try:
        # Case 1: wrap a socket that is already connected.
        try:
            c = socket.socket(socket.AF_INET)
            c.settimeout(0.2)
            c.connect((host, port))
            # Will attempt handshake and time out
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                ssl.wrap_socket(c)
        finally:
            c.close()
        # Case 2: wrap first, then connect.
        try:
            c = socket.socket(socket.AF_INET)
            c = ssl.wrap_socket(c)
            c.settimeout(0.2)
            # Will attempt handshake and time out
            with self.assertRaisesRegex(socket.timeout, "timed out"):
                c.connect((host, port))
        finally:
            c.close()
    finally:
        finish = True
        worker.join()
        listener.close()
2898
def test_server_accept(self):
    # Issue #16357: accept() on a SSLSocket created through
    # SSLContext.wrap_socket().
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(CERTFILE)
    context.load_cert_chain(CERTFILE)
    server = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(server)
    server = context.wrap_socket(server, server_side=True)

    listening = threading.Event()
    # Filled in by the server thread with accept()'s (socket, address).
    accepted = [None, None]

    def serve():
        server.listen()
        # Block on the accept and wait on the connection to close.
        listening.set()
        accepted[:] = server.accept()
        accepted[0].recv(1)

    worker = threading.Thread(target=serve)
    worker.start()
    # Client wait until server setup and perform a connect.
    listening.wait()
    client = context.wrap_socket(socket.socket())
    client.connect((host, port))
    client_addr = client.getsockname()
    client.close()
    worker.join()
    remote, peer = accepted
    remote.close()
    server.close()
    # Sanity checks.
    self.assertIsInstance(remote, ssl.SSLSocket)
    self.assertEqual(peer, client_addr)
2936
def test_getpeercert_enotconn(self):
    # getpeercert() on a wrapped socket that was never connected must
    # fail with ENOTCONN.
    unconnected = ssl.SSLContext(ssl.PROTOCOL_SSLv23).wrap_socket(
        socket.socket())
    with unconnected:
        with self.assertRaises(OSError) as caught:
            unconnected.getpeercert()
        self.assertEqual(caught.exception.errno, errno.ENOTCONN)
2943
def test_do_handshake_enotconn(self):
    # do_handshake() on a wrapped socket that was never connected must
    # fail with ENOTCONN.
    unconnected = ssl.SSLContext(ssl.PROTOCOL_SSLv23).wrap_socket(
        socket.socket())
    with unconnected:
        with self.assertRaises(OSError) as caught:
            unconnected.do_handshake()
        self.assertEqual(caught.exception.errno, errno.ENOTCONN)
2950
def test_default_ciphers(self):
    # A client restricted to DES ciphers must be refused by a server
    # that uses its default cipher list.
    client_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    try:
        # Force a set of weak ciphers on our client context
        client_context.set_ciphers("DES")
    except ssl.SSLError:
        self.skipTest("no DES cipher available")
    echo_server = ThreadedEchoServer(CERTFILE,
                                     ssl_version=ssl.PROTOCOL_SSLv23,
                                     chatty=False)
    with echo_server as server:
        with client_context.wrap_socket(socket.socket()) as s:
            with self.assertRaises(OSError):
                s.connect((HOST, server.port))
    # Checked once the server block has exited, using the errors the
    # server recorded for the failed connection.
    first_error = str(server.conn_errors[0])
    self.assertIn("no shared cipher", first_error)
2965
def test_version_basic(self):
    """
    Basic tests for SSLSocket.version().
    More tests are done in the test_protocol_*() methods.
    """
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    echo_server = ThreadedEchoServer(CERTFILE,
                                     ssl_version=ssl.PROTOCOL_TLSv1,
                                     chatty=False)
    with echo_server as server:
        with client_context.wrap_socket(socket.socket()) as s:
            # No protocol version before the connection is made ...
            self.assertIs(s.version(), None)
            s.connect((HOST, server.port))
            self.assertEqual(s.version(), "TLSv1")
        # ... and none again once the socket has been closed.
        self.assertIs(s.version(), None)
2980
@unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
def test_default_ecdh_curve(self):
    # Issue #21015: elliptic curve-based Diffie Hellman key exchange
    # should be enabled by default on SSL contexts.
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.load_cert_chain(CERTFILE)
    # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
    # explicitly using the 'ECCdraft' cipher alias. Otherwise,
    # our default cipher list should prefer ECDH-based ciphers
    # automatically.
    needs_explicit_ecdh = ssl.OPENSSL_VERSION_INFO < (1, 0, 0)
    if needs_explicit_ecdh:
        context.set_ciphers("ECCdraft:ECDH")
    # The same context serves as both server and client configuration.
    with ThreadedEchoServer(context=context) as server:
        with context.wrap_socket(socket.socket()) as s:
            s.connect((HOST, server.port))
            cipher_name = s.cipher()[0]
            self.assertIn("ECDH", cipher_name)
2997
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    """Test tls-unique channel binding."""
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = ssl.wrap_socket(socket.socket(),
                            server_side=False,
                            certfile=CERTFILE,
                            ca_certs=CERTFILE,
                            cert_reqs=ssl.CERT_NONE,
                            ssl_version=ssl.PROTOCOL_TLSv1)
        s.connect((HOST, server.port))
        # get the data
        cb_data = s.get_channel_binding("tls-unique")
        if support.verbose:
            sys.stdout.write(" got channel binding data: {0!r}\n"
                             .format(cb_data))

        # check if it is sane
        self.assertIsNotNone(cb_data)
        self.assertEqual(len(cb_data), 12) # True for TLSv1

        # and compare with the peers version
        s.write(b"CB tls-unique\n")
        peer_data_repr = s.read().strip()
        self.assertEqual(peer_data_repr,
                         repr(cb_data).encode("us-ascii"))
        s.close()

        # now, again
        s = ssl.wrap_socket(socket.socket(),
                            server_side=False,
                            certfile=CERTFILE,
                            ca_certs=CERTFILE,
                            cert_reqs=ssl.CERT_NONE,
                            ssl_version=ssl.PROTOCOL_TLSv1)
        s.connect((HOST, server.port))
        new_cb_data = s.get_channel_binding("tls-unique")
        if support.verbose:
            sys.stdout.write(" got another channel binding data: {0!r}\n"
                             .format(new_cb_data))
        # is it really unique
        self.assertNotEqual(cb_data, new_cb_data)
        # Sanity-check the *new* binding data; re-checking cb_data here
        # would only repeat the assertions already made above.
        self.assertIsNotNone(new_cb_data)
        self.assertEqual(len(new_cb_data), 12) # True for TLSv1
        s.write(b"CB tls-unique\n")
        peer_data_repr = s.read().strip()
        self.assertEqual(peer_data_repr,
                         repr(new_cb_data).encode("us-ascii"))
        s.close()
Bill Janssen58afe4c2008-09-08 16:45:19 +00003057
def test_compression(self):
    # The compression reported for a TLSv1 connection must be None or
    # one of the known method names.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    ctx.load_cert_chain(CERTFILE)
    # The same context is passed twice to server_params_test.
    stats = server_params_test(ctx, ctx,
                               chatty=True, connectionchatty=True)
    if support.verbose:
        report = " got compression: {!r}\n".format(stats['compression'])
        sys.stdout.write(report)
    allowed = {None, 'ZLIB', 'RLE'}
    self.assertIn(stats['compression'], allowed)
3066
@unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
                     "ssl.OP_NO_COMPRESSION needed for this test")
def test_compression_disabled(self):
    # With OP_NO_COMPRESSION set, no compression may be reported.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    ctx.load_cert_chain(CERTFILE)
    ctx.options |= ssl.OP_NO_COMPRESSION
    result = server_params_test(ctx, ctx,
                                chatty=True, connectionchatty=True)
    self.assertIs(result['compression'], None)
3076
def test_dh_params(self):
    # Check we can get a connection with ephemeral Diffie-Hellman
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.load_cert_chain(CERTFILE)
    context.load_dh_params(DHFILE)
    context.set_ciphers("kEDH")
    stats = server_params_test(context, context,
                               chatty=True, connectionchatty=True)
    # stats["cipher"][0] is the cipher *name*; its dash-separated parts
    # must mention one of the DH key-exchange markers.
    cipher = stats["cipher"][0]
    parts = cipher.split("-")
    if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
        # Report the whole cipher name (indexing it again would only
        # show its first character).
        self.fail("Non-DH cipher: " + cipher)
3089
def test_selected_alpn_protocol(self):
    # selected_alpn_protocol() is None unless ALPN is used.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    ctx.load_cert_chain(CERTFILE)
    # No ALPN protocols are configured on the context.
    result = server_params_test(ctx, ctx,
                                chatty=True, connectionchatty=True)
    self.assertIs(result['client_alpn_protocol'], None)
3097
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
def test_selected_alpn_protocol_if_server_uses_alpn(self):
    # selected_alpn_protocol() is None unless ALPN is used by the client.
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    client_ctx.load_verify_locations(CERTFILE)
    # Only the server side advertises ALPN protocols.
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_ctx.load_cert_chain(CERTFILE)
    server_ctx.set_alpn_protocols(['foo', 'bar'])
    result = server_params_test(client_ctx, server_ctx,
                                chatty=True, connectionchatty=True)
    self.assertIs(result['client_alpn_protocol'], None)
3109
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
def test_alpn_protocols(self):
    # For each client protocol list, both ends must report the expected
    # ALPN protocol (None when nothing is expected to be selected).
    server_protocols = ['foo', 'bar', 'milkshake']
    # (protocols offered by the client, expected result)
    cases = [
        (['foo', 'bar'], 'foo'),
        (['bar', 'foo'], 'foo'),
        (['milkshake'], 'milkshake'),
        (['http/3.0', 'http/4.0'], None)
    ]
    template = ("failed trying %s (s) and %s (c).\n"
                "was expecting %s, but got %%s from the %%s")
    for client_protocols, expected in cases:
        server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        server_context.load_cert_chain(CERTFILE)
        server_context.set_alpn_protocols(server_protocols)
        client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        client_context.load_cert_chain(CERTFILE)
        client_context.set_alpn_protocols(client_protocols)
        stats = server_params_test(client_context, server_context,
                                   chatty=True, connectionchatty=True)

        # First formatting pass fills in the protocol lists and leaves
        # two %s placeholders for the actual result and the side.
        msg = template % (str(server_protocols), str(client_protocols),
                          str(expected))
        client_result = stats['client_alpn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))
        # Use the last entry recorded for the server, or a placeholder
        # when nothing was recorded.
        if len(stats['server_alpn_protocols']):
            server_result = stats['server_alpn_protocols'][-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
3138
def test_selected_npn_protocol(self):
    # selected_npn_protocol() is None unless NPN is used
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    ctx.load_cert_chain(CERTFILE)
    # No NPN protocols are configured on the context.
    result = server_params_test(ctx, ctx,
                                chatty=True, connectionchatty=True)
    self.assertIs(result['client_npn_protocol'], None)
3146
@unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
def test_npn_protocols(self):
    # For each client protocol list, both ends must report the expected
    # NPN protocol.
    server_protocols = ['http/1.1', 'spdy/2']
    # (protocols offered by the client, expected result)
    cases = [
        (['http/1.1', 'spdy/2'], 'http/1.1'),
        (['spdy/2', 'http/1.1'], 'http/1.1'),
        (['spdy/2', 'test'], 'spdy/2'),
        (['abc', 'def'], 'abc')
    ]
    template = ("failed trying %s (s) and %s (c).\n"
                "was expecting %s, but got %%s from the %%s")
    for client_protocols, expected in cases:
        server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        server_context.load_cert_chain(CERTFILE)
        server_context.set_npn_protocols(server_protocols)
        client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        client_context.load_cert_chain(CERTFILE)
        client_context.set_npn_protocols(client_protocols)
        stats = server_params_test(client_context, server_context,
                                   chatty=True, connectionchatty=True)

        # First formatting pass fills in the protocol lists and leaves
        # two %s placeholders for the actual result and the side.
        msg = template % (str(server_protocols), str(client_protocols),
                          str(expected))
        client_result = stats['client_npn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))
        # Use the last entry recorded for the server, or a placeholder
        # when nothing was recorded.
        if len(stats['server_npn_protocols']):
            server_result = stats['server_npn_protocols'][-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
3175
def sni_contexts(self):
    """Build the three TLSv1 contexts shared by the SNI tests.

    Returns a ``(server_context, other_context, client_context)``
    tuple: the first two load SIGNED_CERTFILE and SIGNED_CERTFILE2
    respectively, and the client requires a certificate verified
    against SIGNING_CA.
    """
    def new_tls1_context():
        return ssl.SSLContext(ssl.PROTOCOL_TLSv1)

    server_ctx = new_tls1_context()
    server_ctx.load_cert_chain(SIGNED_CERTFILE)

    alternate_ctx = new_tls1_context()
    alternate_ctx.load_cert_chain(SIGNED_CERTFILE2)

    client_ctx = new_tls1_context()
    client_ctx.verify_mode = ssl.CERT_REQUIRED
    client_ctx.load_verify_locations(SIGNING_CA)

    return server_ctx, alternate_ctx, client_ctx
3185
def check_common_name(self, stats, name):
    """Assert that the peer certificate's subject has commonName *name*.

    *stats* must provide the certificate dict under ``'peercert'``.
    """
    subject = stats['peercert']['subject']
    expected_rdn = (('commonName', name),)
    self.assertIn(expected_rdn, subject)
3189
@needs_sni
def test_sni_callback(self):
    # The servername callback sees the requested name and may swap the
    # context (and hence the certificate) used for the connection.
    seen = []
    server_context, other_context, client_context = self.sni_contexts()

    def servername_cb(ssl_sock, server_name, initial_context):
        seen.append((server_name, initial_context))
        if server_name is None:
            return
        ssl_sock.context = other_context
    server_context.set_servername_callback(servername_cb)

    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name='supermessage')
    # The hostname was fetched properly, and the certificate was
    # changed for the connection.
    self.assertEqual(seen, [("supermessage", server_context)])
    # The certificate loaded into other_context was selected
    self.check_common_name(stats, 'fakehostname')

    # Start a fresh record; the callback appends to whatever list the
    # name is bound to when it runs.
    seen = []
    # The callback is called with server_name=None
    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name=None)
    self.assertEqual(seen, [(None, server_context)])
    self.check_common_name(stats, 'localhost')

    # Check disabling the callback
    seen = []
    server_context.set_servername_callback(None)

    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name='notfunny')
    # Certificate didn't change
    self.check_common_name(stats, 'localhost')
    self.assertEqual(seen, [])
3228
@needs_sni
def test_sni_callback_alert(self):
    # Returning a TLS alert is reflected to the connecting client
    server_ctx, _unused_ctx, client_ctx = self.sni_contexts()

    def cb_returning_alert(ssl_sock, server_name, initial_context):
        return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
    server_ctx.set_servername_callback(cb_returning_alert)

    with self.assertRaises(ssl.SSLError) as caught:
        server_params_test(client_ctx, server_ctx,
                           chatty=False,
                           sni_name='supermessage')
    reason = caught.exception.reason
    self.assertEqual(reason, 'TLSV1_ALERT_ACCESS_DENIED')
3243
@needs_sni
def test_sni_callback_raising(self):
    # Raising fails the connection with a TLS handshake failure alert.
    server_ctx, _unused_ctx, client_ctx = self.sni_contexts()

    def cb_raising(ssl_sock, server_name, initial_context):
        1/0
    server_ctx.set_servername_callback(cb_raising)

    # stderr is captured so the error raised inside the callback can be
    # checked below.
    with self.assertRaises(ssl.SSLError) as caught, \
            support.captured_stderr() as captured:
        server_params_test(client_ctx, server_ctx,
                           chatty=False,
                           sni_name='supermessage')
    reason = caught.exception.reason
    self.assertEqual(reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
    self.assertIn("ZeroDivisionError", captured.getvalue())
3260
@needs_sni
def test_sni_callback_wrong_return_type(self):
    # A servername callback that returns a value of the wrong type ends
    # the TLS connection with an internal error alert, and a TypeError
    # is expected to show up on stderr.
    server_context, _other_context, client_context = self.sni_contexts()

    def bogus_result(sock, requested_name, original_context):
        return "foo"

    server_context.set_servername_callback(bogus_result)

    with self.assertRaises(ssl.SSLError) as caught:
        with support.captured_stderr() as err_stream:
            server_params_test(client_context, server_context,
                               chatty=False,
                               sni_name='supermessage')

    self.assertEqual(caught.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
    self.assertIn("TypeError", err_stream.getvalue())
3278
def test_shared_ciphers(self):
    # The client is restricted to RC4 while the server allows AES and RC4,
    # so every entry reported under 'server_shared_ciphers' must name an
    # RC4 cipher.
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    client_context.verify_mode = ssl.CERT_REQUIRED
    client_context.load_verify_locations(SIGNING_CA)
    client_context.set_ciphers("RC4")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_context.load_cert_chain(SIGNED_CERTFILE)
    server_context.set_ciphers("AES:RC4")

    stats = server_params_test(client_context, server_context)
    shared = stats['server_shared_ciphers'][0]
    self.assertGreater(len(shared), 0)
    # Each entry is unpacked as a 3-tuple; only the cipher name is checked.
    for cipher_name, _protocol, _secret_bits in shared:
        name_parts = cipher_name.split("-")
        self.assertIn("RC4", name_parts)
Benjamin Peterson4cb17812015-01-07 11:14:26 -06003292
def test_read_write_after_close_raises_valuerror(self):
    # read() and write() on an SSL socket that has already been closed
    # must raise ValueError.
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(CERTFILE)
    context.load_cert_chain(CERTFILE)
    server = ThreadedEchoServer(context=context, chatty=False)

    with server:
        client_sock = context.wrap_socket(socket.socket())
        client_sock.connect((HOST, server.port))
        client_sock.close()

        with self.assertRaises(ValueError):
            client_sock.read(1024)
        with self.assertRaises(ValueError):
            client_sock.write(b'hello')
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003307
def test_sendfile(self):
    # Push the contents of a temporary file through an SSL socket with
    # sendfile() and check that the data received back from the server
    # equals what was sent.
    TEST_DATA = b"x" * 512
    # Register the cleanup *before* creating the file, so the file is
    # removed even if writing it fails part-way through.
    self.addCleanup(support.unlink, support.TESTFN)
    with open(support.TESTFN, 'wb') as f:
        f.write(TEST_DATA)
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(CERTFILE)
    context.load_cert_chain(CERTFILE)
    server = ThreadedEchoServer(context=context, chatty=False)
    with server:
        with context.wrap_socket(socket.socket()) as s:
            s.connect((HOST, server.port))
            with open(support.TESTFN, 'rb') as file:
                s.sendfile(file)
                self.assertEqual(s.recv(1024), TEST_DATA)
3324
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003325
def test_main(verbose=False):
    """Run the test_ssl test classes through ``support.run_unittest``.

    When ``support.verbose`` is set, a banner describing the OpenSSL build
    and the host platform is printed first.  The certificate files used by
    the tests are then checked for existence and ``support.TestFailed`` is
    raised if one of them is missing.  ``NetworkedTests`` is only run when
    the 'network' resource is enabled, and ``ThreadedTests`` only when
    threading is available.

    The *verbose* argument is accepted but not consulted here; verbosity
    is taken from ``support.verbose``.
    """
    if support.verbose:
        import warnings
        plats = {
            'Linux': platform.linux_distribution,
            'Mac': platform.mac_ver,
            'Windows': platform.win32_ver,
        }
        with warnings.catch_warnings():
            # The message argument is a regular expression, so it is
            # written as a raw string: "\(" is a regex escape, not a
            # valid string escape.
            warnings.filterwarnings(
                'ignore',
                r'dist\(\) and linux_distribution\(\) '
                r'functions are deprecated .*',
                PendingDeprecationWarning,
            )
            # Use the first platform helper that returns a non-empty
            # leading field; otherwise fall back to platform.platform().
            for name, func in plats.items():
                plat = func()
                if plat and plat[0]:
                    plat = '%s %r' % (name, plat)
                    break
            else:
                plat = repr(platform.platform())
        print("test_ssl: testing with %r %r" %
              (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
        print(" under %s" % plat)
        print(" HAS_SNI = %r" % ssl.HAS_SNI)
        print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
        try:
            print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
        except AttributeError:
            # The constant is not defined by every ssl build.
            pass

    # Fail early, with a clear message, if a certificate file is missing.
    for filename in [
            CERTFILE, BYTES_CERTFILE,
            ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
            SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
            BADCERT, BADKEY, EMPTYCERT]:
        if not os.path.exists(filename):
            raise support.TestFailed("Can't read certificate file %r" % filename)

    tests = [
        ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
        SimpleBackgroundTests,
    ]

    if support.is_resource_enabled('network'):
        tests.append(NetworkedTests)

    if _have_threads:
        thread_info = support.threading_setup()
        if thread_info:
            tests.append(ThreadedTests)

    try:
        support.run_unittest(*tests)
    finally:
        # thread_info only exists when threading was set up above.
        if _have_threads:
            support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00003384
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()