blob: e9e80ee9ced3a9c52b92b70a690bf57f9549324d [file] [log] [blame]
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001# -*- coding: utf-8 -*-
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00002# Test the support for SSL and sockets
3
4import sys
5import unittest
Benjamin Petersondaeb9252014-08-20 14:14:50 -05006from test import test_support as support
Bill Janssen934b16d2008-06-28 22:19:33 +00007import asyncore
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00008import socket
Bill Janssen934b16d2008-06-28 22:19:33 +00009import select
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000010import time
Benjamin Petersondaeb9252014-08-20 14:14:50 -050011import datetime
Antoine Pitroub558f172010-04-23 23:25:45 +000012import gc
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000013import os
Antoine Pitroub558f172010-04-23 23:25:45 +000014import errno
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000015import pprint
Benjamin Petersondaeb9252014-08-20 14:14:50 -050016import tempfile
Benjamin Petersonfcfb18e2014-11-23 11:42:45 -060017import urllib2
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000018import traceback
Antoine Pitroudfb299b2010-04-23 22:54:59 +000019import weakref
Antoine Pitroud75efd92010-08-04 17:38:33 +000020import platform
Benjamin Petersondaeb9252014-08-20 14:14:50 -050021import functools
22from contextlib import closing
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000023
Benjamin Petersondaeb9252014-08-20 14:14:50 -050024ssl = support.import_module("ssl")
Bill Janssen296a59d2007-09-16 22:06:00 +000025
Benjamin Petersondaeb9252014-08-20 14:14:50 -050026PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
27HOST = support.HOST
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000028
def data_file(*name):
    # Build the path of a test data file relative to the directory that
    # contains this module; *name holds the path components to append.
    here = os.path.dirname(__file__)
    return os.path.join(here, *name)
31
32# The custom key and certificate files used in test_ssl are generated
33# using Lib/test/make_ssl_certs.py.
34# Other certificates are simply fetched from the Internet servers they
35# are meant to authenticate.
36
# Encoding used to derive the bytes flavour of the paths below.
_FS_ENCODING = sys.getfilesystemencoding()

# Combined key + certificate, and the same material split in two files.
CERTFILE = data_file("keycert.pem")
ONLYCERT = data_file("ssl_cert.pem")
ONLYKEY = data_file("ssl_key.pem")
BYTES_CERTFILE = CERTFILE.encode(_FS_ENCODING)
BYTES_ONLYCERT = ONLYCERT.encode(_FS_ENCODING)
BYTES_ONLYKEY = ONLYKEY.encode(_FS_ENCODING)

# Password-protected variants and the password that unlocks them.
CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
KEY_PASSWORD = "somepass"

# CA directory and individual CA files stored inside it.
CAPATH = data_file("capath")
BYTES_CAPATH = CAPATH.encode(_FS_ENCODING)
CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
CAFILE_CACERT = data_file("capath", "5ed36f99.0")


# empty CRL
CRLFILE = data_file("revocation.crl")

# Two keys and certs signed by the same CA (for SNI tests)
SIGNED_CERTFILE = data_file("keycert3.pem")
SIGNED_CERTFILE2 = data_file("keycert4.pem")
SIGNING_CA = data_file("pycacert.pem")

SVN_PYTHON_ORG_ROOT_CERT = data_file("https_svn_python_org_root.pem")

# Deliberately empty, malformed or missing inputs, plus special-case certs.
EMPTYCERT = data_file("nullcert.pem")
BADCERT = data_file("badcert.pem")
WRONGCERT = data_file("XXXnonexisting.pem")
BADKEY = data_file("badkey.pem")
NOKIACERT = data_file("nokia.pem")
NULLBYTECERT = data_file("nullbytecert.pem")

DHFILE = data_file("dh512.pem")
BYTES_DHFILE = DHFILE.encode(_FS_ENCODING)
71
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000072
def handle_error(prefix):
    # Report the exception currently being handled on stdout, preceded by
    # `prefix`; nothing is written unless the test run is verbose.
    formatted = traceback.format_exception(*sys.exc_info())
    if not support.verbose:
        return
    sys.stdout.write(prefix + ' '.join(formatted))
Neal Norwitz3e533c22007-08-27 01:03:18 +000077
Antoine Pitrou435ba0c2010-04-27 09:51:18 +000078
class BasicTests(unittest.TestCase):
    """Smoke test for the legacy ``ssl.sslwrap_simple()`` entry point."""

    def _sslwrap_simple_ignoring_epipe(self, sock):
        # Wrap `sock` with the legacy API.  A broken pipe raised while
        # ssl_sock.do_handshake() runs is irrelevant to this test and is
        # swallowed; any other IOError is propagated.
        try:
            ssl.sslwrap_simple(sock)
        except IOError as e:
            if e.errno != errno.EPIPE:
                raise

    def test_sslwrap_simple(self):
        # A crude test for the legacy API: it must accept both the
        # high-level socket object and the underlying ``_sock`` object.
        self._sslwrap_simple_ignoring_epipe(socket.socket(socket.AF_INET))
        self._sslwrap_simple_ignoring_epipe(
            socket.socket(socket.AF_INET)._sock)
Benjamin Peterson2f334562014-10-01 23:53:01 -040097
98
def can_clear_options():
    # True when the OpenSSL API version is 0.9.8m or higher.
    minimum = (0, 9, 8, 13, 15)
    return ssl._OPENSSL_API_VERSION >= minimum
102
def no_sslv2_implies_sslv3_hello():
    # True when the linked OpenSSL is 0.9.7h or higher.
    minimum = (0, 9, 7, 8, 15)
    return ssl.OPENSSL_VERSION_INFO >= minimum
106
def have_verify_flags():
    # True when the linked OpenSSL is 0.9.8 or higher.
    minimum = (0, 9, 8, 0, 15)
    return ssl.OPENSSL_VERSION_INFO >= minimum
110
def utc_offset(): #NOTE: ignore issues like #1647654
    # Return the local UTC offset in seconds, where
    # local time = utc time + utc offset.
    # time.altzone applies only while daylight saving time is in effect.
    dst_active = time.daylight and time.localtime().tm_isdst > 0
    seconds_west = time.altzone if dst_active else time.timezone
    return -seconds_west
116
def asn1time(cert_time):
    # Return `cert_time` in the form the linked OpenSSL is expected to
    # print it.  Only OpenSSL 0.9.8i needs an adjustment; every other
    # version gets the string back untouched.
    if ssl._OPENSSL_API_VERSION != (0, 9, 8, 9, 15):
        return cert_time

    # Some versions of OpenSSL ignore seconds, see #18207
    fmt = "%b %d %H:%M:%S %Y GMT"
    parsed = datetime.datetime.strptime(cert_time, fmt).replace(second=0)
    adjusted = parsed.strftime(fmt)
    # %d adds leading zero but ASN1_TIME_print() uses leading space
    if adjusted[4] == "0":
        adjusted = adjusted[:4] + " " + adjusted[5:]
    return adjusted
Neal Norwitz3e533c22007-08-27 01:03:18 +0000130
# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
def skip_if_broken_ubuntu_ssl(func):
    # Decorator: when the ssl module exposes PROTOCOL_SSLv2 but an SSLv2
    # context cannot be created on the known patched build, skip the test
    # instead of running it.  Without PROTOCOL_SSLv2 the function is
    # returned undecorated.
    if not hasattr(ssl, 'PROTOCOL_SSLv2'):
        return func

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            ssl.SSLContext(ssl.PROTOCOL_SSLv2)
        except ssl.SSLError:
            patched_build = (
                ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
                platform.linux_distribution() == ('debian', 'squeeze/sid', ''))
            if patched_build:
                raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
        return func(*args, **kwargs)
    return wrapper
