blob: eac994f1230ed1f2e3dc4cd42ec486ae1b33e7c1 [file] [log] [blame]
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001# -*- coding: utf-8 -*-
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00002# Test the support for SSL and sockets
3
4import sys
5import unittest
Benjamin Petersondaeb9252014-08-20 14:14:50 -05006from test import test_support as support
Bill Janssen934b16d2008-06-28 22:19:33 +00007import asyncore
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00008import socket
Bill Janssen934b16d2008-06-28 22:19:33 +00009import select
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000010import time
Benjamin Petersondaeb9252014-08-20 14:14:50 -050011import datetime
Antoine Pitroub558f172010-04-23 23:25:45 +000012import gc
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000013import os
Antoine Pitroub558f172010-04-23 23:25:45 +000014import errno
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000015import pprint
Benjamin Petersondaeb9252014-08-20 14:14:50 -050016import tempfile
17import urllib
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000018import traceback
Antoine Pitroudfb299b2010-04-23 22:54:59 +000019import weakref
Antoine Pitroud75efd92010-08-04 17:38:33 +000020import platform
Benjamin Petersondaeb9252014-08-20 14:14:50 -050021import functools
22from contextlib import closing
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000023
# The ssl module is obtained through the test-support import helper instead
# of a plain import statement.
# NOTE(review): presumably this skips the whole test module when ssl is not
# available -- confirm against test_support.import_module.
ssl = support.import_module("ssl")

# Sorted protocol constants taken from ssl's private protocol-name table;
# the ContextTests iterate over this list.
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
# Host name supplied by the test-support package.
HOST = support.HOST
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000028
def data_file(*name):
    """Return the path of a data file stored alongside this test module.

    *name* is one or more path components; they are joined onto the
    directory that contains this file.
    """
    test_dir = os.path.dirname(__file__)
    return os.path.join(test_dir, *name)
31
32# The custom key and certificate files used in test_ssl are generated
33# using Lib/test/make_ssl_certs.py.
34# Other certificates are simply fetched from the Internet servers they
35# are meant to authenticate.
36
# Paths of the certificate and key fixtures.  The BYTES_* variants are the
# same paths encoded with the filesystem encoding.
CERTFILE = data_file("keycert.pem")
BYTES_CERTFILE = CERTFILE.encode(sys.getfilesystemencoding())
ONLYCERT = data_file("ssl_cert.pem")
ONLYKEY = data_file("ssl_key.pem")
BYTES_ONLYCERT = ONLYCERT.encode(sys.getfilesystemencoding())
BYTES_ONLYKEY = ONLYKEY.encode(sys.getfilesystemencoding())
CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
# NOTE(review): presumably the password of the two *_PROTECTED files above
# -- confirm against make_ssl_certs.py.
KEY_PASSWORD = "somepass"
CAPATH = data_file("capath")
BYTES_CAPATH = CAPATH.encode(sys.getfilesystemencoding())
# Individual files inside the CAPATH directory.
CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
CAFILE_CACERT = data_file("capath", "5ed36f99.0")


# empty CRL
CRLFILE = data_file("revocation.crl")

# Two keys and certs signed by the same CA (for SNI tests)
SIGNED_CERTFILE = data_file("keycert3.pem")
SIGNED_CERTFILE2 = data_file("keycert4.pem")
SIGNING_CA = data_file("pycacert.pem")

SVN_PYTHON_ORG_ROOT_CERT = data_file("https_svn_python_org_root.pem")

# Deliberately unusable inputs for the error-path tests.  The tests in this
# file expect ENOENT for WRONGCERT and a "PEM lib" SSLError for BADCERT and
# EMPTYCERT.
EMPTYCERT = data_file("nullcert.pem")
BADCERT = data_file("badcert.pem")
WRONGCERT = data_file("XXXnonexisting.pem")
BADKEY = data_file("badkey.pem")
# Certificates used by the certificate-parsing tests.
NOKIACERT = data_file("nokia.pem")
NULLBYTECERT = data_file("nullbytecert.pem")

DHFILE = data_file("dh512.pem")
BYTES_DHFILE = DHFILE.encode(sys.getfilesystemencoding())
71
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000072
def handle_error(prefix):
    """Write *prefix* and the current exception's traceback to stdout.

    The traceback is always formatted, but output is produced only when
    the test run is verbose.
    """
    exc_type, exc_value, exc_tb = sys.exc_info()
    # The formatted traceback pieces are joined with single spaces.
    exc_format = ' '.join(
        traceback.format_exception(exc_type, exc_value, exc_tb))
    if not support.verbose:
        return
    sys.stdout.write(prefix + exc_format)
Neal Norwitz3e533c22007-08-27 01:03:18 +000077
Antoine Pitrou435ba0c2010-04-27 09:51:18 +000078
class BasicTests(unittest.TestCase):

    def test_sslwrap_simple(self):
        # A crude test for the legacy API: sslwrap_simple() is called once
        # with a socket.socket object and once with the low-level _sock
        # object underneath one.
        high_level = socket.socket(socket.AF_INET)
        self.addCleanup(high_level.close)
        low_level_owner = socket.socket(socket.AF_INET)
        self.addCleanup(low_level_owner.close)
        for sock in (high_level, low_level_owner._sock):
            try:
                ssl.sslwrap_simple(sock)
            except IOError as e:
                # Broken pipe when ssl_sock.do_handshake() runs; this test
                # doesn't care about that.  Anything else is a failure.
                if e.errno != errno.EPIPE:
                    raise
def can_clear_options():
    """Return True if the OpenSSL API version is 0.9.8m or higher.

    test_options uses this to decide whether context option bits can be
    cleared or whether clearing them must raise ValueError.
    """
    minimum = (0, 9, 8, 13, 15)  # 0.9.8m
    return ssl._OPENSSL_API_VERSION >= minimum
100
def no_sslv2_implies_sslv3_hello():
    """Return True if the linked OpenSSL is version 0.9.7h or higher."""
    minimum = (0, 9, 7, 8, 15)  # 0.9.7h
    return ssl.OPENSSL_VERSION_INFO >= minimum
104
def have_verify_flags():
    """Return True if the linked OpenSSL is version 0.9.8 or higher.

    Used as the skip condition of test_verify_flags.
    """
    minimum = (0, 9, 8, 0, 15)  # 0.9.8
    return ssl.OPENSSL_VERSION_INFO >= minimum
108
def utc_offset():  # NOTE: ignore issues like #1647654
    """Return the local UTC offset in seconds.

    local time = utc time + utc offset
    """
    dst_in_effect = time.daylight and time.localtime().tm_isdst > 0
    # time.altzone / time.timezone are negated to obtain the offset.
    seconds = time.altzone if dst_in_effect else time.timezone
    return -seconds
114
def asn1time(cert_time):
    """Return *cert_time* adjusted for the linked OpenSSL's quirks.

    The string is returned unchanged unless the OpenSSL API version is
    exactly 0.9.8i, in which case the seconds are zeroed.
    """
    # Some versions of OpenSSL ignore seconds, see #18207
    if ssl._OPENSSL_API_VERSION != (0, 9, 8, 9, 15):  # not 0.9.8.i
        return cert_time

    layout = "%b %d %H:%M:%S %Y GMT"
    parsed = datetime.datetime.strptime(cert_time, layout)
    cert_time = parsed.replace(second=0).strftime(layout)
    # %d adds leading zero but ASN1_TIME_print() uses leading space
    if cert_time[4] == "0":
        cert_time = "".join((cert_time[:4], " ", cert_time[5:]))
    return cert_time
Neal Norwitz3e533c22007-08-27 01:03:18 +0000128
Antoine Pitroud75efd92010-08-04 17:38:33 +0000129# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
def skip_if_broken_ubuntu_ssl(func):
    """Decorator that skips *func* on the patched Ubuntu OpenSSL build.

    When ssl has no PROTOCOL_SSLv2 at all, *func* is returned untouched.
    Otherwise the wrapper tries to create an SSLv2 context before each
    call; if that fails on the specific OpenSSL version / distribution
    pair checked below, the test is skipped.  In every other case *func*
    runs normally.
    """
    if not hasattr(ssl, 'PROTOCOL_SSLv2'):
        return func

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            ssl.SSLContext(ssl.PROTOCOL_SSLv2)
        except ssl.SSLError:
            patched_build = (
                ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
                platform.linux_distribution() == ('debian', 'squeeze/sid', ''))
            if patched_build:
                raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
        return func(*args, **kwargs)
    return wrapper
