blob: b31bd983cbdbdda9dcd9b5d478b597bf2a0908bd [file] [log] [blame]
# -*- coding: utf-8 -*-
# Test the support for SSL and sockets
3
4import sys
5import unittest
Benjamin Petersondaeb9252014-08-20 14:14:50 -05006from test import test_support as support
Bill Janssen934b16d2008-06-28 22:19:33 +00007import asyncore
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00008import socket
Bill Janssen934b16d2008-06-28 22:19:33 +00009import select
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000010import time
Benjamin Petersondaeb9252014-08-20 14:14:50 -050011import datetime
Antoine Pitroub558f172010-04-23 23:25:45 +000012import gc
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000013import os
Antoine Pitroub558f172010-04-23 23:25:45 +000014import errno
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000015import pprint
Benjamin Petersondaeb9252014-08-20 14:14:50 -050016import tempfile
17import urllib
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000018import traceback
Antoine Pitroudfb299b2010-04-23 22:54:59 +000019import weakref
Antoine Pitroud75efd92010-08-04 17:38:33 +000020import platform
Benjamin Petersondaeb9252014-08-20 14:14:50 -050021import functools
22from contextlib import closing
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000023
# ssl is imported through the test-support helper rather than a plain
# import (presumably so this module is skipped when ssl is unavailable --
# confirm against test_support.import_module).
ssl = support.import_module("ssl")

# Every protocol constant this build knows about, in a stable order.
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
HOST = support.HOST
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000028
def data_file(*name):
    """Return the path of a test data file stored next to this module.

    *name* is one or more path components; they are joined below the
    directory that contains this file.
    """
    return os.path.join(os.path.dirname(__file__), *name)
31
# The custom key and certificate files used in test_ssl are generated
# using Lib/test/make_ssl_certs.py.
# Other certificates are simply fetched from the Internet servers they
# are meant to authenticate.

# Combined key + certificate, plus the split cert/key variants.  The
# BYTES_* names hold the same paths encoded with the filesystem encoding.
CERTFILE = data_file("keycert.pem")
BYTES_CERTFILE = CERTFILE.encode(sys.getfilesystemencoding())
ONLYCERT = data_file("ssl_cert.pem")
ONLYKEY = data_file("ssl_key.pem")
BYTES_ONLYCERT = ONLYCERT.encode(sys.getfilesystemencoding())
BYTES_ONLYKEY = ONLYKEY.encode(sys.getfilesystemencoding())
CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
KEY_PASSWORD = "somepass"
CAPATH = data_file("capath")
BYTES_CAPATH = CAPATH.encode(sys.getfilesystemencoding())
CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
CAFILE_CACERT = data_file("capath", "5ed36f99.0")


# empty CRL
CRLFILE = data_file("revocation.crl")

# Two keys and certs signed by the same CA (for SNI tests)
SIGNED_CERTFILE = data_file("keycert3.pem")
SIGNED_CERTFILE2 = data_file("keycert4.pem")
SIGNING_CA = data_file("pycacert.pem")

SVN_PYTHON_ORG_ROOT_CERT = data_file("https_svn_python_org_root.pem")

# Fixtures for error paths and certificate-parsing corner cases.
EMPTYCERT = data_file("nullcert.pem")
BADCERT = data_file("badcert.pem")
# Used where a missing file (ENOENT) is the expected outcome.
WRONGCERT = data_file("XXXnonexisting.pem")
BADKEY = data_file("badkey.pem")
NOKIACERT = data_file("nokia.pem")
NULLBYTECERT = data_file("nullbytecert.pem")

DHFILE = data_file("dh512.pem")
BYTES_DHFILE = DHFILE.encode(sys.getfilesystemencoding())
71
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000072
def handle_error(prefix):
    """Write the exception currently being handled to stdout, if verbose.

    *prefix* is prepended to the formatted traceback.  Nothing is written
    unless support.verbose is set.
    """
    if support.verbose:
        # Only format the traceback when it is actually going to be shown.
        exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
        sys.stdout.write(prefix + exc_format)
Neal Norwitz3e533c22007-08-27 01:03:18 +000077
Antoine Pitrou435ba0c2010-04-27 09:51:18 +000078
79class BasicTests(unittest.TestCase):
80
Antoine Pitrou3945c862010-04-28 21:11:01 +000081 def test_sslwrap_simple(self):
Antoine Pitrou435ba0c2010-04-27 09:51:18 +000082 # A crude test for the legacy API
Bill Jansseneb257ac2008-09-29 18:56:38 +000083 try:
84 ssl.sslwrap_simple(socket.socket(socket.AF_INET))
85 except IOError, e:
86 if e.errno == 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that
87 pass
88 else:
89 raise
90 try:
91 ssl.sslwrap_simple(socket.socket(socket.AF_INET)._sock)
92 except IOError, e:
93 if e.errno == 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that
94 pass
95 else:
96 raise
def can_clear_options():
    """Return True if the OpenSSL API version is 0.9.8m or higher."""
    # 0.9.8m or higher
    return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
100
def no_sslv2_implies_sslv3_hello():
    """Return True if the linked OpenSSL is version 0.9.7h or higher."""
    # 0.9.7h or higher
    return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
104
def have_verify_flags():
    """Return True if the linked OpenSSL is version 0.9.8 or higher."""
    # 0.9.8 or higher
    return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
108
def utc_offset():  # NOTE: ignore issues like #1647654
    """Return the current local UTC offset in seconds.

    local time = utc time + utc offset
    """
    dst_active = time.daylight and time.localtime().tm_isdst > 0
    if dst_active:
        return -time.altzone  # seconds
    return -time.timezone
114
def asn1time(cert_time):
    """Return *cert_time* adjusted to what the linked OpenSSL would print.

    The string is returned unchanged except on the one OpenSSL version
    singled out below, where the seconds field is zeroed.
    """
    # Some versions of OpenSSL ignore seconds, see #18207
    # 0.9.8.i
    if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
        fmt = "%b %d %H:%M:%S %Y GMT"
        dt = datetime.datetime.strptime(cert_time, fmt)
        dt = dt.replace(second=0)
        cert_time = dt.strftime(fmt)
        # %d adds leading zero but ASN1_TIME_print() uses leading space
        if cert_time[4] == "0":
            cert_time = cert_time[:4] + " " + cert_time[5:]

    return cert_time
Neal Norwitz3e533c22007-08-27 01:03:18 +0000128
# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
def skip_if_broken_ubuntu_ssl(func):
    """Decorator: skip *func* on the patched OpenSSL build of issue #9415.

    When the ssl module has no PROTOCOL_SSLv2 attribute, *func* is
    returned untouched.  Otherwise the returned wrapper first tries to
    create an SSLv2 context; if that raises ssl.SSLError on the specific
    OpenSSL version / distribution pair checked below, unittest.SkipTest
    is raised instead of running *func*.
    """
    if not hasattr(ssl, 'PROTOCOL_SSLv2'):
        return func

    @functools.wraps(func)
    def f(*args, **kwargs):
        try:
            ssl.SSLContext(ssl.PROTOCOL_SSLv2)
        except ssl.SSLError:
            if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
                platform.linux_distribution() == ('debian', 'squeeze/sid', '')):
                raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
        return func(*args, **kwargs)
    return f