Antoine Pitroud75efd92010-08-04 17:38:33 +0000146
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500147needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
148
Antoine Pitroud75efd92010-08-04 17:38:33 +0000149
class BasicSocketTests(unittest.TestCase):
    """Tests of the ssl module that need no live TLS connection."""

    def test_constants(self):
        # Merely accessing the attributes checks that they exist.
        ssl.CERT_NONE
        ssl.CERT_OPTIONAL
        ssl.CERT_REQUIRED
        ssl.OP_CIPHER_SERVER_PREFERENCE
        ssl.OP_SINGLE_DH_USE
        if ssl.HAS_ECDH:
            ssl.OP_SINGLE_ECDH_USE
        if ssl.OPENSSL_VERSION_INFO >= (1, 0):
            ssl.OP_NO_COMPRESSION
        self.assertIn(ssl.HAS_SNI, {True, False})
        self.assertIn(ssl.HAS_ECDH, {True, False})

    def test_random(self):
        v = ssl.RAND_status()
        if support.verbose:
            sys.stdout.write("\n RAND_status is %d (%s)\n"
                             % (v, "sufficient randomness" if v
                                else "insufficient randomness"))
        # RAND_egd() is not available in every build.
        if hasattr(ssl, 'RAND_egd'):
            self.assertRaises(TypeError, ssl.RAND_egd, 1)
            self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
        ssl.RAND_add("this is a random string", 75.0)

    def test_parse_cert(self):
        # note that this uses an 'unofficial' function in _ssl.c,
        # provided solely for this test, to exercise the certificate
        # parsing code
        p = ssl._ssl._test_decode_cert(CERTFILE)
        if support.verbose:
            sys.stdout.write("\n" + pprint.pformat(p) + "\n")
        self.assertEqual(p['issuer'],
                         ((('countryName', 'XY'),),
                          (('localityName', 'Castle Anthrax'),),
                          (('organizationName', 'Python Software Foundation'),),
                          (('commonName', 'localhost'),))
                        )
        # Note the next three asserts will fail if the keys are regenerated.
        # ASN1_TIME_print() pads a single-digit day with a leading space,
        # so the day field below is two columns wide.
        self.assertEqual(p['notAfter'], asn1time('Oct  5 23:01:56 2020 GMT'))
        self.assertEqual(p['notBefore'], asn1time('Oct  8 23:01:56 2010 GMT'))
        self.assertEqual(p['serialNumber'], 'D7C7381919AFC24E')
        self.assertEqual(p['subject'],
                         ((('countryName', 'XY'),),
                          (('localityName', 'Castle Anthrax'),),
                          (('organizationName', 'Python Software Foundation'),),
                          (('commonName', 'localhost'),))
                        )
        self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),))
        # Issue #13034: the subjectAltName in some certificates
        # (notably projects.developer.nokia.com:443) wasn't parsed
        p = ssl._ssl._test_decode_cert(NOKIACERT)
        if support.verbose:
            sys.stdout.write("\n" + pprint.pformat(p) + "\n")
        self.assertEqual(p['subjectAltName'],
                         (('DNS', 'projects.developer.nokia.com'),
                          ('DNS', 'projects.forum.nokia.com'))
                        )
        # extra OCSP and AIA fields
        self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
        self.assertEqual(p['caIssuers'],
                         ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
        self.assertEqual(p['crlDistributionPoints'],
                         ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))

    def test_parse_cert_CVE_2013_4238(self):
        # Embedded NUL bytes must be preserved, not truncate the value.
        p = ssl._ssl._test_decode_cert(NULLBYTECERT)
        if support.verbose:
            sys.stdout.write("\n" + pprint.pformat(p) + "\n")
        subject = ((('countryName', 'US'),),
                   (('stateOrProvinceName', 'Oregon'),),
                   (('localityName', 'Beaverton'),),
                   (('organizationName', 'Python Software Foundation'),),
                   (('organizationalUnitName', 'Python Core Development'),),
                   (('commonName', 'null.python.org\x00example.org'),),
                   (('emailAddress', 'python-dev@python.org'),))
        self.assertEqual(p['subject'], subject)
        self.assertEqual(p['issuer'], subject)
        if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
            san = (('DNS', 'altnull.python.org\x00example.com'),
                   ('email', 'null@python.org\x00user@example.org'),
                   ('URI', 'http://null.python.org\x00http://example.org'),
                   ('IP Address', '192.0.2.1'),
                   ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
        else:
            # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
            san = (('DNS', 'altnull.python.org\x00example.com'),
                   ('email', 'null@python.org\x00user@example.org'),
                   ('URI', 'http://null.python.org\x00http://example.org'),
                   ('IP Address', '192.0.2.1'),
                   ('IP Address', '<invalid>'))

        self.assertEqual(p['subjectAltName'], san)

    def test_DER_to_PEM(self):
        # PEM -> DER -> PEM -> DER must round-trip to the same DER bytes.
        with open(SVN_PYTHON_ORG_ROOT_CERT, 'r') as f:
            pem = f.read()
        d1 = ssl.PEM_cert_to_DER_cert(pem)
        p2 = ssl.DER_cert_to_PEM_cert(d1)
        d2 = ssl.PEM_cert_to_DER_cert(p2)
        self.assertEqual(d1, d2)
        if not p2.startswith(ssl.PEM_HEADER + '\n'):
            self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
        if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
            self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)

    def test_openssl_version(self):
        n = ssl.OPENSSL_VERSION_NUMBER
        t = ssl.OPENSSL_VERSION_INFO
        s = ssl.OPENSSL_VERSION
        self.assertIsInstance(n, (int, long))
        self.assertIsInstance(t, tuple)
        self.assertIsInstance(s, str)
        # Some sanity checks follow
        # >= 0.9
        self.assertGreaterEqual(n, 0x900000)
        # < 3.0
        self.assertLess(n, 0x30000000)
        major, minor, fix, patch, status = t
        self.assertGreaterEqual(major, 0)
        self.assertLess(major, 3)
        self.assertGreaterEqual(minor, 0)
        self.assertLess(minor, 256)
        self.assertGreaterEqual(fix, 0)
        self.assertLess(fix, 256)
        self.assertGreaterEqual(patch, 0)
        self.assertLessEqual(patch, 63)
        self.assertGreaterEqual(status, 0)
        self.assertLessEqual(status, 15)
        # Version string as returned by {Open,Libre}SSL, the format might change
        if "LibreSSL" in s:
            self.assertTrue(s.startswith("LibreSSL {:d}.{:d}".format(major, minor)),
                            (s, t))
        else:
            self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
                            (s, t))

    @support.cpython_only
    def test_refcycle(self):
        # Issue #7943: an SSL object doesn't create reference cycles with
        # itself.
        s = socket.socket(socket.AF_INET)
        ss = ssl.wrap_socket(s)
        wr = weakref.ref(ss)
        del ss
        # The weak reference must be dead as soon as the name is deleted.
        self.assertIsNone(wr())

    def test_wrapped_unconnected(self):
        # Methods on an unconnected SSLSocket propagate the original
        # socket.error raised by the underlying socket object.
        s = socket.socket(socket.AF_INET)
        with closing(ssl.wrap_socket(s)) as ss:
            self.assertRaises(socket.error, ss.recv, 1)
            self.assertRaises(socket.error, ss.recv_into, bytearray(b'x'))
            self.assertRaises(socket.error, ss.recvfrom, 1)
            self.assertRaises(socket.error, ss.recvfrom_into, bytearray(b'x'), 1)
            self.assertRaises(socket.error, ss.send, b'x')
            self.assertRaises(socket.error, ss.sendto, b'x', ('0.0.0.0', 0))

    def test_timeout(self):
        # Issue #8524: when creating an SSL socket, the timeout of the
        # original socket should be retained.
        for timeout in (None, 0.0, 5.0):
            s = socket.socket(socket.AF_INET)
            s.settimeout(timeout)
            with closing(ssl.wrap_socket(s)) as ss:
                self.assertEqual(timeout, ss.gettimeout())

    def test_errors(self):
        # Invalid argument combinations are rejected with ValueError...
        sock = socket.socket()
        self.assertRaisesRegexp(ValueError,
                        "certfile must be specified",
                        ssl.wrap_socket, sock, keyfile=CERTFILE)
        self.assertRaisesRegexp(ValueError,
                        "certfile must be specified for server-side operations",
                        ssl.wrap_socket, sock, server_side=True)
        self.assertRaisesRegexp(ValueError,
                        "certfile must be specified for server-side operations",
                        ssl.wrap_socket, sock, server_side=True, certfile="")
        with closing(ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE)) as s:
            self.assertRaisesRegexp(ValueError, "can't connect in server-side mode",
                                    s.connect, (HOST, 8080))
        # ...and missing certificate/key files surface as IOError(ENOENT).
        with self.assertRaises(IOError) as cm:
            with closing(socket.socket()) as sock:
                ssl.wrap_socket(sock, certfile=WRONGCERT)
        self.assertEqual(cm.exception.errno, errno.ENOENT)
        with self.assertRaises(IOError) as cm:
            with closing(socket.socket()) as sock:
                ssl.wrap_socket(sock, certfile=CERTFILE, keyfile=WRONGCERT)
        self.assertEqual(cm.exception.errno, errno.ENOENT)
        with self.assertRaises(IOError) as cm:
            with closing(socket.socket()) as sock:
                ssl.wrap_socket(sock, certfile=WRONGCERT, keyfile=WRONGCERT)
        self.assertEqual(cm.exception.errno, errno.ENOENT)

    def test_match_hostname(self):
        # ok(): the hostname must be accepted; fail(): it must be rejected
        # with ssl.CertificateError.
        def ok(cert, hostname):
            ssl.match_hostname(cert, hostname)
        def fail(cert, hostname):
            self.assertRaises(ssl.CertificateError,
                              ssl.match_hostname, cert, hostname)

        cert = {'subject': ((('commonName', 'example.com'),),)}
        ok(cert, 'example.com')
        ok(cert, 'ExAmple.cOm')
        fail(cert, 'www.example.com')
        fail(cert, '.example.com')
        fail(cert, 'example.org')
        fail(cert, 'exampleXcom')

        cert = {'subject': ((('commonName', '*.a.com'),),)}
        ok(cert, 'foo.a.com')
        fail(cert, 'bar.foo.a.com')
        fail(cert, 'a.com')
        fail(cert, 'Xa.com')
        fail(cert, '.a.com')

        # only match one left-most wildcard
        cert = {'subject': ((('commonName', 'f*.com'),),)}
        ok(cert, 'foo.com')
        ok(cert, 'f.com')
        fail(cert, 'bar.com')
        fail(cert, 'foo.a.com')
        fail(cert, 'bar.foo.com')

        # NULL bytes are bad, CVE-2013-4073
        cert = {'subject': ((('commonName',
                              'null.python.org\x00example.org'),),)}
        ok(cert, 'null.python.org\x00example.org') # or raise an error?
        fail(cert, 'example.org')
        fail(cert, 'null.python.org')

        # error cases with wildcards
        cert = {'subject': ((('commonName', '*.*.a.com'),),)}
        fail(cert, 'bar.foo.a.com')
        fail(cert, 'a.com')
        fail(cert, 'Xa.com')
        fail(cert, '.a.com')

        cert = {'subject': ((('commonName', 'a.*.com'),),)}
        fail(cert, 'a.foo.com')
        fail(cert, 'a..com')
        fail(cert, 'a.com')

        # wildcard doesn't match IDNA prefix 'xn--'
        idna = u'püthon.python.org'.encode("idna").decode("ascii")
        cert = {'subject': ((('commonName', idna),),)}
        ok(cert, idna)
        cert = {'subject': ((('commonName', 'x*.python.org'),),)}
        fail(cert, idna)
        cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
        fail(cert, idna)

        # wildcard in first fragment and IDNA A-labels in subsequent
        # fragments are supported.
        idna = u'www*.pythön.org'.encode("idna").decode("ascii")
        cert = {'subject': ((('commonName', idna),),)}
        ok(cert, u'www.pythön.org'.encode("idna").decode("ascii"))
        ok(cert, u'www1.pythön.org'.encode("idna").decode("ascii"))
        fail(cert, u'ftp.pythön.org'.encode("idna").decode("ascii"))
        fail(cert, u'pythön.org'.encode("idna").decode("ascii"))

        # Slightly fake real-world example
        cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
                'subject': ((('commonName', 'linuxfrz.org'),),),
                'subjectAltName': (('DNS', 'linuxfr.org'),
                                   ('DNS', 'linuxfr.com'),
                                   ('othername', '<unsupported>'))}
        ok(cert, 'linuxfr.org')
        ok(cert, 'linuxfr.com')
        # Not a "DNS" entry
        fail(cert, '<unsupported>')
        # When there is a subjectAltName, commonName isn't used
        fail(cert, 'linuxfrz.org')

        # A pristine real-world example
        cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
                'subject': ((('countryName', 'US'),),
                            (('stateOrProvinceName', 'California'),),
                            (('localityName', 'Mountain View'),),
                            (('organizationName', 'Google Inc'),),
                            (('commonName', 'mail.google.com'),))}
        ok(cert, 'mail.google.com')
        fail(cert, 'gmail.com')
        # Only commonName is considered
        fail(cert, 'California')

        # Neither commonName nor subjectAltName
        cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
                'subject': ((('countryName', 'US'),),
                            (('stateOrProvinceName', 'California'),),
                            (('localityName', 'Mountain View'),),
                            (('organizationName', 'Google Inc'),))}
        fail(cert, 'mail.google.com')

        # No DNS entry in subjectAltName but a commonName
        cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
                'subject': ((('countryName', 'US'),),
                            (('stateOrProvinceName', 'California'),),
                            (('localityName', 'Mountain View'),),
                            (('commonName', 'mail.google.com'),)),
                'subjectAltName': (('othername', 'blabla'), )}
        ok(cert, 'mail.google.com')

        # No DNS entry subjectAltName and no commonName
        cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
                'subject': ((('countryName', 'US'),),
                            (('stateOrProvinceName', 'California'),),
                            (('localityName', 'Mountain View'),),
                            (('organizationName', 'Google Inc'),)),
                'subjectAltName': (('othername', 'blabla'),)}
        fail(cert, 'google.com')

        # Empty cert / no cert
        self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
        self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')

        # Issue #17980: avoid denials of service by refusing more than one
        # wildcard per fragment.
        cert = {'subject': ((('commonName', 'a*b.com'),),)}
        ok(cert, 'axxb.com')
        cert = {'subject': ((('commonName', 'a*b.co*'),),)}
        fail(cert, 'axxb.com')
        cert = {'subject': ((('commonName', 'a*b*.com'),),)}
        with self.assertRaises(ssl.CertificateError) as cm:
            ssl.match_hostname(cert, 'axxbxxc.com')
        self.assertIn("too many wildcards", str(cm.exception))

    def test_server_side(self):
        # server_hostname doesn't work for server sockets
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        with closing(socket.socket()) as sock:
            self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
                              server_hostname="some.hostname")

    def test_unknown_channel_binding(self):
        # should raise ValueError for unknown type
        s = socket.socket(socket.AF_INET)
        with closing(ssl.wrap_socket(s)) as ss:
            with self.assertRaises(ValueError):
                ss.get_channel_binding("unknown-type")

    @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                         "'tls-unique' channel binding not available")
    def test_tls_unique_channel_binding(self):
        # unconnected should return None for known type
        s = socket.socket(socket.AF_INET)
        with closing(ssl.wrap_socket(s)) as ss:
            self.assertIsNone(ss.get_channel_binding("tls-unique"))
        # the same for server-side
        s = socket.socket(socket.AF_INET)
        with closing(ssl.wrap_socket(s, server_side=True, certfile=CERTFILE)) as ss:
            self.assertIsNone(ss.get_channel_binding("tls-unique"))

    def test_get_default_verify_paths(self):
        paths = ssl.get_default_verify_paths()
        self.assertEqual(len(paths), 6)
        self.assertIsInstance(paths, ssl.DefaultVerifyPaths)

        # The SSL_CERT_DIR / SSL_CERT_FILE environment variables are
        # reflected in the reported paths.
        with support.EnvironmentVarGuard() as env:
            env["SSL_CERT_DIR"] = CAPATH
            env["SSL_CERT_FILE"] = CERTFILE
            paths = ssl.get_default_verify_paths()
            self.assertEqual(paths.cafile, CERTFILE)
            self.assertEqual(paths.capath, CAPATH)

    @unittest.skipUnless(sys.platform == "win32", "Windows specific")
    def test_enum_certificates(self):
        self.assertTrue(ssl.enum_certificates("CA"))
        self.assertTrue(ssl.enum_certificates("ROOT"))

        self.assertRaises(TypeError, ssl.enum_certificates)
        self.assertRaises(WindowsError, ssl.enum_certificates, "")

        # Collect every trust OID seen across both stores.
        trust_oids = set()
        for storename in ("CA", "ROOT"):
            store = ssl.enum_certificates(storename)
            self.assertIsInstance(store, list)
            for element in store:
                self.assertIsInstance(element, tuple)
                self.assertEqual(len(element), 3)
                cert, enc, trust = element
                self.assertIsInstance(cert, bytes)
                self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
                self.assertIsInstance(trust, (set, bool))
                if isinstance(trust, set):
                    trust_oids.update(trust)

        serverAuth = "1.3.6.1.5.5.7.3.1"
        self.assertIn(serverAuth, trust_oids)

    @unittest.skipUnless(sys.platform == "win32", "Windows specific")
    def test_enum_crls(self):
        self.assertTrue(ssl.enum_crls("CA"))
        self.assertRaises(TypeError, ssl.enum_crls)
        self.assertRaises(WindowsError, ssl.enum_crls, "")

        crls = ssl.enum_crls("CA")
        self.assertIsInstance(crls, list)
        for element in crls:
            self.assertIsInstance(element, tuple)
            self.assertEqual(len(element), 2)
            self.assertIsInstance(element[0], bytes)
            self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})

    def test_asn1object(self):
        expected = (129, 'serverAuth', 'TLS Web Server Authentication',
                    '1.3.6.1.5.5.7.3.1')

        # Construction from a dotted OID string.
        val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
        self.assertEqual(val, expected)
        self.assertEqual(val.nid, 129)
        self.assertEqual(val.shortname, 'serverAuth')
        self.assertEqual(val.longname, 'TLS Web Server Authentication')
        self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
        self.assertIsInstance(val, ssl._ASN1Object)
        self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')

        # Construction from a numeric identifier.
        val = ssl._ASN1Object.fromnid(129)
        self.assertEqual(val, expected)
        self.assertIsInstance(val, ssl._ASN1Object)
        self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
        with self.assertRaisesRegexp(ValueError, "unknown NID 100000"):
            ssl._ASN1Object.fromnid(100000)
        for i in range(1000):
            try:
                obj = ssl._ASN1Object.fromnid(i)
            except ValueError:
                pass
            else:
                self.assertIsInstance(obj.nid, int)
                self.assertIsInstance(obj.shortname, str)
                self.assertIsInstance(obj.longname, str)
                self.assertIsInstance(obj.oid, (str, type(None)))

        # Construction from a long name, short name or OID string.
        val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
        self.assertEqual(val, expected)
        self.assertIsInstance(val, ssl._ASN1Object)
        self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
        self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
                         expected)
        with self.assertRaisesRegexp(ValueError, "unknown object 'serverauth'"):
            ssl._ASN1Object.fromname('serverauth')

    def test_purpose_enum(self):
        val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
        self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
        self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
        self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
        self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
        self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
                              '1.3.6.1.5.5.7.3.1')

        val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
        self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
        self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
        self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
        self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
        self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
                              '1.3.6.1.5.5.7.3.2')

    def test_unsupported_dtls(self):
        # Datagram sockets are rejected by both wrapping entry points.
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.addCleanup(s.close)
        with self.assertRaises(NotImplementedError) as cx:
            ssl.wrap_socket(s, cert_reqs=ssl.CERT_NONE)
        self.assertEqual(str(cx.exception), "only stream sockets are supported")
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        with self.assertRaises(NotImplementedError) as cx:
            ctx.wrap_socket(s)
        self.assertEqual(str(cx.exception), "only stream sockets are supported")

    def cert_time_ok(self, timestring, timestamp):
        # Helper: `timestring` must convert to exactly `timestamp`.
        self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)

    def cert_time_fail(self, timestring):
        # Helper: `timestring` must be rejected with ValueError.
        with self.assertRaises(ValueError):
            ssl.cert_time_to_seconds(timestring)

    @unittest.skipUnless(utc_offset(),
                         'local time needs to be different from UTC')
    def test_cert_time_to_seconds_timezone(self):
        # Issue #19940: ssl.cert_time_to_seconds() returns wrong
        #               results if local timezone is not UTC
        self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0)
        self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0)

    def test_cert_time_to_seconds(self):
        timestring = "Jan 5 09:34:43 2018 GMT"
        ts = 1515144883.0
        self.cert_time_ok(timestring, ts)
        # accept keyword parameter, assert its name
        self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
        # accept both %e and %d (space or zero generated by strftime)
        self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
        # case-insensitive
        self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
        self.cert_time_fail("Jan 5 09:34 2018 GMT")     # no seconds
        self.cert_time_fail("Jan 5 09:34:43 2018")      # no GMT
        self.cert_time_fail("Jan 5 09:34:43 2018 UTC")  # not GMT timezone
        self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
        self.cert_time_fail("Jon 5 09:34:43 2018 GMT")  # invalid month
        self.cert_time_fail("Jan 5 24:00:00 2018 GMT")  # invalid hour
        self.cert_time_fail("Jan 5 09:60:43 2018 GMT")  # invalid minute

        newyear_ts = 1230768000.0
        # leap seconds
        self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
        # same timestamp
        self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)

        self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
        # allow 60th second (even if it is not a leap second)
        self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
        # allow 2nd leap second for compatibility with time.strptime()
        self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
        self.cert_time_fail("Jan 5 09:34:62 2018 GMT")  # invalid seconds

        # no special treatment for the special value:
        #   99991231235959Z (rfc 5280)
        self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)

    @support.run_with_locale('LC_ALL', '')
    def test_cert_time_to_seconds_locale(self):
        # `cert_time_to_seconds()` should be locale independent

        def local_february_name():
            return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))

        if local_february_name().lower() == 'feb':
            self.skipTest("locale-specific month name needs to be "
                          "different from C locale")

        # locale-independent
        self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
        self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
689
690
691class ContextTests(unittest.TestCase):
692
@skip_if_broken_ubuntu_ssl
def test_constructor(self):
    # Every protocol constant known to the module must be accepted.
    for proto in PROTOCOLS:
        ssl.SSLContext(proto)
    # The protocol argument is mandatory...
    with self.assertRaises(TypeError):
        ssl.SSLContext()
    # ...and values that name no protocol are rejected.
    for bogus in (-1, 42):
        with self.assertRaises(ValueError):
            ssl.SSLContext(bogus)
700
@skip_if_broken_ubuntu_ssl
def test_protocol(self):
    # A context reports the protocol it was created with.
    for requested in PROTOCOLS:
        context = ssl.SSLContext(requested)
        self.assertEqual(context.protocol, requested)
