blob: 27572c392fe9d41aa23736f7ceb4d6031634201e [file] [log] [blame]
# -*- coding: utf-8 -*-
# Test the support for SSL and sockets
3
4import sys
5import unittest
Benjamin Petersondaeb9252014-08-20 14:14:50 -05006from test import test_support as support
Bill Janssen934b16d2008-06-28 22:19:33 +00007import asyncore
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00008import socket
Bill Janssen934b16d2008-06-28 22:19:33 +00009import select
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000010import time
Benjamin Petersondaeb9252014-08-20 14:14:50 -050011import datetime
Antoine Pitroub558f172010-04-23 23:25:45 +000012import gc
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000013import os
Antoine Pitroub558f172010-04-23 23:25:45 +000014import errno
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000015import pprint
Benjamin Petersondaeb9252014-08-20 14:14:50 -050016import tempfile
Benjamin Petersonfcfb18e2014-11-23 11:42:45 -060017import urllib2
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000018import traceback
Antoine Pitroudfb299b2010-04-23 22:54:59 +000019import weakref
Antoine Pitroud75efd92010-08-04 17:38:33 +000020import platform
Benjamin Petersondaeb9252014-08-20 14:14:50 -050021import functools
22from contextlib import closing
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000023
# Import ssl through the test-support helper rather than a plain import.
# NOTE(review): import_module presumably skips the module when ssl is
# unavailable -- confirm against test_support.
ssl = support.import_module("ssl")

# Sorted entries of ssl._PROTOCOL_NAMES; the per-protocol tests iterate this.
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
HOST = support.HOST
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000028
def data_file(*name):
    """Return the path of a test data file stored next to this module.

    The path components given in *name* are joined onto the directory
    that contains this file.
    """
    here = os.path.dirname(__file__)
    return os.path.join(here, *name)
31
# The custom key and certificate files used in test_ssl are generated
# using Lib/test/make_ssl_certs.py.
# Other certificates are simply fetched from the Internet servers they
# are meant to authenticate.

# Certificate/key files, plus byte-string variants of the same paths
# encoded with the filesystem encoding.
CERTFILE = data_file("keycert.pem")
BYTES_CERTFILE = CERTFILE.encode(sys.getfilesystemencoding())
ONLYCERT = data_file("ssl_cert.pem")
ONLYKEY = data_file("ssl_key.pem")
BYTES_ONLYCERT = ONLYCERT.encode(sys.getfilesystemencoding())
BYTES_ONLYKEY = ONLYKEY.encode(sys.getfilesystemencoding())
CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
KEY_PASSWORD = "somepass"
CAPATH = data_file("capath")
BYTES_CAPATH = CAPATH.encode(sys.getfilesystemencoding())
CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
CAFILE_CACERT = data_file("capath", "5ed36f99.0")


# empty CRL
CRLFILE = data_file("revocation.crl")

# Two keys and certs signed by the same CA (for SNI tests)
SIGNED_CERTFILE = data_file("keycert3.pem")
SIGNED_CERTFILE2 = data_file("keycert4.pem")
SIGNING_CA = data_file("pycacert.pem")

SVN_PYTHON_ORG_ROOT_CERT = data_file("https_svn_python_org_root.pem")

# Deliberately empty, malformed or missing inputs for the error-path tests,
# and real-world certificates used by the parsing tests.
EMPTYCERT = data_file("nullcert.pem")
BADCERT = data_file("badcert.pem")
WRONGCERT = data_file("XXXnonexisting.pem")
BADKEY = data_file("badkey.pem")
NOKIACERT = data_file("nokia.pem")
NULLBYTECERT = data_file("nullbytecert.pem")

DHFILE = data_file("dh1024.pem")
BYTES_DHFILE = DHFILE.encode(sys.getfilesystemencoding())
71
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000072
def handle_error(prefix):
    """Write *prefix* followed by the current exception's traceback to stdout.

    Nothing is written unless the test run is verbose, so the traceback is
    only formatted in that case.
    """
    if not support.verbose:
        return
    exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
    sys.stdout.write(prefix + exc_format)
Neal Norwitz3e533c22007-08-27 01:03:18 +000077
Antoine Pitrou435ba0c2010-04-27 09:51:18 +000078
class BasicTests(unittest.TestCase):
    """Smoke test for the legacy ``ssl.sslwrap_simple`` entry point."""

    def test_sslwrap_simple(self):
        # A crude test for the legacy API: it is called once with a
        # socket.socket object and once with the underlying _sock object.
        for use_raw_sock in (False, True):
            sock = socket.socket(socket.AF_INET)
            try:
                ssl.sslwrap_simple(sock._sock if use_raw_sock else sock)
            except IOError as e:
                # broken pipe when ssl_sock.do_handshake(), this test
                # doesn't care about that
                if e.errno != errno.EPIPE:
                    raise
            finally:
                # Don't leak the unconnected socket into later tests.
                sock.close()
Benjamin Peterson2f334562014-10-01 23:53:01 -040097
98
def can_clear_options():
    """Return True when the OpenSSL API version is 0.9.8m or higher.

    test_options uses this to decide whether option bits may be cleared
    from an SSLContext or whether doing so must raise ValueError.
    """
    # 0.9.8m or higher
    return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
102
def no_sslv2_implies_sslv3_hello():
    """Return True when the linked OpenSSL is version 0.9.7h or higher."""
    # 0.9.7h or higher
    return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
106
def have_verify_flags():
    """Return True when the linked OpenSSL is version 0.9.8 or higher.

    Used as the skip condition for the SSLContext.verify_flags test.
    """
    # 0.9.8 or higher
    return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
110
def utc_offset():  # NOTE: ignore issues like #1647654
    """Return the local UTC offset in seconds.

    local time = utc time + utc offset.  The DST offset (time.altzone) is
    used only when the zone defines DST and it is currently in effect.
    """
    dst_active = time.daylight and time.localtime().tm_isdst > 0
    return -time.altzone if dst_active else -time.timezone
116
def asn1time(cert_time):
    """Return *cert_time* adjusted to what the linked OpenSSL will report.

    On every API version except 0.9.8i the string is returned unchanged.
    On 0.9.8i the seconds field is zeroed and a leading zero in the day
    is replaced by a space.
    """
    # Some versions of OpenSSL ignore seconds, see #18207
    # 0.9.8.i
    if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
        fmt = "%b %d %H:%M:%S %Y GMT"
        parsed = datetime.datetime.strptime(cert_time, fmt)
        cert_time = parsed.replace(second=0).strftime(fmt)
        # %d adds leading zero but ASN1_TIME_print() uses leading space
        if cert_time[4] == "0":
            cert_time = cert_time[:4] + " " + cert_time[5:]

    return cert_time
Neal Norwitz3e533c22007-08-27 01:03:18 +0000130
# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
def skip_if_broken_ubuntu_ssl(func):
    """Decorator that skips *func* on the patched OpenSSL that rejects SSLv2.

    When the ssl module has no PROTOCOL_SSLv2 at all, *func* is returned
    unchanged.  Otherwise each call first tries to build an SSLv2 context;
    if that raises SSLError on the specific OpenSSL version and
    distribution checked below, the test is skipped.  In every other case
    *func* runs normally.
    """
    if not hasattr(ssl, 'PROTOCOL_SSLv2'):
        return func

    @functools.wraps(func)
    def f(*args, **kwargs):
        try:
            ssl.SSLContext(ssl.PROTOCOL_SSLv2)
        except ssl.SSLError:
            if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
                platform.linux_distribution() == ('debian', 'squeeze/sid', '')):
                raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
        return func(*args, **kwargs)
    return f
