blob: 86ba655eaa7756bfc938deee974cba2ef432d44f [file] [log] [blame]
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001# -*- coding: utf-8 -*-
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00002# Test the support for SSL and sockets
3
4import sys
5import unittest
Benjamin Petersondaeb9252014-08-20 14:14:50 -05006from test import test_support as support
Nick Coghlandbcd4572016-03-20 22:39:15 +10007from test.script_helper import assert_python_ok
Bill Janssen934b16d2008-06-28 22:19:33 +00008import asyncore
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00009import socket
Bill Janssen934b16d2008-06-28 22:19:33 +000010import select
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000011import time
Benjamin Petersondaeb9252014-08-20 14:14:50 -050012import datetime
Antoine Pitroub558f172010-04-23 23:25:45 +000013import gc
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000014import os
Antoine Pitroub558f172010-04-23 23:25:45 +000015import errno
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000016import pprint
Benjamin Petersondaeb9252014-08-20 14:14:50 -050017import tempfile
Benjamin Petersonfcfb18e2014-11-23 11:42:45 -060018import urllib2
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000019import traceback
Antoine Pitroudfb299b2010-04-23 22:54:59 +000020import weakref
Antoine Pitroud75efd92010-08-04 17:38:33 +000021import platform
Benjamin Petersondaeb9252014-08-20 14:14:50 -050022import functools
23from contextlib import closing
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000024
# Load the ssl module through the test-support helper instead of a plain
# import statement.
# NOTE(review): presumably this skips the whole test module when ssl is
# unavailable -- confirm against test_support.import_module.
ssl = support.import_module("ssl")

# Sorted entries of ssl._PROTOCOL_NAMES; iterated by the context tests to
# build one SSLContext per protocol.
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
# Host value provided by the test-support module; used as a connect target.
HOST = support.HOST
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000029
def data_file(*name):
    """Return the path made by joining *name* onto this file's directory."""
    here = os.path.dirname(__file__)
    return os.path.join(here, *name)
32
# The custom key and certificate files used in test_ssl are generated
# using Lib/test/make_ssl_certs.py.
# Other certificates are simply fetched from the Internet servers they
# are meant to authenticate.

# Paths to the test fixtures, all resolved relative to this file's
# directory.  The BYTES_* variants are the same paths encoded with the
# filesystem encoding.
CERTFILE = data_file("keycert.pem")
BYTES_CERTFILE = CERTFILE.encode(sys.getfilesystemencoding())
ONLYCERT = data_file("ssl_cert.pem")
ONLYKEY = data_file("ssl_key.pem")
BYTES_ONLYCERT = ONLYCERT.encode(sys.getfilesystemencoding())
BYTES_ONLYKEY = ONLYKEY.encode(sys.getfilesystemencoding())
CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
# NOTE(review): presumably the passphrase for the *.passwd.pem files above
# -- confirm against make_ssl_certs.py.
KEY_PASSWORD = "somepass"
CAPATH = data_file("capath")
BYTES_CAPATH = CAPATH.encode(sys.getfilesystemencoding())
CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
CAFILE_CACERT = data_file("capath", "5ed36f99.0")


# empty CRL
CRLFILE = data_file("revocation.crl")

# Two keys and certs signed by the same CA (for SNI tests)
SIGNED_CERTFILE = data_file("keycert3.pem")
SIGNED_CERTFILE2 = data_file("keycert4.pem")
SIGNING_CA = data_file("pycacert.pem")

# Remote host name and the local copy of the certificate it is checked
# against.
REMOTE_HOST = "self-signed.pythontest.net"
REMOTE_ROOT_CERT = data_file("selfsigned_pythontestdotnet.pem")

# Deliberately unusable or unusual certificate/key fixtures.
EMPTYCERT = data_file("nullcert.pem")
BADCERT = data_file("badcert.pem")
NONEXISTINGCERT = data_file("XXXnonexisting.pem")
BADKEY = data_file("badkey.pem")
NOKIACERT = data_file("nokia.pem")
NULLBYTECERT = data_file("nullbytecert.pem")

# Diffie-Hellman parameter file (and its filesystem-encoded byte path).
DHFILE = data_file("dh1024.pem")
BYTES_DHFILE = DHFILE.encode(sys.getfilesystemencoding())
73
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000074
def handle_error(prefix):
    """Write *prefix* plus the current exception's traceback when verbose.

    The traceback text is always formatted from ``sys.exc_info()``; it is
    only written to stdout when ``support.verbose`` is true.
    """
    formatted_lines = traceback.format_exception(*sys.exc_info())
    exc_format = ' '.join(formatted_lines)
    if not support.verbose:
        return
    sys.stdout.write(prefix + exc_format)
Neal Norwitz3e533c22007-08-27 01:03:18 +000079
Antoine Pitrou435ba0c2010-04-27 09:51:18 +000080
class BasicTests(unittest.TestCase):
    """Smoke tests for the legacy ``ssl.sslwrap_simple`` API."""

    def _sslwrap_ignoring_broken_pipe(self, sock):
        """Call ``ssl.sslwrap_simple(sock)``, tolerating only EPIPE.

        A broken pipe can be raised by the handshake attempt; this test
        does not care about that.  Any other IOError is re-raised.
        """
        try:
            ssl.sslwrap_simple(sock)
        except IOError as e:
            if e.errno != errno.EPIPE:
                raise

    def test_sslwrap_simple(self):
        """sslwrap_simple() accepts a socket object and its raw ``_sock``."""
        # A crude test for the legacy API
        wrapped_directly = socket.socket(socket.AF_INET)
        self.addCleanup(wrapped_directly.close)
        self._sslwrap_ignoring_broken_pipe(wrapped_directly)

        # Same call, but handing over the underlying low-level socket.
        wrapped_via_raw = socket.socket(socket.AF_INET)
        self.addCleanup(wrapped_via_raw.close)
        self._sslwrap_ignoring_broken_pipe(wrapped_via_raw._sock)
Benjamin Peterson2f334562014-10-01 23:53:01 -040099
100
def can_clear_options():
    """Return True if the OpenSSL API version is 0.9.8m or higher."""
    minimum_api_version = (0, 9, 8, 13, 15)  # 0.9.8m
    return ssl._OPENSSL_API_VERSION >= minimum_api_version
104
def no_sslv2_implies_sslv3_hello():
    """Return True if the OpenSSL version in use is 0.9.7h or higher."""
    minimum_version = (0, 9, 7, 8, 15)  # 0.9.7h
    return ssl.OPENSSL_VERSION_INFO >= minimum_version
108
def have_verify_flags():
    """Return True if the OpenSSL version in use is 0.9.8 or higher."""
    minimum_version = (0, 9, 8, 0, 15)  # 0.9.8
    return ssl.OPENSSL_VERSION_INFO >= minimum_version
112
def utc_offset(): #NOTE: ignore issues like #1647654
    """Return the local UTC offset in seconds.

    local time = utc time + utc offset.  The DST offset is used when the
    zone defines DST and it is currently in effect.
    """
    dst_in_effect = time.daylight and time.localtime().tm_isdst > 0
    seconds_west = time.altzone if dst_in_effect else time.timezone
    return -seconds_west
118
def asn1time(cert_time):
    """Return *cert_time* adjusted to what the linked OpenSSL would print.

    Some versions of OpenSSL ignore seconds, see #18207.  Only API version
    0.9.8i is affected; for every other version the string is returned
    unchanged.
    """
    affected_version = (0, 9, 8, 9, 15)  # 0.9.8.i
    if ssl._OPENSSL_API_VERSION != affected_version:
        return cert_time

    fmt = "%b %d %H:%M:%S %Y GMT"
    truncated = datetime.datetime.strptime(cert_time, fmt).replace(second=0)
    cert_time = truncated.strftime(fmt)
    # %d adds leading zero but ASN1_TIME_print() uses leading space
    if cert_time[4] == "0":
        cert_time = cert_time[:4] + " " + cert_time[5:]
    return cert_time
Neal Norwitz3e533c22007-08-27 01:03:18 +0000132
# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
def skip_if_broken_ubuntu_ssl(func):
    """Decorator that skips *func* on the patched Ubuntu OpenSSL build.

    When the ssl module has no PROTOCOL_SSLv2, *func* is returned
    untouched.  Otherwise the wrapper first tries to create an SSLv2
    context; if that raises SSLError on OpenSSL 0.9.8o reported as
    debian squeeze/sid, the test is skipped.  In every other case *func*
    runs normally.
    """
    if not hasattr(ssl, 'PROTOCOL_SSLv2'):
        return func

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            ssl.SSLContext(ssl.PROTOCOL_SSLv2)
        except ssl.SSLError:
            is_patched_openssl = (
                ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
                platform.linux_distribution() == ('debian', 'squeeze/sid', ''))
            if is_patched_openssl:
                raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
        return func(*args, **kwargs)

    return wrapper