Antoine Pitroud75efd92010-08-04 17:38:33 +0000144
# Decorator that skips a test unless ssl.HAS_SNI is true.
needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
146
Antoine Pitroud75efd92010-08-04 17:38:33 +0000147
class BasicSocketTests(unittest.TestCase):
    """Tests of the ssl module that need no connected peer.

    Covers module constants, certificate parsing helpers, hostname
    matching, argument validation of wrap_socket() and the
    cert_time_to_seconds() parser.
    """

    def test_constants(self):
        # Attribute access alone is the test: a missing constant raises
        # AttributeError.
        ssl.CERT_NONE
        ssl.CERT_OPTIONAL
        ssl.CERT_REQUIRED
        ssl.OP_CIPHER_SERVER_PREFERENCE
        ssl.OP_SINGLE_DH_USE
        if ssl.HAS_ECDH:
            ssl.OP_SINGLE_ECDH_USE
        if ssl.OPENSSL_VERSION_INFO >= (1, 0):
            ssl.OP_NO_COMPRESSION
        self.assertIn(ssl.HAS_SNI, {True, False})
        self.assertIn(ssl.HAS_ECDH, {True, False})

    def test_random(self):
        v = ssl.RAND_status()
        if support.verbose:
            sys.stdout.write("\n RAND_status is %d (%s)\n"
                             % (v, "sufficient randomness" if v
                                else "insufficient randomness"))
        self.assertRaises(TypeError, ssl.RAND_egd, 1)
        self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
        ssl.RAND_add("this is a random string", 75.0)

    def test_parse_cert(self):
        # note that this uses an 'unofficial' function in _ssl.c,
        # provided solely for this test, to exercise the certificate
        # parsing code
        p = ssl._ssl._test_decode_cert(CERTFILE)
        if support.verbose:
            sys.stdout.write("\n" + pprint.pformat(p) + "\n")
        self.assertEqual(p['issuer'],
                         ((('countryName', 'XY'),),
                          (('localityName', 'Castle Anthrax'),),
                          (('organizationName', 'Python Software Foundation'),),
                          (('commonName', 'localhost'),))
                        )
        # Note the next three asserts will fail if the keys are regenerated
        # Single-digit days are space-padded to two columns, as
        # ASN1_TIME_print() emits them (hence the double space).
        self.assertEqual(p['notAfter'], asn1time('Oct  5 23:01:56 2020 GMT'))
        self.assertEqual(p['notBefore'], asn1time('Oct  8 23:01:56 2010 GMT'))
        self.assertEqual(p['serialNumber'], 'D7C7381919AFC24E')
        self.assertEqual(p['subject'],
                         ((('countryName', 'XY'),),
                          (('localityName', 'Castle Anthrax'),),
                          (('organizationName', 'Python Software Foundation'),),
                          (('commonName', 'localhost'),))
                        )
        self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),))
        # Issue #13034: the subjectAltName in some certificates
        # (notably projects.developer.nokia.com:443) wasn't parsed
        p = ssl._ssl._test_decode_cert(NOKIACERT)
        if support.verbose:
            sys.stdout.write("\n" + pprint.pformat(p) + "\n")
        self.assertEqual(p['subjectAltName'],
                         (('DNS', 'projects.developer.nokia.com'),
                          ('DNS', 'projects.forum.nokia.com'))
                        )
        # extra OCSP and AIA fields
        self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
        self.assertEqual(p['caIssuers'],
                         ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
        self.assertEqual(p['crlDistributionPoints'],
                         ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))

    def test_parse_cert_CVE_2013_4238(self):
        # Embedded NUL bytes in certificate fields must be preserved by
        # the decoder rather than truncating the value.
        p = ssl._ssl._test_decode_cert(NULLBYTECERT)
        if support.verbose:
            sys.stdout.write("\n" + pprint.pformat(p) + "\n")
        subject = ((('countryName', 'US'),),
                   (('stateOrProvinceName', 'Oregon'),),
                   (('localityName', 'Beaverton'),),
                   (('organizationName', 'Python Software Foundation'),),
                   (('organizationalUnitName', 'Python Core Development'),),
                   (('commonName', 'null.python.org\x00example.org'),),
                   (('emailAddress', 'python-dev@python.org'),))
        self.assertEqual(p['subject'], subject)
        self.assertEqual(p['issuer'], subject)
        if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
            san = (('DNS', 'altnull.python.org\x00example.com'),
                   ('email', 'null@python.org\x00user@example.org'),
                   ('URI', 'http://null.python.org\x00http://example.org'),
                   ('IP Address', '192.0.2.1'),
                   ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
        else:
            # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
            san = (('DNS', 'altnull.python.org\x00example.com'),
                   ('email', 'null@python.org\x00user@example.org'),
                   ('URI', 'http://null.python.org\x00http://example.org'),
                   ('IP Address', '192.0.2.1'),
                   ('IP Address', '<invalid>'))

        self.assertEqual(p['subjectAltName'], san)

    def test_DER_to_PEM(self):
        # PEM -> DER -> PEM -> DER must round-trip to the same DER bytes,
        # and the regenerated PEM must carry the standard header/footer.
        with open(SVN_PYTHON_ORG_ROOT_CERT, 'r') as f:
            pem = f.read()
        d1 = ssl.PEM_cert_to_DER_cert(pem)
        p2 = ssl.DER_cert_to_PEM_cert(d1)
        d2 = ssl.PEM_cert_to_DER_cert(p2)
        self.assertEqual(d1, d2)
        if not p2.startswith(ssl.PEM_HEADER + '\n'):
            self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
        if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
            self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)

    def test_openssl_version(self):
        n = ssl.OPENSSL_VERSION_NUMBER
        t = ssl.OPENSSL_VERSION_INFO
        s = ssl.OPENSSL_VERSION
        self.assertIsInstance(n, (int, long))
        self.assertIsInstance(t, tuple)
        self.assertIsInstance(s, str)
        # Some sanity checks follow
        # >= 0.9
        self.assertGreaterEqual(n, 0x900000)
        # < 3.0
        self.assertLess(n, 0x30000000)
        major, minor, fix, patch, status = t
        self.assertGreaterEqual(major, 0)
        self.assertLess(major, 3)
        self.assertGreaterEqual(minor, 0)
        self.assertLess(minor, 256)
        self.assertGreaterEqual(fix, 0)
        self.assertLess(fix, 256)
        self.assertGreaterEqual(patch, 0)
        self.assertLessEqual(patch, 26)
        self.assertGreaterEqual(status, 0)
        self.assertLessEqual(status, 15)
        # Version string as returned by {Open,Libre}SSL, the format might change
        if "LibreSSL" in s:
            self.assertTrue(s.startswith("LibreSSL {:d}.{:d}".format(major, minor)),
                            (s, t))
        else:
            self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
                            (s, t))

    @support.cpython_only
    def test_refcycle(self):
        # Issue #7943: an SSL object doesn't create reference cycles with
        # itself.
        s = socket.socket(socket.AF_INET)
        ss = ssl.wrap_socket(s)
        wr = weakref.ref(ss)
        del ss
        self.assertEqual(wr(), None)

    def test_wrapped_unconnected(self):
        # Methods on an unconnected SSLSocket propagate the original
        # socket.error raised by the underlying socket object.
        s = socket.socket(socket.AF_INET)
        with closing(ssl.wrap_socket(s)) as ss:
            self.assertRaises(socket.error, ss.recv, 1)
            self.assertRaises(socket.error, ss.recv_into, bytearray(b'x'))
            self.assertRaises(socket.error, ss.recvfrom, 1)
            self.assertRaises(socket.error, ss.recvfrom_into, bytearray(b'x'), 1)
            self.assertRaises(socket.error, ss.send, b'x')
            self.assertRaises(socket.error, ss.sendto, b'x', ('0.0.0.0', 0))

    def test_timeout(self):
        # Issue #8524: when creating an SSL socket, the timeout of the
        # original socket should be retained.
        for timeout in (None, 0.0, 5.0):
            s = socket.socket(socket.AF_INET)
            s.settimeout(timeout)
            with closing(ssl.wrap_socket(s)) as ss:
                self.assertEqual(timeout, ss.gettimeout())

    def test_errors(self):
        # Invalid argument combinations raise ValueError; missing
        # certificate or key files raise IOError with errno ENOENT.
        sock = socket.socket()
        self.assertRaisesRegexp(ValueError,
                        "certfile must be specified",
                        ssl.wrap_socket, sock, keyfile=CERTFILE)
        self.assertRaisesRegexp(ValueError,
                        "certfile must be specified for server-side operations",
                        ssl.wrap_socket, sock, server_side=True)
        self.assertRaisesRegexp(ValueError,
                        "certfile must be specified for server-side operations",
                        ssl.wrap_socket, sock, server_side=True, certfile="")
        with closing(ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE)) as s:
            self.assertRaisesRegexp(ValueError, "can't connect in server-side mode",
                                    s.connect, (HOST, 8080))
        with self.assertRaises(IOError) as cm:
            with closing(socket.socket()) as sock:
                ssl.wrap_socket(sock, certfile=WRONGCERT)
        self.assertEqual(cm.exception.errno, errno.ENOENT)
        with self.assertRaises(IOError) as cm:
            with closing(socket.socket()) as sock:
                ssl.wrap_socket(sock, certfile=CERTFILE, keyfile=WRONGCERT)
        self.assertEqual(cm.exception.errno, errno.ENOENT)
        with self.assertRaises(IOError) as cm:
            with closing(socket.socket()) as sock:
                ssl.wrap_socket(sock, certfile=WRONGCERT, keyfile=WRONGCERT)
        self.assertEqual(cm.exception.errno, errno.ENOENT)

    def test_match_hostname(self):
        # ok() passes when match_hostname() accepts the pair; fail()
        # passes when it raises ssl.CertificateError.
        def ok(cert, hostname):
            ssl.match_hostname(cert, hostname)
        def fail(cert, hostname):
            self.assertRaises(ssl.CertificateError,
                              ssl.match_hostname, cert, hostname)

        cert = {'subject': ((('commonName', 'example.com'),),)}
        ok(cert, 'example.com')
        ok(cert, 'ExAmple.cOm')
        fail(cert, 'www.example.com')
        fail(cert, '.example.com')
        fail(cert, 'example.org')
        fail(cert, 'exampleXcom')

        cert = {'subject': ((('commonName', '*.a.com'),),)}
        ok(cert, 'foo.a.com')
        fail(cert, 'bar.foo.a.com')
        fail(cert, 'a.com')
        fail(cert, 'Xa.com')
        fail(cert, '.a.com')

        # only match one left-most wildcard
        cert = {'subject': ((('commonName', 'f*.com'),),)}
        ok(cert, 'foo.com')
        ok(cert, 'f.com')
        fail(cert, 'bar.com')
        fail(cert, 'foo.a.com')
        fail(cert, 'bar.foo.com')

        # NULL bytes are bad, CVE-2013-4073
        cert = {'subject': ((('commonName',
                              'null.python.org\x00example.org'),),)}
        ok(cert, 'null.python.org\x00example.org') # or raise an error?
        fail(cert, 'example.org')
        fail(cert, 'null.python.org')

        # error cases with wildcards
        cert = {'subject': ((('commonName', '*.*.a.com'),),)}
        fail(cert, 'bar.foo.a.com')
        fail(cert, 'a.com')
        fail(cert, 'Xa.com')
        fail(cert, '.a.com')

        cert = {'subject': ((('commonName', 'a.*.com'),),)}
        fail(cert, 'a.foo.com')
        fail(cert, 'a..com')
        fail(cert, 'a.com')

        # wildcard doesn't match IDNA prefix 'xn--'
        idna = u'püthon.python.org'.encode("idna").decode("ascii")
        cert = {'subject': ((('commonName', idna),),)}
        ok(cert, idna)
        cert = {'subject': ((('commonName', 'x*.python.org'),),)}
        fail(cert, idna)
        cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
        fail(cert, idna)

        # wildcard in first fragment and IDNA A-labels in sequent fragments
        # are supported.
        idna = u'www*.pythön.org'.encode("idna").decode("ascii")
        cert = {'subject': ((('commonName', idna),),)}
        ok(cert, u'www.pythön.org'.encode("idna").decode("ascii"))
        ok(cert, u'www1.pythön.org'.encode("idna").decode("ascii"))
        fail(cert, u'ftp.pythön.org'.encode("idna").decode("ascii"))
        fail(cert, u'pythön.org'.encode("idna").decode("ascii"))

        # Slightly fake real-world example
        cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
                'subject': ((('commonName', 'linuxfrz.org'),),),
                'subjectAltName': (('DNS', 'linuxfr.org'),
                                   ('DNS', 'linuxfr.com'),
                                   ('othername', '<unsupported>'))}
        ok(cert, 'linuxfr.org')
        ok(cert, 'linuxfr.com')
        # Not a "DNS" entry
        fail(cert, '<unsupported>')
        # When there is a subjectAltName, commonName isn't used
        fail(cert, 'linuxfrz.org')

        # A pristine real-world example
        cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
                'subject': ((('countryName', 'US'),),
                            (('stateOrProvinceName', 'California'),),
                            (('localityName', 'Mountain View'),),
                            (('organizationName', 'Google Inc'),),
                            (('commonName', 'mail.google.com'),))}
        ok(cert, 'mail.google.com')
        fail(cert, 'gmail.com')
        # Only commonName is considered
        fail(cert, 'California')

        # Neither commonName nor subjectAltName
        cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
                'subject': ((('countryName', 'US'),),
                            (('stateOrProvinceName', 'California'),),
                            (('localityName', 'Mountain View'),),
                            (('organizationName', 'Google Inc'),))}
        fail(cert, 'mail.google.com')

        # No DNS entry in subjectAltName but a commonName
        cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
                'subject': ((('countryName', 'US'),),
                            (('stateOrProvinceName', 'California'),),
                            (('localityName', 'Mountain View'),),
                            (('commonName', 'mail.google.com'),)),
                'subjectAltName': (('othername', 'blabla'), )}
        ok(cert, 'mail.google.com')

        # No DNS entry subjectAltName and no commonName
        cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
                'subject': ((('countryName', 'US'),),
                            (('stateOrProvinceName', 'California'),),
                            (('localityName', 'Mountain View'),),
                            (('organizationName', 'Google Inc'),)),
                'subjectAltName': (('othername', 'blabla'),)}
        fail(cert, 'google.com')

        # Empty cert / no cert
        self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
        self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')

        # Issue #17980: avoid denials of service by refusing more than one
        # wildcard per fragment.
        cert = {'subject': ((('commonName', 'a*b.com'),),)}
        ok(cert, 'axxb.com')
        cert = {'subject': ((('commonName', 'a*b.co*'),),)}
        fail(cert, 'axxb.com')
        cert = {'subject': ((('commonName', 'a*b*.com'),),)}
        with self.assertRaises(ssl.CertificateError) as cm:
            ssl.match_hostname(cert, 'axxbxxc.com')
        self.assertIn("too many wildcards", str(cm.exception))

    def test_server_side(self):
        # server_hostname doesn't work for server sockets
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        with closing(socket.socket()) as sock:
            self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
                              server_hostname="some.hostname")

    def test_unknown_channel_binding(self):
        # should raise ValueError for unknown type
        s = socket.socket(socket.AF_INET)
        with closing(ssl.wrap_socket(s)) as ss:
            with self.assertRaises(ValueError):
                ss.get_channel_binding("unknown-type")

    @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                         "'tls-unique' channel binding not available")
    def test_tls_unique_channel_binding(self):
        # unconnected should return None for known type
        s = socket.socket(socket.AF_INET)
        with closing(ssl.wrap_socket(s)) as ss:
            self.assertIsNone(ss.get_channel_binding("tls-unique"))
        # the same for server-side
        s = socket.socket(socket.AF_INET)
        with closing(ssl.wrap_socket(s, server_side=True, certfile=CERTFILE)) as ss:
            self.assertIsNone(ss.get_channel_binding("tls-unique"))

    def test_get_default_verify_paths(self):
        paths = ssl.get_default_verify_paths()
        self.assertEqual(len(paths), 6)
        self.assertIsInstance(paths, ssl.DefaultVerifyPaths)

        # The SSL_CERT_DIR / SSL_CERT_FILE environment variables must be
        # reflected in the reported capath / cafile.
        with support.EnvironmentVarGuard() as env:
            env["SSL_CERT_DIR"] = CAPATH
            env["SSL_CERT_FILE"] = CERTFILE
            paths = ssl.get_default_verify_paths()
            self.assertEqual(paths.cafile, CERTFILE)
            self.assertEqual(paths.capath, CAPATH)

    @unittest.skipUnless(sys.platform == "win32", "Windows specific")
    def test_enum_certificates(self):
        self.assertTrue(ssl.enum_certificates("CA"))
        self.assertTrue(ssl.enum_certificates("ROOT"))

        self.assertRaises(TypeError, ssl.enum_certificates)
        self.assertRaises(WindowsError, ssl.enum_certificates, "")

        # Each store entry is a (cert bytes, encoding name, trust) triple;
        # collect every trust OID seen across both stores.
        trust_oids = set()
        for storename in ("CA", "ROOT"):
            store = ssl.enum_certificates(storename)
            self.assertIsInstance(store, list)
            for element in store:
                self.assertIsInstance(element, tuple)
                self.assertEqual(len(element), 3)
                cert, enc, trust = element
                self.assertIsInstance(cert, bytes)
                self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
                self.assertIsInstance(trust, (set, bool))
                if isinstance(trust, set):
                    trust_oids.update(trust)

        serverAuth = "1.3.6.1.5.5.7.3.1"
        self.assertIn(serverAuth, trust_oids)

    @unittest.skipUnless(sys.platform == "win32", "Windows specific")
    def test_enum_crls(self):
        self.assertTrue(ssl.enum_crls("CA"))
        self.assertRaises(TypeError, ssl.enum_crls)
        self.assertRaises(WindowsError, ssl.enum_crls, "")

        # Each entry is a (crl bytes, encoding name) pair.
        crls = ssl.enum_crls("CA")
        self.assertIsInstance(crls, list)
        for element in crls:
            self.assertIsInstance(element, tuple)
            self.assertEqual(len(element), 2)
            self.assertIsInstance(element[0], bytes)
            self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})

    def test_asn1object(self):
        # The same object must be reachable by OID, by NID and by name.
        expected = (129, 'serverAuth', 'TLS Web Server Authentication',
                    '1.3.6.1.5.5.7.3.1')

        val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
        self.assertEqual(val, expected)
        self.assertEqual(val.nid, 129)
        self.assertEqual(val.shortname, 'serverAuth')
        self.assertEqual(val.longname, 'TLS Web Server Authentication')
        self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
        self.assertIsInstance(val, ssl._ASN1Object)
        self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')

        val = ssl._ASN1Object.fromnid(129)
        self.assertEqual(val, expected)
        self.assertIsInstance(val, ssl._ASN1Object)
        self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
        with self.assertRaisesRegexp(ValueError, "unknown NID 100000"):
            ssl._ASN1Object.fromnid(100000)
        # Sweep a range of NIDs: unknown ones raise ValueError, known
        # ones must expose correctly typed fields.
        for i in range(1000):
            try:
                obj = ssl._ASN1Object.fromnid(i)
            except ValueError:
                pass
            else:
                self.assertIsInstance(obj.nid, int)
                self.assertIsInstance(obj.shortname, str)
                self.assertIsInstance(obj.longname, str)
                self.assertIsInstance(obj.oid, (str, type(None)))

        val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
        self.assertEqual(val, expected)
        self.assertIsInstance(val, ssl._ASN1Object)
        self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
        self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
                         expected)
        with self.assertRaisesRegexp(ValueError, "unknown object 'serverauth'"):
            ssl._ASN1Object.fromname('serverauth')

    def test_purpose_enum(self):
        # ssl.Purpose members are _ASN1Object instances equal to the
        # objects built directly from the corresponding OIDs.
        val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
        self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
        self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
        self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
        self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
        self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
                         '1.3.6.1.5.5.7.3.1')

        val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
        self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
        self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
        self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
        self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
        self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
                         '1.3.6.1.5.5.7.3.2')

    def test_unsupported_dtls(self):
        # Datagram sockets are rejected by both wrap_socket() entry points.
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.addCleanup(s.close)
        with self.assertRaises(NotImplementedError) as cx:
            ssl.wrap_socket(s, cert_reqs=ssl.CERT_NONE)
        self.assertEqual(str(cx.exception), "only stream sockets are supported")
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        with self.assertRaises(NotImplementedError) as cx:
            ctx.wrap_socket(s)
        self.assertEqual(str(cx.exception), "only stream sockets are supported")

    def cert_time_ok(self, timestring, timestamp):
        # Helper: *timestring* must parse to exactly *timestamp*.
        self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)

    def cert_time_fail(self, timestring):
        # Helper: *timestring* must be rejected with ValueError.
        with self.assertRaises(ValueError):
            ssl.cert_time_to_seconds(timestring)

    @unittest.skipUnless(utc_offset(),
                         'local time needs to be different from UTC')
    def test_cert_time_to_seconds_timezone(self):
        # Issue #19940: ssl.cert_time_to_seconds() returns wrong
        #               results if local timezone is not UTC
        self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0)
        self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0)

    def test_cert_time_to_seconds(self):
        timestring = "Jan 5 09:34:43 2018 GMT"
        ts = 1515144883.0
        self.cert_time_ok(timestring, ts)
        # accept keyword parameter, assert its name
        self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
        # accept both %e and %d (space or zero generated by strftime)
        self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
        # case-insensitive
        self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
        self.cert_time_fail("Jan 5 09:34 2018 GMT")     # no seconds
        self.cert_time_fail("Jan 5 09:34:43 2018")      # no GMT
        self.cert_time_fail("Jan 5 09:34:43 2018 UTC")  # not GMT timezone
        self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
        self.cert_time_fail("Jon 5 09:34:43 2018 GMT")  # invalid month
        self.cert_time_fail("Jan 5 24:00:00 2018 GMT")  # invalid hour
        self.cert_time_fail("Jan 5 09:60:43 2018 GMT")  # invalid minute

        newyear_ts = 1230768000.0
        # leap seconds
        self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
        # same timestamp
        self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)

        self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
        # allow 60th second (even if it is not a leap second)
        self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
        # allow 2nd leap second for compatibility with time.strptime()
        self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
        self.cert_time_fail("Jan 5 09:34:62 2018 GMT")  # invalid seconds

        # no special treatment for the special value:
        #   99991231235959Z (rfc 5280)
        self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)

    @support.run_with_locale('LC_ALL', '')
    def test_cert_time_to_seconds_locale(self):
        # `cert_time_to_seconds()` should be locale independent

        def local_february_name():
            return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))

        if local_february_name().lower() == 'feb':
            self.skipTest("locale-specific month name needs to be "
                          "different from C locale")

        # locale-independent
        self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
        self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
