blob: 1f0e0937162f5c59b7bd02ec4d38d6a913b00975 [file] [log] [blame]
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001# -*- coding: utf-8 -*-
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00002# Test the support for SSL and sockets
3
4import sys
5import unittest
Benjamin Petersondaeb9252014-08-20 14:14:50 -05006from test import test_support as support
Bill Janssen934b16d2008-06-28 22:19:33 +00007import asyncore
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00008import socket
Bill Janssen934b16d2008-06-28 22:19:33 +00009import select
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000010import time
Benjamin Petersondaeb9252014-08-20 14:14:50 -050011import datetime
Antoine Pitroub558f172010-04-23 23:25:45 +000012import gc
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000013import os
Antoine Pitroub558f172010-04-23 23:25:45 +000014import errno
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000015import pprint
Benjamin Petersondaeb9252014-08-20 14:14:50 -050016import tempfile
17import urllib
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000018import traceback
Antoine Pitroudfb299b2010-04-23 22:54:59 +000019import weakref
Antoine Pitroud75efd92010-08-04 17:38:33 +000020import platform
Benjamin Petersondaeb9252014-08-20 14:14:50 -050021import functools
22from contextlib import closing
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000023
# Load ssl through the test-support helper rather than a plain import.
# NOTE(review): presumably this skips the module when ssl is missing --
# confirm against test_support.import_module.
ssl = support.import_module("ssl")

# Sorted list built from the protocol table exposed by the ssl module.
PROTOCOLS = list(ssl._PROTOCOL_NAMES)
PROTOCOLS.sort()
# Host name/address used when tests need a connection endpoint.
HOST = support.HOST
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000028
def data_file(*name):
    """Return the path of a data file stored alongside this test module.

    *name* is one or more path components; they are joined below the
    directory that contains this file.
    """
    here = os.path.dirname(__file__)
    return os.path.join(here, *name)
31
# The custom key and certificate files used in test_ssl are generated
# using Lib/test/make_ssl_certs.py.
# Other certificates are simply fetched from the Internet servers they
# are meant to authenticate.

# Combined key + certificate, and the same path as a bytes file name.
CERTFILE = data_file("keycert.pem")
BYTES_CERTFILE = CERTFILE.encode(sys.getfilesystemencoding())

# Certificate and private key stored in separate files.
ONLYCERT = data_file("ssl_cert.pem")
BYTES_ONLYCERT = ONLYCERT.encode(sys.getfilesystemencoding())
ONLYKEY = data_file("ssl_key.pem")
BYTES_ONLYKEY = ONLYKEY.encode(sys.getfilesystemencoding())

# Password-protected variants and the password that unlocks them.
CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
KEY_PASSWORD = "somepass"

# CA directory and two individual CA files stored inside it.
CAPATH = data_file("capath")
BYTES_CAPATH = CAPATH.encode(sys.getfilesystemencoding())
CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
CAFILE_CACERT = data_file("capath", "5ed36f99.0")


# empty CRL
CRLFILE = data_file("revocation.crl")

# Two keys and certs signed by the same CA (for SNI tests)
SIGNED_CERTFILE = data_file("keycert3.pem")
SIGNED_CERTFILE2 = data_file("keycert4.pem")
SIGNING_CA = data_file("pycacert.pem")

SVN_PYTHON_ORG_ROOT_CERT = data_file("https_svn_python_org_root.pem")

# Deliberately empty, malformed or missing inputs for error-path tests,
# plus certificates kept for specific regression tests.
EMPTYCERT = data_file("nullcert.pem")
BADCERT = data_file("badcert.pem")
WRONGCERT = data_file("XXXnonexisting.pem")
BADKEY = data_file("badkey.pem")
NOKIACERT = data_file("nokia.pem")
NULLBYTECERT = data_file("nullbytecert.pem")

# Diffie-Hellman parameters file.
DHFILE = data_file("dh512.pem")
BYTES_DHFILE = DHFILE.encode(sys.getfilesystemencoding())
71
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000072
def handle_error(prefix):
    """Write the exception currently being handled to stdout.

    The formatted traceback, preceded by *prefix*, is only written when
    the test run is verbose; otherwise nothing is printed.
    """
    lines = traceback.format_exception(*sys.exc_info())
    text = ' '.join(lines)
    if not support.verbose:
        return
    sys.stdout.write(prefix + text)
Neal Norwitz3e533c22007-08-27 01:03:18 +000077
Antoine Pitrou435ba0c2010-04-27 09:51:18 +000078
class BasicTests(unittest.TestCase):
    # Smoke test for the legacy ssl.sslwrap_simple() entry point.

    def test_sslwrap_simple(self):
        # A crude test for the legacy API: it must accept both a
        # socket.socket object and the low-level object behind its
        # ``_sock`` attribute.
        for use_raw_sock in (False, True):
            sock = socket.socket(socket.AF_INET)
            try:
                target = sock._sock if use_raw_sock else sock
                ssl.sslwrap_simple(target)
            except IOError as e:
                # A broken pipe can happen during ssl_sock.do_handshake();
                # this test doesn't care about that.  Anything else is a
                # genuine failure and must propagate.
                if e.errno != errno.EPIPE:
                    raise
            finally:
                # Don't leak the descriptor of the throw-away socket.
                sock.close()
Benjamin Peterson2f334562014-10-01 23:53:01 -040097
98
def can_clear_options():
    """Return True when the OpenSSL API version is 0.9.8m or higher.

    Tests use this to decide whether context options may be reset.
    """
    minimum = (0, 9, 8, 13, 15)
    return ssl._OPENSSL_API_VERSION >= minimum
102
def no_sslv2_implies_sslv3_hello():
    """Return True when the linked OpenSSL is version 0.9.7h or higher."""
    minimum = (0, 9, 7, 8, 15)
    return ssl.OPENSSL_VERSION_INFO >= minimum
106
def have_verify_flags():
    """Return True when the linked OpenSSL is version 0.9.8 or higher."""
    minimum = (0, 9, 8, 0, 15)
    return ssl.OPENSSL_VERSION_INFO >= minimum
110
def utc_offset():  # NOTE: ignore issues like #1647654
    """Return the current local UTC offset in seconds.

    The sign convention is: local time = utc time + utc offset.
    """
    dst_in_effect = time.daylight and time.localtime().tm_isdst > 0
    seconds_west = time.altzone if dst_in_effect else time.timezone
    return -seconds_west
116
def asn1time(cert_time):
    """Return *cert_time* adjusted to what the linked OpenSSL reports.

    Some versions of OpenSSL ignore seconds, see #18207.  Only version
    0.9.8i is special-cased; for every other version the string is
    returned unchanged.
    """
    if ssl._OPENSSL_API_VERSION != (0, 9, 8, 9, 15):
        return cert_time

    fmt = "%b %d %H:%M:%S %Y GMT"
    parsed = datetime.datetime.strptime(cert_time, fmt)
    cert_time = parsed.replace(second=0).strftime(fmt)
    # %d adds leading zero but ASN1_TIME_print() uses leading space
    if cert_time[4] == "0":
        cert_time = cert_time[:4] + " " + cert_time[5:]
    return cert_time
Neal Norwitz3e533c22007-08-27 01:03:18 +0000130
# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
def skip_if_broken_ubuntu_ssl(func):
    """Decorator skipping a test on the patched Ubuntu OpenSSL build.

    When the ssl module does not define PROTOCOL_SSLv2 the function is
    returned untouched.  Otherwise the wrapper first tries to create an
    SSLv2 context; if that raises SSLError on the specific OpenSSL
    version and distribution checked below, the test is skipped instead
    of being run.
    """
    if not hasattr(ssl, 'PROTOCOL_SSLv2'):
        return func

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            ssl.SSLContext(ssl.PROTOCOL_SSLv2)
        except ssl.SSLError:
            patched_openssl = ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15)
            if (patched_openssl and
                    platform.linux_distribution() == ('debian', 'squeeze/sid', '')):
                raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
        return func(*args, **kwargs)

    return wrapper