706
def test_ciphers(self):
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    # Well-known cipher list names are accepted...
    for spec in ("ALL", "DEFAULT"):
        context.set_ciphers(spec)
    # ...while a list that selects nothing raises SSLError.
    with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"):
        context.set_ciphers("^$:,;?*'dorothyx")
713
@skip_if_broken_ubuntu_ssl
def test_options(self):
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    # OP_ALL | OP_NO_SSLv2 is the default value
    default_options = ssl.OP_ALL | ssl.OP_NO_SSLv2
    self.assertEqual(default_options, context.options)

    # Options can always be added.
    context.options |= ssl.OP_NO_SSLv3
    self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3,
                     context.options)

    if not can_clear_options():
        # Where clearing is unsupported, the attempt raises ValueError.
        with self.assertRaises(ValueError):
            context.options = 0
        return

    # Clear one flag while setting another, then clear everything.
    context.options = (context.options & ~ssl.OP_NO_SSLv2) | ssl.OP_NO_TLSv1
    self.assertEqual(ssl.OP_ALL | ssl.OP_NO_TLSv1 | ssl.OP_NO_SSLv3,
                     context.options)
    context.options = 0
    self.assertEqual(0, context.options)
732
733 def test_verify_mode(self):
734 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
735 # Default value
736 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
737 ctx.verify_mode = ssl.CERT_OPTIONAL
738 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
739 ctx.verify_mode = ssl.CERT_REQUIRED
740 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
741 ctx.verify_mode = ssl.CERT_NONE
742 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
743 with self.assertRaises(TypeError):
744 ctx.verify_mode = None
745 with self.assertRaises(ValueError):
746 ctx.verify_mode = 42
747
748 @unittest.skipUnless(have_verify_flags(),
749 "verify_flags need OpenSSL > 0.9.8")
750 def test_verify_flags(self):
751 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
752 # default value by OpenSSL
753 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
754 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
755 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
756 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
757 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
758 ctx.verify_flags = ssl.VERIFY_DEFAULT
759 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
760 # supports any value
761 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
762 self.assertEqual(ctx.verify_flags,
763 ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
764 with self.assertRaises(TypeError):
765 ctx.verify_flags = None
766
767 def test_load_cert_chain(self):
768 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
769 # Combined key and cert in a single file
Benjamin Peterson04439fd2014-11-03 21:05:01 -0500770 ctx.load_cert_chain(CERTFILE, keyfile=None)
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500771 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
772 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
773 with self.assertRaises(IOError) as cm:
774 ctx.load_cert_chain(WRONGCERT)
775 self.assertEqual(cm.exception.errno, errno.ENOENT)
776 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
777 ctx.load_cert_chain(BADCERT)
778 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
779 ctx.load_cert_chain(EMPTYCERT)
780 # Separate key and cert
781 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
782 ctx.load_cert_chain(ONLYCERT, ONLYKEY)
783 ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
784 ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
785 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
786 ctx.load_cert_chain(ONLYCERT)
787 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
788 ctx.load_cert_chain(ONLYKEY)
789 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
790 ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
791 # Mismatching key and cert
792 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
793 with self.assertRaisesRegexp(ssl.SSLError, "key values mismatch"):
794 ctx.load_cert_chain(SVN_PYTHON_ORG_ROOT_CERT, ONLYKEY)
795 # Password protected key and cert
796 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
797 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
798 ctx.load_cert_chain(CERTFILE_PROTECTED,
799 password=bytearray(KEY_PASSWORD.encode()))
800 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
801 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
802 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
803 bytearray(KEY_PASSWORD.encode()))
804 with self.assertRaisesRegexp(TypeError, "should be a string"):
805 ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
806 with self.assertRaises(ssl.SSLError):
807 ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
808 with self.assertRaisesRegexp(ValueError, "cannot be longer"):
809 # openssl has a fixed limit on the password buffer.
810 # PEM_BUFSIZE is generally set to 1kb.
811 # Return a string larger than this.
812 ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
813 # Password callback
814 def getpass_unicode():
815 return KEY_PASSWORD
816 def getpass_bytes():
817 return KEY_PASSWORD.encode()
818 def getpass_bytearray():
819 return bytearray(KEY_PASSWORD.encode())
820 def getpass_badpass():
821 return "badpass"
822 def getpass_huge():
823 return b'a' * (1024 * 1024)
824 def getpass_bad_type():
825 return 9
826 def getpass_exception():
827 raise Exception('getpass error')
828 class GetPassCallable:
829 def __call__(self):
830 return KEY_PASSWORD
831 def getpass(self):
832 return KEY_PASSWORD
833 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
834 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
835 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
836 ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
837 ctx.load_cert_chain(CERTFILE_PROTECTED,
838 password=GetPassCallable().getpass)
839 with self.assertRaises(ssl.SSLError):
840 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
841 with self.assertRaisesRegexp(ValueError, "cannot be longer"):
842 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
843 with self.assertRaisesRegexp(TypeError, "must return a string"):
844 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
845 with self.assertRaisesRegexp(Exception, "getpass error"):
846 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
847 # Make sure the password function isn't called if it isn't needed
848 ctx.load_cert_chain(CERTFILE, password=getpass_exception)
849
850 def test_load_verify_locations(self):
851 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
852 ctx.load_verify_locations(CERTFILE)
853 ctx.load_verify_locations(cafile=CERTFILE, capath=None)
854 ctx.load_verify_locations(BYTES_CERTFILE)
855 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
Benjamin Peterson876473e2014-08-28 09:33:21 -0400856 ctx.load_verify_locations(cafile=BYTES_CERTFILE.decode('utf-8'))
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500857 self.assertRaises(TypeError, ctx.load_verify_locations)
858 self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
859 with self.assertRaises(IOError) as cm:
860 ctx.load_verify_locations(WRONGCERT)
861 self.assertEqual(cm.exception.errno, errno.ENOENT)
Benjamin Peterson876473e2014-08-28 09:33:21 -0400862 with self.assertRaises(IOError):
863 ctx.load_verify_locations(u'')
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500864 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
865 ctx.load_verify_locations(BADCERT)
866 ctx.load_verify_locations(CERTFILE, CAPATH)
867 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
868
869 # Issue #10989: crash if the second argument type is invalid
870 self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
871
872 def test_load_verify_cadata(self):
873 # test cadata
874 with open(CAFILE_CACERT) as f:
875 cacert_pem = f.read().decode("ascii")
876 cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
877 with open(CAFILE_NEURONIO) as f:
878 neuronio_pem = f.read().decode("ascii")
879 neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
880
881 # test PEM
882 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
883 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
884 ctx.load_verify_locations(cadata=cacert_pem)
885 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
886 ctx.load_verify_locations(cadata=neuronio_pem)
887 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
888 # cert already in hash table
889 ctx.load_verify_locations(cadata=neuronio_pem)
890 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
891
892 # combined
893 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
894 combined = "\n".join((cacert_pem, neuronio_pem))
895 ctx.load_verify_locations(cadata=combined)
896 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
897
898 # with junk around the certs
899 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
900 combined = ["head", cacert_pem, "other", neuronio_pem, "again",
901 neuronio_pem, "tail"]
902 ctx.load_verify_locations(cadata="\n".join(combined))
903 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
904
905 # test DER
906 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
907 ctx.load_verify_locations(cadata=cacert_der)
908 ctx.load_verify_locations(cadata=neuronio_der)
909 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
910 # cert already in hash table
911 ctx.load_verify_locations(cadata=cacert_der)
912 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
913
914 # combined
915 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
916 combined = b"".join((cacert_der, neuronio_der))
917 ctx.load_verify_locations(cadata=combined)
918 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
919
920 # error cases
921 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
922 self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
923
924 with self.assertRaisesRegexp(ssl.SSLError, "no start line"):
925 ctx.load_verify_locations(cadata=u"broken")
926 with self.assertRaisesRegexp(ssl.SSLError, "not enough data"):
927 ctx.load_verify_locations(cadata=b"broken")
928
929
930 def test_load_dh_params(self):
931 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
932 ctx.load_dh_params(DHFILE)
933 if os.name != 'nt':
934 ctx.load_dh_params(BYTES_DHFILE)
935 self.assertRaises(TypeError, ctx.load_dh_params)
936 self.assertRaises(TypeError, ctx.load_dh_params, None)
937 with self.assertRaises(IOError) as cm:
938 ctx.load_dh_params(WRONGCERT)
939 self.assertEqual(cm.exception.errno, errno.ENOENT)
940 with self.assertRaises(ssl.SSLError) as cm:
941 ctx.load_dh_params(CERTFILE)
942
943 @skip_if_broken_ubuntu_ssl
944 def test_session_stats(self):
945 for proto in PROTOCOLS:
946 ctx = ssl.SSLContext(proto)
947 self.assertEqual(ctx.session_stats(), {
948 'number': 0,
949 'connect': 0,
950 'connect_good': 0,
951 'connect_renegotiate': 0,
952 'accept': 0,
953 'accept_good': 0,
954 'accept_renegotiate': 0,
955 'hits': 0,
956 'misses': 0,
957 'timeouts': 0,
958 'cache_full': 0,
959 })
960
961 def test_set_default_verify_paths(self):
962 # There's not much we can do to test that it acts as expected,
963 # so just check it doesn't crash or raise an exception.
964 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
965 ctx.set_default_verify_paths()
966
967 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
968 def test_set_ecdh_curve(self):
969 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
970 ctx.set_ecdh_curve("prime256v1")
971 ctx.set_ecdh_curve(b"prime256v1")
972 self.assertRaises(TypeError, ctx.set_ecdh_curve)
973 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
974 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
975 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
976
977 @needs_sni
978 def test_sni_callback(self):
979 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
980
981 # set_servername_callback expects a callable, or None
982 self.assertRaises(TypeError, ctx.set_servername_callback)
983 self.assertRaises(TypeError, ctx.set_servername_callback, 4)
984 self.assertRaises(TypeError, ctx.set_servername_callback, "")
985 self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
986
987 def dummycallback(sock, servername, ctx):
988 pass
989 ctx.set_servername_callback(None)
990 ctx.set_servername_callback(dummycallback)
991
992 @needs_sni
993 def test_sni_callback_refcycle(self):
994 # Reference cycles through the servername callback are detected
995 # and cleared.
996 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
997 def dummycallback(sock, servername, ctx, cycle=ctx):
998 pass
999 ctx.set_servername_callback(dummycallback)
1000 wr = weakref.ref(ctx)
1001 del ctx, dummycallback
1002 gc.collect()
1003 self.assertIs(wr(), None)
1004
1005 def test_cert_store_stats(self):
1006 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1007 self.assertEqual(ctx.cert_store_stats(),
1008 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1009 ctx.load_cert_chain(CERTFILE)
1010 self.assertEqual(ctx.cert_store_stats(),
1011 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1012 ctx.load_verify_locations(CERTFILE)
1013 self.assertEqual(ctx.cert_store_stats(),
1014 {'x509_ca': 0, 'crl': 0, 'x509': 1})
1015 ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
1016 self.assertEqual(ctx.cert_store_stats(),
1017 {'x509_ca': 1, 'crl': 0, 'x509': 2})
1018
1019 def test_get_ca_certs(self):
1020 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1021 self.assertEqual(ctx.get_ca_certs(), [])
1022 # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
1023 ctx.load_verify_locations(CERTFILE)
1024 self.assertEqual(ctx.get_ca_certs(), [])
1025 # but SVN_PYTHON_ORG_ROOT_CERT is a CA cert
1026 ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
1027 self.assertEqual(ctx.get_ca_certs(),
1028 [{'issuer': ((('organizationName', 'Root CA'),),
1029 (('organizationalUnitName', 'http://www.cacert.org'),),
1030 (('commonName', 'CA Cert Signing Authority'),),
1031 (('emailAddress', 'support@cacert.org'),)),
1032 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
1033 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
1034 'serialNumber': '00',
1035 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
1036 'subject': ((('organizationName', 'Root CA'),),
1037 (('organizationalUnitName', 'http://www.cacert.org'),),
1038 (('commonName', 'CA Cert Signing Authority'),),
1039 (('emailAddress', 'support@cacert.org'),)),
1040 'version': 3}])
1041
1042 with open(SVN_PYTHON_ORG_ROOT_CERT) as f:
1043 pem = f.read()
1044 der = ssl.PEM_cert_to_DER_cert(pem)
1045 self.assertEqual(ctx.get_ca_certs(True), [der])
1046
1047 def test_load_default_certs(self):
1048 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1049 ctx.load_default_certs()
1050
1051 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1052 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1053 ctx.load_default_certs()
1054
1055 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1056 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1057
1058 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1059 self.assertRaises(TypeError, ctx.load_default_certs, None)
1060 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1061
Benjamin Petersona02ae252014-10-03 18:17:15 -04001062 @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
Benjamin Peterson0b30a2b2014-10-03 17:27:05 -04001063 def test_load_default_certs_env(self):
1064 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1065 with support.EnvironmentVarGuard() as env:
1066 env["SSL_CERT_DIR"] = CAPATH
1067 env["SSL_CERT_FILE"] = CERTFILE
1068 ctx.load_default_certs()
1069 self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0})
1070
Benjamin Petersona02ae252014-10-03 18:17:15 -04001071 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
1072 def test_load_default_certs_env_windows(self):
1073 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1074 ctx.load_default_certs()
1075 stats = ctx.cert_store_stats()
1076
1077 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1078 with support.EnvironmentVarGuard() as env:
1079 env["SSL_CERT_DIR"] = CAPATH
1080 env["SSL_CERT_FILE"] = CERTFILE
1081 ctx.load_default_certs()
1082 stats["x509"] += 1
1083 self.assertEqual(ctx.cert_store_stats(), stats)
1084
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001085 def test_create_default_context(self):
1086 ctx = ssl.create_default_context()
1087 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1088 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1089 self.assertTrue(ctx.check_hostname)
1090 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1091 self.assertEqual(
1092 ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
1093 getattr(ssl, "OP_NO_COMPRESSION", 0),
1094 )
1095
1096 with open(SIGNING_CA) as f:
1097 cadata = f.read().decode("ascii")
1098 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1099 cadata=cadata)
1100 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1101 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1102 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1103 self.assertEqual(
1104 ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
1105 getattr(ssl, "OP_NO_COMPRESSION", 0),
1106 )
1107
1108 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
1109 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1110 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1111 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1112 self.assertEqual(
1113 ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
1114 getattr(ssl, "OP_NO_COMPRESSION", 0),
1115 )
1116 self.assertEqual(
1117 ctx.options & getattr(ssl, "OP_SINGLE_DH_USE", 0),
1118 getattr(ssl, "OP_SINGLE_DH_USE", 0),
1119 )
1120 self.assertEqual(
1121 ctx.options & getattr(ssl, "OP_SINGLE_ECDH_USE", 0),
1122 getattr(ssl, "OP_SINGLE_ECDH_USE", 0),
1123 )
1124
1125 def test__create_stdlib_context(self):
1126 ctx = ssl._create_stdlib_context()
1127 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1128 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1129 self.assertFalse(ctx.check_hostname)
1130 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1131
1132 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
1133 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1134 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1135 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1136
1137 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
1138 cert_reqs=ssl.CERT_REQUIRED,
1139 check_hostname=True)
1140 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1141 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1142 self.assertTrue(ctx.check_hostname)
1143 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1144
1145 ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
1146 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1147 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1148 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1149
1150 def test_check_hostname(self):
1151 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1152 self.assertFalse(ctx.check_hostname)
1153
1154 # Requires CERT_REQUIRED or CERT_OPTIONAL
1155 with self.assertRaises(ValueError):
1156 ctx.check_hostname = True
1157 ctx.verify_mode = ssl.CERT_REQUIRED
1158 self.assertFalse(ctx.check_hostname)
1159 ctx.check_hostname = True
1160 self.assertTrue(ctx.check_hostname)
1161
1162 ctx.verify_mode = ssl.CERT_OPTIONAL
1163 ctx.check_hostname = True
1164 self.assertTrue(ctx.check_hostname)
1165
1166 # Cannot set CERT_NONE with check_hostname enabled
1167 with self.assertRaises(ValueError):
1168 ctx.verify_mode = ssl.CERT_NONE
1169 ctx.check_hostname = False
1170 self.assertFalse(ctx.check_hostname)
1171
1172
class SSLErrorTests(unittest.TestCase):

    def test_str(self):
        # The str() of a SSLError doesn't include the errno;
        # same for a subclass.
        for error_class in (ssl.SSLError, ssl.SSLZeroReturnError):
            err = error_class(1, "foo")
            self.assertEqual(str(err), "foo")
            self.assertEqual(err.errno, 1)

    def test_lib_reason(self):
        # Test the library and reason attributes
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        with self.assertRaises(ssl.SSLError) as caught:
            context.load_dh_params(CERTFILE)
        err = caught.exception
        self.assertEqual(err.library, 'PEM')
        self.assertEqual(err.reason, 'NO_START_LINE')
        text = str(err)
        self.assertTrue(
            text.startswith("[PEM: NO_START_LINE] no start line"), text)

    def test_subclass(self):
        # Check that the appropriate SSLError subclass is raised
        # (this only tests one of them)
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        with closing(socket.socket()) as listener:
            listener.bind(("127.0.0.1", 0))
            listener.listen(5)
            raw_client = socket.socket()
            raw_client.connect(listener.getsockname())
            raw_client.setblocking(False)
            tls_client = context.wrap_socket(raw_client, False,
                                             do_handshake_on_connect=False)
            with closing(tls_client):
                # The socket is non-blocking, so the handshake is
                # expected to stop with SSLWantReadError.
                with self.assertRaises(ssl.SSLWantReadError) as caught:
                    tls_client.do_handshake()
                text = str(caught.exception)
                self.assertTrue(
                    text.startswith("The operation did not complete (read)"),
                    text)
                # For compatibility
                self.assertEqual(caught.exception.errno,
                                 ssl.SSL_ERROR_WANT_READ)