686
687
class ContextTests(unittest.TestCase):
    """Tests for ssl.SSLContext and the context factory functions.

    Covers the constructor, the configuration attributes (options,
    verify_mode, verify_flags, check_hostname), the certificate / DH
    parameter loaders, the statistics accessors, and
    ssl.create_default_context() / ssl._create_stdlib_context().
    """

    def _assert_option_set(self, context, name):
        # Check that the option bit called `name` is set on `context`.
        # An option the ssl module does not define is looked up as 0,
        # in which case the comparison is trivially true.
        flag = getattr(ssl, name, 0)
        self.assertEqual(context.options & flag, flag)

    @skip_if_broken_ubuntu_ssl
    def test_constructor(self):
        # Every known protocol is accepted; a missing or unknown protocol
        # argument is rejected.
        for proto in PROTOCOLS:
            ssl.SSLContext(proto)
        with self.assertRaises(TypeError):
            ssl.SSLContext()
        for bogus in (-1, 42):
            with self.assertRaises(ValueError):
                ssl.SSLContext(bogus)

    @skip_if_broken_ubuntu_ssl
    def test_protocol(self):
        # The `protocol` attribute echoes the constructor argument.
        for expected in PROTOCOLS:
            context = ssl.SSLContext(expected)
            self.assertEqual(context.protocol, expected)

    def test_ciphers(self):
        # Valid cipher strings are accepted, an unusable one raises.
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        for cipher_list in ("ALL", "DEFAULT"):
            context.set_ciphers(cipher_list)
        with self.assertRaisesRegexp(ssl.SSLError,
                                     "No cipher can be selected"):
            context.set_ciphers("^$:,;?*'dorothyx")

    @skip_if_broken_ubuntu_ssl
    def test_options(self):
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # OP_ALL | OP_NO_SSLv2 is the default value
        expected = ssl.OP_ALL | ssl.OP_NO_SSLv2
        self.assertEqual(expected, context.options)
        context.options |= ssl.OP_NO_SSLv3
        expected |= ssl.OP_NO_SSLv3
        self.assertEqual(expected, context.options)
        if not can_clear_options():
            # Option bits cannot be removed once set.
            with self.assertRaises(ValueError):
                context.options = 0
            return
        context.options = ((context.options & ~ssl.OP_NO_SSLv2)
                           | ssl.OP_NO_TLSv1)
        self.assertEqual(ssl.OP_ALL | ssl.OP_NO_TLSv1 | ssl.OP_NO_SSLv3,
                         context.options)
        context.options = 0
        self.assertEqual(0, context.options)

    def test_verify_mode(self):
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # Default value
        self.assertEqual(context.verify_mode, ssl.CERT_NONE)
        # Each valid mode can be set and read back.
        for mode in (ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED, ssl.CERT_NONE):
            context.verify_mode = mode
            self.assertEqual(context.verify_mode, mode)
        with self.assertRaises(TypeError):
            context.verify_mode = None
        with self.assertRaises(ValueError):
            context.verify_mode = 42

    @unittest.skipUnless(have_verify_flags(),
                         "verify_flags need OpenSSL > 0.9.8")
    def test_verify_flags(self):
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # default value by OpenSSL
        self.assertEqual(context.verify_flags, ssl.VERIFY_DEFAULT)
        round_trips = (
            ssl.VERIFY_CRL_CHECK_LEAF,
            ssl.VERIFY_CRL_CHECK_CHAIN,
            ssl.VERIFY_DEFAULT,
            # supports any value
            ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT,
        )
        for flags in round_trips:
            context.verify_flags = flags
            self.assertEqual(context.verify_flags, flags)
        with self.assertRaises(TypeError):
            context.verify_flags = None

    def test_load_cert_chain(self):
        # load_cert_chain() with combined and separate key/cert files,
        # mismatching pairs, and passwords given as values or callables.
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # Combined key and cert in a single file
        context.load_cert_chain(CERTFILE)
        context.load_cert_chain(CERTFILE, keyfile=CERTFILE)
        with self.assertRaises(TypeError):
            context.load_cert_chain(keyfile=CERTFILE)
        with self.assertRaises(IOError) as caught:
            context.load_cert_chain(WRONGCERT)
        self.assertEqual(caught.exception.errno, errno.ENOENT)
        for unusable in (BADCERT, EMPTYCERT):
            with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
                context.load_cert_chain(unusable)
        # Separate key and cert
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.load_cert_chain(ONLYCERT, ONLYKEY)
        context.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
        context.load_cert_chain(certfile=BYTES_ONLYCERT,
                                keyfile=BYTES_ONLYKEY)
        for lone_file in (ONLYCERT, ONLYKEY):
            with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
                context.load_cert_chain(lone_file)
        with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
            context.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
        # Mismatching key and cert
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        with self.assertRaisesRegexp(ssl.SSLError, "key values mismatch"):
            context.load_cert_chain(SVN_PYTHON_ORG_ROOT_CERT, ONLYKEY)

        # Password protected key and cert
        def password_forms():
            # KEY_PASSWORD as-is, encoded, and as a bytearray; fresh
            # objects are built on every call.
            return (KEY_PASSWORD,
                    KEY_PASSWORD.encode(),
                    bytearray(KEY_PASSWORD.encode()))

        for secret in password_forms():
            context.load_cert_chain(CERTFILE_PROTECTED, password=secret)
        for secret in password_forms():
            context.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, secret)
        with self.assertRaisesRegexp(TypeError, "should be a string"):
            context.load_cert_chain(CERTFILE_PROTECTED, password=True)
        with self.assertRaises(ssl.SSLError):
            context.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
        with self.assertRaisesRegexp(ValueError, "cannot be longer"):
            # openssl has a fixed limit on the password buffer.
            # PEM_BUFSIZE is generally set to 1kb.
            # Return a string larger than this.
            context.load_cert_chain(CERTFILE_PROTECTED,
                                    password=b'a' * 102400)

        # Password callback
        class GetPassCallable:
            def __call__(self):
                return KEY_PASSWORD

            def getpass(self):
                return KEY_PASSWORD

        def getpass_exception():
            raise Exception('getpass error')

        # Plain functions, a callable instance and a bound method, all
        # returning the right password in one of its accepted forms.
        working_callbacks = (
            lambda: KEY_PASSWORD,
            lambda: KEY_PASSWORD.encode(),
            lambda: bytearray(KEY_PASSWORD.encode()),
            GetPassCallable(),
            GetPassCallable().getpass,
        )
        for callback in working_callbacks:
            context.load_cert_chain(CERTFILE_PROTECTED, password=callback)
        with self.assertRaises(ssl.SSLError):
            context.load_cert_chain(CERTFILE_PROTECTED,
                                    password=lambda: "badpass")
        with self.assertRaisesRegexp(ValueError, "cannot be longer"):
            context.load_cert_chain(CERTFILE_PROTECTED,
                                    password=lambda: b'a' * (1024 * 1024))
        with self.assertRaisesRegexp(TypeError, "must return a string"):
            context.load_cert_chain(CERTFILE_PROTECTED, password=lambda: 9)
        with self.assertRaisesRegexp(Exception, "getpass error"):
            context.load_cert_chain(CERTFILE_PROTECTED,
                                    password=getpass_exception)
        # Make sure the password function isn't called if it isn't needed
        context.load_cert_chain(CERTFILE, password=getpass_exception)

    def test_load_verify_locations(self):
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # The CA file may be given positionally or by keyword, under
        # either the CERTFILE or the BYTES_CERTFILE spelling.
        for cafile in (CERTFILE, BYTES_CERTFILE):
            context.load_verify_locations(cafile)
            context.load_verify_locations(cafile=cafile, capath=None)
        with self.assertRaises(TypeError):
            context.load_verify_locations()
        with self.assertRaises(TypeError):
            context.load_verify_locations(None, None, None)
        with self.assertRaises(IOError) as caught:
            context.load_verify_locations(WRONGCERT)
        self.assertEqual(caught.exception.errno, errno.ENOENT)
        with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
            context.load_verify_locations(BADCERT)
        context.load_verify_locations(CERTFILE, CAPATH)
        context.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)

        # Issue #10989: crash if the second argument type is invalid
        with self.assertRaises(TypeError):
            context.load_verify_locations(None, True)

    def test_load_verify_cadata(self):
        # test cadata
        def pem_and_der(path):
            # Read a certificate file and return it as (PEM text, DER).
            with open(path) as pem_file:
                pem = pem_file.read().decode("ascii")
            return pem, ssl.PEM_cert_to_DER_cert(pem)

        def ca_count(context):
            # Number of CA certificates reported for the context's store.
            return context.cert_store_stats()["x509_ca"]

        cacert_pem, cacert_der = pem_and_der(CAFILE_CACERT)
        neuronio_pem, neuronio_der = pem_and_der(CAFILE_NEURONIO)

        # test PEM
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertEqual(ca_count(context), 0)
        context.load_verify_locations(cadata=cacert_pem)
        self.assertEqual(ca_count(context), 1)
        context.load_verify_locations(cadata=neuronio_pem)
        self.assertEqual(ca_count(context), 2)
        # cert already in hash table
        context.load_verify_locations(cadata=neuronio_pem)
        self.assertEqual(ca_count(context), 2)

        # combined
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.load_verify_locations(
            cadata="\n".join((cacert_pem, neuronio_pem)))
        self.assertEqual(ca_count(context), 2)

        # with junk around the certs
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        pieces = ["head", cacert_pem, "other", neuronio_pem, "again",
                  neuronio_pem, "tail"]
        context.load_verify_locations(cadata="\n".join(pieces))
        self.assertEqual(ca_count(context), 2)

        # test DER
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.load_verify_locations(cadata=cacert_der)
        context.load_verify_locations(cadata=neuronio_der)
        self.assertEqual(ca_count(context), 2)
        # cert already in hash table
        context.load_verify_locations(cadata=cacert_der)
        self.assertEqual(ca_count(context), 2)

        # combined
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.load_verify_locations(
            cadata=b"".join((cacert_der, neuronio_der)))
        self.assertEqual(ca_count(context), 2)

        # error cases
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        with self.assertRaises(TypeError):
            context.load_verify_locations(cadata=object)

        with self.assertRaisesRegexp(ssl.SSLError, "no start line"):
            context.load_verify_locations(cadata=u"broken")
        with self.assertRaisesRegexp(ssl.SSLError, "not enough data"):
            context.load_verify_locations(cadata=b"broken")

    def test_load_dh_params(self):
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.load_dh_params(DHFILE)
        if os.name != 'nt':
            context.load_dh_params(BYTES_DHFILE)
        with self.assertRaises(TypeError):
            context.load_dh_params()
        with self.assertRaises(TypeError):
            context.load_dh_params(None)
        with self.assertRaises(IOError) as caught:
            context.load_dh_params(WRONGCERT)
        self.assertEqual(caught.exception.errno, errno.ENOENT)
        # A certificate file is not a DH parameter file.
        with self.assertRaises(ssl.SSLError):
            context.load_dh_params(CERTFILE)

    @skip_if_broken_ubuntu_ssl
    def test_session_stats(self):
        # A freshly created context reports zero for every counter.
        counters = (
            'number',
            'connect',
            'connect_good',
            'connect_renegotiate',
            'accept',
            'accept_good',
            'accept_renegotiate',
            'hits',
            'misses',
            'timeouts',
            'cache_full',
        )
        pristine = dict.fromkeys(counters, 0)
        for proto in PROTOCOLS:
            context = ssl.SSLContext(proto)
            self.assertEqual(context.session_stats(), pristine)

    def test_set_default_verify_paths(self):
        # There's not much we can do to test that it acts as expected,
        # so just check it doesn't crash or raise an exception.
        ssl.SSLContext(ssl.PROTOCOL_TLSv1).set_default_verify_paths()

    @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
    def test_set_ecdh_curve(self):
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        for curve in ("prime256v1", b"prime256v1"):
            context.set_ecdh_curve(curve)
        with self.assertRaises(TypeError):
            context.set_ecdh_curve()
        with self.assertRaises(TypeError):
            context.set_ecdh_curve(None)
        for unknown in ("foo", b"foo"):
            with self.assertRaises(ValueError):
                context.set_ecdh_curve(unknown)

    @needs_sni
    def test_sni_callback(self):
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)

        # set_servername_callback expects a callable, or None
        with self.assertRaises(TypeError):
            context.set_servername_callback()
        for not_callable in (4, "", context):
            with self.assertRaises(TypeError):
                context.set_servername_callback(not_callable)

        def noop(sock, servername, ctx):
            pass
        context.set_servername_callback(None)
        context.set_servername_callback(noop)

    @needs_sni
    def test_sni_callback_refcycle(self):
        # Reference cycles through the servername callback are detected
        # and cleared.
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)

        # The default argument makes the callback refer to the context,
        # and the callback is then installed on that same context.
        def callback(sock, servername, ctx, cycle=context):
            pass
        context.set_servername_callback(callback)
        context_ref = weakref.ref(context)
        del context, callback
        gc.collect()
        self.assertIs(context_ref(), None)

    def test_cert_store_stats(self):
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)

        def expect(x509_ca, x509):
            # Compare the store statistics; 'crl' is 0 at every step.
            self.assertEqual(context.cert_store_stats(),
                             {'x509_ca': x509_ca, 'crl': 0, 'x509': x509})

        expect(0, 0)
        # Loading a cert chain leaves the store statistics unchanged.
        context.load_cert_chain(CERTFILE)
        expect(0, 0)
        context.load_verify_locations(CERTFILE)
        expect(0, 1)
        context.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
        expect(1, 2)

    def test_get_ca_certs(self):
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertEqual(context.get_ca_certs(), [])
        # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
        context.load_verify_locations(CERTFILE)
        self.assertEqual(context.get_ca_certs(), [])
        # but SVN_PYTHON_ORG_ROOT_CERT is a CA cert
        context.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
        # Issuer and subject are expected to carry the same name.
        root_name = ((('organizationName', 'Root CA'),),
                     (('organizationalUnitName', 'http://www.cacert.org'),),
                     (('commonName', 'CA Cert Signing Authority'),),
                     (('emailAddress', 'support@cacert.org'),))
        expected = {
            'issuer': root_name,
            'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
            'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
            'serialNumber': '00',
            'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
            'subject': root_name,
            'version': 3,
        }
        self.assertEqual(context.get_ca_certs(), [expected])

        # Passing True is expected to yield the DER form of the same cert.
        with open(SVN_PYTHON_ORG_ROOT_CERT) as pem_file:
            pem = pem_file.read()
        der = ssl.PEM_cert_to_DER_cert(pem)
        self.assertEqual(context.get_ca_certs(True), [der])

    def test_load_default_certs(self):
        # Valid calls only need to complete without raising.
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.load_default_certs()

        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.load_default_certs(ssl.Purpose.SERVER_AUTH)
        context.load_default_certs()

        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.load_default_certs(ssl.Purpose.CLIENT_AUTH)

        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        for not_a_purpose in (None, 'SERVER_AUTH'):
            with self.assertRaises(TypeError):
                context.load_default_certs(not_a_purpose)

    def test_create_default_context(self):
        # Default purpose
        context = ssl.create_default_context()
        self.assertEqual(context.protocol, ssl.PROTOCOL_SSLv23)
        self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
        self.assertTrue(context.check_hostname)
        self.assertEqual(context.options & ssl.OP_NO_SSLv2,
                         ssl.OP_NO_SSLv2)
        self._assert_option_set(context, "OP_NO_COMPRESSION")

        # Explicit CA material given in all three forms at once
        with open(SIGNING_CA) as ca_file:
            cadata = ca_file.read().decode("ascii")
        context = ssl.create_default_context(cafile=SIGNING_CA,
                                             capath=CAPATH,
                                             cadata=cadata)
        self.assertEqual(context.protocol, ssl.PROTOCOL_SSLv23)
        self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
        self.assertEqual(context.options & ssl.OP_NO_SSLv2,
                         ssl.OP_NO_SSLv2)
        self._assert_option_set(context, "OP_NO_COMPRESSION")

        # Purpose.CLIENT_AUTH
        context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        self.assertEqual(context.protocol, ssl.PROTOCOL_SSLv23)
        self.assertEqual(context.verify_mode, ssl.CERT_NONE)
        self.assertEqual(context.options & ssl.OP_NO_SSLv2,
                         ssl.OP_NO_SSLv2)
        for option_name in ("OP_NO_COMPRESSION",
                            "OP_SINGLE_DH_USE",
                            "OP_SINGLE_ECDH_USE"):
            self._assert_option_set(context, option_name)

    def test__create_stdlib_context(self):
        # No arguments
        context = ssl._create_stdlib_context()
        self.assertEqual(context.protocol, ssl.PROTOCOL_SSLv23)
        self.assertEqual(context.verify_mode, ssl.CERT_NONE)
        self.assertFalse(context.check_hostname)
        self.assertEqual(context.options & ssl.OP_NO_SSLv2,
                         ssl.OP_NO_SSLv2)

        # Explicit protocol
        context = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
        self.assertEqual(context.protocol, ssl.PROTOCOL_TLSv1)
        self.assertEqual(context.verify_mode, ssl.CERT_NONE)
        self.assertEqual(context.options & ssl.OP_NO_SSLv2,
                         ssl.OP_NO_SSLv2)

        # Explicit protocol with verification and hostname checking
        context = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
                                             cert_reqs=ssl.CERT_REQUIRED,
                                             check_hostname=True)
        self.assertEqual(context.protocol, ssl.PROTOCOL_TLSv1)
        self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
        self.assertTrue(context.check_hostname)
        self.assertEqual(context.options & ssl.OP_NO_SSLv2,
                         ssl.OP_NO_SSLv2)

        # Purpose.CLIENT_AUTH
        context = ssl._create_stdlib_context(
            purpose=ssl.Purpose.CLIENT_AUTH)
        self.assertEqual(context.protocol, ssl.PROTOCOL_SSLv23)
        self.assertEqual(context.verify_mode, ssl.CERT_NONE)
        self.assertEqual(context.options & ssl.OP_NO_SSLv2,
                         ssl.OP_NO_SSLv2)

    def test_check_hostname(self):
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertFalse(context.check_hostname)

        # Requires CERT_REQUIRED or CERT_OPTIONAL
        with self.assertRaises(ValueError):
            context.check_hostname = True
        context.verify_mode = ssl.CERT_REQUIRED
        # Changing verify_mode alone leaves check_hostname off.
        self.assertFalse(context.check_hostname)
        context.check_hostname = True
        self.assertTrue(context.check_hostname)

        context.verify_mode = ssl.CERT_OPTIONAL
        context.check_hostname = True
        self.assertTrue(context.check_hostname)

        # Cannot set CERT_NONE with check_hostname enabled
        with self.assertRaises(ValueError):
            context.verify_mode = ssl.CERT_NONE
        context.check_hostname = False
        self.assertFalse(context.check_hostname)