Antoine Pitroud75efd92010-08-04 17:38:33 +0000148
# Test decorator: skip the decorated test unless ssl.HAS_SNI is true.
needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
150
Antoine Pitroud75efd92010-08-04 17:38:33 +0000151
152class BasicSocketTests(unittest.TestCase):
153
Antoine Pitrou3945c862010-04-28 21:11:01 +0000154 def test_constants(self):
Bill Janssen98d19da2007-09-10 21:51:02 +0000155 ssl.CERT_NONE
156 ssl.CERT_OPTIONAL
157 ssl.CERT_REQUIRED
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500158 ssl.OP_CIPHER_SERVER_PREFERENCE
159 ssl.OP_SINGLE_DH_USE
160 if ssl.HAS_ECDH:
161 ssl.OP_SINGLE_ECDH_USE
162 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
163 ssl.OP_NO_COMPRESSION
164 self.assertIn(ssl.HAS_SNI, {True, False})
165 self.assertIn(ssl.HAS_ECDH, {True, False})
166
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000167
Antoine Pitrou3945c862010-04-28 21:11:01 +0000168 def test_random(self):
Bill Janssen98d19da2007-09-10 21:51:02 +0000169 v = ssl.RAND_status()
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500170 if support.verbose:
Bill Janssen98d19da2007-09-10 21:51:02 +0000171 sys.stdout.write("\n RAND_status is %d (%s)\n"
172 % (v, (v and "sufficient randomness") or
173 "insufficient randomness"))
Victor Stinner7c906672015-01-06 13:53:37 +0100174 if hasattr(ssl, 'RAND_egd'):
175 self.assertRaises(TypeError, ssl.RAND_egd, 1)
176 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Bill Janssen98d19da2007-09-10 21:51:02 +0000177 ssl.RAND_add("this is a random string", 75.0)
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000178
Antoine Pitrou3945c862010-04-28 21:11:01 +0000179 def test_parse_cert(self):
Bill Janssen98d19da2007-09-10 21:51:02 +0000180 # note that this uses an 'unofficial' function in _ssl.c,
181 # provided solely for this test, to exercise the certificate
182 # parsing code
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500183 p = ssl._ssl._test_decode_cert(CERTFILE)
184 if support.verbose:
Bill Janssen98d19da2007-09-10 21:51:02 +0000185 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500186 self.assertEqual(p['issuer'],
187 ((('countryName', 'XY'),),
188 (('localityName', 'Castle Anthrax'),),
189 (('organizationName', 'Python Software Foundation'),),
190 (('commonName', 'localhost'),))
191 )
192 # Note the next three asserts will fail if the keys are regenerated
193 self.assertEqual(p['notAfter'], asn1time('Oct 5 23:01:56 2020 GMT'))
194 self.assertEqual(p['notBefore'], asn1time('Oct 8 23:01:56 2010 GMT'))
195 self.assertEqual(p['serialNumber'], 'D7C7381919AFC24E')
Antoine Pitrouf06eb462011-10-01 19:30:58 +0200196 self.assertEqual(p['subject'],
Antoine Pitrou60982912013-02-16 21:39:28 +0100197 ((('countryName', 'XY'),),
198 (('localityName', 'Castle Anthrax'),),
199 (('organizationName', 'Python Software Foundation'),),
200 (('commonName', 'localhost'),))
Antoine Pitrouf06eb462011-10-01 19:30:58 +0200201 )
Antoine Pitrou60982912013-02-16 21:39:28 +0100202 self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),))
Antoine Pitrouf06eb462011-10-01 19:30:58 +0200203 # Issue #13034: the subjectAltName in some certificates
204 # (notably projects.developer.nokia.com:443) wasn't parsed
205 p = ssl._ssl._test_decode_cert(NOKIACERT)
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500206 if support.verbose:
Antoine Pitrouf06eb462011-10-01 19:30:58 +0200207 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
208 self.assertEqual(p['subjectAltName'],
209 (('DNS', 'projects.developer.nokia.com'),
210 ('DNS', 'projects.forum.nokia.com'))
211 )
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500212 # extra OCSP and AIA fields
213 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
214 self.assertEqual(p['caIssuers'],
215 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
216 self.assertEqual(p['crlDistributionPoints'],
217 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000218
Christian Heimes88b174c2013-08-17 00:54:47 +0200219 def test_parse_cert_CVE_2013_4238(self):
220 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500221 if support.verbose:
Christian Heimes88b174c2013-08-17 00:54:47 +0200222 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
223 subject = ((('countryName', 'US'),),
224 (('stateOrProvinceName', 'Oregon'),),
225 (('localityName', 'Beaverton'),),
226 (('organizationName', 'Python Software Foundation'),),
227 (('organizationalUnitName', 'Python Core Development'),),
228 (('commonName', 'null.python.org\x00example.org'),),
229 (('emailAddress', 'python-dev@python.org'),))
230 self.assertEqual(p['subject'], subject)
231 self.assertEqual(p['issuer'], subject)
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500232 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
Christian Heimesf869a942013-08-25 14:12:41 +0200233 san = (('DNS', 'altnull.python.org\x00example.com'),
234 ('email', 'null@python.org\x00user@example.org'),
235 ('URI', 'http://null.python.org\x00http://example.org'),
236 ('IP Address', '192.0.2.1'),
237 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
238 else:
239 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
240 san = (('DNS', 'altnull.python.org\x00example.com'),
241 ('email', 'null@python.org\x00user@example.org'),
242 ('URI', 'http://null.python.org\x00http://example.org'),
243 ('IP Address', '192.0.2.1'),
244 ('IP Address', '<invalid>'))
245
246 self.assertEqual(p['subjectAltName'], san)
Christian Heimes88b174c2013-08-17 00:54:47 +0200247
Antoine Pitrou3945c862010-04-28 21:11:01 +0000248 def test_DER_to_PEM(self):
Martin Panter71202bb2016-01-15 00:25:29 +0000249 with open(CAFILE_CACERT, 'r') as f:
Antoine Pitrou3945c862010-04-28 21:11:01 +0000250 pem = f.read()
Bill Janssen296a59d2007-09-16 22:06:00 +0000251 d1 = ssl.PEM_cert_to_DER_cert(pem)
252 p2 = ssl.DER_cert_to_PEM_cert(d1)
253 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitroudb187842010-04-27 10:32:58 +0000254 self.assertEqual(d1, d2)
Antoine Pitrou4c7bcf12010-04-27 22:03:37 +0000255 if not p2.startswith(ssl.PEM_HEADER + '\n'):
256 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
257 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
258 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Bill Janssen296a59d2007-09-16 22:06:00 +0000259
Antoine Pitrouf9de5342010-04-05 21:35:07 +0000260 def test_openssl_version(self):
261 n = ssl.OPENSSL_VERSION_NUMBER
262 t = ssl.OPENSSL_VERSION_INFO
263 s = ssl.OPENSSL_VERSION
264 self.assertIsInstance(n, (int, long))
265 self.assertIsInstance(t, tuple)
266 self.assertIsInstance(s, str)
267 # Some sanity checks follow
268 # >= 0.9
269 self.assertGreaterEqual(n, 0x900000)
Antoine Pitrou4e64d872014-07-21 18:35:01 -0400270 # < 3.0
271 self.assertLess(n, 0x30000000)
Antoine Pitrouf9de5342010-04-05 21:35:07 +0000272 major, minor, fix, patch, status = t
273 self.assertGreaterEqual(major, 0)
Antoine Pitrou4e64d872014-07-21 18:35:01 -0400274 self.assertLess(major, 3)
Antoine Pitrouf9de5342010-04-05 21:35:07 +0000275 self.assertGreaterEqual(minor, 0)
276 self.assertLess(minor, 256)
277 self.assertGreaterEqual(fix, 0)
278 self.assertLess(fix, 256)
279 self.assertGreaterEqual(patch, 0)
Ned Deilyfa119782015-02-05 17:19:11 +1100280 self.assertLessEqual(patch, 63)
Antoine Pitrouf9de5342010-04-05 21:35:07 +0000281 self.assertGreaterEqual(status, 0)
282 self.assertLessEqual(status, 15)
Antoine Pitrou4e64d872014-07-21 18:35:01 -0400283 # Version string as returned by {Open,Libre}SSL, the format might change
284 if "LibreSSL" in s:
285 self.assertTrue(s.startswith("LibreSSL {:d}.{:d}".format(major, minor)),
286 (s, t))
287 else:
288 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
289 (s, t))
Antoine Pitrouf9de5342010-04-05 21:35:07 +0000290
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500291 @support.cpython_only
Antoine Pitroudfb299b2010-04-23 22:54:59 +0000292 def test_refcycle(self):
293 # Issue #7943: an SSL object doesn't create reference cycles with
294 # itself.
295 s = socket.socket(socket.AF_INET)
296 ss = ssl.wrap_socket(s)
297 wr = weakref.ref(ss)
298 del ss
299 self.assertEqual(wr(), None)
300
Antoine Pitrouf7f390a2010-09-14 14:37:18 +0000301 def test_wrapped_unconnected(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500302 # Methods on an unconnected SSLSocket propagate the original
303 # socket.error raise by the underlying socket object.
Antoine Pitrouf7f390a2010-09-14 14:37:18 +0000304 s = socket.socket(socket.AF_INET)
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500305 with closing(ssl.wrap_socket(s)) as ss:
306 self.assertRaises(socket.error, ss.recv, 1)
307 self.assertRaises(socket.error, ss.recv_into, bytearray(b'x'))
308 self.assertRaises(socket.error, ss.recvfrom, 1)
309 self.assertRaises(socket.error, ss.recvfrom_into, bytearray(b'x'), 1)
310 self.assertRaises(socket.error, ss.send, b'x')
311 self.assertRaises(socket.error, ss.sendto, b'x', ('0.0.0.0', 0))
312
313 def test_timeout(self):
314 # Issue #8524: when creating an SSL socket, the timeout of the
315 # original socket should be retained.
316 for timeout in (None, 0.0, 5.0):
317 s = socket.socket(socket.AF_INET)
318 s.settimeout(timeout)
319 with closing(ssl.wrap_socket(s)) as ss:
320 self.assertEqual(timeout, ss.gettimeout())
321
322 def test_errors(self):
323 sock = socket.socket()
324 self.assertRaisesRegexp(ValueError,
325 "certfile must be specified",
326 ssl.wrap_socket, sock, keyfile=CERTFILE)
327 self.assertRaisesRegexp(ValueError,
328 "certfile must be specified for server-side operations",
329 ssl.wrap_socket, sock, server_side=True)
330 self.assertRaisesRegexp(ValueError,
331 "certfile must be specified for server-side operations",
332 ssl.wrap_socket, sock, server_side=True, certfile="")
333 with closing(ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE)) as s:
334 self.assertRaisesRegexp(ValueError, "can't connect in server-side mode",
335 s.connect, (HOST, 8080))
336 with self.assertRaises(IOError) as cm:
337 with closing(socket.socket()) as sock:
Martin Panterfd8e8502016-01-30 02:36:00 +0000338 ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500339 self.assertEqual(cm.exception.errno, errno.ENOENT)
340 with self.assertRaises(IOError) as cm:
341 with closing(socket.socket()) as sock:
Martin Panterfd8e8502016-01-30 02:36:00 +0000342 ssl.wrap_socket(sock,
343 certfile=CERTFILE, keyfile=NONEXISTINGCERT)
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500344 self.assertEqual(cm.exception.errno, errno.ENOENT)
345 with self.assertRaises(IOError) as cm:
346 with closing(socket.socket()) as sock:
Martin Panterfd8e8502016-01-30 02:36:00 +0000347 ssl.wrap_socket(sock,
348 certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500349 self.assertEqual(cm.exception.errno, errno.ENOENT)
350
Martin Panter886aba42016-02-01 21:58:11 +0000351 def bad_cert_test(self, certfile):
352 """Check that trying to use the given client certificate fails"""
353 certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
354 certfile)
355 sock = socket.socket()
356 self.addCleanup(sock.close)
357 with self.assertRaises(ssl.SSLError):
358 ssl.wrap_socket(sock,
359 certfile=certfile,
360 ssl_version=ssl.PROTOCOL_TLSv1)
361
362 def test_empty_cert(self):
363 """Wrapping with an empty cert file"""
364 self.bad_cert_test("nullcert.pem")
365
366 def test_malformed_cert(self):
367 """Wrapping with a badly formatted certificate (syntax error)"""
368 self.bad_cert_test("badcert.pem")
369
370 def test_malformed_key(self):
371 """Wrapping with a badly formatted key (syntax error)"""
372 self.bad_cert_test("badkey.pem")
373
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500374 def test_match_hostname(self):
375 def ok(cert, hostname):
376 ssl.match_hostname(cert, hostname)
377 def fail(cert, hostname):
378 self.assertRaises(ssl.CertificateError,
379 ssl.match_hostname, cert, hostname)
380
381 cert = {'subject': ((('commonName', 'example.com'),),)}
382 ok(cert, 'example.com')
383 ok(cert, 'ExAmple.cOm')
384 fail(cert, 'www.example.com')
385 fail(cert, '.example.com')
386 fail(cert, 'example.org')
387 fail(cert, 'exampleXcom')
388
389 cert = {'subject': ((('commonName', '*.a.com'),),)}
390 ok(cert, 'foo.a.com')
391 fail(cert, 'bar.foo.a.com')
392 fail(cert, 'a.com')
393 fail(cert, 'Xa.com')
394 fail(cert, '.a.com')
395
396 # only match one left-most wildcard
397 cert = {'subject': ((('commonName', 'f*.com'),),)}
398 ok(cert, 'foo.com')
399 ok(cert, 'f.com')
400 fail(cert, 'bar.com')
401 fail(cert, 'foo.a.com')
402 fail(cert, 'bar.foo.com')
403
404 # NULL bytes are bad, CVE-2013-4073
405 cert = {'subject': ((('commonName',
406 'null.python.org\x00example.org'),),)}
407 ok(cert, 'null.python.org\x00example.org') # or raise an error?
408 fail(cert, 'example.org')
409 fail(cert, 'null.python.org')
410
411 # error cases with wildcards
412 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
413 fail(cert, 'bar.foo.a.com')
414 fail(cert, 'a.com')
415 fail(cert, 'Xa.com')
416 fail(cert, '.a.com')
417
418 cert = {'subject': ((('commonName', 'a.*.com'),),)}
419 fail(cert, 'a.foo.com')
420 fail(cert, 'a..com')
421 fail(cert, 'a.com')
422
423 # wildcard doesn't match IDNA prefix 'xn--'
424 idna = u'püthon.python.org'.encode("idna").decode("ascii")
425 cert = {'subject': ((('commonName', idna),),)}
426 ok(cert, idna)
427 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
428 fail(cert, idna)
429 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
430 fail(cert, idna)
431
432 # wildcard in first fragment and IDNA A-labels in sequent fragments
433 # are supported.
434 idna = u'www*.pythön.org'.encode("idna").decode("ascii")
435 cert = {'subject': ((('commonName', idna),),)}
436 ok(cert, u'www.pythön.org'.encode("idna").decode("ascii"))
437 ok(cert, u'www1.pythön.org'.encode("idna").decode("ascii"))
438 fail(cert, u'ftp.pythön.org'.encode("idna").decode("ascii"))
439 fail(cert, u'pythön.org'.encode("idna").decode("ascii"))
440
441 # Slightly fake real-world example
442 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
443 'subject': ((('commonName', 'linuxfrz.org'),),),
444 'subjectAltName': (('DNS', 'linuxfr.org'),
445 ('DNS', 'linuxfr.com'),
446 ('othername', '<unsupported>'))}
447 ok(cert, 'linuxfr.org')
448 ok(cert, 'linuxfr.com')
449 # Not a "DNS" entry
450 fail(cert, '<unsupported>')
451 # When there is a subjectAltName, commonName isn't used
452 fail(cert, 'linuxfrz.org')
453
454 # A pristine real-world example
455 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
456 'subject': ((('countryName', 'US'),),
457 (('stateOrProvinceName', 'California'),),
458 (('localityName', 'Mountain View'),),
459 (('organizationName', 'Google Inc'),),
460 (('commonName', 'mail.google.com'),))}
461 ok(cert, 'mail.google.com')
462 fail(cert, 'gmail.com')
463 # Only commonName is considered
464 fail(cert, 'California')
465
466 # Neither commonName nor subjectAltName
467 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
468 'subject': ((('countryName', 'US'),),
469 (('stateOrProvinceName', 'California'),),
470 (('localityName', 'Mountain View'),),
471 (('organizationName', 'Google Inc'),))}
472 fail(cert, 'mail.google.com')
473
474 # No DNS entry in subjectAltName but a commonName
475 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
476 'subject': ((('countryName', 'US'),),
477 (('stateOrProvinceName', 'California'),),
478 (('localityName', 'Mountain View'),),
479 (('commonName', 'mail.google.com'),)),
480 'subjectAltName': (('othername', 'blabla'), )}
481 ok(cert, 'mail.google.com')
482
483 # No DNS entry subjectAltName and no commonName
484 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
485 'subject': ((('countryName', 'US'),),
486 (('stateOrProvinceName', 'California'),),
487 (('localityName', 'Mountain View'),),
488 (('organizationName', 'Google Inc'),)),
489 'subjectAltName': (('othername', 'blabla'),)}
490 fail(cert, 'google.com')
491
492 # Empty cert / no cert
493 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
494 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
495
496 # Issue #17980: avoid denials of service by refusing more than one
497 # wildcard per fragment.
498 cert = {'subject': ((('commonName', 'a*b.com'),),)}
499 ok(cert, 'axxb.com')
500 cert = {'subject': ((('commonName', 'a*b.co*'),),)}
501 fail(cert, 'axxb.com')
502 cert = {'subject': ((('commonName', 'a*b*.com'),),)}
503 with self.assertRaises(ssl.CertificateError) as cm:
504 ssl.match_hostname(cert, 'axxbxxc.com')
505 self.assertIn("too many wildcards", str(cm.exception))
506
507 def test_server_side(self):
508 # server_hostname doesn't work for server sockets
509 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
510 with closing(socket.socket()) as sock:
511 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
512 server_hostname="some.hostname")
513
514 def test_unknown_channel_binding(self):
515 # should raise ValueError for unknown type
516 s = socket.socket(socket.AF_INET)
517 with closing(ssl.wrap_socket(s)) as ss:
518 with self.assertRaises(ValueError):
519 ss.get_channel_binding("unknown-type")
520
521 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
522 "'tls-unique' channel binding not available")
523 def test_tls_unique_channel_binding(self):
524 # unconnected should return None for known type
525 s = socket.socket(socket.AF_INET)
526 with closing(ssl.wrap_socket(s)) as ss:
527 self.assertIsNone(ss.get_channel_binding("tls-unique"))
528 # the same for server-side
529 s = socket.socket(socket.AF_INET)
530 with closing(ssl.wrap_socket(s, server_side=True, certfile=CERTFILE)) as ss:
531 self.assertIsNone(ss.get_channel_binding("tls-unique"))
532
533 def test_get_default_verify_paths(self):
534 paths = ssl.get_default_verify_paths()
535 self.assertEqual(len(paths), 6)
536 self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
537
538 with support.EnvironmentVarGuard() as env:
539 env["SSL_CERT_DIR"] = CAPATH
540 env["SSL_CERT_FILE"] = CERTFILE
541 paths = ssl.get_default_verify_paths()
542 self.assertEqual(paths.cafile, CERTFILE)
543 self.assertEqual(paths.capath, CAPATH)
544
545 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
546 def test_enum_certificates(self):
547 self.assertTrue(ssl.enum_certificates("CA"))
548 self.assertTrue(ssl.enum_certificates("ROOT"))
549
550 self.assertRaises(TypeError, ssl.enum_certificates)
551 self.assertRaises(WindowsError, ssl.enum_certificates, "")
552
553 trust_oids = set()
554 for storename in ("CA", "ROOT"):
555 store = ssl.enum_certificates(storename)
556 self.assertIsInstance(store, list)
557 for element in store:
558 self.assertIsInstance(element, tuple)
559 self.assertEqual(len(element), 3)
560 cert, enc, trust = element
561 self.assertIsInstance(cert, bytes)
562 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
563 self.assertIsInstance(trust, (set, bool))
564 if isinstance(trust, set):
565 trust_oids.update(trust)
566
567 serverAuth = "1.3.6.1.5.5.7.3.1"
568 self.assertIn(serverAuth, trust_oids)
569
570 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
571 def test_enum_crls(self):
572 self.assertTrue(ssl.enum_crls("CA"))
573 self.assertRaises(TypeError, ssl.enum_crls)
574 self.assertRaises(WindowsError, ssl.enum_crls, "")
575
576 crls = ssl.enum_crls("CA")
577 self.assertIsInstance(crls, list)
578 for element in crls:
579 self.assertIsInstance(element, tuple)
580 self.assertEqual(len(element), 2)
581 self.assertIsInstance(element[0], bytes)
582 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
583
584
585 def test_asn1object(self):
586 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
587 '1.3.6.1.5.5.7.3.1')
588
589 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
590 self.assertEqual(val, expected)
591 self.assertEqual(val.nid, 129)
592 self.assertEqual(val.shortname, 'serverAuth')
593 self.assertEqual(val.longname, 'TLS Web Server Authentication')
594 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
595 self.assertIsInstance(val, ssl._ASN1Object)
596 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
597
598 val = ssl._ASN1Object.fromnid(129)
599 self.assertEqual(val, expected)
600 self.assertIsInstance(val, ssl._ASN1Object)
601 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
602 with self.assertRaisesRegexp(ValueError, "unknown NID 100000"):
603 ssl._ASN1Object.fromnid(100000)
604 for i in range(1000):
605 try:
606 obj = ssl._ASN1Object.fromnid(i)
607 except ValueError:
608 pass
609 else:
610 self.assertIsInstance(obj.nid, int)
611 self.assertIsInstance(obj.shortname, str)
612 self.assertIsInstance(obj.longname, str)
613 self.assertIsInstance(obj.oid, (str, type(None)))
614
615 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
616 self.assertEqual(val, expected)
617 self.assertIsInstance(val, ssl._ASN1Object)
618 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
619 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
620 expected)
621 with self.assertRaisesRegexp(ValueError, "unknown object 'serverauth'"):
622 ssl._ASN1Object.fromname('serverauth')
623
624 def test_purpose_enum(self):
625 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
626 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
627 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
628 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
629 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
630 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
631 '1.3.6.1.5.5.7.3.1')
632
633 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
634 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
635 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
636 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
637 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
638 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
639 '1.3.6.1.5.5.7.3.2')
Antoine Pitrouf7f390a2010-09-14 14:37:18 +0000640
Antoine Pitrou63cc99d2013-12-28 17:26:33 +0100641 def test_unsupported_dtls(self):
642 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
643 self.addCleanup(s.close)
644 with self.assertRaises(NotImplementedError) as cx:
645 ssl.wrap_socket(s, cert_reqs=ssl.CERT_NONE)
646 self.assertEqual(str(cx.exception), "only stream sockets are supported")
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500647 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
648 with self.assertRaises(NotImplementedError) as cx:
649 ctx.wrap_socket(s)
650 self.assertEqual(str(cx.exception), "only stream sockets are supported")
651
652 def cert_time_ok(self, timestring, timestamp):
653 self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)
654
655 def cert_time_fail(self, timestring):
656 with self.assertRaises(ValueError):
657 ssl.cert_time_to_seconds(timestring)
658
659 @unittest.skipUnless(utc_offset(),
660 'local time needs to be different from UTC')
661 def test_cert_time_to_seconds_timezone(self):
662 # Issue #19940: ssl.cert_time_to_seconds() returns wrong
663 # results if local timezone is not UTC
664 self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0)
665 self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0)
666
667 def test_cert_time_to_seconds(self):
668 timestring = "Jan 5 09:34:43 2018 GMT"
669 ts = 1515144883.0
670 self.cert_time_ok(timestring, ts)
671 # accept keyword parameter, assert its name
672 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
673 # accept both %e and %d (space or zero generated by strftime)
674 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
675 # case-insensitive
676 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
677 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
678 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
679 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
680 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
681 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
682 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
683 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
684
685 newyear_ts = 1230768000.0
686 # leap seconds
687 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
688 # same timestamp
689 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
690
691 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
692 # allow 60th second (even if it is not a leap second)
693 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
694 # allow 2nd leap second for compatibility with time.strptime()
695 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
696 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
697
698 # no special treatement for the special value:
699 # 99991231235959Z (rfc 5280)
700 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
701
@support.run_with_locale('LC_ALL', '')
def test_cert_time_to_seconds_locale(self):
    # `cert_time_to_seconds()` should be locale independent: it must keep
    # accepting the C-locale month abbreviation and reject the localized one.

    # Month abbreviation produced by strftime() for a February date under
    # the locale installed by the decorator.
    localized_feb = time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))

    # Nothing to verify when the locale spells February like the C locale.
    if localized_feb.lower() == 'feb':
        self.skipTest("locale-specific month name needs to be "
                      "different from C locale")

    # locale-independent
    self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
    self.cert_time_fail(localized_feb + " 9 00:00:00 2007 GMT")