Antoine Pitrou63cc99d2013-12-28 17:26:33 +01001212
Antoine Pitrouf9de5342010-04-05 21:35:07 +00001213
Bill Janssen934b16d2008-06-28 22:19:33 +00001214class NetworkedTests(unittest.TestCase):
Bill Janssen296a59d2007-09-16 22:06:00 +00001215
Antoine Pitrou3945c862010-04-28 21:11:01 +00001216 def test_connect(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001217 with support.transient_internet("svn.python.org"):
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001218 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1219 cert_reqs=ssl.CERT_NONE)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001220 try:
1221 s.connect(("svn.python.org", 443))
1222 self.assertEqual({}, s.getpeercert())
1223 finally:
1224 s.close()
Bill Janssen296a59d2007-09-16 22:06:00 +00001225
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001226 # this should fail because we have no verification certs
1227 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1228 cert_reqs=ssl.CERT_REQUIRED)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001229 self.assertRaisesRegexp(ssl.SSLError, "certificate verify failed",
1230 s.connect, ("svn.python.org", 443))
1231 s.close()
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001232
1233 # this should succeed because we specify the root cert
1234 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1235 cert_reqs=ssl.CERT_REQUIRED,
1236 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
1237 try:
1238 s.connect(("svn.python.org", 443))
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001239 self.assertTrue(s.getpeercert())
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001240 finally:
1241 s.close()
Bill Janssen296a59d2007-09-16 22:06:00 +00001242
Antoine Pitroud3f6ea12011-02-26 23:35:27 +00001243 def test_connect_ex(self):
1244 # Issue #11326: check connect_ex() implementation
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001245 with support.transient_internet("svn.python.org"):
Antoine Pitroud3f6ea12011-02-26 23:35:27 +00001246 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1247 cert_reqs=ssl.CERT_REQUIRED,
1248 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
1249 try:
1250 self.assertEqual(0, s.connect_ex(("svn.python.org", 443)))
1251 self.assertTrue(s.getpeercert())
1252 finally:
1253 s.close()
1254
1255 def test_non_blocking_connect_ex(self):
1256 # Issue #11326: non-blocking connect_ex() should allow handshake
1257 # to proceed after the socket gets ready.
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001258 with support.transient_internet("svn.python.org"):
Antoine Pitroud3f6ea12011-02-26 23:35:27 +00001259 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1260 cert_reqs=ssl.CERT_REQUIRED,
1261 ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
1262 do_handshake_on_connect=False)
1263 try:
1264 s.setblocking(False)
1265 rc = s.connect_ex(('svn.python.org', 443))
Antoine Pitrou8ef39072011-02-27 15:45:22 +00001266 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1267 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
Antoine Pitroud3f6ea12011-02-26 23:35:27 +00001268 # Wait for connect to finish
1269 select.select([], [s], [], 5.0)
1270 # Non-blocking handshake
1271 while True:
1272 try:
1273 s.do_handshake()
1274 break
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001275 except ssl.SSLWantReadError:
1276 select.select([s], [], [], 5.0)
1277 except ssl.SSLWantWriteError:
1278 select.select([], [s], [], 5.0)
Antoine Pitroud3f6ea12011-02-26 23:35:27 +00001279 # SSL established
1280 self.assertTrue(s.getpeercert())
1281 finally:
1282 s.close()
1283
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001284 def test_timeout_connect_ex(self):
1285 # Issue #12065: on a timeout, connect_ex() should return the original
1286 # errno (mimicking the behaviour of non-SSL sockets).
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001287 with support.transient_internet("svn.python.org"):
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001288 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1289 cert_reqs=ssl.CERT_REQUIRED,
1290 ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
1291 do_handshake_on_connect=False)
1292 try:
1293 s.settimeout(0.0000001)
1294 rc = s.connect_ex(('svn.python.org', 443))
1295 if rc == 0:
1296 self.skipTest("svn.python.org responded too quickly")
1297 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
1298 finally:
1299 s.close()
1300
1301 def test_connect_ex_error(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001302 with support.transient_internet("svn.python.org"):
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001303 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1304 cert_reqs=ssl.CERT_REQUIRED,
1305 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
1306 try:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001307 rc = s.connect_ex(("svn.python.org", 444))
1308 # Issue #19919: Windows machines or VMs hosted on Windows
1309 # machines sometimes return EWOULDBLOCK.
1310 self.assertIn(rc, (errno.ECONNREFUSED, errno.EWOULDBLOCK))
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001311 finally:
1312 s.close()
1313
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001314 def test_connect_with_context(self):
1315 with support.transient_internet("svn.python.org"):
1316 # Same as test_connect, but with a separately created context
1317 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1318 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1319 s.connect(("svn.python.org", 443))
1320 try:
1321 self.assertEqual({}, s.getpeercert())
1322 finally:
1323 s.close()
1324 # Same with a server hostname
1325 s = ctx.wrap_socket(socket.socket(socket.AF_INET),
1326 server_hostname="svn.python.org")
Benjamin Peterson31aa69e2014-11-23 20:13:31 -06001327 s.connect(("svn.python.org", 443))
1328 s.close()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001329 # This should fail because we have no verification certs
1330 ctx.verify_mode = ssl.CERT_REQUIRED
1331 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1332 self.assertRaisesRegexp(ssl.SSLError, "certificate verify failed",
1333 s.connect, ("svn.python.org", 443))
1334 s.close()
1335 # This should succeed because we specify the root cert
1336 ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
1337 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1338 s.connect(("svn.python.org", 443))
1339 try:
1340 cert = s.getpeercert()
1341 self.assertTrue(cert)
1342 finally:
1343 s.close()
1344
1345 def test_connect_capath(self):
1346 # Verify server certificates using the `capath` argument
1347 # NOTE: the subject hashing algorithm has been changed between
1348 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
1349 # contain both versions of each certificate (same content, different
1350 # filename) for this test to be portable across OpenSSL releases.
1351 with support.transient_internet("svn.python.org"):
1352 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1353 ctx.verify_mode = ssl.CERT_REQUIRED
1354 ctx.load_verify_locations(capath=CAPATH)
1355 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1356 s.connect(("svn.python.org", 443))
1357 try:
1358 cert = s.getpeercert()
1359 self.assertTrue(cert)
1360 finally:
1361 s.close()
1362 # Same with a bytes `capath` argument
1363 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1364 ctx.verify_mode = ssl.CERT_REQUIRED
1365 ctx.load_verify_locations(capath=BYTES_CAPATH)
1366 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1367 s.connect(("svn.python.org", 443))
1368 try:
1369 cert = s.getpeercert()
1370 self.assertTrue(cert)
1371 finally:
1372 s.close()
1373
1374 def test_connect_cadata(self):
1375 with open(CAFILE_CACERT) as f:
1376 pem = f.read().decode('ascii')
1377 der = ssl.PEM_cert_to_DER_cert(pem)
1378 with support.transient_internet("svn.python.org"):
1379 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1380 ctx.verify_mode = ssl.CERT_REQUIRED
1381 ctx.load_verify_locations(cadata=pem)
1382 with closing(ctx.wrap_socket(socket.socket(socket.AF_INET))) as s:
1383 s.connect(("svn.python.org", 443))
1384 cert = s.getpeercert()
1385 self.assertTrue(cert)
1386
1387 # same with DER
1388 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1389 ctx.verify_mode = ssl.CERT_REQUIRED
1390 ctx.load_verify_locations(cadata=der)
1391 with closing(ctx.wrap_socket(socket.socket(socket.AF_INET))) as s:
1392 s.connect(("svn.python.org", 443))
1393 cert = s.getpeercert()
1394 self.assertTrue(cert)
1395
Antoine Pitrou55841ac2010-04-24 10:43:57 +00001396 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
1397 def test_makefile_close(self):
1398 # Issue #5238: creating a file-like object with makefile() shouldn't
1399 # delay closing the underlying "real socket" (here tested with its
1400 # file descriptor, hence skipping the test under Windows).
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001401 with support.transient_internet("svn.python.org"):
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001402 ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
1403 ss.connect(("svn.python.org", 443))
1404 fd = ss.fileno()
1405 f = ss.makefile()
1406 f.close()
1407 # The fd is still open
Antoine Pitrou55841ac2010-04-24 10:43:57 +00001408 os.read(fd, 0)
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001409 # Closing the SSL socket should close the fd too
1410 ss.close()
1411 gc.collect()
1412 with self.assertRaises(OSError) as e:
1413 os.read(fd, 0)
1414 self.assertEqual(e.exception.errno, errno.EBADF)
Bill Janssen934b16d2008-06-28 22:19:33 +00001415
Antoine Pitrou3945c862010-04-28 21:11:01 +00001416 def test_non_blocking_handshake(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001417 with support.transient_internet("svn.python.org"):
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001418 s = socket.socket(socket.AF_INET)
1419 s.connect(("svn.python.org", 443))
1420 s.setblocking(False)
1421 s = ssl.wrap_socket(s,
1422 cert_reqs=ssl.CERT_NONE,
1423 do_handshake_on_connect=False)
1424 count = 0
1425 while True:
1426 try:
1427 count += 1
1428 s.do_handshake()
1429 break
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001430 except ssl.SSLWantReadError:
1431 select.select([s], [], [])
1432 except ssl.SSLWantWriteError:
1433 select.select([], [s], [])
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001434 s.close()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001435 if support.verbose:
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001436 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Bill Janssen934b16d2008-06-28 22:19:33 +00001437
Antoine Pitrou3945c862010-04-28 21:11:01 +00001438 def test_get_server_certificate(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001439 def _test_get_server_certificate(host, port, cert=None):
1440 with support.transient_internet(host):
1441 pem = ssl.get_server_certificate((host, port))
1442 if not pem:
1443 self.fail("No server certificate on %s:%s!" % (host, port))
Bill Janssen296a59d2007-09-16 22:06:00 +00001444
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001445 try:
1446 pem = ssl.get_server_certificate((host, port),
1447 ca_certs=CERTFILE)
1448 except ssl.SSLError as x:
1449 #should fail
1450 if support.verbose:
1451 sys.stdout.write("%s\n" % x)
1452 else:
1453 self.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
Bill Janssen296a59d2007-09-16 22:06:00 +00001454
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001455 pem = ssl.get_server_certificate((host, port),
1456 ca_certs=cert)
1457 if not pem:
1458 self.fail("No server certificate on %s:%s!" % (host, port))
1459 if support.verbose:
1460 sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
1461
1462 _test_get_server_certificate('svn.python.org', 443, SVN_PYTHON_ORG_ROOT_CERT)
1463 if support.IPV6_ENABLED:
1464 _test_get_server_certificate('ipv6.google.com', 443)
1465
def test_ciphers(self):
    """Connect with valid cipher strings, then check a bogus one is refused."""
    remote = ("svn.python.org", 443)
    with support.transient_internet(remote[0]):
        # Both of these cipher specifications must allow a connection.
        for cipher_spec in ("ALL", "DEFAULT"):
            wrapped = ssl.wrap_socket(socket.socket(socket.AF_INET),
                                      cert_reqs=ssl.CERT_NONE,
                                      ciphers=cipher_spec)
            with closing(wrapped) as s:
                s.connect(remote)
        # Error checking can happen at instantiation or when connecting
        with self.assertRaisesRegexp(ssl.SSLError,
                                     "No cipher can be selected"):
            with closing(socket.socket(socket.AF_INET)) as sock:
                s = ssl.wrap_socket(sock,
                                    cert_reqs=ssl.CERT_NONE,
                                    ciphers="^$:,;?*'dorothyx")
                s.connect(remote)
Bill Janssen296a59d2007-09-16 22:06:00 +00001481
def test_algorithms(self):
    """Verify a SHA256-signed server certificate (issue #8484).

    Skipped when the linked OpenSSL predates SHA256 support or when SNI
    is unavailable.
    """
    # Issue #8484: all algorithms should be available when verifying a
    # certificate.
    # SHA256 was added in OpenSSL 0.9.8
    if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15):
        self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION)
    # sha256.tbs-internet.com needs SNI to use the correct certificate
    if not ssl.HAS_SNI:
        self.skipTest("SNI needed for this test")
    # https://sha2.hboeck.de/ was used until 2011-01-08 (no route to host)
    remote = ("sha256.tbs-internet.com", 443)
    here = os.path.dirname(__file__)
    sha256_cert = os.path.join(here, "sha256.pem")
    with support.transient_internet("sha256.tbs-internet.com"):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.load_verify_locations(sha256_cert)
        raw_sock = socket.socket(socket.AF_INET)
        tls_sock = ctx.wrap_socket(
            raw_sock, server_hostname="sha256.tbs-internet.com")
        # closing() closes the socket whether or not connect() succeeds.
        with closing(tls_sock):
            tls_sock.connect(remote)
            if support.verbose:
                sys.stdout.write("\nCipher with %r is %r\n" %
                                 (remote, tls_sock.cipher()))
                sys.stdout.write("Certificate is:\n%s\n" %
                                 pprint.pformat(tls_sock.getpeercert()))
1509
def test_get_ca_certs_capath(self):
    """Check that certificates under a capath are only loaded on demand.

    Before any connection the context reports no CA certificates; after
    a verified handshake exactly one is expected.
    """
    # capath certs are loaded on request
    with support.transient_internet("svn.python.org"):
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.load_verify_locations(capath=CAPATH)
        self.assertEqual(ctx.get_ca_certs(), [])
        # connect() is inside the managed region so the wrapped socket is
        # closed even when the connection attempt itself fails.
        with closing(ctx.wrap_socket(socket.socket(socket.AF_INET))) as s:
            s.connect(("svn.python.org", 443))
            cert = s.getpeercert()
            self.assertTrue(cert)
        self.assertEqual(len(ctx.get_ca_certs()), 1)
1525
@needs_sni
def test_context_setget(self):
    """Replace the SSLContext of an already connected socket."""
    # Check that the context of a connected socket can be replaced.
    with support.transient_internet("svn.python.org"):
        initial_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        replacement_ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        plain_sock = socket.socket(socket.AF_INET)
        with closing(initial_ctx.wrap_socket(plain_sock)) as tls_sock:

            def assert_context_is(expected):
                # Both the Python-level socket and the underlying
                # _sslobj must report the same context object.
                self.assertIs(tls_sock.context, expected)
                self.assertIs(tls_sock._sslobj.context, expected)

            tls_sock.connect(("svn.python.org", 443))
            assert_context_is(initial_ctx)
            tls_sock.context = replacement_ctx
            assert_context_is(replacement_ctx)
Bill Janssen296a59d2007-09-16 22:06:00 +00001540
Bill Janssen98d19da2007-09-10 21:51:02 +00001541try:
1542 import threading
1543except ImportError:
1544 _have_threads = False
1545else:
Bill Janssen98d19da2007-09-10 21:51:02 +00001546 _have_threads = True
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001547
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001548 from test.ssl_servers import make_https_server
1549
class ThreadedEchoServer(threading.Thread):
    """Thread running a small echo server used by the SSL tests.

    The server accepts connections on a socket bound via
    support.bind_port() and hands each one to a ConnectionHandler, which
    sends back every message lower-cased.  It can be used as a context
    manager: entering starts the thread and waits until it is listening,
    leaving stops and joins it.
    """

    class ConnectionHandler(threading.Thread):

        """A mildly complicated class, because we want it to work both
        with and without the SSL wrapper around the socket connection, so
        that we can test the STARTTLS functionality."""

        def __init__(self, server, connsock, addr):
            """Remember the owning server, the accepted socket and peer address."""
            self.server = server
            self.running = False
            self.sock = connsock
            self.addr = addr
            self.sock.setblocking(1)
            # Set by wrap_conn(); None while the connection is unencrypted.
            self.sslconn = None
            threading.Thread.__init__(self)
            self.daemon = True

        def wrap_conn(self):
            """Wrap self.sock server-side using the server's SSLContext.

            Returns True on success.  On an SSLError or an ECONNRESET
            socket error the error is recorded in server.conn_errors, the
            server is stopped, the connection closed and False returned.
            Any other socket error is re-raised.
            """
            try:
                self.sslconn = self.server.context.wrap_socket(
                    self.sock, server_side=True)
                # Record what was negotiated so tests can inspect it later.
                self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
                self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
            except socket.error as e:
                # We treat ConnectionResetError as though it were an
                # SSLError - OpenSSL on Ubuntu abruptly closes the
                # connection when asked to use an unsupported protocol.
                #
                # XXX Various errors can have happened here, for example
                # a mismatching protocol version, an invalid certificate,
                # or a low-level bug. This should be made more discriminating.
                if not isinstance(e, ssl.SSLError) and e.errno != errno.ECONNRESET:
                    raise
                self.server.conn_errors.append(e)
                if self.server.chatty:
                    # handle_error is defined elsewhere in this file.
                    handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
                self.running = False
                self.server.stop()
                self.close()
                return False
            else:
                if self.server.context.verify_mode == ssl.CERT_REQUIRED:
                    cert = self.sslconn.getpeercert()
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
                    cert_binary = self.sslconn.getpeercert(True)
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
                cipher = self.sslconn.cipher()
                if support.verbose and self.server.chatty:
                    sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
                    sys.stdout.write(" server: selected protocol is now "
                            + str(self.sslconn.selected_npn_protocol()) + "\n")
                return True

        def read(self):
            """Read from the SSL connection if wrapped, else from the plain socket."""
            if self.sslconn:
                return self.sslconn.read()
            else:
                return self.sock.recv(1024)

        def write(self, bytes):
            """Write *bytes* to the SSL connection if wrapped, else to the plain socket."""
            if self.sslconn:
                return self.sslconn.write(bytes)
            else:
                return self.sock.send(bytes)

        def close(self):
            """Close whichever of the SSL connection or plain socket is in use."""
            if self.sslconn:
                self.sslconn.close()
            else:
                self.sock.close()

        def run(self):
            """Serve one connection until EOF, 'over' or an SSL error.

            Recognised messages (compared after stripping whitespace):
            empty -> EOF, b'over' -> close and return, b'STARTTLS' /
            b'ENDTLS' (only for a starttls server) -> wrap / unwrap the
            connection, b'CB tls-unique' -> send channel binding data.
            Anything else is echoed back lower-cased.
            """
            self.running = True
            # A non-STARTTLS server negotiates TLS immediately.
            if not self.server.starttls_server:
                if not self.wrap_conn():
                    return
            while self.running:
                try:
                    msg = self.read()
                    stripped = msg.strip()
                    if not stripped:
                        # eof, so quit this handler
                        self.running = False
                        self.close()
                    elif stripped == b'over':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: client closed connection\n")
                        self.close()
                        return
                    elif (self.server.starttls_server and
                          stripped == b'STARTTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
                        # The acknowledgement goes out before the socket is wrapped.
                        self.write(b"OK\n")
                        if not self.wrap_conn():
                            return
                    elif (self.server.starttls_server and self.sslconn
                          and stripped == b'ENDTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        # unwrap() hands back the underlying plain socket.
                        self.sock = self.sslconn.unwrap()
                        self.sslconn = None
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: connection is now unencrypted...\n")
                    elif stripped == b'CB tls-unique':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
                        # NOTE(review): self.sslconn is not checked for None in this branch.
                        data = self.sslconn.get_channel_binding("tls-unique")
                        self.write(repr(data).encode("us-ascii") + b"\n")
                    else:
                        if (support.verbose and
                            self.server.connectionchatty):
                            ctype = (self.sslconn and "encrypted") or "unencrypted"
                            sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
                                             % (msg, ctype, msg.lower(), ctype))
                        self.write(msg.lower())
                except ssl.SSLError:
                    if self.server.chatty:
                        handle_error("Test server failure:\n")
                    self.close()
                    self.running = False
                    # normally, we'd just stop here, but for the test
                    # harness, we want to stop the server
                    self.server.stop()

    def __init__(self, certificate=None, ssl_version=None,
                 certreqs=None, cacerts=None,
                 chatty=True, connectionchatty=False, starttls_server=False,
                 npn_protocols=None, alpn_protocols=None,
                 ciphers=None, context=None):
        """Create the server socket and the SSLContext used for connections.

        If *context* is given it is used as-is and the certificate,
        ssl_version, certreqs, cacerts, npn_protocols, alpn_protocols and
        ciphers arguments are ignored.  Otherwise a new context is built
        from them, defaulting to PROTOCOL_TLSv1 and CERT_NONE.
        """
        if context:
            self.context = context
        else:
            self.context = ssl.SSLContext(ssl_version
                                          if ssl_version is not None
                                          else ssl.PROTOCOL_TLSv1)
            self.context.verify_mode = (certreqs if certreqs is not None
                                        else ssl.CERT_NONE)
            if cacerts:
                self.context.load_verify_locations(cacerts)
            if certificate:
                self.context.load_cert_chain(certificate)
            if npn_protocols:
                self.context.set_npn_protocols(npn_protocols)
            if alpn_protocols:
                self.context.set_alpn_protocols(alpn_protocols)
            if ciphers:
                self.context.set_ciphers(ciphers)
        self.chatty = chatty
        self.connectionchatty = connectionchatty
        self.starttls_server = starttls_server
        self.sock = socket.socket()
        self.port = support.bind_port(self.sock)
        # Optional event passed to start(); set once the server is listening.
        self.flag = None
        self.active = False
        # Filled in by ConnectionHandler.wrap_conn().
        self.selected_npn_protocols = []
        self.selected_alpn_protocols = []
        self.conn_errors = []
        threading.Thread.__init__(self)
        self.daemon = True

    def __enter__(self):
        """Start the server thread and block until it is listening."""
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        """Ask the accept loop to finish and wait for the thread to exit."""
        self.stop()
        self.join()

    def start(self, flag=None):
        """Start the thread; *flag*, if given, is set once listening has begun."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        """Accept connections until stop() is called, then close the socket."""
        # The short timeout lets the loop notice self.active being cleared.
        self.sock.settimeout(0.05)
        self.sock.listen(5)
        self.active = True
        if self.flag:
            # signal an event
            self.flag.set()
        while self.active:
            try:
                newconn, connaddr = self.sock.accept()
                if support.verbose and self.chatty:
                    sys.stdout.write(' server: new connection from '
                                     + repr(connaddr) + '\n')
                handler = self.ConnectionHandler(self, newconn, connaddr)
                handler.start()
                # Joining here means connections are handled one at a time.
                handler.join()
            except socket.timeout:
                pass
            except KeyboardInterrupt:
                self.stop()
        self.sock.close()

    def stop(self):
        """Make the accept loop in run() exit after its current iteration."""
        self.active = False
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001752
class AsyncoreEchoServer(threading.Thread):
    """Thread running an asyncore-based SSL echo server for the tests.

    The nested EchoServer dispatcher accepts connections; each one is
    served by a ConnectionHandler that performs a non-blocking handshake
    and then sends back every chunk it receives, lower-cased.  The
    object can be used as a context manager (start on enter, stop and
    join on exit).
    """

    class EchoServer(asyncore.dispatcher):
        """Listening dispatcher that spawns a ConnectionHandler per client."""

        class ConnectionHandler(asyncore.dispatcher_with_send):
            """Server-side SSL connection driven by the asyncore loop."""

            def __init__(self, conn, certfile):
                """Wrap *conn* server-side with *certfile* and begin the handshake."""
                self.socket = ssl.wrap_socket(conn, server_side=True,
                                              certfile=certfile,
                                              do_handshake_on_connect=False)
                asyncore.dispatcher_with_send.__init__(self, self.socket)
                # True until do_handshake() has completed without error.
                self._ssl_accepting = True
                self._do_ssl_handshake()

            def readable(self):
                """Drain data already buffered by the SSL layer, then report readable."""
                if isinstance(self.socket, ssl.SSLSocket):
                    while self.socket.pending() > 0:
                        self.handle_read_event()
                return True

            def _do_ssl_handshake(self):
                """Advance the handshake by one step.

                Want-read/want-write conditions simply return so the
                loop can retry later; EOF and ECONNABORTED close the
                connection; other SSL errors propagate.
                """
                try:
                    self.socket.do_handshake()
                except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
                    return
                except ssl.SSLEOFError:
                    return self.handle_close()
                except ssl.SSLError:
                    raise
                except socket.error as err:
                    # NOTE(review): socket errors other than ECONNABORTED
                    # are ignored here, as in the original code.
                    if err.args[0] == errno.ECONNABORTED:
                        return self.handle_close()
                else:
                    self._ssl_accepting = False

            def handle_read(self):
                """Continue the handshake, or echo received data lower-cased."""
                if self._ssl_accepting:
                    self._do_ssl_handshake()
                else:
                    data = self.recv(1024)
                    if support.verbose:
                        sys.stdout.write(" server: read %s from client\n" % repr(data))
                    if not data:
                        self.close()
                    else:
                        self.send(data.lower())

            def handle_close(self):
                """Close the connection, logging it in verbose mode."""
                self.close()
                if support.verbose:
                    sys.stdout.write(" server: closed connection %s\n" % self.socket)

            def handle_error(self):
                """Re-raise the exception currently being handled."""
                raise

        def __init__(self, certfile):
            """Bind a listening TCP socket and remember *certfile* for handlers."""
            self.certfile = certfile
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.port = support.bind_port(sock, '')
            asyncore.dispatcher.__init__(self, sock)
            self.listen(5)

        def handle_accept(self):
            """Accept a pending connection and attach a ConnectionHandler to it."""
            accepted = self.accept()
            if accepted is None:
                # dispatcher.accept() returns None when the connection
                # did not actually take place; keep listening.
                return
            sock_obj, addr = accepted
            if support.verbose:
                sys.stdout.write(" server: new connection from %s:%s\n" %addr)
            self.ConnectionHandler(sock_obj, self.certfile)

        def handle_error(self):
            """Re-raise the exception currently being handled."""
            raise

    def __init__(self, certfile):
        """Create the listening EchoServer; the thread itself is not started."""
        self.flag = None
        self.active = False
        self.server = self.EchoServer(certfile)
        self.port = self.server.port
        threading.Thread.__init__(self)
        self.daemon = True

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.server)

    def __enter__(self):
        """Start the server thread and block until its loop is running."""
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        """Stop the server and join its thread, logging each step if verbose."""
        if support.verbose:
            sys.stdout.write(" cleanup: stopping server.\n")
        self.stop()
        if support.verbose:
            sys.stdout.write(" cleanup: joining server thread.\n")
        self.join()
        if support.verbose:
            sys.stdout.write(" cleanup: successfully joined.\n")

    def start(self, flag=None):
        """Start the thread; *flag*, if given, is set once the loop begins."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        """Run the asyncore loop until stop() clears self.active."""
        self.active = True
        if self.flag:
            self.flag.set()
        while self.active:
            try:
                asyncore.loop(1)
            except:
                # Deliberately best-effort: errors raised out of the
                # loop (see the handle_error methods) must not kill
                # the server thread.
                pass

    def stop(self):
        """Clear the run flag and close the listening dispatcher."""
        self.active = False
        self.server.close()
1867
def bad_cert_test(certfile):
    """
    Launch a server with CERT_REQUIRED, and check that trying to
    connect to it with the given client certificate fails.

    An ssl.SSLError or OSError raised while wrapping or connecting counts
    as the expected failure (and is printed in verbose mode); a
    successful connection raises AssertionError.
    """
    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_REQUIRED,
                                cacerts=CERTFILE, chatty=False,
                                connectionchatty=False)
    with server:
        try:
            with closing(socket.socket()) as sock:
                s = ssl.wrap_socket(sock,
                                    certfile=certfile,
                                    ssl_version=ssl.PROTOCOL_TLSv1)
                s.connect((HOST, server.port))
        except ssl.SSLError as x:
            if support.verbose:
                sys.stdout.write("\nSSLError is %s\n" % x.args[1])
        except OSError as x:
            # A second, identical "except OSError" clause used to follow
            # this one; it could never run and has been dropped.
            if support.verbose:
                sys.stdout.write("\nOSError is %s\n" % x.args[1])
        else:
            raise AssertionError("Use of invalid cert should have failed!")
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001897
def server_params_test(client_context, server_context, indata=b"FOO\n",
                       chatty=True, connectionchatty=False, sni_name=None):
    """
    Launch a server, connect a client to it and try various reads
    and writes.

    *indata* is sent three times (as-is, as a bytearray and as a
    memoryview) and each reply must equal indata.lower().  Returns a
    dict describing the connection as seen by the client, plus the
    protocols recorded by the server.
    """

    def client_log(template, *values):
        # Client-side chatter needs both connectionchatty and verbose mode.
        if connectionchatty:
            if support.verbose:
                sys.stdout.write(template % values)

    server = ThreadedEchoServer(context=server_context,
                                chatty=chatty,
                                connectionchatty=False)
    with server:
        client_sock = client_context.wrap_socket(socket.socket(),
                                                 server_hostname=sni_name)
        with closing(client_sock) as s:
            s.connect((HOST, server.port))
            expected = None
            for payload in [indata, bytearray(indata), memoryview(indata)]:
                client_log(" client: sending %r...\n", indata)
                s.write(payload)
                outdata = s.read()
                client_log(" client: read %r\n", outdata)
                expected = indata.lower()
                if outdata == expected:
                    continue
                raise AssertionError(
                    "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                    % (outdata[:20], len(outdata),
                       indata[:20].lower(), len(indata)))
            s.write(b"over\n")
            client_log(" client: closing connection.\n")
            stats = {
                'compression': s.compression(),
                'cipher': s.cipher(),
                'peercert': s.getpeercert(),
                'client_alpn_protocol': s.selected_alpn_protocol(),
                'client_npn_protocol': s.selected_npn_protocol(),
                'version': s.version(),
            }
            s.close()
        stats['server_alpn_protocols'] = server.selected_alpn_protocols
        stats['server_npn_protocols'] = server.selected_npn_protocols
    return stats
Bill Janssen98d19da2007-09-10 21:51:02 +00001943
def try_protocol_combo(server_protocol, client_protocol, expect_success,
                       certsreqs=None, server_options=0, client_options=0):
    """
    Try to SSL-connect using *client_protocol* to *server_protocol*.
    If *expect_success* is true, assert that the connection succeeds,
    if it's false, assert that the connection fails.
    Also, if *expect_success* is a string, assert that it is the protocol
    version actually used by the connection.
    """

    def protocol_names():
        # (client, server) names, looked up only when a message needs them.
        return (ssl.get_protocol_name(client_protocol),
                ssl.get_protocol_name(server_protocol))

    if certsreqs is None:
        certsreqs = ssl.CERT_NONE
    certtype = {
        ssl.CERT_NONE: "CERT_NONE",
        ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
        ssl.CERT_REQUIRED: "CERT_REQUIRED",
    }[certsreqs]
    if support.verbose:
        # Combinations expected to fail are shown inside braces.
        if expect_success:
            formatstr = " %s->%s %s\n"
        else:
            formatstr = " {%s->%s} %s\n"
        sys.stdout.write(formatstr % (protocol_names() + (certtype,)))
    client_context = ssl.SSLContext(client_protocol)
    client_context.options |= client_options
    server_context = ssl.SSLContext(server_protocol)
    server_context.options |= server_options

    # NOTE: we must enable "ALL" ciphers on the client, otherwise an
    # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
    # starting from OpenSSL 1.0.0 (see issue #8322).
    if client_context.protocol == ssl.PROTOCOL_SSLv23:
        client_context.set_ciphers("ALL")

    for side in (client_context, server_context):
        side.verify_mode = certsreqs
        side.load_cert_chain(CERTFILE)
        side.load_verify_locations(CERTFILE)
    try:
        stats = server_params_test(client_context, server_context,
                                   chatty=False, connectionchatty=False)
    # Protocol mismatch can result in either an SSLError, or a
    # "Connection reset by peer" error.
    except ssl.SSLError:
        if expect_success:
            raise
    except socket.error as e:
        tolerated = (not expect_success) and e.errno == errno.ECONNRESET
        if not tolerated:
            raise
    else:
        if not expect_success:
            raise AssertionError(
                "Client protocol %s succeeded with server protocol %s!"
                % protocol_names())
        if expect_success is True:
            return
        if expect_success != stats['version']:
            raise AssertionError("version mismatch: expected %r, got %r"
                                 % (expect_success, stats['version']))
Bill Janssen98d19da2007-09-10 21:51:02 +00002002
2003
Bill Janssen934b16d2008-06-28 22:19:33 +00002004 class ThreadedTests(unittest.TestCase):
Bill Janssen98d19da2007-09-10 21:51:02 +00002005
@skip_if_broken_ubuntu_ssl
def test_echo(self):
    """Basic test of an SSL client connecting to a server

    Runs the echo exchange once per entry in PROTOCOLS, using a single
    context (loaded with CERTFILE) for both client and server.
    """
    if support.verbose:
        sys.stdout.write("\n")
    for proto in PROTOCOLS:
        shared_ctx = ssl.SSLContext(proto)
        shared_ctx.load_cert_chain(CERTFILE)
        server_params_test(shared_ctx, shared_ctx,
                           chatty=True,
                           connectionchatty=True)
Bill Janssen98d19da2007-09-10 21:51:02 +00002016
def test_getpeercert(self):
    """Check getpeercert() before and after the handshake.

    Before do_handshake() the call must raise ValueError; afterwards the
    returned dict must contain the expected subject organization and a
    notBefore date earlier than notAfter.
    """
    if support.verbose:
        sys.stdout.write("\n")
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(CERTFILE)
    context.load_cert_chain(CERTFILE)
    server = ThreadedEchoServer(context=context, chatty=False)
    with server:
        # closing() guarantees the client socket is released even when
        # one of the assertions below fails.
        with closing(context.wrap_socket(socket.socket(),
                                         do_handshake_on_connect=False)) as s:
            s.connect((HOST, server.port))
            # getpeercert() raise ValueError while the handshake isn't
            # done.
            with self.assertRaises(ValueError):
                s.getpeercert()
            s.do_handshake()
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()
            if support.verbose:
                sys.stdout.write(pprint.pformat(cert) + '\n')
                sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
            if 'subject' not in cert:
                self.fail("No subject field in certificate: %s." %
                          pprint.pformat(cert))
            if ((('organizationName', 'Python Software Foundation'),)
                    not in cert['subject']):
                self.fail(
                    "Missing or invalid 'organizationName' field in certificate subject; "
                    "should be 'Python Software Foundation'.")
            self.assertIn('notBefore', cert)
            self.assertIn('notAfter', cert)
            before = ssl.cert_time_to_seconds(cert['notBefore'])
            after = ssl.cert_time_to_seconds(cert['notAfter'])
            self.assertLess(before, after)
Bill Janssen98d19da2007-09-10 21:51:02 +00002054
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_crl_check(self):
    """Exercise VERIFY_CRL_CHECK_LEAF with and without a loaded CRL."""
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(SIGNING_CA)
    self.assertEqual(context.verify_flags, ssl.VERIFY_DEFAULT)

    def launch_server():
        # Every phase talks to a freshly started echo server.
        return ThreadedEchoServer(context=server_context, chatty=True)

    def assert_connection_verifies():
        with launch_server() as server:
            with closing(context.wrap_socket(socket.socket())) as s:
                s.connect((HOST, server.port))
                cert = s.getpeercert()
                self.assertTrue(cert, "Can't get peer certificate.")

    # VERIFY_DEFAULT should pass
    assert_connection_verifies()

    # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
    context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF

    with launch_server() as server:
        with closing(context.wrap_socket(socket.socket())) as s:
            with self.assertRaisesRegexp(ssl.SSLError,
                                         "certificate verify failed"):
                s.connect((HOST, server.port))

    # now load a CRL file. The CRL file is signed by the CA.
    context.load_verify_locations(CRLFILE)

    assert_connection_verifies()
2096
def test_check_hostname(self):
    """Check hostname verification on a client context.

    A matching server_hostname connects, a mismatching one raises
    ssl.CertificateError, and omitting it raises ValueError.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.verify_mode = ssl.CERT_REQUIRED
    context.check_hostname = True
    context.load_verify_locations(SIGNING_CA)

    def launch_server():
        # Every scenario talks to a freshly started echo server.
        return ThreadedEchoServer(context=server_context, chatty=True)

    # correct hostname should verify
    with launch_server() as server:
        good = context.wrap_socket(socket.socket(),
                                   server_hostname="localhost")
        with closing(good) as s:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")

    # incorrect hostname should raise an exception
    with launch_server() as server:
        bad = context.wrap_socket(socket.socket(),
                                  server_hostname="invalid")
        with closing(bad) as s:
            with self.assertRaisesRegexp(
                    ssl.CertificateError,
                    "hostname 'invalid' doesn't match u?'localhost'"):
                s.connect((HOST, server.port))

    # missing server_hostname arg should cause an exception, too
    with launch_server() as server:
        with closing(socket.socket()) as s:
            with self.assertRaisesRegexp(
                    ValueError,
                    "check_hostname requires server_hostname"):
                context.wrap_socket(s)
2134
def test_empty_cert(self):
    """Connecting with an empty cert file"""
    # Locate the fixture beside this module; fall back to the current
    # directory when __file__ carries no directory component.
    here = os.path.dirname(__file__) or os.curdir
    cert_path = os.path.join(here, "nullcert.pem")
    bad_cert_test(cert_path)
def test_malformed_cert(self):
    """Connecting with a badly formatted certificate (syntax error)"""
    # Locate the fixture beside this module; fall back to the current
    # directory when __file__ carries no directory component.
    here = os.path.dirname(__file__) or os.curdir
    cert_path = os.path.join(here, "badcert.pem")
    bad_cert_test(cert_path)
def test_nonexisting_cert(self):
    """Connecting with a non-existing cert file"""
    # Build the path beside this module; fall back to the current
    # directory when __file__ carries no directory component.
    here = os.path.dirname(__file__) or os.curdir
    cert_path = os.path.join(here, "wrongcert.pem")
    bad_cert_test(cert_path)
def test_malformed_key(self):
    """Connecting with a badly formatted key (syntax error)"""
    # Locate the fixture beside this module; fall back to the current
    # directory when __file__ carries no directory component.
    here = os.path.dirname(__file__) or os.curdir
    key_path = os.path.join(here, "badkey.pem")
    bad_cert_test(key_path)
Bill Janssen98d19da2007-09-10 21:51:02 +00002151
def test_rude_shutdown(self):
    """A brutal shutdown of an SSL server should raise an OSError
    in the client when attempting handshake.
    """
    ready = threading.Event()
    gone = threading.Event()

    lsock = socket.socket()
    port = support.bind_port(lsock, HOST)

    def listener():
        # Runs in a helper thread: accept a single connection, close
        # both the accepted and the listening socket straight away,
        # then signal `gone` so the main thread knows the peer is dead.
        lsock.listen(5)
        ready.set()
        accepted = lsock.accept()[0]
        accepted.close()
        lsock.close()
        gone.set()

    t = threading.Thread(target=listener)
    t.start()
    try:
        # Client side: connect once the listener is up, wait until it
        # has torn everything down, and only then try the handshake.
        ready.wait()
        with closing(socket.socket()) as c:
            c.connect((HOST, port))
            gone.wait()
            try:
                wrapped = ssl.wrap_socket(c)
            except socket.error:
                pass
            else:
                self.fail('connecting to closed SSL socket should have failed')
    finally:
        t.join()
2192
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
                     "OpenSSL is compiled without SSLv2 support")
def test_protocol_sslv2(self):
    """Connecting to an SSLv2 server with various client options"""
    # NOTE(review): try_protocol_combo is defined elsewhere in this file;
    # its third argument is presumably the expected outcome of the
    # connection attempt -- confirm against its definition.
    if support.verbose:
        sys.stdout.write("\n")
    try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
    try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
    try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
    try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False)
    # PROTOCOL_SSLv3 is absent when OpenSSL is built without SSLv3; guard
    # it like the other protocol tests do instead of failing with
    # AttributeError.
    if hasattr(ssl, 'PROTOCOL_SSLv3'):
        try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
    # SSLv23 client with specific SSL options
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
                           client_options=ssl.OP_NO_SSLv2)
    try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_SSLv3)
    try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_TLSv1)