1142
1143
class SSLErrorTests(unittest.TestCase):
    """Tests for the attributes and string form of ssl.SSLError."""

    def test_str(self):
        # The str() of a SSLError doesn't include the errno;
        # same for a subclass.
        for exc_type in (ssl.SSLError, ssl.SSLZeroReturnError):
            e = exc_type(1, "foo")
            self.assertEqual(str(e), "foo")
            self.assertEqual(e.errno, 1)

    def test_lib_reason(self):
        # Test the library and reason attributes
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # A certificate file is not a DH parameter file, so loading it
        # as one provides a convenient SSLError to inspect.
        with self.assertRaises(ssl.SSLError) as cm:
            ctx.load_dh_params(CERTFILE)
        self.assertEqual(cm.exception.library, 'PEM')
        self.assertEqual(cm.exception.reason, 'NO_START_LINE')
        message = str(cm.exception)
        self.assertTrue(
            message.startswith("[PEM: NO_START_LINE] no start line"),
            message)

    def test_subclass(self):
        # Check that the appropriate SSLError subclass is raised
        # (this only tests one of them)
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        with closing(socket.socket()) as listener:
            listener.bind(("127.0.0.1", 0))
            listener.listen(5)
            # The plain client socket gets its own closing() block so it
            # is released even if connect() or wrap_socket() raises.
            with closing(socket.socket()) as client:
                client.connect(listener.getsockname())
                client.setblocking(False)
                tls_client = ctx.wrap_socket(client, False,
                                             do_handshake_on_connect=False)
                with closing(tls_client):
                    # Nothing on the listening side ever answers the
                    # handshake, so the non-blocking client is expected
                    # to report that it needs to read.
                    with self.assertRaises(ssl.SSLWantReadError) as cm:
                        tls_client.do_handshake()
                    message = str(cm.exception)
                    self.assertTrue(
                        message.startswith(
                            "The operation did not complete (read)"),
                        message)
                    # For compatibility
                    self.assertEqual(cm.exception.errno,
                                     ssl.SSL_ERROR_WANT_READ)