Antoine Pitroud75efd92010-08-04 17:38:33 +0000146
# Decorator: skip the decorated test unless ssl.HAS_SNI is true.
needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
148
Antoine Pitroud75efd92010-08-04 17:38:33 +0000149
class BasicSocketTests(unittest.TestCase):
    """Tests for ssl module-level helpers and unconnected wrapped sockets."""

    def test_constants(self):
        # Attribute access is the test: a missing constant raises
        # AttributeError.
        ssl.CERT_NONE
        ssl.CERT_OPTIONAL
        ssl.CERT_REQUIRED
        ssl.OP_CIPHER_SERVER_PREFERENCE
        ssl.OP_SINGLE_DH_USE
        if ssl.HAS_ECDH:
            ssl.OP_SINGLE_ECDH_USE
        if ssl.OPENSSL_VERSION_INFO >= (1, 0):
            ssl.OP_NO_COMPRESSION
        self.assertIn(ssl.HAS_SNI, {True, False})
        self.assertIn(ssl.HAS_ECDH, {True, False})

    def test_random(self):
        v = ssl.RAND_status()
        if support.verbose:
            sys.stdout.write("\n RAND_status is %d (%s)\n"
                             % (v, "sufficient randomness" if v else
                                "insufficient randomness"))
        if hasattr(ssl, 'RAND_egd'):
            self.assertRaises(TypeError, ssl.RAND_egd, 1)
            self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
        ssl.RAND_add("this is a random string", 75.0)

    def test_parse_cert(self):
        # note that this uses an 'unofficial' function in _ssl.c,
        # provided solely for this test, to exercise the certificate
        # parsing code
        p = ssl._ssl._test_decode_cert(CERTFILE)
        if support.verbose:
            sys.stdout.write("\n" + pprint.pformat(p) + "\n")
        self.assertEqual(p['issuer'],
                         ((('countryName', 'XY'),),
                          (('localityName', 'Castle Anthrax'),),
                          (('organizationName', 'Python Software Foundation'),),
                          (('commonName', 'localhost'),))
                        )
        # Note the next three asserts will fail if the keys are regenerated
        # NOTE(review): ASN1_TIME_print() pads single-digit days with a
        # space; confirm the spacing inside these date literals.
        self.assertEqual(p['notAfter'], asn1time('Oct 5 23:01:56 2020 GMT'))
        self.assertEqual(p['notBefore'], asn1time('Oct 8 23:01:56 2010 GMT'))
        self.assertEqual(p['serialNumber'], 'D7C7381919AFC24E')
        self.assertEqual(p['subject'],
                         ((('countryName', 'XY'),),
                          (('localityName', 'Castle Anthrax'),),
                          (('organizationName', 'Python Software Foundation'),),
                          (('commonName', 'localhost'),))
                        )
        self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),))
        # Issue #13034: the subjectAltName in some certificates
        # (notably projects.developer.nokia.com:443) wasn't parsed
        p = ssl._ssl._test_decode_cert(NOKIACERT)
        if support.verbose:
            sys.stdout.write("\n" + pprint.pformat(p) + "\n")
        self.assertEqual(p['subjectAltName'],
                         (('DNS', 'projects.developer.nokia.com'),
                          ('DNS', 'projects.forum.nokia.com'))
                        )
        # extra OCSP and AIA fields
        self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
        self.assertEqual(p['caIssuers'],
                         ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
        self.assertEqual(p['crlDistributionPoints'],
                         ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))

    def test_parse_cert_CVE_2013_4238(self):
        # Embedded NUL bytes in certificate fields must be preserved in the
        # decoded values.
        p = ssl._ssl._test_decode_cert(NULLBYTECERT)
        if support.verbose:
            sys.stdout.write("\n" + pprint.pformat(p) + "\n")
        subject = ((('countryName', 'US'),),
                   (('stateOrProvinceName', 'Oregon'),),
                   (('localityName', 'Beaverton'),),
                   (('organizationName', 'Python Software Foundation'),),
                   (('organizationalUnitName', 'Python Core Development'),),
                   (('commonName', 'null.python.org\x00example.org'),),
                   (('emailAddress', 'python-dev@python.org'),))
        self.assertEqual(p['subject'], subject)
        self.assertEqual(p['issuer'], subject)
        if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
            san = (('DNS', 'altnull.python.org\x00example.com'),
                   ('email', 'null@python.org\x00user@example.org'),
                   ('URI', 'http://null.python.org\x00http://example.org'),
                   ('IP Address', '192.0.2.1'),
                   ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
        else:
            # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
            san = (('DNS', 'altnull.python.org\x00example.com'),
                   ('email', 'null@python.org\x00user@example.org'),
                   ('URI', 'http://null.python.org\x00http://example.org'),
                   ('IP Address', '192.0.2.1'),
                   ('IP Address', '<invalid>'))

        self.assertEqual(p['subjectAltName'], san)

    def test_DER_to_PEM(self):
        # PEM -> DER -> PEM -> DER must round-trip to the same DER bytes,
        # and the regenerated PEM must carry the standard header/footer.
        with open(SVN_PYTHON_ORG_ROOT_CERT, 'r') as f:
            pem = f.read()
        d1 = ssl.PEM_cert_to_DER_cert(pem)
        p2 = ssl.DER_cert_to_PEM_cert(d1)
        d2 = ssl.PEM_cert_to_DER_cert(p2)
        self.assertEqual(d1, d2)
        if not p2.startswith(ssl.PEM_HEADER + '\n'):
            self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
        if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
            self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)

    def test_openssl_version(self):
        n = ssl.OPENSSL_VERSION_NUMBER
        t = ssl.OPENSSL_VERSION_INFO
        s = ssl.OPENSSL_VERSION
        self.assertIsInstance(n, (int, long))
        self.assertIsInstance(t, tuple)
        self.assertIsInstance(s, str)
        # Some sanity checks follow
        # >= 0.9
        self.assertGreaterEqual(n, 0x900000)
        # < 3.0
        self.assertLess(n, 0x30000000)
        major, minor, fix, patch, status = t
        self.assertGreaterEqual(major, 0)
        self.assertLess(major, 3)
        self.assertGreaterEqual(minor, 0)
        self.assertLess(minor, 256)
        self.assertGreaterEqual(fix, 0)
        self.assertLess(fix, 256)
        self.assertGreaterEqual(patch, 0)
        self.assertLessEqual(patch, 63)
        self.assertGreaterEqual(status, 0)
        self.assertLessEqual(status, 15)
        # Version string as returned by {Open,Libre}SSL, the format might change
        if "LibreSSL" in s:
            self.assertTrue(s.startswith("LibreSSL {:d}.{:d}".format(major, minor)),
                            (s, t))
        else:
            self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
                            (s, t))

    @support.cpython_only
    def test_refcycle(self):
        # Issue #7943: an SSL object doesn't create reference cycles with
        # itself.
        s = socket.socket(socket.AF_INET)
        ss = ssl.wrap_socket(s)
        wr = weakref.ref(ss)
        del ss
        self.assertIsNone(wr())

    def test_wrapped_unconnected(self):
        # Methods on an unconnected SSLSocket propagate the original
        # socket.error raised by the underlying socket object.
        s = socket.socket(socket.AF_INET)
        with closing(ssl.wrap_socket(s)) as ss:
            self.assertRaises(socket.error, ss.recv, 1)
            self.assertRaises(socket.error, ss.recv_into, bytearray(b'x'))
            self.assertRaises(socket.error, ss.recvfrom, 1)
            self.assertRaises(socket.error, ss.recvfrom_into, bytearray(b'x'), 1)
            self.assertRaises(socket.error, ss.send, b'x')
            self.assertRaises(socket.error, ss.sendto, b'x', ('0.0.0.0', 0))

    def test_timeout(self):
        # Issue #8524: when creating an SSL socket, the timeout of the
        # original socket should be retained.
        for timeout in (None, 0.0, 5.0):
            s = socket.socket(socket.AF_INET)
            s.settimeout(timeout)
            with closing(ssl.wrap_socket(s)) as ss:
                self.assertEqual(timeout, ss.gettimeout())

    def test_errors(self):
        # Invalid argument combinations must raise ValueError; missing
        # certificate/key files must raise IOError with ENOENT.
        sock = socket.socket()
        self.assertRaisesRegexp(ValueError,
                        "certfile must be specified",
                        ssl.wrap_socket, sock, keyfile=CERTFILE)
        self.assertRaisesRegexp(ValueError,
                        "certfile must be specified for server-side operations",
                        ssl.wrap_socket, sock, server_side=True)
        self.assertRaisesRegexp(ValueError,
                        "certfile must be specified for server-side operations",
                        ssl.wrap_socket, sock, server_side=True, certfile="")
        with closing(ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE)) as s:
            self.assertRaisesRegexp(ValueError, "can't connect in server-side mode",
                                    s.connect, (HOST, 8080))
        with self.assertRaises(IOError) as cm:
            with closing(socket.socket()) as sock:
                ssl.wrap_socket(sock, certfile=WRONGCERT)
        self.assertEqual(cm.exception.errno, errno.ENOENT)
        with self.assertRaises(IOError) as cm:
            with closing(socket.socket()) as sock:
                ssl.wrap_socket(sock, certfile=CERTFILE, keyfile=WRONGCERT)
        self.assertEqual(cm.exception.errno, errno.ENOENT)
        with self.assertRaises(IOError) as cm:
            with closing(socket.socket()) as sock:
                ssl.wrap_socket(sock, certfile=WRONGCERT, keyfile=WRONGCERT)
        self.assertEqual(cm.exception.errno, errno.ENOENT)

    def test_match_hostname(self):
        # ok(): the hostname must match without raising.
        # fail(): matching must raise ssl.CertificateError.
        def ok(cert, hostname):
            ssl.match_hostname(cert, hostname)
        def fail(cert, hostname):
            self.assertRaises(ssl.CertificateError,
                              ssl.match_hostname, cert, hostname)

        cert = {'subject': ((('commonName', 'example.com'),),)}
        ok(cert, 'example.com')
        ok(cert, 'ExAmple.cOm')
        fail(cert, 'www.example.com')
        fail(cert, '.example.com')
        fail(cert, 'example.org')
        fail(cert, 'exampleXcom')

        cert = {'subject': ((('commonName', '*.a.com'),),)}
        ok(cert, 'foo.a.com')
        fail(cert, 'bar.foo.a.com')
        fail(cert, 'a.com')
        fail(cert, 'Xa.com')
        fail(cert, '.a.com')

        # only match one left-most wildcard
        cert = {'subject': ((('commonName', 'f*.com'),),)}
        ok(cert, 'foo.com')
        ok(cert, 'f.com')
        fail(cert, 'bar.com')
        fail(cert, 'foo.a.com')
        fail(cert, 'bar.foo.com')

        # NULL bytes are bad, CVE-2013-4073
        cert = {'subject': ((('commonName',
                              'null.python.org\x00example.org'),),)}
        ok(cert, 'null.python.org\x00example.org') # or raise an error?
        fail(cert, 'example.org')
        fail(cert, 'null.python.org')

        # error cases with wildcards
        cert = {'subject': ((('commonName', '*.*.a.com'),),)}
        fail(cert, 'bar.foo.a.com')
        fail(cert, 'a.com')
        fail(cert, 'Xa.com')
        fail(cert, '.a.com')

        cert = {'subject': ((('commonName', 'a.*.com'),),)}
        fail(cert, 'a.foo.com')
        fail(cert, 'a..com')
        fail(cert, 'a.com')

        # wildcard doesn't match IDNA prefix 'xn--'
        idna = u'püthon.python.org'.encode("idna").decode("ascii")
        cert = {'subject': ((('commonName', idna),),)}
        ok(cert, idna)
        cert = {'subject': ((('commonName', 'x*.python.org'),),)}
        fail(cert, idna)
        cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
        fail(cert, idna)

        # wildcard in first fragment and IDNA A-labels in sequent fragments
        # are supported.
        idna = u'www*.pythön.org'.encode("idna").decode("ascii")
        cert = {'subject': ((('commonName', idna),),)}
        ok(cert, u'www.pythön.org'.encode("idna").decode("ascii"))
        ok(cert, u'www1.pythön.org'.encode("idna").decode("ascii"))
        fail(cert, u'ftp.pythön.org'.encode("idna").decode("ascii"))
        fail(cert, u'pythön.org'.encode("idna").decode("ascii"))

        # Slightly fake real-world example
        cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
                'subject': ((('commonName', 'linuxfrz.org'),),),
                'subjectAltName': (('DNS', 'linuxfr.org'),
                                   ('DNS', 'linuxfr.com'),
                                   ('othername', '<unsupported>'))}
        ok(cert, 'linuxfr.org')
        ok(cert, 'linuxfr.com')
        # Not a "DNS" entry
        fail(cert, '<unsupported>')
        # When there is a subjectAltName, commonName isn't used
        fail(cert, 'linuxfrz.org')

        # A pristine real-world example
        cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
                'subject': ((('countryName', 'US'),),
                            (('stateOrProvinceName', 'California'),),
                            (('localityName', 'Mountain View'),),
                            (('organizationName', 'Google Inc'),),
                            (('commonName', 'mail.google.com'),))}
        ok(cert, 'mail.google.com')
        fail(cert, 'gmail.com')
        # Only commonName is considered
        fail(cert, 'California')

        # Neither commonName nor subjectAltName
        cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
                'subject': ((('countryName', 'US'),),
                            (('stateOrProvinceName', 'California'),),
                            (('localityName', 'Mountain View'),),
                            (('organizationName', 'Google Inc'),))}
        fail(cert, 'mail.google.com')

        # No DNS entry in subjectAltName but a commonName
        cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
                'subject': ((('countryName', 'US'),),
                            (('stateOrProvinceName', 'California'),),
                            (('localityName', 'Mountain View'),),
                            (('commonName', 'mail.google.com'),)),
                'subjectAltName': (('othername', 'blabla'), )}
        ok(cert, 'mail.google.com')

        # No DNS entry subjectAltName and no commonName
        cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
                'subject': ((('countryName', 'US'),),
                            (('stateOrProvinceName', 'California'),),
                            (('localityName', 'Mountain View'),),
                            (('organizationName', 'Google Inc'),)),
                'subjectAltName': (('othername', 'blabla'),)}
        fail(cert, 'google.com')

        # Empty cert / no cert
        self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
        self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')

        # Issue #17980: avoid denials of service by refusing more than one
        # wildcard per fragment.
        cert = {'subject': ((('commonName', 'a*b.com'),),)}
        ok(cert, 'axxb.com')
        cert = {'subject': ((('commonName', 'a*b.co*'),),)}
        fail(cert, 'axxb.com')
        cert = {'subject': ((('commonName', 'a*b*.com'),),)}
        with self.assertRaises(ssl.CertificateError) as cm:
            ssl.match_hostname(cert, 'axxbxxc.com')
        self.assertIn("too many wildcards", str(cm.exception))

    def test_server_side(self):
        # server_hostname doesn't work for server sockets
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        with closing(socket.socket()) as sock:
            self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
                              server_hostname="some.hostname")

    def test_unknown_channel_binding(self):
        # should raise ValueError for unknown type
        s = socket.socket(socket.AF_INET)
        with closing(ssl.wrap_socket(s)) as ss:
            with self.assertRaises(ValueError):
                ss.get_channel_binding("unknown-type")

    @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                         "'tls-unique' channel binding not available")
    def test_tls_unique_channel_binding(self):
        # unconnected should return None for known type
        s = socket.socket(socket.AF_INET)
        with closing(ssl.wrap_socket(s)) as ss:
            self.assertIsNone(ss.get_channel_binding("tls-unique"))
        # the same for server-side
        s = socket.socket(socket.AF_INET)
        with closing(ssl.wrap_socket(s, server_side=True, certfile=CERTFILE)) as ss:
            self.assertIsNone(ss.get_channel_binding("tls-unique"))

    def test_get_default_verify_paths(self):
        paths = ssl.get_default_verify_paths()
        self.assertEqual(len(paths), 6)
        self.assertIsInstance(paths, ssl.DefaultVerifyPaths)

        # The SSL_CERT_DIR / SSL_CERT_FILE environment variables must be
        # reflected in the reported paths.
        with support.EnvironmentVarGuard() as env:
            env["SSL_CERT_DIR"] = CAPATH
            env["SSL_CERT_FILE"] = CERTFILE
            paths = ssl.get_default_verify_paths()
            self.assertEqual(paths.cafile, CERTFILE)
            self.assertEqual(paths.capath, CAPATH)

    @unittest.skipUnless(sys.platform == "win32", "Windows specific")
    def test_enum_certificates(self):
        self.assertTrue(ssl.enum_certificates("CA"))
        self.assertTrue(ssl.enum_certificates("ROOT"))

        self.assertRaises(TypeError, ssl.enum_certificates)
        self.assertRaises(WindowsError, ssl.enum_certificates, "")

        # Collect every trust OID seen across both stores.
        trust_oids = set()
        for storename in ("CA", "ROOT"):
            store = ssl.enum_certificates(storename)
            self.assertIsInstance(store, list)
            for element in store:
                self.assertIsInstance(element, tuple)
                self.assertEqual(len(element), 3)
                cert, enc, trust = element
                self.assertIsInstance(cert, bytes)
                self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
                self.assertIsInstance(trust, (set, bool))
                if isinstance(trust, set):
                    trust_oids.update(trust)

        serverAuth = "1.3.6.1.5.5.7.3.1"
        self.assertIn(serverAuth, trust_oids)

    @unittest.skipUnless(sys.platform == "win32", "Windows specific")
    def test_enum_crls(self):
        self.assertTrue(ssl.enum_crls("CA"))
        self.assertRaises(TypeError, ssl.enum_crls)
        self.assertRaises(WindowsError, ssl.enum_crls, "")

        crls = ssl.enum_crls("CA")
        self.assertIsInstance(crls, list)
        for element in crls:
            self.assertIsInstance(element, tuple)
            self.assertEqual(len(element), 2)
            self.assertIsInstance(element[0], bytes)
            self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})

    def test_asn1object(self):
        expected = (129, 'serverAuth', 'TLS Web Server Authentication',
                    '1.3.6.1.5.5.7.3.1')

        # Lookup by dotted OID (the constructor rejects short names).
        val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
        self.assertEqual(val, expected)
        self.assertEqual(val.nid, 129)
        self.assertEqual(val.shortname, 'serverAuth')
        self.assertEqual(val.longname, 'TLS Web Server Authentication')
        self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
        self.assertIsInstance(val, ssl._ASN1Object)
        self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')

        # Lookup by numeric NID.
        val = ssl._ASN1Object.fromnid(129)
        self.assertEqual(val, expected)
        self.assertIsInstance(val, ssl._ASN1Object)
        self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
        with self.assertRaisesRegexp(ValueError, "unknown NID 100000"):
            ssl._ASN1Object.fromnid(100000)
        for i in range(1000):
            try:
                obj = ssl._ASN1Object.fromnid(i)
            except ValueError:
                pass
            else:
                self.assertIsInstance(obj.nid, int)
                self.assertIsInstance(obj.shortname, str)
                self.assertIsInstance(obj.longname, str)
                self.assertIsInstance(obj.oid, (str, type(None)))

        # Lookup by long name, short name or OID string.
        val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
        self.assertEqual(val, expected)
        self.assertIsInstance(val, ssl._ASN1Object)
        self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
        self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
                         expected)
        with self.assertRaisesRegexp(ValueError, "unknown object 'serverauth'"):
            ssl._ASN1Object.fromname('serverauth')

    def test_purpose_enum(self):
        val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
        self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
        self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
        self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
        self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
        self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
                         '1.3.6.1.5.5.7.3.1')

        val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
        self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
        self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
        self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
        self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
        self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
                         '1.3.6.1.5.5.7.3.2')

    def test_unsupported_dtls(self):
        # Wrapping a datagram socket must be refused, both through the
        # module-level function and through an SSLContext.
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.addCleanup(s.close)
        with self.assertRaises(NotImplementedError) as cx:
            ssl.wrap_socket(s, cert_reqs=ssl.CERT_NONE)
        self.assertEqual(str(cx.exception), "only stream sockets are supported")
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        with self.assertRaises(NotImplementedError) as cx:
            ctx.wrap_socket(s)
        self.assertEqual(str(cx.exception), "only stream sockets are supported")

    def cert_time_ok(self, timestring, timestamp):
        # Helper: *timestring* must convert to exactly *timestamp*.
        self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)

    def cert_time_fail(self, timestring):
        # Helper: *timestring* must be rejected with ValueError.
        with self.assertRaises(ValueError):
            ssl.cert_time_to_seconds(timestring)

    @unittest.skipUnless(utc_offset(),
                         'local time needs to be different from UTC')
    def test_cert_time_to_seconds_timezone(self):
        # Issue #19940: ssl.cert_time_to_seconds() returns wrong
        #               results if local timezone is not UTC
        self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0)
        self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0)

    def test_cert_time_to_seconds(self):
        timestring = "Jan 5 09:34:43 2018 GMT"
        ts = 1515144883.0
        self.cert_time_ok(timestring, ts)
        # accept keyword parameter, assert its name
        self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
        # accept both %e and %d (space or zero generated by strftime)
        self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
        # case-insensitive
        self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
        self.cert_time_fail("Jan 5 09:34 2018 GMT")     # no seconds
        self.cert_time_fail("Jan 5 09:34:43 2018")      # no GMT
        self.cert_time_fail("Jan 5 09:34:43 2018 UTC")  # not GMT timezone
        self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
        self.cert_time_fail("Jon 5 09:34:43 2018 GMT")  # invalid month
        self.cert_time_fail("Jan 5 24:00:00 2018 GMT")  # invalid hour
        self.cert_time_fail("Jan 5 09:60:43 2018 GMT")  # invalid minute

        newyear_ts = 1230768000.0
        # leap seconds
        self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
        # same timestamp
        self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)

        self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
        #  allow 60th second (even if it is not a leap second)
        self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
        #  allow 2nd leap second for compatibility with time.strptime()
        self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
        self.cert_time_fail("Jan 5 09:34:62 2018 GMT")  # invalid seconds

        # no special treatment for the special value:
        #   99991231235959Z (rfc 5280)
        self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)

    @support.run_with_locale('LC_ALL', '')
    def test_cert_time_to_seconds_locale(self):
        # `cert_time_to_seconds()` should be locale independent

        def local_february_name():
            return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))

        if local_february_name().lower() == 'feb':
            self.skipTest("locale-specific month name needs to be "
                          "different from C locale")

        # locale-independent
        self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
        self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