Antoine Pitroud75efd92010-08-04 17:38:33 +0000146
# Decorator for tests that only make sense when ssl.HAS_SNI is true.
needs_sni = unittest.skipUnless(
    ssl.HAS_SNI,
    "SNI support needed for this test",
)
148
Antoine Pitroud75efd92010-08-04 17:38:33 +0000149
class BasicSocketTests(unittest.TestCase):
    # Tests of the ssl module that never complete a handshake: constants,
    # certificate decoding, hostname matching, wrapping of unconnected
    # sockets and the small ASN.1 / certificate-time helpers.

    def test_constants(self):
        # Looking a name up is the whole test: a missing constant raises
        # AttributeError.
        for name in ("CERT_NONE", "CERT_OPTIONAL", "CERT_REQUIRED",
                     "OP_CIPHER_SERVER_PREFERENCE", "OP_SINGLE_DH_USE"):
            getattr(ssl, name)
        if ssl.HAS_ECDH:
            ssl.OP_SINGLE_ECDH_USE
        if ssl.OPENSSL_VERSION_INFO >= (1, 0):
            ssl.OP_NO_COMPRESSION
        booleans = {True, False}
        self.assertIn(ssl.HAS_SNI, booleans)
        self.assertIn(ssl.HAS_ECDH, booleans)

    def test_random(self):
        status = ssl.RAND_status()
        if support.verbose:
            if status:
                verdict = "sufficient randomness"
            else:
                verdict = "insufficient randomness"
            sys.stdout.write("\n RAND_status is %d (%s)\n" % (status, verdict))
        # RAND_egd() rejects a non-string path and any extra argument.
        for bad_args in ((1,), ('foo', 1)):
            self.assertRaises(TypeError, ssl.RAND_egd, *bad_args)
        ssl.RAND_add("this is a random string", 75.0)

    def test_parse_cert(self):
        # note that this uses an 'unofficial' function in _ssl.c,
        # provided solely for this test, to exercise the certificate
        # parsing code
        localhost_name = ((('countryName', 'XY'),),
                          (('localityName', 'Castle Anthrax'),),
                          (('organizationName', 'Python Software Foundation'),),
                          (('commonName', 'localhost'),))
        decoded = ssl._ssl._test_decode_cert(CERTFILE)
        if support.verbose:
            sys.stdout.write("\n" + pprint.pformat(decoded) + "\n")
        self.assertEqual(decoded['issuer'], localhost_name)
        # Note the next three asserts will fail if the keys are regenerated
        self.assertEqual(decoded['notAfter'],
                         asn1time('Oct 5 23:01:56 2020 GMT'))
        self.assertEqual(decoded['notBefore'],
                         asn1time('Oct 8 23:01:56 2010 GMT'))
        self.assertEqual(decoded['serialNumber'], 'D7C7381919AFC24E')
        self.assertEqual(decoded['subject'], localhost_name)
        self.assertEqual(decoded['subjectAltName'], (('DNS', 'localhost'),))

        # Issue #13034: the subjectAltName in some certificates
        # (notably projects.developer.nokia.com:443) wasn't parsed
        decoded = ssl._ssl._test_decode_cert(NOKIACERT)
        if support.verbose:
            sys.stdout.write("\n" + pprint.pformat(decoded) + "\n")
        nokia_san = (('DNS', 'projects.developer.nokia.com'),
                     ('DNS', 'projects.forum.nokia.com'))
        self.assertEqual(decoded['subjectAltName'], nokia_san)
        # extra OCSP and AIA fields
        self.assertEqual(decoded['OCSP'], ('http://ocsp.verisign.com',))
        self.assertEqual(decoded['caIssuers'],
                         ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
        self.assertEqual(decoded['crlDistributionPoints'],
                         ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))

    def test_parse_cert_CVE_2013_4238(self):
        # Embedded NUL bytes in certificate fields must be preserved in
        # the decoded values.
        decoded = ssl._ssl._test_decode_cert(NULLBYTECERT)
        if support.verbose:
            sys.stdout.write("\n" + pprint.pformat(decoded) + "\n")
        subject = ((('countryName', 'US'),),
                   (('stateOrProvinceName', 'Oregon'),),
                   (('localityName', 'Beaverton'),),
                   (('organizationName', 'Python Software Foundation'),),
                   (('organizationalUnitName', 'Python Core Development'),),
                   (('commonName', 'null.python.org\x00example.org'),),
                   (('emailAddress', 'python-dev@python.org'),))
        self.assertEqual(decoded['subject'], subject)
        self.assertEqual(decoded['issuer'], subject)
        if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
            ipv6_entry = '2001:DB8:0:0:0:0:0:1\n'
        else:
            # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
            ipv6_entry = '<invalid>'
        san = (('DNS', 'altnull.python.org\x00example.com'),
               ('email', 'null@python.org\x00user@example.org'),
               ('URI', 'http://null.python.org\x00http://example.org'),
               ('IP Address', '192.0.2.1'),
               ('IP Address', ipv6_entry))

        self.assertEqual(decoded['subjectAltName'], san)

    def test_DER_to_PEM(self):
        # PEM -> DER -> PEM -> DER must round-trip, and the regenerated
        # PEM text must carry the standard header and footer.
        with open(SVN_PYTHON_ORG_ROOT_CERT, 'r') as f:
            pem_in = f.read()
        der = ssl.PEM_cert_to_DER_cert(pem_in)
        pem_out = ssl.DER_cert_to_PEM_cert(der)
        der_again = ssl.PEM_cert_to_DER_cert(pem_out)
        self.assertEqual(der, der_again)
        has_header = pem_out.startswith(ssl.PEM_HEADER + '\n')
        if not has_header:
            self.fail("DER-to-PEM didn't include correct header:\n%r\n" % pem_out)
        has_footer = pem_out.endswith('\n' + ssl.PEM_FOOTER + '\n')
        if not has_footer:
            self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % pem_out)

    def test_openssl_version(self):
        number = ssl.OPENSSL_VERSION_NUMBER
        info = ssl.OPENSSL_VERSION_INFO
        text = ssl.OPENSSL_VERSION
        self.assertIsInstance(number, (int, long))
        self.assertIsInstance(info, tuple)
        self.assertIsInstance(text, str)
        # Some sanity checks follow
        # >= 0.9
        self.assertGreaterEqual(number, 0x900000)
        # < 3.0
        self.assertLess(number, 0x30000000)
        major, minor, fix, patch, status = info
        self.assertGreaterEqual(major, 0)
        self.assertLess(major, 3)
        for component in (minor, fix):
            self.assertGreaterEqual(component, 0)
            self.assertLess(component, 256)
        self.assertGreaterEqual(patch, 0)
        self.assertLessEqual(patch, 26)
        self.assertGreaterEqual(status, 0)
        self.assertLessEqual(status, 15)
        # Version string as returned by {Open,Libre}SSL, the format might change
        if "LibreSSL" in text:
            expected_prefix = "LibreSSL {:d}.{:d}".format(major, minor)
            self.assertTrue(text.startswith(expected_prefix), (text, info))
        else:
            expected_prefix = "OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)
            self.assertTrue(text.startswith(expected_prefix), (text, info))

    @support.cpython_only
    def test_refcycle(self):
        # Issue #7943: an SSL object doesn't create reference cycles with
        # itself.
        plain = socket.socket(socket.AF_INET)
        wrapped = ssl.wrap_socket(plain)
        ref = weakref.ref(wrapped)
        del wrapped
        self.assertEqual(ref(), None)

    def test_wrapped_unconnected(self):
        # Methods on an unconnected SSLSocket propagate the original
        # socket.error raised by the underlying socket object.
        s = socket.socket(socket.AF_INET)
        with closing(ssl.wrap_socket(s)) as ss:
            calls = (
                (ss.recv, (1,)),
                (ss.recv_into, (bytearray(b'x'),)),
                (ss.recvfrom, (1,)),
                (ss.recvfrom_into, (bytearray(b'x'), 1)),
                (ss.send, (b'x',)),
                (ss.sendto, (b'x', ('0.0.0.0', 0))),
            )
            for method, args in calls:
                self.assertRaises(socket.error, method, *args)

    def test_timeout(self):
        # Issue #8524: when creating an SSL socket, the timeout of the
        # original socket should be retained.
        for timeout in (None, 0.0, 5.0):
            plain = socket.socket(socket.AF_INET)
            plain.settimeout(timeout)
            with closing(ssl.wrap_socket(plain)) as wrapped:
                self.assertEqual(timeout, wrapped.gettimeout())

    def test_errors(self):
        sock = socket.socket()
        # Inconsistent certificate/key arguments are rejected up front.
        with self.assertRaisesRegexp(ValueError,
                                     "certfile must be specified"):
            ssl.wrap_socket(sock, keyfile=CERTFILE)
        with self.assertRaisesRegexp(
                ValueError,
                "certfile must be specified for server-side operations"):
            ssl.wrap_socket(sock, server_side=True)
        with self.assertRaisesRegexp(
                ValueError,
                "certfile must be specified for server-side operations"):
            ssl.wrap_socket(sock, server_side=True, certfile="")
        with closing(ssl.wrap_socket(sock, server_side=True,
                                     certfile=CERTFILE)) as s:
            with self.assertRaisesRegexp(ValueError,
                                         "can't connect in server-side mode"):
                s.connect((HOST, 8080))
        # A non-existing certificate or key file is reported as ENOENT.
        missing_file_cases = (
            dict(certfile=WRONGCERT),
            dict(certfile=CERTFILE, keyfile=WRONGCERT),
            dict(certfile=WRONGCERT, keyfile=WRONGCERT),
        )
        for wrap_kwargs in missing_file_cases:
            with self.assertRaises(IOError) as cm:
                with closing(socket.socket()) as sock:
                    ssl.wrap_socket(sock, **wrap_kwargs)
            self.assertEqual(cm.exception.errno, errno.ENOENT)

    def test_match_hostname(self):
        def ok(cert, *hostnames):
            # Every hostname must be accepted for the certificate.
            for hostname in hostnames:
                ssl.match_hostname(cert, hostname)

        def fail(cert, *hostnames):
            # Every hostname must be rejected with CertificateError.
            for hostname in hostnames:
                self.assertRaises(ssl.CertificateError,
                                  ssl.match_hostname, cert, hostname)

        def cn_cert(common_name):
            # Minimal certificate dict carrying only a commonName.
            return {'subject': ((('commonName', common_name),),)}

        def alabel(name):
            # IDNA-encode a unicode host name into its ASCII form.
            return name.encode("idna").decode("ascii")

        cert = cn_cert('example.com')
        ok(cert, 'example.com', 'ExAmple.cOm')
        fail(cert, 'www.example.com', '.example.com', 'example.org',
             'exampleXcom')

        cert = cn_cert('*.a.com')
        ok(cert, 'foo.a.com')
        fail(cert, 'bar.foo.a.com', 'a.com', 'Xa.com', '.a.com')

        # only match one left-most wildcard
        cert = cn_cert('f*.com')
        ok(cert, 'foo.com', 'f.com')
        fail(cert, 'bar.com', 'foo.a.com', 'bar.foo.com')

        # NULL bytes are bad, CVE-2013-4073
        cert = cn_cert('null.python.org\x00example.org')
        ok(cert, 'null.python.org\x00example.org')  # or raise an error?
        fail(cert, 'example.org', 'null.python.org')

        # error cases with wildcards
        cert = cn_cert('*.*.a.com')
        fail(cert, 'bar.foo.a.com', 'a.com', 'Xa.com', '.a.com')

        cert = cn_cert('a.*.com')
        fail(cert, 'a.foo.com', 'a..com', 'a.com')

        # wildcard doesn't match IDNA prefix 'xn--'
        idna = alabel(u'püthon.python.org')
        ok(cn_cert(idna), idna)
        fail(cn_cert('x*.python.org'), idna)
        fail(cn_cert('xn--p*.python.org'), idna)

        # wildcard in first fragment and IDNA A-labels in sequent fragments
        # are supported.
        cert = cn_cert(alabel(u'www*.pythön.org'))
        ok(cert, alabel(u'www.pythön.org'), alabel(u'www1.pythön.org'))
        fail(cert, alabel(u'ftp.pythön.org'), alabel(u'pythön.org'))

        # Slightly fake real-world example
        cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
                'subject': ((('commonName', 'linuxfrz.org'),),),
                'subjectAltName': (('DNS', 'linuxfr.org'),
                                   ('DNS', 'linuxfr.com'),
                                   ('othername', '<unsupported>'))}
        ok(cert, 'linuxfr.org', 'linuxfr.com')
        # Not a "DNS" entry
        fail(cert, '<unsupported>')
        # When there is a subjectAltName, commonName isn't used
        fail(cert, 'linuxfrz.org')

        # A pristine real-world example
        cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
                'subject': ((('countryName', 'US'),),
                            (('stateOrProvinceName', 'California'),),
                            (('localityName', 'Mountain View'),),
                            (('organizationName', 'Google Inc'),),
                            (('commonName', 'mail.google.com'),))}
        ok(cert, 'mail.google.com')
        fail(cert, 'gmail.com')
        # Only commonName is considered
        fail(cert, 'California')

        # Neither commonName nor subjectAltName
        cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
                'subject': ((('countryName', 'US'),),
                            (('stateOrProvinceName', 'California'),),
                            (('localityName', 'Mountain View'),),
                            (('organizationName', 'Google Inc'),))}
        fail(cert, 'mail.google.com')

        # No DNS entry in subjectAltName but a commonName
        cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
                'subject': ((('countryName', 'US'),),
                            (('stateOrProvinceName', 'California'),),
                            (('localityName', 'Mountain View'),),
                            (('commonName', 'mail.google.com'),)),
                'subjectAltName': (('othername', 'blabla'), )}
        ok(cert, 'mail.google.com')

        # No DNS entry subjectAltName and no commonName
        cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
                'subject': ((('countryName', 'US'),),
                            (('stateOrProvinceName', 'California'),),
                            (('localityName', 'Mountain View'),),
                            (('organizationName', 'Google Inc'),)),
                'subjectAltName': (('othername', 'blabla'),)}
        fail(cert, 'google.com')

        # Empty cert / no cert
        for missing in (None, {}):
            self.assertRaises(ValueError, ssl.match_hostname,
                              missing, 'example.com')

        # Issue #17980: avoid denials of service by refusing more than one
        # wildcard per fragment.
        ok(cn_cert('a*b.com'), 'axxb.com')
        fail(cn_cert('a*b.co*'), 'axxb.com')
        cert = cn_cert('a*b*.com')
        with self.assertRaises(ssl.CertificateError) as cm:
            ssl.match_hostname(cert, 'axxbxxc.com')
        self.assertIn("too many wildcards", str(cm.exception))

    def test_server_side(self):
        # server_hostname doesn't work for server sockets
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        with closing(socket.socket()) as sock:
            with self.assertRaises(ValueError):
                ctx.wrap_socket(sock, True, server_hostname="some.hostname")

    def test_unknown_channel_binding(self):
        # should raise ValueError for unknown type
        plain = socket.socket(socket.AF_INET)
        with closing(ssl.wrap_socket(plain)) as wrapped:
            self.assertRaises(ValueError,
                              wrapped.get_channel_binding, "unknown-type")

    @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                         "'tls-unique' channel binding not available")
    def test_tls_unique_channel_binding(self):
        # unconnected should return None for known type, first for a
        # client-side socket and then the same for server-side
        for wrap_kwargs in ({}, dict(server_side=True, certfile=CERTFILE)):
            plain = socket.socket(socket.AF_INET)
            with closing(ssl.wrap_socket(plain, **wrap_kwargs)) as wrapped:
                self.assertIsNone(wrapped.get_channel_binding("tls-unique"))

    def test_get_default_verify_paths(self):
        defaults = ssl.get_default_verify_paths()
        self.assertEqual(len(defaults), 6)
        self.assertIsInstance(defaults, ssl.DefaultVerifyPaths)

        # The two environment variables override the reported locations.
        with support.EnvironmentVarGuard() as env:
            env["SSL_CERT_DIR"] = CAPATH
            env["SSL_CERT_FILE"] = CERTFILE
            overridden = ssl.get_default_verify_paths()
            self.assertEqual(overridden.cafile, CERTFILE)
            self.assertEqual(overridden.capath, CAPATH)

    @unittest.skipUnless(sys.platform == "win32", "Windows specific")
    def test_enum_certificates(self):
        stores = ("CA", "ROOT")
        for storename in stores:
            self.assertTrue(ssl.enum_certificates(storename))

        self.assertRaises(TypeError, ssl.enum_certificates)
        self.assertRaises(WindowsError, ssl.enum_certificates, "")

        trust_oids = set()
        for storename in stores:
            entries = ssl.enum_certificates(storename)
            self.assertIsInstance(entries, list)
            for entry in entries:
                self.assertIsInstance(entry, tuple)
                self.assertEqual(len(entry), 3)
                cert, enc, trust = entry
                self.assertIsInstance(cert, bytes)
                self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
                self.assertIsInstance(trust, (set, bool))
                if isinstance(trust, set):
                    trust_oids.update(trust)

        serverAuth = "1.3.6.1.5.5.7.3.1"
        self.assertIn(serverAuth, trust_oids)

    @unittest.skipUnless(sys.platform == "win32", "Windows specific")
    def test_enum_crls(self):
        self.assertTrue(ssl.enum_crls("CA"))
        self.assertRaises(TypeError, ssl.enum_crls)
        self.assertRaises(WindowsError, ssl.enum_crls, "")

        crls = ssl.enum_crls("CA")
        self.assertIsInstance(crls, list)
        for entry in crls:
            self.assertIsInstance(entry, tuple)
            self.assertEqual(len(entry), 2)
            self.assertIsInstance(entry[0], bytes)
            self.assertIn(entry[1], {"x509_asn", "pkcs_7_asn"})

    def test_asn1object(self):
        server_auth_oid = '1.3.6.1.5.5.7.3.1'
        expected = (129, 'serverAuth', 'TLS Web Server Authentication',
                    '1.3.6.1.5.5.7.3.1')

        # Construction from a dotted OID string.
        val = ssl._ASN1Object(server_auth_oid)
        self.assertEqual(val, expected)
        self.assertEqual(val.nid, 129)
        self.assertEqual(val.shortname, 'serverAuth')
        self.assertEqual(val.longname, 'TLS Web Server Authentication')
        self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
        self.assertIsInstance(val, ssl._ASN1Object)
        self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')

        # Construction from a numeric identifier.
        val = ssl._ASN1Object.fromnid(129)
        self.assertEqual(val, expected)
        self.assertIsInstance(val, ssl._ASN1Object)
        self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
        with self.assertRaisesRegexp(ValueError, "unknown NID 100000"):
            ssl._ASN1Object.fromnid(100000)
        for nid in range(1000):
            try:
                obj = ssl._ASN1Object.fromnid(nid)
            except ValueError:
                continue
            self.assertIsInstance(obj.nid, int)
            self.assertIsInstance(obj.shortname, str)
            self.assertIsInstance(obj.longname, str)
            self.assertIsInstance(obj.oid, (str, type(None)))

        # Construction from a long name, short name or OID string.
        val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
        self.assertEqual(val, expected)
        self.assertIsInstance(val, ssl._ASN1Object)
        self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
        self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
                         expected)
        with self.assertRaisesRegexp(ValueError, "unknown object 'serverauth'"):
            ssl._ASN1Object.fromname('serverauth')

    def test_purpose_enum(self):
        # Each Purpose member is an _ASN1Object equal to the object built
        # from its OID.
        val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
        server_auth = ssl.Purpose.SERVER_AUTH
        self.assertIsInstance(server_auth, ssl._ASN1Object)
        self.assertEqual(server_auth, val)
        self.assertEqual(server_auth.nid, 129)
        self.assertEqual(server_auth.shortname, 'serverAuth')
        self.assertEqual(server_auth.oid, '1.3.6.1.5.5.7.3.1')

        val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
        client_auth = ssl.Purpose.CLIENT_AUTH
        self.assertIsInstance(client_auth, ssl._ASN1Object)
        self.assertEqual(client_auth, val)
        self.assertEqual(client_auth.nid, 130)
        self.assertEqual(client_auth.shortname, 'clientAuth')
        self.assertEqual(client_auth.oid, '1.3.6.1.5.5.7.3.2')

    def test_unsupported_dtls(self):
        # Datagram sockets are refused by both wrapping entry points.
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.addCleanup(s.close)
        with self.assertRaises(NotImplementedError) as cx:
            ssl.wrap_socket(s, cert_reqs=ssl.CERT_NONE)
        self.assertEqual(str(cx.exception),
                         "only stream sockets are supported")
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        with self.assertRaises(NotImplementedError) as cx:
            ctx.wrap_socket(s)
        self.assertEqual(str(cx.exception),
                         "only stream sockets are supported")

    def cert_time_ok(self, timestring, timestamp):
        # Helper: *timestring* must convert to exactly *timestamp*.
        seconds = ssl.cert_time_to_seconds(timestring)
        self.assertEqual(seconds, timestamp)

    def cert_time_fail(self, timestring):
        # Helper: *timestring* must be rejected with ValueError.
        self.assertRaises(ValueError, ssl.cert_time_to_seconds, timestring)

    @unittest.skipUnless(utc_offset(),
                         'local time needs to be different from UTC')
    def test_cert_time_to_seconds_timezone(self):
        # Issue #19940: ssl.cert_time_to_seconds() returns wrong
        # results if local timezone is not UTC
        cases = (("May 9 00:00:00 2007 GMT", 1178668800.0),
                 ("Jan 5 09:34:43 2018 GMT", 1515144883.0))
        for timestring, timestamp in cases:
            self.cert_time_ok(timestring, timestamp)

    def test_cert_time_to_seconds(self):
        timestring = "Jan 5 09:34:43 2018 GMT"
        ts = 1515144883.0
        self.cert_time_ok(timestring, ts)
        # accept keyword parameter, assert its name
        self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
        # accept both %e and %d (space or zero generated by strftime)
        self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
        # case-insensitive
        self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
        rejected = (
            "Jan 5 09:34 2018 GMT",      # no seconds
            "Jan 5 09:34:43 2018",       # no GMT
            "Jan 5 09:34:43 2018 UTC",   # not GMT timezone
            "Jan 35 09:34:43 2018 GMT",  # invalid day
            "Jon 5 09:34:43 2018 GMT",   # invalid month
            "Jan 5 24:00:00 2018 GMT",   # invalid hour
            "Jan 5 09:60:43 2018 GMT",   # invalid minute
        )
        for bad_timestring in rejected:
            self.cert_time_fail(bad_timestring)

        newyear_ts = 1230768000.0
        # leap seconds
        self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
        # same timestamp
        self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)

        self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
        # allow 60th second (even if it is not a leap second)
        self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
        # allow 2nd leap second for compatibility with time.strptime()
        self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
        self.cert_time_fail("Jan 5 09:34:62 2018 GMT")  # invalid seconds

        # no special treatment for the special value:
        # 99991231235959Z (rfc 5280)
        self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)

    @support.run_with_locale('LC_ALL', '')
    def test_cert_time_to_seconds_locale(self):
        # `cert_time_to_seconds()` should be locale independent

        def local_february_name():
            # Abbreviated name of February in the active locale.
            return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))

        if local_february_name().lower() == 'feb':
            self.skipTest("locale-specific month name needs to be "
                          "different from C locale")

        # locale-independent
        self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
        self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