Antoine Pitrou63cc99d2013-12-28 17:26:33 +01001183
Antoine Pitrouf9de5342010-04-05 21:35:07 +00001184
Bill Janssen934b16d2008-06-28 22:19:33 +00001185class NetworkedTests(unittest.TestCase):
Bill Janssen296a59d2007-09-16 22:06:00 +00001186
Antoine Pitrou3945c862010-04-28 21:11:01 +00001187 def test_connect(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001188 with support.transient_internet("svn.python.org"):
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001189 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1190 cert_reqs=ssl.CERT_NONE)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001191 try:
1192 s.connect(("svn.python.org", 443))
1193 self.assertEqual({}, s.getpeercert())
1194 finally:
1195 s.close()
Bill Janssen296a59d2007-09-16 22:06:00 +00001196
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001197 # this should fail because we have no verification certs
1198 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1199 cert_reqs=ssl.CERT_REQUIRED)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001200 self.assertRaisesRegexp(ssl.SSLError, "certificate verify failed",
1201 s.connect, ("svn.python.org", 443))
1202 s.close()
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001203
1204 # this should succeed because we specify the root cert
1205 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1206 cert_reqs=ssl.CERT_REQUIRED,
1207 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
1208 try:
1209 s.connect(("svn.python.org", 443))
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001210 self.assertTrue(s.getpeercert())
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001211 finally:
1212 s.close()
Bill Janssen296a59d2007-09-16 22:06:00 +00001213
Antoine Pitroud3f6ea12011-02-26 23:35:27 +00001214 def test_connect_ex(self):
1215 # Issue #11326: check connect_ex() implementation
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001216 with support.transient_internet("svn.python.org"):
Antoine Pitroud3f6ea12011-02-26 23:35:27 +00001217 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1218 cert_reqs=ssl.CERT_REQUIRED,
1219 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
1220 try:
1221 self.assertEqual(0, s.connect_ex(("svn.python.org", 443)))
1222 self.assertTrue(s.getpeercert())
1223 finally:
1224 s.close()
1225
1226 def test_non_blocking_connect_ex(self):
1227 # Issue #11326: non-blocking connect_ex() should allow handshake
1228 # to proceed after the socket gets ready.
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001229 with support.transient_internet("svn.python.org"):
Antoine Pitroud3f6ea12011-02-26 23:35:27 +00001230 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1231 cert_reqs=ssl.CERT_REQUIRED,
1232 ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
1233 do_handshake_on_connect=False)
1234 try:
1235 s.setblocking(False)
1236 rc = s.connect_ex(('svn.python.org', 443))
Antoine Pitrou8ef39072011-02-27 15:45:22 +00001237 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1238 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
Antoine Pitroud3f6ea12011-02-26 23:35:27 +00001239 # Wait for connect to finish
1240 select.select([], [s], [], 5.0)
1241 # Non-blocking handshake
1242 while True:
1243 try:
1244 s.do_handshake()
1245 break
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001246 except ssl.SSLWantReadError:
1247 select.select([s], [], [], 5.0)
1248 except ssl.SSLWantWriteError:
1249 select.select([], [s], [], 5.0)
Antoine Pitroud3f6ea12011-02-26 23:35:27 +00001250 # SSL established
1251 self.assertTrue(s.getpeercert())
1252 finally:
1253 s.close()
1254
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001255 def test_timeout_connect_ex(self):
1256 # Issue #12065: on a timeout, connect_ex() should return the original
1257 # errno (mimicking the behaviour of non-SSL sockets).
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001258 with support.transient_internet("svn.python.org"):
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001259 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1260 cert_reqs=ssl.CERT_REQUIRED,
1261 ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
1262 do_handshake_on_connect=False)
1263 try:
1264 s.settimeout(0.0000001)
1265 rc = s.connect_ex(('svn.python.org', 443))
1266 if rc == 0:
1267 self.skipTest("svn.python.org responded too quickly")
1268 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
1269 finally:
1270 s.close()
1271
1272 def test_connect_ex_error(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001273 with support.transient_internet("svn.python.org"):
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001274 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1275 cert_reqs=ssl.CERT_REQUIRED,
1276 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
1277 try:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001278 rc = s.connect_ex(("svn.python.org", 444))
1279 # Issue #19919: Windows machines or VMs hosted on Windows
1280 # machines sometimes return EWOULDBLOCK.
1281 self.assertIn(rc, (errno.ECONNREFUSED, errno.EWOULDBLOCK))
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001282 finally:
1283 s.close()
1284
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001285 def test_connect_with_context(self):
1286 with support.transient_internet("svn.python.org"):
1287 # Same as test_connect, but with a separately created context
1288 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1289 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1290 s.connect(("svn.python.org", 443))
1291 try:
1292 self.assertEqual({}, s.getpeercert())
1293 finally:
1294 s.close()
1295 # Same with a server hostname
1296 s = ctx.wrap_socket(socket.socket(socket.AF_INET),
1297 server_hostname="svn.python.org")
1298 if ssl.HAS_SNI:
1299 s.connect(("svn.python.org", 443))
1300 s.close()
1301 else:
1302 self.assertRaises(ValueError, s.connect, ("svn.python.org", 443))
1303 # This should fail because we have no verification certs
1304 ctx.verify_mode = ssl.CERT_REQUIRED
1305 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1306 self.assertRaisesRegexp(ssl.SSLError, "certificate verify failed",
1307 s.connect, ("svn.python.org", 443))
1308 s.close()
1309 # This should succeed because we specify the root cert
1310 ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
1311 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1312 s.connect(("svn.python.org", 443))
1313 try:
1314 cert = s.getpeercert()
1315 self.assertTrue(cert)
1316 finally:
1317 s.close()
1318
1319 def test_connect_capath(self):
1320 # Verify server certificates using the `capath` argument
1321 # NOTE: the subject hashing algorithm has been changed between
1322 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
1323 # contain both versions of each certificate (same content, different
1324 # filename) for this test to be portable across OpenSSL releases.
1325 with support.transient_internet("svn.python.org"):
1326 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1327 ctx.verify_mode = ssl.CERT_REQUIRED
1328 ctx.load_verify_locations(capath=CAPATH)
1329 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1330 s.connect(("svn.python.org", 443))
1331 try:
1332 cert = s.getpeercert()
1333 self.assertTrue(cert)
1334 finally:
1335 s.close()
1336 # Same with a bytes `capath` argument
1337 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1338 ctx.verify_mode = ssl.CERT_REQUIRED
1339 ctx.load_verify_locations(capath=BYTES_CAPATH)
1340 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1341 s.connect(("svn.python.org", 443))
1342 try:
1343 cert = s.getpeercert()
1344 self.assertTrue(cert)
1345 finally:
1346 s.close()
1347
1348 def test_connect_cadata(self):
1349 with open(CAFILE_CACERT) as f:
1350 pem = f.read().decode('ascii')
1351 der = ssl.PEM_cert_to_DER_cert(pem)
1352 with support.transient_internet("svn.python.org"):
1353 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1354 ctx.verify_mode = ssl.CERT_REQUIRED
1355 ctx.load_verify_locations(cadata=pem)
1356 with closing(ctx.wrap_socket(socket.socket(socket.AF_INET))) as s:
1357 s.connect(("svn.python.org", 443))
1358 cert = s.getpeercert()
1359 self.assertTrue(cert)
1360
1361 # same with DER
1362 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1363 ctx.verify_mode = ssl.CERT_REQUIRED
1364 ctx.load_verify_locations(cadata=der)
1365 with closing(ctx.wrap_socket(socket.socket(socket.AF_INET))) as s:
1366 s.connect(("svn.python.org", 443))
1367 cert = s.getpeercert()
1368 self.assertTrue(cert)
1369
Antoine Pitrou55841ac2010-04-24 10:43:57 +00001370 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
1371 def test_makefile_close(self):
1372 # Issue #5238: creating a file-like object with makefile() shouldn't
1373 # delay closing the underlying "real socket" (here tested with its
1374 # file descriptor, hence skipping the test under Windows).
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001375 with support.transient_internet("svn.python.org"):
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001376 ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
1377 ss.connect(("svn.python.org", 443))
1378 fd = ss.fileno()
1379 f = ss.makefile()
1380 f.close()
1381 # The fd is still open
Antoine Pitrou55841ac2010-04-24 10:43:57 +00001382 os.read(fd, 0)
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001383 # Closing the SSL socket should close the fd too
1384 ss.close()
1385 gc.collect()
1386 with self.assertRaises(OSError) as e:
1387 os.read(fd, 0)
1388 self.assertEqual(e.exception.errno, errno.EBADF)
Bill Janssen934b16d2008-06-28 22:19:33 +00001389
Antoine Pitrou3945c862010-04-28 21:11:01 +00001390 def test_non_blocking_handshake(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001391 with support.transient_internet("svn.python.org"):
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001392 s = socket.socket(socket.AF_INET)
1393 s.connect(("svn.python.org", 443))
1394 s.setblocking(False)
1395 s = ssl.wrap_socket(s,
1396 cert_reqs=ssl.CERT_NONE,
1397 do_handshake_on_connect=False)
1398 count = 0
1399 while True:
1400 try:
1401 count += 1
1402 s.do_handshake()
1403 break
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001404 except ssl.SSLWantReadError:
1405 select.select([s], [], [])
1406 except ssl.SSLWantWriteError:
1407 select.select([], [s], [])
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001408 s.close()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001409 if support.verbose:
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001410 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Bill Janssen934b16d2008-06-28 22:19:33 +00001411
Antoine Pitrou3945c862010-04-28 21:11:01 +00001412 def test_get_server_certificate(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001413 def _test_get_server_certificate(host, port, cert=None):
1414 with support.transient_internet(host):
1415 pem = ssl.get_server_certificate((host, port))
1416 if not pem:
1417 self.fail("No server certificate on %s:%s!" % (host, port))
Bill Janssen296a59d2007-09-16 22:06:00 +00001418
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001419 try:
1420 pem = ssl.get_server_certificate((host, port),
1421 ca_certs=CERTFILE)
1422 except ssl.SSLError as x:
1423 #should fail
1424 if support.verbose:
1425 sys.stdout.write("%s\n" % x)
1426 else:
1427 self.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
Bill Janssen296a59d2007-09-16 22:06:00 +00001428
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001429 pem = ssl.get_server_certificate((host, port),
1430 ca_certs=cert)
1431 if not pem:
1432 self.fail("No server certificate on %s:%s!" % (host, port))
1433 if support.verbose:
1434 sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
1435
1436 _test_get_server_certificate('svn.python.org', 443, SVN_PYTHON_ORG_ROOT_CERT)
1437 if support.IPV6_ENABLED:
1438 _test_get_server_certificate('ipv6.google.com', 443)
1439
1440 def test_ciphers(self):
1441 remote = ("svn.python.org", 443)
1442 with support.transient_internet(remote[0]):
1443 with closing(ssl.wrap_socket(socket.socket(socket.AF_INET),
1444 cert_reqs=ssl.CERT_NONE, ciphers="ALL")) as s:
1445 s.connect(remote)
1446 with closing(ssl.wrap_socket(socket.socket(socket.AF_INET),
1447 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT")) as s:
1448 s.connect(remote)
1449 # Error checking can happen at instantiation or when connecting
1450 with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"):
1451 with closing(socket.socket(socket.AF_INET)) as sock:
1452 s = ssl.wrap_socket(sock,
1453 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
1454 s.connect(remote)
Bill Janssen296a59d2007-09-16 22:06:00 +00001455
def test_algorithms(self):
    # Issue #8484: all algorithms should be available when verifying a
    # certificate.
    # SHA256 was added in OpenSSL 0.9.8
    if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15):
        self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION)
    # sha256.tbs-internet.com needs SNI to use the correct certificate
    if not ssl.HAS_SNI:
        self.skipTest("SNI needed for this test")
    # https://sha2.hboeck.de/ was used until 2011-01-08 (no route to host)
    hostname = "sha256.tbs-internet.com"
    remote = (hostname, 443)
    ca_file = os.path.join(os.path.dirname(__file__), "sha256.pem")
    with support.transient_internet(hostname):
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(ca_file)
        conn = context.wrap_socket(socket.socket(socket.AF_INET),
                                   server_hostname=hostname)
        # closing() closes the connection whether or not connect() works.
        with closing(conn):
            conn.connect(remote)
            if support.verbose:
                sys.stdout.write("\nCipher with %r is %r\n" %
                                 (remote, conn.cipher()))
                sys.stdout.write("Certificate is:\n%s\n" %
                                 pprint.pformat(conn.getpeercert()))
1483
def test_get_ca_certs_capath(self):
    # capath certs are loaded on request: the context reports no CA
    # certificates until a handshake has needed one.
    with support.transient_internet("svn.python.org"):
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.load_verify_locations(capath=CAPATH)
        self.assertEqual(ctx.get_ca_certs(), [])
        # connect() is placed inside the closing() block so that the
        # wrapped socket is released even when the connection attempt
        # itself fails.
        with closing(ctx.wrap_socket(socket.socket(socket.AF_INET))) as s:
            s.connect(("svn.python.org", 443))
            cert = s.getpeercert()
            self.assertTrue(cert)
        self.assertEqual(len(ctx.get_ca_certs()), 1)
1499
@needs_sni
def test_context_setget(self):
    # Check that the context of a connected socket can be replaced.
    with support.transient_internet("svn.python.org"):
        initial_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        replacement_ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        raw_sock = socket.socket(socket.AF_INET)
        with closing(initial_ctx.wrap_socket(raw_sock)) as ss:

            def check_context(expected):
                # Both the Python-level socket and the underlying SSL
                # object must report the same context.
                self.assertIs(ss.context, expected)
                self.assertIs(ss._sslobj.context, expected)

            ss.connect(("svn.python.org", 443))
            check_context(initial_ctx)
            ss.context = replacement_ctx
            check_context(replacement_ctx)
Bill Janssen296a59d2007-09-16 22:06:00 +00001514
try:
    import threading
except ImportError:
    # No threading module: record that, so thread-based code can be
    # skipped (the consumer of this flag is outside this section).
    _have_threads = False
else:
    _have_threads = True

    # Imported only when threading is available.
    from test.ssl_servers import make_https_server
1523
class ThreadedEchoServer(threading.Thread):
    """Echo server thread used by the tests.

    Each accepted connection is handed to a ConnectionHandler thread, which
    the server joins before accepting the next connection.
    """

    class ConnectionHandler(threading.Thread):

        """A mildly complicated class, because we want it to work both
        with and without the SSL wrapper around the socket connection, so
        that we can test the STARTTLS functionality."""

        def __init__(self, server, connsock, addr):
            threading.Thread.__init__(self)
            self.daemon = True
            self.server = server
            self.addr = addr
            self.running = False
            self.sslconn = None
            self.sock = connsock
            self.sock.setblocking(1)

        def _note(self, text):
            # Write a progress line when both the global verbose flag and
            # the server's connectionchatty flag are set.
            if support.verbose and self.server.connectionchatty:
                sys.stdout.write(text)

        def wrap_conn(self):
            """Wrap the raw socket with the server's SSL context.

            Returns True on success.  When the handshake fails with an
            SSLError or a connection reset, the error is recorded in
            server.conn_errors, the server is stopped, the connection is
            closed and False is returned.  Other socket errors propagate.
            """
            server = self.server
            try:
                self.sslconn = server.context.wrap_socket(
                    self.sock, server_side=True)
                server.selected_protocols.append(
                    self.sslconn.selected_npn_protocol())
            except socket.error as exc:
                # We treat ConnectionResetError as though it were an
                # SSLError - OpenSSL on Ubuntu abruptly closes the
                # connection when asked to use an unsupported protocol.
                #
                # XXX Various errors can have happened here, for example
                # a mismatching protocol version, an invalid certificate,
                # or a low-level bug. This should be made more discriminating.
                tolerated = (isinstance(exc, ssl.SSLError) or
                             exc.errno == errno.ECONNRESET)
                if not tolerated:
                    raise
                server.conn_errors.append(exc)
                if server.chatty:
                    handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
                self.running = False
                server.stop()
                self.close()
                return False

            # Handshake succeeded: optionally report what was negotiated.
            noisy = support.verbose and server.chatty
            conn = self.sslconn
            if server.context.verify_mode == ssl.CERT_REQUIRED:
                cert = conn.getpeercert()
                if noisy:
                    sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
                cert_binary = conn.getpeercert(True)
                if noisy:
                    sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
            cipher = conn.cipher()
            if noisy:
                sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
                sys.stdout.write(" server: selected protocol is now "
                                 + str(conn.selected_npn_protocol()) + "\n")
            return True

        def read(self):
            # Read from the SSL layer when present, else from the raw socket.
            if not self.sslconn:
                return self.sock.recv(1024)
            return self.sslconn.read()

        def write(self, bytes):
            # Write through the SSL layer when present, else the raw socket.
            if not self.sslconn:
                return self.sock.send(bytes)
            return self.sslconn.write(bytes)

        def close(self):
            # Close whichever object currently represents the connection.
            if not self.sslconn:
                self.sock.close()
            else:
                self.sslconn.close()

        def run(self):
            """Serve one client: echo lower-cased data and obey commands."""
            self.running = True
            server = self.server
            # Without STARTTLS the connection is encrypted from the start.
            if not server.starttls_server and not self.wrap_conn():
                return
            while self.running:
                try:
                    msg = self.read()
                    command = msg.strip()
                    if not command:
                        # eof, so quit this handler
                        self.running = False
                        self.close()
                    elif command == b'over':
                        self._note(" server: client closed connection\n")
                        self.close()
                        return
                    elif server.starttls_server and command == b'STARTTLS':
                        self._note(" server: read STARTTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        if not self.wrap_conn():
                            return
                    elif (server.starttls_server and self.sslconn
                          and command == b'ENDTLS'):
                        self._note(" server: read ENDTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        self.sock = self.sslconn.unwrap()
                        self.sslconn = None
                        self._note(" server: connection is now unencrypted...\n")
                    elif command == b'CB tls-unique':
                        self._note(" server: read CB tls-unique from client, sending our CB data...\n")
                        data = self.sslconn.get_channel_binding("tls-unique")
                        self.write(repr(data).encode("us-ascii") + b"\n")
                    else:
                        if support.verbose and server.connectionchatty:
                            ctype = "encrypted" if self.sslconn else "unencrypted"
                            sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
                                             % (msg, ctype, msg.lower(), ctype))
                        self.write(msg.lower())
                except ssl.SSLError:
                    if server.chatty:
                        handle_error("Test server failure:\n")
                    self.close()
                    self.running = False
                    # normally, we'd just stop here, but for the test
                    # harness, we want to stop the server
                    server.stop()

    def __init__(self, certificate=None, ssl_version=None,
                 certreqs=None, cacerts=None,
                 chatty=True, connectionchatty=False, starttls_server=False,
                 npn_protocols=None, ciphers=None, context=None):
        # Use the supplied context as-is; otherwise build one from the
        # individual keyword arguments.
        if not context:
            if ssl_version is None:
                ssl_version = ssl.PROTOCOL_TLSv1
            context = ssl.SSLContext(ssl_version)
            if certreqs is None:
                certreqs = ssl.CERT_NONE
            context.verify_mode = certreqs
            if cacerts:
                context.load_verify_locations(cacerts)
            if certificate:
                context.load_cert_chain(certificate)
            if npn_protocols:
                context.set_npn_protocols(npn_protocols)
            if ciphers:
                context.set_ciphers(ciphers)
        self.context = context
        self.chatty = chatty
        self.connectionchatty = connectionchatty
        self.starttls_server = starttls_server
        self.sock = socket.socket()
        self.port = support.bind_port(self.sock)
        self.flag = None
        self.active = False
        # Filled in by the connection handlers.
        self.selected_protocols = []
        self.conn_errors = []
        threading.Thread.__init__(self)
        self.daemon = True

    def __enter__(self):
        # Start the thread and wait until it signals that it is listening.
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        self.stop()
        self.join()

    def start(self, flag=None):
        # *flag*, when given, is set by run() once the server is listening.
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        listener = self.sock
        # Short accept timeout so that stop() is noticed promptly.
        listener.settimeout(0.05)
        listener.listen(5)
        self.active = True
        if self.flag:
            # signal an event
            self.flag.set()
        while self.active:
            try:
                newconn, connaddr = listener.accept()
                if support.verbose and self.chatty:
                    sys.stdout.write(' server: new connection from '
                                     + repr(connaddr) + '\n')
                handler = self.ConnectionHandler(self, newconn, connaddr)
                handler.start()
                handler.join()
            except socket.timeout:
                continue
            except KeyboardInterrupt:
                self.stop()
        listener.close()

    def stop(self):
        # Ask the accept loop in run() to finish.
        self.active = False
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001721
class AsyncoreEchoServer(threading.Thread):
    """Thread driving an asyncore-based SSL echo server.

    The nested EchoServer listens on a freshly bound port (exposed as
    ``port``) and creates one ConnectionHandler per accepted connection.
    Handlers send back the lower-cased version of whatever they receive.
    """

    class EchoServer(asyncore.dispatcher):

        class ConnectionHandler(asyncore.dispatcher_with_send):

            def __init__(self, conn, certfile):
                # The handshake is not done inside wrap_socket(); it is
                # driven step by step from _do_ssl_handshake().
                self.socket = ssl.wrap_socket(conn, server_side=True,
                                              certfile=certfile,
                                              do_handshake_on_connect=False)
                asyncore.dispatcher_with_send.__init__(self, self.socket)
                self._ssl_accepting = True
                self._do_ssl_handshake()

            def readable(self):
                # Consume data already buffered inside the SSL object
                # before reporting the handler as readable.
                if isinstance(self.socket, ssl.SSLSocket):
                    while self.socket.pending() > 0:
                        self.handle_read_event()
                return True

            def _do_ssl_handshake(self):
                """Advance the handshake; clear _ssl_accepting once done."""
                try:
                    self.socket.do_handshake()
                except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
                    # Not finished yet; retried on the next read event.
                    return
                except ssl.SSLEOFError:
                    return self.handle_close()
                except ssl.SSLError:
                    # Re-raised explicitly so the socket.error clause
                    # below does not handle SSL errors.
                    raise
                except socket.error as err:
                    # "as" form instead of the Python-2-only comma syntax,
                    # matching the other handlers in this file.
                    if err.args[0] == errno.ECONNABORTED:
                        return self.handle_close()
                else:
                    self._ssl_accepting = False

            def handle_read(self):
                if self._ssl_accepting:
                    self._do_ssl_handshake()
                else:
                    data = self.recv(1024)
                    if support.verbose:
                        sys.stdout.write(" server: read %s from client\n" % repr(data))
                    if not data:
                        self.close()
                    else:
                        self.send(data.lower())

            def handle_close(self):
                self.close()
                if support.verbose:
                    sys.stdout.write(" server: closed connection %s\n" % self.socket)

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, certfile):
            self.certfile = certfile
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.port = support.bind_port(sock, '')
            asyncore.dispatcher.__init__(self, sock)
            self.listen(5)

        def handle_accept(self):
            sock_obj, addr = self.accept()
            if support.verbose:
                sys.stdout.write(" server: new connection from %s:%s\n" %addr)
            self.ConnectionHandler(sock_obj, self.certfile)

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise

    def __init__(self, certfile):
        self.flag = None
        self.active = False
        self.server = self.EchoServer(certfile)
        self.port = self.server.port
        threading.Thread.__init__(self)
        self.daemon = True

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.server)

    def __enter__(self):
        # Start the thread and wait until run() has set the event.
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        if support.verbose:
            sys.stdout.write(" cleanup: stopping server.\n")
        self.stop()
        if support.verbose:
            sys.stdout.write(" cleanup: joining server thread.\n")
        self.join()
        if support.verbose:
            sys.stdout.write(" cleanup: successfully joined.\n")

    def start(self, flag=None):
        # *flag*, when given, is set by run() once the loop is starting.
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        self.active = True
        if self.flag:
            self.flag.set()
        while self.active:
            # Deliberately best-effort: errors raised out of the asyncore
            # loop are ignored so the thread keeps polling until stop().
            try:
                asyncore.loop(1)
            except:
                pass

    def stop(self):
        self.active = False
        self.server.close()
1836
def bad_cert_test(certfile):
    """
    Launch a server with CERT_REQUIRED, and check that trying to
    connect to it with the given client certificate fails.

    An SSLError or OSError raised while wrapping or connecting counts as
    the expected failure; a successful connection raises AssertionError.
    """
    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_REQUIRED,
                                cacerts=CERTFILE, chatty=False,
                                connectionchatty=False)
    with server:
        try:
            with closing(socket.socket()) as sock:
                s = ssl.wrap_socket(sock,
                                    certfile=certfile,
                                    ssl_version=ssl.PROTOCOL_TLSv1)
                s.connect((HOST, server.port))
        except ssl.SSLError as x:
            if support.verbose:
                sys.stdout.write("\nSSLError is %s\n" % x.args[1])
        except OSError as x:
            # A second, ENOENT-only "except OSError" clause used to follow
            # this one.  It could never run because this clause already
            # catches every OSError, so it was removed.
            if support.verbose:
                sys.stdout.write("\nOSError is %s\n" % x.args[1])
        else:
            raise AssertionError("Use of invalid cert should have failed!")
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001866
def server_params_test(client_context, server_context, indata=b"FOO\n",
                       chatty=True, connectionchatty=False, sni_name=None):
    """
    Launch a server, connect a client to it and try various reads
    and writes.

    *indata* is sent as a bytes object, a bytearray and a memoryview; each
    reply must equal the lower-cased payload.  Returns a dict describing
    the negotiated connection.
    """
    server = ThreadedEchoServer(context=server_context,
                                chatty=chatty,
                                connectionchatty=False)
    expected = indata.lower()
    with server:
        client_sock = client_context.wrap_socket(socket.socket(),
                                                 server_hostname=sni_name)
        with closing(client_sock) as s:
            s.connect((HOST, server.port))
            payloads = [indata, bytearray(indata), memoryview(indata)]
            for arg in payloads:
                if connectionchatty and support.verbose:
                    sys.stdout.write(
                        " client: sending %r...\n" % indata)
                s.write(arg)
                outdata = s.read()
                if connectionchatty and support.verbose:
                    sys.stdout.write(" client: read %r\n" % outdata)
                if outdata != expected:
                    raise AssertionError(
                        "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                        % (outdata[:20], len(outdata),
                           indata[:20].lower(), len(indata)))
            s.write(b"over\n")
            if connectionchatty and support.verbose:
                sys.stdout.write(" client: closing connection.\n")
            stats = {
                'compression': s.compression(),
                'cipher': s.cipher(),
                'peercert': s.getpeercert(),
                'client_npn_protocol': s.selected_npn_protocol()
            }
            s.close()
        stats['server_npn_protocols'] = server.selected_protocols
    return stats
Bill Janssen98d19da2007-09-10 21:51:02 +00001909
def try_protocol_combo(server_protocol, client_protocol, expect_success,
                       certsreqs=None, server_options=0, client_options=0):
    """
    Run server_params_test() with the given protocol pair and check that
    the outcome (success or connection failure) matches *expect_success*.
    """
    if certsreqs is None:
        certsreqs = ssl.CERT_NONE
    cert_names = {
        ssl.CERT_NONE: "CERT_NONE",
        ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
        ssl.CERT_REQUIRED: "CERT_REQUIRED",
    }
    certtype = cert_names[certsreqs]
    if support.verbose:
        # Combinations expected to fail are shown in braces.
        if expect_success:
            formatstr = " %s->%s %s\n"
        else:
            formatstr = " {%s->%s} %s\n"
        client_name = ssl.get_protocol_name(client_protocol)
        server_name = ssl.get_protocol_name(server_protocol)
        sys.stdout.write(formatstr % (client_name, server_name, certtype))

    client_context = ssl.SSLContext(client_protocol)
    client_context.options |= client_options
    server_context = ssl.SSLContext(server_protocol)
    server_context.options |= server_options

    # NOTE: we must enable "ALL" ciphers on the client, otherwise an
    # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
    # starting from OpenSSL 1.0.0 (see issue #8322).
    if client_context.protocol == ssl.PROTOCOL_SSLv23:
        client_context.set_ciphers("ALL")

    for ctx in (client_context, server_context):
        ctx.verify_mode = certsreqs
        ctx.load_cert_chain(CERTFILE)
        ctx.load_verify_locations(CERTFILE)

    try:
        server_params_test(client_context, server_context,
                           chatty=False, connectionchatty=False)
    # Protocol mismatch can result in either an SSLError, or a
    # "Connection reset by peer" error.
    except ssl.SSLError:
        if expect_success:
            raise
    except socket.error as exc:
        unexpected = expect_success or exc.errno != errno.ECONNRESET
        if unexpected:
            raise
    else:
        if expect_success:
            return
        raise AssertionError(
            "Client protocol %s succeeded with server protocol %s!"
            % (ssl.get_protocol_name(client_protocol),
               ssl.get_protocol_name(server_protocol)))
1957
1958
Bill Janssen934b16d2008-06-28 22:19:33 +00001959 class ThreadedTests(unittest.TestCase):
Bill Janssen98d19da2007-09-10 21:51:02 +00001960
@skip_if_broken_ubuntu_ssl
def test_echo(self):
    """Basic test of an SSL client connecting to a server"""
    if support.verbose:
        sys.stdout.write("\n")
    # The same context serves as both the client and the server side.
    for proto in PROTOCOLS:
        shared_ctx = ssl.SSLContext(proto)
        shared_ctx.load_cert_chain(CERTFILE)
        server_params_test(shared_ctx, shared_ctx,
                           chatty=True, connectionchatty=True)
Bill Janssen98d19da2007-09-10 21:51:02 +00001971
def test_getpeercert(self):
    # getpeercert() must refuse to work before the handshake and return
    # a sensible certificate dictionary afterwards.
    if support.verbose:
        sys.stdout.write("\n")
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(CERTFILE)
    context.load_cert_chain(CERTFILE)
    server = ThreadedEchoServer(context=context, chatty=False)
    with server:
        # closing() makes sure the client socket is closed even when one
        # of the checks below fails.  Leaving it open would keep the
        # server-side connection handler waiting for data while the
        # server's __exit__ is trying to join its thread.
        with closing(context.wrap_socket(socket.socket(),
                                         do_handshake_on_connect=False)) as s:
            s.connect((HOST, server.port))
            # getpeercert() raise ValueError while the handshake isn't
            # done.
            with self.assertRaises(ValueError):
                s.getpeercert()
            s.do_handshake()
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()
            if support.verbose:
                sys.stdout.write(pprint.pformat(cert) + '\n')
                sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
            if 'subject' not in cert:
                self.fail("No subject field in certificate: %s." %
                          pprint.pformat(cert))
            if ((('organizationName', 'Python Software Foundation'),)
                    not in cert['subject']):
                self.fail(
                    "Missing or invalid 'organizationName' field in certificate subject; "
                    "should be 'Python Software Foundation'.")
            self.assertIn('notBefore', cert)
            self.assertIn('notAfter', cert)
            before = ssl.cert_time_to_seconds(cert['notBefore'])
            after = ssl.cert_time_to_seconds(cert['notAfter'])
            self.assertLess(before, after)
Bill Janssen98d19da2007-09-10 21:51:02 +00002009
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_crl_check(self):
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    client_context.verify_mode = ssl.CERT_REQUIRED
    client_context.load_verify_locations(SIGNING_CA)
    self.assertEqual(client_context.verify_flags, ssl.VERIFY_DEFAULT)

    def new_server():
        # Each scenario talks to a fresh server instance.
        return ThreadedEchoServer(context=server_context, chatty=True)

    def expect_verified_connection():
        # Connect and check that a peer certificate was obtained.
        server = new_server()
        with server:
            with closing(client_context.wrap_socket(socket.socket())) as s:
                s.connect((HOST, server.port))
                cert = s.getpeercert()
                self.assertTrue(cert, "Can't get peer certificate.")

    # VERIFY_DEFAULT should pass
    expect_verified_connection()

    # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
    client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF

    server = new_server()
    with server:
        with closing(client_context.wrap_socket(socket.socket())) as s:
            with self.assertRaisesRegexp(ssl.SSLError,
                                         "certificate verify failed"):
                s.connect((HOST, server.port))

    # now load a CRL file. The CRL file is signed by the CA.
    client_context.load_verify_locations(CRLFILE)

    expect_verified_connection()
2051
@needs_sni
def test_check_hostname(self):
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    client_context.verify_mode = ssl.CERT_REQUIRED
    client_context.check_hostname = True
    client_context.load_verify_locations(SIGNING_CA)

    def new_server():
        # Each scenario talks to a fresh server instance.
        return ThreadedEchoServer(context=server_context, chatty=True)

    # correct hostname should verify
    server = new_server()
    with server:
        good = client_context.wrap_socket(socket.socket(),
                                          server_hostname="localhost")
        with closing(good) as s:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")

    # incorrect hostname should raise an exception
    server = new_server()
    with server:
        bad = client_context.wrap_socket(socket.socket(),
                                         server_hostname="invalid")
        with closing(bad) as s:
            with self.assertRaisesRegexp(ssl.CertificateError,
                                         "hostname 'invalid' doesn't match u?'localhost'"):
                s.connect((HOST, server.port))

    # missing server_hostname arg should cause an exception, too
    server = new_server()
    with server:
        with closing(socket.socket()) as plain:
            with self.assertRaisesRegexp(ValueError,
                                         "check_hostname requires server_hostname"):
                client_context.wrap_socket(plain)
2090
def test_empty_cert(self):
    """Connecting with an empty cert file"""
    # Look next to this file; fall back to the current directory when
    # __file__ carries no directory part.
    here = os.path.dirname(__file__) or os.curdir
    bad_cert_test(os.path.join(here, "nullcert.pem"))
def test_malformed_cert(self):
    """Connecting with a badly formatted certificate (syntax error)"""
    # Look next to this file; fall back to the current directory when
    # __file__ carries no directory part.
    here = os.path.dirname(__file__) or os.curdir
    bad_cert_test(os.path.join(here, "badcert.pem"))
def test_nonexisting_cert(self):
    """Connecting with a non-existing cert file"""
    # Look next to this file; fall back to the current directory when
    # __file__ carries no directory part.
    here = os.path.dirname(__file__) or os.curdir
    bad_cert_test(os.path.join(here, "wrongcert.pem"))
def test_malformed_key(self):
    """Connecting with a badly formatted key (syntax error)"""
    # Look next to this file; fall back to the current directory when
    # __file__ carries no directory part.
    here = os.path.dirname(__file__) or os.curdir
    bad_cert_test(os.path.join(here, "badkey.pem"))
Bill Janssen98d19da2007-09-10 21:51:02 +00002107
def test_rude_shutdown(self):
    """A brutal shutdown of an SSL server should raise a socket.error
    in the client when attempting handshake.
    """
    listener_ready = threading.Event()
    listener_gone = threading.Event()

    s = socket.socket()
    port = support.bind_port(s, HOST)

    # `listener` runs in a thread.  It sits in an accept() until
    # the main thread connects.  Then it rudely closes the socket,
    # and sets Event `listener_gone` to let the main thread know
    # the socket is gone.
    def listener():
        s.listen(5)
        listener_ready.set()
        newsock, addr = s.accept()
        newsock.close()
        s.close()
        listener_gone.set()

    def connector():
        listener_ready.wait()
        with closing(socket.socket()) as c:
            c.connect((HOST, port))
            listener_gone.wait()
            try:
                ssl.wrap_socket(c)
            except socket.error:
                # ssl.SSLError derives from socket.error here, so this
                # accepts both an SSL-level failure and a plain
                # "connection reset by peer" from the closed listener.
                pass
            else:
                self.fail('connecting to closed SSL socket should have failed')

    t = threading.Thread(target=listener)
    t.start()
    try:
        connector()
    finally:
        # Always wait for the listener thread, even when the check fails.
        t.join()
2148
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
                     "OpenSSL is compiled without SSLv2 support")