689
690
class ContextTests(unittest.TestCase):
    # Tests for ssl.SSLContext: construction, option and verification
    # attributes, loading of certificates / keys / DH parameters, and the
    # context factory helpers.

    @skip_if_broken_ubuntu_ssl
    def test_constructor(self):
        # Every known protocol constant must yield a context.
        for proto in PROTOCOLS:
            ssl.SSLContext(proto)
        # The protocol argument is mandatory ...
        with self.assertRaises(TypeError):
            ssl.SSLContext()
        # ... and unknown protocol numbers are rejected.
        for bogus in (-1, 42):
            with self.assertRaises(ValueError):
                ssl.SSLContext(bogus)

    @skip_if_broken_ubuntu_ssl
    def test_protocol(self):
        # The `protocol` attribute reports what the constructor was given.
        for wanted in PROTOCOLS:
            context = ssl.SSLContext(wanted)
            self.assertEqual(context.protocol, wanted)

    def test_ciphers(self):
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        for cipher_spec in ("ALL", "DEFAULT"):
            context.set_ciphers(cipher_spec)
        # A cipher string that selects nothing is an error.
        self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected",
                                context.set_ciphers, "^$:,;?*'dorothyx")

    @skip_if_broken_ubuntu_ssl
    def test_options(self):
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
        default_options = ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3
        self.assertEqual(default_options, context.options)
        context.options |= ssl.OP_NO_TLSv1
        self.assertEqual(default_options | ssl.OP_NO_TLSv1, context.options)
        if not can_clear_options():
            # Option bits cannot be cleared here, so resetting must fail.
            with self.assertRaises(ValueError):
                context.options = 0
            return
        context.options = ((context.options & ~ssl.OP_NO_SSLv2)
                           | ssl.OP_NO_TLSv1)
        self.assertEqual(ssl.OP_ALL | ssl.OP_NO_TLSv1 | ssl.OP_NO_SSLv3,
                         context.options)
        context.options = 0
        self.assertEqual(0, context.options)

    def test_verify_mode(self):
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # Default value
        self.assertEqual(context.verify_mode, ssl.CERT_NONE)
        # Each valid mode can be assigned and read back.
        for mode in (ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED, ssl.CERT_NONE):
            context.verify_mode = mode
            self.assertEqual(context.verify_mode, mode)
        # Wrong type, then wrong value.
        self.assertRaises(TypeError, setattr, context, "verify_mode", None)
        self.assertRaises(ValueError, setattr, context, "verify_mode", 42)

    @unittest.skipUnless(have_verify_flags(),
                         "verify_flags need OpenSSL > 0.9.8")
    def test_verify_flags(self):
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # default value; VERIFY_X509_TRUSTED_FIRST only contributes when
        # the ssl module defines it
        trusted_first = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
        self.assertEqual(context.verify_flags,
                         ssl.VERIFY_DEFAULT | trusted_first)
        for flags in (ssl.VERIFY_CRL_CHECK_LEAF,
                      ssl.VERIFY_CRL_CHECK_CHAIN,
                      ssl.VERIFY_DEFAULT):
            context.verify_flags = flags
            self.assertEqual(context.verify_flags, flags)
        # supports any value
        strict_leaf = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
        context.verify_flags = strict_leaf
        self.assertEqual(context.verify_flags, strict_leaf)
        with self.assertRaises(TypeError):
            context.verify_flags = None

    def test_load_cert_chain(self):
        # (exception, regexp) pair shared by all the malformed-PEM checks.
        pem_error = (ssl.SSLError, "PEM lib")

        def password_forms():
            # The key password as text, as bytes and as a new bytearray.
            return (KEY_PASSWORD,
                    KEY_PASSWORD.encode(),
                    bytearray(KEY_PASSWORD.encode()))

        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # Combined key and cert in a single file
        for key_path in (None, CERTFILE):
            context.load_cert_chain(CERTFILE, keyfile=key_path)
        with self.assertRaises(TypeError):
            context.load_cert_chain(keyfile=CERTFILE)
        with self.assertRaises(IOError) as caught:
            context.load_cert_chain(WRONGCERT)
        self.assertEqual(caught.exception.errno, errno.ENOENT)
        for broken in (BADCERT, EMPTYCERT):
            with self.assertRaisesRegexp(*pem_error):
                context.load_cert_chain(broken)
        # Separate key and cert
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.load_cert_chain(ONLYCERT, ONLYKEY)
        context.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
        context.load_cert_chain(certfile=BYTES_ONLYCERT,
                                keyfile=BYTES_ONLYKEY)
        for lone_file in (ONLYCERT, ONLYKEY):
            with self.assertRaisesRegexp(*pem_error):
                context.load_cert_chain(lone_file)
        with self.assertRaisesRegexp(*pem_error):
            context.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
        # Mismatching key and cert
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        with self.assertRaisesRegexp(ssl.SSLError, "key values mismatch"):
            context.load_cert_chain(SVN_PYTHON_ORG_ROOT_CERT, ONLYKEY)
        # Password protected key and cert
        for secret in password_forms():
            context.load_cert_chain(CERTFILE_PROTECTED, password=secret)
        for secret in password_forms():
            context.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, secret)
        with self.assertRaisesRegexp(TypeError, "should be a string"):
            context.load_cert_chain(CERTFILE_PROTECTED, password=True)
        with self.assertRaises(ssl.SSLError):
            context.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
        with self.assertRaisesRegexp(ValueError, "cannot be longer"):
            # openssl has a fixed limit on the password buffer.
            # PEM_BUFSIZE is generally set to 1kb.
            # Pass a string larger than this.
            context.load_cert_chain(CERTFILE_PROTECTED,
                                    password=b'a' * 102400)

        # Password callback
        def getpass_unicode():
            return KEY_PASSWORD

        def getpass_bytes():
            return KEY_PASSWORD.encode()

        def getpass_bytearray():
            return bytearray(KEY_PASSWORD.encode())

        def getpass_badpass():
            return "badpass"

        def getpass_huge():
            return b'a' * (1024 * 1024)

        def getpass_bad_type():
            return 9

        def getpass_exception():
            raise Exception('getpass error')

        class GetPassCallable:
            def __call__(self):
                return KEY_PASSWORD

            def getpass(self):
                return KEY_PASSWORD

        # Plain functions, a callable instance and a bound method are all
        # accepted as password providers.
        for provider in (getpass_unicode, getpass_bytes, getpass_bytearray,
                         GetPassCallable(), GetPassCallable().getpass):
            context.load_cert_chain(CERTFILE_PROTECTED, password=provider)
        with self.assertRaises(ssl.SSLError):
            context.load_cert_chain(CERTFILE_PROTECTED,
                                    password=getpass_badpass)
        with self.assertRaisesRegexp(ValueError, "cannot be longer"):
            context.load_cert_chain(CERTFILE_PROTECTED,
                                    password=getpass_huge)
        with self.assertRaisesRegexp(TypeError, "must return a string"):
            context.load_cert_chain(CERTFILE_PROTECTED,
                                    password=getpass_bad_type)
        with self.assertRaisesRegexp(Exception, "getpass error"):
            context.load_cert_chain(CERTFILE_PROTECTED,
                                    password=getpass_exception)
        # Make sure the password function isn't called if it isn't needed
        context.load_cert_chain(CERTFILE, password=getpass_exception)

    def test_load_verify_locations(self):
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        load = context.load_verify_locations
        # Positional / keyword and native / bytes / unicode spellings of
        # the CA file argument.
        load(CERTFILE)
        load(cafile=CERTFILE, capath=None)
        load(BYTES_CERTFILE)
        load(cafile=BYTES_CERTFILE, capath=None)
        load(cafile=BYTES_CERTFILE.decode('utf-8'))
        # At least one usable argument is required.
        with self.assertRaises(TypeError):
            load()
        with self.assertRaises(TypeError):
            load(None, None, None)
        with self.assertRaises(IOError) as caught:
            load(WRONGCERT)
        self.assertEqual(caught.exception.errno, errno.ENOENT)
        self.assertRaises(IOError, load, u'')
        self.assertRaisesRegexp(ssl.SSLError, "PEM lib", load, BADCERT)
        load(CERTFILE, CAPATH)
        load(CERTFILE, capath=BYTES_CAPATH)

        # Issue #10989: crash if the second argument type is invalid
        with self.assertRaises(TypeError):
            load(None, True)

    def test_load_verify_cadata(self):
        # test cadata
        def read_pem_and_der(path):
            # Return the certificate at `path` as (PEM text, DER bytes).
            with open(path) as pem_file:
                pem_text = pem_file.read().decode("ascii")
            return pem_text, ssl.PEM_cert_to_DER_cert(pem_text)

        def assert_ca_count(context, expected):
            self.assertEqual(context.cert_store_stats()["x509_ca"], expected)

        cacert_pem, cacert_der = read_pem_and_der(CAFILE_CACERT)
        neuronio_pem, neuronio_der = read_pem_and_der(CAFILE_NEURONIO)

        # test PEM
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        assert_ca_count(context, 0)
        context.load_verify_locations(cadata=cacert_pem)
        assert_ca_count(context, 1)
        context.load_verify_locations(cadata=neuronio_pem)
        assert_ca_count(context, 2)
        # cert already in hash table
        context.load_verify_locations(cadata=neuronio_pem)
        assert_ca_count(context, 2)

        # combined
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.load_verify_locations(
            cadata="\n".join((cacert_pem, neuronio_pem)))
        assert_ca_count(context, 2)

        # with junk around the certs
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        noisy_parts = ["head", cacert_pem, "other", neuronio_pem, "again",
                       neuronio_pem, "tail"]
        context.load_verify_locations(cadata="\n".join(noisy_parts))
        assert_ca_count(context, 2)

        # test DER
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.load_verify_locations(cadata=cacert_der)
        context.load_verify_locations(cadata=neuronio_der)
        assert_ca_count(context, 2)
        # cert already in hash table
        context.load_verify_locations(cadata=cacert_der)
        assert_ca_count(context, 2)

        # combined
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.load_verify_locations(
            cadata=b"".join((cacert_der, neuronio_der)))
        assert_ca_count(context, 2)

        # error cases
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        with self.assertRaises(TypeError):
            context.load_verify_locations(cadata=object)

        # Text input is expected to fail with a PEM-style message, bytes
        # input with a DER-style one.
        for broken, message in ((u"broken", "no start line"),
                                (b"broken", "not enough data")):
            with self.assertRaisesRegexp(ssl.SSLError, message):
                context.load_verify_locations(cadata=broken)

    def test_load_dh_params(self):
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.load_dh_params(DHFILE)
        # The bytes path is not exercised on Windows.
        if os.name != 'nt':
            context.load_dh_params(BYTES_DHFILE)
        with self.assertRaises(TypeError):
            context.load_dh_params()
        with self.assertRaises(TypeError):
            context.load_dh_params(None)
        with self.assertRaises(IOError) as caught:
            context.load_dh_params(WRONGCERT)
        self.assertEqual(caught.exception.errno, errno.ENOENT)
        # A certificate file is not a DH parameter file.
        self.assertRaises(ssl.SSLError, context.load_dh_params, CERTFILE)

    @skip_if_broken_ubuntu_ssl
    def test_session_stats(self):
        counter_names = ('number',
                         'connect', 'connect_good', 'connect_renegotiate',
                         'accept', 'accept_good', 'accept_renegotiate',
                         'hits', 'misses', 'timeouts', 'cache_full')
        # A brand-new context reports zero for every session counter.
        all_zero = dict.fromkeys(counter_names, 0)
        for proto in PROTOCOLS:
            context = ssl.SSLContext(proto)
            self.assertEqual(context.session_stats(), all_zero)

    def test_set_default_verify_paths(self):
        # There's not much we can do to test that it acts as expected,
        # so just check it doesn't crash or raise an exception.
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.set_default_verify_paths()

    @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
    def test_set_ecdh_curve(self):
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # A known curve name is accepted as text and as bytes.
        for curve in ("prime256v1", b"prime256v1"):
            context.set_ecdh_curve(curve)
        with self.assertRaises(TypeError):
            context.set_ecdh_curve()
        with self.assertRaises(TypeError):
            context.set_ecdh_curve(None)
        # Unknown curve names are rejected in both spellings.
        for unknown in ("foo", b"foo"):
            with self.assertRaises(ValueError):
                context.set_ecdh_curve(unknown)

    @needs_sni
    def test_sni_callback(self):
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)

        # set_servername_callback expects a callable, or None
        with self.assertRaises(TypeError):
            context.set_servername_callback()
        for not_callable in (4, "", context):
            with self.assertRaises(TypeError):
                context.set_servername_callback(not_callable)

        def dummycallback(sock, servername, ctx):
            pass

        for accepted in (None, dummycallback):
            context.set_servername_callback(accepted)

    @needs_sni
    def test_sni_callback_refcycle(self):
        # Reference cycles through the servername callback are detected
        # and cleared.
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)

        # The default argument makes the callback reference the context,
        # which is then handed the callback below.
        def servername_cb(sock, servername, ctx, cycle=context):
            pass

        context.set_servername_callback(servername_cb)
        context_ref = weakref.ref(context)
        # Drop every local reference before collecting.
        del context, servername_cb
        gc.collect()
        self.assertIs(context_ref(), None)

    def test_cert_store_stats(self):
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        empty_store = {'x509_ca': 0, 'crl': 0, 'x509': 0}
        self.assertEqual(context.cert_store_stats(), empty_store)
        # Loading our own chain leaves the store counters unchanged.
        context.load_cert_chain(CERTFILE)
        self.assertEqual(context.cert_store_stats(), empty_store)
        context.load_verify_locations(CERTFILE)
        self.assertEqual(context.cert_store_stats(),
                         {'x509_ca': 0, 'crl': 0, 'x509': 1})
        context.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
        self.assertEqual(context.cert_store_stats(),
                         {'x509_ca': 1, 'crl': 0, 'x509': 2})

    def test_get_ca_certs(self):
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertEqual(context.get_ca_certs(), [])
        # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
        context.load_verify_locations(CERTFILE)
        self.assertEqual(context.get_ca_certs(), [])
        # but SVN_PYTHON_ORG_ROOT_CERT is a CA cert
        context.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
        decoded_certs = context.get_ca_certs()
        # Issuer and subject are expected to be the same name.
        cacert_name = (
            (('organizationName', 'Root CA'),),
            (('organizationalUnitName', 'http://www.cacert.org'),),
            (('commonName', 'CA Cert Signing Authority'),),
            (('emailAddress', 'support@cacert.org'),),
        )
        expected_cert = {
            'issuer': cacert_name,
            'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
            'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
            'serialNumber': '00',
            'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
            'subject': cacert_name,
            'version': 3,
        }
        self.assertEqual(decoded_certs, [expected_cert])

        # With a true argument the same certificate comes back as DER.
        with open(SVN_PYTHON_ORG_ROOT_CERT) as pem_file:
            root_pem = pem_file.read()
        root_der = ssl.PEM_cert_to_DER_cert(root_pem)
        self.assertEqual(context.get_ca_certs(True), [root_der])

    def test_load_default_certs(self):
        # No purpose given.
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.load_default_certs()

        # Explicit server purpose, followed by a second load.
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.load_default_certs(ssl.Purpose.SERVER_AUTH)
        context.load_default_certs()

        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.load_default_certs(ssl.Purpose.CLIENT_AUTH)

        # Anything that is not a Purpose member is rejected.
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        for not_a_purpose in (None, 'SERVER_AUTH'):
            with self.assertRaises(TypeError):
                context.load_default_certs(not_a_purpose)

    @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
    def test_load_default_certs_env(self):
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        with support.EnvironmentVarGuard() as environ:
            environ["SSL_CERT_DIR"] = CAPATH
            environ["SSL_CERT_FILE"] = CERTFILE
            context.load_default_certs()
            # Exactly one (non-CA) certificate is expected in the store.
            self.assertEqual(context.cert_store_stats(),
                             {"crl": 0, "x509": 1, "x509_ca": 0})

    @unittest.skipUnless(sys.platform == "win32", "Windows specific")
    def test_load_default_certs_env_windows(self):
        # Baseline: store counters after a default load without the
        # environment overrides.
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.load_default_certs()
        expected_stats = context.cert_store_stats()

        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        with support.EnvironmentVarGuard() as environ:
            environ["SSL_CERT_DIR"] = CAPATH
            environ["SSL_CERT_FILE"] = CERTFILE
            context.load_default_certs()
            # One more certificate than the baseline is expected.
            expected_stats["x509"] += 1
            self.assertEqual(context.cert_store_stats(), expected_stats)

    def test_create_default_context(self):
        def assert_flag_set(context, flag):
            # `flag` may be 0 when the ssl module lacks the option; the
            # check then passes trivially.
            self.assertEqual(context.options & flag, flag)

        no_compression = getattr(ssl, "OP_NO_COMPRESSION", 0)

        context = ssl.create_default_context()
        self.assertEqual(context.protocol, ssl.PROTOCOL_SSLv23)
        self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
        self.assertTrue(context.check_hostname)
        assert_flag_set(context, ssl.OP_NO_SSLv2)
        assert_flag_set(context, no_compression)

        # Explicit CA file, CA directory and in-memory CA data together.
        with open(SIGNING_CA) as ca_file:
            cadata = ca_file.read().decode("ascii")
        context = ssl.create_default_context(cafile=SIGNING_CA,
                                             capath=CAPATH,
                                             cadata=cadata)
        self.assertEqual(context.protocol, ssl.PROTOCOL_SSLv23)
        self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
        assert_flag_set(context, ssl.OP_NO_SSLv2)
        assert_flag_set(context, no_compression)

        # Context created for the CLIENT_AUTH purpose.
        context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        self.assertEqual(context.protocol, ssl.PROTOCOL_SSLv23)
        self.assertEqual(context.verify_mode, ssl.CERT_NONE)
        assert_flag_set(context, ssl.OP_NO_SSLv2)
        assert_flag_set(context, no_compression)
        assert_flag_set(context, getattr(ssl, "OP_SINGLE_DH_USE", 0))
        assert_flag_set(context, getattr(ssl, "OP_SINGLE_ECDH_USE", 0))

    def test__create_stdlib_context(self):
        def assert_sslv2_disabled(context):
            self.assertEqual(context.options & ssl.OP_NO_SSLv2,
                             ssl.OP_NO_SSLv2)

        # Defaults.
        context = ssl._create_stdlib_context()
        self.assertEqual(context.protocol, ssl.PROTOCOL_SSLv23)
        self.assertEqual(context.verify_mode, ssl.CERT_NONE)
        self.assertFalse(context.check_hostname)
        assert_sslv2_disabled(context)

        # Explicit protocol.
        context = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
        self.assertEqual(context.protocol, ssl.PROTOCOL_TLSv1)
        self.assertEqual(context.verify_mode, ssl.CERT_NONE)
        assert_sslv2_disabled(context)

        # Explicit protocol plus verification settings.
        context = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
                                             cert_reqs=ssl.CERT_REQUIRED,
                                             check_hostname=True)
        self.assertEqual(context.protocol, ssl.PROTOCOL_TLSv1)
        self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
        self.assertTrue(context.check_hostname)
        assert_sslv2_disabled(context)

        # Explicit purpose.
        context = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
        self.assertEqual(context.protocol, ssl.PROTOCOL_SSLv23)
        self.assertEqual(context.verify_mode, ssl.CERT_NONE)
        assert_sslv2_disabled(context)

    def test_check_hostname(self):
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertFalse(context.check_hostname)

        # Requires CERT_REQUIRED or CERT_OPTIONAL
        self.assertRaises(ValueError,
                          setattr, context, "check_hostname", True)
        context.verify_mode = ssl.CERT_REQUIRED
        self.assertFalse(context.check_hostname)
        context.check_hostname = True
        self.assertTrue(context.check_hostname)

        context.verify_mode = ssl.CERT_OPTIONAL
        context.check_hostname = True
        self.assertTrue(context.check_hostname)

        # Cannot set CERT_NONE with check_hostname enabled
        self.assertRaises(ValueError,
                          setattr, context, "verify_mode", ssl.CERT_NONE)
        context.check_hostname = False
        self.assertFalse(context.check_hostname)