Bill Janssen98d19da2007-09-10 21:51:02 +00002215
@skip_if_broken_ubuntu_ssl
def test_protocol_sslv23(self):
    """Connecting to an SSLv23 server with various client options"""
    # NOTE(review): try_protocol_combo is defined elsewhere in this file;
    # its third argument is presumably the expected outcome (a boolean
    # or a protocol version string) -- confirm against its definition.
    if support.verbose:
        sys.stdout.write("\n")
    sslv23 = ssl.PROTOCOL_SSLv23
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try:
            try_protocol_combo(sslv23, ssl.PROTOCOL_SSLv2, True)
        except socket.error as x:
            # this fails on some older versions of OpenSSL (0.9.7l, for instance)
            if support.verbose:
                sys.stdout.write(
                    " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
                    % str(x))

    # Same three client protocols, first without an explicit certificate
    # requirement argument, then with CERT_OPTIONAL and CERT_REQUIRED.
    for cert_args in ((), (ssl.CERT_OPTIONAL,), (ssl.CERT_REQUIRED,)):
        if hasattr(ssl, 'PROTOCOL_SSLv3'):
            try_protocol_combo(sslv23, ssl.PROTOCOL_SSLv3, 'SSLv3', *cert_args)
        try_protocol_combo(sslv23, sslv23, True, *cert_args)
        try_protocol_combo(sslv23, ssl.PROTOCOL_TLSv1, 'TLSv1', *cert_args)

    # Server with specific SSL options
    if hasattr(ssl, 'PROTOCOL_SSLv3'):
        try_protocol_combo(sslv23, ssl.PROTOCOL_SSLv3, False,
                           server_options=ssl.OP_NO_SSLv3)
    # Will choose TLSv1
    try_protocol_combo(sslv23, sslv23, True,
                       server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
    try_protocol_combo(sslv23, ssl.PROTOCOL_TLSv1, False,
                       server_options=ssl.OP_NO_TLSv1)
2254
2255
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
                     "OpenSSL is compiled without SSLv3 support")