def test_protocol_sslv2(self):
    """Connecting to an SSLv2 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    v2 = ssl.PROTOCOL_SSLv2
    v23 = ssl.PROTOCOL_SSLv23
    # SSLv2 client: default cert requirements, then optional, then required.
    for extra in ((), (ssl.CERT_OPTIONAL,), (ssl.CERT_REQUIRED,)):
        try_protocol_combo(v2, v2, True, *extra)
    # Clients using any other protocol constant are expected to fail.
    for client_protocol in (v23, ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1):
        try_protocol_combo(v2, client_protocol, False)
    # SSLv23 client with specific SSL options
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(v2, v23, False, client_options=ssl.OP_NO_SSLv2)
    for option in (ssl.OP_NO_SSLv3, ssl.OP_NO_TLSv1):
        try_protocol_combo(v2, v23, False, client_options=option)
Bill Janssen98d19da2007-09-10 21:51:02 +00002171
@skip_if_broken_ubuntu_ssl
def test_protocol_sslv23(self):
    """Connecting to an SSLv23 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    v23 = ssl.PROTOCOL_SSLv23
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try:
            try_protocol_combo(v23, ssl.PROTOCOL_SSLv2, True)
        except socket.error as x:
            # this fails on some older versions of OpenSSL (0.9.7l, for instance)
            if support.verbose:
                sys.stdout.write(
                    " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
                    % str(x))

    # SSLv3, SSLv23 and TLSv1 clients, first with the default cert
    # requirements, then CERT_OPTIONAL, then CERT_REQUIRED.
    for extra in ((), (ssl.CERT_OPTIONAL,), (ssl.CERT_REQUIRED,)):
        for client_protocol in (ssl.PROTOCOL_SSLv3, v23, ssl.PROTOCOL_TLSv1):
            try_protocol_combo(v23, client_protocol, True, *extra)

    # Server with specific SSL options
    try_protocol_combo(v23, ssl.PROTOCOL_SSLv3, False,
                       server_options=ssl.OP_NO_SSLv3)
    # Will choose TLSv1
    no_ssl = ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3
    try_protocol_combo(v23, v23, True, server_options=no_ssl)
    try_protocol_combo(v23, ssl.PROTOCOL_TLSv1, False,
                       server_options=ssl.OP_NO_TLSv1)