1172
1173
class SSLErrorTests(unittest.TestCase):
    # Tests for the ssl.SSLError exception family.

    def test_str(self):
        # The str() of a SSLError doesn't include the errno; the same must
        # hold for a subclass.
        for error_class in (ssl.SSLError, ssl.SSLZeroReturnError):
            error = error_class(1, "foo")
            self.assertEqual(str(error), "foo")
            self.assertEqual(error.errno, 1)

    def test_lib_reason(self):
        # Test the library and reason attributes
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # Loading a certificate file as DH parameters triggers the error.
        with self.assertRaises(ssl.SSLError) as caught:
            context.load_dh_params(CERTFILE)
        error = caught.exception
        self.assertEqual(error.library, 'PEM')
        self.assertEqual(error.reason, 'NO_START_LINE')
        message = str(error)
        self.assertTrue(
            message.startswith("[PEM: NO_START_LINE] no start line"),
            message)

    def test_subclass(self):
        # Check that the appropriate SSLError subclass is raised
        # (this only tests one of them)
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        with closing(socket.socket()) as listener:
            listener.bind(("127.0.0.1", 0))
            listener.listen(5)
            plain_client = socket.socket()
            plain_client.connect(listener.getsockname())
            plain_client.setblocking(False)
            # The listener never accepts or answers, so a non-blocking
            # handshake has nothing to read.
            tls_client = context.wrap_socket(plain_client, False,
                                             do_handshake_on_connect=False)
            with closing(tls_client):
                with self.assertRaises(ssl.SSLWantReadError) as caught:
                    tls_client.do_handshake()
                message = str(caught.exception)
                self.assertTrue(
                    message.startswith(
                        "The operation did not complete (read)"),
                    message)
                # For compatibility
                self.assertEqual(caught.exception.errno,
                                 ssl.SSL_ERROR_WANT_READ)
Antoine Pitrou63cc99d2013-12-28 17:26:33 +01001213
Antoine Pitrouf9de5342010-04-05 21:35:07 +00001214
Bill Janssen934b16d2008-06-28 22:19:33 +00001215class NetworkedTests(unittest.TestCase):
Bill Janssen296a59d2007-09-16 22:06:00 +00001216
def test_connect(self):
    # Connect to svn.python.org with each certificate-requirement level.
    # Every socket is closed in a `finally` clause so that a failing
    # assertion cannot leak it.
    with support.transient_internet("svn.python.org"):
        # Without verification the connection succeeds and
        # getpeercert() returns an empty dict.
        s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_NONE)
        try:
            s.connect(("svn.python.org", 443))
            self.assertEqual({}, s.getpeercert())
        finally:
            s.close()

        # this should fail because we have no verification certs
        s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED)
        try:
            self.assertRaisesRegexp(ssl.SSLError,
                                    "certificate verify failed",
                                    s.connect, ("svn.python.org", 443))
        finally:
            s.close()

        # this should succeed because we specify the root cert
        s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                            cert_reqs=ssl.CERT_REQUIRED,
                            ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
        try:
            s.connect(("svn.python.org", 443))
            self.assertTrue(s.getpeercert())
        finally:
            s.close()
Bill Janssen296a59d2007-09-16 22:06:00 +00001243
def test_connect_ex(self):
    # Issue #11326: check connect_ex() implementation
    remote = ("svn.python.org", 443)
    with support.transient_internet("svn.python.org"):
        tls_sock = ssl.wrap_socket(socket.socket(socket.AF_INET),
                                   cert_reqs=ssl.CERT_REQUIRED,
                                   ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
        with closing(tls_sock):
            # A blocking connect_ex() returns 0 on success and leaves a
            # verified peer certificate available.
            self.assertEqual(0, tls_sock.connect_ex(remote))
            self.assertTrue(tls_sock.getpeercert())
1255
def test_non_blocking_connect_ex(self):
    # Issue #11326: non-blocking connect_ex() should allow handshake
    # to proceed after the socket gets ready.
    with support.transient_internet("svn.python.org"):
        tls_sock = ssl.wrap_socket(socket.socket(socket.AF_INET),
                                   cert_reqs=ssl.CERT_REQUIRED,
                                   ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
                                   do_handshake_on_connect=False)
        with closing(tls_sock):
            tls_sock.setblocking(False)
            status = tls_sock.connect_ex(('svn.python.org', 443))
            # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
            self.assertIn(status,
                          (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
            # Wait for connect to finish
            select.select([], [tls_sock], [], 5.0)
            # Non-blocking handshake: retry until it completes, waiting
            # for the direction OpenSSL asked for in between.
            handshake_done = False
            while not handshake_done:
                try:
                    tls_sock.do_handshake()
                except ssl.SSLWantReadError:
                    select.select([tls_sock], [], [], 5.0)
                except ssl.SSLWantWriteError:
                    select.select([], [tls_sock], [], 5.0)
                else:
                    handshake_done = True
            # SSL established
            self.assertTrue(tls_sock.getpeercert())
1284
def test_timeout_connect_ex(self):
    # Issue #12065: on a timeout, connect_ex() should return the original
    # errno (mimicking the behaviour of non-SSL sockets).
    with support.transient_internet("svn.python.org"):
        tls_sock = ssl.wrap_socket(socket.socket(socket.AF_INET),
                                   cert_reqs=ssl.CERT_REQUIRED,
                                   ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
                                   do_handshake_on_connect=False)
        with closing(tls_sock):
            # A tiny timeout, meant to expire before the connection can
            # be established.
            tls_sock.settimeout(0.0000001)
            status = tls_sock.connect_ex(('svn.python.org', 443))
            if status == 0:
                self.skipTest("svn.python.org responded too quickly")
            self.assertIn(status, (errno.EAGAIN, errno.EWOULDBLOCK))
1301
def test_connect_ex_error(self):
    # connect_ex() to port 444 is expected to report a connection error
    # through its return value.
    with support.transient_internet("svn.python.org"):
        tls_sock = ssl.wrap_socket(socket.socket(socket.AF_INET),
                                   cert_reqs=ssl.CERT_REQUIRED,
                                   ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
        with closing(tls_sock):
            status = tls_sock.connect_ex(("svn.python.org", 444))
            # Issue #19919: Windows machines or VMs hosted on Windows
            # machines sometimes return EWOULDBLOCK.
            acceptable = (errno.ECONNREFUSED, errno.EWOULDBLOCK)
            self.assertIn(status, acceptable)
1314
def test_connect_with_context(self):
    # Every socket is managed by closing() so that it is released even
    # when connect() or an assertion raises.
    with support.transient_internet("svn.python.org"):
        # Same as test_connect, but with a separately created context
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        with closing(ctx.wrap_socket(socket.socket(socket.AF_INET))) as s:
            s.connect(("svn.python.org", 443))
            self.assertEqual({}, s.getpeercert())
        # Same with a server hostname
        with closing(ctx.wrap_socket(socket.socket(socket.AF_INET),
                                     server_hostname="svn.python.org")) as s:
            s.connect(("svn.python.org", 443))
        # This should fail because we have no verification certs
        ctx.verify_mode = ssl.CERT_REQUIRED
        with closing(ctx.wrap_socket(socket.socket(socket.AF_INET))) as s:
            self.assertRaisesRegexp(ssl.SSLError,
                                    "certificate verify failed",
                                    s.connect, ("svn.python.org", 443))
        # This should succeed because we specify the root cert
        ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
        with closing(ctx.wrap_socket(socket.socket(socket.AF_INET))) as s:
            s.connect(("svn.python.org", 443))
            cert = s.getpeercert()
            self.assertTrue(cert)
1345
def test_connect_capath(self):
    # Verify server certificates using the `capath` argument
    # NOTE: the subject hashing algorithm has been changed between
    # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
    # contain both versions of each certificate (same content, different
    # filename) for this test to be portable across OpenSSL releases.
    with support.transient_internet("svn.python.org"):
        # First with a native-string path, then with a bytes path.
        for capath in (CAPATH, BYTES_CAPATH):
            ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
            ctx.verify_mode = ssl.CERT_REQUIRED
            ctx.load_verify_locations(capath=capath)
            # closing() also covers connect(), so a failed connection
            # attempt does not leak the socket.
            with closing(ctx.wrap_socket(
                    socket.socket(socket.AF_INET))) as s:
                s.connect(("svn.python.org", 443))
                cert = s.getpeercert()
                self.assertTrue(cert)
1373 s.close()
1374
def test_connect_cadata(self):
    # Verify the server using CA data passed in memory, first as PEM
    # text and then as DER bytes.
    with open(CAFILE_CACERT) as pem_file:
        pem_text = pem_file.read().decode('ascii')
    der_bytes = ssl.PEM_cert_to_DER_cert(pem_text)
    with support.transient_internet("svn.python.org"):
        for cadata in (pem_text, der_bytes):
            context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
            context.verify_mode = ssl.CERT_REQUIRED
            context.load_verify_locations(cadata=cadata)
            tls_sock = context.wrap_socket(socket.socket(socket.AF_INET))
            with closing(tls_sock):
                tls_sock.connect(("svn.python.org", 443))
                peer_cert = tls_sock.getpeercert()
                self.assertTrue(peer_cert)
1396
@unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
def test_makefile_close(self):
    # Issue #5238: creating a file-like object with makefile() shouldn't
    # delay closing the underlying "real socket" (here tested with its
    # file descriptor, hence skipping the test under Windows).
    with support.transient_internet("svn.python.org"):
        tls_sock = ssl.wrap_socket(socket.socket(socket.AF_INET))
        tls_sock.connect(("svn.python.org", 443))
        descriptor = tls_sock.fileno()
        stream = tls_sock.makefile()
        stream.close()
        # The fd is still open: a zero-length read must not fail
        os.read(descriptor, 0)
        # Closing the SSL socket should close the fd too
        tls_sock.close()
        gc.collect()
        with self.assertRaises(OSError) as caught:
            os.read(descriptor, 0)
        self.assertEqual(caught.exception.errno, errno.EBADF)
Bill Janssen934b16d2008-06-28 22:19:33 +00001416
def test_non_blocking_handshake(self):
    # Drive the handshake by hand on a non-blocking socket, counting the
    # do_handshake() calls it takes.
    with support.transient_internet("svn.python.org"):
        s = socket.socket(socket.AF_INET)
        # `s` is rebound to the SSL wrapper below; the finally clause
        # closes whichever object is current, so the socket is released
        # even if connect(), wrap_socket() or the handshake raises.
        try:
            s.connect(("svn.python.org", 443))
            s.setblocking(False)
            s = ssl.wrap_socket(s,
                                cert_reqs=ssl.CERT_NONE,
                                do_handshake_on_connect=False)
            count = 0
            while True:
                try:
                    count += 1
                    s.do_handshake()
                    break
                except ssl.SSLWantReadError:
                    # Wait until the socket is readable, then retry.
                    select.select([s], [], [])
                except ssl.SSLWantWriteError:
                    # Wait until the socket is writable, then retry.
                    select.select([], [s], [])
        finally:
            s.close()
        if support.verbose:
            sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Bill Janssen934b16d2008-06-28 22:19:33 +00001438
def test_get_server_certificate(self):
    def check_host(host, port, cert=None):
        # Fetch the certificate of (host, port) three times: unverified,
        # against a CA file that must not validate it, and against `cert`.
        address = (host, port)
        with support.transient_internet(host):
            fetched = ssl.get_server_certificate(address)
            if not fetched:
                self.fail("No server certificate on %s:%s!" % (host, port))

            try:
                fetched = ssl.get_server_certificate(address,
                                                     ca_certs=CERTFILE)
            except ssl.SSLError as verify_error:
                # should fail
                if support.verbose:
                    sys.stdout.write("%s\n" % verify_error)
            else:
                self.fail("Got server certificate %s for %s:%s!" % (fetched, host, port))

            fetched = ssl.get_server_certificate(address, ca_certs=cert)
            if not fetched:
                self.fail("No server certificate on %s:%s!" % (host, port))
            if support.verbose:
                sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,fetched))

    check_host('svn.python.org', 443, SVN_PYTHON_ORG_ROOT_CERT)
    # The IPv6 host is only checked when support reports IPv6 as enabled.
    if support.IPV6_ENABLED:
        check_host('ipv6.google.com', 443)
1466
def test_ciphers(self):
    # Connect to svn.python.org with two valid cipher strings, then check
    # that an unusable cipher string is rejected with SSLError.
    remote = ("svn.python.org", 443)
    with support.transient_internet(remote[0]):
        for cipher_spec in ("ALL", "DEFAULT"):
            raw = socket.socket(socket.AF_INET)
            wrapped = ssl.wrap_socket(raw, cert_reqs=ssl.CERT_NONE,
                                      ciphers=cipher_spec)
            with closing(wrapped) as s:
                s.connect(remote)
        # Error checking can happen at instantiation or when connecting
        with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"):
            with closing(socket.socket(socket.AF_INET)) as sock:
                s = ssl.wrap_socket(sock,
                                    cert_reqs=ssl.CERT_NONE,
                                    ciphers="^$:,;?*'dorothyx")
                s.connect(remote)
Bill Janssen296a59d2007-09-16 22:06:00 +00001482
def test_algorithms(self):
    # Issue #8484: all algorithms should be available when verifying a
    # certificate.
    # SHA256 was added in OpenSSL 0.9.8
    if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15):
        self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION)
    # sha256.tbs-internet.com needs SNI to use the correct certificate
    if not ssl.HAS_SNI:
        self.skipTest("SNI needed for this test")
    # https://sha2.hboeck.de/ was used until 2011-01-08 (no route to host)
    remote = ("sha256.tbs-internet.com", 443)
    here = os.path.dirname(__file__)
    sha256_cert = os.path.join(here, "sha256.pem")
    with support.transient_internet("sha256.tbs-internet.com"):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.load_verify_locations(sha256_cert)
        wrapped = ctx.wrap_socket(socket.socket(socket.AF_INET),
                                  server_hostname="sha256.tbs-internet.com")
        # closing() guarantees the same close-on-exit the explicit
        # try/finally provided.
        with closing(wrapped) as s:
            s.connect(remote)
            if support.verbose:
                sys.stdout.write("\nCipher with %r is %r\n" %
                                 (remote, s.cipher()))
                sys.stdout.write("Certificate is:\n%s\n" %
                                 pprint.pformat(s.getpeercert()))