def test_protocol_sslv3(self):
    """Connecting to an SSLv3 server with various client options"""
    # NOTE(review): try_protocol_combo is defined elsewhere in this file;
    # its third argument is presumably the expected outcome -- confirm.
    if support.verbose:
        sys.stdout.write("\n")
    sslv3 = ssl.PROTOCOL_SSLv3
    # SSLv3 client against SSLv3 server: once without a certificate
    # requirement argument, then with CERT_OPTIONAL and CERT_REQUIRED.
    for cert_args in ((), (ssl.CERT_OPTIONAL,), (ssl.CERT_REQUIRED,)):
        try_protocol_combo(sslv3, sslv3, 'SSLv3', *cert_args)
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(sslv3, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(sslv3, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_SSLv3)
    try_protocol_combo(sslv3, ssl.PROTOCOL_TLSv1, False)
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(sslv3, ssl.PROTOCOL_SSLv23, 'SSLv3',
                           client_options=ssl.OP_NO_SSLv2)
Bill Janssen98d19da2007-09-10 21:51:02 +00002275
@skip_if_broken_ubuntu_ssl
def test_protocol_tlsv1(self):
    """Connecting to a TLSv1 server with various client options"""
    # NOTE(review): try_protocol_combo is defined elsewhere in this file;
    # its third argument is presumably the expected outcome -- confirm.
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1 = ssl.PROTOCOL_TLSv1
    # TLSv1 on both ends: once without a certificate requirement
    # argument, then with CERT_OPTIONAL and CERT_REQUIRED.
    for cert_args in ((), (ssl.CERT_OPTIONAL,), (ssl.CERT_REQUIRED,)):
        try_protocol_combo(tlsv1, tlsv1, 'TLSv1', *cert_args)
    # Older client protocols only exist on some OpenSSL builds.
    for legacy in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy):
            try_protocol_combo(tlsv1, getattr(ssl, legacy), False)
    try_protocol_combo(tlsv1, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_TLSv1)