716
717
class ContextTests(unittest.TestCase):
    # Tests for ssl.SSLContext (construction, options, verification
    # settings, certificate/key/DH loading, statistics) and for the
    # context factory helpers of the ssl module.

    @skip_if_broken_ubuntu_ssl
    def test_constructor(self):
        # Every known protocol is accepted; a missing or unknown protocol
        # value is rejected.
        for protocol in PROTOCOLS:
            ssl.SSLContext(protocol)
        self.assertRaises(TypeError, ssl.SSLContext)
        self.assertRaises(ValueError, ssl.SSLContext, -1)
        self.assertRaises(ValueError, ssl.SSLContext, 42)

    @skip_if_broken_ubuntu_ssl
    def test_protocol(self):
        # The `protocol` attribute reflects the constructor argument.
        for proto in PROTOCOLS:
            ctx = ssl.SSLContext(proto)
            self.assertEqual(ctx.protocol, proto)

    def test_ciphers(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.set_ciphers("ALL")
        ctx.set_ciphers("DEFAULT")
        # A cipher string matching nothing must be reported as an error.
        with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"):
            ctx.set_ciphers("^$:,;?*'dorothyx")

    @skip_if_broken_ubuntu_ssl
    def test_options(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
        self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3,
                         ctx.options)
        ctx.options |= ssl.OP_NO_TLSv1
        self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3 | ssl.OP_NO_TLSv1,
                         ctx.options)
        if can_clear_options():
            ctx.options = (ctx.options & ~ssl.OP_NO_SSLv2) | ssl.OP_NO_TLSv1
            self.assertEqual(ssl.OP_ALL | ssl.OP_NO_TLSv1 | ssl.OP_NO_SSLv3,
                             ctx.options)
            ctx.options = 0
            self.assertEqual(0, ctx.options)
        else:
            # Options cannot be cleared here, so resetting them must fail.
            with self.assertRaises(ValueError):
                ctx.options = 0

    def test_verify_mode(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # Default value
        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
        ctx.verify_mode = ssl.CERT_OPTIONAL
        self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
        ctx.verify_mode = ssl.CERT_REQUIRED
        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
        ctx.verify_mode = ssl.CERT_NONE
        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
        # Wrong type and unknown value are rejected with distinct errors.
        with self.assertRaises(TypeError):
            ctx.verify_mode = None
        with self.assertRaises(ValueError):
            ctx.verify_mode = 42

    @unittest.skipUnless(have_verify_flags(),
                         "verify_flags need OpenSSL > 0.9.8")
    def test_verify_flags(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # default value; VERIFY_X509_TRUSTED_FIRST is only part of it when
        # the ssl module defines that flag
        tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
        self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | tf)
        ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
        self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
        ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
        self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
        ctx.verify_flags = ssl.VERIFY_DEFAULT
        self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
        # supports any value
        ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
        self.assertEqual(ctx.verify_flags,
                         ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
        with self.assertRaises(TypeError):
            ctx.verify_flags = None

    def test_load_cert_chain(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # Combined key and cert in a single file
        ctx.load_cert_chain(CERTFILE, keyfile=None)
        ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
        self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
        with self.assertRaises(IOError) as cm:
            ctx.load_cert_chain(NONEXISTINGCERT)
        self.assertEqual(cm.exception.errno, errno.ENOENT)
        with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
            ctx.load_cert_chain(BADCERT)
        with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
            ctx.load_cert_chain(EMPTYCERT)
        # Separate key and cert
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_cert_chain(ONLYCERT, ONLYKEY)
        ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
        ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
        with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
            ctx.load_cert_chain(ONLYCERT)
        with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
            ctx.load_cert_chain(ONLYKEY)
        with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
            ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
        # Mismatching key and cert
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        with self.assertRaisesRegexp(ssl.SSLError, "key values mismatch"):
            ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)
        # Password protected key and cert
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
        ctx.load_cert_chain(CERTFILE_PROTECTED,
                            password=bytearray(KEY_PASSWORD.encode()))
        ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
        ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
        ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
                            bytearray(KEY_PASSWORD.encode()))
        with self.assertRaisesRegexp(TypeError, "should be a string"):
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
        with self.assertRaises(ssl.SSLError):
            ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
        with self.assertRaisesRegexp(ValueError, "cannot be longer"):
            # openssl has a fixed limit on the password buffer.
            # PEM_BUFSIZE is generally set to 1kb.
            # Return a string larger than this.
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)

        # Password callback
        def getpass_unicode():
            return KEY_PASSWORD

        def getpass_bytes():
            return KEY_PASSWORD.encode()

        def getpass_bytearray():
            return bytearray(KEY_PASSWORD.encode())

        def getpass_badpass():
            return "badpass"

        def getpass_huge():
            return b'a' * (1024 * 1024)

        def getpass_bad_type():
            return 9

        def getpass_exception():
            raise Exception('getpass error')

        class GetPassCallable:
            # Covers both a callable instance and a bound method used as
            # the password callback.
            def __call__(self):
                return KEY_PASSWORD

            def getpass(self):
                return KEY_PASSWORD

        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
        ctx.load_cert_chain(CERTFILE_PROTECTED,
                            password=GetPassCallable().getpass)
        with self.assertRaises(ssl.SSLError):
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
        with self.assertRaisesRegexp(ValueError, "cannot be longer"):
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
        with self.assertRaisesRegexp(TypeError, "must return a string"):
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
        with self.assertRaisesRegexp(Exception, "getpass error"):
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
        # Make sure the password function isn't called if it isn't needed
        ctx.load_cert_chain(CERTFILE, password=getpass_exception)

    def test_load_verify_locations(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # cafile may be given positionally or by keyword, as bytes or text
        ctx.load_verify_locations(CERTFILE)
        ctx.load_verify_locations(cafile=CERTFILE, capath=None)
        ctx.load_verify_locations(BYTES_CERTFILE)
        ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
        ctx.load_verify_locations(cafile=BYTES_CERTFILE.decode('utf-8'))
        self.assertRaises(TypeError, ctx.load_verify_locations)
        self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
        with self.assertRaises(IOError) as cm:
            ctx.load_verify_locations(NONEXISTINGCERT)
        self.assertEqual(cm.exception.errno, errno.ENOENT)
        with self.assertRaises(IOError):
            ctx.load_verify_locations(u'')
        with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
            ctx.load_verify_locations(BADCERT)
        ctx.load_verify_locations(CERTFILE, CAPATH)
        ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)

        # Issue #10989: crash if the second argument type is invalid
        self.assertRaises(TypeError, ctx.load_verify_locations, None, True)

    def test_load_verify_cadata(self):
        # test cadata
        with open(CAFILE_CACERT) as f:
            cacert_pem = f.read().decode("ascii")
        cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
        with open(CAFILE_NEURONIO) as f:
            neuronio_pem = f.read().decode("ascii")
        neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)

        # test PEM
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
        ctx.load_verify_locations(cadata=cacert_pem)
        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
        ctx.load_verify_locations(cadata=neuronio_pem)
        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
        # cert already in hash table
        ctx.load_verify_locations(cadata=neuronio_pem)
        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)

        # combined
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        combined = "\n".join((cacert_pem, neuronio_pem))
        ctx.load_verify_locations(cadata=combined)
        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)

        # with junk around the certs
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        combined = ["head", cacert_pem, "other", neuronio_pem, "again",
                    neuronio_pem, "tail"]
        ctx.load_verify_locations(cadata="\n".join(combined))
        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)

        # test DER
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_verify_locations(cadata=cacert_der)
        ctx.load_verify_locations(cadata=neuronio_der)
        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
        # cert already in hash table
        ctx.load_verify_locations(cadata=cacert_der)
        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)

        # combined
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        combined = b"".join((cacert_der, neuronio_der))
        ctx.load_verify_locations(cadata=combined)
        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)

        # error cases
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)

        # text input and bytes input are expected to fail with different
        # messages
        with self.assertRaisesRegexp(ssl.SSLError, "no start line"):
            ctx.load_verify_locations(cadata=u"broken")
        with self.assertRaisesRegexp(ssl.SSLError, "not enough data"):
            ctx.load_verify_locations(cadata=b"broken")

    def test_load_dh_params(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_dh_params(DHFILE)
        if os.name != 'nt':
            ctx.load_dh_params(BYTES_DHFILE)
        self.assertRaises(TypeError, ctx.load_dh_params)
        self.assertRaises(TypeError, ctx.load_dh_params, None)
        with self.assertRaises(IOError) as cm:
            ctx.load_dh_params(NONEXISTINGCERT)
        self.assertEqual(cm.exception.errno, errno.ENOENT)
        # A certificate file is not a DH parameter file; only the exception
        # type is checked, so the context manager result is not bound.
        with self.assertRaises(ssl.SSLError):
            ctx.load_dh_params(CERTFILE)

    @skip_if_broken_ubuntu_ssl
    def test_session_stats(self):
        # A freshly created context reports zero for every counter.
        for proto in PROTOCOLS:
            ctx = ssl.SSLContext(proto)
            self.assertEqual(ctx.session_stats(), {
                'number': 0,
                'connect': 0,
                'connect_good': 0,
                'connect_renegotiate': 0,
                'accept': 0,
                'accept_good': 0,
                'accept_renegotiate': 0,
                'hits': 0,
                'misses': 0,
                'timeouts': 0,
                'cache_full': 0,
            })

    def test_set_default_verify_paths(self):
        # There's not much we can do to test that it acts as expected,
        # so just check it doesn't crash or raise an exception.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.set_default_verify_paths()

    @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
    def test_set_ecdh_curve(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # The curve name is accepted both as text and as bytes.
        ctx.set_ecdh_curve("prime256v1")
        ctx.set_ecdh_curve(b"prime256v1")
        self.assertRaises(TypeError, ctx.set_ecdh_curve)
        self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
        self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
        self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")

    @needs_sni
    def test_sni_callback(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)

        # set_servername_callback expects a callable, or None
        self.assertRaises(TypeError, ctx.set_servername_callback)
        self.assertRaises(TypeError, ctx.set_servername_callback, 4)
        self.assertRaises(TypeError, ctx.set_servername_callback, "")
        self.assertRaises(TypeError, ctx.set_servername_callback, ctx)

        def dummycallback(sock, servername, ctx):
            pass
        ctx.set_servername_callback(None)
        ctx.set_servername_callback(dummycallback)

    @needs_sni
    def test_sni_callback_refcycle(self):
        # Reference cycles through the servername callback are detected
        # and cleared.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)

        # The default argument makes the callback reference the context,
        # which in turn references the callback: a cycle.
        def dummycallback(sock, servername, ctx, cycle=ctx):
            pass
        ctx.set_servername_callback(dummycallback)
        wr = weakref.ref(ctx)
        del ctx, dummycallback
        gc.collect()
        self.assertIs(wr(), None)

    def test_cert_store_stats(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertEqual(ctx.cert_store_stats(),
                         {'x509_ca': 0, 'crl': 0, 'x509': 0})
        # Loading the context's own certificate chain leaves the store
        # statistics unchanged.
        ctx.load_cert_chain(CERTFILE)
        self.assertEqual(ctx.cert_store_stats(),
                         {'x509_ca': 0, 'crl': 0, 'x509': 0})
        ctx.load_verify_locations(CERTFILE)
        self.assertEqual(ctx.cert_store_stats(),
                         {'x509_ca': 0, 'crl': 0, 'x509': 1})
        ctx.load_verify_locations(CAFILE_CACERT)
        self.assertEqual(ctx.cert_store_stats(),
                         {'x509_ca': 1, 'crl': 0, 'x509': 2})

    def test_get_ca_certs(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertEqual(ctx.get_ca_certs(), [])
        # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
        ctx.load_verify_locations(CERTFILE)
        self.assertEqual(ctx.get_ca_certs(), [])
        # but CAFILE_CACERT is a CA cert
        ctx.load_verify_locations(CAFILE_CACERT)
        self.assertEqual(ctx.get_ca_certs(),
            [{'issuer': ((('organizationName', 'Root CA'),),
                         (('organizationalUnitName', 'http://www.cacert.org'),),
                         (('commonName', 'CA Cert Signing Authority'),),
                         (('emailAddress', 'support@cacert.org'),)),
              'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
              'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
              'serialNumber': '00',
              'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
              'subject': ((('organizationName', 'Root CA'),),
                          (('organizationalUnitName', 'http://www.cacert.org'),),
                          (('commonName', 'CA Cert Signing Authority'),),
                          (('emailAddress', 'support@cacert.org'),)),
              'version': 3}])

        # With a true argument the certificates are returned in DER form.
        with open(CAFILE_CACERT) as f:
            pem = f.read()
        der = ssl.PEM_cert_to_DER_cert(pem)
        self.assertEqual(ctx.get_ca_certs(True), [der])

    def test_load_default_certs(self):
        # Only checks that the calls succeed (or fail with TypeError for
        # an invalid purpose); what gets loaded is platform dependent.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_default_certs()

        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
        ctx.load_default_certs()

        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)

        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertRaises(TypeError, ctx.load_default_certs, None)
        self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')

    @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
    def test_load_default_certs_env(self):
        # With SSL_CERT_DIR / SSL_CERT_FILE pointing at the test data, the
        # store is expected to hold exactly the one certificate of CERTFILE.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        with support.EnvironmentVarGuard() as env:
            env["SSL_CERT_DIR"] = CAPATH
            env["SSL_CERT_FILE"] = CERTFILE
            ctx.load_default_certs()
            self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0})

    @unittest.skipUnless(sys.platform == "win32", "Windows specific")
    def test_load_default_certs_env_windows(self):
        # Baseline: what load_default_certs() loads without the variables.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_default_certs()
        stats = ctx.cert_store_stats()

        # With the variables set, one more certificate is expected on top
        # of the baseline.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        with support.EnvironmentVarGuard() as env:
            env["SSL_CERT_DIR"] = CAPATH
            env["SSL_CERT_FILE"] = CERTFILE
            ctx.load_default_certs()
            stats["x509"] += 1
            self.assertEqual(ctx.cert_store_stats(), stats)

    def test_create_default_context(self):
        def check_optional_flags(ctx, *names):
            # Each named option must be set on `ctx`.  A flag the ssl
            # module does not define is looked up as 0, which makes its
            # check trivially true.
            for name in names:
                flag = getattr(ssl, name, 0)
                self.assertEqual(ctx.options & flag, flag)

        ctx = ssl.create_default_context()
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
        self.assertTrue(ctx.check_hostname)
        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
        check_optional_flags(ctx, "OP_NO_COMPRESSION")

        # explicit CA material passed through cafile, capath and cadata
        with open(SIGNING_CA) as f:
            cadata = f.read().decode("ascii")
        ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
                                         cadata=cadata)
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
        check_optional_flags(ctx, "OP_NO_COMPRESSION")

        # server-side context: no peer verification by default
        ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
        check_optional_flags(ctx, "OP_NO_COMPRESSION", "OP_SINGLE_DH_USE",
                             "OP_SINGLE_ECDH_USE")

    def test__create_stdlib_context(self):
        # defaults: SSLv23, no verification, no hostname check
        ctx = ssl._create_stdlib_context()
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
        self.assertFalse(ctx.check_hostname)
        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)

        ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)

        ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
                                         cert_reqs=ssl.CERT_REQUIRED,
                                         check_hostname=True)
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
        self.assertTrue(ctx.check_hostname)
        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)

        ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)

    def test__https_verify_certificates(self):
        # Unit test to check the context factory mapping
        # The factories themselves are tested above
        # This test will fail by design if run under PYTHONHTTPSVERIFY=0
        # (as will various test_httplib tests)

        # Uses a fresh SSL module to avoid affecting the real one
        local_ssl = support.import_fresh_module("ssl")
        # Certificate verification is enabled by default
        self.assertIs(local_ssl._create_default_https_context,
                      local_ssl.create_default_context)
        # Turn default verification off
        local_ssl._https_verify_certificates(enable=False)
        self.assertIs(local_ssl._create_default_https_context,
                      local_ssl._create_unverified_context)
        # And back on
        local_ssl._https_verify_certificates(enable=True)
        self.assertIs(local_ssl._create_default_https_context,
                      local_ssl.create_default_context)
        # The default behaviour is to enable
        local_ssl._https_verify_certificates(enable=False)
        local_ssl._https_verify_certificates()
        self.assertIs(local_ssl._create_default_https_context,
                      local_ssl.create_default_context)

    def test__https_verify_envvar(self):
        # Unit test to check the PYTHONHTTPSVERIFY handling
        # Need to use a subprocess so it can still be run under -E

        # Each snippet is a one-line program for `python -c`; it exits
        # with an error message when the default HTTPS context factory is
        # not the expected one.  Implicit string concatenation keeps the
        # source indentation out of the program text.
        https_is_verified = (
            'import ssl, sys; '
            'status = "Error: _create_default_https_context does not verify certs" '
            'if ssl._create_default_https_context is '
            'ssl._create_unverified_context '
            'else None; '
            'sys.exit(status)')
        https_is_not_verified = (
            'import ssl, sys; '
            'status = "Error: _create_default_https_context verifies certs" '
            'if ssl._create_default_https_context is '
            'ssl.create_default_context '
            'else None; '
            'sys.exit(status)')
        extra_env = {}
        # Omitting it leaves verification on
        assert_python_ok("-c", https_is_verified, **extra_env)
        # Setting it to zero turns verification off
        extra_env[ssl._https_verify_envvar] = "0"
        assert_python_ok("-c", https_is_not_verified, **extra_env)
        # Any other value should also leave it on
        for setting in ("", "1", "enabled", "foo"):
            extra_env[ssl._https_verify_envvar] = setting
            assert_python_ok("-c", https_is_verified, **extra_env)

    def test_check_hostname(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertFalse(ctx.check_hostname)

        # Requires CERT_REQUIRED or CERT_OPTIONAL
        with self.assertRaises(ValueError):
            ctx.check_hostname = True
        ctx.verify_mode = ssl.CERT_REQUIRED
        self.assertFalse(ctx.check_hostname)
        ctx.check_hostname = True
        self.assertTrue(ctx.check_hostname)

        ctx.verify_mode = ssl.CERT_OPTIONAL
        ctx.check_hostname = True
        self.assertTrue(ctx.check_hostname)

        # Cannot set CERT_NONE with check_hostname enabled
        with self.assertRaises(ValueError):
            ctx.verify_mode = ssl.CERT_NONE
        ctx.check_hostname = False
        self.assertFalse(ctx.check_hostname)
1250
1251
class SSLErrorTests(unittest.TestCase):
    # Tests for the ssl.SSLError exception family: string form, errno,
    # library/reason attributes and subclass selection.

    def test_str(self):
        # The str() of a SSLError doesn't include the errno
        e = ssl.SSLError(1, "foo")
        self.assertEqual(str(e), "foo")
        self.assertEqual(e.errno, 1)
        # Same for a subclass
        e = ssl.SSLZeroReturnError(1, "foo")
        self.assertEqual(str(e), "foo")
        self.assertEqual(e.errno, 1)

    def test_lib_reason(self):
        # Test the library and reason attributes
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # CERTFILE is used as deliberately wrong input for load_dh_params()
        with self.assertRaises(ssl.SSLError) as cm:
            ctx.load_dh_params(CERTFILE)
        self.assertEqual(cm.exception.library, 'PEM')
        self.assertEqual(cm.exception.reason, 'NO_START_LINE')
        s = str(cm.exception)
        self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)

    def test_subclass(self):
        # Check that the appropriate SSLError subclass is raised
        # (this only tests one of them)
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        with closing(socket.socket()) as listener:
            listener.bind(("127.0.0.1", 0))
            listener.listen(5)
            # The plain client socket is managed as well, so it is closed
            # even if connect() or wrap_socket() raises.
            with closing(socket.socket()) as client:
                client.connect(listener.getsockname())
                client.setblocking(False)
                with closing(ctx.wrap_socket(
                        client, False,
                        do_handshake_on_connect=False)) as ssl_client:
                    # The listening side never takes part in a handshake in
                    # this test, so a non-blocking handshake must report
                    # that it wants to read.
                    with self.assertRaises(ssl.SSLWantReadError) as cm:
                        ssl_client.do_handshake()
                    message = str(cm.exception)
                    self.assertTrue(
                        message.startswith(
                            "The operation did not complete (read)"),
                        message)
                    # For compatibility
                    self.assertEqual(cm.exception.errno,
                                     ssl.SSL_ERROR_WANT_READ)