1510
def test_get_ca_certs_capath(self):
    # capath certs are loaded on request: the context reports no CA
    # certificates right after load_verify_locations(capath=...), and
    # exactly one after a verified connection has been made.
    with support.transient_internet("svn.python.org"):
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.load_verify_locations(capath=CAPATH)
        self.assertEqual(ctx.get_ca_certs(), [])
        s = ctx.wrap_socket(socket.socket(socket.AF_INET))
        # connect() lives inside the try block so that a failed connection
        # or handshake still closes the wrapped socket.
        try:
            s.connect(("svn.python.org", 443))
            cert = s.getpeercert()
            self.assertTrue(cert)
        finally:
            s.close()
        self.assertEqual(len(ctx.get_ca_certs()), 1)
1526
@needs_sni
def test_context_setget(self):
    # Check that the context of a connected socket can be replaced.
    address = ("svn.python.org", 443)
    with support.transient_internet("svn.python.org"):
        ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx2 = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        plain = socket.socket(socket.AF_INET)
        with closing(ctx1.wrap_socket(plain)) as ss:
            ss.connect(address)
            # Both the Python-level socket and the underlying _sslobj
            # must report the context the socket was created from ...
            for owner in (ss, ss._sslobj):
                self.assertIs(owner.context, ctx1)
            # ... and both must follow when the context is swapped.
            ss.context = ctx2
            for owner in (ss, ss._sslobj):
                self.assertIs(owner.context, ctx2)
Bill Janssen296a59d2007-09-16 22:06:00 +00001541
Bill Janssen98d19da2007-09-10 21:51:02 +00001542try:
1543 import threading
1544except ImportError:
1545 _have_threads = False
1546else:
Bill Janssen98d19da2007-09-10 21:51:02 +00001547 _have_threads = True
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001548
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001549 from test.ssl_servers import make_https_server
1550
class ThreadedEchoServer(threading.Thread):
    """Thread running a small TCP echo server used by the SSL tests.

    The server accepts connections on a socket bound via
    ``support.bind_port`` and hands each one to a ``ConnectionHandler``
    thread, which it joins before accepting the next connection.  It can
    be used as a context manager: entering starts the thread and waits
    until it is listening, leaving stops and joins it.
    """

    class ConnectionHandler(threading.Thread):

        """A mildly complicated class, because we want it to work both
        with and without the SSL wrapper around the socket connection, so
        that we can test the STARTTLS functionality."""

        def __init__(self, server, connsock, addr):
            """Remember the owning server, the accepted socket and the
            peer address; the connection starts out unwrapped."""
            self.server = server
            self.running = False
            self.sock = connsock
            self.addr = addr
            self.sock.setblocking(1)
            # Set to the SSL-wrapped socket once wrap_conn() succeeds.
            self.sslconn = None
            threading.Thread.__init__(self)
            self.daemon = True

        def wrap_conn(self):
            """Wrap the connection with the server's SSL context.

            Records the negotiated NPN/ALPN protocols on the server.
            Returns True on success.  On an SSLError or an ECONNRESET
            socket error the error is stored in ``server.conn_errors``,
            the server is stopped, the connection is closed and False is
            returned; any other socket error is re-raised.
            """
            try:
                self.sslconn = self.server.context.wrap_socket(
                    self.sock, server_side=True)
                self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
                self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
            except socket.error as e:
                # We treat ConnectionResetError as though it were an
                # SSLError - OpenSSL on Ubuntu abruptly closes the
                # connection when asked to use an unsupported protocol.
                #
                # XXX Various errors can have happened here, for example
                # a mismatching protocol version, an invalid certificate,
                # or a low-level bug. This should be made more discriminating.
                if not isinstance(e, ssl.SSLError) and e.errno != errno.ECONNRESET:
                    raise
                self.server.conn_errors.append(e)
                if self.server.chatty:
                    handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
                self.running = False
                self.server.stop()
                self.close()
                return False
            else:
                # Handshake succeeded; optionally report what was negotiated.
                if self.server.context.verify_mode == ssl.CERT_REQUIRED:
                    cert = self.sslconn.getpeercert()
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
                    cert_binary = self.sslconn.getpeercert(True)
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
                cipher = self.sslconn.cipher()
                if support.verbose and self.server.chatty:
                    sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
                    sys.stdout.write(" server: selected protocol is now "
                            + str(self.sslconn.selected_npn_protocol()) + "\n")
                return True

        def read(self):
            """Read from the SSL connection if wrapped, else receive up to
            1024 bytes from the plain socket."""
            if self.sslconn:
                return self.sslconn.read()
            else:
                return self.sock.recv(1024)

        def write(self, bytes):
            """Write *bytes* to the SSL connection if wrapped, else send
            them on the plain socket."""
            if self.sslconn:
                return self.sslconn.write(bytes)
            else:
                return self.sock.send(bytes)

        def close(self):
            """Close the SSL connection if wrapped, else the plain socket."""
            if self.sslconn:
                self.sslconn.close()
            else:
                self.sock.close()

        def run(self):
            """Serve one connection until it ends.

            Unless the server is in STARTTLS mode the connection is
            wrapped immediately.  Each message read is stripped and
            dispatched: an empty message ends the handler, ``over``
            closes the connection, ``STARTTLS`` / ``ENDTLS`` (STARTTLS
            mode only) switch encryption on and off, ``CB tls-unique``
            replies with the channel binding data, and anything else is
            echoed back lower-cased.  An SSLError closes the connection
            and stops the whole server.
            """
            self.running = True
            if not self.server.starttls_server:
                if not self.wrap_conn():
                    return
            while self.running:
                try:
                    msg = self.read()
                    stripped = msg.strip()
                    if not stripped:
                        # eof, so quit this handler
                        self.running = False
                        self.close()
                    elif stripped == b'over':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: client closed connection\n")
                        self.close()
                        return
                    elif (self.server.starttls_server and
                          stripped == b'STARTTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
                        # Acknowledge in the clear, then upgrade to TLS.
                        self.write(b"OK\n")
                        if not self.wrap_conn():
                            return
                    elif (self.server.starttls_server and self.sslconn
                          and stripped == b'ENDTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
                        # Acknowledge over TLS, then fall back to the
                        # socket returned by unwrap().
                        self.write(b"OK\n")
                        self.sock = self.sslconn.unwrap()
                        self.sslconn = None
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: connection is now unencrypted...\n")
                    elif stripped == b'CB tls-unique':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
                        data = self.sslconn.get_channel_binding("tls-unique")
                        self.write(repr(data).encode("us-ascii") + b"\n")
                    else:
                        if (support.verbose and
                            self.server.connectionchatty):
                            ctype = (self.sslconn and "encrypted") or "unencrypted"
                            sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
                                             % (msg, ctype, msg.lower(), ctype))
                        self.write(msg.lower())
                except ssl.SSLError:
                    if self.server.chatty:
                        handle_error("Test server failure:\n")
                    self.close()
                    self.running = False
                    # normally, we'd just stop here, but for the test
                    # harness, we want to stop the server
                    self.server.stop()

    def __init__(self, certificate=None, ssl_version=None,
                 certreqs=None, cacerts=None,
                 chatty=True, connectionchatty=False, starttls_server=False,
                 npn_protocols=None, alpn_protocols=None,
                 ciphers=None, context=None):
        """Configure the server and bind its listening socket.

        If *context* is given it is used as-is and the certificate,
        protocol, verification, NPN/ALPN and cipher arguments are
        ignored.  Otherwise a new SSLContext is built from them, with
        PROTOCOL_TLSv1 and CERT_NONE as the defaults.
        """
        if context:
            self.context = context
        else:
            self.context = ssl.SSLContext(ssl_version
                                          if ssl_version is not None
                                          else ssl.PROTOCOL_TLSv1)
            self.context.verify_mode = (certreqs if certreqs is not None
                                        else ssl.CERT_NONE)
            if cacerts:
                self.context.load_verify_locations(cacerts)
            if certificate:
                self.context.load_cert_chain(certificate)
            if npn_protocols:
                self.context.set_npn_protocols(npn_protocols)
            if alpn_protocols:
                self.context.set_alpn_protocols(alpn_protocols)
            if ciphers:
                self.context.set_ciphers(ciphers)
        self.chatty = chatty
        self.connectionchatty = connectionchatty
        self.starttls_server = starttls_server
        self.sock = socket.socket()
        self.port = support.bind_port(self.sock)
        # Optional event set by run() once the server is listening.
        self.flag = None
        self.active = False
        # Filled in by ConnectionHandler.wrap_conn().
        self.selected_npn_protocols = []
        self.selected_alpn_protocols = []
        self.conn_errors = []
        threading.Thread.__init__(self)
        self.daemon = True

    def __enter__(self):
        """Start the server thread and block until it is listening."""
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        """Ask the accept loop to stop and wait for the thread to end."""
        self.stop()
        self.join()

    def start(self, flag=None):
        """Start the thread; *flag*, if given, is set by run() once the
        socket is listening."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        """Accept connections until stop() is called, then close the
        listening socket.

        The short accept timeout lets the loop re-check ``self.active``
        regularly.  Each handler is joined before the next accept, so
        connections are served one at a time.
        """
        self.sock.settimeout(0.05)
        self.sock.listen(5)
        self.active = True
        if self.flag:
            # signal an event
            self.flag.set()
        while self.active:
            try:
                newconn, connaddr = self.sock.accept()
                if support.verbose and self.chatty:
                    sys.stdout.write(' server: new connection from '
                                     + repr(connaddr) + '\n')
                handler = self.ConnectionHandler(self, newconn, connaddr)
                handler.start()
                handler.join()
            except socket.timeout:
                pass
            except KeyboardInterrupt:
                self.stop()
        self.sock.close()

    def stop(self):
        """Make the accept loop in run() exit after its current pass."""
        self.active = False
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001753
class AsyncoreEchoServer(threading.Thread):
    """Thread running an asyncore-based SSL echo server for the tests.

    The nested ``EchoServer`` dispatcher accepts connections and creates
    one ``ConnectionHandler`` per client; the handler performs a
    non-blocking SSL handshake and then echoes received data back
    lower-cased.  The thread itself just keeps running ``asyncore.loop``
    until ``stop()`` is called.  Usable as a context manager.
    """

    class EchoServer(asyncore.dispatcher):

        class ConnectionHandler(asyncore.dispatcher_with_send):

            def __init__(self, conn, certfile):
                """Wrap *conn* for server-side SSL using *certfile* and
                begin the (non-blocking) handshake."""
                self.socket = ssl.wrap_socket(conn, server_side=True,
                                              certfile=certfile,
                                              do_handshake_on_connect=False)
                asyncore.dispatcher_with_send.__init__(self, self.socket)
                # True until do_handshake() has completed.
                self._ssl_accepting = True
                self._do_ssl_handshake()

            def readable(self):
                # Drain data already buffered inside the SSL object before
                # reporting readiness for more.
                if isinstance(self.socket, ssl.SSLSocket):
                    while self.socket.pending() > 0:
                        self.handle_read_event()
                return True

            def _do_ssl_handshake(self):
                """Advance the handshake by one step.

                Want-read/want-write just return so the step is retried
                on the next read event; EOF and ECONNABORTED close the
                connection; other SSL errors propagate.
                """
                try:
                    self.socket.do_handshake()
                except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
                    return
                except ssl.SSLEOFError:
                    return self.handle_close()
                except ssl.SSLError:
                    raise
                except socket.error as err:
                    # "as" form: consistent with the rest of the file and
                    # valid on every interpreter that accepts this module's
                    # other except clauses.
                    if err.args[0] == errno.ECONNABORTED:
                        return self.handle_close()
                else:
                    self._ssl_accepting = False

            def handle_read(self):
                """Continue the handshake, or echo received data back
                lower-cased (closing on an empty read)."""
                if self._ssl_accepting:
                    self._do_ssl_handshake()
                else:
                    data = self.recv(1024)
                    if support.verbose:
                        sys.stdout.write(" server: read %s from client\n" % repr(data))
                    if not data:
                        self.close()
                    else:
                        self.send(data.lower())

            def handle_close(self):
                self.close()
                if support.verbose:
                    sys.stdout.write(" server: closed connection %s\n" % self.socket)

            def handle_error(self):
                # Re-raise the exception currently being handled so it is
                # not hidden behind asyncore's default error reporting.
                raise

        def __init__(self, certfile):
            """Bind a listening socket and remember *certfile* for the
            connection handlers."""
            self.certfile = certfile
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.port = support.bind_port(sock, '')
            asyncore.dispatcher.__init__(self, sock)
            self.listen(5)

        def handle_accept(self):
            """Accept a pending connection and attach a handler to it."""
            pair = self.accept()
            if pair is None:
                # dispatcher.accept() may return None when no connection
                # is actually available; there is nothing to handle then.
                return
            sock_obj, addr = pair
            if support.verbose:
                sys.stdout.write(" server: new connection from %s:%s\n" %addr)
            self.ConnectionHandler(sock_obj, self.certfile)

        def handle_error(self):
            # See ConnectionHandler.handle_error.
            raise

    def __init__(self, certfile):
        """Create the listening dispatcher; the thread is not started."""
        self.flag = None
        self.active = False
        self.server = self.EchoServer(certfile)
        self.port = self.server.port
        threading.Thread.__init__(self)
        self.daemon = True

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.server)

    def __enter__(self):
        """Start the thread and wait until its loop is running."""
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        """Stop the server and join the thread, narrating when verbose."""
        if support.verbose:
            sys.stdout.write(" cleanup: stopping server.\n")
        self.stop()
        if support.verbose:
            sys.stdout.write(" cleanup: joining server thread.\n")
        self.join()
        if support.verbose:
            sys.stdout.write(" cleanup: successfully joined.\n")

    def start(self, flag=None):
        """Start the thread; *flag*, if given, is set once run() begins."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        self.active = True
        if self.flag:
            self.flag.set()
        while self.active:
            # Deliberately best-effort: the dispatchers re-raise every
            # error out of asyncore.loop(), and the loop must keep going
            # until stop() clears self.active.
            try:
                asyncore.loop(1)
            except:
                pass

    def stop(self):
        """End the run() loop and close the listening dispatcher."""
        self.active = False
        self.server.close()
1868
def bad_cert_test(certfile):
    """
    Launch a server with CERT_REQUIRED, and check that trying to
    connect to it with the given client certificate fails.

    The connection attempt is considered to have failed as expected when
    it raises an SSLError, any other socket-level error (for example a
    connection reset by the server), or an IOError with errno ENOENT
    (the certificate file does not exist).  If no exception is raised an
    AssertionError is raised instead.
    """
    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_REQUIRED,
                                cacerts=CERTFILE, chatty=False,
                                connectionchatty=False)
    with server:
        try:
            with closing(socket.socket()) as sock:
                s = ssl.wrap_socket(sock,
                                    certfile=certfile,
                                    ssl_version=ssl.PROTOCOL_TLSv1)
                s.connect((HOST, server.port))
        except ssl.SSLError as x:
            if support.verbose:
                sys.stdout.write("\nSSLError is %s\n" % x.args[1])
        except (socket.error, OSError) as x:
            # socket.error is listed explicitly because on Python 2 it
            # derives from IOError, not OSError, so "except OSError"
            # alone never sees a reset connection.  The whole exception
            # is formatted since not every socket error carries a
            # second argument.
            if support.verbose:
                sys.stdout.write("\nOSError is %s\n" % x)
        except IOError as x:
            # Only a missing certificate file is an expected failure;
            # this clause used to be a second, unreachable
            # "except OSError".
            if x.errno != errno.ENOENT:
                raise
            if support.verbose:
                sys.stdout.write("\nIOError is %s\n" % str(x))
        else:
            raise AssertionError("Use of invalid cert should have failed!")
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001898
def server_params_test(client_context, server_context, indata=b"FOO\n",
                       chatty=True, connectionchatty=False, sni_name=None):
    """
    Launch a server, connect a client to it and try various reads
    and writes.

    *indata* is sent three times (as given, as a bytearray and as a
    memoryview) and each reply must equal ``indata.lower()``.  Returns a
    dict describing the client connection (compression, cipher, peer
    certificate, ALPN/NPN protocol, version) plus the ALPN/NPN protocols
    the server recorded.
    """
    server = ThreadedEchoServer(context=server_context,
                                chatty=chatty,
                                connectionchatty=False)
    expected = indata.lower()
    with server:
        client = client_context.wrap_socket(socket.socket(),
                                            server_hostname=sni_name)
        with closing(client) as s:
            s.connect((HOST, server.port))
            payloads = (indata, bytearray(indata), memoryview(indata))
            for arg in payloads:
                if connectionchatty and support.verbose:
                    sys.stdout.write(
                        " client: sending %r...\n" % indata)
                s.write(arg)
                outdata = s.read()
                if connectionchatty and support.verbose:
                    sys.stdout.write(" client: read %r\n" % outdata)
                if outdata != expected:
                    raise AssertionError(
                        "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                        % (outdata[:20], len(outdata),
                           indata[:20].lower(), len(indata)))
            # Tell the echo handler that this client is done.
            s.write(b"over\n")
            if connectionchatty and support.verbose:
                sys.stdout.write(" client: closing connection.\n")
            stats = {
                'compression': s.compression(),
                'cipher': s.cipher(),
                'peercert': s.getpeercert(),
                'client_alpn_protocol': s.selected_alpn_protocol(),
                'client_npn_protocol': s.selected_npn_protocol(),
                'version': s.version(),
            }
            s.close()
        stats['server_alpn_protocols'] = server.selected_alpn_protocols
        stats['server_npn_protocols'] = server.selected_npn_protocols
    return stats