2290
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
                     "TLS version 1.1 not supported.")
def test_protocol_tlsv1_1(self):
    """Connecting to a TLSv1.1 server with various client options.
       Testing against older TLS versions."""
    # NOTE(review): try_protocol_combo is defined elsewhere in this file;
    # its third argument is presumably the expected outcome -- confirm.
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1_1 = ssl.PROTOCOL_TLSv1_1
    try_protocol_combo(tlsv1_1, tlsv1_1, 'TLSv1.1')
    # Older client protocols only exist on some OpenSSL builds.
    for legacy in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy):
            try_protocol_combo(tlsv1_1, getattr(ssl, legacy), False)
    try_protocol_combo(tlsv1_1, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_TLSv1_1)

    try_protocol_combo(ssl.PROTOCOL_SSLv23, tlsv1_1, 'TLSv1.1')
    try_protocol_combo(tlsv1_1, ssl.PROTOCOL_TLSv1, False)
    try_protocol_combo(ssl.PROTOCOL_TLSv1, tlsv1_1, False)
2310
2311
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
                     "TLS version 1.2 not supported.")
def test_protocol_tlsv1_2(self):
    """Connecting to a TLSv1.2 server with various client options.
       Testing against older TLS versions."""
    # NOTE(review): try_protocol_combo is defined elsewhere in this file;
    # its third argument is presumably the expected outcome -- confirm.
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1_2 = ssl.PROTOCOL_TLSv1_2
    try_protocol_combo(tlsv1_2, tlsv1_2, 'TLSv1.2',
                       server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
                       client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
    # Older client protocols only exist on some OpenSSL builds.
    for legacy in ('PROTOCOL_SSLv2', 'PROTOCOL_SSLv3'):
        if hasattr(ssl, legacy):
            try_protocol_combo(tlsv1_2, getattr(ssl, legacy), False)
    try_protocol_combo(tlsv1_2, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_TLSv1_2)

    try_protocol_combo(ssl.PROTOCOL_SSLv23, tlsv1_2, 'TLSv1.2')
    # Mixed TLS versions, in both server/client orientations.
    try_protocol_combo(tlsv1_2, ssl.PROTOCOL_TLSv1, False)
    try_protocol_combo(ssl.PROTOCOL_TLSv1, tlsv1_2, False)
    try_protocol_combo(tlsv1_2, ssl.PROTOCOL_TLSv1_1, False)
    try_protocol_combo(ssl.PROTOCOL_TLSv1_1, tlsv1_2, False)
Bill Janssen98d19da2007-09-10 21:51:02 +00002335
def test_starttls(self):
    """Switching from clear text to encrypted and back again."""
    # The client sends each message in turn and reads one reply per
    # message.  b"STARTTLS" and b"ENDTLS" act as control messages: when
    # the reply to one of them starts with b"ok" (after lower-casing),
    # the client wraps, respectively unwraps, its socket before sending
    # the next message.
    msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")

    server = ThreadedEchoServer(CERTFILE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                starttls_server=True,
                                chatty=True,
                                connectionchatty=True)
    # True while traffic goes through `conn` (the wrapped socket),
    # False while it goes through the plain socket `s`.
    wrapped = False
    with server:
        s = socket.socket()
        s.setblocking(1)
        s.connect((HOST, server.port))
        if support.verbose:
            sys.stdout.write("\n")
        for indata in msgs:
            if support.verbose:
                sys.stdout.write(
                    " client: sending %r...\n" % indata)
            # `conn` is only bound once STARTTLS has succeeded, which is
            # also the only place `wrapped` becomes True.
            if wrapped:
                conn.write(indata)
                outdata = conn.read()
            else:
                s.send(indata)
                outdata = s.recv(1024)
            msg = outdata.strip().lower()
            if indata == b"STARTTLS" and msg.startswith(b"ok"):
                # STARTTLS ok, switch to secure mode
                if support.verbose:
                    sys.stdout.write(
                        " client: read %r from server, starting TLS...\n"
                        % msg)
                conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
                wrapped = True
            elif indata == b"ENDTLS" and msg.startswith(b"ok"):
                # ENDTLS ok, switch back to clear text
                if support.verbose:
                    sys.stdout.write(
                        " client: read %r from server, ending TLS...\n"
                        % msg)
                # unwrap() hands back the socket used for the remaining
                # clear-text traffic.
                s = conn.unwrap()
                wrapped = False
            else:
                if support.verbose:
                    sys.stdout.write(
                        " client: read %r from server\n" % msg)
        if support.verbose:
            sys.stdout.write(" client: closing connection.\n")
        # Send the final b"over\n" message and close whichever socket is
        # currently active.
        if wrapped:
            conn.write(b"over\n")
        else:
            s.send(b"over\n")
        if wrapped:
            conn.close()
        else:
            s.close()
Bill Janssen98d19da2007-09-10 21:51:02 +00002393
def test_socketserver(self):
    """Using a SocketServer to create and manage SSL connections."""
    # Serve CERTFILE over HTTPS and check that the bytes fetched through
    # urllib2 equal the bytes read straight from disk.
    server = make_https_server(self, certfile=CERTFILE)
    # try to connect
    if support.verbose:
        sys.stdout.write('\n')
    with open(CERTFILE, 'rb') as local_file:
        d1 = local_file.read()
    d2 = ''
    # now fetch the same data from the HTTPS server
    cert_name = os.path.basename(CERTFILE)
    url = 'https://localhost:%d/%s' % (server.port, cert_name)
    context = ssl.create_default_context(cafile=CERTFILE)
    with closing(urllib2.urlopen(url, context=context)) as response:
        dlen = response.info().getheader("content-length")
        # Only read a body when the server announced a positive length;
        # otherwise d2 stays empty and the final comparison fails.
        if dlen and (int(dlen) > 0):
            d2 = response.read(int(dlen))
            if support.verbose:
                sys.stdout.write(
                    " client: read %d bytes from remote server '%s'\n"
                    % (len(d2), server))
    self.assertEqual(d1, d2)
Bill Janssen934b16d2008-06-28 22:19:33 +00002419
def test_asyncore_server(self):
    """Check the example asyncore integration."""
    # Send one message through an SSL-wrapped socket to an
    # AsyncoreEchoServer and require the reply to be the lower-cased
    # message.  (An earlier `indata` assignment that was overwritten
    # before any use has been dropped.)
    if support.verbose:
        sys.stdout.write("\n")

    indata = b"FOO\n"
    server = AsyncoreEchoServer(CERTFILE)
    with server:
        s = ssl.wrap_socket(socket.socket())
        s.connect(('127.0.0.1', server.port))
        if support.verbose:
            sys.stdout.write(
                " client: sending %r...\n" % indata)
        s.write(indata)
        outdata = s.read()
        if support.verbose:
            sys.stdout.write(" client: read %r\n" % outdata)
        if outdata != indata.lower():
            self.fail(
                "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                % (outdata[:20], len(outdata),
                   indata[:20].lower(), len(indata)))
        # Final b"over\n" message before closing the connection.
        s.write(b"over\n")
        if support.verbose:
            sys.stdout.write(" client: closing connection.\n")
        s.close()
        if support.verbose:
            sys.stdout.write(" client: connection closed.\n")