Antoine Pitrou63cc99d2013-12-28 17:26:33 +01001291
Antoine Pitrouf9de5342010-04-05 21:35:07 +00001292
Bill Janssen934b16d2008-06-28 22:19:33 +00001293class NetworkedTests(unittest.TestCase):
Bill Janssen296a59d2007-09-16 22:06:00 +00001294
def test_connect(self):
    # Connect to REMOTE_HOST with no verification, with verification but
    # no CA certificates, and with the proper root certificate.  Every
    # socket is wrapped in closing() so it is released on any outcome.
    with support.transient_internet(REMOTE_HOST):
        with closing(ssl.wrap_socket(socket.socket(socket.AF_INET),
                                     cert_reqs=ssl.CERT_NONE)) as s:
            s.connect((REMOTE_HOST, 443))
            # without verification no certificate details are exposed
            self.assertEqual({}, s.getpeercert())

        # this should fail because we have no verification certs
        with closing(ssl.wrap_socket(socket.socket(socket.AF_INET),
                                     cert_reqs=ssl.CERT_REQUIRED)) as s:
            self.assertRaisesRegexp(ssl.SSLError, "certificate verify failed",
                                    s.connect, (REMOTE_HOST, 443))

        # this should succeed because we specify the root cert
        with closing(ssl.wrap_socket(socket.socket(socket.AF_INET),
                                     cert_reqs=ssl.CERT_REQUIRED,
                                     ca_certs=REMOTE_ROOT_CERT)) as s:
            s.connect((REMOTE_HOST, 443))
            self.assertTrue(s.getpeercert())
Bill Janssen296a59d2007-09-16 22:06:00 +00001321
def test_connect_ex(self):
    # Issue #11326: check connect_ex() implementation
    with support.transient_internet(REMOTE_HOST):
        wrapped = ssl.wrap_socket(socket.socket(socket.AF_INET),
                                  cert_reqs=ssl.CERT_REQUIRED,
                                  ca_certs=REMOTE_ROOT_CERT)
        with closing(wrapped):
            # a successful blocking connect_ex() returns 0
            status = wrapped.connect_ex((REMOTE_HOST, 443))
            self.assertEqual(0, status)
            self.assertTrue(wrapped.getpeercert())
1333
def test_non_blocking_connect_ex(self):
    # Issue #11326: non-blocking connect_ex() should allow handshake
    # to proceed after the socket gets ready.
    with support.transient_internet(REMOTE_HOST):
        wrapped = ssl.wrap_socket(socket.socket(socket.AF_INET),
                                  cert_reqs=ssl.CERT_REQUIRED,
                                  ca_certs=REMOTE_ROOT_CERT,
                                  do_handshake_on_connect=False)
        with closing(wrapped):
            wrapped.setblocking(False)
            status = wrapped.connect_ex((REMOTE_HOST, 443))
            # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
            acceptable = (0, errno.EINPROGRESS, errno.EWOULDBLOCK)
            self.assertIn(status, acceptable)
            # Wait for connect to finish
            select.select([], [wrapped], [], 5.0)
            # Non-blocking handshake: retry until it stops asking for
            # more I/O, waiting on the direction it asked for.
            handshake_done = False
            while not handshake_done:
                try:
                    wrapped.do_handshake()
                except ssl.SSLWantReadError:
                    select.select([wrapped], [], [], 5.0)
                except ssl.SSLWantWriteError:
                    select.select([], [wrapped], [], 5.0)
                else:
                    handshake_done = True
            # SSL established
            self.assertTrue(wrapped.getpeercert())
1362
def test_timeout_connect_ex(self):
    # Issue #12065: on a timeout, connect_ex() should return the original
    # errno (mimicking the behaviour of non-SSL sockets).
    with support.transient_internet(REMOTE_HOST):
        wrapped = ssl.wrap_socket(socket.socket(socket.AF_INET),
                                  cert_reqs=ssl.CERT_REQUIRED,
                                  ca_certs=REMOTE_ROOT_CERT,
                                  do_handshake_on_connect=False)
        with closing(wrapped):
            # a timeout this small is meant to expire before the
            # connection can be established
            wrapped.settimeout(0.0000001)
            status = wrapped.connect_ex((REMOTE_HOST, 443))
            if status == 0:
                self.skipTest("REMOTE_HOST responded too quickly")
            timeout_errnos = (errno.EAGAIN, errno.EWOULDBLOCK)
            self.assertIn(status, timeout_errnos)
1379
def test_connect_ex_error(self):
    # connect_ex() to port 444 of REMOTE_HOST must return one of the
    # connection-failure errno values instead of raising.
    # Issue #19919: Windows machines or VMs hosted on Windows
    # machines sometimes return EWOULDBLOCK.
    expected_errnos = (
        errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
        errno.EWOULDBLOCK,
    )
    with support.transient_internet(REMOTE_HOST):
        wrapped = ssl.wrap_socket(socket.socket(socket.AF_INET),
                                  cert_reqs=ssl.CERT_REQUIRED,
                                  ca_certs=REMOTE_ROOT_CERT)
        with closing(wrapped):
            status = wrapped.connect_ex((REMOTE_HOST, 444))
            self.assertIn(status, expected_errnos)
1396
def test_connect_with_context(self):
    # Same as test_connect, but with a separately created context.
    # Each socket is wrapped in closing() before connect() is attempted,
    # so it is released even when connecting or an assertion fails.
    with support.transient_internet(REMOTE_HOST):
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        with closing(ctx.wrap_socket(socket.socket(socket.AF_INET))) as s:
            s.connect((REMOTE_HOST, 443))
            self.assertEqual({}, s.getpeercert())
        # Same with a server hostname
        with closing(ctx.wrap_socket(socket.socket(socket.AF_INET),
                                     server_hostname=REMOTE_HOST)) as s:
            s.connect((REMOTE_HOST, 443))
        # This should fail because we have no verification certs
        ctx.verify_mode = ssl.CERT_REQUIRED
        with closing(ctx.wrap_socket(socket.socket(socket.AF_INET))) as s:
            self.assertRaisesRegexp(ssl.SSLError, "certificate verify failed",
                                    s.connect, (REMOTE_HOST, 443))
        # This should succeed because we specify the root cert
        ctx.load_verify_locations(REMOTE_ROOT_CERT)
        with closing(ctx.wrap_socket(socket.socket(socket.AF_INET))) as s:
            s.connect((REMOTE_HOST, 443))
            cert = s.getpeercert()
            self.assertTrue(cert)
1427
def test_connect_capath(self):
    # Verify server certificates using the `capath` argument
    # NOTE: the subject hashing algorithm has been changed between
    # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
    # contain both versions of each certificate (same content, different
    # filename) for this test to be portable across OpenSSL releases.
    with support.transient_internet(REMOTE_HOST):
        # CAPATH first, then the same with a bytes `capath` argument
        for capath in (CAPATH, BYTES_CAPATH):
            ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
            ctx.verify_mode = ssl.CERT_REQUIRED
            ctx.load_verify_locations(capath=capath)
            conn = ctx.wrap_socket(socket.socket(socket.AF_INET))
            conn.connect((REMOTE_HOST, 443))
            with closing(conn):
                peer_cert = conn.getpeercert()
                self.assertTrue(peer_cert)
1456
def test_connect_cadata(self):
    """Verify the server with CA material passed through `cadata`."""
    with open(REMOTE_ROOT_CERT) as cert_file:
        pem = cert_file.read().decode('ascii')
    der = ssl.PEM_cert_to_DER_cert(pem)
    with support.transient_internet(REMOTE_HOST):
        # PEM first, then the same with DER
        for cadata in (pem, der):
            ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
            ctx.verify_mode = ssl.CERT_REQUIRED
            ctx.load_verify_locations(cadata=cadata)
            with closing(ctx.wrap_socket(socket.socket(socket.AF_INET))) as conn:
                conn.connect((REMOTE_HOST, 443))
                peer_cert = conn.getpeercert()
                self.assertTrue(peer_cert)
1478
@unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
def test_makefile_close(self):
    # Issue #5238: creating a file-like object with makefile() shouldn't
    # delay closing the underlying "real socket" (here tested with its
    # file descriptor, hence skipping the test under Windows).
    with support.transient_internet(REMOTE_HOST):
        tls_sock = ssl.wrap_socket(socket.socket(socket.AF_INET))
        tls_sock.connect((REMOTE_HOST, 443))
        descriptor = tls_sock.fileno()
        file_obj = tls_sock.makefile()
        file_obj.close()
        # The fd is still open
        os.read(descriptor, 0)
        # Closing the SSL socket should close the fd too
        tls_sock.close()
        gc.collect()
        with self.assertRaises(OSError) as caught:
            os.read(descriptor, 0)
        self.assertEqual(caught.exception.errno, errno.EBADF)
Bill Janssen934b16d2008-06-28 22:19:33 +00001498
def test_non_blocking_handshake(self):
    """Drive a TLS handshake by hand on a non-blocking socket."""
    with support.transient_internet(REMOTE_HOST):
        s = socket.socket(socket.AF_INET)
        s.connect((REMOTE_HOST, 443))
        s.setblocking(False)
        s = ssl.wrap_socket(s,
                            cert_reqs=ssl.CERT_NONE,
                            do_handshake_on_connect=False)
        attempts = 0
        handshake_done = False
        while not handshake_done:
            attempts += 1
            try:
                s.do_handshake()
            except ssl.SSLWantReadError:
                # Wait until the socket is readable, then retry.
                select.select([s], [], [])
            except ssl.SSLWantWriteError:
                # Wait until the socket is writable, then retry.
                select.select([], [s], [])
            else:
                handshake_done = True
        s.close()
        if support.verbose:
            sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % attempts)
Bill Janssen934b16d2008-06-28 22:19:33 +00001520
def test_get_server_certificate(self):
    """Exercise ssl.get_server_certificate() with and without `ca_certs`."""
    def check_host(host, port, cert=None):
        address = (host, port)
        with support.transient_internet(host):
            # No ca_certs given: a certificate must simply come back.
            if not ssl.get_server_certificate(address):
                self.fail("No server certificate on %s:%s!" % address)

            # should fail
            try:
                pem = ssl.get_server_certificate(address, ca_certs=CERTFILE)
            except ssl.SSLError as exc:
                if support.verbose:
                    sys.stdout.write("%s\n" % exc)
            else:
                self.fail("Got server certificate %s for %s:%s!" % (pem, host, port))

            # Finally fetch it with the caller-supplied `cert` as ca_certs.
            pem = ssl.get_server_certificate(address, ca_certs=cert)
            if not pem:
                self.fail("No server certificate on %s:%s!" % address)
            if support.verbose:
                sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port, pem))

    check_host(REMOTE_HOST, 443, REMOTE_ROOT_CERT)
    if support.IPV6_ENABLED:
        check_host('ipv6.google.com', 443)
1548
def test_ciphers(self):
    """Connect with the "ALL" and "DEFAULT" cipher strings, then check
    that an unusable cipher string is rejected with SSLError."""
    remote = (REMOTE_HOST, 443)
    with support.transient_internet(remote[0]):
        for cipher_string in ("ALL", "DEFAULT"):
            with closing(ssl.wrap_socket(socket.socket(socket.AF_INET),
                                         cert_reqs=ssl.CERT_NONE,
                                         ciphers=cipher_string)) as conn:
                conn.connect(remote)
        # Error checking can happen at instantiation or when connecting
        with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"):
            with closing(socket.socket(socket.AF_INET)) as raw_sock:
                conn = ssl.wrap_socket(raw_sock,
                                       cert_reqs=ssl.CERT_NONE,
                                       ciphers="^$:,;?*'dorothyx")
                conn.connect(remote)
Bill Janssen296a59d2007-09-16 22:06:00 +00001564
def test_algorithms(self):
    # Issue #8484: all algorithms should be available when verifying a
    # certificate.
    # SHA256 was added in OpenSSL 0.9.8
    if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15):
        self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION)
    # sha256.tbs-internet.com needs SNI to use the correct certificate
    if not ssl.HAS_SNI:
        self.skipTest("SNI needed for this test")
    # https://sha2.hboeck.de/ was used until 2011-01-08 (no route to host)
    host = "sha256.tbs-internet.com"
    remote = (host, 443)
    sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem")
    with support.transient_internet(host):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.load_verify_locations(sha256_cert)
        conn = ctx.wrap_socket(socket.socket(socket.AF_INET),
                               server_hostname=host)
        # The socket is closed whether or not connect() succeeds.
        with closing(conn):
            conn.connect(remote)
            if support.verbose:
                sys.stdout.write("\nCipher with %r is %r\n" %
                                 (remote, conn.cipher()))
                sys.stdout.write("Certificate is:\n%s\n" %
                                 pprint.pformat(conn.getpeercert()))