688
689
690class ContextTests(unittest.TestCase):
691
@skip_if_broken_ubuntu_ssl
def test_constructor(self):
    # Every known protocol constant must be accepted by SSLContext.
    for protocol in PROTOCOLS:
        ssl.SSLContext(protocol)
    # The protocol argument is mandatory ...
    with self.assertRaises(TypeError):
        ssl.SSLContext()
    # ... and values that are not protocol constants are refused.
    for bogus_protocol in (-1, 42):
        self.assertRaises(ValueError, ssl.SSLContext, bogus_protocol)
698 self.assertRaises(ValueError, ssl.SSLContext, 42)
699
@skip_if_broken_ubuntu_ssl
def test_protocol(self):
    # The context reports back the protocol it was created with.
    for requested in PROTOCOLS:
        context = ssl.SSLContext(requested)
        self.assertEqual(context.protocol, requested)
705
def test_ciphers(self):
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    # Well-known cipher list names are accepted.
    for cipher_spec in ("ALL", "DEFAULT"):
        ctx.set_ciphers(cipher_spec)
    # A specification matching no cipher is reported as an SSLError.
    self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected",
                            ctx.set_ciphers, "^$:,;?*'dorothyx")
712
@skip_if_broken_ubuntu_ssl
def test_options(self):
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    # OP_ALL | OP_NO_SSLv2 is the default value
    expected = ssl.OP_ALL | ssl.OP_NO_SSLv2
    self.assertEqual(expected, ctx.options)
    # Setting an additional flag is always possible.
    ctx.options = ctx.options | ssl.OP_NO_SSLv3
    expected = ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3
    self.assertEqual(expected, ctx.options)
    if not can_clear_options():
        # Clearing flags is refused when can_clear_options() is false.
        with self.assertRaises(ValueError):
            ctx.options = 0
        return
    ctx.options = (ctx.options & ~ssl.OP_NO_SSLv2) | ssl.OP_NO_TLSv1
    expected = ssl.OP_ALL | ssl.OP_NO_TLSv1 | ssl.OP_NO_SSLv3
    self.assertEqual(expected, ctx.options)
    ctx.options = 0
    self.assertEqual(0, ctx.options)
731
def test_verify_mode(self):
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    # Default value
    self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
    # Each valid mode can be assigned and is read back unchanged.
    for mode in (ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED, ssl.CERT_NONE):
        ctx.verify_mode = mode
        self.assertEqual(ctx.verify_mode, mode)
    # Wrong type and out-of-range values are rejected.
    with self.assertRaises(TypeError):
        ctx.verify_mode = None
    with self.assertRaises(ValueError):
        ctx.verify_mode = 42