Bill Janssen934b16d2008-06-28 22:19:33 +00002450
def test_recv_send(self):
    """Test recv(), send() and friends."""
    # Each send/recv flavour of the wrapped socket is exercised against
    # an echo server.  Methods flagged "expect success" must round-trip
    # the payload (the reply is compared with the lower-cased payload);
    # the others must raise ValueError with a message that starts with
    # the method's name.
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = ssl.wrap_socket(socket.socket(),
                            server_side=False,
                            certfile=CERTFILE,
                            ca_certs=CERTFILE,
                            cert_reqs=ssl.CERT_NONE,
                            ssl_version=ssl.PROTOCOL_TLSv1)
        s.connect((HOST, server.port))
        # helper methods for standardising recv* method signatures
        def _recv_into():
            b = bytearray(b"\0"*100)
            count = s.recv_into(b)
            return b[:count]

        def _recvfrom_into():
            b = bytearray(b"\0"*100)
            count, addr = s.recvfrom_into(b)
            return b[:count]

        # (name, method, whether to expect success, *args)
        send_methods = [
            ('send', s.send, True, []),
            ('sendto', s.sendto, False, ["some.address"]),
            ('sendall', s.sendall, True, []),
        ]
        recv_methods = [
            ('recv', s.recv, True, []),
            ('recvfrom', s.recvfrom, False, ["some.address"]),
            ('recv_into', _recv_into, True, []),
            ('recvfrom_into', _recvfrom_into, False, []),
        ]
        data_prefix = u"PREFIX_"

        # NOTE: the failure messages below use the "!r" / "!s"
        # conversions.  A ":r" format spec is invalid and would itself
        # raise ValueError while building the message, which the
        # "except ValueError" handlers would then misreport.
        for meth_name, send_meth, expect_success, args in send_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                send_meth(indata, *args)
                outdata = s.read()
                if outdata != indata.lower():
                    self.fail(
                        "While sending with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20], nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to send with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )

        for meth_name, recv_meth, expect_success, args in recv_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                s.send(indata)
                outdata = recv_meth(*args)
                if outdata != indata.lower():
                    self.fail(
                        "While receiving with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20], nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to receive with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )
                # consume data
                s.read()

        s.write(b"over\n")
        s.close()
Bill Janssen61c001a2008-09-08 16:37:24 +00002557
def test_handshake_timeout(self):
    # Issue #5103: SSL handshake must respect the socket timeout
    #
    # A plain (non-SSL) listening socket accepts connections but never
    # answers, so a client-side handshake can only end by timing out.
    server = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(server)
    started = threading.Event()
    # Read by serve() through the closure; set to True in the outer
    # "finally" below to stop the accept loop.
    finish = False

    def serve():
        server.listen(5)
        started.set()
        conns = []
        while not finish:
            # Poll with a 0.1s timeout so the loop re-checks `finish`.
            r, w, e = select.select([server], [], [], 0.1)
            if server in r:
                # Let the socket hang around rather than having
                # it closed by garbage collection.
                conns.append(server.accept()[0])
        for sock in conns:
            sock.close()

    t = threading.Thread(target=serve)
    t.start()
    started.wait()

    try:
        # Case 1: timeout set on the raw socket, handshake performed by
        # ssl.wrap_socket() on the already-connected socket.
        try:
            c = socket.socket(socket.AF_INET)
            c.settimeout(0.2)
            c.connect((host, port))
            # Will attempt handshake and time out
            self.assertRaisesRegexp(ssl.SSLError, "timed out",
                                    ssl.wrap_socket, c)
        finally:
            c.close()
        # Case 2: timeout set on the wrapped socket, handshake performed
        # as part of connect().
        try:
            c = socket.socket(socket.AF_INET)
            c = ssl.wrap_socket(c)
            c.settimeout(0.2)
            # Will attempt handshake and time out
            self.assertRaisesRegexp(ssl.SSLError, "timed out",
                                    c.connect, (host, port))
        finally:
            c.close()
    finally:
        # Stop the server thread before closing the listening socket.
        finish = True
        t.join()
        server.close()
2606
def test_server_accept(self):
    # Issue #16357: accept() on a SSLSocket created through
    # SSLContext.wrap_socket().
    ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    ctx.verify_mode = ssl.CERT_REQUIRED
    ctx.load_verify_locations(CERTFILE)
    ctx.load_cert_chain(CERTFILE)
    plain_listener = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(plain_listener)
    listener = ctx.wrap_socket(plain_listener, server_side=True)

    listening = threading.Event()
    # Filled in by the server thread with the accepted socket and the
    # peer address reported by accept().
    accepted = {'sock': None, 'addr': None}

    def serve():
        listener.listen(5)
        # Block on the accept and wait on the connection to close.
        listening.set()
        accepted['sock'], accepted['addr'] = listener.accept()
        accepted['sock'].recv(1)

    worker = threading.Thread(target=serve)
    worker.start()
    # Client wait until server setup and perform a connect.
    listening.wait()
    client = ctx.wrap_socket(socket.socket())
    client.connect((host, port))
    client_addr = client.getsockname()
    client.close()
    worker.join()
    accepted['sock'].close()
    listener.close()
    # Sanity checks.
    self.assertIsInstance(accepted['sock'], ssl.SSLSocket)
    self.assertEqual(accepted['addr'], client_addr)
2643
def test_getpeercert_enotconn(self):
    # getpeercert() on a wrapped but never connected socket must raise
    # socket.error carrying errno ENOTCONN.
    ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    sock = ctx.wrap_socket(socket.socket())
    try:
        with self.assertRaises(socket.error) as caught:
            sock.getpeercert()
        self.assertEqual(caught.exception.errno, errno.ENOTCONN)
    finally:
        sock.close()
2650
def test_do_handshake_enotconn(self):
    # do_handshake() on a wrapped but never connected socket must raise
    # socket.error carrying errno ENOTCONN.
    ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    sock = ctx.wrap_socket(socket.socket())
    try:
        with self.assertRaises(socket.error) as caught:
            sock.do_handshake()
        self.assertEqual(caught.exception.errno, errno.ENOTCONN)
    finally:
        sock.close()
2657
def test_default_ciphers(self):
    # A client restricted to "DES" ciphers must fail to connect to a
    # server using its default cipher list, and the server must record
    # a "no shared cipher" error.
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    try:
        # Force a set of weak ciphers on our client context
        client_ctx.set_ciphers("DES")
    except ssl.SSLError:
        self.skipTest("no DES cipher available")
    with ThreadedEchoServer(CERTFILE,
                            ssl_version=ssl.PROTOCOL_SSLv23,
                            chatty=False) as server:
        s = client_ctx.wrap_socket(socket.socket())
        try:
            with self.assertRaises(ssl.SSLError):
                s.connect((HOST, server.port))
        finally:
            s.close()
    first_error = str(server.conn_errors[0])
    self.assertIn("no shared cipher", first_error)
2672
def test_version_basic(self):
    """
    Basic tests for SSLSocket.version().
    More tests are done in the test_protocol_*() methods.
    """
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    with ThreadedEchoServer(CERTFILE,
                            ssl_version=ssl.PROTOCOL_TLSv1,
                            chatty=False) as server:
        s = client_ctx.wrap_socket(socket.socket())
        try:
            # version() is None before connecting ...
            self.assertIs(s.version(), None)
            s.connect((HOST, server.port))
            self.assertEqual(s.version(), "TLSv1")
        finally:
            s.close()
        # ... and None again once the socket has been closed.
        self.assertIs(s.version(), None)
2687
@unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
def test_default_ecdh_curve(self):
    # Issue #21015: elliptic curve-based Diffie Hellman key exchange
    # should be enabled by default on SSL contexts.
    ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    ctx.load_cert_chain(CERTFILE)
    # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
    # explicitly using the 'ECCdraft' cipher alias.  Otherwise,
    # our default cipher list should prefer ECDH-based ciphers
    # automatically.
    needs_explicit_ecdh = ssl.OPENSSL_VERSION_INFO < (1, 0, 0)
    if needs_explicit_ecdh:
        ctx.set_ciphers("ECCdraft:ECDH")
    # The same context serves as both server and client configuration.
    with ThreadedEchoServer(context=ctx) as server:
        s = ctx.wrap_socket(socket.socket())
        try:
            s.connect((HOST, server.port))
            cipher_name = s.cipher()[0]
            self.assertIn("ECDH", cipher_name)
        finally:
            s.close()
2704
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    """Test tls-unique channel binding."""
    # Two consecutive connections to the same server must each yield a
    # sane 'tls-unique' value, the two values must differ, and each must
    # match what the server reports for that connection in reply to the
    # b"CB tls-unique\n" request.
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = ssl.wrap_socket(socket.socket(),
                            server_side=False,
                            certfile=CERTFILE,
                            ca_certs=CERTFILE,
                            cert_reqs=ssl.CERT_NONE,
                            ssl_version=ssl.PROTOCOL_TLSv1)
        s.connect((HOST, server.port))
        # get the data
        cb_data = s.get_channel_binding("tls-unique")
        if support.verbose:
            sys.stdout.write(" got channel binding data: {0!r}\n"
                             .format(cb_data))

        # check if it is sane
        self.assertIsNotNone(cb_data)
        self.assertEqual(len(cb_data), 12) # True for TLSv1

        # and compare with the peers version
        s.write(b"CB tls-unique\n")
        peer_data_repr = s.read().strip()
        self.assertEqual(peer_data_repr,
                         repr(cb_data).encode("us-ascii"))
        s.close()

        # now, again
        s = ssl.wrap_socket(socket.socket(),
                            server_side=False,
                            certfile=CERTFILE,
                            ca_certs=CERTFILE,
                            cert_reqs=ssl.CERT_NONE,
                            ssl_version=ssl.PROTOCOL_TLSv1)
        s.connect((HOST, server.port))
        new_cb_data = s.get_channel_binding("tls-unique")
        if support.verbose:
            sys.stdout.write(" got another channel binding data: {0!r}\n"
                             .format(new_cb_data))
        # is it really unique
        self.assertNotEqual(cb_data, new_cb_data)
        # Sanity-check the *new* value; the first one was already
        # checked above, so re-checking it here would verify nothing.
        self.assertIsNotNone(new_cb_data)
        self.assertEqual(len(new_cb_data), 12) # True for TLSv1
        s.write(b"CB tls-unique\n")
        peer_data_repr = s.read().strip()
        self.assertEqual(peer_data_repr,
                         repr(new_cb_data).encode("us-ascii"))
        s.close()
2764
def test_compression(self):
    # Use one TLSv1 context for both sides of server_params_test() and
    # check that the reported compression method is a recognised value
    # (or None when no compression is in use).
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    ctx.load_cert_chain(CERTFILE)
    stats = server_params_test(ctx, ctx,
                               chatty=True, connectionchatty=True)
    compression = stats['compression']
    if support.verbose:
        sys.stdout.write(" got compression: {!r}\n".format(compression))
    acceptable = {None, 'ZLIB', 'RLE'}
    self.assertIn(compression, acceptable)
2773
@unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
                     "ssl.OP_NO_COMPRESSION needed for this test")
def test_compression_disabled(self):
    # With OP_NO_COMPRESSION set on the shared context, the connection
    # statistics must report no compression at all.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    ctx.load_cert_chain(CERTFILE)
    ctx.options |= ssl.OP_NO_COMPRESSION
    stats = server_params_test(ctx, ctx,
                               chatty=True, connectionchatty=True)
    compression = stats['compression']
    self.assertIs(compression, None)
2783
def test_dh_params(self):
    # Check we can get a connection with ephemeral Diffie-Hellman
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.load_cert_chain(CERTFILE)
    context.load_dh_params(DHFILE)
    # Restrict the cipher list to the "kEDH" selection so that only an
    # ephemeral-DH key exchange can be negotiated.
    context.set_ciphers("kEDH")
    stats = server_params_test(context, context,
                               chatty=True, connectionchatty=True)
    # stats["cipher"][0] is the cipher name, e.g. dash-separated tokens.
    cipher = stats["cipher"][0]
    parts = cipher.split("-")
    if not any(tag in parts for tag in ("ADH", "EDH", "DHE")):
        # `cipher` is already the name string: report it in full rather
        # than only its first character.
        self.fail("Non-DH cipher: " + cipher)
2796
def test_selected_alpn_protocol(self):
    # Neither side configures ALPN here, so the client must report None
    # as its selected ALPN protocol.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    ctx.load_cert_chain(CERTFILE)
    stats = server_params_test(ctx, ctx,
                               chatty=True, connectionchatty=True)
    client_choice = stats['client_alpn_protocol']
    self.assertIs(client_choice, None)
2804
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
def test_selected_alpn_protocol_if_server_uses_alpn(self):
    # Only the server advertises ALPN protocols; because the client does
    # not use ALPN, its selected protocol must still be None.
    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_ctx.load_cert_chain(CERTFILE)
    server_ctx.set_alpn_protocols(['foo', 'bar'])
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    client_ctx.load_verify_locations(CERTFILE)
    stats = server_params_test(client_ctx, server_ctx,
                               chatty=True, connectionchatty=True)
    client_choice = stats['client_alpn_protocol']
    self.assertIs(client_choice, None)
2816
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
def test_alpn_protocols(self):
    server_protocols = ['foo', 'bar', 'milkshake']
    # Each entry pairs the protocols offered by the client with the
    # protocol both sides are expected to report afterwards.
    protocol_tests = [
        (['foo', 'bar'], 'foo'),
        (['bar', 'foo'], 'foo'),
        (['milkshake'], 'milkshake'),
        (['http/3.0', 'http/4.0'], None)
    ]
    # The %%s placeholders survive the first formatting pass and are
    # filled in later with the actual result and the side it came from.
    template = ("failed trying %s (s) and %s (c).\n"
                "was expecting %s, but got %%s from the %%s")
    for client_protocols, expected in protocol_tests:
        # Fresh contexts for every scenario.
        server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        server_ctx.load_cert_chain(CERTFILE)
        server_ctx.set_alpn_protocols(server_protocols)
        client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        client_ctx.load_cert_chain(CERTFILE)
        client_ctx.set_alpn_protocols(client_protocols)
        stats = server_params_test(client_ctx, server_ctx,
                                   chatty=True, connectionchatty=True)

        msg = template % (str(server_protocols), str(client_protocols),
                          str(expected))

        client_result = stats['client_alpn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))

        # Use the last entry recorded on the server side, or a
        # placeholder when nothing was recorded.
        recorded = stats['server_alpn_protocols']
        if len(recorded):
            server_result = recorded[-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
2845
def test_selected_npn_protocol(self):
    # No NPN protocols are configured on the shared context, so the
    # client must report None as its selected NPN protocol.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    ctx.load_cert_chain(CERTFILE)
    stats = server_params_test(ctx, ctx,
                               chatty=True, connectionchatty=True)
    client_choice = stats['client_npn_protocol']
    self.assertIs(client_choice, None)
2853
@unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
def test_npn_protocols(self):
    server_protocols = ['http/1.1', 'spdy/2']
    # Each entry pairs the protocols configured on the client with the
    # protocol both sides are expected to report afterwards.
    protocol_tests = [
        (['http/1.1', 'spdy/2'], 'http/1.1'),
        (['spdy/2', 'http/1.1'], 'http/1.1'),
        (['spdy/2', 'test'], 'spdy/2'),
        (['abc', 'def'], 'abc')
    ]
    # The %%s placeholders survive the first formatting pass and are
    # filled in later with the actual result and the side it came from.
    template = ("failed trying %s (s) and %s (c).\n"
                "was expecting %s, but got %%s from the %%s")
    for client_protocols, expected in protocol_tests:
        # Fresh contexts for every scenario.
        server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        server_ctx.load_cert_chain(CERTFILE)
        server_ctx.set_npn_protocols(server_protocols)
        client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        client_ctx.load_cert_chain(CERTFILE)
        client_ctx.set_npn_protocols(client_protocols)
        stats = server_params_test(client_ctx, server_ctx,
                                   chatty=True, connectionchatty=True)

        msg = template % (str(server_protocols), str(client_protocols),
                          str(expected))

        client_result = stats['client_npn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))

        # Use the last entry recorded on the server side, or a
        # placeholder when nothing was recorded.
        recorded = stats['server_npn_protocols']
        if len(recorded):
            server_result = recorded[-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
2882
def sni_contexts(self):
    """Build the TLSv1 contexts shared by the SNI callback tests.

    Returns a ``(server_context, other_context, client_context)`` tuple.
    The first two carry the SIGNED_CERTFILE and SIGNED_CERTFILE2
    certificate chains respectively; the client context requires a
    certificate from its peer and trusts SIGNING_CA.
    """
    def cert_bearing_context(chain_file):
        # A TLSv1 context presenting the given certificate chain.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_cert_chain(chain_file)
        return ctx

    server_context = cert_bearing_context(SIGNED_CERTFILE)
    other_context = cert_bearing_context(SIGNED_CERTFILE2)

    client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    client_context.verify_mode = ssl.CERT_REQUIRED
    client_context.load_verify_locations(SIGNING_CA)

    return server_context, other_context, client_context
2892
def check_common_name(self, stats, name):
    """Assert that the peer certificate in *stats* has commonName *name*.

    *stats* must provide a 'peercert' mapping whose 'subject' entry is a
    sequence of relative distinguished names, each one a tuple of
    ``(key, value)`` pairs.
    """
    wanted_rdn = (('commonName', name),)
    subject = stats['peercert']['subject']
    self.assertIn(wanted_rdn, subject)
2896
@needs_sni
def test_sni_callback(self):
    server_context, other_context, client_context = self.sni_contexts()
    calls = []

    def servername_cb(ssl_sock, server_name, initial_context):
        # Record every invocation, and switch the connection over to
        # other_context whenever the client supplied a server name.
        calls.append((server_name, initial_context))
        if server_name is None:
            return
        ssl_sock.context = other_context

    server_context.set_servername_callback(servername_cb)

    # Scenario 1: the client sends a server name.
    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name='supermessage')
    # The hostname reached the callback together with the initial
    # context, and the certificate was changed for the connection.
    self.assertEqual(calls, [("supermessage", server_context)])
    # The certificate of other_context (SIGNED_CERTFILE2) was presented.
    self.check_common_name(stats, 'fakehostname')

    # Scenario 2: no server name; the callback still runs, with
    # server_name=None, and the certificate is left alone.
    del calls[:]
    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name=None)
    self.assertEqual(calls, [(None, server_context)])
    self.check_common_name(stats, 'localhost')

    # Scenario 3: with the callback removed nothing is recorded ...
    del calls[:]
    server_context.set_servername_callback(None)

    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name='notfunny')
    # ... and the certificate didn't change.
    self.check_common_name(stats, 'localhost')
    self.assertEqual(calls, [])
2935
@needs_sni
def test_sni_callback_alert(self):
    # A TLS alert code returned by the servername callback must surface
    # on the connecting client as an SSLError with the matching reason.
    server_context, other_context, client_context = self.sni_contexts()

    def cb_returning_alert(ssl_sock, server_name, initial_context):
        return ssl.ALERT_DESCRIPTION_ACCESS_DENIED

    server_context.set_servername_callback(cb_returning_alert)

    with self.assertRaises(ssl.SSLError) as cm:
        server_params_test(client_context, server_context,
                           chatty=False,
                           sni_name='supermessage')
    reason = cm.exception.reason
    self.assertEqual(reason, 'TLSV1_ALERT_ACCESS_DENIED')
2950
@needs_sni
def test_sni_callback_raising(self):
    # An exception escaping the servername callback fails the connection
    # with a TLS handshake failure alert; the exception itself is
    # expected to be reported on stderr.
    server_context, other_context, client_context = self.sni_contexts()

    def cb_raising(ssl_sock, server_name, initial_context):
        # Deliberately trigger a ZeroDivisionError.
        1.0/0.0

    server_context.set_servername_callback(cb_raising)

    with self.assertRaises(ssl.SSLError) as cm:
        with support.captured_stderr() as stderr:
            server_params_test(client_context, server_context,
                               chatty=False,
                               sni_name='supermessage')
    reason = cm.exception.reason
    self.assertEqual(reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
    captured = stderr.getvalue()
    self.assertIn("ZeroDivisionError", captured)
2967
@needs_sni
def test_sni_callback_wrong_return_type(self):
    # A servername callback returning something of the wrong type
    # terminates the TLS connection with an internal error alert; the
    # resulting TypeError is expected to be reported on stderr.
    server_context, other_context, client_context = self.sni_contexts()

    def cb_wrong_return_type(ssl_sock, server_name, initial_context):
        return "foo"

    server_context.set_servername_callback(cb_wrong_return_type)

    with self.assertRaises(ssl.SSLError) as cm:
        with support.captured_stderr() as stderr:
            server_params_test(client_context, server_context,
                               chatty=False,
                               sni_name='supermessage')
    reason = cm.exception.reason
    self.assertEqual(reason, 'TLSV1_ALERT_INTERNAL_ERROR')
    captured = stderr.getvalue()
    self.assertIn("TypeError", captured)
2985
def test_read_write_after_close_raises_valuerror(self):
    # read() and write() on an SSL socket that has already been closed
    # must raise ValueError.
    ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    ctx.verify_mode = ssl.CERT_REQUIRED
    ctx.load_verify_locations(CERTFILE)
    ctx.load_cert_chain(CERTFILE)
    server = ThreadedEchoServer(context=ctx, chatty=False)

    with server:
        client = ctx.wrap_socket(socket.socket())
        client.connect((HOST, server.port))
        client.close()

        with self.assertRaises(ValueError):
            client.read(1024)
        with self.assertRaises(ValueError):
            client.write(b'hello')
3000
Bill Janssen61c001a2008-09-08 16:37:24 +00003001
def test_main(verbose=False):
    # Entry point: optionally print details about the OpenSSL build and
    # the platform, make sure the certificate fixtures exist, then run
    # every applicable test class.
    if support.verbose:
        plats = {
            'Linux': platform.linux_distribution,
            'Mac': platform.mac_ver,
            'Windows': platform.win32_ver,
        }
        # Use the first probe that yields a non-empty leading field;
        # fall back to the generic platform string otherwise.
        plat = None
        for name, probe in plats.items():
            details = probe()
            if details and details[0]:
                plat = '%s %r' % (name, details)
                break
        if plat is None:
            plat = repr(platform.platform())
        print("test_ssl: testing with %r %r" %
              (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
        print(" under %s" % plat)
        print(" HAS_SNI = %r" % ssl.HAS_SNI)
        print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
        # OP_NO_TLSv1_1 is not defined by every ssl build.
        try:
            print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
        except AttributeError:
            pass

    # Fail early, with a clear message, when a fixture file is missing.
    required_files = [
        CERTFILE, SVN_PYTHON_ORG_ROOT_CERT, BYTES_CERTFILE,
        ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
        SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
        BADCERT, BADKEY, EMPTYCERT]
    for filename in required_files:
        if os.path.exists(filename):
            continue
        raise support.TestFailed("Can't read certificate file %r" % filename)

    tests = [ContextTests, BasicTests, BasicSocketTests, SSLErrorTests]

    # Network tests only run when the 'network' resource is enabled.
    if support.is_resource_enabled('network'):
        tests.append(NetworkedTests)

    # Threaded tests need the threading module.
    if _have_threads:
        thread_info = support.threading_setup()
        if thread_info:
            tests.append(ThreadedTests)

    try:
        support.run_unittest(*tests)
    finally:
        # Hand the setup information back for cleanup, even on failure.
        if _have_threads:
            support.threading_cleanup(*thread_info)


if __name__ == "__main__":
    test_main()