blob: b2d57cf874787ce6701f00b4908a1d44bb1aa235 [file] [log] [blame]
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001# -*- coding: utf-8 -*-
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00002# Test the support for SSL and sockets
3
4import sys
5import unittest
Benjamin Petersondaeb9252014-08-20 14:14:50 -05006from test import test_support as support
Bill Janssen934b16d2008-06-28 22:19:33 +00007import asyncore
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00008import socket
Bill Janssen934b16d2008-06-28 22:19:33 +00009import select
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000010import time
Benjamin Petersondaeb9252014-08-20 14:14:50 -050011import datetime
Antoine Pitroub558f172010-04-23 23:25:45 +000012import gc
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000013import os
Antoine Pitroub558f172010-04-23 23:25:45 +000014import errno
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000015import pprint
Benjamin Petersondaeb9252014-08-20 14:14:50 -050016import tempfile
Benjamin Petersonfcfb18e2014-11-23 11:42:45 -060017import urllib2
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000018import traceback
Antoine Pitroudfb299b2010-04-23 22:54:59 +000019import weakref
Antoine Pitroud75efd92010-08-04 17:38:33 +000020import platform
Benjamin Petersondaeb9252014-08-20 14:14:50 -050021import functools
22from contextlib import closing
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000023
Benjamin Petersondaeb9252014-08-20 14:14:50 -050024ssl = support.import_module("ssl")
Bill Janssen296a59d2007-09-16 22:06:00 +000025
Benjamin Petersondaeb9252014-08-20 14:14:50 -050026PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
27HOST = support.HOST
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000028
Benjamin Petersondaeb9252014-08-20 14:14:50 -050029def data_file(*name):
30 return os.path.join(os.path.dirname(__file__), *name)
31
32# The custom key and certificate files used in test_ssl are generated
33# using Lib/test/make_ssl_certs.py.
34# Other certificates are simply fetched from the Internet servers they
35# are meant to authenticate.
36
37CERTFILE = data_file("keycert.pem")
38BYTES_CERTFILE = CERTFILE.encode(sys.getfilesystemencoding())
39ONLYCERT = data_file("ssl_cert.pem")
40ONLYKEY = data_file("ssl_key.pem")
41BYTES_ONLYCERT = ONLYCERT.encode(sys.getfilesystemencoding())
42BYTES_ONLYKEY = ONLYKEY.encode(sys.getfilesystemencoding())
43CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
44ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
45KEY_PASSWORD = "somepass"
46CAPATH = data_file("capath")
47BYTES_CAPATH = CAPATH.encode(sys.getfilesystemencoding())
48CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
49CAFILE_CACERT = data_file("capath", "5ed36f99.0")
50
51
52# empty CRL
53CRLFILE = data_file("revocation.crl")
54
55# Two keys and certs signed by the same CA (for SNI tests)
56SIGNED_CERTFILE = data_file("keycert3.pem")
57SIGNED_CERTFILE2 = data_file("keycert4.pem")
58SIGNING_CA = data_file("pycacert.pem")
59
60SVN_PYTHON_ORG_ROOT_CERT = data_file("https_svn_python_org_root.pem")
61
62EMPTYCERT = data_file("nullcert.pem")
63BADCERT = data_file("badcert.pem")
64WRONGCERT = data_file("XXXnonexisting.pem")
65BADKEY = data_file("badkey.pem")
66NOKIACERT = data_file("nokia.pem")
67NULLBYTECERT = data_file("nullbytecert.pem")
68
69DHFILE = data_file("dh512.pem")
70BYTES_DHFILE = DHFILE.encode(sys.getfilesystemencoding())
71
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000072
Neal Norwitz3e533c22007-08-27 01:03:18 +000073def handle_error(prefix):
74 exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
Benjamin Petersondaeb9252014-08-20 14:14:50 -050075 if support.verbose:
Bill Janssen98d19da2007-09-10 21:51:02 +000076 sys.stdout.write(prefix + exc_format)
Neal Norwitz3e533c22007-08-27 01:03:18 +000077
Antoine Pitrou435ba0c2010-04-27 09:51:18 +000078
79class BasicTests(unittest.TestCase):
80
Antoine Pitrou3945c862010-04-28 21:11:01 +000081 def test_sslwrap_simple(self):
Antoine Pitrou435ba0c2010-04-27 09:51:18 +000082 # A crude test for the legacy API
Bill Jansseneb257ac2008-09-29 18:56:38 +000083 try:
84 ssl.sslwrap_simple(socket.socket(socket.AF_INET))
85 except IOError, e:
86 if e.errno == 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that
87 pass
88 else:
89 raise
90 try:
91 ssl.sslwrap_simple(socket.socket(socket.AF_INET)._sock)
92 except IOError, e:
93 if e.errno == 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that
94 pass
95 else:
96 raise
Benjamin Peterson2f334562014-10-01 23:53:01 -040097
98
Benjamin Petersondaeb9252014-08-20 14:14:50 -050099def can_clear_options():
100 # 0.9.8m or higher
101 return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
102
103def no_sslv2_implies_sslv3_hello():
104 # 0.9.7h or higher
105 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
106
107def have_verify_flags():
108 # 0.9.8 or higher
109 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
110
111def utc_offset(): #NOTE: ignore issues like #1647654
112 # local time = utc time + utc offset
113 if time.daylight and time.localtime().tm_isdst > 0:
114 return -time.altzone # seconds
115 return -time.timezone
116
117def asn1time(cert_time):
118 # Some versions of OpenSSL ignore seconds, see #18207
119 # 0.9.8.i
120 if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
121 fmt = "%b %d %H:%M:%S %Y GMT"
122 dt = datetime.datetime.strptime(cert_time, fmt)
123 dt = dt.replace(second=0)
124 cert_time = dt.strftime(fmt)
125 # %d adds leading zero but ASN1_TIME_print() uses leading space
126 if cert_time[4] == "0":
127 cert_time = cert_time[:4] + " " + cert_time[5:]
128
129 return cert_time
Neal Norwitz3e533c22007-08-27 01:03:18 +0000130
Antoine Pitroud75efd92010-08-04 17:38:33 +0000131# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
132def skip_if_broken_ubuntu_ssl(func):
Victor Stinnerb1241f92011-05-10 01:52:03 +0200133 if hasattr(ssl, 'PROTOCOL_SSLv2'):
Victor Stinnerb1241f92011-05-10 01:52:03 +0200134 @functools.wraps(func)
135 def f(*args, **kwargs):
136 try:
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500137 ssl.SSLContext(ssl.PROTOCOL_SSLv2)
138 except ssl.SSLError:
Victor Stinnerb1241f92011-05-10 01:52:03 +0200139 if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500140 platform.linux_distribution() == ('debian', 'squeeze/sid', '')):
Victor Stinnerb1241f92011-05-10 01:52:03 +0200141 raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
142 return func(*args, **kwargs)
143 return f
144 else:
145 return func
Antoine Pitroud75efd92010-08-04 17:38:33 +0000146
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500147needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
148
Antoine Pitroud75efd92010-08-04 17:38:33 +0000149
150class BasicSocketTests(unittest.TestCase):
151
Antoine Pitrou3945c862010-04-28 21:11:01 +0000152 def test_constants(self):
Bill Janssen98d19da2007-09-10 21:51:02 +0000153 ssl.CERT_NONE
154 ssl.CERT_OPTIONAL
155 ssl.CERT_REQUIRED
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500156 ssl.OP_CIPHER_SERVER_PREFERENCE
157 ssl.OP_SINGLE_DH_USE
158 if ssl.HAS_ECDH:
159 ssl.OP_SINGLE_ECDH_USE
160 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
161 ssl.OP_NO_COMPRESSION
162 self.assertIn(ssl.HAS_SNI, {True, False})
163 self.assertIn(ssl.HAS_ECDH, {True, False})
164
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000165
Antoine Pitrou3945c862010-04-28 21:11:01 +0000166 def test_random(self):
Bill Janssen98d19da2007-09-10 21:51:02 +0000167 v = ssl.RAND_status()
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500168 if support.verbose:
Bill Janssen98d19da2007-09-10 21:51:02 +0000169 sys.stdout.write("\n RAND_status is %d (%s)\n"
170 % (v, (v and "sufficient randomness") or
171 "insufficient randomness"))
Victor Stinner7c906672015-01-06 13:53:37 +0100172 if hasattr(ssl, 'RAND_egd'):
173 self.assertRaises(TypeError, ssl.RAND_egd, 1)
174 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Bill Janssen98d19da2007-09-10 21:51:02 +0000175 ssl.RAND_add("this is a random string", 75.0)
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000176
Antoine Pitrou3945c862010-04-28 21:11:01 +0000177 def test_parse_cert(self):
Bill Janssen98d19da2007-09-10 21:51:02 +0000178 # note that this uses an 'unofficial' function in _ssl.c,
179 # provided solely for this test, to exercise the certificate
180 # parsing code
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500181 p = ssl._ssl._test_decode_cert(CERTFILE)
182 if support.verbose:
Bill Janssen98d19da2007-09-10 21:51:02 +0000183 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500184 self.assertEqual(p['issuer'],
185 ((('countryName', 'XY'),),
186 (('localityName', 'Castle Anthrax'),),
187 (('organizationName', 'Python Software Foundation'),),
188 (('commonName', 'localhost'),))
189 )
190 # Note the next three asserts will fail if the keys are regenerated
191 self.assertEqual(p['notAfter'], asn1time('Oct 5 23:01:56 2020 GMT'))
192 self.assertEqual(p['notBefore'], asn1time('Oct 8 23:01:56 2010 GMT'))
193 self.assertEqual(p['serialNumber'], 'D7C7381919AFC24E')
Antoine Pitrouf06eb462011-10-01 19:30:58 +0200194 self.assertEqual(p['subject'],
Antoine Pitrou60982912013-02-16 21:39:28 +0100195 ((('countryName', 'XY'),),
196 (('localityName', 'Castle Anthrax'),),
197 (('organizationName', 'Python Software Foundation'),),
198 (('commonName', 'localhost'),))
Antoine Pitrouf06eb462011-10-01 19:30:58 +0200199 )
Antoine Pitrou60982912013-02-16 21:39:28 +0100200 self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),))
Antoine Pitrouf06eb462011-10-01 19:30:58 +0200201 # Issue #13034: the subjectAltName in some certificates
202 # (notably projects.developer.nokia.com:443) wasn't parsed
203 p = ssl._ssl._test_decode_cert(NOKIACERT)
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500204 if support.verbose:
Antoine Pitrouf06eb462011-10-01 19:30:58 +0200205 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
206 self.assertEqual(p['subjectAltName'],
207 (('DNS', 'projects.developer.nokia.com'),
208 ('DNS', 'projects.forum.nokia.com'))
209 )
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500210 # extra OCSP and AIA fields
211 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
212 self.assertEqual(p['caIssuers'],
213 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
214 self.assertEqual(p['crlDistributionPoints'],
215 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000216
Christian Heimes88b174c2013-08-17 00:54:47 +0200217 def test_parse_cert_CVE_2013_4238(self):
218 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500219 if support.verbose:
Christian Heimes88b174c2013-08-17 00:54:47 +0200220 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
221 subject = ((('countryName', 'US'),),
222 (('stateOrProvinceName', 'Oregon'),),
223 (('localityName', 'Beaverton'),),
224 (('organizationName', 'Python Software Foundation'),),
225 (('organizationalUnitName', 'Python Core Development'),),
226 (('commonName', 'null.python.org\x00example.org'),),
227 (('emailAddress', 'python-dev@python.org'),))
228 self.assertEqual(p['subject'], subject)
229 self.assertEqual(p['issuer'], subject)
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500230 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
Christian Heimesf869a942013-08-25 14:12:41 +0200231 san = (('DNS', 'altnull.python.org\x00example.com'),
232 ('email', 'null@python.org\x00user@example.org'),
233 ('URI', 'http://null.python.org\x00http://example.org'),
234 ('IP Address', '192.0.2.1'),
235 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
236 else:
237 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
238 san = (('DNS', 'altnull.python.org\x00example.com'),
239 ('email', 'null@python.org\x00user@example.org'),
240 ('URI', 'http://null.python.org\x00http://example.org'),
241 ('IP Address', '192.0.2.1'),
242 ('IP Address', '<invalid>'))
243
244 self.assertEqual(p['subjectAltName'], san)
Christian Heimes88b174c2013-08-17 00:54:47 +0200245
Antoine Pitrou3945c862010-04-28 21:11:01 +0000246 def test_DER_to_PEM(self):
247 with open(SVN_PYTHON_ORG_ROOT_CERT, 'r') as f:
248 pem = f.read()
Bill Janssen296a59d2007-09-16 22:06:00 +0000249 d1 = ssl.PEM_cert_to_DER_cert(pem)
250 p2 = ssl.DER_cert_to_PEM_cert(d1)
251 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitroudb187842010-04-27 10:32:58 +0000252 self.assertEqual(d1, d2)
Antoine Pitrou4c7bcf12010-04-27 22:03:37 +0000253 if not p2.startswith(ssl.PEM_HEADER + '\n'):
254 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
255 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
256 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Bill Janssen296a59d2007-09-16 22:06:00 +0000257
Antoine Pitrouf9de5342010-04-05 21:35:07 +0000258 def test_openssl_version(self):
259 n = ssl.OPENSSL_VERSION_NUMBER
260 t = ssl.OPENSSL_VERSION_INFO
261 s = ssl.OPENSSL_VERSION
262 self.assertIsInstance(n, (int, long))
263 self.assertIsInstance(t, tuple)
264 self.assertIsInstance(s, str)
265 # Some sanity checks follow
266 # >= 0.9
267 self.assertGreaterEqual(n, 0x900000)
Antoine Pitrou4e64d872014-07-21 18:35:01 -0400268 # < 3.0
269 self.assertLess(n, 0x30000000)
Antoine Pitrouf9de5342010-04-05 21:35:07 +0000270 major, minor, fix, patch, status = t
271 self.assertGreaterEqual(major, 0)
Antoine Pitrou4e64d872014-07-21 18:35:01 -0400272 self.assertLess(major, 3)
Antoine Pitrouf9de5342010-04-05 21:35:07 +0000273 self.assertGreaterEqual(minor, 0)
274 self.assertLess(minor, 256)
275 self.assertGreaterEqual(fix, 0)
276 self.assertLess(fix, 256)
277 self.assertGreaterEqual(patch, 0)
Ned Deilyfa119782015-02-05 17:19:11 +1100278 self.assertLessEqual(patch, 63)
Antoine Pitrouf9de5342010-04-05 21:35:07 +0000279 self.assertGreaterEqual(status, 0)
280 self.assertLessEqual(status, 15)
Antoine Pitrou4e64d872014-07-21 18:35:01 -0400281 # Version string as returned by {Open,Libre}SSL, the format might change
282 if "LibreSSL" in s:
283 self.assertTrue(s.startswith("LibreSSL {:d}.{:d}".format(major, minor)),
284 (s, t))
285 else:
286 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
287 (s, t))
Antoine Pitrouf9de5342010-04-05 21:35:07 +0000288
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500289 @support.cpython_only
Antoine Pitroudfb299b2010-04-23 22:54:59 +0000290 def test_refcycle(self):
291 # Issue #7943: an SSL object doesn't create reference cycles with
292 # itself.
293 s = socket.socket(socket.AF_INET)
294 ss = ssl.wrap_socket(s)
295 wr = weakref.ref(ss)
296 del ss
297 self.assertEqual(wr(), None)
298
Antoine Pitrouf7f390a2010-09-14 14:37:18 +0000299 def test_wrapped_unconnected(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500300 # Methods on an unconnected SSLSocket propagate the original
301 # socket.error raise by the underlying socket object.
Antoine Pitrouf7f390a2010-09-14 14:37:18 +0000302 s = socket.socket(socket.AF_INET)
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500303 with closing(ssl.wrap_socket(s)) as ss:
304 self.assertRaises(socket.error, ss.recv, 1)
305 self.assertRaises(socket.error, ss.recv_into, bytearray(b'x'))
306 self.assertRaises(socket.error, ss.recvfrom, 1)
307 self.assertRaises(socket.error, ss.recvfrom_into, bytearray(b'x'), 1)
308 self.assertRaises(socket.error, ss.send, b'x')
309 self.assertRaises(socket.error, ss.sendto, b'x', ('0.0.0.0', 0))
310
311 def test_timeout(self):
312 # Issue #8524: when creating an SSL socket, the timeout of the
313 # original socket should be retained.
314 for timeout in (None, 0.0, 5.0):
315 s = socket.socket(socket.AF_INET)
316 s.settimeout(timeout)
317 with closing(ssl.wrap_socket(s)) as ss:
318 self.assertEqual(timeout, ss.gettimeout())
319
320 def test_errors(self):
321 sock = socket.socket()
322 self.assertRaisesRegexp(ValueError,
323 "certfile must be specified",
324 ssl.wrap_socket, sock, keyfile=CERTFILE)
325 self.assertRaisesRegexp(ValueError,
326 "certfile must be specified for server-side operations",
327 ssl.wrap_socket, sock, server_side=True)
328 self.assertRaisesRegexp(ValueError,
329 "certfile must be specified for server-side operations",
330 ssl.wrap_socket, sock, server_side=True, certfile="")
331 with closing(ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE)) as s:
332 self.assertRaisesRegexp(ValueError, "can't connect in server-side mode",
333 s.connect, (HOST, 8080))
334 with self.assertRaises(IOError) as cm:
335 with closing(socket.socket()) as sock:
336 ssl.wrap_socket(sock, certfile=WRONGCERT)
337 self.assertEqual(cm.exception.errno, errno.ENOENT)
338 with self.assertRaises(IOError) as cm:
339 with closing(socket.socket()) as sock:
340 ssl.wrap_socket(sock, certfile=CERTFILE, keyfile=WRONGCERT)
341 self.assertEqual(cm.exception.errno, errno.ENOENT)
342 with self.assertRaises(IOError) as cm:
343 with closing(socket.socket()) as sock:
344 ssl.wrap_socket(sock, certfile=WRONGCERT, keyfile=WRONGCERT)
345 self.assertEqual(cm.exception.errno, errno.ENOENT)
346
347 def test_match_hostname(self):
348 def ok(cert, hostname):
349 ssl.match_hostname(cert, hostname)
350 def fail(cert, hostname):
351 self.assertRaises(ssl.CertificateError,
352 ssl.match_hostname, cert, hostname)
353
354 cert = {'subject': ((('commonName', 'example.com'),),)}
355 ok(cert, 'example.com')
356 ok(cert, 'ExAmple.cOm')
357 fail(cert, 'www.example.com')
358 fail(cert, '.example.com')
359 fail(cert, 'example.org')
360 fail(cert, 'exampleXcom')
361
362 cert = {'subject': ((('commonName', '*.a.com'),),)}
363 ok(cert, 'foo.a.com')
364 fail(cert, 'bar.foo.a.com')
365 fail(cert, 'a.com')
366 fail(cert, 'Xa.com')
367 fail(cert, '.a.com')
368
369 # only match one left-most wildcard
370 cert = {'subject': ((('commonName', 'f*.com'),),)}
371 ok(cert, 'foo.com')
372 ok(cert, 'f.com')
373 fail(cert, 'bar.com')
374 fail(cert, 'foo.a.com')
375 fail(cert, 'bar.foo.com')
376
377 # NULL bytes are bad, CVE-2013-4073
378 cert = {'subject': ((('commonName',
379 'null.python.org\x00example.org'),),)}
380 ok(cert, 'null.python.org\x00example.org') # or raise an error?
381 fail(cert, 'example.org')
382 fail(cert, 'null.python.org')
383
384 # error cases with wildcards
385 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
386 fail(cert, 'bar.foo.a.com')
387 fail(cert, 'a.com')
388 fail(cert, 'Xa.com')
389 fail(cert, '.a.com')
390
391 cert = {'subject': ((('commonName', 'a.*.com'),),)}
392 fail(cert, 'a.foo.com')
393 fail(cert, 'a..com')
394 fail(cert, 'a.com')
395
396 # wildcard doesn't match IDNA prefix 'xn--'
397 idna = u'püthon.python.org'.encode("idna").decode("ascii")
398 cert = {'subject': ((('commonName', idna),),)}
399 ok(cert, idna)
400 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
401 fail(cert, idna)
402 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
403 fail(cert, idna)
404
405 # wildcard in first fragment and IDNA A-labels in sequent fragments
406 # are supported.
407 idna = u'www*.pythön.org'.encode("idna").decode("ascii")
408 cert = {'subject': ((('commonName', idna),),)}
409 ok(cert, u'www.pythön.org'.encode("idna").decode("ascii"))
410 ok(cert, u'www1.pythön.org'.encode("idna").decode("ascii"))
411 fail(cert, u'ftp.pythön.org'.encode("idna").decode("ascii"))
412 fail(cert, u'pythön.org'.encode("idna").decode("ascii"))
413
414 # Slightly fake real-world example
415 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
416 'subject': ((('commonName', 'linuxfrz.org'),),),
417 'subjectAltName': (('DNS', 'linuxfr.org'),
418 ('DNS', 'linuxfr.com'),
419 ('othername', '<unsupported>'))}
420 ok(cert, 'linuxfr.org')
421 ok(cert, 'linuxfr.com')
422 # Not a "DNS" entry
423 fail(cert, '<unsupported>')
424 # When there is a subjectAltName, commonName isn't used
425 fail(cert, 'linuxfrz.org')
426
427 # A pristine real-world example
428 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
429 'subject': ((('countryName', 'US'),),
430 (('stateOrProvinceName', 'California'),),
431 (('localityName', 'Mountain View'),),
432 (('organizationName', 'Google Inc'),),
433 (('commonName', 'mail.google.com'),))}
434 ok(cert, 'mail.google.com')
435 fail(cert, 'gmail.com')
436 # Only commonName is considered
437 fail(cert, 'California')
438
439 # Neither commonName nor subjectAltName
440 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
441 'subject': ((('countryName', 'US'),),
442 (('stateOrProvinceName', 'California'),),
443 (('localityName', 'Mountain View'),),
444 (('organizationName', 'Google Inc'),))}
445 fail(cert, 'mail.google.com')
446
447 # No DNS entry in subjectAltName but a commonName
448 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
449 'subject': ((('countryName', 'US'),),
450 (('stateOrProvinceName', 'California'),),
451 (('localityName', 'Mountain View'),),
452 (('commonName', 'mail.google.com'),)),
453 'subjectAltName': (('othername', 'blabla'), )}
454 ok(cert, 'mail.google.com')
455
456 # No DNS entry subjectAltName and no commonName
457 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
458 'subject': ((('countryName', 'US'),),
459 (('stateOrProvinceName', 'California'),),
460 (('localityName', 'Mountain View'),),
461 (('organizationName', 'Google Inc'),)),
462 'subjectAltName': (('othername', 'blabla'),)}
463 fail(cert, 'google.com')
464
465 # Empty cert / no cert
466 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
467 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
468
469 # Issue #17980: avoid denials of service by refusing more than one
470 # wildcard per fragment.
471 cert = {'subject': ((('commonName', 'a*b.com'),),)}
472 ok(cert, 'axxb.com')
473 cert = {'subject': ((('commonName', 'a*b.co*'),),)}
474 fail(cert, 'axxb.com')
475 cert = {'subject': ((('commonName', 'a*b*.com'),),)}
476 with self.assertRaises(ssl.CertificateError) as cm:
477 ssl.match_hostname(cert, 'axxbxxc.com')
478 self.assertIn("too many wildcards", str(cm.exception))
479
480 def test_server_side(self):
481 # server_hostname doesn't work for server sockets
482 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
483 with closing(socket.socket()) as sock:
484 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
485 server_hostname="some.hostname")
486
487 def test_unknown_channel_binding(self):
488 # should raise ValueError for unknown type
489 s = socket.socket(socket.AF_INET)
490 with closing(ssl.wrap_socket(s)) as ss:
491 with self.assertRaises(ValueError):
492 ss.get_channel_binding("unknown-type")
493
494 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
495 "'tls-unique' channel binding not available")
496 def test_tls_unique_channel_binding(self):
497 # unconnected should return None for known type
498 s = socket.socket(socket.AF_INET)
499 with closing(ssl.wrap_socket(s)) as ss:
500 self.assertIsNone(ss.get_channel_binding("tls-unique"))
501 # the same for server-side
502 s = socket.socket(socket.AF_INET)
503 with closing(ssl.wrap_socket(s, server_side=True, certfile=CERTFILE)) as ss:
504 self.assertIsNone(ss.get_channel_binding("tls-unique"))
505
506 def test_get_default_verify_paths(self):
507 paths = ssl.get_default_verify_paths()
508 self.assertEqual(len(paths), 6)
509 self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
510
511 with support.EnvironmentVarGuard() as env:
512 env["SSL_CERT_DIR"] = CAPATH
513 env["SSL_CERT_FILE"] = CERTFILE
514 paths = ssl.get_default_verify_paths()
515 self.assertEqual(paths.cafile, CERTFILE)
516 self.assertEqual(paths.capath, CAPATH)
517
518 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
519 def test_enum_certificates(self):
520 self.assertTrue(ssl.enum_certificates("CA"))
521 self.assertTrue(ssl.enum_certificates("ROOT"))
522
523 self.assertRaises(TypeError, ssl.enum_certificates)
524 self.assertRaises(WindowsError, ssl.enum_certificates, "")
525
526 trust_oids = set()
527 for storename in ("CA", "ROOT"):
528 store = ssl.enum_certificates(storename)
529 self.assertIsInstance(store, list)
530 for element in store:
531 self.assertIsInstance(element, tuple)
532 self.assertEqual(len(element), 3)
533 cert, enc, trust = element
534 self.assertIsInstance(cert, bytes)
535 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
536 self.assertIsInstance(trust, (set, bool))
537 if isinstance(trust, set):
538 trust_oids.update(trust)
539
540 serverAuth = "1.3.6.1.5.5.7.3.1"
541 self.assertIn(serverAuth, trust_oids)
542
543 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
544 def test_enum_crls(self):
545 self.assertTrue(ssl.enum_crls("CA"))
546 self.assertRaises(TypeError, ssl.enum_crls)
547 self.assertRaises(WindowsError, ssl.enum_crls, "")
548
549 crls = ssl.enum_crls("CA")
550 self.assertIsInstance(crls, list)
551 for element in crls:
552 self.assertIsInstance(element, tuple)
553 self.assertEqual(len(element), 2)
554 self.assertIsInstance(element[0], bytes)
555 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
556
557
558 def test_asn1object(self):
559 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
560 '1.3.6.1.5.5.7.3.1')
561
562 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
563 self.assertEqual(val, expected)
564 self.assertEqual(val.nid, 129)
565 self.assertEqual(val.shortname, 'serverAuth')
566 self.assertEqual(val.longname, 'TLS Web Server Authentication')
567 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
568 self.assertIsInstance(val, ssl._ASN1Object)
569 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
570
571 val = ssl._ASN1Object.fromnid(129)
572 self.assertEqual(val, expected)
573 self.assertIsInstance(val, ssl._ASN1Object)
574 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
575 with self.assertRaisesRegexp(ValueError, "unknown NID 100000"):
576 ssl._ASN1Object.fromnid(100000)
577 for i in range(1000):
578 try:
579 obj = ssl._ASN1Object.fromnid(i)
580 except ValueError:
581 pass
582 else:
583 self.assertIsInstance(obj.nid, int)
584 self.assertIsInstance(obj.shortname, str)
585 self.assertIsInstance(obj.longname, str)
586 self.assertIsInstance(obj.oid, (str, type(None)))
587
588 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
589 self.assertEqual(val, expected)
590 self.assertIsInstance(val, ssl._ASN1Object)
591 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
592 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
593 expected)
594 with self.assertRaisesRegexp(ValueError, "unknown object 'serverauth'"):
595 ssl._ASN1Object.fromname('serverauth')
596
597 def test_purpose_enum(self):
598 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
599 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
600 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
601 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
602 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
603 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
604 '1.3.6.1.5.5.7.3.1')
605
606 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
607 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
608 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
609 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
610 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
611 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
612 '1.3.6.1.5.5.7.3.2')
Antoine Pitrouf7f390a2010-09-14 14:37:18 +0000613
Antoine Pitrou63cc99d2013-12-28 17:26:33 +0100614 def test_unsupported_dtls(self):
615 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
616 self.addCleanup(s.close)
617 with self.assertRaises(NotImplementedError) as cx:
618 ssl.wrap_socket(s, cert_reqs=ssl.CERT_NONE)
619 self.assertEqual(str(cx.exception), "only stream sockets are supported")
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500620 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
621 with self.assertRaises(NotImplementedError) as cx:
622 ctx.wrap_socket(s)
623 self.assertEqual(str(cx.exception), "only stream sockets are supported")
624
625 def cert_time_ok(self, timestring, timestamp):
626 self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)
627
628 def cert_time_fail(self, timestring):
629 with self.assertRaises(ValueError):
630 ssl.cert_time_to_seconds(timestring)
631
632 @unittest.skipUnless(utc_offset(),
633 'local time needs to be different from UTC')
634 def test_cert_time_to_seconds_timezone(self):
635 # Issue #19940: ssl.cert_time_to_seconds() returns wrong
636 # results if local timezone is not UTC
637 self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0)
638 self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0)
639
640 def test_cert_time_to_seconds(self):
641 timestring = "Jan 5 09:34:43 2018 GMT"
642 ts = 1515144883.0
643 self.cert_time_ok(timestring, ts)
644 # accept keyword parameter, assert its name
645 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
646 # accept both %e and %d (space or zero generated by strftime)
647 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
648 # case-insensitive
649 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
650 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
651 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
652 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
653 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
654 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
655 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
656 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
657
658 newyear_ts = 1230768000.0
659 # leap seconds
660 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
661 # same timestamp
662 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
663
664 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
665 # allow 60th second (even if it is not a leap second)
666 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
667 # allow 2nd leap second for compatibility with time.strptime()
668 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
669 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
670
671 # no special treatement for the special value:
672 # 99991231235959Z (rfc 5280)
673 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
674
675 @support.run_with_locale('LC_ALL', '')
676 def test_cert_time_to_seconds_locale(self):
677 # `cert_time_to_seconds()` should be locale independent
678
679 def local_february_name():
680 return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
681
682 if local_february_name().lower() == 'feb':
683 self.skipTest("locale-specific month name needs to be "
684 "different from C locale")
685
686 # locale-independent
687 self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
688 self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
689
690
691class ContextTests(unittest.TestCase):
692
693 @skip_if_broken_ubuntu_ssl
694 def test_constructor(self):
695 for protocol in PROTOCOLS:
696 ssl.SSLContext(protocol)
697 self.assertRaises(TypeError, ssl.SSLContext)
698 self.assertRaises(ValueError, ssl.SSLContext, -1)
699 self.assertRaises(ValueError, ssl.SSLContext, 42)
700
701 @skip_if_broken_ubuntu_ssl
702 def test_protocol(self):
703 for proto in PROTOCOLS:
704 ctx = ssl.SSLContext(proto)
705 self.assertEqual(ctx.protocol, proto)
706
707 def test_ciphers(self):
708 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
709 ctx.set_ciphers("ALL")
710 ctx.set_ciphers("DEFAULT")
711 with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"):
712 ctx.set_ciphers("^$:,;?*'dorothyx")
713
714 @skip_if_broken_ubuntu_ssl
715 def test_options(self):
716 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
717 # OP_ALL | OP_NO_SSLv2 is the default value
718 self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2,
719 ctx.options)
720 ctx.options |= ssl.OP_NO_SSLv3
721 self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3,
722 ctx.options)
723 if can_clear_options():
724 ctx.options = (ctx.options & ~ssl.OP_NO_SSLv2) | ssl.OP_NO_TLSv1
725 self.assertEqual(ssl.OP_ALL | ssl.OP_NO_TLSv1 | ssl.OP_NO_SSLv3,
726 ctx.options)
727 ctx.options = 0
728 self.assertEqual(0, ctx.options)
729 else:
730 with self.assertRaises(ValueError):
731 ctx.options = 0
732
733 def test_verify_mode(self):
734 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
735 # Default value
736 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
737 ctx.verify_mode = ssl.CERT_OPTIONAL
738 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
739 ctx.verify_mode = ssl.CERT_REQUIRED
740 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
741 ctx.verify_mode = ssl.CERT_NONE
742 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
743 with self.assertRaises(TypeError):
744 ctx.verify_mode = None
745 with self.assertRaises(ValueError):
746 ctx.verify_mode = 42
747
748 @unittest.skipUnless(have_verify_flags(),
749 "verify_flags need OpenSSL > 0.9.8")
750 def test_verify_flags(self):
751 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
Benjamin Peterson72ef9612015-03-04 22:49:41 -0500752 # default value
753 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
754 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | tf)
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500755 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
756 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
757 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
758 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
759 ctx.verify_flags = ssl.VERIFY_DEFAULT
760 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
761 # supports any value
762 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
763 self.assertEqual(ctx.verify_flags,
764 ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
765 with self.assertRaises(TypeError):
766 ctx.verify_flags = None
767
768 def test_load_cert_chain(self):
769 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
770 # Combined key and cert in a single file
Benjamin Peterson04439fd2014-11-03 21:05:01 -0500771 ctx.load_cert_chain(CERTFILE, keyfile=None)
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500772 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
773 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
774 with self.assertRaises(IOError) as cm:
775 ctx.load_cert_chain(WRONGCERT)
776 self.assertEqual(cm.exception.errno, errno.ENOENT)
777 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
778 ctx.load_cert_chain(BADCERT)
779 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
780 ctx.load_cert_chain(EMPTYCERT)
781 # Separate key and cert
782 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
783 ctx.load_cert_chain(ONLYCERT, ONLYKEY)
784 ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
785 ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
786 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
787 ctx.load_cert_chain(ONLYCERT)
788 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
789 ctx.load_cert_chain(ONLYKEY)
790 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
791 ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
792 # Mismatching key and cert
793 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
794 with self.assertRaisesRegexp(ssl.SSLError, "key values mismatch"):
795 ctx.load_cert_chain(SVN_PYTHON_ORG_ROOT_CERT, ONLYKEY)
796 # Password protected key and cert
797 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
798 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
799 ctx.load_cert_chain(CERTFILE_PROTECTED,
800 password=bytearray(KEY_PASSWORD.encode()))
801 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
802 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
803 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
804 bytearray(KEY_PASSWORD.encode()))
805 with self.assertRaisesRegexp(TypeError, "should be a string"):
806 ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
807 with self.assertRaises(ssl.SSLError):
808 ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
809 with self.assertRaisesRegexp(ValueError, "cannot be longer"):
810 # openssl has a fixed limit on the password buffer.
811 # PEM_BUFSIZE is generally set to 1kb.
812 # Return a string larger than this.
813 ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
814 # Password callback
815 def getpass_unicode():
816 return KEY_PASSWORD
817 def getpass_bytes():
818 return KEY_PASSWORD.encode()
819 def getpass_bytearray():
820 return bytearray(KEY_PASSWORD.encode())
821 def getpass_badpass():
822 return "badpass"
823 def getpass_huge():
824 return b'a' * (1024 * 1024)
825 def getpass_bad_type():
826 return 9
827 def getpass_exception():
828 raise Exception('getpass error')
829 class GetPassCallable:
830 def __call__(self):
831 return KEY_PASSWORD
832 def getpass(self):
833 return KEY_PASSWORD
834 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
835 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
836 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
837 ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
838 ctx.load_cert_chain(CERTFILE_PROTECTED,
839 password=GetPassCallable().getpass)
840 with self.assertRaises(ssl.SSLError):
841 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
842 with self.assertRaisesRegexp(ValueError, "cannot be longer"):
843 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
844 with self.assertRaisesRegexp(TypeError, "must return a string"):
845 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
846 with self.assertRaisesRegexp(Exception, "getpass error"):
847 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
848 # Make sure the password function isn't called if it isn't needed
849 ctx.load_cert_chain(CERTFILE, password=getpass_exception)
850
851 def test_load_verify_locations(self):
852 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
853 ctx.load_verify_locations(CERTFILE)
854 ctx.load_verify_locations(cafile=CERTFILE, capath=None)
855 ctx.load_verify_locations(BYTES_CERTFILE)
856 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
Benjamin Peterson876473e2014-08-28 09:33:21 -0400857 ctx.load_verify_locations(cafile=BYTES_CERTFILE.decode('utf-8'))
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500858 self.assertRaises(TypeError, ctx.load_verify_locations)
859 self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
860 with self.assertRaises(IOError) as cm:
861 ctx.load_verify_locations(WRONGCERT)
862 self.assertEqual(cm.exception.errno, errno.ENOENT)
Benjamin Peterson876473e2014-08-28 09:33:21 -0400863 with self.assertRaises(IOError):
864 ctx.load_verify_locations(u'')
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500865 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
866 ctx.load_verify_locations(BADCERT)
867 ctx.load_verify_locations(CERTFILE, CAPATH)
868 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
869
870 # Issue #10989: crash if the second argument type is invalid
871 self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
872
873 def test_load_verify_cadata(self):
874 # test cadata
875 with open(CAFILE_CACERT) as f:
876 cacert_pem = f.read().decode("ascii")
877 cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
878 with open(CAFILE_NEURONIO) as f:
879 neuronio_pem = f.read().decode("ascii")
880 neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
881
882 # test PEM
883 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
884 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
885 ctx.load_verify_locations(cadata=cacert_pem)
886 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
887 ctx.load_verify_locations(cadata=neuronio_pem)
888 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
889 # cert already in hash table
890 ctx.load_verify_locations(cadata=neuronio_pem)
891 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
892
893 # combined
894 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
895 combined = "\n".join((cacert_pem, neuronio_pem))
896 ctx.load_verify_locations(cadata=combined)
897 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
898
899 # with junk around the certs
900 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
901 combined = ["head", cacert_pem, "other", neuronio_pem, "again",
902 neuronio_pem, "tail"]
903 ctx.load_verify_locations(cadata="\n".join(combined))
904 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
905
906 # test DER
907 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
908 ctx.load_verify_locations(cadata=cacert_der)
909 ctx.load_verify_locations(cadata=neuronio_der)
910 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
911 # cert already in hash table
912 ctx.load_verify_locations(cadata=cacert_der)
913 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
914
915 # combined
916 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
917 combined = b"".join((cacert_der, neuronio_der))
918 ctx.load_verify_locations(cadata=combined)
919 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
920
921 # error cases
922 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
923 self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
924
925 with self.assertRaisesRegexp(ssl.SSLError, "no start line"):
926 ctx.load_verify_locations(cadata=u"broken")
927 with self.assertRaisesRegexp(ssl.SSLError, "not enough data"):
928 ctx.load_verify_locations(cadata=b"broken")
929
930
931 def test_load_dh_params(self):
932 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
933 ctx.load_dh_params(DHFILE)
934 if os.name != 'nt':
935 ctx.load_dh_params(BYTES_DHFILE)
936 self.assertRaises(TypeError, ctx.load_dh_params)
937 self.assertRaises(TypeError, ctx.load_dh_params, None)
938 with self.assertRaises(IOError) as cm:
939 ctx.load_dh_params(WRONGCERT)
940 self.assertEqual(cm.exception.errno, errno.ENOENT)
941 with self.assertRaises(ssl.SSLError) as cm:
942 ctx.load_dh_params(CERTFILE)
943
944 @skip_if_broken_ubuntu_ssl
945 def test_session_stats(self):
946 for proto in PROTOCOLS:
947 ctx = ssl.SSLContext(proto)
948 self.assertEqual(ctx.session_stats(), {
949 'number': 0,
950 'connect': 0,
951 'connect_good': 0,
952 'connect_renegotiate': 0,
953 'accept': 0,
954 'accept_good': 0,
955 'accept_renegotiate': 0,
956 'hits': 0,
957 'misses': 0,
958 'timeouts': 0,
959 'cache_full': 0,
960 })
961
962 def test_set_default_verify_paths(self):
963 # There's not much we can do to test that it acts as expected,
964 # so just check it doesn't crash or raise an exception.
965 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
966 ctx.set_default_verify_paths()
967
968 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
969 def test_set_ecdh_curve(self):
970 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
971 ctx.set_ecdh_curve("prime256v1")
972 ctx.set_ecdh_curve(b"prime256v1")
973 self.assertRaises(TypeError, ctx.set_ecdh_curve)
974 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
975 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
976 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
977
978 @needs_sni
979 def test_sni_callback(self):
980 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
981
982 # set_servername_callback expects a callable, or None
983 self.assertRaises(TypeError, ctx.set_servername_callback)
984 self.assertRaises(TypeError, ctx.set_servername_callback, 4)
985 self.assertRaises(TypeError, ctx.set_servername_callback, "")
986 self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
987
988 def dummycallback(sock, servername, ctx):
989 pass
990 ctx.set_servername_callback(None)
991 ctx.set_servername_callback(dummycallback)
992
993 @needs_sni
994 def test_sni_callback_refcycle(self):
995 # Reference cycles through the servername callback are detected
996 # and cleared.
997 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
998 def dummycallback(sock, servername, ctx, cycle=ctx):
999 pass
1000 ctx.set_servername_callback(dummycallback)
1001 wr = weakref.ref(ctx)
1002 del ctx, dummycallback
1003 gc.collect()
1004 self.assertIs(wr(), None)
1005
1006 def test_cert_store_stats(self):
1007 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1008 self.assertEqual(ctx.cert_store_stats(),
1009 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1010 ctx.load_cert_chain(CERTFILE)
1011 self.assertEqual(ctx.cert_store_stats(),
1012 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1013 ctx.load_verify_locations(CERTFILE)
1014 self.assertEqual(ctx.cert_store_stats(),
1015 {'x509_ca': 0, 'crl': 0, 'x509': 1})
1016 ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
1017 self.assertEqual(ctx.cert_store_stats(),
1018 {'x509_ca': 1, 'crl': 0, 'x509': 2})
1019
1020 def test_get_ca_certs(self):
1021 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1022 self.assertEqual(ctx.get_ca_certs(), [])
1023 # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
1024 ctx.load_verify_locations(CERTFILE)
1025 self.assertEqual(ctx.get_ca_certs(), [])
1026 # but SVN_PYTHON_ORG_ROOT_CERT is a CA cert
1027 ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
1028 self.assertEqual(ctx.get_ca_certs(),
1029 [{'issuer': ((('organizationName', 'Root CA'),),
1030 (('organizationalUnitName', 'http://www.cacert.org'),),
1031 (('commonName', 'CA Cert Signing Authority'),),
1032 (('emailAddress', 'support@cacert.org'),)),
1033 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
1034 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
1035 'serialNumber': '00',
1036 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
1037 'subject': ((('organizationName', 'Root CA'),),
1038 (('organizationalUnitName', 'http://www.cacert.org'),),
1039 (('commonName', 'CA Cert Signing Authority'),),
1040 (('emailAddress', 'support@cacert.org'),)),
1041 'version': 3}])
1042
1043 with open(SVN_PYTHON_ORG_ROOT_CERT) as f:
1044 pem = f.read()
1045 der = ssl.PEM_cert_to_DER_cert(pem)
1046 self.assertEqual(ctx.get_ca_certs(True), [der])
1047
1048 def test_load_default_certs(self):
1049 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1050 ctx.load_default_certs()
1051
1052 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1053 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1054 ctx.load_default_certs()
1055
1056 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1057 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1058
1059 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1060 self.assertRaises(TypeError, ctx.load_default_certs, None)
1061 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1062
Benjamin Petersona02ae252014-10-03 18:17:15 -04001063 @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
Benjamin Peterson0b30a2b2014-10-03 17:27:05 -04001064 def test_load_default_certs_env(self):
1065 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1066 with support.EnvironmentVarGuard() as env:
1067 env["SSL_CERT_DIR"] = CAPATH
1068 env["SSL_CERT_FILE"] = CERTFILE
1069 ctx.load_default_certs()
1070 self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0})
1071
Benjamin Petersona02ae252014-10-03 18:17:15 -04001072 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
1073 def test_load_default_certs_env_windows(self):
1074 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1075 ctx.load_default_certs()
1076 stats = ctx.cert_store_stats()
1077
1078 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1079 with support.EnvironmentVarGuard() as env:
1080 env["SSL_CERT_DIR"] = CAPATH
1081 env["SSL_CERT_FILE"] = CERTFILE
1082 ctx.load_default_certs()
1083 stats["x509"] += 1
1084 self.assertEqual(ctx.cert_store_stats(), stats)
1085
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001086 def test_create_default_context(self):
1087 ctx = ssl.create_default_context()
1088 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1089 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1090 self.assertTrue(ctx.check_hostname)
1091 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1092 self.assertEqual(
1093 ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
1094 getattr(ssl, "OP_NO_COMPRESSION", 0),
1095 )
1096
1097 with open(SIGNING_CA) as f:
1098 cadata = f.read().decode("ascii")
1099 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1100 cadata=cadata)
1101 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1102 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1103 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1104 self.assertEqual(
1105 ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
1106 getattr(ssl, "OP_NO_COMPRESSION", 0),
1107 )
1108
1109 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
1110 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1111 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1112 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1113 self.assertEqual(
1114 ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
1115 getattr(ssl, "OP_NO_COMPRESSION", 0),
1116 )
1117 self.assertEqual(
1118 ctx.options & getattr(ssl, "OP_SINGLE_DH_USE", 0),
1119 getattr(ssl, "OP_SINGLE_DH_USE", 0),
1120 )
1121 self.assertEqual(
1122 ctx.options & getattr(ssl, "OP_SINGLE_ECDH_USE", 0),
1123 getattr(ssl, "OP_SINGLE_ECDH_USE", 0),
1124 )
1125
1126 def test__create_stdlib_context(self):
1127 ctx = ssl._create_stdlib_context()
1128 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1129 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1130 self.assertFalse(ctx.check_hostname)
1131 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1132
1133 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
1134 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1135 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1136 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1137
1138 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
1139 cert_reqs=ssl.CERT_REQUIRED,
1140 check_hostname=True)
1141 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1142 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1143 self.assertTrue(ctx.check_hostname)
1144 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1145
1146 ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
1147 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1148 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1149 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1150
1151 def test_check_hostname(self):
1152 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1153 self.assertFalse(ctx.check_hostname)
1154
1155 # Requires CERT_REQUIRED or CERT_OPTIONAL
1156 with self.assertRaises(ValueError):
1157 ctx.check_hostname = True
1158 ctx.verify_mode = ssl.CERT_REQUIRED
1159 self.assertFalse(ctx.check_hostname)
1160 ctx.check_hostname = True
1161 self.assertTrue(ctx.check_hostname)
1162
1163 ctx.verify_mode = ssl.CERT_OPTIONAL
1164 ctx.check_hostname = True
1165 self.assertTrue(ctx.check_hostname)
1166
1167 # Cannot set CERT_NONE with check_hostname enabled
1168 with self.assertRaises(ValueError):
1169 ctx.verify_mode = ssl.CERT_NONE
1170 ctx.check_hostname = False
1171 self.assertFalse(ctx.check_hostname)
1172
1173
1174class SSLErrorTests(unittest.TestCase):
1175
1176 def test_str(self):
1177 # The str() of a SSLError doesn't include the errno
1178 e = ssl.SSLError(1, "foo")
1179 self.assertEqual(str(e), "foo")
1180 self.assertEqual(e.errno, 1)
1181 # Same for a subclass
1182 e = ssl.SSLZeroReturnError(1, "foo")
1183 self.assertEqual(str(e), "foo")
1184 self.assertEqual(e.errno, 1)
1185
1186 def test_lib_reason(self):
1187 # Test the library and reason attributes
1188 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1189 with self.assertRaises(ssl.SSLError) as cm:
1190 ctx.load_dh_params(CERTFILE)
1191 self.assertEqual(cm.exception.library, 'PEM')
1192 self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1193 s = str(cm.exception)
1194 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1195
1196 def test_subclass(self):
1197 # Check that the appropriate SSLError subclass is raised
1198 # (this only tests one of them)
1199 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1200 with closing(socket.socket()) as s:
1201 s.bind(("127.0.0.1", 0))
1202 s.listen(5)
1203 c = socket.socket()
1204 c.connect(s.getsockname())
1205 c.setblocking(False)
1206 with closing(ctx.wrap_socket(c, False, do_handshake_on_connect=False)) as c:
1207 with self.assertRaises(ssl.SSLWantReadError) as cm:
1208 c.do_handshake()
1209 s = str(cm.exception)
1210 self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1211 # For compatibility
1212 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
Antoine Pitrou63cc99d2013-12-28 17:26:33 +01001213
Antoine Pitrouf9de5342010-04-05 21:35:07 +00001214
Bill Janssen934b16d2008-06-28 22:19:33 +00001215class NetworkedTests(unittest.TestCase):
Bill Janssen296a59d2007-09-16 22:06:00 +00001216
Antoine Pitrou3945c862010-04-28 21:11:01 +00001217 def test_connect(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001218 with support.transient_internet("svn.python.org"):
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001219 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1220 cert_reqs=ssl.CERT_NONE)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001221 try:
1222 s.connect(("svn.python.org", 443))
1223 self.assertEqual({}, s.getpeercert())
1224 finally:
1225 s.close()
Bill Janssen296a59d2007-09-16 22:06:00 +00001226
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001227 # this should fail because we have no verification certs
1228 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1229 cert_reqs=ssl.CERT_REQUIRED)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001230 self.assertRaisesRegexp(ssl.SSLError, "certificate verify failed",
1231 s.connect, ("svn.python.org", 443))
1232 s.close()
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001233
1234 # this should succeed because we specify the root cert
1235 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1236 cert_reqs=ssl.CERT_REQUIRED,
1237 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
1238 try:
1239 s.connect(("svn.python.org", 443))
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001240 self.assertTrue(s.getpeercert())
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001241 finally:
1242 s.close()
Bill Janssen296a59d2007-09-16 22:06:00 +00001243
Antoine Pitroud3f6ea12011-02-26 23:35:27 +00001244 def test_connect_ex(self):
1245 # Issue #11326: check connect_ex() implementation
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001246 with support.transient_internet("svn.python.org"):
Antoine Pitroud3f6ea12011-02-26 23:35:27 +00001247 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1248 cert_reqs=ssl.CERT_REQUIRED,
1249 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
1250 try:
1251 self.assertEqual(0, s.connect_ex(("svn.python.org", 443)))
1252 self.assertTrue(s.getpeercert())
1253 finally:
1254 s.close()
1255
1256 def test_non_blocking_connect_ex(self):
1257 # Issue #11326: non-blocking connect_ex() should allow handshake
1258 # to proceed after the socket gets ready.
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001259 with support.transient_internet("svn.python.org"):
Antoine Pitroud3f6ea12011-02-26 23:35:27 +00001260 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1261 cert_reqs=ssl.CERT_REQUIRED,
1262 ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
1263 do_handshake_on_connect=False)
1264 try:
1265 s.setblocking(False)
1266 rc = s.connect_ex(('svn.python.org', 443))
Antoine Pitrou8ef39072011-02-27 15:45:22 +00001267 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1268 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
Antoine Pitroud3f6ea12011-02-26 23:35:27 +00001269 # Wait for connect to finish
1270 select.select([], [s], [], 5.0)
1271 # Non-blocking handshake
1272 while True:
1273 try:
1274 s.do_handshake()
1275 break
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001276 except ssl.SSLWantReadError:
1277 select.select([s], [], [], 5.0)
1278 except ssl.SSLWantWriteError:
1279 select.select([], [s], [], 5.0)
Antoine Pitroud3f6ea12011-02-26 23:35:27 +00001280 # SSL established
1281 self.assertTrue(s.getpeercert())
1282 finally:
1283 s.close()
1284
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001285 def test_timeout_connect_ex(self):
1286 # Issue #12065: on a timeout, connect_ex() should return the original
1287 # errno (mimicking the behaviour of non-SSL sockets).
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001288 with support.transient_internet("svn.python.org"):
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001289 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1290 cert_reqs=ssl.CERT_REQUIRED,
1291 ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
1292 do_handshake_on_connect=False)
1293 try:
1294 s.settimeout(0.0000001)
1295 rc = s.connect_ex(('svn.python.org', 443))
1296 if rc == 0:
1297 self.skipTest("svn.python.org responded too quickly")
1298 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
1299 finally:
1300 s.close()
1301
1302 def test_connect_ex_error(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001303 with support.transient_internet("svn.python.org"):
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001304 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1305 cert_reqs=ssl.CERT_REQUIRED,
1306 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
1307 try:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001308 rc = s.connect_ex(("svn.python.org", 444))
1309 # Issue #19919: Windows machines or VMs hosted on Windows
1310 # machines sometimes return EWOULDBLOCK.
1311 self.assertIn(rc, (errno.ECONNREFUSED, errno.EWOULDBLOCK))
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001312 finally:
1313 s.close()
1314
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001315 def test_connect_with_context(self):
1316 with support.transient_internet("svn.python.org"):
1317 # Same as test_connect, but with a separately created context
1318 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1319 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1320 s.connect(("svn.python.org", 443))
1321 try:
1322 self.assertEqual({}, s.getpeercert())
1323 finally:
1324 s.close()
1325 # Same with a server hostname
1326 s = ctx.wrap_socket(socket.socket(socket.AF_INET),
1327 server_hostname="svn.python.org")
Benjamin Peterson31aa69e2014-11-23 20:13:31 -06001328 s.connect(("svn.python.org", 443))
1329 s.close()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001330 # This should fail because we have no verification certs
1331 ctx.verify_mode = ssl.CERT_REQUIRED
1332 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1333 self.assertRaisesRegexp(ssl.SSLError, "certificate verify failed",
1334 s.connect, ("svn.python.org", 443))
1335 s.close()
1336 # This should succeed because we specify the root cert
1337 ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
1338 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1339 s.connect(("svn.python.org", 443))
1340 try:
1341 cert = s.getpeercert()
1342 self.assertTrue(cert)
1343 finally:
1344 s.close()
1345
1346 def test_connect_capath(self):
1347 # Verify server certificates using the `capath` argument
1348 # NOTE: the subject hashing algorithm has been changed between
1349 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
1350 # contain both versions of each certificate (same content, different
1351 # filename) for this test to be portable across OpenSSL releases.
1352 with support.transient_internet("svn.python.org"):
1353 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1354 ctx.verify_mode = ssl.CERT_REQUIRED
1355 ctx.load_verify_locations(capath=CAPATH)
1356 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1357 s.connect(("svn.python.org", 443))
1358 try:
1359 cert = s.getpeercert()
1360 self.assertTrue(cert)
1361 finally:
1362 s.close()
1363 # Same with a bytes `capath` argument
1364 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1365 ctx.verify_mode = ssl.CERT_REQUIRED
1366 ctx.load_verify_locations(capath=BYTES_CAPATH)
1367 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1368 s.connect(("svn.python.org", 443))
1369 try:
1370 cert = s.getpeercert()
1371 self.assertTrue(cert)
1372 finally:
1373 s.close()
1374
1375 def test_connect_cadata(self):
1376 with open(CAFILE_CACERT) as f:
1377 pem = f.read().decode('ascii')
1378 der = ssl.PEM_cert_to_DER_cert(pem)
1379 with support.transient_internet("svn.python.org"):
1380 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1381 ctx.verify_mode = ssl.CERT_REQUIRED
1382 ctx.load_verify_locations(cadata=pem)
1383 with closing(ctx.wrap_socket(socket.socket(socket.AF_INET))) as s:
1384 s.connect(("svn.python.org", 443))
1385 cert = s.getpeercert()
1386 self.assertTrue(cert)
1387
1388 # same with DER
1389 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1390 ctx.verify_mode = ssl.CERT_REQUIRED
1391 ctx.load_verify_locations(cadata=der)
1392 with closing(ctx.wrap_socket(socket.socket(socket.AF_INET))) as s:
1393 s.connect(("svn.python.org", 443))
1394 cert = s.getpeercert()
1395 self.assertTrue(cert)
1396
Antoine Pitrou55841ac2010-04-24 10:43:57 +00001397 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
1398 def test_makefile_close(self):
1399 # Issue #5238: creating a file-like object with makefile() shouldn't
1400 # delay closing the underlying "real socket" (here tested with its
1401 # file descriptor, hence skipping the test under Windows).
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001402 with support.transient_internet("svn.python.org"):
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001403 ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
1404 ss.connect(("svn.python.org", 443))
1405 fd = ss.fileno()
1406 f = ss.makefile()
1407 f.close()
1408 # The fd is still open
Antoine Pitrou55841ac2010-04-24 10:43:57 +00001409 os.read(fd, 0)
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001410 # Closing the SSL socket should close the fd too
1411 ss.close()
1412 gc.collect()
1413 with self.assertRaises(OSError) as e:
1414 os.read(fd, 0)
1415 self.assertEqual(e.exception.errno, errno.EBADF)
Bill Janssen934b16d2008-06-28 22:19:33 +00001416
Antoine Pitrou3945c862010-04-28 21:11:01 +00001417 def test_non_blocking_handshake(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001418 with support.transient_internet("svn.python.org"):
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001419 s = socket.socket(socket.AF_INET)
1420 s.connect(("svn.python.org", 443))
1421 s.setblocking(False)
1422 s = ssl.wrap_socket(s,
1423 cert_reqs=ssl.CERT_NONE,
1424 do_handshake_on_connect=False)
1425 count = 0
1426 while True:
1427 try:
1428 count += 1
1429 s.do_handshake()
1430 break
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001431 except ssl.SSLWantReadError:
1432 select.select([s], [], [])
1433 except ssl.SSLWantWriteError:
1434 select.select([], [s], [])
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001435 s.close()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001436 if support.verbose:
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001437 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Bill Janssen934b16d2008-06-28 22:19:33 +00001438
Antoine Pitrou3945c862010-04-28 21:11:01 +00001439 def test_get_server_certificate(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001440 def _test_get_server_certificate(host, port, cert=None):
1441 with support.transient_internet(host):
1442 pem = ssl.get_server_certificate((host, port))
1443 if not pem:
1444 self.fail("No server certificate on %s:%s!" % (host, port))
Bill Janssen296a59d2007-09-16 22:06:00 +00001445
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001446 try:
1447 pem = ssl.get_server_certificate((host, port),
1448 ca_certs=CERTFILE)
1449 except ssl.SSLError as x:
1450 #should fail
1451 if support.verbose:
1452 sys.stdout.write("%s\n" % x)
1453 else:
1454 self.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
Bill Janssen296a59d2007-09-16 22:06:00 +00001455
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001456 pem = ssl.get_server_certificate((host, port),
1457 ca_certs=cert)
1458 if not pem:
1459 self.fail("No server certificate on %s:%s!" % (host, port))
1460 if support.verbose:
1461 sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
1462
1463 _test_get_server_certificate('svn.python.org', 443, SVN_PYTHON_ORG_ROOT_CERT)
1464 if support.IPV6_ENABLED:
1465 _test_get_server_certificate('ipv6.google.com', 443)
1466
1467 def test_ciphers(self):
1468 remote = ("svn.python.org", 443)
1469 with support.transient_internet(remote[0]):
1470 with closing(ssl.wrap_socket(socket.socket(socket.AF_INET),
1471 cert_reqs=ssl.CERT_NONE, ciphers="ALL")) as s:
1472 s.connect(remote)
1473 with closing(ssl.wrap_socket(socket.socket(socket.AF_INET),
1474 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT")) as s:
1475 s.connect(remote)
1476 # Error checking can happen at instantiation or when connecting
1477 with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"):
1478 with closing(socket.socket(socket.AF_INET)) as sock:
1479 s = ssl.wrap_socket(sock,
1480 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
1481 s.connect(remote)
Bill Janssen296a59d2007-09-16 22:06:00 +00001482
Antoine Pitrouc715a9e2010-04-21 19:28:03 +00001483 def test_algorithms(self):
1484 # Issue #8484: all algorithms should be available when verifying a
1485 # certificate.
Antoine Pitrou9aed6042010-04-22 18:00:41 +00001486 # SHA256 was added in OpenSSL 0.9.8
1487 if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15):
1488 self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001489 # sha256.tbs-internet.com needs SNI to use the correct certificate
1490 if not ssl.HAS_SNI:
1491 self.skipTest("SNI needed for this test")
1492 # https://sha2.hboeck.de/ was used until 2011-01-08 (no route to host)
Antoine Pitroud43245a2011-01-08 10:32:51 +00001493 remote = ("sha256.tbs-internet.com", 443)
Antoine Pitrouc715a9e2010-04-21 19:28:03 +00001494 sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem")
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001495 with support.transient_internet("sha256.tbs-internet.com"):
1496 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1497 ctx.verify_mode = ssl.CERT_REQUIRED
1498 ctx.load_verify_locations(sha256_cert)
1499 s = ctx.wrap_socket(socket.socket(socket.AF_INET),
1500 server_hostname="sha256.tbs-internet.com")
Antoine Pitrouc715a9e2010-04-21 19:28:03 +00001501 try:
1502 s.connect(remote)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001503 if support.verbose:
Antoine Pitrouc715a9e2010-04-21 19:28:03 +00001504 sys.stdout.write("\nCipher with %r is %r\n" %
1505 (remote, s.cipher()))
1506 sys.stdout.write("Certificate is:\n%s\n" %
1507 pprint.pformat(s.getpeercert()))
1508 finally:
1509 s.close()
1510
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001511 def test_get_ca_certs_capath(self):
1512 # capath certs are loaded on request
1513 with support.transient_internet("svn.python.org"):
1514 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1515 ctx.verify_mode = ssl.CERT_REQUIRED
1516 ctx.load_verify_locations(capath=CAPATH)
1517 self.assertEqual(ctx.get_ca_certs(), [])
1518 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1519 s.connect(("svn.python.org", 443))
1520 try:
1521 cert = s.getpeercert()
1522 self.assertTrue(cert)
1523 finally:
1524 s.close()
1525 self.assertEqual(len(ctx.get_ca_certs()), 1)
1526
1527 @needs_sni
1528 def test_context_setget(self):
1529 # Check that the context of a connected socket can be replaced.
1530 with support.transient_internet("svn.python.org"):
1531 ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1532 ctx2 = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1533 s = socket.socket(socket.AF_INET)
1534 with closing(ctx1.wrap_socket(s)) as ss:
1535 ss.connect(("svn.python.org", 443))
1536 self.assertIs(ss.context, ctx1)
1537 self.assertIs(ss._sslobj.context, ctx1)
1538 ss.context = ctx2
1539 self.assertIs(ss.context, ctx2)
1540 self.assertIs(ss._sslobj.context, ctx2)
Bill Janssen296a59d2007-09-16 22:06:00 +00001541
Bill Janssen98d19da2007-09-10 21:51:02 +00001542try:
1543 import threading
1544except ImportError:
1545 _have_threads = False
1546else:
Bill Janssen98d19da2007-09-10 21:51:02 +00001547 _have_threads = True
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001548
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001549 from test.ssl_servers import make_https_server
1550
Bill Janssen98d19da2007-09-10 21:51:02 +00001551 class ThreadedEchoServer(threading.Thread):
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001552
Bill Janssen98d19da2007-09-10 21:51:02 +00001553 class ConnectionHandler(threading.Thread):
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001554
Bill Janssen98d19da2007-09-10 21:51:02 +00001555 """A mildly complicated class, because we want it to work both
1556 with and without the SSL wrapper around the socket connection, so
1557 that we can test the STARTTLS functionality."""
1558
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001559 def __init__(self, server, connsock, addr):
Bill Janssen98d19da2007-09-10 21:51:02 +00001560 self.server = server
1561 self.running = False
1562 self.sock = connsock
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001563 self.addr = addr
Bill Janssen98d19da2007-09-10 21:51:02 +00001564 self.sock.setblocking(1)
1565 self.sslconn = None
1566 threading.Thread.__init__(self)
Benjamin Peterson26f52162008-08-18 18:39:57 +00001567 self.daemon = True
Bill Janssen98d19da2007-09-10 21:51:02 +00001568
Antoine Pitrou3945c862010-04-28 21:11:01 +00001569 def wrap_conn(self):
Bill Janssen98d19da2007-09-10 21:51:02 +00001570 try:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001571 self.sslconn = self.server.context.wrap_socket(
1572 self.sock, server_side=True)
Benjamin Petersonb10bfbe2015-01-23 16:35:37 -05001573 self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
1574 self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001575 except socket.error as e:
1576 # We treat ConnectionResetError as though it were an
1577 # SSLError - OpenSSL on Ubuntu abruptly closes the
1578 # connection when asked to use an unsupported protocol.
1579 #
Antoine Pitroudb187842010-04-27 10:32:58 +00001580 # XXX Various errors can have happened here, for example
1581 # a mismatching protocol version, an invalid certificate,
1582 # or a low-level bug. This should be made more discriminating.
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001583 if not isinstance(e, ssl.SSLError) and e.errno != errno.ECONNRESET:
1584 raise
Antoine Pitroud76088d2012-01-03 22:46:48 +01001585 self.server.conn_errors.append(e)
Bill Janssen98d19da2007-09-10 21:51:02 +00001586 if self.server.chatty:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001587 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
Antoine Pitroudb187842010-04-27 10:32:58 +00001588 self.running = False
1589 self.server.stop()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001590 self.close()
Bill Janssen98d19da2007-09-10 21:51:02 +00001591 return False
Bill Janssen98d19da2007-09-10 21:51:02 +00001592 else:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001593 if self.server.context.verify_mode == ssl.CERT_REQUIRED:
1594 cert = self.sslconn.getpeercert()
1595 if support.verbose and self.server.chatty:
1596 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
1597 cert_binary = self.sslconn.getpeercert(True)
1598 if support.verbose and self.server.chatty:
1599 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
1600 cipher = self.sslconn.cipher()
1601 if support.verbose and self.server.chatty:
1602 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
1603 sys.stdout.write(" server: selected protocol is now "
1604 + str(self.sslconn.selected_npn_protocol()) + "\n")
Bill Janssen98d19da2007-09-10 21:51:02 +00001605 return True
1606
1607 def read(self):
1608 if self.sslconn:
1609 return self.sslconn.read()
1610 else:
1611 return self.sock.recv(1024)
1612
1613 def write(self, bytes):
1614 if self.sslconn:
1615 return self.sslconn.write(bytes)
1616 else:
1617 return self.sock.send(bytes)
1618
1619 def close(self):
1620 if self.sslconn:
1621 self.sslconn.close()
1622 else:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001623 self.sock.close()
Bill Janssen98d19da2007-09-10 21:51:02 +00001624
Antoine Pitrou3945c862010-04-28 21:11:01 +00001625 def run(self):
Bill Janssen98d19da2007-09-10 21:51:02 +00001626 self.running = True
1627 if not self.server.starttls_server:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001628 if not self.wrap_conn():
Bill Janssen98d19da2007-09-10 21:51:02 +00001629 return
1630 while self.running:
1631 try:
1632 msg = self.read()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001633 stripped = msg.strip()
1634 if not stripped:
Bill Janssen98d19da2007-09-10 21:51:02 +00001635 # eof, so quit this handler
1636 self.running = False
1637 self.close()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001638 elif stripped == b'over':
1639 if support.verbose and self.server.connectionchatty:
Bill Janssen98d19da2007-09-10 21:51:02 +00001640 sys.stdout.write(" server: client closed connection\n")
1641 self.close()
1642 return
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001643 elif (self.server.starttls_server and
1644 stripped == b'STARTTLS'):
1645 if support.verbose and self.server.connectionchatty:
Bill Janssen98d19da2007-09-10 21:51:02 +00001646 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001647 self.write(b"OK\n")
Bill Janssen98d19da2007-09-10 21:51:02 +00001648 if not self.wrap_conn():
1649 return
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001650 elif (self.server.starttls_server and self.sslconn
1651 and stripped == b'ENDTLS'):
1652 if support.verbose and self.server.connectionchatty:
Bill Janssen39295c22008-08-12 16:31:21 +00001653 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001654 self.write(b"OK\n")
1655 self.sock = self.sslconn.unwrap()
Bill Janssen39295c22008-08-12 16:31:21 +00001656 self.sslconn = None
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001657 if support.verbose and self.server.connectionchatty:
Bill Janssen39295c22008-08-12 16:31:21 +00001658 sys.stdout.write(" server: connection is now unencrypted...\n")
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001659 elif stripped == b'CB tls-unique':
1660 if support.verbose and self.server.connectionchatty:
1661 sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
1662 data = self.sslconn.get_channel_binding("tls-unique")
1663 self.write(repr(data).encode("us-ascii") + b"\n")
Bill Janssen98d19da2007-09-10 21:51:02 +00001664 else:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001665 if (support.verbose and
Bill Janssen98d19da2007-09-10 21:51:02 +00001666 self.server.connectionchatty):
1667 ctype = (self.sslconn and "encrypted") or "unencrypted"
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001668 sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
1669 % (msg, ctype, msg.lower(), ctype))
Bill Janssen98d19da2007-09-10 21:51:02 +00001670 self.write(msg.lower())
1671 except ssl.SSLError:
1672 if self.server.chatty:
1673 handle_error("Test server failure:\n")
1674 self.close()
1675 self.running = False
1676 # normally, we'd just stop here, but for the test
1677 # harness, we want to stop the server
1678 self.server.stop()
Bill Janssen98d19da2007-09-10 21:51:02 +00001679
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001680 def __init__(self, certificate=None, ssl_version=None,
Antoine Pitroudb187842010-04-27 10:32:58 +00001681 certreqs=None, cacerts=None,
Bill Janssen934b16d2008-06-28 22:19:33 +00001682 chatty=True, connectionchatty=False, starttls_server=False,
Benjamin Petersonb10bfbe2015-01-23 16:35:37 -05001683 npn_protocols=None, alpn_protocols=None,
1684 ciphers=None, context=None):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001685 if context:
1686 self.context = context
1687 else:
1688 self.context = ssl.SSLContext(ssl_version
1689 if ssl_version is not None
1690 else ssl.PROTOCOL_TLSv1)
1691 self.context.verify_mode = (certreqs if certreqs is not None
1692 else ssl.CERT_NONE)
1693 if cacerts:
1694 self.context.load_verify_locations(cacerts)
1695 if certificate:
1696 self.context.load_cert_chain(certificate)
1697 if npn_protocols:
1698 self.context.set_npn_protocols(npn_protocols)
Benjamin Petersonb10bfbe2015-01-23 16:35:37 -05001699 if alpn_protocols:
1700 self.context.set_alpn_protocols(alpn_protocols)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001701 if ciphers:
1702 self.context.set_ciphers(ciphers)
Bill Janssen98d19da2007-09-10 21:51:02 +00001703 self.chatty = chatty
1704 self.connectionchatty = connectionchatty
1705 self.starttls_server = starttls_server
1706 self.sock = socket.socket()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001707 self.port = support.bind_port(self.sock)
Bill Janssen98d19da2007-09-10 21:51:02 +00001708 self.flag = None
Bill Janssen98d19da2007-09-10 21:51:02 +00001709 self.active = False
Benjamin Petersonb10bfbe2015-01-23 16:35:37 -05001710 self.selected_npn_protocols = []
1711 self.selected_alpn_protocols = []
Antoine Pitroud76088d2012-01-03 22:46:48 +01001712 self.conn_errors = []
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001713 threading.Thread.__init__(self)
Benjamin Peterson26f52162008-08-18 18:39:57 +00001714 self.daemon = True
Bill Janssen98d19da2007-09-10 21:51:02 +00001715
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01001716 def __enter__(self):
1717 self.start(threading.Event())
1718 self.flag.wait()
Antoine Pitroud76088d2012-01-03 22:46:48 +01001719 return self
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01001720
1721 def __exit__(self, *args):
1722 self.stop()
1723 self.join()
1724
Antoine Pitrou3945c862010-04-28 21:11:01 +00001725 def start(self, flag=None):
Bill Janssen98d19da2007-09-10 21:51:02 +00001726 self.flag = flag
1727 threading.Thread.start(self)
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001728
Antoine Pitrou3945c862010-04-28 21:11:01 +00001729 def run(self):
Antoine Pitrou435ba0c2010-04-27 09:51:18 +00001730 self.sock.settimeout(0.05)
Bill Janssen98d19da2007-09-10 21:51:02 +00001731 self.sock.listen(5)
1732 self.active = True
1733 if self.flag:
1734 # signal an event
1735 self.flag.set()
1736 while self.active:
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001737 try:
Bill Janssen98d19da2007-09-10 21:51:02 +00001738 newconn, connaddr = self.sock.accept()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001739 if support.verbose and self.chatty:
Bill Janssen98d19da2007-09-10 21:51:02 +00001740 sys.stdout.write(' server: new connection from '
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001741 + repr(connaddr) + '\n')
1742 handler = self.ConnectionHandler(self, newconn, connaddr)
Bill Janssen98d19da2007-09-10 21:51:02 +00001743 handler.start()
Antoine Pitrou7a556842012-01-27 17:33:01 +01001744 handler.join()
Bill Janssen98d19da2007-09-10 21:51:02 +00001745 except socket.timeout:
1746 pass
1747 except KeyboardInterrupt:
1748 self.stop()
Bill Janssen934b16d2008-06-28 22:19:33 +00001749 self.sock.close()
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001750
Antoine Pitrou3945c862010-04-28 21:11:01 +00001751 def stop(self):
Bill Janssen98d19da2007-09-10 21:51:02 +00001752 self.active = False
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001753
Bill Janssen934b16d2008-06-28 22:19:33 +00001754 class AsyncoreEchoServer(threading.Thread):
Bill Janssen296a59d2007-09-16 22:06:00 +00001755
Antoine Pitrou3945c862010-04-28 21:11:01 +00001756 class EchoServer(asyncore.dispatcher):
Bill Janssen934b16d2008-06-28 22:19:33 +00001757
Antoine Pitrou3945c862010-04-28 21:11:01 +00001758 class ConnectionHandler(asyncore.dispatcher_with_send):
Bill Janssen934b16d2008-06-28 22:19:33 +00001759
1760 def __init__(self, conn, certfile):
Bill Janssen934b16d2008-06-28 22:19:33 +00001761 self.socket = ssl.wrap_socket(conn, server_side=True,
1762 certfile=certfile,
Antoine Pitroufc69af12010-04-24 20:04:58 +00001763 do_handshake_on_connect=False)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001764 asyncore.dispatcher_with_send.__init__(self, self.socket)
Antoine Pitroufc69af12010-04-24 20:04:58 +00001765 self._ssl_accepting = True
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001766 self._do_ssl_handshake()
Bill Janssen934b16d2008-06-28 22:19:33 +00001767
1768 def readable(self):
1769 if isinstance(self.socket, ssl.SSLSocket):
1770 while self.socket.pending() > 0:
1771 self.handle_read_event()
1772 return True
1773
Antoine Pitroufc69af12010-04-24 20:04:58 +00001774 def _do_ssl_handshake(self):
1775 try:
1776 self.socket.do_handshake()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001777 except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
1778 return
1779 except ssl.SSLEOFError:
1780 return self.handle_close()
1781 except ssl.SSLError:
Antoine Pitroufc69af12010-04-24 20:04:58 +00001782 raise
1783 except socket.error, err:
1784 if err.args[0] == errno.ECONNABORTED:
1785 return self.handle_close()
1786 else:
1787 self._ssl_accepting = False
1788
Bill Janssen934b16d2008-06-28 22:19:33 +00001789 def handle_read(self):
Antoine Pitroufc69af12010-04-24 20:04:58 +00001790 if self._ssl_accepting:
1791 self._do_ssl_handshake()
1792 else:
1793 data = self.recv(1024)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001794 if support.verbose:
1795 sys.stdout.write(" server: read %s from client\n" % repr(data))
1796 if not data:
1797 self.close()
1798 else:
Antoine Pitroudb187842010-04-27 10:32:58 +00001799 self.send(data.lower())
Bill Janssen934b16d2008-06-28 22:19:33 +00001800
1801 def handle_close(self):
Bill Janssende34d912008-06-28 23:00:39 +00001802 self.close()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001803 if support.verbose:
Bill Janssen934b16d2008-06-28 22:19:33 +00001804 sys.stdout.write(" server: closed connection %s\n" % self.socket)
1805
1806 def handle_error(self):
1807 raise
1808
1809 def __init__(self, certfile):
1810 self.certfile = certfile
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001811 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1812 self.port = support.bind_port(sock, '')
1813 asyncore.dispatcher.__init__(self, sock)
Bill Janssen934b16d2008-06-28 22:19:33 +00001814 self.listen(5)
1815
1816 def handle_accept(self):
1817 sock_obj, addr = self.accept()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001818 if support.verbose:
Bill Janssen934b16d2008-06-28 22:19:33 +00001819 sys.stdout.write(" server: new connection from %s:%s\n" %addr)
1820 self.ConnectionHandler(sock_obj, self.certfile)
1821
1822 def handle_error(self):
1823 raise
1824
1825 def __init__(self, certfile):
1826 self.flag = None
1827 self.active = False
1828 self.server = self.EchoServer(certfile)
1829 self.port = self.server.port
1830 threading.Thread.__init__(self)
Benjamin Peterson26f52162008-08-18 18:39:57 +00001831 self.daemon = True
Bill Janssen934b16d2008-06-28 22:19:33 +00001832
1833 def __str__(self):
1834 return "<%s %s>" % (self.__class__.__name__, self.server)
1835
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01001836 def __enter__(self):
1837 self.start(threading.Event())
1838 self.flag.wait()
Antoine Pitroud76088d2012-01-03 22:46:48 +01001839 return self
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01001840
1841 def __exit__(self, *args):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001842 if support.verbose:
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01001843 sys.stdout.write(" cleanup: stopping server.\n")
1844 self.stop()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001845 if support.verbose:
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01001846 sys.stdout.write(" cleanup: joining server thread.\n")
1847 self.join()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001848 if support.verbose:
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01001849 sys.stdout.write(" cleanup: successfully joined.\n")
1850
Antoine Pitrou3945c862010-04-28 21:11:01 +00001851 def start(self, flag=None):
Bill Janssen934b16d2008-06-28 22:19:33 +00001852 self.flag = flag
1853 threading.Thread.start(self)
1854
Antoine Pitrou3945c862010-04-28 21:11:01 +00001855 def run(self):
Bill Janssen934b16d2008-06-28 22:19:33 +00001856 self.active = True
1857 if self.flag:
1858 self.flag.set()
1859 while self.active:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001860 try:
1861 asyncore.loop(1)
1862 except:
1863 pass
Bill Janssen934b16d2008-06-28 22:19:33 +00001864
Antoine Pitrou3945c862010-04-28 21:11:01 +00001865 def stop(self):
Bill Janssen934b16d2008-06-28 22:19:33 +00001866 self.active = False
1867 self.server.close()
1868
Antoine Pitrou3945c862010-04-28 21:11:01 +00001869 def bad_cert_test(certfile):
1870 """
1871 Launch a server with CERT_REQUIRED, and check that trying to
1872 connect to it with the given client certificate fails.
1873 """
Trent Nelsone41b0062008-04-08 23:47:30 +00001874 server = ThreadedEchoServer(CERTFILE,
Bill Janssen98d19da2007-09-10 21:51:02 +00001875 certreqs=ssl.CERT_REQUIRED,
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001876 cacerts=CERTFILE, chatty=False,
1877 connectionchatty=False)
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01001878 with server:
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001879 try:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001880 with closing(socket.socket()) as sock:
1881 s = ssl.wrap_socket(sock,
1882 certfile=certfile,
1883 ssl_version=ssl.PROTOCOL_TLSv1)
1884 s.connect((HOST, server.port))
1885 except ssl.SSLError as x:
1886 if support.verbose:
1887 sys.stdout.write("\nSSLError is %s\n" % x.args[1])
1888 except OSError as x:
1889 if support.verbose:
1890 sys.stdout.write("\nOSError is %s\n" % x.args[1])
1891 except OSError as x:
1892 if x.errno != errno.ENOENT:
1893 raise
1894 if support.verbose:
1895 sys.stdout.write("\OSError is %s\n" % str(x))
Bill Janssen98d19da2007-09-10 21:51:02 +00001896 else:
Antoine Pitrou1bbb68d2010-05-06 14:11:23 +00001897 raise AssertionError("Use of invalid cert should have failed!")
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001898
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001899 def server_params_test(client_context, server_context, indata=b"FOO\n",
1900 chatty=True, connectionchatty=False, sni_name=None):
Antoine Pitrou3945c862010-04-28 21:11:01 +00001901 """
1902 Launch a server, connect a client to it and try various reads
1903 and writes.
1904 """
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001905 stats = {}
1906 server = ThreadedEchoServer(context=server_context,
Bill Janssen98d19da2007-09-10 21:51:02 +00001907 chatty=chatty,
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001908 connectionchatty=False)
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01001909 with server:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001910 with closing(client_context.wrap_socket(socket.socket(),
1911 server_hostname=sni_name)) as s:
1912 s.connect((HOST, server.port))
1913 for arg in [indata, bytearray(indata), memoryview(indata)]:
1914 if connectionchatty:
1915 if support.verbose:
1916 sys.stdout.write(
1917 " client: sending %r...\n" % indata)
1918 s.write(arg)
1919 outdata = s.read()
1920 if connectionchatty:
1921 if support.verbose:
1922 sys.stdout.write(" client: read %r\n" % outdata)
1923 if outdata != indata.lower():
1924 raise AssertionError(
1925 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
1926 % (outdata[:20], len(outdata),
1927 indata[:20].lower(), len(indata)))
1928 s.write(b"over\n")
Bill Janssen98d19da2007-09-10 21:51:02 +00001929 if connectionchatty:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001930 if support.verbose:
1931 sys.stdout.write(" client: closing connection.\n")
1932 stats.update({
1933 'compression': s.compression(),
1934 'cipher': s.cipher(),
1935 'peercert': s.getpeercert(),
Benjamin Petersonb10bfbe2015-01-23 16:35:37 -05001936 'client_alpn_protocol': s.selected_alpn_protocol(),
Alex Gaynore98205d2014-09-04 13:33:22 -07001937 'client_npn_protocol': s.selected_npn_protocol(),
1938 'version': s.version(),
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001939 })
1940 s.close()
Benjamin Petersonb10bfbe2015-01-23 16:35:37 -05001941 stats['server_alpn_protocols'] = server.selected_alpn_protocols
1942 stats['server_npn_protocols'] = server.selected_npn_protocols
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001943 return stats
Bill Janssen98d19da2007-09-10 21:51:02 +00001944
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001945 def try_protocol_combo(server_protocol, client_protocol, expect_success,
1946 certsreqs=None, server_options=0, client_options=0):
Alex Gaynore98205d2014-09-04 13:33:22 -07001947 """
1948 Try to SSL-connect using *client_protocol* to *server_protocol*.
1949 If *expect_success* is true, assert that the connection succeeds,
1950 if it's false, assert that the connection fails.
1951 Also, if *expect_success* is a string, assert that it is the protocol
1952 version actually used by the connection.
1953 """
Benjamin Peterson5b63acd2008-03-29 15:24:25 +00001954 if certsreqs is None:
Bill Janssene3f1d7d2007-09-11 01:09:19 +00001955 certsreqs = ssl.CERT_NONE
Antoine Pitrou3945c862010-04-28 21:11:01 +00001956 certtype = {
1957 ssl.CERT_NONE: "CERT_NONE",
1958 ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
1959 ssl.CERT_REQUIRED: "CERT_REQUIRED",
1960 }[certsreqs]
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001961 if support.verbose:
Antoine Pitrou3945c862010-04-28 21:11:01 +00001962 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
Bill Janssen98d19da2007-09-10 21:51:02 +00001963 sys.stdout.write(formatstr %
1964 (ssl.get_protocol_name(client_protocol),
1965 ssl.get_protocol_name(server_protocol),
1966 certtype))
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001967 client_context = ssl.SSLContext(client_protocol)
1968 client_context.options |= client_options
1969 server_context = ssl.SSLContext(server_protocol)
1970 server_context.options |= server_options
1971
1972 # NOTE: we must enable "ALL" ciphers on the client, otherwise an
1973 # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
1974 # starting from OpenSSL 1.0.0 (see issue #8322).
1975 if client_context.protocol == ssl.PROTOCOL_SSLv23:
1976 client_context.set_ciphers("ALL")
1977
1978 for ctx in (client_context, server_context):
1979 ctx.verify_mode = certsreqs
1980 ctx.load_cert_chain(CERTFILE)
1981 ctx.load_verify_locations(CERTFILE)
Bill Janssen98d19da2007-09-10 21:51:02 +00001982 try:
Alex Gaynore98205d2014-09-04 13:33:22 -07001983 stats = server_params_test(client_context, server_context,
1984 chatty=False, connectionchatty=False)
Antoine Pitroudb187842010-04-27 10:32:58 +00001985 # Protocol mismatch can result in either an SSLError, or a
1986 # "Connection reset by peer" error.
1987 except ssl.SSLError:
Antoine Pitrou3945c862010-04-28 21:11:01 +00001988 if expect_success:
Bill Janssen98d19da2007-09-10 21:51:02 +00001989 raise
Antoine Pitroudb187842010-04-27 10:32:58 +00001990 except socket.error as e:
Antoine Pitrou3945c862010-04-28 21:11:01 +00001991 if expect_success or e.errno != errno.ECONNRESET:
Antoine Pitroudb187842010-04-27 10:32:58 +00001992 raise
Bill Janssen98d19da2007-09-10 21:51:02 +00001993 else:
Antoine Pitrou3945c862010-04-28 21:11:01 +00001994 if not expect_success:
Antoine Pitrou1bbb68d2010-05-06 14:11:23 +00001995 raise AssertionError(
Bill Janssen98d19da2007-09-10 21:51:02 +00001996 "Client protocol %s succeeded with server protocol %s!"
1997 % (ssl.get_protocol_name(client_protocol),
1998 ssl.get_protocol_name(server_protocol)))
Alex Gaynore98205d2014-09-04 13:33:22 -07001999 elif (expect_success is not True
2000 and expect_success != stats['version']):
2001 raise AssertionError("version mismatch: expected %r, got %r"
2002 % (expect_success, stats['version']))
Bill Janssen98d19da2007-09-10 21:51:02 +00002003
2004
Bill Janssen934b16d2008-06-28 22:19:33 +00002005 class ThreadedTests(unittest.TestCase):
Bill Janssen98d19da2007-09-10 21:51:02 +00002006
Antoine Pitroud75efd92010-08-04 17:38:33 +00002007 @skip_if_broken_ubuntu_ssl
Antoine Pitrou3945c862010-04-28 21:11:01 +00002008 def test_echo(self):
2009 """Basic test of an SSL client connecting to a server"""
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002010 if support.verbose:
Bill Janssen98d19da2007-09-10 21:51:02 +00002011 sys.stdout.write("\n")
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002012 for protocol in PROTOCOLS:
2013 context = ssl.SSLContext(protocol)
2014 context.load_cert_chain(CERTFILE)
2015 server_params_test(context, context,
2016 chatty=True, connectionchatty=True)
Bill Janssen98d19da2007-09-10 21:51:02 +00002017
Antoine Pitrou3945c862010-04-28 21:11:01 +00002018 def test_getpeercert(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002019 if support.verbose:
Bill Janssen98d19da2007-09-10 21:51:02 +00002020 sys.stdout.write("\n")
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002021 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2022 context.verify_mode = ssl.CERT_REQUIRED
2023 context.load_verify_locations(CERTFILE)
2024 context.load_cert_chain(CERTFILE)
2025 server = ThreadedEchoServer(context=context, chatty=False)
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01002026 with server:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002027 s = context.wrap_socket(socket.socket(),
2028 do_handshake_on_connect=False)
Antoine Pitroudb187842010-04-27 10:32:58 +00002029 s.connect((HOST, server.port))
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002030 # getpeercert() raise ValueError while the handshake isn't
2031 # done.
2032 with self.assertRaises(ValueError):
2033 s.getpeercert()
2034 s.do_handshake()
Antoine Pitroudb187842010-04-27 10:32:58 +00002035 cert = s.getpeercert()
2036 self.assertTrue(cert, "Can't get peer certificate.")
2037 cipher = s.cipher()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002038 if support.verbose:
Antoine Pitroudb187842010-04-27 10:32:58 +00002039 sys.stdout.write(pprint.pformat(cert) + '\n')
2040 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
2041 if 'subject' not in cert:
2042 self.fail("No subject field in certificate: %s." %
2043 pprint.pformat(cert))
2044 if ((('organizationName', 'Python Software Foundation'),)
2045 not in cert['subject']):
2046 self.fail(
2047 "Missing or invalid 'organizationName' field in certificate subject; "
2048 "should be 'Python Software Foundation'.")
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002049 self.assertIn('notBefore', cert)
2050 self.assertIn('notAfter', cert)
2051 before = ssl.cert_time_to_seconds(cert['notBefore'])
2052 after = ssl.cert_time_to_seconds(cert['notAfter'])
2053 self.assertLess(before, after)
Antoine Pitroudb187842010-04-27 10:32:58 +00002054 s.close()
Bill Janssen98d19da2007-09-10 21:51:02 +00002055
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002056 @unittest.skipUnless(have_verify_flags(),
2057 "verify_flags need OpenSSL > 0.9.8")
2058 def test_crl_check(self):
2059 if support.verbose:
2060 sys.stdout.write("\n")
2061
2062 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2063 server_context.load_cert_chain(SIGNED_CERTFILE)
2064
2065 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2066 context.verify_mode = ssl.CERT_REQUIRED
2067 context.load_verify_locations(SIGNING_CA)
2068 self.assertEqual(context.verify_flags, ssl.VERIFY_DEFAULT)
2069
2070 # VERIFY_DEFAULT should pass
2071 server = ThreadedEchoServer(context=server_context, chatty=True)
2072 with server:
2073 with closing(context.wrap_socket(socket.socket())) as s:
2074 s.connect((HOST, server.port))
2075 cert = s.getpeercert()
2076 self.assertTrue(cert, "Can't get peer certificate.")
2077
2078 # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
2079 context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
2080
2081 server = ThreadedEchoServer(context=server_context, chatty=True)
2082 with server:
2083 with closing(context.wrap_socket(socket.socket())) as s:
2084 with self.assertRaisesRegexp(ssl.SSLError,
2085 "certificate verify failed"):
2086 s.connect((HOST, server.port))
2087
2088 # now load a CRL file. The CRL file is signed by the CA.
2089 context.load_verify_locations(CRLFILE)
2090
2091 server = ThreadedEchoServer(context=server_context, chatty=True)
2092 with server:
2093 with closing(context.wrap_socket(socket.socket())) as s:
2094 s.connect((HOST, server.port))
2095 cert = s.getpeercert()
2096 self.assertTrue(cert, "Can't get peer certificate.")
2097
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002098 def test_check_hostname(self):
2099 if support.verbose:
2100 sys.stdout.write("\n")
2101
2102 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2103 server_context.load_cert_chain(SIGNED_CERTFILE)
2104
2105 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2106 context.verify_mode = ssl.CERT_REQUIRED
2107 context.check_hostname = True
2108 context.load_verify_locations(SIGNING_CA)
2109
2110 # correct hostname should verify
2111 server = ThreadedEchoServer(context=server_context, chatty=True)
2112 with server:
2113 with closing(context.wrap_socket(socket.socket(),
2114 server_hostname="localhost")) as s:
2115 s.connect((HOST, server.port))
2116 cert = s.getpeercert()
2117 self.assertTrue(cert, "Can't get peer certificate.")
2118
2119 # incorrect hostname should raise an exception
2120 server = ThreadedEchoServer(context=server_context, chatty=True)
2121 with server:
2122 with closing(context.wrap_socket(socket.socket(),
2123 server_hostname="invalid")) as s:
2124 with self.assertRaisesRegexp(ssl.CertificateError,
2125 "hostname 'invalid' doesn't match u?'localhost'"):
2126 s.connect((HOST, server.port))
2127
2128 # missing server_hostname arg should cause an exception, too
2129 server = ThreadedEchoServer(context=server_context, chatty=True)
2130 with server:
2131 with closing(socket.socket()) as s:
2132 with self.assertRaisesRegexp(ValueError,
2133 "check_hostname requires server_hostname"):
2134 context.wrap_socket(s)
2135
Antoine Pitrou3945c862010-04-28 21:11:01 +00002136 def test_empty_cert(self):
2137 """Connecting with an empty cert file"""
2138 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
2139 "nullcert.pem"))
2140 def test_malformed_cert(self):
2141 """Connecting with a badly formatted certificate (syntax error)"""
2142 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
2143 "badcert.pem"))
2144 def test_nonexisting_cert(self):
2145 """Connecting with a non-existing cert file"""
2146 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
2147 "wrongcert.pem"))
2148 def test_malformed_key(self):
2149 """Connecting with a badly formatted key (syntax error)"""
2150 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
2151 "badkey.pem"))
Bill Janssen98d19da2007-09-10 21:51:02 +00002152
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002153 def test_rude_shutdown(self):
2154 """A brutal shutdown of an SSL server should raise an OSError
2155 in the client when attempting handshake.
2156 """
2157 listener_ready = threading.Event()
2158 listener_gone = threading.Event()
2159
2160 s = socket.socket()
2161 port = support.bind_port(s, HOST)
2162
2163 # `listener` runs in a thread. It sits in an accept() until
2164 # the main thread connects. Then it rudely closes the socket,
2165 # and sets Event `listener_gone` to let the main thread know
2166 # the socket is gone.
2167 def listener():
2168 s.listen(5)
2169 listener_ready.set()
2170 newsock, addr = s.accept()
2171 newsock.close()
2172 s.close()
2173 listener_gone.set()
2174
2175 def connector():
2176 listener_ready.wait()
2177 with closing(socket.socket()) as c:
2178 c.connect((HOST, port))
2179 listener_gone.wait()
2180 try:
2181 ssl_sock = ssl.wrap_socket(c)
Benjamin Petersone208b572014-08-20 14:49:08 -05002182 except socket.error:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002183 pass
2184 else:
2185 self.fail('connecting to closed SSL socket should have failed')
2186
2187 t = threading.Thread(target=listener)
2188 t.start()
2189 try:
2190 connector()
2191 finally:
2192 t.join()
2193
Antoine Pitroud75efd92010-08-04 17:38:33 +00002194 @skip_if_broken_ubuntu_ssl
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002195 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
2196 "OpenSSL is compiled without SSLv2 support")
Antoine Pitrou3945c862010-04-28 21:11:01 +00002197 def test_protocol_sslv2(self):
2198 """Connecting to an SSLv2 server with various client options"""
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002199 if support.verbose:
Bill Janssen98d19da2007-09-10 21:51:02 +00002200 sys.stdout.write("\n")
Antoine Pitrou3945c862010-04-28 21:11:01 +00002201 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
2202 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
2203 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
Antoine Pitrou3b2afbb2014-01-09 19:52:12 +01002204 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False)
Antoine Pitrou3945c862010-04-28 21:11:01 +00002205 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
2206 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002207 # SSLv23 client with specific SSL options
2208 if no_sslv2_implies_sslv3_hello():
2209 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
2210 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
2211 client_options=ssl.OP_NO_SSLv2)
2212 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
2213 client_options=ssl.OP_NO_SSLv3)
2214 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
2215 client_options=ssl.OP_NO_TLSv1)
Bill Janssen98d19da2007-09-10 21:51:02 +00002216
Antoine Pitroud75efd92010-08-04 17:38:33 +00002217 @skip_if_broken_ubuntu_ssl
Antoine Pitrou3945c862010-04-28 21:11:01 +00002218 def test_protocol_sslv23(self):
2219 """Connecting to an SSLv23 server with various client options"""
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002220 if support.verbose:
Bill Janssen98d19da2007-09-10 21:51:02 +00002221 sys.stdout.write("\n")
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002222 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2223 try:
2224 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True)
2225 except socket.error as x:
2226 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
2227 if support.verbose:
2228 sys.stdout.write(
2229 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
2230 % str(x))
Benjamin Peterson60766c42014-12-05 21:59:35 -05002231 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2232 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, 'SSLv3')
Antoine Pitrou3945c862010-04-28 21:11:01 +00002233 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True)
Alex Gaynore98205d2014-09-04 13:33:22 -07002234 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1')
Bill Janssen98d19da2007-09-10 21:51:02 +00002235
Benjamin Peterson60766c42014-12-05 21:59:35 -05002236 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2237 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
Antoine Pitrou3945c862010-04-28 21:11:01 +00002238 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL)
Alex Gaynore98205d2014-09-04 13:33:22 -07002239 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
Bill Janssen98d19da2007-09-10 21:51:02 +00002240
Benjamin Peterson60766c42014-12-05 21:59:35 -05002241 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2242 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
Antoine Pitrou3945c862010-04-28 21:11:01 +00002243 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED)
Alex Gaynore98205d2014-09-04 13:33:22 -07002244 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Bill Janssen98d19da2007-09-10 21:51:02 +00002245
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002246 # Server with specific SSL options
Benjamin Peterson60766c42014-12-05 21:59:35 -05002247 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2248 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False,
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002249 server_options=ssl.OP_NO_SSLv3)
2250 # Will choose TLSv1
2251 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True,
2252 server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
2253 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, False,
2254 server_options=ssl.OP_NO_TLSv1)
2255
2256
Antoine Pitroud75efd92010-08-04 17:38:33 +00002257 @skip_if_broken_ubuntu_ssl
Benjamin Peterson60766c42014-12-05 21:59:35 -05002258 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
2259 "OpenSSL is compiled without SSLv3 support")
Antoine Pitrou3945c862010-04-28 21:11:01 +00002260 def test_protocol_sslv3(self):
2261 """Connecting to an SSLv3 server with various client options"""
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002262 if support.verbose:
Bill Janssen98d19da2007-09-10 21:51:02 +00002263 sys.stdout.write("\n")
Alex Gaynore98205d2014-09-04 13:33:22 -07002264 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3')
2265 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
2266 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
Victor Stinnerb1241f92011-05-10 01:52:03 +02002267 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2268 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002269 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False,
2270 client_options=ssl.OP_NO_SSLv3)
Antoine Pitrou3945c862010-04-28 21:11:01 +00002271 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002272 if no_sslv2_implies_sslv3_hello():
2273 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Alex Gaynore98205d2014-09-04 13:33:22 -07002274 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, 'SSLv3',
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002275 client_options=ssl.OP_NO_SSLv2)
Bill Janssen98d19da2007-09-10 21:51:02 +00002276
Antoine Pitroud75efd92010-08-04 17:38:33 +00002277 @skip_if_broken_ubuntu_ssl
Antoine Pitrou3945c862010-04-28 21:11:01 +00002278 def test_protocol_tlsv1(self):
2279 """Connecting to a TLSv1 server with various client options"""
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002280 if support.verbose:
Bill Janssen98d19da2007-09-10 21:51:02 +00002281 sys.stdout.write("\n")
Alex Gaynore98205d2014-09-04 13:33:22 -07002282 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1')
2283 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
2284 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Victor Stinnerb1241f92011-05-10 01:52:03 +02002285 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2286 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
Benjamin Peterson60766c42014-12-05 21:59:35 -05002287 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2288 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002289 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False,
2290 client_options=ssl.OP_NO_TLSv1)
2291
2292 @skip_if_broken_ubuntu_ssl
2293 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
2294 "TLS version 1.1 not supported.")
2295 def test_protocol_tlsv1_1(self):
2296 """Connecting to a TLSv1.1 server with various client options.
2297 Testing against older TLS versions."""
2298 if support.verbose:
2299 sys.stdout.write("\n")
Alex Gaynore98205d2014-09-04 13:33:22 -07002300 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002301 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2302 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False)
Benjamin Peterson60766c42014-12-05 21:59:35 -05002303 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2304 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002305 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv23, False,
2306 client_options=ssl.OP_NO_TLSv1_1)
2307
Alex Gaynore98205d2014-09-04 13:33:22 -07002308 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002309 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False)
2310 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False)
2311
2312
2313 @skip_if_broken_ubuntu_ssl
2314 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
2315 "TLS version 1.2 not supported.")
2316 def test_protocol_tlsv1_2(self):
2317 """Connecting to a TLSv1.2 server with various client options.
2318 Testing against older TLS versions."""
2319 if support.verbose:
2320 sys.stdout.write("\n")
Alex Gaynore98205d2014-09-04 13:33:22 -07002321 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2',
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002322 server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
2323 client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
2324 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2325 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False)
Benjamin Peterson60766c42014-12-05 21:59:35 -05002326 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2327 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002328 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv23, False,
2329 client_options=ssl.OP_NO_TLSv1_2)
2330
Alex Gaynore98205d2014-09-04 13:33:22 -07002331 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2')
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002332 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
2333 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
2334 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
2335 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
Bill Janssen98d19da2007-09-10 21:51:02 +00002336
Antoine Pitrou3945c862010-04-28 21:11:01 +00002337 def test_starttls(self):
2338 """Switching from clear text to encrypted and back again."""
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002339 msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")
Bill Janssen98d19da2007-09-10 21:51:02 +00002340
Trent Nelsone41b0062008-04-08 23:47:30 +00002341 server = ThreadedEchoServer(CERTFILE,
Bill Janssen98d19da2007-09-10 21:51:02 +00002342 ssl_version=ssl.PROTOCOL_TLSv1,
2343 starttls_server=True,
2344 chatty=True,
2345 connectionchatty=True)
Bill Janssen98d19da2007-09-10 21:51:02 +00002346 wrapped = False
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01002347 with server:
Antoine Pitroudb187842010-04-27 10:32:58 +00002348 s = socket.socket()
2349 s.setblocking(1)
2350 s.connect((HOST, server.port))
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002351 if support.verbose:
Antoine Pitroudb187842010-04-27 10:32:58 +00002352 sys.stdout.write("\n")
2353 for indata in msgs:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002354 if support.verbose:
Antoine Pitroudb187842010-04-27 10:32:58 +00002355 sys.stdout.write(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002356 " client: sending %r...\n" % indata)
Antoine Pitroudb187842010-04-27 10:32:58 +00002357 if wrapped:
2358 conn.write(indata)
2359 outdata = conn.read()
2360 else:
2361 s.send(indata)
2362 outdata = s.recv(1024)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002363 msg = outdata.strip().lower()
2364 if indata == b"STARTTLS" and msg.startswith(b"ok"):
Antoine Pitrou3945c862010-04-28 21:11:01 +00002365 # STARTTLS ok, switch to secure mode
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002366 if support.verbose:
Bill Janssen296a59d2007-09-16 22:06:00 +00002367 sys.stdout.write(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002368 " client: read %r from server, starting TLS...\n"
2369 % msg)
Antoine Pitroudb187842010-04-27 10:32:58 +00002370 conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
2371 wrapped = True
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002372 elif indata == b"ENDTLS" and msg.startswith(b"ok"):
Antoine Pitrou3945c862010-04-28 21:11:01 +00002373 # ENDTLS ok, switch back to clear text
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002374 if support.verbose:
Antoine Pitroudb187842010-04-27 10:32:58 +00002375 sys.stdout.write(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002376 " client: read %r from server, ending TLS...\n"
2377 % msg)
Antoine Pitroudb187842010-04-27 10:32:58 +00002378 s = conn.unwrap()
2379 wrapped = False
Bill Janssen98d19da2007-09-10 21:51:02 +00002380 else:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002381 if support.verbose:
Antoine Pitroudb187842010-04-27 10:32:58 +00002382 sys.stdout.write(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002383 " client: read %r from server\n" % msg)
2384 if support.verbose:
Antoine Pitroudb187842010-04-27 10:32:58 +00002385 sys.stdout.write(" client: closing connection.\n")
2386 if wrapped:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002387 conn.write(b"over\n")
Antoine Pitroudb187842010-04-27 10:32:58 +00002388 else:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002389 s.send(b"over\n")
2390 if wrapped:
2391 conn.close()
2392 else:
2393 s.close()
Bill Janssen98d19da2007-09-10 21:51:02 +00002394
Antoine Pitrou3945c862010-04-28 21:11:01 +00002395 def test_socketserver(self):
2396 """Using a SocketServer to create and manage SSL connections."""
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002397 server = make_https_server(self, certfile=CERTFILE)
Bill Janssen296a59d2007-09-16 22:06:00 +00002398 # try to connect
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002399 if support.verbose:
2400 sys.stdout.write('\n')
2401 with open(CERTFILE, 'rb') as f:
2402 d1 = f.read()
2403 d2 = ''
2404 # now fetch the same data from the HTTPS server
Benjamin Petersonfcfb18e2014-11-23 11:42:45 -06002405 url = 'https://localhost:%d/%s' % (
2406 server.port, os.path.split(CERTFILE)[1])
2407 context = ssl.create_default_context(cafile=CERTFILE)
Benjamin Petersonb0609ec2014-11-23 11:52:46 -06002408 f = urllib2.urlopen(url, context=context)
Bill Janssen296a59d2007-09-16 22:06:00 +00002409 try:
Bill Janssen296a59d2007-09-16 22:06:00 +00002410 dlen = f.info().getheader("content-length")
2411 if dlen and (int(dlen) > 0):
2412 d2 = f.read(int(dlen))
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002413 if support.verbose:
Bill Janssen296a59d2007-09-16 22:06:00 +00002414 sys.stdout.write(
2415 " client: read %d bytes from remote server '%s'\n"
2416 % (len(d2), server))
Bill Janssen296a59d2007-09-16 22:06:00 +00002417 finally:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002418 f.close()
2419 self.assertEqual(d1, d2)
Bill Janssen934b16d2008-06-28 22:19:33 +00002420
Antoine Pitrou3945c862010-04-28 21:11:01 +00002421 def test_asyncore_server(self):
2422 """Check the example asyncore integration."""
Bill Janssen934b16d2008-06-28 22:19:33 +00002423 indata = "TEST MESSAGE of mixed case\n"
2424
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002425 if support.verbose:
Bill Janssen934b16d2008-06-28 22:19:33 +00002426 sys.stdout.write("\n")
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002427
2428 indata = b"FOO\n"
Bill Janssen934b16d2008-06-28 22:19:33 +00002429 server = AsyncoreEchoServer(CERTFILE)
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01002430 with server:
Antoine Pitroudb187842010-04-27 10:32:58 +00002431 s = ssl.wrap_socket(socket.socket())
2432 s.connect(('127.0.0.1', server.port))
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002433 if support.verbose:
Antoine Pitroudb187842010-04-27 10:32:58 +00002434 sys.stdout.write(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002435 " client: sending %r...\n" % indata)
Antoine Pitroudb187842010-04-27 10:32:58 +00002436 s.write(indata)
2437 outdata = s.read()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002438 if support.verbose:
2439 sys.stdout.write(" client: read %r\n" % outdata)
Antoine Pitroudb187842010-04-27 10:32:58 +00002440 if outdata != indata.lower():
2441 self.fail(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002442 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
2443 % (outdata[:20], len(outdata),
2444 indata[:20].lower(), len(indata)))
2445 s.write(b"over\n")
2446 if support.verbose:
Antoine Pitroudb187842010-04-27 10:32:58 +00002447 sys.stdout.write(" client: closing connection.\n")
2448 s.close()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002449 if support.verbose:
2450 sys.stdout.write(" client: connection closed.\n")
Bill Janssen934b16d2008-06-28 22:19:33 +00002451
Antoine Pitrou3945c862010-04-28 21:11:01 +00002452 def test_recv_send(self):
2453 """Test recv(), send() and friends."""
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002454 if support.verbose:
Bill Janssen61c001a2008-09-08 16:37:24 +00002455 sys.stdout.write("\n")
2456
2457 server = ThreadedEchoServer(CERTFILE,
2458 certreqs=ssl.CERT_NONE,
2459 ssl_version=ssl.PROTOCOL_TLSv1,
2460 cacerts=CERTFILE,
2461 chatty=True,
2462 connectionchatty=False)
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01002463 with server:
2464 s = ssl.wrap_socket(socket.socket(),
2465 server_side=False,
2466 certfile=CERTFILE,
2467 ca_certs=CERTFILE,
2468 cert_reqs=ssl.CERT_NONE,
2469 ssl_version=ssl.PROTOCOL_TLSv1)
2470 s.connect((HOST, server.port))
Bill Janssen61c001a2008-09-08 16:37:24 +00002471 # helper methods for standardising recv* method signatures
2472 def _recv_into():
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002473 b = bytearray(b"\0"*100)
Bill Janssen61c001a2008-09-08 16:37:24 +00002474 count = s.recv_into(b)
2475 return b[:count]
2476
2477 def _recvfrom_into():
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002478 b = bytearray(b"\0"*100)
Bill Janssen61c001a2008-09-08 16:37:24 +00002479 count, addr = s.recvfrom_into(b)
2480 return b[:count]
2481
2482 # (name, method, whether to expect success, *args)
2483 send_methods = [
2484 ('send', s.send, True, []),
2485 ('sendto', s.sendto, False, ["some.address"]),
2486 ('sendall', s.sendall, True, []),
2487 ]
2488 recv_methods = [
2489 ('recv', s.recv, True, []),
2490 ('recvfrom', s.recvfrom, False, ["some.address"]),
2491 ('recv_into', _recv_into, True, []),
2492 ('recvfrom_into', _recvfrom_into, False, []),
2493 ]
2494 data_prefix = u"PREFIX_"
2495
2496 for meth_name, send_meth, expect_success, args in send_methods:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002497 indata = (data_prefix + meth_name).encode('ascii')
Bill Janssen61c001a2008-09-08 16:37:24 +00002498 try:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002499 send_meth(indata, *args)
Bill Janssen61c001a2008-09-08 16:37:24 +00002500 outdata = s.read()
Bill Janssen61c001a2008-09-08 16:37:24 +00002501 if outdata != indata.lower():
Georg Brandlc7ca56d2010-02-06 23:23:45 +00002502 self.fail(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002503 "While sending with <<{name:s}>> bad data "
2504 "<<{outdata:r}>> ({nout:d}) received; "
2505 "expected <<{indata:r}>> ({nin:d})\n".format(
2506 name=meth_name, outdata=outdata[:20],
2507 nout=len(outdata),
2508 indata=indata[:20], nin=len(indata)
Bill Janssen61c001a2008-09-08 16:37:24 +00002509 )
2510 )
2511 except ValueError as e:
2512 if expect_success:
Georg Brandlc7ca56d2010-02-06 23:23:45 +00002513 self.fail(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002514 "Failed to send with method <<{name:s}>>; "
2515 "expected to succeed.\n".format(name=meth_name)
Bill Janssen61c001a2008-09-08 16:37:24 +00002516 )
2517 if not str(e).startswith(meth_name):
Georg Brandlc7ca56d2010-02-06 23:23:45 +00002518 self.fail(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002519 "Method <<{name:s}>> failed with unexpected "
2520 "exception message: {exp:s}\n".format(
2521 name=meth_name, exp=e
Bill Janssen61c001a2008-09-08 16:37:24 +00002522 )
2523 )
2524
2525 for meth_name, recv_meth, expect_success, args in recv_methods:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002526 indata = (data_prefix + meth_name).encode('ascii')
Bill Janssen61c001a2008-09-08 16:37:24 +00002527 try:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002528 s.send(indata)
Bill Janssen61c001a2008-09-08 16:37:24 +00002529 outdata = recv_meth(*args)
Bill Janssen61c001a2008-09-08 16:37:24 +00002530 if outdata != indata.lower():
Georg Brandlc7ca56d2010-02-06 23:23:45 +00002531 self.fail(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002532 "While receiving with <<{name:s}>> bad data "
2533 "<<{outdata:r}>> ({nout:d}) received; "
2534 "expected <<{indata:r}>> ({nin:d})\n".format(
2535 name=meth_name, outdata=outdata[:20],
2536 nout=len(outdata),
2537 indata=indata[:20], nin=len(indata)
Bill Janssen61c001a2008-09-08 16:37:24 +00002538 )
2539 )
2540 except ValueError as e:
2541 if expect_success:
Georg Brandlc7ca56d2010-02-06 23:23:45 +00002542 self.fail(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002543 "Failed to receive with method <<{name:s}>>; "
2544 "expected to succeed.\n".format(name=meth_name)
Bill Janssen61c001a2008-09-08 16:37:24 +00002545 )
2546 if not str(e).startswith(meth_name):
Georg Brandlc7ca56d2010-02-06 23:23:45 +00002547 self.fail(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002548 "Method <<{name:s}>> failed with unexpected "
2549 "exception message: {exp:s}\n".format(
2550 name=meth_name, exp=e
Bill Janssen61c001a2008-09-08 16:37:24 +00002551 )
2552 )
2553 # consume data
2554 s.read()
2555
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002556 s.write(b"over\n")
Bill Janssen61c001a2008-09-08 16:37:24 +00002557 s.close()
Bill Janssen61c001a2008-09-08 16:37:24 +00002558
Antoine Pitroufc69af12010-04-24 20:04:58 +00002559 def test_handshake_timeout(self):
2560 # Issue #5103: SSL handshake must respect the socket timeout
2561 server = socket.socket(socket.AF_INET)
2562 host = "127.0.0.1"
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002563 port = support.bind_port(server)
Antoine Pitroufc69af12010-04-24 20:04:58 +00002564 started = threading.Event()
2565 finish = False
2566
2567 def serve():
2568 server.listen(5)
2569 started.set()
2570 conns = []
2571 while not finish:
2572 r, w, e = select.select([server], [], [], 0.1)
2573 if server in r:
2574 # Let the socket hang around rather than having
2575 # it closed by garbage collection.
2576 conns.append(server.accept()[0])
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002577 for sock in conns:
2578 sock.close()
Antoine Pitroufc69af12010-04-24 20:04:58 +00002579
2580 t = threading.Thread(target=serve)
2581 t.start()
2582 started.wait()
2583
2584 try:
2585 try:
2586 c = socket.socket(socket.AF_INET)
2587 c.settimeout(0.2)
2588 c.connect((host, port))
2589 # Will attempt handshake and time out
2590 self.assertRaisesRegexp(ssl.SSLError, "timed out",
2591 ssl.wrap_socket, c)
2592 finally:
2593 c.close()
2594 try:
2595 c = socket.socket(socket.AF_INET)
Antoine Pitroufc69af12010-04-24 20:04:58 +00002596 c = ssl.wrap_socket(c)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002597 c.settimeout(0.2)
Antoine Pitroufc69af12010-04-24 20:04:58 +00002598 # Will attempt handshake and time out
2599 self.assertRaisesRegexp(ssl.SSLError, "timed out",
2600 c.connect, (host, port))
2601 finally:
2602 c.close()
2603 finally:
2604 finish = True
2605 t.join()
2606 server.close()
2607
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002608 def test_server_accept(self):
2609 # Issue #16357: accept() on a SSLSocket created through
2610 # SSLContext.wrap_socket().
2611 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2612 context.verify_mode = ssl.CERT_REQUIRED
2613 context.load_verify_locations(CERTFILE)
2614 context.load_cert_chain(CERTFILE)
2615 server = socket.socket(socket.AF_INET)
2616 host = "127.0.0.1"
2617 port = support.bind_port(server)
2618 server = context.wrap_socket(server, server_side=True)
2619
2620 evt = threading.Event()
2621 remote = [None]
2622 peer = [None]
2623 def serve():
2624 server.listen(5)
2625 # Block on the accept and wait on the connection to close.
2626 evt.set()
2627 remote[0], peer[0] = server.accept()
2628 remote[0].recv(1)
2629
2630 t = threading.Thread(target=serve)
2631 t.start()
2632 # Client wait until server setup and perform a connect.
2633 evt.wait()
2634 client = context.wrap_socket(socket.socket())
2635 client.connect((host, port))
2636 client_addr = client.getsockname()
2637 client.close()
2638 t.join()
2639 remote[0].close()
2640 server.close()
2641 # Sanity checks.
2642 self.assertIsInstance(remote[0], ssl.SSLSocket)
2643 self.assertEqual(peer[0], client_addr)
2644
2645 def test_getpeercert_enotconn(self):
2646 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2647 with closing(context.wrap_socket(socket.socket())) as sock:
2648 with self.assertRaises(socket.error) as cm:
2649 sock.getpeercert()
2650 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
2651
2652 def test_do_handshake_enotconn(self):
2653 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2654 with closing(context.wrap_socket(socket.socket())) as sock:
2655 with self.assertRaises(socket.error) as cm:
2656 sock.do_handshake()
2657 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
2658
Antoine Pitroud76088d2012-01-03 22:46:48 +01002659 def test_default_ciphers(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002660 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2661 try:
2662 # Force a set of weak ciphers on our client context
2663 context.set_ciphers("DES")
2664 except ssl.SSLError:
2665 self.skipTest("no DES cipher available")
Antoine Pitroud76088d2012-01-03 22:46:48 +01002666 with ThreadedEchoServer(CERTFILE,
2667 ssl_version=ssl.PROTOCOL_SSLv23,
2668 chatty=False) as server:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002669 with closing(context.wrap_socket(socket.socket())) as s:
2670 with self.assertRaises(ssl.SSLError):
Antoine Pitroud76088d2012-01-03 22:46:48 +01002671 s.connect((HOST, server.port))
Antoine Pitroud76088d2012-01-03 22:46:48 +01002672 self.assertIn("no shared cipher", str(server.conn_errors[0]))
2673
Alex Gaynore98205d2014-09-04 13:33:22 -07002674 def test_version_basic(self):
2675 """
2676 Basic tests for SSLSocket.version().
2677 More tests are done in the test_protocol_*() methods.
2678 """
2679 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2680 with ThreadedEchoServer(CERTFILE,
2681 ssl_version=ssl.PROTOCOL_TLSv1,
2682 chatty=False) as server:
2683 with closing(context.wrap_socket(socket.socket())) as s:
2684 self.assertIs(s.version(), None)
2685 s.connect((HOST, server.port))
2686 self.assertEqual(s.version(), "TLSv1")
2687 self.assertIs(s.version(), None)
2688
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002689 @unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
2690 def test_default_ecdh_curve(self):
2691 # Issue #21015: elliptic curve-based Diffie Hellman key exchange
2692 # should be enabled by default on SSL contexts.
2693 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2694 context.load_cert_chain(CERTFILE)
2695 # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
2696 # explicitly using the 'ECCdraft' cipher alias. Otherwise,
2697 # our default cipher list should prefer ECDH-based ciphers
2698 # automatically.
2699 if ssl.OPENSSL_VERSION_INFO < (1, 0, 0):
2700 context.set_ciphers("ECCdraft:ECDH")
2701 with ThreadedEchoServer(context=context) as server:
2702 with closing(context.wrap_socket(socket.socket())) as s:
2703 s.connect((HOST, server.port))
2704 self.assertIn("ECDH", s.cipher()[0])
2705
2706 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
2707 "'tls-unique' channel binding not available")
2708 def test_tls_unique_channel_binding(self):
2709 """Test tls-unique channel binding."""
2710 if support.verbose:
2711 sys.stdout.write("\n")
2712
2713 server = ThreadedEchoServer(CERTFILE,
2714 certreqs=ssl.CERT_NONE,
2715 ssl_version=ssl.PROTOCOL_TLSv1,
2716 cacerts=CERTFILE,
2717 chatty=True,
2718 connectionchatty=False)
2719 with server:
2720 s = ssl.wrap_socket(socket.socket(),
2721 server_side=False,
2722 certfile=CERTFILE,
2723 ca_certs=CERTFILE,
2724 cert_reqs=ssl.CERT_NONE,
2725 ssl_version=ssl.PROTOCOL_TLSv1)
2726 s.connect((HOST, server.port))
2727 # get the data
2728 cb_data = s.get_channel_binding("tls-unique")
2729 if support.verbose:
2730 sys.stdout.write(" got channel binding data: {0!r}\n"
2731 .format(cb_data))
2732
2733 # check if it is sane
2734 self.assertIsNotNone(cb_data)
2735 self.assertEqual(len(cb_data), 12) # True for TLSv1
2736
2737 # and compare with the peers version
2738 s.write(b"CB tls-unique\n")
2739 peer_data_repr = s.read().strip()
2740 self.assertEqual(peer_data_repr,
2741 repr(cb_data).encode("us-ascii"))
2742 s.close()
2743
2744 # now, again
2745 s = ssl.wrap_socket(socket.socket(),
2746 server_side=False,
2747 certfile=CERTFILE,
2748 ca_certs=CERTFILE,
2749 cert_reqs=ssl.CERT_NONE,
2750 ssl_version=ssl.PROTOCOL_TLSv1)
2751 s.connect((HOST, server.port))
2752 new_cb_data = s.get_channel_binding("tls-unique")
2753 if support.verbose:
2754 sys.stdout.write(" got another channel binding data: {0!r}\n"
2755 .format(new_cb_data))
2756 # is it really unique
2757 self.assertNotEqual(cb_data, new_cb_data)
2758 self.assertIsNotNone(cb_data)
2759 self.assertEqual(len(cb_data), 12) # True for TLSv1
2760 s.write(b"CB tls-unique\n")
2761 peer_data_repr = s.read().strip()
2762 self.assertEqual(peer_data_repr,
2763 repr(new_cb_data).encode("us-ascii"))
2764 s.close()
2765
2766 def test_compression(self):
2767 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2768 context.load_cert_chain(CERTFILE)
2769 stats = server_params_test(context, context,
2770 chatty=True, connectionchatty=True)
2771 if support.verbose:
2772 sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
2773 self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' })
2774
2775 @unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
2776 "ssl.OP_NO_COMPRESSION needed for this test")
2777 def test_compression_disabled(self):
2778 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2779 context.load_cert_chain(CERTFILE)
2780 context.options |= ssl.OP_NO_COMPRESSION
2781 stats = server_params_test(context, context,
2782 chatty=True, connectionchatty=True)
2783 self.assertIs(stats['compression'], None)
2784
2785 def test_dh_params(self):
2786 # Check we can get a connection with ephemeral Diffie-Hellman
2787 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2788 context.load_cert_chain(CERTFILE)
2789 context.load_dh_params(DHFILE)
2790 context.set_ciphers("kEDH")
2791 stats = server_params_test(context, context,
2792 chatty=True, connectionchatty=True)
2793 cipher = stats["cipher"][0]
2794 parts = cipher.split("-")
2795 if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
2796 self.fail("Non-DH cipher: " + cipher[0])
2797
Benjamin Petersonb10bfbe2015-01-23 16:35:37 -05002798 def test_selected_alpn_protocol(self):
2799 # selected_alpn_protocol() is None unless ALPN is used.
2800 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2801 context.load_cert_chain(CERTFILE)
2802 stats = server_params_test(context, context,
2803 chatty=True, connectionchatty=True)
2804 self.assertIs(stats['client_alpn_protocol'], None)
2805
2806 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
2807 def test_selected_alpn_protocol_if_server_uses_alpn(self):
2808 # selected_alpn_protocol() is None unless ALPN is used by the client.
2809 client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2810 client_context.load_verify_locations(CERTFILE)
2811 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2812 server_context.load_cert_chain(CERTFILE)
2813 server_context.set_alpn_protocols(['foo', 'bar'])
2814 stats = server_params_test(client_context, server_context,
2815 chatty=True, connectionchatty=True)
2816 self.assertIs(stats['client_alpn_protocol'], None)
2817
2818 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
2819 def test_alpn_protocols(self):
2820 server_protocols = ['foo', 'bar', 'milkshake']
2821 protocol_tests = [
2822 (['foo', 'bar'], 'foo'),
Benjamin Petersonaa707582015-01-23 17:30:26 -05002823 (['bar', 'foo'], 'foo'),
Benjamin Petersonb10bfbe2015-01-23 16:35:37 -05002824 (['milkshake'], 'milkshake'),
Benjamin Petersonaa707582015-01-23 17:30:26 -05002825 (['http/3.0', 'http/4.0'], None)
Benjamin Petersonb10bfbe2015-01-23 16:35:37 -05002826 ]
2827 for client_protocols, expected in protocol_tests:
2828 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2829 server_context.load_cert_chain(CERTFILE)
2830 server_context.set_alpn_protocols(server_protocols)
2831 client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2832 client_context.load_cert_chain(CERTFILE)
2833 client_context.set_alpn_protocols(client_protocols)
2834 stats = server_params_test(client_context, server_context,
2835 chatty=True, connectionchatty=True)
2836
2837 msg = "failed trying %s (s) and %s (c).\n" \
2838 "was expecting %s, but got %%s from the %%s" \
2839 % (str(server_protocols), str(client_protocols),
2840 str(expected))
2841 client_result = stats['client_alpn_protocol']
2842 self.assertEqual(client_result, expected, msg % (client_result, "client"))
2843 server_result = stats['server_alpn_protocols'][-1] \
2844 if len(stats['server_alpn_protocols']) else 'nothing'
2845 self.assertEqual(server_result, expected, msg % (server_result, "server"))
2846
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002847 def test_selected_npn_protocol(self):
2848 # selected_npn_protocol() is None unless NPN is used
2849 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2850 context.load_cert_chain(CERTFILE)
2851 stats = server_params_test(context, context,
2852 chatty=True, connectionchatty=True)
2853 self.assertIs(stats['client_npn_protocol'], None)
2854
2855 @unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
2856 def test_npn_protocols(self):
2857 server_protocols = ['http/1.1', 'spdy/2']
2858 protocol_tests = [
2859 (['http/1.1', 'spdy/2'], 'http/1.1'),
2860 (['spdy/2', 'http/1.1'], 'http/1.1'),
2861 (['spdy/2', 'test'], 'spdy/2'),
2862 (['abc', 'def'], 'abc')
2863 ]
2864 for client_protocols, expected in protocol_tests:
2865 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2866 server_context.load_cert_chain(CERTFILE)
2867 server_context.set_npn_protocols(server_protocols)
2868 client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2869 client_context.load_cert_chain(CERTFILE)
2870 client_context.set_npn_protocols(client_protocols)
2871 stats = server_params_test(client_context, server_context,
2872 chatty=True, connectionchatty=True)
2873
2874 msg = "failed trying %s (s) and %s (c).\n" \
2875 "was expecting %s, but got %%s from the %%s" \
2876 % (str(server_protocols), str(client_protocols),
2877 str(expected))
2878 client_result = stats['client_npn_protocol']
2879 self.assertEqual(client_result, expected, msg % (client_result, "client"))
2880 server_result = stats['server_npn_protocols'][-1] \
2881 if len(stats['server_npn_protocols']) else 'nothing'
2882 self.assertEqual(server_result, expected, msg % (server_result, "server"))
2883
2884 def sni_contexts(self):
2885 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2886 server_context.load_cert_chain(SIGNED_CERTFILE)
2887 other_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2888 other_context.load_cert_chain(SIGNED_CERTFILE2)
2889 client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2890 client_context.verify_mode = ssl.CERT_REQUIRED
2891 client_context.load_verify_locations(SIGNING_CA)
2892 return server_context, other_context, client_context
2893
2894 def check_common_name(self, stats, name):
2895 cert = stats['peercert']
2896 self.assertIn((('commonName', name),), cert['subject'])
2897
2898 @needs_sni
2899 def test_sni_callback(self):
2900 calls = []
2901 server_context, other_context, client_context = self.sni_contexts()
2902
2903 def servername_cb(ssl_sock, server_name, initial_context):
2904 calls.append((server_name, initial_context))
2905 if server_name is not None:
2906 ssl_sock.context = other_context
2907 server_context.set_servername_callback(servername_cb)
2908
2909 stats = server_params_test(client_context, server_context,
2910 chatty=True,
2911 sni_name='supermessage')
2912 # The hostname was fetched properly, and the certificate was
2913 # changed for the connection.
2914 self.assertEqual(calls, [("supermessage", server_context)])
2915 # CERTFILE4 was selected
2916 self.check_common_name(stats, 'fakehostname')
2917
2918 calls = []
2919 # The callback is called with server_name=None
2920 stats = server_params_test(client_context, server_context,
2921 chatty=True,
2922 sni_name=None)
2923 self.assertEqual(calls, [(None, server_context)])
2924 self.check_common_name(stats, 'localhost')
2925
2926 # Check disabling the callback
2927 calls = []
2928 server_context.set_servername_callback(None)
2929
2930 stats = server_params_test(client_context, server_context,
2931 chatty=True,
2932 sni_name='notfunny')
2933 # Certificate didn't change
2934 self.check_common_name(stats, 'localhost')
2935 self.assertEqual(calls, [])
2936
2937 @needs_sni
2938 def test_sni_callback_alert(self):
2939 # Returning a TLS alert is reflected to the connecting client
2940 server_context, other_context, client_context = self.sni_contexts()
2941
2942 def cb_returning_alert(ssl_sock, server_name, initial_context):
2943 return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
2944 server_context.set_servername_callback(cb_returning_alert)
2945
2946 with self.assertRaises(ssl.SSLError) as cm:
2947 stats = server_params_test(client_context, server_context,
2948 chatty=False,
2949 sni_name='supermessage')
2950 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
2951
2952 @needs_sni
2953 def test_sni_callback_raising(self):
2954 # Raising fails the connection with a TLS handshake failure alert.
2955 server_context, other_context, client_context = self.sni_contexts()
2956
2957 def cb_raising(ssl_sock, server_name, initial_context):
Serhiy Storchaka5312a7f2015-01-31 11:27:06 +02002958 1.0/0.0
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002959 server_context.set_servername_callback(cb_raising)
2960
2961 with self.assertRaises(ssl.SSLError) as cm, \
2962 support.captured_stderr() as stderr:
2963 stats = server_params_test(client_context, server_context,
2964 chatty=False,
2965 sni_name='supermessage')
2966 self.assertEqual(cm.exception.reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
2967 self.assertIn("ZeroDivisionError", stderr.getvalue())
2968
2969 @needs_sni
2970 def test_sni_callback_wrong_return_type(self):
2971 # Returning the wrong return type terminates the TLS connection
2972 # with an internal error alert.
2973 server_context, other_context, client_context = self.sni_contexts()
2974
2975 def cb_wrong_return_type(ssl_sock, server_name, initial_context):
2976 return "foo"
2977 server_context.set_servername_callback(cb_wrong_return_type)
2978
2979 with self.assertRaises(ssl.SSLError) as cm, \
2980 support.captured_stderr() as stderr:
2981 stats = server_params_test(client_context, server_context,
2982 chatty=False,
2983 sni_name='supermessage')
2984 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
2985 self.assertIn("TypeError", stderr.getvalue())
2986
2987 def test_read_write_after_close_raises_valuerror(self):
2988 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2989 context.verify_mode = ssl.CERT_REQUIRED
2990 context.load_verify_locations(CERTFILE)
2991 context.load_cert_chain(CERTFILE)
2992 server = ThreadedEchoServer(context=context, chatty=False)
2993
2994 with server:
2995 s = context.wrap_socket(socket.socket())
2996 s.connect((HOST, server.port))
2997 s.close()
2998
2999 self.assertRaises(ValueError, s.read, 1024)
3000 self.assertRaises(ValueError, s.write, b'hello')
3001
Bill Janssen61c001a2008-09-08 16:37:24 +00003002
Neal Norwitz9eb9b102007-08-27 01:15:33 +00003003def test_main(verbose=False):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05003004 if support.verbose:
3005 plats = {
3006 'Linux': platform.linux_distribution,
3007 'Mac': platform.mac_ver,
3008 'Windows': platform.win32_ver,
3009 }
3010 for name, func in plats.items():
3011 plat = func()
3012 if plat and plat[0]:
3013 plat = '%s %r' % (name, plat)
3014 break
3015 else:
3016 plat = repr(platform.platform())
3017 print("test_ssl: testing with %r %r" %
3018 (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
3019 print(" under %s" % plat)
3020 print(" HAS_SNI = %r" % ssl.HAS_SNI)
3021 print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
3022 try:
3023 print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
3024 except AttributeError:
3025 pass
Bill Janssen296a59d2007-09-16 22:06:00 +00003026
Benjamin Petersondaeb9252014-08-20 14:14:50 -05003027 for filename in [
3028 CERTFILE, SVN_PYTHON_ORG_ROOT_CERT, BYTES_CERTFILE,
3029 ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
3030 SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
3031 BADCERT, BADKEY, EMPTYCERT]:
3032 if not os.path.exists(filename):
3033 raise support.TestFailed("Can't read certificate file %r" % filename)
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00003034
Benjamin Peterson2f334562014-10-01 23:53:01 -04003035 tests = [ContextTests, BasicTests, BasicSocketTests, SSLErrorTests]
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00003036
Benjamin Petersondaeb9252014-08-20 14:14:50 -05003037 if support.is_resource_enabled('network'):
Bill Janssen934b16d2008-06-28 22:19:33 +00003038 tests.append(NetworkedTests)
Bill Janssen296a59d2007-09-16 22:06:00 +00003039
Bill Janssen98d19da2007-09-10 21:51:02 +00003040 if _have_threads:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05003041 thread_info = support.threading_setup()
3042 if thread_info:
Bill Janssen934b16d2008-06-28 22:19:33 +00003043 tests.append(ThreadedTests)
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00003044
Antoine Pitrou3945c862010-04-28 21:11:01 +00003045 try:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05003046 support.run_unittest(*tests)
Antoine Pitrou3945c862010-04-28 21:11:01 +00003047 finally:
3048 if _have_threads:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05003049 support.threading_cleanup(*thread_info)
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00003050
3051if __name__ == "__main__":
3052 test_main()