Bill Janssen98d19da2007-09-10 21:51:02 +00001944
def try_protocol_combo(server_protocol, client_protocol, expect_success,
                       certsreqs=None, server_options=0, client_options=0):
    """
    Try to SSL-connect using *client_protocol* to *server_protocol*.
    If *expect_success* is true, assert that the connection succeeds,
    if it's false, assert that the connection fails.
    Also, if *expect_success* is a string, assert that it is the protocol
    version actually used by the connection.
    """
    if certsreqs is None:
        certsreqs = ssl.CERT_NONE
    certtype_names = {
        ssl.CERT_NONE: "CERT_NONE",
        ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
        ssl.CERT_REQUIRED: "CERT_REQUIRED",
    }
    certtype = certtype_names[certsreqs]
    client_name = ssl.get_protocol_name(client_protocol)
    server_name = ssl.get_protocol_name(server_protocol)
    if support.verbose:
        # Combinations expected to fail are shown inside braces.
        if expect_success:
            formatstr = " %s->%s %s\n"
        else:
            formatstr = " {%s->%s} %s\n"
        sys.stdout.write(formatstr % (client_name, server_name, certtype))

    client_context = ssl.SSLContext(client_protocol)
    client_context.options |= client_options
    server_context = ssl.SSLContext(server_protocol)
    server_context.options |= server_options

    # NOTE: we must enable "ALL" ciphers on the client, otherwise an
    # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
    # starting from OpenSSL 1.0.0 (see issue #8322).
    if client_context.protocol == ssl.PROTOCOL_SSLv23:
        client_context.set_ciphers("ALL")

    for ctx in (client_context, server_context):
        ctx.verify_mode = certsreqs
        ctx.load_cert_chain(CERTFILE)
        ctx.load_verify_locations(CERTFILE)

    # Protocol mismatch can result in either an SSLError, or a
    # "Connection reset by peer" error.
    try:
        stats = server_params_test(client_context, server_context,
                                   chatty=False, connectionchatty=False)
    except ssl.SSLError:
        if expect_success:
            raise
        return
    except socket.error as e:
        if expect_success or e.errno != errno.ECONNRESET:
            raise
        return

    # The connection went through.
    if not expect_success:
        raise AssertionError(
            "Client protocol %s succeeded with server protocol %s!"
            % (client_name, server_name))
    if expect_success is not True and expect_success != stats['version']:
        raise AssertionError("version mismatch: expected %r, got %r"
                             % (expect_success, stats['version']))
Bill Janssen98d19da2007-09-10 21:51:02 +00002003
2004
Bill Janssen934b16d2008-06-28 22:19:33 +00002005 class ThreadedTests(unittest.TestCase):
Bill Janssen98d19da2007-09-10 21:51:02 +00002006
@skip_if_broken_ubuntu_ssl
def test_echo(self):
    """Basic test of an SSL client connecting to a server"""
    if support.verbose:
        sys.stdout.write("\n")
    # Run the echo exchange once per known protocol, with one context
    # (carrying CERTFILE) shared by the client and the server side.
    for proto in PROTOCOLS:
        shared_context = ssl.SSLContext(proto)
        shared_context.load_cert_chain(CERTFILE)
        server_params_test(shared_context, shared_context,
                           chatty=True, connectionchatty=True)
Bill Janssen98d19da2007-09-10 21:51:02 +00002017
def test_getpeercert(self):
    # Connect to a local echo server that requires certificates and check
    # the dictionary returned by getpeercert(): it is unavailable before
    # the handshake, and afterwards carries the expected subject and a
    # coherent validity period.
    if support.verbose:
        sys.stdout.write("\n")
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(CERTFILE)
    context.load_cert_chain(CERTFILE)
    server = ThreadedEchoServer(context=context, chatty=False)
    with server:
        # closing() makes sure the client socket is closed even when one
        # of the checks below fails; leaving it open would keep the
        # server's handler thread blocked in read() while the server is
        # being joined on exit from the enclosing "with".
        with closing(context.wrap_socket(socket.socket(),
                                         do_handshake_on_connect=False)) as s:
            s.connect((HOST, server.port))
            # getpeercert() raise ValueError while the handshake isn't
            # done.
            with self.assertRaises(ValueError):
                s.getpeercert()
            s.do_handshake()
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()
            if support.verbose:
                sys.stdout.write(pprint.pformat(cert) + '\n')
                sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
            if 'subject' not in cert:
                self.fail("No subject field in certificate: %s." %
                          pprint.pformat(cert))
            if ((('organizationName', 'Python Software Foundation'),)
                not in cert['subject']):
                self.fail(
                    "Missing or invalid 'organizationName' field in certificate subject; "
                    "should be 'Python Software Foundation'.")
            self.assertIn('notBefore', cert)
            self.assertIn('notAfter', cert)
            before = ssl.cert_time_to_seconds(cert['notBefore'])
            after = ssl.cert_time_to_seconds(cert['notAfter'])
            self.assertLess(before, after)
Bill Janssen98d19da2007-09-10 21:51:02 +00002055
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_crl_check(self):
    # Check certificate verification against a local echo server in three
    # stages: default flags, CRL checking without a CRL, and CRL checking
    # with the CRL loaded.
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(SIGNING_CA)
    tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
    self.assertEqual(context.verify_flags, ssl.VERIFY_DEFAULT | tf)

    def expect_verified_connection():
        # Start a fresh server, connect, and require a peer certificate.
        server = ThreadedEchoServer(context=server_context, chatty=True)
        with server:
            with closing(context.wrap_socket(socket.socket())) as s:
                s.connect((HOST, server.port))
                cert = s.getpeercert()
                self.assertTrue(cert, "Can't get peer certificate.")

    # VERIFY_DEFAULT should pass
    expect_verified_connection()

    # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
    context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF

    server = ThreadedEchoServer(context=server_context, chatty=True)
    with server:
        with closing(context.wrap_socket(socket.socket())) as s:
            with self.assertRaisesRegexp(ssl.SSLError,
                                         "certificate verify failed"):
                s.connect((HOST, server.port))

    # now load a CRL file. The CRL file is signed by the CA.
    context.load_verify_locations(CRLFILE)

    expect_verified_connection()
2098
def test_check_hostname(self):
    # With check_hostname enabled on the client context: a matching
    # server_hostname connects, a non-matching one raises
    # CertificateError, and omitting it makes wrap_socket() raise
    # ValueError.
    if support.verbose:
        sys.stdout.write("\n")

    srv_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    srv_ctx.load_cert_chain(SIGNED_CERTFILE)

    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    client_ctx.verify_mode = ssl.CERT_REQUIRED
    client_ctx.check_hostname = True
    client_ctx.load_verify_locations(SIGNING_CA)

    # correct hostname should verify
    with ThreadedEchoServer(context=srv_ctx, chatty=True) as server:
        good = client_ctx.wrap_socket(socket.socket(),
                                      server_hostname="localhost")
        with closing(good) as s:
            s.connect((HOST, server.port))
            peer_cert = s.getpeercert()
            self.assertTrue(peer_cert, "Can't get peer certificate.")

    # incorrect hostname should raise an exception
    with ThreadedEchoServer(context=srv_ctx, chatty=True) as server:
        bad = client_ctx.wrap_socket(socket.socket(),
                                     server_hostname="invalid")
        with closing(bad) as s:
            with self.assertRaisesRegexp(
                    ssl.CertificateError,
                    "hostname 'invalid' doesn't match u?'localhost'"):
                s.connect((HOST, server.port))

    # missing server_hostname arg should cause an exception, too
    with ThreadedEchoServer(context=srv_ctx, chatty=True) as server:
        with closing(socket.socket()) as s:
            with self.assertRaisesRegexp(
                    ValueError,
                    "check_hostname requires server_hostname"):
                client_ctx.wrap_socket(s)
2136
def test_empty_cert(self):
    """Run bad_cert_test() with the empty certificate file nullcert.pem"""
    # Fall back to the current directory when __file__ has no dirname.
    here = os.path.dirname(__file__) or os.curdir
    bad_cert_test(os.path.join(here, "nullcert.pem"))
def test_malformed_cert(self):
    """Run bad_cert_test() with the syntactically broken badcert.pem"""
    # Fall back to the current directory when __file__ has no dirname.
    here = os.path.dirname(__file__) or os.curdir
    bad_cert_test(os.path.join(here, "badcert.pem"))
def test_nonexisting_cert(self):
    """Run bad_cert_test() with a path to a non-existing cert file"""
    # Fall back to the current directory when __file__ has no dirname.
    here = os.path.dirname(__file__) or os.curdir
    bad_cert_test(os.path.join(here, "wrongcert.pem"))
def test_malformed_key(self):
    """Run bad_cert_test() with the syntactically broken badkey.pem"""
    # Fall back to the current directory when __file__ has no dirname.
    here = os.path.dirname(__file__) or os.curdir
    bad_cert_test(os.path.join(here, "badkey.pem"))
Bill Janssen98d19da2007-09-10 21:51:02 +00002153
def test_rude_shutdown(self):
    """A brutal shutdown of an SSL server should raise a socket.error
    in the client when attempting handshake.
    """
    ready = threading.Event()
    gone = threading.Event()

    listen_sock = socket.socket()
    port = support.bind_port(listen_sock, HOST)

    def listener():
        # Runs in a thread: accept one connection, rudely close both
        # sockets, then tell the main thread the server side is gone.
        listen_sock.listen(5)
        ready.set()
        accepted = listen_sock.accept()[0]
        accepted.close()
        listen_sock.close()
        gone.set()

    worker = threading.Thread(target=listener)
    worker.start()
    try:
        # Main thread plays the client: connect once the listener is
        # ready, and only attempt the handshake after it went away.
        ready.wait()
        with closing(socket.socket()) as c:
            c.connect((HOST, port))
            gone.wait()
            try:
                ssl.wrap_socket(c)
            except socket.error:
                pass
            else:
                self.fail('connecting to closed SSL socket should have failed')
    finally:
        worker.join()
2194
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
                     "OpenSSL is compiled without SSLv2 support")