Antoine Pitroud75efd92010-08-04 17:38:33 +0000144
# Test decorator built from ssl.HAS_SNI: the decorated test is skipped, with
# the message below, unless that flag is true.
needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
146
Antoine Pitroud75efd92010-08-04 17:38:33 +0000147
class BasicSocketTests(unittest.TestCase):
    # Tests of ssl module-level helpers and of wrapped sockets that are
    # never connected to a peer.

    def test_constants(self):
        # Bare attribute access: a missing constant raises AttributeError.
        ssl.CERT_NONE
        ssl.CERT_OPTIONAL
        ssl.CERT_REQUIRED
        ssl.OP_CIPHER_SERVER_PREFERENCE
        ssl.OP_SINGLE_DH_USE
        if ssl.HAS_ECDH:
            ssl.OP_SINGLE_ECDH_USE
        if ssl.OPENSSL_VERSION_INFO >= (1, 0):
            ssl.OP_NO_COMPRESSION
        self.assertIn(ssl.HAS_SNI, {True, False})
        self.assertIn(ssl.HAS_ECDH, {True, False})

    def test_random(self):
        v = ssl.RAND_status()
        if support.verbose:
            if v:
                status = "sufficient randomness"
            else:
                status = "insufficient randomness"
            sys.stdout.write("\n RAND_status is %d (%s)\n" % (v, status))
        self.assertRaises(TypeError, ssl.RAND_egd, 1)
        self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
        ssl.RAND_add("this is a random string", 75.0)

    def test_parse_cert(self):
        # note that this uses an 'unofficial' function in _ssl.c,
        # provided solely for this test, to exercise the certificate
        # parsing code
        p = ssl._ssl._test_decode_cert(CERTFILE)
        if support.verbose:
            sys.stdout.write("\n" + pprint.pformat(p) + "\n")
        self.assertEqual(p['issuer'],
                         ((('countryName', 'XY'),),
                          (('localityName', 'Castle Anthrax'),),
                          (('organizationName', 'Python Software Foundation'),),
                          (('commonName', 'localhost'),))
                        )
        # Note the next three asserts will fail if the keys are regenerated.
        # ASN1_TIME_print() pads a single-digit day with a leading space,
        # so the expected strings carry two spaces after the month.
        self.assertEqual(p['notAfter'], asn1time('Oct  5 23:01:56 2020 GMT'))
        self.assertEqual(p['notBefore'], asn1time('Oct  8 23:01:56 2010 GMT'))
        self.assertEqual(p['serialNumber'], 'D7C7381919AFC24E')
        self.assertEqual(p['subject'],
                         ((('countryName', 'XY'),),
                          (('localityName', 'Castle Anthrax'),),
                          (('organizationName', 'Python Software Foundation'),),
                          (('commonName', 'localhost'),))
                        )
        self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),))
        # Issue #13034: the subjectAltName in some certificates
        # (notably projects.developer.nokia.com:443) wasn't parsed
        p = ssl._ssl._test_decode_cert(NOKIACERT)
        if support.verbose:
            sys.stdout.write("\n" + pprint.pformat(p) + "\n")
        self.assertEqual(p['subjectAltName'],
                         (('DNS', 'projects.developer.nokia.com'),
                          ('DNS', 'projects.forum.nokia.com'))
                        )
        # extra OCSP and AIA fields
        self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
        self.assertEqual(p['caIssuers'],
                         ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
        self.assertEqual(p['crlDistributionPoints'],
                         ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))

    def test_parse_cert_CVE_2013_4238(self):
        # Embedded NUL bytes in certificate fields must be preserved by the
        # parser, not treated as string terminators.
        p = ssl._ssl._test_decode_cert(NULLBYTECERT)
        if support.verbose:
            sys.stdout.write("\n" + pprint.pformat(p) + "\n")
        subject = ((('countryName', 'US'),),
                   (('stateOrProvinceName', 'Oregon'),),
                   (('localityName', 'Beaverton'),),
                   (('organizationName', 'Python Software Foundation'),),
                   (('organizationalUnitName', 'Python Core Development'),),
                   (('commonName', 'null.python.org\x00example.org'),),
                   (('emailAddress', 'python-dev@python.org'),))
        self.assertEqual(p['subject'], subject)
        self.assertEqual(p['issuer'], subject)
        if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
            san = (('DNS', 'altnull.python.org\x00example.com'),
                   ('email', 'null@python.org\x00user@example.org'),
                   ('URI', 'http://null.python.org\x00http://example.org'),
                   ('IP Address', '192.0.2.1'),
                   ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
        else:
            # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
            san = (('DNS', 'altnull.python.org\x00example.com'),
                   ('email', 'null@python.org\x00user@example.org'),
                   ('URI', 'http://null.python.org\x00http://example.org'),
                   ('IP Address', '192.0.2.1'),
                   ('IP Address', '<invalid>'))

        self.assertEqual(p['subjectAltName'], san)

    def test_DER_to_PEM(self):
        # PEM -> DER -> PEM -> DER must round-trip to the same DER bytes,
        # and the regenerated PEM must carry the standard header/footer.
        with open(SVN_PYTHON_ORG_ROOT_CERT, 'r') as f:
            pem = f.read()
        d1 = ssl.PEM_cert_to_DER_cert(pem)
        p2 = ssl.DER_cert_to_PEM_cert(d1)
        d2 = ssl.PEM_cert_to_DER_cert(p2)
        self.assertEqual(d1, d2)
        self.assertTrue(p2.startswith(ssl.PEM_HEADER + '\n'),
                        "DER-to-PEM didn't include correct header:\n%r\n" % p2)
        self.assertTrue(p2.endswith('\n' + ssl.PEM_FOOTER + '\n'),
                        "DER-to-PEM didn't include correct footer:\n%r\n" % p2)

    def test_openssl_version(self):
        n = ssl.OPENSSL_VERSION_NUMBER
        t = ssl.OPENSSL_VERSION_INFO
        s = ssl.OPENSSL_VERSION
        self.assertIsInstance(n, (int, long))
        self.assertIsInstance(t, tuple)
        self.assertIsInstance(s, str)
        # Some sanity checks follow
        # >= 0.9
        self.assertGreaterEqual(n, 0x900000)
        # < 3.0
        self.assertLess(n, 0x30000000)
        major, minor, fix, patch, status = t
        self.assertGreaterEqual(major, 0)
        self.assertLess(major, 3)
        self.assertGreaterEqual(minor, 0)
        self.assertLess(minor, 256)
        self.assertGreaterEqual(fix, 0)
        self.assertLess(fix, 256)
        self.assertGreaterEqual(patch, 0)
        self.assertLessEqual(patch, 26)
        self.assertGreaterEqual(status, 0)
        self.assertLessEqual(status, 15)
        # Version string as returned by {Open,Libre}SSL, the format might change
        if "LibreSSL" in s:
            self.assertTrue(s.startswith("LibreSSL {:d}.{:d}".format(major, minor)),
                            (s, t))
        else:
            self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
                            (s, t))

    @support.cpython_only
    def test_refcycle(self):
        # Issue #7943: an SSL object doesn't create reference cycles with
        # itself.
        s = socket.socket(socket.AF_INET)
        ss = ssl.wrap_socket(s)
        wr = weakref.ref(ss)
        del ss
        self.assertEqual(wr(), None)

    def test_wrapped_unconnected(self):
        # Methods on an unconnected SSLSocket propagate the original
        # socket.error raised by the underlying socket object.
        s = socket.socket(socket.AF_INET)
        with closing(ssl.wrap_socket(s)) as ss:
            self.assertRaises(socket.error, ss.recv, 1)
            self.assertRaises(socket.error, ss.recv_into, bytearray(b'x'))
            self.assertRaises(socket.error, ss.recvfrom, 1)
            self.assertRaises(socket.error, ss.recvfrom_into, bytearray(b'x'), 1)
            self.assertRaises(socket.error, ss.send, b'x')
            self.assertRaises(socket.error, ss.sendto, b'x', ('0.0.0.0', 0))

    def test_timeout(self):
        # Issue #8524: when creating an SSL socket, the timeout of the
        # original socket should be retained.
        for timeout in (None, 0.0, 5.0):
            s = socket.socket(socket.AF_INET)
            s.settimeout(timeout)
            with closing(ssl.wrap_socket(s)) as ss:
                self.assertEqual(timeout, ss.gettimeout())

    def test_errors(self):
        # Argument validation in wrap_socket(), and errno propagation for
        # certificate/key files that do not exist.
        sock = socket.socket()
        self.assertRaisesRegexp(ValueError,
                        "certfile must be specified",
                        ssl.wrap_socket, sock, keyfile=CERTFILE)
        self.assertRaisesRegexp(ValueError,
                        "certfile must be specified for server-side operations",
                        ssl.wrap_socket, sock, server_side=True)
        self.assertRaisesRegexp(ValueError,
                        "certfile must be specified for server-side operations",
                        ssl.wrap_socket, sock, server_side=True, certfile="")
        with closing(ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE)) as s:
            self.assertRaisesRegexp(ValueError, "can't connect in server-side mode",
                                    s.connect, (HOST, 8080))
        with self.assertRaises(IOError) as cm:
            with closing(socket.socket()) as sock:
                ssl.wrap_socket(sock, certfile=WRONGCERT)
        self.assertEqual(cm.exception.errno, errno.ENOENT)
        with self.assertRaises(IOError) as cm:
            with closing(socket.socket()) as sock:
                ssl.wrap_socket(sock, certfile=CERTFILE, keyfile=WRONGCERT)
        self.assertEqual(cm.exception.errno, errno.ENOENT)
        with self.assertRaises(IOError) as cm:
            with closing(socket.socket()) as sock:
                ssl.wrap_socket(sock, certfile=WRONGCERT, keyfile=WRONGCERT)
        self.assertEqual(cm.exception.errno, errno.ENOENT)

    def test_match_hostname(self):
        # ok(): the hostname must be accepted; fail(): it must be rejected
        # with CertificateError.
        def ok(cert, hostname):
            ssl.match_hostname(cert, hostname)
        def fail(cert, hostname):
            self.assertRaises(ssl.CertificateError,
                              ssl.match_hostname, cert, hostname)

        cert = {'subject': ((('commonName', 'example.com'),),)}
        ok(cert, 'example.com')
        ok(cert, 'ExAmple.cOm')
        fail(cert, 'www.example.com')
        fail(cert, '.example.com')
        fail(cert, 'example.org')
        fail(cert, 'exampleXcom')

        cert = {'subject': ((('commonName', '*.a.com'),),)}
        ok(cert, 'foo.a.com')
        fail(cert, 'bar.foo.a.com')
        fail(cert, 'a.com')
        fail(cert, 'Xa.com')
        fail(cert, '.a.com')

        # only match one left-most wildcard
        cert = {'subject': ((('commonName', 'f*.com'),),)}
        ok(cert, 'foo.com')
        ok(cert, 'f.com')
        fail(cert, 'bar.com')
        fail(cert, 'foo.a.com')
        fail(cert, 'bar.foo.com')

        # NULL bytes are bad, CVE-2013-4073
        cert = {'subject': ((('commonName',
                              'null.python.org\x00example.org'),),)}
        ok(cert, 'null.python.org\x00example.org') # or raise an error?
        fail(cert, 'example.org')
        fail(cert, 'null.python.org')

        # error cases with wildcards
        cert = {'subject': ((('commonName', '*.*.a.com'),),)}
        fail(cert, 'bar.foo.a.com')
        fail(cert, 'a.com')
        fail(cert, 'Xa.com')
        fail(cert, '.a.com')

        cert = {'subject': ((('commonName', 'a.*.com'),),)}
        fail(cert, 'a.foo.com')
        fail(cert, 'a..com')
        fail(cert, 'a.com')

        # wildcard doesn't match IDNA prefix 'xn--'
        idna = u'püthon.python.org'.encode("idna").decode("ascii")
        cert = {'subject': ((('commonName', idna),),)}
        ok(cert, idna)
        cert = {'subject': ((('commonName', 'x*.python.org'),),)}
        fail(cert, idna)
        cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
        fail(cert, idna)

        # wildcard in first fragment and  IDNA A-labels in sequent fragments
        # are supported.
        idna = u'www*.pythön.org'.encode("idna").decode("ascii")
        cert = {'subject': ((('commonName', idna),),)}
        ok(cert, u'www.pythön.org'.encode("idna").decode("ascii"))
        ok(cert, u'www1.pythön.org'.encode("idna").decode("ascii"))
        fail(cert, u'ftp.pythön.org'.encode("idna").decode("ascii"))
        fail(cert, u'pythön.org'.encode("idna").decode("ascii"))

        # Slightly fake real-world example
        cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
                'subject': ((('commonName', 'linuxfrz.org'),),),
                'subjectAltName': (('DNS', 'linuxfr.org'),
                                   ('DNS', 'linuxfr.com'),
                                   ('othername', '<unsupported>'))}
        ok(cert, 'linuxfr.org')
        ok(cert, 'linuxfr.com')
        # Not a "DNS" entry
        fail(cert, '<unsupported>')
        # When there is a subjectAltName, commonName isn't used
        fail(cert, 'linuxfrz.org')

        # A pristine real-world example
        cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
                'subject': ((('countryName', 'US'),),
                            (('stateOrProvinceName', 'California'),),
                            (('localityName', 'Mountain View'),),
                            (('organizationName', 'Google Inc'),),
                            (('commonName', 'mail.google.com'),))}
        ok(cert, 'mail.google.com')
        fail(cert, 'gmail.com')
        # Only commonName is considered
        fail(cert, 'California')

        # Neither commonName nor subjectAltName
        cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
                'subject': ((('countryName', 'US'),),
                            (('stateOrProvinceName', 'California'),),
                            (('localityName', 'Mountain View'),),
                            (('organizationName', 'Google Inc'),))}
        fail(cert, 'mail.google.com')

        # No DNS entry in subjectAltName but a commonName
        cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
                'subject': ((('countryName', 'US'),),
                            (('stateOrProvinceName', 'California'),),
                            (('localityName', 'Mountain View'),),
                            (('commonName', 'mail.google.com'),)),
                'subjectAltName': (('othername', 'blabla'), )}
        ok(cert, 'mail.google.com')

        # No DNS entry subjectAltName and no commonName
        cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
                'subject': ((('countryName', 'US'),),
                            (('stateOrProvinceName', 'California'),),
                            (('localityName', 'Mountain View'),),
                            (('organizationName', 'Google Inc'),)),
                'subjectAltName': (('othername', 'blabla'),)}
        fail(cert, 'google.com')

        # Empty cert / no cert
        self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
        self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')

        # Issue #17980: avoid denials of service by refusing more than one
        # wildcard per fragment.
        cert = {'subject': ((('commonName', 'a*b.com'),),)}
        ok(cert, 'axxb.com')
        cert = {'subject': ((('commonName', 'a*b.co*'),),)}
        fail(cert, 'axxb.com')
        cert = {'subject': ((('commonName', 'a*b*.com'),),)}
        with self.assertRaises(ssl.CertificateError) as cm:
            ssl.match_hostname(cert, 'axxbxxc.com')
        self.assertIn("too many wildcards", str(cm.exception))

    def test_server_side(self):
        # server_hostname doesn't work for server sockets
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        with closing(socket.socket()) as sock:
            self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
                              server_hostname="some.hostname")

    def test_unknown_channel_binding(self):
        # should raise ValueError for unknown type
        s = socket.socket(socket.AF_INET)
        with closing(ssl.wrap_socket(s)) as ss:
            with self.assertRaises(ValueError):
                ss.get_channel_binding("unknown-type")

    @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                         "'tls-unique' channel binding not available")
    def test_tls_unique_channel_binding(self):
        # unconnected should return None for known type
        s = socket.socket(socket.AF_INET)
        with closing(ssl.wrap_socket(s)) as ss:
            self.assertIsNone(ss.get_channel_binding("tls-unique"))
        # the same for server-side
        s = socket.socket(socket.AF_INET)
        with closing(ssl.wrap_socket(s, server_side=True, certfile=CERTFILE)) as ss:
            self.assertIsNone(ss.get_channel_binding("tls-unique"))

    def test_get_default_verify_paths(self):
        paths = ssl.get_default_verify_paths()
        self.assertEqual(len(paths), 6)
        self.assertIsInstance(paths, ssl.DefaultVerifyPaths)

        # The SSL_CERT_DIR / SSL_CERT_FILE environment variables must be
        # reflected in the returned paths.
        with support.EnvironmentVarGuard() as env:
            env["SSL_CERT_DIR"] = CAPATH
            env["SSL_CERT_FILE"] = CERTFILE
            paths = ssl.get_default_verify_paths()
            self.assertEqual(paths.cafile, CERTFILE)
            self.assertEqual(paths.capath, CAPATH)

    @unittest.skipUnless(sys.platform == "win32", "Windows specific")
    def test_enum_certificates(self):
        self.assertTrue(ssl.enum_certificates("CA"))
        self.assertTrue(ssl.enum_certificates("ROOT"))

        self.assertRaises(TypeError, ssl.enum_certificates)
        self.assertRaises(WindowsError, ssl.enum_certificates, "")

        # Each store entry is a (cert bytes, encoding name, trust) triple;
        # collect every OID found in the set-valued trust fields.
        trust_oids = set()
        for storename in ("CA", "ROOT"):
            store = ssl.enum_certificates(storename)
            self.assertIsInstance(store, list)
            for element in store:
                self.assertIsInstance(element, tuple)
                self.assertEqual(len(element), 3)
                cert, enc, trust = element
                self.assertIsInstance(cert, bytes)
                self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
                self.assertIsInstance(trust, (set, bool))
                if isinstance(trust, set):
                    trust_oids.update(trust)

        serverAuth = "1.3.6.1.5.5.7.3.1"
        self.assertIn(serverAuth, trust_oids)

    @unittest.skipUnless(sys.platform == "win32", "Windows specific")
    def test_enum_crls(self):
        self.assertTrue(ssl.enum_crls("CA"))
        self.assertRaises(TypeError, ssl.enum_crls)
        self.assertRaises(WindowsError, ssl.enum_crls, "")

        # Each entry is a (crl bytes, encoding name) pair.
        crls = ssl.enum_crls("CA")
        self.assertIsInstance(crls, list)
        for element in crls:
            self.assertIsInstance(element, tuple)
            self.assertEqual(len(element), 2)
            self.assertIsInstance(element[0], bytes)
            self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})


    def test_asn1object(self):
        # (nid, shortname, longname, oid) of the serverAuth object.
        expected = (129, 'serverAuth', 'TLS Web Server Authentication',
                    '1.3.6.1.5.5.7.3.1')

        # The constructor accepts a dotted OID only, not a short name.
        val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
        self.assertEqual(val, expected)
        self.assertEqual(val.nid, 129)
        self.assertEqual(val.shortname, 'serverAuth')
        self.assertEqual(val.longname, 'TLS Web Server Authentication')
        self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
        self.assertIsInstance(val, ssl._ASN1Object)
        self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')

        val = ssl._ASN1Object.fromnid(129)
        self.assertEqual(val, expected)
        self.assertIsInstance(val, ssl._ASN1Object)
        self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
        with self.assertRaisesRegexp(ValueError, "unknown NID 100000"):
            ssl._ASN1Object.fromnid(100000)
        # Sweep the low NIDs: unknown ones raise ValueError, known ones
        # must expose correctly typed fields.
        for i in range(1000):
            try:
                obj = ssl._ASN1Object.fromnid(i)
            except ValueError:
                pass
            else:
                self.assertIsInstance(obj.nid, int)
                self.assertIsInstance(obj.shortname, str)
                self.assertIsInstance(obj.longname, str)
                self.assertIsInstance(obj.oid, (str, type(None)))

        # fromname() accepts the long name, the short name or the OID, and
        # is case-sensitive.
        val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
        self.assertEqual(val, expected)
        self.assertIsInstance(val, ssl._ASN1Object)
        self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
        self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
                         expected)
        with self.assertRaisesRegexp(ValueError, "unknown object 'serverauth'"):
            ssl._ASN1Object.fromname('serverauth')

    def test_purpose_enum(self):
        val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
        self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
        self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
        self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
        self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
        self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
                              '1.3.6.1.5.5.7.3.1')

        val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
        self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
        self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
        self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
        self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
        self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
                              '1.3.6.1.5.5.7.3.2')

    def test_unsupported_dtls(self):
        # Datagram sockets are rejected by both wrapping entry points.
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.addCleanup(s.close)
        with self.assertRaises(NotImplementedError) as cx:
            ssl.wrap_socket(s, cert_reqs=ssl.CERT_NONE)
        self.assertEqual(str(cx.exception), "only stream sockets are supported")
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        with self.assertRaises(NotImplementedError) as cx:
            ctx.wrap_socket(s)
        self.assertEqual(str(cx.exception), "only stream sockets are supported")

    def cert_time_ok(self, timestring, timestamp):
        # Helper: *timestring* must convert to exactly *timestamp*.
        self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)

    def cert_time_fail(self, timestring):
        # Helper: *timestring* must be rejected with ValueError.
        with self.assertRaises(ValueError):
            ssl.cert_time_to_seconds(timestring)

    # NOTE(review): OpenSSL prints single-digit days space-padded
    # ("Jan  5"); the literals in the three tests below use a single space.
    # Confirm whether that padding was lost from these strings.
    @unittest.skipUnless(utc_offset(),
                         'local time needs to be different from UTC')
    def test_cert_time_to_seconds_timezone(self):
        # Issue #19940: ssl.cert_time_to_seconds() returns wrong
        #               results if local timezone is not UTC
        self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0)
        self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0)

    def test_cert_time_to_seconds(self):
        timestring = "Jan 5 09:34:43 2018 GMT"
        ts = 1515144883.0
        self.cert_time_ok(timestring, ts)
        # accept keyword parameter, assert its name
        self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
        # accept both %e and %d (space or zero generated by strftime)
        self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
        # case-insensitive
        self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
        self.cert_time_fail("Jan 5 09:34 2018 GMT")     # no seconds
        self.cert_time_fail("Jan 5 09:34:43 2018")      # no GMT
        self.cert_time_fail("Jan 5 09:34:43 2018 UTC")  # not GMT timezone
        self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
        self.cert_time_fail("Jon 5 09:34:43 2018 GMT")  # invalid month
        self.cert_time_fail("Jan 5 24:00:00 2018 GMT")  # invalid hour
        self.cert_time_fail("Jan 5 09:60:43 2018 GMT")  # invalid minute

        newyear_ts = 1230768000.0
        # leap seconds
        self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
        # same timestamp
        self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)

        self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
        #  allow 60th second (even if it is not a leap second)
        self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
        #  allow 2nd leap second for compatibility with time.strptime()
        self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
        self.cert_time_fail("Jan 5 09:34:62 2018 GMT")  # invalid seconds

        # no special treatment for the special value:
        #   99991231235959Z (rfc 5280)
        self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)

    @support.run_with_locale('LC_ALL', '')
    def test_cert_time_to_seconds_locale(self):
        # `cert_time_to_seconds()` should be locale independent

        def local_february_name():
            return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))

        if local_february_name().lower() == 'feb':
            self.skipTest("locale-specific month name needs to be "
                          "different from C locale")

        # locale-independent
        self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
        self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