2206
2207
@skip_if_broken_ubuntu_ssl
def test_protocol_sslv3(self):
    """Connecting to an SSLv3 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    v3 = ssl.PROTOCOL_SSLv3
    v23 = ssl.PROTOCOL_SSLv23
    # SSLv3 client: default cert requirements, then optional, then required.
    for extra in ((), (ssl.CERT_OPTIONAL,), (ssl.CERT_REQUIRED,)):
        try_protocol_combo(v3, v3, True, *extra)
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(v3, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(v3, v23, False, client_options=ssl.OP_NO_SSLv3)
    try_protocol_combo(v3, ssl.PROTOCOL_TLSv1, False)
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(v3, v23, True, client_options=ssl.OP_NO_SSLv2)
Bill Janssen98d19da2007-09-10 21:51:02 +00002225
@skip_if_broken_ubuntu_ssl
def test_protocol_tlsv1(self):
    """Connecting to a TLSv1 server with various client options"""
    if support.verbose:
        sys.stdout.write("\n")
    tls1 = ssl.PROTOCOL_TLSv1
    # TLSv1 client: default cert requirements, then optional, then required.
    for extra in ((), (ssl.CERT_OPTIONAL,), (ssl.CERT_REQUIRED,)):
        try_protocol_combo(tls1, tls1, True, *extra)
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(tls1, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(tls1, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(tls1, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_TLSv1)
2239
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
                     "TLS version 1.1 not supported.")
def test_protocol_tlsv1_1(self):
    """Connecting to a TLSv1.1 server with various client options.
       Testing against older TLS versions."""
    if support.verbose:
        sys.stdout.write("\n")
    tls11 = ssl.PROTOCOL_TLSv1_1
    tls1 = ssl.PROTOCOL_TLSv1
    try_protocol_combo(tls11, tls11, True)
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(tls11, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(tls11, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(tls11, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_TLSv1_1)

    # (server protocol, client protocol, whether the connection should succeed)
    remaining = (
        (ssl.PROTOCOL_SSLv23, tls11, True),
        (tls11, tls1, False),
        (tls1, tls11, False),
    )
    for server_protocol, client_protocol, expect_success in remaining:
        try_protocol_combo(server_protocol, client_protocol, expect_success)
2258
2259
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
                     "TLS version 1.2 not supported.")
def test_protocol_tlsv1_2(self):
    """Connecting to a TLSv1.2 server with various client options.
       Testing against older TLS versions."""
    if support.verbose:
        sys.stdout.write("\n")
    tls12 = ssl.PROTOCOL_TLSv1_2
    tls11 = ssl.PROTOCOL_TLSv1_1
    tls1 = ssl.PROTOCOL_TLSv1
    try_protocol_combo(tls12, tls12, True,
                       server_options=ssl.OP_NO_SSLv3 | ssl.OP_NO_SSLv2,
                       client_options=ssl.OP_NO_SSLv3 | ssl.OP_NO_SSLv2)
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(tls12, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(tls12, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(tls12, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_TLSv1_2)

    # (server protocol, client protocol, whether the connection should succeed)
    remaining = (
        (ssl.PROTOCOL_SSLv23, tls12, True),
        (tls12, tls1, False),
        (tls1, tls12, False),
        (tls12, tls11, False),
        (tls11, tls12, False),
    )
    for server_protocol, client_protocol, expect_success in remaining:
        try_protocol_combo(server_protocol, client_protocol, expect_success)
Bill Janssen98d19da2007-09-10 21:51:02 +00002282
def test_starttls(self):
    """Switching from clear text to encrypted and back again.

    A fixed sequence of messages is sent to an echo server started with
    starttls_server=True.  When the reply to b"STARTTLS" begins with
    b"ok" the client wraps its socket with TLSv1; when the reply to
    b"ENDTLS" begins with b"ok" it unwraps back to the plain socket.
    """
    msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")

    server = ThreadedEchoServer(CERTFILE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                starttls_server=True,
                                chatty=True,
                                connectionchatty=True)
    # Tracks whether traffic currently goes through the TLS wrapper
    # (`conn`) or through the plain socket (`s`).
    wrapped = False
    with server:
        s = socket.socket()
        s.setblocking(1)
        s.connect((HOST, server.port))
        if support.verbose:
            sys.stdout.write("\n")
        for indata in msgs:
            if support.verbose:
                sys.stdout.write(
                    " client: sending %r...\n" % indata)
            # `conn` only exists once STARTTLS has succeeded; `wrapped`
            # guards every use of it.
            if wrapped:
                conn.write(indata)
                outdata = conn.read()
            else:
                s.send(indata)
                outdata = s.recv(1024)
            # Normalise the reply before checking for the "ok" prefix.
            msg = outdata.strip().lower()
            if indata == b"STARTTLS" and msg.startswith(b"ok"):
                # STARTTLS ok, switch to secure mode
                if support.verbose:
                    sys.stdout.write(
                        " client: read %r from server, starting TLS...\n"
                        % msg)
                conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
                wrapped = True
            elif indata == b"ENDTLS" and msg.startswith(b"ok"):
                # ENDTLS ok, switch back to clear text
                if support.verbose:
                    sys.stdout.write(
                        " client: read %r from server, ending TLS...\n"
                        % msg)
                # unwrap() hands back the socket used for clear-text I/O.
                s = conn.unwrap()
                wrapped = False
            else:
                if support.verbose:
                    sys.stdout.write(
                        " client: read %r from server\n" % msg)
        if support.verbose:
            sys.stdout.write(" client: closing connection.\n")
        # Send the final b"over\n" message over whichever channel is
        # active, then close that same channel.
        if wrapped:
            conn.write(b"over\n")
        else:
            s.send(b"over\n")
        if wrapped:
            conn.close()
        else:
            s.close()
Bill Janssen98d19da2007-09-10 21:51:02 +00002340
def test_socketserver(self):
    """Using a SocketServer to create and manage SSL connections."""
    server = make_https_server(self, certfile=CERTFILE)
    # try to connect
    if support.verbose:
        sys.stdout.write('\n')
    with open(CERTFILE, 'rb') as cert_fp:
        expected = cert_fp.read()
    fetched = ''
    # now fetch the same data from the HTTPS server
    url = 'https://%s:%d/%s' % (
        HOST, server.port, os.path.basename(CERTFILE))
    with closing(urllib.urlopen(url)) as response:
        dlen = response.info().getheader("content-length")
        if dlen and (int(dlen) > 0):
            fetched = response.read(int(dlen))
            if support.verbose:
                sys.stdout.write(
                    " client: read %d bytes from remote server '%s'\n"
                    % (len(fetched), server))
    # The served file must match the copy read straight from disk.
    self.assertEqual(expected, fetched)
Bill Janssen934b16d2008-06-28 22:19:33 +00002365
def test_asyncore_server(self):
    """Check the example asyncore integration.

    Sends one message through an SSL-wrapped client socket to an
    AsyncoreEchoServer and expects the lower-cased message back.
    """
    if support.verbose:
        sys.stdout.write("\n")

    indata = b"FOO\n"
    server = AsyncoreEchoServer(CERTFILE)
    with server:
        s = ssl.wrap_socket(socket.socket())
        s.connect(('127.0.0.1', server.port))
        if support.verbose:
            sys.stdout.write(
                " client: sending %r...\n" % indata)
        s.write(indata)
        outdata = s.read()
        if support.verbose:
            sys.stdout.write(" client: read %r\n" % outdata)
        # The reply is compared against the lower-cased request.
        if outdata != indata.lower():
            self.fail(
                "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                % (outdata[:20], len(outdata),
                   indata[:20].lower(), len(indata)))
        s.write(b"over\n")
        if support.verbose:
            sys.stdout.write(" client: closing connection.\n")
        s.close()
        if support.verbose:
            sys.stdout.write(" client: connection closed.\n")
Bill Janssen934b16d2008-06-28 22:19:33 +00002396
def test_recv_send(self):
    """Test recv(), send() and friends.

    Each send-style and recv-style method of a connected SSL socket is
    exercised against an echo server.  Every table entry records
    whether the method is expected to work; a method that is expected
    to fail must raise a ValueError whose message starts with the
    method's name.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = ssl.wrap_socket(socket.socket(),
                            server_side=False,
                            certfile=CERTFILE,
                            ca_certs=CERTFILE,
                            cert_reqs=ssl.CERT_NONE,
                            ssl_version=ssl.PROTOCOL_TLSv1)
        s.connect((HOST, server.port))
        # helper methods for standardising recv* method signatures
        def _recv_into():
            b = bytearray(b"\0"*100)
            count = s.recv_into(b)
            return b[:count]

        def _recvfrom_into():
            b = bytearray(b"\0"*100)
            count, addr = s.recvfrom_into(b)
            return b[:count]

        # (name, method, whether to expect success, *args)
        send_methods = [
            ('send', s.send, True, []),
            ('sendto', s.sendto, False, ["some.address"]),
            ('sendall', s.sendall, True, []),
        ]
        recv_methods = [
            ('recv', s.recv, True, []),
            ('recvfrom', s.recvfrom, False, ["some.address"]),
            ('recv_into', _recv_into, True, []),
            ('recvfrom_into', _recvfrom_into, False, []),
        ]
        data_prefix = u"PREFIX_"

        for meth_name, send_meth, expect_success, args in send_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                send_meth(indata, *args)
                outdata = s.read()
                if outdata != indata.lower():
                    # Use the !r conversion: "r" is not a valid format
                    # spec, and the ValueError it raises would be
                    # swallowed by the handler below.
                    self.fail(
                        "While sending with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20], nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to send with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )

        for meth_name, recv_meth, expect_success, args in recv_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                s.send(indata)
                outdata = recv_meth(*args)
                if outdata != indata.lower():
                    self.fail(
                        "While receiving with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20], nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to receive with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )
                # consume data
                s.read()

        s.write(b"over\n")
        s.close()
Bill Janssen61c001a2008-09-08 16:37:24 +00002503
def test_handshake_timeout(self):
    # Issue #5103: SSL handshake must respect the socket timeout
    server = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(server)
    started = threading.Event()
    stop_requested = False

    def serve():
        # Accept raw TCP connections but never answer the handshake.
        server.listen(5)
        started.set()
        accepted = []
        while not stop_requested:
            readable = select.select([server], [], [], 0.1)[0]
            if server in readable:
                # Let the socket hang around rather than having
                # it closed by garbage collection.
                accepted.append(server.accept()[0])
        for sock in accepted:
            sock.close()

    worker = threading.Thread(target=serve)
    worker.start()
    started.wait()

    address = (host, port)
    try:
        # Wrapping an already-connected socket.
        try:
            c = socket.socket(socket.AF_INET)
            c.settimeout(0.2)
            c.connect(address)
            # Will attempt handshake and time out
            with self.assertRaisesRegexp(ssl.SSLError, "timed out"):
                ssl.wrap_socket(c)
        finally:
            c.close()
        # Connecting a socket that is already wrapped.
        try:
            c = socket.socket(socket.AF_INET)
            c = ssl.wrap_socket(c)
            c.settimeout(0.2)
            # Will attempt handshake and time out
            with self.assertRaisesRegexp(ssl.SSLError, "timed out"):
                c.connect(address)
        finally:
            c.close()
    finally:
        # Tell the serving loop to exit, then clean up.
        stop_requested = True
        worker.join()
        server.close()
2552
def test_server_accept(self):
    # Issue #16357: accept() on a SSLSocket created through
    # SSLContext.wrap_socket().
    ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    ctx.verify_mode = ssl.CERT_REQUIRED
    ctx.load_verify_locations(CERTFILE)
    ctx.load_cert_chain(CERTFILE)
    listener = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(listener)
    listener = ctx.wrap_socket(listener, server_side=True)

    listening = threading.Event()
    # Filled in by the server thread with the result of accept().
    accepted = {'conn': None, 'addr': None}

    def serve():
        listener.listen(5)
        # Block on the accept and wait on the connection to close.
        listening.set()
        accepted['conn'], accepted['addr'] = listener.accept()
        accepted['conn'].recv(1)

    worker = threading.Thread(target=serve)
    worker.start()
    # Client wait until server setup and perform a connect.
    listening.wait()
    client = ctx.wrap_socket(socket.socket())
    client.connect((host, port))
    client_addr = client.getsockname()
    client.close()
    worker.join()
    accepted['conn'].close()
    listener.close()
    # Sanity checks.
    self.assertIsInstance(accepted['conn'], ssl.SSLSocket)
    self.assertEqual(accepted['addr'], client_addr)
2589
def test_getpeercert_enotconn(self):
    # getpeercert() on a wrapped but unconnected socket is expected to
    # raise socket.error with errno ENOTCONN.
    ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    unconnected = ctx.wrap_socket(socket.socket())
    with closing(unconnected) as sock:
        with self.assertRaises(socket.error) as caught:
            sock.getpeercert()
        self.assertEqual(caught.exception.errno, errno.ENOTCONN)
2596
def test_do_handshake_enotconn(self):
    # do_handshake() on a wrapped but unconnected socket is expected to
    # raise socket.error with errno ENOTCONN.
    ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    unconnected = ctx.wrap_socket(socket.socket())
    with closing(unconnected) as sock:
        with self.assertRaises(socket.error) as caught:
            sock.do_handshake()
        self.assertEqual(caught.exception.errno, errno.ENOTCONN)
2603
def test_default_ciphers(self):
    # A client restricted to "DES" ciphers must fail to connect to a
    # server using its default cipher list.
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    try:
        # Force a set of weak ciphers on our client context
        client_ctx.set_ciphers("DES")
    except ssl.SSLError:
        self.skipTest("no DES cipher available")
    echo_server = ThreadedEchoServer(CERTFILE,
                                     ssl_version=ssl.PROTOCOL_SSLv23,
                                     chatty=False)
    with echo_server as server:
        with closing(client_ctx.wrap_socket(socket.socket())) as s, \
                self.assertRaises(ssl.SSLError):
            s.connect((HOST, server.port))
    # The first error recorded by the server must name the cause.
    self.assertIn("no shared cipher", str(server.conn_errors[0]))
2618
@unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
def test_default_ecdh_curve(self):
    # Issue #21015: elliptic curve-based Diffie Hellman key exchange
    # should be enabled by default on SSL contexts.
    ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    ctx.load_cert_chain(CERTFILE)
    # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
    # explicitly using the 'ECCdraft' cipher alias.  Otherwise,
    # our default cipher list should prefer ECDH-based ciphers
    # automatically.
    needs_explicit_ecdh = ssl.OPENSSL_VERSION_INFO < (1, 0, 0)
    if needs_explicit_ecdh:
        ctx.set_ciphers("ECCdraft:ECDH")
    # The same context serves as both server and client configuration.
    with ThreadedEchoServer(context=ctx) as server:
        with closing(ctx.wrap_socket(socket.socket())) as s:
            s.connect((HOST, server.port))
            cipher_name = s.cipher()[0]
            self.assertIn("ECDH", cipher_name)
2635
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    """Test tls-unique channel binding.

    Two successive TLSv1 connections are made to the same echo server.
    For each one the client's channel binding data is sanity-checked
    and compared with what the server reports for b"CB tls-unique\\n";
    the data of the two connections must differ.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)

    def connect_client():
        # Open a new TLSv1 client connection to the echo server.
        sock = ssl.wrap_socket(socket.socket(),
                               server_side=False,
                               certfile=CERTFILE,
                               ca_certs=CERTFILE,
                               cert_reqs=ssl.CERT_NONE,
                               ssl_version=ssl.PROTOCOL_TLSv1)
        sock.connect((HOST, server.port))
        return sock

    with server:
        s = connect_client()
        # get the data
        cb_data = s.get_channel_binding("tls-unique")
        if support.verbose:
            sys.stdout.write(" got channel binding data: {0!r}\n"
                             .format(cb_data))

        # check if it is sane
        self.assertIsNotNone(cb_data)
        self.assertEqual(len(cb_data), 12) # True for TLSv1

        # and compare with the peers version
        s.write(b"CB tls-unique\n")
        peer_data_repr = s.read().strip()
        self.assertEqual(peer_data_repr,
                         repr(cb_data).encode("us-ascii"))
        s.close()

        # now, again
        s = connect_client()
        new_cb_data = s.get_channel_binding("tls-unique")
        if support.verbose:
            sys.stdout.write(" got another channel binding data: {0!r}\n"
                             .format(new_cb_data))
        # is it really unique
        self.assertNotEqual(cb_data, new_cb_data)
        # Sanity-check the data of the *second* connection; the first
        # connection's data was already validated above.
        self.assertIsNotNone(new_cb_data)
        self.assertEqual(len(new_cb_data), 12) # True for TLSv1
        s.write(b"CB tls-unique\n")
        peer_data_repr = s.read().strip()
        self.assertEqual(peer_data_repr,
                         repr(new_cb_data).encode("us-ascii"))
        s.close()
2695
def test_compression(self):
    # The negotiated compression must be None or one of the two
    # method names accepted below.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    ctx.load_cert_chain(CERTFILE)
    stats = server_params_test(ctx, ctx,
                               chatty=True, connectionchatty=True)
    if support.verbose:
        report = " got compression: {!r}\n".format(stats['compression'])
        sys.stdout.write(report)
    allowed = {None, 'ZLIB', 'RLE'}
    self.assertIn(stats['compression'], allowed)
2704
@unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
                     "ssl.OP_NO_COMPRESSION needed for this test")
def test_compression_disabled(self):
    # With OP_NO_COMPRESSION set, no compression may be reported.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    ctx.load_cert_chain(CERTFILE)
    ctx.options |= ssl.OP_NO_COMPRESSION
    stats = server_params_test(ctx, ctx,
                               chatty=True, connectionchatty=True)
    negotiated = stats['compression']
    self.assertIs(negotiated, None)
2714
def test_dh_params(self):
    """Check we can get a connection with ephemeral Diffie-Hellman.

    The shared context loads DH parameters and is restricted to the
    "kEDH" cipher selection; the negotiated cipher name must contain
    an ADH, EDH or DHE component.
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.load_cert_chain(CERTFILE)
    context.load_dh_params(DHFILE)
    context.set_ciphers("kEDH")
    stats = server_params_test(context, context,
                               chatty=True, connectionchatty=True)
    # stats["cipher"][0] is taken to be the cipher name, whose
    # components are separated by dashes.
    cipher = stats["cipher"][0]
    parts = cipher.split("-")
    if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
        # Report the full cipher name, not just its first character.
        self.fail("Non-DH cipher: " + cipher)
2727
def test_selected_npn_protocol(self):
    # selected_npn_protocol() is None unless NPN is used
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    ctx.load_cert_chain(CERTFILE)
    stats = server_params_test(ctx, ctx,
                               chatty=True, connectionchatty=True)
    client_choice = stats['client_npn_protocol']
    self.assertIs(client_choice, None)
2735
@unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
def test_npn_protocols(self):
    # For each client protocol list, both ends must report the
    # expected NPN protocol.
    server_protocols = ['http/1.1', 'spdy/2']
    # (protocols offered by the client, protocol expected to be selected)
    protocol_tests = [
        (['http/1.1', 'spdy/2'], 'http/1.1'),
        (['spdy/2', 'http/1.1'], 'http/1.1'),
        (['spdy/2', 'test'], 'spdy/2'),
        (['abc', 'def'], 'abc')
    ]

    def make_context(protocols):
        # Fresh TLSv1 context advertising the given NPN protocols.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_cert_chain(CERTFILE)
        ctx.set_npn_protocols(protocols)
        return ctx

    template = ("failed trying %s (s) and %s (c).\n"
                "was expecting %s, but got %%s from the %%s")
    for client_protocols, expected in protocol_tests:
        server_context = make_context(server_protocols)
        client_context = make_context(client_protocols)
        stats = server_params_test(client_context, server_context,
                                   chatty=True, connectionchatty=True)

        # The two remaining placeholders are filled in per side below.
        msg = template % (str(server_protocols), str(client_protocols),
                          str(expected))
        client_result = stats['client_npn_protocol']
        self.assertEqual(client_result, expected, msg % (client_result, "client"))
        if len(stats['server_npn_protocols']):
            server_result = stats['server_npn_protocols'][-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected, msg % (server_result, "server"))
2764
def sni_contexts(self):
    """Return a (server_context, other_context, client_context) tuple.

    The two server-side contexts load SIGNED_CERTFILE and
    SIGNED_CERTFILE2 respectively; the client context requires a
    certificate and trusts SIGNING_CA.
    """
    def serving_context(certfile):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_cert_chain(certfile)
        return ctx

    server_context = serving_context(SIGNED_CERTFILE)
    other_context = serving_context(SIGNED_CERTFILE2)
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    client_context.verify_mode = ssl.CERT_REQUIRED
    client_context.load_verify_locations(SIGNING_CA)
    return server_context, other_context, client_context
2774
def check_common_name(self, stats, name):
    """Assert that the peer certificate's subject has commonName *name*.

    *stats* must map 'peercert' to a dict whose 'subject' entry
    contains the (('commonName', name),) item.
    """
    subject = stats['peercert']['subject']
    wanted = (('commonName', name),)
    self.assertIn(wanted, subject)
2778
@needs_sni
def test_sni_callback(self):
    # The servername callback sees the requested name and can swap
    # the context used for the connection.
    calls = []
    server_context, other_context, client_context = self.sni_contexts()

    def servername_cb(ssl_sock, server_name, initial_context):
        # Record every invocation; switch contexts only when the client
        # actually sent a server name.
        calls.append((server_name, initial_context))
        if server_name is None:
            return
        ssl_sock.context = other_context
    server_context.set_servername_callback(servername_cb)

    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name='supermessage')
    # The hostname was fetched properly, and the certificate was
    # changed for the connection.
    self.assertEqual(calls, [("supermessage", server_context)])
    # The certificate presented is expected to be other_context's
    self.check_common_name(stats, 'fakehostname')

    # The callback is called with server_name=None
    del calls[:]
    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name=None)
    self.assertEqual(calls, [(None, server_context)])
    self.check_common_name(stats, 'localhost')

    # Check disabling the callback
    del calls[:]
    server_context.set_servername_callback(None)

    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name='notfunny')
    # Certificate didn't change
    self.check_common_name(stats, 'localhost')
    self.assertEqual(calls, [])