1592
def test_get_ca_certs_capath(self):
    # capath certs are loaded on request
    with support.transient_internet(REMOTE_HOST):
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.load_verify_locations(capath=CAPATH)
        # Nothing is reported before a connection has been made ...
        self.assertEqual(ctx.get_ca_certs(), [])
        conn = ctx.wrap_socket(socket.socket(socket.AF_INET))
        conn.connect((REMOTE_HOST, 443))
        with closing(conn):
            peer_cert = conn.getpeercert()
            self.assertTrue(peer_cert)
        # ... and exactly one CA certificate afterwards.
        self.assertEqual(len(ctx.get_ca_certs()), 1)
1608
@needs_sni
def test_context_setget(self):
    # Check that the context of a connected socket can be replaced.
    with support.transient_internet(REMOTE_HOST):
        initial_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        replacement_ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        raw_sock = socket.socket(socket.AF_INET)
        with closing(initial_ctx.wrap_socket(raw_sock)) as ss:
            ss.connect((REMOTE_HOST, 443))
            # Both the Python-level socket and the underlying _sslobj
            # must report the context the socket was created with ...
            self.assertIs(ss.context, initial_ctx)
            self.assertIs(ss._sslobj.context, initial_ctx)
            # ... and the new one once it has been assigned.
            ss.context = replacement_ctx
            self.assertIs(ss.context, replacement_ctx)
            self.assertIs(ss._sslobj.context, replacement_ctx)