686
687
class ContextTests(unittest.TestCase):
    # Tests for ssl.SSLContext: construction, option/flag properties,
    # loading of certificates, keys and DH parameters, and the
    # create_default_context() / _create_stdlib_context() factories.

    @skip_if_broken_ubuntu_ssl
    def test_constructor(self):
        # Every known protocol constant is accepted ...
        for proto in PROTOCOLS:
            ssl.SSLContext(proto)
        # ... the protocol argument is mandatory ...
        self.assertRaises(TypeError, ssl.SSLContext)
        # ... and unknown protocol numbers are rejected.
        for bogus in (-1, 42):
            self.assertRaises(ValueError, ssl.SSLContext, bogus)

    @skip_if_broken_ubuntu_ssl
    def test_protocol(self):
        # The `protocol` attribute reports what the constructor was given.
        for expected in PROTOCOLS:
            self.assertEqual(ssl.SSLContext(expected).protocol, expected)

    def test_ciphers(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        for spec in ("ALL", "DEFAULT"):
            ctx.set_ciphers(spec)
        # A cipher string matching nothing must raise.
        with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"):
            ctx.set_ciphers("^$:,;?*'dorothyx")

    @skip_if_broken_ubuntu_ssl
    def test_options(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # OP_ALL | OP_NO_SSLv2 is the default value
        default_opts = ssl.OP_ALL | ssl.OP_NO_SSLv2
        self.assertEqual(default_opts, ctx.options)
        ctx.options |= ssl.OP_NO_SSLv3
        self.assertEqual(default_opts | ssl.OP_NO_SSLv3, ctx.options)
        if not can_clear_options():
            # Option bits cannot be cleared here: resetting must fail.
            with self.assertRaises(ValueError):
                ctx.options = 0
            return
        # Swap OP_NO_SSLv2 for OP_NO_TLSv1, then clear everything.
        ctx.options = (ctx.options & ~ssl.OP_NO_SSLv2) | ssl.OP_NO_TLSv1
        self.assertEqual(ssl.OP_ALL | ssl.OP_NO_TLSv1 | ssl.OP_NO_SSLv3,
                         ctx.options)
        ctx.options = 0
        self.assertEqual(0, ctx.options)

    def test_verify_mode(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # Default value
        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
        # Each valid mode reads back as assigned.
        for mode in (ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED, ssl.CERT_NONE):
            ctx.verify_mode = mode
            self.assertEqual(ctx.verify_mode, mode)
        with self.assertRaises(TypeError):
            ctx.verify_mode = None
        with self.assertRaises(ValueError):
            ctx.verify_mode = 42

    @unittest.skipUnless(have_verify_flags(),
                         "verify_flags need OpenSSL > 0.9.8")
    def test_verify_flags(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # default value by OpenSSL
        self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
        settings = (
            ssl.VERIFY_CRL_CHECK_LEAF,
            ssl.VERIFY_CRL_CHECK_CHAIN,
            ssl.VERIFY_DEFAULT,
            # supports any value
            ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT,
        )
        for flags in settings:
            ctx.verify_flags = flags
            self.assertEqual(ctx.verify_flags, flags)
        with self.assertRaises(TypeError):
            ctx.verify_flags = None

    def test_load_cert_chain(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # Combined key and cert in a single file
        ctx.load_cert_chain(CERTFILE)
        ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
        self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
        with self.assertRaises(IOError) as cm:
            ctx.load_cert_chain(WRONGCERT)
        self.assertEqual(cm.exception.errno, errno.ENOENT)
        for unparsable in (BADCERT, EMPTYCERT):
            with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
                ctx.load_cert_chain(unparsable)

        # Separate key and cert
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_cert_chain(ONLYCERT, ONLYKEY)
        ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
        ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
        # A lone cert, a lone key, or the two swapped cannot be loaded.
        with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
            ctx.load_cert_chain(ONLYCERT)
        with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
            ctx.load_cert_chain(ONLYKEY)
        with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
            ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)

        # Mismatching key and cert
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        with self.assertRaisesRegexp(ssl.SSLError, "key values mismatch"):
            ctx.load_cert_chain(SVN_PYTHON_ORG_ROOT_CERT, ONLYKEY)

        # Password protected key and cert; the password is given as
        # KEY_PASSWORD itself, its encoded form, and a bytearray of that.
        encoded = KEY_PASSWORD.encode()
        for secret in (KEY_PASSWORD, encoded, bytearray(encoded)):
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=secret)
        for secret in (KEY_PASSWORD, encoded, bytearray(encoded)):
            ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, secret)
        with self.assertRaisesRegexp(TypeError, "should be a string"):
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
        with self.assertRaises(ssl.SSLError):
            ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
        with self.assertRaisesRegexp(ValueError, "cannot be longer"):
            # openssl has a fixed limit on the password buffer.
            # PEM_BUFSIZE is generally set to 1kb.
            # Return a string larger than this.
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)

        # Password callback
        def getpass_unicode():
            return KEY_PASSWORD

        def getpass_bytes():
            return KEY_PASSWORD.encode()

        def getpass_bytearray():
            return bytearray(KEY_PASSWORD.encode())

        def getpass_badpass():
            return "badpass"

        def getpass_huge():
            return b'a' * (1024 * 1024)

        def getpass_bad_type():
            return 9

        def getpass_exception():
            raise Exception('getpass error')

        class GetPassCallable:
            def __call__(self):
                return KEY_PASSWORD

            def getpass(self):
                return KEY_PASSWORD

        # Plain functions, a callable instance and a bound method are
        # all usable as the password source.
        working_callbacks = (
            getpass_unicode,
            getpass_bytes,
            getpass_bytearray,
            GetPassCallable(),
            GetPassCallable().getpass,
        )
        for callback in working_callbacks:
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=callback)
        with self.assertRaises(ssl.SSLError):
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
        with self.assertRaisesRegexp(ValueError, "cannot be longer"):
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
        with self.assertRaisesRegexp(TypeError, "must return a string"):
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
        with self.assertRaisesRegexp(Exception, "getpass error"):
            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
        # Make sure the password function isn't called if it isn't needed
        ctx.load_cert_chain(CERTFILE, password=getpass_exception)

    def test_load_verify_locations(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        # CERTFILE and BYTES_CERTFILE, positionally and by keyword.
        for cafile in (CERTFILE, BYTES_CERTFILE):
            ctx.load_verify_locations(cafile)
            ctx.load_verify_locations(cafile=cafile, capath=None)
        ctx.load_verify_locations(cafile=BYTES_CERTFILE.decode('utf-8'))
        # Calling without any usable location is a TypeError.
        self.assertRaises(TypeError, ctx.load_verify_locations)
        self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
        with self.assertRaises(IOError) as cm:
            ctx.load_verify_locations(WRONGCERT)
        self.assertEqual(cm.exception.errno, errno.ENOENT)
        with self.assertRaises(IOError):
            ctx.load_verify_locations(u'')
        with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
            ctx.load_verify_locations(BADCERT)
        ctx.load_verify_locations(CERTFILE, CAPATH)
        ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)

        # Issue #10989: crash if the second argument type is invalid
        self.assertRaises(TypeError, ctx.load_verify_locations, None, True)

    def test_load_verify_cadata(self):
        # test cadata
        def read_ascii(path):
            # Return the file's content decoded as ASCII.
            with open(path) as f:
                return f.read().decode("ascii")

        def ca_count(context):
            # Number of CA certificates reported by the context's store.
            return context.cert_store_stats()["x509_ca"]

        cacert_pem = read_ascii(CAFILE_CACERT)
        cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
        neuronio_pem = read_ascii(CAFILE_NEURONIO)
        neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)

        # test PEM
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertEqual(ca_count(ctx), 0)
        ctx.load_verify_locations(cadata=cacert_pem)
        self.assertEqual(ca_count(ctx), 1)
        ctx.load_verify_locations(cadata=neuronio_pem)
        self.assertEqual(ca_count(ctx), 2)
        # cert already in hash table
        ctx.load_verify_locations(cadata=neuronio_pem)
        self.assertEqual(ca_count(ctx), 2)

        # combined
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        both_pem = "\n".join((cacert_pem, neuronio_pem))
        ctx.load_verify_locations(cadata=both_pem)
        self.assertEqual(ca_count(ctx), 2)

        # with junk around the certs
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        pieces = ["head", cacert_pem, "other", neuronio_pem, "again",
                  neuronio_pem, "tail"]
        ctx.load_verify_locations(cadata="\n".join(pieces))
        self.assertEqual(ca_count(ctx), 2)

        # test DER
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_verify_locations(cadata=cacert_der)
        ctx.load_verify_locations(cadata=neuronio_der)
        self.assertEqual(ca_count(ctx), 2)
        # cert already in hash table
        ctx.load_verify_locations(cadata=cacert_der)
        self.assertEqual(ca_count(ctx), 2)

        # combined
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        both_der = b"".join((cacert_der, neuronio_der))
        ctx.load_verify_locations(cadata=both_der)
        self.assertEqual(ca_count(ctx), 2)

        # error cases
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)

        with self.assertRaisesRegexp(ssl.SSLError, "no start line"):
            ctx.load_verify_locations(cadata=u"broken")
        with self.assertRaisesRegexp(ssl.SSLError, "not enough data"):
            ctx.load_verify_locations(cadata=b"broken")

    def test_load_dh_params(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_dh_params(DHFILE)
        # The BYTES_DHFILE variant is not exercised on Windows.
        if os.name != 'nt':
            ctx.load_dh_params(BYTES_DHFILE)
        self.assertRaises(TypeError, ctx.load_dh_params)
        self.assertRaises(TypeError, ctx.load_dh_params, None)
        with self.assertRaises(IOError) as cm:
            ctx.load_dh_params(WRONGCERT)
        self.assertEqual(cm.exception.errno, errno.ENOENT)
        # CERTFILE is not accepted as a DH parameter file.
        with self.assertRaises(ssl.SSLError):
            ctx.load_dh_params(CERTFILE)

    @skip_if_broken_ubuntu_ssl
    def test_session_stats(self):
        # A freshly created context reports zero for every counter.
        counters = (
            'number',
            'connect',
            'connect_good',
            'connect_renegotiate',
            'accept',
            'accept_good',
            'accept_renegotiate',
            'hits',
            'misses',
            'timeouts',
            'cache_full',
        )
        pristine = dict.fromkeys(counters, 0)
        for proto in PROTOCOLS:
            self.assertEqual(ssl.SSLContext(proto).session_stats(), pristine)

    def test_set_default_verify_paths(self):
        # There's not much we can do to test that it acts as expected,
        # so just check it doesn't crash or raise an exception.
        ssl.SSLContext(ssl.PROTOCOL_TLSv1).set_default_verify_paths()

    @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
    def test_set_ecdh_curve(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        for curve in ("prime256v1", b"prime256v1"):
            ctx.set_ecdh_curve(curve)
        self.assertRaises(TypeError, ctx.set_ecdh_curve)
        self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
        for unknown in ("foo", b"foo"):
            self.assertRaises(ValueError, ctx.set_ecdh_curve, unknown)

    @needs_sni
    def test_sni_callback(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)

        # set_servername_callback expects a callable, or None
        self.assertRaises(TypeError, ctx.set_servername_callback)
        for not_callable in (4, "", ctx):
            self.assertRaises(TypeError, ctx.set_servername_callback,
                              not_callable)

        def dummycallback(sock, servername, ctx):
            pass

        ctx.set_servername_callback(None)
        ctx.set_servername_callback(dummycallback)

    @needs_sni
    def test_sni_callback_refcycle(self):
        # Reference cycles through the servername callback are detected
        # and cleared.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)

        # The default argument makes the callback reference the context,
        # and the context references the callback: a cycle.
        def dummycallback(sock, servername, ctx, cycle=ctx):
            pass

        ctx.set_servername_callback(dummycallback)
        wr = weakref.ref(ctx)
        del ctx, dummycallback
        gc.collect()
        self.assertIs(wr(), None)

    def test_cert_store_stats(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        stats = ctx.cert_store_stats
        self.assertEqual(stats(), {'x509_ca': 0, 'crl': 0, 'x509': 0})
        # Loading the context's own cert chain leaves the stats unchanged.
        ctx.load_cert_chain(CERTFILE)
        self.assertEqual(stats(), {'x509_ca': 0, 'crl': 0, 'x509': 0})
        # CERTFILE is counted as x509 but not as a CA.
        ctx.load_verify_locations(CERTFILE)
        self.assertEqual(stats(), {'x509_ca': 0, 'crl': 0, 'x509': 1})
        # SVN_PYTHON_ORG_ROOT_CERT is counted in both.
        ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
        self.assertEqual(stats(), {'x509_ca': 1, 'crl': 0, 'x509': 2})

    def test_get_ca_certs(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertEqual(ctx.get_ca_certs(), [])
        # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
        ctx.load_verify_locations(CERTFILE)
        self.assertEqual(ctx.get_ca_certs(), [])
        # but SVN_PYTHON_ORG_ROOT_CERT is a CA cert
        ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
        # The expected issuer and subject are the same name.
        root_name = ((('organizationName', 'Root CA'),),
                     (('organizationalUnitName', 'http://www.cacert.org'),),
                     (('commonName', 'CA Cert Signing Authority'),),
                     (('emailAddress', 'support@cacert.org'),))
        expected = {
            'issuer': root_name,
            'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
            'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
            'serialNumber': '00',
            'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
            'subject': root_name,
            'version': 3,
        }
        self.assertEqual(ctx.get_ca_certs(), [expected])

        # Passing True returns the DER form of the same certificate.
        with open(SVN_PYTHON_ORG_ROOT_CERT) as f:
            pem = f.read()
        der = ssl.PEM_cert_to_DER_cert(pem)
        self.assertEqual(ctx.get_ca_certs(True), [der])

    def test_load_default_certs(self):
        # No argument.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_default_certs()

        # Explicit SERVER_AUTH purpose, followed by a second plain call.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
        ctx.load_default_certs()

        # Explicit CLIENT_AUTH purpose.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)

        # Anything that is not a Purpose member is rejected.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        for bad_purpose in (None, 'SERVER_AUTH'):
            self.assertRaises(TypeError, ctx.load_default_certs, bad_purpose)

    def test_create_default_context(self):
        def assert_optional_flag(context, name):
            # If the ssl module defines option `name`, it must be set on
            # the context; if it does not, this compares 0 with 0.
            flag = getattr(ssl, name, 0)
            self.assertEqual(context.options & flag, flag)

        # Default purpose.
        ctx = ssl.create_default_context()
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
        self.assertTrue(ctx.check_hostname)
        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
        assert_optional_flag(ctx, "OP_NO_COMPRESSION")

        # With cafile, capath and cadata all supplied.
        with open(SIGNING_CA) as f:
            cadata = f.read().decode("ascii")
        ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
                                         cadata=cadata)
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
        assert_optional_flag(ctx, "OP_NO_COMPRESSION")

        # CLIENT_AUTH purpose.
        ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
        for name in ("OP_NO_COMPRESSION", "OP_SINGLE_DH_USE",
                     "OP_SINGLE_ECDH_USE"):
            assert_optional_flag(ctx, name)

    def test__create_stdlib_context(self):
        def assert_no_sslv2(context):
            self.assertEqual(context.options & ssl.OP_NO_SSLv2,
                             ssl.OP_NO_SSLv2)

        # Defaults.
        ctx = ssl._create_stdlib_context()
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
        self.assertFalse(ctx.check_hostname)
        assert_no_sslv2(ctx)

        # Explicit protocol.
        ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
        assert_no_sslv2(ctx)

        # Explicit protocol, verification and hostname checking.
        ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
                                         cert_reqs=ssl.CERT_REQUIRED,
                                         check_hostname=True)
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
        self.assertTrue(ctx.check_hostname)
        assert_no_sslv2(ctx)

        # CLIENT_AUTH purpose.
        ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
        assert_no_sslv2(ctx)

    def test_check_hostname(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertFalse(ctx.check_hostname)

        # Requires CERT_REQUIRED or CERT_OPTIONAL
        with self.assertRaises(ValueError):
            ctx.check_hostname = True
        # Raising the verify mode alone does not switch the flag on.
        ctx.verify_mode = ssl.CERT_REQUIRED
        self.assertFalse(ctx.check_hostname)
        ctx.check_hostname = True
        self.assertTrue(ctx.check_hostname)

        ctx.verify_mode = ssl.CERT_OPTIONAL
        ctx.check_hostname = True
        self.assertTrue(ctx.check_hostname)

        # Cannot set CERT_NONE with check_hostname enabled
        with self.assertRaises(ValueError):
            ctx.verify_mode = ssl.CERT_NONE
        ctx.check_hostname = False
        self.assertFalse(ctx.check_hostname)
1145
1146
class SSLErrorTests(unittest.TestCase):
    # Tests for the attributes and string form of ssl.SSLError and its
    # subclasses.

    def test_str(self):
        # The str() of a SSLError doesn't include the errno; the same
        # holds for a subclass.
        for exc_type in (ssl.SSLError, ssl.SSLZeroReturnError):
            err = exc_type(1, "foo")
            self.assertEqual(str(err), "foo")
            self.assertEqual(err.errno, 1)

    def test_lib_reason(self):
        # Test the library and reason attributes
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        with self.assertRaises(ssl.SSLError) as cm:
            ctx.load_dh_params(CERTFILE)
        err = cm.exception
        self.assertEqual(err.library, 'PEM')
        self.assertEqual(err.reason, 'NO_START_LINE')
        text = str(err)
        self.assertTrue(text.startswith("[PEM: NO_START_LINE] no start line"),
                        text)

    def test_subclass(self):
        # Check that the appropriate SSLError subclass is raised
        # (this only tests one of them)
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        with closing(socket.socket()) as listener:
            listener.bind(("127.0.0.1", 0))
            listener.listen(5)
            conn = socket.socket()
            conn.connect(listener.getsockname())
            conn.setblocking(False)
            # `conn` is deliberately rebound to the wrapped socket so that
            # no separate reference to the plain socket object remains.
            with closing(ctx.wrap_socket(conn, False,
                                         do_handshake_on_connect=False)) as conn:
                # The listener never accept()s, so the non-blocking
                # handshake is left waiting to read.
                with self.assertRaises(ssl.SSLWantReadError) as cm:
                    conn.do_handshake()
                message = str(cm.exception)
                self.assertTrue(
                    message.startswith("The operation did not complete (read)"),
                    message)
                # For compatibility
                self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
Antoine Pitrou63cc99d2013-12-28 17:26:33 +01001186
Antoine Pitrouf9de5342010-04-05 21:35:07 +00001187
Bill Janssen934b16d2008-06-28 22:19:33 +00001188class NetworkedTests(unittest.TestCase):
Bill Janssen296a59d2007-09-16 22:06:00 +00001189
Antoine Pitrou3945c862010-04-28 21:11:01 +00001190 def test_connect(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001191 with support.transient_internet("svn.python.org"):
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001192 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1193 cert_reqs=ssl.CERT_NONE)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001194 try:
1195 s.connect(("svn.python.org", 443))
1196 self.assertEqual({}, s.getpeercert())
1197 finally:
1198 s.close()
Bill Janssen296a59d2007-09-16 22:06:00 +00001199
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001200 # this should fail because we have no verification certs
1201 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1202 cert_reqs=ssl.CERT_REQUIRED)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001203 self.assertRaisesRegexp(ssl.SSLError, "certificate verify failed",
1204 s.connect, ("svn.python.org", 443))
1205 s.close()
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001206
1207 # this should succeed because we specify the root cert
1208 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1209 cert_reqs=ssl.CERT_REQUIRED,
1210 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
1211 try:
1212 s.connect(("svn.python.org", 443))
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001213 self.assertTrue(s.getpeercert())
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001214 finally:
1215 s.close()
Bill Janssen296a59d2007-09-16 22:06:00 +00001216
Antoine Pitroud3f6ea12011-02-26 23:35:27 +00001217 def test_connect_ex(self):
1218 # Issue #11326: check connect_ex() implementation
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001219 with support.transient_internet("svn.python.org"):
Antoine Pitroud3f6ea12011-02-26 23:35:27 +00001220 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1221 cert_reqs=ssl.CERT_REQUIRED,
1222 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
1223 try:
1224 self.assertEqual(0, s.connect_ex(("svn.python.org", 443)))
1225 self.assertTrue(s.getpeercert())
1226 finally:
1227 s.close()
1228
1229 def test_non_blocking_connect_ex(self):
1230 # Issue #11326: non-blocking connect_ex() should allow handshake
1231 # to proceed after the socket gets ready.
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001232 with support.transient_internet("svn.python.org"):
Antoine Pitroud3f6ea12011-02-26 23:35:27 +00001233 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1234 cert_reqs=ssl.CERT_REQUIRED,
1235 ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
1236 do_handshake_on_connect=False)
1237 try:
1238 s.setblocking(False)
1239 rc = s.connect_ex(('svn.python.org', 443))
Antoine Pitrou8ef39072011-02-27 15:45:22 +00001240 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1241 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
Antoine Pitroud3f6ea12011-02-26 23:35:27 +00001242 # Wait for connect to finish
1243 select.select([], [s], [], 5.0)
1244 # Non-blocking handshake
1245 while True:
1246 try:
1247 s.do_handshake()
1248 break
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001249 except ssl.SSLWantReadError:
1250 select.select([s], [], [], 5.0)
1251 except ssl.SSLWantWriteError:
1252 select.select([], [s], [], 5.0)
Antoine Pitroud3f6ea12011-02-26 23:35:27 +00001253 # SSL established
1254 self.assertTrue(s.getpeercert())
1255 finally:
1256 s.close()
1257
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001258 def test_timeout_connect_ex(self):
1259 # Issue #12065: on a timeout, connect_ex() should return the original
1260 # errno (mimicking the behaviour of non-SSL sockets).
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001261 with support.transient_internet("svn.python.org"):
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001262 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1263 cert_reqs=ssl.CERT_REQUIRED,
1264 ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
1265 do_handshake_on_connect=False)
1266 try:
1267 s.settimeout(0.0000001)
1268 rc = s.connect_ex(('svn.python.org', 443))
1269 if rc == 0:
1270 self.skipTest("svn.python.org responded too quickly")
1271 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
1272 finally:
1273 s.close()
1274
1275 def test_connect_ex_error(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001276 with support.transient_internet("svn.python.org"):
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001277 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1278 cert_reqs=ssl.CERT_REQUIRED,
1279 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
1280 try:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001281 rc = s.connect_ex(("svn.python.org", 444))
1282 # Issue #19919: Windows machines or VMs hosted on Windows
1283 # machines sometimes return EWOULDBLOCK.
1284 self.assertIn(rc, (errno.ECONNREFUSED, errno.EWOULDBLOCK))
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001285 finally:
1286 s.close()
1287
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001288 def test_connect_with_context(self):
1289 with support.transient_internet("svn.python.org"):
1290 # Same as test_connect, but with a separately created context
1291 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1292 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1293 s.connect(("svn.python.org", 443))
1294 try:
1295 self.assertEqual({}, s.getpeercert())
1296 finally:
1297 s.close()
1298 # Same with a server hostname
1299 s = ctx.wrap_socket(socket.socket(socket.AF_INET),
1300 server_hostname="svn.python.org")
1301 if ssl.HAS_SNI:
1302 s.connect(("svn.python.org", 443))
1303 s.close()
1304 else:
1305 self.assertRaises(ValueError, s.connect, ("svn.python.org", 443))
1306 # This should fail because we have no verification certs
1307 ctx.verify_mode = ssl.CERT_REQUIRED
1308 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1309 self.assertRaisesRegexp(ssl.SSLError, "certificate verify failed",
1310 s.connect, ("svn.python.org", 443))
1311 s.close()
1312 # This should succeed because we specify the root cert
1313 ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
1314 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1315 s.connect(("svn.python.org", 443))
1316 try:
1317 cert = s.getpeercert()
1318 self.assertTrue(cert)
1319 finally:
1320 s.close()
1321
1322 def test_connect_capath(self):
1323 # Verify server certificates using the `capath` argument
1324 # NOTE: the subject hashing algorithm has been changed between
1325 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
1326 # contain both versions of each certificate (same content, different
1327 # filename) for this test to be portable across OpenSSL releases.
1328 with support.transient_internet("svn.python.org"):
1329 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1330 ctx.verify_mode = ssl.CERT_REQUIRED
1331 ctx.load_verify_locations(capath=CAPATH)
1332 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1333 s.connect(("svn.python.org", 443))
1334 try:
1335 cert = s.getpeercert()
1336 self.assertTrue(cert)
1337 finally:
1338 s.close()
1339 # Same with a bytes `capath` argument
1340 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1341 ctx.verify_mode = ssl.CERT_REQUIRED
1342 ctx.load_verify_locations(capath=BYTES_CAPATH)
1343 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1344 s.connect(("svn.python.org", 443))
1345 try:
1346 cert = s.getpeercert()
1347 self.assertTrue(cert)
1348 finally:
1349 s.close()
1350
1351 def test_connect_cadata(self):
1352 with open(CAFILE_CACERT) as f:
1353 pem = f.read().decode('ascii')
1354 der = ssl.PEM_cert_to_DER_cert(pem)
1355 with support.transient_internet("svn.python.org"):
1356 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1357 ctx.verify_mode = ssl.CERT_REQUIRED
1358 ctx.load_verify_locations(cadata=pem)
1359 with closing(ctx.wrap_socket(socket.socket(socket.AF_INET))) as s:
1360 s.connect(("svn.python.org", 443))
1361 cert = s.getpeercert()
1362 self.assertTrue(cert)
1363
1364 # same with DER
1365 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1366 ctx.verify_mode = ssl.CERT_REQUIRED
1367 ctx.load_verify_locations(cadata=der)
1368 with closing(ctx.wrap_socket(socket.socket(socket.AF_INET))) as s:
1369 s.connect(("svn.python.org", 443))
1370 cert = s.getpeercert()
1371 self.assertTrue(cert)
1372
Antoine Pitrou55841ac2010-04-24 10:43:57 +00001373 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
1374 def test_makefile_close(self):
1375 # Issue #5238: creating a file-like object with makefile() shouldn't
1376 # delay closing the underlying "real socket" (here tested with its
1377 # file descriptor, hence skipping the test under Windows).
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001378 with support.transient_internet("svn.python.org"):
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001379 ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
1380 ss.connect(("svn.python.org", 443))
1381 fd = ss.fileno()
1382 f = ss.makefile()
1383 f.close()
1384 # The fd is still open
Antoine Pitrou55841ac2010-04-24 10:43:57 +00001385 os.read(fd, 0)
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001386 # Closing the SSL socket should close the fd too
1387 ss.close()
1388 gc.collect()
1389 with self.assertRaises(OSError) as e:
1390 os.read(fd, 0)
1391 self.assertEqual(e.exception.errno, errno.EBADF)
Bill Janssen934b16d2008-06-28 22:19:33 +00001392
Antoine Pitrou3945c862010-04-28 21:11:01 +00001393 def test_non_blocking_handshake(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001394 with support.transient_internet("svn.python.org"):
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001395 s = socket.socket(socket.AF_INET)
1396 s.connect(("svn.python.org", 443))
1397 s.setblocking(False)
1398 s = ssl.wrap_socket(s,
1399 cert_reqs=ssl.CERT_NONE,
1400 do_handshake_on_connect=False)
1401 count = 0
1402 while True:
1403 try:
1404 count += 1
1405 s.do_handshake()
1406 break
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001407 except ssl.SSLWantReadError:
1408 select.select([s], [], [])
1409 except ssl.SSLWantWriteError:
1410 select.select([], [s], [])
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001411 s.close()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001412 if support.verbose:
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001413 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Bill Janssen934b16d2008-06-28 22:19:33 +00001414
Antoine Pitrou3945c862010-04-28 21:11:01 +00001415 def test_get_server_certificate(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001416 def _test_get_server_certificate(host, port, cert=None):
1417 with support.transient_internet(host):
1418 pem = ssl.get_server_certificate((host, port))
1419 if not pem:
1420 self.fail("No server certificate on %s:%s!" % (host, port))
Bill Janssen296a59d2007-09-16 22:06:00 +00001421
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001422 try:
1423 pem = ssl.get_server_certificate((host, port),
1424 ca_certs=CERTFILE)
1425 except ssl.SSLError as x:
1426 #should fail
1427 if support.verbose:
1428 sys.stdout.write("%s\n" % x)
1429 else:
1430 self.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
Bill Janssen296a59d2007-09-16 22:06:00 +00001431
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001432 pem = ssl.get_server_certificate((host, port),
1433 ca_certs=cert)
1434 if not pem:
1435 self.fail("No server certificate on %s:%s!" % (host, port))
1436 if support.verbose:
1437 sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
1438
1439 _test_get_server_certificate('svn.python.org', 443, SVN_PYTHON_ORG_ROOT_CERT)
1440 if support.IPV6_ENABLED:
1441 _test_get_server_certificate('ipv6.google.com', 443)
1442
1443 def test_ciphers(self):
1444 remote = ("svn.python.org", 443)
1445 with support.transient_internet(remote[0]):
1446 with closing(ssl.wrap_socket(socket.socket(socket.AF_INET),
1447 cert_reqs=ssl.CERT_NONE, ciphers="ALL")) as s:
1448 s.connect(remote)
1449 with closing(ssl.wrap_socket(socket.socket(socket.AF_INET),
1450 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT")) as s:
1451 s.connect(remote)
1452 # Error checking can happen at instantiation or when connecting
1453 with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"):
1454 with closing(socket.socket(socket.AF_INET)) as sock:
1455 s = ssl.wrap_socket(sock,
1456 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
1457 s.connect(remote)
Bill Janssen296a59d2007-09-16 22:06:00 +00001458
def test_algorithms(self):
    """Verify a SHA256-signed server certificate (issue #8484).

    All algorithms should be available when verifying a certificate.
    """
    # SHA256 was added in OpenSSL 0.9.8
    if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15):
        self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION)
    # sha256.tbs-internet.com needs SNI to use the correct certificate
    if not ssl.HAS_SNI:
        self.skipTest("SNI needed for this test")
    # https://sha2.hboeck.de/ was used until 2011-01-08 (no route to host)
    hostname = "sha256.tbs-internet.com"
    remote = (hostname, 443)
    sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem")
    with support.transient_internet(hostname):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.load_verify_locations(sha256_cert)
        wrapped = ctx.wrap_socket(socket.socket(socket.AF_INET),
                                  server_hostname=hostname)
        # closing() guarantees the socket is shut whatever happens below.
        with closing(wrapped) as s:
            s.connect(remote)
            if support.verbose:
                sys.stdout.write("\nCipher with %r is %r\n" %
                                 (remote, s.cipher()))
                sys.stdout.write("Certificate is:\n%s\n" %
                                 pprint.pformat(s.getpeercert()))
1486
def test_get_ca_certs_capath(self):
    """Check that certificates from a capath are loaded on request.

    Before any connection the context reports no CA certificates; after
    a verified connection exactly one is expected to have been loaded.
    """
    with support.transient_internet("svn.python.org"):
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.load_verify_locations(capath=CAPATH)
        self.assertEqual(ctx.get_ca_certs(), [])
        # The connect() call is inside the managed block so the socket
        # is closed even when the connection attempt itself fails.
        with closing(ctx.wrap_socket(socket.socket(socket.AF_INET))) as s:
            s.connect(("svn.python.org", 443))
            cert = s.getpeercert()
            self.assertTrue(cert)
        self.assertEqual(len(ctx.get_ca_certs()), 1)
1502
@needs_sni
def test_context_setget(self):
    """Check that the context of a connected socket can be replaced."""
    with support.transient_internet("svn.python.org"):
        initial_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        replacement_ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        plain_sock = socket.socket(socket.AF_INET)
        with closing(initial_ctx.wrap_socket(plain_sock)) as ss:
            ss.connect(("svn.python.org", 443))
            # Both the Python-level socket and the underlying SSL object
            # start out bound to the context that created them ...
            self.assertIs(ss.context, initial_ctx)
            self.assertIs(ss._sslobj.context, initial_ctx)
            # ... and both follow an assignment to .context.
            ss.context = replacement_ctx
            self.assertIs(ss.context, replacement_ctx)
            self.assertIs(ss._sslobj.context, replacement_ctx)
Bill Janssen296a59d2007-09-16 22:06:00 +00001517
Bill Janssen98d19da2007-09-10 21:51:02 +00001518try:
1519 import threading
1520except ImportError:
1521 _have_threads = False
1522else:
Bill Janssen98d19da2007-09-10 21:51:02 +00001523 _have_threads = True
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001524
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001525 from test.ssl_servers import make_https_server
1526
class ThreadedEchoServer(threading.Thread):
    """Echo server running in its own thread.

    Connections are accepted one at a time; each is served by a
    ConnectionHandler thread that the server joins before accepting the
    next one.  The server can be used as a context manager: entering
    starts the thread and waits until it is listening, leaving stops and
    joins it.
    """

    class ConnectionHandler(threading.Thread):

        """A mildly complicated class, because we want it to work both
        with and without the SSL wrapper around the socket connection, so
        that we can test the STARTTLS functionality."""

        def __init__(self, server, connsock, addr):
            self.server = server
            self.running = False
            self.sock = connsock
            self.addr = addr
            self.sock.setblocking(1)
            # Set by wrap_conn() once the connection is SSL-wrapped.
            self.sslconn = None
            threading.Thread.__init__(self)
            self.daemon = True

        def wrap_conn(self):
            """Wrap the raw socket in SSL and return True on success.

            On an SSL error or a connection reset, the error is appended
            to the server's conn_errors, the server is stopped, the
            connection is closed and False is returned.  Any other
            socket error propagates.
            """
            srv = self.server
            try:
                self.sslconn = srv.context.wrap_socket(self.sock,
                                                       server_side=True)
                srv.selected_protocols.append(
                    self.sslconn.selected_npn_protocol())
            except socket.error as e:
                # We treat ConnectionResetError as though it were an
                # SSLError - OpenSSL on Ubuntu abruptly closes the
                # connection when asked to use an unsupported protocol.
                #
                # XXX Various errors can have happened here, for example
                # a mismatching protocol version, an invalid certificate,
                # or a low-level bug. This should be made more discriminating.
                tolerated = (isinstance(e, ssl.SSLError)
                             or e.errno == errno.ECONNRESET)
                if not tolerated:
                    raise
                srv.conn_errors.append(e)
                if srv.chatty:
                    handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
                self.running = False
                srv.stop()
                self.close()
                return False

            # Handshake succeeded: optionally report what was negotiated.
            talkative = support.verbose and srv.chatty
            if srv.context.verify_mode == ssl.CERT_REQUIRED:
                cert = self.sslconn.getpeercert()
                if talkative:
                    sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
                cert_binary = self.sslconn.getpeercert(True)
                if talkative:
                    sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
            cipher = self.sslconn.cipher()
            if talkative:
                sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
                sys.stdout.write(" server: selected protocol is now "
                                 + str(self.sslconn.selected_npn_protocol()) + "\n")
            return True

        def read(self):
            """Read from the SSL connection if wrapped, else the raw socket."""
            if not self.sslconn:
                return self.sock.recv(1024)
            return self.sslconn.read()

        def write(self, bytes):
            """Write to the SSL connection if wrapped, else the raw socket."""
            if not self.sslconn:
                return self.sock.send(bytes)
            return self.sslconn.write(bytes)

        def close(self):
            """Close the SSL connection if wrapped, else the raw socket."""
            if not self.sslconn:
                self.sock.close()
            else:
                self.sslconn.close()

        def run(self):
            """Serve the connection until EOF, 'over', or an SSL error."""
            self.running = True
            # Without STARTTLS the connection is wrapped immediately.
            if not self.server.starttls_server and not self.wrap_conn():
                return
            while self.running:
                try:
                    msg = self.read()
                    stripped = msg.strip()
                    starttls = self.server.starttls_server
                    talkative = (support.verbose and
                                 self.server.connectionchatty)
                    if not stripped:
                        # eof, so quit this handler
                        self.running = False
                        self.close()
                    elif stripped == b'over':
                        if talkative:
                            sys.stdout.write(" server: client closed connection\n")
                        self.close()
                        return
                    elif starttls and stripped == b'STARTTLS':
                        if talkative:
                            sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        if not self.wrap_conn():
                            return
                    elif starttls and self.sslconn and stripped == b'ENDTLS':
                        if talkative:
                            sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        # Drop back to the plain socket.
                        self.sock = self.sslconn.unwrap()
                        self.sslconn = None
                        if talkative:
                            sys.stdout.write(" server: connection is now unencrypted...\n")
                    elif stripped == b'CB tls-unique':
                        if talkative:
                            sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
                        data = self.sslconn.get_channel_binding("tls-unique")
                        self.write(repr(data).encode("us-ascii") + b"\n")
                    else:
                        # Plain echo: send the message back lower-cased.
                        if talkative:
                            ctype = "encrypted" if self.sslconn else "unencrypted"
                            sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
                                             % (msg, ctype, msg.lower(), ctype))
                        self.write(msg.lower())
                except ssl.SSLError:
                    if self.server.chatty:
                        handle_error("Test server failure:\n")
                    self.close()
                    self.running = False
                    # normally, we'd just stop here, but for the test
                    # harness, we want to stop the server
                    self.server.stop()

    def __init__(self, certificate=None, ssl_version=None,
                 certreqs=None, cacerts=None,
                 chatty=True, connectionchatty=False, starttls_server=False,
                 npn_protocols=None, ciphers=None, context=None):
        """Set up the SSL context and bind the listening socket.

        When *context* is given it is used as-is and the other SSL
        arguments are ignored; otherwise a context is built from them
        (TLSv1 and CERT_NONE being the defaults).
        """
        if context:
            self.context = context
        else:
            if ssl_version is None:
                ssl_version = ssl.PROTOCOL_TLSv1
            if certreqs is None:
                certreqs = ssl.CERT_NONE
            self.context = ssl.SSLContext(ssl_version)
            self.context.verify_mode = certreqs
            if cacerts:
                self.context.load_verify_locations(cacerts)
            if certificate:
                self.context.load_cert_chain(certificate)
            if npn_protocols:
                self.context.set_npn_protocols(npn_protocols)
            if ciphers:
                self.context.set_ciphers(ciphers)
        self.chatty = chatty
        self.connectionchatty = connectionchatty
        self.starttls_server = starttls_server
        self.sock = socket.socket()
        self.port = support.bind_port(self.sock)
        self.flag = None
        self.active = False
        # Filled in by the connection handlers.
        self.selected_protocols = []
        self.conn_errors = []
        threading.Thread.__init__(self)
        self.daemon = True

    def __enter__(self):
        # Start the thread and block until run() signals it is listening.
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        self.stop()
        self.join()

    def start(self, flag=None):
        """Start the server thread; *flag* is set once it is listening."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        """Accept and serve connections until stop() is called."""
        # A short timeout lets the loop notice stop() promptly.
        self.sock.settimeout(0.05)
        self.sock.listen(5)
        self.active = True
        if self.flag:
            # signal an event
            self.flag.set()
        while self.active:
            try:
                newconn, connaddr = self.sock.accept()
                if support.verbose and self.chatty:
                    sys.stdout.write(' server: new connection from '
                                     + repr(connaddr) + '\n')
                handler = self.ConnectionHandler(self, newconn, connaddr)
                handler.start()
                handler.join()
            except socket.timeout:
                continue
            except KeyboardInterrupt:
                self.stop()
        self.sock.close()

    def stop(self):
        """Ask the accept loop to terminate."""
        self.active = False
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001724
class AsyncoreEchoServer(threading.Thread):
    """Echo server driven by an asyncore loop running in its own thread.

    Usable as a context manager: entering starts the thread and waits
    until it is running, leaving stops the loop and joins the thread.
    """

    class EchoServer(asyncore.dispatcher):
        """Listening dispatcher creating a ConnectionHandler per client."""

        class ConnectionHandler(asyncore.dispatcher_with_send):
            """Per-connection dispatcher: SSL handshake, then echo data
            back lower-cased."""

            def __init__(self, conn, certfile):
                # The handshake is deferred so it can be driven (and
                # retried) through _do_ssl_handshake().
                self.socket = ssl.wrap_socket(conn, server_side=True,
                                              certfile=certfile,
                                              do_handshake_on_connect=False)
                asyncore.dispatcher_with_send.__init__(self, self.socket)
                self._ssl_accepting = True
                self._do_ssl_handshake()

            def readable(self):
                # Drain data already buffered inside the SSL object
                # before reporting readiness.
                if isinstance(self.socket, ssl.SSLSocket):
                    while self.socket.pending() > 0:
                        self.handle_read_event()
                return True

            def _do_ssl_handshake(self):
                """Attempt the handshake; clear _ssl_accepting on success."""
                try:
                    self.socket.do_handshake()
                except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
                    # Not finished yet; handle_read() will try again.
                    return
                except ssl.SSLEOFError:
                    return self.handle_close()
                except ssl.SSLError:
                    raise
                except socket.error as err:
                    # Other socket errors are deliberately ignored here.
                    if err.args[0] == errno.ECONNABORTED:
                        return self.handle_close()
                else:
                    self._ssl_accepting = False

            def handle_read(self):
                if self._ssl_accepting:
                    self._do_ssl_handshake()
                else:
                    data = self.recv(1024)
                    if support.verbose:
                        sys.stdout.write(" server: read %s from client\n" % repr(data))
                    if not data:
                        self.close()
                    else:
                        self.send(data.lower())

            def handle_close(self):
                self.close()
                if support.verbose:
                    sys.stdout.write(" server: closed connection %s\n" % self.socket)

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, certfile):
            self.certfile = certfile
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.port = support.bind_port(sock, '')
            asyncore.dispatcher.__init__(self, sock)
            self.listen(5)

        def handle_accept(self):
            sock_obj, addr = self.accept()
            if support.verbose:
                sys.stdout.write(" server: new connection from %s:%s\n" %addr)
            self.ConnectionHandler(sock_obj, self.certfile)

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise

    def __init__(self, certfile):
        self.flag = None
        self.active = False
        self.server = self.EchoServer(certfile)
        self.port = self.server.port
        threading.Thread.__init__(self)
        self.daemon = True

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.server)

    def __enter__(self):
        # Start the thread and block until run() signals it is active.
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        if support.verbose:
            sys.stdout.write(" cleanup: stopping server.\n")
        self.stop()
        if support.verbose:
            sys.stdout.write(" cleanup: joining server thread.\n")
        self.join()
        if support.verbose:
            sys.stdout.write(" cleanup: successfully joined.\n")

    def start(self, flag=None):
        """Start the server thread; *flag* is set once the loop runs."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        """Run the asyncore loop until stop() is called."""
        self.active = True
        if self.flag:
            self.flag.set()
        while self.active:
            try:
                asyncore.loop(1)
            except Exception:
                # Best effort: errors from the loop must not kill the
                # server thread; only ordinary exceptions are swallowed.
                pass

    def stop(self):
        """Stop the loop and close the listening dispatcher."""
        self.active = False
        self.server.close()
1839
def bad_cert_test(certfile):
    """
    Launch a server with CERT_REQUIRED, and check that trying to
    connect to it with the given client certificate fails.

    An ssl.SSLError or OSError raised while wrapping or connecting counts
    as the expected failure; a successful connection raises
    AssertionError.
    """
    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_REQUIRED,
                                cacerts=CERTFILE, chatty=False,
                                connectionchatty=False)
    with server:
        try:
            with closing(socket.socket()) as sock:
                s = ssl.wrap_socket(sock,
                                    certfile=certfile,
                                    ssl_version=ssl.PROTOCOL_TLSv1)
                s.connect((HOST, server.port))
        except ssl.SSLError as x:
            if support.verbose:
                sys.stdout.write("\nSSLError is %s\n" % x.args[1])
        except OSError as x:
            # A second, identical "except OSError" clause used to follow
            # this one; it could never run and has been removed.
            if support.verbose:
                sys.stdout.write("\nOSError is %s\n" % x.args[1])
        else:
            raise AssertionError("Use of invalid cert should have failed!")
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001869
def server_params_test(client_context, server_context, indata=b"FOO\n",
                       chatty=True, connectionchatty=False, sni_name=None):
    """
    Launch a server, connect a client to it and try various reads
    and writes.

    *indata* is sent as bytes, bytearray and memoryview; each reply must
    equal the lower-cased input.  Returns a dict of connection
    statistics gathered from the client socket plus the NPN protocols
    recorded by the server.
    """
    stats = {}
    server = ThreadedEchoServer(context=server_context,
                                chatty=chatty,
                                connectionchatty=False)
    talkative = connectionchatty and support.verbose
    with server:
        client = client_context.wrap_socket(socket.socket(),
                                            server_hostname=sni_name)
        with closing(client) as s:
            s.connect((HOST, server.port))
            payloads = [indata, bytearray(indata), memoryview(indata)]
            for arg in payloads:
                if talkative:
                    sys.stdout.write(
                        " client: sending %r...\n" % indata)
                s.write(arg)
                outdata = s.read()
                if talkative:
                    sys.stdout.write(" client: read %r\n" % outdata)
                if outdata != indata.lower():
                    raise AssertionError(
                        "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                        % (outdata[:20], len(outdata),
                           indata[:20].lower(), len(indata)))
            # Tell the server handler we are done.
            s.write(b"over\n")
            if talkative:
                sys.stdout.write(" client: closing connection.\n")
            stats.update({
                'compression': s.compression(),
                'cipher': s.cipher(),
                'peercert': s.getpeercert(),
                'client_npn_protocol': s.selected_npn_protocol(),
                'version': s.version(),
            })
            s.close()
        stats['server_npn_protocols'] = server.selected_protocols
    return stats
Bill Janssen98d19da2007-09-10 21:51:02 +00001913
def try_protocol_combo(server_protocol, client_protocol, expect_success,
                       certsreqs=None, server_options=0, client_options=0):
    """
    Try to SSL-connect using *client_protocol* to *server_protocol*.
    If *expect_success* is true, assert that the connection succeeds,
    if it's false, assert that the connection fails.
    Also, if *expect_success* is a string, assert that it is the protocol
    version actually used by the connection.
    """
    if certsreqs is None:
        certsreqs = ssl.CERT_NONE
    cert_names = {
        ssl.CERT_NONE: "CERT_NONE",
        ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
        ssl.CERT_REQUIRED: "CERT_REQUIRED",
    }
    certtype = cert_names[certsreqs]
    if support.verbose:
        # Braces mark a combination that is expected to fail.
        template = " %s->%s %s\n" if expect_success else " {%s->%s} %s\n"
        sys.stdout.write(template %
                         (ssl.get_protocol_name(client_protocol),
                          ssl.get_protocol_name(server_protocol),
                          certtype))
    client_context = ssl.SSLContext(client_protocol)
    client_context.options |= client_options
    server_context = ssl.SSLContext(server_protocol)
    server_context.options |= server_options

    # NOTE: we must enable "ALL" ciphers on the client, otherwise an
    # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
    # starting from OpenSSL 1.0.0 (see issue #8322).
    if client_context.protocol == ssl.PROTOCOL_SSLv23:
        client_context.set_ciphers("ALL")

    for ctx in (client_context, server_context):
        ctx.verify_mode = certsreqs
        ctx.load_cert_chain(CERTFILE)
        ctx.load_verify_locations(CERTFILE)
    try:
        stats = server_params_test(client_context, server_context,
                                   chatty=False, connectionchatty=False)
    # Protocol mismatch can result in either an SSLError, or a
    # "Connection reset by peer" error.
    except ssl.SSLError:
        if expect_success:
            raise
    except socket.error as e:
        expected_reset = (not expect_success
                          and e.errno == errno.ECONNRESET)
        if not expected_reset:
            raise
    else:
        if not expect_success:
            raise AssertionError(
                "Client protocol %s succeeded with server protocol %s!"
                % (ssl.get_protocol_name(client_protocol),
                   ssl.get_protocol_name(server_protocol)))
        if expect_success is not True:
            # A non-True truthy value names the protocol version expected.
            if expect_success != stats['version']:
                raise AssertionError("version mismatch: expected %r, got %r"
                                     % (expect_success, stats['version']))
Bill Janssen98d19da2007-09-10 21:51:02 +00001972
1973
Bill Janssen934b16d2008-06-28 22:19:33 +00001974 class ThreadedTests(unittest.TestCase):
Bill Janssen98d19da2007-09-10 21:51:02 +00001975
@skip_if_broken_ubuntu_ssl
def test_echo(self):
    """Basic test of an SSL client connecting to a server"""
    if support.verbose:
        sys.stdout.write("\n")
    # One shared context per protocol serves as both client and server.
    for proto in PROTOCOLS:
        ctx = ssl.SSLContext(proto)
        ctx.load_cert_chain(CERTFILE)
        server_params_test(ctx, ctx, chatty=True, connectionchatty=True)
Bill Janssen98d19da2007-09-10 21:51:02 +00001986
def test_getpeercert(self):
    """Check getpeercert() before and after the handshake.

    Before the handshake it must raise ValueError; afterwards the
    returned certificate must carry the expected subject and validity
    fields.
    """
    if support.verbose:
        sys.stdout.write("\n")
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(CERTFILE)
    context.load_cert_chain(CERTFILE)
    server = ThreadedEchoServer(context=context, chatty=False)
    with server:
        # closing() shuts the client socket even when an assertion fails;
        # otherwise the open connection can keep the server's handler
        # blocked while the server is being joined on exit.
        with closing(context.wrap_socket(socket.socket(),
                                         do_handshake_on_connect=False)) as s:
            s.connect((HOST, server.port))
            # getpeercert() raise ValueError while the handshake isn't
            # done.
            with self.assertRaises(ValueError):
                s.getpeercert()
            s.do_handshake()
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()
            if support.verbose:
                sys.stdout.write(pprint.pformat(cert) + '\n')
                sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
            if 'subject' not in cert:
                self.fail("No subject field in certificate: %s." %
                          pprint.pformat(cert))
            if ((('organizationName', 'Python Software Foundation'),)
                    not in cert['subject']):
                self.fail(
                    "Missing or invalid 'organizationName' field in certificate subject; "
                    "should be 'Python Software Foundation'.")
            self.assertIn('notBefore', cert)
            self.assertIn('notAfter', cert)
            before = ssl.cert_time_to_seconds(cert['notBefore'])
            after = ssl.cert_time_to_seconds(cert['notAfter'])
            self.assertLess(before, after)
Bill Janssen98d19da2007-09-10 21:51:02 +00002024
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_crl_check(self):
    """Exercise VERIFY_CRL_CHECK_LEAF with and without a loaded CRL."""
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(SIGNING_CA)
    self.assertEqual(context.verify_flags, ssl.VERIFY_DEFAULT)

    # VERIFY_DEFAULT should pass
    with ThreadedEchoServer(context=server_context, chatty=True) as server, \
            closing(context.wrap_socket(socket.socket())) as s:
        s.connect((HOST, server.port))
        cert = s.getpeercert()
        self.assertTrue(cert, "Can't get peer certificate.")

    # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
    context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF

    with ThreadedEchoServer(context=server_context, chatty=True) as server, \
            closing(context.wrap_socket(socket.socket())) as s:
        with self.assertRaisesRegexp(ssl.SSLError,
                                     "certificate verify failed"):
            s.connect((HOST, server.port))

    # now load a CRL file. The CRL file is signed by the CA.
    context.load_verify_locations(CRLFILE)

    with ThreadedEchoServer(context=server_context, chatty=True) as server, \
            closing(context.wrap_socket(socket.socket())) as s:
        s.connect((HOST, server.port))
        cert = s.getpeercert()
        self.assertTrue(cert, "Can't get peer certificate.")
2065 self.assertTrue(cert, "Can't get peer certificate.")
2066
@needs_sni
def test_check_hostname(self):
    """Exercise check_hostname with matching, wrong and missing names."""
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.verify_mode = ssl.CERT_REQUIRED
    context.check_hostname = True
    context.load_verify_locations(SIGNING_CA)

    # correct hostname should verify
    with ThreadedEchoServer(context=server_context, chatty=True) as server, \
            closing(context.wrap_socket(socket.socket(),
                                        server_hostname="localhost")) as s:
        s.connect((HOST, server.port))
        cert = s.getpeercert()
        self.assertTrue(cert, "Can't get peer certificate.")

    # incorrect hostname should raise an exception
    with ThreadedEchoServer(context=server_context, chatty=True) as server, \
            closing(context.wrap_socket(socket.socket(),
                                        server_hostname="invalid")) as s:
        with self.assertRaisesRegexp(ssl.CertificateError,
                                     "hostname 'invalid' doesn't match u?'localhost'"):
            s.connect((HOST, server.port))

    # missing server_hostname arg should cause an exception, too
    with ThreadedEchoServer(context=server_context, chatty=True) as server, \
            closing(socket.socket()) as s:
        with self.assertRaisesRegexp(ValueError,
                                     "check_hostname requires server_hostname"):
            context.wrap_socket(s)
2105
def test_empty_cert(self):
    """Connecting with an empty cert file"""
    cert_dir = os.path.dirname(__file__) or os.curdir
    cert_path = os.path.join(cert_dir, "nullcert.pem")
    bad_cert_test(cert_path)
def test_malformed_cert(self):
    """Connecting with a badly formatted certificate (syntax error)"""
    cert_dir = os.path.dirname(__file__) or os.curdir
    cert_path = os.path.join(cert_dir, "badcert.pem")
    bad_cert_test(cert_path)
def test_nonexisting_cert(self):
    """Connecting with a non-existing cert file"""
    cert_dir = os.path.dirname(__file__) or os.curdir
    cert_path = os.path.join(cert_dir, "wrongcert.pem")
    bad_cert_test(cert_path)
def test_malformed_key(self):
    """Connecting with a badly formatted key (syntax error)"""
    cert_dir = os.path.dirname(__file__) or os.curdir
    cert_path = os.path.join(cert_dir, "badkey.pem")
    bad_cert_test(cert_path)
Bill Janssen98d19da2007-09-10 21:51:02 +00002122
def test_rude_shutdown(self):
    """A server that drops the connection before any handshake must make
    the client's ssl.wrap_socket() call fail with socket.error.
    """
    ready = threading.Event()
    gone = threading.Event()

    listen_sock = socket.socket()
    port = support.bind_port(listen_sock, HOST)

    def listener():
        # Runs in a helper thread: accept a single connection, then close
        # both the accepted and the listening socket right away, and
        # finally signal the main thread that the sockets are gone.
        listen_sock.listen(5)
        ready.set()
        accepted = listen_sock.accept()[0]
        accepted.close()
        listen_sock.close()
        gone.set()

    def connector():
        # Connect in the clear, wait until the peer has closed its side,
        # and only then try to start the SSL handshake.
        ready.wait()
        with closing(socket.socket()) as client:
            client.connect((HOST, port))
            gone.wait()
            try:
                ssl.wrap_socket(client)
            except socket.error:
                return
            self.fail('connecting to closed SSL socket should have failed')

    worker = threading.Thread(target=listener)
    worker.start()
    try:
        connector()
    finally:
        # Always reap the listener thread, even if the client side failed.
        worker.join()
2163
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
                     "OpenSSL is compiled without SSLv2 support")
def test_protocol_sslv2(self):
    """Exercise an SSLv2 server against clients using various protocols
    and options."""
    if support.verbose:
        sys.stdout.write("\n")
    sslv2 = ssl.PROTOCOL_SSLv2
    sslv23 = ssl.PROTOCOL_SSLv23
    # Same protocol on both ends: first without an explicit certificate
    # requirement, then with each explicit one.
    try_protocol_combo(sslv2, sslv2, True)
    for cert_reqs in (ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(sslv2, sslv2, True, cert_reqs)
    # Every other client protocol is run with False as the third argument.
    for client_proto in (sslv23, ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1):
        try_protocol_combo(sslv2, client_proto, False)
    # SSLv23 client with specific SSL options
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(sslv2, sslv23, False,
                           client_options=ssl.OP_NO_SSLv2)
    for disabled in (ssl.OP_NO_SSLv3, ssl.OP_NO_TLSv1):
        try_protocol_combo(sslv2, sslv23, False, client_options=disabled)
Bill Janssen98d19da2007-09-10 21:51:02 +00002186
@skip_if_broken_ubuntu_ssl
def test_protocol_sslv23(self):
    """Exercise an SSLv23 server against clients using various protocols
    and options."""
    if support.verbose:
        sys.stdout.write("\n")
    sslv23 = ssl.PROTOCOL_SSLv23
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try:
            try_protocol_combo(sslv23, ssl.PROTOCOL_SSLv2, True)
        except socket.error as x:
            # this fails on some older versions of OpenSSL (0.9.7l, for instance)
            if support.verbose:
                sys.stdout.write(
                    " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
                    % str(x))

    # (client protocol, third argument handed to try_protocol_combo)
    clients = (
        (ssl.PROTOCOL_SSLv3, 'SSLv3'),
        (sslv23, True),
        (ssl.PROTOCOL_TLSv1, 'TLSv1'),
    )
    # First pass: no explicit certificate requirement.
    for client_proto, expected in clients:
        try_protocol_combo(sslv23, client_proto, expected)
    # Then the same clients with each explicit certificate requirement.
    for cert_reqs in (ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        for client_proto, expected in clients:
            try_protocol_combo(sslv23, client_proto, expected, cert_reqs)

    # Server with specific SSL options
    try_protocol_combo(sslv23, ssl.PROTOCOL_SSLv3, False,
                       server_options=ssl.OP_NO_SSLv3)
    # Will choose TLSv1
    try_protocol_combo(sslv23, sslv23, True,
                       server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
    try_protocol_combo(sslv23, ssl.PROTOCOL_TLSv1, False,
                       server_options=ssl.OP_NO_TLSv1)
2221
2222
@skip_if_broken_ubuntu_ssl
def test_protocol_sslv3(self):
    """Exercise an SSLv3 server against clients using various protocols
    and options."""
    if support.verbose:
        sys.stdout.write("\n")
    sslv3 = ssl.PROTOCOL_SSLv3
    sslv23 = ssl.PROTOCOL_SSLv23
    # Same protocol on both ends: first without an explicit certificate
    # requirement, then with each explicit one.
    try_protocol_combo(sslv3, sslv3, 'SSLv3')
    for cert_reqs in (ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(sslv3, sslv3, 'SSLv3', cert_reqs)
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(sslv3, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(sslv3, sslv23, False,
                       client_options=ssl.OP_NO_SSLv3)
    try_protocol_combo(sslv3, ssl.PROTOCOL_TLSv1, False)
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(sslv3, sslv23, 'SSLv3',
                           client_options=ssl.OP_NO_SSLv2)
Bill Janssen98d19da2007-09-10 21:51:02 +00002240
@skip_if_broken_ubuntu_ssl
def test_protocol_tlsv1(self):
    """Exercise a TLSv1 server against clients using various protocols
    and options."""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1 = ssl.PROTOCOL_TLSv1
    # Same protocol on both ends: first without an explicit certificate
    # requirement, then with each explicit one.
    try_protocol_combo(tlsv1, tlsv1, 'TLSv1')
    for cert_reqs in (ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(tlsv1, tlsv1, 'TLSv1', cert_reqs)
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(tlsv1, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(tlsv1, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(tlsv1, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_TLSv1)
2254
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
                     "TLS version 1.1 not supported.")
def test_protocol_tlsv1_1(self):
    """Exercise a TLSv1.1 server against clients using various protocols
    and options, including older TLS versions."""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1 = ssl.PROTOCOL_TLSv1
    tlsv1_1 = ssl.PROTOCOL_TLSv1_1
    sslv23 = ssl.PROTOCOL_SSLv23

    try_protocol_combo(tlsv1_1, tlsv1_1, 'TLSv1.1')
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(tlsv1_1, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(tlsv1_1, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(tlsv1_1, sslv23, False,
                       client_options=ssl.OP_NO_TLSv1_1)

    # SSLv23 server with a TLSv1.1 client, then TLSv1 / TLSv1.1 mixed
    # in both directions.
    try_protocol_combo(sslv23, tlsv1_1, 'TLSv1.1')
    try_protocol_combo(tlsv1_1, tlsv1, False)
    try_protocol_combo(tlsv1, tlsv1_1, False)
2273
2274
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
                     "TLS version 1.2 not supported.")
def test_protocol_tlsv1_2(self):
    """Exercise a TLSv1.2 server against clients using various protocols
    and options, including older TLS versions."""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1 = ssl.PROTOCOL_TLSv1
    tlsv1_1 = ssl.PROTOCOL_TLSv1_1
    tlsv1_2 = ssl.PROTOCOL_TLSv1_2
    sslv23 = ssl.PROTOCOL_SSLv23

    try_protocol_combo(tlsv1_2, tlsv1_2, 'TLSv1.2',
                       server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
                       client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2)
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(tlsv1_2, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(tlsv1_2, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(tlsv1_2, sslv23, False,
                       client_options=ssl.OP_NO_TLSv1_2)

    # SSLv23 server with a TLSv1.2 client, then TLSv1.2 mixed with the
    # older TLS versions in both directions.
    try_protocol_combo(sslv23, tlsv1_2, 'TLSv1.2')
    try_protocol_combo(tlsv1_2, tlsv1, False)
    try_protocol_combo(tlsv1, tlsv1_2, False)
    try_protocol_combo(tlsv1_2, tlsv1_1, False)
    try_protocol_combo(tlsv1_1, tlsv1_2, False)
Bill Janssen98d19da2007-09-10 21:51:02 +00002297
def test_starttls(self):
    """Switch a connection from clear text to TLS and back again."""

    def note(template, *args):
        # Progress output is only produced in verbose mode.
        if support.verbose:
            sys.stdout.write(template % args)

    messages = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4",
                b"ENDTLS", b"msg 5", b"msg 6")

    server = ThreadedEchoServer(CERTFILE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                starttls_server=True,
                                chatty=True,
                                connectionchatty=True)
    secure = False
    with server:
        plain = socket.socket()
        plain.setblocking(1)
        plain.connect((HOST, server.port))
        note("\n")
        for indata in messages:
            note(" client: sending %r...\n", indata)
            # Talk over whichever channel is currently active.
            if secure:
                tls.write(indata)
                outdata = tls.read()
            else:
                plain.send(indata)
                outdata = plain.recv(1024)
            msg = outdata.strip().lower()
            acknowledged = msg.startswith(b"ok")
            if indata == b"STARTTLS" and acknowledged:
                # STARTTLS ok, switch to secure mode
                note(" client: read %r from server, starting TLS...\n", msg)
                tls = ssl.wrap_socket(plain, ssl_version=ssl.PROTOCOL_TLSv1)
                secure = True
            elif indata == b"ENDTLS" and acknowledged:
                # ENDTLS ok, switch back to clear text
                note(" client: read %r from server, ending TLS...\n", msg)
                plain = tls.unwrap()
                secure = False
            else:
                note(" client: read %r from server\n", msg)
        note(" client: closing connection.\n")
        # Say goodbye and close on whichever channel is still active.
        if secure:
            tls.write(b"over\n")
            tls.close()
        else:
            plain.send(b"over\n")
            plain.close()
Bill Janssen98d19da2007-09-10 21:51:02 +00002355
def test_socketserver(self):
    """Fetch the certificate file through an HTTPS server and compare it
    with the file's contents on disk."""
    server = make_https_server(self, certfile=CERTFILE)
    # try to connect
    if support.verbose:
        sys.stdout.write('\n')
    with open(CERTFILE, 'rb') as cert_file:
        on_disk = cert_file.read()
    # now fetch the same data from the HTTPS server
    url = 'https://%s:%d/%s' % (
        HOST, server.port, os.path.basename(CERTFILE))
    with support.check_py3k_warnings():
        response = urllib.urlopen(url)
    # Stays empty unless the server announces a positive content length.
    fetched = ''
    with closing(response):
        dlen = response.info().getheader("content-length")
        if dlen and (int(dlen) > 0):
            fetched = response.read(int(dlen))
            if support.verbose:
                sys.stdout.write(
                    " client: read %d bytes from remote server '%s'\n"
                    % (len(fetched), server))
    self.assertEqual(on_disk, fetched)
Bill Janssen934b16d2008-06-28 22:19:33 +00002381
def test_asyncore_server(self):
    """Check the example asyncore integration.

    Sends one message to an AsyncoreEchoServer over an SSL connection
    and expects to read back the lower-cased message.
    """
    if support.verbose:
        sys.stdout.write("\n")

    # The only payload used by this test (an earlier, immediately
    # overwritten assignment to ``indata`` was dead code and is gone).
    indata = b"FOO\n"
    server = AsyncoreEchoServer(CERTFILE)
    with server:
        s = ssl.wrap_socket(socket.socket())
        s.connect(('127.0.0.1', server.port))
        if support.verbose:
            sys.stdout.write(
                " client: sending %r...\n" % indata)
        s.write(indata)
        outdata = s.read()
        if support.verbose:
            sys.stdout.write(" client: read %r\n" % outdata)
        if outdata != indata.lower():
            self.fail(
                "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                % (outdata[:20], len(outdata),
                   indata[:20].lower(), len(indata)))
        s.write(b"over\n")
        if support.verbose:
            sys.stdout.write(" client: closing connection.\n")
        s.close()
        if support.verbose:
            sys.stdout.write(" client: connection closed.\n")
Bill Janssen934b16d2008-06-28 22:19:33 +00002412
def test_recv_send(self):
    """Test recv(), send() and friends.

    Each send/recv flavour of the SSL socket is tried against an echo
    server.  Flavours flagged as supported must round-trip the payload
    (the comparison is against the lower-cased payload); the others must
    raise a ValueError whose message starts with the method name.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = ssl.wrap_socket(socket.socket(),
                            server_side=False,
                            certfile=CERTFILE,
                            ca_certs=CERTFILE,
                            cert_reqs=ssl.CERT_NONE,
                            ssl_version=ssl.PROTOCOL_TLSv1)
        s.connect((HOST, server.port))

        # helper methods for standardising recv* method signatures
        def _recv_into():
            b = bytearray(b"\0"*100)
            count = s.recv_into(b)
            return b[:count]

        def _recvfrom_into():
            b = bytearray(b"\0"*100)
            count, addr = s.recvfrom_into(b)
            return b[:count]

        # (name, method, whether to expect success, *args)
        send_methods = [
            ('send', s.send, True, []),
            ('sendto', s.sendto, False, ["some.address"]),
            ('sendall', s.sendall, True, []),
        ]
        recv_methods = [
            ('recv', s.recv, True, []),
            ('recvfrom', s.recvfrom, False, ["some.address"]),
            ('recv_into', _recv_into, True, []),
            ('recvfrom_into', _recvfrom_into, False, []),
        ]
        data_prefix = u"PREFIX_"

        for meth_name, send_meth, expect_success, args in send_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                send_meth(indata, *args)
                outdata = s.read()
                if outdata != indata.lower():
                    # NOTE: "!r" (conversion), not ":r" -- "r" is not a
                    # valid format spec, and the ValueError it raises
                    # would be swallowed by the handler below and
                    # reported as a send failure.
                    self.fail(
                        "While sending with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20], nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to send with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )

        for meth_name, recv_meth, expect_success, args in recv_methods:
            indata = (data_prefix + meth_name).encode('ascii')
            try:
                s.send(indata)
                outdata = recv_meth(*args)
                if outdata != indata.lower():
                    self.fail(
                        "While receiving with <<{name:s}>> bad data "
                        "<<{outdata!r}>> ({nout:d}) received; "
                        "expected <<{indata!r}>> ({nin:d})\n".format(
                            name=meth_name, outdata=outdata[:20],
                            nout=len(outdata),
                            indata=indata[:20], nin=len(indata)
                        )
                    )
            except ValueError as e:
                if expect_success:
                    self.fail(
                        "Failed to receive with method <<{name:s}>>; "
                        "expected to succeed.\n".format(name=meth_name)
                    )
                if not str(e).startswith(meth_name):
                    self.fail(
                        "Method <<{name:s}>> failed with unexpected "
                        "exception message: {exp!s}\n".format(
                            name=meth_name, exp=e
                        )
                    )
                # consume data: the payload was sent, but the receive
                # call raised instead of reading the reply.
                s.read()

        s.write(b"over\n")
        s.close()
Bill Janssen61c001a2008-09-08 16:37:24 +00002519
def test_handshake_timeout(self):
    """The SSL handshake must respect the socket timeout (issue #5103)."""
    server = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(server)
    started = threading.Event()
    # Read by serve() through the closure; set to True to stop its loop.
    finish = False

    def serve():
        # Accept connections but never answer them, so that any client
        # handshake can only end by timing out.
        server.listen(5)
        started.set()
        accepted = []
        while not finish:
            readable = select.select([server], [], [], 0.1)[0]
            if server in readable:
                # Let the socket hang around rather than having
                # it closed by garbage collection.
                accepted.append(server.accept()[0])
        for conn in accepted:
            conn.close()

    t = threading.Thread(target=serve)
    t.start()
    started.wait()

    try:
        # Case 1: wrap a socket that is already connected.
        try:
            c = socket.socket(socket.AF_INET)
            c.settimeout(0.2)
            c.connect((host, port))
            # Will attempt handshake and time out
            with self.assertRaisesRegexp(ssl.SSLError, "timed out"):
                ssl.wrap_socket(c)
        finally:
            c.close()
        # Case 2: wrap first, then connect through the SSL socket.
        try:
            c = socket.socket(socket.AF_INET)
            c = ssl.wrap_socket(c)
            c.settimeout(0.2)
            # Will attempt handshake and time out
            with self.assertRaisesRegexp(ssl.SSLError, "timed out"):
                c.connect((host, port))
        finally:
            c.close()
    finally:
        finish = True
        t.join()
        server.close()
2568
def test_server_accept(self):
    """accept() on an SSLSocket created through SSLContext.wrap_socket()
    (issue #16357)."""
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(CERTFILE)
    context.load_cert_chain(CERTFILE)
    listening = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(listening)
    server = context.wrap_socket(listening, server_side=True)

    listening_evt = threading.Event()
    # Filled in by the server thread with the result of accept().
    accepted = {'conn': None, 'addr': None}

    def serve():
        server.listen(5)
        # Block on the accept and wait on the connection to close.
        listening_evt.set()
        accepted['conn'], accepted['addr'] = server.accept()
        accepted['conn'].recv(1)

    t = threading.Thread(target=serve)
    t.start()
    # Client wait until server setup and perform a connect.
    listening_evt.wait()
    client = context.wrap_socket(socket.socket())
    client.connect((host, port))
    client_addr = client.getsockname()
    client.close()
    t.join()
    accepted['conn'].close()
    server.close()
    # Sanity checks.
    self.assertIsInstance(accepted['conn'], ssl.SSLSocket)
    self.assertEqual(accepted['addr'], client_addr)
2605
def test_getpeercert_enotconn(self):
    """getpeercert() on an unconnected SSL socket raises socket.error
    with errno ENOTCONN."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    unconnected = ctx.wrap_socket(socket.socket())
    with closing(unconnected):
        with self.assertRaises(socket.error) as caught:
            unconnected.getpeercert()
        self.assertEqual(caught.exception.errno, errno.ENOTCONN)
2612
def test_do_handshake_enotconn(self):
    """do_handshake() on an unconnected SSL socket raises socket.error
    with errno ENOTCONN."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    unconnected = ctx.wrap_socket(socket.socket())
    with closing(unconnected):
        with self.assertRaises(socket.error) as caught:
            unconnected.do_handshake()
        self.assertEqual(caught.exception.errno, errno.ENOTCONN)
2619
def test_default_ciphers(self):
    """A client restricted to "DES" ciphers must fail to connect to a
    server created without an explicit cipher list."""
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    try:
        # Force a set of weak ciphers on our client context
        client_ctx.set_ciphers("DES")
    except ssl.SSLError:
        self.skipTest("no DES cipher available")
    server = ThreadedEchoServer(CERTFILE,
                                ssl_version=ssl.PROTOCOL_SSLv23,
                                chatty=False)
    with server:
        with closing(client_ctx.wrap_socket(socket.socket())) as s:
            with self.assertRaises(ssl.SSLError):
                s.connect((HOST, server.port))
    # Inspected only after the server's context manager has exited.
    first_error = server.conn_errors[0]
    self.assertIn("no shared cipher", str(first_error))
2634
def test_version_basic(self):
    """
    Basic checks of SSLSocket.version(): None before connecting,
    "TLSv1" while connected to a TLSv1 server, None again after close.
    More tests are done in the test_protocol_*() methods.
    """
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server = ThreadedEchoServer(CERTFILE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                chatty=False)
    with server:
        s = client_ctx.wrap_socket(socket.socket())
        with closing(s):
            self.assertIs(s.version(), None)
            s.connect((HOST, server.port))
            self.assertEqual(s.version(), "TLSv1")
        # The socket has been closed by now.
        self.assertIs(s.version(), None)
2649
@unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
def test_default_ecdh_curve(self):
    """Elliptic curve-based Diffie Hellman key exchange should be enabled
    by default on SSL contexts (issue #21015)."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    ctx.load_cert_chain(CERTFILE)
    # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
    # explicitly using the 'ECCdraft' cipher alias.  Otherwise,
    # our default cipher list should prefer ECDH-based ciphers
    # automatically.
    needs_explicit_ecdh = ssl.OPENSSL_VERSION_INFO < (1, 0, 0)
    if needs_explicit_ecdh:
        ctx.set_ciphers("ECCdraft:ECDH")
    # The same context serves both ends of the connection.
    with ThreadedEchoServer(context=ctx) as server:
        with closing(ctx.wrap_socket(socket.socket())) as s:
            s.connect((HOST, server.port))
            cipher_name = s.cipher()[0]
            self.assertIn("ECDH", cipher_name)
2666
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    """Test tls-unique channel binding.

    Two successive connections are made to the same server.  For each
    one the binding data must be present, 12 bytes long and equal to
    what the server reports; the data of the two connections must
    differ from each other.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = ssl.wrap_socket(socket.socket(),
                            server_side=False,
                            certfile=CERTFILE,
                            ca_certs=CERTFILE,
                            cert_reqs=ssl.CERT_NONE,
                            ssl_version=ssl.PROTOCOL_TLSv1)
        s.connect((HOST, server.port))
        # get the data
        cb_data = s.get_channel_binding("tls-unique")
        if support.verbose:
            sys.stdout.write(" got channel binding data: {0!r}\n"
                             .format(cb_data))

        # check if it is sane
        self.assertIsNotNone(cb_data)
        self.assertEqual(len(cb_data), 12) # True for TLSv1

        # and compare with the peers version
        s.write(b"CB tls-unique\n")
        peer_data_repr = s.read().strip()
        self.assertEqual(peer_data_repr,
                         repr(cb_data).encode("us-ascii"))
        s.close()

        # now, again
        s = ssl.wrap_socket(socket.socket(),
                            server_side=False,
                            certfile=CERTFILE,
                            ca_certs=CERTFILE,
                            cert_reqs=ssl.CERT_NONE,
                            ssl_version=ssl.PROTOCOL_TLSv1)
        s.connect((HOST, server.port))
        new_cb_data = s.get_channel_binding("tls-unique")
        if support.verbose:
            sys.stdout.write(" got another channel binding data: {0!r}\n"
                             .format(new_cb_data))
        # is it really unique
        self.assertNotEqual(cb_data, new_cb_data)
        # Sanity-check the data of the *second* connection; the first
        # connection's data was already validated above.
        self.assertIsNotNone(new_cb_data)
        self.assertEqual(len(new_cb_data), 12) # True for TLSv1
        s.write(b"CB tls-unique\n")
        peer_data_repr = s.read().strip()
        self.assertEqual(peer_data_repr,
                         repr(new_cb_data).encode("us-ascii"))
        s.close()
2726
def test_compression(self):
    """The negotiated compression is one of None, 'ZLIB' or 'RLE'."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    ctx.load_cert_chain(CERTFILE)
    # The same context is used for the client and the server side.
    stats = server_params_test(ctx, ctx,
                               chatty=True, connectionchatty=True)
    negotiated = stats['compression']
    if support.verbose:
        sys.stdout.write(" got compression: {!r}\n".format(negotiated))
    self.assertIn(negotiated, { None, 'ZLIB', 'RLE' })
2735
@unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
                     "ssl.OP_NO_COMPRESSION needed for this test")
def test_compression_disabled(self):
    """With OP_NO_COMPRESSION set, no compression is reported."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    ctx.load_cert_chain(CERTFILE)
    ctx.options |= ssl.OP_NO_COMPRESSION
    # The same context is used for the client and the server side.
    stats = server_params_test(ctx, ctx,
                               chatty=True, connectionchatty=True)
    self.assertIs(stats['compression'], None)
2745
def test_dh_params(self):
    """Check we can get a connection with ephemeral Diffie-Hellman.

    The context loads DH parameters from DHFILE and is restricted to the
    "kEDH" cipher string; the name of the negotiated cipher must then
    contain an "ADH", "EDH" or "DHE" component.
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.load_cert_chain(CERTFILE)
    context.load_dh_params(DHFILE)
    context.set_ciphers("kEDH")
    stats = server_params_test(context, context,
                               chatty=True, connectionchatty=True)
    # First element of the recorded cipher info: the cipher's name.
    cipher = stats["cipher"][0]
    parts = cipher.split("-")
    if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
        # Report the whole cipher name (indexing it again would only
        # show its first character).
        self.fail("Non-DH cipher: " + cipher)
2758
def test_selected_npn_protocol(self):
    """The client's NPN protocol is None when no NPN protocols are set."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    ctx.load_cert_chain(CERTFILE)
    # The same context is used for the client and the server side.
    stats = server_params_test(ctx, ctx,
                               chatty=True, connectionchatty=True)
    self.assertIs(stats['client_npn_protocol'], None)
2766
@unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
def test_npn_protocols(self):
    """For several client protocol lists, check the NPN protocol reported
    by both the client and the server against the expected one."""
    server_protocols = ['http/1.1', 'spdy/2']
    # (protocols set on the client context, expected selected protocol)
    scenarios = [
        (['http/1.1', 'spdy/2'], 'http/1.1'),
        (['spdy/2', 'http/1.1'], 'http/1.1'),
        (['spdy/2', 'test'], 'spdy/2'),
        (['abc', 'def'], 'abc')
    ]

    def make_context(protocols):
        # Fresh TLSv1 context with the test certificate and the given
        # NPN protocol list.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_cert_chain(CERTFILE)
        ctx.set_npn_protocols(protocols)
        return ctx

    for client_protocols, expected in scenarios:
        server_context = make_context(server_protocols)
        client_context = make_context(client_protocols)
        stats = server_params_test(client_context, server_context,
                                   chatty=True, connectionchatty=True)

        # The two %%s placeholders are filled in per assertion below.
        msg = ("failed trying %s (s) and %s (c).\n"
               "was expecting %s, but got %%s from the %%s"
               % (str(server_protocols), str(client_protocols),
                  str(expected)))
        client_result = stats['client_npn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))
        # Last entry recorded on the server side, if there is one.
        recorded = stats['server_npn_protocols']
        server_result = recorded[-1] if len(recorded) else 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
2795
def sni_contexts(self):
    # Build the three contexts shared by the SNI tests and return them
    # as (server_context, other_context, client_context).
    # Server context: presents SIGNED_CERTFILE.
    default_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    default_ctx.load_cert_chain(SIGNED_CERTFILE)

    # Alternate server-side context: presents SIGNED_CERTFILE2.
    alternate_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    alternate_ctx.load_cert_chain(SIGNED_CERTFILE2)

    # Client context: requires a peer certificate verified by SIGNING_CA.
    verifying_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    verifying_ctx.verify_mode = ssl.CERT_REQUIRED
    verifying_ctx.load_verify_locations(SIGNING_CA)

    return default_ctx, alternate_ctx, verifying_ctx
2805
def check_common_name(self, stats, name):
    # Assert that the peer certificate recorded in *stats* has a
    # subject entry consisting of exactly commonName == *name*.
    wanted_rdn = (('commonName', name),)
    self.assertIn(wanted_rdn, stats['peercert']['subject'])
2809
@needs_sni
def test_sni_callback(self):
    # Exercise the servername callback three ways: with a server name,
    # without one, and after the callback has been removed.
    seen = []
    server_context, other_context, client_context = self.sni_contexts()

    def servername_cb(ssl_sock, server_name, initial_context):
        # Record every invocation; switch to the alternate context only
        # when the client actually sent a server name.
        seen.append((server_name, initial_context))
        if server_name is None:
            return
        ssl_sock.context = other_context
    server_context.set_servername_callback(servername_cb)

    stats = server_params_test(client_context, server_context,
                               chatty=True, sni_name='supermessage')
    # The callback received the requested name together with the
    # context the connection started with ...
    self.assertEqual(seen, [("supermessage", server_context)])
    # ... and the certificate served is the one loaded into
    # other_context (SIGNED_CERTFILE2).
    self.check_common_name(stats, 'fakehostname')

    # Without SNI the callback still runs, with server_name=None, and
    # the original certificate is kept.
    del seen[:]
    stats = server_params_test(client_context, server_context,
                               chatty=True, sni_name=None)
    self.assertEqual(seen, [(None, server_context)])
    self.check_common_name(stats, 'localhost')

    # Once the callback is removed it is no longer invoked and the
    # certificate does not change.
    del seen[:]
    server_context.set_servername_callback(None)

    stats = server_params_test(client_context, server_context,
                               chatty=True, sni_name='notfunny')
    self.check_common_name(stats, 'localhost')
    self.assertEqual(seen, [])
2848
@needs_sni
def test_sni_callback_alert(self):
    # An alert code returned by the servername callback must surface on
    # the client as an SSLError carrying the matching reason.
    server_context, _, client_context = self.sni_contexts()

    def deny_access(ssl_sock, server_name, initial_context):
        return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
    server_context.set_servername_callback(deny_access)

    with self.assertRaises(ssl.SSLError) as caught:
        server_params_test(client_context, server_context,
                           chatty=False, sni_name='supermessage')
    self.assertEqual(caught.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
2863
@needs_sni
def test_sni_callback_raising(self):
    # An exception escaping the servername callback must fail the
    # connection with a handshake failure alert, and the exception
    # must be reported on stderr.
    server_context, _, client_context = self.sni_contexts()

    def failing_cb(ssl_sock, server_name, initial_context):
        1 / 0
    server_context.set_servername_callback(failing_cb)

    with self.assertRaises(ssl.SSLError) as caught:
        with support.captured_stderr() as err_output:
            server_params_test(client_context, server_context,
                               chatty=False, sni_name='supermessage')
    self.assertEqual(caught.exception.reason,
                     'SSLV3_ALERT_HANDSHAKE_FAILURE')
    self.assertIn("ZeroDivisionError", err_output.getvalue())
2880
@needs_sni
def test_sni_callback_wrong_return_type(self):
    # A callback returning something other than an alert code must
    # terminate the connection with an internal error alert, and a
    # TypeError must be reported on stderr.
    server_context, _, client_context = self.sni_contexts()

    def returns_string(ssl_sock, server_name, initial_context):
        return "foo"
    server_context.set_servername_callback(returns_string)

    with self.assertRaises(ssl.SSLError) as caught:
        with support.captured_stderr() as err_output:
            server_params_test(client_context, server_context,
                               chatty=False, sni_name='supermessage')
    self.assertEqual(caught.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
    self.assertIn("TypeError", err_output.getvalue())
2898
def test_read_write_after_close_raises_valuerror(self):
    # read() and write() on an SSL socket that has been connected and
    # then closed must raise ValueError.
    ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    ctx.verify_mode = ssl.CERT_REQUIRED
    ctx.load_verify_locations(CERTFILE)
    ctx.load_cert_chain(CERTFILE)
    echo_server = ThreadedEchoServer(context=ctx, chatty=False)

    with echo_server:
        client = ctx.wrap_socket(socket.socket())
        client.connect((HOST, echo_server.port))
        client.close()

        with self.assertRaises(ValueError):
            client.read(1024)
        with self.assertRaises(ValueError):
            client.write(b'hello')
2913
Bill Janssen61c001a2008-09-08 16:37:24 +00002914
def test_main(verbose=False):
    # Entry point: optionally print build/platform details, make sure
    # the certificate fixtures exist, then run the test classes that
    # apply to this environment.
    if support.verbose:
        probes = {
            'Linux': platform.linux_distribution,
            'Mac': platform.mac_ver,
            'Windows': platform.win32_ver,
        }
        # Use the first probe that reports a non-empty leading field;
        # fall back to the generic platform string otherwise.
        for label, probe in probes.items():
            info = probe()
            if info and info[0]:
                plat = '%s %r' % (label, info)
                break
        else:
            plat = repr(platform.platform())
        print("test_ssl: testing with %r %r" %
              (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
        print(" under %s" % plat)
        print(" HAS_SNI = %r" % ssl.HAS_SNI)
        print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
        try:
            print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
        except AttributeError:
            # This ssl build does not expose the constant.
            pass

    required_files = [
        CERTFILE, SVN_PYTHON_ORG_ROOT_CERT, BYTES_CERTFILE,
        ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
        SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
        BADCERT, BADKEY, EMPTYCERT,
    ]
    for path in required_files:
        if os.path.exists(path):
            continue
        raise support.TestFailed("Can't read certificate file %r" % path)

    suites = [ContextTests, BasicSocketTests, SSLErrorTests]

    if support.is_resource_enabled('network'):
        suites.append(NetworkedTests)

    if _have_threads:
        thread_info = support.threading_setup()
        if thread_info:
            suites.append(ThreadedTests)

    try:
        support.run_unittest(*suites)
    finally:
        # thread_info is only bound when threading is available.
        if _have_threads:
            support.threading_cleanup(*thread_info)
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00002962
# Run the whole suite when this module is executed as a script.
if __name__ == "__main__":
    test_main()