blob: 351925e2306ff831270160f7999916760517ded0 [file] [log] [blame]
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001# -*- coding: utf-8 -*-
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00002# Test the support for SSL and sockets
3
4import sys
5import unittest
Benjamin Petersondaeb9252014-08-20 14:14:50 -05006from test import test_support as support
Bill Janssen934b16d2008-06-28 22:19:33 +00007import asyncore
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00008import socket
Bill Janssen934b16d2008-06-28 22:19:33 +00009import select
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000010import time
Benjamin Petersondaeb9252014-08-20 14:14:50 -050011import datetime
Antoine Pitroub558f172010-04-23 23:25:45 +000012import gc
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000013import os
Antoine Pitroub558f172010-04-23 23:25:45 +000014import errno
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000015import pprint
Benjamin Petersondaeb9252014-08-20 14:14:50 -050016import tempfile
17import urllib
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000018import traceback
Antoine Pitroudfb299b2010-04-23 22:54:59 +000019import weakref
Antoine Pitroud75efd92010-08-04 17:38:33 +000020import platform
Benjamin Petersondaeb9252014-08-20 14:14:50 -050021import functools
22from contextlib import closing
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000023
# Bind the ssl module through the test-support import helper instead of a
# plain import.  NOTE(review): presumably this skips the whole test module
# when ssl cannot be imported -- confirm against test_support.import_module.
ssl = support.import_module("ssl")

# Sorted keys of ssl._PROTOCOL_NAMES; tests iterate over this to build one
# SSLContext per protocol.
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
# Host used when a test needs an address to connect to (from test_support).
HOST = support.HOST
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000028
def data_file(*name):
    """Return the path of a test data file located next to this module.

    *name* holds one or more path components that are joined below the
    directory containing this file.
    """
    here = os.path.dirname(__file__)
    return os.path.join(here, *name)
31
32# The custom key and certificate files used in test_ssl are generated
33# using Lib/test/make_ssl_certs.py.
34# Other certificates are simply fetched from the Internet servers they
35# are meant to authenticate.
36
# Private key and certificate combined in a single PEM file, plus the
# filesystem-encoded (bytes) form of the same path.
CERTFILE = data_file("keycert.pem")
BYTES_CERTFILE = CERTFILE.encode(sys.getfilesystemencoding())
# Certificate and key as two separate files, again with bytes variants.
ONLYCERT = data_file("ssl_cert.pem")
ONLYKEY = data_file("ssl_key.pem")
BYTES_ONLYCERT = ONLYCERT.encode(sys.getfilesystemencoding())
BYTES_ONLYKEY = ONLYKEY.encode(sys.getfilesystemencoding())
# NOTE(review): the ".passwd" files are presumably encrypted with
# KEY_PASSWORD -- confirm against Lib/test/make_ssl_certs.py.
CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
KEY_PASSWORD = "somepass"
# Directory of CA certificates and two individual files inside it.
CAPATH = data_file("capath")
BYTES_CAPATH = CAPATH.encode(sys.getfilesystemencoding())
CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
CAFILE_CACERT = data_file("capath", "5ed36f99.0")


# empty CRL
CRLFILE = data_file("revocation.crl")

# Two keys and certs signed by the same CA (for SNI tests)
SIGNED_CERTFILE = data_file("keycert3.pem")
SIGNED_CERTFILE2 = data_file("keycert4.pem")
SIGNING_CA = data_file("pycacert.pem")

SVN_PYTHON_ORG_ROOT_CERT = data_file("https_svn_python_org_root.pem")

# Deliberately unusable inputs for the error-path tests; WRONGCERT names a
# file that does not exist (tests expect ENOENT for it).
EMPTYCERT = data_file("nullcert.pem")
BADCERT = data_file("badcert.pem")
WRONGCERT = data_file("XXXnonexisting.pem")
BADKEY = data_file("badkey.pem")
# Certificates decoded by the certificate-parsing tests.
NOKIACERT = data_file("nokia.pem")
NULLBYTECERT = data_file("nullbytecert.pem")

# Diffie-Hellman parameter file and its bytes path.
DHFILE = data_file("dh512.pem")
BYTES_DHFILE = DHFILE.encode(sys.getfilesystemencoding())
71
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000072
def handle_error(prefix):
    """Write *prefix* followed by the current exception's traceback to stdout.

    The traceback is always formatted, but it is only printed when the
    test run is verbose.
    """
    formatted_lines = traceback.format_exception(*sys.exc_info())
    exc_format = ' '.join(formatted_lines)
    if not support.verbose:
        return
    sys.stdout.write(prefix + exc_format)
Neal Norwitz3e533c22007-08-27 01:03:18 +000077
Antoine Pitrou435ba0c2010-04-27 09:51:18 +000078
class BasicTests(unittest.TestCase):
    """Smoke test for the legacy ``ssl.sslwrap_simple()`` entry point."""

    def test_sslwrap_simple(self):
        """sslwrap_simple() accepts a socket object as well as its ``_sock``.

        A crude test for the legacy API.  The call may fail with a broken
        pipe while ssl_sock.do_handshake() runs; this test doesn't care
        about that, so EPIPE is ignored.  Any other IOError is re-raised.
        """
        for use_raw_sock in (False, True):
            sock = socket.socket(socket.AF_INET)
            try:
                ssl.sslwrap_simple(sock._sock if use_raw_sock else sock)
            except IOError as e:
                # errno.EPIPE replaces the former hard-coded value 32.
                if e.errno != errno.EPIPE:
                    raise
            finally:
                # Release the descriptor instead of leaving it to the GC.
                sock.close()
def can_clear_options():
    """Return True if the OpenSSL API version is 0.9.8m or higher.

    test_options uses this to decide whether option bits can be cleared
    again or whether resetting them is expected to raise ValueError.
    """
    minimum_version = (0, 9, 8, 13, 15)  # 0.9.8m
    return ssl._OPENSSL_API_VERSION >= minimum_version
100
def no_sslv2_implies_sslv3_hello():
    """Return True if the linked OpenSSL is version 0.9.7h or higher."""
    minimum_version = (0, 9, 7, 8, 15)  # 0.9.7h
    return ssl.OPENSSL_VERSION_INFO >= minimum_version
104
def have_verify_flags():
    """Return True if the linked OpenSSL is version 0.9.8 or higher.

    Used as the skip condition for the verify_flags test.
    """
    minimum_version = (0, 9, 8, 0, 15)  # 0.9.8
    return ssl.OPENSSL_VERSION_INFO >= minimum_version
108
def utc_offset():  # NOTE: ignore issues like #1647654
    """Return the local UTC offset in seconds.

    local time = utc time + utc offset, so the value is the negated
    ``time.altzone`` while DST is in effect and the negated
    ``time.timezone`` otherwise.
    """
    dst_in_effect = time.daylight and time.localtime().tm_isdst > 0
    seconds_west_of_utc = time.altzone if dst_in_effect else time.timezone
    return -seconds_west_of_utc
114
def asn1time(cert_time):
    """Return *cert_time* adjusted to what the linked OpenSSL prints.

    Some versions of OpenSSL ignore seconds, see #18207.  Only for API
    version 0.9.8i is the string rewritten (seconds forced to zero);
    every other version gets *cert_time* back unchanged.
    """
    if ssl._OPENSSL_API_VERSION != (0, 9, 8, 9, 15):  # not 0.9.8i
        return cert_time

    fmt = "%b %d %H:%M:%S %Y GMT"
    parsed = datetime.datetime.strptime(cert_time, fmt)
    rendered = parsed.replace(second=0).strftime(fmt)
    # %d adds leading zero but ASN1_TIME_print() uses leading space
    if rendered[4] == "0":
        rendered = rendered[:4] + " " + rendered[5:]
    return rendered
Neal Norwitz3e533c22007-08-27 01:03:18 +0000128
Antoine Pitroud75efd92010-08-04 17:38:33 +0000129# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
def skip_if_broken_ubuntu_ssl(func):
    """Decorator that skips *func* on the patched Ubuntu OpenSSL (issue #9415).

    When the ssl module has no PROTOCOL_SSLv2, *func* is returned as is.
    Otherwise the wrapper first tries to create an SSLv2 context; if that
    raises SSLError on OpenSSL (0, 9, 8, 15, 15) with the
    ('debian', 'squeeze/sid', '') distribution tuple, the test is skipped.
    In every other case *func* runs normally.
    """
    if not hasattr(ssl, 'PROTOCOL_SSLv2'):
        return func

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            ssl.SSLContext(ssl.PROTOCOL_SSLv2)
        except ssl.SSLError:
            affected_build = ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15)
            if affected_build and (platform.linux_distribution() ==
                                   ('debian', 'squeeze/sid', '')):
                raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
        return func(*args, **kwargs)
    return wrapper