Bill Janssen296a59d2007-09-16 22:06:00 +00001623
Bill Janssen98d19da2007-09-10 21:51:02 +00001624try:
1625 import threading
1626except ImportError:
1627 _have_threads = False
1628else:
Bill Janssen98d19da2007-09-10 21:51:02 +00001629 _have_threads = True
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001630
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001631 from test.ssl_servers import make_https_server
1632
class ThreadedEchoServer(threading.Thread):
    """Echo server running in its own thread, used by the threaded tests.

    The server accepts connections on a socket bound through
    support.bind_port() and hands each one to a ConnectionHandler thread,
    which lower-cases and echoes back whatever it reads.  It can be used
    as a context manager: __enter__ starts the thread and waits until it
    is listening, __exit__ stops and joins it.
    """

    class ConnectionHandler(threading.Thread):

        """A mildly complicated class, because we want it to work both
        with and without the SSL wrapper around the socket connection, so
        that we can test the STARTTLS functionality."""

        def __init__(self, server, connsock, addr):
            self.server = server
            self.running = False
            self.sock = connsock
            self.addr = addr
            self.sock.setblocking(1)
            # Set by wrap_conn(); None while the connection is unencrypted.
            self.sslconn = None
            threading.Thread.__init__(self)
            self.daemon = True

        def wrap_conn(self):
            """Wrap self.sock with the server's SSL context.

            Returns True on success.  On an SSLError or a connection reset
            the error is recorded in server.conn_errors, the server is
            stopped, the connection is closed and False is returned.  Any
            other socket.error is re-raised.
            """
            try:
                self.sslconn = self.server.context.wrap_socket(
                    self.sock, server_side=True)
                # Record what was negotiated so tests can inspect it later.
                self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
                self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
            except socket.error as e:
                # We treat ConnectionResetError as though it were an
                # SSLError - OpenSSL on Ubuntu abruptly closes the
                # connection when asked to use an unsupported protocol.
                #
                # XXX Various errors can have happened here, for example
                # a mismatching protocol version, an invalid certificate,
                # or a low-level bug. This should be made more discriminating.
                if not isinstance(e, ssl.SSLError) and e.errno != errno.ECONNRESET:
                    raise
                self.server.conn_errors.append(e)
                if self.server.chatty:
                    handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
                self.running = False
                self.server.stop()
                self.close()
                return False
            else:
                # The peer certificate is only fetched (and, in verbose
                # mode, printed) when the context requires one.
                if self.server.context.verify_mode == ssl.CERT_REQUIRED:
                    cert = self.sslconn.getpeercert()
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
                    cert_binary = self.sslconn.getpeercert(True)
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
                cipher = self.sslconn.cipher()
                if support.verbose and self.server.chatty:
                    sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
                    sys.stdout.write(" server: selected protocol is now "
                            + str(self.sslconn.selected_npn_protocol()) + "\n")
                return True

        def read(self):
            """Read from the SSL connection if there is one, else the raw socket."""
            if self.sslconn:
                return self.sslconn.read()
            else:
                return self.sock.recv(1024)

        def write(self, bytes):
            """Write to the SSL connection if there is one, else the raw socket."""
            if self.sslconn:
                return self.sslconn.write(bytes)
            else:
                return self.sock.send(bytes)

        def close(self):
            """Close the SSL connection if there is one, else the raw socket."""
            if self.sslconn:
                self.sslconn.close()
            else:
                self.sock.close()

        def run(self):
            """Serve one connection until EOF, b'over', or an SSL error.

            Recognised messages (compared after stripping whitespace):
            b'over' closes the connection; b'STARTTLS' and b'ENDTLS'
            (only when the server was created with starttls_server) switch
            encryption on and off; b'CB tls-unique' sends back the repr of
            the channel binding data.  Anything else is echoed lower-cased.
            """
            self.running = True
            # Unless STARTTLS is being tested, encrypt right away.
            if not self.server.starttls_server:
                if not self.wrap_conn():
                    return
            while self.running:
                try:
                    msg = self.read()
                    stripped = msg.strip()
                    if not stripped:
                        # eof, so quit this handler
                        self.running = False
                        self.close()
                    elif stripped == b'over':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: client closed connection\n")
                        self.close()
                        return
                    elif (self.server.starttls_server and
                          stripped == b'STARTTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
                        # The acknowledgement goes out before the
                        # connection is wrapped.
                        self.write(b"OK\n")
                        if not self.wrap_conn():
                            return
                    elif (self.server.starttls_server and self.sslconn
                          and stripped == b'ENDTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        # unwrap() hands back the socket that read()/write()
                        # use from now on, since sslconn is reset to None.
                        self.sock = self.sslconn.unwrap()
                        self.sslconn = None
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: connection is now unencrypted...\n")
                    elif stripped == b'CB tls-unique':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
                        # NOTE(review): this branch uses self.sslconn without
                        # checking that the connection is currently wrapped.
                        data = self.sslconn.get_channel_binding("tls-unique")
                        self.write(repr(data).encode("us-ascii") + b"\n")
                    else:
                        if (support.verbose and
                            self.server.connectionchatty):
                            ctype = (self.sslconn and "encrypted") or "unencrypted"
                            sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
                                             % (msg, ctype, msg.lower(), ctype))
                        self.write(msg.lower())
                except ssl.SSLError:
                    if self.server.chatty:
                        handle_error("Test server failure:\n")
                    self.close()
                    self.running = False
                    # normally, we'd just stop here, but for the test
                    # harness, we want to stop the server
                    self.server.stop()

    def __init__(self, certificate=None, ssl_version=None,
                 certreqs=None, cacerts=None,
                 chatty=True, connectionchatty=False, starttls_server=False,
                 npn_protocols=None, alpn_protocols=None,
                 ciphers=None, context=None):
        """Create the server and bind its listening socket.

        If *context* is given it is used as is; otherwise a new SSLContext
        is built from *ssl_version* (default PROTOCOL_TLSv1), *certreqs*
        (default CERT_NONE), *cacerts*, *certificate*, *npn_protocols*,
        *alpn_protocols* and *ciphers*.  The bound port is available as
        self.port.
        """
        if context:
            self.context = context
        else:
            self.context = ssl.SSLContext(ssl_version
                                          if ssl_version is not None
                                          else ssl.PROTOCOL_TLSv1)
            self.context.verify_mode = (certreqs if certreqs is not None
                                        else ssl.CERT_NONE)
            if cacerts:
                self.context.load_verify_locations(cacerts)
            if certificate:
                self.context.load_cert_chain(certificate)
            if npn_protocols:
                self.context.set_npn_protocols(npn_protocols)
            if alpn_protocols:
                self.context.set_alpn_protocols(alpn_protocols)
            if ciphers:
                self.context.set_ciphers(ciphers)
        self.chatty = chatty
        self.connectionchatty = connectionchatty
        self.starttls_server = starttls_server
        self.sock = socket.socket()
        self.port = support.bind_port(self.sock)
        # Optional event set by run() once the server is listening.
        self.flag = None
        self.active = False
        # Filled in by ConnectionHandler.wrap_conn().
        self.selected_npn_protocols = []
        self.selected_alpn_protocols = []
        self.conn_errors = []
        threading.Thread.__init__(self)
        self.daemon = True

    def __enter__(self):
        # Start the thread and block until run() signals it is listening.
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        self.stop()
        self.join()

    def start(self, flag=None):
        """Start the server thread; *flag*, if given, is set once listening."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        """Accept connections until stop() is called, then close the socket."""
        # The short accept timeout lets the loop re-check self.active.
        self.sock.settimeout(0.05)
        self.sock.listen(5)
        self.active = True
        if self.flag:
            # signal an event
            self.flag.set()
        while self.active:
            try:
                newconn, connaddr = self.sock.accept()
                if support.verbose and self.chatty:
                    sys.stdout.write(' server: new connection from '
                                     + repr(connaddr) + '\n')
                handler = self.ConnectionHandler(self, newconn, connaddr)
                handler.start()
                # Joining here means connections are served one at a time.
                handler.join()
            except socket.timeout:
                pass
            except KeyboardInterrupt:
                self.stop()
        self.sock.close()

    def stop(self):
        """Ask the accept loop in run() to exit."""
        self.active = False
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001835
class AsyncoreEchoServer(threading.Thread):
    """Echo server built on asyncore, with the loop run in its own thread.

    The nested EchoServer dispatcher accepts connections and creates a
    ConnectionHandler for each; the handler performs a non-blocking SSL
    handshake and then echoes back, lower-cased, whatever it receives.
    Usable as a context manager: __enter__ starts the thread and waits
    until it is running, __exit__ stops and joins it.
    """

    class EchoServer(asyncore.dispatcher):

        class ConnectionHandler(asyncore.dispatcher_with_send):

            def __init__(self, conn, certfile):
                # The handshake is driven manually from the asyncore
                # callbacks, hence do_handshake_on_connect=False.
                self.socket = ssl.wrap_socket(conn, server_side=True,
                                              certfile=certfile,
                                              do_handshake_on_connect=False)
                asyncore.dispatcher_with_send.__init__(self, self.socket)
                # True until _do_ssl_handshake() completes successfully.
                self._ssl_accepting = True
                self._do_ssl_handshake()

            def readable(self):
                # Handle data the SSL layer reports as pending before
                # telling asyncore that this socket wants read events.
                if isinstance(self.socket, ssl.SSLSocket):
                    while self.socket.pending() > 0:
                        self.handle_read_event()
                return True

            def _do_ssl_handshake(self):
                """Advance the handshake; clear _ssl_accepting once done."""
                try:
                    self.socket.do_handshake()
                except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
                    # Not finished yet; retried on the next read event.
                    return
                except ssl.SSLEOFError:
                    return self.handle_close()
                except ssl.SSLError:
                    raise
                except socket.error as err:
                    # Any other socket error is ignored here and leaves
                    # _ssl_accepting set.
                    if err.args[0] == errno.ECONNABORTED:
                        return self.handle_close()
                else:
                    self._ssl_accepting = False

            def handle_read(self):
                if self._ssl_accepting:
                    self._do_ssl_handshake()
                else:
                    data = self.recv(1024)
                    if support.verbose:
                        sys.stdout.write(" server: read %s from client\n" % repr(data))
                    if not data:
                        self.close()
                    else:
                        self.send(data.lower())

            def handle_close(self):
                self.close()
                if support.verbose:
                    sys.stdout.write(" server: closed connection %s\n" % self.socket)

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, certfile):
            self.certfile = certfile
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.port = support.bind_port(sock, '')
            asyncore.dispatcher.__init__(self, sock)
            self.listen(5)

        def handle_accept(self):
            sock_obj, addr = self.accept()
            if support.verbose:
                sys.stdout.write(" server: new connection from %s:%s\n" %addr)
            self.ConnectionHandler(sock_obj, self.certfile)

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise

    def __init__(self, certfile):
        """Create the listening EchoServer; its port is kept as self.port."""
        # Optional event set by run() once the loop is about to start.
        self.flag = None
        self.active = False
        self.server = self.EchoServer(certfile)
        self.port = self.server.port
        threading.Thread.__init__(self)
        self.daemon = True

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.server)

    def __enter__(self):
        # Start the thread and block until run() has set the flag.
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        if support.verbose:
            sys.stdout.write(" cleanup: stopping server.\n")
        self.stop()
        if support.verbose:
            sys.stdout.write(" cleanup: joining server thread.\n")
        self.join()
        if support.verbose:
            sys.stdout.write(" cleanup: successfully joined.\n")

    def start(self, flag=None):
        """Start the server thread; *flag*, if given, is set by run()."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        """Run asyncore.loop() repeatedly until stop() is called."""
        self.active = True
        if self.flag:
            self.flag.set()
        while self.active:
            try:
                asyncore.loop(1)
            except:
                # NOTE(review): presumably deliberate best-effort, so that
                # errors raised by the loop do not kill the server thread
                # before stop() is called; kept as in the original.
                pass

    def stop(self):
        """Make run() exit and close the listening dispatcher."""
        self.active = False
        self.server.close()
1950
def server_params_test(client_context, server_context, indata=b"FOO\n",
                       chatty=True, connectionchatty=False, sni_name=None):
    """
    Launch a server, connect a client to it and try various reads
    and writes.

    *indata* is sent three times (as is, as a bytearray and as a
    memoryview) and each reply must equal ``indata.lower()``.  Returns a
    dict describing the connection as seen by the client, plus the
    NPN/ALPN protocols recorded by the server.
    """
    stats = {}
    server = ThreadedEchoServer(context=server_context,
                                chatty=chatty,
                                connectionchatty=False)
    with server:
        client_sock = client_context.wrap_socket(socket.socket(),
                                                 server_hostname=sni_name)
        with closing(client_sock) as s:
            s.connect((HOST, server.port))
            for payload in (indata, bytearray(indata), memoryview(indata)):
                if connectionchatty and support.verbose:
                    sys.stdout.write(
                        " client: sending %r...\n" % indata)
                s.write(payload)
                echoed = s.read()
                if connectionchatty and support.verbose:
                    sys.stdout.write(" client: read %r\n" % echoed)
                if echoed != indata.lower():
                    raise AssertionError(
                        "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                        % (echoed[:20], len(echoed),
                           indata[:20].lower(), len(indata)))
            # Tell the server-side handler that we are done.
            s.write(b"over\n")
            if connectionchatty and support.verbose:
                sys.stdout.write(" client: closing connection.\n")
            stats.update({
                'compression': s.compression(),
                'cipher': s.cipher(),
                'peercert': s.getpeercert(),
                'client_alpn_protocol': s.selected_alpn_protocol(),
                'client_npn_protocol': s.selected_npn_protocol(),
                'version': s.version(),
            })
            s.close()
        stats['server_alpn_protocols'] = server.selected_alpn_protocols
        stats['server_npn_protocols'] = server.selected_npn_protocols
    return stats
Bill Janssen98d19da2007-09-10 21:51:02 +00001996
def try_protocol_combo(server_protocol, client_protocol, expect_success,
                       certsreqs=None, server_options=0, client_options=0):
    """
    Try to SSL-connect using *client_protocol* to *server_protocol*.
    If *expect_success* is true, assert that the connection succeeds,
    if it's false, assert that the connection fails.
    Also, if *expect_success* is a string, assert that it is the protocol
    version actually used by the connection.
    """
    if certsreqs is None:
        certsreqs = ssl.CERT_NONE
    cert_names = {
        ssl.CERT_NONE: "CERT_NONE",
        ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
        ssl.CERT_REQUIRED: "CERT_REQUIRED",
    }
    certtype = cert_names[certsreqs]
    if support.verbose:
        # Combinations expected to fail are shown in braces.
        formatstr = " %s->%s %s\n" if expect_success else " {%s->%s} %s\n"
        sys.stdout.write(formatstr %
                         (ssl.get_protocol_name(client_protocol),
                          ssl.get_protocol_name(server_protocol),
                          certtype))
    client_ctx = ssl.SSLContext(client_protocol)
    client_ctx.options |= client_options
    server_ctx = ssl.SSLContext(server_protocol)
    server_ctx.options |= server_options

    # NOTE: we must enable "ALL" ciphers on the client, otherwise an
    # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
    # starting from OpenSSL 1.0.0 (see issue #8322).
    if client_ctx.protocol == ssl.PROTOCOL_SSLv23:
        client_ctx.set_ciphers("ALL")

    for ctx in (client_ctx, server_ctx):
        ctx.verify_mode = certsreqs
        ctx.load_cert_chain(CERTFILE)
        ctx.load_verify_locations(CERTFILE)

    try:
        stats = server_params_test(client_ctx, server_ctx,
                                   chatty=False, connectionchatty=False)
    # Protocol mismatch can result in either an SSLError, or a
    # "Connection reset by peer" error.
    except ssl.SSLError:
        if expect_success:
            raise
        return
    except socket.error as e:
        if expect_success or e.errno != errno.ECONNRESET:
            raise
        return

    # The connection went through; check that this was expected.
    if not expect_success:
        raise AssertionError(
            "Client protocol %s succeeded with server protocol %s!"
            % (ssl.get_protocol_name(client_protocol),
               ssl.get_protocol_name(server_protocol)))
    if expect_success is not True and expect_success != stats['version']:
        raise AssertionError("version mismatch: expected %r, got %r"
                             % (expect_success, stats['version']))
Bill Janssen98d19da2007-09-10 21:51:02 +00002055
2056
Bill Janssen934b16d2008-06-28 22:19:33 +00002057 class ThreadedTests(unittest.TestCase):
Bill Janssen98d19da2007-09-10 21:51:02 +00002058
Antoine Pitroud75efd92010-08-04 17:38:33 +00002059 @skip_if_broken_ubuntu_ssl
Antoine Pitrou3945c862010-04-28 21:11:01 +00002060 def test_echo(self):
2061 """Basic test of an SSL client connecting to a server"""
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002062 if support.verbose:
Bill Janssen98d19da2007-09-10 21:51:02 +00002063 sys.stdout.write("\n")
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002064 for protocol in PROTOCOLS:
2065 context = ssl.SSLContext(protocol)
2066 context.load_cert_chain(CERTFILE)
2067 server_params_test(context, context,
2068 chatty=True, connectionchatty=True)
Bill Janssen98d19da2007-09-10 21:51:02 +00002069
Antoine Pitrou3945c862010-04-28 21:11:01 +00002070 def test_getpeercert(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002071 if support.verbose:
Bill Janssen98d19da2007-09-10 21:51:02 +00002072 sys.stdout.write("\n")
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002073 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2074 context.verify_mode = ssl.CERT_REQUIRED
2075 context.load_verify_locations(CERTFILE)
2076 context.load_cert_chain(CERTFILE)
2077 server = ThreadedEchoServer(context=context, chatty=False)
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01002078 with server:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002079 s = context.wrap_socket(socket.socket(),
2080 do_handshake_on_connect=False)
Antoine Pitroudb187842010-04-27 10:32:58 +00002081 s.connect((HOST, server.port))
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002082 # getpeercert() raise ValueError while the handshake isn't
2083 # done.
2084 with self.assertRaises(ValueError):
2085 s.getpeercert()
2086 s.do_handshake()
Antoine Pitroudb187842010-04-27 10:32:58 +00002087 cert = s.getpeercert()
2088 self.assertTrue(cert, "Can't get peer certificate.")
2089 cipher = s.cipher()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002090 if support.verbose:
Antoine Pitroudb187842010-04-27 10:32:58 +00002091 sys.stdout.write(pprint.pformat(cert) + '\n')
2092 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
2093 if 'subject' not in cert:
2094 self.fail("No subject field in certificate: %s." %
2095 pprint.pformat(cert))
2096 if ((('organizationName', 'Python Software Foundation'),)
2097 not in cert['subject']):
2098 self.fail(
2099 "Missing or invalid 'organizationName' field in certificate subject; "
2100 "should be 'Python Software Foundation'.")
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002101 self.assertIn('notBefore', cert)
2102 self.assertIn('notAfter', cert)
2103 before = ssl.cert_time_to_seconds(cert['notBefore'])
2104 after = ssl.cert_time_to_seconds(cert['notAfter'])
2105 self.assertLess(before, after)
Antoine Pitroudb187842010-04-27 10:32:58 +00002106 s.close()
Bill Janssen98d19da2007-09-10 21:51:02 +00002107
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002108 @unittest.skipUnless(have_verify_flags(),
2109 "verify_flags need OpenSSL > 0.9.8")
2110 def test_crl_check(self):
2111 if support.verbose:
2112 sys.stdout.write("\n")
2113
2114 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2115 server_context.load_cert_chain(SIGNED_CERTFILE)
2116
2117 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2118 context.verify_mode = ssl.CERT_REQUIRED
2119 context.load_verify_locations(SIGNING_CA)
Benjamin Petersond86699f2015-03-04 23:18:48 -05002120 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
2121 self.assertEqual(context.verify_flags, ssl.VERIFY_DEFAULT | tf)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002122
2123 # VERIFY_DEFAULT should pass
2124 server = ThreadedEchoServer(context=server_context, chatty=True)
2125 with server:
2126 with closing(context.wrap_socket(socket.socket())) as s:
2127 s.connect((HOST, server.port))
2128 cert = s.getpeercert()
2129 self.assertTrue(cert, "Can't get peer certificate.")
2130
2131 # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
2132 context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
2133
2134 server = ThreadedEchoServer(context=server_context, chatty=True)
2135 with server:
2136 with closing(context.wrap_socket(socket.socket())) as s:
2137 with self.assertRaisesRegexp(ssl.SSLError,
2138 "certificate verify failed"):
2139 s.connect((HOST, server.port))
2140
2141 # now load a CRL file. The CRL file is signed by the CA.
2142 context.load_verify_locations(CRLFILE)
2143
2144 server = ThreadedEchoServer(context=server_context, chatty=True)
2145 with server:
2146 with closing(context.wrap_socket(socket.socket())) as s:
2147 s.connect((HOST, server.port))
2148 cert = s.getpeercert()
2149 self.assertTrue(cert, "Can't get peer certificate.")
2150
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002151 def test_check_hostname(self):
2152 if support.verbose:
2153 sys.stdout.write("\n")
2154
2155 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2156 server_context.load_cert_chain(SIGNED_CERTFILE)
2157
2158 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2159 context.verify_mode = ssl.CERT_REQUIRED
2160 context.check_hostname = True
2161 context.load_verify_locations(SIGNING_CA)
2162
2163 # correct hostname should verify
2164 server = ThreadedEchoServer(context=server_context, chatty=True)
2165 with server:
2166 with closing(context.wrap_socket(socket.socket(),
2167 server_hostname="localhost")) as s:
2168 s.connect((HOST, server.port))
2169 cert = s.getpeercert()
2170 self.assertTrue(cert, "Can't get peer certificate.")
2171
2172 # incorrect hostname should raise an exception
2173 server = ThreadedEchoServer(context=server_context, chatty=True)
2174 with server:
2175 with closing(context.wrap_socket(socket.socket(),
2176 server_hostname="invalid")) as s:
2177 with self.assertRaisesRegexp(ssl.CertificateError,
2178 "hostname 'invalid' doesn't match u?'localhost'"):
2179 s.connect((HOST, server.port))
2180
2181 # missing server_hostname arg should cause an exception, too
2182 server = ThreadedEchoServer(context=server_context, chatty=True)
2183 with server:
2184 with closing(socket.socket()) as s:
2185 with self.assertRaisesRegexp(ValueError,
2186 "check_hostname requires server_hostname"):
2187 context.wrap_socket(s)
2188
Martin Panterfd8e8502016-01-30 02:36:00 +00002189 def test_wrong_cert(self):
Martin Panter886aba42016-02-01 21:58:11 +00002190 """Connecting when the server rejects the client's certificate
2191
2192 Launch a server with CERT_REQUIRED, and check that trying to
2193 connect to it with a wrong client certificate fails.
2194 """
2195 certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
2196 "wrongcert.pem")
2197 server = ThreadedEchoServer(CERTFILE,
2198 certreqs=ssl.CERT_REQUIRED,
2199 cacerts=CERTFILE, chatty=False,
2200 connectionchatty=False)
2201 with server, \
2202 closing(socket.socket()) as sock, \
2203 closing(ssl.wrap_socket(sock,
2204 certfile=certfile,
2205 ssl_version=ssl.PROTOCOL_TLSv1)) as s:
2206 try:
2207 # Expect either an SSL error about the server rejecting
2208 # the connection, or a low-level connection reset (which
2209 # sometimes happens on Windows)
2210 s.connect((HOST, server.port))
2211 except ssl.SSLError as e:
2212 if support.verbose:
2213 sys.stdout.write("\nSSLError is %r\n" % e)
2214 except socket.error as e:
2215 if e.errno != errno.ECONNRESET:
2216 raise
2217 if support.verbose:
2218 sys.stdout.write("\nsocket.error is %r\n" % e)
2219 else:
2220 self.fail("Use of invalid cert should have failed!")
Bill Janssen98d19da2007-09-10 21:51:02 +00002221
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002222 def test_rude_shutdown(self):
2223 """A brutal shutdown of an SSL server should raise an OSError
2224 in the client when attempting handshake.
2225 """
2226 listener_ready = threading.Event()
2227 listener_gone = threading.Event()
2228
2229 s = socket.socket()
2230 port = support.bind_port(s, HOST)
2231
2232 # `listener` runs in a thread. It sits in an accept() until
2233 # the main thread connects. Then it rudely closes the socket,
2234 # and sets Event `listener_gone` to let the main thread know
2235 # the socket is gone.
2236 def listener():
2237 s.listen(5)
2238 listener_ready.set()
2239 newsock, addr = s.accept()
2240 newsock.close()
2241 s.close()
2242 listener_gone.set()
2243
2244 def connector():
2245 listener_ready.wait()
2246 with closing(socket.socket()) as c:
2247 c.connect((HOST, port))
2248 listener_gone.wait()
2249 try:
2250 ssl_sock = ssl.wrap_socket(c)
Benjamin Petersone208b572014-08-20 14:49:08 -05002251 except socket.error:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002252 pass
2253 else:
2254 self.fail('connecting to closed SSL socket should have failed')
2255
2256 t = threading.Thread(target=listener)
2257 t.start()
2258 try:
2259 connector()
2260 finally:
2261 t.join()
2262
Antoine Pitroud75efd92010-08-04 17:38:33 +00002263 @skip_if_broken_ubuntu_ssl
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002264 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
2265 "OpenSSL is compiled without SSLv2 support")
Antoine Pitrou3945c862010-04-28 21:11:01 +00002266 def test_protocol_sslv2(self):
2267 """Connecting to an SSLv2 server with various client options"""
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002268 if support.verbose:
Bill Janssen98d19da2007-09-10 21:51:02 +00002269 sys.stdout.write("\n")
Antoine Pitrou3945c862010-04-28 21:11:01 +00002270 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
2271 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
2272 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
Antoine Pitrou3b2afbb2014-01-09 19:52:12 +01002273 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False)
Antoine Pitrou3945c862010-04-28 21:11:01 +00002274 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
2275 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002276 # SSLv23 client with specific SSL options
2277 if no_sslv2_implies_sslv3_hello():
2278 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
2279 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
2280 client_options=ssl.OP_NO_SSLv2)
2281 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
2282 client_options=ssl.OP_NO_SSLv3)
2283 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
2284 client_options=ssl.OP_NO_TLSv1)
Bill Janssen98d19da2007-09-10 21:51:02 +00002285
Antoine Pitroud75efd92010-08-04 17:38:33 +00002286 @skip_if_broken_ubuntu_ssl
Antoine Pitrou3945c862010-04-28 21:11:01 +00002287 def test_protocol_sslv23(self):
2288 """Connecting to an SSLv23 server with various client options"""
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002289 if support.verbose:
Bill Janssen98d19da2007-09-10 21:51:02 +00002290 sys.stdout.write("\n")
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002291 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2292 try:
2293 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True)
2294 except socket.error as x:
2295 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
2296 if support.verbose:
2297 sys.stdout.write(
2298 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
2299 % str(x))
Benjamin Peterson60766c42014-12-05 21:59:35 -05002300 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Benjamin Peterson10aaca92015-11-11 22:38:41 -08002301 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False)
Antoine Pitrou3945c862010-04-28 21:11:01 +00002302 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True)
Alex Gaynore98205d2014-09-04 13:33:22 -07002303 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1')
Bill Janssen98d19da2007-09-10 21:51:02 +00002304
Benjamin Peterson60766c42014-12-05 21:59:35 -05002305 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Benjamin Peterson10aaca92015-11-11 22:38:41 -08002306 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False, ssl.CERT_OPTIONAL)
Antoine Pitrou3945c862010-04-28 21:11:01 +00002307 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL)
Alex Gaynore98205d2014-09-04 13:33:22 -07002308 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
Bill Janssen98d19da2007-09-10 21:51:02 +00002309
Benjamin Peterson60766c42014-12-05 21:59:35 -05002310 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Benjamin Peterson10aaca92015-11-11 22:38:41 -08002311 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False, ssl.CERT_REQUIRED)
Antoine Pitrou3945c862010-04-28 21:11:01 +00002312 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED)
Alex Gaynore98205d2014-09-04 13:33:22 -07002313 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Bill Janssen98d19da2007-09-10 21:51:02 +00002314
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002315 # Server with specific SSL options
Benjamin Peterson60766c42014-12-05 21:59:35 -05002316 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2317 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False,
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002318 server_options=ssl.OP_NO_SSLv3)
2319 # Will choose TLSv1
2320 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True,
2321 server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
2322 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, False,
2323 server_options=ssl.OP_NO_TLSv1)
2324
2325
Antoine Pitroud75efd92010-08-04 17:38:33 +00002326 @skip_if_broken_ubuntu_ssl
Benjamin Peterson60766c42014-12-05 21:59:35 -05002327 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
2328 "OpenSSL is compiled without SSLv3 support")
Antoine Pitrou3945c862010-04-28 21:11:01 +00002329 def test_protocol_sslv3(self):
2330 """Connecting to an SSLv3 server with various client options"""
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002331 if support.verbose:
Bill Janssen98d19da2007-09-10 21:51:02 +00002332 sys.stdout.write("\n")
Alex Gaynore98205d2014-09-04 13:33:22 -07002333 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3')
2334 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
2335 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
Victor Stinnerb1241f92011-05-10 01:52:03 +02002336 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2337 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002338 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False,
2339 client_options=ssl.OP_NO_SSLv3)
Antoine Pitrou3945c862010-04-28 21:11:01 +00002340 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002341 if no_sslv2_implies_sslv3_hello():
2342 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Benjamin Peterson10aaca92015-11-11 22:38:41 -08002343 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23,
2344 False, client_options=ssl.OP_NO_SSLv2)
Bill Janssen98d19da2007-09-10 21:51:02 +00002345
Antoine Pitroud75efd92010-08-04 17:38:33 +00002346 @skip_if_broken_ubuntu_ssl
Antoine Pitrou3945c862010-04-28 21:11:01 +00002347 def test_protocol_tlsv1(self):
2348 """Connecting to a TLSv1 server with various client options"""
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002349 if support.verbose:
Bill Janssen98d19da2007-09-10 21:51:02 +00002350 sys.stdout.write("\n")
Alex Gaynore98205d2014-09-04 13:33:22 -07002351 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1')
2352 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
2353 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Victor Stinnerb1241f92011-05-10 01:52:03 +02002354 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2355 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
Benjamin Peterson60766c42014-12-05 21:59:35 -05002356 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2357 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002358 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False,
2359 client_options=ssl.OP_NO_TLSv1)
2360
2361 @skip_if_broken_ubuntu_ssl
2362 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
2363 "TLS version 1.1 not supported.")
2364 def test_protocol_tlsv1_1(self):
2365 """Connecting to a TLSv1.1 server with various client options.
2366 Testing against older TLS versions."""
2367 if support.verbose:
2368 sys.stdout.write("\n")
Alex Gaynore98205d2014-09-04 13:33:22 -07002369 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002370 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2371 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False)
Benjamin Peterson60766c42014-12-05 21:59:35 -05002372 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2373 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002374 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv23, False,
2375 client_options=ssl.OP_NO_TLSv1_1)
2376
Alex Gaynore98205d2014-09-04 13:33:22 -07002377 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002378 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False)
2379 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False)
2380
2381
2382 @skip_if_broken_ubuntu_ssl
2383 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
2384 "TLS version 1.2 not supported.")
2385 def test_protocol_tlsv1_2(self):
2386 """Connecting to a TLSv1.2 server with various client options.
2387 Testing against older TLS versions."""
2388 if support.verbose:
2389 sys.stdout.write("\n")
Alex Gaynore98205d2014-09-04 13:33:22 -07002390 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2',
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002391 server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
2392 client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
2393 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2394 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False)
Benjamin Peterson60766c42014-12-05 21:59:35 -05002395 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2396 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002397 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv23, False,
2398 client_options=ssl.OP_NO_TLSv1_2)
2399
Alex Gaynore98205d2014-09-04 13:33:22 -07002400 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2')
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002401 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
2402 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
2403 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
2404 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
Bill Janssen98d19da2007-09-10 21:51:02 +00002405
Antoine Pitrou3945c862010-04-28 21:11:01 +00002406 def test_starttls(self):
2407 """Switching from clear text to encrypted and back again."""
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002408 msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")
Bill Janssen98d19da2007-09-10 21:51:02 +00002409
Trent Nelsone41b0062008-04-08 23:47:30 +00002410 server = ThreadedEchoServer(CERTFILE,
Bill Janssen98d19da2007-09-10 21:51:02 +00002411 ssl_version=ssl.PROTOCOL_TLSv1,
2412 starttls_server=True,
2413 chatty=True,
2414 connectionchatty=True)
Bill Janssen98d19da2007-09-10 21:51:02 +00002415 wrapped = False
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01002416 with server:
Antoine Pitroudb187842010-04-27 10:32:58 +00002417 s = socket.socket()
2418 s.setblocking(1)
2419 s.connect((HOST, server.port))
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002420 if support.verbose:
Antoine Pitroudb187842010-04-27 10:32:58 +00002421 sys.stdout.write("\n")
2422 for indata in msgs:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002423 if support.verbose:
Antoine Pitroudb187842010-04-27 10:32:58 +00002424 sys.stdout.write(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002425 " client: sending %r...\n" % indata)
Antoine Pitroudb187842010-04-27 10:32:58 +00002426 if wrapped:
2427 conn.write(indata)
2428 outdata = conn.read()
2429 else:
2430 s.send(indata)
2431 outdata = s.recv(1024)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002432 msg = outdata.strip().lower()
2433 if indata == b"STARTTLS" and msg.startswith(b"ok"):
Antoine Pitrou3945c862010-04-28 21:11:01 +00002434 # STARTTLS ok, switch to secure mode
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002435 if support.verbose:
Bill Janssen296a59d2007-09-16 22:06:00 +00002436 sys.stdout.write(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002437 " client: read %r from server, starting TLS...\n"
2438 % msg)
Antoine Pitroudb187842010-04-27 10:32:58 +00002439 conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
2440 wrapped = True
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002441 elif indata == b"ENDTLS" and msg.startswith(b"ok"):
Antoine Pitrou3945c862010-04-28 21:11:01 +00002442 # ENDTLS ok, switch back to clear text
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002443 if support.verbose:
Antoine Pitroudb187842010-04-27 10:32:58 +00002444 sys.stdout.write(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002445 " client: read %r from server, ending TLS...\n"
2446 % msg)
Antoine Pitroudb187842010-04-27 10:32:58 +00002447 s = conn.unwrap()
2448 wrapped = False
Bill Janssen98d19da2007-09-10 21:51:02 +00002449 else:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002450 if support.verbose:
Antoine Pitroudb187842010-04-27 10:32:58 +00002451 sys.stdout.write(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002452 " client: read %r from server\n" % msg)
2453 if support.verbose:
Antoine Pitroudb187842010-04-27 10:32:58 +00002454 sys.stdout.write(" client: closing connection.\n")
2455 if wrapped:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002456 conn.write(b"over\n")
Antoine Pitroudb187842010-04-27 10:32:58 +00002457 else:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002458 s.send(b"over\n")
2459 if wrapped:
2460 conn.close()
2461 else:
2462 s.close()
Bill Janssen98d19da2007-09-10 21:51:02 +00002463
Antoine Pitrou3945c862010-04-28 21:11:01 +00002464 def test_socketserver(self):
2465 """Using a SocketServer to create and manage SSL connections."""
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002466 server = make_https_server(self, certfile=CERTFILE)
Bill Janssen296a59d2007-09-16 22:06:00 +00002467 # try to connect
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002468 if support.verbose:
2469 sys.stdout.write('\n')
2470 with open(CERTFILE, 'rb') as f:
2471 d1 = f.read()
2472 d2 = ''
2473 # now fetch the same data from the HTTPS server
Benjamin Petersonfcfb18e2014-11-23 11:42:45 -06002474 url = 'https://localhost:%d/%s' % (
2475 server.port, os.path.split(CERTFILE)[1])
2476 context = ssl.create_default_context(cafile=CERTFILE)
Benjamin Petersonb0609ec2014-11-23 11:52:46 -06002477 f = urllib2.urlopen(url, context=context)
Bill Janssen296a59d2007-09-16 22:06:00 +00002478 try:
Bill Janssen296a59d2007-09-16 22:06:00 +00002479 dlen = f.info().getheader("content-length")
2480 if dlen and (int(dlen) > 0):
2481 d2 = f.read(int(dlen))
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002482 if support.verbose:
Bill Janssen296a59d2007-09-16 22:06:00 +00002483 sys.stdout.write(
2484 " client: read %d bytes from remote server '%s'\n"
2485 % (len(d2), server))
Bill Janssen296a59d2007-09-16 22:06:00 +00002486 finally:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002487 f.close()
2488 self.assertEqual(d1, d2)
Bill Janssen934b16d2008-06-28 22:19:33 +00002489
Antoine Pitrou3945c862010-04-28 21:11:01 +00002490 def test_asyncore_server(self):
2491 """Check the example asyncore integration."""
Bill Janssen934b16d2008-06-28 22:19:33 +00002492 indata = "TEST MESSAGE of mixed case\n"
2493
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002494 if support.verbose:
Bill Janssen934b16d2008-06-28 22:19:33 +00002495 sys.stdout.write("\n")
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002496
2497 indata = b"FOO\n"
Bill Janssen934b16d2008-06-28 22:19:33 +00002498 server = AsyncoreEchoServer(CERTFILE)
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01002499 with server:
Antoine Pitroudb187842010-04-27 10:32:58 +00002500 s = ssl.wrap_socket(socket.socket())
2501 s.connect(('127.0.0.1', server.port))
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002502 if support.verbose:
Antoine Pitroudb187842010-04-27 10:32:58 +00002503 sys.stdout.write(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002504 " client: sending %r...\n" % indata)
Antoine Pitroudb187842010-04-27 10:32:58 +00002505 s.write(indata)
2506 outdata = s.read()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002507 if support.verbose:
2508 sys.stdout.write(" client: read %r\n" % outdata)
Antoine Pitroudb187842010-04-27 10:32:58 +00002509 if outdata != indata.lower():
2510 self.fail(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002511 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
2512 % (outdata[:20], len(outdata),
2513 indata[:20].lower(), len(indata)))
2514 s.write(b"over\n")
2515 if support.verbose:
Antoine Pitroudb187842010-04-27 10:32:58 +00002516 sys.stdout.write(" client: closing connection.\n")
2517 s.close()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002518 if support.verbose:
2519 sys.stdout.write(" client: connection closed.\n")
Bill Janssen934b16d2008-06-28 22:19:33 +00002520
Antoine Pitrou3945c862010-04-28 21:11:01 +00002521 def test_recv_send(self):
2522 """Test recv(), send() and friends."""
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002523 if support.verbose:
Bill Janssen61c001a2008-09-08 16:37:24 +00002524 sys.stdout.write("\n")
2525
2526 server = ThreadedEchoServer(CERTFILE,
2527 certreqs=ssl.CERT_NONE,
2528 ssl_version=ssl.PROTOCOL_TLSv1,
2529 cacerts=CERTFILE,
2530 chatty=True,
2531 connectionchatty=False)
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01002532 with server:
2533 s = ssl.wrap_socket(socket.socket(),
2534 server_side=False,
2535 certfile=CERTFILE,
2536 ca_certs=CERTFILE,
2537 cert_reqs=ssl.CERT_NONE,
2538 ssl_version=ssl.PROTOCOL_TLSv1)
2539 s.connect((HOST, server.port))
Bill Janssen61c001a2008-09-08 16:37:24 +00002540 # helper methods for standardising recv* method signatures
2541 def _recv_into():
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002542 b = bytearray(b"\0"*100)
Bill Janssen61c001a2008-09-08 16:37:24 +00002543 count = s.recv_into(b)
2544 return b[:count]
2545
2546 def _recvfrom_into():
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002547 b = bytearray(b"\0"*100)
Bill Janssen61c001a2008-09-08 16:37:24 +00002548 count, addr = s.recvfrom_into(b)
2549 return b[:count]
2550
2551 # (name, method, whether to expect success, *args)
2552 send_methods = [
2553 ('send', s.send, True, []),
2554 ('sendto', s.sendto, False, ["some.address"]),
2555 ('sendall', s.sendall, True, []),
2556 ]
2557 recv_methods = [
2558 ('recv', s.recv, True, []),
2559 ('recvfrom', s.recvfrom, False, ["some.address"]),
2560 ('recv_into', _recv_into, True, []),
2561 ('recvfrom_into', _recvfrom_into, False, []),
2562 ]
2563 data_prefix = u"PREFIX_"
2564
2565 for meth_name, send_meth, expect_success, args in send_methods:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002566 indata = (data_prefix + meth_name).encode('ascii')
Bill Janssen61c001a2008-09-08 16:37:24 +00002567 try:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002568 send_meth(indata, *args)
Bill Janssen61c001a2008-09-08 16:37:24 +00002569 outdata = s.read()
Bill Janssen61c001a2008-09-08 16:37:24 +00002570 if outdata != indata.lower():
Georg Brandlc7ca56d2010-02-06 23:23:45 +00002571 self.fail(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002572 "While sending with <<{name:s}>> bad data "
2573 "<<{outdata:r}>> ({nout:d}) received; "
2574 "expected <<{indata:r}>> ({nin:d})\n".format(
2575 name=meth_name, outdata=outdata[:20],
2576 nout=len(outdata),
2577 indata=indata[:20], nin=len(indata)
Bill Janssen61c001a2008-09-08 16:37:24 +00002578 )
2579 )
2580 except ValueError as e:
2581 if expect_success:
Georg Brandlc7ca56d2010-02-06 23:23:45 +00002582 self.fail(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002583 "Failed to send with method <<{name:s}>>; "
2584 "expected to succeed.\n".format(name=meth_name)
Bill Janssen61c001a2008-09-08 16:37:24 +00002585 )
2586 if not str(e).startswith(meth_name):
Georg Brandlc7ca56d2010-02-06 23:23:45 +00002587 self.fail(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002588 "Method <<{name:s}>> failed with unexpected "
2589 "exception message: {exp:s}\n".format(
2590 name=meth_name, exp=e
Bill Janssen61c001a2008-09-08 16:37:24 +00002591 )
2592 )
2593
2594 for meth_name, recv_meth, expect_success, args in recv_methods:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002595 indata = (data_prefix + meth_name).encode('ascii')
Bill Janssen61c001a2008-09-08 16:37:24 +00002596 try:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002597 s.send(indata)
Bill Janssen61c001a2008-09-08 16:37:24 +00002598 outdata = recv_meth(*args)
Bill Janssen61c001a2008-09-08 16:37:24 +00002599 if outdata != indata.lower():
Georg Brandlc7ca56d2010-02-06 23:23:45 +00002600 self.fail(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002601 "While receiving with <<{name:s}>> bad data "
2602 "<<{outdata:r}>> ({nout:d}) received; "
2603 "expected <<{indata:r}>> ({nin:d})\n".format(
2604 name=meth_name, outdata=outdata[:20],
2605 nout=len(outdata),
2606 indata=indata[:20], nin=len(indata)
Bill Janssen61c001a2008-09-08 16:37:24 +00002607 )
2608 )
2609 except ValueError as e:
2610 if expect_success:
Georg Brandlc7ca56d2010-02-06 23:23:45 +00002611 self.fail(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002612 "Failed to receive with method <<{name:s}>>; "
2613 "expected to succeed.\n".format(name=meth_name)
Bill Janssen61c001a2008-09-08 16:37:24 +00002614 )
2615 if not str(e).startswith(meth_name):
Georg Brandlc7ca56d2010-02-06 23:23:45 +00002616 self.fail(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002617 "Method <<{name:s}>> failed with unexpected "
2618 "exception message: {exp:s}\n".format(
2619 name=meth_name, exp=e
Bill Janssen61c001a2008-09-08 16:37:24 +00002620 )
2621 )
2622 # consume data
2623 s.read()
2624
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002625 s.write(b"over\n")
Bill Janssen61c001a2008-09-08 16:37:24 +00002626 s.close()
Bill Janssen61c001a2008-09-08 16:37:24 +00002627
def test_handshake_timeout(self):
    # Issue #5103: SSL handshake must respect the socket timeout.
    # The listener below accepts TCP connections but never performs a
    # TLS handshake, so each handshake attempted against it is expected
    # to fail with a "timed out" SSLError.
    listener = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(listener)
    ready = threading.Event()
    stop = threading.Event()

    def serve():
        listener.listen(5)
        ready.set()
        held = []
        while not stop.is_set():
            readable = select.select([listener], [], [], 0.1)[0]
            if listener in readable:
                # Hold on to the accepted socket so it stays open rather
                # than being closed by garbage collection.
                held.append(listener.accept()[0])
        for conn in held:
            conn.close()

    worker = threading.Thread(target=serve)
    worker.start()
    ready.wait()

    try:
        # Case 1: the handshake is triggered by wrapping a socket that
        # is already connected.
        try:
            client = socket.socket(socket.AF_INET)
            client.settimeout(0.2)
            client.connect((host, port))
            self.assertRaisesRegexp(ssl.SSLError, "timed out",
                                    ssl.wrap_socket, client)
        finally:
            client.close()
        # Case 2: the handshake is triggered by connect() on a socket
        # that was wrapped beforehand.
        try:
            client = socket.socket(socket.AF_INET)
            client = ssl.wrap_socket(client)
            client.settimeout(0.2)
            self.assertRaisesRegexp(ssl.SSLError, "timed out",
                                    client.connect, (host, port))
        finally:
            client.close()
    finally:
        # Tell the accept loop to exit, then release the listener.
        stop.set()
        worker.join()
        listener.close()
2676
def test_server_accept(self):
    # Issue #16357: accept() on a SSLSocket created through
    # SSLContext.wrap_socket().
    ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    ctx.verify_mode = ssl.CERT_REQUIRED
    ctx.load_verify_locations(CERTFILE)
    ctx.load_cert_chain(CERTFILE)
    listener = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(listener)
    listener = ctx.wrap_socket(listener, server_side=True)

    listening = threading.Event()
    # Filled in by the server thread with the result of accept().
    accepted = {'sock': None, 'addr': None}

    def serve():
        listener.listen(5)
        # Signal the client, then block on accept() and wait for the
        # connection to be closed by the peer.
        listening.set()
        accepted['sock'], accepted['addr'] = listener.accept()
        accepted['sock'].recv(1)

    worker = threading.Thread(target=serve)
    worker.start()
    # The client waits until the server is set up, then connects.
    listening.wait()
    client = ctx.wrap_socket(socket.socket())
    client.connect((host, port))
    client_addr = client.getsockname()
    client.close()
    worker.join()
    accepted['sock'].close()
    listener.close()
    # Sanity checks.
    self.assertIsInstance(accepted['sock'], ssl.SSLSocket)
    self.assertEqual(accepted['addr'], client_addr)
2713
def test_getpeercert_enotconn(self):
    # getpeercert() on a wrapped but never-connected socket must raise
    # socket.error with errno ENOTCONN.
    ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    unconnected = ctx.wrap_socket(socket.socket())
    try:
        with self.assertRaises(socket.error) as caught:
            unconnected.getpeercert()
        self.assertEqual(caught.exception.errno, errno.ENOTCONN)
    finally:
        unconnected.close()
2720
def test_do_handshake_enotconn(self):
    # do_handshake() on a wrapped but never-connected socket must raise
    # socket.error with errno ENOTCONN.
    ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    unconnected = ctx.wrap_socket(socket.socket())
    try:
        with self.assertRaises(socket.error) as caught:
            unconnected.do_handshake()
        self.assertEqual(caught.exception.errno, errno.ENOTCONN)
    finally:
        unconnected.close()
2727
def test_default_ciphers(self):
    # A client restricted to "DES" ciphers must fail to connect to a
    # server that was given no explicit cipher list, and the server is
    # expected to record a "no shared cipher" error.
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    try:
        # Force a set of weak ciphers on our client context
        client_ctx.set_ciphers("DES")
    except ssl.SSLError:
        self.skipTest("no DES cipher available")
    with ThreadedEchoServer(CERTFILE,
                            ssl_version=ssl.PROTOCOL_SSLv23,
                            chatty=False) as server:
        client = client_ctx.wrap_socket(socket.socket())
        try:
            with self.assertRaises(ssl.SSLError):
                client.connect((HOST, server.port))
        finally:
            client.close()
    # Inspected only after the server context manager has exited.
    first_error = str(server.conn_errors[0])
    self.assertIn("no shared cipher", first_error)
2742
def test_version_basic(self):
    """
    Basic tests for SSLSocket.version().
    More tests are done in the test_protocol_*() methods.
    """
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    with ThreadedEchoServer(CERTFILE,
                            ssl_version=ssl.PROTOCOL_TLSv1,
                            chatty=False) as server:
        client = ctx.wrap_socket(socket.socket())
        try:
            # No protocol version is reported before connecting...
            self.assertIs(client.version(), None)
            client.connect((HOST, server.port))
            # ...the negotiated one is reported while connected...
            self.assertEqual(client.version(), "TLSv1")
        finally:
            client.close()
        # ...and none again once the socket has been closed.
        self.assertIs(client.version(), None)
2757
@unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
def test_default_ecdh_curve(self):
    # Issue #21015: elliptic curve-based Diffie Hellman key exchange
    # should be enabled by default on SSL contexts.
    ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    ctx.load_cert_chain(CERTFILE)
    # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
    # explicitly using the 'ECCdraft' cipher alias.  Otherwise,
    # our default cipher list should prefer ECDH-based ciphers
    # automatically.
    needs_explicit_ecdh = ssl.OPENSSL_VERSION_INFO < (1, 0, 0)
    if needs_explicit_ecdh:
        ctx.set_ciphers("ECCdraft:ECDH")
    # The same context is used for both the server and the client.
    with ThreadedEchoServer(context=ctx) as server:
        client = ctx.wrap_socket(socket.socket())
        try:
            client.connect((HOST, server.port))
            cipher_name = client.cipher()[0]
            self.assertIn("ECDH", cipher_name)
        finally:
            client.close()
2774
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    """Test tls-unique channel binding."""
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)

    def connect():
        # Open a fresh TLSv1 client connection to the echo server.  The
        # socket is closed here if connect() fails so it cannot leak.
        sock = ssl.wrap_socket(socket.socket(),
                               server_side=False,
                               certfile=CERTFILE,
                               ca_certs=CERTFILE,
                               cert_reqs=ssl.CERT_NONE,
                               ssl_version=ssl.PROTOCOL_TLSv1)
        try:
            sock.connect((HOST, server.port))
        except Exception:
            sock.close()
            raise
        return sock

    with server:
        # closing() guarantees the client socket is released even when
        # one of the assertions below fails.
        with closing(connect()) as s:
            # get the data
            cb_data = s.get_channel_binding("tls-unique")
            if support.verbose:
                sys.stdout.write(" got channel binding data: {0!r}\n"
                                 .format(cb_data))

            # check if it is sane
            self.assertIsNotNone(cb_data)
            self.assertEqual(len(cb_data), 12)  # True for TLSv1

            # and compare with the peers version
            s.write(b"CB tls-unique\n")
            peer_data_repr = s.read().strip()
            self.assertEqual(peer_data_repr,
                             repr(cb_data).encode("us-ascii"))

        # now, again
        with closing(connect()) as s:
            new_cb_data = s.get_channel_binding("tls-unique")
            if support.verbose:
                sys.stdout.write(" got another channel binding data: {0!r}\n"
                                 .format(new_cb_data))
            # is it really unique
            self.assertNotEqual(cb_data, new_cb_data)
            # Sanity-check the *new* binding data; the first connection's
            # data was already validated above.
            self.assertIsNotNone(new_cb_data)
            self.assertEqual(len(new_cb_data), 12)  # True for TLSv1
            s.write(b"CB tls-unique\n")
            peer_data_repr = s.read().strip()
            self.assertEqual(peer_data_repr,
                             repr(new_cb_data).encode("us-ascii"))
2834
def test_compression(self):
    # The compression method reported for a TLSv1 connection must be
    # None or one of the known method names.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    ctx.load_cert_chain(CERTFILE)
    stats = server_params_test(ctx, ctx,
                               chatty=True, connectionchatty=True)
    compression = stats['compression']
    if support.verbose:
        sys.stdout.write(" got compression: {!r}\n".format(compression))
    self.assertIn(compression, {None, 'ZLIB', 'RLE'})
2843
@unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
                     "ssl.OP_NO_COMPRESSION needed for this test")
def test_compression_disabled(self):
    # With OP_NO_COMPRESSION set on the context, the connection must
    # report no compression at all.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    ctx.load_cert_chain(CERTFILE)
    ctx.options |= ssl.OP_NO_COMPRESSION
    stats = server_params_test(ctx, ctx,
                               chatty=True, connectionchatty=True)
    compression = stats['compression']
    self.assertIs(compression, None)
2853
def test_dh_params(self):
    # Check we can get a connection with ephemeral Diffie-Hellman
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.load_cert_chain(CERTFILE)
    context.load_dh_params(DHFILE)
    context.set_ciphers("kEDH")
    stats = server_params_test(context, context,
                               chatty=True, connectionchatty=True)
    # stats["cipher"] is a tuple whose first element is the cipher name.
    cipher = stats["cipher"][0]
    parts = cipher.split("-")
    if not any(kx in parts for kx in ("ADH", "EDH", "DHE")):
        # Report the whole cipher name; indexing the string again would
        # only show its first character.
        self.fail("Non-DH cipher: " + cipher)
2866
def test_selected_alpn_protocol(self):
    # selected_alpn_protocol() is None unless ALPN is used.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    ctx.load_cert_chain(CERTFILE)
    stats = server_params_test(ctx, ctx,
                               chatty=True, connectionchatty=True)
    negotiated = stats['client_alpn_protocol']
    self.assertIs(negotiated, None)
2874
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
def test_selected_alpn_protocol_if_server_uses_alpn(self):
    # selected_alpn_protocol() is None unless ALPN is used by the client.
    # Only the server advertises protocols here.
    server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_context.load_cert_chain(CERTFILE)
    server_context.set_alpn_protocols(['foo', 'bar'])
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    client_context.load_verify_locations(CERTFILE)
    stats = server_params_test(client_context, server_context,
                               chatty=True, connectionchatty=True)
    negotiated = stats['client_alpn_protocol']
    self.assertIs(negotiated, None)
2886
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
def test_alpn_protocols(self):
    # For each client protocol list, check which protocol both the
    # client and the server report as negotiated.
    server_protocols = ['foo', 'bar', 'milkshake']
    # Pairs of (protocols offered by the client, expected selection).
    protocol_tests = [
        (['foo', 'bar'], 'foo'),
        (['bar', 'foo'], 'foo'),
        (['milkshake'], 'milkshake'),
        (['http/3.0', 'http/4.0'], None)
    ]
    for client_protocols, expected in protocol_tests:
        # Fresh contexts are built for every case.
        server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        server_context.load_cert_chain(CERTFILE)
        server_context.set_alpn_protocols(server_protocols)
        client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        client_context.load_cert_chain(CERTFILE)
        client_context.set_alpn_protocols(client_protocols)
        stats = server_params_test(client_context, server_context,
                                   chatty=True, connectionchatty=True)

        # The two escaped %%s placeholders are filled in per side below.
        template = ("failed trying %s (s) and %s (c).\n"
                    "was expecting %s, but got %%s from the %%s")
        msg = template % (str(server_protocols), str(client_protocols),
                          str(expected))

        client_result = stats['client_alpn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))

        server_seen = stats['server_alpn_protocols']
        if server_seen:
            server_result = server_seen[-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
2915
def test_selected_npn_protocol(self):
    # selected_npn_protocol() is None unless NPN is used
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    ctx.load_cert_chain(CERTFILE)
    stats = server_params_test(ctx, ctx,
                               chatty=True, connectionchatty=True)
    negotiated = stats['client_npn_protocol']
    self.assertIs(negotiated, None)
2923
@unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
def test_npn_protocols(self):
    # For each client protocol list, check which protocol both the
    # client and the server report as negotiated.
    server_protocols = ['http/1.1', 'spdy/2']
    # Pairs of (protocols offered by the client, expected selection).
    protocol_tests = [
        (['http/1.1', 'spdy/2'], 'http/1.1'),
        (['spdy/2', 'http/1.1'], 'http/1.1'),
        (['spdy/2', 'test'], 'spdy/2'),
        (['abc', 'def'], 'abc')
    ]
    for client_protocols, expected in protocol_tests:
        # Fresh contexts are built for every case.
        server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        server_context.load_cert_chain(CERTFILE)
        server_context.set_npn_protocols(server_protocols)
        client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        client_context.load_cert_chain(CERTFILE)
        client_context.set_npn_protocols(client_protocols)
        stats = server_params_test(client_context, server_context,
                                   chatty=True, connectionchatty=True)

        # The two escaped %%s placeholders are filled in per side below.
        template = ("failed trying %s (s) and %s (c).\n"
                    "was expecting %s, but got %%s from the %%s")
        msg = template % (str(server_protocols), str(client_protocols),
                          str(expected))

        client_result = stats['client_npn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))

        server_seen = stats['server_npn_protocols']
        if server_seen:
            server_result = server_seen[-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
2952
def sni_contexts(self):
    """Build the TLSv1 contexts shared by the SNI callback tests.

    Returns a (server_context, other_context, client_context) tuple:
    the server context loads SIGNED_CERTFILE, the other context loads
    SIGNED_CERTFILE2, and the client context requires a certificate
    verified against SIGNING_CA.
    """
    def context_with_cert(certfile):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_cert_chain(certfile)
        return ctx

    server_context = context_with_cert(SIGNED_CERTFILE)
    other_context = context_with_cert(SIGNED_CERTFILE2)
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    client_context.verify_mode = ssl.CERT_REQUIRED
    client_context.load_verify_locations(SIGNING_CA)
    return server_context, other_context, client_context
2962
def check_common_name(self, stats, name):
    """Assert that the peer certificate in *stats* has commonName *name*.

    *stats* must map 'peercert' to a dict whose 'subject' entry is a
    sequence of relative distinguished names.
    """
    subject = stats['peercert']['subject']
    expected_rdn = (('commonName', name),)
    self.assertIn(expected_rdn, subject)
2966
@needs_sni
def test_sni_callback(self):
    # Exercise the servername callback: with a server name, without
    # one, and after the callback has been removed.
    seen = []
    server_context, other_context, client_context = self.sni_contexts()

    def servername_cb(ssl_sock, server_name, initial_context):
        seen.append((server_name, initial_context))
        if server_name is not None:
            # Switch the connection over to the alternate context.
            ssl_sock.context = other_context
    server_context.set_servername_callback(servername_cb)

    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name='supermessage')
    # The hostname was fetched properly, and the certificate was
    # changed for the connection.
    self.assertEqual(seen, [("supermessage", server_context)])
    # The certificate loaded into other_context was presented.
    self.check_common_name(stats, 'fakehostname')

    del seen[:]
    # The callback is called with server_name=None
    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name=None)
    self.assertEqual(seen, [(None, server_context)])
    self.check_common_name(stats, 'localhost')

    # Check disabling the callback
    del seen[:]
    server_context.set_servername_callback(None)

    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name='notfunny')
    # Certificate didn't change
    self.check_common_name(stats, 'localhost')
    self.assertEqual(seen, [])
3005
@needs_sni
def test_sni_callback_alert(self):
    # Returning a TLS alert is reflected to the connecting client
    contexts = self.sni_contexts()
    server_context, client_context = contexts[0], contexts[2]

    def cb_returning_alert(ssl_sock, server_name, initial_context):
        return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
    server_context.set_servername_callback(cb_returning_alert)

    with self.assertRaises(ssl.SSLError) as caught:
        server_params_test(client_context, server_context,
                           chatty=False,
                           sni_name='supermessage')
    reason = caught.exception.reason
    self.assertEqual(reason, 'TLSV1_ALERT_ACCESS_DENIED')
3020
@needs_sni
def test_sni_callback_raising(self):
    # Raising fails the connection with a TLS handshake failure alert.
    contexts = self.sni_contexts()
    server_context, client_context = contexts[0], contexts[2]

    def cb_raising(ssl_sock, server_name, initial_context):
        # Deliberately raise ZeroDivisionError inside the callback.
        1.0/0.0
    server_context.set_servername_callback(cb_raising)

    # stderr is captured so the reported exception can be checked.
    with self.assertRaises(ssl.SSLError) as caught, \
         support.captured_stderr() as captured:
        server_params_test(client_context, server_context,
                           chatty=False,
                           sni_name='supermessage')
    reason = caught.exception.reason
    self.assertEqual(reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
    self.assertIn("ZeroDivisionError", captured.getvalue())
3037
@needs_sni
def test_sni_callback_wrong_return_type(self):
    # Returning the wrong return type terminates the TLS connection
    # with an internal error alert.
    contexts = self.sni_contexts()
    server_context, client_context = contexts[0], contexts[2]

    def cb_wrong_return_type(ssl_sock, server_name, initial_context):
        return "foo"
    server_context.set_servername_callback(cb_wrong_return_type)

    # stderr is captured so the reported exception can be checked.
    with self.assertRaises(ssl.SSLError) as caught, \
         support.captured_stderr() as captured:
        server_params_test(client_context, server_context,
                           chatty=False,
                           sni_name='supermessage')
    reason = caught.exception.reason
    self.assertEqual(reason, 'TLSV1_ALERT_INTERNAL_ERROR')
    self.assertIn("TypeError", captured.getvalue())
3055
def test_read_write_after_close_raises_valuerror(self):
    # read() and write() on an SSL socket that has been closed must
    # raise ValueError.
    ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    ctx.verify_mode = ssl.CERT_REQUIRED
    ctx.load_verify_locations(CERTFILE)
    ctx.load_cert_chain(CERTFILE)
    server = ThreadedEchoServer(context=ctx, chatty=False)

    with server:
        client = ctx.wrap_socket(socket.socket())
        client.connect((HOST, server.port))
        client.close()

        with self.assertRaises(ValueError):
            client.read(1024)
        with self.assertRaises(ValueError):
            client.write(b'hello')
3070
Bill Janssen61c001a2008-09-08 16:37:24 +00003071
def test_main(verbose=False):
    """Run the test_ssl test classes.

    In verbose mode a short banner describing the platform and the
    OpenSSL build is printed first.  All certificate fixture files must
    exist, otherwise support.TestFailed is raised.  NetworkedTests is
    added when the 'network' resource is enabled, ThreadedTests when
    threads are available.
    """
    if support.verbose:
        probes = {
            'Linux': platform.linux_distribution,
            'Mac': platform.mac_ver,
            'Windows': platform.win32_ver,
        }
        plat = None
        for name, probe in probes.items():
            info = probe()
            if info and info[0]:
                plat = '%s %r' % (name, info)
                break
        if plat is None:
            # No platform-specific probe returned anything useful.
            plat = repr(platform.platform())
        print("test_ssl: testing with %r %r" %
              (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
        print(" under %s" % plat)
        print(" HAS_SNI = %r" % ssl.HAS_SNI)
        print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
        try:
            no_tls11 = ssl.OP_NO_TLSv1_1
        except AttributeError:
            # The constant is missing from this ssl module; skip the line.
            pass
        else:
            print(" OP_NO_TLSv1_1 = 0x%8x" % no_tls11)

    required_files = (
        CERTFILE, REMOTE_ROOT_CERT, BYTES_CERTFILE,
        ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
        SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
        BADCERT, BADKEY, EMPTYCERT,
    )
    for filename in required_files:
        if not os.path.exists(filename):
            raise support.TestFailed("Can't read certificate file %r" % filename)

    tests = [ContextTests, BasicTests, BasicSocketTests, SSLErrorTests]

    if support.is_resource_enabled('network'):
        tests.append(NetworkedTests)

    if _have_threads:
        thread_info = support.threading_setup()
        if thread_info:
            tests.append(ThreadedTests)

    try:
        support.run_unittest(*tests)
    finally:
        # thread_info is only bound when threads are available.
        if _have_threads:
            support.threading_cleanup(*thread_info)
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00003119
# Run the whole suite when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()