def test_protocol_sslv2(self):
    """Try several client protocols and options against an SSLv2 server"""
    def check(client_protocol, expected, *args, **kwargs):
        # Every combination in this test uses an SSLv2 server.
        try_protocol_combo(ssl.PROTOCOL_SSLv2, client_protocol, expected,
                           *args, **kwargs)

    if support.verbose:
        sys.stdout.write("\n")
    check(ssl.PROTOCOL_SSLv2, True)
    check(ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
    check(ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
    check(ssl.PROTOCOL_SSLv23, False)
    check(ssl.PROTOCOL_SSLv3, False)
    check(ssl.PROTOCOL_TLSv1, False)
    # SSLv23 client with specific SSL options
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        check(ssl.PROTOCOL_SSLv23, False, client_options=ssl.OP_NO_SSLv2)
    check(ssl.PROTOCOL_SSLv23, False, client_options=ssl.OP_NO_SSLv3)
    check(ssl.PROTOCOL_SSLv23, False, client_options=ssl.OP_NO_TLSv1)
Bill Janssen98d19da2007-09-10 21:51:02 +00002217
@skip_if_broken_ubuntu_ssl
def test_protocol_sslv23(self):
    """Try several client protocols and options against an SSLv23 server"""
    def check(client_protocol, expected, *args, **kwargs):
        # Every combination in this test uses an SSLv23 server.
        try_protocol_combo(ssl.PROTOCOL_SSLv23, client_protocol, expected,
                           *args, **kwargs)

    if support.verbose:
        sys.stdout.write("\n")
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try:
            check(ssl.PROTOCOL_SSLv2, True)
        except socket.error as x:
            # this fails on some older versions of OpenSSL (0.9.7l, for instance)
            if support.verbose:
                sys.stdout.write(
                    " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
                    % str(x))

    have_sslv3 = hasattr(ssl, 'PROTOCOL_SSLv3')

    # Same three client protocols, first without an explicit certificate
    # requirement and then with CERT_OPTIONAL and CERT_REQUIRED.
    for certreq_args in ((), (ssl.CERT_OPTIONAL,), (ssl.CERT_REQUIRED,)):
        if have_sslv3:
            check(ssl.PROTOCOL_SSLv3, False, *certreq_args)
        check(ssl.PROTOCOL_SSLv23, True, *certreq_args)
        check(ssl.PROTOCOL_TLSv1, 'TLSv1', *certreq_args)

    # Server with specific SSL options
    if have_sslv3:
        check(ssl.PROTOCOL_SSLv3, False, server_options=ssl.OP_NO_SSLv3)
    # Will choose TLSv1
    check(ssl.PROTOCOL_SSLv23, True,
          server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
    check(ssl.PROTOCOL_TLSv1, False, server_options=ssl.OP_NO_TLSv1)
2256
2257
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
                     "OpenSSL is compiled without SSLv3 support")
def test_protocol_sslv3(self):
    """Try several client protocols and options against an SSLv3 server"""
    def check(client_protocol, expected, *args, **kwargs):
        # Every combination in this test uses an SSLv3 server.
        try_protocol_combo(ssl.PROTOCOL_SSLv3, client_protocol, expected,
                           *args, **kwargs)

    if support.verbose:
        sys.stdout.write("\n")
    check(ssl.PROTOCOL_SSLv3, 'SSLv3')
    check(ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
    check(ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        check(ssl.PROTOCOL_SSLv2, False)
    check(ssl.PROTOCOL_SSLv23, False, client_options=ssl.OP_NO_SSLv3)
    check(ssl.PROTOCOL_TLSv1, False)
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        check(ssl.PROTOCOL_SSLv23, False, client_options=ssl.OP_NO_SSLv2)
Bill Janssen98d19da2007-09-10 21:51:02 +00002277
@skip_if_broken_ubuntu_ssl
def test_protocol_tlsv1(self):
    """Try several client protocols and options against a TLSv1 server"""
    def check(client_protocol, expected, *args, **kwargs):
        # Every combination in this test uses a TLSv1 server.
        try_protocol_combo(ssl.PROTOCOL_TLSv1, client_protocol, expected,
                           *args, **kwargs)

    if support.verbose:
        sys.stdout.write("\n")
    check(ssl.PROTOCOL_TLSv1, 'TLSv1')
    check(ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
    check(ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
    # Older protocols are only tried when the ssl module exposes them.
    for legacy_name in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy_name):
            check(getattr(ssl, legacy_name), False)
    check(ssl.PROTOCOL_SSLv23, False, client_options=ssl.OP_NO_TLSv1)
2292
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
                     "TLS version 1.1 not supported.")
def test_protocol_tlsv1_1(self):
    """Try several client protocols and options against a TLSv1.1 server,
    then cross-check TLSv1.1 against SSLv23 and TLSv1 peers."""
    if support.verbose:
        sys.stdout.write("\n")
    tls11 = ssl.PROTOCOL_TLSv1_1
    tls10 = ssl.PROTOCOL_TLSv1
    sslv23 = ssl.PROTOCOL_SSLv23

    try_protocol_combo(tls11, tls11, 'TLSv1.1')
    # Older protocols are only tried when the ssl module exposes them.
    for legacy_name in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy_name):
            try_protocol_combo(tls11, getattr(ssl, legacy_name), False)
    try_protocol_combo(tls11, sslv23, False,
                       client_options=ssl.OP_NO_TLSv1_1)

    try_protocol_combo(sslv23, tls11, 'TLSv1.1')
    try_protocol_combo(tls11, tls10, False)
    try_protocol_combo(tls10, tls11, False)
2312
2313
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
                     "TLS version 1.2 not supported.")
def test_protocol_tlsv1_2(self):
    """Try several client protocols and options against a TLSv1.2 server,
    then cross-check TLSv1.2 against SSLv23, TLSv1 and TLSv1.1 peers."""
    if support.verbose:
        sys.stdout.write("\n")
    tls12 = ssl.PROTOCOL_TLSv1_2
    no_legacy_ssl = ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2

    try_protocol_combo(tls12, tls12, 'TLSv1.2',
                       server_options=no_legacy_ssl,
                       client_options=no_legacy_ssl)
    # Older protocols are only tried when the ssl module exposes them.
    for legacy_name in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy_name):
            try_protocol_combo(tls12, getattr(ssl, legacy_name), False)
    try_protocol_combo(tls12, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_TLSv1_2)

    try_protocol_combo(ssl.PROTOCOL_SSLv23, tls12, 'TLSv1.2')
    try_protocol_combo(tls12, ssl.PROTOCOL_TLSv1, False)
    try_protocol_combo(ssl.PROTOCOL_TLSv1, tls12, False)
    try_protocol_combo(tls12, ssl.PROTOCOL_TLSv1_1, False)
    try_protocol_combo(ssl.PROTOCOL_TLSv1_1, tls12, False)
Bill Janssen98d19da2007-09-10 21:51:02 +00002337
def test_starttls(self):
    """Switch a connection from clear text to TLS and back again."""
    def report(text):
        # Progress output is only produced in verbose mode.
        if support.verbose:
            sys.stdout.write(text)

    payloads = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4",
                b"ENDTLS", b"msg 5", b"msg 6")

    server = ThreadedEchoServer(CERTFILE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                starttls_server=True,
                                chatty=True,
                                connectionchatty=True)
    tls_active = False
    with server:
        plain = socket.socket()
        plain.setblocking(1)
        plain.connect((HOST, server.port))
        report("\n")
        for indata in payloads:
            report(" client: sending %r...\n" % indata)
            # Talk through the TLS wrapper while it is active, otherwise
            # through the plain socket.
            if tls_active:
                secure.write(indata)
                outdata = secure.read()
            else:
                plain.send(indata)
                outdata = plain.recv(1024)
            reply = outdata.strip().lower()
            acknowledged = reply.startswith(b"ok")
            if acknowledged and indata == b"STARTTLS":
                # STARTTLS ok, switch to secure mode
                report(" client: read %r from server, starting TLS...\n"
                       % reply)
                secure = ssl.wrap_socket(plain,
                                         ssl_version=ssl.PROTOCOL_TLSv1)
                tls_active = True
            elif acknowledged and indata == b"ENDTLS":
                # ENDTLS ok, switch back to clear text
                report(" client: read %r from server, ending TLS...\n"
                       % reply)
                plain = secure.unwrap()
                tls_active = False
            else:
                report(" client: read %r from server\n" % reply)
        report(" client: closing connection.\n")
        if tls_active:
            secure.write(b"over\n")
            secure.close()
        else:
            plain.send(b"over\n")
            plain.close()
Bill Janssen98d19da2007-09-10 21:51:02 +00002395
def test_socketserver(self):
    """Fetch CERTFILE through the HTTPS test server and compare contents."""
    server = make_https_server(self, certfile=CERTFILE)
    # try to connect
    if support.verbose:
        sys.stdout.write('\n')
    with open(CERTFILE, 'rb') as local_file:
        expected = local_file.read()
    fetched = ''
    # now fetch the same data from the HTTPS server
    url = 'https://localhost:%d/%s' % (server.port,
                                       os.path.basename(CERTFILE))
    client_ctx = ssl.create_default_context(cafile=CERTFILE)
    with closing(urllib2.urlopen(url, context=client_ctx)) as response:
        length_header = response.info().getheader("content-length")
        # Only read a body when a positive content-length was announced.
        if length_header and (int(length_header) > 0):
            fetched = response.read(int(length_header))
            if support.verbose:
                sys.stdout.write(
                    " client: read %d bytes from remote server '%s'\n"
                    % (len(fetched), server))
    self.assertEqual(expected, fetched)
Bill Janssen934b16d2008-06-28 22:19:33 +00002421
def test_asyncore_server(self):
    """Check the example asyncore integration.

    One message is sent over an SSL connection to an AsyncoreEchoServer
    and the reply is expected to be the lower-cased message.
    """
    if support.verbose:
        sys.stdout.write("\n")

    indata = b"FOO\n"
    server = AsyncoreEchoServer(CERTFILE)
    with server:
        s = ssl.wrap_socket(socket.socket())
        # Close the client socket even when an assertion fails midway.
        try:
            s.connect(('127.0.0.1', server.port))
            if support.verbose:
                sys.stdout.write(
                    " client: sending %r...\n" % indata)
            s.write(indata)
            outdata = s.read()
            if support.verbose:
                sys.stdout.write(" client: read %r\n" % outdata)
            if outdata != indata.lower():
                self.fail(
                    "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                    % (outdata[:20], len(outdata),
                       indata[:20].lower(), len(indata)))
            s.write(b"over\n")
            if support.verbose:
                sys.stdout.write(" client: closing connection.\n")
        finally:
            s.close()
        if support.verbose:
            sys.stdout.write(" client: connection closed.\n")
Bill Janssen934b16d2008-06-28 22:19:33 +00002452
def test_recv_send(self):
    """Test recv(), send() and friends.

    Every send-type and recv-type method of a connected SSL socket is
    tried against an echo server.  Methods flagged as expected to succeed
    must yield the lower-cased payload back; methods flagged as expected
    to fail must raise a ValueError whose message starts with the method
    name.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = ssl.wrap_socket(socket.socket(),
                            server_side=False,
                            certfile=CERTFILE,
                            ca_certs=CERTFILE,
                            cert_reqs=ssl.CERT_NONE,
                            ssl_version=ssl.PROTOCOL_TLSv1)
        # closing() guarantees the client socket is released even when
        # one of the checks below fails.
        with closing(s):
            s.connect((HOST, server.port))

            # helper methods for standardising recv* method signatures
            def _recv_into():
                b = bytearray(b"\0"*100)
                count = s.recv_into(b)
                return b[:count]

            def _recvfrom_into():
                b = bytearray(b"\0"*100)
                count, addr = s.recvfrom_into(b)
                return b[:count]

            # (name, method, whether to expect success, *args)
            send_methods = [
                ('send', s.send, True, []),
                ('sendto', s.sendto, False, ["some.address"]),
                ('sendall', s.sendall, True, []),
            ]
            recv_methods = [
                ('recv', s.recv, True, []),
                ('recvfrom', s.recvfrom, False, ["some.address"]),
                ('recv_into', _recv_into, True, []),
                ('recvfrom_into', _recvfrom_into, False, []),
            ]
            data_prefix = u"PREFIX_"

            for meth_name, send_meth, expect_success, args in send_methods:
                indata = (data_prefix + meth_name).encode('ascii')
                # Only the socket calls sit in the try body, so a
                # ValueError can only come from the method under test.
                try:
                    send_meth(indata, *args)
                    outdata = s.read()
                except ValueError as e:
                    if expect_success:
                        self.fail(
                            "Failed to send with method <<{name:s}>>; "
                            "expected to succeed.\n".format(name=meth_name)
                        )
                    if not str(e).startswith(meth_name):
                        self.fail(
                            "Method <<{name:s}>> failed with unexpected "
                            "exception message: {exp!s}\n".format(
                                name=meth_name, exp=e
                            )
                        )
                else:
                    if outdata != indata.lower():
                        # "!r" is a conversion; ":r" would be an invalid
                        # format spec and raise ValueError itself.
                        self.fail(
                            "While sending with <<{name:s}>> bad data "
                            "<<{outdata!r}>> ({nout:d}) received; "
                            "expected <<{indata!r}>> ({nin:d})\n".format(
                                name=meth_name, outdata=outdata[:20],
                                nout=len(outdata),
                                indata=indata[:20].lower(), nin=len(indata)
                            )
                        )

            for meth_name, recv_meth, expect_success, args in recv_methods:
                indata = (data_prefix + meth_name).encode('ascii')
                try:
                    s.send(indata)
                    outdata = recv_meth(*args)
                except ValueError as e:
                    if expect_success:
                        self.fail(
                            "Failed to receive with method <<{name:s}>>; "
                            "expected to succeed.\n".format(name=meth_name)
                        )
                    if not str(e).startswith(meth_name):
                        self.fail(
                            "Method <<{name:s}>> failed with unexpected "
                            "exception message: {exp!s}\n".format(
                                name=meth_name, exp=e
                            )
                        )
                    # consume data
                    s.read()
                else:
                    if outdata != indata.lower():
                        self.fail(
                            "While receiving with <<{name:s}>> bad data "
                            "<<{outdata!r}>> ({nout:d}) received; "
                            "expected <<{indata!r}>> ({nin:d})\n".format(
                                name=meth_name, outdata=outdata[:20],
                                nout=len(outdata),
                                indata=indata[:20].lower(), nin=len(indata)
                            )
                        )

            s.write(b"over\n")
Bill Janssen61c001a2008-09-08 16:37:24 +00002559
def test_handshake_timeout(self):
    # Issue #5103: SSL handshake must respect the socket timeout
    listen_sock = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(listen_sock)
    started = threading.Event()
    stop = threading.Event()

    def serve():
        # Accept connections but never speak SSL on them, so that any
        # client handshake stalls until its timeout expires.
        listen_sock.listen(5)
        started.set()
        accepted = []
        while not stop.is_set():
            readable, _, _ = select.select([listen_sock], [], [], 0.1)
            if listen_sock in readable:
                # Let the socket hang around rather than having
                # it closed by garbage collection.
                accepted.append(listen_sock.accept()[0])
        for conn in accepted:
            conn.close()

    worker = threading.Thread(target=serve)
    worker.start()
    started.wait()

    try:
        # Case 1: wrap an already connected socket.
        try:
            c = socket.socket(socket.AF_INET)
            c.settimeout(0.2)
            c.connect((host, port))
            # Will attempt handshake and time out
            with self.assertRaisesRegexp(ssl.SSLError, "timed out"):
                ssl.wrap_socket(c)
        finally:
            c.close()
        # Case 2: connect through an already wrapped socket.
        try:
            c = socket.socket(socket.AF_INET)
            c = ssl.wrap_socket(c)
            c.settimeout(0.2)
            # Will attempt handshake and time out
            with self.assertRaisesRegexp(ssl.SSLError, "timed out"):
                c.connect((host, port))
        finally:
            c.close()
    finally:
        stop.set()
        worker.join()
        listen_sock.close()
2608
def test_server_accept(self):
    # Issue #16357: accept() on a SSLSocket created through
    # SSLContext.wrap_socket().
    ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    ctx.verify_mode = ssl.CERT_REQUIRED
    ctx.load_verify_locations(CERTFILE)
    ctx.load_cert_chain(CERTFILE)
    plain_listener = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(plain_listener)
    listener = ctx.wrap_socket(plain_listener, server_side=True)

    listening = threading.Event()
    # Filled in by the server thread with the result of accept().
    accepted = {'conn': None, 'addr': None}

    def serve():
        listener.listen(5)
        # Block on the accept and wait on the connection to close.
        listening.set()
        accepted['conn'], accepted['addr'] = listener.accept()
        accepted['conn'].recv(1)

    worker = threading.Thread(target=serve)
    worker.start()
    # Client wait until server setup and perform a connect.
    listening.wait()
    client = ctx.wrap_socket(socket.socket())
    client.connect((host, port))
    client_addr = client.getsockname()
    client.close()
    worker.join()
    accepted['conn'].close()
    listener.close()
    # Sanity checks.
    self.assertIsInstance(accepted['conn'], ssl.SSLSocket)
    self.assertEqual(accepted['addr'], client_addr)
2645
2646 def test_getpeercert_enotconn(self):
2647 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2648 with closing(context.wrap_socket(socket.socket())) as sock:
2649 with self.assertRaises(socket.error) as cm:
2650 sock.getpeercert()
2651 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
2652
2653 def test_do_handshake_enotconn(self):
2654 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2655 with closing(context.wrap_socket(socket.socket())) as sock:
2656 with self.assertRaises(socket.error) as cm:
2657 sock.do_handshake()
2658 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
2659
def test_default_ciphers(self):
    # A client restricted to "DES" ciphers must fail to connect to a
    # default-configured server, which records a "no shared cipher" error.
    weak_ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    try:
        # Force a set of weak ciphers on our client context
        weak_ctx.set_ciphers("DES")
    except ssl.SSLError:
        self.skipTest("no DES cipher available")
    server = ThreadedEchoServer(CERTFILE,
                                ssl_version=ssl.PROTOCOL_SSLv23,
                                chatty=False)
    with server:
        with closing(weak_ctx.wrap_socket(socket.socket())) as s:
            with self.assertRaises(ssl.SSLError):
                s.connect((HOST, server.port))
    first_error = str(server.conn_errors[0])
    self.assertIn("no shared cipher", first_error)
2674
def test_version_basic(self):
    """
    Check SSLSocket.version() before connecting, while connected
    and after the socket has been closed.
    More tests are done in the test_protocol_*() methods.
    """
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server = ThreadedEchoServer(CERTFILE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                chatty=False)
    with server:
        s = client_ctx.wrap_socket(socket.socket())
        with closing(s):
            self.assertIs(s.version(), None)
            s.connect((HOST, server.port))
            self.assertEqual(s.version(), "TLSv1")
        # The socket has been closed by closing() at this point.
        self.assertIs(s.version(), None)
2689
@unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
def test_default_ecdh_curve(self):
    # Issue #21015: elliptic curve-based Diffie Hellman key exchange
    # should be enabled by default on SSL contexts.
    shared_ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    shared_ctx.load_cert_chain(CERTFILE)
    # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
    # explicitly using the 'ECCdraft' cipher alias. Otherwise,
    # our default cipher list should prefer ECDH-based ciphers
    # automatically.
    needs_explicit_ecdh = ssl.OPENSSL_VERSION_INFO < (1, 0, 0)
    if needs_explicit_ecdh:
        shared_ctx.set_ciphers("ECCdraft:ECDH")
    # The same context is used for the server and for the client.
    with ThreadedEchoServer(context=shared_ctx) as server:
        client = shared_ctx.wrap_socket(socket.socket())
        with closing(client):
            client.connect((HOST, server.port))
            cipher_name = client.cipher()[0]
            self.assertIn("ECDH", cipher_name)
2706
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    """Test tls-unique channel binding.

    Two consecutive TLSv1 connections are made to the same echo server.
    For each one the client-side binding data must be non-None, 12 bytes
    long and equal to what the server reports for the "CB tls-unique"
    command; the data of the two connections must differ.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = ssl.wrap_socket(socket.socket(),
                            server_side=False,
                            certfile=CERTFILE,
                            ca_certs=CERTFILE,
                            cert_reqs=ssl.CERT_NONE,
                            ssl_version=ssl.PROTOCOL_TLSv1)
        # closing() releases the socket even when an assertion fails.
        with closing(s):
            s.connect((HOST, server.port))
            # get the data
            cb_data = s.get_channel_binding("tls-unique")
            if support.verbose:
                sys.stdout.write(" got channel binding data: {0!r}\n"
                                 .format(cb_data))

            # check if it is sane
            self.assertIsNotNone(cb_data)
            self.assertEqual(len(cb_data), 12) # True for TLSv1

            # and compare with the peers version
            s.write(b"CB tls-unique\n")
            peer_data_repr = s.read().strip()
            self.assertEqual(peer_data_repr,
                             repr(cb_data).encode("us-ascii"))

        # now, again
        s = ssl.wrap_socket(socket.socket(),
                            server_side=False,
                            certfile=CERTFILE,
                            ca_certs=CERTFILE,
                            cert_reqs=ssl.CERT_NONE,
                            ssl_version=ssl.PROTOCOL_TLSv1)
        with closing(s):
            s.connect((HOST, server.port))
            new_cb_data = s.get_channel_binding("tls-unique")
            if support.verbose:
                sys.stdout.write(" got another channel binding data: {0!r}\n"
                                 .format(new_cb_data))
            # is it really unique
            self.assertNotEqual(cb_data, new_cb_data)
            # Sanity-check the binding of the *second* connection; the
            # first one has already been validated above.
            self.assertIsNotNone(new_cb_data)
            self.assertEqual(len(new_cb_data), 12) # True for TLSv1
            s.write(b"CB tls-unique\n")
            peer_data_repr = s.read().strip()
            self.assertEqual(peer_data_repr,
                             repr(new_cb_data).encode("us-ascii"))
