blob: 4a6901ca47cdb90b51a33409fda7281491e3ae51 [file] [log] [blame]
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001# -*- coding: utf-8 -*-
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00002# Test the support for SSL and sockets
3
4import sys
5import unittest
Benjamin Petersondaeb9252014-08-20 14:14:50 -05006from test import test_support as support
Bill Janssen934b16d2008-06-28 22:19:33 +00007import asyncore
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00008import socket
Bill Janssen934b16d2008-06-28 22:19:33 +00009import select
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000010import time
Benjamin Petersondaeb9252014-08-20 14:14:50 -050011import datetime
Antoine Pitroub558f172010-04-23 23:25:45 +000012import gc
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000013import os
Antoine Pitroub558f172010-04-23 23:25:45 +000014import errno
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000015import pprint
Benjamin Petersondaeb9252014-08-20 14:14:50 -050016import tempfile
Benjamin Petersonfcfb18e2014-11-23 11:42:45 -060017import urllib2
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000018import traceback
Antoine Pitroudfb299b2010-04-23 22:54:59 +000019import weakref
Antoine Pitroud75efd92010-08-04 17:38:33 +000020import platform
Benjamin Petersondaeb9252014-08-20 14:14:50 -050021import functools
22from contextlib import closing
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000023
Benjamin Petersondaeb9252014-08-20 14:14:50 -050024ssl = support.import_module("ssl")
Bill Janssen296a59d2007-09-16 22:06:00 +000025
Benjamin Petersondaeb9252014-08-20 14:14:50 -050026PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
27HOST = support.HOST
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000028
Benjamin Petersondaeb9252014-08-20 14:14:50 -050029def data_file(*name):
30 return os.path.join(os.path.dirname(__file__), *name)
31
32# The custom key and certificate files used in test_ssl are generated
33# using Lib/test/make_ssl_certs.py.
34# Other certificates are simply fetched from the Internet servers they
35# are meant to authenticate.
36
37CERTFILE = data_file("keycert.pem")
38BYTES_CERTFILE = CERTFILE.encode(sys.getfilesystemencoding())
39ONLYCERT = data_file("ssl_cert.pem")
40ONLYKEY = data_file("ssl_key.pem")
41BYTES_ONLYCERT = ONLYCERT.encode(sys.getfilesystemencoding())
42BYTES_ONLYKEY = ONLYKEY.encode(sys.getfilesystemencoding())
43CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
44ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
45KEY_PASSWORD = "somepass"
46CAPATH = data_file("capath")
47BYTES_CAPATH = CAPATH.encode(sys.getfilesystemencoding())
48CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
49CAFILE_CACERT = data_file("capath", "5ed36f99.0")
50
51
52# empty CRL
53CRLFILE = data_file("revocation.crl")
54
55# Two keys and certs signed by the same CA (for SNI tests)
56SIGNED_CERTFILE = data_file("keycert3.pem")
57SIGNED_CERTFILE2 = data_file("keycert4.pem")
58SIGNING_CA = data_file("pycacert.pem")
59
60SVN_PYTHON_ORG_ROOT_CERT = data_file("https_svn_python_org_root.pem")
61
62EMPTYCERT = data_file("nullcert.pem")
63BADCERT = data_file("badcert.pem")
64WRONGCERT = data_file("XXXnonexisting.pem")
65BADKEY = data_file("badkey.pem")
66NOKIACERT = data_file("nokia.pem")
67NULLBYTECERT = data_file("nullbytecert.pem")
68
69DHFILE = data_file("dh512.pem")
70BYTES_DHFILE = DHFILE.encode(sys.getfilesystemencoding())
71
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000072
Neal Norwitz3e533c22007-08-27 01:03:18 +000073def handle_error(prefix):
74 exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
Benjamin Petersondaeb9252014-08-20 14:14:50 -050075 if support.verbose:
Bill Janssen98d19da2007-09-10 21:51:02 +000076 sys.stdout.write(prefix + exc_format)
Neal Norwitz3e533c22007-08-27 01:03:18 +000077
Antoine Pitrou435ba0c2010-04-27 09:51:18 +000078
79class BasicTests(unittest.TestCase):
80
Antoine Pitrou3945c862010-04-28 21:11:01 +000081 def test_sslwrap_simple(self):
Antoine Pitrou435ba0c2010-04-27 09:51:18 +000082 # A crude test for the legacy API
Bill Jansseneb257ac2008-09-29 18:56:38 +000083 try:
84 ssl.sslwrap_simple(socket.socket(socket.AF_INET))
85 except IOError, e:
86 if e.errno == 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that
87 pass
88 else:
89 raise
90 try:
91 ssl.sslwrap_simple(socket.socket(socket.AF_INET)._sock)
92 except IOError, e:
93 if e.errno == 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that
94 pass
95 else:
96 raise
Benjamin Peterson2f334562014-10-01 23:53:01 -040097
98
Benjamin Petersondaeb9252014-08-20 14:14:50 -050099def can_clear_options():
100 # 0.9.8m or higher
101 return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
102
103def no_sslv2_implies_sslv3_hello():
104 # 0.9.7h or higher
105 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
106
107def have_verify_flags():
108 # 0.9.8 or higher
109 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
110
111def utc_offset(): #NOTE: ignore issues like #1647654
112 # local time = utc time + utc offset
113 if time.daylight and time.localtime().tm_isdst > 0:
114 return -time.altzone # seconds
115 return -time.timezone
116
117def asn1time(cert_time):
118 # Some versions of OpenSSL ignore seconds, see #18207
119 # 0.9.8.i
120 if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
121 fmt = "%b %d %H:%M:%S %Y GMT"
122 dt = datetime.datetime.strptime(cert_time, fmt)
123 dt = dt.replace(second=0)
124 cert_time = dt.strftime(fmt)
125 # %d adds leading zero but ASN1_TIME_print() uses leading space
126 if cert_time[4] == "0":
127 cert_time = cert_time[:4] + " " + cert_time[5:]
128
129 return cert_time
Neal Norwitz3e533c22007-08-27 01:03:18 +0000130
Antoine Pitroud75efd92010-08-04 17:38:33 +0000131# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
132def skip_if_broken_ubuntu_ssl(func):
Victor Stinnerb1241f92011-05-10 01:52:03 +0200133 if hasattr(ssl, 'PROTOCOL_SSLv2'):
Victor Stinnerb1241f92011-05-10 01:52:03 +0200134 @functools.wraps(func)
135 def f(*args, **kwargs):
136 try:
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500137 ssl.SSLContext(ssl.PROTOCOL_SSLv2)
138 except ssl.SSLError:
Victor Stinnerb1241f92011-05-10 01:52:03 +0200139 if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500140 platform.linux_distribution() == ('debian', 'squeeze/sid', '')):
Victor Stinnerb1241f92011-05-10 01:52:03 +0200141 raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
142 return func(*args, **kwargs)
143 return f
144 else:
145 return func
Antoine Pitroud75efd92010-08-04 17:38:33 +0000146
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500147needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
148
Antoine Pitroud75efd92010-08-04 17:38:33 +0000149
150class BasicSocketTests(unittest.TestCase):
151
Antoine Pitrou3945c862010-04-28 21:11:01 +0000152 def test_constants(self):
Bill Janssen98d19da2007-09-10 21:51:02 +0000153 ssl.CERT_NONE
154 ssl.CERT_OPTIONAL
155 ssl.CERT_REQUIRED
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500156 ssl.OP_CIPHER_SERVER_PREFERENCE
157 ssl.OP_SINGLE_DH_USE
158 if ssl.HAS_ECDH:
159 ssl.OP_SINGLE_ECDH_USE
160 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
161 ssl.OP_NO_COMPRESSION
162 self.assertIn(ssl.HAS_SNI, {True, False})
163 self.assertIn(ssl.HAS_ECDH, {True, False})
164
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000165
Antoine Pitrou3945c862010-04-28 21:11:01 +0000166 def test_random(self):
Bill Janssen98d19da2007-09-10 21:51:02 +0000167 v = ssl.RAND_status()
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500168 if support.verbose:
Bill Janssen98d19da2007-09-10 21:51:02 +0000169 sys.stdout.write("\n RAND_status is %d (%s)\n"
170 % (v, (v and "sufficient randomness") or
171 "insufficient randomness"))
Victor Stinner7c906672015-01-06 13:53:37 +0100172 if hasattr(ssl, 'RAND_egd'):
173 self.assertRaises(TypeError, ssl.RAND_egd, 1)
174 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Bill Janssen98d19da2007-09-10 21:51:02 +0000175 ssl.RAND_add("this is a random string", 75.0)
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000176
Antoine Pitrou3945c862010-04-28 21:11:01 +0000177 def test_parse_cert(self):
Bill Janssen98d19da2007-09-10 21:51:02 +0000178 # note that this uses an 'unofficial' function in _ssl.c,
179 # provided solely for this test, to exercise the certificate
180 # parsing code
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500181 p = ssl._ssl._test_decode_cert(CERTFILE)
182 if support.verbose:
Bill Janssen98d19da2007-09-10 21:51:02 +0000183 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500184 self.assertEqual(p['issuer'],
185 ((('countryName', 'XY'),),
186 (('localityName', 'Castle Anthrax'),),
187 (('organizationName', 'Python Software Foundation'),),
188 (('commonName', 'localhost'),))
189 )
190 # Note the next three asserts will fail if the keys are regenerated
191 self.assertEqual(p['notAfter'], asn1time('Oct 5 23:01:56 2020 GMT'))
192 self.assertEqual(p['notBefore'], asn1time('Oct 8 23:01:56 2010 GMT'))
193 self.assertEqual(p['serialNumber'], 'D7C7381919AFC24E')
Antoine Pitrouf06eb462011-10-01 19:30:58 +0200194 self.assertEqual(p['subject'],
Antoine Pitrou60982912013-02-16 21:39:28 +0100195 ((('countryName', 'XY'),),
196 (('localityName', 'Castle Anthrax'),),
197 (('organizationName', 'Python Software Foundation'),),
198 (('commonName', 'localhost'),))
Antoine Pitrouf06eb462011-10-01 19:30:58 +0200199 )
Antoine Pitrou60982912013-02-16 21:39:28 +0100200 self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),))
Antoine Pitrouf06eb462011-10-01 19:30:58 +0200201 # Issue #13034: the subjectAltName in some certificates
202 # (notably projects.developer.nokia.com:443) wasn't parsed
203 p = ssl._ssl._test_decode_cert(NOKIACERT)
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500204 if support.verbose:
Antoine Pitrouf06eb462011-10-01 19:30:58 +0200205 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
206 self.assertEqual(p['subjectAltName'],
207 (('DNS', 'projects.developer.nokia.com'),
208 ('DNS', 'projects.forum.nokia.com'))
209 )
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500210 # extra OCSP and AIA fields
211 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
212 self.assertEqual(p['caIssuers'],
213 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
214 self.assertEqual(p['crlDistributionPoints'],
215 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000216
Christian Heimes88b174c2013-08-17 00:54:47 +0200217 def test_parse_cert_CVE_2013_4238(self):
218 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500219 if support.verbose:
Christian Heimes88b174c2013-08-17 00:54:47 +0200220 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
221 subject = ((('countryName', 'US'),),
222 (('stateOrProvinceName', 'Oregon'),),
223 (('localityName', 'Beaverton'),),
224 (('organizationName', 'Python Software Foundation'),),
225 (('organizationalUnitName', 'Python Core Development'),),
226 (('commonName', 'null.python.org\x00example.org'),),
227 (('emailAddress', 'python-dev@python.org'),))
228 self.assertEqual(p['subject'], subject)
229 self.assertEqual(p['issuer'], subject)
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500230 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
Christian Heimesf869a942013-08-25 14:12:41 +0200231 san = (('DNS', 'altnull.python.org\x00example.com'),
232 ('email', 'null@python.org\x00user@example.org'),
233 ('URI', 'http://null.python.org\x00http://example.org'),
234 ('IP Address', '192.0.2.1'),
235 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
236 else:
237 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
238 san = (('DNS', 'altnull.python.org\x00example.com'),
239 ('email', 'null@python.org\x00user@example.org'),
240 ('URI', 'http://null.python.org\x00http://example.org'),
241 ('IP Address', '192.0.2.1'),
242 ('IP Address', '<invalid>'))
243
244 self.assertEqual(p['subjectAltName'], san)
Christian Heimes88b174c2013-08-17 00:54:47 +0200245
Antoine Pitrou3945c862010-04-28 21:11:01 +0000246 def test_DER_to_PEM(self):
247 with open(SVN_PYTHON_ORG_ROOT_CERT, 'r') as f:
248 pem = f.read()
Bill Janssen296a59d2007-09-16 22:06:00 +0000249 d1 = ssl.PEM_cert_to_DER_cert(pem)
250 p2 = ssl.DER_cert_to_PEM_cert(d1)
251 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitroudb187842010-04-27 10:32:58 +0000252 self.assertEqual(d1, d2)
Antoine Pitrou4c7bcf12010-04-27 22:03:37 +0000253 if not p2.startswith(ssl.PEM_HEADER + '\n'):
254 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
255 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
256 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Bill Janssen296a59d2007-09-16 22:06:00 +0000257
Antoine Pitrouf9de5342010-04-05 21:35:07 +0000258 def test_openssl_version(self):
259 n = ssl.OPENSSL_VERSION_NUMBER
260 t = ssl.OPENSSL_VERSION_INFO
261 s = ssl.OPENSSL_VERSION
262 self.assertIsInstance(n, (int, long))
263 self.assertIsInstance(t, tuple)
264 self.assertIsInstance(s, str)
265 # Some sanity checks follow
266 # >= 0.9
267 self.assertGreaterEqual(n, 0x900000)
Antoine Pitrou4e64d872014-07-21 18:35:01 -0400268 # < 3.0
269 self.assertLess(n, 0x30000000)
Antoine Pitrouf9de5342010-04-05 21:35:07 +0000270 major, minor, fix, patch, status = t
271 self.assertGreaterEqual(major, 0)
Antoine Pitrou4e64d872014-07-21 18:35:01 -0400272 self.assertLess(major, 3)
Antoine Pitrouf9de5342010-04-05 21:35:07 +0000273 self.assertGreaterEqual(minor, 0)
274 self.assertLess(minor, 256)
275 self.assertGreaterEqual(fix, 0)
276 self.assertLess(fix, 256)
277 self.assertGreaterEqual(patch, 0)
278 self.assertLessEqual(patch, 26)
279 self.assertGreaterEqual(status, 0)
280 self.assertLessEqual(status, 15)
Antoine Pitrou4e64d872014-07-21 18:35:01 -0400281 # Version string as returned by {Open,Libre}SSL, the format might change
282 if "LibreSSL" in s:
283 self.assertTrue(s.startswith("LibreSSL {:d}.{:d}".format(major, minor)),
284 (s, t))
285 else:
286 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
287 (s, t))
Antoine Pitrouf9de5342010-04-05 21:35:07 +0000288
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500289 @support.cpython_only
Antoine Pitroudfb299b2010-04-23 22:54:59 +0000290 def test_refcycle(self):
291 # Issue #7943: an SSL object doesn't create reference cycles with
292 # itself.
293 s = socket.socket(socket.AF_INET)
294 ss = ssl.wrap_socket(s)
295 wr = weakref.ref(ss)
296 del ss
297 self.assertEqual(wr(), None)
298
Antoine Pitrouf7f390a2010-09-14 14:37:18 +0000299 def test_wrapped_unconnected(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500300 # Methods on an unconnected SSLSocket propagate the original
301 # socket.error raise by the underlying socket object.
Antoine Pitrouf7f390a2010-09-14 14:37:18 +0000302 s = socket.socket(socket.AF_INET)
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500303 with closing(ssl.wrap_socket(s)) as ss:
304 self.assertRaises(socket.error, ss.recv, 1)
305 self.assertRaises(socket.error, ss.recv_into, bytearray(b'x'))
306 self.assertRaises(socket.error, ss.recvfrom, 1)
307 self.assertRaises(socket.error, ss.recvfrom_into, bytearray(b'x'), 1)
308 self.assertRaises(socket.error, ss.send, b'x')
309 self.assertRaises(socket.error, ss.sendto, b'x', ('0.0.0.0', 0))
310
311 def test_timeout(self):
312 # Issue #8524: when creating an SSL socket, the timeout of the
313 # original socket should be retained.
314 for timeout in (None, 0.0, 5.0):
315 s = socket.socket(socket.AF_INET)
316 s.settimeout(timeout)
317 with closing(ssl.wrap_socket(s)) as ss:
318 self.assertEqual(timeout, ss.gettimeout())
319
320 def test_errors(self):
321 sock = socket.socket()
322 self.assertRaisesRegexp(ValueError,
323 "certfile must be specified",
324 ssl.wrap_socket, sock, keyfile=CERTFILE)
325 self.assertRaisesRegexp(ValueError,
326 "certfile must be specified for server-side operations",
327 ssl.wrap_socket, sock, server_side=True)
328 self.assertRaisesRegexp(ValueError,
329 "certfile must be specified for server-side operations",
330 ssl.wrap_socket, sock, server_side=True, certfile="")
331 with closing(ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE)) as s:
332 self.assertRaisesRegexp(ValueError, "can't connect in server-side mode",
333 s.connect, (HOST, 8080))
334 with self.assertRaises(IOError) as cm:
335 with closing(socket.socket()) as sock:
336 ssl.wrap_socket(sock, certfile=WRONGCERT)
337 self.assertEqual(cm.exception.errno, errno.ENOENT)
338 with self.assertRaises(IOError) as cm:
339 with closing(socket.socket()) as sock:
340 ssl.wrap_socket(sock, certfile=CERTFILE, keyfile=WRONGCERT)
341 self.assertEqual(cm.exception.errno, errno.ENOENT)
342 with self.assertRaises(IOError) as cm:
343 with closing(socket.socket()) as sock:
344 ssl.wrap_socket(sock, certfile=WRONGCERT, keyfile=WRONGCERT)
345 self.assertEqual(cm.exception.errno, errno.ENOENT)
346
347 def test_match_hostname(self):
348 def ok(cert, hostname):
349 ssl.match_hostname(cert, hostname)
350 def fail(cert, hostname):
351 self.assertRaises(ssl.CertificateError,
352 ssl.match_hostname, cert, hostname)
353
354 cert = {'subject': ((('commonName', 'example.com'),),)}
355 ok(cert, 'example.com')
356 ok(cert, 'ExAmple.cOm')
357 fail(cert, 'www.example.com')
358 fail(cert, '.example.com')
359 fail(cert, 'example.org')
360 fail(cert, 'exampleXcom')
361
362 cert = {'subject': ((('commonName', '*.a.com'),),)}
363 ok(cert, 'foo.a.com')
364 fail(cert, 'bar.foo.a.com')
365 fail(cert, 'a.com')
366 fail(cert, 'Xa.com')
367 fail(cert, '.a.com')
368
369 # only match one left-most wildcard
370 cert = {'subject': ((('commonName', 'f*.com'),),)}
371 ok(cert, 'foo.com')
372 ok(cert, 'f.com')
373 fail(cert, 'bar.com')
374 fail(cert, 'foo.a.com')
375 fail(cert, 'bar.foo.com')
376
377 # NULL bytes are bad, CVE-2013-4073
378 cert = {'subject': ((('commonName',
379 'null.python.org\x00example.org'),),)}
380 ok(cert, 'null.python.org\x00example.org') # or raise an error?
381 fail(cert, 'example.org')
382 fail(cert, 'null.python.org')
383
384 # error cases with wildcards
385 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
386 fail(cert, 'bar.foo.a.com')
387 fail(cert, 'a.com')
388 fail(cert, 'Xa.com')
389 fail(cert, '.a.com')
390
391 cert = {'subject': ((('commonName', 'a.*.com'),),)}
392 fail(cert, 'a.foo.com')
393 fail(cert, 'a..com')
394 fail(cert, 'a.com')
395
396 # wildcard doesn't match IDNA prefix 'xn--'
397 idna = u'püthon.python.org'.encode("idna").decode("ascii")
398 cert = {'subject': ((('commonName', idna),),)}
399 ok(cert, idna)
400 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
401 fail(cert, idna)
402 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
403 fail(cert, idna)
404
405 # wildcard in first fragment and IDNA A-labels in sequent fragments
406 # are supported.
407 idna = u'www*.pythön.org'.encode("idna").decode("ascii")
408 cert = {'subject': ((('commonName', idna),),)}
409 ok(cert, u'www.pythön.org'.encode("idna").decode("ascii"))
410 ok(cert, u'www1.pythön.org'.encode("idna").decode("ascii"))
411 fail(cert, u'ftp.pythön.org'.encode("idna").decode("ascii"))
412 fail(cert, u'pythön.org'.encode("idna").decode("ascii"))
413
414 # Slightly fake real-world example
415 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
416 'subject': ((('commonName', 'linuxfrz.org'),),),
417 'subjectAltName': (('DNS', 'linuxfr.org'),
418 ('DNS', 'linuxfr.com'),
419 ('othername', '<unsupported>'))}
420 ok(cert, 'linuxfr.org')
421 ok(cert, 'linuxfr.com')
422 # Not a "DNS" entry
423 fail(cert, '<unsupported>')
424 # When there is a subjectAltName, commonName isn't used
425 fail(cert, 'linuxfrz.org')
426
427 # A pristine real-world example
428 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
429 'subject': ((('countryName', 'US'),),
430 (('stateOrProvinceName', 'California'),),
431 (('localityName', 'Mountain View'),),
432 (('organizationName', 'Google Inc'),),
433 (('commonName', 'mail.google.com'),))}
434 ok(cert, 'mail.google.com')
435 fail(cert, 'gmail.com')
436 # Only commonName is considered
437 fail(cert, 'California')
438
439 # Neither commonName nor subjectAltName
440 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
441 'subject': ((('countryName', 'US'),),
442 (('stateOrProvinceName', 'California'),),
443 (('localityName', 'Mountain View'),),
444 (('organizationName', 'Google Inc'),))}
445 fail(cert, 'mail.google.com')
446
447 # No DNS entry in subjectAltName but a commonName
448 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
449 'subject': ((('countryName', 'US'),),
450 (('stateOrProvinceName', 'California'),),
451 (('localityName', 'Mountain View'),),
452 (('commonName', 'mail.google.com'),)),
453 'subjectAltName': (('othername', 'blabla'), )}
454 ok(cert, 'mail.google.com')
455
456 # No DNS entry subjectAltName and no commonName
457 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
458 'subject': ((('countryName', 'US'),),
459 (('stateOrProvinceName', 'California'),),
460 (('localityName', 'Mountain View'),),
461 (('organizationName', 'Google Inc'),)),
462 'subjectAltName': (('othername', 'blabla'),)}
463 fail(cert, 'google.com')
464
465 # Empty cert / no cert
466 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
467 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
468
469 # Issue #17980: avoid denials of service by refusing more than one
470 # wildcard per fragment.
471 cert = {'subject': ((('commonName', 'a*b.com'),),)}
472 ok(cert, 'axxb.com')
473 cert = {'subject': ((('commonName', 'a*b.co*'),),)}
474 fail(cert, 'axxb.com')
475 cert = {'subject': ((('commonName', 'a*b*.com'),),)}
476 with self.assertRaises(ssl.CertificateError) as cm:
477 ssl.match_hostname(cert, 'axxbxxc.com')
478 self.assertIn("too many wildcards", str(cm.exception))
479
480 def test_server_side(self):
481 # server_hostname doesn't work for server sockets
482 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
483 with closing(socket.socket()) as sock:
484 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
485 server_hostname="some.hostname")
486
487 def test_unknown_channel_binding(self):
488 # should raise ValueError for unknown type
489 s = socket.socket(socket.AF_INET)
490 with closing(ssl.wrap_socket(s)) as ss:
491 with self.assertRaises(ValueError):
492 ss.get_channel_binding("unknown-type")
493
494 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
495 "'tls-unique' channel binding not available")
496 def test_tls_unique_channel_binding(self):
497 # unconnected should return None for known type
498 s = socket.socket(socket.AF_INET)
499 with closing(ssl.wrap_socket(s)) as ss:
500 self.assertIsNone(ss.get_channel_binding("tls-unique"))
501 # the same for server-side
502 s = socket.socket(socket.AF_INET)
503 with closing(ssl.wrap_socket(s, server_side=True, certfile=CERTFILE)) as ss:
504 self.assertIsNone(ss.get_channel_binding("tls-unique"))
505
506 def test_get_default_verify_paths(self):
507 paths = ssl.get_default_verify_paths()
508 self.assertEqual(len(paths), 6)
509 self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
510
511 with support.EnvironmentVarGuard() as env:
512 env["SSL_CERT_DIR"] = CAPATH
513 env["SSL_CERT_FILE"] = CERTFILE
514 paths = ssl.get_default_verify_paths()
515 self.assertEqual(paths.cafile, CERTFILE)
516 self.assertEqual(paths.capath, CAPATH)
517
518 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
519 def test_enum_certificates(self):
520 self.assertTrue(ssl.enum_certificates("CA"))
521 self.assertTrue(ssl.enum_certificates("ROOT"))
522
523 self.assertRaises(TypeError, ssl.enum_certificates)
524 self.assertRaises(WindowsError, ssl.enum_certificates, "")
525
526 trust_oids = set()
527 for storename in ("CA", "ROOT"):
528 store = ssl.enum_certificates(storename)
529 self.assertIsInstance(store, list)
530 for element in store:
531 self.assertIsInstance(element, tuple)
532 self.assertEqual(len(element), 3)
533 cert, enc, trust = element
534 self.assertIsInstance(cert, bytes)
535 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
536 self.assertIsInstance(trust, (set, bool))
537 if isinstance(trust, set):
538 trust_oids.update(trust)
539
540 serverAuth = "1.3.6.1.5.5.7.3.1"
541 self.assertIn(serverAuth, trust_oids)
542
543 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
544 def test_enum_crls(self):
545 self.assertTrue(ssl.enum_crls("CA"))
546 self.assertRaises(TypeError, ssl.enum_crls)
547 self.assertRaises(WindowsError, ssl.enum_crls, "")
548
549 crls = ssl.enum_crls("CA")
550 self.assertIsInstance(crls, list)
551 for element in crls:
552 self.assertIsInstance(element, tuple)
553 self.assertEqual(len(element), 2)
554 self.assertIsInstance(element[0], bytes)
555 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
556
557
558 def test_asn1object(self):
559 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
560 '1.3.6.1.5.5.7.3.1')
561
562 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
563 self.assertEqual(val, expected)
564 self.assertEqual(val.nid, 129)
565 self.assertEqual(val.shortname, 'serverAuth')
566 self.assertEqual(val.longname, 'TLS Web Server Authentication')
567 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
568 self.assertIsInstance(val, ssl._ASN1Object)
569 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
570
571 val = ssl._ASN1Object.fromnid(129)
572 self.assertEqual(val, expected)
573 self.assertIsInstance(val, ssl._ASN1Object)
574 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
575 with self.assertRaisesRegexp(ValueError, "unknown NID 100000"):
576 ssl._ASN1Object.fromnid(100000)
577 for i in range(1000):
578 try:
579 obj = ssl._ASN1Object.fromnid(i)
580 except ValueError:
581 pass
582 else:
583 self.assertIsInstance(obj.nid, int)
584 self.assertIsInstance(obj.shortname, str)
585 self.assertIsInstance(obj.longname, str)
586 self.assertIsInstance(obj.oid, (str, type(None)))
587
588 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
589 self.assertEqual(val, expected)
590 self.assertIsInstance(val, ssl._ASN1Object)
591 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
592 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
593 expected)
594 with self.assertRaisesRegexp(ValueError, "unknown object 'serverauth'"):
595 ssl._ASN1Object.fromname('serverauth')
596
597 def test_purpose_enum(self):
598 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
599 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
600 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
601 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
602 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
603 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
604 '1.3.6.1.5.5.7.3.1')
605
606 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
607 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
608 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
609 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
610 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
611 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
612 '1.3.6.1.5.5.7.3.2')
Antoine Pitrouf7f390a2010-09-14 14:37:18 +0000613
Antoine Pitrou63cc99d2013-12-28 17:26:33 +0100614 def test_unsupported_dtls(self):
615 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
616 self.addCleanup(s.close)
617 with self.assertRaises(NotImplementedError) as cx:
618 ssl.wrap_socket(s, cert_reqs=ssl.CERT_NONE)
619 self.assertEqual(str(cx.exception), "only stream sockets are supported")
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500620 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
621 with self.assertRaises(NotImplementedError) as cx:
622 ctx.wrap_socket(s)
623 self.assertEqual(str(cx.exception), "only stream sockets are supported")
624
625 def cert_time_ok(self, timestring, timestamp):
626 self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)
627
628 def cert_time_fail(self, timestring):
629 with self.assertRaises(ValueError):
630 ssl.cert_time_to_seconds(timestring)
631
632 @unittest.skipUnless(utc_offset(),
633 'local time needs to be different from UTC')
634 def test_cert_time_to_seconds_timezone(self):
635 # Issue #19940: ssl.cert_time_to_seconds() returns wrong
636 # results if local timezone is not UTC
637 self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0)
638 self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0)
639
640 def test_cert_time_to_seconds(self):
641 timestring = "Jan 5 09:34:43 2018 GMT"
642 ts = 1515144883.0
643 self.cert_time_ok(timestring, ts)
644 # accept keyword parameter, assert its name
645 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
646 # accept both %e and %d (space or zero generated by strftime)
647 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
648 # case-insensitive
649 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
650 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
651 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
652 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
653 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
654 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
655 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
656 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
657
658 newyear_ts = 1230768000.0
659 # leap seconds
660 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
661 # same timestamp
662 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
663
664 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
665 # allow 60th second (even if it is not a leap second)
666 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
667 # allow 2nd leap second for compatibility with time.strptime()
668 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
669 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
670
671 # no special treatement for the special value:
672 # 99991231235959Z (rfc 5280)
673 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
674
675 @support.run_with_locale('LC_ALL', '')
676 def test_cert_time_to_seconds_locale(self):
677 # `cert_time_to_seconds()` should be locale independent
678
679 def local_february_name():
680 return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
681
682 if local_february_name().lower() == 'feb':
683 self.skipTest("locale-specific month name needs to be "
684 "different from C locale")
685
686 # locale-independent
687 self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
688 self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
689
690
691class ContextTests(unittest.TestCase):
692
693 @skip_if_broken_ubuntu_ssl
694 def test_constructor(self):
695 for protocol in PROTOCOLS:
696 ssl.SSLContext(protocol)
697 self.assertRaises(TypeError, ssl.SSLContext)
698 self.assertRaises(ValueError, ssl.SSLContext, -1)
699 self.assertRaises(ValueError, ssl.SSLContext, 42)
700
701 @skip_if_broken_ubuntu_ssl
702 def test_protocol(self):
703 for proto in PROTOCOLS:
704 ctx = ssl.SSLContext(proto)
705 self.assertEqual(ctx.protocol, proto)
706
707 def test_ciphers(self):
708 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
709 ctx.set_ciphers("ALL")
710 ctx.set_ciphers("DEFAULT")
711 with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"):
712 ctx.set_ciphers("^$:,;?*'dorothyx")
713
714 @skip_if_broken_ubuntu_ssl
715 def test_options(self):
716 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
717 # OP_ALL | OP_NO_SSLv2 is the default value
718 self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2,
719 ctx.options)
720 ctx.options |= ssl.OP_NO_SSLv3
721 self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3,
722 ctx.options)
723 if can_clear_options():
724 ctx.options = (ctx.options & ~ssl.OP_NO_SSLv2) | ssl.OP_NO_TLSv1
725 self.assertEqual(ssl.OP_ALL | ssl.OP_NO_TLSv1 | ssl.OP_NO_SSLv3,
726 ctx.options)
727 ctx.options = 0
728 self.assertEqual(0, ctx.options)
729 else:
730 with self.assertRaises(ValueError):
731 ctx.options = 0
732
733 def test_verify_mode(self):
734 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
735 # Default value
736 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
737 ctx.verify_mode = ssl.CERT_OPTIONAL
738 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
739 ctx.verify_mode = ssl.CERT_REQUIRED
740 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
741 ctx.verify_mode = ssl.CERT_NONE
742 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
743 with self.assertRaises(TypeError):
744 ctx.verify_mode = None
745 with self.assertRaises(ValueError):
746 ctx.verify_mode = 42
747
748 @unittest.skipUnless(have_verify_flags(),
749 "verify_flags need OpenSSL > 0.9.8")
750 def test_verify_flags(self):
751 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
752 # default value by OpenSSL
753 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
754 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
755 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
756 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
757 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
758 ctx.verify_flags = ssl.VERIFY_DEFAULT
759 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
760 # supports any value
761 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
762 self.assertEqual(ctx.verify_flags,
763 ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
764 with self.assertRaises(TypeError):
765 ctx.verify_flags = None
766
767 def test_load_cert_chain(self):
768 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
769 # Combined key and cert in a single file
Benjamin Peterson04439fd2014-11-03 21:05:01 -0500770 ctx.load_cert_chain(CERTFILE, keyfile=None)
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500771 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
772 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
773 with self.assertRaises(IOError) as cm:
774 ctx.load_cert_chain(WRONGCERT)
775 self.assertEqual(cm.exception.errno, errno.ENOENT)
776 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
777 ctx.load_cert_chain(BADCERT)
778 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
779 ctx.load_cert_chain(EMPTYCERT)
780 # Separate key and cert
781 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
782 ctx.load_cert_chain(ONLYCERT, ONLYKEY)
783 ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
784 ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
785 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
786 ctx.load_cert_chain(ONLYCERT)
787 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
788 ctx.load_cert_chain(ONLYKEY)
789 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
790 ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
791 # Mismatching key and cert
792 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
793 with self.assertRaisesRegexp(ssl.SSLError, "key values mismatch"):
794 ctx.load_cert_chain(SVN_PYTHON_ORG_ROOT_CERT, ONLYKEY)
795 # Password protected key and cert
796 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
797 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
798 ctx.load_cert_chain(CERTFILE_PROTECTED,
799 password=bytearray(KEY_PASSWORD.encode()))
800 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
801 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
802 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
803 bytearray(KEY_PASSWORD.encode()))
804 with self.assertRaisesRegexp(TypeError, "should be a string"):
805 ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
806 with self.assertRaises(ssl.SSLError):
807 ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
808 with self.assertRaisesRegexp(ValueError, "cannot be longer"):
809 # openssl has a fixed limit on the password buffer.
810 # PEM_BUFSIZE is generally set to 1kb.
811 # Return a string larger than this.
812 ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
813 # Password callback
814 def getpass_unicode():
815 return KEY_PASSWORD
816 def getpass_bytes():
817 return KEY_PASSWORD.encode()
818 def getpass_bytearray():
819 return bytearray(KEY_PASSWORD.encode())
820 def getpass_badpass():
821 return "badpass"
822 def getpass_huge():
823 return b'a' * (1024 * 1024)
824 def getpass_bad_type():
825 return 9
826 def getpass_exception():
827 raise Exception('getpass error')
828 class GetPassCallable:
829 def __call__(self):
830 return KEY_PASSWORD
831 def getpass(self):
832 return KEY_PASSWORD
833 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
834 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
835 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
836 ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
837 ctx.load_cert_chain(CERTFILE_PROTECTED,
838 password=GetPassCallable().getpass)
839 with self.assertRaises(ssl.SSLError):
840 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
841 with self.assertRaisesRegexp(ValueError, "cannot be longer"):
842 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
843 with self.assertRaisesRegexp(TypeError, "must return a string"):
844 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
845 with self.assertRaisesRegexp(Exception, "getpass error"):
846 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
847 # Make sure the password function isn't called if it isn't needed
848 ctx.load_cert_chain(CERTFILE, password=getpass_exception)
849
850 def test_load_verify_locations(self):
851 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
852 ctx.load_verify_locations(CERTFILE)
853 ctx.load_verify_locations(cafile=CERTFILE, capath=None)
854 ctx.load_verify_locations(BYTES_CERTFILE)
855 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
Benjamin Peterson876473e2014-08-28 09:33:21 -0400856 ctx.load_verify_locations(cafile=BYTES_CERTFILE.decode('utf-8'))
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500857 self.assertRaises(TypeError, ctx.load_verify_locations)
858 self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
859 with self.assertRaises(IOError) as cm:
860 ctx.load_verify_locations(WRONGCERT)
861 self.assertEqual(cm.exception.errno, errno.ENOENT)
Benjamin Peterson876473e2014-08-28 09:33:21 -0400862 with self.assertRaises(IOError):
863 ctx.load_verify_locations(u'')
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500864 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
865 ctx.load_verify_locations(BADCERT)
866 ctx.load_verify_locations(CERTFILE, CAPATH)
867 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
868
869 # Issue #10989: crash if the second argument type is invalid
870 self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
871
872 def test_load_verify_cadata(self):
873 # test cadata
874 with open(CAFILE_CACERT) as f:
875 cacert_pem = f.read().decode("ascii")
876 cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
877 with open(CAFILE_NEURONIO) as f:
878 neuronio_pem = f.read().decode("ascii")
879 neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
880
881 # test PEM
882 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
883 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
884 ctx.load_verify_locations(cadata=cacert_pem)
885 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
886 ctx.load_verify_locations(cadata=neuronio_pem)
887 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
888 # cert already in hash table
889 ctx.load_verify_locations(cadata=neuronio_pem)
890 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
891
892 # combined
893 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
894 combined = "\n".join((cacert_pem, neuronio_pem))
895 ctx.load_verify_locations(cadata=combined)
896 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
897
898 # with junk around the certs
899 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
900 combined = ["head", cacert_pem, "other", neuronio_pem, "again",
901 neuronio_pem, "tail"]
902 ctx.load_verify_locations(cadata="\n".join(combined))
903 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
904
905 # test DER
906 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
907 ctx.load_verify_locations(cadata=cacert_der)
908 ctx.load_verify_locations(cadata=neuronio_der)
909 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
910 # cert already in hash table
911 ctx.load_verify_locations(cadata=cacert_der)
912 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
913
914 # combined
915 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
916 combined = b"".join((cacert_der, neuronio_der))
917 ctx.load_verify_locations(cadata=combined)
918 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
919
920 # error cases
921 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
922 self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
923
924 with self.assertRaisesRegexp(ssl.SSLError, "no start line"):
925 ctx.load_verify_locations(cadata=u"broken")
926 with self.assertRaisesRegexp(ssl.SSLError, "not enough data"):
927 ctx.load_verify_locations(cadata=b"broken")
928
929
930 def test_load_dh_params(self):
931 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
932 ctx.load_dh_params(DHFILE)
933 if os.name != 'nt':
934 ctx.load_dh_params(BYTES_DHFILE)
935 self.assertRaises(TypeError, ctx.load_dh_params)
936 self.assertRaises(TypeError, ctx.load_dh_params, None)
937 with self.assertRaises(IOError) as cm:
938 ctx.load_dh_params(WRONGCERT)
939 self.assertEqual(cm.exception.errno, errno.ENOENT)
940 with self.assertRaises(ssl.SSLError) as cm:
941 ctx.load_dh_params(CERTFILE)
942
943 @skip_if_broken_ubuntu_ssl
944 def test_session_stats(self):
945 for proto in PROTOCOLS:
946 ctx = ssl.SSLContext(proto)
947 self.assertEqual(ctx.session_stats(), {
948 'number': 0,
949 'connect': 0,
950 'connect_good': 0,
951 'connect_renegotiate': 0,
952 'accept': 0,
953 'accept_good': 0,
954 'accept_renegotiate': 0,
955 'hits': 0,
956 'misses': 0,
957 'timeouts': 0,
958 'cache_full': 0,
959 })
960
961 def test_set_default_verify_paths(self):
962 # There's not much we can do to test that it acts as expected,
963 # so just check it doesn't crash or raise an exception.
964 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
965 ctx.set_default_verify_paths()
966
967 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
968 def test_set_ecdh_curve(self):
969 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
970 ctx.set_ecdh_curve("prime256v1")
971 ctx.set_ecdh_curve(b"prime256v1")
972 self.assertRaises(TypeError, ctx.set_ecdh_curve)
973 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
974 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
975 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
976
977 @needs_sni
978 def test_sni_callback(self):
979 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
980
981 # set_servername_callback expects a callable, or None
982 self.assertRaises(TypeError, ctx.set_servername_callback)
983 self.assertRaises(TypeError, ctx.set_servername_callback, 4)
984 self.assertRaises(TypeError, ctx.set_servername_callback, "")
985 self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
986
987 def dummycallback(sock, servername, ctx):
988 pass
989 ctx.set_servername_callback(None)
990 ctx.set_servername_callback(dummycallback)
991
992 @needs_sni
993 def test_sni_callback_refcycle(self):
994 # Reference cycles through the servername callback are detected
995 # and cleared.
996 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
997 def dummycallback(sock, servername, ctx, cycle=ctx):
998 pass
999 ctx.set_servername_callback(dummycallback)
1000 wr = weakref.ref(ctx)
1001 del ctx, dummycallback
1002 gc.collect()
1003 self.assertIs(wr(), None)
1004
1005 def test_cert_store_stats(self):
1006 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1007 self.assertEqual(ctx.cert_store_stats(),
1008 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1009 ctx.load_cert_chain(CERTFILE)
1010 self.assertEqual(ctx.cert_store_stats(),
1011 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1012 ctx.load_verify_locations(CERTFILE)
1013 self.assertEqual(ctx.cert_store_stats(),
1014 {'x509_ca': 0, 'crl': 0, 'x509': 1})
1015 ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
1016 self.assertEqual(ctx.cert_store_stats(),
1017 {'x509_ca': 1, 'crl': 0, 'x509': 2})
1018
1019 def test_get_ca_certs(self):
1020 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1021 self.assertEqual(ctx.get_ca_certs(), [])
1022 # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
1023 ctx.load_verify_locations(CERTFILE)
1024 self.assertEqual(ctx.get_ca_certs(), [])
1025 # but SVN_PYTHON_ORG_ROOT_CERT is a CA cert
1026 ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
1027 self.assertEqual(ctx.get_ca_certs(),
1028 [{'issuer': ((('organizationName', 'Root CA'),),
1029 (('organizationalUnitName', 'http://www.cacert.org'),),
1030 (('commonName', 'CA Cert Signing Authority'),),
1031 (('emailAddress', 'support@cacert.org'),)),
1032 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
1033 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
1034 'serialNumber': '00',
1035 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
1036 'subject': ((('organizationName', 'Root CA'),),
1037 (('organizationalUnitName', 'http://www.cacert.org'),),
1038 (('commonName', 'CA Cert Signing Authority'),),
1039 (('emailAddress', 'support@cacert.org'),)),
1040 'version': 3}])
1041
1042 with open(SVN_PYTHON_ORG_ROOT_CERT) as f:
1043 pem = f.read()
1044 der = ssl.PEM_cert_to_DER_cert(pem)
1045 self.assertEqual(ctx.get_ca_certs(True), [der])
1046
1047 def test_load_default_certs(self):
1048 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1049 ctx.load_default_certs()
1050
1051 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1052 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1053 ctx.load_default_certs()
1054
1055 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1056 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1057
1058 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1059 self.assertRaises(TypeError, ctx.load_default_certs, None)
1060 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1061
Benjamin Petersona02ae252014-10-03 18:17:15 -04001062 @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
Benjamin Peterson0b30a2b2014-10-03 17:27:05 -04001063 def test_load_default_certs_env(self):
1064 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1065 with support.EnvironmentVarGuard() as env:
1066 env["SSL_CERT_DIR"] = CAPATH
1067 env["SSL_CERT_FILE"] = CERTFILE
1068 ctx.load_default_certs()
1069 self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0})
1070
Benjamin Petersona02ae252014-10-03 18:17:15 -04001071 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
1072 def test_load_default_certs_env_windows(self):
1073 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1074 ctx.load_default_certs()
1075 stats = ctx.cert_store_stats()
1076
1077 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1078 with support.EnvironmentVarGuard() as env:
1079 env["SSL_CERT_DIR"] = CAPATH
1080 env["SSL_CERT_FILE"] = CERTFILE
1081 ctx.load_default_certs()
1082 stats["x509"] += 1
1083 self.assertEqual(ctx.cert_store_stats(), stats)
1084
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001085 def test_create_default_context(self):
1086 ctx = ssl.create_default_context()
1087 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1088 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1089 self.assertTrue(ctx.check_hostname)
1090 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1091 self.assertEqual(
1092 ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
1093 getattr(ssl, "OP_NO_COMPRESSION", 0),
1094 )
1095
1096 with open(SIGNING_CA) as f:
1097 cadata = f.read().decode("ascii")
1098 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1099 cadata=cadata)
1100 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1101 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1102 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1103 self.assertEqual(
1104 ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
1105 getattr(ssl, "OP_NO_COMPRESSION", 0),
1106 )
1107
1108 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
1109 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1110 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1111 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1112 self.assertEqual(
1113 ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
1114 getattr(ssl, "OP_NO_COMPRESSION", 0),
1115 )
1116 self.assertEqual(
1117 ctx.options & getattr(ssl, "OP_SINGLE_DH_USE", 0),
1118 getattr(ssl, "OP_SINGLE_DH_USE", 0),
1119 )
1120 self.assertEqual(
1121 ctx.options & getattr(ssl, "OP_SINGLE_ECDH_USE", 0),
1122 getattr(ssl, "OP_SINGLE_ECDH_USE", 0),
1123 )
1124
1125 def test__create_stdlib_context(self):
1126 ctx = ssl._create_stdlib_context()
1127 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1128 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1129 self.assertFalse(ctx.check_hostname)
1130 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1131
1132 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
1133 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1134 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1135 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1136
1137 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
1138 cert_reqs=ssl.CERT_REQUIRED,
1139 check_hostname=True)
1140 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1141 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1142 self.assertTrue(ctx.check_hostname)
1143 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1144
1145 ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
1146 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1147 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1148 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1149
1150 def test_check_hostname(self):
1151 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1152 self.assertFalse(ctx.check_hostname)
1153
1154 # Requires CERT_REQUIRED or CERT_OPTIONAL
1155 with self.assertRaises(ValueError):
1156 ctx.check_hostname = True
1157 ctx.verify_mode = ssl.CERT_REQUIRED
1158 self.assertFalse(ctx.check_hostname)
1159 ctx.check_hostname = True
1160 self.assertTrue(ctx.check_hostname)
1161
1162 ctx.verify_mode = ssl.CERT_OPTIONAL
1163 ctx.check_hostname = True
1164 self.assertTrue(ctx.check_hostname)
1165
1166 # Cannot set CERT_NONE with check_hostname enabled
1167 with self.assertRaises(ValueError):
1168 ctx.verify_mode = ssl.CERT_NONE
1169 ctx.check_hostname = False
1170 self.assertFalse(ctx.check_hostname)
1171
1172
1173class SSLErrorTests(unittest.TestCase):
1174
1175 def test_str(self):
1176 # The str() of a SSLError doesn't include the errno
1177 e = ssl.SSLError(1, "foo")
1178 self.assertEqual(str(e), "foo")
1179 self.assertEqual(e.errno, 1)
1180 # Same for a subclass
1181 e = ssl.SSLZeroReturnError(1, "foo")
1182 self.assertEqual(str(e), "foo")
1183 self.assertEqual(e.errno, 1)
1184
1185 def test_lib_reason(self):
1186 # Test the library and reason attributes
1187 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1188 with self.assertRaises(ssl.SSLError) as cm:
1189 ctx.load_dh_params(CERTFILE)
1190 self.assertEqual(cm.exception.library, 'PEM')
1191 self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1192 s = str(cm.exception)
1193 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1194
1195 def test_subclass(self):
1196 # Check that the appropriate SSLError subclass is raised
1197 # (this only tests one of them)
1198 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1199 with closing(socket.socket()) as s:
1200 s.bind(("127.0.0.1", 0))
1201 s.listen(5)
1202 c = socket.socket()
1203 c.connect(s.getsockname())
1204 c.setblocking(False)
1205 with closing(ctx.wrap_socket(c, False, do_handshake_on_connect=False)) as c:
1206 with self.assertRaises(ssl.SSLWantReadError) as cm:
1207 c.do_handshake()
1208 s = str(cm.exception)
1209 self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1210 # For compatibility
1211 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
Antoine Pitrou63cc99d2013-12-28 17:26:33 +01001212
Antoine Pitrouf9de5342010-04-05 21:35:07 +00001213
Bill Janssen934b16d2008-06-28 22:19:33 +00001214class NetworkedTests(unittest.TestCase):
Bill Janssen296a59d2007-09-16 22:06:00 +00001215
Antoine Pitrou3945c862010-04-28 21:11:01 +00001216 def test_connect(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001217 with support.transient_internet("svn.python.org"):
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001218 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1219 cert_reqs=ssl.CERT_NONE)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001220 try:
1221 s.connect(("svn.python.org", 443))
1222 self.assertEqual({}, s.getpeercert())
1223 finally:
1224 s.close()
Bill Janssen296a59d2007-09-16 22:06:00 +00001225
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001226 # this should fail because we have no verification certs
1227 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1228 cert_reqs=ssl.CERT_REQUIRED)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001229 self.assertRaisesRegexp(ssl.SSLError, "certificate verify failed",
1230 s.connect, ("svn.python.org", 443))
1231 s.close()
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001232
1233 # this should succeed because we specify the root cert
1234 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1235 cert_reqs=ssl.CERT_REQUIRED,
1236 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
1237 try:
1238 s.connect(("svn.python.org", 443))
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001239 self.assertTrue(s.getpeercert())
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001240 finally:
1241 s.close()
Bill Janssen296a59d2007-09-16 22:06:00 +00001242
Antoine Pitroud3f6ea12011-02-26 23:35:27 +00001243 def test_connect_ex(self):
1244 # Issue #11326: check connect_ex() implementation
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001245 with support.transient_internet("svn.python.org"):
Antoine Pitroud3f6ea12011-02-26 23:35:27 +00001246 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1247 cert_reqs=ssl.CERT_REQUIRED,
1248 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
1249 try:
1250 self.assertEqual(0, s.connect_ex(("svn.python.org", 443)))
1251 self.assertTrue(s.getpeercert())
1252 finally:
1253 s.close()
1254
1255 def test_non_blocking_connect_ex(self):
1256 # Issue #11326: non-blocking connect_ex() should allow handshake
1257 # to proceed after the socket gets ready.
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001258 with support.transient_internet("svn.python.org"):
Antoine Pitroud3f6ea12011-02-26 23:35:27 +00001259 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1260 cert_reqs=ssl.CERT_REQUIRED,
1261 ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
1262 do_handshake_on_connect=False)
1263 try:
1264 s.setblocking(False)
1265 rc = s.connect_ex(('svn.python.org', 443))
Antoine Pitrou8ef39072011-02-27 15:45:22 +00001266 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1267 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
Antoine Pitroud3f6ea12011-02-26 23:35:27 +00001268 # Wait for connect to finish
1269 select.select([], [s], [], 5.0)
1270 # Non-blocking handshake
1271 while True:
1272 try:
1273 s.do_handshake()
1274 break
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001275 except ssl.SSLWantReadError:
1276 select.select([s], [], [], 5.0)
1277 except ssl.SSLWantWriteError:
1278 select.select([], [s], [], 5.0)
Antoine Pitroud3f6ea12011-02-26 23:35:27 +00001279 # SSL established
1280 self.assertTrue(s.getpeercert())
1281 finally:
1282 s.close()
1283
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001284 def test_timeout_connect_ex(self):
1285 # Issue #12065: on a timeout, connect_ex() should return the original
1286 # errno (mimicking the behaviour of non-SSL sockets).
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001287 with support.transient_internet("svn.python.org"):
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001288 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1289 cert_reqs=ssl.CERT_REQUIRED,
1290 ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
1291 do_handshake_on_connect=False)
1292 try:
1293 s.settimeout(0.0000001)
1294 rc = s.connect_ex(('svn.python.org', 443))
1295 if rc == 0:
1296 self.skipTest("svn.python.org responded too quickly")
1297 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
1298 finally:
1299 s.close()
1300
1301 def test_connect_ex_error(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001302 with support.transient_internet("svn.python.org"):
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001303 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1304 cert_reqs=ssl.CERT_REQUIRED,
1305 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
1306 try:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001307 rc = s.connect_ex(("svn.python.org", 444))
1308 # Issue #19919: Windows machines or VMs hosted on Windows
1309 # machines sometimes return EWOULDBLOCK.
1310 self.assertIn(rc, (errno.ECONNREFUSED, errno.EWOULDBLOCK))
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001311 finally:
1312 s.close()
1313
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001314 def test_connect_with_context(self):
1315 with support.transient_internet("svn.python.org"):
1316 # Same as test_connect, but with a separately created context
1317 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1318 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1319 s.connect(("svn.python.org", 443))
1320 try:
1321 self.assertEqual({}, s.getpeercert())
1322 finally:
1323 s.close()
1324 # Same with a server hostname
1325 s = ctx.wrap_socket(socket.socket(socket.AF_INET),
1326 server_hostname="svn.python.org")
Benjamin Peterson31aa69e2014-11-23 20:13:31 -06001327 s.connect(("svn.python.org", 443))
1328 s.close()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001329 # This should fail because we have no verification certs
1330 ctx.verify_mode = ssl.CERT_REQUIRED
1331 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1332 self.assertRaisesRegexp(ssl.SSLError, "certificate verify failed",
1333 s.connect, ("svn.python.org", 443))
1334 s.close()
1335 # This should succeed because we specify the root cert
1336 ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
1337 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1338 s.connect(("svn.python.org", 443))
1339 try:
1340 cert = s.getpeercert()
1341 self.assertTrue(cert)
1342 finally:
1343 s.close()
1344
1345 def test_connect_capath(self):
1346 # Verify server certificates using the `capath` argument
1347 # NOTE: the subject hashing algorithm has been changed between
1348 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
1349 # contain both versions of each certificate (same content, different
1350 # filename) for this test to be portable across OpenSSL releases.
1351 with support.transient_internet("svn.python.org"):
1352 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1353 ctx.verify_mode = ssl.CERT_REQUIRED
1354 ctx.load_verify_locations(capath=CAPATH)
1355 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1356 s.connect(("svn.python.org", 443))
1357 try:
1358 cert = s.getpeercert()
1359 self.assertTrue(cert)
1360 finally:
1361 s.close()
1362 # Same with a bytes `capath` argument
1363 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1364 ctx.verify_mode = ssl.CERT_REQUIRED
1365 ctx.load_verify_locations(capath=BYTES_CAPATH)
1366 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1367 s.connect(("svn.python.org", 443))
1368 try:
1369 cert = s.getpeercert()
1370 self.assertTrue(cert)
1371 finally:
1372 s.close()
1373
1374 def test_connect_cadata(self):
1375 with open(CAFILE_CACERT) as f:
1376 pem = f.read().decode('ascii')
1377 der = ssl.PEM_cert_to_DER_cert(pem)
1378 with support.transient_internet("svn.python.org"):
1379 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1380 ctx.verify_mode = ssl.CERT_REQUIRED
1381 ctx.load_verify_locations(cadata=pem)
1382 with closing(ctx.wrap_socket(socket.socket(socket.AF_INET))) as s:
1383 s.connect(("svn.python.org", 443))
1384 cert = s.getpeercert()
1385 self.assertTrue(cert)
1386
1387 # same with DER
1388 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1389 ctx.verify_mode = ssl.CERT_REQUIRED
1390 ctx.load_verify_locations(cadata=der)
1391 with closing(ctx.wrap_socket(socket.socket(socket.AF_INET))) as s:
1392 s.connect(("svn.python.org", 443))
1393 cert = s.getpeercert()
1394 self.assertTrue(cert)
1395
Antoine Pitrou55841ac2010-04-24 10:43:57 +00001396 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
1397 def test_makefile_close(self):
1398 # Issue #5238: creating a file-like object with makefile() shouldn't
1399 # delay closing the underlying "real socket" (here tested with its
1400 # file descriptor, hence skipping the test under Windows).
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001401 with support.transient_internet("svn.python.org"):
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001402 ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
1403 ss.connect(("svn.python.org", 443))
1404 fd = ss.fileno()
1405 f = ss.makefile()
1406 f.close()
1407 # The fd is still open
Antoine Pitrou55841ac2010-04-24 10:43:57 +00001408 os.read(fd, 0)
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001409 # Closing the SSL socket should close the fd too
1410 ss.close()
1411 gc.collect()
1412 with self.assertRaises(OSError) as e:
1413 os.read(fd, 0)
1414 self.assertEqual(e.exception.errno, errno.EBADF)
Bill Janssen934b16d2008-06-28 22:19:33 +00001415
Antoine Pitrou3945c862010-04-28 21:11:01 +00001416 def test_non_blocking_handshake(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001417 with support.transient_internet("svn.python.org"):
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001418 s = socket.socket(socket.AF_INET)
1419 s.connect(("svn.python.org", 443))
1420 s.setblocking(False)
1421 s = ssl.wrap_socket(s,
1422 cert_reqs=ssl.CERT_NONE,
1423 do_handshake_on_connect=False)
1424 count = 0
1425 while True:
1426 try:
1427 count += 1
1428 s.do_handshake()
1429 break
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001430 except ssl.SSLWantReadError:
1431 select.select([s], [], [])
1432 except ssl.SSLWantWriteError:
1433 select.select([], [s], [])
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001434 s.close()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001435 if support.verbose:
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001436 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Bill Janssen934b16d2008-06-28 22:19:33 +00001437
Antoine Pitrou3945c862010-04-28 21:11:01 +00001438 def test_get_server_certificate(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001439 def _test_get_server_certificate(host, port, cert=None):
1440 with support.transient_internet(host):
1441 pem = ssl.get_server_certificate((host, port))
1442 if not pem:
1443 self.fail("No server certificate on %s:%s!" % (host, port))
Bill Janssen296a59d2007-09-16 22:06:00 +00001444
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001445 try:
1446 pem = ssl.get_server_certificate((host, port),
1447 ca_certs=CERTFILE)
1448 except ssl.SSLError as x:
1449 #should fail
1450 if support.verbose:
1451 sys.stdout.write("%s\n" % x)
1452 else:
1453 self.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
Bill Janssen296a59d2007-09-16 22:06:00 +00001454
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001455 pem = ssl.get_server_certificate((host, port),
1456 ca_certs=cert)
1457 if not pem:
1458 self.fail("No server certificate on %s:%s!" % (host, port))
1459 if support.verbose:
1460 sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
1461
1462 _test_get_server_certificate('svn.python.org', 443, SVN_PYTHON_ORG_ROOT_CERT)
1463 if support.IPV6_ENABLED:
1464 _test_get_server_certificate('ipv6.google.com', 443)
1465
1466 def test_ciphers(self):
1467 remote = ("svn.python.org", 443)
1468 with support.transient_internet(remote[0]):
1469 with closing(ssl.wrap_socket(socket.socket(socket.AF_INET),
1470 cert_reqs=ssl.CERT_NONE, ciphers="ALL")) as s:
1471 s.connect(remote)
1472 with closing(ssl.wrap_socket(socket.socket(socket.AF_INET),
1473 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT")) as s:
1474 s.connect(remote)
1475 # Error checking can happen at instantiation or when connecting
1476 with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"):
1477 with closing(socket.socket(socket.AF_INET)) as sock:
1478 s = ssl.wrap_socket(sock,
1479 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
1480 s.connect(remote)
Bill Janssen296a59d2007-09-16 22:06:00 +00001481
Antoine Pitrouc715a9e2010-04-21 19:28:03 +00001482 def test_algorithms(self):
1483 # Issue #8484: all algorithms should be available when verifying a
1484 # certificate.
Antoine Pitrou9aed6042010-04-22 18:00:41 +00001485 # SHA256 was added in OpenSSL 0.9.8
1486 if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15):
1487 self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001488 # sha256.tbs-internet.com needs SNI to use the correct certificate
1489 if not ssl.HAS_SNI:
1490 self.skipTest("SNI needed for this test")
1491 # https://sha2.hboeck.de/ was used until 2011-01-08 (no route to host)
Antoine Pitroud43245a2011-01-08 10:32:51 +00001492 remote = ("sha256.tbs-internet.com", 443)
Antoine Pitrouc715a9e2010-04-21 19:28:03 +00001493 sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem")
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001494 with support.transient_internet("sha256.tbs-internet.com"):
1495 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1496 ctx.verify_mode = ssl.CERT_REQUIRED
1497 ctx.load_verify_locations(sha256_cert)
1498 s = ctx.wrap_socket(socket.socket(socket.AF_INET),
1499 server_hostname="sha256.tbs-internet.com")
Antoine Pitrouc715a9e2010-04-21 19:28:03 +00001500 try:
1501 s.connect(remote)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001502 if support.verbose:
Antoine Pitrouc715a9e2010-04-21 19:28:03 +00001503 sys.stdout.write("\nCipher with %r is %r\n" %
1504 (remote, s.cipher()))
1505 sys.stdout.write("Certificate is:\n%s\n" %
1506 pprint.pformat(s.getpeercert()))
1507 finally:
1508 s.close()
1509
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001510 def test_get_ca_certs_capath(self):
1511 # capath certs are loaded on request
1512 with support.transient_internet("svn.python.org"):
1513 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1514 ctx.verify_mode = ssl.CERT_REQUIRED
1515 ctx.load_verify_locations(capath=CAPATH)
1516 self.assertEqual(ctx.get_ca_certs(), [])
1517 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1518 s.connect(("svn.python.org", 443))
1519 try:
1520 cert = s.getpeercert()
1521 self.assertTrue(cert)
1522 finally:
1523 s.close()
1524 self.assertEqual(len(ctx.get_ca_certs()), 1)
1525
1526 @needs_sni
1527 def test_context_setget(self):
1528 # Check that the context of a connected socket can be replaced.
1529 with support.transient_internet("svn.python.org"):
1530 ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1531 ctx2 = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1532 s = socket.socket(socket.AF_INET)
1533 with closing(ctx1.wrap_socket(s)) as ss:
1534 ss.connect(("svn.python.org", 443))
1535 self.assertIs(ss.context, ctx1)
1536 self.assertIs(ss._sslobj.context, ctx1)
1537 ss.context = ctx2
1538 self.assertIs(ss.context, ctx2)
1539 self.assertIs(ss._sslobj.context, ctx2)
Bill Janssen296a59d2007-09-16 22:06:00 +00001540
Bill Janssen98d19da2007-09-10 21:51:02 +00001541try:
1542 import threading
1543except ImportError:
1544 _have_threads = False
1545else:
Bill Janssen98d19da2007-09-10 21:51:02 +00001546 _have_threads = True
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001547
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001548 from test.ssl_servers import make_https_server
1549
Bill Janssen98d19da2007-09-10 21:51:02 +00001550 class ThreadedEchoServer(threading.Thread):
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001551
Bill Janssen98d19da2007-09-10 21:51:02 +00001552 class ConnectionHandler(threading.Thread):
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001553
Bill Janssen98d19da2007-09-10 21:51:02 +00001554 """A mildly complicated class, because we want it to work both
1555 with and without the SSL wrapper around the socket connection, so
1556 that we can test the STARTTLS functionality."""
1557
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001558 def __init__(self, server, connsock, addr):
Bill Janssen98d19da2007-09-10 21:51:02 +00001559 self.server = server
1560 self.running = False
1561 self.sock = connsock
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001562 self.addr = addr
Bill Janssen98d19da2007-09-10 21:51:02 +00001563 self.sock.setblocking(1)
1564 self.sslconn = None
1565 threading.Thread.__init__(self)
Benjamin Peterson26f52162008-08-18 18:39:57 +00001566 self.daemon = True
Bill Janssen98d19da2007-09-10 21:51:02 +00001567
Antoine Pitrou3945c862010-04-28 21:11:01 +00001568 def wrap_conn(self):
Bill Janssen98d19da2007-09-10 21:51:02 +00001569 try:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001570 self.sslconn = self.server.context.wrap_socket(
1571 self.sock, server_side=True)
1572 self.server.selected_protocols.append(self.sslconn.selected_npn_protocol())
1573 except socket.error as e:
1574 # We treat ConnectionResetError as though it were an
1575 # SSLError - OpenSSL on Ubuntu abruptly closes the
1576 # connection when asked to use an unsupported protocol.
1577 #
Antoine Pitroudb187842010-04-27 10:32:58 +00001578 # XXX Various errors can have happened here, for example
1579 # a mismatching protocol version, an invalid certificate,
1580 # or a low-level bug. This should be made more discriminating.
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001581 if not isinstance(e, ssl.SSLError) and e.errno != errno.ECONNRESET:
1582 raise
Antoine Pitroud76088d2012-01-03 22:46:48 +01001583 self.server.conn_errors.append(e)
Bill Janssen98d19da2007-09-10 21:51:02 +00001584 if self.server.chatty:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001585 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
Antoine Pitroudb187842010-04-27 10:32:58 +00001586 self.running = False
1587 self.server.stop()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001588 self.close()
Bill Janssen98d19da2007-09-10 21:51:02 +00001589 return False
Bill Janssen98d19da2007-09-10 21:51:02 +00001590 else:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001591 if self.server.context.verify_mode == ssl.CERT_REQUIRED:
1592 cert = self.sslconn.getpeercert()
1593 if support.verbose and self.server.chatty:
1594 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
1595 cert_binary = self.sslconn.getpeercert(True)
1596 if support.verbose and self.server.chatty:
1597 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
1598 cipher = self.sslconn.cipher()
1599 if support.verbose and self.server.chatty:
1600 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
1601 sys.stdout.write(" server: selected protocol is now "
1602 + str(self.sslconn.selected_npn_protocol()) + "\n")
Bill Janssen98d19da2007-09-10 21:51:02 +00001603 return True
1604
1605 def read(self):
1606 if self.sslconn:
1607 return self.sslconn.read()
1608 else:
1609 return self.sock.recv(1024)
1610
1611 def write(self, bytes):
1612 if self.sslconn:
1613 return self.sslconn.write(bytes)
1614 else:
1615 return self.sock.send(bytes)
1616
1617 def close(self):
1618 if self.sslconn:
1619 self.sslconn.close()
1620 else:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001621 self.sock.close()
Bill Janssen98d19da2007-09-10 21:51:02 +00001622
Antoine Pitrou3945c862010-04-28 21:11:01 +00001623 def run(self):
Bill Janssen98d19da2007-09-10 21:51:02 +00001624 self.running = True
1625 if not self.server.starttls_server:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001626 if not self.wrap_conn():
Bill Janssen98d19da2007-09-10 21:51:02 +00001627 return
1628 while self.running:
1629 try:
1630 msg = self.read()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001631 stripped = msg.strip()
1632 if not stripped:
Bill Janssen98d19da2007-09-10 21:51:02 +00001633 # eof, so quit this handler
1634 self.running = False
1635 self.close()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001636 elif stripped == b'over':
1637 if support.verbose and self.server.connectionchatty:
Bill Janssen98d19da2007-09-10 21:51:02 +00001638 sys.stdout.write(" server: client closed connection\n")
1639 self.close()
1640 return
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001641 elif (self.server.starttls_server and
1642 stripped == b'STARTTLS'):
1643 if support.verbose and self.server.connectionchatty:
Bill Janssen98d19da2007-09-10 21:51:02 +00001644 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001645 self.write(b"OK\n")
Bill Janssen98d19da2007-09-10 21:51:02 +00001646 if not self.wrap_conn():
1647 return
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001648 elif (self.server.starttls_server and self.sslconn
1649 and stripped == b'ENDTLS'):
1650 if support.verbose and self.server.connectionchatty:
Bill Janssen39295c22008-08-12 16:31:21 +00001651 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001652 self.write(b"OK\n")
1653 self.sock = self.sslconn.unwrap()
Bill Janssen39295c22008-08-12 16:31:21 +00001654 self.sslconn = None
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001655 if support.verbose and self.server.connectionchatty:
Bill Janssen39295c22008-08-12 16:31:21 +00001656 sys.stdout.write(" server: connection is now unencrypted...\n")
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001657 elif stripped == b'CB tls-unique':
1658 if support.verbose and self.server.connectionchatty:
1659 sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
1660 data = self.sslconn.get_channel_binding("tls-unique")
1661 self.write(repr(data).encode("us-ascii") + b"\n")
Bill Janssen98d19da2007-09-10 21:51:02 +00001662 else:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001663 if (support.verbose and
Bill Janssen98d19da2007-09-10 21:51:02 +00001664 self.server.connectionchatty):
1665 ctype = (self.sslconn and "encrypted") or "unencrypted"
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001666 sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
1667 % (msg, ctype, msg.lower(), ctype))
Bill Janssen98d19da2007-09-10 21:51:02 +00001668 self.write(msg.lower())
1669 except ssl.SSLError:
1670 if self.server.chatty:
1671 handle_error("Test server failure:\n")
1672 self.close()
1673 self.running = False
1674 # normally, we'd just stop here, but for the test
1675 # harness, we want to stop the server
1676 self.server.stop()
Bill Janssen98d19da2007-09-10 21:51:02 +00001677
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001678 def __init__(self, certificate=None, ssl_version=None,
Antoine Pitroudb187842010-04-27 10:32:58 +00001679 certreqs=None, cacerts=None,
Bill Janssen934b16d2008-06-28 22:19:33 +00001680 chatty=True, connectionchatty=False, starttls_server=False,
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001681 npn_protocols=None, ciphers=None, context=None):
1682 if context:
1683 self.context = context
1684 else:
1685 self.context = ssl.SSLContext(ssl_version
1686 if ssl_version is not None
1687 else ssl.PROTOCOL_TLSv1)
1688 self.context.verify_mode = (certreqs if certreqs is not None
1689 else ssl.CERT_NONE)
1690 if cacerts:
1691 self.context.load_verify_locations(cacerts)
1692 if certificate:
1693 self.context.load_cert_chain(certificate)
1694 if npn_protocols:
1695 self.context.set_npn_protocols(npn_protocols)
1696 if ciphers:
1697 self.context.set_ciphers(ciphers)
Bill Janssen98d19da2007-09-10 21:51:02 +00001698 self.chatty = chatty
1699 self.connectionchatty = connectionchatty
1700 self.starttls_server = starttls_server
1701 self.sock = socket.socket()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001702 self.port = support.bind_port(self.sock)
Bill Janssen98d19da2007-09-10 21:51:02 +00001703 self.flag = None
Bill Janssen98d19da2007-09-10 21:51:02 +00001704 self.active = False
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001705 self.selected_protocols = []
Antoine Pitroud76088d2012-01-03 22:46:48 +01001706 self.conn_errors = []
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001707 threading.Thread.__init__(self)
Benjamin Peterson26f52162008-08-18 18:39:57 +00001708 self.daemon = True
Bill Janssen98d19da2007-09-10 21:51:02 +00001709
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01001710 def __enter__(self):
1711 self.start(threading.Event())
1712 self.flag.wait()
Antoine Pitroud76088d2012-01-03 22:46:48 +01001713 return self
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01001714
1715 def __exit__(self, *args):
1716 self.stop()
1717 self.join()
1718
Antoine Pitrou3945c862010-04-28 21:11:01 +00001719 def start(self, flag=None):
Bill Janssen98d19da2007-09-10 21:51:02 +00001720 self.flag = flag
1721 threading.Thread.start(self)
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001722
Antoine Pitrou3945c862010-04-28 21:11:01 +00001723 def run(self):
Antoine Pitrou435ba0c2010-04-27 09:51:18 +00001724 self.sock.settimeout(0.05)
Bill Janssen98d19da2007-09-10 21:51:02 +00001725 self.sock.listen(5)
1726 self.active = True
1727 if self.flag:
1728 # signal an event
1729 self.flag.set()
1730 while self.active:
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001731 try:
Bill Janssen98d19da2007-09-10 21:51:02 +00001732 newconn, connaddr = self.sock.accept()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001733 if support.verbose and self.chatty:
Bill Janssen98d19da2007-09-10 21:51:02 +00001734 sys.stdout.write(' server: new connection from '
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001735 + repr(connaddr) + '\n')
1736 handler = self.ConnectionHandler(self, newconn, connaddr)
Bill Janssen98d19da2007-09-10 21:51:02 +00001737 handler.start()
Antoine Pitrou7a556842012-01-27 17:33:01 +01001738 handler.join()
Bill Janssen98d19da2007-09-10 21:51:02 +00001739 except socket.timeout:
1740 pass
1741 except KeyboardInterrupt:
1742 self.stop()
Bill Janssen934b16d2008-06-28 22:19:33 +00001743 self.sock.close()
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001744
Antoine Pitrou3945c862010-04-28 21:11:01 +00001745 def stop(self):
Bill Janssen98d19da2007-09-10 21:51:02 +00001746 self.active = False
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001747
Bill Janssen934b16d2008-06-28 22:19:33 +00001748 class AsyncoreEchoServer(threading.Thread):
Bill Janssen296a59d2007-09-16 22:06:00 +00001749
Antoine Pitrou3945c862010-04-28 21:11:01 +00001750 class EchoServer(asyncore.dispatcher):
Bill Janssen934b16d2008-06-28 22:19:33 +00001751
Antoine Pitrou3945c862010-04-28 21:11:01 +00001752 class ConnectionHandler(asyncore.dispatcher_with_send):
Bill Janssen934b16d2008-06-28 22:19:33 +00001753
1754 def __init__(self, conn, certfile):
Bill Janssen934b16d2008-06-28 22:19:33 +00001755 self.socket = ssl.wrap_socket(conn, server_side=True,
1756 certfile=certfile,
Antoine Pitroufc69af12010-04-24 20:04:58 +00001757 do_handshake_on_connect=False)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001758 asyncore.dispatcher_with_send.__init__(self, self.socket)
Antoine Pitroufc69af12010-04-24 20:04:58 +00001759 self._ssl_accepting = True
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001760 self._do_ssl_handshake()
Bill Janssen934b16d2008-06-28 22:19:33 +00001761
1762 def readable(self):
1763 if isinstance(self.socket, ssl.SSLSocket):
1764 while self.socket.pending() > 0:
1765 self.handle_read_event()
1766 return True
1767
Antoine Pitroufc69af12010-04-24 20:04:58 +00001768 def _do_ssl_handshake(self):
1769 try:
1770 self.socket.do_handshake()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001771 except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
1772 return
1773 except ssl.SSLEOFError:
1774 return self.handle_close()
1775 except ssl.SSLError:
Antoine Pitroufc69af12010-04-24 20:04:58 +00001776 raise
1777 except socket.error, err:
1778 if err.args[0] == errno.ECONNABORTED:
1779 return self.handle_close()
1780 else:
1781 self._ssl_accepting = False
1782
Bill Janssen934b16d2008-06-28 22:19:33 +00001783 def handle_read(self):
Antoine Pitroufc69af12010-04-24 20:04:58 +00001784 if self._ssl_accepting:
1785 self._do_ssl_handshake()
1786 else:
1787 data = self.recv(1024)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001788 if support.verbose:
1789 sys.stdout.write(" server: read %s from client\n" % repr(data))
1790 if not data:
1791 self.close()
1792 else:
Antoine Pitroudb187842010-04-27 10:32:58 +00001793 self.send(data.lower())
Bill Janssen934b16d2008-06-28 22:19:33 +00001794
1795 def handle_close(self):
Bill Janssende34d912008-06-28 23:00:39 +00001796 self.close()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001797 if support.verbose:
Bill Janssen934b16d2008-06-28 22:19:33 +00001798 sys.stdout.write(" server: closed connection %s\n" % self.socket)
1799
1800 def handle_error(self):
1801 raise
1802
1803 def __init__(self, certfile):
1804 self.certfile = certfile
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001805 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1806 self.port = support.bind_port(sock, '')
1807 asyncore.dispatcher.__init__(self, sock)
Bill Janssen934b16d2008-06-28 22:19:33 +00001808 self.listen(5)
1809
1810 def handle_accept(self):
1811 sock_obj, addr = self.accept()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001812 if support.verbose:
Bill Janssen934b16d2008-06-28 22:19:33 +00001813 sys.stdout.write(" server: new connection from %s:%s\n" %addr)
1814 self.ConnectionHandler(sock_obj, self.certfile)
1815
1816 def handle_error(self):
1817 raise
1818
1819 def __init__(self, certfile):
1820 self.flag = None
1821 self.active = False
1822 self.server = self.EchoServer(certfile)
1823 self.port = self.server.port
1824 threading.Thread.__init__(self)
Benjamin Peterson26f52162008-08-18 18:39:57 +00001825 self.daemon = True
Bill Janssen934b16d2008-06-28 22:19:33 +00001826
1827 def __str__(self):
1828 return "<%s %s>" % (self.__class__.__name__, self.server)
1829
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01001830 def __enter__(self):
1831 self.start(threading.Event())
1832 self.flag.wait()
Antoine Pitroud76088d2012-01-03 22:46:48 +01001833 return self
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01001834
1835 def __exit__(self, *args):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001836 if support.verbose:
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01001837 sys.stdout.write(" cleanup: stopping server.\n")
1838 self.stop()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001839 if support.verbose:
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01001840 sys.stdout.write(" cleanup: joining server thread.\n")
1841 self.join()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001842 if support.verbose:
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01001843 sys.stdout.write(" cleanup: successfully joined.\n")
1844
Antoine Pitrou3945c862010-04-28 21:11:01 +00001845 def start(self, flag=None):
Bill Janssen934b16d2008-06-28 22:19:33 +00001846 self.flag = flag
1847 threading.Thread.start(self)
1848
Antoine Pitrou3945c862010-04-28 21:11:01 +00001849 def run(self):
Bill Janssen934b16d2008-06-28 22:19:33 +00001850 self.active = True
1851 if self.flag:
1852 self.flag.set()
1853 while self.active:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001854 try:
1855 asyncore.loop(1)
1856 except:
1857 pass
Bill Janssen934b16d2008-06-28 22:19:33 +00001858
Antoine Pitrou3945c862010-04-28 21:11:01 +00001859 def stop(self):
Bill Janssen934b16d2008-06-28 22:19:33 +00001860 self.active = False
1861 self.server.close()
1862
Antoine Pitrou3945c862010-04-28 21:11:01 +00001863 def bad_cert_test(certfile):
1864 """
1865 Launch a server with CERT_REQUIRED, and check that trying to
1866 connect to it with the given client certificate fails.
1867 """
Trent Nelsone41b0062008-04-08 23:47:30 +00001868 server = ThreadedEchoServer(CERTFILE,
Bill Janssen98d19da2007-09-10 21:51:02 +00001869 certreqs=ssl.CERT_REQUIRED,
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001870 cacerts=CERTFILE, chatty=False,
1871 connectionchatty=False)
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01001872 with server:
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001873 try:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001874 with closing(socket.socket()) as sock:
1875 s = ssl.wrap_socket(sock,
1876 certfile=certfile,
1877 ssl_version=ssl.PROTOCOL_TLSv1)
1878 s.connect((HOST, server.port))
1879 except ssl.SSLError as x:
1880 if support.verbose:
1881 sys.stdout.write("\nSSLError is %s\n" % x.args[1])
1882 except OSError as x:
1883 if support.verbose:
1884 sys.stdout.write("\nOSError is %s\n" % x.args[1])
1885 except OSError as x:
1886 if x.errno != errno.ENOENT:
1887 raise
1888 if support.verbose:
1889 sys.stdout.write("\OSError is %s\n" % str(x))
Bill Janssen98d19da2007-09-10 21:51:02 +00001890 else:
Antoine Pitrou1bbb68d2010-05-06 14:11:23 +00001891 raise AssertionError("Use of invalid cert should have failed!")
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001892
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001893 def server_params_test(client_context, server_context, indata=b"FOO\n",
1894 chatty=True, connectionchatty=False, sni_name=None):
Antoine Pitrou3945c862010-04-28 21:11:01 +00001895 """
1896 Launch a server, connect a client to it and try various reads
1897 and writes.
1898 """
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001899 stats = {}
1900 server = ThreadedEchoServer(context=server_context,
Bill Janssen98d19da2007-09-10 21:51:02 +00001901 chatty=chatty,
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001902 connectionchatty=False)
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01001903 with server:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001904 with closing(client_context.wrap_socket(socket.socket(),
1905 server_hostname=sni_name)) as s:
1906 s.connect((HOST, server.port))
1907 for arg in [indata, bytearray(indata), memoryview(indata)]:
1908 if connectionchatty:
1909 if support.verbose:
1910 sys.stdout.write(
1911 " client: sending %r...\n" % indata)
1912 s.write(arg)
1913 outdata = s.read()
1914 if connectionchatty:
1915 if support.verbose:
1916 sys.stdout.write(" client: read %r\n" % outdata)
1917 if outdata != indata.lower():
1918 raise AssertionError(
1919 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
1920 % (outdata[:20], len(outdata),
1921 indata[:20].lower(), len(indata)))
1922 s.write(b"over\n")
Bill Janssen98d19da2007-09-10 21:51:02 +00001923 if connectionchatty:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001924 if support.verbose:
1925 sys.stdout.write(" client: closing connection.\n")
1926 stats.update({
1927 'compression': s.compression(),
1928 'cipher': s.cipher(),
1929 'peercert': s.getpeercert(),
Alex Gaynore98205d2014-09-04 13:33:22 -07001930 'client_npn_protocol': s.selected_npn_protocol(),
1931 'version': s.version(),
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001932 })
1933 s.close()
1934 stats['server_npn_protocols'] = server.selected_protocols
1935 return stats
Bill Janssen98d19da2007-09-10 21:51:02 +00001936
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001937 def try_protocol_combo(server_protocol, client_protocol, expect_success,
1938 certsreqs=None, server_options=0, client_options=0):
Alex Gaynore98205d2014-09-04 13:33:22 -07001939 """
1940 Try to SSL-connect using *client_protocol* to *server_protocol*.
1941 If *expect_success* is true, assert that the connection succeeds,
1942 if it's false, assert that the connection fails.
1943 Also, if *expect_success* is a string, assert that it is the protocol
1944 version actually used by the connection.
1945 """
Benjamin Peterson5b63acd2008-03-29 15:24:25 +00001946 if certsreqs is None:
Bill Janssene3f1d7d2007-09-11 01:09:19 +00001947 certsreqs = ssl.CERT_NONE
Antoine Pitrou3945c862010-04-28 21:11:01 +00001948 certtype = {
1949 ssl.CERT_NONE: "CERT_NONE",
1950 ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
1951 ssl.CERT_REQUIRED: "CERT_REQUIRED",
1952 }[certsreqs]
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001953 if support.verbose:
Antoine Pitrou3945c862010-04-28 21:11:01 +00001954 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
Bill Janssen98d19da2007-09-10 21:51:02 +00001955 sys.stdout.write(formatstr %
1956 (ssl.get_protocol_name(client_protocol),
1957 ssl.get_protocol_name(server_protocol),
1958 certtype))
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001959 client_context = ssl.SSLContext(client_protocol)
1960 client_context.options |= client_options
1961 server_context = ssl.SSLContext(server_protocol)
1962 server_context.options |= server_options
1963
1964 # NOTE: we must enable "ALL" ciphers on the client, otherwise an
1965 # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
1966 # starting from OpenSSL 1.0.0 (see issue #8322).
1967 if client_context.protocol == ssl.PROTOCOL_SSLv23:
1968 client_context.set_ciphers("ALL")
1969
1970 for ctx in (client_context, server_context):
1971 ctx.verify_mode = certsreqs
1972 ctx.load_cert_chain(CERTFILE)
1973 ctx.load_verify_locations(CERTFILE)
Bill Janssen98d19da2007-09-10 21:51:02 +00001974 try:
Alex Gaynore98205d2014-09-04 13:33:22 -07001975 stats = server_params_test(client_context, server_context,
1976 chatty=False, connectionchatty=False)
Antoine Pitroudb187842010-04-27 10:32:58 +00001977 # Protocol mismatch can result in either an SSLError, or a
1978 # "Connection reset by peer" error.
1979 except ssl.SSLError:
Antoine Pitrou3945c862010-04-28 21:11:01 +00001980 if expect_success:
Bill Janssen98d19da2007-09-10 21:51:02 +00001981 raise
Antoine Pitroudb187842010-04-27 10:32:58 +00001982 except socket.error as e:
Antoine Pitrou3945c862010-04-28 21:11:01 +00001983 if expect_success or e.errno != errno.ECONNRESET:
Antoine Pitroudb187842010-04-27 10:32:58 +00001984 raise
Bill Janssen98d19da2007-09-10 21:51:02 +00001985 else:
Antoine Pitrou3945c862010-04-28 21:11:01 +00001986 if not expect_success:
Antoine Pitrou1bbb68d2010-05-06 14:11:23 +00001987 raise AssertionError(
Bill Janssen98d19da2007-09-10 21:51:02 +00001988 "Client protocol %s succeeded with server protocol %s!"
1989 % (ssl.get_protocol_name(client_protocol),
1990 ssl.get_protocol_name(server_protocol)))
Alex Gaynore98205d2014-09-04 13:33:22 -07001991 elif (expect_success is not True
1992 and expect_success != stats['version']):
1993 raise AssertionError("version mismatch: expected %r, got %r"
1994 % (expect_success, stats['version']))
Bill Janssen98d19da2007-09-10 21:51:02 +00001995
1996
Bill Janssen934b16d2008-06-28 22:19:33 +00001997 class ThreadedTests(unittest.TestCase):
Bill Janssen98d19da2007-09-10 21:51:02 +00001998
Antoine Pitroud75efd92010-08-04 17:38:33 +00001999 @skip_if_broken_ubuntu_ssl
Antoine Pitrou3945c862010-04-28 21:11:01 +00002000 def test_echo(self):
2001 """Basic test of an SSL client connecting to a server"""
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002002 if support.verbose:
Bill Janssen98d19da2007-09-10 21:51:02 +00002003 sys.stdout.write("\n")
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002004 for protocol in PROTOCOLS:
2005 context = ssl.SSLContext(protocol)
2006 context.load_cert_chain(CERTFILE)
2007 server_params_test(context, context,
2008 chatty=True, connectionchatty=True)
Bill Janssen98d19da2007-09-10 21:51:02 +00002009
Antoine Pitrou3945c862010-04-28 21:11:01 +00002010 def test_getpeercert(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002011 if support.verbose:
Bill Janssen98d19da2007-09-10 21:51:02 +00002012 sys.stdout.write("\n")
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002013 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2014 context.verify_mode = ssl.CERT_REQUIRED
2015 context.load_verify_locations(CERTFILE)
2016 context.load_cert_chain(CERTFILE)
2017 server = ThreadedEchoServer(context=context, chatty=False)
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01002018 with server:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002019 s = context.wrap_socket(socket.socket(),
2020 do_handshake_on_connect=False)
Antoine Pitroudb187842010-04-27 10:32:58 +00002021 s.connect((HOST, server.port))
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002022 # getpeercert() raise ValueError while the handshake isn't
2023 # done.
2024 with self.assertRaises(ValueError):
2025 s.getpeercert()
2026 s.do_handshake()
Antoine Pitroudb187842010-04-27 10:32:58 +00002027 cert = s.getpeercert()
2028 self.assertTrue(cert, "Can't get peer certificate.")
2029 cipher = s.cipher()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002030 if support.verbose:
Antoine Pitroudb187842010-04-27 10:32:58 +00002031 sys.stdout.write(pprint.pformat(cert) + '\n')
2032 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
2033 if 'subject' not in cert:
2034 self.fail("No subject field in certificate: %s." %
2035 pprint.pformat(cert))
2036 if ((('organizationName', 'Python Software Foundation'),)
2037 not in cert['subject']):
2038 self.fail(
2039 "Missing or invalid 'organizationName' field in certificate subject; "
2040 "should be 'Python Software Foundation'.")
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002041 self.assertIn('notBefore', cert)
2042 self.assertIn('notAfter', cert)
2043 before = ssl.cert_time_to_seconds(cert['notBefore'])
2044 after = ssl.cert_time_to_seconds(cert['notAfter'])
2045 self.assertLess(before, after)
Antoine Pitroudb187842010-04-27 10:32:58 +00002046 s.close()
Bill Janssen98d19da2007-09-10 21:51:02 +00002047
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002048 @unittest.skipUnless(have_verify_flags(),
2049 "verify_flags need OpenSSL > 0.9.8")
2050 def test_crl_check(self):
2051 if support.verbose:
2052 sys.stdout.write("\n")
2053
2054 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2055 server_context.load_cert_chain(SIGNED_CERTFILE)
2056
2057 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2058 context.verify_mode = ssl.CERT_REQUIRED
2059 context.load_verify_locations(SIGNING_CA)
2060 self.assertEqual(context.verify_flags, ssl.VERIFY_DEFAULT)
2061
2062 # VERIFY_DEFAULT should pass
2063 server = ThreadedEchoServer(context=server_context, chatty=True)
2064 with server:
2065 with closing(context.wrap_socket(socket.socket())) as s:
2066 s.connect((HOST, server.port))
2067 cert = s.getpeercert()
2068 self.assertTrue(cert, "Can't get peer certificate.")
2069
2070 # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
2071 context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
2072
2073 server = ThreadedEchoServer(context=server_context, chatty=True)
2074 with server:
2075 with closing(context.wrap_socket(socket.socket())) as s:
2076 with self.assertRaisesRegexp(ssl.SSLError,
2077 "certificate verify failed"):
2078 s.connect((HOST, server.port))
2079
2080 # now load a CRL file. The CRL file is signed by the CA.
2081 context.load_verify_locations(CRLFILE)
2082
2083 server = ThreadedEchoServer(context=server_context, chatty=True)
2084 with server:
2085 with closing(context.wrap_socket(socket.socket())) as s:
2086 s.connect((HOST, server.port))
2087 cert = s.getpeercert()
2088 self.assertTrue(cert, "Can't get peer certificate.")
2089
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002090 def test_check_hostname(self):
2091 if support.verbose:
2092 sys.stdout.write("\n")
2093
2094 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2095 server_context.load_cert_chain(SIGNED_CERTFILE)
2096
2097 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2098 context.verify_mode = ssl.CERT_REQUIRED
2099 context.check_hostname = True
2100 context.load_verify_locations(SIGNING_CA)
2101
2102 # correct hostname should verify
2103 server = ThreadedEchoServer(context=server_context, chatty=True)
2104 with server:
2105 with closing(context.wrap_socket(socket.socket(),
2106 server_hostname="localhost")) as s:
2107 s.connect((HOST, server.port))
2108 cert = s.getpeercert()
2109 self.assertTrue(cert, "Can't get peer certificate.")
2110
2111 # incorrect hostname should raise an exception
2112 server = ThreadedEchoServer(context=server_context, chatty=True)
2113 with server:
2114 with closing(context.wrap_socket(socket.socket(),
2115 server_hostname="invalid")) as s:
2116 with self.assertRaisesRegexp(ssl.CertificateError,
2117 "hostname 'invalid' doesn't match u?'localhost'"):
2118 s.connect((HOST, server.port))
2119
2120 # missing server_hostname arg should cause an exception, too
2121 server = ThreadedEchoServer(context=server_context, chatty=True)
2122 with server:
2123 with closing(socket.socket()) as s:
2124 with self.assertRaisesRegexp(ValueError,
2125 "check_hostname requires server_hostname"):
2126 context.wrap_socket(s)
2127
Antoine Pitrou3945c862010-04-28 21:11:01 +00002128 def test_empty_cert(self):
2129 """Connecting with an empty cert file"""
2130 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
2131 "nullcert.pem"))
2132 def test_malformed_cert(self):
2133 """Connecting with a badly formatted certificate (syntax error)"""
2134 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
2135 "badcert.pem"))
2136 def test_nonexisting_cert(self):
2137 """Connecting with a non-existing cert file"""
2138 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
2139 "wrongcert.pem"))
2140 def test_malformed_key(self):
2141 """Connecting with a badly formatted key (syntax error)"""
2142 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
2143 "badkey.pem"))
Bill Janssen98d19da2007-09-10 21:51:02 +00002144
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002145 def test_rude_shutdown(self):
2146 """A brutal shutdown of an SSL server should raise an OSError
2147 in the client when attempting handshake.
2148 """
2149 listener_ready = threading.Event()
2150 listener_gone = threading.Event()
2151
2152 s = socket.socket()
2153 port = support.bind_port(s, HOST)
2154
2155 # `listener` runs in a thread. It sits in an accept() until
2156 # the main thread connects. Then it rudely closes the socket,
2157 # and sets Event `listener_gone` to let the main thread know
2158 # the socket is gone.
2159 def listener():
2160 s.listen(5)
2161 listener_ready.set()
2162 newsock, addr = s.accept()
2163 newsock.close()
2164 s.close()
2165 listener_gone.set()
2166
2167 def connector():
2168 listener_ready.wait()
2169 with closing(socket.socket()) as c:
2170 c.connect((HOST, port))
2171 listener_gone.wait()
2172 try:
2173 ssl_sock = ssl.wrap_socket(c)
Benjamin Petersone208b572014-08-20 14:49:08 -05002174 except socket.error:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002175 pass
2176 else:
2177 self.fail('connecting to closed SSL socket should have failed')
2178
2179 t = threading.Thread(target=listener)
2180 t.start()
2181 try:
2182 connector()
2183 finally:
2184 t.join()
2185
Antoine Pitroud75efd92010-08-04 17:38:33 +00002186 @skip_if_broken_ubuntu_ssl
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002187 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
2188 "OpenSSL is compiled without SSLv2 support")
Antoine Pitrou3945c862010-04-28 21:11:01 +00002189 def test_protocol_sslv2(self):
2190 """Connecting to an SSLv2 server with various client options"""
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002191 if support.verbose:
Bill Janssen98d19da2007-09-10 21:51:02 +00002192 sys.stdout.write("\n")
Antoine Pitrou3945c862010-04-28 21:11:01 +00002193 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
2194 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
2195 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
Antoine Pitrou3b2afbb2014-01-09 19:52:12 +01002196 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False)
Antoine Pitrou3945c862010-04-28 21:11:01 +00002197 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
2198 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002199 # SSLv23 client with specific SSL options
2200 if no_sslv2_implies_sslv3_hello():
2201 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
2202 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
2203 client_options=ssl.OP_NO_SSLv2)
2204 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
2205 client_options=ssl.OP_NO_SSLv3)
2206 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
2207 client_options=ssl.OP_NO_TLSv1)
Bill Janssen98d19da2007-09-10 21:51:02 +00002208
Antoine Pitroud75efd92010-08-04 17:38:33 +00002209 @skip_if_broken_ubuntu_ssl
Antoine Pitrou3945c862010-04-28 21:11:01 +00002210 def test_protocol_sslv23(self):
2211 """Connecting to an SSLv23 server with various client options"""
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002212 if support.verbose:
Bill Janssen98d19da2007-09-10 21:51:02 +00002213 sys.stdout.write("\n")
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002214 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2215 try:
2216 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True)
2217 except socket.error as x:
2218 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
2219 if support.verbose:
2220 sys.stdout.write(
2221 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
2222 % str(x))
Benjamin Peterson60766c42014-12-05 21:59:35 -05002223 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2224 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, 'SSLv3')
Antoine Pitrou3945c862010-04-28 21:11:01 +00002225 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True)
Alex Gaynore98205d2014-09-04 13:33:22 -07002226 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1')
Bill Janssen98d19da2007-09-10 21:51:02 +00002227
Benjamin Peterson60766c42014-12-05 21:59:35 -05002228 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2229 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
Antoine Pitrou3945c862010-04-28 21:11:01 +00002230 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL)
Alex Gaynore98205d2014-09-04 13:33:22 -07002231 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
Bill Janssen98d19da2007-09-10 21:51:02 +00002232
Benjamin Peterson60766c42014-12-05 21:59:35 -05002233 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2234 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
Antoine Pitrou3945c862010-04-28 21:11:01 +00002235 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED)
Alex Gaynore98205d2014-09-04 13:33:22 -07002236 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Bill Janssen98d19da2007-09-10 21:51:02 +00002237
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002238 # Server with specific SSL options
Benjamin Peterson60766c42014-12-05 21:59:35 -05002239 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2240 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False,
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002241 server_options=ssl.OP_NO_SSLv3)
2242 # Will choose TLSv1
2243 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True,
2244 server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
2245 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, False,
2246 server_options=ssl.OP_NO_TLSv1)
2247
2248
Antoine Pitroud75efd92010-08-04 17:38:33 +00002249 @skip_if_broken_ubuntu_ssl
Benjamin Peterson60766c42014-12-05 21:59:35 -05002250 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
2251 "OpenSSL is compiled without SSLv3 support")
Antoine Pitrou3945c862010-04-28 21:11:01 +00002252 def test_protocol_sslv3(self):
2253 """Connecting to an SSLv3 server with various client options"""
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002254 if support.verbose:
Bill Janssen98d19da2007-09-10 21:51:02 +00002255 sys.stdout.write("\n")
Alex Gaynore98205d2014-09-04 13:33:22 -07002256 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3')
2257 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
2258 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
Victor Stinnerb1241f92011-05-10 01:52:03 +02002259 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2260 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002261 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False,
2262 client_options=ssl.OP_NO_SSLv3)
Antoine Pitrou3945c862010-04-28 21:11:01 +00002263 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002264 if no_sslv2_implies_sslv3_hello():
2265 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Alex Gaynore98205d2014-09-04 13:33:22 -07002266 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, 'SSLv3',
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002267 client_options=ssl.OP_NO_SSLv2)
Bill Janssen98d19da2007-09-10 21:51:02 +00002268
Antoine Pitroud75efd92010-08-04 17:38:33 +00002269 @skip_if_broken_ubuntu_ssl
Antoine Pitrou3945c862010-04-28 21:11:01 +00002270 def test_protocol_tlsv1(self):
2271 """Connecting to a TLSv1 server with various client options"""
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002272 if support.verbose:
Bill Janssen98d19da2007-09-10 21:51:02 +00002273 sys.stdout.write("\n")
Alex Gaynore98205d2014-09-04 13:33:22 -07002274 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1')
2275 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
2276 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Victor Stinnerb1241f92011-05-10 01:52:03 +02002277 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2278 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
Benjamin Peterson60766c42014-12-05 21:59:35 -05002279 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2280 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002281 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False,
2282 client_options=ssl.OP_NO_TLSv1)
2283
2284 @skip_if_broken_ubuntu_ssl
2285 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
2286 "TLS version 1.1 not supported.")
2287 def test_protocol_tlsv1_1(self):
2288 """Connecting to a TLSv1.1 server with various client options.
2289 Testing against older TLS versions."""
2290 if support.verbose:
2291 sys.stdout.write("\n")
Alex Gaynore98205d2014-09-04 13:33:22 -07002292 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002293 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2294 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False)
Benjamin Peterson60766c42014-12-05 21:59:35 -05002295 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2296 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002297 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv23, False,
2298 client_options=ssl.OP_NO_TLSv1_1)
2299
Alex Gaynore98205d2014-09-04 13:33:22 -07002300 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002301 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False)
2302 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False)
2303
2304
2305 @skip_if_broken_ubuntu_ssl
2306 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
2307 "TLS version 1.2 not supported.")
2308 def test_protocol_tlsv1_2(self):
2309 """Connecting to a TLSv1.2 server with various client options.
2310 Testing against older TLS versions."""
2311 if support.verbose:
2312 sys.stdout.write("\n")
Alex Gaynore98205d2014-09-04 13:33:22 -07002313 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2',
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002314 server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
2315 client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
2316 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2317 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False)
Benjamin Peterson60766c42014-12-05 21:59:35 -05002318 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2319 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002320 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv23, False,
2321 client_options=ssl.OP_NO_TLSv1_2)
2322
Alex Gaynore98205d2014-09-04 13:33:22 -07002323 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2')
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002324 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
2325 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
2326 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
2327 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
Bill Janssen98d19da2007-09-10 21:51:02 +00002328
Antoine Pitrou3945c862010-04-28 21:11:01 +00002329 def test_starttls(self):
2330 """Switching from clear text to encrypted and back again."""
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002331 msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")
Bill Janssen98d19da2007-09-10 21:51:02 +00002332
Trent Nelsone41b0062008-04-08 23:47:30 +00002333 server = ThreadedEchoServer(CERTFILE,
Bill Janssen98d19da2007-09-10 21:51:02 +00002334 ssl_version=ssl.PROTOCOL_TLSv1,
2335 starttls_server=True,
2336 chatty=True,
2337 connectionchatty=True)
Bill Janssen98d19da2007-09-10 21:51:02 +00002338 wrapped = False
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01002339 with server:
Antoine Pitroudb187842010-04-27 10:32:58 +00002340 s = socket.socket()
2341 s.setblocking(1)
2342 s.connect((HOST, server.port))
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002343 if support.verbose:
Antoine Pitroudb187842010-04-27 10:32:58 +00002344 sys.stdout.write("\n")
2345 for indata in msgs:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002346 if support.verbose:
Antoine Pitroudb187842010-04-27 10:32:58 +00002347 sys.stdout.write(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002348 " client: sending %r...\n" % indata)
Antoine Pitroudb187842010-04-27 10:32:58 +00002349 if wrapped:
2350 conn.write(indata)
2351 outdata = conn.read()
2352 else:
2353 s.send(indata)
2354 outdata = s.recv(1024)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002355 msg = outdata.strip().lower()
2356 if indata == b"STARTTLS" and msg.startswith(b"ok"):
Antoine Pitrou3945c862010-04-28 21:11:01 +00002357 # STARTTLS ok, switch to secure mode
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002358 if support.verbose:
Bill Janssen296a59d2007-09-16 22:06:00 +00002359 sys.stdout.write(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002360 " client: read %r from server, starting TLS...\n"
2361 % msg)
Antoine Pitroudb187842010-04-27 10:32:58 +00002362 conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
2363 wrapped = True
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002364 elif indata == b"ENDTLS" and msg.startswith(b"ok"):
Antoine Pitrou3945c862010-04-28 21:11:01 +00002365 # ENDTLS ok, switch back to clear text
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002366 if support.verbose:
Antoine Pitroudb187842010-04-27 10:32:58 +00002367 sys.stdout.write(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002368 " client: read %r from server, ending TLS...\n"
2369 % msg)
Antoine Pitroudb187842010-04-27 10:32:58 +00002370 s = conn.unwrap()
2371 wrapped = False
Bill Janssen98d19da2007-09-10 21:51:02 +00002372 else:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002373 if support.verbose:
Antoine Pitroudb187842010-04-27 10:32:58 +00002374 sys.stdout.write(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002375 " client: read %r from server\n" % msg)
2376 if support.verbose:
Antoine Pitroudb187842010-04-27 10:32:58 +00002377 sys.stdout.write(" client: closing connection.\n")
2378 if wrapped:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002379 conn.write(b"over\n")
Antoine Pitroudb187842010-04-27 10:32:58 +00002380 else:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002381 s.send(b"over\n")
2382 if wrapped:
2383 conn.close()
2384 else:
2385 s.close()
Bill Janssen98d19da2007-09-10 21:51:02 +00002386
Antoine Pitrou3945c862010-04-28 21:11:01 +00002387 def test_socketserver(self):
2388 """Using a SocketServer to create and manage SSL connections."""
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002389 server = make_https_server(self, certfile=CERTFILE)
Bill Janssen296a59d2007-09-16 22:06:00 +00002390 # try to connect
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002391 if support.verbose:
2392 sys.stdout.write('\n')
2393 with open(CERTFILE, 'rb') as f:
2394 d1 = f.read()
2395 d2 = ''
2396 # now fetch the same data from the HTTPS server
Benjamin Petersonfcfb18e2014-11-23 11:42:45 -06002397 url = 'https://localhost:%d/%s' % (
2398 server.port, os.path.split(CERTFILE)[1])
2399 context = ssl.create_default_context(cafile=CERTFILE)
Benjamin Petersonb0609ec2014-11-23 11:52:46 -06002400 f = urllib2.urlopen(url, context=context)
Bill Janssen296a59d2007-09-16 22:06:00 +00002401 try:
Bill Janssen296a59d2007-09-16 22:06:00 +00002402 dlen = f.info().getheader("content-length")
2403 if dlen and (int(dlen) > 0):
2404 d2 = f.read(int(dlen))
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002405 if support.verbose:
Bill Janssen296a59d2007-09-16 22:06:00 +00002406 sys.stdout.write(
2407 " client: read %d bytes from remote server '%s'\n"
2408 % (len(d2), server))
Bill Janssen296a59d2007-09-16 22:06:00 +00002409 finally:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002410 f.close()
2411 self.assertEqual(d1, d2)
Bill Janssen934b16d2008-06-28 22:19:33 +00002412
Antoine Pitrou3945c862010-04-28 21:11:01 +00002413 def test_asyncore_server(self):
2414 """Check the example asyncore integration."""
Bill Janssen934b16d2008-06-28 22:19:33 +00002415 indata = "TEST MESSAGE of mixed case\n"
2416
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002417 if support.verbose:
Bill Janssen934b16d2008-06-28 22:19:33 +00002418 sys.stdout.write("\n")
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002419
2420 indata = b"FOO\n"
Bill Janssen934b16d2008-06-28 22:19:33 +00002421 server = AsyncoreEchoServer(CERTFILE)
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01002422 with server:
Antoine Pitroudb187842010-04-27 10:32:58 +00002423 s = ssl.wrap_socket(socket.socket())
2424 s.connect(('127.0.0.1', server.port))
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002425 if support.verbose:
Antoine Pitroudb187842010-04-27 10:32:58 +00002426 sys.stdout.write(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002427 " client: sending %r...\n" % indata)
Antoine Pitroudb187842010-04-27 10:32:58 +00002428 s.write(indata)
2429 outdata = s.read()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002430 if support.verbose:
2431 sys.stdout.write(" client: read %r\n" % outdata)
Antoine Pitroudb187842010-04-27 10:32:58 +00002432 if outdata != indata.lower():
2433 self.fail(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002434 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
2435 % (outdata[:20], len(outdata),
2436 indata[:20].lower(), len(indata)))
2437 s.write(b"over\n")
2438 if support.verbose:
Antoine Pitroudb187842010-04-27 10:32:58 +00002439 sys.stdout.write(" client: closing connection.\n")
2440 s.close()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002441 if support.verbose:
2442 sys.stdout.write(" client: connection closed.\n")
Bill Janssen934b16d2008-06-28 22:19:33 +00002443
Antoine Pitrou3945c862010-04-28 21:11:01 +00002444 def test_recv_send(self):
2445 """Test recv(), send() and friends."""
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002446 if support.verbose:
Bill Janssen61c001a2008-09-08 16:37:24 +00002447 sys.stdout.write("\n")
2448
2449 server = ThreadedEchoServer(CERTFILE,
2450 certreqs=ssl.CERT_NONE,
2451 ssl_version=ssl.PROTOCOL_TLSv1,
2452 cacerts=CERTFILE,
2453 chatty=True,
2454 connectionchatty=False)
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01002455 with server:
2456 s = ssl.wrap_socket(socket.socket(),
2457 server_side=False,
2458 certfile=CERTFILE,
2459 ca_certs=CERTFILE,
2460 cert_reqs=ssl.CERT_NONE,
2461 ssl_version=ssl.PROTOCOL_TLSv1)
2462 s.connect((HOST, server.port))
Bill Janssen61c001a2008-09-08 16:37:24 +00002463 # helper methods for standardising recv* method signatures
2464 def _recv_into():
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002465 b = bytearray(b"\0"*100)
Bill Janssen61c001a2008-09-08 16:37:24 +00002466 count = s.recv_into(b)
2467 return b[:count]
2468
2469 def _recvfrom_into():
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002470 b = bytearray(b"\0"*100)
Bill Janssen61c001a2008-09-08 16:37:24 +00002471 count, addr = s.recvfrom_into(b)
2472 return b[:count]
2473
2474 # (name, method, whether to expect success, *args)
2475 send_methods = [
2476 ('send', s.send, True, []),
2477 ('sendto', s.sendto, False, ["some.address"]),
2478 ('sendall', s.sendall, True, []),
2479 ]
2480 recv_methods = [
2481 ('recv', s.recv, True, []),
2482 ('recvfrom', s.recvfrom, False, ["some.address"]),
2483 ('recv_into', _recv_into, True, []),
2484 ('recvfrom_into', _recvfrom_into, False, []),
2485 ]
2486 data_prefix = u"PREFIX_"
2487
2488 for meth_name, send_meth, expect_success, args in send_methods:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002489 indata = (data_prefix + meth_name).encode('ascii')
Bill Janssen61c001a2008-09-08 16:37:24 +00002490 try:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002491 send_meth(indata, *args)
Bill Janssen61c001a2008-09-08 16:37:24 +00002492 outdata = s.read()
Bill Janssen61c001a2008-09-08 16:37:24 +00002493 if outdata != indata.lower():
Georg Brandlc7ca56d2010-02-06 23:23:45 +00002494 self.fail(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002495 "While sending with <<{name:s}>> bad data "
2496 "<<{outdata:r}>> ({nout:d}) received; "
2497 "expected <<{indata:r}>> ({nin:d})\n".format(
2498 name=meth_name, outdata=outdata[:20],
2499 nout=len(outdata),
2500 indata=indata[:20], nin=len(indata)
Bill Janssen61c001a2008-09-08 16:37:24 +00002501 )
2502 )
2503 except ValueError as e:
2504 if expect_success:
Georg Brandlc7ca56d2010-02-06 23:23:45 +00002505 self.fail(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002506 "Failed to send with method <<{name:s}>>; "
2507 "expected to succeed.\n".format(name=meth_name)
Bill Janssen61c001a2008-09-08 16:37:24 +00002508 )
2509 if not str(e).startswith(meth_name):
Georg Brandlc7ca56d2010-02-06 23:23:45 +00002510 self.fail(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002511 "Method <<{name:s}>> failed with unexpected "
2512 "exception message: {exp:s}\n".format(
2513 name=meth_name, exp=e
Bill Janssen61c001a2008-09-08 16:37:24 +00002514 )
2515 )
2516
2517 for meth_name, recv_meth, expect_success, args in recv_methods:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002518 indata = (data_prefix + meth_name).encode('ascii')
Bill Janssen61c001a2008-09-08 16:37:24 +00002519 try:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002520 s.send(indata)
Bill Janssen61c001a2008-09-08 16:37:24 +00002521 outdata = recv_meth(*args)
Bill Janssen61c001a2008-09-08 16:37:24 +00002522 if outdata != indata.lower():
Georg Brandlc7ca56d2010-02-06 23:23:45 +00002523 self.fail(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002524 "While receiving with <<{name:s}>> bad data "
2525 "<<{outdata:r}>> ({nout:d}) received; "
2526 "expected <<{indata:r}>> ({nin:d})\n".format(
2527 name=meth_name, outdata=outdata[:20],
2528 nout=len(outdata),
2529 indata=indata[:20], nin=len(indata)
Bill Janssen61c001a2008-09-08 16:37:24 +00002530 )
2531 )
2532 except ValueError as e:
2533 if expect_success:
Georg Brandlc7ca56d2010-02-06 23:23:45 +00002534 self.fail(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002535 "Failed to receive with method <<{name:s}>>; "
2536 "expected to succeed.\n".format(name=meth_name)
Bill Janssen61c001a2008-09-08 16:37:24 +00002537 )
2538 if not str(e).startswith(meth_name):
Georg Brandlc7ca56d2010-02-06 23:23:45 +00002539 self.fail(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002540 "Method <<{name:s}>> failed with unexpected "
2541 "exception message: {exp:s}\n".format(
2542 name=meth_name, exp=e
Bill Janssen61c001a2008-09-08 16:37:24 +00002543 )
2544 )
2545 # consume data
2546 s.read()
2547
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002548 s.write(b"over\n")
Bill Janssen61c001a2008-09-08 16:37:24 +00002549 s.close()
Bill Janssen61c001a2008-09-08 16:37:24 +00002550
Antoine Pitroufc69af12010-04-24 20:04:58 +00002551 def test_handshake_timeout(self):
2552 # Issue #5103: SSL handshake must respect the socket timeout
2553 server = socket.socket(socket.AF_INET)
2554 host = "127.0.0.1"
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002555 port = support.bind_port(server)
Antoine Pitroufc69af12010-04-24 20:04:58 +00002556 started = threading.Event()
2557 finish = False
2558
2559 def serve():
2560 server.listen(5)
2561 started.set()
2562 conns = []
2563 while not finish:
2564 r, w, e = select.select([server], [], [], 0.1)
2565 if server in r:
2566 # Let the socket hang around rather than having
2567 # it closed by garbage collection.
2568 conns.append(server.accept()[0])
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002569 for sock in conns:
2570 sock.close()
Antoine Pitroufc69af12010-04-24 20:04:58 +00002571
2572 t = threading.Thread(target=serve)
2573 t.start()
2574 started.wait()
2575
2576 try:
2577 try:
2578 c = socket.socket(socket.AF_INET)
2579 c.settimeout(0.2)
2580 c.connect((host, port))
2581 # Will attempt handshake and time out
2582 self.assertRaisesRegexp(ssl.SSLError, "timed out",
2583 ssl.wrap_socket, c)
2584 finally:
2585 c.close()
2586 try:
2587 c = socket.socket(socket.AF_INET)
Antoine Pitroufc69af12010-04-24 20:04:58 +00002588 c = ssl.wrap_socket(c)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002589 c.settimeout(0.2)
Antoine Pitroufc69af12010-04-24 20:04:58 +00002590 # Will attempt handshake and time out
2591 self.assertRaisesRegexp(ssl.SSLError, "timed out",
2592 c.connect, (host, port))
2593 finally:
2594 c.close()
2595 finally:
2596 finish = True
2597 t.join()
2598 server.close()
2599
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002600 def test_server_accept(self):
2601 # Issue #16357: accept() on a SSLSocket created through
2602 # SSLContext.wrap_socket().
2603 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2604 context.verify_mode = ssl.CERT_REQUIRED
2605 context.load_verify_locations(CERTFILE)
2606 context.load_cert_chain(CERTFILE)
2607 server = socket.socket(socket.AF_INET)
2608 host = "127.0.0.1"
2609 port = support.bind_port(server)
2610 server = context.wrap_socket(server, server_side=True)
2611
2612 evt = threading.Event()
2613 remote = [None]
2614 peer = [None]
2615 def serve():
2616 server.listen(5)
2617 # Block on the accept and wait on the connection to close.
2618 evt.set()
2619 remote[0], peer[0] = server.accept()
2620 remote[0].recv(1)
2621
2622 t = threading.Thread(target=serve)
2623 t.start()
2624 # Client wait until server setup and perform a connect.
2625 evt.wait()
2626 client = context.wrap_socket(socket.socket())
2627 client.connect((host, port))
2628 client_addr = client.getsockname()
2629 client.close()
2630 t.join()
2631 remote[0].close()
2632 server.close()
2633 # Sanity checks.
2634 self.assertIsInstance(remote[0], ssl.SSLSocket)
2635 self.assertEqual(peer[0], client_addr)
2636
2637 def test_getpeercert_enotconn(self):
2638 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2639 with closing(context.wrap_socket(socket.socket())) as sock:
2640 with self.assertRaises(socket.error) as cm:
2641 sock.getpeercert()
2642 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
2643
2644 def test_do_handshake_enotconn(self):
2645 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2646 with closing(context.wrap_socket(socket.socket())) as sock:
2647 with self.assertRaises(socket.error) as cm:
2648 sock.do_handshake()
2649 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
2650
Antoine Pitroud76088d2012-01-03 22:46:48 +01002651 def test_default_ciphers(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002652 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2653 try:
2654 # Force a set of weak ciphers on our client context
2655 context.set_ciphers("DES")
2656 except ssl.SSLError:
2657 self.skipTest("no DES cipher available")
Antoine Pitroud76088d2012-01-03 22:46:48 +01002658 with ThreadedEchoServer(CERTFILE,
2659 ssl_version=ssl.PROTOCOL_SSLv23,
2660 chatty=False) as server:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002661 with closing(context.wrap_socket(socket.socket())) as s:
2662 with self.assertRaises(ssl.SSLError):
Antoine Pitroud76088d2012-01-03 22:46:48 +01002663 s.connect((HOST, server.port))
Antoine Pitroud76088d2012-01-03 22:46:48 +01002664 self.assertIn("no shared cipher", str(server.conn_errors[0]))
2665
Alex Gaynore98205d2014-09-04 13:33:22 -07002666 def test_version_basic(self):
2667 """
2668 Basic tests for SSLSocket.version().
2669 More tests are done in the test_protocol_*() methods.
2670 """
2671 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2672 with ThreadedEchoServer(CERTFILE,
2673 ssl_version=ssl.PROTOCOL_TLSv1,
2674 chatty=False) as server:
2675 with closing(context.wrap_socket(socket.socket())) as s:
2676 self.assertIs(s.version(), None)
2677 s.connect((HOST, server.port))
2678 self.assertEqual(s.version(), "TLSv1")
2679 self.assertIs(s.version(), None)
2680
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002681 @unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
2682 def test_default_ecdh_curve(self):
2683 # Issue #21015: elliptic curve-based Diffie Hellman key exchange
2684 # should be enabled by default on SSL contexts.
2685 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2686 context.load_cert_chain(CERTFILE)
2687 # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
2688 # explicitly using the 'ECCdraft' cipher alias. Otherwise,
2689 # our default cipher list should prefer ECDH-based ciphers
2690 # automatically.
2691 if ssl.OPENSSL_VERSION_INFO < (1, 0, 0):
2692 context.set_ciphers("ECCdraft:ECDH")
2693 with ThreadedEchoServer(context=context) as server:
2694 with closing(context.wrap_socket(socket.socket())) as s:
2695 s.connect((HOST, server.port))
2696 self.assertIn("ECDH", s.cipher()[0])
2697
2698 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
2699 "'tls-unique' channel binding not available")
2700 def test_tls_unique_channel_binding(self):
2701 """Test tls-unique channel binding."""
2702 if support.verbose:
2703 sys.stdout.write("\n")
2704
2705 server = ThreadedEchoServer(CERTFILE,
2706 certreqs=ssl.CERT_NONE,
2707 ssl_version=ssl.PROTOCOL_TLSv1,
2708 cacerts=CERTFILE,
2709 chatty=True,
2710 connectionchatty=False)
2711 with server:
2712 s = ssl.wrap_socket(socket.socket(),
2713 server_side=False,
2714 certfile=CERTFILE,
2715 ca_certs=CERTFILE,
2716 cert_reqs=ssl.CERT_NONE,
2717 ssl_version=ssl.PROTOCOL_TLSv1)
2718 s.connect((HOST, server.port))
2719 # get the data
2720 cb_data = s.get_channel_binding("tls-unique")
2721 if support.verbose:
2722 sys.stdout.write(" got channel binding data: {0!r}\n"
2723 .format(cb_data))
2724
2725 # check if it is sane
2726 self.assertIsNotNone(cb_data)
2727 self.assertEqual(len(cb_data), 12) # True for TLSv1
2728
2729 # and compare with the peers version
2730 s.write(b"CB tls-unique\n")
2731 peer_data_repr = s.read().strip()
2732 self.assertEqual(peer_data_repr,
2733 repr(cb_data).encode("us-ascii"))
2734 s.close()
2735
2736 # now, again
2737 s = ssl.wrap_socket(socket.socket(),
2738 server_side=False,
2739 certfile=CERTFILE,
2740 ca_certs=CERTFILE,
2741 cert_reqs=ssl.CERT_NONE,
2742 ssl_version=ssl.PROTOCOL_TLSv1)
2743 s.connect((HOST, server.port))
2744 new_cb_data = s.get_channel_binding("tls-unique")
2745 if support.verbose:
2746 sys.stdout.write(" got another channel binding data: {0!r}\n"
2747 .format(new_cb_data))
2748 # is it really unique
2749 self.assertNotEqual(cb_data, new_cb_data)
2750 self.assertIsNotNone(cb_data)
2751 self.assertEqual(len(cb_data), 12) # True for TLSv1
2752 s.write(b"CB tls-unique\n")
2753 peer_data_repr = s.read().strip()
2754 self.assertEqual(peer_data_repr,
2755 repr(new_cb_data).encode("us-ascii"))
2756 s.close()
2757
2758 def test_compression(self):
2759 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2760 context.load_cert_chain(CERTFILE)
2761 stats = server_params_test(context, context,
2762 chatty=True, connectionchatty=True)
2763 if support.verbose:
2764 sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
2765 self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' })
2766
2767 @unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
2768 "ssl.OP_NO_COMPRESSION needed for this test")
2769 def test_compression_disabled(self):
2770 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2771 context.load_cert_chain(CERTFILE)
2772 context.options |= ssl.OP_NO_COMPRESSION
2773 stats = server_params_test(context, context,
2774 chatty=True, connectionchatty=True)
2775 self.assertIs(stats['compression'], None)
2776
2777 def test_dh_params(self):
2778 # Check we can get a connection with ephemeral Diffie-Hellman
2779 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2780 context.load_cert_chain(CERTFILE)
2781 context.load_dh_params(DHFILE)
2782 context.set_ciphers("kEDH")
2783 stats = server_params_test(context, context,
2784 chatty=True, connectionchatty=True)
2785 cipher = stats["cipher"][0]
2786 parts = cipher.split("-")
2787 if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
2788 self.fail("Non-DH cipher: " + cipher[0])
2789
2790 def test_selected_npn_protocol(self):
2791 # selected_npn_protocol() is None unless NPN is used
2792 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2793 context.load_cert_chain(CERTFILE)
2794 stats = server_params_test(context, context,
2795 chatty=True, connectionchatty=True)
2796 self.assertIs(stats['client_npn_protocol'], None)
2797
2798 @unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
2799 def test_npn_protocols(self):
2800 server_protocols = ['http/1.1', 'spdy/2']
2801 protocol_tests = [
2802 (['http/1.1', 'spdy/2'], 'http/1.1'),
2803 (['spdy/2', 'http/1.1'], 'http/1.1'),
2804 (['spdy/2', 'test'], 'spdy/2'),
2805 (['abc', 'def'], 'abc')
2806 ]
2807 for client_protocols, expected in protocol_tests:
2808 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2809 server_context.load_cert_chain(CERTFILE)
2810 server_context.set_npn_protocols(server_protocols)
2811 client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2812 client_context.load_cert_chain(CERTFILE)
2813 client_context.set_npn_protocols(client_protocols)
2814 stats = server_params_test(client_context, server_context,
2815 chatty=True, connectionchatty=True)
2816
2817 msg = "failed trying %s (s) and %s (c).\n" \
2818 "was expecting %s, but got %%s from the %%s" \
2819 % (str(server_protocols), str(client_protocols),
2820 str(expected))
2821 client_result = stats['client_npn_protocol']
2822 self.assertEqual(client_result, expected, msg % (client_result, "client"))
2823 server_result = stats['server_npn_protocols'][-1] \
2824 if len(stats['server_npn_protocols']) else 'nothing'
2825 self.assertEqual(server_result, expected, msg % (server_result, "server"))
2826
2827 def sni_contexts(self):
2828 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2829 server_context.load_cert_chain(SIGNED_CERTFILE)
2830 other_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2831 other_context.load_cert_chain(SIGNED_CERTFILE2)
2832 client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2833 client_context.verify_mode = ssl.CERT_REQUIRED
2834 client_context.load_verify_locations(SIGNING_CA)
2835 return server_context, other_context, client_context
2836
2837 def check_common_name(self, stats, name):
2838 cert = stats['peercert']
2839 self.assertIn((('commonName', name),), cert['subject'])
2840
2841 @needs_sni
2842 def test_sni_callback(self):
2843 calls = []
2844 server_context, other_context, client_context = self.sni_contexts()
2845
2846 def servername_cb(ssl_sock, server_name, initial_context):
2847 calls.append((server_name, initial_context))
2848 if server_name is not None:
2849 ssl_sock.context = other_context
2850 server_context.set_servername_callback(servername_cb)
2851
2852 stats = server_params_test(client_context, server_context,
2853 chatty=True,
2854 sni_name='supermessage')
2855 # The hostname was fetched properly, and the certificate was
2856 # changed for the connection.
2857 self.assertEqual(calls, [("supermessage", server_context)])
2858 # CERTFILE4 was selected
2859 self.check_common_name(stats, 'fakehostname')
2860
2861 calls = []
2862 # The callback is called with server_name=None
2863 stats = server_params_test(client_context, server_context,
2864 chatty=True,
2865 sni_name=None)
2866 self.assertEqual(calls, [(None, server_context)])
2867 self.check_common_name(stats, 'localhost')
2868
2869 # Check disabling the callback
2870 calls = []
2871 server_context.set_servername_callback(None)
2872
2873 stats = server_params_test(client_context, server_context,
2874 chatty=True,
2875 sni_name='notfunny')
2876 # Certificate didn't change
2877 self.check_common_name(stats, 'localhost')
2878 self.assertEqual(calls, [])
2879
2880 @needs_sni
2881 def test_sni_callback_alert(self):
2882 # Returning a TLS alert is reflected to the connecting client
2883 server_context, other_context, client_context = self.sni_contexts()
2884
2885 def cb_returning_alert(ssl_sock, server_name, initial_context):
2886 return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
2887 server_context.set_servername_callback(cb_returning_alert)
2888
2889 with self.assertRaises(ssl.SSLError) as cm:
2890 stats = server_params_test(client_context, server_context,
2891 chatty=False,
2892 sni_name='supermessage')
2893 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
2894
2895 @needs_sni
2896 def test_sni_callback_raising(self):
2897 # Raising fails the connection with a TLS handshake failure alert.
2898 server_context, other_context, client_context = self.sni_contexts()
2899
2900 def cb_raising(ssl_sock, server_name, initial_context):
2901 1/0
2902 server_context.set_servername_callback(cb_raising)
2903
2904 with self.assertRaises(ssl.SSLError) as cm, \
2905 support.captured_stderr() as stderr:
2906 stats = server_params_test(client_context, server_context,
2907 chatty=False,
2908 sni_name='supermessage')
2909 self.assertEqual(cm.exception.reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
2910 self.assertIn("ZeroDivisionError", stderr.getvalue())
2911
2912 @needs_sni
2913 def test_sni_callback_wrong_return_type(self):
2914 # Returning the wrong return type terminates the TLS connection
2915 # with an internal error alert.
2916 server_context, other_context, client_context = self.sni_contexts()
2917
2918 def cb_wrong_return_type(ssl_sock, server_name, initial_context):
2919 return "foo"
2920 server_context.set_servername_callback(cb_wrong_return_type)
2921
2922 with self.assertRaises(ssl.SSLError) as cm, \
2923 support.captured_stderr() as stderr:
2924 stats = server_params_test(client_context, server_context,
2925 chatty=False,
2926 sni_name='supermessage')
2927 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
2928 self.assertIn("TypeError", stderr.getvalue())
2929
2930 def test_read_write_after_close_raises_valuerror(self):
2931 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2932 context.verify_mode = ssl.CERT_REQUIRED
2933 context.load_verify_locations(CERTFILE)
2934 context.load_cert_chain(CERTFILE)
2935 server = ThreadedEchoServer(context=context, chatty=False)
2936
2937 with server:
2938 s = context.wrap_socket(socket.socket())
2939 s.connect((HOST, server.port))
2940 s.close()
2941
2942 self.assertRaises(ValueError, s.read, 1024)
2943 self.assertRaises(ValueError, s.write, b'hello')
2944
Bill Janssen61c001a2008-09-08 16:37:24 +00002945
Neal Norwitz9eb9b102007-08-27 01:15:33 +00002946def test_main(verbose=False):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002947 if support.verbose:
2948 plats = {
2949 'Linux': platform.linux_distribution,
2950 'Mac': platform.mac_ver,
2951 'Windows': platform.win32_ver,
2952 }
2953 for name, func in plats.items():
2954 plat = func()
2955 if plat and plat[0]:
2956 plat = '%s %r' % (name, plat)
2957 break
2958 else:
2959 plat = repr(platform.platform())
2960 print("test_ssl: testing with %r %r" %
2961 (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
2962 print(" under %s" % plat)
2963 print(" HAS_SNI = %r" % ssl.HAS_SNI)
2964 print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
2965 try:
2966 print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
2967 except AttributeError:
2968 pass
Bill Janssen296a59d2007-09-16 22:06:00 +00002969
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002970 for filename in [
2971 CERTFILE, SVN_PYTHON_ORG_ROOT_CERT, BYTES_CERTFILE,
2972 ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
2973 SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
2974 BADCERT, BADKEY, EMPTYCERT]:
2975 if not os.path.exists(filename):
2976 raise support.TestFailed("Can't read certificate file %r" % filename)
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00002977
Benjamin Peterson2f334562014-10-01 23:53:01 -04002978 tests = [ContextTests, BasicTests, BasicSocketTests, SSLErrorTests]
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00002979
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002980 if support.is_resource_enabled('network'):
Bill Janssen934b16d2008-06-28 22:19:33 +00002981 tests.append(NetworkedTests)
Bill Janssen296a59d2007-09-16 22:06:00 +00002982
Bill Janssen98d19da2007-09-10 21:51:02 +00002983 if _have_threads:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002984 thread_info = support.threading_setup()
2985 if thread_info:
Bill Janssen934b16d2008-06-28 22:19:33 +00002986 tests.append(ThreadedTests)
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00002987
Antoine Pitrou3945c862010-04-28 21:11:01 +00002988 try:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002989 support.run_unittest(*tests)
Antoine Pitrou3945c862010-04-28 21:11:01 +00002990 finally:
2991 if _have_threads:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002992 support.threading_cleanup(*thread_info)
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00002993
2994if __name__ == "__main__":
2995 test_main()