2817
@needs_sni
def test_sni_callback_alert(self):
    # Returning a TLS alert is reflected to the connecting client
    # (only server_context is needed from the sni_contexts() triple
    # besides the client context).
    server_context, other_context, client_context = self.sni_contexts()

    def cb_returning_alert(ssl_sock, server_name, initial_context):
        # A returned ALERT_DESCRIPTION_* value aborts the handshake.
        return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
    server_context.set_servername_callback(cb_returning_alert)

    # The call is expected to raise, so its result is not bound.
    with self.assertRaises(ssl.SSLError) as cm:
        server_params_test(client_context, server_context,
                           chatty=False,
                           sni_name='supermessage')
    self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
2832
@needs_sni
def test_sni_callback_raising(self):
    # Raising fails the connection with a TLS handshake failure alert.
    server_context, other_context, client_context = self.sni_contexts()

    def cb_raising(ssl_sock, server_name, initial_context):
        # Deliberately raise ZeroDivisionError inside the callback.
        1/0
    server_context.set_servername_callback(cb_raising)

    # stderr is captured so the callback's exception report can be
    # inspected.  The call is expected to raise, so its result is not
    # bound.
    with self.assertRaises(ssl.SSLError) as cm, \
            support.captured_stderr() as stderr:
        server_params_test(client_context, server_context,
                           chatty=False,
                           sni_name='supermessage')
    self.assertEqual(cm.exception.reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
    self.assertIn("ZeroDivisionError", stderr.getvalue())
2849
@needs_sni
def test_sni_callback_wrong_return_type(self):
    # Returning the wrong return type terminates the TLS connection
    # with an internal error alert.
    server_context, other_context, client_context = self.sni_contexts()

    def cb_wrong_return_type(ssl_sock, server_name, initial_context):
        # A string is not an acceptable callback result.
        return "foo"
    server_context.set_servername_callback(cb_wrong_return_type)

    # stderr is captured so the reported TypeError can be inspected.
    # The call is expected to raise, so its result is not bound.
    with self.assertRaises(ssl.SSLError) as cm, \
            support.captured_stderr() as stderr:
        server_params_test(client_context, server_context,
                           chatty=False,
                           sni_name='supermessage')
    self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
    self.assertIn("TypeError", stderr.getvalue())
2867
def test_read_write_after_close_raises_valuerror(self):
    # read() and write() on an SSL socket that has already been closed
    # must raise ValueError.
    ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    ctx.verify_mode = ssl.CERT_REQUIRED
    ctx.load_verify_locations(CERTFILE)
    ctx.load_cert_chain(CERTFILE)
    echo_server = ThreadedEchoServer(context=ctx, chatty=False)

    with echo_server:
        client = ctx.wrap_socket(socket.socket())
        client.connect((HOST, echo_server.port))
        client.close()

        with self.assertRaises(ValueError):
            client.read(1024)
        with self.assertRaises(ValueError):
            client.write(b'hello')
2882
Bill Janssen61c001a2008-09-08 16:37:24 +00002883
def test_main(verbose=False):
    """Run the test_ssl suites.

    When ``support.verbose`` is set, a banner describing the platform and
    the OpenSSL build is printed first.  The *verbose* argument itself is
    not consulted.

    Raises support.TestFailed if any required certificate file is missing.
    """
    if support.verbose:
        probes = {
            'Linux': platform.linux_distribution,
            'Mac': platform.mac_ver,
            'Windows': platform.win32_ver,
        }
        # Use the first probe that reports a non-empty leading field;
        # otherwise fall back to the generic platform string.
        for label, probe in probes.items():
            info = probe()
            if info and info[0]:
                plat = '%s %r' % (label, info)
                break
        else:
            plat = repr(platform.platform())
        print("test_ssl: testing with %r %r" %
              (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
        print(" under %s" % plat)
        print(" HAS_SNI = %r" % ssl.HAS_SNI)
        print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
        try:
            print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
        except AttributeError:
            # This ssl module does not define OP_NO_TLSv1_1.
            pass

    # Fail early if any certificate fixture is missing.
    required_files = [
        CERTFILE, SVN_PYTHON_ORG_ROOT_CERT, BYTES_CERTFILE,
        ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
        SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
        BADCERT, BADKEY, EMPTYCERT]
    for path in required_files:
        if os.path.exists(path):
            continue
        raise support.TestFailed("Can't read certificate file %r" % path)

    suites = [ContextTests, BasicSocketTests, SSLErrorTests]

    if support.is_resource_enabled('network'):
        suites.append(NetworkedTests)

    if _have_threads:
        thread_info = support.threading_setup()
        if thread_info:
            suites.append(ThreadedTests)

    try:
        support.run_unittest(*suites)
    finally:
        # thread_info is only bound when _have_threads is true.
        if _have_threads:
            support.threading_cleanup(*thread_info)
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00002931
# Run the whole suite when this module is executed as a script.
if __name__ == "__main__":
    test_main()