2766
def test_compression(self):
    """Check that the negotiated compression is None, 'ZLIB' or 'RLE'.

    The same TLSv1 context is used for both ends of the connection.
    """
    shared_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    shared_context.load_cert_chain(CERTFILE)
    stats = server_params_test(shared_context, shared_context,
                               chatty=True, connectionchatty=True)
    compression = stats['compression']
    if support.verbose:
        sys.stdout.write(" got compression: {!r}\n".format(compression))
    accepted = { None, 'ZLIB', 'RLE' }
    self.assertIn(compression, accepted)
2775
@unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
                     "ssl.OP_NO_COMPRESSION needed for this test")
def test_compression_disabled(self):
    """Check that OP_NO_COMPRESSION yields a connection without compression."""
    shared_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    shared_context.load_cert_chain(CERTFILE)
    shared_context.options |= ssl.OP_NO_COMPRESSION
    stats = server_params_test(shared_context, shared_context,
                               chatty=True, connectionchatty=True)
    compression = stats['compression']
    self.assertIs(compression, None)
2785
def test_dh_params(self):
    """Check we can get a connection with ephemeral Diffie-Hellman.

    The context loads DH parameters from DHFILE and restricts the
    cipher list to "kEDH".  The negotiated cipher name must then
    contain an "ADH", "EDH" or "DHE" component.
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.load_cert_chain(CERTFILE)
    context.load_dh_params(DHFILE)
    context.set_ciphers("kEDH")
    stats = server_params_test(context, context,
                               chatty=True, connectionchatty=True)
    # First element of the cipher entry is the cipher name string.
    cipher = stats["cipher"][0]
    parts = cipher.split("-")
    if not any(tag in parts for tag in ("ADH", "EDH", "DHE")):
        # Report the whole name; cipher[0] would be its first character.
        self.fail("Non-DH cipher: " + cipher)
2798
def test_selected_alpn_protocol(self):
    """selected_alpn_protocol() is None unless ALPN is used.

    Neither end configures ALPN here, so the client-side result
    recorded in the stats must be None.
    """
    shared_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    shared_context.load_cert_chain(CERTFILE)
    stats = server_params_test(shared_context, shared_context,
                               chatty=True, connectionchatty=True)
    client_choice = stats['client_alpn_protocol']
    self.assertIs(client_choice, None)
2806
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
def test_selected_alpn_protocol_if_server_uses_alpn(self):
    """selected_alpn_protocol() is None unless ALPN is used by the client.

    Only the server context sets ALPN protocols, so the client-side
    result recorded in the stats must still be None.
    """
    server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_context.load_cert_chain(CERTFILE)
    server_context.set_alpn_protocols(['foo', 'bar'])
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    client_context.load_verify_locations(CERTFILE)
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True)
    client_choice = stats['client_alpn_protocol']
    self.assertIs(client_choice, None)
2818
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
def test_alpn_protocols(self):
    """Check which ALPN protocol both ends report for several client offers.

    The server always advertises the same list.  Every table row pairs
    a client offer with the protocol both the client and the server
    are expected to report (None when the lists do not overlap).
    """
    server_protocols = ['foo', 'bar', 'milkshake']
    # (protocols offered by the client, expected selection)
    protocol_tests = [
        (['foo', 'bar'], 'foo'),
        (['bar', 'foo'], 'foo'),
        (['milkshake'], 'milkshake'),
        (['http/3.0', 'http/4.0'], None)
    ]

    def alpn_context(protocols):
        # Fresh TLSv1 context carrying CERTFILE and the given ALPN list.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_cert_chain(CERTFILE)
        ctx.set_alpn_protocols(protocols)
        return ctx

    for client_protocols, expected in protocol_tests:
        server_context = alpn_context(server_protocols)
        client_context = alpn_context(client_protocols)
        stats = server_params_test(client_context, server_context,
                                   chatty=True, connectionchatty=True)

        # The two remaining placeholders are filled in per assertion.
        msg = ("failed trying %s (s) and %s (c).\n"
               "was expecting %s, but got %%s from the %%s"
               % (str(server_protocols), str(client_protocols),
                  str(expected)))

        client_result = stats['client_alpn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))

        # The server side is judged by the last protocol it recorded.
        if len(stats['server_alpn_protocols']):
            server_result = stats['server_alpn_protocols'][-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
2847
def test_selected_npn_protocol(self):
    """selected_npn_protocol() is None unless NPN is used.

    Neither end configures NPN here, so the client-side result
    recorded in the stats must be None.
    """
    shared_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    shared_context.load_cert_chain(CERTFILE)
    stats = server_params_test(shared_context, shared_context,
                               chatty=True, connectionchatty=True)
    client_choice = stats['client_npn_protocol']
    self.assertIs(client_choice, None)
2855
@unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
def test_npn_protocols(self):
    """Check which NPN protocol both ends report for several client offers.

    The server always advertises the same list.  Every table row pairs
    a client offer with the protocol both the client and the server
    are expected to report.
    """
    server_protocols = ['http/1.1', 'spdy/2']
    # (protocols offered by the client, expected selection)
    protocol_tests = [
        (['http/1.1', 'spdy/2'], 'http/1.1'),
        (['spdy/2', 'http/1.1'], 'http/1.1'),
        (['spdy/2', 'test'], 'spdy/2'),
        (['abc', 'def'], 'abc')
    ]

    def npn_context(protocols):
        # Fresh TLSv1 context carrying CERTFILE and the given NPN list.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_cert_chain(CERTFILE)
        ctx.set_npn_protocols(protocols)
        return ctx

    for client_protocols, expected in protocol_tests:
        server_context = npn_context(server_protocols)
        client_context = npn_context(client_protocols)
        stats = server_params_test(client_context, server_context,
                                   chatty=True, connectionchatty=True)

        # The two remaining placeholders are filled in per assertion.
        msg = ("failed trying %s (s) and %s (c).\n"
               "was expecting %s, but got %%s from the %%s"
               % (str(server_protocols), str(client_protocols),
                  str(expected)))

        client_result = stats['client_npn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))

        # The server side is judged by the last protocol it recorded.
        if len(stats['server_npn_protocols']):
            server_result = stats['server_npn_protocols'][-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
2884
def sni_contexts(self):
    """Build the three TLSv1 contexts shared by the SNI tests.

    Returns a (server_context, other_context, client_context) tuple.
    The server context loads SIGNED_CERTFILE and the other context
    loads SIGNED_CERTFILE2.  The client context requires a certificate
    from its peer and verifies it against SIGNING_CA.
    """
    def context_with_cert(certfile):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_cert_chain(certfile)
        return ctx

    server_context = context_with_cert(SIGNED_CERTFILE)
    other_context = context_with_cert(SIGNED_CERTFILE2)

    client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    client_context.verify_mode = ssl.CERT_REQUIRED
    client_context.load_verify_locations(SIGNING_CA)

    return server_context, other_context, client_context
2894
def check_common_name(self, stats, name):
    """Assert that the peer certificate in *stats* has common name *name*.

    The check looks for the (('commonName', name),) entry in the
    'subject' of stats['peercert'].
    """
    expected_entry = (('commonName', name),)
    subject = stats['peercert']['subject']
    self.assertIn(expected_entry, subject)
2898
@needs_sni
def test_sni_callback(self):
    """Exercise the servername callback in three situations.

    With a server name the callback swaps in other_context, without a
    name it leaves the connection alone, and after the callback is
    removed it must no longer be invoked.
    """
    server_context, other_context, client_context = self.sni_contexts()
    calls = []

    def servername_cb(ssl_sock, server_name, initial_context):
        calls.append((server_name, initial_context))
        if server_name is None:
            return
        ssl_sock.context = other_context
    server_context.set_servername_callback(servername_cb)

    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name='supermessage')
    # The callback saw the requested name together with the context
    # the server started with ...
    self.assertEqual(calls, [("supermessage", server_context)])
    # ... and the client got the certificate of other_context.
    self.check_common_name(stats, 'fakehostname')

    # Without SNI the callback still runs, with server_name=None, and
    # the server's own certificate is presented.
    del calls[:]
    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name=None)
    self.assertEqual(calls, [(None, server_context)])
    self.check_common_name(stats, 'localhost')

    # Check disabling the callback
    del calls[:]
    server_context.set_servername_callback(None)

    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name='notfunny')
    # Certificate didn't change
    self.check_common_name(stats, 'localhost')
    self.assertEqual(calls, [])
2937
@needs_sni
def test_sni_callback_alert(self):
    """Returning a TLS alert is reflected to the connecting client.

    The callback returns ALERT_DESCRIPTION_ACCESS_DENIED, so the
    client must fail with an SSLError whose reason is
    'TLSV1_ALERT_ACCESS_DENIED'.
    """
    server_context, other_context, client_context = self.sni_contexts()

    def cb_returning_alert(ssl_sock, server_name, initial_context):
        return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
    server_context.set_servername_callback(cb_returning_alert)

    with self.assertRaises(ssl.SSLError) as caught:
        server_params_test(client_context, server_context,
                           chatty=False,
                           sni_name='supermessage')
    reason = caught.exception.reason
    self.assertEqual(reason, 'TLSV1_ALERT_ACCESS_DENIED')
2952
@needs_sni
def test_sni_callback_raising(self):
    """Raising fails the connection with a TLS handshake failure alert.

    The callback divides by zero.  The client must see an SSLError
    with reason 'SSLV3_ALERT_HANDSHAKE_FAILURE', and the text captured
    from stderr must mention ZeroDivisionError.
    """
    server_context, other_context, client_context = self.sni_contexts()

    def cb_raising(ssl_sock, server_name, initial_context):
        1.0 / 0.0
    server_context.set_servername_callback(cb_raising)

    with self.assertRaises(ssl.SSLError) as caught:
        with support.captured_stderr() as captured:
            server_params_test(client_context, server_context,
                               chatty=False,
                               sni_name='supermessage')
    reason = caught.exception.reason
    self.assertEqual(reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
    self.assertIn("ZeroDivisionError", captured.getvalue())
2969
@needs_sni
def test_sni_callback_wrong_return_type(self):
    """Returning the wrong return type terminates the TLS connection
    with an internal error alert.

    The callback returns a string.  The client must see an SSLError
    with reason 'TLSV1_ALERT_INTERNAL_ERROR', and the text captured
    from stderr must mention TypeError.
    """
    server_context, other_context, client_context = self.sni_contexts()

    def cb_wrong_return_type(ssl_sock, server_name, initial_context):
        return "foo"
    server_context.set_servername_callback(cb_wrong_return_type)

    with self.assertRaises(ssl.SSLError) as caught:
        with support.captured_stderr() as captured:
            server_params_test(client_context, server_context,
                               chatty=False,
                               sni_name='supermessage')
    reason = caught.exception.reason
    self.assertEqual(reason, 'TLSV1_ALERT_INTERNAL_ERROR')
    self.assertIn("TypeError", captured.getvalue())
2987
def test_read_write_after_close_raises_valuerror(self):
    """Check that read() and write() on a closed SSL socket raise ValueError.

    The client connects to an echo server, closes the socket right
    away and then attempts one read and one write.
    """
    shared_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    shared_context.verify_mode = ssl.CERT_REQUIRED
    shared_context.load_verify_locations(CERTFILE)
    shared_context.load_cert_chain(CERTFILE)
    server = ThreadedEchoServer(context=shared_context, chatty=False)

    with server:
        client = shared_context.wrap_socket(socket.socket())
        client.connect((HOST, server.port))
        client.close()

        with self.assertRaises(ValueError):
            client.read(1024)
        with self.assertRaises(ValueError):
            client.write(b'hello')
3002
Bill Janssen61c001a2008-09-08 16:37:24 +00003003
def test_main(verbose=False):
    """Run the test_ssl test cases.

    In verbose mode a banner describing the OpenSSL build and the
    platform is printed first.  Every certificate file the tests rely
    on must exist, otherwise support.TestFailed is raised.
    NetworkedTests is added when the 'network' resource is enabled and
    ThreadedTests when threads are available.
    """
    def describe_platform():
        # The first probe that reports a non-empty first field wins;
        # otherwise fall back to the generic platform string.
        probes = {
            'Linux': platform.linux_distribution,
            'Mac': platform.mac_ver,
            'Windows': platform.win32_ver,
        }
        for label, probe in probes.items():
            details = probe()
            if details and details[0]:
                return '%s %r' % (label, details)
        return repr(platform.platform())

    if support.verbose:
        plat = describe_platform()
        print("test_ssl: testing with %r %r" %
            (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
        print(" under %s" % plat)
        print(" HAS_SNI = %r" % ssl.HAS_SNI)
        print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
        # The ssl module does not always define OP_NO_TLSv1_1.
        try:
            print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
        except AttributeError:
            pass

    required_files = [
        CERTFILE, SVN_PYTHON_ORG_ROOT_CERT, BYTES_CERTFILE,
        ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
        SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
        BADCERT, BADKEY, EMPTYCERT,
    ]
    for filename in required_files:
        if os.path.exists(filename):
            continue
        raise support.TestFailed("Can't read certificate file %r" % filename)

    tests = [ContextTests, BasicTests, BasicSocketTests, SSLErrorTests]

    if support.is_resource_enabled('network'):
        tests.append(NetworkedTests)

    if _have_threads:
        thread_info = support.threading_setup()
        if thread_info:
            tests.append(ThreadedTests)

    try:
        support.run_unittest(*tests)
    finally:
        # thread_info exists exactly when _have_threads is true.
        if _have_threads:
            support.threading_cleanup(*thread_info)


if __name__ == "__main__":
    test_main()