blob: 75bb1e0f0800d87e0296c6331b2ee814a2de32fd [file] [log] [blame]
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001# -*- coding: utf-8 -*-
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00002# Test the support for SSL and sockets
3
4import sys
5import unittest
Benjamin Petersondaeb9252014-08-20 14:14:50 -05006from test import test_support as support
Bill Janssen934b16d2008-06-28 22:19:33 +00007import asyncore
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00008import socket
Bill Janssen934b16d2008-06-28 22:19:33 +00009import select
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000010import time
Benjamin Petersondaeb9252014-08-20 14:14:50 -050011import datetime
Antoine Pitroub558f172010-04-23 23:25:45 +000012import gc
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000013import os
Antoine Pitroub558f172010-04-23 23:25:45 +000014import errno
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000015import pprint
Benjamin Petersondaeb9252014-08-20 14:14:50 -050016import tempfile
17import urllib
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000018import traceback
Antoine Pitroudfb299b2010-04-23 22:54:59 +000019import weakref
Antoine Pitroud75efd92010-08-04 17:38:33 +000020import platform
Benjamin Petersondaeb9252014-08-20 14:14:50 -050021import functools
22from contextlib import closing
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000023
Benjamin Petersondaeb9252014-08-20 14:14:50 -050024ssl = support.import_module("ssl")
Bill Janssen296a59d2007-09-16 22:06:00 +000025
Benjamin Petersondaeb9252014-08-20 14:14:50 -050026PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
27HOST = support.HOST
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000028
Benjamin Petersondaeb9252014-08-20 14:14:50 -050029def data_file(*name):
30 return os.path.join(os.path.dirname(__file__), *name)
31
32# The custom key and certificate files used in test_ssl are generated
33# using Lib/test/make_ssl_certs.py.
34# Other certificates are simply fetched from the Internet servers they
35# are meant to authenticate.
36
37CERTFILE = data_file("keycert.pem")
38BYTES_CERTFILE = CERTFILE.encode(sys.getfilesystemencoding())
39ONLYCERT = data_file("ssl_cert.pem")
40ONLYKEY = data_file("ssl_key.pem")
41BYTES_ONLYCERT = ONLYCERT.encode(sys.getfilesystemencoding())
42BYTES_ONLYKEY = ONLYKEY.encode(sys.getfilesystemencoding())
43CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
44ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
45KEY_PASSWORD = "somepass"
46CAPATH = data_file("capath")
47BYTES_CAPATH = CAPATH.encode(sys.getfilesystemencoding())
48CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
49CAFILE_CACERT = data_file("capath", "5ed36f99.0")
50
51
52# empty CRL
53CRLFILE = data_file("revocation.crl")
54
55# Two keys and certs signed by the same CA (for SNI tests)
56SIGNED_CERTFILE = data_file("keycert3.pem")
57SIGNED_CERTFILE2 = data_file("keycert4.pem")
58SIGNING_CA = data_file("pycacert.pem")
59
60SVN_PYTHON_ORG_ROOT_CERT = data_file("https_svn_python_org_root.pem")
61
62EMPTYCERT = data_file("nullcert.pem")
63BADCERT = data_file("badcert.pem")
64WRONGCERT = data_file("XXXnonexisting.pem")
65BADKEY = data_file("badkey.pem")
66NOKIACERT = data_file("nokia.pem")
67NULLBYTECERT = data_file("nullbytecert.pem")
68
69DHFILE = data_file("dh512.pem")
70BYTES_DHFILE = DHFILE.encode(sys.getfilesystemencoding())
71
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000072
Neal Norwitz3e533c22007-08-27 01:03:18 +000073def handle_error(prefix):
74 exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
Benjamin Petersondaeb9252014-08-20 14:14:50 -050075 if support.verbose:
Bill Janssen98d19da2007-09-10 21:51:02 +000076 sys.stdout.write(prefix + exc_format)
Neal Norwitz3e533c22007-08-27 01:03:18 +000077
Antoine Pitrou435ba0c2010-04-27 09:51:18 +000078
79class BasicTests(unittest.TestCase):
80
Antoine Pitrou3945c862010-04-28 21:11:01 +000081 def test_sslwrap_simple(self):
Antoine Pitrou435ba0c2010-04-27 09:51:18 +000082 # A crude test for the legacy API
Bill Jansseneb257ac2008-09-29 18:56:38 +000083 try:
84 ssl.sslwrap_simple(socket.socket(socket.AF_INET))
85 except IOError, e:
86 if e.errno == 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that
87 pass
88 else:
89 raise
90 try:
91 ssl.sslwrap_simple(socket.socket(socket.AF_INET)._sock)
92 except IOError, e:
93 if e.errno == 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that
94 pass
95 else:
96 raise
Benjamin Peterson2f334562014-10-01 23:53:01 -040097
98
Benjamin Petersondaeb9252014-08-20 14:14:50 -050099def can_clear_options():
100 # 0.9.8m or higher
101 return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
102
103def no_sslv2_implies_sslv3_hello():
104 # 0.9.7h or higher
105 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
106
107def have_verify_flags():
108 # 0.9.8 or higher
109 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
110
111def utc_offset(): #NOTE: ignore issues like #1647654
112 # local time = utc time + utc offset
113 if time.daylight and time.localtime().tm_isdst > 0:
114 return -time.altzone # seconds
115 return -time.timezone
116
117def asn1time(cert_time):
118 # Some versions of OpenSSL ignore seconds, see #18207
119 # 0.9.8.i
120 if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
121 fmt = "%b %d %H:%M:%S %Y GMT"
122 dt = datetime.datetime.strptime(cert_time, fmt)
123 dt = dt.replace(second=0)
124 cert_time = dt.strftime(fmt)
125 # %d adds leading zero but ASN1_TIME_print() uses leading space
126 if cert_time[4] == "0":
127 cert_time = cert_time[:4] + " " + cert_time[5:]
128
129 return cert_time
Neal Norwitz3e533c22007-08-27 01:03:18 +0000130
Antoine Pitroud75efd92010-08-04 17:38:33 +0000131# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
132def skip_if_broken_ubuntu_ssl(func):
Victor Stinnerb1241f92011-05-10 01:52:03 +0200133 if hasattr(ssl, 'PROTOCOL_SSLv2'):
Victor Stinnerb1241f92011-05-10 01:52:03 +0200134 @functools.wraps(func)
135 def f(*args, **kwargs):
136 try:
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500137 ssl.SSLContext(ssl.PROTOCOL_SSLv2)
138 except ssl.SSLError:
Victor Stinnerb1241f92011-05-10 01:52:03 +0200139 if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500140 platform.linux_distribution() == ('debian', 'squeeze/sid', '')):
Victor Stinnerb1241f92011-05-10 01:52:03 +0200141 raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
142 return func(*args, **kwargs)
143 return f
144 else:
145 return func
Antoine Pitroud75efd92010-08-04 17:38:33 +0000146
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500147needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
148
Antoine Pitroud75efd92010-08-04 17:38:33 +0000149
150class BasicSocketTests(unittest.TestCase):
151
Antoine Pitrou3945c862010-04-28 21:11:01 +0000152 def test_constants(self):
Bill Janssen98d19da2007-09-10 21:51:02 +0000153 ssl.CERT_NONE
154 ssl.CERT_OPTIONAL
155 ssl.CERT_REQUIRED
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500156 ssl.OP_CIPHER_SERVER_PREFERENCE
157 ssl.OP_SINGLE_DH_USE
158 if ssl.HAS_ECDH:
159 ssl.OP_SINGLE_ECDH_USE
160 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
161 ssl.OP_NO_COMPRESSION
162 self.assertIn(ssl.HAS_SNI, {True, False})
163 self.assertIn(ssl.HAS_ECDH, {True, False})
164
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000165
Antoine Pitrou3945c862010-04-28 21:11:01 +0000166 def test_random(self):
Bill Janssen98d19da2007-09-10 21:51:02 +0000167 v = ssl.RAND_status()
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500168 if support.verbose:
Bill Janssen98d19da2007-09-10 21:51:02 +0000169 sys.stdout.write("\n RAND_status is %d (%s)\n"
170 % (v, (v and "sufficient randomness") or
171 "insufficient randomness"))
Jesus Ceaa8a5b392012-09-11 01:55:04 +0200172 self.assertRaises(TypeError, ssl.RAND_egd, 1)
173 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Bill Janssen98d19da2007-09-10 21:51:02 +0000174 ssl.RAND_add("this is a random string", 75.0)
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000175
Antoine Pitrou3945c862010-04-28 21:11:01 +0000176 def test_parse_cert(self):
Bill Janssen98d19da2007-09-10 21:51:02 +0000177 # note that this uses an 'unofficial' function in _ssl.c,
178 # provided solely for this test, to exercise the certificate
179 # parsing code
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500180 p = ssl._ssl._test_decode_cert(CERTFILE)
181 if support.verbose:
Bill Janssen98d19da2007-09-10 21:51:02 +0000182 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500183 self.assertEqual(p['issuer'],
184 ((('countryName', 'XY'),),
185 (('localityName', 'Castle Anthrax'),),
186 (('organizationName', 'Python Software Foundation'),),
187 (('commonName', 'localhost'),))
188 )
189 # Note the next three asserts will fail if the keys are regenerated
190 self.assertEqual(p['notAfter'], asn1time('Oct 5 23:01:56 2020 GMT'))
191 self.assertEqual(p['notBefore'], asn1time('Oct 8 23:01:56 2010 GMT'))
192 self.assertEqual(p['serialNumber'], 'D7C7381919AFC24E')
Antoine Pitrouf06eb462011-10-01 19:30:58 +0200193 self.assertEqual(p['subject'],
Antoine Pitrou60982912013-02-16 21:39:28 +0100194 ((('countryName', 'XY'),),
195 (('localityName', 'Castle Anthrax'),),
196 (('organizationName', 'Python Software Foundation'),),
197 (('commonName', 'localhost'),))
Antoine Pitrouf06eb462011-10-01 19:30:58 +0200198 )
Antoine Pitrou60982912013-02-16 21:39:28 +0100199 self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),))
Antoine Pitrouf06eb462011-10-01 19:30:58 +0200200 # Issue #13034: the subjectAltName in some certificates
201 # (notably projects.developer.nokia.com:443) wasn't parsed
202 p = ssl._ssl._test_decode_cert(NOKIACERT)
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500203 if support.verbose:
Antoine Pitrouf06eb462011-10-01 19:30:58 +0200204 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
205 self.assertEqual(p['subjectAltName'],
206 (('DNS', 'projects.developer.nokia.com'),
207 ('DNS', 'projects.forum.nokia.com'))
208 )
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500209 # extra OCSP and AIA fields
210 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
211 self.assertEqual(p['caIssuers'],
212 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
213 self.assertEqual(p['crlDistributionPoints'],
214 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000215
Christian Heimes88b174c2013-08-17 00:54:47 +0200216 def test_parse_cert_CVE_2013_4238(self):
217 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500218 if support.verbose:
Christian Heimes88b174c2013-08-17 00:54:47 +0200219 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
220 subject = ((('countryName', 'US'),),
221 (('stateOrProvinceName', 'Oregon'),),
222 (('localityName', 'Beaverton'),),
223 (('organizationName', 'Python Software Foundation'),),
224 (('organizationalUnitName', 'Python Core Development'),),
225 (('commonName', 'null.python.org\x00example.org'),),
226 (('emailAddress', 'python-dev@python.org'),))
227 self.assertEqual(p['subject'], subject)
228 self.assertEqual(p['issuer'], subject)
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500229 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
Christian Heimesf869a942013-08-25 14:12:41 +0200230 san = (('DNS', 'altnull.python.org\x00example.com'),
231 ('email', 'null@python.org\x00user@example.org'),
232 ('URI', 'http://null.python.org\x00http://example.org'),
233 ('IP Address', '192.0.2.1'),
234 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
235 else:
236 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
237 san = (('DNS', 'altnull.python.org\x00example.com'),
238 ('email', 'null@python.org\x00user@example.org'),
239 ('URI', 'http://null.python.org\x00http://example.org'),
240 ('IP Address', '192.0.2.1'),
241 ('IP Address', '<invalid>'))
242
243 self.assertEqual(p['subjectAltName'], san)
Christian Heimes88b174c2013-08-17 00:54:47 +0200244
Antoine Pitrou3945c862010-04-28 21:11:01 +0000245 def test_DER_to_PEM(self):
246 with open(SVN_PYTHON_ORG_ROOT_CERT, 'r') as f:
247 pem = f.read()
Bill Janssen296a59d2007-09-16 22:06:00 +0000248 d1 = ssl.PEM_cert_to_DER_cert(pem)
249 p2 = ssl.DER_cert_to_PEM_cert(d1)
250 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitroudb187842010-04-27 10:32:58 +0000251 self.assertEqual(d1, d2)
Antoine Pitrou4c7bcf12010-04-27 22:03:37 +0000252 if not p2.startswith(ssl.PEM_HEADER + '\n'):
253 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
254 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
255 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Bill Janssen296a59d2007-09-16 22:06:00 +0000256
Antoine Pitrouf9de5342010-04-05 21:35:07 +0000257 def test_openssl_version(self):
258 n = ssl.OPENSSL_VERSION_NUMBER
259 t = ssl.OPENSSL_VERSION_INFO
260 s = ssl.OPENSSL_VERSION
261 self.assertIsInstance(n, (int, long))
262 self.assertIsInstance(t, tuple)
263 self.assertIsInstance(s, str)
264 # Some sanity checks follow
265 # >= 0.9
266 self.assertGreaterEqual(n, 0x900000)
Antoine Pitrou4e64d872014-07-21 18:35:01 -0400267 # < 3.0
268 self.assertLess(n, 0x30000000)
Antoine Pitrouf9de5342010-04-05 21:35:07 +0000269 major, minor, fix, patch, status = t
270 self.assertGreaterEqual(major, 0)
Antoine Pitrou4e64d872014-07-21 18:35:01 -0400271 self.assertLess(major, 3)
Antoine Pitrouf9de5342010-04-05 21:35:07 +0000272 self.assertGreaterEqual(minor, 0)
273 self.assertLess(minor, 256)
274 self.assertGreaterEqual(fix, 0)
275 self.assertLess(fix, 256)
276 self.assertGreaterEqual(patch, 0)
277 self.assertLessEqual(patch, 26)
278 self.assertGreaterEqual(status, 0)
279 self.assertLessEqual(status, 15)
Antoine Pitrou4e64d872014-07-21 18:35:01 -0400280 # Version string as returned by {Open,Libre}SSL, the format might change
281 if "LibreSSL" in s:
282 self.assertTrue(s.startswith("LibreSSL {:d}.{:d}".format(major, minor)),
283 (s, t))
284 else:
285 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
286 (s, t))
Antoine Pitrouf9de5342010-04-05 21:35:07 +0000287
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500288 @support.cpython_only
Antoine Pitroudfb299b2010-04-23 22:54:59 +0000289 def test_refcycle(self):
290 # Issue #7943: an SSL object doesn't create reference cycles with
291 # itself.
292 s = socket.socket(socket.AF_INET)
293 ss = ssl.wrap_socket(s)
294 wr = weakref.ref(ss)
295 del ss
296 self.assertEqual(wr(), None)
297
Antoine Pitrouf7f390a2010-09-14 14:37:18 +0000298 def test_wrapped_unconnected(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500299 # Methods on an unconnected SSLSocket propagate the original
300 # socket.error raise by the underlying socket object.
Antoine Pitrouf7f390a2010-09-14 14:37:18 +0000301 s = socket.socket(socket.AF_INET)
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500302 with closing(ssl.wrap_socket(s)) as ss:
303 self.assertRaises(socket.error, ss.recv, 1)
304 self.assertRaises(socket.error, ss.recv_into, bytearray(b'x'))
305 self.assertRaises(socket.error, ss.recvfrom, 1)
306 self.assertRaises(socket.error, ss.recvfrom_into, bytearray(b'x'), 1)
307 self.assertRaises(socket.error, ss.send, b'x')
308 self.assertRaises(socket.error, ss.sendto, b'x', ('0.0.0.0', 0))
309
310 def test_timeout(self):
311 # Issue #8524: when creating an SSL socket, the timeout of the
312 # original socket should be retained.
313 for timeout in (None, 0.0, 5.0):
314 s = socket.socket(socket.AF_INET)
315 s.settimeout(timeout)
316 with closing(ssl.wrap_socket(s)) as ss:
317 self.assertEqual(timeout, ss.gettimeout())
318
319 def test_errors(self):
320 sock = socket.socket()
321 self.assertRaisesRegexp(ValueError,
322 "certfile must be specified",
323 ssl.wrap_socket, sock, keyfile=CERTFILE)
324 self.assertRaisesRegexp(ValueError,
325 "certfile must be specified for server-side operations",
326 ssl.wrap_socket, sock, server_side=True)
327 self.assertRaisesRegexp(ValueError,
328 "certfile must be specified for server-side operations",
329 ssl.wrap_socket, sock, server_side=True, certfile="")
330 with closing(ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE)) as s:
331 self.assertRaisesRegexp(ValueError, "can't connect in server-side mode",
332 s.connect, (HOST, 8080))
333 with self.assertRaises(IOError) as cm:
334 with closing(socket.socket()) as sock:
335 ssl.wrap_socket(sock, certfile=WRONGCERT)
336 self.assertEqual(cm.exception.errno, errno.ENOENT)
337 with self.assertRaises(IOError) as cm:
338 with closing(socket.socket()) as sock:
339 ssl.wrap_socket(sock, certfile=CERTFILE, keyfile=WRONGCERT)
340 self.assertEqual(cm.exception.errno, errno.ENOENT)
341 with self.assertRaises(IOError) as cm:
342 with closing(socket.socket()) as sock:
343 ssl.wrap_socket(sock, certfile=WRONGCERT, keyfile=WRONGCERT)
344 self.assertEqual(cm.exception.errno, errno.ENOENT)
345
346 def test_match_hostname(self):
347 def ok(cert, hostname):
348 ssl.match_hostname(cert, hostname)
349 def fail(cert, hostname):
350 self.assertRaises(ssl.CertificateError,
351 ssl.match_hostname, cert, hostname)
352
353 cert = {'subject': ((('commonName', 'example.com'),),)}
354 ok(cert, 'example.com')
355 ok(cert, 'ExAmple.cOm')
356 fail(cert, 'www.example.com')
357 fail(cert, '.example.com')
358 fail(cert, 'example.org')
359 fail(cert, 'exampleXcom')
360
361 cert = {'subject': ((('commonName', '*.a.com'),),)}
362 ok(cert, 'foo.a.com')
363 fail(cert, 'bar.foo.a.com')
364 fail(cert, 'a.com')
365 fail(cert, 'Xa.com')
366 fail(cert, '.a.com')
367
368 # only match one left-most wildcard
369 cert = {'subject': ((('commonName', 'f*.com'),),)}
370 ok(cert, 'foo.com')
371 ok(cert, 'f.com')
372 fail(cert, 'bar.com')
373 fail(cert, 'foo.a.com')
374 fail(cert, 'bar.foo.com')
375
376 # NULL bytes are bad, CVE-2013-4073
377 cert = {'subject': ((('commonName',
378 'null.python.org\x00example.org'),),)}
379 ok(cert, 'null.python.org\x00example.org') # or raise an error?
380 fail(cert, 'example.org')
381 fail(cert, 'null.python.org')
382
383 # error cases with wildcards
384 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
385 fail(cert, 'bar.foo.a.com')
386 fail(cert, 'a.com')
387 fail(cert, 'Xa.com')
388 fail(cert, '.a.com')
389
390 cert = {'subject': ((('commonName', 'a.*.com'),),)}
391 fail(cert, 'a.foo.com')
392 fail(cert, 'a..com')
393 fail(cert, 'a.com')
394
395 # wildcard doesn't match IDNA prefix 'xn--'
396 idna = u'püthon.python.org'.encode("idna").decode("ascii")
397 cert = {'subject': ((('commonName', idna),),)}
398 ok(cert, idna)
399 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
400 fail(cert, idna)
401 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
402 fail(cert, idna)
403
404 # wildcard in first fragment and IDNA A-labels in sequent fragments
405 # are supported.
406 idna = u'www*.pythön.org'.encode("idna").decode("ascii")
407 cert = {'subject': ((('commonName', idna),),)}
408 ok(cert, u'www.pythön.org'.encode("idna").decode("ascii"))
409 ok(cert, u'www1.pythön.org'.encode("idna").decode("ascii"))
410 fail(cert, u'ftp.pythön.org'.encode("idna").decode("ascii"))
411 fail(cert, u'pythön.org'.encode("idna").decode("ascii"))
412
413 # Slightly fake real-world example
414 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
415 'subject': ((('commonName', 'linuxfrz.org'),),),
416 'subjectAltName': (('DNS', 'linuxfr.org'),
417 ('DNS', 'linuxfr.com'),
418 ('othername', '<unsupported>'))}
419 ok(cert, 'linuxfr.org')
420 ok(cert, 'linuxfr.com')
421 # Not a "DNS" entry
422 fail(cert, '<unsupported>')
423 # When there is a subjectAltName, commonName isn't used
424 fail(cert, 'linuxfrz.org')
425
426 # A pristine real-world example
427 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
428 'subject': ((('countryName', 'US'),),
429 (('stateOrProvinceName', 'California'),),
430 (('localityName', 'Mountain View'),),
431 (('organizationName', 'Google Inc'),),
432 (('commonName', 'mail.google.com'),))}
433 ok(cert, 'mail.google.com')
434 fail(cert, 'gmail.com')
435 # Only commonName is considered
436 fail(cert, 'California')
437
438 # Neither commonName nor subjectAltName
439 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
440 'subject': ((('countryName', 'US'),),
441 (('stateOrProvinceName', 'California'),),
442 (('localityName', 'Mountain View'),),
443 (('organizationName', 'Google Inc'),))}
444 fail(cert, 'mail.google.com')
445
446 # No DNS entry in subjectAltName but a commonName
447 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
448 'subject': ((('countryName', 'US'),),
449 (('stateOrProvinceName', 'California'),),
450 (('localityName', 'Mountain View'),),
451 (('commonName', 'mail.google.com'),)),
452 'subjectAltName': (('othername', 'blabla'), )}
453 ok(cert, 'mail.google.com')
454
455 # No DNS entry subjectAltName and no commonName
456 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
457 'subject': ((('countryName', 'US'),),
458 (('stateOrProvinceName', 'California'),),
459 (('localityName', 'Mountain View'),),
460 (('organizationName', 'Google Inc'),)),
461 'subjectAltName': (('othername', 'blabla'),)}
462 fail(cert, 'google.com')
463
464 # Empty cert / no cert
465 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
466 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
467
468 # Issue #17980: avoid denials of service by refusing more than one
469 # wildcard per fragment.
470 cert = {'subject': ((('commonName', 'a*b.com'),),)}
471 ok(cert, 'axxb.com')
472 cert = {'subject': ((('commonName', 'a*b.co*'),),)}
473 fail(cert, 'axxb.com')
474 cert = {'subject': ((('commonName', 'a*b*.com'),),)}
475 with self.assertRaises(ssl.CertificateError) as cm:
476 ssl.match_hostname(cert, 'axxbxxc.com')
477 self.assertIn("too many wildcards", str(cm.exception))
478
479 def test_server_side(self):
480 # server_hostname doesn't work for server sockets
481 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
482 with closing(socket.socket()) as sock:
483 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
484 server_hostname="some.hostname")
485
486 def test_unknown_channel_binding(self):
487 # should raise ValueError for unknown type
488 s = socket.socket(socket.AF_INET)
489 with closing(ssl.wrap_socket(s)) as ss:
490 with self.assertRaises(ValueError):
491 ss.get_channel_binding("unknown-type")
492
493 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
494 "'tls-unique' channel binding not available")
495 def test_tls_unique_channel_binding(self):
496 # unconnected should return None for known type
497 s = socket.socket(socket.AF_INET)
498 with closing(ssl.wrap_socket(s)) as ss:
499 self.assertIsNone(ss.get_channel_binding("tls-unique"))
500 # the same for server-side
501 s = socket.socket(socket.AF_INET)
502 with closing(ssl.wrap_socket(s, server_side=True, certfile=CERTFILE)) as ss:
503 self.assertIsNone(ss.get_channel_binding("tls-unique"))
504
505 def test_get_default_verify_paths(self):
506 paths = ssl.get_default_verify_paths()
507 self.assertEqual(len(paths), 6)
508 self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
509
510 with support.EnvironmentVarGuard() as env:
511 env["SSL_CERT_DIR"] = CAPATH
512 env["SSL_CERT_FILE"] = CERTFILE
513 paths = ssl.get_default_verify_paths()
514 self.assertEqual(paths.cafile, CERTFILE)
515 self.assertEqual(paths.capath, CAPATH)
516
517 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
518 def test_enum_certificates(self):
519 self.assertTrue(ssl.enum_certificates("CA"))
520 self.assertTrue(ssl.enum_certificates("ROOT"))
521
522 self.assertRaises(TypeError, ssl.enum_certificates)
523 self.assertRaises(WindowsError, ssl.enum_certificates, "")
524
525 trust_oids = set()
526 for storename in ("CA", "ROOT"):
527 store = ssl.enum_certificates(storename)
528 self.assertIsInstance(store, list)
529 for element in store:
530 self.assertIsInstance(element, tuple)
531 self.assertEqual(len(element), 3)
532 cert, enc, trust = element
533 self.assertIsInstance(cert, bytes)
534 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
535 self.assertIsInstance(trust, (set, bool))
536 if isinstance(trust, set):
537 trust_oids.update(trust)
538
539 serverAuth = "1.3.6.1.5.5.7.3.1"
540 self.assertIn(serverAuth, trust_oids)
541
542 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
543 def test_enum_crls(self):
544 self.assertTrue(ssl.enum_crls("CA"))
545 self.assertRaises(TypeError, ssl.enum_crls)
546 self.assertRaises(WindowsError, ssl.enum_crls, "")
547
548 crls = ssl.enum_crls("CA")
549 self.assertIsInstance(crls, list)
550 for element in crls:
551 self.assertIsInstance(element, tuple)
552 self.assertEqual(len(element), 2)
553 self.assertIsInstance(element[0], bytes)
554 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
555
556
557 def test_asn1object(self):
558 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
559 '1.3.6.1.5.5.7.3.1')
560
561 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
562 self.assertEqual(val, expected)
563 self.assertEqual(val.nid, 129)
564 self.assertEqual(val.shortname, 'serverAuth')
565 self.assertEqual(val.longname, 'TLS Web Server Authentication')
566 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
567 self.assertIsInstance(val, ssl._ASN1Object)
568 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
569
570 val = ssl._ASN1Object.fromnid(129)
571 self.assertEqual(val, expected)
572 self.assertIsInstance(val, ssl._ASN1Object)
573 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
574 with self.assertRaisesRegexp(ValueError, "unknown NID 100000"):
575 ssl._ASN1Object.fromnid(100000)
576 for i in range(1000):
577 try:
578 obj = ssl._ASN1Object.fromnid(i)
579 except ValueError:
580 pass
581 else:
582 self.assertIsInstance(obj.nid, int)
583 self.assertIsInstance(obj.shortname, str)
584 self.assertIsInstance(obj.longname, str)
585 self.assertIsInstance(obj.oid, (str, type(None)))
586
587 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
588 self.assertEqual(val, expected)
589 self.assertIsInstance(val, ssl._ASN1Object)
590 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
591 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
592 expected)
593 with self.assertRaisesRegexp(ValueError, "unknown object 'serverauth'"):
594 ssl._ASN1Object.fromname('serverauth')
595
596 def test_purpose_enum(self):
597 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
598 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
599 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
600 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
601 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
602 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
603 '1.3.6.1.5.5.7.3.1')
604
605 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
606 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
607 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
608 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
609 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
610 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
611 '1.3.6.1.5.5.7.3.2')
Antoine Pitrouf7f390a2010-09-14 14:37:18 +0000612
Antoine Pitrou63cc99d2013-12-28 17:26:33 +0100613 def test_unsupported_dtls(self):
614 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
615 self.addCleanup(s.close)
616 with self.assertRaises(NotImplementedError) as cx:
617 ssl.wrap_socket(s, cert_reqs=ssl.CERT_NONE)
618 self.assertEqual(str(cx.exception), "only stream sockets are supported")
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500619 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
620 with self.assertRaises(NotImplementedError) as cx:
621 ctx.wrap_socket(s)
622 self.assertEqual(str(cx.exception), "only stream sockets are supported")
623
624 def cert_time_ok(self, timestring, timestamp):
625 self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)
626
627 def cert_time_fail(self, timestring):
628 with self.assertRaises(ValueError):
629 ssl.cert_time_to_seconds(timestring)
630
631 @unittest.skipUnless(utc_offset(),
632 'local time needs to be different from UTC')
633 def test_cert_time_to_seconds_timezone(self):
634 # Issue #19940: ssl.cert_time_to_seconds() returns wrong
635 # results if local timezone is not UTC
636 self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0)
637 self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0)
638
639 def test_cert_time_to_seconds(self):
640 timestring = "Jan 5 09:34:43 2018 GMT"
641 ts = 1515144883.0
642 self.cert_time_ok(timestring, ts)
643 # accept keyword parameter, assert its name
644 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
645 # accept both %e and %d (space or zero generated by strftime)
646 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
647 # case-insensitive
648 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
649 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
650 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
651 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
652 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
653 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
654 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
655 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
656
657 newyear_ts = 1230768000.0
658 # leap seconds
659 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
660 # same timestamp
661 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
662
663 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
664 # allow 60th second (even if it is not a leap second)
665 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
666 # allow 2nd leap second for compatibility with time.strptime()
667 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
668 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
669
670 # no special treatement for the special value:
671 # 99991231235959Z (rfc 5280)
672 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
673
674 @support.run_with_locale('LC_ALL', '')
675 def test_cert_time_to_seconds_locale(self):
676 # `cert_time_to_seconds()` should be locale independent
677
678 def local_february_name():
679 return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
680
681 if local_february_name().lower() == 'feb':
682 self.skipTest("locale-specific month name needs to be "
683 "different from C locale")
684
685 # locale-independent
686 self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
687 self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
688
689
690class ContextTests(unittest.TestCase):
691
692 @skip_if_broken_ubuntu_ssl
693 def test_constructor(self):
694 for protocol in PROTOCOLS:
695 ssl.SSLContext(protocol)
696 self.assertRaises(TypeError, ssl.SSLContext)
697 self.assertRaises(ValueError, ssl.SSLContext, -1)
698 self.assertRaises(ValueError, ssl.SSLContext, 42)
699
700 @skip_if_broken_ubuntu_ssl
701 def test_protocol(self):
702 for proto in PROTOCOLS:
703 ctx = ssl.SSLContext(proto)
704 self.assertEqual(ctx.protocol, proto)
705
706 def test_ciphers(self):
707 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
708 ctx.set_ciphers("ALL")
709 ctx.set_ciphers("DEFAULT")
710 with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"):
711 ctx.set_ciphers("^$:,;?*'dorothyx")
712
713 @skip_if_broken_ubuntu_ssl
714 def test_options(self):
715 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
716 # OP_ALL | OP_NO_SSLv2 is the default value
717 self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2,
718 ctx.options)
719 ctx.options |= ssl.OP_NO_SSLv3
720 self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3,
721 ctx.options)
722 if can_clear_options():
723 ctx.options = (ctx.options & ~ssl.OP_NO_SSLv2) | ssl.OP_NO_TLSv1
724 self.assertEqual(ssl.OP_ALL | ssl.OP_NO_TLSv1 | ssl.OP_NO_SSLv3,
725 ctx.options)
726 ctx.options = 0
727 self.assertEqual(0, ctx.options)
728 else:
729 with self.assertRaises(ValueError):
730 ctx.options = 0
731
732 def test_verify_mode(self):
733 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
734 # Default value
735 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
736 ctx.verify_mode = ssl.CERT_OPTIONAL
737 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
738 ctx.verify_mode = ssl.CERT_REQUIRED
739 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
740 ctx.verify_mode = ssl.CERT_NONE
741 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
742 with self.assertRaises(TypeError):
743 ctx.verify_mode = None
744 with self.assertRaises(ValueError):
745 ctx.verify_mode = 42
746
747 @unittest.skipUnless(have_verify_flags(),
748 "verify_flags need OpenSSL > 0.9.8")
749 def test_verify_flags(self):
750 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
751 # default value by OpenSSL
752 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
753 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
754 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
755 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
756 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
757 ctx.verify_flags = ssl.VERIFY_DEFAULT
758 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
759 # supports any value
760 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
761 self.assertEqual(ctx.verify_flags,
762 ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
763 with self.assertRaises(TypeError):
764 ctx.verify_flags = None
765
766 def test_load_cert_chain(self):
767 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
768 # Combined key and cert in a single file
769 ctx.load_cert_chain(CERTFILE)
770 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
771 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
772 with self.assertRaises(IOError) as cm:
773 ctx.load_cert_chain(WRONGCERT)
774 self.assertEqual(cm.exception.errno, errno.ENOENT)
775 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
776 ctx.load_cert_chain(BADCERT)
777 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
778 ctx.load_cert_chain(EMPTYCERT)
779 # Separate key and cert
780 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
781 ctx.load_cert_chain(ONLYCERT, ONLYKEY)
782 ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
783 ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
784 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
785 ctx.load_cert_chain(ONLYCERT)
786 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
787 ctx.load_cert_chain(ONLYKEY)
788 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
789 ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
790 # Mismatching key and cert
791 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
792 with self.assertRaisesRegexp(ssl.SSLError, "key values mismatch"):
793 ctx.load_cert_chain(SVN_PYTHON_ORG_ROOT_CERT, ONLYKEY)
794 # Password protected key and cert
795 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
796 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
797 ctx.load_cert_chain(CERTFILE_PROTECTED,
798 password=bytearray(KEY_PASSWORD.encode()))
799 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
800 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
801 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
802 bytearray(KEY_PASSWORD.encode()))
803 with self.assertRaisesRegexp(TypeError, "should be a string"):
804 ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
805 with self.assertRaises(ssl.SSLError):
806 ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
807 with self.assertRaisesRegexp(ValueError, "cannot be longer"):
808 # openssl has a fixed limit on the password buffer.
809 # PEM_BUFSIZE is generally set to 1kb.
810 # Return a string larger than this.
811 ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
812 # Password callback
813 def getpass_unicode():
814 return KEY_PASSWORD
815 def getpass_bytes():
816 return KEY_PASSWORD.encode()
817 def getpass_bytearray():
818 return bytearray(KEY_PASSWORD.encode())
819 def getpass_badpass():
820 return "badpass"
821 def getpass_huge():
822 return b'a' * (1024 * 1024)
823 def getpass_bad_type():
824 return 9
825 def getpass_exception():
826 raise Exception('getpass error')
827 class GetPassCallable:
828 def __call__(self):
829 return KEY_PASSWORD
830 def getpass(self):
831 return KEY_PASSWORD
832 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
833 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
834 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
835 ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
836 ctx.load_cert_chain(CERTFILE_PROTECTED,
837 password=GetPassCallable().getpass)
838 with self.assertRaises(ssl.SSLError):
839 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
840 with self.assertRaisesRegexp(ValueError, "cannot be longer"):
841 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
842 with self.assertRaisesRegexp(TypeError, "must return a string"):
843 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
844 with self.assertRaisesRegexp(Exception, "getpass error"):
845 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
846 # Make sure the password function isn't called if it isn't needed
847 ctx.load_cert_chain(CERTFILE, password=getpass_exception)
848
849 def test_load_verify_locations(self):
850 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
851 ctx.load_verify_locations(CERTFILE)
852 ctx.load_verify_locations(cafile=CERTFILE, capath=None)
853 ctx.load_verify_locations(BYTES_CERTFILE)
854 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
Benjamin Peterson876473e2014-08-28 09:33:21 -0400855 ctx.load_verify_locations(cafile=BYTES_CERTFILE.decode('utf-8'))
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500856 self.assertRaises(TypeError, ctx.load_verify_locations)
857 self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
858 with self.assertRaises(IOError) as cm:
859 ctx.load_verify_locations(WRONGCERT)
860 self.assertEqual(cm.exception.errno, errno.ENOENT)
Benjamin Peterson876473e2014-08-28 09:33:21 -0400861 with self.assertRaises(IOError):
862 ctx.load_verify_locations(u'')
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500863 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
864 ctx.load_verify_locations(BADCERT)
865 ctx.load_verify_locations(CERTFILE, CAPATH)
866 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
867
868 # Issue #10989: crash if the second argument type is invalid
869 self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
870
871 def test_load_verify_cadata(self):
872 # test cadata
873 with open(CAFILE_CACERT) as f:
874 cacert_pem = f.read().decode("ascii")
875 cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
876 with open(CAFILE_NEURONIO) as f:
877 neuronio_pem = f.read().decode("ascii")
878 neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
879
880 # test PEM
881 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
882 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
883 ctx.load_verify_locations(cadata=cacert_pem)
884 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
885 ctx.load_verify_locations(cadata=neuronio_pem)
886 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
887 # cert already in hash table
888 ctx.load_verify_locations(cadata=neuronio_pem)
889 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
890
891 # combined
892 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
893 combined = "\n".join((cacert_pem, neuronio_pem))
894 ctx.load_verify_locations(cadata=combined)
895 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
896
897 # with junk around the certs
898 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
899 combined = ["head", cacert_pem, "other", neuronio_pem, "again",
900 neuronio_pem, "tail"]
901 ctx.load_verify_locations(cadata="\n".join(combined))
902 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
903
904 # test DER
905 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
906 ctx.load_verify_locations(cadata=cacert_der)
907 ctx.load_verify_locations(cadata=neuronio_der)
908 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
909 # cert already in hash table
910 ctx.load_verify_locations(cadata=cacert_der)
911 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
912
913 # combined
914 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
915 combined = b"".join((cacert_der, neuronio_der))
916 ctx.load_verify_locations(cadata=combined)
917 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
918
919 # error cases
920 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
921 self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
922
923 with self.assertRaisesRegexp(ssl.SSLError, "no start line"):
924 ctx.load_verify_locations(cadata=u"broken")
925 with self.assertRaisesRegexp(ssl.SSLError, "not enough data"):
926 ctx.load_verify_locations(cadata=b"broken")
927
928
929 def test_load_dh_params(self):
930 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
931 ctx.load_dh_params(DHFILE)
932 if os.name != 'nt':
933 ctx.load_dh_params(BYTES_DHFILE)
934 self.assertRaises(TypeError, ctx.load_dh_params)
935 self.assertRaises(TypeError, ctx.load_dh_params, None)
936 with self.assertRaises(IOError) as cm:
937 ctx.load_dh_params(WRONGCERT)
938 self.assertEqual(cm.exception.errno, errno.ENOENT)
939 with self.assertRaises(ssl.SSLError) as cm:
940 ctx.load_dh_params(CERTFILE)
941
942 @skip_if_broken_ubuntu_ssl
943 def test_session_stats(self):
944 for proto in PROTOCOLS:
945 ctx = ssl.SSLContext(proto)
946 self.assertEqual(ctx.session_stats(), {
947 'number': 0,
948 'connect': 0,
949 'connect_good': 0,
950 'connect_renegotiate': 0,
951 'accept': 0,
952 'accept_good': 0,
953 'accept_renegotiate': 0,
954 'hits': 0,
955 'misses': 0,
956 'timeouts': 0,
957 'cache_full': 0,
958 })
959
960 def test_set_default_verify_paths(self):
961 # There's not much we can do to test that it acts as expected,
962 # so just check it doesn't crash or raise an exception.
963 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
964 ctx.set_default_verify_paths()
965
966 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
967 def test_set_ecdh_curve(self):
968 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
969 ctx.set_ecdh_curve("prime256v1")
970 ctx.set_ecdh_curve(b"prime256v1")
971 self.assertRaises(TypeError, ctx.set_ecdh_curve)
972 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
973 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
974 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
975
976 @needs_sni
977 def test_sni_callback(self):
978 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
979
980 # set_servername_callback expects a callable, or None
981 self.assertRaises(TypeError, ctx.set_servername_callback)
982 self.assertRaises(TypeError, ctx.set_servername_callback, 4)
983 self.assertRaises(TypeError, ctx.set_servername_callback, "")
984 self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
985
986 def dummycallback(sock, servername, ctx):
987 pass
988 ctx.set_servername_callback(None)
989 ctx.set_servername_callback(dummycallback)
990
991 @needs_sni
992 def test_sni_callback_refcycle(self):
993 # Reference cycles through the servername callback are detected
994 # and cleared.
995 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
996 def dummycallback(sock, servername, ctx, cycle=ctx):
997 pass
998 ctx.set_servername_callback(dummycallback)
999 wr = weakref.ref(ctx)
1000 del ctx, dummycallback
1001 gc.collect()
1002 self.assertIs(wr(), None)
1003
1004 def test_cert_store_stats(self):
1005 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1006 self.assertEqual(ctx.cert_store_stats(),
1007 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1008 ctx.load_cert_chain(CERTFILE)
1009 self.assertEqual(ctx.cert_store_stats(),
1010 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1011 ctx.load_verify_locations(CERTFILE)
1012 self.assertEqual(ctx.cert_store_stats(),
1013 {'x509_ca': 0, 'crl': 0, 'x509': 1})
1014 ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
1015 self.assertEqual(ctx.cert_store_stats(),
1016 {'x509_ca': 1, 'crl': 0, 'x509': 2})
1017
1018 def test_get_ca_certs(self):
1019 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1020 self.assertEqual(ctx.get_ca_certs(), [])
1021 # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
1022 ctx.load_verify_locations(CERTFILE)
1023 self.assertEqual(ctx.get_ca_certs(), [])
1024 # but SVN_PYTHON_ORG_ROOT_CERT is a CA cert
1025 ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
1026 self.assertEqual(ctx.get_ca_certs(),
1027 [{'issuer': ((('organizationName', 'Root CA'),),
1028 (('organizationalUnitName', 'http://www.cacert.org'),),
1029 (('commonName', 'CA Cert Signing Authority'),),
1030 (('emailAddress', 'support@cacert.org'),)),
1031 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
1032 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
1033 'serialNumber': '00',
1034 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
1035 'subject': ((('organizationName', 'Root CA'),),
1036 (('organizationalUnitName', 'http://www.cacert.org'),),
1037 (('commonName', 'CA Cert Signing Authority'),),
1038 (('emailAddress', 'support@cacert.org'),)),
1039 'version': 3}])
1040
1041 with open(SVN_PYTHON_ORG_ROOT_CERT) as f:
1042 pem = f.read()
1043 der = ssl.PEM_cert_to_DER_cert(pem)
1044 self.assertEqual(ctx.get_ca_certs(True), [der])
1045
1046 def test_load_default_certs(self):
1047 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1048 ctx.load_default_certs()
1049
1050 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1051 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1052 ctx.load_default_certs()
1053
1054 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1055 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1056
1057 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1058 self.assertRaises(TypeError, ctx.load_default_certs, None)
1059 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1060
1061 def test_create_default_context(self):
1062 ctx = ssl.create_default_context()
1063 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1064 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1065 self.assertTrue(ctx.check_hostname)
1066 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1067 self.assertEqual(
1068 ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
1069 getattr(ssl, "OP_NO_COMPRESSION", 0),
1070 )
1071
1072 with open(SIGNING_CA) as f:
1073 cadata = f.read().decode("ascii")
1074 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1075 cadata=cadata)
1076 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1077 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1078 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1079 self.assertEqual(
1080 ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
1081 getattr(ssl, "OP_NO_COMPRESSION", 0),
1082 )
1083
1084 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
1085 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1086 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1087 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1088 self.assertEqual(
1089 ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
1090 getattr(ssl, "OP_NO_COMPRESSION", 0),
1091 )
1092 self.assertEqual(
1093 ctx.options & getattr(ssl, "OP_SINGLE_DH_USE", 0),
1094 getattr(ssl, "OP_SINGLE_DH_USE", 0),
1095 )
1096 self.assertEqual(
1097 ctx.options & getattr(ssl, "OP_SINGLE_ECDH_USE", 0),
1098 getattr(ssl, "OP_SINGLE_ECDH_USE", 0),
1099 )
1100
1101 def test__create_stdlib_context(self):
1102 ctx = ssl._create_stdlib_context()
1103 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1104 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1105 self.assertFalse(ctx.check_hostname)
1106 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1107
1108 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
1109 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1110 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1111 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1112
1113 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
1114 cert_reqs=ssl.CERT_REQUIRED,
1115 check_hostname=True)
1116 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1117 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1118 self.assertTrue(ctx.check_hostname)
1119 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1120
1121 ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
1122 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1123 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1124 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1125
1126 def test_check_hostname(self):
1127 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1128 self.assertFalse(ctx.check_hostname)
1129
1130 # Requires CERT_REQUIRED or CERT_OPTIONAL
1131 with self.assertRaises(ValueError):
1132 ctx.check_hostname = True
1133 ctx.verify_mode = ssl.CERT_REQUIRED
1134 self.assertFalse(ctx.check_hostname)
1135 ctx.check_hostname = True
1136 self.assertTrue(ctx.check_hostname)
1137
1138 ctx.verify_mode = ssl.CERT_OPTIONAL
1139 ctx.check_hostname = True
1140 self.assertTrue(ctx.check_hostname)
1141
1142 # Cannot set CERT_NONE with check_hostname enabled
1143 with self.assertRaises(ValueError):
1144 ctx.verify_mode = ssl.CERT_NONE
1145 ctx.check_hostname = False
1146 self.assertFalse(ctx.check_hostname)
1147
1148
1149class SSLErrorTests(unittest.TestCase):
1150
1151 def test_str(self):
1152 # The str() of a SSLError doesn't include the errno
1153 e = ssl.SSLError(1, "foo")
1154 self.assertEqual(str(e), "foo")
1155 self.assertEqual(e.errno, 1)
1156 # Same for a subclass
1157 e = ssl.SSLZeroReturnError(1, "foo")
1158 self.assertEqual(str(e), "foo")
1159 self.assertEqual(e.errno, 1)
1160
1161 def test_lib_reason(self):
1162 # Test the library and reason attributes
1163 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1164 with self.assertRaises(ssl.SSLError) as cm:
1165 ctx.load_dh_params(CERTFILE)
1166 self.assertEqual(cm.exception.library, 'PEM')
1167 self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1168 s = str(cm.exception)
1169 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1170
1171 def test_subclass(self):
1172 # Check that the appropriate SSLError subclass is raised
1173 # (this only tests one of them)
1174 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1175 with closing(socket.socket()) as s:
1176 s.bind(("127.0.0.1", 0))
1177 s.listen(5)
1178 c = socket.socket()
1179 c.connect(s.getsockname())
1180 c.setblocking(False)
1181 with closing(ctx.wrap_socket(c, False, do_handshake_on_connect=False)) as c:
1182 with self.assertRaises(ssl.SSLWantReadError) as cm:
1183 c.do_handshake()
1184 s = str(cm.exception)
1185 self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1186 # For compatibility
1187 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
Antoine Pitrou63cc99d2013-12-28 17:26:33 +01001188
Antoine Pitrouf9de5342010-04-05 21:35:07 +00001189
Bill Janssen934b16d2008-06-28 22:19:33 +00001190class NetworkedTests(unittest.TestCase):
Bill Janssen296a59d2007-09-16 22:06:00 +00001191
Antoine Pitrou3945c862010-04-28 21:11:01 +00001192 def test_connect(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001193 with support.transient_internet("svn.python.org"):
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001194 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1195 cert_reqs=ssl.CERT_NONE)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001196 try:
1197 s.connect(("svn.python.org", 443))
1198 self.assertEqual({}, s.getpeercert())
1199 finally:
1200 s.close()
Bill Janssen296a59d2007-09-16 22:06:00 +00001201
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001202 # this should fail because we have no verification certs
1203 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1204 cert_reqs=ssl.CERT_REQUIRED)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001205 self.assertRaisesRegexp(ssl.SSLError, "certificate verify failed",
1206 s.connect, ("svn.python.org", 443))
1207 s.close()
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001208
1209 # this should succeed because we specify the root cert
1210 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1211 cert_reqs=ssl.CERT_REQUIRED,
1212 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
1213 try:
1214 s.connect(("svn.python.org", 443))
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001215 self.assertTrue(s.getpeercert())
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001216 finally:
1217 s.close()
Bill Janssen296a59d2007-09-16 22:06:00 +00001218
Antoine Pitroud3f6ea12011-02-26 23:35:27 +00001219 def test_connect_ex(self):
1220 # Issue #11326: check connect_ex() implementation
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001221 with support.transient_internet("svn.python.org"):
Antoine Pitroud3f6ea12011-02-26 23:35:27 +00001222 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1223 cert_reqs=ssl.CERT_REQUIRED,
1224 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
1225 try:
1226 self.assertEqual(0, s.connect_ex(("svn.python.org", 443)))
1227 self.assertTrue(s.getpeercert())
1228 finally:
1229 s.close()
1230
1231 def test_non_blocking_connect_ex(self):
1232 # Issue #11326: non-blocking connect_ex() should allow handshake
1233 # to proceed after the socket gets ready.
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001234 with support.transient_internet("svn.python.org"):
Antoine Pitroud3f6ea12011-02-26 23:35:27 +00001235 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1236 cert_reqs=ssl.CERT_REQUIRED,
1237 ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
1238 do_handshake_on_connect=False)
1239 try:
1240 s.setblocking(False)
1241 rc = s.connect_ex(('svn.python.org', 443))
Antoine Pitrou8ef39072011-02-27 15:45:22 +00001242 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1243 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
Antoine Pitroud3f6ea12011-02-26 23:35:27 +00001244 # Wait for connect to finish
1245 select.select([], [s], [], 5.0)
1246 # Non-blocking handshake
1247 while True:
1248 try:
1249 s.do_handshake()
1250 break
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001251 except ssl.SSLWantReadError:
1252 select.select([s], [], [], 5.0)
1253 except ssl.SSLWantWriteError:
1254 select.select([], [s], [], 5.0)
Antoine Pitroud3f6ea12011-02-26 23:35:27 +00001255 # SSL established
1256 self.assertTrue(s.getpeercert())
1257 finally:
1258 s.close()
1259
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001260 def test_timeout_connect_ex(self):
1261 # Issue #12065: on a timeout, connect_ex() should return the original
1262 # errno (mimicking the behaviour of non-SSL sockets).
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001263 with support.transient_internet("svn.python.org"):
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001264 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1265 cert_reqs=ssl.CERT_REQUIRED,
1266 ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
1267 do_handshake_on_connect=False)
1268 try:
1269 s.settimeout(0.0000001)
1270 rc = s.connect_ex(('svn.python.org', 443))
1271 if rc == 0:
1272 self.skipTest("svn.python.org responded too quickly")
1273 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
1274 finally:
1275 s.close()
1276
1277 def test_connect_ex_error(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001278 with support.transient_internet("svn.python.org"):
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001279 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1280 cert_reqs=ssl.CERT_REQUIRED,
1281 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
1282 try:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001283 rc = s.connect_ex(("svn.python.org", 444))
1284 # Issue #19919: Windows machines or VMs hosted on Windows
1285 # machines sometimes return EWOULDBLOCK.
1286 self.assertIn(rc, (errno.ECONNREFUSED, errno.EWOULDBLOCK))
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001287 finally:
1288 s.close()
1289
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001290 def test_connect_with_context(self):
1291 with support.transient_internet("svn.python.org"):
1292 # Same as test_connect, but with a separately created context
1293 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1294 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1295 s.connect(("svn.python.org", 443))
1296 try:
1297 self.assertEqual({}, s.getpeercert())
1298 finally:
1299 s.close()
1300 # Same with a server hostname
1301 s = ctx.wrap_socket(socket.socket(socket.AF_INET),
1302 server_hostname="svn.python.org")
1303 if ssl.HAS_SNI:
1304 s.connect(("svn.python.org", 443))
1305 s.close()
1306 else:
1307 self.assertRaises(ValueError, s.connect, ("svn.python.org", 443))
1308 # This should fail because we have no verification certs
1309 ctx.verify_mode = ssl.CERT_REQUIRED
1310 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1311 self.assertRaisesRegexp(ssl.SSLError, "certificate verify failed",
1312 s.connect, ("svn.python.org", 443))
1313 s.close()
1314 # This should succeed because we specify the root cert
1315 ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
1316 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1317 s.connect(("svn.python.org", 443))
1318 try:
1319 cert = s.getpeercert()
1320 self.assertTrue(cert)
1321 finally:
1322 s.close()
1323
1324 def test_connect_capath(self):
1325 # Verify server certificates using the `capath` argument
1326 # NOTE: the subject hashing algorithm has been changed between
1327 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
1328 # contain both versions of each certificate (same content, different
1329 # filename) for this test to be portable across OpenSSL releases.
1330 with support.transient_internet("svn.python.org"):
1331 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1332 ctx.verify_mode = ssl.CERT_REQUIRED
1333 ctx.load_verify_locations(capath=CAPATH)
1334 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1335 s.connect(("svn.python.org", 443))
1336 try:
1337 cert = s.getpeercert()
1338 self.assertTrue(cert)
1339 finally:
1340 s.close()
1341 # Same with a bytes `capath` argument
1342 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1343 ctx.verify_mode = ssl.CERT_REQUIRED
1344 ctx.load_verify_locations(capath=BYTES_CAPATH)
1345 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1346 s.connect(("svn.python.org", 443))
1347 try:
1348 cert = s.getpeercert()
1349 self.assertTrue(cert)
1350 finally:
1351 s.close()
1352
1353 def test_connect_cadata(self):
1354 with open(CAFILE_CACERT) as f:
1355 pem = f.read().decode('ascii')
1356 der = ssl.PEM_cert_to_DER_cert(pem)
1357 with support.transient_internet("svn.python.org"):
1358 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1359 ctx.verify_mode = ssl.CERT_REQUIRED
1360 ctx.load_verify_locations(cadata=pem)
1361 with closing(ctx.wrap_socket(socket.socket(socket.AF_INET))) as s:
1362 s.connect(("svn.python.org", 443))
1363 cert = s.getpeercert()
1364 self.assertTrue(cert)
1365
1366 # same with DER
1367 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1368 ctx.verify_mode = ssl.CERT_REQUIRED
1369 ctx.load_verify_locations(cadata=der)
1370 with closing(ctx.wrap_socket(socket.socket(socket.AF_INET))) as s:
1371 s.connect(("svn.python.org", 443))
1372 cert = s.getpeercert()
1373 self.assertTrue(cert)
1374
Antoine Pitrou55841ac2010-04-24 10:43:57 +00001375 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
1376 def test_makefile_close(self):
1377 # Issue #5238: creating a file-like object with makefile() shouldn't
1378 # delay closing the underlying "real socket" (here tested with its
1379 # file descriptor, hence skipping the test under Windows).
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001380 with support.transient_internet("svn.python.org"):
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001381 ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
1382 ss.connect(("svn.python.org", 443))
1383 fd = ss.fileno()
1384 f = ss.makefile()
1385 f.close()
1386 # The fd is still open
Antoine Pitrou55841ac2010-04-24 10:43:57 +00001387 os.read(fd, 0)
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001388 # Closing the SSL socket should close the fd too
1389 ss.close()
1390 gc.collect()
1391 with self.assertRaises(OSError) as e:
1392 os.read(fd, 0)
1393 self.assertEqual(e.exception.errno, errno.EBADF)
Bill Janssen934b16d2008-06-28 22:19:33 +00001394
Antoine Pitrou3945c862010-04-28 21:11:01 +00001395 def test_non_blocking_handshake(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001396 with support.transient_internet("svn.python.org"):
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001397 s = socket.socket(socket.AF_INET)
1398 s.connect(("svn.python.org", 443))
1399 s.setblocking(False)
1400 s = ssl.wrap_socket(s,
1401 cert_reqs=ssl.CERT_NONE,
1402 do_handshake_on_connect=False)
1403 count = 0
1404 while True:
1405 try:
1406 count += 1
1407 s.do_handshake()
1408 break
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001409 except ssl.SSLWantReadError:
1410 select.select([s], [], [])
1411 except ssl.SSLWantWriteError:
1412 select.select([], [s], [])
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001413 s.close()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001414 if support.verbose:
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001415 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Bill Janssen934b16d2008-06-28 22:19:33 +00001416
Antoine Pitrou3945c862010-04-28 21:11:01 +00001417 def test_get_server_certificate(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001418 def _test_get_server_certificate(host, port, cert=None):
1419 with support.transient_internet(host):
1420 pem = ssl.get_server_certificate((host, port))
1421 if not pem:
1422 self.fail("No server certificate on %s:%s!" % (host, port))
Bill Janssen296a59d2007-09-16 22:06:00 +00001423
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001424 try:
1425 pem = ssl.get_server_certificate((host, port),
1426 ca_certs=CERTFILE)
1427 except ssl.SSLError as x:
1428 #should fail
1429 if support.verbose:
1430 sys.stdout.write("%s\n" % x)
1431 else:
1432 self.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
Bill Janssen296a59d2007-09-16 22:06:00 +00001433
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001434 pem = ssl.get_server_certificate((host, port),
1435 ca_certs=cert)
1436 if not pem:
1437 self.fail("No server certificate on %s:%s!" % (host, port))
1438 if support.verbose:
1439 sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
1440
1441 _test_get_server_certificate('svn.python.org', 443, SVN_PYTHON_ORG_ROOT_CERT)
1442 if support.IPV6_ENABLED:
1443 _test_get_server_certificate('ipv6.google.com', 443)
1444
1445 def test_ciphers(self):
1446 remote = ("svn.python.org", 443)
1447 with support.transient_internet(remote[0]):
1448 with closing(ssl.wrap_socket(socket.socket(socket.AF_INET),
1449 cert_reqs=ssl.CERT_NONE, ciphers="ALL")) as s:
1450 s.connect(remote)
1451 with closing(ssl.wrap_socket(socket.socket(socket.AF_INET),
1452 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT")) as s:
1453 s.connect(remote)
1454 # Error checking can happen at instantiation or when connecting
1455 with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"):
1456 with closing(socket.socket(socket.AF_INET)) as sock:
1457 s = ssl.wrap_socket(sock,
1458 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
1459 s.connect(remote)
Bill Janssen296a59d2007-09-16 22:06:00 +00001460
Antoine Pitrouc715a9e2010-04-21 19:28:03 +00001461 def test_algorithms(self):
1462 # Issue #8484: all algorithms should be available when verifying a
1463 # certificate.
Antoine Pitrou9aed6042010-04-22 18:00:41 +00001464 # SHA256 was added in OpenSSL 0.9.8
1465 if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15):
1466 self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001467 # sha256.tbs-internet.com needs SNI to use the correct certificate
1468 if not ssl.HAS_SNI:
1469 self.skipTest("SNI needed for this test")
1470 # https://sha2.hboeck.de/ was used until 2011-01-08 (no route to host)
Antoine Pitroud43245a2011-01-08 10:32:51 +00001471 remote = ("sha256.tbs-internet.com", 443)
Antoine Pitrouc715a9e2010-04-21 19:28:03 +00001472 sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem")
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001473 with support.transient_internet("sha256.tbs-internet.com"):
1474 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1475 ctx.verify_mode = ssl.CERT_REQUIRED
1476 ctx.load_verify_locations(sha256_cert)
1477 s = ctx.wrap_socket(socket.socket(socket.AF_INET),
1478 server_hostname="sha256.tbs-internet.com")
Antoine Pitrouc715a9e2010-04-21 19:28:03 +00001479 try:
1480 s.connect(remote)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001481 if support.verbose:
Antoine Pitrouc715a9e2010-04-21 19:28:03 +00001482 sys.stdout.write("\nCipher with %r is %r\n" %
1483 (remote, s.cipher()))
1484 sys.stdout.write("Certificate is:\n%s\n" %
1485 pprint.pformat(s.getpeercert()))
1486 finally:
1487 s.close()
1488
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001489 def test_get_ca_certs_capath(self):
1490 # capath certs are loaded on request
1491 with support.transient_internet("svn.python.org"):
1492 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1493 ctx.verify_mode = ssl.CERT_REQUIRED
1494 ctx.load_verify_locations(capath=CAPATH)
1495 self.assertEqual(ctx.get_ca_certs(), [])
1496 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1497 s.connect(("svn.python.org", 443))
1498 try:
1499 cert = s.getpeercert()
1500 self.assertTrue(cert)
1501 finally:
1502 s.close()
1503 self.assertEqual(len(ctx.get_ca_certs()), 1)
1504
1505 @needs_sni
1506 def test_context_setget(self):
1507 # Check that the context of a connected socket can be replaced.
1508 with support.transient_internet("svn.python.org"):
1509 ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1510 ctx2 = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1511 s = socket.socket(socket.AF_INET)
1512 with closing(ctx1.wrap_socket(s)) as ss:
1513 ss.connect(("svn.python.org", 443))
1514 self.assertIs(ss.context, ctx1)
1515 self.assertIs(ss._sslobj.context, ctx1)
1516 ss.context = ctx2
1517 self.assertIs(ss.context, ctx2)
1518 self.assertIs(ss._sslobj.context, ctx2)
Bill Janssen296a59d2007-09-16 22:06:00 +00001519
Bill Janssen98d19da2007-09-10 21:51:02 +00001520try:
1521 import threading
1522except ImportError:
1523 _have_threads = False
1524else:
Bill Janssen98d19da2007-09-10 21:51:02 +00001525 _have_threads = True
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001526
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001527 from test.ssl_servers import make_https_server
1528
Bill Janssen98d19da2007-09-10 21:51:02 +00001529 class ThreadedEchoServer(threading.Thread):
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001530
Bill Janssen98d19da2007-09-10 21:51:02 +00001531 class ConnectionHandler(threading.Thread):
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001532
Bill Janssen98d19da2007-09-10 21:51:02 +00001533 """A mildly complicated class, because we want it to work both
1534 with and without the SSL wrapper around the socket connection, so
1535 that we can test the STARTTLS functionality."""
1536
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001537 def __init__(self, server, connsock, addr):
Bill Janssen98d19da2007-09-10 21:51:02 +00001538 self.server = server
1539 self.running = False
1540 self.sock = connsock
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001541 self.addr = addr
Bill Janssen98d19da2007-09-10 21:51:02 +00001542 self.sock.setblocking(1)
1543 self.sslconn = None
1544 threading.Thread.__init__(self)
Benjamin Peterson26f52162008-08-18 18:39:57 +00001545 self.daemon = True
Bill Janssen98d19da2007-09-10 21:51:02 +00001546
Antoine Pitrou3945c862010-04-28 21:11:01 +00001547 def wrap_conn(self):
Bill Janssen98d19da2007-09-10 21:51:02 +00001548 try:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001549 self.sslconn = self.server.context.wrap_socket(
1550 self.sock, server_side=True)
1551 self.server.selected_protocols.append(self.sslconn.selected_npn_protocol())
1552 except socket.error as e:
1553 # We treat ConnectionResetError as though it were an
1554 # SSLError - OpenSSL on Ubuntu abruptly closes the
1555 # connection when asked to use an unsupported protocol.
1556 #
Antoine Pitroudb187842010-04-27 10:32:58 +00001557 # XXX Various errors can have happened here, for example
1558 # a mismatching protocol version, an invalid certificate,
1559 # or a low-level bug. This should be made more discriminating.
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001560 if not isinstance(e, ssl.SSLError) and e.errno != errno.ECONNRESET:
1561 raise
Antoine Pitroud76088d2012-01-03 22:46:48 +01001562 self.server.conn_errors.append(e)
Bill Janssen98d19da2007-09-10 21:51:02 +00001563 if self.server.chatty:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001564 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
Antoine Pitroudb187842010-04-27 10:32:58 +00001565 self.running = False
1566 self.server.stop()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001567 self.close()
Bill Janssen98d19da2007-09-10 21:51:02 +00001568 return False
Bill Janssen98d19da2007-09-10 21:51:02 +00001569 else:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001570 if self.server.context.verify_mode == ssl.CERT_REQUIRED:
1571 cert = self.sslconn.getpeercert()
1572 if support.verbose and self.server.chatty:
1573 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
1574 cert_binary = self.sslconn.getpeercert(True)
1575 if support.verbose and self.server.chatty:
1576 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
1577 cipher = self.sslconn.cipher()
1578 if support.verbose and self.server.chatty:
1579 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
1580 sys.stdout.write(" server: selected protocol is now "
1581 + str(self.sslconn.selected_npn_protocol()) + "\n")
Bill Janssen98d19da2007-09-10 21:51:02 +00001582 return True
1583
1584 def read(self):
1585 if self.sslconn:
1586 return self.sslconn.read()
1587 else:
1588 return self.sock.recv(1024)
1589
1590 def write(self, bytes):
1591 if self.sslconn:
1592 return self.sslconn.write(bytes)
1593 else:
1594 return self.sock.send(bytes)
1595
1596 def close(self):
1597 if self.sslconn:
1598 self.sslconn.close()
1599 else:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001600 self.sock.close()
Bill Janssen98d19da2007-09-10 21:51:02 +00001601
Antoine Pitrou3945c862010-04-28 21:11:01 +00001602 def run(self):
Bill Janssen98d19da2007-09-10 21:51:02 +00001603 self.running = True
1604 if not self.server.starttls_server:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001605 if not self.wrap_conn():
Bill Janssen98d19da2007-09-10 21:51:02 +00001606 return
1607 while self.running:
1608 try:
1609 msg = self.read()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001610 stripped = msg.strip()
1611 if not stripped:
Bill Janssen98d19da2007-09-10 21:51:02 +00001612 # eof, so quit this handler
1613 self.running = False
1614 self.close()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001615 elif stripped == b'over':
1616 if support.verbose and self.server.connectionchatty:
Bill Janssen98d19da2007-09-10 21:51:02 +00001617 sys.stdout.write(" server: client closed connection\n")
1618 self.close()
1619 return
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001620 elif (self.server.starttls_server and
1621 stripped == b'STARTTLS'):
1622 if support.verbose and self.server.connectionchatty:
Bill Janssen98d19da2007-09-10 21:51:02 +00001623 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001624 self.write(b"OK\n")
Bill Janssen98d19da2007-09-10 21:51:02 +00001625 if not self.wrap_conn():
1626 return
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001627 elif (self.server.starttls_server and self.sslconn
1628 and stripped == b'ENDTLS'):
1629 if support.verbose and self.server.connectionchatty:
Bill Janssen39295c22008-08-12 16:31:21 +00001630 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001631 self.write(b"OK\n")
1632 self.sock = self.sslconn.unwrap()
Bill Janssen39295c22008-08-12 16:31:21 +00001633 self.sslconn = None
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001634 if support.verbose and self.server.connectionchatty:
Bill Janssen39295c22008-08-12 16:31:21 +00001635 sys.stdout.write(" server: connection is now unencrypted...\n")
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001636 elif stripped == b'CB tls-unique':
1637 if support.verbose and self.server.connectionchatty:
1638 sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
1639 data = self.sslconn.get_channel_binding("tls-unique")
1640 self.write(repr(data).encode("us-ascii") + b"\n")
Bill Janssen98d19da2007-09-10 21:51:02 +00001641 else:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001642 if (support.verbose and
Bill Janssen98d19da2007-09-10 21:51:02 +00001643 self.server.connectionchatty):
1644 ctype = (self.sslconn and "encrypted") or "unencrypted"
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001645 sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
1646 % (msg, ctype, msg.lower(), ctype))
Bill Janssen98d19da2007-09-10 21:51:02 +00001647 self.write(msg.lower())
1648 except ssl.SSLError:
1649 if self.server.chatty:
1650 handle_error("Test server failure:\n")
1651 self.close()
1652 self.running = False
1653 # normally, we'd just stop here, but for the test
1654 # harness, we want to stop the server
1655 self.server.stop()
Bill Janssen98d19da2007-09-10 21:51:02 +00001656
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001657 def __init__(self, certificate=None, ssl_version=None,
Antoine Pitroudb187842010-04-27 10:32:58 +00001658 certreqs=None, cacerts=None,
Bill Janssen934b16d2008-06-28 22:19:33 +00001659 chatty=True, connectionchatty=False, starttls_server=False,
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001660 npn_protocols=None, ciphers=None, context=None):
1661 if context:
1662 self.context = context
1663 else:
1664 self.context = ssl.SSLContext(ssl_version
1665 if ssl_version is not None
1666 else ssl.PROTOCOL_TLSv1)
1667 self.context.verify_mode = (certreqs if certreqs is not None
1668 else ssl.CERT_NONE)
1669 if cacerts:
1670 self.context.load_verify_locations(cacerts)
1671 if certificate:
1672 self.context.load_cert_chain(certificate)
1673 if npn_protocols:
1674 self.context.set_npn_protocols(npn_protocols)
1675 if ciphers:
1676 self.context.set_ciphers(ciphers)
Bill Janssen98d19da2007-09-10 21:51:02 +00001677 self.chatty = chatty
1678 self.connectionchatty = connectionchatty
1679 self.starttls_server = starttls_server
1680 self.sock = socket.socket()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001681 self.port = support.bind_port(self.sock)
Bill Janssen98d19da2007-09-10 21:51:02 +00001682 self.flag = None
Bill Janssen98d19da2007-09-10 21:51:02 +00001683 self.active = False
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001684 self.selected_protocols = []
Antoine Pitroud76088d2012-01-03 22:46:48 +01001685 self.conn_errors = []
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001686 threading.Thread.__init__(self)
Benjamin Peterson26f52162008-08-18 18:39:57 +00001687 self.daemon = True
Bill Janssen98d19da2007-09-10 21:51:02 +00001688
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01001689 def __enter__(self):
1690 self.start(threading.Event())
1691 self.flag.wait()
Antoine Pitroud76088d2012-01-03 22:46:48 +01001692 return self
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01001693
1694 def __exit__(self, *args):
1695 self.stop()
1696 self.join()
1697
Antoine Pitrou3945c862010-04-28 21:11:01 +00001698 def start(self, flag=None):
Bill Janssen98d19da2007-09-10 21:51:02 +00001699 self.flag = flag
1700 threading.Thread.start(self)
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001701
Antoine Pitrou3945c862010-04-28 21:11:01 +00001702 def run(self):
Antoine Pitrou435ba0c2010-04-27 09:51:18 +00001703 self.sock.settimeout(0.05)
Bill Janssen98d19da2007-09-10 21:51:02 +00001704 self.sock.listen(5)
1705 self.active = True
1706 if self.flag:
1707 # signal an event
1708 self.flag.set()
1709 while self.active:
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001710 try:
Bill Janssen98d19da2007-09-10 21:51:02 +00001711 newconn, connaddr = self.sock.accept()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001712 if support.verbose and self.chatty:
Bill Janssen98d19da2007-09-10 21:51:02 +00001713 sys.stdout.write(' server: new connection from '
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001714 + repr(connaddr) + '\n')
1715 handler = self.ConnectionHandler(self, newconn, connaddr)
Bill Janssen98d19da2007-09-10 21:51:02 +00001716 handler.start()
Antoine Pitrou7a556842012-01-27 17:33:01 +01001717 handler.join()
Bill Janssen98d19da2007-09-10 21:51:02 +00001718 except socket.timeout:
1719 pass
1720 except KeyboardInterrupt:
1721 self.stop()
Bill Janssen934b16d2008-06-28 22:19:33 +00001722 self.sock.close()
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001723
Antoine Pitrou3945c862010-04-28 21:11:01 +00001724 def stop(self):
Bill Janssen98d19da2007-09-10 21:51:02 +00001725 self.active = False
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001726
Bill Janssen934b16d2008-06-28 22:19:33 +00001727 class AsyncoreEchoServer(threading.Thread):
Bill Janssen296a59d2007-09-16 22:06:00 +00001728
Antoine Pitrou3945c862010-04-28 21:11:01 +00001729 class EchoServer(asyncore.dispatcher):
Bill Janssen934b16d2008-06-28 22:19:33 +00001730
Antoine Pitrou3945c862010-04-28 21:11:01 +00001731 class ConnectionHandler(asyncore.dispatcher_with_send):
Bill Janssen934b16d2008-06-28 22:19:33 +00001732
1733 def __init__(self, conn, certfile):
Bill Janssen934b16d2008-06-28 22:19:33 +00001734 self.socket = ssl.wrap_socket(conn, server_side=True,
1735 certfile=certfile,
Antoine Pitroufc69af12010-04-24 20:04:58 +00001736 do_handshake_on_connect=False)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001737 asyncore.dispatcher_with_send.__init__(self, self.socket)
Antoine Pitroufc69af12010-04-24 20:04:58 +00001738 self._ssl_accepting = True
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001739 self._do_ssl_handshake()
Bill Janssen934b16d2008-06-28 22:19:33 +00001740
1741 def readable(self):
1742 if isinstance(self.socket, ssl.SSLSocket):
1743 while self.socket.pending() > 0:
1744 self.handle_read_event()
1745 return True
1746
Antoine Pitroufc69af12010-04-24 20:04:58 +00001747 def _do_ssl_handshake(self):
1748 try:
1749 self.socket.do_handshake()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001750 except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
1751 return
1752 except ssl.SSLEOFError:
1753 return self.handle_close()
1754 except ssl.SSLError:
Antoine Pitroufc69af12010-04-24 20:04:58 +00001755 raise
1756 except socket.error, err:
1757 if err.args[0] == errno.ECONNABORTED:
1758 return self.handle_close()
1759 else:
1760 self._ssl_accepting = False
1761
Bill Janssen934b16d2008-06-28 22:19:33 +00001762 def handle_read(self):
Antoine Pitroufc69af12010-04-24 20:04:58 +00001763 if self._ssl_accepting:
1764 self._do_ssl_handshake()
1765 else:
1766 data = self.recv(1024)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001767 if support.verbose:
1768 sys.stdout.write(" server: read %s from client\n" % repr(data))
1769 if not data:
1770 self.close()
1771 else:
Antoine Pitroudb187842010-04-27 10:32:58 +00001772 self.send(data.lower())
Bill Janssen934b16d2008-06-28 22:19:33 +00001773
1774 def handle_close(self):
Bill Janssende34d912008-06-28 23:00:39 +00001775 self.close()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001776 if support.verbose:
Bill Janssen934b16d2008-06-28 22:19:33 +00001777 sys.stdout.write(" server: closed connection %s\n" % self.socket)
1778
1779 def handle_error(self):
1780 raise
1781
1782 def __init__(self, certfile):
1783 self.certfile = certfile
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001784 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1785 self.port = support.bind_port(sock, '')
1786 asyncore.dispatcher.__init__(self, sock)
Bill Janssen934b16d2008-06-28 22:19:33 +00001787 self.listen(5)
1788
1789 def handle_accept(self):
1790 sock_obj, addr = self.accept()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001791 if support.verbose:
Bill Janssen934b16d2008-06-28 22:19:33 +00001792 sys.stdout.write(" server: new connection from %s:%s\n" %addr)
1793 self.ConnectionHandler(sock_obj, self.certfile)
1794
1795 def handle_error(self):
1796 raise
1797
1798 def __init__(self, certfile):
1799 self.flag = None
1800 self.active = False
1801 self.server = self.EchoServer(certfile)
1802 self.port = self.server.port
1803 threading.Thread.__init__(self)
Benjamin Peterson26f52162008-08-18 18:39:57 +00001804 self.daemon = True
Bill Janssen934b16d2008-06-28 22:19:33 +00001805
1806 def __str__(self):
1807 return "<%s %s>" % (self.__class__.__name__, self.server)
1808
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01001809 def __enter__(self):
1810 self.start(threading.Event())
1811 self.flag.wait()
Antoine Pitroud76088d2012-01-03 22:46:48 +01001812 return self
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01001813
1814 def __exit__(self, *args):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001815 if support.verbose:
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01001816 sys.stdout.write(" cleanup: stopping server.\n")
1817 self.stop()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001818 if support.verbose:
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01001819 sys.stdout.write(" cleanup: joining server thread.\n")
1820 self.join()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001821 if support.verbose:
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01001822 sys.stdout.write(" cleanup: successfully joined.\n")
1823
Antoine Pitrou3945c862010-04-28 21:11:01 +00001824 def start(self, flag=None):
Bill Janssen934b16d2008-06-28 22:19:33 +00001825 self.flag = flag
1826 threading.Thread.start(self)
1827
Antoine Pitrou3945c862010-04-28 21:11:01 +00001828 def run(self):
Bill Janssen934b16d2008-06-28 22:19:33 +00001829 self.active = True
1830 if self.flag:
1831 self.flag.set()
1832 while self.active:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001833 try:
1834 asyncore.loop(1)
1835 except:
1836 pass
Bill Janssen934b16d2008-06-28 22:19:33 +00001837
Antoine Pitrou3945c862010-04-28 21:11:01 +00001838 def stop(self):
Bill Janssen934b16d2008-06-28 22:19:33 +00001839 self.active = False
1840 self.server.close()
1841
Antoine Pitrou3945c862010-04-28 21:11:01 +00001842 def bad_cert_test(certfile):
1843 """
1844 Launch a server with CERT_REQUIRED, and check that trying to
1845 connect to it with the given client certificate fails.
1846 """
Trent Nelsone41b0062008-04-08 23:47:30 +00001847 server = ThreadedEchoServer(CERTFILE,
Bill Janssen98d19da2007-09-10 21:51:02 +00001848 certreqs=ssl.CERT_REQUIRED,
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001849 cacerts=CERTFILE, chatty=False,
1850 connectionchatty=False)
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01001851 with server:
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001852 try:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001853 with closing(socket.socket()) as sock:
1854 s = ssl.wrap_socket(sock,
1855 certfile=certfile,
1856 ssl_version=ssl.PROTOCOL_TLSv1)
1857 s.connect((HOST, server.port))
1858 except ssl.SSLError as x:
1859 if support.verbose:
1860 sys.stdout.write("\nSSLError is %s\n" % x.args[1])
1861 except OSError as x:
1862 if support.verbose:
1863 sys.stdout.write("\nOSError is %s\n" % x.args[1])
1864 except OSError as x:
1865 if x.errno != errno.ENOENT:
1866 raise
1867 if support.verbose:
1868 sys.stdout.write("\OSError is %s\n" % str(x))
Bill Janssen98d19da2007-09-10 21:51:02 +00001869 else:
Antoine Pitrou1bbb68d2010-05-06 14:11:23 +00001870 raise AssertionError("Use of invalid cert should have failed!")
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001871
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001872 def server_params_test(client_context, server_context, indata=b"FOO\n",
1873 chatty=True, connectionchatty=False, sni_name=None):
Antoine Pitrou3945c862010-04-28 21:11:01 +00001874 """
1875 Launch a server, connect a client to it and try various reads
1876 and writes.
1877 """
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001878 stats = {}
1879 server = ThreadedEchoServer(context=server_context,
Bill Janssen98d19da2007-09-10 21:51:02 +00001880 chatty=chatty,
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001881 connectionchatty=False)
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01001882 with server:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001883 with closing(client_context.wrap_socket(socket.socket(),
1884 server_hostname=sni_name)) as s:
1885 s.connect((HOST, server.port))
1886 for arg in [indata, bytearray(indata), memoryview(indata)]:
1887 if connectionchatty:
1888 if support.verbose:
1889 sys.stdout.write(
1890 " client: sending %r...\n" % indata)
1891 s.write(arg)
1892 outdata = s.read()
1893 if connectionchatty:
1894 if support.verbose:
1895 sys.stdout.write(" client: read %r\n" % outdata)
1896 if outdata != indata.lower():
1897 raise AssertionError(
1898 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
1899 % (outdata[:20], len(outdata),
1900 indata[:20].lower(), len(indata)))
1901 s.write(b"over\n")
Bill Janssen98d19da2007-09-10 21:51:02 +00001902 if connectionchatty:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001903 if support.verbose:
1904 sys.stdout.write(" client: closing connection.\n")
1905 stats.update({
1906 'compression': s.compression(),
1907 'cipher': s.cipher(),
1908 'peercert': s.getpeercert(),
Alex Gaynore98205d2014-09-04 13:33:22 -07001909 'client_npn_protocol': s.selected_npn_protocol(),
1910 'version': s.version(),
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001911 })
1912 s.close()
1913 stats['server_npn_protocols'] = server.selected_protocols
1914 return stats
Bill Janssen98d19da2007-09-10 21:51:02 +00001915
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001916 def try_protocol_combo(server_protocol, client_protocol, expect_success,
1917 certsreqs=None, server_options=0, client_options=0):
Alex Gaynore98205d2014-09-04 13:33:22 -07001918 """
1919 Try to SSL-connect using *client_protocol* to *server_protocol*.
1920 If *expect_success* is true, assert that the connection succeeds,
1921 if it's false, assert that the connection fails.
1922 Also, if *expect_success* is a string, assert that it is the protocol
1923 version actually used by the connection.
1924 """
Benjamin Peterson5b63acd2008-03-29 15:24:25 +00001925 if certsreqs is None:
Bill Janssene3f1d7d2007-09-11 01:09:19 +00001926 certsreqs = ssl.CERT_NONE
Antoine Pitrou3945c862010-04-28 21:11:01 +00001927 certtype = {
1928 ssl.CERT_NONE: "CERT_NONE",
1929 ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
1930 ssl.CERT_REQUIRED: "CERT_REQUIRED",
1931 }[certsreqs]
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001932 if support.verbose:
Antoine Pitrou3945c862010-04-28 21:11:01 +00001933 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
Bill Janssen98d19da2007-09-10 21:51:02 +00001934 sys.stdout.write(formatstr %
1935 (ssl.get_protocol_name(client_protocol),
1936 ssl.get_protocol_name(server_protocol),
1937 certtype))
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001938 client_context = ssl.SSLContext(client_protocol)
1939 client_context.options |= client_options
1940 server_context = ssl.SSLContext(server_protocol)
1941 server_context.options |= server_options
1942
1943 # NOTE: we must enable "ALL" ciphers on the client, otherwise an
1944 # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
1945 # starting from OpenSSL 1.0.0 (see issue #8322).
1946 if client_context.protocol == ssl.PROTOCOL_SSLv23:
1947 client_context.set_ciphers("ALL")
1948
1949 for ctx in (client_context, server_context):
1950 ctx.verify_mode = certsreqs
1951 ctx.load_cert_chain(CERTFILE)
1952 ctx.load_verify_locations(CERTFILE)
Bill Janssen98d19da2007-09-10 21:51:02 +00001953 try:
Alex Gaynore98205d2014-09-04 13:33:22 -07001954 stats = server_params_test(client_context, server_context,
1955 chatty=False, connectionchatty=False)
Antoine Pitroudb187842010-04-27 10:32:58 +00001956 # Protocol mismatch can result in either an SSLError, or a
1957 # "Connection reset by peer" error.
1958 except ssl.SSLError:
Antoine Pitrou3945c862010-04-28 21:11:01 +00001959 if expect_success:
Bill Janssen98d19da2007-09-10 21:51:02 +00001960 raise
Antoine Pitroudb187842010-04-27 10:32:58 +00001961 except socket.error as e:
Antoine Pitrou3945c862010-04-28 21:11:01 +00001962 if expect_success or e.errno != errno.ECONNRESET:
Antoine Pitroudb187842010-04-27 10:32:58 +00001963 raise
Bill Janssen98d19da2007-09-10 21:51:02 +00001964 else:
Antoine Pitrou3945c862010-04-28 21:11:01 +00001965 if not expect_success:
Antoine Pitrou1bbb68d2010-05-06 14:11:23 +00001966 raise AssertionError(
Bill Janssen98d19da2007-09-10 21:51:02 +00001967 "Client protocol %s succeeded with server protocol %s!"
1968 % (ssl.get_protocol_name(client_protocol),
1969 ssl.get_protocol_name(server_protocol)))
Alex Gaynore98205d2014-09-04 13:33:22 -07001970 elif (expect_success is not True
1971 and expect_success != stats['version']):
1972 raise AssertionError("version mismatch: expected %r, got %r"
1973 % (expect_success, stats['version']))
Bill Janssen98d19da2007-09-10 21:51:02 +00001974
1975
Bill Janssen934b16d2008-06-28 22:19:33 +00001976 class ThreadedTests(unittest.TestCase):
Bill Janssen98d19da2007-09-10 21:51:02 +00001977
Antoine Pitroud75efd92010-08-04 17:38:33 +00001978 @skip_if_broken_ubuntu_ssl
Antoine Pitrou3945c862010-04-28 21:11:01 +00001979 def test_echo(self):
1980 """Basic test of an SSL client connecting to a server"""
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001981 if support.verbose:
Bill Janssen98d19da2007-09-10 21:51:02 +00001982 sys.stdout.write("\n")
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001983 for protocol in PROTOCOLS:
1984 context = ssl.SSLContext(protocol)
1985 context.load_cert_chain(CERTFILE)
1986 server_params_test(context, context,
1987 chatty=True, connectionchatty=True)
Bill Janssen98d19da2007-09-10 21:51:02 +00001988
Antoine Pitrou3945c862010-04-28 21:11:01 +00001989 def test_getpeercert(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001990 if support.verbose:
Bill Janssen98d19da2007-09-10 21:51:02 +00001991 sys.stdout.write("\n")
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001992 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1993 context.verify_mode = ssl.CERT_REQUIRED
1994 context.load_verify_locations(CERTFILE)
1995 context.load_cert_chain(CERTFILE)
1996 server = ThreadedEchoServer(context=context, chatty=False)
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01001997 with server:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001998 s = context.wrap_socket(socket.socket(),
1999 do_handshake_on_connect=False)
Antoine Pitroudb187842010-04-27 10:32:58 +00002000 s.connect((HOST, server.port))
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002001 # getpeercert() raise ValueError while the handshake isn't
2002 # done.
2003 with self.assertRaises(ValueError):
2004 s.getpeercert()
2005 s.do_handshake()
Antoine Pitroudb187842010-04-27 10:32:58 +00002006 cert = s.getpeercert()
2007 self.assertTrue(cert, "Can't get peer certificate.")
2008 cipher = s.cipher()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002009 if support.verbose:
Antoine Pitroudb187842010-04-27 10:32:58 +00002010 sys.stdout.write(pprint.pformat(cert) + '\n')
2011 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
2012 if 'subject' not in cert:
2013 self.fail("No subject field in certificate: %s." %
2014 pprint.pformat(cert))
2015 if ((('organizationName', 'Python Software Foundation'),)
2016 not in cert['subject']):
2017 self.fail(
2018 "Missing or invalid 'organizationName' field in certificate subject; "
2019 "should be 'Python Software Foundation'.")
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002020 self.assertIn('notBefore', cert)
2021 self.assertIn('notAfter', cert)
2022 before = ssl.cert_time_to_seconds(cert['notBefore'])
2023 after = ssl.cert_time_to_seconds(cert['notAfter'])
2024 self.assertLess(before, after)
Antoine Pitroudb187842010-04-27 10:32:58 +00002025 s.close()
Bill Janssen98d19da2007-09-10 21:51:02 +00002026
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002027 @unittest.skipUnless(have_verify_flags(),
2028 "verify_flags need OpenSSL > 0.9.8")
2029 def test_crl_check(self):
2030 if support.verbose:
2031 sys.stdout.write("\n")
2032
2033 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2034 server_context.load_cert_chain(SIGNED_CERTFILE)
2035
2036 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2037 context.verify_mode = ssl.CERT_REQUIRED
2038 context.load_verify_locations(SIGNING_CA)
2039 self.assertEqual(context.verify_flags, ssl.VERIFY_DEFAULT)
2040
2041 # VERIFY_DEFAULT should pass
2042 server = ThreadedEchoServer(context=server_context, chatty=True)
2043 with server:
2044 with closing(context.wrap_socket(socket.socket())) as s:
2045 s.connect((HOST, server.port))
2046 cert = s.getpeercert()
2047 self.assertTrue(cert, "Can't get peer certificate.")
2048
2049 # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
2050 context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
2051
2052 server = ThreadedEchoServer(context=server_context, chatty=True)
2053 with server:
2054 with closing(context.wrap_socket(socket.socket())) as s:
2055 with self.assertRaisesRegexp(ssl.SSLError,
2056 "certificate verify failed"):
2057 s.connect((HOST, server.port))
2058
2059 # now load a CRL file. The CRL file is signed by the CA.
2060 context.load_verify_locations(CRLFILE)
2061
2062 server = ThreadedEchoServer(context=server_context, chatty=True)
2063 with server:
2064 with closing(context.wrap_socket(socket.socket())) as s:
2065 s.connect((HOST, server.port))
2066 cert = s.getpeercert()
2067 self.assertTrue(cert, "Can't get peer certificate.")
2068
2069 @needs_sni
2070 def test_check_hostname(self):
2071 if support.verbose:
2072 sys.stdout.write("\n")
2073
2074 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2075 server_context.load_cert_chain(SIGNED_CERTFILE)
2076
2077 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2078 context.verify_mode = ssl.CERT_REQUIRED
2079 context.check_hostname = True
2080 context.load_verify_locations(SIGNING_CA)
2081
2082 # correct hostname should verify
2083 server = ThreadedEchoServer(context=server_context, chatty=True)
2084 with server:
2085 with closing(context.wrap_socket(socket.socket(),
2086 server_hostname="localhost")) as s:
2087 s.connect((HOST, server.port))
2088 cert = s.getpeercert()
2089 self.assertTrue(cert, "Can't get peer certificate.")
2090
2091 # incorrect hostname should raise an exception
2092 server = ThreadedEchoServer(context=server_context, chatty=True)
2093 with server:
2094 with closing(context.wrap_socket(socket.socket(),
2095 server_hostname="invalid")) as s:
2096 with self.assertRaisesRegexp(ssl.CertificateError,
2097 "hostname 'invalid' doesn't match u?'localhost'"):
2098 s.connect((HOST, server.port))
2099
2100 # missing server_hostname arg should cause an exception, too
2101 server = ThreadedEchoServer(context=server_context, chatty=True)
2102 with server:
2103 with closing(socket.socket()) as s:
2104 with self.assertRaisesRegexp(ValueError,
2105 "check_hostname requires server_hostname"):
2106 context.wrap_socket(s)
2107
Antoine Pitrou3945c862010-04-28 21:11:01 +00002108 def test_empty_cert(self):
2109 """Connecting with an empty cert file"""
2110 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
2111 "nullcert.pem"))
2112 def test_malformed_cert(self):
2113 """Connecting with a badly formatted certificate (syntax error)"""
2114 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
2115 "badcert.pem"))
2116 def test_nonexisting_cert(self):
2117 """Connecting with a non-existing cert file"""
2118 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
2119 "wrongcert.pem"))
2120 def test_malformed_key(self):
2121 """Connecting with a badly formatted key (syntax error)"""
2122 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
2123 "badkey.pem"))
Bill Janssen98d19da2007-09-10 21:51:02 +00002124
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002125 def test_rude_shutdown(self):
2126 """A brutal shutdown of an SSL server should raise an OSError
2127 in the client when attempting handshake.
2128 """
2129 listener_ready = threading.Event()
2130 listener_gone = threading.Event()
2131
2132 s = socket.socket()
2133 port = support.bind_port(s, HOST)
2134
2135 # `listener` runs in a thread. It sits in an accept() until
2136 # the main thread connects. Then it rudely closes the socket,
2137 # and sets Event `listener_gone` to let the main thread know
2138 # the socket is gone.
2139 def listener():
2140 s.listen(5)
2141 listener_ready.set()
2142 newsock, addr = s.accept()
2143 newsock.close()
2144 s.close()
2145 listener_gone.set()
2146
2147 def connector():
2148 listener_ready.wait()
2149 with closing(socket.socket()) as c:
2150 c.connect((HOST, port))
2151 listener_gone.wait()
2152 try:
2153 ssl_sock = ssl.wrap_socket(c)
Benjamin Petersone208b572014-08-20 14:49:08 -05002154 except socket.error:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002155 pass
2156 else:
2157 self.fail('connecting to closed SSL socket should have failed')
2158
2159 t = threading.Thread(target=listener)
2160 t.start()
2161 try:
2162 connector()
2163 finally:
2164 t.join()
2165
Antoine Pitroud75efd92010-08-04 17:38:33 +00002166 @skip_if_broken_ubuntu_ssl
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002167 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
2168 "OpenSSL is compiled without SSLv2 support")
Antoine Pitrou3945c862010-04-28 21:11:01 +00002169 def test_protocol_sslv2(self):
2170 """Connecting to an SSLv2 server with various client options"""
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002171 if support.verbose:
Bill Janssen98d19da2007-09-10 21:51:02 +00002172 sys.stdout.write("\n")
Antoine Pitrou3945c862010-04-28 21:11:01 +00002173 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
2174 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
2175 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
Antoine Pitrou3b2afbb2014-01-09 19:52:12 +01002176 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False)
Antoine Pitrou3945c862010-04-28 21:11:01 +00002177 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
2178 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002179 # SSLv23 client with specific SSL options
2180 if no_sslv2_implies_sslv3_hello():
2181 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
2182 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
2183 client_options=ssl.OP_NO_SSLv2)
2184 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
2185 client_options=ssl.OP_NO_SSLv3)
2186 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
2187 client_options=ssl.OP_NO_TLSv1)
Bill Janssen98d19da2007-09-10 21:51:02 +00002188
Antoine Pitroud75efd92010-08-04 17:38:33 +00002189 @skip_if_broken_ubuntu_ssl
Antoine Pitrou3945c862010-04-28 21:11:01 +00002190 def test_protocol_sslv23(self):
2191 """Connecting to an SSLv23 server with various client options"""
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002192 if support.verbose:
Bill Janssen98d19da2007-09-10 21:51:02 +00002193 sys.stdout.write("\n")
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002194 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2195 try:
2196 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True)
2197 except socket.error as x:
2198 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
2199 if support.verbose:
2200 sys.stdout.write(
2201 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
2202 % str(x))
Alex Gaynore98205d2014-09-04 13:33:22 -07002203 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, 'SSLv3')
Antoine Pitrou3945c862010-04-28 21:11:01 +00002204 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True)
Alex Gaynore98205d2014-09-04 13:33:22 -07002205 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1')
Bill Janssen98d19da2007-09-10 21:51:02 +00002206
Alex Gaynore98205d2014-09-04 13:33:22 -07002207 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
Antoine Pitrou3945c862010-04-28 21:11:01 +00002208 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL)
Alex Gaynore98205d2014-09-04 13:33:22 -07002209 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
Bill Janssen98d19da2007-09-10 21:51:02 +00002210
Alex Gaynore98205d2014-09-04 13:33:22 -07002211 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
Antoine Pitrou3945c862010-04-28 21:11:01 +00002212 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED)
Alex Gaynore98205d2014-09-04 13:33:22 -07002213 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Bill Janssen98d19da2007-09-10 21:51:02 +00002214
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002215 # Server with specific SSL options
2216 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False,
2217 server_options=ssl.OP_NO_SSLv3)
2218 # Will choose TLSv1
2219 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True,
2220 server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
2221 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, False,
2222 server_options=ssl.OP_NO_TLSv1)
2223
2224
Antoine Pitroud75efd92010-08-04 17:38:33 +00002225 @skip_if_broken_ubuntu_ssl
Antoine Pitrou3945c862010-04-28 21:11:01 +00002226 def test_protocol_sslv3(self):
2227 """Connecting to an SSLv3 server with various client options"""
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002228 if support.verbose:
Bill Janssen98d19da2007-09-10 21:51:02 +00002229 sys.stdout.write("\n")
Alex Gaynore98205d2014-09-04 13:33:22 -07002230 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3')
2231 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
2232 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
Victor Stinnerb1241f92011-05-10 01:52:03 +02002233 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2234 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002235 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False,
2236 client_options=ssl.OP_NO_SSLv3)
Antoine Pitrou3945c862010-04-28 21:11:01 +00002237 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002238 if no_sslv2_implies_sslv3_hello():
2239 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Alex Gaynore98205d2014-09-04 13:33:22 -07002240 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, 'SSLv3',
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002241 client_options=ssl.OP_NO_SSLv2)
Bill Janssen98d19da2007-09-10 21:51:02 +00002242
Antoine Pitroud75efd92010-08-04 17:38:33 +00002243 @skip_if_broken_ubuntu_ssl
Antoine Pitrou3945c862010-04-28 21:11:01 +00002244 def test_protocol_tlsv1(self):
2245 """Connecting to a TLSv1 server with various client options"""
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002246 if support.verbose:
Bill Janssen98d19da2007-09-10 21:51:02 +00002247 sys.stdout.write("\n")
Alex Gaynore98205d2014-09-04 13:33:22 -07002248 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1')
2249 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
2250 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Victor Stinnerb1241f92011-05-10 01:52:03 +02002251 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2252 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
Antoine Pitrou3945c862010-04-28 21:11:01 +00002253 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002254 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False,
2255 client_options=ssl.OP_NO_TLSv1)
2256
2257 @skip_if_broken_ubuntu_ssl
2258 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
2259 "TLS version 1.1 not supported.")
2260 def test_protocol_tlsv1_1(self):
2261 """Connecting to a TLSv1.1 server with various client options.
2262 Testing against older TLS versions."""
2263 if support.verbose:
2264 sys.stdout.write("\n")
Alex Gaynore98205d2014-09-04 13:33:22 -07002265 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002266 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2267 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False)
2268 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
2269 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv23, False,
2270 client_options=ssl.OP_NO_TLSv1_1)
2271
Alex Gaynore98205d2014-09-04 13:33:22 -07002272 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002273 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False)
2274 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False)
2275
2276
2277 @skip_if_broken_ubuntu_ssl
2278 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
2279 "TLS version 1.2 not supported.")
2280 def test_protocol_tlsv1_2(self):
2281 """Connecting to a TLSv1.2 server with various client options.
2282 Testing against older TLS versions."""
2283 if support.verbose:
2284 sys.stdout.write("\n")
Alex Gaynore98205d2014-09-04 13:33:22 -07002285 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2',
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002286 server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
2287 client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
2288 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2289 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False)
2290 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
2291 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv23, False,
2292 client_options=ssl.OP_NO_TLSv1_2)
2293
Alex Gaynore98205d2014-09-04 13:33:22 -07002294 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2')
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002295 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
2296 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
2297 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
2298 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
Bill Janssen98d19da2007-09-10 21:51:02 +00002299
Antoine Pitrou3945c862010-04-28 21:11:01 +00002300 def test_starttls(self):
2301 """Switching from clear text to encrypted and back again."""
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002302 msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")
Bill Janssen98d19da2007-09-10 21:51:02 +00002303
Trent Nelsone41b0062008-04-08 23:47:30 +00002304 server = ThreadedEchoServer(CERTFILE,
Bill Janssen98d19da2007-09-10 21:51:02 +00002305 ssl_version=ssl.PROTOCOL_TLSv1,
2306 starttls_server=True,
2307 chatty=True,
2308 connectionchatty=True)
Bill Janssen98d19da2007-09-10 21:51:02 +00002309 wrapped = False
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01002310 with server:
Antoine Pitroudb187842010-04-27 10:32:58 +00002311 s = socket.socket()
2312 s.setblocking(1)
2313 s.connect((HOST, server.port))
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002314 if support.verbose:
Antoine Pitroudb187842010-04-27 10:32:58 +00002315 sys.stdout.write("\n")
2316 for indata in msgs:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002317 if support.verbose:
Antoine Pitroudb187842010-04-27 10:32:58 +00002318 sys.stdout.write(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002319 " client: sending %r...\n" % indata)
Antoine Pitroudb187842010-04-27 10:32:58 +00002320 if wrapped:
2321 conn.write(indata)
2322 outdata = conn.read()
2323 else:
2324 s.send(indata)
2325 outdata = s.recv(1024)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002326 msg = outdata.strip().lower()
2327 if indata == b"STARTTLS" and msg.startswith(b"ok"):
Antoine Pitrou3945c862010-04-28 21:11:01 +00002328 # STARTTLS ok, switch to secure mode
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002329 if support.verbose:
Bill Janssen296a59d2007-09-16 22:06:00 +00002330 sys.stdout.write(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002331 " client: read %r from server, starting TLS...\n"
2332 % msg)
Antoine Pitroudb187842010-04-27 10:32:58 +00002333 conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
2334 wrapped = True
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002335 elif indata == b"ENDTLS" and msg.startswith(b"ok"):
Antoine Pitrou3945c862010-04-28 21:11:01 +00002336 # ENDTLS ok, switch back to clear text
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002337 if support.verbose:
Antoine Pitroudb187842010-04-27 10:32:58 +00002338 sys.stdout.write(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002339 " client: read %r from server, ending TLS...\n"
2340 % msg)
Antoine Pitroudb187842010-04-27 10:32:58 +00002341 s = conn.unwrap()
2342 wrapped = False
Bill Janssen98d19da2007-09-10 21:51:02 +00002343 else:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002344 if support.verbose:
Antoine Pitroudb187842010-04-27 10:32:58 +00002345 sys.stdout.write(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002346 " client: read %r from server\n" % msg)
2347 if support.verbose:
Antoine Pitroudb187842010-04-27 10:32:58 +00002348 sys.stdout.write(" client: closing connection.\n")
2349 if wrapped:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002350 conn.write(b"over\n")
Antoine Pitroudb187842010-04-27 10:32:58 +00002351 else:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002352 s.send(b"over\n")
2353 if wrapped:
2354 conn.close()
2355 else:
2356 s.close()
Bill Janssen98d19da2007-09-10 21:51:02 +00002357
Antoine Pitrou3945c862010-04-28 21:11:01 +00002358 def test_socketserver(self):
2359 """Using a SocketServer to create and manage SSL connections."""
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002360 server = make_https_server(self, certfile=CERTFILE)
Bill Janssen296a59d2007-09-16 22:06:00 +00002361 # try to connect
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002362 if support.verbose:
2363 sys.stdout.write('\n')
2364 with open(CERTFILE, 'rb') as f:
2365 d1 = f.read()
2366 d2 = ''
2367 # now fetch the same data from the HTTPS server
2368 url = 'https://%s:%d/%s' % (
2369 HOST, server.port, os.path.split(CERTFILE)[1])
Victor Stinnera3acea32014-09-05 21:05:05 +02002370 with support.check_py3k_warnings():
2371 f = urllib.urlopen(url)
Bill Janssen296a59d2007-09-16 22:06:00 +00002372 try:
Bill Janssen296a59d2007-09-16 22:06:00 +00002373 dlen = f.info().getheader("content-length")
2374 if dlen and (int(dlen) > 0):
2375 d2 = f.read(int(dlen))
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002376 if support.verbose:
Bill Janssen296a59d2007-09-16 22:06:00 +00002377 sys.stdout.write(
2378 " client: read %d bytes from remote server '%s'\n"
2379 % (len(d2), server))
Bill Janssen296a59d2007-09-16 22:06:00 +00002380 finally:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002381 f.close()
2382 self.assertEqual(d1, d2)
Bill Janssen934b16d2008-06-28 22:19:33 +00002383
Antoine Pitrou3945c862010-04-28 21:11:01 +00002384 def test_asyncore_server(self):
2385 """Check the example asyncore integration."""
Bill Janssen934b16d2008-06-28 22:19:33 +00002386 indata = "TEST MESSAGE of mixed case\n"
2387
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002388 if support.verbose:
Bill Janssen934b16d2008-06-28 22:19:33 +00002389 sys.stdout.write("\n")
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002390
2391 indata = b"FOO\n"
Bill Janssen934b16d2008-06-28 22:19:33 +00002392 server = AsyncoreEchoServer(CERTFILE)
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01002393 with server:
Antoine Pitroudb187842010-04-27 10:32:58 +00002394 s = ssl.wrap_socket(socket.socket())
2395 s.connect(('127.0.0.1', server.port))
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002396 if support.verbose:
Antoine Pitroudb187842010-04-27 10:32:58 +00002397 sys.stdout.write(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002398 " client: sending %r...\n" % indata)
Antoine Pitroudb187842010-04-27 10:32:58 +00002399 s.write(indata)
2400 outdata = s.read()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002401 if support.verbose:
2402 sys.stdout.write(" client: read %r\n" % outdata)
Antoine Pitroudb187842010-04-27 10:32:58 +00002403 if outdata != indata.lower():
2404 self.fail(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002405 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
2406 % (outdata[:20], len(outdata),
2407 indata[:20].lower(), len(indata)))
2408 s.write(b"over\n")
2409 if support.verbose:
Antoine Pitroudb187842010-04-27 10:32:58 +00002410 sys.stdout.write(" client: closing connection.\n")
2411 s.close()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002412 if support.verbose:
2413 sys.stdout.write(" client: connection closed.\n")
Bill Janssen934b16d2008-06-28 22:19:33 +00002414
Antoine Pitrou3945c862010-04-28 21:11:01 +00002415 def test_recv_send(self):
2416 """Test recv(), send() and friends."""
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002417 if support.verbose:
Bill Janssen61c001a2008-09-08 16:37:24 +00002418 sys.stdout.write("\n")
2419
2420 server = ThreadedEchoServer(CERTFILE,
2421 certreqs=ssl.CERT_NONE,
2422 ssl_version=ssl.PROTOCOL_TLSv1,
2423 cacerts=CERTFILE,
2424 chatty=True,
2425 connectionchatty=False)
Antoine Pitrou5b95eb92011-12-21 16:52:40 +01002426 with server:
2427 s = ssl.wrap_socket(socket.socket(),
2428 server_side=False,
2429 certfile=CERTFILE,
2430 ca_certs=CERTFILE,
2431 cert_reqs=ssl.CERT_NONE,
2432 ssl_version=ssl.PROTOCOL_TLSv1)
2433 s.connect((HOST, server.port))
Bill Janssen61c001a2008-09-08 16:37:24 +00002434 # helper methods for standardising recv* method signatures
2435 def _recv_into():
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002436 b = bytearray(b"\0"*100)
Bill Janssen61c001a2008-09-08 16:37:24 +00002437 count = s.recv_into(b)
2438 return b[:count]
2439
2440 def _recvfrom_into():
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002441 b = bytearray(b"\0"*100)
Bill Janssen61c001a2008-09-08 16:37:24 +00002442 count, addr = s.recvfrom_into(b)
2443 return b[:count]
2444
2445 # (name, method, whether to expect success, *args)
2446 send_methods = [
2447 ('send', s.send, True, []),
2448 ('sendto', s.sendto, False, ["some.address"]),
2449 ('sendall', s.sendall, True, []),
2450 ]
2451 recv_methods = [
2452 ('recv', s.recv, True, []),
2453 ('recvfrom', s.recvfrom, False, ["some.address"]),
2454 ('recv_into', _recv_into, True, []),
2455 ('recvfrom_into', _recvfrom_into, False, []),
2456 ]
2457 data_prefix = u"PREFIX_"
2458
2459 for meth_name, send_meth, expect_success, args in send_methods:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002460 indata = (data_prefix + meth_name).encode('ascii')
Bill Janssen61c001a2008-09-08 16:37:24 +00002461 try:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002462 send_meth(indata, *args)
Bill Janssen61c001a2008-09-08 16:37:24 +00002463 outdata = s.read()
Bill Janssen61c001a2008-09-08 16:37:24 +00002464 if outdata != indata.lower():
Georg Brandlc7ca56d2010-02-06 23:23:45 +00002465 self.fail(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002466 "While sending with <<{name:s}>> bad data "
2467 "<<{outdata:r}>> ({nout:d}) received; "
2468 "expected <<{indata:r}>> ({nin:d})\n".format(
2469 name=meth_name, outdata=outdata[:20],
2470 nout=len(outdata),
2471 indata=indata[:20], nin=len(indata)
Bill Janssen61c001a2008-09-08 16:37:24 +00002472 )
2473 )
2474 except ValueError as e:
2475 if expect_success:
Georg Brandlc7ca56d2010-02-06 23:23:45 +00002476 self.fail(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002477 "Failed to send with method <<{name:s}>>; "
2478 "expected to succeed.\n".format(name=meth_name)
Bill Janssen61c001a2008-09-08 16:37:24 +00002479 )
2480 if not str(e).startswith(meth_name):
Georg Brandlc7ca56d2010-02-06 23:23:45 +00002481 self.fail(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002482 "Method <<{name:s}>> failed with unexpected "
2483 "exception message: {exp:s}\n".format(
2484 name=meth_name, exp=e
Bill Janssen61c001a2008-09-08 16:37:24 +00002485 )
2486 )
2487
2488 for meth_name, recv_meth, expect_success, args in recv_methods:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002489 indata = (data_prefix + meth_name).encode('ascii')
Bill Janssen61c001a2008-09-08 16:37:24 +00002490 try:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002491 s.send(indata)
Bill Janssen61c001a2008-09-08 16:37:24 +00002492 outdata = recv_meth(*args)
Bill Janssen61c001a2008-09-08 16:37:24 +00002493 if outdata != indata.lower():
Georg Brandlc7ca56d2010-02-06 23:23:45 +00002494 self.fail(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002495 "While receiving with <<{name:s}>> bad data "
2496 "<<{outdata:r}>> ({nout:d}) received; "
2497 "expected <<{indata:r}>> ({nin:d})\n".format(
2498 name=meth_name, outdata=outdata[:20],
2499 nout=len(outdata),
2500 indata=indata[:20], nin=len(indata)
Bill Janssen61c001a2008-09-08 16:37:24 +00002501 )
2502 )
2503 except ValueError as e:
2504 if expect_success:
Georg Brandlc7ca56d2010-02-06 23:23:45 +00002505 self.fail(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002506 "Failed to receive with method <<{name:s}>>; "
2507 "expected to succeed.\n".format(name=meth_name)
Bill Janssen61c001a2008-09-08 16:37:24 +00002508 )
2509 if not str(e).startswith(meth_name):
Georg Brandlc7ca56d2010-02-06 23:23:45 +00002510 self.fail(
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002511 "Method <<{name:s}>> failed with unexpected "
2512 "exception message: {exp:s}\n".format(
2513 name=meth_name, exp=e
Bill Janssen61c001a2008-09-08 16:37:24 +00002514 )
2515 )
2516 # consume data
2517 s.read()
2518
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002519 s.write(b"over\n")
Bill Janssen61c001a2008-09-08 16:37:24 +00002520 s.close()
Bill Janssen61c001a2008-09-08 16:37:24 +00002521
Antoine Pitroufc69af12010-04-24 20:04:58 +00002522 def test_handshake_timeout(self):
2523 # Issue #5103: SSL handshake must respect the socket timeout
2524 server = socket.socket(socket.AF_INET)
2525 host = "127.0.0.1"
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002526 port = support.bind_port(server)
Antoine Pitroufc69af12010-04-24 20:04:58 +00002527 started = threading.Event()
2528 finish = False
2529
2530 def serve():
2531 server.listen(5)
2532 started.set()
2533 conns = []
2534 while not finish:
2535 r, w, e = select.select([server], [], [], 0.1)
2536 if server in r:
2537 # Let the socket hang around rather than having
2538 # it closed by garbage collection.
2539 conns.append(server.accept()[0])
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002540 for sock in conns:
2541 sock.close()
Antoine Pitroufc69af12010-04-24 20:04:58 +00002542
2543 t = threading.Thread(target=serve)
2544 t.start()
2545 started.wait()
2546
2547 try:
2548 try:
2549 c = socket.socket(socket.AF_INET)
2550 c.settimeout(0.2)
2551 c.connect((host, port))
2552 # Will attempt handshake and time out
2553 self.assertRaisesRegexp(ssl.SSLError, "timed out",
2554 ssl.wrap_socket, c)
2555 finally:
2556 c.close()
2557 try:
2558 c = socket.socket(socket.AF_INET)
Antoine Pitroufc69af12010-04-24 20:04:58 +00002559 c = ssl.wrap_socket(c)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002560 c.settimeout(0.2)
Antoine Pitroufc69af12010-04-24 20:04:58 +00002561 # Will attempt handshake and time out
2562 self.assertRaisesRegexp(ssl.SSLError, "timed out",
2563 c.connect, (host, port))
2564 finally:
2565 c.close()
2566 finally:
2567 finish = True
2568 t.join()
2569 server.close()
2570
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002571 def test_server_accept(self):
2572 # Issue #16357: accept() on a SSLSocket created through
2573 # SSLContext.wrap_socket().
2574 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2575 context.verify_mode = ssl.CERT_REQUIRED
2576 context.load_verify_locations(CERTFILE)
2577 context.load_cert_chain(CERTFILE)
2578 server = socket.socket(socket.AF_INET)
2579 host = "127.0.0.1"
2580 port = support.bind_port(server)
2581 server = context.wrap_socket(server, server_side=True)
2582
2583 evt = threading.Event()
2584 remote = [None]
2585 peer = [None]
2586 def serve():
2587 server.listen(5)
2588 # Block on the accept and wait on the connection to close.
2589 evt.set()
2590 remote[0], peer[0] = server.accept()
2591 remote[0].recv(1)
2592
2593 t = threading.Thread(target=serve)
2594 t.start()
2595 # Client wait until server setup and perform a connect.
2596 evt.wait()
2597 client = context.wrap_socket(socket.socket())
2598 client.connect((host, port))
2599 client_addr = client.getsockname()
2600 client.close()
2601 t.join()
2602 remote[0].close()
2603 server.close()
2604 # Sanity checks.
2605 self.assertIsInstance(remote[0], ssl.SSLSocket)
2606 self.assertEqual(peer[0], client_addr)
2607
2608 def test_getpeercert_enotconn(self):
2609 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2610 with closing(context.wrap_socket(socket.socket())) as sock:
2611 with self.assertRaises(socket.error) as cm:
2612 sock.getpeercert()
2613 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
2614
2615 def test_do_handshake_enotconn(self):
2616 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2617 with closing(context.wrap_socket(socket.socket())) as sock:
2618 with self.assertRaises(socket.error) as cm:
2619 sock.do_handshake()
2620 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
2621
Antoine Pitroud76088d2012-01-03 22:46:48 +01002622 def test_default_ciphers(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002623 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2624 try:
2625 # Force a set of weak ciphers on our client context
2626 context.set_ciphers("DES")
2627 except ssl.SSLError:
2628 self.skipTest("no DES cipher available")
Antoine Pitroud76088d2012-01-03 22:46:48 +01002629 with ThreadedEchoServer(CERTFILE,
2630 ssl_version=ssl.PROTOCOL_SSLv23,
2631 chatty=False) as server:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002632 with closing(context.wrap_socket(socket.socket())) as s:
2633 with self.assertRaises(ssl.SSLError):
Antoine Pitroud76088d2012-01-03 22:46:48 +01002634 s.connect((HOST, server.port))
Antoine Pitroud76088d2012-01-03 22:46:48 +01002635 self.assertIn("no shared cipher", str(server.conn_errors[0]))
2636
Alex Gaynore98205d2014-09-04 13:33:22 -07002637 def test_version_basic(self):
2638 """
2639 Basic tests for SSLSocket.version().
2640 More tests are done in the test_protocol_*() methods.
2641 """
2642 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2643 with ThreadedEchoServer(CERTFILE,
2644 ssl_version=ssl.PROTOCOL_TLSv1,
2645 chatty=False) as server:
2646 with closing(context.wrap_socket(socket.socket())) as s:
2647 self.assertIs(s.version(), None)
2648 s.connect((HOST, server.port))
2649 self.assertEqual(s.version(), "TLSv1")
2650 self.assertIs(s.version(), None)
2651
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002652 @unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
2653 def test_default_ecdh_curve(self):
2654 # Issue #21015: elliptic curve-based Diffie Hellman key exchange
2655 # should be enabled by default on SSL contexts.
2656 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2657 context.load_cert_chain(CERTFILE)
2658 # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
2659 # explicitly using the 'ECCdraft' cipher alias. Otherwise,
2660 # our default cipher list should prefer ECDH-based ciphers
2661 # automatically.
2662 if ssl.OPENSSL_VERSION_INFO < (1, 0, 0):
2663 context.set_ciphers("ECCdraft:ECDH")
2664 with ThreadedEchoServer(context=context) as server:
2665 with closing(context.wrap_socket(socket.socket())) as s:
2666 s.connect((HOST, server.port))
2667 self.assertIn("ECDH", s.cipher()[0])
2668
2669 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
2670 "'tls-unique' channel binding not available")
2671 def test_tls_unique_channel_binding(self):
2672 """Test tls-unique channel binding."""
2673 if support.verbose:
2674 sys.stdout.write("\n")
2675
2676 server = ThreadedEchoServer(CERTFILE,
2677 certreqs=ssl.CERT_NONE,
2678 ssl_version=ssl.PROTOCOL_TLSv1,
2679 cacerts=CERTFILE,
2680 chatty=True,
2681 connectionchatty=False)
2682 with server:
2683 s = ssl.wrap_socket(socket.socket(),
2684 server_side=False,
2685 certfile=CERTFILE,
2686 ca_certs=CERTFILE,
2687 cert_reqs=ssl.CERT_NONE,
2688 ssl_version=ssl.PROTOCOL_TLSv1)
2689 s.connect((HOST, server.port))
2690 # get the data
2691 cb_data = s.get_channel_binding("tls-unique")
2692 if support.verbose:
2693 sys.stdout.write(" got channel binding data: {0!r}\n"
2694 .format(cb_data))
2695
2696 # check if it is sane
2697 self.assertIsNotNone(cb_data)
2698 self.assertEqual(len(cb_data), 12) # True for TLSv1
2699
2700 # and compare with the peers version
2701 s.write(b"CB tls-unique\n")
2702 peer_data_repr = s.read().strip()
2703 self.assertEqual(peer_data_repr,
2704 repr(cb_data).encode("us-ascii"))
2705 s.close()
2706
2707 # now, again
2708 s = ssl.wrap_socket(socket.socket(),
2709 server_side=False,
2710 certfile=CERTFILE,
2711 ca_certs=CERTFILE,
2712 cert_reqs=ssl.CERT_NONE,
2713 ssl_version=ssl.PROTOCOL_TLSv1)
2714 s.connect((HOST, server.port))
2715 new_cb_data = s.get_channel_binding("tls-unique")
2716 if support.verbose:
2717 sys.stdout.write(" got another channel binding data: {0!r}\n"
2718 .format(new_cb_data))
2719 # is it really unique
2720 self.assertNotEqual(cb_data, new_cb_data)
2721 self.assertIsNotNone(cb_data)
2722 self.assertEqual(len(cb_data), 12) # True for TLSv1
2723 s.write(b"CB tls-unique\n")
2724 peer_data_repr = s.read().strip()
2725 self.assertEqual(peer_data_repr,
2726 repr(new_cb_data).encode("us-ascii"))
2727 s.close()
2728
2729 def test_compression(self):
2730 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2731 context.load_cert_chain(CERTFILE)
2732 stats = server_params_test(context, context,
2733 chatty=True, connectionchatty=True)
2734 if support.verbose:
2735 sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
2736 self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' })
2737
2738 @unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
2739 "ssl.OP_NO_COMPRESSION needed for this test")
2740 def test_compression_disabled(self):
2741 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2742 context.load_cert_chain(CERTFILE)
2743 context.options |= ssl.OP_NO_COMPRESSION
2744 stats = server_params_test(context, context,
2745 chatty=True, connectionchatty=True)
2746 self.assertIs(stats['compression'], None)
2747
2748 def test_dh_params(self):
2749 # Check we can get a connection with ephemeral Diffie-Hellman
2750 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2751 context.load_cert_chain(CERTFILE)
2752 context.load_dh_params(DHFILE)
2753 context.set_ciphers("kEDH")
2754 stats = server_params_test(context, context,
2755 chatty=True, connectionchatty=True)
2756 cipher = stats["cipher"][0]
2757 parts = cipher.split("-")
2758 if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
2759 self.fail("Non-DH cipher: " + cipher[0])
2760
2761 def test_selected_npn_protocol(self):
2762 # selected_npn_protocol() is None unless NPN is used
2763 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2764 context.load_cert_chain(CERTFILE)
2765 stats = server_params_test(context, context,
2766 chatty=True, connectionchatty=True)
2767 self.assertIs(stats['client_npn_protocol'], None)
2768
2769 @unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
2770 def test_npn_protocols(self):
2771 server_protocols = ['http/1.1', 'spdy/2']
2772 protocol_tests = [
2773 (['http/1.1', 'spdy/2'], 'http/1.1'),
2774 (['spdy/2', 'http/1.1'], 'http/1.1'),
2775 (['spdy/2', 'test'], 'spdy/2'),
2776 (['abc', 'def'], 'abc')
2777 ]
2778 for client_protocols, expected in protocol_tests:
2779 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2780 server_context.load_cert_chain(CERTFILE)
2781 server_context.set_npn_protocols(server_protocols)
2782 client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2783 client_context.load_cert_chain(CERTFILE)
2784 client_context.set_npn_protocols(client_protocols)
2785 stats = server_params_test(client_context, server_context,
2786 chatty=True, connectionchatty=True)
2787
2788 msg = "failed trying %s (s) and %s (c).\n" \
2789 "was expecting %s, but got %%s from the %%s" \
2790 % (str(server_protocols), str(client_protocols),
2791 str(expected))
2792 client_result = stats['client_npn_protocol']
2793 self.assertEqual(client_result, expected, msg % (client_result, "client"))
2794 server_result = stats['server_npn_protocols'][-1] \
2795 if len(stats['server_npn_protocols']) else 'nothing'
2796 self.assertEqual(server_result, expected, msg % (server_result, "server"))
2797
2798 def sni_contexts(self):
2799 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2800 server_context.load_cert_chain(SIGNED_CERTFILE)
2801 other_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2802 other_context.load_cert_chain(SIGNED_CERTFILE2)
2803 client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2804 client_context.verify_mode = ssl.CERT_REQUIRED
2805 client_context.load_verify_locations(SIGNING_CA)
2806 return server_context, other_context, client_context
2807
2808 def check_common_name(self, stats, name):
2809 cert = stats['peercert']
2810 self.assertIn((('commonName', name),), cert['subject'])
2811
2812 @needs_sni
2813 def test_sni_callback(self):
2814 calls = []
2815 server_context, other_context, client_context = self.sni_contexts()
2816
2817 def servername_cb(ssl_sock, server_name, initial_context):
2818 calls.append((server_name, initial_context))
2819 if server_name is not None:
2820 ssl_sock.context = other_context
2821 server_context.set_servername_callback(servername_cb)
2822
2823 stats = server_params_test(client_context, server_context,
2824 chatty=True,
2825 sni_name='supermessage')
2826 # The hostname was fetched properly, and the certificate was
2827 # changed for the connection.
2828 self.assertEqual(calls, [("supermessage", server_context)])
2829 # CERTFILE4 was selected
2830 self.check_common_name(stats, 'fakehostname')
2831
2832 calls = []
2833 # The callback is called with server_name=None
2834 stats = server_params_test(client_context, server_context,
2835 chatty=True,
2836 sni_name=None)
2837 self.assertEqual(calls, [(None, server_context)])
2838 self.check_common_name(stats, 'localhost')
2839
2840 # Check disabling the callback
2841 calls = []
2842 server_context.set_servername_callback(None)
2843
2844 stats = server_params_test(client_context, server_context,
2845 chatty=True,
2846 sni_name='notfunny')
2847 # Certificate didn't change
2848 self.check_common_name(stats, 'localhost')
2849 self.assertEqual(calls, [])
2850
2851 @needs_sni
2852 def test_sni_callback_alert(self):
2853 # Returning a TLS alert is reflected to the connecting client
2854 server_context, other_context, client_context = self.sni_contexts()
2855
2856 def cb_returning_alert(ssl_sock, server_name, initial_context):
2857 return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
2858 server_context.set_servername_callback(cb_returning_alert)
2859
2860 with self.assertRaises(ssl.SSLError) as cm:
2861 stats = server_params_test(client_context, server_context,
2862 chatty=False,
2863 sni_name='supermessage')
2864 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
2865
2866 @needs_sni
2867 def test_sni_callback_raising(self):
2868 # Raising fails the connection with a TLS handshake failure alert.
2869 server_context, other_context, client_context = self.sni_contexts()
2870
2871 def cb_raising(ssl_sock, server_name, initial_context):
2872 1/0
2873 server_context.set_servername_callback(cb_raising)
2874
2875 with self.assertRaises(ssl.SSLError) as cm, \
2876 support.captured_stderr() as stderr:
2877 stats = server_params_test(client_context, server_context,
2878 chatty=False,
2879 sni_name='supermessage')
2880 self.assertEqual(cm.exception.reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
2881 self.assertIn("ZeroDivisionError", stderr.getvalue())
2882
2883 @needs_sni
2884 def test_sni_callback_wrong_return_type(self):
2885 # Returning the wrong return type terminates the TLS connection
2886 # with an internal error alert.
2887 server_context, other_context, client_context = self.sni_contexts()
2888
2889 def cb_wrong_return_type(ssl_sock, server_name, initial_context):
2890 return "foo"
2891 server_context.set_servername_callback(cb_wrong_return_type)
2892
2893 with self.assertRaises(ssl.SSLError) as cm, \
2894 support.captured_stderr() as stderr:
2895 stats = server_params_test(client_context, server_context,
2896 chatty=False,
2897 sni_name='supermessage')
2898 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
2899 self.assertIn("TypeError", stderr.getvalue())
2900
2901 def test_read_write_after_close_raises_valuerror(self):
2902 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2903 context.verify_mode = ssl.CERT_REQUIRED
2904 context.load_verify_locations(CERTFILE)
2905 context.load_cert_chain(CERTFILE)
2906 server = ThreadedEchoServer(context=context, chatty=False)
2907
2908 with server:
2909 s = context.wrap_socket(socket.socket())
2910 s.connect((HOST, server.port))
2911 s.close()
2912
2913 self.assertRaises(ValueError, s.read, 1024)
2914 self.assertRaises(ValueError, s.write, b'hello')
2915
Bill Janssen61c001a2008-09-08 16:37:24 +00002916
Neal Norwitz9eb9b102007-08-27 01:15:33 +00002917def test_main(verbose=False):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002918 if support.verbose:
2919 plats = {
2920 'Linux': platform.linux_distribution,
2921 'Mac': platform.mac_ver,
2922 'Windows': platform.win32_ver,
2923 }
2924 for name, func in plats.items():
2925 plat = func()
2926 if plat and plat[0]:
2927 plat = '%s %r' % (name, plat)
2928 break
2929 else:
2930 plat = repr(platform.platform())
2931 print("test_ssl: testing with %r %r" %
2932 (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
2933 print(" under %s" % plat)
2934 print(" HAS_SNI = %r" % ssl.HAS_SNI)
2935 print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
2936 try:
2937 print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
2938 except AttributeError:
2939 pass
Bill Janssen296a59d2007-09-16 22:06:00 +00002940
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002941 for filename in [
2942 CERTFILE, SVN_PYTHON_ORG_ROOT_CERT, BYTES_CERTFILE,
2943 ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
2944 SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
2945 BADCERT, BADKEY, EMPTYCERT]:
2946 if not os.path.exists(filename):
2947 raise support.TestFailed("Can't read certificate file %r" % filename)
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00002948
Benjamin Peterson2f334562014-10-01 23:53:01 -04002949 tests = [ContextTests, BasicTests, BasicSocketTests, SSLErrorTests]
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00002950
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002951 if support.is_resource_enabled('network'):
Bill Janssen934b16d2008-06-28 22:19:33 +00002952 tests.append(NetworkedTests)
Bill Janssen296a59d2007-09-16 22:06:00 +00002953
Bill Janssen98d19da2007-09-10 21:51:02 +00002954 if _have_threads:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002955 thread_info = support.threading_setup()
2956 if thread_info:
Bill Janssen934b16d2008-06-28 22:19:33 +00002957 tests.append(ThreadedTests)
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00002958
Antoine Pitrou3945c862010-04-28 21:11:01 +00002959 try:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002960 support.run_unittest(*tests)
Antoine Pitrou3945c862010-04-28 21:11:01 +00002961 finally:
2962 if _have_threads:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05002963 support.threading_cleanup(*thread_info)
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00002964
2965if __name__ == "__main__":
2966 test_main()