746
747 @unittest.skipUnless(have_verify_flags(),
748 "verify_flags need OpenSSL > 0.9.8")
749 def test_verify_flags(self):
750 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
751 # default value by OpenSSL
752 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
753 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
754 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
755 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
756 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
757 ctx.verify_flags = ssl.VERIFY_DEFAULT
758 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
759 # supports any value
760 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
761 self.assertEqual(ctx.verify_flags,
762 ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
763 with self.assertRaises(TypeError):
764 ctx.verify_flags = None
765
766 def test_load_cert_chain(self):
767 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
768 # Combined key and cert in a single file
769 ctx.load_cert_chain(CERTFILE)
770 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
771 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
772 with self.assertRaises(IOError) as cm:
773 ctx.load_cert_chain(WRONGCERT)
774 self.assertEqual(cm.exception.errno, errno.ENOENT)
775 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
776 ctx.load_cert_chain(BADCERT)
777 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
778 ctx.load_cert_chain(EMPTYCERT)
779 # Separate key and cert
780 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
781 ctx.load_cert_chain(ONLYCERT, ONLYKEY)
782 ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
783 ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
784 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
785 ctx.load_cert_chain(ONLYCERT)
786 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
787 ctx.load_cert_chain(ONLYKEY)
788 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
789 ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
790 # Mismatching key and cert
791 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
792 with self.assertRaisesRegexp(ssl.SSLError, "key values mismatch"):
793 ctx.load_cert_chain(SVN_PYTHON_ORG_ROOT_CERT, ONLYKEY)
794 # Password protected key and cert
795 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
796 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
797 ctx.load_cert_chain(CERTFILE_PROTECTED,
798 password=bytearray(KEY_PASSWORD.encode()))
799 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
800 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
801 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
802 bytearray(KEY_PASSWORD.encode()))
803 with self.assertRaisesRegexp(TypeError, "should be a string"):
804 ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
805 with self.assertRaises(ssl.SSLError):
806 ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
807 with self.assertRaisesRegexp(ValueError, "cannot be longer"):
808 # openssl has a fixed limit on the password buffer.
809 # PEM_BUFSIZE is generally set to 1kb.
810 # Return a string larger than this.
811 ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
812 # Password callback
813 def getpass_unicode():
814 return KEY_PASSWORD
815 def getpass_bytes():
816 return KEY_PASSWORD.encode()
817 def getpass_bytearray():
818 return bytearray(KEY_PASSWORD.encode())
819 def getpass_badpass():
820 return "badpass"
821 def getpass_huge():
822 return b'a' * (1024 * 1024)
823 def getpass_bad_type():
824 return 9
825 def getpass_exception():
826 raise Exception('getpass error')
827 class GetPassCallable:
828 def __call__(self):
829 return KEY_PASSWORD
830 def getpass(self):
831 return KEY_PASSWORD
832 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
833 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
834 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
835 ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
836 ctx.load_cert_chain(CERTFILE_PROTECTED,
837 password=GetPassCallable().getpass)
838 with self.assertRaises(ssl.SSLError):
839 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
840 with self.assertRaisesRegexp(ValueError, "cannot be longer"):
841 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
842 with self.assertRaisesRegexp(TypeError, "must return a string"):
843 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
844 with self.assertRaisesRegexp(Exception, "getpass error"):
845 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
846 # Make sure the password function isn't called if it isn't needed
847 ctx.load_cert_chain(CERTFILE, password=getpass_exception)
848
849 def test_load_verify_locations(self):
850 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
851 ctx.load_verify_locations(CERTFILE)
852 ctx.load_verify_locations(cafile=CERTFILE, capath=None)
853 ctx.load_verify_locations(BYTES_CERTFILE)
854 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
Benjamin Peterson876473e2014-08-28 09:33:21 -0400855 ctx.load_verify_locations(cafile=BYTES_CERTFILE.decode('utf-8'))
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500856 self.assertRaises(TypeError, ctx.load_verify_locations)
857 self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
858 with self.assertRaises(IOError) as cm:
859 ctx.load_verify_locations(WRONGCERT)
860 self.assertEqual(cm.exception.errno, errno.ENOENT)
Benjamin Peterson876473e2014-08-28 09:33:21 -0400861 with self.assertRaises(IOError):
862 ctx.load_verify_locations(u'')
Benjamin Petersondaeb9252014-08-20 14:14:50 -0500863 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
864 ctx.load_verify_locations(BADCERT)
865 ctx.load_verify_locations(CERTFILE, CAPATH)
866 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
867
868 # Issue #10989: crash if the second argument type is invalid
869 self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
870
871 def test_load_verify_cadata(self):
872 # test cadata
873 with open(CAFILE_CACERT) as f:
874 cacert_pem = f.read().decode("ascii")
875 cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
876 with open(CAFILE_NEURONIO) as f:
877 neuronio_pem = f.read().decode("ascii")
878 neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
879
880 # test PEM
881 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
882 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
883 ctx.load_verify_locations(cadata=cacert_pem)
884 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
885 ctx.load_verify_locations(cadata=neuronio_pem)
886 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
887 # cert already in hash table
888 ctx.load_verify_locations(cadata=neuronio_pem)
889 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
890
891 # combined
892 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
893 combined = "\n".join((cacert_pem, neuronio_pem))
894 ctx.load_verify_locations(cadata=combined)
895 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
896
897 # with junk around the certs
898 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
899 combined = ["head", cacert_pem, "other", neuronio_pem, "again",
900 neuronio_pem, "tail"]
901 ctx.load_verify_locations(cadata="\n".join(combined))
902 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
903
904 # test DER
905 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
906 ctx.load_verify_locations(cadata=cacert_der)
907 ctx.load_verify_locations(cadata=neuronio_der)
908 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
909 # cert already in hash table
910 ctx.load_verify_locations(cadata=cacert_der)
911 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
912
913 # combined
914 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
915 combined = b"".join((cacert_der, neuronio_der))
916 ctx.load_verify_locations(cadata=combined)
917 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
918
919 # error cases
920 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
921 self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
922
923 with self.assertRaisesRegexp(ssl.SSLError, "no start line"):
924 ctx.load_verify_locations(cadata=u"broken")
925 with self.assertRaisesRegexp(ssl.SSLError, "not enough data"):
926 ctx.load_verify_locations(cadata=b"broken")
927
928
929 def test_load_dh_params(self):
930 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
931 ctx.load_dh_params(DHFILE)
932 if os.name != 'nt':
933 ctx.load_dh_params(BYTES_DHFILE)
934 self.assertRaises(TypeError, ctx.load_dh_params)
935 self.assertRaises(TypeError, ctx.load_dh_params, None)
936 with self.assertRaises(IOError) as cm:
937 ctx.load_dh_params(WRONGCERT)
938 self.assertEqual(cm.exception.errno, errno.ENOENT)
939 with self.assertRaises(ssl.SSLError) as cm:
940 ctx.load_dh_params(CERTFILE)
941
942 @skip_if_broken_ubuntu_ssl
943 def test_session_stats(self):
944 for proto in PROTOCOLS:
945 ctx = ssl.SSLContext(proto)
946 self.assertEqual(ctx.session_stats(), {
947 'number': 0,
948 'connect': 0,
949 'connect_good': 0,
950 'connect_renegotiate': 0,
951 'accept': 0,
952 'accept_good': 0,
953 'accept_renegotiate': 0,
954 'hits': 0,
955 'misses': 0,
956 'timeouts': 0,
957 'cache_full': 0,
958 })
959
960 def test_set_default_verify_paths(self):
961 # There's not much we can do to test that it acts as expected,
962 # so just check it doesn't crash or raise an exception.
963 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
964 ctx.set_default_verify_paths()
965
966 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
967 def test_set_ecdh_curve(self):
968 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
969 ctx.set_ecdh_curve("prime256v1")
970 ctx.set_ecdh_curve(b"prime256v1")
971 self.assertRaises(TypeError, ctx.set_ecdh_curve)
972 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
973 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
974 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
975
976 @needs_sni
977 def test_sni_callback(self):
978 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
979
980 # set_servername_callback expects a callable, or None
981 self.assertRaises(TypeError, ctx.set_servername_callback)
982 self.assertRaises(TypeError, ctx.set_servername_callback, 4)
983 self.assertRaises(TypeError, ctx.set_servername_callback, "")
984 self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
985
986 def dummycallback(sock, servername, ctx):
987 pass
988 ctx.set_servername_callback(None)
989 ctx.set_servername_callback(dummycallback)
990
991 @needs_sni
992 def test_sni_callback_refcycle(self):
993 # Reference cycles through the servername callback are detected
994 # and cleared.
995 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
996 def dummycallback(sock, servername, ctx, cycle=ctx):
997 pass
998 ctx.set_servername_callback(dummycallback)
999 wr = weakref.ref(ctx)
1000 del ctx, dummycallback
1001 gc.collect()
1002 self.assertIs(wr(), None)
1003
1004 def test_cert_store_stats(self):
1005 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1006 self.assertEqual(ctx.cert_store_stats(),
1007 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1008 ctx.load_cert_chain(CERTFILE)
1009 self.assertEqual(ctx.cert_store_stats(),
1010 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1011 ctx.load_verify_locations(CERTFILE)
1012 self.assertEqual(ctx.cert_store_stats(),
1013 {'x509_ca': 0, 'crl': 0, 'x509': 1})
1014 ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
1015 self.assertEqual(ctx.cert_store_stats(),
1016 {'x509_ca': 1, 'crl': 0, 'x509': 2})
1017
1018 def test_get_ca_certs(self):
1019 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1020 self.assertEqual(ctx.get_ca_certs(), [])
1021 # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
1022 ctx.load_verify_locations(CERTFILE)
1023 self.assertEqual(ctx.get_ca_certs(), [])
1024 # but SVN_PYTHON_ORG_ROOT_CERT is a CA cert
1025 ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
1026 self.assertEqual(ctx.get_ca_certs(),
1027 [{'issuer': ((('organizationName', 'Root CA'),),
1028 (('organizationalUnitName', 'http://www.cacert.org'),),
1029 (('commonName', 'CA Cert Signing Authority'),),
1030 (('emailAddress', 'support@cacert.org'),)),
1031 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
1032 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
1033 'serialNumber': '00',
1034 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
1035 'subject': ((('organizationName', 'Root CA'),),
1036 (('organizationalUnitName', 'http://www.cacert.org'),),
1037 (('commonName', 'CA Cert Signing Authority'),),
1038 (('emailAddress', 'support@cacert.org'),)),
1039 'version': 3}])
1040
1041 with open(SVN_PYTHON_ORG_ROOT_CERT) as f:
1042 pem = f.read()
1043 der = ssl.PEM_cert_to_DER_cert(pem)
1044 self.assertEqual(ctx.get_ca_certs(True), [der])
1045
1046 def test_load_default_certs(self):
1047 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1048 ctx.load_default_certs()
1049
1050 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1051 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1052 ctx.load_default_certs()
1053
1054 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1055 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1056
1057 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1058 self.assertRaises(TypeError, ctx.load_default_certs, None)
1059 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1060
Benjamin Peterson0b30a2b2014-10-03 17:27:05 -04001061 def test_load_default_certs_env(self):
1062 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1063 with support.EnvironmentVarGuard() as env:
1064 env["SSL_CERT_DIR"] = CAPATH
1065 env["SSL_CERT_FILE"] = CERTFILE
1066 ctx.load_default_certs()
1067 self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0})
1068
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001069 def test_create_default_context(self):
1070 ctx = ssl.create_default_context()
1071 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1072 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1073 self.assertTrue(ctx.check_hostname)
1074 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1075 self.assertEqual(
1076 ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
1077 getattr(ssl, "OP_NO_COMPRESSION", 0),
1078 )
1079
1080 with open(SIGNING_CA) as f:
1081 cadata = f.read().decode("ascii")
1082 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1083 cadata=cadata)
1084 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1085 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1086 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1087 self.assertEqual(
1088 ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
1089 getattr(ssl, "OP_NO_COMPRESSION", 0),
1090 )
1091
1092 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
1093 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1094 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1095 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1096 self.assertEqual(
1097 ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
1098 getattr(ssl, "OP_NO_COMPRESSION", 0),
1099 )
1100 self.assertEqual(
1101 ctx.options & getattr(ssl, "OP_SINGLE_DH_USE", 0),
1102 getattr(ssl, "OP_SINGLE_DH_USE", 0),
1103 )
1104 self.assertEqual(
1105 ctx.options & getattr(ssl, "OP_SINGLE_ECDH_USE", 0),
1106 getattr(ssl, "OP_SINGLE_ECDH_USE", 0),
1107 )
1108
1109 def test__create_stdlib_context(self):
1110 ctx = ssl._create_stdlib_context()
1111 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1112 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1113 self.assertFalse(ctx.check_hostname)
1114 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1115
1116 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
1117 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1118 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1119 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1120
1121 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
1122 cert_reqs=ssl.CERT_REQUIRED,
1123 check_hostname=True)
1124 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1125 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1126 self.assertTrue(ctx.check_hostname)
1127 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1128
1129 ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
1130 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1131 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1132 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1133
1134 def test_check_hostname(self):
1135 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1136 self.assertFalse(ctx.check_hostname)
1137
1138 # Requires CERT_REQUIRED or CERT_OPTIONAL
1139 with self.assertRaises(ValueError):
1140 ctx.check_hostname = True
1141 ctx.verify_mode = ssl.CERT_REQUIRED
1142 self.assertFalse(ctx.check_hostname)
1143 ctx.check_hostname = True
1144 self.assertTrue(ctx.check_hostname)
1145
1146 ctx.verify_mode = ssl.CERT_OPTIONAL
1147 ctx.check_hostname = True
1148 self.assertTrue(ctx.check_hostname)
1149
1150 # Cannot set CERT_NONE with check_hostname enabled
1151 with self.assertRaises(ValueError):
1152 ctx.verify_mode = ssl.CERT_NONE
1153 ctx.check_hostname = False
1154 self.assertFalse(ctx.check_hostname)
1155
1156
1157class SSLErrorTests(unittest.TestCase):
1158
1159 def test_str(self):
1160 # The str() of a SSLError doesn't include the errno
1161 e = ssl.SSLError(1, "foo")
1162 self.assertEqual(str(e), "foo")
1163 self.assertEqual(e.errno, 1)
1164 # Same for a subclass
1165 e = ssl.SSLZeroReturnError(1, "foo")
1166 self.assertEqual(str(e), "foo")
1167 self.assertEqual(e.errno, 1)
1168
1169 def test_lib_reason(self):
1170 # Test the library and reason attributes
1171 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1172 with self.assertRaises(ssl.SSLError) as cm:
1173 ctx.load_dh_params(CERTFILE)
1174 self.assertEqual(cm.exception.library, 'PEM')
1175 self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1176 s = str(cm.exception)
1177 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1178
1179 def test_subclass(self):
1180 # Check that the appropriate SSLError subclass is raised
1181 # (this only tests one of them)
1182 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1183 with closing(socket.socket()) as s:
1184 s.bind(("127.0.0.1", 0))
1185 s.listen(5)
1186 c = socket.socket()
1187 c.connect(s.getsockname())
1188 c.setblocking(False)
1189 with closing(ctx.wrap_socket(c, False, do_handshake_on_connect=False)) as c:
1190 with self.assertRaises(ssl.SSLWantReadError) as cm:
1191 c.do_handshake()
1192 s = str(cm.exception)
1193 self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1194 # For compatibility
1195 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
Antoine Pitrou63cc99d2013-12-28 17:26:33 +01001196
Antoine Pitrouf9de5342010-04-05 21:35:07 +00001197
Bill Janssen934b16d2008-06-28 22:19:33 +00001198class NetworkedTests(unittest.TestCase):
Bill Janssen296a59d2007-09-16 22:06:00 +00001199
Antoine Pitrou3945c862010-04-28 21:11:01 +00001200 def test_connect(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001201 with support.transient_internet("svn.python.org"):
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001202 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1203 cert_reqs=ssl.CERT_NONE)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001204 try:
1205 s.connect(("svn.python.org", 443))
1206 self.assertEqual({}, s.getpeercert())
1207 finally:
1208 s.close()
Bill Janssen296a59d2007-09-16 22:06:00 +00001209
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001210 # this should fail because we have no verification certs
1211 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1212 cert_reqs=ssl.CERT_REQUIRED)
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001213 self.assertRaisesRegexp(ssl.SSLError, "certificate verify failed",
1214 s.connect, ("svn.python.org", 443))
1215 s.close()
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001216
1217 # this should succeed because we specify the root cert
1218 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1219 cert_reqs=ssl.CERT_REQUIRED,
1220 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
1221 try:
1222 s.connect(("svn.python.org", 443))
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001223 self.assertTrue(s.getpeercert())
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001224 finally:
1225 s.close()
Bill Janssen296a59d2007-09-16 22:06:00 +00001226
Antoine Pitroud3f6ea12011-02-26 23:35:27 +00001227 def test_connect_ex(self):
1228 # Issue #11326: check connect_ex() implementation
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001229 with support.transient_internet("svn.python.org"):
Antoine Pitroud3f6ea12011-02-26 23:35:27 +00001230 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1231 cert_reqs=ssl.CERT_REQUIRED,
1232 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
1233 try:
1234 self.assertEqual(0, s.connect_ex(("svn.python.org", 443)))
1235 self.assertTrue(s.getpeercert())
1236 finally:
1237 s.close()
1238
1239 def test_non_blocking_connect_ex(self):
1240 # Issue #11326: non-blocking connect_ex() should allow handshake
1241 # to proceed after the socket gets ready.
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001242 with support.transient_internet("svn.python.org"):
Antoine Pitroud3f6ea12011-02-26 23:35:27 +00001243 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1244 cert_reqs=ssl.CERT_REQUIRED,
1245 ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
1246 do_handshake_on_connect=False)
1247 try:
1248 s.setblocking(False)
1249 rc = s.connect_ex(('svn.python.org', 443))
Antoine Pitrou8ef39072011-02-27 15:45:22 +00001250 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1251 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
Antoine Pitroud3f6ea12011-02-26 23:35:27 +00001252 # Wait for connect to finish
1253 select.select([], [s], [], 5.0)
1254 # Non-blocking handshake
1255 while True:
1256 try:
1257 s.do_handshake()
1258 break
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001259 except ssl.SSLWantReadError:
1260 select.select([s], [], [], 5.0)
1261 except ssl.SSLWantWriteError:
1262 select.select([], [s], [], 5.0)
Antoine Pitroud3f6ea12011-02-26 23:35:27 +00001263 # SSL established
1264 self.assertTrue(s.getpeercert())
1265 finally:
1266 s.close()
1267
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001268 def test_timeout_connect_ex(self):
1269 # Issue #12065: on a timeout, connect_ex() should return the original
1270 # errno (mimicking the behaviour of non-SSL sockets).
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001271 with support.transient_internet("svn.python.org"):
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001272 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1273 cert_reqs=ssl.CERT_REQUIRED,
1274 ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
1275 do_handshake_on_connect=False)
1276 try:
1277 s.settimeout(0.0000001)
1278 rc = s.connect_ex(('svn.python.org', 443))
1279 if rc == 0:
1280 self.skipTest("svn.python.org responded too quickly")
1281 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
1282 finally:
1283 s.close()
1284
1285 def test_connect_ex_error(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001286 with support.transient_internet("svn.python.org"):
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001287 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1288 cert_reqs=ssl.CERT_REQUIRED,
1289 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
1290 try:
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001291 rc = s.connect_ex(("svn.python.org", 444))
1292 # Issue #19919: Windows machines or VMs hosted on Windows
1293 # machines sometimes return EWOULDBLOCK.
1294 self.assertIn(rc, (errno.ECONNREFUSED, errno.EWOULDBLOCK))
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001295 finally:
1296 s.close()
1297
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001298 def test_connect_with_context(self):
1299 with support.transient_internet("svn.python.org"):
1300 # Same as test_connect, but with a separately created context
1301 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1302 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1303 s.connect(("svn.python.org", 443))
1304 try:
1305 self.assertEqual({}, s.getpeercert())
1306 finally:
1307 s.close()
1308 # Same with a server hostname
1309 s = ctx.wrap_socket(socket.socket(socket.AF_INET),
1310 server_hostname="svn.python.org")
1311 if ssl.HAS_SNI:
1312 s.connect(("svn.python.org", 443))
1313 s.close()
1314 else:
1315 self.assertRaises(ValueError, s.connect, ("svn.python.org", 443))
1316 # This should fail because we have no verification certs
1317 ctx.verify_mode = ssl.CERT_REQUIRED
1318 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1319 self.assertRaisesRegexp(ssl.SSLError, "certificate verify failed",
1320 s.connect, ("svn.python.org", 443))
1321 s.close()
1322 # This should succeed because we specify the root cert
1323 ctx.load_verify_locations(SVN_PYTHON_ORG_ROOT_CERT)
1324 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1325 s.connect(("svn.python.org", 443))
1326 try:
1327 cert = s.getpeercert()
1328 self.assertTrue(cert)
1329 finally:
1330 s.close()
1331
1332 def test_connect_capath(self):
1333 # Verify server certificates using the `capath` argument
1334 # NOTE: the subject hashing algorithm has been changed between
1335 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
1336 # contain both versions of each certificate (same content, different
1337 # filename) for this test to be portable across OpenSSL releases.
1338 with support.transient_internet("svn.python.org"):
1339 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1340 ctx.verify_mode = ssl.CERT_REQUIRED
1341 ctx.load_verify_locations(capath=CAPATH)
1342 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1343 s.connect(("svn.python.org", 443))
1344 try:
1345 cert = s.getpeercert()
1346 self.assertTrue(cert)
1347 finally:
1348 s.close()
1349 # Same with a bytes `capath` argument
1350 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1351 ctx.verify_mode = ssl.CERT_REQUIRED
1352 ctx.load_verify_locations(capath=BYTES_CAPATH)
1353 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1354 s.connect(("svn.python.org", 443))
1355 try:
1356 cert = s.getpeercert()
1357 self.assertTrue(cert)
1358 finally:
1359 s.close()
1360
1361 def test_connect_cadata(self):
1362 with open(CAFILE_CACERT) as f:
1363 pem = f.read().decode('ascii')
1364 der = ssl.PEM_cert_to_DER_cert(pem)
1365 with support.transient_internet("svn.python.org"):
1366 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1367 ctx.verify_mode = ssl.CERT_REQUIRED
1368 ctx.load_verify_locations(cadata=pem)
1369 with closing(ctx.wrap_socket(socket.socket(socket.AF_INET))) as s:
1370 s.connect(("svn.python.org", 443))
1371 cert = s.getpeercert()
1372 self.assertTrue(cert)
1373
1374 # same with DER
1375 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1376 ctx.verify_mode = ssl.CERT_REQUIRED
1377 ctx.load_verify_locations(cadata=der)
1378 with closing(ctx.wrap_socket(socket.socket(socket.AF_INET))) as s:
1379 s.connect(("svn.python.org", 443))
1380 cert = s.getpeercert()
1381 self.assertTrue(cert)
1382
Antoine Pitrou55841ac2010-04-24 10:43:57 +00001383 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
1384 def test_makefile_close(self):
1385 # Issue #5238: creating a file-like object with makefile() shouldn't
1386 # delay closing the underlying "real socket" (here tested with its
1387 # file descriptor, hence skipping the test under Windows).
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001388 with support.transient_internet("svn.python.org"):
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001389 ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
1390 ss.connect(("svn.python.org", 443))
1391 fd = ss.fileno()
1392 f = ss.makefile()
1393 f.close()
1394 # The fd is still open
Antoine Pitrou55841ac2010-04-24 10:43:57 +00001395 os.read(fd, 0)
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001396 # Closing the SSL socket should close the fd too
1397 ss.close()
1398 gc.collect()
1399 with self.assertRaises(OSError) as e:
1400 os.read(fd, 0)
1401 self.assertEqual(e.exception.errno, errno.EBADF)
Bill Janssen934b16d2008-06-28 22:19:33 +00001402
Antoine Pitrou3945c862010-04-28 21:11:01 +00001403 def test_non_blocking_handshake(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001404 with support.transient_internet("svn.python.org"):
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001405 s = socket.socket(socket.AF_INET)
1406 s.connect(("svn.python.org", 443))
1407 s.setblocking(False)
1408 s = ssl.wrap_socket(s,
1409 cert_reqs=ssl.CERT_NONE,
1410 do_handshake_on_connect=False)
1411 count = 0
1412 while True:
1413 try:
1414 count += 1
1415 s.do_handshake()
1416 break
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001417 except ssl.SSLWantReadError:
1418 select.select([s], [], [])
1419 except ssl.SSLWantWriteError:
1420 select.select([], [s], [])
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001421 s.close()
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001422 if support.verbose:
Antoine Pitrou4e406d82010-09-09 13:35:44 +00001423 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Bill Janssen934b16d2008-06-28 22:19:33 +00001424
Antoine Pitrou3945c862010-04-28 21:11:01 +00001425 def test_get_server_certificate(self):
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001426 def _test_get_server_certificate(host, port, cert=None):
1427 with support.transient_internet(host):
1428 pem = ssl.get_server_certificate((host, port))
1429 if not pem:
1430 self.fail("No server certificate on %s:%s!" % (host, port))
Bill Janssen296a59d2007-09-16 22:06:00 +00001431
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001432 try:
1433 pem = ssl.get_server_certificate((host, port),
1434 ca_certs=CERTFILE)
1435 except ssl.SSLError as x:
1436 #should fail
1437 if support.verbose:
1438 sys.stdout.write("%s\n" % x)
1439 else:
1440 self.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
Bill Janssen296a59d2007-09-16 22:06:00 +00001441
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001442 pem = ssl.get_server_certificate((host, port),
1443 ca_certs=cert)
1444 if not pem:
1445 self.fail("No server certificate on %s:%s!" % (host, port))
1446 if support.verbose:
1447 sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
1448
1449 _test_get_server_certificate('svn.python.org', 443, SVN_PYTHON_ORG_ROOT_CERT)
1450 if support.IPV6_ENABLED:
1451 _test_get_server_certificate('ipv6.google.com', 443)
1452
1453 def test_ciphers(self):
1454 remote = ("svn.python.org", 443)
1455 with support.transient_internet(remote[0]):
1456 with closing(ssl.wrap_socket(socket.socket(socket.AF_INET),
1457 cert_reqs=ssl.CERT_NONE, ciphers="ALL")) as s:
1458 s.connect(remote)
1459 with closing(ssl.wrap_socket(socket.socket(socket.AF_INET),
1460 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT")) as s:
1461 s.connect(remote)
1462 # Error checking can happen at instantiation or when connecting
1463 with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"):
1464 with closing(socket.socket(socket.AF_INET)) as sock:
1465 s = ssl.wrap_socket(sock,
1466 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
1467 s.connect(remote)
Bill Janssen296a59d2007-09-16 22:06:00 +00001468
def test_algorithms(self):
    # Issue #8484: all algorithms should be available when verifying a
    # certificate.
    # SHA256 was added in OpenSSL 0.9.8
    if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15):
        self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION)
    # sha256.tbs-internet.com needs SNI to use the correct certificate
    if not ssl.HAS_SNI:
        self.skipTest("SNI needed for this test")
    # https://sha2.hboeck.de/ was used until 2011-01-08 (no route to host)
    hostname = "sha256.tbs-internet.com"
    remote = (hostname, 443)
    sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem")
    with support.transient_internet(hostname):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.load_verify_locations(sha256_cert)
        # closing() guarantees the wrapped socket is released even when
        # the connection or the certificate verification fails.
        with closing(ctx.wrap_socket(socket.socket(socket.AF_INET),
                                     server_hostname=hostname)) as conn:
            conn.connect(remote)
            if support.verbose:
                sys.stdout.write("\nCipher with %r is %r\n" %
                                 (remote, conn.cipher()))
                sys.stdout.write("Certificate is:\n%s\n" %
                                 pprint.pformat(conn.getpeercert()))
1496
def test_get_ca_certs_capath(self):
    # capath certs are loaded on request
    with support.transient_internet("svn.python.org"):
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.load_verify_locations(capath=CAPATH)
        # Before any handshake the context reports no CA certificates.
        self.assertEqual(ctx.get_ca_certs(), [])
        s = ctx.wrap_socket(socket.socket(socket.AF_INET))
        try:
            # connect() lives inside the try block so that the socket is
            # released even when the connection or the handshake fails.
            s.connect(("svn.python.org", 443))
            cert = s.getpeercert()
            self.assertTrue(cert)
        finally:
            s.close()
        # Exactly one CA certificate is reported after the verified
        # handshake.
        self.assertEqual(len(ctx.get_ca_certs()), 1)
1512
@needs_sni
def test_context_setget(self):
    # Check that the context of a connected socket can be replaced.
    with support.transient_internet("svn.python.org"):
        original_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        replacement_ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        plain = socket.socket(socket.AF_INET)
        with closing(original_ctx.wrap_socket(plain)) as ss:
            ss.connect(("svn.python.org", 443))
            # Both the Python-level socket and the underlying SSL object
            # must report the context they were created with ...
            self.assertIs(ss.context, original_ctx)
            self.assertIs(ss._sslobj.context, original_ctx)
            # ... and both must follow a reassignment.
            ss.context = replacement_ctx
            self.assertIs(ss.context, replacement_ctx)
            self.assertIs(ss._sslobj.context, replacement_ctx)
Bill Janssen296a59d2007-09-16 22:06:00 +00001527
Bill Janssen98d19da2007-09-10 21:51:02 +00001528try:
1529 import threading
1530except ImportError:
1531 _have_threads = False
1532else:
Bill Janssen98d19da2007-09-10 21:51:02 +00001533 _have_threads = True
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001534
Benjamin Petersondaeb9252014-08-20 14:14:50 -05001535 from test.ssl_servers import make_https_server
1536
class ThreadedEchoServer(threading.Thread):
    """Echo server running in its own thread.

    Each accepted connection is served by a ConnectionHandler thread that
    sends back the lower-cased form of whatever it receives.  The server
    can be used as a context manager: entering starts the thread and waits
    until it is listening, leaving stops and joins it.
    """

    class ConnectionHandler(threading.Thread):

        """A mildly complicated class, because we want it to work both
        with and without the SSL wrapper around the socket connection, so
        that we can test the STARTTLS functionality."""

        def __init__(self, server, connsock, addr):
            threading.Thread.__init__(self)
            self.daemon = True
            self.server = server
            self.addr = addr
            self.running = False
            self.sock = connsock
            self.sock.setblocking(1)
            # Stays None for as long as the connection is unencrypted.
            self.sslconn = None

        def _note(self, text):
            # Per-connection trace output, only emitted in verbose runs.
            if support.verbose and self.server.connectionchatty:
                sys.stdout.write(text)

        def wrap_conn(self):
            """Upgrade the connection to SSL.

            Returns True on success.  On an SSL error or a connection
            reset the error is recorded on the server, the server is
            stopped, the connection is closed and False is returned.
            """
            try:
                self.sslconn = self.server.context.wrap_socket(
                    self.sock, server_side=True)
                self.server.selected_protocols.append(
                    self.sslconn.selected_npn_protocol())
            except socket.error as exc:
                # We treat ConnectionResetError as though it were an
                # SSLError - OpenSSL on Ubuntu abruptly closes the
                # connection when asked to use an unsupported protocol.
                #
                # XXX Various errors can have happened here, for example
                # a mismatching protocol version, an invalid certificate,
                # or a low-level bug. This should be made more discriminating.
                tolerated = (isinstance(exc, ssl.SSLError)
                             or exc.errno == errno.ECONNRESET)
                if not tolerated:
                    raise
                self.server.conn_errors.append(exc)
                if self.server.chatty:
                    handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
                self.running = False
                self.server.stop()
                self.close()
                return False

            # Handshake succeeded: optionally report what was negotiated.
            noisy = support.verbose and self.server.chatty
            if self.server.context.verify_mode == ssl.CERT_REQUIRED:
                cert = self.sslconn.getpeercert()
                if noisy:
                    sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
                cert_binary = self.sslconn.getpeercert(True)
                if noisy:
                    sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
            cipher = self.sslconn.cipher()
            if noisy:
                sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
                sys.stdout.write(" server: selected protocol is now "
                                 + str(self.sslconn.selected_npn_protocol()) + "\n")
            return True

        def read(self):
            """Read from the SSL layer if present, else from the raw socket."""
            if not self.sslconn:
                return self.sock.recv(1024)
            return self.sslconn.read()

        def write(self, bytes):
            """Write through the SSL layer if present, else the raw socket."""
            if not self.sslconn:
                return self.sock.send(bytes)
            return self.sslconn.write(bytes)

        def close(self):
            """Close whichever of the two connection objects is active."""
            if not self.sslconn:
                self.sock.close()
            else:
                self.sslconn.close()

        def run(self):
            """Serve the connection until EOF, 'over' or an SSL error."""
            self.running = True
            # Without STARTTLS the connection is encrypted from the start.
            if not self.server.starttls_server and not self.wrap_conn():
                return
            while self.running:
                try:
                    msg = self.read()
                    command = msg.strip()
                    starttls = self.server.starttls_server
                    if not command:
                        # eof, so quit this handler
                        self.running = False
                        self.close()
                    elif command == b'over':
                        self._note(" server: client closed connection\n")
                        self.close()
                        return
                    elif starttls and command == b'STARTTLS':
                        self._note(" server: read STARTTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        if not self.wrap_conn():
                            return
                    elif starttls and self.sslconn and command == b'ENDTLS':
                        self._note(" server: read ENDTLS from client, sending OK...\n")
                        self.write(b"OK\n")
                        # Drop back to the plain socket returned by unwrap().
                        self.sock = self.sslconn.unwrap()
                        self.sslconn = None
                        self._note(" server: connection is now unencrypted...\n")
                    elif command == b'CB tls-unique':
                        self._note(" server: read CB tls-unique from client, sending our CB data...\n")
                        data = self.sslconn.get_channel_binding("tls-unique")
                        self.write(repr(data).encode("us-ascii") + b"\n")
                    else:
                        echoed = msg.lower()
                        if support.verbose and self.server.connectionchatty:
                            ctype = "encrypted" if self.sslconn else "unencrypted"
                            sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
                                             % (msg, ctype, echoed, ctype))
                        self.write(echoed)
                except ssl.SSLError:
                    if self.server.chatty:
                        handle_error("Test server failure:\n")
                    self.close()
                    self.running = False
                    # normally, we'd just stop here, but for the test
                    # harness, we want to stop the server
                    self.server.stop()

    def __init__(self, certificate=None, ssl_version=None,
                 certreqs=None, cacerts=None,
                 chatty=True, connectionchatty=False, starttls_server=False,
                 npn_protocols=None, ciphers=None, context=None):
        if context:
            # A ready-made context wins; the individual SSL settings are
            # then not consulted.
            self.context = context
        else:
            if ssl_version is None:
                ssl_version = ssl.PROTOCOL_TLSv1
            if certreqs is None:
                certreqs = ssl.CERT_NONE
            self.context = ssl.SSLContext(ssl_version)
            self.context.verify_mode = certreqs
            if cacerts:
                self.context.load_verify_locations(cacerts)
            if certificate:
                self.context.load_cert_chain(certificate)
            if npn_protocols:
                self.context.set_npn_protocols(npn_protocols)
            if ciphers:
                self.context.set_ciphers(ciphers)
        self.chatty = chatty
        self.connectionchatty = connectionchatty
        self.starttls_server = starttls_server
        self.sock = socket.socket()
        self.port = support.bind_port(self.sock)
        self.flag = None
        self.active = False
        # Filled in by the connection handlers.
        self.selected_protocols = []
        self.conn_errors = []
        threading.Thread.__init__(self)
        self.daemon = True

    def __enter__(self):
        # Block until run() has signalled that the socket is listening.
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        self.stop()
        self.join()

    def start(self, flag=None):
        """Start the thread; *flag*, if given, is set once listening."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        """Accept connections one at a time until stop() is called."""
        # The short timeout lets the loop notice stop() promptly.
        self.sock.settimeout(0.05)
        self.sock.listen(5)
        self.active = True
        if self.flag:
            # signal an event
            self.flag.set()
        while self.active:
            try:
                conn, peer = self.sock.accept()
                if support.verbose and self.chatty:
                    sys.stdout.write(' server: new connection from '
                                     + repr(peer) + '\n')
                worker = self.ConnectionHandler(self, conn, peer)
                worker.start()
                # Connections are served strictly one after another.
                worker.join()
            except socket.timeout:
                pass
            except KeyboardInterrupt:
                self.stop()
        self.sock.close()

    def stop(self):
        """Ask the accept loop to terminate."""
        self.active = False
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001734
class AsyncoreEchoServer(threading.Thread):
    """Echo server driven by an asyncore loop running in its own thread.

    Usable as a context manager: entering starts the thread and waits for
    the loop to be active, leaving stops the loop and joins the thread.
    """

    class EchoServer(asyncore.dispatcher):
        """Listening dispatcher creating one ConnectionHandler per client."""

        class ConnectionHandler(asyncore.dispatcher_with_send):
            """Serves one SSL client, echoing received data lower-cased."""

            def __init__(self, conn, certfile):
                # The handshake is driven manually through
                # _do_ssl_handshake() instead of being run by wrap_socket().
                self.socket = ssl.wrap_socket(conn, server_side=True,
                                              certfile=certfile,
                                              do_handshake_on_connect=False)
                asyncore.dispatcher_with_send.__init__(self, self.socket)
                self._ssl_accepting = True
                self._do_ssl_handshake()

            def readable(self):
                # Process data the SSL layer reports as pending before
                # telling the loop that this channel is readable.
                if isinstance(self.socket, ssl.SSLSocket):
                    while self.socket.pending() > 0:
                        self.handle_read_event()
                return True

            def _do_ssl_handshake(self):
                """Advance the handshake; clear _ssl_accepting when done."""
                try:
                    self.socket.do_handshake()
                except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
                    # Not finished yet: _ssl_accepting stays set, so
                    # handle_read() will call us again.
                    return
                except ssl.SSLEOFError:
                    return self.handle_close()
                except ssl.SSLError:
                    raise
                except socket.error as err:
                    # "as" form: matches the other handlers in this file
                    # and is valid on both Python 2.6+ and Python 3.
                    if err.args[0] == errno.ECONNABORTED:
                        return self.handle_close()
                else:
                    self._ssl_accepting = False

            def handle_read(self):
                if self._ssl_accepting:
                    self._do_ssl_handshake()
                else:
                    data = self.recv(1024)
                    if support.verbose:
                        sys.stdout.write(" server: read %s from client\n" % repr(data))
                    if not data:
                        self.close()
                    else:
                        self.send(data.lower())

            def handle_close(self):
                self.close()
                if support.verbose:
                    sys.stdout.write(" server: closed connection %s\n" % self.socket)

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, certfile):
            self.certfile = certfile
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.port = support.bind_port(sock, '')
            asyncore.dispatcher.__init__(self, sock)
            self.listen(5)

        def handle_accept(self):
            sock_obj, addr = self.accept()
            if support.verbose:
                sys.stdout.write(" server: new connection from %s:%s\n" %addr)
            self.ConnectionHandler(sock_obj, self.certfile)

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise

    def __init__(self, certfile):
        self.flag = None
        self.active = False
        self.server = self.EchoServer(certfile)
        self.port = self.server.port
        threading.Thread.__init__(self)
        self.daemon = True

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.server)

    def __enter__(self):
        # Block until run() has signalled that the loop is active.
        self.start(threading.Event())
        self.flag.wait()
        return self

    def __exit__(self, *args):
        if support.verbose:
            sys.stdout.write(" cleanup: stopping server.\n")
        self.stop()
        if support.verbose:
            sys.stdout.write(" cleanup: joining server thread.\n")
        self.join()
        if support.verbose:
            sys.stdout.write(" cleanup: successfully joined.\n")

    def start(self, flag=None):
        """Start the thread; *flag*, if given, is set once run() begins."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        self.active = True
        if self.flag:
            self.flag.set()
        while self.active:
            # Best effort: whatever escapes the asyncore loop is ignored
            # and the loop is re-entered until stop() is called.
            try:
                asyncore.loop(1)
            except:
                pass

    def stop(self):
        """Stop the loop and close the listening dispatcher."""
        self.active = False
        self.server.close()
1849
def bad_cert_test(certfile):
    """
    Launch a server with CERT_REQUIRED, and check that trying to
    connect to it with the given client certificate fails.

    Raises AssertionError when the connection unexpectedly succeeds.
    """
    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_REQUIRED,
                                cacerts=CERTFILE, chatty=False,
                                connectionchatty=False)
    with server:
        try:
            with closing(socket.socket()) as sock:
                s = ssl.wrap_socket(sock,
                                    certfile=certfile,
                                    ssl_version=ssl.PROTOCOL_TLSv1)
                s.connect((HOST, server.port))
        except ssl.SSLError as x:
            if support.verbose:
                sys.stdout.write("\nSSLError is %s\n" % x.args[1])
        except OSError as x:
            # A single handler: the former second "except OSError" clause
            # could never run because this one matches first.  The message
            # falls back to str(x) when the exception carries no second
            # argument, instead of failing with IndexError.
            if support.verbose:
                detail = x.args[1] if len(x.args) > 1 else str(x)
                sys.stdout.write("\nOSError is %s\n" % detail)
        else:
            raise AssertionError("Use of invalid cert should have failed!")
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001879
def server_params_test(client_context, server_context, indata=b"FOO\n",
                       chatty=True, connectionchatty=False, sni_name=None):
    """
    Launch a server, connect a client to it and try various reads
    and writes.

    Returns a dict describing the established connection (compression,
    cipher, peer certificate, NPN protocol, protocol version) plus the
    list of NPN protocols recorded by the server.
    """
    stats = {}
    server = ThreadedEchoServer(context=server_context,
                                chatty=chatty,
                                connectionchatty=False)
    # Client-side trace output needs both switches to be on.
    trace = connectionchatty and support.verbose
    expected = indata.lower()
    with server:
        client = client_context.wrap_socket(socket.socket(),
                                            server_hostname=sni_name)
        with closing(client) as s:
            s.connect((HOST, server.port))
            # The same payload is sent through three buffer types.
            for payload in (indata, bytearray(indata), memoryview(indata)):
                if trace:
                    sys.stdout.write(
                        " client: sending %r...\n" % indata)
                s.write(payload)
                outdata = s.read()
                if trace:
                    sys.stdout.write(" client: read %r\n" % outdata)
                if outdata != expected:
                    raise AssertionError(
                        "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                        % (outdata[:20], len(outdata),
                           indata[:20].lower(), len(indata)))
            s.write(b"over\n")
            if trace:
                sys.stdout.write(" client: closing connection.\n")
            # Queried in this order while the connection is still open.
            stats['compression'] = s.compression()
            stats['cipher'] = s.cipher()
            stats['peercert'] = s.getpeercert()
            stats['client_npn_protocol'] = s.selected_npn_protocol()
            stats['version'] = s.version()
            s.close()
        stats['server_npn_protocols'] = server.selected_protocols
    return stats
Bill Janssen98d19da2007-09-10 21:51:02 +00001923
def try_protocol_combo(server_protocol, client_protocol, expect_success,
                       certsreqs=None, server_options=0, client_options=0):
    """
    Try to SSL-connect using *client_protocol* to *server_protocol*.
    If *expect_success* is true, assert that the connection succeeds,
    if it's false, assert that the connection fails.
    Also, if *expect_success* is a string, assert that it is the protocol
    version actually used by the connection.
    """
    if certsreqs is None:
        certsreqs = ssl.CERT_NONE
    # The lookup also rejects an unknown certsreqs value with KeyError.
    certtype = {
        ssl.CERT_NONE: "CERT_NONE",
        ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
        ssl.CERT_REQUIRED: "CERT_REQUIRED",
    }[certsreqs]
    client_name = ssl.get_protocol_name
    if support.verbose:
        # Combinations expected to fail are printed inside braces.
        formatstr = " %s->%s %s\n" if expect_success else " {%s->%s} %s\n"
        sys.stdout.write(formatstr %
                         (client_name(client_protocol),
                          client_name(server_protocol),
                          certtype))
    client_context = ssl.SSLContext(client_protocol)
    client_context.options |= client_options
    server_context = ssl.SSLContext(server_protocol)
    server_context.options |= server_options

    # NOTE: we must enable "ALL" ciphers on the client, otherwise an
    # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
    # starting from OpenSSL 1.0.0 (see issue #8322).
    if client_context.protocol == ssl.PROTOCOL_SSLv23:
        client_context.set_ciphers("ALL")

    for ctx in (client_context, server_context):
        ctx.verify_mode = certsreqs
        ctx.load_cert_chain(CERTFILE)
        ctx.load_verify_locations(CERTFILE)
    try:
        stats = server_params_test(client_context, server_context,
                                   chatty=False, connectionchatty=False)
    # Protocol mismatch can result in either an SSLError, or a
    # "Connection reset by peer" error.
    except ssl.SSLError:
        if expect_success:
            raise
    except socket.error as e:
        if expect_success or e.errno != errno.ECONNRESET:
            raise
    else:
        if not expect_success:
            raise AssertionError(
                "Client protocol %s succeeded with server protocol %s!"
                % (ssl.get_protocol_name(client_protocol),
                   ssl.get_protocol_name(server_protocol)))
        # Any value other than the literal True names the expected version.
        if expect_success is not True and expect_success != stats['version']:
            raise AssertionError("version mismatch: expected %r, got %r"
                                 % (expect_success, stats['version']))
Bill Janssen98d19da2007-09-10 21:51:02 +00001982
1983
Bill Janssen934b16d2008-06-28 22:19:33 +00001984 class ThreadedTests(unittest.TestCase):
Bill Janssen98d19da2007-09-10 21:51:02 +00001985
@skip_if_broken_ubuntu_ssl
def test_echo(self):
    """Basic test of an SSL client connecting to a server"""
    if support.verbose:
        sys.stdout.write("\n")
    # One shared context plays both the client and the server role for
    # every protocol listed in PROTOCOLS.
    for proto in PROTOCOLS:
        shared_ctx = ssl.SSLContext(proto)
        shared_ctx.load_cert_chain(CERTFILE)
        server_params_test(shared_ctx, shared_ctx,
                           chatty=True, connectionchatty=True)
Bill Janssen98d19da2007-09-10 21:51:02 +00001996
def test_getpeercert(self):
    # getpeercert() must refuse to work before the handshake and return a
    # populated certificate dict afterwards.
    if support.verbose:
        sys.stdout.write("\n")
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(CERTFILE)
    context.load_cert_chain(CERTFILE)
    server = ThreadedEchoServer(context=context, chatty=False)
    with server:
        # closing() releases the client socket even when one of the
        # assertions below fails.
        with closing(context.wrap_socket(socket.socket(),
                                         do_handshake_on_connect=False)) as s:
            s.connect((HOST, server.port))
            # getpeercert() raise ValueError while the handshake isn't
            # done.
            with self.assertRaises(ValueError):
                s.getpeercert()
            s.do_handshake()
            cert = s.getpeercert()
            self.assertTrue(cert, "Can't get peer certificate.")
            cipher = s.cipher()
            if support.verbose:
                sys.stdout.write(pprint.pformat(cert) + '\n')
                sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
            if 'subject' not in cert:
                self.fail("No subject field in certificate: %s." %
                          pprint.pformat(cert))
            if ((('organizationName', 'Python Software Foundation'),)
                    not in cert['subject']):
                self.fail(
                    "Missing or invalid 'organizationName' field in certificate subject; "
                    "should be 'Python Software Foundation'.")
            self.assertIn('notBefore', cert)
            self.assertIn('notAfter', cert)
            before = ssl.cert_time_to_seconds(cert['notBefore'])
            after = ssl.cert_time_to_seconds(cert['notAfter'])
            self.assertLess(before, after)
Bill Janssen98d19da2007-09-10 21:51:02 +00002034
@unittest.skipUnless(have_verify_flags(),
                     "verify_flags need OpenSSL > 0.9.8")
def test_crl_check(self):
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(SIGNING_CA)
    self.assertEqual(context.verify_flags, ssl.VERIFY_DEFAULT)

    # VERIFY_DEFAULT should pass
    with ThreadedEchoServer(context=server_context, chatty=True) as server:
        with closing(context.wrap_socket(socket.socket())) as client:
            client.connect((HOST, server.port))
            peer_cert = client.getpeercert()
            self.assertTrue(peer_cert, "Can't get peer certificate.")

    # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
    context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF

    with ThreadedEchoServer(context=server_context, chatty=True) as server:
        with closing(context.wrap_socket(socket.socket())) as client:
            with self.assertRaisesRegexp(ssl.SSLError,
                                         "certificate verify failed"):
                client.connect((HOST, server.port))

    # now load a CRL file. The CRL file is signed by the CA.
    context.load_verify_locations(CRLFILE)

    with ThreadedEchoServer(context=server_context, chatty=True) as server:
        with closing(context.wrap_socket(socket.socket())) as client:
            client.connect((HOST, server.port))
            peer_cert = client.getpeercert()
            self.assertTrue(peer_cert, "Can't get peer certificate.")
2076
@needs_sni
def test_check_hostname(self):
    if support.verbose:
        sys.stdout.write("\n")

    server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_context.load_cert_chain(SIGNED_CERTFILE)

    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.verify_mode = ssl.CERT_REQUIRED
    context.check_hostname = True
    context.load_verify_locations(SIGNING_CA)

    # correct hostname should verify
    with ThreadedEchoServer(context=server_context, chatty=True) as server:
        good = context.wrap_socket(socket.socket(),
                                   server_hostname="localhost")
        with closing(good) as client:
            client.connect((HOST, server.port))
            peer_cert = client.getpeercert()
            self.assertTrue(peer_cert, "Can't get peer certificate.")

    # incorrect hostname should raise an exception
    with ThreadedEchoServer(context=server_context, chatty=True) as server:
        bad = context.wrap_socket(socket.socket(),
                                  server_hostname="invalid")
        with closing(bad) as client:
            with self.assertRaisesRegexp(ssl.CertificateError,
                                         "hostname 'invalid' doesn't match u?'localhost'"):
                client.connect((HOST, server.port))

    # missing server_hostname arg should cause an exception, too
    with ThreadedEchoServer(context=server_context, chatty=True) as server:
        with closing(socket.socket()) as plain:
            with self.assertRaisesRegexp(ValueError,
                                         "check_hostname requires server_hostname"):
                context.wrap_socket(plain)
2115
def test_empty_cert(self):
    """Connecting with an empty cert file"""
    # Fall back to the current directory when __file__ has no directory part.
    here = os.path.dirname(__file__) or os.curdir
    bad_cert_test(os.path.join(here, "nullcert.pem"))
def test_malformed_cert(self):
    """Connecting with a badly formatted certificate (syntax error)"""
    # Fall back to the current directory when __file__ has no directory part.
    here = os.path.dirname(__file__) or os.curdir
    bad_cert_test(os.path.join(here, "badcert.pem"))
def test_nonexisting_cert(self):
    """Connect using a certificate path that does not exist."""
    # Fall back to the current directory when __file__ has no dirname.
    here = os.path.dirname(__file__) or os.curdir
    bad_cert_test(os.path.join(here, "wrongcert.pem"))
def test_malformed_key(self):
    """Connect using a key file with a syntax error."""
    # Fall back to the current directory when __file__ has no dirname.
    here = os.path.dirname(__file__) or os.curdir
    bad_cert_test(os.path.join(here, "badkey.pem"))
Bill Janssen98d19da2007-09-10 21:51:02 +00002132
def test_rude_shutdown(self):
    """Abruptly closing the server side must make the client's SSL
    handshake fail with socket.error.
    """
    ready = threading.Event()
    gone = threading.Event()

    listening_sock = socket.socket()
    port = support.bind_port(listening_sock, HOST)

    def listener():
        # Thread body: accept exactly one connection, drop it and the
        # listening socket at once, then signal `gone` so the main
        # thread knows nothing is left on the other end.
        listening_sock.listen(5)
        ready.set()
        accepted = listening_sock.accept()[0]
        accepted.close()
        listening_sock.close()
        gone.set()

    def connector():
        ready.wait()
        with closing(socket.socket()) as client:
            client.connect((HOST, port))
            # Only start the handshake once the peer has vanished.
            gone.wait()
            try:
                wrapped = ssl.wrap_socket(client)
            except socket.error:
                pass
            else:
                self.fail('connecting to closed SSL socket should have failed')

    worker = threading.Thread(target=listener)
    worker.start()
    try:
        connector()
    finally:
        worker.join()
2173
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
                     "OpenSSL is compiled without SSLv2 support")
