blob: a253f51d2a440a444839c6569e6d8daf4a152ee9 [file] [log] [blame]
Thomas Woutersed03b412007-08-28 21:37:11 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00005from test import support
Thomas Woutersed03b412007-08-28 21:37:11 +00006import socket
Bill Janssen6e027db2007-11-15 22:23:56 +00007import select
Thomas Woutersed03b412007-08-28 21:37:11 +00008import time
Christian Heimes9424bb42013-06-17 15:32:57 +02009import datetime
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000010import gc
Thomas Woutersed03b412007-08-28 21:37:11 +000011import os
Antoine Pitroucfcd8ad2010-04-23 23:31:47 +000012import errno
Thomas Woutersed03b412007-08-28 21:37:11 +000013import pprint
Antoine Pitrou803e6d62010-10-13 10:36:15 +000014import urllib.request
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020015import threading
Thomas Woutersed03b412007-08-28 21:37:11 +000016import traceback
Bill Janssen54cc54c2007-12-14 22:08:56 +000017import asyncore
Antoine Pitrou9d543662010-04-23 23:10:32 +000018import weakref
Antoine Pitrou15cee622010-08-04 16:45:21 +000019import platform
Antoine Pitrou23df4832010-08-04 17:14:06 +000020import functools
Christian Heimes892d66e2018-01-29 14:10:18 +010021import sysconfig
Christian Heimes888bbdc2017-09-07 14:18:21 -070022try:
23 import ctypes
24except ImportError:
25 ctypes = None
Thomas Woutersed03b412007-08-28 21:37:11 +000026
Antoine Pitrou05d936d2010-10-13 11:38:36 +000027ssl = support.import_module("ssl")
28
Martin Panter3840b2a2016-03-27 01:53:46 +000029
Antoine Pitrou2463e5f2013-03-28 22:24:43 +010030PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
Benjamin Petersonee8712c2008-05-20 21:35:26 +000031HOST = support.HOST
Christian Heimes598894f2016-09-05 23:19:05 +020032IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
33IS_OPENSSL_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
Christian Heimes892d66e2018-01-29 14:10:18 +010034PY_SSL_DEFAULT_CIPHERS = sysconfig.get_config_var('PY_SSL_DEFAULT_CIPHERS')
Antoine Pitrou152efa22010-05-16 18:19:27 +000035
Christian Heimesefff7062013-11-21 03:35:02 +010036def data_file(*name):
37 return os.path.join(os.path.dirname(__file__), *name)
Antoine Pitrou152efa22010-05-16 18:19:27 +000038
Antoine Pitrou81564092010-10-08 23:06:24 +000039# The custom key and certificate files used in test_ssl are generated
40# using Lib/test/make_ssl_certs.py.
41# Other certificates are simply fetched from the Internet servers they
42# are meant to authenticate.
43
Antoine Pitrou152efa22010-05-16 18:19:27 +000044CERTFILE = data_file("keycert.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000045BYTES_CERTFILE = os.fsencode(CERTFILE)
Antoine Pitrou152efa22010-05-16 18:19:27 +000046ONLYCERT = data_file("ssl_cert.pem")
47ONLYKEY = data_file("ssl_key.pem")
Victor Stinner313a1202010-06-11 23:56:51 +000048BYTES_ONLYCERT = os.fsencode(ONLYCERT)
49BYTES_ONLYKEY = os.fsencode(ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +020050CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
51ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
52KEY_PASSWORD = "somepass"
Antoine Pitrou152efa22010-05-16 18:19:27 +000053CAPATH = data_file("capath")
Victor Stinner313a1202010-06-11 23:56:51 +000054BYTES_CAPATH = os.fsencode(CAPATH)
Christian Heimesefff7062013-11-21 03:35:02 +010055CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
56CAFILE_CACERT = data_file("capath", "5ed36f99.0")
57
Christian Heimesbd5c7d22018-01-20 15:16:30 +010058CERTFILE_INFO = {
59 'issuer': ((('countryName', 'XY'),),
60 (('localityName', 'Castle Anthrax'),),
61 (('organizationName', 'Python Software Foundation'),),
62 (('commonName', 'localhost'),)),
63 'notAfter': 'Jan 17 19:09:06 2028 GMT',
64 'notBefore': 'Jan 19 19:09:06 2018 GMT',
65 'serialNumber': 'F9BA076D5B6ABD9B',
66 'subject': ((('countryName', 'XY'),),
67 (('localityName', 'Castle Anthrax'),),
68 (('organizationName', 'Python Software Foundation'),),
69 (('commonName', 'localhost'),)),
70 'subjectAltName': (('DNS', 'localhost'),),
71 'version': 3
72}
Antoine Pitrou152efa22010-05-16 18:19:27 +000073
Christian Heimes22587792013-11-21 23:56:13 +010074# empty CRL
75CRLFILE = data_file("revocation.crl")
76
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010077# Two keys and certs signed by the same CA (for SNI tests)
78SIGNED_CERTFILE = data_file("keycert3.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +020079SIGNED_CERTFILE_HOSTNAME = 'localhost'
Christian Heimesbd5c7d22018-01-20 15:16:30 +010080
81SIGNED_CERTFILE_INFO = {
82 'OCSP': ('http://testca.pythontest.net/testca/ocsp/',),
83 'caIssuers': ('http://testca.pythontest.net/testca/pycacert.cer',),
84 'crlDistributionPoints': ('http://testca.pythontest.net/testca/revocation.crl',),
85 'issuer': ((('countryName', 'XY'),),
86 (('organizationName', 'Python Software Foundation CA'),),
87 (('commonName', 'our-ca-server'),)),
88 'notAfter': 'Nov 28 19:09:06 2027 GMT',
89 'notBefore': 'Jan 19 19:09:06 2018 GMT',
90 'serialNumber': '82EDBF41C880919C',
91 'subject': ((('countryName', 'XY'),),
92 (('localityName', 'Castle Anthrax'),),
93 (('organizationName', 'Python Software Foundation'),),
94 (('commonName', 'localhost'),)),
95 'subjectAltName': (('DNS', 'localhost'),),
96 'version': 3
97}
98
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +010099SIGNED_CERTFILE2 = data_file("keycert4.pem")
Christian Heimesa170fa12017-09-15 20:27:30 +0200100SIGNED_CERTFILE2_HOSTNAME = 'fakehostname'
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100101SIGNED_CERTFILE_ECC = data_file("keycertecc.pem")
102SIGNED_CERTFILE_ECC_HOSTNAME = 'localhost-ecc'
103
Martin Panter3840b2a2016-03-27 01:53:46 +0000104# Same certificate as pycacert.pem, but without extra text in file
105SIGNING_CA = data_file("capath", "ceff1710.0")
Christian Heimes1c03abd2016-09-06 23:25:35 +0200106# cert with all kinds of subject alt names
107ALLSANFILE = data_file("allsans.pem")
Christian Heimes66e57422018-01-29 14:25:13 +0100108IDNSANSFILE = data_file("idnsans.pem")
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100109
Martin Panter3d81d932016-01-14 09:36:00 +0000110REMOTE_HOST = "self-signed.pythontest.net"
Antoine Pitrou152efa22010-05-16 18:19:27 +0000111
112EMPTYCERT = data_file("nullcert.pem")
113BADCERT = data_file("badcert.pem")
Martin Panter407b62f2016-01-30 03:41:43 +0000114NONEXISTINGCERT = data_file("XXXnonexisting.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000115BADKEY = data_file("badkey.pem")
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200116NOKIACERT = data_file("nokia.pem")
Christian Heimes824f7f32013-08-17 00:54:47 +0200117NULLBYTECERT = data_file("nullbytecert.pem")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000118
Benjamin Petersona7eaf562015-04-02 00:04:06 -0400119DHFILE = data_file("dh1024.pem")
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100120BYTES_DHFILE = os.fsencode(DHFILE)
Thomas Woutersed03b412007-08-28 21:37:11 +0000121
Christian Heimes358cfd42016-09-10 22:43:48 +0200122# Not defined in all versions of OpenSSL
123OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
124OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
125OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
126OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
127
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100128
Thomas Woutersed03b412007-08-28 21:37:11 +0000129def handle_error(prefix):
130 exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000131 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000132 sys.stdout.write(prefix + exc_format)
Thomas Woutersed03b412007-08-28 21:37:11 +0000133
Antoine Pitroub5218772010-05-21 09:56:06 +0000134def can_clear_options():
135 # 0.9.8m or higher
Antoine Pitroub9ac25d2011-07-08 18:47:06 +0200136 return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15)
Antoine Pitroub5218772010-05-21 09:56:06 +0000137
138def no_sslv2_implies_sslv3_hello():
139 # 0.9.7h or higher
140 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15)
141
Christian Heimes2427b502013-11-23 11:24:32 +0100142def have_verify_flags():
143 # 0.9.8 or higher
144 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15)
145
Antoine Pitrouc695c952014-04-28 20:57:36 +0200146def utc_offset(): #NOTE: ignore issues like #1647654
147 # local time = utc time + utc offset
148 if time.daylight and time.localtime().tm_isdst > 0:
149 return -time.altzone # seconds
150 return -time.timezone
151
Christian Heimes9424bb42013-06-17 15:32:57 +0200152def asn1time(cert_time):
153 # Some versions of OpenSSL ignore seconds, see #18207
154 # 0.9.8.i
155 if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15):
156 fmt = "%b %d %H:%M:%S %Y GMT"
157 dt = datetime.datetime.strptime(cert_time, fmt)
158 dt = dt.replace(second=0)
159 cert_time = dt.strftime(fmt)
160 # %d adds leading zero but ASN1_TIME_print() uses leading space
161 if cert_time[4] == "0":
162 cert_time = cert_time[:4] + " " + cert_time[5:]
163
164 return cert_time
Thomas Woutersed03b412007-08-28 21:37:11 +0000165
Antoine Pitrou23df4832010-08-04 17:14:06 +0000166# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
167def skip_if_broken_ubuntu_ssl(func):
Victor Stinner3de49192011-05-09 00:42:58 +0200168 if hasattr(ssl, 'PROTOCOL_SSLv2'):
169 @functools.wraps(func)
170 def f(*args, **kwargs):
171 try:
172 ssl.SSLContext(ssl.PROTOCOL_SSLv2)
173 except ssl.SSLError:
174 if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
175 platform.linux_distribution() == ('debian', 'squeeze/sid', '')):
176 raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
177 return func(*args, **kwargs)
178 return f
179 else:
180 return func
Antoine Pitrou23df4832010-08-04 17:14:06 +0000181
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +0100182needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
183
Antoine Pitrou23df4832010-08-04 17:14:06 +0000184
Christian Heimesd0486372016-09-10 23:23:33 +0200185def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
186 cert_reqs=ssl.CERT_NONE, ca_certs=None,
187 ciphers=None, certfile=None, keyfile=None,
188 **kwargs):
189 context = ssl.SSLContext(ssl_version)
190 if cert_reqs is not None:
Christian Heimesa170fa12017-09-15 20:27:30 +0200191 if cert_reqs == ssl.CERT_NONE:
192 context.check_hostname = False
Christian Heimesd0486372016-09-10 23:23:33 +0200193 context.verify_mode = cert_reqs
194 if ca_certs is not None:
195 context.load_verify_locations(ca_certs)
196 if certfile is not None or keyfile is not None:
197 context.load_cert_chain(certfile, keyfile)
198 if ciphers is not None:
199 context.set_ciphers(ciphers)
200 return context.wrap_socket(sock, **kwargs)
201
Christian Heimesa170fa12017-09-15 20:27:30 +0200202
203def testing_context(server_cert=SIGNED_CERTFILE):
204 """Create context
205
206 client_context, server_context, hostname = testing_context()
207 """
208 if server_cert == SIGNED_CERTFILE:
209 hostname = SIGNED_CERTFILE_HOSTNAME
210 elif server_cert == SIGNED_CERTFILE2:
211 hostname = SIGNED_CERTFILE2_HOSTNAME
212 else:
213 raise ValueError(server_cert)
214
215 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
216 client_context.load_verify_locations(SIGNING_CA)
217
218 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
219 server_context.load_cert_chain(server_cert)
220
221 return client_context, server_context, hostname
222
223
Antoine Pitrou152efa22010-05-16 18:19:27 +0000224class BasicSocketTests(unittest.TestCase):
Thomas Woutersed03b412007-08-28 21:37:11 +0000225
Antoine Pitrou480a1242010-04-28 21:37:09 +0000226 def test_constants(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000227 ssl.CERT_NONE
228 ssl.CERT_OPTIONAL
229 ssl.CERT_REQUIRED
Antoine Pitrou6db49442011-12-19 13:27:11 +0100230 ssl.OP_CIPHER_SERVER_PREFERENCE
Antoine Pitrou0e576f12011-12-22 10:03:38 +0100231 ssl.OP_SINGLE_DH_USE
Antoine Pitrouc135fa42012-02-19 21:22:39 +0100232 if ssl.HAS_ECDH:
233 ssl.OP_SINGLE_ECDH_USE
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +0100234 if ssl.OPENSSL_VERSION_INFO >= (1, 0):
235 ssl.OP_NO_COMPRESSION
Antoine Pitroud5323212010-10-22 18:19:07 +0000236 self.assertIn(ssl.HAS_SNI, {True, False})
Antoine Pitrou501da612011-12-21 09:27:41 +0100237 self.assertIn(ssl.HAS_ECDH, {True, False})
Christian Heimescb5b68a2017-09-07 18:07:00 -0700238 ssl.OP_NO_SSLv2
239 ssl.OP_NO_SSLv3
240 ssl.OP_NO_TLSv1
241 ssl.OP_NO_TLSv1_3
242 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 1):
243 ssl.OP_NO_TLSv1_1
244 ssl.OP_NO_TLSv1_2
Christian Heimesa170fa12017-09-15 20:27:30 +0200245 self.assertEqual(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv23)
Thomas Woutersed03b412007-08-28 21:37:11 +0000246
Antoine Pitrou172f0252014-04-18 20:33:08 +0200247 def test_str_for_enums(self):
248 # Make sure that the PROTOCOL_* constants have enum-like string
249 # reprs.
Christian Heimes598894f2016-09-05 23:19:05 +0200250 proto = ssl.PROTOCOL_TLS
251 self.assertEqual(str(proto), '_SSLMethod.PROTOCOL_TLS')
Antoine Pitrou172f0252014-04-18 20:33:08 +0200252 ctx = ssl.SSLContext(proto)
253 self.assertIs(ctx.protocol, proto)
254
Antoine Pitrou480a1242010-04-28 21:37:09 +0000255 def test_random(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000256 v = ssl.RAND_status()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000257 if support.verbose:
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000258 sys.stdout.write("\n RAND_status is %d (%s)\n"
259 % (v, (v and "sufficient randomness") or
260 "insufficient randomness"))
Victor Stinner99c8b162011-05-24 12:05:19 +0200261
262 data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
263 self.assertEqual(len(data), 16)
264 self.assertEqual(is_cryptographic, v == 1)
265 if v:
266 data = ssl.RAND_bytes(16)
267 self.assertEqual(len(data), 16)
Victor Stinner2e2baa92011-05-25 11:15:16 +0200268 else:
269 self.assertRaises(ssl.SSLError, ssl.RAND_bytes, 16)
Victor Stinner99c8b162011-05-24 12:05:19 +0200270
Victor Stinner1e81a392013-12-19 16:47:04 +0100271 # negative num is invalid
272 self.assertRaises(ValueError, ssl.RAND_bytes, -5)
273 self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
274
Victor Stinnerbeeb5122014-11-28 13:28:25 +0100275 if hasattr(ssl, 'RAND_egd'):
276 self.assertRaises(TypeError, ssl.RAND_egd, 1)
277 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000278 ssl.RAND_add("this is a random string", 75.0)
Serhiy Storchaka8490f5a2015-03-20 09:00:36 +0200279 ssl.RAND_add(b"this is a random bytes object", 75.0)
280 ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
Thomas Woutersed03b412007-08-28 21:37:11 +0000281
Christian Heimesf77b4b22013-08-21 13:26:05 +0200282 @unittest.skipUnless(os.name == 'posix', 'requires posix')
283 def test_random_fork(self):
284 status = ssl.RAND_status()
285 if not status:
286 self.fail("OpenSSL's PRNG has insufficient randomness")
287
288 rfd, wfd = os.pipe()
289 pid = os.fork()
290 if pid == 0:
291 try:
292 os.close(rfd)
293 child_random = ssl.RAND_pseudo_bytes(16)[0]
294 self.assertEqual(len(child_random), 16)
295 os.write(wfd, child_random)
296 os.close(wfd)
297 except BaseException:
298 os._exit(1)
299 else:
300 os._exit(0)
301 else:
302 os.close(wfd)
303 self.addCleanup(os.close, rfd)
304 _, status = os.waitpid(pid, 0)
305 self.assertEqual(status, 0)
306
307 child_random = os.read(rfd, 16)
308 self.assertEqual(len(child_random), 16)
309 parent_random = ssl.RAND_pseudo_bytes(16)[0]
310 self.assertEqual(len(parent_random), 16)
311
312 self.assertNotEqual(child_random, parent_random)
313
Antoine Pitrou480a1242010-04-28 21:37:09 +0000314 def test_parse_cert(self):
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000315 # note that this uses an 'unofficial' function in _ssl.c,
316 # provided solely for this test, to exercise the certificate
317 # parsing code
Christian Heimesbd5c7d22018-01-20 15:16:30 +0100318 self.assertEqual(
319 ssl._ssl._test_decode_cert(CERTFILE),
320 CERTFILE_INFO
321 )
322 self.assertEqual(
323 ssl._ssl._test_decode_cert(SIGNED_CERTFILE),
324 SIGNED_CERTFILE_INFO
325 )
326
Antoine Pitroud8c347a2011-10-01 19:20:25 +0200327 # Issue #13034: the subjectAltName in some certificates
328 # (notably projects.developer.nokia.com:443) wasn't parsed
329 p = ssl._ssl._test_decode_cert(NOKIACERT)
330 if support.verbose:
331 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
332 self.assertEqual(p['subjectAltName'],
333 (('DNS', 'projects.developer.nokia.com'),
334 ('DNS', 'projects.forum.nokia.com'))
335 )
Christian Heimesbd3a7f92013-11-21 03:40:15 +0100336 # extra OCSP and AIA fields
337 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
338 self.assertEqual(p['caIssuers'],
339 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
340 self.assertEqual(p['crlDistributionPoints'],
341 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
Thomas Woutersed03b412007-08-28 21:37:11 +0000342
Christian Heimes824f7f32013-08-17 00:54:47 +0200343 def test_parse_cert_CVE_2013_4238(self):
344 p = ssl._ssl._test_decode_cert(NULLBYTECERT)
345 if support.verbose:
346 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
347 subject = ((('countryName', 'US'),),
348 (('stateOrProvinceName', 'Oregon'),),
349 (('localityName', 'Beaverton'),),
350 (('organizationName', 'Python Software Foundation'),),
351 (('organizationalUnitName', 'Python Core Development'),),
352 (('commonName', 'null.python.org\x00example.org'),),
353 (('emailAddress', 'python-dev@python.org'),))
354 self.assertEqual(p['subject'], subject)
355 self.assertEqual(p['issuer'], subject)
Christian Heimes157c9832013-08-25 14:12:41 +0200356 if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
357 san = (('DNS', 'altnull.python.org\x00example.com'),
358 ('email', 'null@python.org\x00user@example.org'),
359 ('URI', 'http://null.python.org\x00http://example.org'),
360 ('IP Address', '192.0.2.1'),
361 ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
362 else:
363 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
364 san = (('DNS', 'altnull.python.org\x00example.com'),
365 ('email', 'null@python.org\x00user@example.org'),
366 ('URI', 'http://null.python.org\x00http://example.org'),
367 ('IP Address', '192.0.2.1'),
368 ('IP Address', '<invalid>'))
369
370 self.assertEqual(p['subjectAltName'], san)
Christian Heimes824f7f32013-08-17 00:54:47 +0200371
Christian Heimes1c03abd2016-09-06 23:25:35 +0200372 def test_parse_all_sans(self):
373 p = ssl._ssl._test_decode_cert(ALLSANFILE)
374 self.assertEqual(p['subjectAltName'],
375 (
376 ('DNS', 'allsans'),
377 ('othername', '<unsupported>'),
378 ('othername', '<unsupported>'),
379 ('email', 'user@example.org'),
380 ('DNS', 'www.example.org'),
381 ('DirName',
382 ((('countryName', 'XY'),),
383 (('localityName', 'Castle Anthrax'),),
384 (('organizationName', 'Python Software Foundation'),),
385 (('commonName', 'dirname example'),))),
386 ('URI', 'https://www.python.org/'),
387 ('IP Address', '127.0.0.1'),
388 ('IP Address', '0:0:0:0:0:0:0:1\n'),
389 ('Registered ID', '1.2.3.4.5')
390 )
391 )
392
Antoine Pitrou480a1242010-04-28 21:37:09 +0000393 def test_DER_to_PEM(self):
Martin Panter3d81d932016-01-14 09:36:00 +0000394 with open(CAFILE_CACERT, 'r') as f:
Antoine Pitrou480a1242010-04-28 21:37:09 +0000395 pem = f.read()
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000396 d1 = ssl.PEM_cert_to_DER_cert(pem)
397 p2 = ssl.DER_cert_to_PEM_cert(d1)
398 d2 = ssl.PEM_cert_to_DER_cert(p2)
Antoine Pitrou18c913e2010-04-27 10:59:39 +0000399 self.assertEqual(d1, d2)
Antoine Pitrou9bfbe612010-04-27 22:08:08 +0000400 if not p2.startswith(ssl.PEM_HEADER + '\n'):
401 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
402 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
403 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
Thomas Wouters1b7f8912007-09-19 03:06:30 +0000404
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000405 def test_openssl_version(self):
406 n = ssl.OPENSSL_VERSION_NUMBER
407 t = ssl.OPENSSL_VERSION_INFO
408 s = ssl.OPENSSL_VERSION
409 self.assertIsInstance(n, int)
410 self.assertIsInstance(t, tuple)
411 self.assertIsInstance(s, str)
412 # Some sanity checks follow
413 # >= 0.9
414 self.assertGreaterEqual(n, 0x900000)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400415 # < 3.0
416 self.assertLess(n, 0x30000000)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000417 major, minor, fix, patch, status = t
418 self.assertGreaterEqual(major, 0)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400419 self.assertLess(major, 3)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000420 self.assertGreaterEqual(minor, 0)
421 self.assertLess(minor, 256)
422 self.assertGreaterEqual(fix, 0)
423 self.assertLess(fix, 256)
424 self.assertGreaterEqual(patch, 0)
Ned Deily05784a72015-02-05 17:20:13 +1100425 self.assertLessEqual(patch, 63)
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000426 self.assertGreaterEqual(status, 0)
427 self.assertLessEqual(status, 15)
Antoine Pitroudfab9352014-07-21 18:35:01 -0400428 # Version string as returned by {Open,Libre}SSL, the format might change
Christian Heimes598894f2016-09-05 23:19:05 +0200429 if IS_LIBRESSL:
430 self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
Victor Stinner789b8052015-01-06 11:51:06 +0100431 (s, t, hex(n)))
Antoine Pitroudfab9352014-07-21 18:35:01 -0400432 else:
433 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
Victor Stinner789b8052015-01-06 11:51:06 +0100434 (s, t, hex(n)))
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000435
Antoine Pitrou9d543662010-04-23 23:10:32 +0000436 @support.cpython_only
437 def test_refcycle(self):
438 # Issue #7943: an SSL object doesn't create reference cycles with
439 # itself.
440 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200441 ss = test_wrap_socket(s)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000442 wr = weakref.ref(ss)
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100443 with support.check_warnings(("", ResourceWarning)):
444 del ss
Victor Stinnere0b75b72016-03-21 17:26:04 +0100445 self.assertEqual(wr(), None)
Antoine Pitrou9d543662010-04-23 23:10:32 +0000446
Antoine Pitroua468adc2010-09-14 14:43:44 +0000447 def test_wrapped_unconnected(self):
448 # Methods on an unconnected SSLSocket propagate the original
Andrew Svetlov0832af62012-12-18 23:10:48 +0200449 # OSError raise by the underlying socket object.
Antoine Pitroua468adc2010-09-14 14:43:44 +0000450 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200451 with test_wrap_socket(s) as ss:
Antoine Pitroue9bb4732013-01-12 21:56:56 +0100452 self.assertRaises(OSError, ss.recv, 1)
453 self.assertRaises(OSError, ss.recv_into, bytearray(b'x'))
454 self.assertRaises(OSError, ss.recvfrom, 1)
455 self.assertRaises(OSError, ss.recvfrom_into, bytearray(b'x'), 1)
456 self.assertRaises(OSError, ss.send, b'x')
457 self.assertRaises(OSError, ss.sendto, b'x', ('0.0.0.0', 0))
Antoine Pitroua468adc2010-09-14 14:43:44 +0000458
Antoine Pitrou40f08742010-04-24 22:04:40 +0000459 def test_timeout(self):
460 # Issue #8524: when creating an SSL socket, the timeout of the
461 # original socket should be retained.
462 for timeout in (None, 0.0, 5.0):
463 s = socket.socket(socket.AF_INET)
464 s.settimeout(timeout)
Christian Heimesd0486372016-09-10 23:23:33 +0200465 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100466 self.assertEqual(timeout, ss.gettimeout())
Antoine Pitrou40f08742010-04-24 22:04:40 +0000467
Christian Heimesd0486372016-09-10 23:23:33 +0200468 def test_errors_sslwrap(self):
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000469 sock = socket.socket()
Ezio Melottied3a7d22010-12-01 02:32:32 +0000470 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000471 "certfile must be specified",
472 ssl.wrap_socket, sock, keyfile=CERTFILE)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000473 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000474 "certfile must be specified for server-side operations",
475 ssl.wrap_socket, sock, server_side=True)
Ezio Melottied3a7d22010-12-01 02:32:32 +0000476 self.assertRaisesRegex(ValueError,
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000477 "certfile must be specified for server-side operations",
Christian Heimesd0486372016-09-10 23:23:33 +0200478 ssl.wrap_socket, sock, server_side=True, certfile="")
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100479 with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s:
480 self.assertRaisesRegex(ValueError, "can't connect in server-side mode",
Christian Heimesd0486372016-09-10 23:23:33 +0200481 s.connect, (HOST, 8080))
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200482 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000483 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000484 ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000485 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200486 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000487 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000488 ssl.wrap_socket(sock,
489 certfile=CERTFILE, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà8b7da622010-08-30 18:28:05 +0000490 self.assertEqual(cm.exception.errno, errno.ENOENT)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200491 with self.assertRaises(OSError) as cm:
Antoine Pitroud2eca372010-10-29 23:41:37 +0000492 with socket.socket() as sock:
Martin Panter407b62f2016-01-30 03:41:43 +0000493 ssl.wrap_socket(sock,
494 certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +0000495 self.assertEqual(cm.exception.errno, errno.ENOENT)
Giampaolo Rodolà745ab382010-08-29 19:25:49 +0000496
Martin Panter3464ea22016-02-01 21:58:11 +0000497 def bad_cert_test(self, certfile):
498 """Check that trying to use the given client certificate fails"""
499 certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
500 certfile)
501 sock = socket.socket()
502 self.addCleanup(sock.close)
503 with self.assertRaises(ssl.SSLError):
Christian Heimesd0486372016-09-10 23:23:33 +0200504 test_wrap_socket(sock,
Christian Heimesa170fa12017-09-15 20:27:30 +0200505 certfile=certfile)
Martin Panter3464ea22016-02-01 21:58:11 +0000506
507 def test_empty_cert(self):
508 """Wrapping with an empty cert file"""
509 self.bad_cert_test("nullcert.pem")
510
511 def test_malformed_cert(self):
512 """Wrapping with a badly formatted certificate (syntax error)"""
513 self.bad_cert_test("badcert.pem")
514
515 def test_malformed_key(self):
516 """Wrapping with a badly formatted key (syntax error)"""
517 self.bad_cert_test("badkey.pem")
518
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000519 def test_match_hostname(self):
520 def ok(cert, hostname):
521 ssl.match_hostname(cert, hostname)
522 def fail(cert, hostname):
523 self.assertRaises(ssl.CertificateError,
524 ssl.match_hostname, cert, hostname)
525
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100526 # -- Hostname matching --
527
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000528 cert = {'subject': ((('commonName', 'example.com'),),)}
529 ok(cert, 'example.com')
530 ok(cert, 'ExAmple.cOm')
531 fail(cert, 'www.example.com')
532 fail(cert, '.example.com')
533 fail(cert, 'example.org')
534 fail(cert, 'exampleXcom')
535
536 cert = {'subject': ((('commonName', '*.a.com'),),)}
537 ok(cert, 'foo.a.com')
538 fail(cert, 'bar.foo.a.com')
539 fail(cert, 'a.com')
540 fail(cert, 'Xa.com')
541 fail(cert, '.a.com')
542
Mandeep Singhede2ac92017-11-27 04:01:27 +0530543 # only match wildcards when they are the only thing
544 # in left-most segment
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000545 cert = {'subject': ((('commonName', 'f*.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530546 fail(cert, 'foo.com')
547 fail(cert, 'f.com')
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000548 fail(cert, 'bar.com')
549 fail(cert, 'foo.a.com')
550 fail(cert, 'bar.foo.com')
551
Christian Heimes824f7f32013-08-17 00:54:47 +0200552 # NULL bytes are bad, CVE-2013-4073
553 cert = {'subject': ((('commonName',
554 'null.python.org\x00example.org'),),)}
555 ok(cert, 'null.python.org\x00example.org') # or raise an error?
556 fail(cert, 'example.org')
557 fail(cert, 'null.python.org')
558
Georg Brandl72c98d32013-10-27 07:16:53 +0100559 # error cases with wildcards
560 cert = {'subject': ((('commonName', '*.*.a.com'),),)}
561 fail(cert, 'bar.foo.a.com')
562 fail(cert, 'a.com')
563 fail(cert, 'Xa.com')
564 fail(cert, '.a.com')
565
566 cert = {'subject': ((('commonName', 'a.*.com'),),)}
567 fail(cert, 'a.foo.com')
568 fail(cert, 'a..com')
569 fail(cert, 'a.com')
570
571 # wildcard doesn't match IDNA prefix 'xn--'
572 idna = 'püthon.python.org'.encode("idna").decode("ascii")
573 cert = {'subject': ((('commonName', idna),),)}
574 ok(cert, idna)
575 cert = {'subject': ((('commonName', 'x*.python.org'),),)}
576 fail(cert, idna)
577 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
578 fail(cert, idna)
579
580 # wildcard in first fragment and IDNA A-labels in sequent fragments
581 # are supported.
582 idna = 'www*.pythön.org'.encode("idna").decode("ascii")
583 cert = {'subject': ((('commonName', idna),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530584 fail(cert, 'www.pythön.org'.encode("idna").decode("ascii"))
585 fail(cert, 'www1.pythön.org'.encode("idna").decode("ascii"))
Georg Brandl72c98d32013-10-27 07:16:53 +0100586 fail(cert, 'ftp.pythön.org'.encode("idna").decode("ascii"))
587 fail(cert, 'pythön.org'.encode("idna").decode("ascii"))
588
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000589 # Slightly fake real-world example
590 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
591 'subject': ((('commonName', 'linuxfrz.org'),),),
592 'subjectAltName': (('DNS', 'linuxfr.org'),
593 ('DNS', 'linuxfr.com'),
594 ('othername', '<unsupported>'))}
595 ok(cert, 'linuxfr.org')
596 ok(cert, 'linuxfr.com')
597 # Not a "DNS" entry
598 fail(cert, '<unsupported>')
599 # When there is a subjectAltName, commonName isn't used
600 fail(cert, 'linuxfrz.org')
601
602 # A pristine real-world example
603 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
604 'subject': ((('countryName', 'US'),),
605 (('stateOrProvinceName', 'California'),),
606 (('localityName', 'Mountain View'),),
607 (('organizationName', 'Google Inc'),),
608 (('commonName', 'mail.google.com'),))}
609 ok(cert, 'mail.google.com')
610 fail(cert, 'gmail.com')
611 # Only commonName is considered
612 fail(cert, 'California')
613
Antoine Pitrouc481bfb2015-02-15 18:12:20 +0100614 # -- IPv4 matching --
615 cert = {'subject': ((('commonName', 'example.com'),),),
616 'subjectAltName': (('DNS', 'example.com'),
617 ('IP Address', '10.11.12.13'),
618 ('IP Address', '14.15.16.17'))}
619 ok(cert, '10.11.12.13')
620 ok(cert, '14.15.16.17')
621 fail(cert, '14.15.16.18')
622 fail(cert, 'example.net')
623
624 # -- IPv6 matching --
625 cert = {'subject': ((('commonName', 'example.com'),),),
626 'subjectAltName': (('DNS', 'example.com'),
627 ('IP Address', '2001:0:0:0:0:0:0:CAFE\n'),
628 ('IP Address', '2003:0:0:0:0:0:0:BABA\n'))}
629 ok(cert, '2001::cafe')
630 ok(cert, '2003::baba')
631 fail(cert, '2003::bebe')
632 fail(cert, 'example.net')
633
634 # -- Miscellaneous --
635
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000636 # Neither commonName nor subjectAltName
637 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
638 'subject': ((('countryName', 'US'),),
639 (('stateOrProvinceName', 'California'),),
640 (('localityName', 'Mountain View'),),
641 (('organizationName', 'Google Inc'),))}
642 fail(cert, 'mail.google.com')
643
Antoine Pitrou1c86b442011-05-06 15:19:49 +0200644 # No DNS entry in subjectAltName but a commonName
645 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
646 'subject': ((('countryName', 'US'),),
647 (('stateOrProvinceName', 'California'),),
648 (('localityName', 'Mountain View'),),
649 (('commonName', 'mail.google.com'),)),
650 'subjectAltName': (('othername', 'blabla'), )}
651 ok(cert, 'mail.google.com')
652
653 # No DNS entry subjectAltName and no commonName
654 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
655 'subject': ((('countryName', 'US'),),
656 (('stateOrProvinceName', 'California'),),
657 (('localityName', 'Mountain View'),),
658 (('organizationName', 'Google Inc'),)),
659 'subjectAltName': (('othername', 'blabla'),)}
660 fail(cert, 'google.com')
661
Antoine Pitrou59fdd672010-10-08 10:37:08 +0000662 # Empty cert / no cert
663 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
664 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
665
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200666 # Issue #17980: avoid denials of service by refusing more than one
667 # wildcard per fragment.
668 cert = {'subject': ((('commonName', 'a*b.com'),),)}
Mandeep Singhede2ac92017-11-27 04:01:27 +0530669 fail(cert, 'axxb.com')
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200670 cert = {'subject': ((('commonName', 'a*b.co*'),),)}
Georg Brandl72c98d32013-10-27 07:16:53 +0100671 fail(cert, 'axxb.com')
Antoine Pitrou636f93c2013-05-18 17:56:42 +0200672 cert = {'subject': ((('commonName', 'a*b*.com'),),)}
673 with self.assertRaises(ssl.CertificateError) as cm:
674 ssl.match_hostname(cert, 'axxbxxc.com')
675 self.assertIn("too many wildcards", str(cm.exception))
676
Antoine Pitroud5323212010-10-22 18:19:07 +0000677 def test_server_side(self):
678 # server_hostname doesn't work for server sockets
Christian Heimesa170fa12017-09-15 20:27:30 +0200679 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroud2eca372010-10-29 23:41:37 +0000680 with socket.socket() as sock:
681 self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
682 server_hostname="some.hostname")
Antoine Pitrou04f6a322010-04-05 21:40:07 +0000683
Antoine Pitroud6494802011-07-21 01:11:30 +0200684 def test_unknown_channel_binding(self):
685 # should raise ValueError for unknown type
686 s = socket.socket(socket.AF_INET)
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200687 s.bind(('127.0.0.1', 0))
688 s.listen()
689 c = socket.socket(socket.AF_INET)
690 c.connect(s.getsockname())
Christian Heimesd0486372016-09-10 23:23:33 +0200691 with test_wrap_socket(c, do_handshake_on_connect=False) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100692 with self.assertRaises(ValueError):
693 ss.get_channel_binding("unknown-type")
Antoine Pitroub1fdf472014-10-05 20:41:53 +0200694 s.close()
Antoine Pitroud6494802011-07-21 01:11:30 +0200695
696 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
697 "'tls-unique' channel binding not available")
698 def test_tls_unique_channel_binding(self):
699 # unconnected should return None for known type
700 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200701 with test_wrap_socket(s) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100702 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200703 # the same for server-side
704 s = socket.socket(socket.AF_INET)
Christian Heimesd0486372016-09-10 23:23:33 +0200705 with test_wrap_socket(s, server_side=True, certfile=CERTFILE) as ss:
Antoine Pitroue1ceb502013-01-12 21:54:44 +0100706 self.assertIsNone(ss.get_channel_binding("tls-unique"))
Antoine Pitroud6494802011-07-21 01:11:30 +0200707
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600708 def test_dealloc_warn(self):
Christian Heimesd0486372016-09-10 23:23:33 +0200709 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Benjamin Peterson36f7b972013-01-10 14:16:20 -0600710 r = repr(ss)
711 with self.assertWarns(ResourceWarning) as cm:
712 ss = None
713 support.gc_collect()
714 self.assertIn(r, str(cm.warning.args[0]))
715
Christian Heimes6d7ad132013-06-09 18:02:55 +0200716 def test_get_default_verify_paths(self):
717 paths = ssl.get_default_verify_paths()
718 self.assertEqual(len(paths), 6)
719 self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
720
721 with support.EnvironmentVarGuard() as env:
722 env["SSL_CERT_DIR"] = CAPATH
723 env["SSL_CERT_FILE"] = CERTFILE
724 paths = ssl.get_default_verify_paths()
725 self.assertEqual(paths.cafile, CERTFILE)
726 self.assertEqual(paths.capath, CAPATH)
727
Christian Heimes44109d72013-11-22 01:51:30 +0100728 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
729 def test_enum_certificates(self):
730 self.assertTrue(ssl.enum_certificates("CA"))
731 self.assertTrue(ssl.enum_certificates("ROOT"))
732
733 self.assertRaises(TypeError, ssl.enum_certificates)
734 self.assertRaises(WindowsError, ssl.enum_certificates, "")
735
Christian Heimesc2d65e12013-11-22 16:13:55 +0100736 trust_oids = set()
737 for storename in ("CA", "ROOT"):
738 store = ssl.enum_certificates(storename)
739 self.assertIsInstance(store, list)
740 for element in store:
741 self.assertIsInstance(element, tuple)
742 self.assertEqual(len(element), 3)
743 cert, enc, trust = element
744 self.assertIsInstance(cert, bytes)
745 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
746 self.assertIsInstance(trust, (set, bool))
747 if isinstance(trust, set):
748 trust_oids.update(trust)
Christian Heimes44109d72013-11-22 01:51:30 +0100749
750 serverAuth = "1.3.6.1.5.5.7.3.1"
Christian Heimesc2d65e12013-11-22 16:13:55 +0100751 self.assertIn(serverAuth, trust_oids)
Christian Heimes6d7ad132013-06-09 18:02:55 +0200752
Christian Heimes46bebee2013-06-09 19:03:31 +0200753 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Christian Heimes44109d72013-11-22 01:51:30 +0100754 def test_enum_crls(self):
755 self.assertTrue(ssl.enum_crls("CA"))
756 self.assertRaises(TypeError, ssl.enum_crls)
757 self.assertRaises(WindowsError, ssl.enum_crls, "")
Christian Heimes46bebee2013-06-09 19:03:31 +0200758
Christian Heimes44109d72013-11-22 01:51:30 +0100759 crls = ssl.enum_crls("CA")
760 self.assertIsInstance(crls, list)
761 for element in crls:
762 self.assertIsInstance(element, tuple)
763 self.assertEqual(len(element), 2)
764 self.assertIsInstance(element[0], bytes)
765 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
Christian Heimes46bebee2013-06-09 19:03:31 +0200766
Christian Heimes46bebee2013-06-09 19:03:31 +0200767
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100768 def test_asn1object(self):
769 expected = (129, 'serverAuth', 'TLS Web Server Authentication',
770 '1.3.6.1.5.5.7.3.1')
771
772 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
773 self.assertEqual(val, expected)
774 self.assertEqual(val.nid, 129)
775 self.assertEqual(val.shortname, 'serverAuth')
776 self.assertEqual(val.longname, 'TLS Web Server Authentication')
777 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
778 self.assertIsInstance(val, ssl._ASN1Object)
779 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
780
781 val = ssl._ASN1Object.fromnid(129)
782 self.assertEqual(val, expected)
783 self.assertIsInstance(val, ssl._ASN1Object)
784 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100785 with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
786 ssl._ASN1Object.fromnid(100000)
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100787 for i in range(1000):
788 try:
789 obj = ssl._ASN1Object.fromnid(i)
790 except ValueError:
791 pass
792 else:
793 self.assertIsInstance(obj.nid, int)
794 self.assertIsInstance(obj.shortname, str)
795 self.assertIsInstance(obj.longname, str)
796 self.assertIsInstance(obj.oid, (str, type(None)))
797
798 val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
799 self.assertEqual(val, expected)
800 self.assertIsInstance(val, ssl._ASN1Object)
801 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
802 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
803 expected)
Christian Heimes5398e1a2013-11-22 16:20:53 +0100804 with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
805 ssl._ASN1Object.fromname('serverauth')
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100806
Christian Heimes72d28502013-11-23 13:56:58 +0100807 def test_purpose_enum(self):
808 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
809 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
810 self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
811 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
812 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
813 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
814 '1.3.6.1.5.5.7.3.1')
815
816 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
817 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
818 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
819 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
820 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
821 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
822 '1.3.6.1.5.5.7.3.2')
823
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100824 def test_unsupported_dtls(self):
825 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
826 self.addCleanup(s.close)
827 with self.assertRaises(NotImplementedError) as cx:
Christian Heimesd0486372016-09-10 23:23:33 +0200828 test_wrap_socket(s, cert_reqs=ssl.CERT_NONE)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100829 self.assertEqual(str(cx.exception), "only stream sockets are supported")
Christian Heimesa170fa12017-09-15 20:27:30 +0200830 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3e86ba42013-12-28 17:26:33 +0100831 with self.assertRaises(NotImplementedError) as cx:
832 ctx.wrap_socket(s)
833 self.assertEqual(str(cx.exception), "only stream sockets are supported")
834
Antoine Pitrouc695c952014-04-28 20:57:36 +0200835 def cert_time_ok(self, timestring, timestamp):
836 self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)
837
838 def cert_time_fail(self, timestring):
839 with self.assertRaises(ValueError):
840 ssl.cert_time_to_seconds(timestring)
841
842 @unittest.skipUnless(utc_offset(),
843 'local time needs to be different from UTC')
844 def test_cert_time_to_seconds_timezone(self):
845 # Issue #19940: ssl.cert_time_to_seconds() returns wrong
846 # results if local timezone is not UTC
847 self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0)
848 self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0)
849
850 def test_cert_time_to_seconds(self):
851 timestring = "Jan 5 09:34:43 2018 GMT"
852 ts = 1515144883.0
853 self.cert_time_ok(timestring, ts)
854 # accept keyword parameter, assert its name
855 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
856 # accept both %e and %d (space or zero generated by strftime)
857 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
858 # case-insensitive
859 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts)
860 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds
861 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT
862 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone
863 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day
864 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month
865 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour
866 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute
867
868 newyear_ts = 1230768000.0
869 # leap seconds
870 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
871 # same timestamp
872 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts)
873
874 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899)
875 # allow 60th second (even if it is not a leap second)
876 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900)
877 # allow 2nd leap second for compatibility with time.strptime()
878 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901)
879 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds
880
Mike53f7a7c2017-12-14 14:04:53 +0300881 # no special treatment for the special value:
Antoine Pitrouc695c952014-04-28 20:57:36 +0200882 # 99991231235959Z (rfc 5280)
883 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
884
885 @support.run_with_locale('LC_ALL', '')
886 def test_cert_time_to_seconds_locale(self):
887 # `cert_time_to_seconds()` should be locale independent
888
889 def local_february_name():
890 return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
891
892 if local_february_name().lower() == 'feb':
893 self.skipTest("locale-specific month name needs to be "
894 "different from C locale")
895
896 # locale-independent
897 self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
898 self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
899
Martin Panter3840b2a2016-03-27 01:53:46 +0000900 def test_connect_ex_error(self):
901 server = socket.socket(socket.AF_INET)
902 self.addCleanup(server.close)
903 port = support.bind_port(server) # Reserve port but don't listen
Christian Heimesd0486372016-09-10 23:23:33 +0200904 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +0000905 cert_reqs=ssl.CERT_REQUIRED)
906 self.addCleanup(s.close)
907 rc = s.connect_ex((HOST, port))
908 # Issue #19919: Windows machines or VMs hosted on Windows
909 # machines sometimes return EWOULDBLOCK.
910 errors = (
911 errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
912 errno.EWOULDBLOCK,
913 )
914 self.assertIn(rc, errors)
915
Christian Heimesa6bc95a2013-11-17 19:59:14 +0100916
Antoine Pitrou152efa22010-05-16 18:19:27 +0000917class ContextTests(unittest.TestCase):
918
Antoine Pitrou23df4832010-08-04 17:14:06 +0000919 @skip_if_broken_ubuntu_ssl
Antoine Pitrou152efa22010-05-16 18:19:27 +0000920 def test_constructor(self):
Antoine Pitrou2463e5f2013-03-28 22:24:43 +0100921 for protocol in PROTOCOLS:
922 ssl.SSLContext(protocol)
Christian Heimes598894f2016-09-05 23:19:05 +0200923 ctx = ssl.SSLContext()
924 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000925 self.assertRaises(ValueError, ssl.SSLContext, -1)
926 self.assertRaises(ValueError, ssl.SSLContext, 42)
927
Antoine Pitrou23df4832010-08-04 17:14:06 +0000928 @skip_if_broken_ubuntu_ssl
Antoine Pitrou152efa22010-05-16 18:19:27 +0000929 def test_protocol(self):
930 for proto in PROTOCOLS:
931 ctx = ssl.SSLContext(proto)
932 self.assertEqual(ctx.protocol, proto)
933
934 def test_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +0200935 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000936 ctx.set_ciphers("ALL")
937 ctx.set_ciphers("DEFAULT")
Ezio Melottied3a7d22010-12-01 02:32:32 +0000938 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
Antoine Pitrou30474062010-05-16 23:46:26 +0000939 ctx.set_ciphers("^$:,;?*'dorothyx")
Antoine Pitrou152efa22010-05-16 18:19:27 +0000940
Christian Heimes892d66e2018-01-29 14:10:18 +0100941 @unittest.skipUnless(PY_SSL_DEFAULT_CIPHERS == 1,
942 "Test applies only to Python default ciphers")
943 def test_python_ciphers(self):
944 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
945 ciphers = ctx.get_ciphers()
946 for suite in ciphers:
947 name = suite['name']
948 self.assertNotIn("PSK", name)
949 self.assertNotIn("SRP", name)
950 self.assertNotIn("MD5", name)
951 self.assertNotIn("RC4", name)
952 self.assertNotIn("3DES", name)
953
Christian Heimes25bfcd52016-09-06 00:04:45 +0200954 @unittest.skipIf(ssl.OPENSSL_VERSION_INFO < (1, 0, 2, 0, 0), 'OpenSSL too old')
955 def test_get_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +0200956 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesea9b2dc2016-09-06 10:45:44 +0200957 ctx.set_ciphers('AESGCM')
Christian Heimes25bfcd52016-09-06 00:04:45 +0200958 names = set(d['name'] for d in ctx.get_ciphers())
Christian Heimes582282b2016-09-06 11:27:25 +0200959 self.assertIn('AES256-GCM-SHA384', names)
960 self.assertIn('AES128-GCM-SHA256', names)
Christian Heimes25bfcd52016-09-06 00:04:45 +0200961
Antoine Pitrou23df4832010-08-04 17:14:06 +0000962 @skip_if_broken_ubuntu_ssl
Antoine Pitroub5218772010-05-21 09:56:06 +0000963 def test_options(self):
Christian Heimesa170fa12017-09-15 20:27:30 +0200964 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -0800965 # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
Christian Heimes598894f2016-09-05 23:19:05 +0200966 default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimes358cfd42016-09-10 22:43:48 +0200967 # SSLContext also enables these by default
968 default |= (OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE |
969 OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE)
Christian Heimes598894f2016-09-05 23:19:05 +0200970 self.assertEqual(default, ctx.options)
Benjamin Petersona9dcdab2015-11-11 22:38:41 -0800971 ctx.options |= ssl.OP_NO_TLSv1
Christian Heimes598894f2016-09-05 23:19:05 +0200972 self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +0000973 if can_clear_options():
Christian Heimes598894f2016-09-05 23:19:05 +0200974 ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
975 self.assertEqual(default, ctx.options)
Antoine Pitroub5218772010-05-21 09:56:06 +0000976 ctx.options = 0
Matthias Klosef7c56242016-06-12 23:40:00 -0700977 # Ubuntu has OP_NO_SSLv3 forced on by default
978 self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
Antoine Pitroub5218772010-05-21 09:56:06 +0000979 else:
980 with self.assertRaises(ValueError):
981 ctx.options = 0
982
Christian Heimesa170fa12017-09-15 20:27:30 +0200983 def test_verify_mode_protocol(self):
984 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitrou152efa22010-05-16 18:19:27 +0000985 # Default value
986 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
987 ctx.verify_mode = ssl.CERT_OPTIONAL
988 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
989 ctx.verify_mode = ssl.CERT_REQUIRED
990 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
991 ctx.verify_mode = ssl.CERT_NONE
992 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
993 with self.assertRaises(TypeError):
994 ctx.verify_mode = None
995 with self.assertRaises(ValueError):
996 ctx.verify_mode = 42
997
Christian Heimesa170fa12017-09-15 20:27:30 +0200998 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
999 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1000 self.assertFalse(ctx.check_hostname)
1001
1002 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1003 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1004 self.assertTrue(ctx.check_hostname)
1005
Christian Heimes61d478c2018-01-27 15:51:38 +01001006 def test_hostname_checks_common_name(self):
1007 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1008 self.assertTrue(ctx.hostname_checks_common_name)
1009 if ssl.HAS_NEVER_CHECK_COMMON_NAME:
1010 ctx.hostname_checks_common_name = True
1011 self.assertTrue(ctx.hostname_checks_common_name)
1012 ctx.hostname_checks_common_name = False
1013 self.assertFalse(ctx.hostname_checks_common_name)
1014 ctx.hostname_checks_common_name = True
1015 self.assertTrue(ctx.hostname_checks_common_name)
1016 else:
1017 with self.assertRaises(AttributeError):
1018 ctx.hostname_checks_common_name = True
Christian Heimesa170fa12017-09-15 20:27:30 +02001019
Christian Heimes2427b502013-11-23 11:24:32 +01001020 @unittest.skipUnless(have_verify_flags(),
1021 "verify_flags need OpenSSL > 0.9.8")
Christian Heimes22587792013-11-21 23:56:13 +01001022 def test_verify_flags(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001023 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Benjamin Peterson990fcaa2015-03-04 22:49:41 -05001024 # default value
1025 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
1026 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | tf)
Christian Heimes22587792013-11-21 23:56:13 +01001027 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
1028 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
1029 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
1030 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
1031 ctx.verify_flags = ssl.VERIFY_DEFAULT
1032 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
1033 # supports any value
1034 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
1035 self.assertEqual(ctx.verify_flags,
1036 ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
1037 with self.assertRaises(TypeError):
1038 ctx.verify_flags = None
1039
Antoine Pitrou152efa22010-05-16 18:19:27 +00001040 def test_load_cert_chain(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001041 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001042 # Combined key and cert in a single file
Benjamin Peterson1ea070e2014-11-03 21:05:01 -05001043 ctx.load_cert_chain(CERTFILE, keyfile=None)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001044 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
1045 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001046 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001047 ctx.load_cert_chain(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001048 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001049 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001050 ctx.load_cert_chain(BADCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001051 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001052 ctx.load_cert_chain(EMPTYCERT)
1053 # Separate key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001054 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001055 ctx.load_cert_chain(ONLYCERT, ONLYKEY)
1056 ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
1057 ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001058 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001059 ctx.load_cert_chain(ONLYCERT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001060 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001061 ctx.load_cert_chain(ONLYKEY)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001062 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001063 ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
1064 # Mismatching key and cert
Christian Heimesa170fa12017-09-15 20:27:30 +02001065 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001066 with self.assertRaisesRegex(ssl.SSLError, "key values mismatch"):
Martin Panter3d81d932016-01-14 09:36:00 +00001067 ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)
Antoine Pitrou4fd1e6a2011-08-25 14:39:44 +02001068 # Password protected key and cert
1069 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
1070 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
1071 ctx.load_cert_chain(CERTFILE_PROTECTED,
1072 password=bytearray(KEY_PASSWORD.encode()))
1073 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
1074 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
1075 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
1076 bytearray(KEY_PASSWORD.encode()))
1077 with self.assertRaisesRegex(TypeError, "should be a string"):
1078 ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
1079 with self.assertRaises(ssl.SSLError):
1080 ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
1081 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1082 # openssl has a fixed limit on the password buffer.
1083 # PEM_BUFSIZE is generally set to 1kb.
1084 # Return a string larger than this.
1085 ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
1086 # Password callback
1087 def getpass_unicode():
1088 return KEY_PASSWORD
1089 def getpass_bytes():
1090 return KEY_PASSWORD.encode()
1091 def getpass_bytearray():
1092 return bytearray(KEY_PASSWORD.encode())
1093 def getpass_badpass():
1094 return "badpass"
1095 def getpass_huge():
1096 return b'a' * (1024 * 1024)
1097 def getpass_bad_type():
1098 return 9
1099 def getpass_exception():
1100 raise Exception('getpass error')
1101 class GetPassCallable:
1102 def __call__(self):
1103 return KEY_PASSWORD
1104 def getpass(self):
1105 return KEY_PASSWORD
1106 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
1107 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
1108 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
1109 ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
1110 ctx.load_cert_chain(CERTFILE_PROTECTED,
1111 password=GetPassCallable().getpass)
1112 with self.assertRaises(ssl.SSLError):
1113 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
1114 with self.assertRaisesRegex(ValueError, "cannot be longer"):
1115 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
1116 with self.assertRaisesRegex(TypeError, "must return a string"):
1117 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
1118 with self.assertRaisesRegex(Exception, "getpass error"):
1119 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
1120 # Make sure the password function isn't called if it isn't needed
1121 ctx.load_cert_chain(CERTFILE, password=getpass_exception)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001122
1123 def test_load_verify_locations(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001124 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001125 ctx.load_verify_locations(CERTFILE)
1126 ctx.load_verify_locations(cafile=CERTFILE, capath=None)
1127 ctx.load_verify_locations(BYTES_CERTFILE)
1128 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
1129 self.assertRaises(TypeError, ctx.load_verify_locations)
Christian Heimesefff7062013-11-21 03:35:02 +01001130 self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001131 with self.assertRaises(OSError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001132 ctx.load_verify_locations(NONEXISTINGCERT)
Giampaolo Rodolà4a656eb2010-08-29 22:50:39 +00001133 self.assertEqual(cm.exception.errno, errno.ENOENT)
Ezio Melottied3a7d22010-12-01 02:32:32 +00001134 with self.assertRaisesRegex(ssl.SSLError, "PEM lib"):
Antoine Pitrou152efa22010-05-16 18:19:27 +00001135 ctx.load_verify_locations(BADCERT)
1136 ctx.load_verify_locations(CERTFILE, CAPATH)
1137 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
1138
Victor Stinner80f75e62011-01-29 11:31:20 +00001139 # Issue #10989: crash if the second argument type is invalid
1140 self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
1141
Christian Heimesefff7062013-11-21 03:35:02 +01001142 def test_load_verify_cadata(self):
1143 # test cadata
1144 with open(CAFILE_CACERT) as f:
1145 cacert_pem = f.read()
1146 cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
1147 with open(CAFILE_NEURONIO) as f:
1148 neuronio_pem = f.read()
1149 neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
1150
1151 # test PEM
Christian Heimesa170fa12017-09-15 20:27:30 +02001152 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001153 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
1154 ctx.load_verify_locations(cadata=cacert_pem)
1155 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
1156 ctx.load_verify_locations(cadata=neuronio_pem)
1157 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1158 # cert already in hash table
1159 ctx.load_verify_locations(cadata=neuronio_pem)
1160 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1161
1162 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001163 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001164 combined = "\n".join((cacert_pem, neuronio_pem))
1165 ctx.load_verify_locations(cadata=combined)
1166 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1167
1168 # with junk around the certs
Christian Heimesa170fa12017-09-15 20:27:30 +02001169 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001170 combined = ["head", cacert_pem, "other", neuronio_pem, "again",
1171 neuronio_pem, "tail"]
1172 ctx.load_verify_locations(cadata="\n".join(combined))
1173 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1174
1175 # test DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001176 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001177 ctx.load_verify_locations(cadata=cacert_der)
1178 ctx.load_verify_locations(cadata=neuronio_der)
1179 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1180 # cert already in hash table
1181 ctx.load_verify_locations(cadata=cacert_der)
1182 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1183
1184 # combined
Christian Heimesa170fa12017-09-15 20:27:30 +02001185 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001186 combined = b"".join((cacert_der, neuronio_der))
1187 ctx.load_verify_locations(cadata=combined)
1188 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
1189
1190 # error cases
Christian Heimesa170fa12017-09-15 20:27:30 +02001191 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimesefff7062013-11-21 03:35:02 +01001192 self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
1193
1194 with self.assertRaisesRegex(ssl.SSLError, "no start line"):
1195 ctx.load_verify_locations(cadata="broken")
1196 with self.assertRaisesRegex(ssl.SSLError, "not enough data"):
1197 ctx.load_verify_locations(cadata=b"broken")
1198
1199
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001200 def test_load_dh_params(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001201 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001202 ctx.load_dh_params(DHFILE)
1203 if os.name != 'nt':
1204 ctx.load_dh_params(BYTES_DHFILE)
1205 self.assertRaises(TypeError, ctx.load_dh_params)
1206 self.assertRaises(TypeError, ctx.load_dh_params, None)
1207 with self.assertRaises(FileNotFoundError) as cm:
Martin Panter407b62f2016-01-30 03:41:43 +00001208 ctx.load_dh_params(NONEXISTINGCERT)
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001209 self.assertEqual(cm.exception.errno, errno.ENOENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001210 with self.assertRaises(ssl.SSLError) as cm:
Antoine Pitrou0e576f12011-12-22 10:03:38 +01001211 ctx.load_dh_params(CERTFILE)
1212
Antoine Pitroueb585ad2010-10-22 18:24:20 +00001213 @skip_if_broken_ubuntu_ssl
Antoine Pitroub0182c82010-10-12 20:09:02 +00001214 def test_session_stats(self):
1215 for proto in PROTOCOLS:
1216 ctx = ssl.SSLContext(proto)
1217 self.assertEqual(ctx.session_stats(), {
1218 'number': 0,
1219 'connect': 0,
1220 'connect_good': 0,
1221 'connect_renegotiate': 0,
1222 'accept': 0,
1223 'accept_good': 0,
1224 'accept_renegotiate': 0,
1225 'hits': 0,
1226 'misses': 0,
1227 'timeouts': 0,
1228 'cache_full': 0,
1229 })
1230
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001231 def test_set_default_verify_paths(self):
1232 # There's not much we can do to test that it acts as expected,
1233 # so just check it doesn't crash or raise an exception.
Christian Heimesa170fa12017-09-15 20:27:30 +02001234 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou664c2d12010-11-17 20:29:42 +00001235 ctx.set_default_verify_paths()
1236
Antoine Pitrou501da612011-12-21 09:27:41 +01001237 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001238 def test_set_ecdh_curve(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001239 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou923df6f2011-12-19 17:16:51 +01001240 ctx.set_ecdh_curve("prime256v1")
1241 ctx.set_ecdh_curve(b"prime256v1")
1242 self.assertRaises(TypeError, ctx.set_ecdh_curve)
1243 self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1244 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1245 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1246
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001247 @needs_sni
1248 def test_sni_callback(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001249 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001250
1251 # set_servername_callback expects a callable, or None
1252 self.assertRaises(TypeError, ctx.set_servername_callback)
1253 self.assertRaises(TypeError, ctx.set_servername_callback, 4)
1254 self.assertRaises(TypeError, ctx.set_servername_callback, "")
1255 self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
1256
1257 def dummycallback(sock, servername, ctx):
1258 pass
1259 ctx.set_servername_callback(None)
1260 ctx.set_servername_callback(dummycallback)
1261
1262 @needs_sni
1263 def test_sni_callback_refcycle(self):
1264 # Reference cycles through the servername callback are detected
1265 # and cleared.
Christian Heimesa170fa12017-09-15 20:27:30 +02001266 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01001267 def dummycallback(sock, servername, ctx, cycle=ctx):
1268 pass
1269 ctx.set_servername_callback(dummycallback)
1270 wr = weakref.ref(ctx)
1271 del ctx, dummycallback
1272 gc.collect()
1273 self.assertIs(wr(), None)
1274
Christian Heimes9a5395a2013-06-17 15:44:12 +02001275 def test_cert_store_stats(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001276 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001277 self.assertEqual(ctx.cert_store_stats(),
1278 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1279 ctx.load_cert_chain(CERTFILE)
1280 self.assertEqual(ctx.cert_store_stats(),
1281 {'x509_ca': 0, 'crl': 0, 'x509': 0})
1282 ctx.load_verify_locations(CERTFILE)
1283 self.assertEqual(ctx.cert_store_stats(),
1284 {'x509_ca': 0, 'crl': 0, 'x509': 1})
Martin Panterb55f8b72016-01-14 12:53:56 +00001285 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001286 self.assertEqual(ctx.cert_store_stats(),
1287 {'x509_ca': 1, 'crl': 0, 'x509': 2})
1288
1289 def test_get_ca_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001290 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001291 self.assertEqual(ctx.get_ca_certs(), [])
1292 # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
1293 ctx.load_verify_locations(CERTFILE)
1294 self.assertEqual(ctx.get_ca_certs(), [])
Martin Panterb55f8b72016-01-14 12:53:56 +00001295 # but CAFILE_CACERT is a CA cert
1296 ctx.load_verify_locations(CAFILE_CACERT)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001297 self.assertEqual(ctx.get_ca_certs(),
1298 [{'issuer': ((('organizationName', 'Root CA'),),
1299 (('organizationalUnitName', 'http://www.cacert.org'),),
1300 (('commonName', 'CA Cert Signing Authority'),),
1301 (('emailAddress', 'support@cacert.org'),)),
1302 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
1303 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
1304 'serialNumber': '00',
Christian Heimesbd3a7f92013-11-21 03:40:15 +01001305 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
Christian Heimes9a5395a2013-06-17 15:44:12 +02001306 'subject': ((('organizationName', 'Root CA'),),
1307 (('organizationalUnitName', 'http://www.cacert.org'),),
1308 (('commonName', 'CA Cert Signing Authority'),),
1309 (('emailAddress', 'support@cacert.org'),)),
1310 'version': 3}])
1311
Martin Panterb55f8b72016-01-14 12:53:56 +00001312 with open(CAFILE_CACERT) as f:
Christian Heimes9a5395a2013-06-17 15:44:12 +02001313 pem = f.read()
1314 der = ssl.PEM_cert_to_DER_cert(pem)
1315 self.assertEqual(ctx.get_ca_certs(True), [der])
1316
Christian Heimes72d28502013-11-23 13:56:58 +01001317 def test_load_default_certs(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001318 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001319 ctx.load_default_certs()
1320
Christian Heimesa170fa12017-09-15 20:27:30 +02001321 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001322 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1323 ctx.load_default_certs()
1324
Christian Heimesa170fa12017-09-15 20:27:30 +02001325 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001326 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1327
Christian Heimesa170fa12017-09-15 20:27:30 +02001328 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Christian Heimes72d28502013-11-23 13:56:58 +01001329 self.assertRaises(TypeError, ctx.load_default_certs, None)
1330 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1331
Benjamin Peterson91244e02014-10-03 18:17:15 -04001332 @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
Christian Heimes598894f2016-09-05 23:19:05 +02001333 @unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001334 def test_load_default_certs_env(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001335 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson5915b0f2014-10-03 17:27:05 -04001336 with support.EnvironmentVarGuard() as env:
1337 env["SSL_CERT_DIR"] = CAPATH
1338 env["SSL_CERT_FILE"] = CERTFILE
1339 ctx.load_default_certs()
1340 self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0})
1341
Benjamin Peterson91244e02014-10-03 18:17:15 -04001342 @unittest.skipUnless(sys.platform == "win32", "Windows specific")
Steve Dower68d663c2017-07-17 11:15:48 +02001343 @unittest.skipIf(hasattr(sys, "gettotalrefcount"), "Debug build does not share environment between CRTs")
Benjamin Peterson91244e02014-10-03 18:17:15 -04001344 def test_load_default_certs_env_windows(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001345 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001346 ctx.load_default_certs()
1347 stats = ctx.cert_store_stats()
1348
Christian Heimesa170fa12017-09-15 20:27:30 +02001349 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Benjamin Peterson91244e02014-10-03 18:17:15 -04001350 with support.EnvironmentVarGuard() as env:
1351 env["SSL_CERT_DIR"] = CAPATH
1352 env["SSL_CERT_FILE"] = CERTFILE
1353 ctx.load_default_certs()
1354 stats["x509"] += 1
1355 self.assertEqual(ctx.cert_store_stats(), stats)
1356
Christian Heimes358cfd42016-09-10 22:43:48 +02001357 def _assert_context_options(self, ctx):
1358 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1359 if OP_NO_COMPRESSION != 0:
1360 self.assertEqual(ctx.options & OP_NO_COMPRESSION,
1361 OP_NO_COMPRESSION)
1362 if OP_SINGLE_DH_USE != 0:
1363 self.assertEqual(ctx.options & OP_SINGLE_DH_USE,
1364 OP_SINGLE_DH_USE)
1365 if OP_SINGLE_ECDH_USE != 0:
1366 self.assertEqual(ctx.options & OP_SINGLE_ECDH_USE,
1367 OP_SINGLE_ECDH_USE)
1368 if OP_CIPHER_SERVER_PREFERENCE != 0:
1369 self.assertEqual(ctx.options & OP_CIPHER_SERVER_PREFERENCE,
1370 OP_CIPHER_SERVER_PREFERENCE)
1371
Christian Heimes4c05b472013-11-23 15:58:30 +01001372 def test_create_default_context(self):
1373 ctx = ssl.create_default_context()
Christian Heimes358cfd42016-09-10 22:43:48 +02001374
Christian Heimesa170fa12017-09-15 20:27:30 +02001375 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001376 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001377 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001378 self._assert_context_options(ctx)
1379
Christian Heimes4c05b472013-11-23 15:58:30 +01001380 with open(SIGNING_CA) as f:
1381 cadata = f.read()
1382 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1383 cadata=cadata)
Christian Heimesa170fa12017-09-15 20:27:30 +02001384 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001385 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes358cfd42016-09-10 22:43:48 +02001386 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001387
1388 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001389 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes4c05b472013-11-23 15:58:30 +01001390 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001391 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001392
Christian Heimes67986f92013-11-23 22:43:47 +01001393 def test__create_stdlib_context(self):
1394 ctx = ssl._create_stdlib_context()
Christian Heimesa170fa12017-09-15 20:27:30 +02001395 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001396 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001397 self.assertFalse(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001398 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001399
1400 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
1401 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1402 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001403 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001404
1405 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
Christian Heimesa02c69a2013-12-02 20:59:28 +01001406 cert_reqs=ssl.CERT_REQUIRED,
1407 check_hostname=True)
Christian Heimes67986f92013-11-23 22:43:47 +01001408 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1409 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimesa02c69a2013-12-02 20:59:28 +01001410 self.assertTrue(ctx.check_hostname)
Christian Heimes358cfd42016-09-10 22:43:48 +02001411 self._assert_context_options(ctx)
Christian Heimes67986f92013-11-23 22:43:47 +01001412
1413 ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
Christian Heimesa170fa12017-09-15 20:27:30 +02001414 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
Christian Heimes67986f92013-11-23 22:43:47 +01001415 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes358cfd42016-09-10 22:43:48 +02001416 self._assert_context_options(ctx)
Christian Heimes4c05b472013-11-23 15:58:30 +01001417
Christian Heimes1aa9a752013-12-02 02:41:19 +01001418 def test_check_hostname(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02001419 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001420 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001421 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001422
Christian Heimese82c0342017-09-15 20:29:57 +02001423 # Auto set CERT_REQUIRED
1424 ctx.check_hostname = True
1425 self.assertTrue(ctx.check_hostname)
1426 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1427 ctx.check_hostname = False
Christian Heimes1aa9a752013-12-02 02:41:19 +01001428 ctx.verify_mode = ssl.CERT_REQUIRED
1429 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001430 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001431
Christian Heimese82c0342017-09-15 20:29:57 +02001432 # Changing verify_mode does not affect check_hostname
1433 ctx.check_hostname = False
1434 ctx.verify_mode = ssl.CERT_NONE
1435 ctx.check_hostname = False
1436 self.assertFalse(ctx.check_hostname)
1437 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1438 # Auto set
Christian Heimes1aa9a752013-12-02 02:41:19 +01001439 ctx.check_hostname = True
1440 self.assertTrue(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001441 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1442
1443 ctx.check_hostname = False
1444 ctx.verify_mode = ssl.CERT_OPTIONAL
1445 ctx.check_hostname = False
1446 self.assertFalse(ctx.check_hostname)
1447 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
1448 # keep CERT_OPTIONAL
1449 ctx.check_hostname = True
1450 self.assertTrue(ctx.check_hostname)
1451 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001452
1453 # Cannot set CERT_NONE with check_hostname enabled
1454 with self.assertRaises(ValueError):
1455 ctx.verify_mode = ssl.CERT_NONE
1456 ctx.check_hostname = False
1457 self.assertFalse(ctx.check_hostname)
Christian Heimese82c0342017-09-15 20:29:57 +02001458 ctx.verify_mode = ssl.CERT_NONE
1459 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
Christian Heimes1aa9a752013-12-02 02:41:19 +01001460
Christian Heimes5fe668c2016-09-12 00:01:11 +02001461 def test_context_client_server(self):
1462 # PROTOCOL_TLS_CLIENT has sane defaults
1463 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1464 self.assertTrue(ctx.check_hostname)
1465 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1466
1467 # PROTOCOL_TLS_SERVER has different but also sane defaults
1468 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1469 self.assertFalse(ctx.check_hostname)
1470 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1471
Christian Heimes4df60f12017-09-15 20:26:05 +02001472 def test_context_custom_class(self):
1473 class MySSLSocket(ssl.SSLSocket):
1474 pass
1475
1476 class MySSLObject(ssl.SSLObject):
1477 pass
1478
1479 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1480 ctx.sslsocket_class = MySSLSocket
1481 ctx.sslobject_class = MySSLObject
1482
1483 with ctx.wrap_socket(socket.socket(), server_side=True) as sock:
1484 self.assertIsInstance(sock, MySSLSocket)
1485 obj = ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO())
1486 self.assertIsInstance(obj, MySSLObject)
1487
Antoine Pitrou152efa22010-05-16 18:19:27 +00001488
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001489class SSLErrorTests(unittest.TestCase):
1490
1491 def test_str(self):
1492 # The str() of a SSLError doesn't include the errno
1493 e = ssl.SSLError(1, "foo")
1494 self.assertEqual(str(e), "foo")
1495 self.assertEqual(e.errno, 1)
1496 # Same for a subclass
1497 e = ssl.SSLZeroReturnError(1, "foo")
1498 self.assertEqual(str(e), "foo")
1499 self.assertEqual(e.errno, 1)
1500
1501 def test_lib_reason(self):
1502 # Test the library and reason attributes
Christian Heimesa170fa12017-09-15 20:27:30 +02001503 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001504 with self.assertRaises(ssl.SSLError) as cm:
1505 ctx.load_dh_params(CERTFILE)
1506 self.assertEqual(cm.exception.library, 'PEM')
1507 self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1508 s = str(cm.exception)
1509 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1510
1511 def test_subclass(self):
1512 # Check that the appropriate SSLError subclass is raised
1513 # (this only tests one of them)
Christian Heimesa170fa12017-09-15 20:27:30 +02001514 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1515 ctx.check_hostname = False
1516 ctx.verify_mode = ssl.CERT_NONE
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001517 with socket.socket() as s:
1518 s.bind(("127.0.0.1", 0))
Charles-François Natali6e204602014-07-23 19:28:13 +01001519 s.listen()
Antoine Pitroue1ceb502013-01-12 21:54:44 +01001520 c = socket.socket()
1521 c.connect(s.getsockname())
1522 c.setblocking(False)
1523 with ctx.wrap_socket(c, False, do_handshake_on_connect=False) as c:
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001524 with self.assertRaises(ssl.SSLWantReadError) as cm:
1525 c.do_handshake()
1526 s = str(cm.exception)
1527 self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1528 # For compatibility
1529 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
1530
Nathaniel J. Smith59fdf0f2017-06-09 02:35:16 -07001531 def test_bad_idna_in_server_hostname(self):
1532 # Note: this test is testing some code that probably shouldn't exist
1533 # in the first place, so if it starts failing at some point because
1534 # you made the ssl module stop doing IDNA decoding then please feel
1535 # free to remove it. The test was mainly added because this case used
1536 # to cause memory corruption (see bpo-30594).
1537 ctx = ssl.create_default_context()
1538 with self.assertRaises(UnicodeError):
1539 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1540 server_hostname="xn--.com")
Antoine Pitrou3b36fb12012-06-22 21:11:52 +02001541
Christian Heimes61d478c2018-01-27 15:51:38 +01001542 def test_bad_server_hostname(self):
1543 ctx = ssl.create_default_context()
1544 with self.assertRaises(ValueError):
1545 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1546 server_hostname="")
1547 with self.assertRaises(ValueError):
1548 ctx.wrap_bio(ssl.MemoryBIO(), ssl.MemoryBIO(),
1549 server_hostname=".example.org")
1550
1551
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001552class MemoryBIOTests(unittest.TestCase):
1553
1554 def test_read_write(self):
1555 bio = ssl.MemoryBIO()
1556 bio.write(b'foo')
1557 self.assertEqual(bio.read(), b'foo')
1558 self.assertEqual(bio.read(), b'')
1559 bio.write(b'foo')
1560 bio.write(b'bar')
1561 self.assertEqual(bio.read(), b'foobar')
1562 self.assertEqual(bio.read(), b'')
1563 bio.write(b'baz')
1564 self.assertEqual(bio.read(2), b'ba')
1565 self.assertEqual(bio.read(1), b'z')
1566 self.assertEqual(bio.read(1), b'')
1567
1568 def test_eof(self):
1569 bio = ssl.MemoryBIO()
1570 self.assertFalse(bio.eof)
1571 self.assertEqual(bio.read(), b'')
1572 self.assertFalse(bio.eof)
1573 bio.write(b'foo')
1574 self.assertFalse(bio.eof)
1575 bio.write_eof()
1576 self.assertFalse(bio.eof)
1577 self.assertEqual(bio.read(2), b'fo')
1578 self.assertFalse(bio.eof)
1579 self.assertEqual(bio.read(1), b'o')
1580 self.assertTrue(bio.eof)
1581 self.assertEqual(bio.read(), b'')
1582 self.assertTrue(bio.eof)
1583
1584 def test_pending(self):
1585 bio = ssl.MemoryBIO()
1586 self.assertEqual(bio.pending, 0)
1587 bio.write(b'foo')
1588 self.assertEqual(bio.pending, 3)
1589 for i in range(3):
1590 bio.read(1)
1591 self.assertEqual(bio.pending, 3-i-1)
1592 for i in range(3):
1593 bio.write(b'x')
1594 self.assertEqual(bio.pending, i+1)
1595 bio.read()
1596 self.assertEqual(bio.pending, 0)
1597
1598 def test_buffer_types(self):
1599 bio = ssl.MemoryBIO()
1600 bio.write(b'foo')
1601 self.assertEqual(bio.read(), b'foo')
1602 bio.write(bytearray(b'bar'))
1603 self.assertEqual(bio.read(), b'bar')
1604 bio.write(memoryview(b'baz'))
1605 self.assertEqual(bio.read(), b'baz')
1606
1607 def test_error_types(self):
1608 bio = ssl.MemoryBIO()
1609 self.assertRaises(TypeError, bio.write, 'foo')
1610 self.assertRaises(TypeError, bio.write, None)
1611 self.assertRaises(TypeError, bio.write, True)
1612 self.assertRaises(TypeError, bio.write, 1)
1613
1614
Martin Panter3840b2a2016-03-27 01:53:46 +00001615class SimpleBackgroundTests(unittest.TestCase):
Martin Panter3840b2a2016-03-27 01:53:46 +00001616 """Tests that connect to a simple server running in the background"""
1617
1618 def setUp(self):
1619 server = ThreadedEchoServer(SIGNED_CERTFILE)
1620 self.server_addr = (HOST, server.port)
1621 server.__enter__()
1622 self.addCleanup(server.__exit__, None, None, None)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001623
Antoine Pitrou480a1242010-04-28 21:37:09 +00001624 def test_connect(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001625 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001626 cert_reqs=ssl.CERT_NONE) as s:
1627 s.connect(self.server_addr)
1628 self.assertEqual({}, s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001629 self.assertFalse(s.server_side)
Antoine Pitrou350c7222010-09-09 13:31:46 +00001630
Martin Panter3840b2a2016-03-27 01:53:46 +00001631 # this should succeed because we specify the root cert
Christian Heimesd0486372016-09-10 23:23:33 +02001632 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001633 cert_reqs=ssl.CERT_REQUIRED,
1634 ca_certs=SIGNING_CA) as s:
1635 s.connect(self.server_addr)
1636 self.assertTrue(s.getpeercert())
Christian Heimesa5d07652016-09-24 10:48:05 +02001637 self.assertFalse(s.server_side)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001638
Martin Panter3840b2a2016-03-27 01:53:46 +00001639 def test_connect_fail(self):
1640 # This should fail because we have no verification certs. Connection
1641 # failure crashes ThreadedEchoServer, so run this in an independent
1642 # test method.
Christian Heimesd0486372016-09-10 23:23:33 +02001643 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001644 cert_reqs=ssl.CERT_REQUIRED)
1645 self.addCleanup(s.close)
1646 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1647 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001648
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001649 def test_connect_ex(self):
1650 # Issue #11326: check connect_ex() implementation
Christian Heimesd0486372016-09-10 23:23:33 +02001651 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001652 cert_reqs=ssl.CERT_REQUIRED,
1653 ca_certs=SIGNING_CA)
1654 self.addCleanup(s.close)
1655 self.assertEqual(0, s.connect_ex(self.server_addr))
1656 self.assertTrue(s.getpeercert())
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001657
1658 def test_non_blocking_connect_ex(self):
1659 # Issue #11326: non-blocking connect_ex() should allow handshake
1660 # to proceed after the socket gets ready.
Christian Heimesd0486372016-09-10 23:23:33 +02001661 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001662 cert_reqs=ssl.CERT_REQUIRED,
1663 ca_certs=SIGNING_CA,
1664 do_handshake_on_connect=False)
1665 self.addCleanup(s.close)
1666 s.setblocking(False)
1667 rc = s.connect_ex(self.server_addr)
1668 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1669 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
1670 # Wait for connect to finish
1671 select.select([], [s], [], 5.0)
1672 # Non-blocking handshake
1673 while True:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001674 try:
Martin Panter3840b2a2016-03-27 01:53:46 +00001675 s.do_handshake()
1676 break
1677 except ssl.SSLWantReadError:
1678 select.select([s], [], [], 5.0)
1679 except ssl.SSLWantWriteError:
Antoine Pitroue93bf7a2011-02-26 23:24:06 +00001680 select.select([], [s], [], 5.0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001681 # SSL established
1682 self.assertTrue(s.getpeercert())
Antoine Pitrou40f12ab2012-12-28 19:03:43 +01001683
Antoine Pitrou152efa22010-05-16 18:19:27 +00001684 def test_connect_with_context(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001685 # Same as test_connect, but with a separately created context
Christian Heimesa170fa12017-09-15 20:27:30 +02001686 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001687 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1688 s.connect(self.server_addr)
1689 self.assertEqual({}, s.getpeercert())
1690 # Same with a server hostname
1691 with ctx.wrap_socket(socket.socket(socket.AF_INET),
1692 server_hostname="dummy") as s:
1693 s.connect(self.server_addr)
1694 ctx.verify_mode = ssl.CERT_REQUIRED
1695 # This should succeed because we specify the root cert
1696 ctx.load_verify_locations(SIGNING_CA)
1697 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1698 s.connect(self.server_addr)
1699 cert = s.getpeercert()
1700 self.assertTrue(cert)
1701
1702 def test_connect_with_context_fail(self):
1703 # This should fail because we have no verification certs. Connection
1704 # failure crashes ThreadedEchoServer, so run this in an independent
1705 # test method.
Christian Heimesa170fa12017-09-15 20:27:30 +02001706 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001707 ctx.verify_mode = ssl.CERT_REQUIRED
1708 s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1709 self.addCleanup(s.close)
1710 self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
1711 s.connect, self.server_addr)
Antoine Pitrou152efa22010-05-16 18:19:27 +00001712
1713 def test_connect_capath(self):
1714 # Verify server certificates using the `capath` argument
Antoine Pitrou467f28d2010-05-16 19:22:44 +00001715 # NOTE: the subject hashing algorithm has been changed between
1716 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
1717 # contain both versions of each certificate (same content, different
Antoine Pitroud7e4c1c2010-05-17 14:13:10 +00001718 # filename) for this test to be portable across OpenSSL releases.
Christian Heimesa170fa12017-09-15 20:27:30 +02001719 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001720 ctx.verify_mode = ssl.CERT_REQUIRED
1721 ctx.load_verify_locations(capath=CAPATH)
1722 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1723 s.connect(self.server_addr)
1724 cert = s.getpeercert()
1725 self.assertTrue(cert)
1726 # Same with a bytes `capath` argument
Christian Heimesa170fa12017-09-15 20:27:30 +02001727 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001728 ctx.verify_mode = ssl.CERT_REQUIRED
1729 ctx.load_verify_locations(capath=BYTES_CAPATH)
1730 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1731 s.connect(self.server_addr)
1732 cert = s.getpeercert()
1733 self.assertTrue(cert)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001734
Christian Heimesefff7062013-11-21 03:35:02 +01001735 def test_connect_cadata(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001736 with open(SIGNING_CA) as f:
Christian Heimesefff7062013-11-21 03:35:02 +01001737 pem = f.read()
1738 der = ssl.PEM_cert_to_DER_cert(pem)
Christian Heimesa170fa12017-09-15 20:27:30 +02001739 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001740 ctx.verify_mode = ssl.CERT_REQUIRED
1741 ctx.load_verify_locations(cadata=pem)
1742 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1743 s.connect(self.server_addr)
1744 cert = s.getpeercert()
1745 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001746
Martin Panter3840b2a2016-03-27 01:53:46 +00001747 # same with DER
Christian Heimesa170fa12017-09-15 20:27:30 +02001748 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001749 ctx.verify_mode = ssl.CERT_REQUIRED
1750 ctx.load_verify_locations(cadata=der)
1751 with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
1752 s.connect(self.server_addr)
1753 cert = s.getpeercert()
1754 self.assertTrue(cert)
Christian Heimesefff7062013-11-21 03:35:02 +01001755
Antoine Pitroue3220242010-04-24 11:13:53 +00001756 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
1757 def test_makefile_close(self):
1758 # Issue #5238: creating a file-like object with makefile() shouldn't
1759 # delay closing the underlying "real socket" (here tested with its
1760 # file descriptor, hence skipping the test under Windows).
Christian Heimesd0486372016-09-10 23:23:33 +02001761 ss = test_wrap_socket(socket.socket(socket.AF_INET))
Martin Panter3840b2a2016-03-27 01:53:46 +00001762 ss.connect(self.server_addr)
1763 fd = ss.fileno()
1764 f = ss.makefile()
1765 f.close()
1766 # The fd is still open
1767 os.read(fd, 0)
1768 # Closing the SSL socket should close the fd too
1769 ss.close()
1770 gc.collect()
1771 with self.assertRaises(OSError) as e:
Antoine Pitroue3220242010-04-24 11:13:53 +00001772 os.read(fd, 0)
Martin Panter3840b2a2016-03-27 01:53:46 +00001773 self.assertEqual(e.exception.errno, errno.EBADF)
Antoine Pitroue3220242010-04-24 11:13:53 +00001774
Antoine Pitrou480a1242010-04-28 21:37:09 +00001775 def test_non_blocking_handshake(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001776 s = socket.socket(socket.AF_INET)
1777 s.connect(self.server_addr)
1778 s.setblocking(False)
Christian Heimesd0486372016-09-10 23:23:33 +02001779 s = test_wrap_socket(s,
Martin Panter3840b2a2016-03-27 01:53:46 +00001780 cert_reqs=ssl.CERT_NONE,
1781 do_handshake_on_connect=False)
1782 self.addCleanup(s.close)
1783 count = 0
1784 while True:
1785 try:
1786 count += 1
1787 s.do_handshake()
1788 break
1789 except ssl.SSLWantReadError:
1790 select.select([s], [], [])
1791 except ssl.SSLWantWriteError:
1792 select.select([], [s], [])
1793 if support.verbose:
1794 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001795
Antoine Pitrou480a1242010-04-28 21:37:09 +00001796 def test_get_server_certificate(self):
Martin Panter3840b2a2016-03-27 01:53:46 +00001797 _test_get_server_certificate(self, *self.server_addr, cert=SIGNING_CA)
Antoine Pitrou5aefa662011-04-28 19:24:46 +02001798
Martin Panter3840b2a2016-03-27 01:53:46 +00001799 def test_get_server_certificate_fail(self):
1800 # Connection failure crashes ThreadedEchoServer, so run this in an
1801 # independent test method
1802 _test_get_server_certificate_fail(self, *self.server_addr)
Bill Janssen54cc54c2007-12-14 22:08:56 +00001803
Antoine Pitrouf4c7bad2010-08-15 23:02:22 +00001804 def test_ciphers(self):
Christian Heimesd0486372016-09-10 23:23:33 +02001805 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001806 cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
1807 s.connect(self.server_addr)
Christian Heimesd0486372016-09-10 23:23:33 +02001808 with test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001809 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
1810 s.connect(self.server_addr)
1811 # Error checking can happen at instantiation or when connecting
1812 with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
1813 with socket.socket(socket.AF_INET) as sock:
Christian Heimesd0486372016-09-10 23:23:33 +02001814 s = test_wrap_socket(sock,
Martin Panter3840b2a2016-03-27 01:53:46 +00001815 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
1816 s.connect(self.server_addr)
Antoine Pitroufec12ff2010-04-21 19:46:23 +00001817
Christian Heimes9a5395a2013-06-17 15:44:12 +02001818 def test_get_ca_certs_capath(self):
1819 # capath certs are loaded on request
Christian Heimesa170fa12017-09-15 20:27:30 +02001820 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Martin Panter3840b2a2016-03-27 01:53:46 +00001821 ctx.load_verify_locations(capath=CAPATH)
1822 self.assertEqual(ctx.get_ca_certs(), [])
Christian Heimesa170fa12017-09-15 20:27:30 +02001823 with ctx.wrap_socket(socket.socket(socket.AF_INET),
1824 server_hostname='localhost') as s:
Martin Panter3840b2a2016-03-27 01:53:46 +00001825 s.connect(self.server_addr)
1826 cert = s.getpeercert()
1827 self.assertTrue(cert)
1828 self.assertEqual(len(ctx.get_ca_certs()), 1)
Christian Heimes9a5395a2013-06-17 15:44:12 +02001829
Christian Heimes575596e2013-12-15 21:49:17 +01001830 @needs_sni
Christian Heimes8e7f3942013-12-05 07:41:08 +01001831 def test_context_setget(self):
1832 # Check that the context of a connected socket can be replaced.
Christian Heimesa170fa12017-09-15 20:27:30 +02001833 ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1834 ctx1.load_verify_locations(capath=CAPATH)
1835 ctx2 = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1836 ctx2.load_verify_locations(capath=CAPATH)
Martin Panter3840b2a2016-03-27 01:53:46 +00001837 s = socket.socket(socket.AF_INET)
Christian Heimesa170fa12017-09-15 20:27:30 +02001838 with ctx1.wrap_socket(s, server_hostname='localhost') as ss:
Martin Panter3840b2a2016-03-27 01:53:46 +00001839 ss.connect(self.server_addr)
1840 self.assertIs(ss.context, ctx1)
1841 self.assertIs(ss._sslobj.context, ctx1)
1842 ss.context = ctx2
1843 self.assertIs(ss.context, ctx2)
1844 self.assertIs(ss._sslobj.context, ctx2)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001845
1846 def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
1847 # A simple IO loop. Call func(*args) depending on the error we get
1848 # (WANT_READ or WANT_WRITE) move data between the socket and the BIOs.
1849 timeout = kwargs.get('timeout', 10)
Victor Stinner50a72af2017-09-11 09:34:24 -07001850 deadline = time.monotonic() + timeout
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001851 count = 0
1852 while True:
Victor Stinner50a72af2017-09-11 09:34:24 -07001853 if time.monotonic() > deadline:
1854 self.fail("timeout")
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001855 errno = None
1856 count += 1
1857 try:
1858 ret = func(*args)
1859 except ssl.SSLError as e:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001860 if e.errno not in (ssl.SSL_ERROR_WANT_READ,
Martin Panter40b97ec2016-01-14 13:05:46 +00001861 ssl.SSL_ERROR_WANT_WRITE):
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001862 raise
1863 errno = e.errno
1864 # Get any data from the outgoing BIO irrespective of any error, and
1865 # send it to the socket.
1866 buf = outgoing.read()
1867 sock.sendall(buf)
1868 # If there's no error, we're done. For WANT_READ, we need to get
1869 # data from the socket and put it in the incoming BIO.
1870 if errno is None:
1871 break
1872 elif errno == ssl.SSL_ERROR_WANT_READ:
1873 buf = sock.recv(32768)
1874 if buf:
1875 incoming.write(buf)
1876 else:
1877 incoming.write_eof()
1878 if support.verbose:
1879 sys.stdout.write("Needed %d calls to complete %s().\n"
1880 % (count, func.__name__))
1881 return ret
1882
Martin Panter3840b2a2016-03-27 01:53:46 +00001883 def test_bio_handshake(self):
1884 sock = socket.socket(socket.AF_INET)
1885 self.addCleanup(sock.close)
1886 sock.connect(self.server_addr)
1887 incoming = ssl.MemoryBIO()
1888 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02001889 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1890 self.assertTrue(ctx.check_hostname)
1891 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
Martin Panter3840b2a2016-03-27 01:53:46 +00001892 ctx.load_verify_locations(SIGNING_CA)
Christian Heimesa170fa12017-09-15 20:27:30 +02001893 sslobj = ctx.wrap_bio(incoming, outgoing, False,
1894 SIGNED_CERTFILE_HOSTNAME)
Martin Panter3840b2a2016-03-27 01:53:46 +00001895 self.assertIs(sslobj._sslobj.owner, sslobj)
1896 self.assertIsNone(sslobj.cipher())
Christian Heimes68771112017-09-05 21:55:40 -07001897 self.assertIsNone(sslobj.version())
Christian Heimes01113fa2016-09-05 23:23:24 +02001898 self.assertIsNotNone(sslobj.shared_ciphers())
Martin Panter3840b2a2016-03-27 01:53:46 +00001899 self.assertRaises(ValueError, sslobj.getpeercert)
1900 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
1901 self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
1902 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
1903 self.assertTrue(sslobj.cipher())
Christian Heimes01113fa2016-09-05 23:23:24 +02001904 self.assertIsNotNone(sslobj.shared_ciphers())
Christian Heimes68771112017-09-05 21:55:40 -07001905 self.assertIsNotNone(sslobj.version())
Martin Panter3840b2a2016-03-27 01:53:46 +00001906 self.assertTrue(sslobj.getpeercert())
1907 if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
1908 self.assertTrue(sslobj.get_channel_binding('tls-unique'))
1909 try:
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001910 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Martin Panter3840b2a2016-03-27 01:53:46 +00001911 except ssl.SSLSyscallError:
1912 # If the server shuts down the TCP connection without sending a
1913 # secure shutdown message, this is reported as SSL_ERROR_SYSCALL
1914 pass
1915 self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
1916
1917 def test_bio_read_write_data(self):
1918 sock = socket.socket(socket.AF_INET)
1919 self.addCleanup(sock.close)
1920 sock.connect(self.server_addr)
1921 incoming = ssl.MemoryBIO()
1922 outgoing = ssl.MemoryBIO()
Christian Heimesa170fa12017-09-15 20:27:30 +02001923 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
Martin Panter3840b2a2016-03-27 01:53:46 +00001924 ctx.verify_mode = ssl.CERT_NONE
1925 sslobj = ctx.wrap_bio(incoming, outgoing, False)
1926 self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
1927 req = b'FOO\n'
1928 self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
1929 buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
1930 self.assertEqual(buf, b'foo\n')
1931 self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
Antoine Pitroub1fdf472014-10-05 20:41:53 +02001932
1933
Martin Panter3840b2a2016-03-27 01:53:46 +00001934class NetworkedTests(unittest.TestCase):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001935
Martin Panter3840b2a2016-03-27 01:53:46 +00001936 def test_timeout_connect_ex(self):
1937 # Issue #12065: on a timeout, connect_ex() should return the original
1938 # errno (mimicking the behaviour of non-SSL sockets).
1939 with support.transient_internet(REMOTE_HOST):
Christian Heimesd0486372016-09-10 23:23:33 +02001940 s = test_wrap_socket(socket.socket(socket.AF_INET),
Martin Panter3840b2a2016-03-27 01:53:46 +00001941 cert_reqs=ssl.CERT_REQUIRED,
1942 do_handshake_on_connect=False)
1943 self.addCleanup(s.close)
1944 s.settimeout(0.0000001)
1945 rc = s.connect_ex((REMOTE_HOST, 443))
1946 if rc == 0:
1947 self.skipTest("REMOTE_HOST responded too quickly")
1948 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
1949
1950 @unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
1951 def test_get_server_certificate_ipv6(self):
1952 with support.transient_internet('ipv6.google.com'):
1953 _test_get_server_certificate(self, 'ipv6.google.com', 443)
1954 _test_get_server_certificate_fail(self, 'ipv6.google.com', 443)
1955
Martin Panter3840b2a2016-03-27 01:53:46 +00001956
1957def _test_get_server_certificate(test, host, port, cert=None):
1958 pem = ssl.get_server_certificate((host, port))
1959 if not pem:
1960 test.fail("No server certificate on %s:%s!" % (host, port))
1961
1962 pem = ssl.get_server_certificate((host, port), ca_certs=cert)
1963 if not pem:
1964 test.fail("No server certificate on %s:%s!" % (host, port))
1965 if support.verbose:
1966 sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
1967
1968def _test_get_server_certificate_fail(test, host, port):
1969 try:
1970 pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
1971 except ssl.SSLError as x:
1972 #should fail
1973 if support.verbose:
1974 sys.stdout.write("%s\n" % x)
1975 else:
1976 test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
1977
1978
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02001979from test.ssl_servers import make_https_server
Antoine Pitrou803e6d62010-10-13 10:36:15 +00001980
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02001981class ThreadedEchoServer(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001982
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02001983 class ConnectionHandler(threading.Thread):
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001984
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02001985 """A mildly complicated class, because we want it to work both
1986 with and without the SSL wrapper around the socket connection, so
1987 that we can test the STARTTLS functionality."""
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001988
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02001989 def __init__(self, server, connsock, addr):
1990 self.server = server
1991 self.running = False
1992 self.sock = connsock
1993 self.addr = addr
1994 self.sock.setblocking(1)
1995 self.sslconn = None
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001996 threading.Thread.__init__(self)
Benjamin Peterson4171da52008-08-18 21:11:09 +00001997 self.daemon = True
Thomas Wouters1b7f8912007-09-19 03:06:30 +00001998
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02001999 def wrap_conn(self):
2000 try:
2001 self.sslconn = self.server.context.wrap_socket(
2002 self.sock, server_side=True)
2003 self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
2004 self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
2005 except (ssl.SSLError, ConnectionResetError, OSError) as e:
2006 # We treat ConnectionResetError as though it were an
2007 # SSLError - OpenSSL on Ubuntu abruptly closes the
2008 # connection when asked to use an unsupported protocol.
2009 #
2010 # OSError may occur with wrong protocols, e.g. both
2011 # sides use PROTOCOL_TLS_SERVER.
2012 #
2013 # XXX Various errors can have happened here, for example
2014 # a mismatching protocol version, an invalid certificate,
2015 # or a low-level bug. This should be made more discriminating.
2016 #
2017 # bpo-31323: Store the exception as string to prevent
2018 # a reference leak: server -> conn_errors -> exception
2019 # -> traceback -> self (ConnectionHandler) -> server
2020 self.server.conn_errors.append(str(e))
2021 if self.server.chatty:
2022 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
2023 self.running = False
2024 self.server.stop()
2025 self.close()
2026 return False
2027 else:
2028 self.server.shared_ciphers.append(self.sslconn.shared_ciphers())
2029 if self.server.context.verify_mode == ssl.CERT_REQUIRED:
2030 cert = self.sslconn.getpeercert()
2031 if support.verbose and self.server.chatty:
2032 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
2033 cert_binary = self.sslconn.getpeercert(True)
2034 if support.verbose and self.server.chatty:
2035 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
2036 cipher = self.sslconn.cipher()
2037 if support.verbose and self.server.chatty:
2038 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
2039 sys.stdout.write(" server: selected protocol is now "
2040 + str(self.sslconn.selected_npn_protocol()) + "\n")
2041 return True
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002042
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002043 def read(self):
2044 if self.sslconn:
2045 return self.sslconn.read()
2046 else:
2047 return self.sock.recv(1024)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002048
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002049 def write(self, bytes):
2050 if self.sslconn:
2051 return self.sslconn.write(bytes)
2052 else:
2053 return self.sock.send(bytes)
2054
2055 def close(self):
2056 if self.sslconn:
2057 self.sslconn.close()
2058 else:
2059 self.sock.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002060
Antoine Pitrou480a1242010-04-28 21:37:09 +00002061 def run(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002062 self.running = True
2063 if not self.server.starttls_server:
2064 if not self.wrap_conn():
2065 return
2066 while self.running:
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002067 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002068 msg = self.read()
2069 stripped = msg.strip()
2070 if not stripped:
2071 # eof, so quit this handler
2072 self.running = False
2073 try:
2074 self.sock = self.sslconn.unwrap()
2075 except OSError:
2076 # Many tests shut the TCP connection down
2077 # without an SSL shutdown. This causes
2078 # unwrap() to raise OSError with errno=0!
2079 pass
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002080 else:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002081 self.sslconn = None
2082 self.close()
2083 elif stripped == b'over':
2084 if support.verbose and self.server.connectionchatty:
2085 sys.stdout.write(" server: client closed connection\n")
2086 self.close()
2087 return
2088 elif (self.server.starttls_server and
2089 stripped == b'STARTTLS'):
2090 if support.verbose and self.server.connectionchatty:
2091 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
2092 self.write(b"OK\n")
2093 if not self.wrap_conn():
2094 return
2095 elif (self.server.starttls_server and self.sslconn
2096 and stripped == b'ENDTLS'):
2097 if support.verbose and self.server.connectionchatty:
2098 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
2099 self.write(b"OK\n")
2100 self.sock = self.sslconn.unwrap()
2101 self.sslconn = None
2102 if support.verbose and self.server.connectionchatty:
2103 sys.stdout.write(" server: connection is now unencrypted...\n")
2104 elif stripped == b'CB tls-unique':
2105 if support.verbose and self.server.connectionchatty:
2106 sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
2107 data = self.sslconn.get_channel_binding("tls-unique")
2108 self.write(repr(data).encode("us-ascii") + b"\n")
2109 else:
2110 if (support.verbose and
2111 self.server.connectionchatty):
2112 ctype = (self.sslconn and "encrypted") or "unencrypted"
2113 sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
2114 % (msg, ctype, msg.lower(), ctype))
2115 self.write(msg.lower())
2116 except OSError:
2117 if self.server.chatty:
2118 handle_error("Test server failure:\n")
Bill Janssen2f5799b2008-06-29 00:08:12 +00002119 self.close()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002120 self.running = False
2121 # normally, we'd just stop here, but for the test
2122 # harness, we want to stop the server
2123 self.server.stop()
Bill Janssen54cc54c2007-12-14 22:08:56 +00002124
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002125 def __init__(self, certificate=None, ssl_version=None,
2126 certreqs=None, cacerts=None,
2127 chatty=True, connectionchatty=False, starttls_server=False,
2128 npn_protocols=None, alpn_protocols=None,
2129 ciphers=None, context=None):
2130 if context:
2131 self.context = context
2132 else:
2133 self.context = ssl.SSLContext(ssl_version
2134 if ssl_version is not None
Christian Heimesa170fa12017-09-15 20:27:30 +02002135 else ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002136 self.context.verify_mode = (certreqs if certreqs is not None
2137 else ssl.CERT_NONE)
2138 if cacerts:
2139 self.context.load_verify_locations(cacerts)
2140 if certificate:
2141 self.context.load_cert_chain(certificate)
2142 if npn_protocols:
2143 self.context.set_npn_protocols(npn_protocols)
2144 if alpn_protocols:
2145 self.context.set_alpn_protocols(alpn_protocols)
2146 if ciphers:
2147 self.context.set_ciphers(ciphers)
2148 self.chatty = chatty
2149 self.connectionchatty = connectionchatty
2150 self.starttls_server = starttls_server
2151 self.sock = socket.socket()
2152 self.port = support.bind_port(self.sock)
2153 self.flag = None
2154 self.active = False
2155 self.selected_npn_protocols = []
2156 self.selected_alpn_protocols = []
2157 self.shared_ciphers = []
2158 self.conn_errors = []
2159 threading.Thread.__init__(self)
2160 self.daemon = True
2161
2162 def __enter__(self):
2163 self.start(threading.Event())
2164 self.flag.wait()
2165 return self
2166
2167 def __exit__(self, *args):
2168 self.stop()
2169 self.join()
2170
2171 def start(self, flag=None):
2172 self.flag = flag
2173 threading.Thread.start(self)
2174
2175 def run(self):
2176 self.sock.settimeout(0.05)
2177 self.sock.listen()
2178 self.active = True
2179 if self.flag:
2180 # signal an event
2181 self.flag.set()
2182 while self.active:
2183 try:
2184 newconn, connaddr = self.sock.accept()
2185 if support.verbose and self.chatty:
2186 sys.stdout.write(' server: new connection from '
2187 + repr(connaddr) + '\n')
2188 handler = self.ConnectionHandler(self, newconn, connaddr)
2189 handler.start()
2190 handler.join()
2191 except socket.timeout:
2192 pass
2193 except KeyboardInterrupt:
2194 self.stop()
2195 self.sock.close()
2196
2197 def stop(self):
2198 self.active = False
2199
2200class AsyncoreEchoServer(threading.Thread):
2201
2202 # this one's based on asyncore.dispatcher
2203
2204 class EchoServer (asyncore.dispatcher):
2205
2206 class ConnectionHandler(asyncore.dispatcher_with_send):
2207
2208 def __init__(self, conn, certfile):
2209 self.socket = test_wrap_socket(conn, server_side=True,
2210 certfile=certfile,
2211 do_handshake_on_connect=False)
2212 asyncore.dispatcher_with_send.__init__(self, self.socket)
2213 self._ssl_accepting = True
2214 self._do_ssl_handshake()
2215
2216 def readable(self):
2217 if isinstance(self.socket, ssl.SSLSocket):
2218 while self.socket.pending() > 0:
2219 self.handle_read_event()
2220 return True
2221
2222 def _do_ssl_handshake(self):
2223 try:
2224 self.socket.do_handshake()
2225 except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
2226 return
2227 except ssl.SSLEOFError:
2228 return self.handle_close()
2229 except ssl.SSLError:
Bill Janssen54cc54c2007-12-14 22:08:56 +00002230 raise
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002231 except OSError as err:
2232 if err.args[0] == errno.ECONNABORTED:
2233 return self.handle_close()
2234 else:
2235 self._ssl_accepting = False
Bill Janssen54cc54c2007-12-14 22:08:56 +00002236
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002237 def handle_read(self):
2238 if self._ssl_accepting:
2239 self._do_ssl_handshake()
2240 else:
2241 data = self.recv(1024)
2242 if support.verbose:
2243 sys.stdout.write(" server: read %s from client\n" % repr(data))
2244 if not data:
2245 self.close()
2246 else:
2247 self.send(data.lower())
Bill Janssen54cc54c2007-12-14 22:08:56 +00002248
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002249 def handle_close(self):
2250 self.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002251 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002252 sys.stdout.write(" server: closed connection %s\n" % self.socket)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002253
2254 def handle_error(self):
2255 raise
2256
Trent Nelson78520002008-04-10 20:54:35 +00002257 def __init__(self, certfile):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002258 self.certfile = certfile
2259 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2260 self.port = support.bind_port(sock, '')
2261 asyncore.dispatcher.__init__(self, sock)
2262 self.listen(5)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002263
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002264 def handle_accepted(self, sock_obj, addr):
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002265 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002266 sys.stdout.write(" server: new connection from %s:%s\n" %addr)
2267 self.ConnectionHandler(sock_obj, self.certfile)
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002268
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002269 def handle_error(self):
2270 raise
Bill Janssen54cc54c2007-12-14 22:08:56 +00002271
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002272 def __init__(self, certfile):
2273 self.flag = None
2274 self.active = False
2275 self.server = self.EchoServer(certfile)
2276 self.port = self.server.port
2277 threading.Thread.__init__(self)
2278 self.daemon = True
Bill Janssen54cc54c2007-12-14 22:08:56 +00002279
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002280 def __str__(self):
2281 return "<%s %s>" % (self.__class__.__name__, self.server)
Bill Janssen54cc54c2007-12-14 22:08:56 +00002282
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002283 def __enter__(self):
2284 self.start(threading.Event())
2285 self.flag.wait()
2286 return self
Thomas Woutersed03b412007-08-28 21:37:11 +00002287
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002288 def __exit__(self, *args):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002289 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002290 sys.stdout.write(" cleanup: stopping server.\n")
2291 self.stop()
2292 if support.verbose:
2293 sys.stdout.write(" cleanup: joining server thread.\n")
2294 self.join()
2295 if support.verbose:
2296 sys.stdout.write(" cleanup: successfully joined.\n")
2297 # make sure that ConnectionHandler is removed from socket_map
2298 asyncore.close_all(ignore_all=True)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002299
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002300 def start (self, flag=None):
2301 self.flag = flag
2302 threading.Thread.start(self)
Antoine Pitrou2463e5f2013-03-28 22:24:43 +01002303
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002304 def run(self):
2305 self.active = True
2306 if self.flag:
2307 self.flag.set()
2308 while self.active:
Antoine Pitrou773b5db2010-04-27 08:53:36 +00002309 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002310 asyncore.loop(1)
2311 except:
2312 pass
Trent Nelson6b240cd2008-04-10 20:12:06 +00002313
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002314 def stop(self):
2315 self.active = False
2316 self.server.close()
Thomas Wouters1b7f8912007-09-19 03:06:30 +00002317
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002318def server_params_test(client_context, server_context, indata=b"FOO\n",
2319 chatty=True, connectionchatty=False, sni_name=None,
2320 session=None):
2321 """
2322 Launch a server, connect a client to it and try various reads
2323 and writes.
2324 """
2325 stats = {}
2326 server = ThreadedEchoServer(context=server_context,
2327 chatty=chatty,
2328 connectionchatty=False)
2329 with server:
2330 with client_context.wrap_socket(socket.socket(),
2331 server_hostname=sni_name, session=session) as s:
2332 s.connect((HOST, server.port))
2333 for arg in [indata, bytearray(indata), memoryview(indata)]:
2334 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002335 if support.verbose:
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002336 sys.stdout.write(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002337 " client: sending %r...\n" % indata)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002338 s.write(arg)
Antoine Pitrou18c913e2010-04-27 10:59:39 +00002339 outdata = s.read()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002340 if connectionchatty:
2341 if support.verbose:
2342 sys.stdout.write(" client: read %r\n" % outdata)
Trent Nelson6b240cd2008-04-10 20:12:06 +00002343 if outdata != indata.lower():
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002344 raise AssertionError(
Antoine Pitrou480a1242010-04-28 21:37:09 +00002345 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
2346 % (outdata[:20], len(outdata),
2347 indata[:20].lower(), len(indata)))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002348 s.write(b"over\n")
2349 if connectionchatty:
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002350 if support.verbose:
Trent Nelson6b240cd2008-04-10 20:12:06 +00002351 sys.stdout.write(" client: closing connection.\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002352 stats.update({
2353 'compression': s.compression(),
2354 'cipher': s.cipher(),
2355 'peercert': s.getpeercert(),
2356 'client_alpn_protocol': s.selected_alpn_protocol(),
2357 'client_npn_protocol': s.selected_npn_protocol(),
2358 'version': s.version(),
2359 'session_reused': s.session_reused,
2360 'session': s.session,
2361 })
2362 s.close()
2363 stats['server_alpn_protocols'] = server.selected_alpn_protocols
2364 stats['server_npn_protocols'] = server.selected_npn_protocols
2365 stats['server_shared_ciphers'] = server.shared_ciphers
2366 return stats
Trent Nelson6b240cd2008-04-10 20:12:06 +00002367
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002368def try_protocol_combo(server_protocol, client_protocol, expect_success,
2369 certsreqs=None, server_options=0, client_options=0):
2370 """
2371 Try to SSL-connect using *client_protocol* to *server_protocol*.
2372 If *expect_success* is true, assert that the connection succeeds,
2373 if it's false, assert that the connection fails.
2374 Also, if *expect_success* is a string, assert that it is the protocol
2375 version actually used by the connection.
2376 """
2377 if certsreqs is None:
2378 certsreqs = ssl.CERT_NONE
2379 certtype = {
2380 ssl.CERT_NONE: "CERT_NONE",
2381 ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
2382 ssl.CERT_REQUIRED: "CERT_REQUIRED",
2383 }[certsreqs]
2384 if support.verbose:
2385 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
2386 sys.stdout.write(formatstr %
2387 (ssl.get_protocol_name(client_protocol),
2388 ssl.get_protocol_name(server_protocol),
2389 certtype))
2390 client_context = ssl.SSLContext(client_protocol)
2391 client_context.options |= client_options
2392 server_context = ssl.SSLContext(server_protocol)
2393 server_context.options |= server_options
2394
2395 # NOTE: we must enable "ALL" ciphers on the client, otherwise an
2396 # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
2397 # starting from OpenSSL 1.0.0 (see issue #8322).
Christian Heimesa170fa12017-09-15 20:27:30 +02002398 if client_context.protocol == ssl.PROTOCOL_TLS:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002399 client_context.set_ciphers("ALL")
2400
2401 for ctx in (client_context, server_context):
2402 ctx.verify_mode = certsreqs
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002403 ctx.load_cert_chain(SIGNED_CERTFILE)
2404 ctx.load_verify_locations(SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002405 try:
2406 stats = server_params_test(client_context, server_context,
2407 chatty=False, connectionchatty=False)
2408 # Protocol mismatch can result in either an SSLError, or a
2409 # "Connection reset by peer" error.
2410 except ssl.SSLError:
2411 if expect_success:
2412 raise
2413 except OSError as e:
2414 if expect_success or e.errno != errno.ECONNRESET:
2415 raise
2416 else:
2417 if not expect_success:
2418 raise AssertionError(
2419 "Client protocol %s succeeded with server protocol %s!"
2420 % (ssl.get_protocol_name(client_protocol),
2421 ssl.get_protocol_name(server_protocol)))
2422 elif (expect_success is not True
2423 and expect_success != stats['version']):
2424 raise AssertionError("version mismatch: expected %r, got %r"
2425 % (expect_success, stats['version']))
2426
2427
2428class ThreadedTests(unittest.TestCase):
2429
2430 @skip_if_broken_ubuntu_ssl
2431 def test_echo(self):
2432 """Basic test of an SSL client connecting to a server"""
2433 if support.verbose:
2434 sys.stdout.write("\n")
2435 for protocol in PROTOCOLS:
2436 if protocol in {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}:
2437 continue
2438 with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
2439 context = ssl.SSLContext(protocol)
2440 context.load_cert_chain(CERTFILE)
2441 server_params_test(context, context,
2442 chatty=True, connectionchatty=True)
2443
Christian Heimesa170fa12017-09-15 20:27:30 +02002444 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002445
2446 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_SERVER):
2447 server_params_test(client_context=client_context,
2448 server_context=server_context,
2449 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002450 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002451
2452 client_context.check_hostname = False
2453 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_CLIENT):
2454 with self.assertRaises(ssl.SSLError) as e:
2455 server_params_test(client_context=server_context,
2456 server_context=client_context,
2457 chatty=True, connectionchatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02002458 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002459 self.assertIn('called a function you should not call',
2460 str(e.exception))
2461
2462 with self.subTest(client=ssl.PROTOCOL_TLS_SERVER, server=ssl.PROTOCOL_TLS_SERVER):
2463 with self.assertRaises(ssl.SSLError) as e:
2464 server_params_test(client_context=server_context,
2465 server_context=server_context,
2466 chatty=True, connectionchatty=True)
2467 self.assertIn('called a function you should not call',
2468 str(e.exception))
2469
2470 with self.subTest(client=ssl.PROTOCOL_TLS_CLIENT, server=ssl.PROTOCOL_TLS_CLIENT):
2471 with self.assertRaises(ssl.SSLError) as e:
2472 server_params_test(client_context=server_context,
2473 server_context=client_context,
2474 chatty=True, connectionchatty=True)
2475 self.assertIn('called a function you should not call',
2476 str(e.exception))
2477
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002478 def test_getpeercert(self):
2479 if support.verbose:
2480 sys.stdout.write("\n")
Christian Heimesa170fa12017-09-15 20:27:30 +02002481
2482 client_context, server_context, hostname = testing_context()
2483 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002484 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002485 with client_context.wrap_socket(socket.socket(),
2486 do_handshake_on_connect=False,
2487 server_hostname=hostname) as s:
2488 s.connect((HOST, server.port))
2489 # getpeercert() raise ValueError while the handshake isn't
2490 # done.
2491 with self.assertRaises(ValueError):
2492 s.getpeercert()
2493 s.do_handshake()
2494 cert = s.getpeercert()
2495 self.assertTrue(cert, "Can't get peer certificate.")
2496 cipher = s.cipher()
2497 if support.verbose:
2498 sys.stdout.write(pprint.pformat(cert) + '\n')
2499 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
2500 if 'subject' not in cert:
2501 self.fail("No subject field in certificate: %s." %
2502 pprint.pformat(cert))
2503 if ((('organizationName', 'Python Software Foundation'),)
2504 not in cert['subject']):
2505 self.fail(
2506 "Missing or invalid 'organizationName' field in certificate subject; "
2507 "should be 'Python Software Foundation'.")
2508 self.assertIn('notBefore', cert)
2509 self.assertIn('notAfter', cert)
2510 before = ssl.cert_time_to_seconds(cert['notBefore'])
2511 after = ssl.cert_time_to_seconds(cert['notAfter'])
2512 self.assertLess(before, after)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002513
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002514 @unittest.skipUnless(have_verify_flags(),
2515 "verify_flags need OpenSSL > 0.9.8")
2516 def test_crl_check(self):
2517 if support.verbose:
2518 sys.stdout.write("\n")
2519
Christian Heimesa170fa12017-09-15 20:27:30 +02002520 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002521
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002522 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
Christian Heimesa170fa12017-09-15 20:27:30 +02002523 self.assertEqual(client_context.verify_flags, ssl.VERIFY_DEFAULT | tf)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002524
2525 # VERIFY_DEFAULT should pass
2526 server = ThreadedEchoServer(context=server_context, chatty=True)
2527 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002528 with client_context.wrap_socket(socket.socket(),
2529 server_hostname=hostname) as s:
Antoine Pitrou65a3f4b2011-12-21 16:52:40 +01002530 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002531 cert = s.getpeercert()
2532 self.assertTrue(cert, "Can't get peer certificate.")
Bill Janssen58afe4c2008-09-08 16:45:19 +00002533
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002534 # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
Christian Heimesa170fa12017-09-15 20:27:30 +02002535 client_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
Bill Janssen58afe4c2008-09-08 16:45:19 +00002536
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002537 server = ThreadedEchoServer(context=server_context, chatty=True)
2538 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002539 with client_context.wrap_socket(socket.socket(),
2540 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002541 with self.assertRaisesRegex(ssl.SSLError,
2542 "certificate verify failed"):
2543 s.connect((HOST, server.port))
Bill Janssen58afe4c2008-09-08 16:45:19 +00002544
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002545 # now load a CRL file. The CRL file is signed by the CA.
Christian Heimesa170fa12017-09-15 20:27:30 +02002546 client_context.load_verify_locations(CRLFILE)
Bill Janssen58afe4c2008-09-08 16:45:19 +00002547
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002548 server = ThreadedEchoServer(context=server_context, chatty=True)
2549 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002550 with client_context.wrap_socket(socket.socket(),
2551 server_hostname=hostname) as s:
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002552 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002553 cert = s.getpeercert()
2554 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002555
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002556 def test_check_hostname(self):
2557 if support.verbose:
2558 sys.stdout.write("\n")
Antoine Pitroub4bebda2014-04-29 10:03:28 +02002559
Christian Heimesa170fa12017-09-15 20:27:30 +02002560 client_context, server_context, hostname = testing_context()
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002561
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002562 # correct hostname should verify
2563 server = ThreadedEchoServer(context=server_context, chatty=True)
2564 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002565 with client_context.wrap_socket(socket.socket(),
2566 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002567 s.connect((HOST, server.port))
2568 cert = s.getpeercert()
2569 self.assertTrue(cert, "Can't get peer certificate.")
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002570
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002571 # incorrect hostname should raise an exception
2572 server = ThreadedEchoServer(context=server_context, chatty=True)
2573 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02002574 with client_context.wrap_socket(socket.socket(),
2575 server_hostname="invalid") as s:
Christian Heimes61d478c2018-01-27 15:51:38 +01002576 with self.assertRaisesRegex(
2577 ssl.CertificateError,
2578 "Hostname mismatch, certificate is not valid for 'invalid'."):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002579 s.connect((HOST, server.port))
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002580
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002581 # missing server_hostname arg should cause an exception, too
2582 server = ThreadedEchoServer(context=server_context, chatty=True)
2583 with server:
2584 with socket.socket() as s:
2585 with self.assertRaisesRegex(ValueError,
2586 "check_hostname requires server_hostname"):
Christian Heimesa170fa12017-09-15 20:27:30 +02002587 client_context.wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002588
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002589 def test_ecc_cert(self):
2590 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2591 client_context.load_verify_locations(SIGNING_CA)
2592 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
2593 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2594
2595 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2596 # load ECC cert
2597 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2598
2599 # correct hostname should verify
2600 server = ThreadedEchoServer(context=server_context, chatty=True)
2601 with server:
2602 with client_context.wrap_socket(socket.socket(),
2603 server_hostname=hostname) as s:
2604 s.connect((HOST, server.port))
2605 cert = s.getpeercert()
2606 self.assertTrue(cert, "Can't get peer certificate.")
2607 cipher = s.cipher()[0].split('-')
2608 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2609
2610 def test_dual_rsa_ecc(self):
2611 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2612 client_context.load_verify_locations(SIGNING_CA)
2613 # only ECDSA certs
2614 client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
2615 hostname = SIGNED_CERTFILE_ECC_HOSTNAME
2616
2617 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2618 # load ECC and RSA key/cert pairs
2619 server_context.load_cert_chain(SIGNED_CERTFILE_ECC)
2620 server_context.load_cert_chain(SIGNED_CERTFILE)
2621
2622 # correct hostname should verify
2623 server = ThreadedEchoServer(context=server_context, chatty=True)
2624 with server:
2625 with client_context.wrap_socket(socket.socket(),
2626 server_hostname=hostname) as s:
2627 s.connect((HOST, server.port))
2628 cert = s.getpeercert()
2629 self.assertTrue(cert, "Can't get peer certificate.")
2630 cipher = s.cipher()[0].split('-')
2631 self.assertTrue(cipher[:2], ('ECDHE', 'ECDSA'))
2632
Christian Heimes66e57422018-01-29 14:25:13 +01002633 def test_check_hostname_idn(self):
2634 if support.verbose:
2635 sys.stdout.write("\n")
2636
2637 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS)
2638 server_context.load_cert_chain(IDNSANSFILE)
2639
2640 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
2641 context.verify_mode = ssl.CERT_REQUIRED
2642 context.check_hostname = True
2643 context.load_verify_locations(SIGNING_CA)
2644
2645 # correct hostname should verify, when specified in several
2646 # different ways
2647 idn_hostnames = [
2648 ('könig.idn.pythontest.net',
2649 'könig.idn.pythontest.net',),
2650 ('xn--knig-5qa.idn.pythontest.net',
2651 'xn--knig-5qa.idn.pythontest.net'),
2652 (b'xn--knig-5qa.idn.pythontest.net',
2653 b'xn--knig-5qa.idn.pythontest.net'),
2654
2655 ('königsgäßchen.idna2003.pythontest.net',
2656 'königsgäßchen.idna2003.pythontest.net'),
2657 ('xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
2658 'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2659 (b'xn--knigsgsschen-lcb0w.idna2003.pythontest.net',
2660 b'xn--knigsgsschen-lcb0w.idna2003.pythontest.net'),
2661 ]
2662 for server_hostname, expected_hostname in idn_hostnames:
2663 server = ThreadedEchoServer(context=server_context, chatty=True)
2664 with server:
2665 with context.wrap_socket(socket.socket(),
2666 server_hostname=server_hostname) as s:
2667 self.assertEqual(s.server_hostname, expected_hostname)
2668 s.connect((HOST, server.port))
2669 cert = s.getpeercert()
2670 self.assertEqual(s.server_hostname, expected_hostname)
2671 self.assertTrue(cert, "Can't get peer certificate.")
2672
2673 with ssl.SSLSocket(socket.socket(),
2674 server_hostname=server_hostname) as s:
2675 s.connect((HOST, server.port))
2676 s.getpeercert()
2677 self.assertEqual(s.server_hostname, expected_hostname)
2678
2679 # bug https://bugs.python.org/issue28414
2680 # IDNA 2008 deviations are broken
2681 idna2008 = 'xn--knigsgchen-b4a3dun.idna2008.pythontest.net'
2682 server = ThreadedEchoServer(context=server_context, chatty=True)
2683 with server:
2684 with self.assertRaises(UnicodeError):
2685 with context.wrap_socket(socket.socket(),
2686 server_hostname=idna2008) as s:
2687 s.connect((HOST, server.port))
2688
2689 # incorrect hostname should raise an exception
2690 server = ThreadedEchoServer(context=server_context, chatty=True)
2691 with server:
2692 with context.wrap_socket(socket.socket(),
2693 server_hostname="python.example.org") as s:
2694 with self.assertRaises(ssl.CertificateError):
2695 s.connect((HOST, server.port))
2696
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002697 def test_wrong_cert(self):
2698 """Connecting when the server rejects the client's certificate
2699
2700 Launch a server with CERT_REQUIRED, and check that trying to
2701 connect to it with a wrong client certificate fails.
2702 """
2703 certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
2704 "wrongcert.pem")
2705 server = ThreadedEchoServer(CERTFILE,
2706 certreqs=ssl.CERT_REQUIRED,
2707 cacerts=CERTFILE, chatty=False,
2708 connectionchatty=False)
2709 with server, \
2710 socket.socket() as sock, \
Christian Heimesa170fa12017-09-15 20:27:30 +02002711 test_wrap_socket(sock, certfile=certfile) as s:
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002712 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002713 # Expect either an SSL error about the server rejecting
2714 # the connection, or a low-level connection reset (which
2715 # sometimes happens on Windows)
2716 s.connect((HOST, server.port))
2717 except ssl.SSLError as e:
2718 if support.verbose:
2719 sys.stdout.write("\nSSLError is %r\n" % e)
2720 except OSError as e:
2721 if e.errno != errno.ECONNRESET:
2722 raise
2723 if support.verbose:
2724 sys.stdout.write("\nsocket.error is %r\n" % e)
2725 else:
2726 self.fail("Use of invalid cert should have failed!")
2727
2728 def test_rude_shutdown(self):
2729 """A brutal shutdown of an SSL server should raise an OSError
2730 in the client when attempting handshake.
2731 """
2732 listener_ready = threading.Event()
2733 listener_gone = threading.Event()
2734
2735 s = socket.socket()
2736 port = support.bind_port(s, HOST)
2737
2738 # `listener` runs in a thread. It sits in an accept() until
2739 # the main thread connects. Then it rudely closes the socket,
2740 # and sets Event `listener_gone` to let the main thread know
2741 # the socket is gone.
2742 def listener():
2743 s.listen()
2744 listener_ready.set()
2745 newsock, addr = s.accept()
2746 newsock.close()
2747 s.close()
2748 listener_gone.set()
2749
2750 def connector():
2751 listener_ready.wait()
2752 with socket.socket() as c:
2753 c.connect((HOST, port))
2754 listener_gone.wait()
Antoine Pitrou40f08742010-04-24 22:04:40 +00002755 try:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002756 ssl_sock = test_wrap_socket(c)
2757 except OSError:
2758 pass
2759 else:
2760 self.fail('connecting to closed SSL socket should have failed')
Antoine Pitroud3f8ab82010-04-24 21:26:44 +00002761
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002762 t = threading.Thread(target=listener)
2763 t.start()
2764 try:
2765 connector()
2766 finally:
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002767 t.join()
Antoine Pitrou5c89b4e2012-11-11 01:25:36 +01002768
Christian Heimesb3ad0e52017-09-08 12:00:19 -07002769 def test_ssl_cert_verify_error(self):
2770 if support.verbose:
2771 sys.stdout.write("\n")
2772
2773 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
2774 server_context.load_cert_chain(SIGNED_CERTFILE)
2775
2776 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
2777
2778 server = ThreadedEchoServer(context=server_context, chatty=True)
2779 with server:
2780 with context.wrap_socket(socket.socket(),
Christian Heimesa170fa12017-09-15 20:27:30 +02002781 server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
Christian Heimesb3ad0e52017-09-08 12:00:19 -07002782 try:
2783 s.connect((HOST, server.port))
2784 except ssl.SSLError as e:
2785 msg = 'unable to get local issuer certificate'
2786 self.assertIsInstance(e, ssl.SSLCertVerificationError)
2787 self.assertEqual(e.verify_code, 20)
2788 self.assertEqual(e.verify_message, msg)
2789 self.assertIn(msg, repr(e))
2790 self.assertIn('certificate verify failed', repr(e))
2791
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002792 @skip_if_broken_ubuntu_ssl
2793 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
2794 "OpenSSL is compiled without SSLv2 support")
2795 def test_protocol_sslv2(self):
2796 """Connecting to an SSLv2 server with various client options"""
2797 if support.verbose:
2798 sys.stdout.write("\n")
2799 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
2800 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
2801 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
Christian Heimesa170fa12017-09-15 20:27:30 +02002802 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002803 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2804 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
2805 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
2806 # SSLv23 client with specific SSL options
2807 if no_sslv2_implies_sslv3_hello():
2808 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02002809 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002810 client_options=ssl.OP_NO_SSLv2)
Christian Heimesa170fa12017-09-15 20:27:30 +02002811 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002812 client_options=ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02002813 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002814 client_options=ssl.OP_NO_TLSv1)
Antoine Pitrou242db722013-05-01 20:52:07 +02002815
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002816 @skip_if_broken_ubuntu_ssl
Christian Heimesa170fa12017-09-15 20:27:30 +02002817 def test_PROTOCOL_TLS(self):
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002818 """Connecting to an SSLv23 server with various client options"""
2819 if support.verbose:
2820 sys.stdout.write("\n")
2821 if hasattr(ssl, 'PROTOCOL_SSLv2'):
Antoine Pitrou8f85f902012-01-03 22:46:48 +01002822 try:
Christian Heimesa170fa12017-09-15 20:27:30 +02002823 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv2, True)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002824 except OSError as x:
2825 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
2826 if support.verbose:
2827 sys.stdout.write(
2828 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
2829 % str(x))
2830 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02002831 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False)
2832 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True)
2833 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1')
Antoine Pitrou8f85f902012-01-03 22:46:48 +01002834
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002835 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02002836 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_OPTIONAL)
2837 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_OPTIONAL)
2838 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002839
2840 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02002841 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False, ssl.CERT_REQUIRED)
2842 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True, ssl.CERT_REQUIRED)
2843 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002844
2845 # Server with specific SSL options
2846 if hasattr(ssl, 'PROTOCOL_SSLv3'):
Christian Heimesa170fa12017-09-15 20:27:30 +02002847 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_SSLv3, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002848 server_options=ssl.OP_NO_SSLv3)
2849 # Will choose TLSv1
Christian Heimesa170fa12017-09-15 20:27:30 +02002850 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS, True,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002851 server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
Christian Heimesa170fa12017-09-15 20:27:30 +02002852 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002853 server_options=ssl.OP_NO_TLSv1)
2854
2855
2856 @skip_if_broken_ubuntu_ssl
2857 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
2858 "OpenSSL is compiled without SSLv3 support")
2859 def test_protocol_sslv3(self):
2860 """Connecting to an SSLv3 server with various client options"""
2861 if support.verbose:
2862 sys.stdout.write("\n")
2863 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3')
2864 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
2865 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
2866 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2867 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02002868 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002869 client_options=ssl.OP_NO_SSLv3)
2870 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
2871 if no_sslv2_implies_sslv3_hello():
2872 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
Christian Heimesa170fa12017-09-15 20:27:30 +02002873 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLS,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002874 False, client_options=ssl.OP_NO_SSLv2)
2875
2876 @skip_if_broken_ubuntu_ssl
2877 def test_protocol_tlsv1(self):
2878 """Connecting to a TLSv1 server with various client options"""
2879 if support.verbose:
2880 sys.stdout.write("\n")
2881 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1')
2882 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
2883 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
2884 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2885 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
2886 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2887 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02002888 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002889 client_options=ssl.OP_NO_TLSv1)
2890
2891 @skip_if_broken_ubuntu_ssl
2892 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
2893 "TLS version 1.1 not supported.")
2894 def test_protocol_tlsv1_1(self):
2895 """Connecting to a TLSv1.1 server with various client options.
2896 Testing against older TLS versions."""
2897 if support.verbose:
2898 sys.stdout.write("\n")
2899 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
2900 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2901 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False)
2902 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2903 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02002904 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002905 client_options=ssl.OP_NO_TLSv1_1)
2906
Christian Heimesa170fa12017-09-15 20:27:30 +02002907 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002908 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False)
2909 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False)
2910
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002911 @skip_if_broken_ubuntu_ssl
2912 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
2913 "TLS version 1.2 not supported.")
2914 def test_protocol_tlsv1_2(self):
2915 """Connecting to a TLSv1.2 server with various client options.
2916 Testing against older TLS versions."""
2917 if support.verbose:
2918 sys.stdout.write("\n")
2919 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2',
2920 server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
2921 client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
2922 if hasattr(ssl, 'PROTOCOL_SSLv2'):
2923 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False)
2924 if hasattr(ssl, 'PROTOCOL_SSLv3'):
2925 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
Christian Heimesa170fa12017-09-15 20:27:30 +02002926 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLS, False,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002927 client_options=ssl.OP_NO_TLSv1_2)
2928
Christian Heimesa170fa12017-09-15 20:27:30 +02002929 try_protocol_combo(ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002930 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
2931 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
2932 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
2933 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
2934
2935 def test_starttls(self):
2936 """Switching from clear text to encrypted and back again."""
2937 msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")
2938
2939 server = ThreadedEchoServer(CERTFILE,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002940 starttls_server=True,
2941 chatty=True,
2942 connectionchatty=True)
2943 wrapped = False
2944 with server:
2945 s = socket.socket()
2946 s.setblocking(1)
2947 s.connect((HOST, server.port))
Antoine Pitroud6494802011-07-21 01:11:30 +02002948 if support.verbose:
2949 sys.stdout.write("\n")
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002950 for indata in msgs:
Antoine Pitroud6494802011-07-21 01:11:30 +02002951 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002952 sys.stdout.write(
2953 " client: sending %r...\n" % indata)
2954 if wrapped:
2955 conn.write(indata)
2956 outdata = conn.read()
2957 else:
2958 s.send(indata)
2959 outdata = s.recv(1024)
2960 msg = outdata.strip().lower()
2961 if indata == b"STARTTLS" and msg.startswith(b"ok"):
2962 # STARTTLS ok, switch to secure mode
2963 if support.verbose:
2964 sys.stdout.write(
2965 " client: read %r from server, starting TLS...\n"
2966 % msg)
Christian Heimesa170fa12017-09-15 20:27:30 +02002967 conn = test_wrap_socket(s)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002968 wrapped = True
2969 elif indata == b"ENDTLS" and msg.startswith(b"ok"):
2970 # ENDTLS ok, switch back to clear text
2971 if support.verbose:
2972 sys.stdout.write(
2973 " client: read %r from server, ending TLS...\n"
2974 % msg)
2975 s = conn.unwrap()
2976 wrapped = False
2977 else:
2978 if support.verbose:
2979 sys.stdout.write(
2980 " client: read %r from server\n" % msg)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01002981 if support.verbose:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002982 sys.stdout.write(" client: closing connection.\n")
2983 if wrapped:
2984 conn.write(b"over\n")
2985 else:
2986 s.send(b"over\n")
2987 if wrapped:
2988 conn.close()
2989 else:
2990 s.close()
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01002991
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002992 def test_socketserver(self):
2993 """Using socketserver to create and manage SSL connections."""
Christian Heimesbd5c7d22018-01-20 15:16:30 +01002994 server = make_https_server(self, certfile=SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002995 # try to connect
2996 if support.verbose:
2997 sys.stdout.write('\n')
2998 with open(CERTFILE, 'rb') as f:
2999 d1 = f.read()
3000 d2 = ''
3001 # now fetch the same data from the HTTPS server
3002 url = 'https://localhost:%d/%s' % (
3003 server.port, os.path.split(CERTFILE)[1])
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003004 context = ssl.create_default_context(cafile=SIGNING_CA)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003005 f = urllib.request.urlopen(url, context=context)
3006 try:
3007 dlen = f.info().get("content-length")
3008 if dlen and (int(dlen) > 0):
3009 d2 = f.read(int(dlen))
3010 if support.verbose:
3011 sys.stdout.write(
3012 " client: read %d bytes from remote server '%s'\n"
3013 % (len(d2), server))
3014 finally:
3015 f.close()
3016 self.assertEqual(d1, d2)
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003017
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003018 def test_asyncore_server(self):
3019 """Check the example asyncore integration."""
3020 if support.verbose:
3021 sys.stdout.write("\n")
Antoine Pitrou0e576f12011-12-22 10:03:38 +01003022
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003023 indata = b"FOO\n"
3024 server = AsyncoreEchoServer(CERTFILE)
3025 with server:
3026 s = test_wrap_socket(socket.socket())
3027 s.connect(('127.0.0.1', server.port))
3028 if support.verbose:
3029 sys.stdout.write(
3030 " client: sending %r...\n" % indata)
3031 s.write(indata)
3032 outdata = s.read()
3033 if support.verbose:
3034 sys.stdout.write(" client: read %r\n" % outdata)
3035 if outdata != indata.lower():
3036 self.fail(
3037 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
3038 % (outdata[:20], len(outdata),
3039 indata[:20].lower(), len(indata)))
3040 s.write(b"over\n")
3041 if support.verbose:
3042 sys.stdout.write(" client: closing connection.\n")
3043 s.close()
3044 if support.verbose:
3045 sys.stdout.write(" client: connection closed.\n")
Benjamin Petersoncca27322015-01-23 16:35:37 -05003046
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003047 def test_recv_send(self):
3048 """Test recv(), send() and friends."""
3049 if support.verbose:
3050 sys.stdout.write("\n")
3051
3052 server = ThreadedEchoServer(CERTFILE,
3053 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003054 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003055 cacerts=CERTFILE,
3056 chatty=True,
3057 connectionchatty=False)
3058 with server:
3059 s = test_wrap_socket(socket.socket(),
3060 server_side=False,
3061 certfile=CERTFILE,
3062 ca_certs=CERTFILE,
3063 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003064 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003065 s.connect((HOST, server.port))
3066 # helper methods for standardising recv* method signatures
3067 def _recv_into():
3068 b = bytearray(b"\0"*100)
3069 count = s.recv_into(b)
3070 return b[:count]
3071
3072 def _recvfrom_into():
3073 b = bytearray(b"\0"*100)
3074 count, addr = s.recvfrom_into(b)
3075 return b[:count]
3076
3077 # (name, method, expect success?, *args, return value func)
3078 send_methods = [
3079 ('send', s.send, True, [], len),
3080 ('sendto', s.sendto, False, ["some.address"], len),
3081 ('sendall', s.sendall, True, [], lambda x: None),
3082 ]
3083 # (name, method, whether to expect success, *args)
3084 recv_methods = [
3085 ('recv', s.recv, True, []),
3086 ('recvfrom', s.recvfrom, False, ["some.address"]),
3087 ('recv_into', _recv_into, True, []),
3088 ('recvfrom_into', _recvfrom_into, False, []),
3089 ]
3090 data_prefix = "PREFIX_"
3091
3092 for (meth_name, send_meth, expect_success, args,
3093 ret_val_meth) in send_methods:
3094 indata = (data_prefix + meth_name).encode('ascii')
3095 try:
3096 ret = send_meth(indata, *args)
3097 msg = "sending with {}".format(meth_name)
3098 self.assertEqual(ret, ret_val_meth(indata), msg=msg)
3099 outdata = s.read()
3100 if outdata != indata.lower():
3101 self.fail(
3102 "While sending with <<{name:s}>> bad data "
3103 "<<{outdata:r}>> ({nout:d}) received; "
3104 "expected <<{indata:r}>> ({nin:d})\n".format(
3105 name=meth_name, outdata=outdata[:20],
3106 nout=len(outdata),
3107 indata=indata[:20], nin=len(indata)
3108 )
3109 )
3110 except ValueError as e:
3111 if expect_success:
3112 self.fail(
3113 "Failed to send with method <<{name:s}>>; "
3114 "expected to succeed.\n".format(name=meth_name)
3115 )
3116 if not str(e).startswith(meth_name):
3117 self.fail(
3118 "Method <<{name:s}>> failed with unexpected "
3119 "exception message: {exp:s}\n".format(
3120 name=meth_name, exp=e
3121 )
3122 )
3123
3124 for meth_name, recv_meth, expect_success, args in recv_methods:
3125 indata = (data_prefix + meth_name).encode('ascii')
3126 try:
3127 s.send(indata)
3128 outdata = recv_meth(*args)
3129 if outdata != indata.lower():
3130 self.fail(
3131 "While receiving with <<{name:s}>> bad data "
3132 "<<{outdata:r}>> ({nout:d}) received; "
3133 "expected <<{indata:r}>> ({nin:d})\n".format(
3134 name=meth_name, outdata=outdata[:20],
3135 nout=len(outdata),
3136 indata=indata[:20], nin=len(indata)
3137 )
3138 )
3139 except ValueError as e:
3140 if expect_success:
3141 self.fail(
3142 "Failed to receive with method <<{name:s}>>; "
3143 "expected to succeed.\n".format(name=meth_name)
3144 )
3145 if not str(e).startswith(meth_name):
3146 self.fail(
3147 "Method <<{name:s}>> failed with unexpected "
3148 "exception message: {exp:s}\n".format(
3149 name=meth_name, exp=e
3150 )
3151 )
3152 # consume data
3153 s.read()
3154
3155 # read(-1, buffer) is supported, even though read(-1) is not
3156 data = b"data"
3157 s.send(data)
3158 buffer = bytearray(len(data))
3159 self.assertEqual(s.read(-1, buffer), len(data))
3160 self.assertEqual(buffer, data)
3161
Christian Heimes888bbdc2017-09-07 14:18:21 -07003162 # sendall accepts bytes-like objects
3163 if ctypes is not None:
3164 ubyte = ctypes.c_ubyte * len(data)
3165 byteslike = ubyte.from_buffer_copy(data)
3166 s.sendall(byteslike)
3167 self.assertEqual(s.read(), data)
3168
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003169 # Make sure sendmsg et al are disallowed to avoid
3170 # inadvertent disclosure of data and/or corruption
3171 # of the encrypted data stream
3172 self.assertRaises(NotImplementedError, s.sendmsg, [b"data"])
3173 self.assertRaises(NotImplementedError, s.recvmsg, 100)
3174 self.assertRaises(NotImplementedError,
3175 s.recvmsg_into, bytearray(100))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003176 s.write(b"over\n")
3177
3178 self.assertRaises(ValueError, s.recv, -1)
3179 self.assertRaises(ValueError, s.read, -1)
3180
3181 s.close()
3182
3183 def test_recv_zero(self):
3184 server = ThreadedEchoServer(CERTFILE)
3185 server.__enter__()
3186 self.addCleanup(server.__exit__, None, None)
3187 s = socket.create_connection((HOST, server.port))
3188 self.addCleanup(s.close)
3189 s = test_wrap_socket(s, suppress_ragged_eofs=False)
3190 self.addCleanup(s.close)
3191
3192 # recv/read(0) should return no data
3193 s.send(b"data")
3194 self.assertEqual(s.recv(0), b"")
3195 self.assertEqual(s.read(0), b"")
3196 self.assertEqual(s.read(), b"data")
3197
3198 # Should not block if the other end sends no data
3199 s.setblocking(False)
3200 self.assertEqual(s.recv(0), b"")
3201 self.assertEqual(s.recv_into(bytearray()), 0)
3202
3203 def test_nonblocking_send(self):
3204 server = ThreadedEchoServer(CERTFILE,
3205 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003206 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003207 cacerts=CERTFILE,
3208 chatty=True,
3209 connectionchatty=False)
3210 with server:
3211 s = test_wrap_socket(socket.socket(),
3212 server_side=False,
3213 certfile=CERTFILE,
3214 ca_certs=CERTFILE,
3215 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003216 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003217 s.connect((HOST, server.port))
3218 s.setblocking(False)
3219
3220 # If we keep sending data, at some point the buffers
3221 # will be full and the call will block
3222 buf = bytearray(8192)
3223 def fill_buffer():
3224 while True:
3225 s.send(buf)
3226 self.assertRaises((ssl.SSLWantWriteError,
3227 ssl.SSLWantReadError), fill_buffer)
3228
3229 # Now read all the output and discard it
3230 s.setblocking(True)
3231 s.close()
3232
3233 def test_handshake_timeout(self):
3234 # Issue #5103: SSL handshake must respect the socket timeout
3235 server = socket.socket(socket.AF_INET)
3236 host = "127.0.0.1"
3237 port = support.bind_port(server)
3238 started = threading.Event()
3239 finish = False
3240
3241 def serve():
3242 server.listen()
3243 started.set()
3244 conns = []
3245 while not finish:
3246 r, w, e = select.select([server], [], [], 0.1)
3247 if server in r:
3248 # Let the socket hang around rather than having
3249 # it closed by garbage collection.
3250 conns.append(server.accept()[0])
3251 for sock in conns:
3252 sock.close()
3253
3254 t = threading.Thread(target=serve)
3255 t.start()
3256 started.wait()
3257
3258 try:
3259 try:
3260 c = socket.socket(socket.AF_INET)
3261 c.settimeout(0.2)
3262 c.connect((host, port))
3263 # Will attempt handshake and time out
3264 self.assertRaisesRegex(socket.timeout, "timed out",
3265 test_wrap_socket, c)
3266 finally:
3267 c.close()
3268 try:
3269 c = socket.socket(socket.AF_INET)
3270 c = test_wrap_socket(c)
3271 c.settimeout(0.2)
3272 # Will attempt handshake and time out
3273 self.assertRaisesRegex(socket.timeout, "timed out",
3274 c.connect, (host, port))
3275 finally:
3276 c.close()
3277 finally:
3278 finish = True
3279 t.join()
3280 server.close()
3281
3282 def test_server_accept(self):
3283 # Issue #16357: accept() on a SSLSocket created through
3284 # SSLContext.wrap_socket().
Christian Heimesa170fa12017-09-15 20:27:30 +02003285 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003286 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003287 context.load_verify_locations(SIGNING_CA)
3288 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003289 server = socket.socket(socket.AF_INET)
3290 host = "127.0.0.1"
3291 port = support.bind_port(server)
3292 server = context.wrap_socket(server, server_side=True)
3293 self.assertTrue(server.server_side)
3294
3295 evt = threading.Event()
3296 remote = None
3297 peer = None
3298 def serve():
3299 nonlocal remote, peer
3300 server.listen()
3301 # Block on the accept and wait on the connection to close.
3302 evt.set()
3303 remote, peer = server.accept()
3304 remote.recv(1)
3305
3306 t = threading.Thread(target=serve)
3307 t.start()
3308 # Client wait until server setup and perform a connect.
3309 evt.wait()
3310 client = context.wrap_socket(socket.socket())
3311 client.connect((host, port))
3312 client_addr = client.getsockname()
3313 client.close()
3314 t.join()
3315 remote.close()
3316 server.close()
3317 # Sanity checks.
3318 self.assertIsInstance(remote, ssl.SSLSocket)
3319 self.assertEqual(peer, client_addr)
3320
3321 def test_getpeercert_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003322 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003323 with context.wrap_socket(socket.socket()) as sock:
3324 with self.assertRaises(OSError) as cm:
3325 sock.getpeercert()
3326 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3327
3328 def test_do_handshake_enotconn(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003329 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003330 with context.wrap_socket(socket.socket()) as sock:
3331 with self.assertRaises(OSError) as cm:
3332 sock.do_handshake()
3333 self.assertEqual(cm.exception.errno, errno.ENOTCONN)
3334
3335 def test_default_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003336 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003337 try:
3338 # Force a set of weak ciphers on our client context
3339 context.set_ciphers("DES")
3340 except ssl.SSLError:
3341 self.skipTest("no DES cipher available")
3342 with ThreadedEchoServer(CERTFILE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003343 ssl_version=ssl.PROTOCOL_TLS,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003344 chatty=False) as server:
3345 with context.wrap_socket(socket.socket()) as s:
3346 with self.assertRaises(OSError):
3347 s.connect((HOST, server.port))
3348 self.assertIn("no shared cipher", server.conn_errors[0])
3349
3350 def test_version_basic(self):
3351 """
3352 Basic tests for SSLSocket.version().
3353 More tests are done in the test_protocol_*() methods.
3354 """
Christian Heimesa170fa12017-09-15 20:27:30 +02003355 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
3356 context.check_hostname = False
3357 context.verify_mode = ssl.CERT_NONE
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003358 with ThreadedEchoServer(CERTFILE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003359 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003360 chatty=False) as server:
3361 with context.wrap_socket(socket.socket()) as s:
3362 self.assertIs(s.version(), None)
3363 s.connect((HOST, server.port))
Christian Heimesa170fa12017-09-15 20:27:30 +02003364 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
3365 self.assertEqual(s.version(), 'TLSv1.2')
3366 else: # 0.9.8 to 1.0.1
3367 self.assertIn(s.version(), ('TLSv1', 'TLSv1.2'))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003368 self.assertIs(s.version(), None)
3369
Christian Heimescb5b68a2017-09-07 18:07:00 -07003370 @unittest.skipUnless(ssl.HAS_TLSv1_3,
3371 "test requires TLSv1.3 enabled OpenSSL")
3372 def test_tls1_3(self):
3373 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
3374 context.load_cert_chain(CERTFILE)
3375 # disable all but TLS 1.3
3376 context.options |= (
3377 ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
3378 )
3379 with ThreadedEchoServer(context=context) as server:
3380 with context.wrap_socket(socket.socket()) as s:
3381 s.connect((HOST, server.port))
3382 self.assertIn(s.cipher()[0], [
3383 'TLS13-AES-256-GCM-SHA384',
3384 'TLS13-CHACHA20-POLY1305-SHA256',
3385 'TLS13-AES-128-GCM-SHA256',
3386 ])
3387
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003388 @unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
3389 def test_default_ecdh_curve(self):
3390 # Issue #21015: elliptic curve-based Diffie Hellman key exchange
3391 # should be enabled by default on SSL contexts.
Christian Heimesa170fa12017-09-15 20:27:30 +02003392 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003393 context.load_cert_chain(CERTFILE)
Christian Heimescb5b68a2017-09-07 18:07:00 -07003394 # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
3395 # cipher name.
3396 context.options |= ssl.OP_NO_TLSv1_3
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003397 # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
3398 # explicitly using the 'ECCdraft' cipher alias. Otherwise,
3399 # our default cipher list should prefer ECDH-based ciphers
3400 # automatically.
3401 if ssl.OPENSSL_VERSION_INFO < (1, 0, 0):
3402 context.set_ciphers("ECCdraft:ECDH")
3403 with ThreadedEchoServer(context=context) as server:
3404 with context.wrap_socket(socket.socket()) as s:
3405 s.connect((HOST, server.port))
3406 self.assertIn("ECDH", s.cipher()[0])
3407
3408 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
3409 "'tls-unique' channel binding not available")
3410 def test_tls_unique_channel_binding(self):
3411 """Test tls-unique channel binding."""
3412 if support.verbose:
3413 sys.stdout.write("\n")
3414
3415 server = ThreadedEchoServer(CERTFILE,
3416 certreqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003417 ssl_version=ssl.PROTOCOL_TLS_SERVER,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003418 cacerts=CERTFILE,
3419 chatty=True,
3420 connectionchatty=False)
3421 with server:
3422 s = test_wrap_socket(socket.socket(),
3423 server_side=False,
3424 certfile=CERTFILE,
3425 ca_certs=CERTFILE,
3426 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003427 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003428 s.connect((HOST, server.port))
3429 # get the data
3430 cb_data = s.get_channel_binding("tls-unique")
3431 if support.verbose:
3432 sys.stdout.write(" got channel binding data: {0!r}\n"
3433 .format(cb_data))
3434
3435 # check if it is sane
3436 self.assertIsNotNone(cb_data)
3437 self.assertEqual(len(cb_data), 12) # True for TLSv1
3438
3439 # and compare with the peers version
3440 s.write(b"CB tls-unique\n")
3441 peer_data_repr = s.read().strip()
3442 self.assertEqual(peer_data_repr,
3443 repr(cb_data).encode("us-ascii"))
3444 s.close()
3445
3446 # now, again
3447 s = test_wrap_socket(socket.socket(),
3448 server_side=False,
3449 certfile=CERTFILE,
3450 ca_certs=CERTFILE,
3451 cert_reqs=ssl.CERT_NONE,
Christian Heimesa170fa12017-09-15 20:27:30 +02003452 ssl_version=ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003453 s.connect((HOST, server.port))
3454 new_cb_data = s.get_channel_binding("tls-unique")
3455 if support.verbose:
3456 sys.stdout.write(" got another channel binding data: {0!r}\n"
3457 .format(new_cb_data))
3458 # is it really unique
3459 self.assertNotEqual(cb_data, new_cb_data)
3460 self.assertIsNotNone(cb_data)
3461 self.assertEqual(len(cb_data), 12) # True for TLSv1
3462 s.write(b"CB tls-unique\n")
3463 peer_data_repr = s.read().strip()
3464 self.assertEqual(peer_data_repr,
3465 repr(new_cb_data).encode("us-ascii"))
3466 s.close()
3467
3468 def test_compression(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003469 client_context, server_context, hostname = testing_context()
3470 stats = server_params_test(client_context, server_context,
3471 chatty=True, connectionchatty=True,
3472 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003473 if support.verbose:
3474 sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
3475 self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' })
3476
3477 @unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
3478 "ssl.OP_NO_COMPRESSION needed for this test")
3479 def test_compression_disabled(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003480 client_context, server_context, hostname = testing_context()
3481 client_context.options |= ssl.OP_NO_COMPRESSION
3482 server_context.options |= ssl.OP_NO_COMPRESSION
3483 stats = server_params_test(client_context, server_context,
3484 chatty=True, connectionchatty=True,
3485 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003486 self.assertIs(stats['compression'], None)
3487
3488 def test_dh_params(self):
3489 # Check we can get a connection with ephemeral Diffie-Hellman
Christian Heimesa170fa12017-09-15 20:27:30 +02003490 client_context, server_context, hostname = testing_context()
3491 server_context.load_dh_params(DHFILE)
3492 server_context.set_ciphers("kEDH")
3493 stats = server_params_test(client_context, server_context,
3494 chatty=True, connectionchatty=True,
3495 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003496 cipher = stats["cipher"][0]
3497 parts = cipher.split("-")
3498 if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
3499 self.fail("Non-DH cipher: " + cipher[0])
3500
3501 def test_selected_alpn_protocol(self):
3502 # selected_alpn_protocol() is None unless ALPN is used.
Christian Heimesa170fa12017-09-15 20:27:30 +02003503 client_context, server_context, hostname = testing_context()
3504 stats = server_params_test(client_context, server_context,
3505 chatty=True, connectionchatty=True,
3506 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003507 self.assertIs(stats['client_alpn_protocol'], None)
3508
3509 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
3510 def test_selected_alpn_protocol_if_server_uses_alpn(self):
3511 # selected_alpn_protocol() is None unless ALPN is used by the client.
Christian Heimesa170fa12017-09-15 20:27:30 +02003512 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003513 server_context.set_alpn_protocols(['foo', 'bar'])
3514 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02003515 chatty=True, connectionchatty=True,
3516 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003517 self.assertIs(stats['client_alpn_protocol'], None)
3518
3519 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
3520 def test_alpn_protocols(self):
3521 server_protocols = ['foo', 'bar', 'milkshake']
3522 protocol_tests = [
3523 (['foo', 'bar'], 'foo'),
3524 (['bar', 'foo'], 'foo'),
3525 (['milkshake'], 'milkshake'),
3526 (['http/3.0', 'http/4.0'], None)
3527 ]
3528 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02003529 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003530 server_context.set_alpn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003531 client_context.set_alpn_protocols(client_protocols)
3532
3533 try:
3534 stats = server_params_test(client_context,
3535 server_context,
3536 chatty=True,
Christian Heimesa170fa12017-09-15 20:27:30 +02003537 connectionchatty=True,
3538 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003539 except ssl.SSLError as e:
3540 stats = e
3541
3542 if (expected is None and IS_OPENSSL_1_1
3543 and ssl.OPENSSL_VERSION_INFO < (1, 1, 0, 6)):
3544 # OpenSSL 1.1.0 to 1.1.0e raises handshake error
3545 self.assertIsInstance(stats, ssl.SSLError)
3546 else:
3547 msg = "failed trying %s (s) and %s (c).\n" \
3548 "was expecting %s, but got %%s from the %%s" \
3549 % (str(server_protocols), str(client_protocols),
3550 str(expected))
3551 client_result = stats['client_alpn_protocol']
3552 self.assertEqual(client_result, expected,
3553 msg % (client_result, "client"))
3554 server_result = stats['server_alpn_protocols'][-1] \
3555 if len(stats['server_alpn_protocols']) else 'nothing'
3556 self.assertEqual(server_result, expected,
3557 msg % (server_result, "server"))
3558
3559 def test_selected_npn_protocol(self):
3560 # selected_npn_protocol() is None unless NPN is used
Christian Heimesa170fa12017-09-15 20:27:30 +02003561 client_context, server_context, hostname = testing_context()
3562 stats = server_params_test(client_context, server_context,
3563 chatty=True, connectionchatty=True,
3564 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003565 self.assertIs(stats['client_npn_protocol'], None)
3566
3567 @unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
3568 def test_npn_protocols(self):
3569 server_protocols = ['http/1.1', 'spdy/2']
3570 protocol_tests = [
3571 (['http/1.1', 'spdy/2'], 'http/1.1'),
3572 (['spdy/2', 'http/1.1'], 'http/1.1'),
3573 (['spdy/2', 'test'], 'spdy/2'),
3574 (['abc', 'def'], 'abc')
3575 ]
3576 for client_protocols, expected in protocol_tests:
Christian Heimesa170fa12017-09-15 20:27:30 +02003577 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003578 server_context.set_npn_protocols(server_protocols)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003579 client_context.set_npn_protocols(client_protocols)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003580 stats = server_params_test(client_context, server_context,
Christian Heimesa170fa12017-09-15 20:27:30 +02003581 chatty=True, connectionchatty=True,
3582 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003583 msg = "failed trying %s (s) and %s (c).\n" \
3584 "was expecting %s, but got %%s from the %%s" \
3585 % (str(server_protocols), str(client_protocols),
3586 str(expected))
3587 client_result = stats['client_npn_protocol']
3588 self.assertEqual(client_result, expected, msg % (client_result, "client"))
3589 server_result = stats['server_npn_protocols'][-1] \
3590 if len(stats['server_npn_protocols']) else 'nothing'
3591 self.assertEqual(server_result, expected, msg % (server_result, "server"))
3592
3593 def sni_contexts(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003594 server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003595 server_context.load_cert_chain(SIGNED_CERTFILE)
Christian Heimesa170fa12017-09-15 20:27:30 +02003596 other_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003597 other_context.load_cert_chain(SIGNED_CERTFILE2)
Christian Heimesa170fa12017-09-15 20:27:30 +02003598 client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003599 client_context.load_verify_locations(SIGNING_CA)
3600 return server_context, other_context, client_context
3601
3602 def check_common_name(self, stats, name):
3603 cert = stats['peercert']
3604 self.assertIn((('commonName', name),), cert['subject'])
3605
3606 @needs_sni
3607 def test_sni_callback(self):
3608 calls = []
3609 server_context, other_context, client_context = self.sni_contexts()
3610
Christian Heimesa170fa12017-09-15 20:27:30 +02003611 client_context.check_hostname = False
3612
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003613 def servername_cb(ssl_sock, server_name, initial_context):
3614 calls.append((server_name, initial_context))
3615 if server_name is not None:
3616 ssl_sock.context = other_context
3617 server_context.set_servername_callback(servername_cb)
3618
3619 stats = server_params_test(client_context, server_context,
3620 chatty=True,
3621 sni_name='supermessage')
3622 # The hostname was fetched properly, and the certificate was
3623 # changed for the connection.
3624 self.assertEqual(calls, [("supermessage", server_context)])
3625 # CERTFILE4 was selected
3626 self.check_common_name(stats, 'fakehostname')
3627
3628 calls = []
3629 # The callback is called with server_name=None
3630 stats = server_params_test(client_context, server_context,
3631 chatty=True,
3632 sni_name=None)
3633 self.assertEqual(calls, [(None, server_context)])
Christian Heimesa170fa12017-09-15 20:27:30 +02003634 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003635
3636 # Check disabling the callback
3637 calls = []
3638 server_context.set_servername_callback(None)
3639
3640 stats = server_params_test(client_context, server_context,
3641 chatty=True,
3642 sni_name='notfunny')
3643 # Certificate didn't change
Christian Heimesa170fa12017-09-15 20:27:30 +02003644 self.check_common_name(stats, SIGNED_CERTFILE_HOSTNAME)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003645 self.assertEqual(calls, [])
3646
3647 @needs_sni
3648 def test_sni_callback_alert(self):
3649 # Returning a TLS alert is reflected to the connecting client
3650 server_context, other_context, client_context = self.sni_contexts()
3651
3652 def cb_returning_alert(ssl_sock, server_name, initial_context):
3653 return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
3654 server_context.set_servername_callback(cb_returning_alert)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003655 with self.assertRaises(ssl.SSLError) as cm:
3656 stats = server_params_test(client_context, server_context,
3657 chatty=False,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003658 sni_name='supermessage')
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003659 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003660
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003661 @needs_sni
3662 def test_sni_callback_raising(self):
3663 # Raising fails the connection with a TLS handshake failure alert.
3664 server_context, other_context, client_context = self.sni_contexts()
3665
3666 def cb_raising(ssl_sock, server_name, initial_context):
3667 1/0
3668 server_context.set_servername_callback(cb_raising)
3669
3670 with self.assertRaises(ssl.SSLError) as cm, \
3671 support.captured_stderr() as stderr:
Antoine Pitrou50b24d02013-04-11 20:48:42 +02003672 stats = server_params_test(client_context, server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003673 chatty=False,
3674 sni_name='supermessage')
3675 self.assertEqual(cm.exception.reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
3676 self.assertIn("ZeroDivisionError", stderr.getvalue())
Antoine Pitrou50b24d02013-04-11 20:48:42 +02003677
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003678 @needs_sni
3679 def test_sni_callback_wrong_return_type(self):
3680 # Returning the wrong return type terminates the TLS connection
3681 # with an internal error alert.
3682 server_context, other_context, client_context = self.sni_contexts()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003683
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003684 def cb_wrong_return_type(ssl_sock, server_name, initial_context):
3685 return "foo"
3686 server_context.set_servername_callback(cb_wrong_return_type)
3687
3688 with self.assertRaises(ssl.SSLError) as cm, \
3689 support.captured_stderr() as stderr:
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003690 stats = server_params_test(client_context, server_context,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003691 chatty=False,
3692 sni_name='supermessage')
3693 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
3694 self.assertIn("TypeError", stderr.getvalue())
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003695
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003696 def test_shared_ciphers(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003697 client_context, server_context, hostname = testing_context()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003698 if ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
3699 client_context.set_ciphers("AES128:AES256")
3700 server_context.set_ciphers("AES256")
3701 alg1 = "AES256"
3702 alg2 = "AES-256"
3703 else:
3704 client_context.set_ciphers("AES:3DES")
3705 server_context.set_ciphers("3DES")
3706 alg1 = "3DES"
3707 alg2 = "DES-CBC3"
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003708
Christian Heimesa170fa12017-09-15 20:27:30 +02003709 stats = server_params_test(client_context, server_context,
3710 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003711 ciphers = stats['server_shared_ciphers'][0]
3712 self.assertGreater(len(ciphers), 0)
3713 for name, tls_version, bits in ciphers:
3714 if not alg1 in name.split("-") and alg2 not in name:
3715 self.fail(name)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003716
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003717 def test_read_write_after_close_raises_valuerror(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003718 client_context, server_context, hostname = testing_context()
3719 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003720
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003721 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02003722 s = client_context.wrap_socket(socket.socket(),
3723 server_hostname=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003724 s.connect((HOST, server.port))
3725 s.close()
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003726
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003727 self.assertRaises(ValueError, s.read, 1024)
3728 self.assertRaises(ValueError, s.write, b'hello')
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003729
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003730 def test_sendfile(self):
3731 TEST_DATA = b"x" * 512
3732 with open(support.TESTFN, 'wb') as f:
3733 f.write(TEST_DATA)
3734 self.addCleanup(support.unlink, support.TESTFN)
Christian Heimesa170fa12017-09-15 20:27:30 +02003735 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003736 context.verify_mode = ssl.CERT_REQUIRED
Christian Heimesbd5c7d22018-01-20 15:16:30 +01003737 context.load_verify_locations(SIGNING_CA)
3738 context.load_cert_chain(SIGNED_CERTFILE)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003739 server = ThreadedEchoServer(context=context, chatty=False)
3740 with server:
3741 with context.wrap_socket(socket.socket()) as s:
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003742 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003743 with open(support.TESTFN, 'rb') as file:
3744 s.sendfile(file)
3745 self.assertEqual(s.recv(1024), TEST_DATA)
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003746
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003747 def test_session(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003748 client_context, server_context, hostname = testing_context()
Antoine Pitrou60a26e02013-07-20 19:35:16 +02003749
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003750 # first connection without session
Christian Heimesa170fa12017-09-15 20:27:30 +02003751 stats = server_params_test(client_context, server_context,
3752 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003753 session = stats['session']
3754 self.assertTrue(session.id)
3755 self.assertGreater(session.time, 0)
3756 self.assertGreater(session.timeout, 0)
3757 self.assertTrue(session.has_ticket)
3758 if ssl.OPENSSL_VERSION_INFO > (1, 0, 1):
3759 self.assertGreater(session.ticket_lifetime_hint, 0)
3760 self.assertFalse(stats['session_reused'])
3761 sess_stat = server_context.session_stats()
3762 self.assertEqual(sess_stat['accept'], 1)
3763 self.assertEqual(sess_stat['hits'], 0)
Giampaolo Rodola'915d1412014-06-11 03:54:30 +02003764
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003765 # reuse session
Christian Heimesa170fa12017-09-15 20:27:30 +02003766 stats = server_params_test(client_context, server_context,
3767 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003768 sess_stat = server_context.session_stats()
3769 self.assertEqual(sess_stat['accept'], 2)
3770 self.assertEqual(sess_stat['hits'], 1)
3771 self.assertTrue(stats['session_reused'])
3772 session2 = stats['session']
3773 self.assertEqual(session2.id, session.id)
3774 self.assertEqual(session2, session)
3775 self.assertIsNot(session2, session)
3776 self.assertGreaterEqual(session2.time, session.time)
3777 self.assertGreaterEqual(session2.timeout, session.timeout)
Christian Heimes99a65702016-09-10 23:44:53 +02003778
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003779 # another one without session
Christian Heimesa170fa12017-09-15 20:27:30 +02003780 stats = server_params_test(client_context, server_context,
3781 sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003782 self.assertFalse(stats['session_reused'])
3783 session3 = stats['session']
3784 self.assertNotEqual(session3.id, session.id)
3785 self.assertNotEqual(session3, session)
3786 sess_stat = server_context.session_stats()
3787 self.assertEqual(sess_stat['accept'], 3)
3788 self.assertEqual(sess_stat['hits'], 1)
Christian Heimes99a65702016-09-10 23:44:53 +02003789
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003790 # reuse session again
Christian Heimesa170fa12017-09-15 20:27:30 +02003791 stats = server_params_test(client_context, server_context,
3792 session=session, sni_name=hostname)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003793 self.assertTrue(stats['session_reused'])
3794 session4 = stats['session']
3795 self.assertEqual(session4.id, session.id)
3796 self.assertEqual(session4, session)
3797 self.assertGreaterEqual(session4.time, session.time)
3798 self.assertGreaterEqual(session4.timeout, session.timeout)
3799 sess_stat = server_context.session_stats()
3800 self.assertEqual(sess_stat['accept'], 4)
3801 self.assertEqual(sess_stat['hits'], 2)
Christian Heimes99a65702016-09-10 23:44:53 +02003802
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003803 def test_session_handling(self):
Christian Heimesa170fa12017-09-15 20:27:30 +02003804 client_context, server_context, hostname = testing_context()
3805 client_context2, _, _ = testing_context()
Christian Heimes99a65702016-09-10 23:44:53 +02003806
Christian Heimescb5b68a2017-09-07 18:07:00 -07003807 # TODO: session reuse does not work with TLS 1.3
Christian Heimesa170fa12017-09-15 20:27:30 +02003808 client_context.options |= ssl.OP_NO_TLSv1_3
3809 client_context2.options |= ssl.OP_NO_TLSv1_3
Christian Heimescb5b68a2017-09-07 18:07:00 -07003810
Christian Heimesa170fa12017-09-15 20:27:30 +02003811 server = ThreadedEchoServer(context=server_context, chatty=False)
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003812 with server:
Christian Heimesa170fa12017-09-15 20:27:30 +02003813 with client_context.wrap_socket(socket.socket(),
3814 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003815 # session is None before handshake
3816 self.assertEqual(s.session, None)
3817 self.assertEqual(s.session_reused, None)
3818 s.connect((HOST, server.port))
3819 session = s.session
3820 self.assertTrue(session)
3821 with self.assertRaises(TypeError) as e:
3822 s.session = object
3823 self.assertEqual(str(e.exception), 'Value is not a SSLSession.')
Christian Heimes99a65702016-09-10 23:44:53 +02003824
Christian Heimesa170fa12017-09-15 20:27:30 +02003825 with client_context.wrap_socket(socket.socket(),
3826 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003827 s.connect((HOST, server.port))
3828 # cannot set session after handshake
3829 with self.assertRaises(ValueError) as e:
3830 s.session = session
3831 self.assertEqual(str(e.exception),
3832 'Cannot set session after handshake.')
Christian Heimes99a65702016-09-10 23:44:53 +02003833
Christian Heimesa170fa12017-09-15 20:27:30 +02003834 with client_context.wrap_socket(socket.socket(),
3835 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003836 # can set session before handshake and before the
3837 # connection was established
3838 s.session = session
3839 s.connect((HOST, server.port))
3840 self.assertEqual(s.session.id, session.id)
3841 self.assertEqual(s.session, session)
3842 self.assertEqual(s.session_reused, True)
Christian Heimes99a65702016-09-10 23:44:53 +02003843
Christian Heimesa170fa12017-09-15 20:27:30 +02003844 with client_context2.wrap_socket(socket.socket(),
3845 server_hostname=hostname) as s:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003846 # cannot re-use session with a different SSLContext
3847 with self.assertRaises(ValueError) as e:
Christian Heimes99a65702016-09-10 23:44:53 +02003848 s.session = session
3849 s.connect((HOST, server.port))
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003850 self.assertEqual(str(e.exception),
3851 'Session refers to a different SSLContext.')
Christian Heimes99a65702016-09-10 23:44:53 +02003852
Antoine Pitrou8abdb8a2011-12-20 10:13:40 +01003853
Thomas Woutersed03b412007-08-28 21:37:11 +00003854def test_main(verbose=False):
Antoine Pitrou15cee622010-08-04 16:45:21 +00003855 if support.verbose:
Berker Peksag9e7990a2015-05-16 23:21:26 +03003856 import warnings
Antoine Pitrou15cee622010-08-04 16:45:21 +00003857 plats = {
3858 'Linux': platform.linux_distribution,
3859 'Mac': platform.mac_ver,
3860 'Windows': platform.win32_ver,
3861 }
Berker Peksag9e7990a2015-05-16 23:21:26 +03003862 with warnings.catch_warnings():
3863 warnings.filterwarnings(
3864 'ignore',
R David Murray44b548d2016-09-08 13:59:53 -04003865 r'dist\(\) and linux_distribution\(\) '
Berker Peksag9e7990a2015-05-16 23:21:26 +03003866 'functions are deprecated .*',
3867 PendingDeprecationWarning,
3868 )
3869 for name, func in plats.items():
3870 plat = func()
3871 if plat and plat[0]:
3872 plat = '%s %r' % (name, plat)
3873 break
3874 else:
3875 plat = repr(platform.platform())
Antoine Pitrou15cee622010-08-04 16:45:21 +00003876 print("test_ssl: testing with %r %r" %
3877 (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
3878 print(" under %s" % plat)
Antoine Pitroud5323212010-10-22 18:19:07 +00003879 print(" HAS_SNI = %r" % ssl.HAS_SNI)
Antoine Pitrou609ef012013-03-29 18:09:06 +01003880 print(" OP_ALL = 0x%8x" % ssl.OP_ALL)
3881 try:
3882 print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1)
3883 except AttributeError:
3884 pass
Antoine Pitrou15cee622010-08-04 16:45:21 +00003885
Antoine Pitrou152efa22010-05-16 18:19:27 +00003886 for filename in [
Martin Panter3840b2a2016-03-27 01:53:46 +00003887 CERTFILE, BYTES_CERTFILE,
Antoine Pitrou152efa22010-05-16 18:19:27 +00003888 ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
Antoine Pitrou58ddc9d2013-01-05 21:20:29 +01003889 SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
Antoine Pitrou152efa22010-05-16 18:19:27 +00003890 BADCERT, BADKEY, EMPTYCERT]:
3891 if not os.path.exists(filename):
3892 raise support.TestFailed("Can't read certificate file %r" % filename)
Thomas Wouters1b7f8912007-09-19 03:06:30 +00003893
Martin Panter3840b2a2016-03-27 01:53:46 +00003894 tests = [
3895 ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003896 SimpleBackgroundTests, ThreadedTests,
Martin Panter3840b2a2016-03-27 01:53:46 +00003897 ]
Thomas Woutersed03b412007-08-28 21:37:11 +00003898
Benjamin Petersonee8712c2008-05-20 21:35:26 +00003899 if support.is_resource_enabled('network'):
Bill Janssen6e027db2007-11-15 22:23:56 +00003900 tests.append(NetworkedTests)
Thomas Woutersed03b412007-08-28 21:37:11 +00003901
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003902 thread_info = support.threading_setup()
Antoine Pitrou480a1242010-04-28 21:37:09 +00003903 try:
3904 support.run_unittest(*tests)
3905 finally:
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02003906 support.threading_cleanup(*thread_info)
Thomas Woutersed03b412007-08-28 21:37:11 +00003907
3908if __name__ == "__main__":
3909 test_main()