Antoine Pitroud75efd92010-08-04 17:38:33 +0000144
# Decorator: skip the decorated test unless ssl.HAS_SNI is true.
needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
146
Antoine Pitroud75efd92010-08-04 17:38:33 +0000147
148class BasicSocketTests(unittest.TestCase):
149
Antoine Pitrou3945c862010-04-28 21:11:01 +0000150 def test_constants(self):
Bill Janssen98d19da2007-09-10 21:51:02 +0000151 ssl.CERT_NONE
152 ssl.CERT_OPTIONAL
153 ssl.CERT_REQUIRED
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500154 ssl.OP_CIPHER_SERVER_PREFERENCE
155 ssl.OP_SINGLE_DH_USE
156 if ssl.HAS_ECDH:
157 ssl.OP_SINGLE_ECDH_USE
158 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
159 ssl.OP_NO_COMPRESSION
160 self.assertIn(ssl.HAS_SNI, {True, False})
161 self.assertIn(ssl.HAS_ECDH, {True, False})
162
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000163
Antoine Pitrou3945c862010-04-28 21:11:01 +0000164 def test_random(self):
Bill Janssen98d19da2007-09-10 21:51:02 +0000165 v = ssl.RAND_status()
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500166 if support.verbose:
Bill Janssen98d19da2007-09-10 21:51:02 +0000167 sys.stdout.write("\n RAND_status is %d (%s)\n"
168 % (v, (v and "sufficient randomness") or
169 "insufficient randomness"))
Jesus Ceaa8a5b392012-09-11 01:55:04 +0200170 self.assertRaises(TypeError, ssl.RAND_egd, 1)
171 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Bill Janssen98d19da2007-09-10 21:51:02 +0000172 ssl.RAND_add("this is a random string", 75.0)
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000173
Antoine Pitrou3945c862010-04-28 21:11:01 +0000174 def test_parse_cert(self):
Bill Janssen98d19da2007-09-10 21:51:02 +0000175 # note that this uses an 'unofficial' function in _ssl.c,
176 # provided solely for this test, to exercise the certificate
177 # parsing code
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500178 p = ssl._ssl._test_decode_cert(CERTFILE)
179 if support.verbose:
Bill Janssen98d19da2007-09-10 21:51:02 +0000180 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500181 self.assertEqual(p['issuer'],
182 ((('countryName', 'XY'),),
183 (('localityName', 'Castle Anthrax'),),
184 (('organizationName', 'Python Software Foundation'),),
185 (('commonName', 'localhost'),))
186 )
187 # Note the next three asserts will fail if the keys are regenerated
188 self.assertEqual(p['notAfter'], asn1time('Oct 5 23:01:56 2020 GMT'))
189 self.assertEqual(p['notBefore'], asn1time('Oct 8 23:01:56 2010 GMT'))
190 self.assertEqual(p['serialNumber'], 'D7C7381919AFC24E')
Antoine Pitrouf06eb462011-10-01 19:30:58 +0200191 self.assertEqual(p['subject'],
Antoine Pitrou60982912013-02-16 21:39:28 +0100192 ((('countryName', 'XY'),),
193 (('localityName', 'Castle Anthrax'),),
194 (('organizationName', 'Python Software Foundation'),),
195 (('commonName', 'localhost'),))
Antoine Pitrouf06eb462011-10-01 19:30:58 +0200196 )
Antoine Pitrou60982912013-02-16 21:39:28 +0100197 self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),))
Antoine Pitrouf06eb462011-10-01 19:30:58 +0200198 # Issue #13034: the subjectAltName in some certificates
199 # (notably projects.developer.nokia.com:443) wasn't parsed
200 p = ssl._ssl._test_decode_cert(NOKIACERT)
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500201 if support.verbose:
Antoine Pitrouf06eb462011-10-01 19:30:58 +0200202 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
203 self.assertEqual(p['subjectAltName'],
204 (('DNS', 'projects.developer.nokia.com'),
205 ('DNS', 'projects.forum.nokia.com'))
206 )
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500207 # extra OCSP and AIA fields
208 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
209 self.assertEqual(p['caIssuers'],
210 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
211 self.assertEqual(p['crlDistributionPoints'],
212 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000213
Christian Heimes88b174c2013-08-17 00:54:47 +0200214 def test_parse_cert_CVE_2013_4238(self):
215 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500216 if support.verbose:
Christian Heimes88b174c2013-08-17 00:54:47 +0200217 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
218 subject = ((('countryName', 'US'),),
219 (('stateOrProvinceName', 'Oregon'),),
220 (('localityName', 'Beaverton'),),
221 (('organizationName', 'Python Software Foundation'),),
222 (('organizationalUnitName', 'Python Core Development'),),
223 (('commonName', 'null.python.org\x00example.org'),),
224 (('emailAddress', 'python-dev@python.org'),))
225 self.assertEqual(p['subject'], subject)
226 self.assertEqual(p['issuer'], subject)
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500227 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
Christian Heimesf869a942013-08-25 14:12:41 +0200228 san = (('DNS', 'altnull.python.org\x00example.com'),
229 ('email', 'null@python.org\x00user@example.org'),
230 ('URI', 'http://null.python.org\x00http://example.org'),
231 ('IP Address', '192.0.2.1'),
232 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
233 else:
234 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
235 san = (('DNS', 'altnull.python.org\x00example.com'),
236 ('email', 'null@python.org\x00user@example.org'),
237 ('URI', 'http://null.python.org\x00http://example.org'),
238 ('IP Address', '192.0.2.1'),
239 ('IP Address', '<invalid>'))
240
241 self.assertEqual(p['subjectAltName'], san)
Christian Heimes88b174c2013-08-17 00:54:47 +0200242
Antoine Pitrou3945c862010-04-28 21:11:01 +0000243 def test_DER_to_PEM(self):
244 with open(SVN_PYTHON_ORG_ROOT_CERT, 'r') as f:
245 pem = f.read()
Bill Janssen296a59d2007-09-16 22:06:00 +0000246 d1 = ssl.PEM_cert_to_DER_cert(pem)
247 p2 = ssl.DER_cert_to_PEM_cert(d1)
248 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitroudb187842010-04-27 10:32:58 +0000249 self.assertEqual(d1, d2)
Antoine Pitrou4c7bcf12010-04-27 22:03:37 +0000250 if not p2.startswith(ssl.PEM_HEADER + '\n'):
251 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
252 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
253 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Bill Janssen296a59d2007-09-16 22:06:00 +0000254
Antoine Pitrouf9de5342010-04-05 21:35:07 +0000255 def test_openssl_version(self):
256 n = ssl.OPENSSL_VERSION_NUMBER
257 t = ssl.OPENSSL_VERSION_INFO
258 s = ssl.OPENSSL_VERSION
259 self.assertIsInstance(n, (int, long))
260 self.assertIsInstance(t, tuple)
261 self.assertIsInstance(s, str)
262 # Some sanity checks follow
263 # >= 0.9
264 self.assertGreaterEqual(n, 0x900000)
Antoine Pitrou4e64d872014-07-21 18:35:01 -0400265 # < 3.0
266 self.assertLess(n, 0x30000000)
Antoine Pitrouf9de5342010-04-05 21:35:07 +0000267 major, minor, fix, patch, status = t
268 self.assertGreaterEqual(major, 0)
Antoine Pitrou4e64d872014-07-21 18:35:01 -0400269 self.assertLess(major, 3)
Antoine Pitrouf9de5342010-04-05 21:35:07 +0000270 self.assertGreaterEqual(minor, 0)
271 self.assertLess(minor, 256)
272 self.assertGreaterEqual(fix, 0)
273 self.assertLess(fix, 256)
274 self.assertGreaterEqual(patch, 0)
275 self.assertLessEqual(patch, 26)
276 self.assertGreaterEqual(status, 0)
277 self.assertLessEqual(status, 15)
Antoine Pitrou4e64d872014-07-21 18:35:01 -0400278 # Version string as returned by {Open,Libre}SSL, the format might change
279 if "LibreSSL" in s:
280 self.assertTrue(s.startswith("LibreSSL {:d}.{:d}".format(major, minor)),
281 (s, t))
282 else:
283 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
284 (s, t))
Antoine Pitrouf9de5342010-04-05 21:35:07 +0000285
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500286 @support.cpython_only
Antoine Pitroudfb299b2010-04-23 22:54:59 +0000287 def test_refcycle(self):
288 # Issue #7943: an SSL object doesn't create reference cycles with
289 # itself.
290 s = socket.socket(socket.AF_INET)
291 ss = ssl.wrap_socket(s)
292 wr = weakref.ref(ss)
293 del ss
294 self.assertEqual(wr(), None)
295
Antoine Pitrouf7f390a2010-09-14 14:37:18 +0000296 def test_wrapped_unconnected(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500297 # Methods on an unconnected SSLSocket propagate the original
298 # socket.error raise by the underlying socket object.
Antoine Pitrouf7f390a2010-09-14 14:37:18 +0000299 s = socket.socket(socket.AF_INET)
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500300 with closing(ssl.wrap_socket(s)) as ss:
301 self.assertRaises(socket.error, ss.recv, 1)
302 self.assertRaises(socket.error, ss.recv_into, bytearray(b'x'))
303 self.assertRaises(socket.error, ss.recvfrom, 1)
304 self.assertRaises(socket.error, ss.recvfrom_into, bytearray(b'x'), 1)
305 self.assertRaises(socket.error, ss.send, b'x')
306 self.assertRaises(socket.error, ss.sendto, b'x', ('0.0.0.0', 0))
307
308 def test_timeout(self):
309 # Issue #8524: when creating an SSL socket, the timeout of the
310 # original socket should be retained.
311 for timeout in (None, 0.0, 5.0):
312 s = socket.socket(socket.AF_INET)
313 s.settimeout(timeout)
314 with closing(ssl.wrap_socket(s)) as ss:
315 self.assertEqual(timeout, ss.gettimeout())
316
317 def test_errors(self):
318 sock = socket.socket()
319 self.assertRaisesRegexp(ValueError,
320 "certfile must be specified",
321 ssl.wrap_socket, sock, keyfile=CERTFILE)
322 self.assertRaisesRegexp(ValueError,
323 "certfile must be specified for server-side operations",
324 ssl.wrap_socket, sock, server_side=True)
325 self.assertRaisesRegexp(ValueError,
326 "certfile must be specified for server-side operations",
327 ssl.wrap_socket, sock, server_side=True, certfile="")
328 with closing(ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE)) as s:
329 self.assertRaisesRegexp(ValueError, "can't connect in server-side mode",
330 s.connect, (HOST, 8080))
331 with self.assertRaises(IOError) as cm:
332 with closing(socket.socket()) as sock:
333 ssl.wrap_socket(sock, certfile=WRONGCERT)
334 self.assertEqual(cm.exception.errno, errno.ENOENT)
335 with self.assertRaises(IOError) as cm:
336 with closing(socket.socket()) as sock:
337 ssl.wrap_socket(sock, certfile=CERTFILE, keyfile=WRONGCERT)
338 self.assertEqual(cm.exception.errno, errno.ENOENT)
339 with self.assertRaises(IOError) as cm:
340 with closing(socket.socket()) as sock:
341 ssl.wrap_socket(sock, certfile=WRONGCERT, keyfile=WRONGCERT)
342 self.assertEqual(cm.exception.errno, errno.ENOENT)
343
344 def test_match_hostname(self):
345 def ok(cert, hostname):
346 ssl.match_hostname(cert, hostname)
347 def fail(cert, hostname):
348 self.assertRaises(ssl.CertificateError,
349 ssl.match_hostname, cert, hostname)
350
351 cert = {'subject': ((('commonName', 'example.com'),),)}
352 ok(cert, 'example.com')
353 ok(cert, 'ExAmple.cOm')
354 fail(cert, 'www.example.com')
355 fail(cert, '.example.com')
356 fail(cert, 'example.org')
357 fail(cert, 'exampleXcom')
358
359 cert = {'subject': ((('commonName', '*.a.com'),),)}
360 ok(cert, 'foo.a.com')
361 fail(cert, 'bar.foo.a.com')
362 fail(cert, 'a.com')
363 fail(cert, 'Xa.com')
364 fail(cert, '.a.com')
365
366 # only match one left-most wildcard
367 cert = {'subject': ((('commonName', 'f*.com'),),)}
368 ok(cert, 'foo.com')
369 ok(cert, 'f.com')
370 fail(cert, 'bar.com')
371 fail(cert, 'foo.a.com')
372 fail(cert, 'bar.foo.com')
373
374 # NULL bytes are bad, CVE-2013-4073
375 cert = {'subject': ((('commonName',
376 'null.python.org\x00example.org'),),)}
377 ok(cert, 'null.python.org\x00example.org') # or raise an error?
378 fail(cert, 'example.org')
379 fail(cert, 'null.python.org')
380
381 # error cases with wildcards
382 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
383 fail(cert, 'bar.foo.a.com')
384 fail(cert, 'a.com')
385 fail(cert, 'Xa.com')
386 fail(cert, '.a.com')
387
388 cert = {'subject': ((('commonName', 'a.*.com'),),)}
389 fail(cert, 'a.foo.com')
390 fail(cert, 'a..com')
391 fail(cert, 'a.com')
392
393 # wildcard doesn't match IDNA prefix 'xn--'
394 idna = u'püthon.python.org'.encode("idna").decode("ascii")
395 cert = {'subject': ((('commonName', idna),),)}
396 ok(cert, idna)
397 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
398 fail(cert, idna)
399 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
400 fail(cert, idna)
401
402 # wildcard in first fragment and IDNA A-labels in sequent fragments
403 # are supported.
404 idna = u'www*.pythön.org'.encode("idna").decode("ascii")
405 cert = {'subject': ((('commonName', idna),),)}
406 ok(cert, u'www.pythön.org'.encode("idna").decode("ascii"))
407 ok(cert, u'www1.pythön.org'.encode("idna").decode("ascii"))
408 fail(cert, u'ftp.pythön.org'.encode("idna").decode("ascii"))
409 fail(cert, u'pythön.org'.encode("idna").decode("ascii"))
410
411 # Slightly fake real-world example
412 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
413 'subject': ((('commonName', 'linuxfrz.org'),),),
414 'subjectAltName': (('DNS', 'linuxfr.org'),
415 ('DNS', 'linuxfr.com'),
416 ('othername', '<unsupported>'))}
417 ok(cert, 'linuxfr.org')
418 ok(cert, 'linuxfr.com')
419 # Not a "DNS" entry
420 fail(cert, '<unsupported>')
421 # When there is a subjectAltName, commonName isn't used
422 fail(cert, 'linuxfrz.org')
423
424 # A pristine real-world example
425 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
426 'subject': ((('countryName', 'US'),),
427 (('stateOrProvinceName', 'California'),),
428 (('localityName', 'Mountain View'),),
429 (('organizationName', 'Google Inc'),),
430 (('commonName', 'mail.google.com'),))}
431 ok(cert, 'mail.google.com')
432 fail(cert, 'gmail.com')
433 # Only commonName is considered
434 fail(cert, 'California')
435
436 # Neither commonName nor subjectAltName
437 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
438 'subject': ((('countryName', 'US'),),
439 (('stateOrProvinceName', 'California'),),
440 (('localityName', 'Mountain View'),),
441 (('organizationName', 'Google Inc'),))}
442 fail(cert, 'mail.google.com')
443
444 # No DNS entry in subjectAltName but a commonName
445 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
446 'subject': ((('countryName', 'US'),),
447 (('stateOrProvinceName', 'California'),),
448 (('localityName', 'Mountain View'),),
449 (('commonName', 'mail.google.com'),)),
450 'subjectAltName': (('othername', 'blabla'), )}
451 ok(cert, 'mail.google.com')
452
453 # No DNS entry subjectAltName and no commonName
454 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
455 'subject': ((('countryName', 'US'),),
456 (('stateOrProvinceName', 'California'),),
457 (('localityName', 'Mountain View'),),
458 (('organizationName', 'Google Inc'),)),
459 'subjectAltName': (('othername', 'blabla'),)}
460 fail(cert, 'google.com')
461
462 # Empty cert / no cert
463 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
464 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
465
466 # Issue #17980: avoid denials of service by refusing more than one
467 # wildcard per fragment.
468 cert = {'subject': ((('commonName', 'a*b.com'),),)}
469 ok(cert, 'axxb.com')
470 cert = {'subject': ((('commonName', 'a*b.co*'),),)}
471 fail(cert, 'axxb.com')
472 cert = {'subject': ((('commonName', 'a*b*.com'),),)}
473 with self.assertRaises(ssl.CertificateError) as cm:
474 ssl.match_hostname(cert, 'axxbxxc.com')
475 self.assertIn("too many wildcards", str(cm.exception))
476
477 def test_server_side(self):
478 # server_hostname doesn't work for server sockets
479 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
480 with closing(socket.socket()) as sock:
481 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
482 server_hostname="some.hostname")
483
484 def test_unknown_channel_binding(self):
485 # should raise ValueError for unknown type
486 s = socket.socket(socket.AF_INET)
487 with closing(ssl.wrap_socket(s)) as ss:
488 with self.assertRaises(ValueError):
489 ss.get_channel_binding("unknown-type")
490
491 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
492 "'tls-unique' channel binding not available")
493 def test_tls_unique_channel_binding(self):
494 # unconnected should return None for known type
495 s = socket.socket(socket.AF_INET)
496 with closing(ssl.wrap_socket(s)) as ss:
497 self.assertIsNone(ss.get_channel_binding("tls-unique"))
498 # the same for server-side
499 s = socket.socket(socket.AF_INET)
500 with closing(ssl.wrap_socket(s, server_side=True, certfile=CERTFILE)) as ss:
501 self.assertIsNone(ss.get_channel_binding("tls-unique"))
502
503 def test_get_default_verify_paths(self):
504 paths = ssl.get_default_verify_paths()
505 self.assertEqual(len(paths), 6)
506 self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
507
508 with support.EnvironmentVarGuard() as env:
509 env["SSL_CERT_DIR"] = CAPATH
510 env["SSL_CERT_FILE"] = CERTFILE
511 paths = ssl.get_default_verify_paths()
512 self.assertEqual(paths.cafile, CERTFILE)
513 self.assertEqual(paths.capath, CAPATH)
514
515 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
516 def test_enum_certificates(self):
517 self.assertTrue(ssl.enum_certificates("CA"))
518 self.assertTrue(ssl.enum_certificates("ROOT"))
519
520 self.assertRaises(TypeError, ssl.enum_certificates)
521 self.assertRaises(WindowsError, ssl.enum_certificates, "")
522
523 trust_oids = set()
524 for storename in ("CA", "ROOT"):
525 store = ssl.enum_certificates(storename)
526 self.assertIsInstance(store, list)
527 for element in store:
528 self.assertIsInstance(element, tuple)
529 self.assertEqual(len(element), 3)
530 cert, enc, trust = element
531 self.assertIsInstance(cert, bytes)
532 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
533 self.assertIsInstance(trust, (set, bool))
534 if isinstance(trust, set):
535 trust_oids.update(trust)
536
537 serverAuth = "1.3.6.1.5.5.7.3.1"
538 self.assertIn(serverAuth, trust_oids)
539
540 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
541 def test_enum_crls(self):
542 self.assertTrue(ssl.enum_crls("CA"))
543 self.assertRaises(TypeError, ssl.enum_crls)
544 self.assertRaises(WindowsError, ssl.enum_crls, "")
545
546 crls = ssl.enum_crls("CA")
547 self.assertIsInstance(crls, list)
548 for element in crls:
549 self.assertIsInstance(element, tuple)
550 self.assertEqual(len(element), 2)
551 self.assertIsInstance(element[0], bytes)
552 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
553
554
555 def test_asn1object(self):
556 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
557 '1.3.6.1.5.5.7.3.1')
558
559 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
560 self.assertEqual(val, expected)
561 self.assertEqual(val.nid, 129)
562 self.assertEqual(val.shortname, 'serverAuth')
563 self.assertEqual(val.longname, 'TLS Web Server Authentication')
564 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
565 self.assertIsInstance(val, ssl._ASN1Object)
566 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
567
568 val = ssl._ASN1Object.fromnid(129)
569 self.assertEqual(val, expected)
570 self.assertIsInstance(val, ssl._ASN1Object)
571 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
572 with self.assertRaisesRegexp(ValueError, "unknown NID 100000"):
573 ssl._ASN1Object.fromnid(100000)
574 for i in range(1000):
575 try:
576 obj = ssl._ASN1Object.fromnid(i)
577 except ValueError:
578 pass
579 else:
580 self.assertIsInstance(obj.nid, int)
581 self.assertIsInstance(obj.shortname, str)
582 self.assertIsInstance(obj.longname, str)
583 self.assertIsInstance(obj.oid, (str, type(None)))
584
585 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
586 self.assertEqual(val, expected)
587 self.assertIsInstance(val, ssl._ASN1Object)
588 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
589 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
590 expected)
591 with self.assertRaisesRegexp(ValueError, "unknown object 'serverauth'"):
592 ssl._ASN1Object.fromname('serverauth')
593
594 def test_purpose_enum(self):
595 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
596 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
597 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
598 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
599 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
600 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
601 '1.3.6.1.5.5.7.3.1')
602
603 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
604 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
605 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
606 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
607 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
608 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
609 '1.3.6.1.5.5.7.3.2')
Antoine Pitrouf7f390a2010-09-14 14:37:18 +0000610
Antoine Pitrou63cc99d2013-12-28 17:26:33 +0100611 def test_unsupported_dtls(self):
612 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
613 self.addCleanup(s.close)
614 with self.assertRaises(NotImplementedError) as cx:
615 ssl.wrap_socket(s, cert_reqs=ssl.CERT_NONE)
616 self.assertEqual(str(cx.exception), "only stream sockets are supported")
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500617 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
618 with self.assertRaises(NotImplementedError) as cx:
619 ctx.wrap_socket(s)
620 self.assertEqual(str(cx.exception), "only stream sockets are supported")
621
622 def cert_time_ok(self, timestring, timestamp):
623 self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)
624
625 def cert_time_fail(self, timestring):
626 with self.assertRaises(ValueError):
627 ssl.cert_time_to_seconds(timestring)
628
629 @unittest.skipUnless(utc_offset(),
630 'local time needs to be different from UTC')
631 def test_cert_time_to_seconds_timezone(self):
632 # Issue #19940: ssl.cert_time_to_seconds() returns wrong
633 # results if local timezone is not UTC
634 self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0)
635 self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0)
636
637 def test_cert_time_to_seconds(self):
638 timestring = "Jan 5 09:34:43 2018 GMT"
639 ts = 1515144883.0
640 self.cert_time_ok(timestring, ts)
641 # accept keyword parameter, assert its name
642 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
643 # accept both %e and %d (space or zero generated by strftime)
644 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
645 # case-insensitive
646 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
647 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
648 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
649 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
650 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
651 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
652 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
653 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
654
655 newyear_ts = 1230768000.0
656 # leap seconds
657 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
658 # same timestamp
659 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
660
661 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
662 # allow 60th second (even if it is not a leap second)
663 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
664 # allow 2nd leap second for compatibility with time.strptime()
665 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
666 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
667
668 # no special treatement for the special value:
669 # 99991231235959Z (rfc 5280)
670 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
671
672 @support.run_with_locale('LC_ALL', '')
673 def test_cert_time_to_seconds_locale(self):
674 # `cert_time_to_seconds()` should be locale independent
675
676 def local_february_name():
677 return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
678
679 if local_february_name().lower() == 'feb':
680 self.skipTest("locale-specific month name needs to be "
681 "different from C locale")
682
683 # locale-independent
684 self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
685 self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
686
687
class ContextTests(unittest.TestCase):
    """Tests for ssl.SSLContext configuration and certificate loading."""

    def _assert_optional_flags_set(self, ctx, *flag_names):
        # Check that each named ssl.OP_* flag is set in ctx.options.
        # A flag that the ssl module does not define is treated as 0, which
        # makes its check trivially true instead of raising AttributeError.
        for flag_name in flag_names:
            flag = getattr(ssl, flag_name, 0)
            self.assertEqual(ctx.options & flag, flag)

    @skip_if_broken_ubuntu_ssl
    def test_constructor(self):
        # Every protocol in PROTOCOLS is accepted; a missing or unknown
        # protocol number is rejected.
        for protocol in PROTOCOLS:
            ssl.SSLContext(protocol)
        self.assertRaises(TypeError, ssl.SSLContext)
        self.assertRaises(ValueError, ssl.SSLContext, -1)
        self.assertRaises(ValueError, ssl.SSLContext, 42)

    @skip_if_broken_ubuntu_ssl
    def test_protocol(self):
        # The `protocol` attribute reflects the constructor argument.
        for proto in PROTOCOLS:
            ctx = ssl.SSLContext(proto)
            self.assertEqual(ctx.protocol, proto)

    def test_ciphers(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.set_ciphers("ALL")
        ctx.set_ciphers("DEFAULT")
        # A cipher string that selects nothing is an error.
        with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"):
            ctx.set_ciphers("^$:,;?*'dorothyx")

    @skip_if_broken_ubuntu_ssl
    def test_options(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # OP_ALL | OP_NO_SSLv2 is the default value
        self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2,
                         ctx.options)
        ctx.options |= ssl.OP_NO_SSLv3
        self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3,
                         ctx.options)
        if can_clear_options():
            # Options can be both cleared and set in one assignment.
            ctx.options = (ctx.options & ~ssl.OP_NO_SSLv2) | ssl.OP_NO_TLSv1
            self.assertEqual(ssl.OP_ALL | ssl.OP_NO_TLSv1 | ssl.OP_NO_SSLv3,
                             ctx.options)
            ctx.options = 0
            self.assertEqual(0, ctx.options)
        else:
            # Clearing options is refused when it is not supported.
            with self.assertRaises(ValueError):
                ctx.options = 0

    def test_verify_mode(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # Default value
        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
        # Each valid mode can be set and read back.
        for mode in (ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED, ssl.CERT_NONE):
            ctx.verify_mode = mode
            self.assertEqual(ctx.verify_mode, mode)
        with self.assertRaises(TypeError):
            ctx.verify_mode = None
        with self.assertRaises(ValueError):
            ctx.verify_mode = 42

    @unittest.skipUnless(have_verify_flags(),
                         "verify_flags need OpenSSL > 0.9.8")
    def test_verify_flags(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # default value by OpenSSL
        self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
        # Single flags and a combination of flags (any value is supported)
        # can be set and read back.
        for flags in (ssl.VERIFY_CRL_CHECK_LEAF,
                      ssl.VERIFY_CRL_CHECK_CHAIN,
                      ssl.VERIFY_DEFAULT,
                      ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT):
            ctx.verify_flags = flags
            self.assertEqual(ctx.verify_flags, flags)
        with self.assertRaises(TypeError):
            ctx.verify_flags = None

    def test_load_cert_chain(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # Combined key and cert in a single file
        ctx.load_cert_chain(CERTFILE)
        ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
        self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
        with self.assertRaises(IOError) as cm:
            ctx.load_cert_chain(WRONGCERT)
        self.assertEqual(cm.exception.errno, errno.ENOENT)
        with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
            ctx.load_cert_chain(BADCERT)
        with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
            ctx.load_cert_chain(EMPTYCERT)
        # Separate key and cert
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_cert_chain(ONLYCERT, ONLYKEY)
        ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
        ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
        with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
            ctx.load_cert_chain(ONLYCERT)
        with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
            ctx.load_cert_chain(ONLYKEY)
        with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
            ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
        # Mismatching key and cert
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        with self.assertRaisesRegexp(ssl.SSLError, "key values mismatch"):
            ctx.load_cert_chain(SVN_PYTHON_ORG_ROOT_CERT, ONLYKEY)
        # Password protected key and cert
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
        ctx.load_cert_chain(CERTFILE_PROTECTED,
                            password=bytearray(KEY_PASSWORD.encode()))
        ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
        ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
        ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
                            bytearray(KEY_PASSWORD.encode()))
        with self.assertRaisesRegexp(TypeError, "should be a string"):
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
        with self.assertRaises(ssl.SSLError):
            ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
        with self.assertRaisesRegexp(ValueError, "cannot be longer"):
            # openssl has a fixed limit on the password buffer.
            # PEM_BUFSIZE is generally set to 1kb.
            # Return a string larger than this.
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)

        # Password callback
        def getpass_unicode():
            return KEY_PASSWORD

        def getpass_bytes():
            return KEY_PASSWORD.encode()

        def getpass_bytearray():
            return bytearray(KEY_PASSWORD.encode())

        def getpass_badpass():
            return "badpass"

        def getpass_huge():
            return b'a' * (1024 * 1024)

        def getpass_bad_type():
            return 9

        def getpass_exception():
            raise Exception('getpass error')

        class GetPassCallable:
            # Supplies the password both as a callable instance and through
            # a bound method.
            def __call__(self):
                return KEY_PASSWORD

            def getpass(self):
                return KEY_PASSWORD

        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
        ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
        ctx.load_cert_chain(CERTFILE_PROTECTED,
                            password=GetPassCallable().getpass)
        with self.assertRaises(ssl.SSLError):
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
        with self.assertRaisesRegexp(ValueError, "cannot be longer"):
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
        with self.assertRaisesRegexp(TypeError, "must return a string"):
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
        # An exception raised by the callback propagates to the caller.
        with self.assertRaisesRegexp(Exception, "getpass error"):
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
        # Make sure the password function isn't called if it isn't needed
        ctx.load_cert_chain(CERTFILE, password=getpass_exception)

    def test_load_verify_locations(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # The CA file may be given positionally or by keyword, as a native
        # string, a bytes path or a unicode path.
        ctx.load_verify_locations(CERTFILE)
        ctx.load_verify_locations(cafile=CERTFILE, capath=None)
        ctx.load_verify_locations(BYTES_CERTFILE)
        ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
        ctx.load_verify_locations(cafile=BYTES_CERTFILE.decode('utf-8'))
        # At least one source of certificates is required.
        self.assertRaises(TypeError, ctx.load_verify_locations)
        self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
        with self.assertRaises(IOError) as cm:
            ctx.load_verify_locations(WRONGCERT)
        self.assertEqual(cm.exception.errno, errno.ENOENT)
        with self.assertRaises(IOError):
            ctx.load_verify_locations(u'')
        with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
            ctx.load_verify_locations(BADCERT)
        ctx.load_verify_locations(CERTFILE, CAPATH)
        ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)

        # Issue #10989: crash if the second argument type is invalid
        self.assertRaises(TypeError, ctx.load_verify_locations, None, True)

    def test_load_verify_cadata(self):
        # test cadata
        with open(CAFILE_CACERT) as f:
            cacert_pem = f.read().decode("ascii")
        cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
        with open(CAFILE_NEURONIO) as f:
            neuronio_pem = f.read().decode("ascii")
        neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)

        # test PEM
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
        ctx.load_verify_locations(cadata=cacert_pem)
        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
        ctx.load_verify_locations(cadata=neuronio_pem)
        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
        # cert already in hash table
        ctx.load_verify_locations(cadata=neuronio_pem)
        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)

        # combined
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        combined = "\n".join((cacert_pem, neuronio_pem))
        ctx.load_verify_locations(cadata=combined)
        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)

        # with junk around the certs
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        combined = ["head", cacert_pem, "other", neuronio_pem, "again",
                    neuronio_pem, "tail"]
        ctx.load_verify_locations(cadata="\n".join(combined))
        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)

        # test DER
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_verify_locations(cadata=cacert_der)
        ctx.load_verify_locations(cadata=neuronio_der)
        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
        # cert already in hash table
        ctx.load_verify_locations(cadata=cacert_der)
        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)

        # combined
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        combined = b"".join((cacert_der, neuronio_der))
        ctx.load_verify_locations(cadata=combined)
        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)

        # error cases
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)

        # Unicode cadata is parsed as PEM, bytes cadata as DER, hence the
        # two different error messages for the same junk.
        with self.assertRaisesRegexp(ssl.SSLError, "no start line"):
            ctx.load_verify_locations(cadata=u"broken")
        with self.assertRaisesRegexp(ssl.SSLError, "not enough data"):
            ctx.load_verify_locations(cadata=b"broken")

    def test_load_dh_params(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_dh_params(DHFILE)
        if os.name != 'nt':
            ctx.load_dh_params(BYTES_DHFILE)
        self.assertRaises(TypeError, ctx.load_dh_params)
        self.assertRaises(TypeError, ctx.load_dh_params, None)
        with self.assertRaises(IOError) as cm:
            ctx.load_dh_params(WRONGCERT)
        self.assertEqual(cm.exception.errno, errno.ENOENT)
        # A file that exists but holds no DH parameters is an SSL error.
        with self.assertRaises(ssl.SSLError):
            ctx.load_dh_params(CERTFILE)

    @skip_if_broken_ubuntu_ssl
    def test_session_stats(self):
        # A freshly created context reports all-zero session statistics.
        for proto in PROTOCOLS:
            ctx = ssl.SSLContext(proto)
            self.assertEqual(ctx.session_stats(), {
                'number': 0,
                'connect': 0,
                'connect_good': 0,
                'connect_renegotiate': 0,
                'accept': 0,
                'accept_good': 0,
                'accept_renegotiate': 0,
                'hits': 0,
                'misses': 0,
                'timeouts': 0,
                'cache_full': 0,
            })

    def test_set_default_verify_paths(self):
        # There's not much we can do to test that it acts as expected,
        # so just check it doesn't crash or raise an exception.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.set_default_verify_paths()

    @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
    def test_set_ecdh_curve(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # The curve name is accepted as a native string or as bytes.
        ctx.set_ecdh_curve("prime256v1")
        ctx.set_ecdh_curve(b"prime256v1")
        self.assertRaises(TypeError, ctx.set_ecdh_curve)
        self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
        self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
        self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")

    @needs_sni
    def test_sni_callback(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)

        # set_servername_callback expects a callable, or None
        self.assertRaises(TypeError, ctx.set_servername_callback)
        self.assertRaises(TypeError, ctx.set_servername_callback, 4)
        self.assertRaises(TypeError, ctx.set_servername_callback, "")
        self.assertRaises(TypeError, ctx.set_servername_callback, ctx)

        def dummycallback(sock, servername, ctx):
            pass
        ctx.set_servername_callback(None)
        ctx.set_servername_callback(dummycallback)

    @needs_sni
    def test_sni_callback_refcycle(self):
        # Reference cycles through the servername callback are detected
        # and cleared.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)

        # The default argument makes the callback reference the context,
        # and the context references the callback: a cycle.
        def dummycallback(sock, servername, ctx, cycle=ctx):
            pass
        ctx.set_servername_callback(dummycallback)
        wr = weakref.ref(ctx)
        del ctx, dummycallback
        gc.collect()
        self.assertIs(wr(), None)

    def test_cert_store_stats(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertEqual(ctx.cert_store_stats(),
                         {'x509_ca': 0, 'crl': 0, 'x509': 0})
        # Loading the context's own certificate chain leaves the counters
        # untouched.
        ctx.load_cert_chain(CERTFILE)
        self.assertEqual(ctx.cert_store_stats(),
                         {'x509_ca': 0, 'crl': 0, 'x509': 0})
        ctx.load_verify_locations(CERTFILE)
        self.assertEqual(ctx.cert_store_stats(),
                         {'x509_ca': 0, 'crl': 0, 'x509': 1})
        ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
        self.assertEqual(ctx.cert_store_stats(),
                         {'x509_ca': 1, 'crl': 0, 'x509': 2})

    def test_get_ca_certs(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertEqual(ctx.get_ca_certs(), [])
        # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
        ctx.load_verify_locations(CERTFILE)
        self.assertEqual(ctx.get_ca_certs(), [])
        # but SVN_PYTHON_ORG_ROOT_CERT is a CA cert
        ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
        self.assertEqual(ctx.get_ca_certs(),
            [{'issuer': ((('organizationName', 'Root CA'),),
                         (('organizationalUnitName', 'http://www.cacert.org'),),
                         (('commonName', 'CA Cert Signing Authority'),),
                         (('emailAddress', 'support@cacert.org'),)),
              'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
              'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
              'serialNumber': '00',
              'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
              'subject': ((('organizationName', 'Root CA'),),
                          (('organizationalUnitName', 'http://www.cacert.org'),),
                          (('commonName', 'CA Cert Signing Authority'),),
                          (('emailAddress', 'support@cacert.org'),)),
              'version': 3}])

        # With a true argument the certificates come back DER-encoded.
        with open(SVN_PYTHON_ORG_ROOT_CERT) as f:
            pem = f.read()
        der = ssl.PEM_cert_to_DER_cert(pem)
        self.assertEqual(ctx.get_ca_certs(True), [der])

    def test_load_default_certs(self):
        # Valid purposes (or none at all) are accepted, also repeatedly on
        # the same context.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_default_certs()

        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
        ctx.load_default_certs()

        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)

        # Anything that is not a Purpose member is a TypeError.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertRaises(TypeError, ctx.load_default_certs, None)
        self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')

    def test_create_default_context(self):
        # Default purpose: certificates and host names are verified.
        ctx = ssl.create_default_context()
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
        self.assertTrue(ctx.check_hostname)
        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
        self._assert_optional_flags_set(ctx, "OP_NO_COMPRESSION")

        # Explicit CA sources do not change the settings.
        with open(SIGNING_CA) as f:
            cadata = f.read().decode("ascii")
        ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
                                         cadata=cadata)
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
        self._assert_optional_flags_set(ctx, "OP_NO_COMPRESSION")

        # CLIENT_AUTH purpose: no peer verification, and the single-use
        # (EC)DH key options are set where the ssl module defines them.
        ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
        self._assert_optional_flags_set(ctx,
                                        "OP_NO_COMPRESSION",
                                        "OP_SINGLE_DH_USE",
                                        "OP_SINGLE_ECDH_USE")

    def test__create_stdlib_context(self):
        # Defaults: SSLv23, no certificate or host name verification.
        ctx = ssl._create_stdlib_context()
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
        self.assertFalse(ctx.check_hostname)
        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)

        # Explicit protocol.
        ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)

        # Explicit verification settings.
        ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
                                         cert_reqs=ssl.CERT_REQUIRED,
                                         check_hostname=True)
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
        self.assertTrue(ctx.check_hostname)
        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)

        # Explicit purpose.
        ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)

    def test_check_hostname(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertFalse(ctx.check_hostname)

        # Requires CERT_REQUIRED or CERT_OPTIONAL
        with self.assertRaises(ValueError):
            ctx.check_hostname = True
        ctx.verify_mode = ssl.CERT_REQUIRED
        # Changing verify_mode alone does not turn check_hostname on.
        self.assertFalse(ctx.check_hostname)
        ctx.check_hostname = True
        self.assertTrue(ctx.check_hostname)

        ctx.verify_mode = ssl.CERT_OPTIONAL
        ctx.check_hostname = True
        self.assertTrue(ctx.check_hostname)

        # Cannot set CERT_NONE with check_hostname enabled
        with self.assertRaises(ValueError):
            ctx.verify_mode = ssl.CERT_NONE
        ctx.check_hostname = False
        self.assertFalse(ctx.check_hostname)
1145
1146
class SSLErrorTests(unittest.TestCase):
    """Tests for ssl.SSLError and its subclasses."""

    def test_str(self):
        # The str() of a SSLError doesn't include the errno, and the same
        # holds for a subclass.
        for exc_class in (ssl.SSLError, ssl.SSLZeroReturnError):
            e = exc_class(1, "foo")
            self.assertEqual(str(e), "foo")
            self.assertEqual(e.errno, 1)

    def test_lib_reason(self):
        # Test the library and reason attributes
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # CERTFILE holds no DH parameters, so loading it as such fails.
        with self.assertRaises(ssl.SSLError) as cm:
            ctx.load_dh_params(CERTFILE)
        self.assertEqual(cm.exception.library, 'PEM')
        self.assertEqual(cm.exception.reason, 'NO_START_LINE')
        message = str(cm.exception)
        self.assertTrue(
            message.startswith("[PEM: NO_START_LINE] no start line"), message)

    def test_subclass(self):
        # Check that the appropriate SSLError subclass is raised
        # (this only tests one of them)
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        with closing(socket.socket()) as listener:
            listener.bind(("127.0.0.1", 0))
            listener.listen(5)
            # The plain client socket is closed even when connecting or
            # wrapping it fails.
            with closing(socket.socket()) as client:
                client.connect(listener.getsockname())
                client.setblocking(False)
                with closing(ctx.wrap_socket(
                        client, False,
                        do_handshake_on_connect=False)) as ssl_client:
                    # Nothing ever answers the handshake on the listener
                    # side, so the non-blocking handshake must ask to be
                    # retried once data can be read.
                    with self.assertRaises(ssl.SSLWantReadError) as cm:
                        ssl_client.do_handshake()
                    message = str(cm.exception)
                    self.assertTrue(
                        message.startswith(
                            "The operation did not complete (read)"),
                        message)
                    # For compatibility
                    self.assertEqual(cm.exception.errno,
                                     ssl.SSL_ERROR_WANT_READ)
Antoine Pitrou63cc99d2013-12-28 17:26:33 +01001186
Antoine Pitrouf9de5342010-04-05 21:35:07 +00001187
Bill Janssen934b16d2008-06-28 22:19:33 +00001188class NetworkedTests(unittest.TestCase):
Bill Janssen296a59d2007-09-16 22:06:00 +00001189
Antoine Pitrou3945c862010-04-28 21:11:01 +00001190 def test_connect(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001191 with support.transient_internet("svn.python.org"):
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001192 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1193 cert_reqs=ssl.CERT_NONE)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001194 try:
1195 s.connect(("svn.python.org", 443))
1196 self.assertEqual({}, s.getpeercert())
1197 finally:
1198 s.close()
Bill Janssen296a59d2007-09-16 22:06:00 +00001199
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001200 # this should fail because we have no verification certs
1201 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1202 cert_reqs=ssl.CERT_REQUIRED)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001203 self.assertRaisesRegexp(ssl.SSLError, "certificate verify failed",
1204 s.connect, ("svn.python.org", 443))
1205 s.close()
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001206
1207 # this should succeed because we specify the root cert
1208 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1209 cert_reqs=ssl.CERT_REQUIRED,
1210 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
1211 try:
1212 s.connect(("svn.python.org", 443))
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001213 self.assertTrue(s.getpeercert())
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001214 finally:
1215 s.close()
Bill Janssen296a59d2007-09-16 22:06:00 +00001216
Antoine Pitroud3f6ea12011-02-26 23:35:27 +00001217 def test_connect_ex(self):
1218 # Issue #11326: check connect_ex() implementation
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001219 with support.transient_internet("svn.python.org"):
Antoine Pitroud3f6ea12011-02-26 23:35:27 +00001220 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1221 cert_reqs=ssl.CERT_REQUIRED,
1222 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
1223 try:
1224 self.assertEqual(0, s.connect_ex(("svn.python.org", 443)))
1225 self.assertTrue(s.getpeercert())
1226 finally:
1227 s.close()
1228
1229 def test_non_blocking_connect_ex(self):
1230 # Issue #11326: non-blocking connect_ex() should allow handshake
1231 # to proceed after the socket gets ready.
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001232 with support.transient_internet("svn.python.org"):
Antoine Pitroud3f6ea12011-02-26 23:35:27 +00001233 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1234 cert_reqs=ssl.CERT_REQUIRED,
1235 ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
1236 do_handshake_on_connect=False)
1237 try:
1238 s.setblocking(False)
1239 rc = s.connect_ex(('svn.python.org', 443))
Antoine Pitrou8ef39072011-02-27 15:45:22 +00001240 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1241 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
Antoine Pitroud3f6ea12011-02-26 23:35:27 +00001242 # Wait for connect to finish
1243 select.select([], [s], [], 5.0)
1244 # Non-blocking handshake
1245 while True:
1246 try:
1247 s.do_handshake()
1248 break
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001249 except ssl.SSLWantReadError:
1250 select.select([s], [], [], 5.0)
1251 except ssl.SSLWantWriteError:
1252 select.select([], [s], [], 5.0)
Antoine Pitroud3f6ea12011-02-26 23:35:27 +00001253 # SSL established
1254 self.assertTrue(s.getpeercert())
1255 finally:
1256 s.close()
1257
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001258 def test_timeout_connect_ex(self):
1259 # Issue #12065: on a timeout, connect_ex() should return the original
1260 # errno (mimicking the behaviour of non-SSL sockets).
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001261 with support.transient_internet("svn.python.org"):
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001262 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1263 cert_reqs=ssl.CERT_REQUIRED,
1264 ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
1265 do_handshake_on_connect=False)
1266 try:
1267 s.settimeout(0.0000001)
1268 rc = s.connect_ex(('svn.python.org', 443))
1269 if rc == 0:
1270 self.skipTest("svn.python.org responded too quickly")
1271 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
1272 finally:
1273 s.close()
1274
1275 def test_connect_ex_error(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001276 with support.transient_internet("svn.python.org"):
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001277 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1278 cert_reqs=ssl.CERT_REQUIRED,
1279 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
1280 try:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001281 rc = s.connect_ex(("svn.python.org", 444))
1282 # Issue #19919: Windows machines or VMs hosted on Windows
1283 # machines sometimes return EWOULDBLOCK.
1284 self.assertIn(rc, (errno.ECONNREFUSED, errno.EWOULDBLOCK))
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001285 finally:
1286 s.close()
1287
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001288 def test_connect_with_context(self):
1289 with support.transient_internet("svn.python.org"):
1290 # Same as test_connect, but with a separately created context
1291 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1292 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1293 s.connect(("svn.python.org", 443))
1294 try:
1295 self.assertEqual({}, s.getpeercert())
1296 finally:
1297 s.close()
1298 # Same with a server hostname
1299 s = ctx.wrap_socket(socket.socket(socket.AF_INET),
1300 server_hostname="svn.python.org")
1301 if ssl.HAS_SNI:
1302 s.connect(("svn.python.org", 443))
1303 s.close()
1304 else:
1305 self.assertRaises(ValueError, s.connect, ("svn.python.org", 443))
1306 # This should fail because we have no verification certs
1307 ctx.verify_mode = ssl.CERT_REQUIRED
1308 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1309 self.assertRaisesRegexp(ssl.SSLError, "certificate verify failed",
1310 s.connect, ("svn.python.org", 443))
1311 s.close()
1312 # This should succeed because we specify the root cert
1313 ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
1314 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1315 s.connect(("svn.python.org", 443))
1316 try:
1317 cert = s.getpeercert()
1318 self.assertTrue(cert)
1319 finally:
1320 s.close()
1321
1322 def test_connect_capath(self):
1323 # Verify server certificates using the `capath` argument
1324 # NOTE: the subject hashing algorithm has been changed between
1325 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
1326 # contain both versions of each certificate (same content, different
1327 # filename) for this test to be portable across OpenSSL releases.
1328 with support.transient_internet("svn.python.org"):
1329 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1330 ctx.verify_mode = ssl.CERT_REQUIRED
1331 ctx.load_verify_locations(capath=CAPATH)
1332 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1333 s.connect(("svn.python.org", 443))
1334 try:
1335 cert = s.getpeercert()
1336 self.assertTrue(cert)
1337 finally:
1338 s.close()
1339 # Same with a bytes `capath` argument
1340 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1341 ctx.verify_mode = ssl.CERT_REQUIRED
1342 ctx.load_verify_locations(capath=BYTES_CAPATH)
1343 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1344 s.connect(("svn.python.org", 443))
1345 try:
1346 cert = s.getpeercert()
1347 self.assertTrue(cert)
1348 finally:
1349 s.close()
1350
1351 def test_connect_cadata(self):
1352 with open(CAFILE_CACERT) as f:
1353 pem = f.read().decode('ascii')
1354 der = ssl.PEM_cert_to_DER_cert(pem)
1355 with support.transient_internet("svn.python.org"):
1356 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1357 ctx.verify_mode = ssl.CERT_REQUIRED
1358 ctx.load_verify_locations(cadata=pem)
1359 with closing(ctx.wrap_socket(socket.socket(socket.AF_INET))) as s:
1360 s.connect(("svn.python.org", 443))
1361 cert = s.getpeercert()
1362 self.assertTrue(cert)
1363
1364 # same with DER
1365 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1366 ctx.verify_mode = ssl.CERT_REQUIRED
1367 ctx.load_verify_locations(cadata=der)
1368 with closing(ctx.wrap_socket(socket.socket(socket.AF_INET))) as s:
1369 s.connect(("svn.python.org", 443))
1370 cert = s.getpeercert()
1371 self.assertTrue(cert)
1372
Antoine Pitrou55841ac2010-04-24 10:43:57 +00001373 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
1374 def test_makefile_close(self):
1375 # Issue #5238: creating a file-like object with makefile() shouldn't
1376 # delay closing the underlying "real socket" (here tested with its
1377 # file descriptor, hence skipping the test under Windows).
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001378 with support.transient_internet("svn.python.org"):
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001379 ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
1380 ss.connect(("svn.python.org", 443))
1381 fd = ss.fileno()
1382 f = ss.makefile()
1383 f.close()
1384 # The fd is still open
Antoine Pitrou55841ac2010-04-24 10:43:57 +00001385 os.read(fd, 0)
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001386 # Closing the SSL socket should close the fd too
1387 ss.close()
1388 gc.collect()
1389 with self.assertRaises(OSError) as e:
1390 os.read(fd, 0)
1391 self.assertEqual(e.exception.errno, errno.EBADF)
Bill Janssen934b16d2008-06-28 22:19:33 +00001392
Antoine Pitrou3945c862010-04-28 21:11:01 +00001393 def test_non_blocking_handshake(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001394 with support.transient_internet("svn.python.org"):
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001395 s = socket.socket(socket.AF_INET)
1396 s.connect(("svn.python.org", 443))
1397 s.setblocking(False)
1398 s = ssl.wrap_socket(s,
1399 cert_reqs=ssl.CERT_NONE,
1400 do_handshake_on_connect=False)
1401 count = 0
1402 while True:
1403 try:
1404 count += 1
1405 s.do_handshake()
1406 break
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001407 except ssl.SSLWantReadError:
1408 select.select([s], [], [])
1409 except ssl.SSLWantWriteError:
1410 select.select([], [s], [])
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001411 s.close()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001412 if support.verbose:
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001413 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Bill Janssen934b16d2008-06-28 22:19:33 +00001414
Antoine Pitrou3945c862010-04-28 21:11:01 +00001415 def test_get_server_certificate(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001416 def _test_get_server_certificate(host, port, cert=None):
1417 with support.transient_internet(host):
1418 pem = ssl.get_server_certificate((host, port))
1419 if not pem:
1420 self.fail("No server certificate on %s:%s!" % (host, port))
Bill Janssen296a59d2007-09-16 22:06:00 +00001421
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001422 try:
1423 pem = ssl.get_server_certificate((host, port),
1424 ca_certs=CERTFILE)
1425 except ssl.SSLError as x:
1426 #should fail
1427 if support.verbose:
1428 sys.stdout.write("%s\n" % x)
1429 else:
1430 self.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
Bill Janssen296a59d2007-09-16 22:06:00 +00001431
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001432 pem = ssl.get_server_certificate((host, port),
1433 ca_certs=cert)
1434 if not pem:
1435 self.fail("No server certificate on %s:%s!" % (host, port))
1436 if support.verbose:
1437 sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
1438
1439 _test_get_server_certificate('svn.python.org', 443, SVN_PYTHON_ORG_ROOT_CERT)
1440 if support.IPV6_ENABLED:
1441 _test_get_server_certificate('ipv6.google.com', 443)
1442
1443 def test_ciphers(self):
1444 remote = ("svn.python.org", 443)
1445 with support.transient_internet(remote[0]):
1446 with closing(ssl.wrap_socket(socket.socket(socket.AF_INET),
1447 cert_reqs=ssl.CERT_NONE, ciphers="ALL")) as s:
1448 s.connect(remote)
1449 with closing(ssl.wrap_socket(socket.socket(socket.AF_INET),
1450 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT")) as s:
1451 s.connect(remote)
1452 # Error checking can happen at instantiation or when connecting
1453 with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"):
1454 with closing(socket.socket(socket.AF_INET)) as sock:
1455 s = ssl.wrap_socket(sock,
1456 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
1457 s.connect(remote)
Bill Janssen296a59d2007-09-16 22:06:00 +00001458
def test_algorithms(self):
    # Issue #8484: all algorithms should be available when verifying a
    # certificate.
    # SHA256 was added in OpenSSL 0.9.8
    if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15):
        self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION)
    # sha256.tbs-internet.com needs SNI to use the correct certificate
    if not ssl.HAS_SNI:
        self.skipTest("SNI needed for this test")
    # https://sha2.hboeck.de/ was used until 2011-01-08 (no route to host)
    hostname = "sha256.tbs-internet.com"
    remote = (hostname, 443)
    sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem")
    with support.transient_internet(hostname):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.load_verify_locations(sha256_cert)
        s = ctx.wrap_socket(socket.socket(socket.AF_INET),
                            server_hostname=hostname)
        # closing() closes the wrapped socket whether or not the
        # connection attempt succeeds.
        with closing(s):
            s.connect(remote)
            if support.verbose:
                sys.stdout.write("\nCipher with %r is %r\n" %
                                 (remote, s.cipher()))
                sys.stdout.write("Certificate is:\n%s\n" %
                                 pprint.pformat(s.getpeercert()))
1486
def test_get_ca_certs_capath(self):
    # capath certs are loaded on request: the context reports no CA
    # certificates right after load_verify_locations(capath=...), and
    # exactly one after a verified connection has been made.
    with support.transient_internet("svn.python.org"):
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.load_verify_locations(capath=CAPATH)
        self.assertEqual(ctx.get_ca_certs(), [])
        # connect() is performed inside the closing() block so that the
        # socket is released even when the connection attempt fails.
        with closing(ctx.wrap_socket(socket.socket(socket.AF_INET))) as s:
            s.connect(("svn.python.org", 443))
            cert = s.getpeercert()
            self.assertTrue(cert)
        self.assertEqual(len(ctx.get_ca_certs()), 1)
1502
@needs_sni
def test_context_setget(self):
    # Check that the context of a connected socket can be replaced.
    host = "svn.python.org"
    with support.transient_internet(host):
        initial_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        replacement_ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        plain = socket.socket(socket.AF_INET)
        with closing(initial_ctx.wrap_socket(plain)) as ss:
            ss.connect((host, 443))
            # Before the swap, both the socket and its underlying
            # _sslobj report the context the socket was created from.
            self.assertIs(ss.context, initial_ctx)
            self.assertIs(ss._sslobj.context, initial_ctx)
            ss.context = replacement_ctx
            # After the swap, both must report the new context.
            self.assertIs(ss.context, replacement_ctx)
            self.assertIs(ss._sslobj.context, replacement_ctx)
Bill Janssen296a59d2007-09-16 22:06:00 +00001517
Bill Janssen98d19da2007-09-10 21:51:02 +00001518try:
1519 import threading
1520except ImportError:
1521 _have_threads = False
1522else:
Bill Janssen98d19da2007-09-10 21:51:02 +00001523 _have_threads = True
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001524
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001525 from test.ssl_servers import make_https_server
1526
class ThreadedEchoServer(threading.Thread):
    """Echo server running in its own daemon thread.

    The server thread accepts connections on a socket bound through
    support.bind_port() and hands each one to a ConnectionHandler thread.
    It joins that handler before accepting the next connection, so
    connections are served one at a time.

    The server can be used as a context manager: entering starts the thread
    and waits until it is listening, exiting stops and joins it.
    """

    class ConnectionHandler(threading.Thread):

        """A mildly complicated class, because we want it to work both
        with and without the SSL wrapper around the socket connection, so
        that we can test the STARTTLS functionality."""

        def __init__(self, server, connsock, addr):
            """Remember the owning *server*, the accepted socket *connsock*
            and the peer address *addr*; the connection starts unwrapped."""
            self.server = server
            self.running = False
            self.sock = connsock
            self.addr = addr
            self.sock.setblocking(1)
            # SSL-wrapped socket once wrap_conn() has succeeded, else None.
            self.sslconn = None
            threading.Thread.__init__(self)
            self.daemon = True

        def wrap_conn(self):
            """Wrap the plain socket with the server's SSL context.

            Returns True on success.  On an SSLError or a connection reset
            the error is recorded in the server's conn_errors list, the
            server is stopped, the connection is closed and False is
            returned.  Any other socket.error is re-raised.
            """
            try:
                self.sslconn = self.server.context.wrap_socket(
                    self.sock, server_side=True)
                self.server.selected_protocols.append(self.sslconn.selected_npn_protocol())
            except socket.error as e:
                # We treat ConnectionResetError as though it were an
                # SSLError - OpenSSL on Ubuntu abruptly closes the
                # connection when asked to use an unsupported protocol.
                #
                # XXX Various errors can have happened here, for example
                # a mismatching protocol version, an invalid certificate,
                # or a low-level bug. This should be made more discriminating.
                if not isinstance(e, ssl.SSLError) and e.errno != errno.ECONNRESET:
                    raise
                self.server.conn_errors.append(e)
                if self.server.chatty:
                    # handle_error is a module-level helper defined outside
                    # this class (presumably it reports the current
                    # exception -- confirm against its definition).
                    handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
                self.running = False
                self.server.stop()
                self.close()
                return False
            else:
                # The peer certificate is only inspected when the server
                # context requires one.
                if self.server.context.verify_mode == ssl.CERT_REQUIRED:
                    cert = self.sslconn.getpeercert()
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
                    cert_binary = self.sslconn.getpeercert(True)
                    if support.verbose and self.server.chatty:
                        sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
                cipher = self.sslconn.cipher()
                if support.verbose and self.server.chatty:
                    sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
                    sys.stdout.write(" server: selected protocol is now "
                            + str(self.sslconn.selected_npn_protocol()) + "\n")
                return True

        def read(self):
            """Read from the SSL connection if one is active, otherwise
            receive up to 1024 bytes from the plain socket."""
            if self.sslconn:
                return self.sslconn.read()
            else:
                return self.sock.recv(1024)

        def write(self, bytes):
            """Send *bytes* over the SSL connection if one is active,
            otherwise over the plain socket."""
            if self.sslconn:
                return self.sslconn.write(bytes)
            else:
                return self.sock.send(bytes)

        def close(self):
            """Close whichever of the SSL connection or plain socket is
            currently in use."""
            if self.sslconn:
                self.sslconn.close()
            else:
                self.sock.close()

        def run(self):
            """Serve one connection until it ends.

            Unless the server is a STARTTLS server, the connection is
            wrapped in SSL immediately.  Each message read is then handled
            according to its stripped content:

            - empty: treated as end of file, the handler stops;
            - b'over': the connection is closed and the handler returns;
            - b'STARTTLS' (STARTTLS servers only): reply b"OK\\n" and wrap
              the connection in SSL;
            - b'ENDTLS' (STARTTLS servers with an active SSL connection):
              reply b"OK\\n" and unwrap back to the plain socket;
            - b'CB tls-unique': reply with the repr of the tls-unique
              channel binding data;
            - anything else: echo the message back lower-cased.

            An ssl.SSLError closes the connection and stops the server.
            """
            self.running = True
            if not self.server.starttls_server:
                if not self.wrap_conn():
                    return
            while self.running:
                try:
                    msg = self.read()
                    stripped = msg.strip()
                    if not stripped:
                        # eof, so quit this handler
                        self.running = False
                        self.close()
                    elif stripped == b'over':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: client closed connection\n")
                        self.close()
                        return
                    elif (self.server.starttls_server and
                          stripped == b'STARTTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
                        # The acknowledgement is sent before wrapping, i.e.
                        # through whatever transport is currently active.
                        self.write(b"OK\n")
                        if not self.wrap_conn():
                            return
                    elif (self.server.starttls_server and self.sslconn
                          and stripped == b'ENDTLS'):
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        # unwrap() hands back the socket to use from now on;
                        # clearing sslconn routes read()/write() to it.
                        self.sock = self.sslconn.unwrap()
                        self.sslconn = None
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: connection is now unencrypted...\n")
                    elif stripped == b'CB tls-unique':
                        if support.verbose and self.server.connectionchatty:
                            sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
                        data = self.sslconn.get_channel_binding("tls-unique")
                        self.write(repr(data).encode("us-ascii") + b"\n")
                    else:
                        if (support.verbose and
                            self.server.connectionchatty):
                            ctype = (self.sslconn and "encrypted") or "unencrypted"
                            sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
                                             % (msg, ctype, msg.lower(), ctype))
                        self.write(msg.lower())
                except ssl.SSLError:
                    if self.server.chatty:
                        handle_error("Test server failure:\n")
                    self.close()
                    self.running = False
                    # normally, we'd just stop here, but for the test
                    # harness, we want to stop the server
                    self.server.stop()

    def __init__(self, certificate=None, ssl_version=None,
                 certreqs=None, cacerts=None,
                 chatty=True, connectionchatty=False, starttls_server=False,
                 npn_protocols=None, ciphers=None, context=None):
        """Set up the SSL context and bind the listening socket.

        If *context* is given it is used as-is and *certificate*,
        *ssl_version*, *certreqs*, *cacerts*, *npn_protocols* and *ciphers*
        are ignored.  Otherwise a new SSLContext is built from them, with
        *ssl_version* defaulting to PROTOCOL_TLSv1 and *certreqs* to
        CERT_NONE.  *chatty* and *connectionchatty* control diagnostic
        output; *starttls_server* makes connections start unencrypted.
        """
        if context:
            self.context = context
        else:
            self.context = ssl.SSLContext(ssl_version
                                          if ssl_version is not None
                                          else ssl.PROTOCOL_TLSv1)
            self.context.verify_mode = (certreqs if certreqs is not None
                                        else ssl.CERT_NONE)
            if cacerts:
                self.context.load_verify_locations(cacerts)
            if certificate:
                self.context.load_cert_chain(certificate)
            if npn_protocols:
                self.context.set_npn_protocols(npn_protocols)
            if ciphers:
                self.context.set_ciphers(ciphers)
        self.chatty = chatty
        self.connectionchatty = connectionchatty
        self.starttls_server = starttls_server
        self.sock = socket.socket()
        self.port = support.bind_port(self.sock)
        # Optional event set by run() once the server is listening.
        self.flag = None
        self.active = False
        # Filled in by the connection handlers: the NPN protocol selected
        # for each wrapped connection, and the errors of failed attempts.
        self.selected_protocols = []
        self.conn_errors = []
        threading.Thread.__init__(self)
        self.daemon = True

    def __enter__(self):
        """Start the server thread and block until it is listening."""
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        """Ask the server loop to stop and wait for the thread to end."""
        self.stop()
        self.join()

    def start(self, flag=None):
        """Start the thread; *flag*, if given, is an event that run()
        sets once the socket is listening."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        """Accept connections until stop() is called, serving each with a
        ConnectionHandler, then close the listening socket."""
        # The short accept() timeout lets the loop re-check self.active
        # regularly.
        self.sock.settimeout(0.05)
        self.sock.listen(5)
        self.active = True
        if self.flag:
            # signal an event
            self.flag.set()
        while self.active:
            try:
                newconn, connaddr = self.sock.accept()
                if support.verbose and self.chatty:
                    sys.stdout.write(' server: new connection from '
                                     + repr(connaddr) + '\n')
                handler = self.ConnectionHandler(self, newconn, connaddr)
                handler.start()
                # Wait for the handler: one connection is served at a time.
                handler.join()
            except socket.timeout:
                pass
            except KeyboardInterrupt:
                self.stop()
        self.sock.close()

    def stop(self):
        """Make the accept loop in run() exit on its next iteration."""
        self.active = False
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001724
class AsyncoreEchoServer(threading.Thread):
    """Echo server built on asyncore, driven from its own daemon thread.

    The thread repeatedly runs asyncore.loop() until stop() is called.
    The server can be used as a context manager: entering starts the thread
    and waits until its loop is about to run, exiting stops and joins it.
    """

    class EchoServer(asyncore.dispatcher):
        """Listening dispatcher creating one ConnectionHandler per
        accepted connection."""

        class ConnectionHandler(asyncore.dispatcher_with_send):
            """Dispatcher for one SSL connection; echoes received data
            back lower-cased once the handshake has completed."""

            def __init__(self, conn, certfile):
                # The handshake is not done on wrap; it is driven step by
                # step through _do_ssl_handshake().
                self.socket = ssl.wrap_socket(conn, server_side=True,
                                              certfile=certfile,
                                              do_handshake_on_connect=False)
                asyncore.dispatcher_with_send.__init__(self, self.socket)
                # True until do_handshake() has completed without error.
                self._ssl_accepting = True
                self._do_ssl_handshake()

            def readable(self):
                # Process whatever the SSL socket reports as pending
                # before telling asyncore that this channel is readable.
                if isinstance(self.socket, ssl.SSLSocket):
                    while self.socket.pending() > 0:
                        self.handle_read_event()
                return True

            def _do_ssl_handshake(self):
                """Try to advance the SSL handshake.

                A want-read/want-write condition leaves the handshake
                pending; EOF or an aborted connection closes the handler;
                any other SSLError propagates.  Other socket errors are
                ignored and leave the handshake pending.
                """
                try:
                    self.socket.do_handshake()
                except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
                    return
                except ssl.SSLEOFError:
                    return self.handle_close()
                except ssl.SSLError:
                    raise
                except socket.error as err:
                    if err.args[0] == errno.ECONNABORTED:
                        return self.handle_close()
                else:
                    self._ssl_accepting = False

            def handle_read(self):
                # While the handshake is pending, a read event is used to
                # advance it; afterwards, received data is echoed back.
                if self._ssl_accepting:
                    self._do_ssl_handshake()
                else:
                    data = self.recv(1024)
                    if support.verbose:
                        sys.stdout.write(" server: read %s from client\n" % repr(data))
                    if not data:
                        self.close()
                    else:
                        self.send(data.lower())

            def handle_close(self):
                self.close()
                if support.verbose:
                    sys.stdout.write(" server: closed connection %s\n" % self.socket)

            def handle_error(self):
                # Re-raise the exception being handled instead of letting
                # asyncore deal with it.
                raise

        def __init__(self, certfile):
            self.certfile = certfile
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.port = support.bind_port(sock, '')
            asyncore.dispatcher.__init__(self, sock)
            self.listen(5)

        def handle_accept(self):
            sock_obj, addr = self.accept()
            if support.verbose:
                sys.stdout.write(" server: new connection from %s:%s\n" % addr)
            self.ConnectionHandler(sock_obj, self.certfile)

        def handle_error(self):
            # Re-raise the exception being handled instead of letting
            # asyncore deal with it.
            raise

    def __init__(self, certfile):
        """Create the listening EchoServer using *certfile* and record
        the port it is bound to."""
        # Optional event set by run() just before the loop starts.
        self.flag = None
        self.active = False
        self.server = self.EchoServer(certfile)
        self.port = self.server.port
        threading.Thread.__init__(self)
        self.daemon = True

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.server)

    def __enter__(self):
        """Start the server thread and wait until it signals readiness."""
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        """Stop the server and join its thread, reporting each step when
        running verbosely."""
        if support.verbose:
            sys.stdout.write(" cleanup: stopping server.\n")
        self.stop()
        if support.verbose:
            sys.stdout.write(" cleanup: joining server thread.\n")
        self.join()
        if support.verbose:
            sys.stdout.write(" cleanup: successfully joined.\n")

    def start(self, flag=None):
        """Start the thread; *flag*, if given, is an event that run()
        sets before entering its loop."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        """Run asyncore.loop() repeatedly until stop() is called."""
        self.active = True
        if self.flag:
            self.flag.set()
        while self.active:
            try:
                asyncore.loop(1)
            except Exception:
                # Best effort: the handle_error() overrides re-raise, so
                # errors from individual channels end up here.  They are
                # dropped so that the loop keeps serving until stop().
                pass

    def stop(self):
        """Leave the loop in run() and close the listening dispatcher."""
        self.active = False
        self.server.close()
1839
def bad_cert_test(certfile):
    """
    Launch a server with CERT_REQUIRED, and check that trying to
    connect to it with the given client certificate fails.

    The attempt counts as failed when wrapping or connecting raises an
    ssl.SSLError, any other socket.error, or an IOError whose errno is
    ENOENT (missing *certfile*).  Any other IOError is re-raised.
    AssertionError is raised when the connection unexpectedly succeeds.
    """
    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_REQUIRED,
                                cacerts=CERTFILE, chatty=False,
                                connectionchatty=False)
    with server:
        try:
            with closing(socket.socket()) as sock:
                s = ssl.wrap_socket(sock,
                                    certfile=certfile,
                                    ssl_version=ssl.PROTOCOL_TLSv1)
                s.connect((HOST, server.port))
        except ssl.SSLError as x:
            if support.verbose:
                sys.stdout.write("\nSSLError is %s\n" % x.args[1])
        # The handlers go from most to least specific.  socket.error is
        # named explicitly (as test_rude_shutdown does) because on
        # Python 2 it is not an OSError, so a low-level failure such as a
        # connection reset would otherwise escape.
        except socket.error as x:
            if support.verbose:
                sys.stdout.write("\nOSError is %s\n" % x.args[1])
        # IOError covers the error raised for a certificate file that
        # cannot be opened; only "no such file" is an expected failure.
        except IOError as x:
            if x.errno != errno.ENOENT:
                raise
            if support.verbose:
                sys.stdout.write("\nOSError is %s\n" % str(x))
        else:
            raise AssertionError("Use of invalid cert should have failed!")
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001869
def server_params_test(client_context, server_context, indata=b"FOO\n",
                       chatty=True, connectionchatty=False, sni_name=None):
    """
    Start a ThreadedEchoServer using *server_context*, connect to it with
    a socket wrapped by *client_context*, and check that *indata* is
    echoed back lower-cased when sent as bytes, bytearray and memoryview.

    Returns a dict describing the client connection (compression, cipher,
    peer certificate, NPN protocol, protocol version) together with the
    NPN protocols recorded by the server.  Raises AssertionError when the
    echoed data is not what was expected.
    """
    server = ThreadedEchoServer(context=server_context,
                                chatty=chatty,
                                connectionchatty=False)
    stats = {}
    with server:
        client_sock = client_context.wrap_socket(socket.socket(),
                                                 server_hostname=sni_name)
        with closing(client_sock) as s:
            s.connect((HOST, server.port))
            payloads = [indata, bytearray(indata), memoryview(indata)]
            for payload in payloads:
                if connectionchatty and support.verbose:
                    sys.stdout.write(
                        " client: sending %r...\n" % indata)
                s.write(payload)
                outdata = s.read()
                if connectionchatty and support.verbose:
                    sys.stdout.write(" client: read %r\n" % outdata)
                if outdata != indata.lower():
                    raise AssertionError(
                        "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                        % (outdata[:20], len(outdata),
                           indata[:20].lower(), len(indata)))
            # Tell the server this client is done.
            s.write(b"over\n")
            if connectionchatty and support.verbose:
                sys.stdout.write(" client: closing connection.\n")
            # Collect the connection details while the socket is still open.
            stats['compression'] = s.compression()
            stats['cipher'] = s.cipher()
            stats['peercert'] = s.getpeercert()
            stats['client_npn_protocol'] = s.selected_npn_protocol()
            stats['version'] = s.version()
            s.close()
        stats['server_npn_protocols'] = server.selected_protocols
    return stats
Bill Janssen98d19da2007-09-10 21:51:02 +00001913
def try_protocol_combo(server_protocol, client_protocol, expect_success,
                       certsreqs=None, server_options=0, client_options=0):
    """
    Attempt an SSL connection from a *client_protocol* client to a
    *server_protocol* server and compare the outcome with *expect_success*.

    A true *expect_success* means the connection must succeed, a false one
    means it must fail.  When *expect_success* is a string, it must also
    equal the protocol version reported for the connection.
    *certsreqs* (default CERT_NONE) is used as verify_mode on both sides;
    *server_options* and *client_options* are OR-ed into the respective
    context options.
    """
    if certsreqs is None:
        certsreqs = ssl.CERT_NONE
    certtype_names = {
        ssl.CERT_NONE: "CERT_NONE",
        ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
        ssl.CERT_REQUIRED: "CERT_REQUIRED",
    }
    certtype = certtype_names[certsreqs]

    def protocol_names():
        # (client name, server name), looked up only when a message
        # actually needs them.
        return (ssl.get_protocol_name(client_protocol),
                ssl.get_protocol_name(server_protocol))

    if support.verbose:
        # Combinations expected to fail are shown in braces.
        if expect_success:
            formatstr = " %s->%s %s\n"
        else:
            formatstr = " {%s->%s} %s\n"
        sys.stdout.write(formatstr % (protocol_names() + (certtype,)))

    client_context = ssl.SSLContext(client_protocol)
    client_context.options |= client_options
    server_context = ssl.SSLContext(server_protocol)
    server_context.options |= server_options

    # NOTE: we must enable "ALL" ciphers on the client, otherwise an
    # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
    # starting from OpenSSL 1.0.0 (see issue #8322).
    if client_context.protocol == ssl.PROTOCOL_SSLv23:
        client_context.set_ciphers("ALL")

    for ctx in (client_context, server_context):
        ctx.verify_mode = certsreqs
        ctx.load_cert_chain(CERTFILE)
        ctx.load_verify_locations(CERTFILE)

    try:
        stats = server_params_test(client_context, server_context,
                                   chatty=False, connectionchatty=False)
    # Protocol mismatch can result in either an SSLError, or a
    # "Connection reset by peer" error.
    except ssl.SSLError:
        if expect_success:
            raise
    except socket.error as e:
        if expect_success or e.errno != errno.ECONNRESET:
            raise
    else:
        if not expect_success:
            raise AssertionError(
                "Client protocol %s succeeded with server protocol %s!"
                % protocol_names())
        # Anything other than the literal True names the expected version.
        wants_specific_version = expect_success is not True
        if wants_specific_version and expect_success != stats['version']:
            raise AssertionError("version mismatch: expected %r, got %r"
                                 % (expect_success, stats['version']))
Bill Janssen98d19da2007-09-10 21:51:02 +00001972
1973
Bill Janssen934b16d2008-06-28 22:19:33 +00001974 class ThreadedTests(unittest.TestCase):
Bill Janssen98d19da2007-09-10 21:51:02 +00001975
@skip_if_broken_ubuntu_ssl
def test_echo(self):
    """Basic test of an SSL client connecting to a server"""
    if support.verbose:
        sys.stdout.write("\n")
    # For every known protocol, one context loaded with the test
    # certificate serves as both the client and the server side.
    for proto in PROTOCOLS:
        shared_ctx = ssl.SSLContext(proto)
        shared_ctx.load_cert_chain(CERTFILE)
        server_params_test(client_context=shared_ctx,
                           server_context=shared_ctx,
                           chatty=True,
                           connectionchatty=True)
Bill Janssen98d19da2007-09-10 21:51:02 +00001986
def test_getpeercert(self):
    # Check getpeercert() on a client socket: it must raise ValueError
    # before the handshake and return the expected certificate fields
    # afterwards.
    if support.verbose:
        sys.stdout.write("\n")
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(CERTFILE)
    context.load_cert_chain(CERTFILE)
    server = ThreadedEchoServer(context=context, chatty=False)
    with server:
        # closing() releases the client socket even when one of the
        # checks below fails.
        with closing(context.wrap_socket(socket.socket(),
                                         do_handshake_on_connect=False)) as s:
            s.connect((HOST, server.port))
            # getpeercert() raise ValueError while the handshake isn't
            # done.
            with self.assertRaises(ValueError):
                s.getpeercert()
            s.do_handshake()
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()
            if support.verbose:
                sys.stdout.write(pprint.pformat(cert) + '\n')
                sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
            if 'subject' not in cert:
                self.fail("No subject field in certificate: %s." %
                          pprint.pformat(cert))
            if ((('organizationName', 'Python Software Foundation'),)
                    not in cert['subject']):
                self.fail(
                    "Missing or invalid 'organizationName' field in certificate subject; "
                    "should be 'Python Software Foundation'.")
            self.assertIn('notBefore', cert)
            self.assertIn('notAfter', cert)
            # The validity period must be well-formed: start before end.
            before = ssl.cert_time_to_seconds(cert['notBefore'])
            after = ssl.cert_time_to_seconds(cert['notAfter'])
            self.assertLess(before, after)
Bill Janssen98d19da2007-09-10 21:51:02 +00002024
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_crl_check(self):
    # Verify a server certificate with the default verify flags, then
    # with VERIFY_CRL_CHECK_LEAF before and after a CRL has been loaded.
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    client_context.verify_mode = ssl.CERT_REQUIRED
    client_context.load_verify_locations(SIGNING_CA)
    self.assertEqual(client_context.verify_flags, ssl.VERIFY_DEFAULT)

    def new_server():
        # Every phase talks to a freshly started server.
        return ThreadedEchoServer(context=server_context, chatty=True)

    def check_connection_verifies():
        with new_server() as server:
            with closing(client_context.wrap_socket(socket.socket())) as s:
                s.connect((HOST, server.port))
                cert = s.getpeercert()
                self.assertTrue(cert, "Can't get peer certificate.")

    # VERIFY_DEFAULT should pass
    check_connection_verifies()

    # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
    client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
    with new_server() as server:
        with closing(client_context.wrap_socket(socket.socket())) as s:
            with self.assertRaisesRegexp(ssl.SSLError,
                                         "certificate verify failed"):
                s.connect((HOST, server.port))

    # now load a CRL file. The CRL file is signed by the CA.
    client_context.load_verify_locations(CRLFILE)
    check_connection_verifies()
2066
@needs_sni
def test_check_hostname(self):
    # Exercise check_hostname on the client context with a matching, a
    # non-matching and a missing server_hostname.
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    client_context.verify_mode = ssl.CERT_REQUIRED
    client_context.check_hostname = True
    client_context.load_verify_locations(SIGNING_CA)

    def new_server():
        # Every phase talks to a freshly started server.
        return ThreadedEchoServer(context=server_context, chatty=True)

    # correct hostname should verify
    with new_server() as server:
        good = client_context.wrap_socket(socket.socket(),
                                          server_hostname="localhost")
        with closing(good) as s:
            s.connect((HOST, server.port))
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")

    # incorrect hostname should raise an exception
    with new_server() as server:
        bad = client_context.wrap_socket(socket.socket(),
                                         server_hostname="invalid")
        with closing(bad) as s:
            with self.assertRaisesRegexp(ssl.CertificateError,
                                         "hostname 'invalid' doesn't match u?'localhost'"):
                s.connect((HOST, server.port))

    # missing server_hostname arg should cause an exception, too
    with new_server() as server:
        with closing(socket.socket()) as s:
            with self.assertRaisesRegexp(ValueError,
                                         "check_hostname requires server_hostname"):
                client_context.wrap_socket(s)
2105
def test_empty_cert(self):
    """Connecting with an empty cert file"""
    # The certificate lives next to this test module; fall back to the
    # current directory when __file__ has no directory part.
    here = os.path.dirname(__file__) or os.curdir
    bad_cert_test(os.path.join(here, "nullcert.pem"))
def test_malformed_cert(self):
    """Connecting with a badly formatted certificate (syntax error)"""
    # The certificate lives next to this test module; fall back to the
    # current directory when __file__ has no directory part.
    here = os.path.dirname(__file__) or os.curdir
    bad_cert_test(os.path.join(here, "badcert.pem"))
def test_nonexisting_cert(self):
    """Connecting with a non-existing cert file"""
    # The path is built next to this test module; fall back to the
    # current directory when __file__ has no directory part.
    here = os.path.dirname(__file__) or os.curdir
    bad_cert_test(os.path.join(here, "wrongcert.pem"))
def test_malformed_key(self):
    """Connecting with a badly formatted key (syntax error)"""
    # The key file lives next to this test module; fall back to the
    # current directory when __file__ has no directory part.
    here = os.path.dirname(__file__) or os.curdir
    bad_cert_test(os.path.join(here, "badkey.pem"))
Bill Janssen98d19da2007-09-10 21:51:02 +00002122
def test_rude_shutdown(self):
    """Handshaking against a peer that has already hung up must fail.

    A listener thread accepts one plain TCP connection and immediately
    closes both the accepted socket and the listening socket.  The client
    then tries to wrap its end in SSL and is expected to get socket.error.
    """
    ready = threading.Event()
    gone = threading.Event()

    listen_sock = socket.socket()
    port = support.bind_port(listen_sock, HOST)

    def listener():
        # Runs in a thread: wait for the client, drop it rudely, then
        # signal `gone` so the client knows the socket has disappeared.
        listen_sock.listen(5)
        ready.set()
        accepted = listen_sock.accept()[0]
        accepted.close()
        listen_sock.close()
        gone.set()

    def connector():
        ready.wait()
        with closing(socket.socket()) as c:
            c.connect((HOST, port))
            # Do not start the handshake until the server side is closed.
            gone.wait()
            try:
                ssl_sock = ssl.wrap_socket(c)
            except socket.error:
                return
            self.fail('connecting to closed SSL socket should have failed')

    worker = threading.Thread(target=listener)
    worker.start()
    try:
        connector()
    finally:
        worker.join()
2163
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
                     "OpenSSL is compiled without SSLv2 support")
def test_protocol_sslv2(self):
    """Connecting to an SSLv2 server with various client options"""
    # NOTE(review): try_protocol_combo is defined elsewhere; its positional
    # arguments look like (server_protocol, client_protocol, expected
    # outcome[, cert requirements]) -- confirm against its definition.
    if support.verbose:
        sys.stdout.write("\n")
    sslv2 = ssl.PROTOCOL_SSLv2
    # SSLv2 client, with the default and then each explicit cert requirement.
    for extra in ((), (ssl.CERT_OPTIONAL,), (ssl.CERT_REQUIRED,)):
        try_protocol_combo(sslv2, sslv2, True, *extra)
    # Every other client protocol is expected not to connect.
    for client_protocol in (ssl.PROTOCOL_SSLv23,
                            ssl.PROTOCOL_SSLv3,
                            ssl.PROTOCOL_TLSv1):
        try_protocol_combo(sslv2, client_protocol, False)
    # SSLv23 client with specific SSL options
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(sslv2, ssl.PROTOCOL_SSLv23, False,
                           client_options=ssl.OP_NO_SSLv2)
    try_protocol_combo(sslv2, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_SSLv3)
    try_protocol_combo(sslv2, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_TLSv1)
Bill Janssen98d19da2007-09-10 21:51:02 +00002186
@skip_if_broken_ubuntu_ssl
def test_protocol_sslv23(self):
    """Connecting to an SSLv23 server with various client options"""
    # NOTE(review): try_protocol_combo is defined elsewhere; its positional
    # arguments look like (server_protocol, client_protocol, expected
    # outcome[, cert requirements]) -- confirm against its definition.
    if support.verbose:
        sys.stdout.write("\n")
    sslv23 = ssl.PROTOCOL_SSLv23
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try:
            try_protocol_combo(sslv23, ssl.PROTOCOL_SSLv2, True)
        except socket.error as x:
            # this fails on some older versions of OpenSSL (0.9.7l, for instance)
            if support.verbose:
                sys.stdout.write(
                    " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
                    % str(x))

    # The same three client protocols are tried with the default cert
    # requirement, then CERT_OPTIONAL, then CERT_REQUIRED.
    clients = (
        (ssl.PROTOCOL_SSLv3, 'SSLv3'),
        (ssl.PROTOCOL_SSLv23, True),
        (ssl.PROTOCOL_TLSv1, 'TLSv1'),
    )
    for extra in ((), (ssl.CERT_OPTIONAL,), (ssl.CERT_REQUIRED,)):
        for client_protocol, expected in clients:
            try_protocol_combo(sslv23, client_protocol, expected, *extra)

    # Server with specific SSL options
    try_protocol_combo(sslv23, ssl.PROTOCOL_SSLv3, False,
                       server_options=ssl.OP_NO_SSLv3)
    # Will choose TLSv1
    try_protocol_combo(sslv23, ssl.PROTOCOL_SSLv23, True,
                       server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
    try_protocol_combo(sslv23, ssl.PROTOCOL_TLSv1, False,
                       server_options=ssl.OP_NO_TLSv1)
2221
2222
@skip_if_broken_ubuntu_ssl
def test_protocol_sslv3(self):
    """Connecting to an SSLv3 server with various client options"""
    # NOTE(review): try_protocol_combo is defined elsewhere; its positional
    # arguments look like (server_protocol, client_protocol, expected
    # outcome[, cert requirements]) -- confirm against its definition.
    if support.verbose:
        sys.stdout.write("\n")
    sslv3 = ssl.PROTOCOL_SSLv3
    # SSLv3 client, with the default and then each explicit cert requirement.
    for extra in ((), (ssl.CERT_OPTIONAL,), (ssl.CERT_REQUIRED,)):
        try_protocol_combo(sslv3, sslv3, 'SSLv3', *extra)
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(sslv3, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(sslv3, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_SSLv3)
    try_protocol_combo(sslv3, ssl.PROTOCOL_TLSv1, False)
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(sslv3, ssl.PROTOCOL_SSLv23, 'SSLv3',
                           client_options=ssl.OP_NO_SSLv2)
Bill Janssen98d19da2007-09-10 21:51:02 +00002240
@skip_if_broken_ubuntu_ssl
def test_protocol_tlsv1(self):
    """Connecting to a TLSv1 server with various client options"""
    # NOTE(review): try_protocol_combo is defined elsewhere; its positional
    # arguments look like (server_protocol, client_protocol, expected
    # outcome[, cert requirements]) -- confirm against its definition.
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1 = ssl.PROTOCOL_TLSv1
    # TLSv1 client, with the default and then each explicit cert requirement.
    for extra in ((), (ssl.CERT_OPTIONAL,), (ssl.CERT_REQUIRED,)):
        try_protocol_combo(tlsv1, tlsv1, 'TLSv1', *extra)
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(tlsv1, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(tlsv1, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(tlsv1, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_TLSv1)
2254
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
                     "TLS version 1.1 not supported.")
def test_protocol_tlsv1_1(self):
    """Connecting to a TLSv1.1 server with various client options.
    Testing against older TLS versions."""
    # NOTE(review): try_protocol_combo is defined elsewhere; its positional
    # arguments look like (server_protocol, client_protocol, expected
    # outcome) -- confirm against its definition.
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1_1 = ssl.PROTOCOL_TLSv1_1
    try_protocol_combo(tlsv1_1, tlsv1_1, 'TLSv1.1')
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(tlsv1_1, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(tlsv1_1, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(tlsv1_1, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_TLSv1_1)

    try_protocol_combo(ssl.PROTOCOL_SSLv23, tlsv1_1, 'TLSv1.1')
    # TLSv1 and TLSv1.1 are expected not to interoperate in either direction.
    try_protocol_combo(tlsv1_1, ssl.PROTOCOL_TLSv1, False)
    try_protocol_combo(ssl.PROTOCOL_TLSv1, tlsv1_1, False)
2273
2274
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
                     "TLS version 1.2 not supported.")
def test_protocol_tlsv1_2(self):
    """Connecting to a TLSv1.2 server with various client options.
    Testing against older TLS versions."""
    # NOTE(review): try_protocol_combo is defined elsewhere; its positional
    # arguments look like (server_protocol, client_protocol, expected
    # outcome) -- confirm against its definition.
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1_2 = ssl.PROTOCOL_TLSv1_2
    no_old_ssl = ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2
    try_protocol_combo(tlsv1_2, tlsv1_2, 'TLSv1.2',
                       server_options=no_old_ssl,
                       client_options=no_old_ssl,)
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(tlsv1_2, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(tlsv1_2, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(tlsv1_2, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_TLSv1_2)

    try_protocol_combo(ssl.PROTOCOL_SSLv23, tlsv1_2, 'TLSv1.2')
    # Mixed TLS minor versions are expected not to interoperate.
    try_protocol_combo(tlsv1_2, ssl.PROTOCOL_TLSv1, False)
    try_protocol_combo(ssl.PROTOCOL_TLSv1, tlsv1_2, False)
    try_protocol_combo(tlsv1_2, ssl.PROTOCOL_TLSv1_1, False)
    try_protocol_combo(ssl.PROTOCOL_TLSv1_1, tlsv1_2, False)
Bill Janssen98d19da2007-09-10 21:51:02 +00002297
def test_starttls(self):
    """Switching from clear text to encrypted and back again."""
    commands = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")

    def log(fmt, *args):
        # Only format and emit the message when running verbosely.
        if support.verbose:
            sys.stdout.write(fmt % args if args else fmt)

    server = ThreadedEchoServer(CERTFILE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                starttls_server=True,
                                chatty=True,
                                connectionchatty=True)
    # `wrapped` tracks whether traffic currently goes through the SSL
    # object `conn` (True) or the plain socket `s` (False).
    wrapped = False
    with server:
        s = socket.socket()
        s.setblocking(1)
        s.connect((HOST, server.port))
        log("\n")
        for indata in commands:
            log(" client: sending %r...\n", indata)
            if wrapped:
                conn.write(indata)
                outdata = conn.read()
            else:
                s.send(indata)
                outdata = s.recv(1024)
            msg = outdata.strip().lower()
            acknowledged = msg.startswith(b"ok")
            if acknowledged and indata == b"STARTTLS":
                # STARTTLS ok, switch to secure mode
                log(" client: read %r from server, starting TLS...\n", msg)
                conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
                wrapped = True
            elif acknowledged and indata == b"ENDTLS":
                # ENDTLS ok, switch back to clear text
                log(" client: read %r from server, ending TLS...\n", msg)
                s = conn.unwrap()
                wrapped = False
            else:
                log(" client: read %r from server\n", msg)
        log(" client: closing connection.\n")
        # Say goodbye and close on whichever channel is currently active.
        if wrapped:
            conn.write(b"over\n")
            conn.close()
        else:
            s.send(b"over\n")
            s.close()
Bill Janssen98d19da2007-09-10 21:51:02 +00002355
def test_socketserver(self):
    """Using a SocketServer to create and manage SSL connections."""
    server = make_https_server(self, certfile=CERTFILE)
    # try to connect
    if support.verbose:
        sys.stdout.write('\n')
    with open(CERTFILE, 'rb') as cert_file:
        expected = cert_file.read()
    received = ''
    # now fetch the same data from the HTTPS server
    url = 'https://%s:%d/%s' % (
        HOST, server.port, os.path.basename(CERTFILE))
    with closing(urllib.urlopen(url)) as response:
        dlen = response.info().getheader("content-length")
        if dlen:
            size = int(dlen)
            # Only read a body when the server announced a non-empty one.
            if size > 0:
                received = response.read(size)
                if support.verbose:
                    sys.stdout.write(
                        " client: read %d bytes from remote server '%s'\n"
                        % (len(received), server))
    self.assertEqual(expected, received)
Bill Janssen934b16d2008-06-28 22:19:33 +00002380
def test_asyncore_server(self):
    """Check the example asyncore integration.

    Sends one message through an SSL-wrapped client socket to an
    AsyncoreEchoServer and checks that the reply is the lower-cased
    message.
    """
    if support.verbose:
        sys.stdout.write("\n")

    indata = b"FOO\n"
    server = AsyncoreEchoServer(CERTFILE)
    with server:
        # closing() guarantees the client socket is released even when an
        # assertion or I/O call below raises.
        with closing(ssl.wrap_socket(socket.socket())) as s:
            s.connect(('127.0.0.1', server.port))
            if support.verbose:
                sys.stdout.write(
                    " client: sending %r...\n" % indata)
            s.write(indata)
            outdata = s.read()
            if support.verbose:
                sys.stdout.write(" client: read %r\n" % outdata)
            if outdata != indata.lower():
                self.fail(
                    "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                    % (outdata[:20], len(outdata),
                       indata[:20].lower(), len(indata)))
            s.write(b"over\n")
            if support.verbose:
                sys.stdout.write(" client: closing connection.\n")
        if support.verbose:
            sys.stdout.write(" client: connection closed.\n")
Bill Janssen934b16d2008-06-28 22:19:33 +00002411
def test_recv_send(self):
    """Test recv(), send() and friends.

    Each send/recv flavour of the SSL socket is exercised against an echo
    server.  Flavours marked as not expected to succeed must raise a
    ValueError whose message starts with the method name.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = ssl.wrap_socket(socket.socket(),
                            server_side=False,
                            certfile=CERTFILE,
                            ca_certs=CERTFILE,
                            cert_reqs=ssl.CERT_NONE,
                            ssl_version=ssl.PROTOCOL_TLSv1)
        s.connect((HOST, server.port))
        # helper methods for standardising recv* method signatures
        def _recv_into():
            b = bytearray(b"\0"*100)
            count = s.recv_into(b)
            return b[:count]

        def _recvfrom_into():
            b = bytearray(b"\0"*100)
            count, addr = s.recvfrom_into(b)
            return b[:count]

        # (name, method, whether to expect success, *args)
        send_methods = [
            ('send', s.send, True, []),
            ('sendto', s.sendto, False, ["some.address"]),
            ('sendall', s.sendall, True, []),
        ]
        recv_methods = [
            ('recv', s.recv, True, []),
            ('recvfrom', s.recvfrom, False, ["some.address"]),
            ('recv_into', _recv_into, True, []),
            ('recvfrom_into', _recvfrom_into, False, []),
        ]
        data_prefix = u"PREFIX_"

        for meth_name, send_meth, expect_success, args in send_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                send_meth(indata, *args)
                outdata = s.read()
                if outdata != indata.lower():
                    # Use the !r conversion: a ":r" format spec makes
                    # str.format() raise ValueError, which the handler
                    # below would then misreport as a send failure.
                    self.fail(
                        "While sending with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20], nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to send with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )

        for meth_name, recv_meth, expect_success, args in recv_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                s.send(indata)
                outdata = recv_meth(*args)
                if outdata != indata.lower():
                    # See above: !r, not ":r", so the real mismatch is
                    # reported instead of a formatting ValueError.
                    self.fail(
                        "While receiving with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20], nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to receive with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )
                # consume data
                s.read()

        s.write(b"over\n")
        s.close()
Bill Janssen61c001a2008-09-08 16:37:24 +00002518
def test_handshake_timeout(self):
    """The SSL handshake must honour the socket timeout (issue #5103).

    A plain TCP server accepts connections but never speaks SSL, so the
    client handshake can only end by timing out.
    """
    server = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(server)
    started = threading.Event()
    finish = False

    def serve():
        server.listen(5)
        started.set()
        accepted = []
        while not finish:
            readable = select.select([server], [], [], 0.1)[0]
            if server in readable:
                # Let the socket hang around rather than having
                # it closed by garbage collection.
                accepted.append(server.accept()[0])
        for sock in accepted:
            sock.close()

    t = threading.Thread(target=serve)
    t.start()
    started.wait()

    try:
        # Case 1: wrap an already-connected socket.
        with closing(socket.socket(socket.AF_INET)) as c:
            c.settimeout(0.2)
            c.connect((host, port))
            # Will attempt handshake and time out
            self.assertRaisesRegexp(ssl.SSLError, "timed out",
                                    ssl.wrap_socket, c)
        # Case 2: wrap first, then connect through the SSL socket.
        c = socket.socket(socket.AF_INET)
        try:
            c = ssl.wrap_socket(c)
            c.settimeout(0.2)
            # Will attempt handshake and time out
            self.assertRaisesRegexp(ssl.SSLError, "timed out",
                                    c.connect, (host, port))
        finally:
            c.close()
    finally:
        # Tell the serving loop to stop, then wait for it.
        finish = True
        t.join()
        server.close()
2567
def test_server_accept(self):
    """accept() on an SSLSocket made by SSLContext.wrap_socket() (issue #16357)."""
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(CERTFILE)
    context.load_cert_chain(CERTFILE)
    listener = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(listener)
    server = context.wrap_socket(listener, server_side=True)

    listening = threading.Event()
    # Filled in by the server thread with the result of accept().
    accepted = {'remote': None, 'peer': None}

    def serve():
        server.listen(5)
        # Block on the accept and wait on the connection to close.
        listening.set()
        accepted['remote'], accepted['peer'] = server.accept()
        accepted['remote'].recv(1)

    t = threading.Thread(target=serve)
    t.start()
    # Client wait until server setup and perform a connect.
    listening.wait()
    client = context.wrap_socket(socket.socket())
    client.connect((host, port))
    client_addr = client.getsockname()
    client.close()
    t.join()
    accepted['remote'].close()
    server.close()
    # Sanity checks.
    self.assertIsInstance(accepted['remote'], ssl.SSLSocket)
    self.assertEqual(accepted['peer'], client_addr)
2604
def test_getpeercert_enotconn(self):
    """getpeercert() on an unconnected SSL socket fails with ENOTCONN."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    sock = ctx.wrap_socket(socket.socket())
    with closing(sock):
        with self.assertRaises(socket.error) as caught:
            sock.getpeercert()
        self.assertEqual(caught.exception.errno, errno.ENOTCONN)
2611
def test_do_handshake_enotconn(self):
    """do_handshake() on an unconnected SSL socket fails with ENOTCONN."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    sock = ctx.wrap_socket(socket.socket())
    with closing(sock):
        with self.assertRaises(socket.error) as caught:
            sock.do_handshake()
        self.assertEqual(caught.exception.errno, errno.ENOTCONN)
2618
def test_default_ciphers(self):
    """A client restricted to DES ciphers must fail to connect to a default server."""
    client_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    try:
        # Force a set of weak ciphers on our client context
        client_context.set_ciphers("DES")
    except ssl.SSLError:
        self.skipTest("no DES cipher available")
    server = ThreadedEchoServer(CERTFILE,
                                ssl_version=ssl.PROTOCOL_SSLv23,
                                chatty=False)
    with server:
        with closing(client_context.wrap_socket(socket.socket())) as s:
            with self.assertRaises(ssl.SSLError):
                s.connect((HOST, server.port))
    # Checked after the server context has exited.
    first_error = server.conn_errors[0]
    self.assertIn("no shared cipher", str(first_error))
2633
def test_version_basic(self):
    """
    Basic tests for SSLSocket.version().
    More tests are done in the test_protocol_*() methods.
    """
    client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server = ThreadedEchoServer(CERTFILE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                chatty=False)
    with server:
        s = client_context.wrap_socket(socket.socket())
        with closing(s):
            # No version before the connection is made...
            self.assertIs(s.version(), None)
            s.connect((HOST, server.port))
            self.assertEqual(s.version(), "TLSv1")
        # ...and none again once the socket has been closed.
        self.assertIs(s.version(), None)
2648
@unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
def test_default_ecdh_curve(self):
    """ECDH key exchange should be usable on a default context (issue #21015)."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    ctx.load_cert_chain(CERTFILE)
    # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
    # explicitly using the 'ECCdraft' cipher alias.  Otherwise,
    # our default cipher list should prefer ECDH-based ciphers
    # automatically.
    needs_explicit_ecdh = ssl.OPENSSL_VERSION_INFO < (1, 0, 0)
    if needs_explicit_ecdh:
        ctx.set_ciphers("ECCdraft:ECDH")
    # The same context serves as both server and client configuration.
    with ThreadedEchoServer(context=ctx) as server:
        with closing(ctx.wrap_socket(socket.socket())) as s:
            s.connect((HOST, server.port))
            cipher_name = s.cipher()[0]
            self.assertIn("ECDH", cipher_name)
2665
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    """Test tls-unique channel binding.

    Two consecutive TLSv1 connections are made.  For each one the binding
    data must be present, 12 bytes long, and match what the server
    reports; the data of the two connections must differ.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)

    def new_client():
        # Build an unconnected TLSv1 client socket.
        return ssl.wrap_socket(socket.socket(),
                               server_side=False,
                               certfile=CERTFILE,
                               ca_certs=CERTFILE,
                               cert_reqs=ssl.CERT_NONE,
                               ssl_version=ssl.PROTOCOL_TLSv1)

    with server:
        # closing() releases each client socket even if a check fails.
        with closing(new_client()) as s:
            s.connect((HOST, server.port))
            # get the data
            cb_data = s.get_channel_binding("tls-unique")
            if support.verbose:
                sys.stdout.write(" got channel binding data: {0!r}\n"
                                 .format(cb_data))

            # check if it is sane
            self.assertIsNotNone(cb_data)
            self.assertEqual(len(cb_data), 12) # True for TLSv1

            # and compare with the peers version
            s.write(b"CB tls-unique\n")
            peer_data_repr = s.read().strip()
            self.assertEqual(peer_data_repr,
                             repr(cb_data).encode("us-ascii"))

        # now, again
        with closing(new_client()) as s:
            s.connect((HOST, server.port))
            new_cb_data = s.get_channel_binding("tls-unique")
            if support.verbose:
                sys.stdout.write(" got another channel binding data: {0!r}\n"
                                 .format(new_cb_data))
            # is it really unique
            self.assertNotEqual(cb_data, new_cb_data)
            # Sanity-check the data of the *second* connection.
            self.assertIsNotNone(new_cb_data)
            self.assertEqual(len(new_cb_data), 12) # True for TLSv1
            s.write(b"CB tls-unique\n")
            peer_data_repr = s.read().strip()
            self.assertEqual(peer_data_repr,
                             repr(new_cb_data).encode("us-ascii"))
2725
def test_compression(self):
    """The reported compression is None, 'ZLIB' or 'RLE'."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    ctx.load_cert_chain(CERTFILE)
    # The same context is passed for both context arguments.
    stats = server_params_test(ctx, ctx,
                               chatty=True, connectionchatty=True)
    compression = stats['compression']
    if support.verbose:
        sys.stdout.write(" got compression: {!r}\n".format(compression))
    self.assertIn(compression, { None, 'ZLIB', 'RLE' })
2734
@unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
                     "ssl.OP_NO_COMPRESSION needed for this test")
def test_compression_disabled(self):
    """With OP_NO_COMPRESSION set, no compression is reported."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    ctx.load_cert_chain(CERTFILE)
    ctx.options |= ssl.OP_NO_COMPRESSION
    # The same context is passed for both context arguments.
    stats = server_params_test(ctx, ctx,
                               chatty=True, connectionchatty=True)
    compression = stats['compression']
    self.assertIs(compression, None)
2744
def test_dh_params(self):
    """Check we can get a connection with ephemeral Diffie-Hellman.

    The context loads DH parameters and is restricted to "kEDH" ciphers;
    the negotiated cipher name must then contain an ADH, EDH or DHE
    component.
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.load_cert_chain(CERTFILE)
    context.load_dh_params(DHFILE)
    context.set_ciphers("kEDH")
    stats = server_params_test(context, context,
                               chatty=True, connectionchatty=True)
    # stats["cipher"] is indexed at 0 to get the cipher name string.
    cipher = stats["cipher"][0]
    parts = cipher.split("-")
    if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
        # Report the whole cipher name, not just its first character.
        self.fail("Non-DH cipher: " + cipher)
2757
def test_selected_npn_protocol(self):
    """selected_npn_protocol() is None unless NPN is used."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    ctx.load_cert_chain(CERTFILE)
    # No NPN protocols are configured on the context.
    stats = server_params_test(ctx, ctx,
                               chatty=True, connectionchatty=True)
    negotiated = stats['client_npn_protocol']
    self.assertIs(negotiated, None)
2765
@unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
def test_npn_protocols(self):
    """NPN negotiation picks the expected protocol for several client lists."""
    server_protocols = ['http/1.1', 'spdy/2']
    # (protocols advertised by the client, protocol expected to be chosen)
    protocol_tests = [
        (['http/1.1', 'spdy/2'], 'http/1.1'),
        (['spdy/2', 'http/1.1'], 'http/1.1'),
        (['spdy/2', 'test'], 'spdy/2'),
        (['abc', 'def'], 'abc')
    ]
    for client_protocols, expected in protocol_tests:
        # Fresh contexts for every combination.
        server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        server_context.load_cert_chain(CERTFILE)
        server_context.set_npn_protocols(server_protocols)
        client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        client_context.load_cert_chain(CERTFILE)
        client_context.set_npn_protocols(client_protocols)
        stats = server_params_test(client_context, server_context,
                                   chatty=True, connectionchatty=True)

        # The two %%s placeholders are filled in per side below.
        msg = "failed trying %s (s) and %s (c).\n" \
              "was expecting %s, but got %%s from the %%s" \
              % (str(server_protocols), str(client_protocols),
                 str(expected))
        client_result = stats['client_npn_protocol']
        self.assertEqual(client_result, expected, msg % (client_result, "client"))
        server_seen = stats['server_npn_protocols']
        if len(server_seen):
            server_result = server_seen[-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected, msg % (server_result, "server"))
2794
def sni_contexts(self):
    # Return (server_context, other_context, client_context): two
    # TLSv1 contexts loaded with SIGNED_CERTFILE and SIGNED_CERTFILE2
    # respectively, plus a client context that requires a peer
    # certificate verified against SIGNING_CA.
    def with_cert(certfile):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_cert_chain(certfile)
        return ctx

    server_context = with_cert(SIGNED_CERTFILE)
    other_context = with_cert(SIGNED_CERTFILE2)

    client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    client_context.verify_mode = ssl.CERT_REQUIRED
    client_context.load_verify_locations(SIGNING_CA)

    return server_context, other_context, client_context
2804
def check_common_name(self, stats, name):
    # Assert that the subject of stats['peercert'] contains an entry
    # made up of exactly one ('commonName', name) pair.
    wanted = (('commonName', name),)
    subject = stats['peercert']['subject']
    self.assertIn(wanted, subject)
2808
@needs_sni
def test_sni_callback(self):
    server_context, other_context, client_context = self.sni_contexts()
    # Every callback invocation is recorded here; the list is emptied
    # between the phases below.
    calls = []

    def servername_cb(ssl_sock, server_name, initial_context):
        calls.append((server_name, initial_context))
        # Switch to the alternate context only when a name was sent.
        if server_name is not None:
            ssl_sock.context = other_context

    server_context.set_servername_callback(servername_cb)

    # Phase 1: a server name is supplied. The callback sees it and
    # the certificate presented comes from other_context.
    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name='supermessage')
    self.assertEqual(calls, [("supermessage", server_context)])
    self.check_common_name(stats, 'fakehostname')

    # Phase 2: no server name. The callback still runs, with
    # server_name=None, and the original certificate is presented.
    del calls[:]
    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name=None)
    self.assertEqual(calls, [(None, server_context)])
    self.check_common_name(stats, 'localhost')

    # Phase 3: callback removed. Nothing is recorded and the
    # certificate is unchanged even though a name is supplied.
    del calls[:]
    server_context.set_servername_callback(None)
    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name='notfunny')
    self.check_common_name(stats, 'localhost')
    self.assertEqual(calls, [])
2847
@needs_sni
def test_sni_callback_alert(self):
    # An alert code returned by the servername callback must surface
    # on the connecting side as an SSLError with the matching reason.
    server_context, _unused_context, client_context = self.sni_contexts()

    def cb_returning_alert(ssl_sock, server_name, initial_context):
        return ssl.ALERT_DESCRIPTION_ACCESS_DENIED

    server_context.set_servername_callback(cb_returning_alert)

    with self.assertRaises(ssl.SSLError) as caught:
        server_params_test(client_context, server_context,
                           chatty=False,
                           sni_name='supermessage')
    self.assertEqual(caught.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
2862
@needs_sni
def test_sni_callback_raising(self):
    # An exception escaping the servername callback must fail the
    # connection with a handshake-failure alert, and the exception
    # must show up on stderr.
    server_context, _unused_context, client_context = self.sni_contexts()

    def cb_raising(ssl_sock, server_name, initial_context):
        1/0

    server_context.set_servername_callback(cb_raising)

    with self.assertRaises(ssl.SSLError) as caught:
        with support.captured_stderr() as captured:
            server_params_test(client_context, server_context,
                               chatty=False,
                               sni_name='supermessage')
    self.assertEqual(caught.exception.reason,
                     'SSLV3_ALERT_HANDSHAKE_FAILURE')
    self.assertIn("ZeroDivisionError", captured.getvalue())
2879
@needs_sni
def test_sni_callback_wrong_return_type(self):
    # A servername callback that returns a value of the wrong type
    # must end the connection with an internal-error alert, and a
    # TypeError must show up on stderr.
    server_context, _unused_context, client_context = self.sni_contexts()

    def cb_wrong_return_type(ssl_sock, server_name, initial_context):
        return "foo"

    server_context.set_servername_callback(cb_wrong_return_type)

    with self.assertRaises(ssl.SSLError) as caught:
        with support.captured_stderr() as captured:
            server_params_test(client_context, server_context,
                               chatty=False,
                               sni_name='supermessage')
    self.assertEqual(caught.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
    self.assertIn("TypeError", captured.getvalue())
2897
def test_read_write_after_close_raises_valuerror(self):
    # One context, requiring and verifying CERTFILE, is shared by the
    # echo server and the client socket.
    ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    ctx.verify_mode = ssl.CERT_REQUIRED
    ctx.load_verify_locations(CERTFILE)
    ctx.load_cert_chain(CERTFILE)
    echo_server = ThreadedEchoServer(context=ctx, chatty=False)

    with echo_server:
        client = ctx.wrap_socket(socket.socket())
        client.connect((HOST, echo_server.port))
        client.close()

        # Reading from or writing to the closed socket must raise
        # ValueError.
        with self.assertRaises(ValueError):
            client.read(1024)
        with self.assertRaises(ValueError):
            client.write(b'hello')
2912
Bill Janssen61c001a2008-09-08 16:37:24 +00002913
def test_main(verbose=False):
    # Entry point: optionally print platform and OpenSSL details,
    # check that the certificate fixtures exist, then run the test
    # classes that apply to this environment.
    if support.verbose:
        plats = {
            'Linux': platform.linux_distribution,
            'Mac': platform.mac_ver,
            'Windows': platform.win32_ver,
        }
        # Use the first probe that reports a non-empty first field;
        # fall back to the generic platform string otherwise.
        plat = None
        for label, probe in plats.items():
            info = probe()
            if info and info[0]:
                plat = '%s %r' % (label, info)
                break
        if plat is None:
            plat = repr(platform.platform())

        print("test_ssl: testing with %r %r" %
              (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
        print(" under %s" % plat)
        print(" HAS_SNI = %r" % ssl.HAS_SNI)
        print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
        # OP_NO_TLSv1_1 may be missing from the ssl module; skip the
        # line in that case.
        try:
            no_tls_1_1 = ssl.OP_NO_TLSv1_1
        except AttributeError:
            pass
        else:
            print(" OP_NO_TLSv1_1 = 0x%8x" % no_tls_1_1)

    required_files = [
        CERTFILE, SVN_PYTHON_ORG_ROOT_CERT, BYTES_CERTFILE,
        ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
        SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
        BADCERT, BADKEY, EMPTYCERT,
    ]
    for filename in required_files:
        if not os.path.exists(filename):
            raise support.TestFailed(
                "Can't read certificate file %r" % filename)

    tests = [ContextTests, BasicSocketTests, SSLErrorTests]

    if support.is_resource_enabled('network'):
        tests.append(NetworkedTests)

    if _have_threads:
        thread_info = support.threading_setup()
        if thread_info:
            tests.append(ThreadedTests)

    try:
        support.run_unittest(*tests)
    finally:
        # thread_info is only bound when _have_threads is true, so the
        # cleanup is guarded by the same flag.
        if _have_threads:
            support.threading_cleanup(*thread_info)
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00002961
# Run the suite through test_main() when this file is executed as a
# script.
if __name__ == "__main__":
    test_main()