def test_protocol_sslv2(self):
    """Run an SSLv2 server against a range of client configurations."""
    if support.verbose:
        sys.stdout.write("\n")
    sslv2 = ssl.PROTOCOL_SSLv2
    # SSLv2 on both ends succeeds whatever the certificate requirement.
    try_protocol_combo(sslv2, sslv2, True)
    for cert_mode in (ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
        try_protocol_combo(sslv2, sslv2, True, cert_mode)
    # Every other client protocol is expected to fail.
    for client_proto in (ssl.PROTOCOL_SSLv23,
                         ssl.PROTOCOL_SSLv3,
                         ssl.PROTOCOL_TLSv1):
        try_protocol_combo(sslv2, client_proto, False)
    # SSLv23 client with specific SSL options
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(sslv2, ssl.PROTOCOL_SSLv23, False,
                           client_options=ssl.OP_NO_SSLv2)
    for disabled in (ssl.OP_NO_SSLv3, ssl.OP_NO_TLSv1):
        try_protocol_combo(sslv2, ssl.PROTOCOL_SSLv23, False,
                           client_options=disabled)
Bill Janssen98d19da2007-09-10 21:51:02 +00002196
@skip_if_broken_ubuntu_ssl
def test_protocol_sslv23(self):
    """Run an SSLv23 server against a range of client configurations."""
    if support.verbose:
        sys.stdout.write("\n")
    sslv23 = ssl.PROTOCOL_SSLv23
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try:
            try_protocol_combo(sslv23, ssl.PROTOCOL_SSLv2, True)
        except socket.error as x:
            # this fails on some older versions of OpenSSL (0.9.7l, for instance)
            if support.verbose:
                sys.stdout.write(
                    " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
                    % str(x))

    # The same three client protocols are tried first with no explicit
    # certificate requirement, then with CERT_OPTIONAL and CERT_REQUIRED.
    for extra in ((), (ssl.CERT_OPTIONAL,), (ssl.CERT_REQUIRED,)):
        try_protocol_combo(sslv23, ssl.PROTOCOL_SSLv3, 'SSLv3', *extra)
        try_protocol_combo(sslv23, sslv23, True, *extra)
        try_protocol_combo(sslv23, ssl.PROTOCOL_TLSv1, 'TLSv1', *extra)

    # Server with specific SSL options
    try_protocol_combo(sslv23, ssl.PROTOCOL_SSLv3, False,
                       server_options=ssl.OP_NO_SSLv3)
    # Will choose TLSv1
    try_protocol_combo(sslv23, sslv23, True,
                       server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
    try_protocol_combo(sslv23, ssl.PROTOCOL_TLSv1, False,
                       server_options=ssl.OP_NO_TLSv1)
2231
2232
@skip_if_broken_ubuntu_ssl
def test_protocol_sslv3(self):
    """Run an SSLv3 server against a range of client configurations."""
    if support.verbose:
        sys.stdout.write("\n")
    sslv3 = ssl.PROTOCOL_SSLv3
    # SSLv3 on both ends, with each certificate requirement in turn.
    for extra in ((), (ssl.CERT_OPTIONAL,), (ssl.CERT_REQUIRED,)):
        try_protocol_combo(sslv3, sslv3, 'SSLv3', *extra)
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(sslv3, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(sslv3, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_SSLv3)
    try_protocol_combo(sslv3, ssl.PROTOCOL_TLSv1, False)
    if no_sslv2_implies_sslv3_hello():
        # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
        try_protocol_combo(sslv3, ssl.PROTOCOL_SSLv23, 'SSLv3',
                           client_options=ssl.OP_NO_SSLv2)
Bill Janssen98d19da2007-09-10 21:51:02 +00002250
@skip_if_broken_ubuntu_ssl
def test_protocol_tlsv1(self):
    """Run a TLSv1 server against a range of client configurations."""
    if support.verbose:
        sys.stdout.write("\n")
    tlsv1 = ssl.PROTOCOL_TLSv1
    # TLSv1 on both ends, with each certificate requirement in turn.
    for extra in ((), (ssl.CERT_OPTIONAL,), (ssl.CERT_REQUIRED,)):
        try_protocol_combo(tlsv1, tlsv1, 'TLSv1', *extra)
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(tlsv1, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(tlsv1, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(tlsv1, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_TLSv1)
2264
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
                     "TLS version 1.1 not supported.")
def test_protocol_tlsv1_1(self):
    """Run a TLSv1.1 server against a range of client configurations.
    Older TLS versions are checked as well."""
    if support.verbose:
        sys.stdout.write("\n")
    tls11 = ssl.PROTOCOL_TLSv1_1
    try_protocol_combo(tls11, tls11, 'TLSv1.1')
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(tls11, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(tls11, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(tls11, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_TLSv1_1)

    try_protocol_combo(ssl.PROTOCOL_SSLv23, tls11, 'TLSv1.1')
    # A TLSv1 / TLSv1.1 mismatch is expected to fail in both directions.
    for first, second in ((tls11, ssl.PROTOCOL_TLSv1),
                          (ssl.PROTOCOL_TLSv1, tls11)):
        try_protocol_combo(first, second, False)
2283
2284
@skip_if_broken_ubuntu_ssl
@unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
                     "TLS version 1.2 not supported.")
def test_protocol_tlsv1_2(self):
    """Run a TLSv1.2 server against a range of client configurations.
    Older TLS versions are checked as well."""
    if support.verbose:
        sys.stdout.write("\n")
    tls12 = ssl.PROTOCOL_TLSv1_2
    no_legacy_ssl = ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2
    try_protocol_combo(tls12, tls12, 'TLSv1.2',
                       server_options=no_legacy_ssl,
                       client_options=no_legacy_ssl,)
    if hasattr(ssl, 'PROTOCOL_SSLv2'):
        try_protocol_combo(tls12, ssl.PROTOCOL_SSLv2, False)
    try_protocol_combo(tls12, ssl.PROTOCOL_SSLv3, False)
    try_protocol_combo(tls12, ssl.PROTOCOL_SSLv23, False,
                       client_options=ssl.OP_NO_TLSv1_2)

    try_protocol_combo(ssl.PROTOCOL_SSLv23, tls12, 'TLSv1.2')
    # Mismatches with TLSv1 and TLSv1.1 are expected to fail in both
    # directions.
    for first, second in ((tls12, ssl.PROTOCOL_TLSv1),
                          (ssl.PROTOCOL_TLSv1, tls12),
                          (tls12, ssl.PROTOCOL_TLSv1_1),
                          (ssl.PROTOCOL_TLSv1_1, tls12)):
        try_protocol_combo(first, second, False)
Bill Janssen98d19da2007-09-10 21:51:02 +00002307
def test_starttls(self):
    """Upgrade a clear-text connection to TLS and downgrade it again."""
    payloads = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")

    server = ThreadedEchoServer(CERTFILE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                starttls_server=True,
                                chatty=True,
                                connectionchatty=True)
    # `secured` says whether traffic currently goes through `tls`
    # (the wrapped socket) or through `plain` (the raw socket).
    secured = False
    with server:
        plain = socket.socket()
        plain.setblocking(1)
        plain.connect((HOST, server.port))
        if support.verbose:
            sys.stdout.write("\n")
        for request in payloads:
            if support.verbose:
                sys.stdout.write(
                    " client: sending %r...\n" % request)
            if secured:
                tls.write(request)
                reply = tls.read()
            else:
                plain.send(request)
                reply = plain.recv(1024)
            msg = reply.strip().lower()
            acknowledged = msg.startswith(b"ok")
            if acknowledged and request == b"STARTTLS":
                # Server accepted STARTTLS: wrap the socket from now on.
                if support.verbose:
                    sys.stdout.write(
                        " client: read %r from server, starting TLS...\n"
                        % msg)
                tls = ssl.wrap_socket(plain, ssl_version=ssl.PROTOCOL_TLSv1)
                secured = True
            elif acknowledged and request == b"ENDTLS":
                # Server accepted ENDTLS: go back to the unwrapped socket.
                if support.verbose:
                    sys.stdout.write(
                        " client: read %r from server, ending TLS...\n"
                        % msg)
                plain = tls.unwrap()
                secured = False
            else:
                if support.verbose:
                    sys.stdout.write(
                        " client: read %r from server\n" % msg)
        if support.verbose:
            sys.stdout.write(" client: closing connection.\n")
        # Say goodbye and close on whichever channel is active.
        if secured:
            tls.write(b"over\n")
            tls.close()
        else:
            plain.send(b"over\n")
            plain.close()
Bill Janssen98d19da2007-09-10 21:51:02 +00002365
def test_socketserver(self):
    """Serve a file over HTTPS from a SocketServer and fetch it back."""
    server = make_https_server(self, certfile=CERTFILE)
    if support.verbose:
        sys.stdout.write('\n')
    # Reference copy, read straight from disk.
    with open(CERTFILE, 'rb') as local_file:
        expected = local_file.read()
    fetched = ''
    # Now request the very same file from the HTTPS server.
    url = 'https://%s:%d/%s' % (
        HOST, server.port, os.path.basename(CERTFILE))
    with support.check_py3k_warnings():
        response = urllib.urlopen(url)
    with closing(response):
        length_header = response.info().getheader("content-length")
        if length_header and (int(length_header) > 0):
            fetched = response.read(int(length_header))
            if support.verbose:
                sys.stdout.write(
                    " client: read %d bytes from remote server '%s'\n"
                    % (len(fetched), server))
    self.assertEqual(expected, fetched)
Bill Janssen934b16d2008-06-28 22:19:33 +00002391
def test_asyncore_server(self):
    """Check the example asyncore integration.

    Sends one message to an AsyncoreEchoServer over SSL and expects the
    lower-cased message back.
    """
    if support.verbose:
        sys.stdout.write("\n")

    # (A stale str assignment to `indata` used to precede this one; it
    # was overwritten before any use and has been dropped.)
    indata = b"FOO\n"
    server = AsyncoreEchoServer(CERTFILE)
    with server:
        # closing() makes sure the client socket is released even when
        # one of the checks below fails.
        with closing(ssl.wrap_socket(socket.socket())) as s:
            s.connect(('127.0.0.1', server.port))
            if support.verbose:
                sys.stdout.write(
                    " client: sending %r...\n" % indata)
            s.write(indata)
            outdata = s.read()
            if support.verbose:
                sys.stdout.write(" client: read %r\n" % outdata)
            if outdata != indata.lower():
                self.fail(
                    "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
                    % (outdata[:20], len(outdata),
                       indata[:20].lower(), len(indata)))
            s.write(b"over\n")
            if support.verbose:
                sys.stdout.write(" client: closing connection.\n")
        if support.verbose:
            sys.stdout.write(" client: connection closed.\n")
Bill Janssen934b16d2008-06-28 22:19:33 +00002422
def test_recv_send(self):
    """Test recv(), send() and friends.

    Each send/recv flavour of the SSL socket is exercised against an
    echo server.  Entries flagged as "expect success" must round-trip
    the lower-cased payload; the others must raise a ValueError whose
    message starts with the method name.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = ssl.wrap_socket(socket.socket(),
                            server_side=False,
                            certfile=CERTFILE,
                            ca_certs=CERTFILE,
                            cert_reqs=ssl.CERT_NONE,
                            ssl_version=ssl.PROTOCOL_TLSv1)
        # closing() releases the client socket even if a check fails.
        with closing(s):
            s.connect((HOST, server.port))

            # helper methods for standardising recv* method signatures
            def _recv_into():
                b = bytearray(b"\0"*100)
                count = s.recv_into(b)
                return b[:count]

            def _recvfrom_into():
                b = bytearray(b"\0"*100)
                count, addr = s.recvfrom_into(b)
                return b[:count]

            # (name, method, whether to expect success, *args)
            send_methods = [
                ('send', s.send, True, []),
                ('sendto', s.sendto, False, ["some.address"]),
                ('sendall', s.sendall, True, []),
            ]
            recv_methods = [
                ('recv', s.recv, True, []),
                ('recvfrom', s.recvfrom, False, ["some.address"]),
                ('recv_into', _recv_into, True, []),
                ('recvfrom_into', _recvfrom_into, False, []),
            ]
            data_prefix = u"PREFIX_"

            for meth_name, send_meth, expect_success, args in send_methods:
                indata = (data_prefix + meth_name).encode('ascii')
                try:
                    send_meth(indata, *args)
                    outdata = s.read()
                    if outdata != indata.lower():
                        # "!r" is the repr conversion.  The previous
                        # ":r" was an invalid format spec, so building
                        # this message raised ValueError, which the
                        # handler below then misreported.
                        self.fail(
                            "While sending with <<{name:s}>> bad data "
                            "<<{outdata!r}>> ({nout:d}) received; "
                            "expected <<{indata!r}>> ({nin:d})\n".format(
                                name=meth_name, outdata=outdata[:20],
                                nout=len(outdata),
                                indata=indata[:20], nin=len(indata)
                            )
                        )
                except ValueError as e:
                    if expect_success:
                        self.fail(
                            "Failed to send with method <<{name:s}>>; "
                            "expected to succeed.\n".format(name=meth_name)
                        )
                    if not str(e).startswith(meth_name):
                        self.fail(
                            "Method <<{name:s}>> failed with unexpected "
                            "exception message: {exp!s}\n".format(
                                name=meth_name, exp=e
                            )
                        )

            for meth_name, recv_meth, expect_success, args in recv_methods:
                indata = (data_prefix + meth_name).encode('ascii')
                try:
                    s.send(indata)
                    outdata = recv_meth(*args)
                    if outdata != indata.lower():
                        self.fail(
                            "While receiving with <<{name:s}>> bad data "
                            "<<{outdata!r}>> ({nout:d}) received; "
                            "expected <<{indata!r}>> ({nin:d})\n".format(
                                name=meth_name, outdata=outdata[:20],
                                nout=len(outdata),
                                indata=indata[:20], nin=len(indata)
                            )
                        )
                except ValueError as e:
                    if expect_success:
                        self.fail(
                            "Failed to receive with method <<{name:s}>>; "
                            "expected to succeed.\n".format(name=meth_name)
                        )
                    if not str(e).startswith(meth_name):
                        self.fail(
                            "Method <<{name:s}>> failed with unexpected "
                            "exception message: {exp!s}\n".format(
                                name=meth_name, exp=e
                            )
                        )
                    # The payload was sent but never received above;
                    # consume the echo so the next iteration starts clean.
                    s.read()

            s.write(b"over\n")
Bill Janssen61c001a2008-09-08 16:37:24 +00002529
def test_handshake_timeout(self):
    # Issue #5103: the SSL handshake has to honour the socket timeout.
    # The server below accepts TCP connections but never speaks SSL, so
    # any client handshake can only end by timing out.
    server = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(server)
    started = threading.Event()
    stop_requested = False

    def serve():
        server.listen(5)
        started.set()
        accepted = []
        while not stop_requested:
            readable = select.select([server], [], [], 0.1)[0]
            if server in readable:
                # Keep a reference so the connection stays open instead
                # of being closed by garbage collection.
                accepted.append(server.accept()[0])
        for conn in accepted:
            conn.close()

    worker = threading.Thread(target=serve)
    worker.start()
    started.wait()

    try:
        # Case 1: wrap a socket that is already connected.
        try:
            c = socket.socket(socket.AF_INET)
            c.settimeout(0.2)
            c.connect((host, port))
            # Wrapping attempts the handshake, which times out.
            with self.assertRaisesRegexp(ssl.SSLError, "timed out"):
                ssl.wrap_socket(c)
        finally:
            c.close()
        # Case 2: wrap first, connect afterwards.
        try:
            c = socket.socket(socket.AF_INET)
            c = ssl.wrap_socket(c)
            c.settimeout(0.2)
            # Connecting attempts the handshake, which times out.
            with self.assertRaisesRegexp(ssl.SSLError, "timed out"):
                c.connect((host, port))
        finally:
            c.close()
    finally:
        stop_requested = True
        worker.join()
        server.close()
2578
def test_server_accept(self):
    # Issue #16357: accept() on an SSLSocket that was created through
    # SSLContext.wrap_socket().
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(CERTFILE)
    context.load_cert_chain(CERTFILE)
    listening = socket.socket(socket.AF_INET)
    host = "127.0.0.1"
    port = support.bind_port(listening)
    server = context.wrap_socket(listening, server_side=True)

    listening_started = threading.Event()
    # Filled in by the server thread with the result of accept().
    accepted = {'conn': None, 'addr': None}

    def serve():
        server.listen(5)
        listening_started.set()
        # Block in accept(), then wait for the client to close.
        accepted['conn'], accepted['addr'] = server.accept()
        accepted['conn'].recv(1)

    worker = threading.Thread(target=serve)
    worker.start()
    # The client connects only once the server is listening.
    listening_started.wait()
    client = context.wrap_socket(socket.socket())
    client.connect((host, port))
    client_addr = client.getsockname()
    client.close()
    worker.join()
    accepted['conn'].close()
    server.close()
    # Sanity checks.
    self.assertIsInstance(accepted['conn'], ssl.SSLSocket)
    self.assertEqual(accepted['addr'], client_addr)
2615
2616 def test_getpeercert_enotconn(self):
2617 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2618 with closing(context.wrap_socket(socket.socket())) as sock:
2619 with self.assertRaises(socket.error) as cm:
2620 sock.getpeercert()
2621 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
2622
2623 def test_do_handshake_enotconn(self):
2624 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2625 with closing(context.wrap_socket(socket.socket())) as sock:
2626 with self.assertRaises(socket.error) as cm:
2627 sock.do_handshake()
2628 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
2629
def test_default_ciphers(self):
    # A client restricted to DES ciphers must be refused by a server
    # that uses the default cipher list.
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    try:
        # Force a set of weak ciphers on our client context
        client_ctx.set_ciphers("DES")
    except ssl.SSLError:
        self.skipTest("no DES cipher available")
    server = ThreadedEchoServer(CERTFILE,
                                ssl_version=ssl.PROTOCOL_SSLv23,
                                chatty=False)
    with server:
        weak_client = client_ctx.wrap_socket(socket.socket())
        with closing(weak_client):
            with self.assertRaises(ssl.SSLError):
                weak_client.connect((HOST, server.port))
    # Checked after the server block has exited.
    self.assertIn("no shared cipher", str(server.conn_errors[0]))
2644
def test_version_basic(self):
    """
    Smoke test for SSLSocket.version(): None before connecting,
    "TLSv1" while connected, None again once closed.
    The test_protocol_*() methods cover more combinations.
    """
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server = ThreadedEchoServer(CERTFILE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                chatty=False)
    with server:
        client = client_ctx.wrap_socket(socket.socket())
        with closing(client):
            self.assertIs(client.version(), None)
            client.connect((HOST, server.port))
            self.assertEqual(client.version(), "TLSv1")
        # The socket is closed at this point.
        self.assertIs(client.version(), None)
2659
@unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
def test_default_ecdh_curve(self):
    # Issue #21015: SSL contexts should have elliptic curve-based
    # Diffie Hellman key exchange enabled by default.
    shared_ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    shared_ctx.load_cert_chain(CERTFILE)
    # Before OpenSSL 1.0.0 the ECDH ciphers must be switched on
    # explicitly through the 'ECCdraft' cipher alias; later versions
    # are expected to prefer ECDH-based ciphers on their own.
    needs_explicit_ecdh = ssl.OPENSSL_VERSION_INFO < (1, 0, 0)
    if needs_explicit_ecdh:
        shared_ctx.set_ciphers("ECCdraft:ECDH")
    # The same context serves as both server and client configuration.
    with ThreadedEchoServer(context=shared_ctx) as server:
        client = shared_ctx.wrap_socket(socket.socket())
        with closing(client):
            client.connect((HOST, server.port))
            cipher_name = client.cipher()[0]
            self.assertIn("ECDH", cipher_name)
2676
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
                     "'tls-unique' channel binding not available")
def test_tls_unique_channel_binding(self):
    """Test tls-unique channel binding.

    Two successive connections must each yield sane binding data that
    matches what the server reports, and the two values must differ.
    """
    if support.verbose:
        sys.stdout.write("\n")

    server = ThreadedEchoServer(CERTFILE,
                                certreqs=ssl.CERT_NONE,
                                ssl_version=ssl.PROTOCOL_TLSv1,
                                cacerts=CERTFILE,
                                chatty=True,
                                connectionchatty=False)
    with server:
        s = ssl.wrap_socket(socket.socket(),
                            server_side=False,
                            certfile=CERTFILE,
                            ca_certs=CERTFILE,
                            cert_reqs=ssl.CERT_NONE,
                            ssl_version=ssl.PROTOCOL_TLSv1)
        # closing() releases the socket even if an assertion fails.
        with closing(s):
            s.connect((HOST, server.port))
            # get the data
            cb_data = s.get_channel_binding("tls-unique")
            if support.verbose:
                sys.stdout.write(" got channel binding data: {0!r}\n"
                                 .format(cb_data))

            # check if it is sane
            self.assertIsNotNone(cb_data)
            self.assertEqual(len(cb_data), 12) # True for TLSv1

            # and compare with the peers version
            s.write(b"CB tls-unique\n")
            peer_data_repr = s.read().strip()
            self.assertEqual(peer_data_repr,
                             repr(cb_data).encode("us-ascii"))

        # now, again
        s = ssl.wrap_socket(socket.socket(),
                            server_side=False,
                            certfile=CERTFILE,
                            ca_certs=CERTFILE,
                            cert_reqs=ssl.CERT_NONE,
                            ssl_version=ssl.PROTOCOL_TLSv1)
        with closing(s):
            s.connect((HOST, server.port))
            new_cb_data = s.get_channel_binding("tls-unique")
            if support.verbose:
                sys.stdout.write(" got another channel binding data: {0!r}\n"
                                 .format(new_cb_data))
            # is it really unique
            self.assertNotEqual(cb_data, new_cb_data)
            # Sanity-check the *new* binding; these two checks used to
            # re-test cb_data from the first connection.
            self.assertIsNotNone(new_cb_data)
            self.assertEqual(len(new_cb_data), 12) # True for TLSv1
            s.write(b"CB tls-unique\n")
            peer_data_repr = s.read().strip()
            self.assertEqual(peer_data_repr,
                             repr(new_cb_data).encode("us-ascii"))
2736
def test_compression(self):
    # The reported compression must be one of the known values.
    shared_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    shared_ctx.load_cert_chain(CERTFILE)
    stats = server_params_test(shared_ctx, shared_ctx,
                               chatty=True, connectionchatty=True)
    negotiated = stats['compression']
    if support.verbose:
        sys.stdout.write(" got compression: {!r}\n".format(negotiated))
    self.assertIn(negotiated, { None, 'ZLIB', 'RLE' })
2745
@unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
                     "ssl.OP_NO_COMPRESSION needed for this test")
def test_compression_disabled(self):
    # With OP_NO_COMPRESSION set, no compression may be reported.
    shared_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    shared_ctx.load_cert_chain(CERTFILE)
    shared_ctx.options |= ssl.OP_NO_COMPRESSION
    stats = server_params_test(shared_ctx, shared_ctx,
                               chatty=True, connectionchatty=True)
    negotiated = stats['compression']
    self.assertIs(negotiated, None)
2755
def test_dh_params(self):
    # Check we can get a connection with ephemeral Diffie-Hellman
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.load_cert_chain(CERTFILE)
    context.load_dh_params(DHFILE)
    context.set_ciphers("kEDH")
    stats = server_params_test(context, context,
                               chatty=True, connectionchatty=True)
    # First element of the cipher entry is the name, e.g. "DHE-RSA-...".
    cipher = stats["cipher"][0]
    parts = cipher.split("-")
    if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
        # Report the whole cipher name; `cipher[0]` used to show only
        # its first character.
        self.fail("Non-DH cipher: " + cipher)
2768
def test_selected_npn_protocol(self):
    # When NPN is not used, the client's selected protocol is None.
    shared_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    shared_ctx.load_cert_chain(CERTFILE)
    stats = server_params_test(shared_ctx, shared_ctx,
                               chatty=True, connectionchatty=True)
    selected = stats['client_npn_protocol']
    self.assertIs(selected, None)
2776
@unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
def test_npn_protocols(self):
    # For every client protocol list below, both the client and the
    # server must report the expected protocol after the handshake.
    server_protocols = ['http/1.1', 'spdy/2']
    # (protocols advertised by the client, protocol expected to be chosen)
    scenarios = [
        (['http/1.1', 'spdy/2'], 'http/1.1'),
        (['spdy/2', 'http/1.1'], 'http/1.1'),
        (['spdy/2', 'test'], 'spdy/2'),
        (['abc', 'def'], 'abc'),
    ]
    for client_protocols, expected in scenarios:
        server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        server_context.load_cert_chain(CERTFILE)
        server_context.set_npn_protocols(server_protocols)

        client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        client_context.load_cert_chain(CERTFILE)
        client_context.set_npn_protocols(client_protocols)

        stats = server_params_test(client_context, server_context,
                                   chatty=True, connectionchatty=True)

        # The doubled %% placeholders survive this first formatting
        # pass and are filled in per side (result, side name) below.
        msg = ("failed trying %s (s) and %s (c).\n"
               "was expecting %s, but got %%s from the %%s"
               % (server_protocols, client_protocols, expected))

        client_result = stats['client_npn_protocol']
        self.assertEqual(client_result, expected,
                         msg % (client_result, "client"))

        # The last recorded entry is compared; 'nothing' stands in
        # when the server recorded no protocol at all.
        recorded = stats['server_npn_protocols']
        if recorded:
            server_result = recorded[-1]
        else:
            server_result = 'nothing'
        self.assertEqual(server_result, expected,
                         msg % (server_result, "server"))
2805
def sni_contexts(self):
    """Build the three TLSv1 contexts used by the SNI tests.

    Returns a ``(server_context, other_context, client_context)``
    tuple.  ``server_context`` loads SIGNED_CERTFILE and
    ``other_context`` loads SIGNED_CERTFILE2.  ``client_context``
    requires a peer certificate (CERT_REQUIRED) and loads SIGNING_CA
    as its verification location.
    """
    def cert_context(certfile):
        # Context that presents the certificate chain in *certfile*.
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.load_cert_chain(certfile)
        return ctx

    server_context = cert_context(SIGNED_CERTFILE)
    other_context = cert_context(SIGNED_CERTFILE2)

    client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    client_context.verify_mode = ssl.CERT_REQUIRED
    client_context.load_verify_locations(SIGNING_CA)

    return server_context, other_context, client_context
2815
def check_common_name(self, stats, name):
    """Assert that the peer certificate in *stats* names *name*.

    ``stats['peercert']['subject']`` must contain the entry
    ``(('commonName', name),)``.
    """
    subject = stats['peercert']['subject']
    wanted_entry = (('commonName', name),)
    self.assertIn(wanted_entry, subject)
2819
@needs_sni
def test_sni_callback(self):
    # Exercise the servername callback: with a server name, without
    # one, and after the callback has been removed again.
    server_context, other_context, client_context = self.sni_contexts()
    seen = []

    def servername_cb(ssl_sock, server_name, initial_context):
        # Record every invocation; switch the connection over to
        # other_context whenever the client sent a server name.
        seen.append((server_name, initial_context))
        if server_name is not None:
            ssl_sock.context = other_context
    server_context.set_servername_callback(servername_cb)

    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name='supermessage')
    # The hostname was fetched properly, and the certificate was
    # changed for the connection.
    self.assertEqual(seen, [("supermessage", server_context)])
    # The certificate served is expected to be the one from
    # other_context, checked here through its common name.
    self.check_common_name(stats, 'fakehostname')

    # The callback is called with server_name=None
    del seen[:]
    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name=None)
    self.assertEqual(seen, [(None, server_context)])
    self.check_common_name(stats, 'localhost')

    # Check disabling the callback
    del seen[:]
    server_context.set_servername_callback(None)

    stats = server_params_test(client_context, server_context,
                               chatty=True,
                               sni_name='notfunny')
    # Certificate didn't change
    self.check_common_name(stats, 'localhost')
    self.assertEqual(seen, [])
2858
@needs_sni
def test_sni_callback_alert(self):
    # Returning a TLS alert is reflected to the connecting client
    server_context, _other_context, client_context = self.sni_contexts()

    def cb_returning_alert(ssl_sock, server_name, initial_context):
        return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
    server_context.set_servername_callback(cb_returning_alert)

    # The handshake is expected to fail, so there is no result to keep.
    with self.assertRaises(ssl.SSLError) as cm:
        server_params_test(client_context, server_context,
                           chatty=False,
                           sni_name='supermessage')
    failure = cm.exception
    self.assertEqual(failure.reason, 'TLSV1_ALERT_ACCESS_DENIED')
2873
@needs_sni
def test_sni_callback_raising(self):
    # Raising fails the connection with a TLS handshake failure alert.
    server_context, _other_context, client_context = self.sni_contexts()

    def cb_raising(ssl_sock, server_name, initial_context):
        # Deliberately trigger a ZeroDivisionError inside the callback.
        1/0
    server_context.set_servername_callback(cb_raising)

    # stderr is captured inside the assertRaises block so the text
    # written while the handshake fails can be inspected afterwards.
    with self.assertRaises(ssl.SSLError) as cm:
        with support.captured_stderr() as stderr:
            server_params_test(client_context, server_context,
                               chatty=False,
                               sni_name='supermessage')
    failure = cm.exception
    self.assertEqual(failure.reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
    self.assertIn("ZeroDivisionError", stderr.getvalue())
2890
@needs_sni
def test_sni_callback_wrong_return_type(self):
    # Returning the wrong return type terminates the TLS connection
    # with an internal error alert.
    server_context, _other_context, client_context = self.sni_contexts()

    def cb_wrong_return_type(ssl_sock, server_name, initial_context):
        # A string is the "wrong" return type this test feeds back.
        return "foo"
    server_context.set_servername_callback(cb_wrong_return_type)

    # stderr is captured inside the assertRaises block so the text
    # written while the handshake fails can be inspected afterwards.
    with self.assertRaises(ssl.SSLError) as cm:
        with support.captured_stderr() as stderr:
            server_params_test(client_context, server_context,
                               chatty=False,
                               sni_name='supermessage')
    failure = cm.exception
    self.assertEqual(failure.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
    self.assertIn("TypeError", stderr.getvalue())
2908
def test_read_write_after_close_raises_valuerror(self):
    # After the client socket has been closed, read() and write() on
    # it must raise ValueError.
    ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    ctx.verify_mode = ssl.CERT_REQUIRED
    ctx.load_verify_locations(CERTFILE)
    ctx.load_cert_chain(CERTFILE)
    echo_server = ThreadedEchoServer(context=ctx, chatty=False)

    with echo_server:
        client = ctx.wrap_socket(socket.socket())
        client.connect((HOST, echo_server.port))
        client.close()

        with self.assertRaises(ValueError):
            client.read(1024)
        with self.assertRaises(ValueError):
            client.write(b'hello')
2923
Bill Janssen61c001a2008-09-08 16:37:24 +00002924
def test_main(verbose=False):
    """Run the test_ssl test classes.

    Prints OpenSSL and platform details when ``support.verbose`` is
    set, raises ``support.TestFailed`` if any required certificate or
    key file does not exist, then runs the selected test classes.

    The *verbose* argument is accepted but not read by this function;
    verbosity is taken from ``support.verbose``.
    """
    if support.verbose:
        platform_probes = {
            'Linux': platform.linux_distribution,
            'Mac': platform.mac_ver,
            'Windows': platform.win32_ver,
        }
        # Use the first probe whose result has a non-empty first field;
        # fall back to the generic platform string when none does.
        for label, probe in platform_probes.items():
            details = probe()
            if details and details[0]:
                plat = '%s %r' % (label, details)
                break
        else:
            plat = repr(platform.platform())
        print("test_ssl: testing with %r %r" %
              (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
        print("          under %s" % plat)
        print("          HAS_SNI = %r" % ssl.HAS_SNI)
        print("          OP_ALL = 0x%8x" % ssl.OP_ALL)
        # OP_NO_TLSv1_1 may be missing from the ssl module; skip the
        # line in that case.
        try:
            op_no_tlsv1_1 = ssl.OP_NO_TLSv1_1
        except AttributeError:
            pass
        else:
            print("          OP_NO_TLSv1_1 = 0x%8x" % op_no_tlsv1_1)

    required_files = [
        CERTFILE, SVN_PYTHON_ORG_ROOT_CERT, BYTES_CERTFILE,
        ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
        SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
        BADCERT, BADKEY, EMPTYCERT,
    ]
    for path in required_files:
        if not os.path.exists(path):
            raise support.TestFailed("Can't read certificate file %r" % path)

    tests = [ContextTests, BasicTests, BasicSocketTests, SSLErrorTests]

    if support.is_resource_enabled('network'):
        tests.append(NetworkedTests)

    if _have_threads:
        # Kept so it can be handed back to threading_cleanup() below.
        thread_info = support.threading_setup()
        if thread_info:
            tests.append(ThreadedTests)

    try:
        support.run_unittest(*tests)
    finally:
        if _have_threads:
            support.threading_cleanup(*thread_info)
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00002972
# Run the whole suite when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()