blob: 68289a2eaf61a568b4fc6b2b48e492cc863dc830 [file] [log] [blame]
Jean-Paul Calderone8671c852011-03-02 19:26:20 -05001# Copyright (C) Jean-Paul Calderone
2# See LICENSE for details.
Jean-Paul Calderone8b63d452008-03-21 18:31:12 -04003
Jean-Paul Calderone30c09ea2008-03-21 17:04:05 -04004"""
Jonathan Ballet648875f2011-07-16 14:14:58 +09005Unit tests for :py:obj:`OpenSSL.SSL`.
Jean-Paul Calderone30c09ea2008-03-21 17:04:05 -04006"""
7
Jean-Paul Calderone4c1aacd2014-01-11 08:15:17 -05008from gc import collect, get_referrers
Konstantinos Koukopoulos541150d2014-01-31 01:00:19 +02009from errno import ECONNREFUSED, EINPROGRESS, EWOULDBLOCK, EPIPE, ESHUTDOWN
Jean-Paul Calderone210c0f32015-04-12 09:20:31 -040010from sys import platform, version_info, getfilesystemencoding
Jean-Paul Calderoned899af02013-03-19 22:10:37 -070011from socket import SHUT_RDWR, error, socket
Jean-Paul Calderonea65cf6c2009-07-19 10:26:52 -040012from os import makedirs
Jean-Paul Calderone1461c492013-10-03 16:05:00 -040013from os.path import join
Jean-Paul Calderone0b88b6a2009-07-05 12:44:41 -040014from unittest import main
Jean-Paul Calderonec4cb6582011-05-26 18:47:00 -040015from weakref import ref
Jean-Paul Calderone460cc1f2009-03-07 11:31:12 -050016
Jean-Paul Calderone4e0c43f2015-04-13 10:15:17 -040017from six import PY3, text_type, u
Jean-Paul Calderonec76c61c2014-01-18 13:21:52 -050018
Jean-Paul Calderone20222ae2011-05-19 21:43:46 -040019from OpenSSL.crypto import TYPE_RSA, FILETYPE_PEM
Jean-Paul Calderonea63714c2013-03-05 17:02:26 -080020from OpenSSL.crypto import PKey, X509, X509Extension, X509Store
Jean-Paul Calderone7526d172010-09-09 17:55:31 -040021from OpenSSL.crypto import dump_privatekey, load_privatekey
22from OpenSSL.crypto import dump_certificate, load_certificate
Jean-Paul Calderonec09fd582014-04-18 22:00:10 -040023from OpenSSL.crypto import get_elliptic_curves
Jean-Paul Calderone7526d172010-09-09 17:55:31 -040024
Jean-Paul Calderone9f2e38e2011-04-14 09:36:55 -040025from OpenSSL.SSL import OPENSSL_VERSION_NUMBER, SSLEAY_VERSION, SSLEAY_CFLAGS
26from OpenSSL.SSL import SSLEAY_PLATFORM, SSLEAY_DIR, SSLEAY_BUILT_ON
Jean-Paul Calderonee4f6b472010-07-29 22:50:58 -040027from OpenSSL.SSL import SENT_SHUTDOWN, RECEIVED_SHUTDOWN
Jean-Paul Calderone1461c492013-10-03 16:05:00 -040028from OpenSSL.SSL import (
29 SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, TLSv1_METHOD,
30 TLSv1_1_METHOD, TLSv1_2_METHOD)
31from OpenSSL.SSL import OP_SINGLE_DH_USE, OP_NO_SSLv2, OP_NO_SSLv3
Jean-Paul Calderone20222ae2011-05-19 21:43:46 -040032from OpenSSL.SSL import (
33 VERIFY_PEER, VERIFY_FAIL_IF_NO_PEER_CERT, VERIFY_CLIENT_ONCE, VERIFY_NONE)
Jean-Paul Calderone0222a492011-09-08 18:26:03 -040034
Jean-Paul Calderone20222ae2011-05-19 21:43:46 -040035from OpenSSL.SSL import (
Jean-Paul Calderone313bf012012-02-08 13:02:49 -050036 SESS_CACHE_OFF, SESS_CACHE_CLIENT, SESS_CACHE_SERVER, SESS_CACHE_BOTH,
37 SESS_CACHE_NO_AUTO_CLEAR, SESS_CACHE_NO_INTERNAL_LOOKUP,
38 SESS_CACHE_NO_INTERNAL_STORE, SESS_CACHE_NO_INTERNAL)
39
40from OpenSSL.SSL import (
Jean-Paul Calderoned899af02013-03-19 22:10:37 -070041 Error, SysCallError, WantReadError, WantWriteError, ZeroReturnError)
Jean-Paul Calderonee0fcf512012-02-13 09:10:15 -050042from OpenSSL.SSL import (
Jean-Paul Calderoned899af02013-03-19 22:10:37 -070043 Context, ContextType, Session, Connection, ConnectionType, SSLeay_version)
Jean-Paul Calderone7526d172010-09-09 17:55:31 -040044
Jean-Paul Calderone210c0f32015-04-12 09:20:31 -040045from OpenSSL.test.util import NON_ASCII, TestCase, b
Jean-Paul Calderone20222ae2011-05-19 21:43:46 -040046from OpenSSL.test.test_crypto import (
47 cleartextCertificatePEM, cleartextPrivateKeyPEM)
48from OpenSSL.test.test_crypto import (
49 client_cert_pem, client_key_pem, server_cert_pem, server_key_pem,
50 root_cert_pem)
51
Jean-Paul Calderone4bccf5e2008-12-28 22:50:42 -050052try:
53 from OpenSSL.SSL import OP_NO_QUERY_MTU
54except ImportError:
55 OP_NO_QUERY_MTU = None
56try:
57 from OpenSSL.SSL import OP_COOKIE_EXCHANGE
58except ImportError:
59 OP_COOKIE_EXCHANGE = None
60try:
61 from OpenSSL.SSL import OP_NO_TICKET
62except ImportError:
63 OP_NO_TICKET = None
Jean-Paul Calderone5ef86512008-04-26 19:06:28 -040064
Jean-Paul Calderone0222a492011-09-08 18:26:03 -040065try:
Jean-Paul Calderonec62d4c12011-09-08 18:29:32 -040066 from OpenSSL.SSL import OP_NO_COMPRESSION
67except ImportError:
68 OP_NO_COMPRESSION = None
69
70try:
Jean-Paul Calderone0222a492011-09-08 18:26:03 -040071 from OpenSSL.SSL import MODE_RELEASE_BUFFERS
72except ImportError:
73 MODE_RELEASE_BUFFERS = None
74
Jean-Paul Calderone1461c492013-10-03 16:05:00 -040075try:
76 from OpenSSL.SSL import OP_NO_TLSv1, OP_NO_TLSv1_1, OP_NO_TLSv1_2
77except ImportError:
78 OP_NO_TLSv1 = OP_NO_TLSv1_1 = OP_NO_TLSv1_2 = None
79
Jean-Paul Calderone31e85a82011-03-21 19:13:35 -040080from OpenSSL.SSL import (
81 SSL_ST_CONNECT, SSL_ST_ACCEPT, SSL_ST_MASK, SSL_ST_INIT, SSL_ST_BEFORE,
82 SSL_ST_OK, SSL_ST_RENEGOTIATE,
83 SSL_CB_LOOP, SSL_CB_EXIT, SSL_CB_READ, SSL_CB_WRITE, SSL_CB_ALERT,
84 SSL_CB_READ_ALERT, SSL_CB_WRITE_ALERT, SSL_CB_ACCEPT_LOOP,
85 SSL_CB_ACCEPT_EXIT, SSL_CB_CONNECT_LOOP, SSL_CB_CONNECT_EXIT,
86 SSL_CB_HANDSHAKE_START, SSL_CB_HANDSHAKE_DONE)
Jean-Paul Calderone30c09ea2008-03-21 17:04:05 -040087
# Diffie-Hellman parameters in PEM form, generated with:
#
#     openssl dhparam 128 -out dh-128.pem
#
# Note that 128 is a small number of bits to use.
dhparam = """\
-----BEGIN DH PARAMETERS-----
MBYCEQCobsg29c9WZP/54oAPcwiDAgEC
-----END DH PARAMETERS-----
"""
95
96
def join_bytes_or_unicode(prefix, suffix):
    """
    Join two path components, each of which may be ``bytes`` or ``unicode``.

    If the components are of different types, ``suffix`` is converted to the
    type of ``prefix`` using the filesystem encoding before joining, so the
    result has the same type as ``prefix``.
    """
    if type(prefix) != type(suffix):
        # Mixed types: bring suffix over to whatever type prefix has.
        encoding = getfilesystemencoding()
        if isinstance(prefix, text_type):
            suffix = suffix.decode(encoding)
        else:
            suffix = suffix.encode(encoding)

    return join(prefix, suffix)
112
113
def verify_cb(conn, cert, errnum, depth, ok):
    """
    Pass-through verification callback.

    ``conn``, ``cert``, ``errnum`` and ``depth`` are ignored; ``ok`` is
    returned unchanged.
    """
    return ok
116
Jean-Paul Calderone20222ae2011-05-19 21:43:46 -0400117
def socket_pair():
    """
    Establish and return a pair of network sockets connected to each other.

    :return: A ``(server, client)`` two-tuple of connected sockets, both set
        to non-blocking mode.
    """
    # The listening socket is only needed long enough to accept the single
    # connection made below; close it afterwards instead of leaking it.
    port = socket()
    try:
        port.bind(('', 0))
        port.listen(1)
        client = socket()
        # Start the connection attempt without blocking, then accept it on
        # the listening side.
        client.setblocking(False)
        client.connect_ex(("127.0.0.1", port.getsockname()[1]))
        client.setblocking(True)
        server = port.accept()[0]
    finally:
        port.close()

    # Let's pass some unencrypted data to make sure our socket connection is
    # fine.  Just one byte, so we don't have to worry about buffers getting
    # filled up or fragmentation.
    server.send(b"x")
    assert client.recv(1024) == b"x"
    client.send(b"y")
    assert server.recv(1024) == b"y"

    # Most of our callers want non-blocking sockets, make it easy for them.
    server.setblocking(False)
    client.setblocking(False)

    return (server, client)
145
146
Jean-Paul Calderone94b24a82009-07-16 19:11:38 -0400147
def handshake(client, server):
    """
    Drive the handshake on both connections until each one has completed.

    ``do_handshake`` is called on every connection which has not finished
    yet.  A :py:obj:`WantReadError` leaves the connection pending so it is
    tried again on the next pass; a call which returns normally marks that
    connection as done.

    :param client: The connection for the client side.
    :param server: The connection for the server side.
    """
    pending = [client, server]
    while pending:
        # Iterate over a snapshot: removing an element from the list that is
        # being iterated makes the loop skip the element which follows it.
        for conn in list(pending):
            try:
                conn.do_handshake()
            except WantReadError:
                pass
            else:
                pending.remove(conn)
158
159
def _create_certificate_chain():
    """
    Construct and return a chain of certificates.

        1. A new self-signed certificate authority certificate (cacert)
        2. A new intermediate certificate signed by cacert (icert)
        3. A new server certificate signed by icert (scert)

    :return: A list of three ``(key, certificate)`` two-tuples in the order
        ``[(cakey, cacert), (ikey, icert), (skey, scert)]``.
    """
    import time

    not_before = b("20000101000000Z")
    # Compute the expiry relative to now.  A fixed date (previously
    # 2020-01-01) eventually passes, after which every generated certificate
    # is already expired.
    one_year = 365 * 24 * 60 * 60
    not_after = b(
        time.strftime("%Y%m%d%H%M%SZ", time.gmtime(time.time() + one_year)))

    def issue(common_name, extension, issuer=None):
        # Create a fresh key and a certificate for it.  The certificate is
        # signed by the (key, cert) pair given as ``issuer``, or by its own
        # key when no issuer is given.
        key = PKey()
        key.generate_key(TYPE_RSA, 512)
        cert = X509()
        cert.get_subject().commonName = common_name
        if issuer is None:
            issuer_key, issuer_cert = key, cert
        else:
            issuer_key, issuer_cert = issuer
        cert.set_issuer(issuer_cert.get_subject())
        cert.set_pubkey(key)
        cert.set_notBefore(not_before)
        cert.set_notAfter(not_after)
        cert.add_extensions([extension])
        cert.set_serial_number(0)
        cert.sign(issuer_key, "sha1")
        return (key, cert)

    caext = X509Extension(b('basicConstraints'), False, b('CA:true'))
    leafext = X509Extension(b('basicConstraints'), True, b('CA:false'))

    # Step 1
    authority = issue("Authority Certificate", caext)

    # Step 2
    intermediate = issue("Intermediate Certificate", caext, authority)

    # Step 3
    server = issue("Server Certificate", leafext, intermediate)

    return [authority, intermediate, server]
211
212
213
class _LoopbackMixin:
    """
    Mixin with helpers for building a pair of :py:obj:`Connection` objects
    joined by a real socket pair, and for making two :py:obj:`Connection`
    objects talk to each other through their memory BIOs.
    """
    def _loopbackClientFactory(self, socket):
        """
        Wrap ``socket`` in a TLSv1 :py:obj:`Connection` put into connect
        state and return it.
        """
        connection = Connection(Context(TLSv1_METHOD), socket)
        connection.set_connect_state()
        return connection


    def _loopbackServerFactory(self, socket):
        """
        Wrap ``socket`` in a TLSv1 :py:obj:`Connection` which uses the server
        key and certificate, put it into accept state and return it.
        """
        context = Context(TLSv1_METHOD)
        key = load_privatekey(FILETYPE_PEM, server_key_pem)
        context.use_privatekey(key)
        certificate = load_certificate(FILETYPE_PEM, server_cert_pem)
        context.use_certificate(certificate)

        connection = Connection(context, socket)
        connection.set_accept_state()
        return connection


    def _loopback(self, serverFactory=None, clientFactory=None):
        """
        Create a connected socket pair, wrap each end using the given
        factories (the loopback factories of this mixin by default), run the
        handshake, switch both connections to blocking mode and return them
        as a ``(server, client)`` two-tuple.
        """
        make_server = serverFactory
        if make_server is None:
            make_server = self._loopbackServerFactory
        make_client = clientFactory
        if make_client is None:
            make_client = self._loopbackClientFactory

        server_socket, client_socket = socket_pair()
        server = make_server(server_socket)
        client = make_client(client_socket)

        handshake(client, server)

        for connection in (server, client):
            connection.setblocking(True)
        return server, client


    def _interactInMemory(self, client_conn, server_conn):
        """
        Try to read application bytes from each of the two
        :py:obj:`Connection` objects.  Move bytes between their send and
        receive buffers for as long as either side produces any.  If one of
        them delivers application bytes, return a two-tuple of that connection
        and the bytes read from it.  Once neither side has anything left to
        copy, return :py:obj:`None`.
        """
        directions = [(client_conn, server_conn),
                      (server_conn, client_conn)]

        progressed = True
        while progressed:
            # Another round is only needed if some bytes moved in this one.
            progressed = False

            for (source, sink) in directions:
                # Let this side either produce application data or generate
                # more protocol bytes.
                try:
                    received = source.recv(2 ** 16)
                except WantReadError:
                    # No application data; it may still have written
                    # something to its outgoing buffer.
                    pass
                else:
                    # Application data arrived; hand it to the caller.
                    return (source, received)

                # Drain this side's outgoing buffer into the other side's
                # incoming buffer.
                while True:
                    try:
                        chunk = source.bio_read(4096)
                    except WantReadError:
                        # Outgoing buffer is empty.
                        break
                    # Remember that somebody generated output.
                    progressed = True
                    sink.bio_write(chunk)


    def _handshakeInMemory(self, client_conn, server_conn):
        """
        Perform the TLS handshake between two :py:class:`Connection`
        instances connected to each other via memory BIOs.
        """
        client_conn.set_connect_state()
        server_conn.set_accept_state()

        # Start each side off; neither is expected to finish before bytes
        # have been exchanged.
        for connection in (client_conn, server_conn):
            try:
                connection.do_handshake()
            except WantReadError:
                pass

        self._interactInMemory(client_conn, server_conn)
313
314
Jean-Paul Calderone20222ae2011-05-19 21:43:46 -0400315
class VersionTests(TestCase):
    """
    Tests for the version information exposed by
    :py:obj:`OpenSSL.SSL.SSLeay_version` and
    :py:obj:`OpenSSL.SSL.OPENSSL_VERSION_NUMBER`.
    """
    def test_OPENSSL_VERSION_NUMBER(self):
        """
        :py:obj:`OPENSSL_VERSION_NUMBER` is an integer.  (It is described as
        holding the status in the low byte and the patch, fix, minor, and
        major versions in the nibbles above that; only the type is checked
        here.)
        """
        is_integer = isinstance(OPENSSL_VERSION_NUMBER, int)
        self.assertTrue(is_integer)


    def test_SSLeay_version(self):
        """
        :py:obj:`SSLeay_version` takes a version type indicator and returns a
        byte string; each of the five indicators yields a different string.
        """
        indicators = (
            SSLEAY_VERSION, SSLEAY_CFLAGS, SSLEAY_BUILT_ON,
            SSLEAY_PLATFORM, SSLEAY_DIR)

        # Keyed by the returned string so duplicates collapse.
        seen = {}
        for indicator in indicators:
            text = SSLeay_version(indicator)
            seen[text] = indicator
            self.assertTrue(isinstance(text, bytes))
        self.assertEqual(len(seen), 5)
343
344
Jean-Paul Calderonebf37f0f2010-07-31 14:56:20 -0400345
346class ContextTests(TestCase, _LoopbackMixin):
Jean-Paul Calderone30c09ea2008-03-21 17:04:05 -0400347 """
Jonathan Ballet648875f2011-07-16 14:14:58 +0900348 Unit tests for :py:obj:`OpenSSL.SSL.Context`.
Jean-Paul Calderone30c09ea2008-03-21 17:04:05 -0400349 """
350 def test_method(self):
351 """
Jonathan Ballet648875f2011-07-16 14:14:58 +0900352 :py:obj:`Context` can be instantiated with one of :py:obj:`SSLv2_METHOD`,
Jean-Paul Calderone1461c492013-10-03 16:05:00 -0400353 :py:obj:`SSLv3_METHOD`, :py:obj:`SSLv23_METHOD`, :py:obj:`TLSv1_METHOD`,
354 :py:obj:`TLSv1_1_METHOD`, or :py:obj:`TLSv1_2_METHOD`.
Jean-Paul Calderone30c09ea2008-03-21 17:04:05 -0400355 """
Jean-Paul Calderone1461c492013-10-03 16:05:00 -0400356 methods = [
357 SSLv3_METHOD, SSLv23_METHOD, TLSv1_METHOD]
358 for meth in methods:
Jean-Paul Calderone30c09ea2008-03-21 17:04:05 -0400359 Context(meth)
Jean-Paul Calderone9f2e38e2011-04-14 09:36:55 -0400360
Jean-Paul Calderone1461c492013-10-03 16:05:00 -0400361
362 maybe = [SSLv2_METHOD, TLSv1_1_METHOD, TLSv1_2_METHOD]
363 for meth in maybe:
364 try:
365 Context(meth)
366 except (Error, ValueError):
367 # Some versions of OpenSSL have SSLv2 / TLSv1.1 / TLSv1.2, some
368 # don't. Difficult to say in advance.
369 pass
Jean-Paul Calderone9f2e38e2011-04-14 09:36:55 -0400370
Jean-Paul Calderone30c09ea2008-03-21 17:04:05 -0400371 self.assertRaises(TypeError, Context, "")
372 self.assertRaises(ValueError, Context, 10)
373
374
Jean-Paul Calderonef73a3cb2014-02-09 08:49:06 -0500375 if not PY3:
376 def test_method_long(self):
377 """
378 On Python 2 :py:class:`Context` accepts values of type
379 :py:obj:`long` as well as :py:obj:`int`.
380 """
381 Context(long(TLSv1_METHOD))
382
383
384
Rick Deane15b1472009-07-09 15:53:42 -0500385 def test_type(self):
386 """
Jonathan Ballet648875f2011-07-16 14:14:58 +0900387 :py:obj:`Context` and :py:obj:`ContextType` refer to the same type object and can be
Jean-Paul Calderone68649052009-07-17 21:14:27 -0400388 used to create instances of that type.
Rick Deane15b1472009-07-09 15:53:42 -0500389 """
Jean-Paul Calderone68649052009-07-17 21:14:27 -0400390 self.assertIdentical(Context, ContextType)
391 self.assertConsistentType(Context, 'Context', TLSv1_METHOD)
Rick Deane15b1472009-07-09 15:53:42 -0500392
393
Jean-Paul Calderone30c09ea2008-03-21 17:04:05 -0400394 def test_use_privatekey(self):
395 """
Jonathan Ballet648875f2011-07-16 14:14:58 +0900396 :py:obj:`Context.use_privatekey` takes an :py:obj:`OpenSSL.crypto.PKey` instance.
Jean-Paul Calderone30c09ea2008-03-21 17:04:05 -0400397 """
398 key = PKey()
399 key.generate_key(TYPE_RSA, 128)
400 ctx = Context(TLSv1_METHOD)
401 ctx.use_privatekey(key)
402 self.assertRaises(TypeError, ctx.use_privatekey, "")
Jean-Paul Calderone828c9cb2008-04-26 18:06:54 -0400403
404
Jean-Paul Calderone173cff92013-03-06 10:29:21 -0800405 def test_use_privatekey_file_missing(self):
406 """
407 :py:obj:`Context.use_privatekey_file` raises :py:obj:`OpenSSL.SSL.Error`
408 when passed the name of a file which does not exist.
409 """
410 ctx = Context(TLSv1_METHOD)
411 self.assertRaises(Error, ctx.use_privatekey_file, self.mktemp())
412
413
Jean-Paul Calderone69a4e5b2015-04-12 10:04:28 -0400414 def _use_privatekey_file_test(self, pemfile, filetype):
415 """
416 Verify that calling ``Context.use_privatekey_file`` with the given
417 arguments does not raise an exception.
418 """
419 key = PKey()
420 key.generate_key(TYPE_RSA, 128)
421
422 with open(pemfile, "wt") as pem:
423 pem.write(
424 dump_privatekey(FILETYPE_PEM, key).decode("ascii")
425 )
426
427 ctx = Context(TLSv1_METHOD)
428 ctx.use_privatekey_file(pemfile, filetype)
429
430
431 def test_use_privatekey_file_bytes(self):
432 """
433 A private key can be specified from a file by passing a ``bytes``
434 instance giving the file name to ``Context.use_privatekey_file``.
435 """
436 self._use_privatekey_file_test(
437 self.mktemp() + NON_ASCII.encode(getfilesystemencoding()),
438 FILETYPE_PEM,
439 )
440
441
442 def test_use_privatekey_file_unicode(self):
443 """
444 A private key can be specified from a file by passing a ``unicode``
445 instance giving the file name to ``Context.use_privatekey_file``.
446 """
447 self._use_privatekey_file_test(
448 self.mktemp().decode(getfilesystemencoding()) + NON_ASCII,
449 FILETYPE_PEM,
450 )
451
452
Jean-Paul Calderonef73a3cb2014-02-09 08:49:06 -0500453 if not PY3:
454 def test_use_privatekey_file_long(self):
455 """
456 On Python 2 :py:obj:`Context.use_privatekey_file` accepts a
457 filetype of type :py:obj:`long` as well as :py:obj:`int`.
458 """
Jean-Paul Calderone69a4e5b2015-04-12 10:04:28 -0400459 self._use_privatekey_file_test(self.mktemp(), long(FILETYPE_PEM))
Jean-Paul Calderonef73a3cb2014-02-09 08:49:06 -0500460
461
Jean-Paul Calderone173cff92013-03-06 10:29:21 -0800462 def test_use_certificate_wrong_args(self):
463 """
464 :py:obj:`Context.use_certificate_wrong_args` raises :py:obj:`TypeError`
465 when not passed exactly one :py:obj:`OpenSSL.crypto.X509` instance as an
466 argument.
467 """
468 ctx = Context(TLSv1_METHOD)
469 self.assertRaises(TypeError, ctx.use_certificate)
470 self.assertRaises(TypeError, ctx.use_certificate, "hello, world")
471 self.assertRaises(TypeError, ctx.use_certificate, X509(), "hello, world")
472
473
474 def test_use_certificate_uninitialized(self):
475 """
476 :py:obj:`Context.use_certificate` raises :py:obj:`OpenSSL.SSL.Error`
477 when passed a :py:obj:`OpenSSL.crypto.X509` instance which has not been
478 initialized (ie, which does not actually have any certificate data).
479 """
480 ctx = Context(TLSv1_METHOD)
481 self.assertRaises(Error, ctx.use_certificate, X509())
482
483
484 def test_use_certificate(self):
485 """
486 :py:obj:`Context.use_certificate` sets the certificate which will be
487 used to identify connections created using the context.
488 """
489 # TODO
490 # Hard to assert anything. But we could set a privatekey then ask
491 # OpenSSL if the cert and key agree using check_privatekey. Then as
492 # long as check_privatekey works right we're good...
493 ctx = Context(TLSv1_METHOD)
494 ctx.use_certificate(load_certificate(FILETYPE_PEM, cleartextCertificatePEM))
495
496
497 def test_use_certificate_file_wrong_args(self):
498 """
499 :py:obj:`Context.use_certificate_file` raises :py:obj:`TypeError` if
500 called with zero arguments or more than two arguments, or if the first
501 argument is not a byte string or the second argumnent is not an integer.
502 """
503 ctx = Context(TLSv1_METHOD)
504 self.assertRaises(TypeError, ctx.use_certificate_file)
505 self.assertRaises(TypeError, ctx.use_certificate_file, b"somefile", object())
506 self.assertRaises(
507 TypeError, ctx.use_certificate_file, b"somefile", FILETYPE_PEM, object())
508 self.assertRaises(
509 TypeError, ctx.use_certificate_file, object(), FILETYPE_PEM)
510 self.assertRaises(
511 TypeError, ctx.use_certificate_file, b"somefile", object())
512
513
514 def test_use_certificate_file_missing(self):
515 """
516 :py:obj:`Context.use_certificate_file` raises
517 `:py:obj:`OpenSSL.SSL.Error` if passed the name of a file which does not
518 exist.
519 """
520 ctx = Context(TLSv1_METHOD)
521 self.assertRaises(Error, ctx.use_certificate_file, self.mktemp())
522
523
Jean-Paul Calderoned57a7b62015-04-12 09:57:36 -0400524 def _use_certificate_file_test(self, certificate_file):
Jean-Paul Calderone173cff92013-03-06 10:29:21 -0800525 """
Jean-Paul Calderoned57a7b62015-04-12 09:57:36 -0400526 Verify that calling ``Context.use_certificate_file`` with the given
527 filename doesn't raise an exception.
Jean-Paul Calderone173cff92013-03-06 10:29:21 -0800528 """
529 # TODO
530 # Hard to assert anything. But we could set a privatekey then ask
531 # OpenSSL if the cert and key agree using check_privatekey. Then as
532 # long as check_privatekey works right we're good...
Jean-Paul Calderoned57a7b62015-04-12 09:57:36 -0400533 with open(certificate_file, "wb") as pem_file:
Jean-Paul Calderone173cff92013-03-06 10:29:21 -0800534 pem_file.write(cleartextCertificatePEM)
535
536 ctx = Context(TLSv1_METHOD)
Jean-Paul Calderoned57a7b62015-04-12 09:57:36 -0400537 ctx.use_certificate_file(certificate_file)
538
539
540 def test_use_certificate_file_bytes(self):
541 """
542 :py:obj:`Context.use_certificate_file` sets the certificate (given as a
543 ``bytes`` filename) which will be used to identify connections created
544 using the context.
545 """
546 filename = self.mktemp() + NON_ASCII.encode(getfilesystemencoding())
547 self._use_certificate_file_test(filename)
548
549
550 def test_use_certificate_file_unicode(self):
551 """
552 :py:obj:`Context.use_certificate_file` sets the certificate (given as a
553 ``bytes`` filename) which will be used to identify connections created
554 using the context.
555 """
556 filename = self.mktemp().decode(getfilesystemencoding()) + NON_ASCII
557 self._use_certificate_file_test(filename)
Jean-Paul Calderone173cff92013-03-06 10:29:21 -0800558
559
Jean-Paul Calderonef73a3cb2014-02-09 08:49:06 -0500560 if not PY3:
561 def test_use_certificate_file_long(self):
562 """
563 On Python 2 :py:obj:`Context.use_certificate_file` accepts a
564 filetype of type :py:obj:`long` as well as :py:obj:`int`.
565 """
566 pem_filename = self.mktemp()
567 with open(pem_filename, "wb") as pem_file:
568 pem_file.write(cleartextCertificatePEM)
569
570 ctx = Context(TLSv1_METHOD)
571 ctx.use_certificate_file(pem_filename, long(FILETYPE_PEM))
572
573
Jean-Paul Calderone932f5cc2014-12-11 13:58:00 -0500574 def test_check_privatekey_valid(self):
575 """
576 :py:obj:`Context.check_privatekey` returns :py:obj:`None` if the
577 :py:obj:`Context` instance has been configured to use a matched key and
578 certificate pair.
579 """
580 key = load_privatekey(FILETYPE_PEM, client_key_pem)
581 cert = load_certificate(FILETYPE_PEM, client_cert_pem)
582 context = Context(TLSv1_METHOD)
583 context.use_privatekey(key)
584 context.use_certificate(cert)
585 self.assertIs(None, context.check_privatekey())
586
587
588 def test_check_privatekey_invalid(self):
589 """
590 :py:obj:`Context.check_privatekey` raises :py:obj:`Error` if the
591 :py:obj:`Context` instance has been configured to use a key and
592 certificate pair which don't relate to each other.
593 """
594 key = load_privatekey(FILETYPE_PEM, client_key_pem)
595 cert = load_certificate(FILETYPE_PEM, server_cert_pem)
596 context = Context(TLSv1_METHOD)
597 context.use_privatekey(key)
598 context.use_certificate(cert)
599 self.assertRaises(Error, context.check_privatekey)
600
601
602 def test_check_privatekey_wrong_args(self):
603 """
604 :py:obj:`Context.check_privatekey` raises :py:obj:`TypeError` if called
605 with other than no arguments.
606 """
607 context = Context(TLSv1_METHOD)
608 self.assertRaises(TypeError, context.check_privatekey, object())
609
610
Jean-Paul Calderonebd479162010-07-30 18:03:25 -0400611 def test_set_app_data_wrong_args(self):
612 """
Jonathan Ballet648875f2011-07-16 14:14:58 +0900613 :py:obj:`Context.set_app_data` raises :py:obj:`TypeError` if called with other than
Jean-Paul Calderonebd479162010-07-30 18:03:25 -0400614 one argument.
615 """
616 context = Context(TLSv1_METHOD)
617 self.assertRaises(TypeError, context.set_app_data)
618 self.assertRaises(TypeError, context.set_app_data, None, None)
619
620
621 def test_get_app_data_wrong_args(self):
622 """
Jonathan Ballet648875f2011-07-16 14:14:58 +0900623 :py:obj:`Context.get_app_data` raises :py:obj:`TypeError` if called with any
Jean-Paul Calderonebd479162010-07-30 18:03:25 -0400624 arguments.
625 """
626 context = Context(TLSv1_METHOD)
627 self.assertRaises(TypeError, context.get_app_data, None)
628
629
630 def test_app_data(self):
631 """
Jonathan Ballet648875f2011-07-16 14:14:58 +0900632 :py:obj:`Context.set_app_data` stores an object for later retrieval using
633 :py:obj:`Context.get_app_data`.
Jean-Paul Calderonebd479162010-07-30 18:03:25 -0400634 """
635 app_data = object()
636 context = Context(TLSv1_METHOD)
637 context.set_app_data(app_data)
638 self.assertIdentical(context.get_app_data(), app_data)
639
640
Jean-Paul Calderone40569ca2010-07-30 18:04:58 -0400641 def test_set_options_wrong_args(self):
642 """
Jonathan Ballet648875f2011-07-16 14:14:58 +0900643 :py:obj:`Context.set_options` raises :py:obj:`TypeError` if called with the wrong
644 number of arguments or a non-:py:obj:`int` argument.
Jean-Paul Calderone40569ca2010-07-30 18:04:58 -0400645 """
646 context = Context(TLSv1_METHOD)
647 self.assertRaises(TypeError, context.set_options)
648 self.assertRaises(TypeError, context.set_options, None)
649 self.assertRaises(TypeError, context.set_options, 1, None)
650
651
Jean-Paul Calderonebef4f4c2014-02-02 18:13:31 -0500652 def test_set_options(self):
653 """
654 :py:obj:`Context.set_options` returns the new options value.
655 """
656 context = Context(TLSv1_METHOD)
657 options = context.set_options(OP_NO_SSLv2)
658 self.assertTrue(OP_NO_SSLv2 & options)
659
660
661 if not PY3:
662 def test_set_options_long(self):
663 """
664 On Python 2 :py:obj:`Context.set_options` accepts values of type
665 :py:obj:`long` as well as :py:obj:`int`.
666 """
667 context = Context(TLSv1_METHOD)
668 options = context.set_options(long(OP_NO_SSLv2))
669 self.assertTrue(OP_NO_SSLv2 & options)
670
671
Guillermo Gonzalez74a2c292011-08-29 16:16:58 -0300672 def test_set_mode_wrong_args(self):
673 """
Jean-Paul Calderone64efa2c2011-09-11 10:00:09 -0400674 :py:obj:`Context.set`mode} raises :py:obj:`TypeError` if called with the wrong
675 number of arguments or a non-:py:obj:`int` argument.
Guillermo Gonzalez74a2c292011-08-29 16:16:58 -0300676 """
677 context = Context(TLSv1_METHOD)
678 self.assertRaises(TypeError, context.set_mode)
679 self.assertRaises(TypeError, context.set_mode, None)
680 self.assertRaises(TypeError, context.set_mode, 1, None)
681
682
Jean-Paul Calderone0222a492011-09-08 18:26:03 -0400683 if MODE_RELEASE_BUFFERS is not None:
684 def test_set_mode(self):
685 """
Jean-Paul Calderone64efa2c2011-09-11 10:00:09 -0400686 :py:obj:`Context.set_mode` accepts a mode bitvector and returns the newly
Jean-Paul Calderone0222a492011-09-08 18:26:03 -0400687 set mode.
688 """
689 context = Context(TLSv1_METHOD)
690 self.assertTrue(
691 MODE_RELEASE_BUFFERS & context.set_mode(MODE_RELEASE_BUFFERS))
Jean-Paul Calderonebef4f4c2014-02-02 18:13:31 -0500692
693 if not PY3:
694 def test_set_mode_long(self):
695 """
696 On Python 2 :py:obj:`Context.set_mode` accepts values of type
697 :py:obj:`long` as well as :py:obj:`int`.
698 """
699 context = Context(TLSv1_METHOD)
700 mode = context.set_mode(long(MODE_RELEASE_BUFFERS))
701 self.assertTrue(MODE_RELEASE_BUFFERS & mode)
Jean-Paul Calderone0222a492011-09-08 18:26:03 -0400702 else:
703 "MODE_RELEASE_BUFFERS unavailable - OpenSSL version may be too old"
704
705
Jean-Paul Calderone5ebb44a2010-07-30 18:09:41 -0400706 def test_set_timeout_wrong_args(self):
707 """
Jonathan Ballet648875f2011-07-16 14:14:58 +0900708 :py:obj:`Context.set_timeout` raises :py:obj:`TypeError` if called with the wrong
709 number of arguments or a non-:py:obj:`int` argument.
Jean-Paul Calderone5ebb44a2010-07-30 18:09:41 -0400710 """
711 context = Context(TLSv1_METHOD)
712 self.assertRaises(TypeError, context.set_timeout)
713 self.assertRaises(TypeError, context.set_timeout, None)
714 self.assertRaises(TypeError, context.set_timeout, 1, None)
715
716
717 def test_get_timeout_wrong_args(self):
718 """
Jonathan Ballet648875f2011-07-16 14:14:58 +0900719 :py:obj:`Context.get_timeout` raises :py:obj:`TypeError` if called with any arguments.
Jean-Paul Calderone5ebb44a2010-07-30 18:09:41 -0400720 """
721 context = Context(TLSv1_METHOD)
722 self.assertRaises(TypeError, context.get_timeout, None)
723
724
725 def test_timeout(self):
726 """
Jonathan Ballet648875f2011-07-16 14:14:58 +0900727 :py:obj:`Context.set_timeout` sets the session timeout for all connections
728 created using the context object. :py:obj:`Context.get_timeout` retrieves this
Jean-Paul Calderone5ebb44a2010-07-30 18:09:41 -0400729 value.
730 """
731 context = Context(TLSv1_METHOD)
732 context.set_timeout(1234)
733 self.assertEquals(context.get_timeout(), 1234)
734
735
if not PY3:
    def test_timeout_long(self):
        """
        On Python 2 :py:obj:`Context.set_timeout` accepts values of type
        :py:obj:`long` as well as :py:obj:`int`.
        """
        context = Context(TLSv1_METHOD)
        context.set_timeout(long(1234))
        # assertEqual rather than the deprecated assertEquals alias.
        self.assertEqual(context.get_timeout(), 1234)
745
746
def test_set_verify_depth_wrong_args(self):
    """
    :py:obj:`Context.set_verify_depth` raises :py:obj:`TypeError` when it
    is called without an argument, with a non-:py:obj:`int` argument, or
    with more than one argument.
    """
    ctx = Context(TLSv1_METHOD)
    for badArgs in [(), (None,), (1, None)]:
        self.assertRaises(TypeError, ctx.set_verify_depth, *badArgs)
756
757
def test_get_verify_depth_wrong_args(self):
    """
    :py:obj:`Context.get_verify_depth` takes no arguments and raises
    :py:obj:`TypeError` if it is given one.
    """
    getVerifyDepth = Context(TLSv1_METHOD).get_verify_depth
    self.assertRaises(TypeError, getVerifyDepth, None)
764
765
def test_verify_depth(self):
    """
    :py:obj:`Context.set_verify_depth` sets the number of certificates in
    a chain to follow before giving up.  The value can be retrieved with
    :py:obj:`Context.get_verify_depth`.
    """
    context = Context(TLSv1_METHOD)
    context.set_verify_depth(11)
    # assertEqual rather than the deprecated assertEquals alias.
    self.assertEqual(context.get_verify_depth(), 11)
775
776
if not PY3:
    def test_verify_depth_long(self):
        """
        On Python 2 :py:obj:`Context.set_verify_depth` accepts values of
        type :py:obj:`long` as well as :py:obj:`int`.
        """
        context = Context(TLSv1_METHOD)
        context.set_verify_depth(long(11))
        # assertEqual rather than the deprecated assertEquals alias.
        self.assertEqual(context.get_verify_depth(), 11)
786
787
def _write_encrypted_pem(self, passphrase):
    """
    Write a new private key out to a new file, encrypted using the given
    passphrase.  Return the path to the new file.
    """
    key = PKey()
    key.generate_key(TYPE_RSA, 128)
    # Serialize before opening the file so a failure here leaves no open
    # (or empty) file behind.
    pem = dump_privatekey(FILETYPE_PEM, key, "blowfish", passphrase)
    pemFile = self.mktemp()
    # The with block closes the file even if the write raises.
    with open(pemFile, 'w') as fObj:
        fObj.write(pem.decode('ascii'))
    return pemFile
801
802
def test_set_passwd_cb_wrong_args(self):
    """
    :py:obj:`Context.set_passwd_cb` raises :py:obj:`TypeError` if called
    with no arguments, with a non-callable first argument, or with too
    many arguments.
    """
    ctx = Context(TLSv1_METHOD)
    for badArgs in [(), (None,), (lambda: None, None, None)]:
        self.assertRaises(TypeError, ctx.set_passwd_cb, *badArgs)
812
813
def test_set_passwd_cb(self):
    """
    :py:obj:`Context.set_passwd_cb` accepts a callable which will be
    invoked when a private key is loaded from an encrypted PEM.
    """
    passphrase = b("foobar")
    pemFile = self._write_encrypted_pem(passphrase)
    calledWith = []

    def passphraseCallback(maxlen, verify, extra):
        # Record every invocation so the arguments can be inspected below.
        calledWith.append((maxlen, verify, extra))
        return passphrase

    context = Context(TLSv1_METHOD)
    context.set_passwd_cb(passphraseCallback)
    context.use_privatekey_file(pemFile)
    # assertTrue(len(calledWith), 1) used 1 as the failure message and
    # never checked the count; compare the length explicitly.
    self.assertEqual(len(calledWith), 1)
    self.assertTrue(isinstance(calledWith[0][0], int))
    self.assertTrue(isinstance(calledWith[0][1], int))
    self.assertEqual(calledWith[0][2], None)
Jean-Paul Calderone5ef86512008-04-26 19:06:28 -0400832
833
def test_passwd_callback_exception(self):
    """
    An exception raised by the passphrase callback propagates out of
    :py:obj:`Context.use_privatekey_file`.
    """
    def failingCallback(maxlen, verify, extra):
        raise RuntimeError("Sorry, I am a fail.")

    encryptedKeyPath = self._write_encrypted_pem(b("monkeys are nice"))
    ctx = Context(TLSv1_METHOD)
    ctx.set_passwd_cb(failingCallback)
    self.assertRaises(
        RuntimeError, ctx.use_privatekey_file, encryptedKeyPath)
846
847
def test_passwd_callback_false(self):
    """
    :py:obj:`Context.use_privatekey_file` raises
    :py:obj:`OpenSSL.SSL.Error` if the passphrase callback returns a false
    value (here, an empty byte string).
    """
    def emptyCallback(maxlen, verify, extra):
        return b""

    encryptedKeyPath = self._write_encrypted_pem(b("monkeys are nice"))
    ctx = Context(TLSv1_METHOD)
    ctx.set_passwd_cb(emptyCallback)
    self.assertRaises(Error, ctx.use_privatekey_file, encryptedKeyPath)
860
861
def test_passwd_callback_non_string(self):
    """
    :py:obj:`Context.use_privatekey_file` raises :py:obj:`ValueError` if
    the passphrase callback returns a true non-string value.
    """
    def integerCallback(maxlen, verify, extra):
        return 10

    encryptedKeyPath = self._write_encrypted_pem(b("monkeys are nice"))
    ctx = Context(TLSv1_METHOD)
    ctx.set_passwd_cb(integerCallback)
    self.assertRaises(
        ValueError, ctx.use_privatekey_file, encryptedKeyPath)
Jean-Paul Calderone389d76d2010-07-30 17:57:53 -0400874
875
def test_passwd_callback_too_long(self):
    """
    If the passphrase callback returns a string longer than the indicated
    maximum length, the string is truncated to that length.
    """
    # A priori knowledge: the callback is offered a maximum of 1024.
    passphrase = b("x") * 1024
    encryptedKeyPath = self._write_encrypted_pem(passphrase)

    def overlongCallback(maxlen, verify, extra):
        assert maxlen == 1024
        return passphrase + b("y")

    ctx = Context(TLSv1_METHOD)
    ctx.set_passwd_cb(overlongCallback)
    # Loading succeeds only because the truncated result equals the real
    # passphrase.
    ctx.use_privatekey_file(encryptedKeyPath)
893
894
def test_set_info_callback(self):
    """
    :py:obj:`Context.set_info_callback` accepts a callable which will be
    invoked when certain information about an SSL connection is available.
    """
    (server, client) = socket_pair()

    clientSSL = Connection(Context(TLSv1_METHOD), client)
    clientSSL.set_connect_state()

    called = []

    def info(conn, where, ret):
        # Collect every callback invocation for inspection after the
        # handshake.
        called.append((conn, where, ret))

    context = Context(TLSv1_METHOD)
    context.set_info_callback(info)
    context.use_certificate(
        load_certificate(FILETYPE_PEM, cleartextCertificatePEM))
    context.use_privatekey(
        load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM))

    serverSSL = Connection(context, server)
    serverSSL.set_accept_state()

    handshake(clientSSL, serverSSL)

    # The callback must always be called with a Connection instance as the
    # first argument.  It would probably be better to split this into
    # separate tests for client and server side info callbacks so we could
    # assert it is called with the right Connection instance.  It would
    # also be good to assert *something* about `where` and `ret`.
    notConnections = [
        conn for (conn, where, ret) in called
        if not isinstance(conn, Connection)]
    self.assertEqual(
        [], notConnections,
        "Some info callback arguments were not Connection instances.")
Jean-Paul Calderonee1bd4322008-09-07 20:17:17 -0400931
932
def _load_verify_locations_test(self, *args):
    """
    Build a client context that verifies its peer, pass ``args`` to its
    :py:obj:`load_verify_locations` method, then handshake with a server
    and check the certificate the client received.
    """
    (server, client) = socket_pair()

    def requirePreverified(conn, cert, errno, depth, preverify_ok):
        # Accept the peer only if OpenSSL's own verification passed, so a
        # certificate that does not verify makes the connection fail.
        return preverify_ok

    clientContext = Context(TLSv1_METHOD)
    clientContext.load_verify_locations(*args)
    clientContext.set_verify(VERIFY_PEER, requirePreverified)

    clientSSL = Connection(clientContext, client)
    clientSSL.set_connect_state()

    serverCert = load_certificate(FILETYPE_PEM, cleartextCertificatePEM)
    serverKey = load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM)
    serverContext = Context(TLSv1_METHOD)
    serverContext.use_certificate(serverCert)
    serverContext.use_privatekey(serverKey)

    serverSSL = Connection(serverContext, server)
    serverSSL.set_accept_state()

    # Without load_verify_locations above, the handshake
    # will fail:
    # Error: [('SSL routines', 'SSL3_GET_SERVER_CERTIFICATE',
    #          'certificate verify failed')]
    handshake(clientSSL, serverSSL)

    peerCert = clientSSL.get_peer_certificate()
    self.assertEqual(peerCert.get_subject().CN, 'Testing Root CA')
Jean-Paul Calderone5075fce2008-09-07 20:18:55 -0400969
Jean-Paul Calderone12608a82009-11-07 10:35:15 -0500970
def _load_verify_cafile(self, cafile):
    """
    Verify that if path to a file containing a certificate is passed to
    ``Context.load_verify_locations`` for the ``cafile`` parameter, that
    certificate is used as a trust root for the purposes of verifying
    connections created using that ``Context``.
    """
    # The with block closes the file even if the write raises.
    with open(cafile, 'w') as fObj:
        fObj.write(cleartextCertificatePEM.decode('ascii'))

    self._load_verify_locations_test(cafile)
983
Jean-Paul Calderone5075fce2008-09-07 20:18:55 -0400984
def test_load_verify_bytes_cafile(self):
    """
    :py:obj:`Context.load_verify_locations` accepts a file name given as a
    ``bytes`` instance and verifies using the certificates in that file.
    """
    prefix = self.mktemp()
    suffix = NON_ASCII.encode(getfilesystemencoding())
    self._load_verify_cafile(prefix + suffix)
993
994
def test_load_verify_unicode_cafile(self):
    """
    :py:obj:`Context.load_verify_locations` accepts a file name given as a
    ``unicode`` instance and verifies using the certificates in that file.
    """
    prefix = self.mktemp().decode(getfilesystemencoding())
    self._load_verify_cafile(prefix + NON_ASCII)
Jean-Paul Calderone210c0f32015-04-12 09:20:31 -04001004
1005
def test_load_verify_invalid_file(self):
    """
    :py:obj:`Context.load_verify_locations` raises :py:obj:`Error` when
    the cafile it is given does not exist.
    """
    loadVerifyLocations = Context(TLSv1_METHOD).load_verify_locations
    missingPath = self.mktemp()
    self.assertRaises(Error, loadVerifyLocations, missingPath)
Jean-Paul Calderone1cb5d022008-09-07 20:58:50 -04001014
1015
def _load_verify_directory_locations_capath(self, capath):
    """
    Create ``capath`` as a directory of certificate files and verify that
    passing it to ``Context.load_verify_locations`` as the ``capath``
    parameter lets a connection using that ``Context`` be verified.
    """
    makedirs(capath)
    pemText = cleartextCertificatePEM.decode('ascii')
    # Hash values computed manually with c_rehash to avoid depending on
    # c_rehash in the test suite.  One is from OpenSSL 0.9.8, the other
    # from OpenSSL 1.0.0.
    hashedNames = [b'c7adac82.0', b'c3705638.0']
    for hashedName in hashedNames:
        certPath = join_bytes_or_unicode(capath, hashedName)
        with open(certPath, 'w') as certFile:
            certFile.write(pemText)

    self._load_verify_locations_test(None, capath)
1033
1034
def test_load_verify_directory_bytes_capath(self):
    """
    :py:obj:`Context.load_verify_locations` accepts a directory name given
    as a ``bytes`` instance and verifies using the certificates within it.
    """
    prefix = self.mktemp()
    suffix = NON_ASCII.encode(getfilesystemencoding())
    self._load_verify_directory_locations_capath(prefix + suffix)
1044
1045
def test_load_verify_directory_unicode_capath(self):
    """
    :py:obj:`Context.load_verify_locations` accepts a directory name given
    as a ``unicode`` instance and verifies using the certificates within
    it.
    """
    prefix = self.mktemp().decode(getfilesystemencoding())
    self._load_verify_directory_locations_capath(prefix + NON_ASCII)
Jean-Paul Calderone210c0f32015-04-12 09:20:31 -04001055
1056
def test_load_verify_locations_wrong_args(self):
    """
    :py:obj:`Context.load_verify_locations` raises :py:obj:`TypeError` if
    called with the wrong number of arguments or with
    non-:py:obj:`str` arguments.
    """
    ctx = Context(TLSv1_METHOD)
    badCalls = [
        (),
        (object(),),
        (object(), object()),
        (None, None, None),
    ]
    for badArgs in badCalls:
        self.assertRaises(TypeError, ctx.load_verify_locations, *badArgs)
1067
1068
if platform == "win32":
    "set_default_verify_paths appears not to work on Windows. "
    "See LP#404343 and LP#404344."
else:
    def test_set_default_verify_paths(self):
        """
        :py:obj:`Context.set_default_verify_paths` causes the
        platform-specific CA certificate locations to be used for
        verification purposes.
        """
        # Testing this requires a server with a certificate signed by one
        # of the CAs in the platform CA location.  Getting one of those
        # costs money.  Fortunately (or unfortunately, depending on your
        # perspective), it's easy to think of a public server on the
        # internet which has such a certificate.  Connecting to the
        # network in a unit test is bad, but it's the only way I can think
        # of to really test this. -exarkun

        # Arg, verisign.com doesn't speak anything newer than TLS 1.0
        context = Context(TLSv1_METHOD)
        context.set_default_verify_paths()
        context.set_verify(
            VERIFY_PEER,
            lambda conn, cert, errno, depth, preverify_ok: preverify_ok)

        client = socket()
        try:
            client.connect(('verisign.com', 443))
            clientSSL = Connection(context, client)
            clientSSL.set_connect_state()
            clientSSL.do_handshake()
            clientSSL.send(b"GET / HTTP/1.0\r\n\r\n")
            self.assertTrue(clientSSL.recv(1024))
        finally:
            # Release the network socket whether or not the test passed.
            client.close()
Jean-Paul Calderone9eadb962008-09-07 21:20:44 -04001100
1101
def test_set_default_verify_paths_signature(self):
    """
    :py:obj:`Context.set_default_verify_paths` takes no arguments and
    raises :py:obj:`TypeError` if given any.
    """
    ctx = Context(TLSv1_METHOD)
    for unexpected in (None, 1, ""):
        self.assertRaises(
            TypeError, ctx.set_default_verify_paths, unexpected)
Jean-Paul Calderone327d8f92008-12-28 21:55:56 -05001111
Jean-Paul Calderonef4480622010-08-02 18:25:03 -04001112
def test_add_extra_chain_cert_invalid_cert(self):
    """
    :py:obj:`Context.add_extra_chain_cert` raises :py:obj:`TypeError` if
    called with other than one argument or with an object which is not an
    instance of :py:obj:`X509`.
    """
    ctx = Context(TLSv1_METHOD)
    for badArgs in [(), (object(),), (object(), object())]:
        self.assertRaises(TypeError, ctx.add_extra_chain_cert, *badArgs)
1123
1124
def _handshake_test(self, serverContext, clientContext):
    """
    Connect a server and a client, built from the given contexts, over a
    socket pair and drive a handshake between them.
    """
    serverSocket, clientSocket = socket_pair()

    server = Connection(serverContext, serverSocket)
    server.set_accept_state()

    client = Connection(clientContext, clientSocket)
    client.set_connect_state()

    # Alternate between the two ends (client first) for three rounds.  An
    # end that raises WantReadError is simply retried on the next round.
    endpoints = (client, server)
    for _ in range(3):
        for endpoint in endpoints:
            try:
                endpoint.do_handshake()
            except WantReadError:
                continue
1146
1147
def test_set_verify_callback_connection_argument(self):
    """
    The verify callback receives, as its first argument, the
    :py:class:`Connection` instance for which verification is taking
    place.
    """
    class CallbackRecorder(object):
        def callback(self, connection, *args):
            # Remember which connection was handed to the callback.
            self.connection = connection
            return 1

    serverKey = load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM)
    serverCert = load_certificate(FILETYPE_PEM, cleartextCertificatePEM)
    serverContext = Context(TLSv1_METHOD)
    serverContext.use_privatekey(serverKey)
    serverContext.use_certificate(serverCert)
    serverConnection = Connection(serverContext, None)

    recorder = CallbackRecorder()
    clientContext = Context(TLSv1_METHOD)
    clientContext.set_verify(VERIFY_PEER, recorder.callback)
    clientConnection = Connection(clientContext, None)
    clientConnection.set_connect_state()

    self._handshakeInMemory(clientConnection, serverConnection)

    self.assertIdentical(recorder.connection, clientConnection)
1174
1175
def test_set_verify_callback_exception(self):
    """
    An exception raised by the verify callback passed to
    :py:obj:`Context.set_verify` makes verification fail and is propagated
    to the caller of :py:obj:`Connection.do_handshake`.
    """
    def explodingCallback(*args):
        raise Exception("silly verify failure")

    serverKey = load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM)
    serverCert = load_certificate(FILETYPE_PEM, cleartextCertificatePEM)
    serverContext = Context(TLSv1_METHOD)
    serverContext.use_privatekey(serverKey)
    serverContext.use_certificate(serverCert)

    clientContext = Context(TLSv1_METHOD)
    clientContext.set_verify(VERIFY_PEER, explodingCallback)

    # This relies on assertRaises returning the exception it caught.
    raised = self.assertRaises(
        Exception, self._handshake_test, serverContext, clientContext)
    self.assertEqual("silly verify failure", str(raised))
1196
1197
def test_add_extra_chain_cert(self):
    """
    :py:obj:`Context.add_extra_chain_cert` accepts an :py:obj:`X509`
    instance to add to the certificate chain.

    See :py:obj:`_create_certificate_chain` for the details of the
    certificate chain tested.

    The chain is tested by starting a server with scert and connecting
    to it with a client which trusts cacert and requires verification to
    succeed.
    """
    chain = _create_certificate_chain()
    [(cakey, cacert), (ikey, icert), (skey, scert)] = chain

    # Dump the CA certificate to a file because that's the only way to load
    # it as a trusted CA in the client context.  The with blocks close each
    # file even if a write raises.
    for cert, name in [(cacert, 'ca.pem'), (icert, 'i.pem'), (scert, 's.pem')]:
        with open(name, 'w') as fObj:
            fObj.write(dump_certificate(FILETYPE_PEM, cert).decode('ascii'))

    for key, name in [(cakey, 'ca.key'), (ikey, 'i.key'), (skey, 's.key')]:
        with open(name, 'w') as fObj:
            fObj.write(dump_privatekey(FILETYPE_PEM, key).decode('ascii'))

    # Create the server context
    serverContext = Context(TLSv1_METHOD)
    serverContext.use_privatekey(skey)
    serverContext.use_certificate(scert)
    # The client already has cacert, we only need to give them icert.
    serverContext.add_extra_chain_cert(icert)

    # Create the client
    clientContext = Context(TLSv1_METHOD)
    clientContext.set_verify(
        VERIFY_PEER | VERIFY_FAIL_IF_NO_PEER_CERT, verify_cb)
    clientContext.load_verify_locations(b"ca.pem")

    # Try it out.
    self._handshake_test(serverContext, clientContext)
1240
1241
def _use_certificate_chain_file_test(self, certdir):
    """
    Verify that :py:obj:`Context.use_certificate_chain_file` reads a
    certificate chain from a file created inside ``certdir``.

    A server is started with scert and the written chain; a client which
    trusts cacert and requires verification then handshakes with it.
    """
    [(cakey, cacert), (ikey, icert), (skey, scert)] = (
        _create_certificate_chain())

    makedirs(certdir)
    chainFile = join_bytes_or_unicode(certdir, "chain.pem")
    caFile = join_bytes_or_unicode(certdir, "ca.pem")

    # Write out the chain file, most specific to least general.
    with open(chainFile, 'wb') as chainObj:
        for cert in (scert, icert, cacert):
            chainObj.write(dump_certificate(FILETYPE_PEM, cert))

    caPEM = dump_certificate(FILETYPE_PEM, cacert)
    with open(caFile, 'w') as caObj:
        caObj.write(caPEM.decode('ascii'))

    serverContext = Context(TLSv1_METHOD)
    serverContext.use_certificate_chain_file(chainFile)
    serverContext.use_privatekey(skey)

    verifyFlags = VERIFY_PEER | VERIFY_FAIL_IF_NO_PEER_CERT
    clientContext = Context(TLSv1_METHOD)
    clientContext.set_verify(verifyFlags, verify_cb)
    clientContext.load_verify_locations(caFile)

    self._handshake_test(serverContext, clientContext)
1279
Jean-Paul Calderone131052e2013-03-05 11:56:19 -08001280
def test_use_certificate_chain_file_bytes(self):
    """
    ``Context.use_certificate_chain_file`` accepts a file name given as an
    instance of ``bytes`` to specify additional certificates to use to
    construct and verify a trust chain.
    """
    prefix = self.mktemp()
    suffix = NON_ASCII.encode(getfilesystemencoding())
    self._use_certificate_chain_file_test(prefix + suffix)
1290
1291
def test_use_certificate_chain_file_unicode(self):
    """
    ``Context.use_certificate_chain_file`` accepts a file name given as an
    instance of ``unicode`` to specify additional certificates to use to
    construct and verify a trust chain.
    """
    prefix = self.mktemp().decode(getfilesystemencoding())
    self._use_certificate_chain_file_test(prefix + NON_ASCII)
1301
1302
def test_use_certificate_chain_file_wrong_args(self):
    """
    :py:obj:`Context.use_certificate_chain_file` raises
    :py:obj:`TypeError` if passed zero or more than one argument, or a
    single argument that is not a byte string.  It raises
    :py:obj:`OpenSSL.SSL.Error` when passed a bad chain file name (for
    example, the name of a file which does not exist).
    """
    ctx = Context(TLSv1_METHOD)
    for badArgs in [(), (object(),), (b"foo", object())]:
        self.assertRaises(
            TypeError, ctx.use_certificate_chain_file, *badArgs)

    missingPath = self.mktemp()
    self.assertRaises(Error, ctx.use_certificate_chain_file, missingPath)
1317
Jean-Paul Calderonee0d79362010-08-03 18:46:46 -04001318 # XXX load_client_ca
1319 # XXX set_session_id
Jean-Paul Calderone0294e3d2010-09-09 18:17:48 -04001320
def test_get_verify_mode_wrong_args(self):
    """
    :py:obj:`Context.get_verify_mode` takes no arguments and raises
    :py:obj:`TypeError` if it is given one.
    """
    getVerifyMode = Context(TLSv1_METHOD).get_verify_mode
    self.assertRaises(TypeError, getVerifyMode, None)
1328
1329
def test_set_verify_mode(self):
    """
    :py:obj:`Context.get_verify_mode` returns the verify mode flags
    previously passed to :py:obj:`Context.set_verify`.
    """
    context = Context(TLSv1_METHOD)
    # A new context reports no verify flags.  assertEqual replaces the
    # deprecated assertEquals alias.
    self.assertEqual(context.get_verify_mode(), 0)
    context.set_verify(
        VERIFY_PEER | VERIFY_CLIENT_ONCE, lambda *args: None)
    self.assertEqual(
        context.get_verify_mode(), VERIFY_PEER | VERIFY_CLIENT_ONCE)
1341
Jean-Paul Calderone6ace4782010-09-09 18:43:40 -04001342
if not PY3:
    def test_set_verify_mode_long(self):
        """
        On Python 2 :py:obj:`Context.set_verify` accepts mode flags of
        type :py:obj:`long` as well as :py:obj:`int`.
        """
        context = Context(TLSv1_METHOD)
        # assertEqual replaces the deprecated assertEquals alias.
        self.assertEqual(context.get_verify_mode(), 0)
        context.set_verify(
            long(VERIFY_PEER | VERIFY_CLIENT_ONCE), lambda *args: None)
        self.assertEqual(
            context.get_verify_mode(), VERIFY_PEER | VERIFY_CLIENT_ONCE)
1355
1356
def test_load_tmp_dh_wrong_args(self):
    """
    :py:obj:`Context.load_tmp_dh` raises :py:obj:`TypeError` if called
    with the wrong number of arguments or with a non-:py:obj:`str`
    argument.
    """
    ctx = Context(TLSv1_METHOD)
    for badArgs in [(), ("foo", None), (object(),)]:
        self.assertRaises(TypeError, ctx.load_tmp_dh, *badArgs)
1366
1367
def test_load_tmp_dh_missing_file(self):
    """
    Asking :py:obj:`Context.load_tmp_dh` to load a file which does not
    exist raises :py:obj:`OpenSSL.SSL.Error`.
    """
    missing_path = b"hello"
    load_tmp_dh = Context(TLSv1_METHOD).load_tmp_dh
    self.assertRaises(Error, load_tmp_dh, missing_path)
Jean-Paul Calderone6ace4782010-09-09 18:43:40 -04001375
1376
def _load_tmp_dh_test(self, dhfilename):
    """
    Write the module's ``dhparam`` fixture to ``dhfilename`` and check that
    ``Context.load_tmp_dh`` accepts that filename without raising.
    """
    ssl_context = Context(TLSv1_METHOD)
    with open(dhfilename, "w") as parameters_file:
        parameters_file.write(dhparam)

    # XXX There is nothing obvious to assert afterwards; not raising is the
    # whole check. -exarkun
    ssl_context.load_tmp_dh(dhfilename)
1388
1389
def test_load_tmp_dh_bytes(self):
    """
    :py:obj:`Context.load_tmp_dh` can load Diffie-Hellman parameters from a
    file whose non-ASCII name is passed as ``bytes``.
    """
    base_path = self.mktemp()
    suffix = NON_ASCII.encode(getfilesystemencoding())
    self._load_tmp_dh_test(base_path + suffix)
1398
1399
def test_load_tmp_dh_unicode(self):
    """
    :py:obj:`Context.load_tmp_dh` can load Diffie-Hellman parameters from a
    file whose non-ASCII name is passed as ``unicode``.
    """
    # ``self.mktemp()`` is decoded here, so it is presumably a byte string.
    base_path = self.mktemp().decode(getfilesystemencoding())
    self._load_tmp_dh_test(base_path + NON_ASCII)
1408
1409
def test_set_tmp_ecdh(self):
    """
    :py:obj:`Context.set_tmp_ecdh` accepts every curve reported by
    :py:obj:`get_elliptic_curves` as the elliptic curve to use for
    Diffie-Hellman.
    """
    set_tmp_ecdh = Context(TLSv1_METHOD).set_tmp_ecdh
    # Nothing about the resulting state is easy to observe, so the test
    # passes as long as no call raises.
    for elliptic_curve in get_elliptic_curves():
        set_tmp_ecdh(elliptic_curve)
Alex Gaynor12dc0842014-01-17 12:51:31 -06001420
1421
def test_set_cipher_list_bytes(self):
    """
    :py:obj:`Context.set_cipher_list` accepts a :py:obj:`bytes` naming the
    ciphers which connections created with the context object will be able
    to choose from.

    Only ``EXP-RC4-MD5`` is expected to be reported by the resulting
    connection; the ``hello world`` entry is expected to be dropped.
    """
    context = Context(TLSv1_METHOD)
    context.set_cipher_list(b"hello world:EXP-RC4-MD5")
    conn = Connection(context, None)
    self.assertEqual(conn.get_cipher_list(), ["EXP-RC4-MD5"])
Jean-Paul Calderone9485f2c2010-07-29 22:38:42 -04001432
Jean-Paul Calderone9485f2c2010-07-29 22:38:42 -04001433
def test_set_cipher_list_text(self):
    """
    :py:obj:`Context.set_cipher_list` accepts a :py:obj:`unicode` naming
    the ciphers which connections created with the context object will be
    able to choose from.

    Only ``EXP-RC4-MD5`` is expected to be reported by the resulting
    connection; the ``hello world`` entry is expected to be dropped.
    """
    context = Context(TLSv1_METHOD)
    context.set_cipher_list(u("hello world:EXP-RC4-MD5"))
    conn = Connection(context, None)
    self.assertEqual(conn.get_cipher_list(), ["EXP-RC4-MD5"])
1444
1445
def test_set_cipher_list_wrong_args(self):
    """
    :py:obj:`Context.set_cipher_list` raises :py:obj:`TypeError` for a
    missing argument, a non-string argument or an extra argument, and
    raises :py:obj:`OpenSSL.SSL.Error` for a cipher list string that names
    no usable cipher.
    """
    set_cipher_list = Context(TLSv1_METHOD).set_cipher_list

    for bad_args in [(), (object(),), (b"EXP-RC4-MD5", object())]:
        self.assertRaises(TypeError, set_cipher_list, *bad_args)

    self.assertRaises(Error, set_cipher_list, "imaginary-cipher")
Jean-Paul Calderone131052e2013-03-05 11:56:19 -08001459
1460
def test_set_session_cache_mode_wrong_args(self):
    """
    Calling :py:obj:`Context.set_session_cache_mode` without an argument,
    or with a non-integer argument, raises :py:obj:`TypeError`.
    """
    set_mode = Context(TLSv1_METHOD).set_session_cache_mode
    for bad_args in [(), (object(),)]:
        self.assertRaises(TypeError, set_mode, *bad_args)
1469
1470
def test_get_session_cache_mode_wrong_args(self):
    """
    Passing any argument to :py:obj:`Context.get_session_cache_mode`
    results in a :py:obj:`TypeError`.
    """
    get_mode = Context(TLSv1_METHOD).get_session_cache_mode
    self.assertRaises(TypeError, get_mode, 1)
1478
1479
def test_session_cache_mode(self):
    """
    :py:obj:`Context.set_session_cache_mode` changes how sessions are
    cached and returns the mode that was in effect before the call. The
    current mode is available from
    :py:obj:`Context.get_session_cache_mode`.
    """
    ssl_context = Context(TLSv1_METHOD)
    ssl_context.set_session_cache_mode(SESS_CACHE_OFF)

    previous_mode = ssl_context.set_session_cache_mode(SESS_CACHE_BOTH)
    current_mode = ssl_context.get_session_cache_mode()

    self.assertEqual(SESS_CACHE_OFF, previous_mode)
    self.assertEqual(SESS_CACHE_BOTH, current_mode)
1491
if not PY3:
    def test_session_cache_mode_long(self):
        """
        On Python 2 a :py:obj:`long` is as acceptable to
        :py:obj:`Context.set_session_cache_mode` as an :py:obj:`int`.
        """
        ssl_context = Context(TLSv1_METHOD)
        long_mode = long(SESS_CACHE_BOTH)
        ssl_context.set_session_cache_mode(long_mode)
        current_mode = ssl_context.get_session_cache_mode()
        self.assertEqual(SESS_CACHE_BOTH, current_mode)
1502
Jean-Paul Calderone313bf012012-02-08 13:02:49 -05001503
def test_get_cert_store(self):
    """
    The object returned by :py:obj:`Context.get_cert_store` is an instance
    of :py:obj:`X509Store`.
    """
    cert_store = Context(TLSv1_METHOD).get_cert_store()
    self.assertIsInstance(cert_store, X509Store)
1511
1512
Jean-Paul Calderone9485f2c2010-07-29 22:38:42 -04001513
class ServerNameCallbackTests(TestCase, _LoopbackMixin):
    """
    Tests for the callback registered with
    :py:obj:`Context.set_tlsext_servername_callback` and the way it
    interacts with :py:obj:`Connection`.
    """
    def test_wrong_args(self):
        """
        Calling :py:obj:`Context.set_tlsext_servername_callback` with no
        argument or with two arguments raises :py:obj:`TypeError`.
        """
        set_callback = Context(TLSv1_METHOD).set_tlsext_servername_callback
        for bad_args in [(), (1, 2)]:
            self.assertRaises(TypeError, set_callback, *bad_args)


    def test_old_callback_forgotten(self):
        """
        When a second callback is installed with
        :py:obj:`Context.set_tlsext_servername_callback`, the context lets
        go of its reference to the first one.
        """
        def callback(connection):
            pass

        def replacement(connection):
            pass

        context = Context(TLSv1_METHOD)
        context.set_tlsext_servername_callback(callback)

        # Keep only a weak reference so the context is the sole owner.
        weak_callback = ref(callback)
        del callback

        context.set_tlsext_servername_callback(replacement)

        # CPython needs a single collection, PyPy needs two before the
        # object actually goes away. Either way it shows the reference was
        # dropped by our code.
        for _ in range(2):
            collect()

        callback = weak_callback()
        if callback is None:
            return

        # Still alive: tolerate exactly one referrer, anything beyond that
        # is a leftover reference.
        referrers = get_referrers(callback)
        if len(referrers) > 1:
            self.fail("Some references remain: %r" % (referrers,))


    def test_no_servername(self):
        """
        If the client does not name a server, the callback given to
        :py:obj:`Context.set_tlsext_servername_callback` still runs, and
        :py:obj:`Connection.get_servername` returns :py:obj:`None` inside it.
        """
        calls = []
        def record_servername(conn):
            calls.append((conn, conn.get_servername()))

        server_context = Context(TLSv1_METHOD)
        server_context.set_tlsext_servername_callback(record_servername)

        # Drop the only local reference; from here on the Context has to
        # keep the callback alive by itself.
        del record_servername
        collect()

        # A key and certificate are required to accept the connection.
        server_key = load_privatekey(FILETYPE_PEM, server_key_pem)
        server_context.use_privatekey(server_key)
        server_cert = load_certificate(FILETYPE_PEM, server_cert_pem)
        server_context.use_certificate(server_cert)

        # Run a small in-memory connection so the callback fires.
        server = Connection(server_context, None)
        server.set_accept_state()

        client = Connection(Context(TLSv1_METHOD), None)
        client.set_connect_state()

        self._interactInMemory(server, client)

        self.assertEqual([(server, None)], calls)


    def test_servername(self):
        """
        If the client names a server in its hello message, the callback
        given to :py:obj:`Context.set_tlsext_servername_callback` runs and
        :py:obj:`Connection.get_servername` returns that name inside it.
        """
        calls = []
        def record_servername(conn):
            calls.append((conn, conn.get_servername()))

        server_context = Context(TLSv1_METHOD)
        server_context.set_tlsext_servername_callback(record_servername)

        # A key and certificate are required to accept the connection.
        server_key = load_privatekey(FILETYPE_PEM, server_key_pem)
        server_context.use_privatekey(server_key)
        server_cert = load_certificate(FILETYPE_PEM, server_cert_pem)
        server_context.use_certificate(server_cert)

        # Run a small in-memory connection so the callback fires.
        server = Connection(server_context, None)
        server.set_accept_state()

        client = Connection(Context(TLSv1_METHOD), None)
        client.set_connect_state()
        client.set_tlsext_host_name(b("foo1.example.com"))

        self._interactInMemory(server, client)

        self.assertEqual([(server, b("foo1.example.com"))], calls)
1623
1624
class NextProtoNegotiationTests(TestCase, _LoopbackMixin):
    """
    Test for Next Protocol Negotiation in PyOpenSSL.
    """
    def _accepting_server(self, context):
        """
        Load the test server key and certificate into ``context`` (necessary
        to actually accept a connection) and return a new socket-less
        :py:obj:`Connection` for it, already put in accept state.
        """
        context.use_privatekey(
            load_privatekey(FILETYPE_PEM, server_key_pem))
        context.use_certificate(
            load_certificate(FILETYPE_PEM, server_cert_pem))

        server = Connection(context, None)
        server.set_accept_state()
        return server


    def _connecting_client(self, context):
        """
        Return a new socket-less :py:obj:`Connection` for ``context``,
        already put in connect state.
        """
        client = Connection(context, None)
        client.set_connect_state()
        return client


    def test_npn_success(self):
        """
        Tests that clients and servers that agree on the negotiated next
        protocol can correctly establish a connection, and that the agreed
        protocol is reported by the connections.
        """
        advertise_args = []
        select_args = []
        def advertise(conn):
            advertise_args.append((conn,))
            return [b'http/1.1', b'spdy/2']
        def select(conn, options):
            select_args.append((conn, options))
            return b'spdy/2'

        server_context = Context(TLSv1_METHOD)
        server_context.set_npn_advertise_callback(advertise)

        client_context = Context(TLSv1_METHOD)
        client_context.set_npn_select_callback(select)

        # Do a little connection to trigger the logic
        server = self._accepting_server(server_context)
        client = self._connecting_client(client_context)

        self._interactInMemory(server, client)

        self.assertEqual([(server,)], advertise_args)
        self.assertEqual([(client, [b'http/1.1', b'spdy/2'])], select_args)

        self.assertEqual(server.get_next_proto_negotiated(), b'spdy/2')
        self.assertEqual(client.get_next_proto_negotiated(), b'spdy/2')


    def test_npn_client_fail(self):
        """
        Tests that when clients and servers cannot agree on what protocol to
        use next that the TLS connection does not get established.
        """
        advertise_args = []
        select_args = []
        def advertise(conn):
            advertise_args.append((conn,))
            return [b'http/1.1', b'spdy/2']
        def select(conn, options):
            select_args.append((conn, options))
            return b''

        server_context = Context(TLSv1_METHOD)
        server_context.set_npn_advertise_callback(advertise)

        client_context = Context(TLSv1_METHOD)
        client_context.set_npn_select_callback(select)

        # Do a little connection to trigger the logic
        server = self._accepting_server(server_context)
        client = self._connecting_client(client_context)

        # If the client doesn't return anything, the connection will fail.
        self.assertRaises(Error, self._interactInMemory, server, client)

        self.assertEqual([(server,)], advertise_args)
        self.assertEqual([(client, [b'http/1.1', b'spdy/2'])], select_args)


    def test_npn_select_error(self):
        """
        Test that we can handle exceptions in the select callback. If select
        fails it should be fatal to the connection.
        """
        advertise_args = []
        def advertise(conn):
            advertise_args.append((conn,))
            return [b'http/1.1', b'spdy/2']
        def select(conn, options):
            raise TypeError

        server_context = Context(TLSv1_METHOD)
        server_context.set_npn_advertise_callback(advertise)

        client_context = Context(TLSv1_METHOD)
        client_context.set_npn_select_callback(select)

        # Do a little connection to trigger the logic
        server = self._accepting_server(server_context)
        client = self._connecting_client(client_context)

        # If the callback throws an exception it should be raised here.
        self.assertRaises(TypeError, self._interactInMemory, server, client)
        self.assertEqual([(server,)], advertise_args)


    def test_npn_advertise_error(self):
        """
        Test that we can handle exceptions in the advertise callback. If
        advertise fails no NPN is advertised to the client.
        """
        select_args = []
        def advertise(conn):
            raise TypeError
        def select(conn, options):
            select_args.append((conn, options))
            return b''

        server_context = Context(TLSv1_METHOD)
        server_context.set_npn_advertise_callback(advertise)

        client_context = Context(TLSv1_METHOD)
        client_context.set_npn_select_callback(select)

        # Do a little connection to trigger the logic
        server = self._accepting_server(server_context)
        client = self._connecting_client(client_context)

        # The exception from the advertise callback should be raised here,
        # and the client's select callback should never have been called.
        self.assertRaises(TypeError, self._interactInMemory, server, client)
        self.assertEqual([], select_args)
1782
1783
Jean-Paul Calderonec4cb6582011-05-26 18:47:00 -04001784
class ApplicationLayerProtoNegotiationTests(TestCase, _LoopbackMixin):
    """
    Tests for ALPN in PyOpenSSL.
    """
    def _accepting_server(self, context):
        """
        Load the test server key and certificate into ``context`` (necessary
        to actually accept a connection) and return a new socket-less
        :py:obj:`Connection` for it, already put in accept state.
        """
        context.use_privatekey(
            load_privatekey(FILETYPE_PEM, server_key_pem))
        context.use_certificate(
            load_certificate(FILETYPE_PEM, server_cert_pem))

        server = Connection(context, None)
        server.set_accept_state()
        return server


    def _connecting_client(self, context):
        """
        Return a new socket-less :py:obj:`Connection` for ``context``,
        already put in connect state.
        """
        client = Connection(context, None)
        client.set_connect_state()
        return client


    def test_alpn_success(self):
        """
        Tests that clients and servers that agree on the negotiated ALPN
        protocol can correctly establish a connection, and that the agreed
        protocol is reported by the connections.
        """
        select_args = []
        def select(conn, options):
            select_args.append((conn, options))
            return b'spdy/2'

        client_context = Context(TLSv1_METHOD)
        client_context.set_alpn_protos([b'http/1.1', b'spdy/2'])

        server_context = Context(TLSv1_METHOD)
        server_context.set_alpn_select_callback(select)

        # Do a little connection to trigger the logic
        server = self._accepting_server(server_context)
        client = self._connecting_client(client_context)

        self._interactInMemory(server, client)

        self.assertEqual([(server, [b'http/1.1', b'spdy/2'])], select_args)

        self.assertEqual(server.get_alpn_proto_negotiated(), b'spdy/2')
        self.assertEqual(client.get_alpn_proto_negotiated(), b'spdy/2')


    def test_alpn_set_on_connection(self):
        """
        The same as test_alpn_success, but setting the ALPN protocols on the
        connection rather than the context.
        """
        select_args = []
        def select(conn, options):
            select_args.append((conn, options))
            return b'spdy/2'

        # Setup the client context but don't set any ALPN protocols.
        client_context = Context(TLSv1_METHOD)

        server_context = Context(TLSv1_METHOD)
        server_context.set_alpn_select_callback(select)

        # Do a little connection to trigger the logic
        server = self._accepting_server(server_context)

        # Set the ALPN protocols on the client connection. The client is
        # built by hand here so the protocols are set before the connection
        # is put in connect state.
        client = Connection(client_context, None)
        client.set_alpn_protos([b'http/1.1', b'spdy/2'])
        client.set_connect_state()

        self._interactInMemory(server, client)

        self.assertEqual([(server, [b'http/1.1', b'spdy/2'])], select_args)

        self.assertEqual(server.get_alpn_proto_negotiated(), b'spdy/2')
        self.assertEqual(client.get_alpn_proto_negotiated(), b'spdy/2')


    def test_alpn_server_fail(self):
        """
        Tests that when clients and servers cannot agree on what protocol to
        use next that the TLS connection does not get established.
        """
        select_args = []
        def select(conn, options):
            select_args.append((conn, options))
            return b''

        client_context = Context(TLSv1_METHOD)
        client_context.set_alpn_protos([b'http/1.1', b'spdy/2'])

        server_context = Context(TLSv1_METHOD)
        server_context.set_alpn_select_callback(select)

        # Do a little connection to trigger the logic
        server = self._accepting_server(server_context)
        client = self._connecting_client(client_context)

        # If the server's select callback doesn't return a protocol, the
        # connection will fail.
        self.assertRaises(Error, self._interactInMemory, server, client)

        self.assertEqual([(server, [b'http/1.1', b'spdy/2'])], select_args)


    def test_alpn_no_server(self):
        """
        Tests that when the server doesn't offer ALPN (no select callback is
        set on its context), the connection can still be established and the
        client reports an empty negotiated protocol.
        """
        client_context = Context(TLSv1_METHOD)
        client_context.set_alpn_protos([b'http/1.1', b'spdy/2'])

        server_context = Context(TLSv1_METHOD)

        # Do a little connection to trigger the logic
        server = self._accepting_server(server_context)
        client = self._connecting_client(client_context)

        # Do the dance.
        self._interactInMemory(server, client)

        self.assertEqual(client.get_alpn_proto_negotiated(), b'')


    def test_alpn_callback_exception(self):
        """
        Test that we can handle exceptions in the ALPN select callback.
        """
        select_args = []
        def select(conn, options):
            select_args.append((conn, options))
            raise TypeError

        client_context = Context(TLSv1_METHOD)
        client_context.set_alpn_protos([b'http/1.1', b'spdy/2'])

        server_context = Context(TLSv1_METHOD)
        server_context.set_alpn_select_callback(select)

        # Do a little connection to trigger the logic
        server = self._accepting_server(server_context)
        client = self._connecting_client(client_context)

        # If the callback throws an exception it should be raised here.
        self.assertRaises(TypeError, self._interactInMemory, server, client)
        self.assertEqual([(server, [b'http/1.1', b'spdy/2'])], select_args)
1961
1962
Cory Benfield12eae892014-06-07 15:42:56 +01001963
class SessionTests(TestCase):
    """
    Unit tests for :py:obj:`OpenSSL.SSL.Session`.
    """
    def test_construction(self):
        """
        Calling :py:class:`Session` without arguments produces a new
        instance of that type.
        """
        self.assertIsInstance(Session(), Session)


    def test_construction_wrong_args(self):
        """
        :py:class:`Session` raises :py:obj:`TypeError` when it is called
        with any argument.
        """
        for unexpected in (123, "hello", object()):
            self.assertRaises(TypeError, Session, unexpected)
1985
1986
1987
Jean-Paul Calderone9485f2c2010-07-29 22:38:42 -04001988class ConnectionTests(TestCase, _LoopbackMixin):
Rick Deane15b1472009-07-09 15:53:42 -05001989 """
Jonathan Ballet648875f2011-07-16 14:14:58 +09001990 Unit tests for :py:obj:`OpenSSL.SSL.Connection`.
Rick Deane15b1472009-07-09 15:53:42 -05001991 """
Jean-Paul Calderone541eaf22010-09-09 18:55:08 -04001992 # XXX get_peer_certificate -> None
1993 # XXX sock_shutdown
1994 # XXX master_key -> TypeError
1995 # XXX server_random -> TypeError
1996 # XXX state_string
1997 # XXX connect -> TypeError
1998 # XXX connect_ex -> TypeError
1999 # XXX set_connect_state -> TypeError
2000 # XXX set_accept_state -> TypeError
2001 # XXX renegotiate_pending
2002 # XXX do_handshake -> TypeError
2003 # XXX bio_read -> TypeError
2004 # XXX recv -> TypeError
2005 # XXX send -> TypeError
2006 # XXX bio_write -> TypeError
2007
def test_type(self):
    """
    :py:obj:`Connection` and :py:obj:`ConnectionType` are one and the same
    type object, and instances of that type can be created from it.
    """
    self.assertIdentical(Connection, ConnectionType)
    # The trailing arguments are the constructor arguments used to create
    # an instance: a context and no socket.
    context = Context(TLSv1_METHOD)
    self.assertConsistentType(Connection, 'Connection', context, None)
Rick Deane15b1472009-07-09 15:53:42 -05002016
Jean-Paul Calderone68649052009-07-17 21:14:27 -04002017
def test_get_context(self):
    """
    The object returned by :py:obj:`Connection.get_context` is the very
    :py:obj:`Context` instance the :py:obj:`Connection` was built with.
    """
    original_context = Context(TLSv1_METHOD)
    reported_context = Connection(original_context, None).get_context()
    self.assertIdentical(reported_context, original_context)
2026
2027
def test_get_context_wrong_args(self):
    """
    Passing any argument to :py:obj:`Connection.get_context` results in a
    :py:obj:`TypeError`.
    """
    get_context = Connection(Context(TLSv1_METHOD), None).get_context
    self.assertRaises(TypeError, get_context, None)
2035
2036
def test_set_context_wrong_args(self):
    """
    :py:obj:`Connection.set_context` raises :py:obj:`TypeError` unless it
    is called with exactly one argument which is a :py:obj:`Context`
    instance, and a rejected call leaves the connection's context
    unchanged.
    """
    original_context = Context(TLSv1_METHOD)
    connection = Connection(original_context, None)

    invalid_calls = [
        (),
        (object(),),
        ("hello",),
        (1,),
        (1, 2),
        (Context(TLSv1_METHOD), 2),
    ]
    for bad_args in invalid_calls:
        self.assertRaises(TypeError, connection.set_context, *bad_args)

    self.assertIdentical(original_context, connection.get_context())
2053
2054
def test_set_context(self):
    """
    After :py:obj:`Connection.set_context` is given a new
    :py:obj:`Context`, the connection reports that instance as its
    context.
    """
    original = Context(SSLv23_METHOD)
    replacement = Context(TLSv1_METHOD)

    connection = Connection(original, None)
    connection.set_context(replacement)
    self.assertIdentical(replacement, connection.get_context())

    # Drop the local references and force a collection, in case the
    # Connection does not correctly account for its own references to the
    # two contexts.
    del original, replacement
    collect()
2069
2070
def test_set_tlsext_host_name_wrong_args(self):
    """
    :py:obj:`Connection.set_tlsext_host_name` raises :py:obj:`TypeError`
    when called with no argument, with a non-byte string argument, with
    two arguments, or with a byte string containing a NUL.
    """
    conn = Connection(Context(TLSv1_METHOD), None)
    set_host_name = conn.set_tlsext_host_name

    invalid_calls = [
        (),
        (object(),),
        (123, 456),
        (b("with\0null"),),
    ]
    for bad_args in invalid_calls:
        self.assertRaises(TypeError, set_host_name, *bad_args)

    if version_info >= (3,):
        # On Python 3.x text must not be silently converted to bytes.
        text_name = b("example.com").decode("ascii")
        self.assertRaises(TypeError, set_host_name, text_name)
Jean-Paul Calderone95613b72011-05-25 22:30:21 -04002089
2090
def test_get_servername_wrong_args(self):
    """
    Passing any argument to :py:obj:`Connection.get_servername` results in
    a :py:obj:`TypeError`.
    """
    get_servername = Connection(Context(TLSv1_METHOD), None).get_servername
    for unexpected in (object(), 1, "hello"):
        self.assertRaises(TypeError, get_servername, unexpected)
2100
2101
def test_pending(self):
    """
    :py:obj:`Connection.pending` returns the number of bytes available for
    immediate read.

    A connection which has not exchanged any data is expected to report
    ``0``.
    """
    connection = Connection(Context(TLSv1_METHOD), None)
    self.assertEqual(connection.pending(), 0)
2109
2110
def test_pending_wrong_args(self):
    """
    Calling :py:obj:`Connection.pending` with an argument raises
    :py:obj:`TypeError`.
    """
    pending = Connection(Context(TLSv1_METHOD), None).pending
    self.assertRaises(TypeError, pending, None)
2117
2118
def test_connect_wrong_args(self):
    """
    :py:obj:`Connection.connect` raises :py:obj:`TypeError` if called with
    a non-address argument or with the wrong number of arguments.
    """
    sock = socket()
    try:
        connection = Connection(Context(TLSv1_METHOD), sock)
        self.assertRaises(TypeError, connection.connect, None)
        self.assertRaises(TypeError, connection.connect)
        self.assertRaises(
            TypeError, connection.connect, ("127.0.0.1", 1), None)
    finally:
        # Release the file descriptor even when an assertion fails.
        sock.close()
Jean-Paul Calderonecfecc242010-07-29 22:47:06 -04002128
2129
def test_connect_refused(self):
    """
    :py:obj:`Connection.connect` raises :py:obj:`socket.error` if the
    underlying socket connect method raises it.
    """
    client = socket()
    try:
        clientSSL = Connection(Context(TLSv1_METHOD), client)
        # The exception is captured from assertRaises so that its errno
        # can be inspected.
        exc = self.assertRaises(error, clientSSL.connect, ("127.0.0.1", 1))
        self.assertEqual(exc.args[0], ECONNREFUSED)
    finally:
        # Release the file descriptor even when an assertion fails.
        client.close()
Jean-Paul Calderone8bdeba22010-07-29 09:45:07 -04002140
2141
def test_connect(self):
    """
    :py:obj:`Connection.connect` establishes a connection to the specified
    address.
    """
    port = socket()
    client = socket()
    try:
        port.bind(('', 0))
        port.listen(3)

        clientSSL = Connection(Context(TLSv1_METHOD), client)
        clientSSL.connect(('127.0.0.1', port.getsockname()[1]))

        # The socket handed to the Connection must now be connected to the
        # listening port.
        self.assertEqual(client.getpeername()[1], port.getsockname()[1])
    finally:
        # Release both file descriptors even when the test fails.
        client.close()
        port.close()
Jean-Paul Calderone0bcb0e02010-07-29 09:49:59 -04002153
2154
if platform == "darwin":
    "connect_ex sometimes causes a kernel panic on OS X 10.6.4"
else:
    def test_connect_ex(self):
        """
        If there is a connection error, :py:obj:`Connection.connect_ex`
        returns the errno instead of raising an exception.
        """
        port = socket()
        client = socket()
        try:
            port.bind(('', 0))
            port.listen(3)

            clientSSL = Connection(Context(TLSv1_METHOD), client)
            # A non-blocking connect cannot complete immediately, so an
            # "in progress" style errno is expected from connect_ex.
            clientSSL.setblocking(False)
            result = clientSSL.connect_ex(port.getsockname())
            expected = (EINPROGRESS, EWOULDBLOCK)
            self.assertTrue(
                result in expected, "%r not in %r" % (result, expected))
        finally:
            # Release both file descriptors even when the test fails.
            client.close()
            port.close()
Jean-Paul Calderone55d91f42010-07-29 19:22:17 -04002173
2174
def test_accept_wrong_args(self):
    """
    :py:obj:`Connection.accept` raises :py:obj:`TypeError` if called with
    any arguments.
    """
    sock = socket()
    try:
        connection = Connection(Context(TLSv1_METHOD), sock)
        self.assertRaises(TypeError, connection.accept, None)
    finally:
        # Release the file descriptor even when the assertion fails.
        sock.close()
2181
2182
def test_accept(self):
    """
    :py:obj:`Connection.accept` accepts a pending connection attempt and
    returns a tuple of a new :py:obj:`Connection` (the accepted client) and
    the address the connection originated from.
    """
    ctx = Context(TLSv1_METHOD)
    ctx.use_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem))
    ctx.use_certificate(load_certificate(FILETYPE_PEM, server_cert_pem))

    port = socket()
    client = socket()
    # Filled in once accept() succeeds so the accepted side can be closed.
    accepted = []
    try:
        portSSL = Connection(ctx, port)
        portSSL.bind(('', 0))
        portSSL.listen(3)

        clientSSL = Connection(Context(TLSv1_METHOD), client)

        # Calling portSSL.getsockname() here to get the server IP address
        # sounds great, but frequently fails on Windows.
        clientSSL.connect(('127.0.0.1', portSSL.getsockname()[1]))

        serverSSL, address = portSSL.accept()
        accepted.append(serverSSL)

        self.assertIsInstance(serverSSL, Connection)
        self.assertIdentical(serverSSL.get_context(), ctx)
        self.assertEqual(address, clientSSL.getsockname())
    finally:
        # Release every file descriptor even when an assertion fails.
        for connection in accepted:
            connection.close()
        client.close()
        port.close()
Jean-Paul Calderone8bdeba22010-07-29 09:45:07 -04002208
2209
def test_shutdown_wrong_args(self):
    """
    :py:obj:`Connection.shutdown` and :py:obj:`Connection.get_shutdown`
    raise :py:obj:`TypeError` if called with an argument, and
    :py:obj:`Connection.set_shutdown` raises :py:obj:`TypeError` if called
    with the wrong number of arguments or with a non-integer argument.
    """
    conn = Connection(Context(TLSv1_METHOD), None)
    # Pairs of (method, positional arguments which must be rejected).
    rejected = [
        (conn.shutdown, (None,)),
        (conn.get_shutdown, (None,)),
        (conn.set_shutdown, ()),
        (conn.set_shutdown, (None,)),
        (conn.set_shutdown, (0, 1)),
    ]
    for method, arguments in rejected:
        self.assertRaises(TypeError, method, *arguments)
Jean-Paul Calderonecfecc242010-07-29 22:47:06 -04002221
2222
def test_shutdown(self):
    """
    :py:obj:`Connection.shutdown` performs an SSL-level connection
    shutdown, and :py:obj:`Connection.get_shutdown` reflects each step of
    it on both sides of the connection.
    """
    server, client = self._loopback()
    both_directions = SENT_SHUTDOWN | RECEIVED_SHUTDOWN

    # The server starts the shutdown; its first shutdown() call returns a
    # false value and only the "sent" flag is set on the server.
    self.assertFalse(server.shutdown())
    self.assertEqual(server.get_shutdown(), SENT_SHUTDOWN)

    # The client observes the shutdown when it next tries to read.
    self.assertRaises(ZeroReturnError, client.recv, 1024)
    self.assertEqual(client.get_shutdown(), RECEIVED_SHUTDOWN)

    # The client answers, after which both sides report both flags.
    client.shutdown()
    self.assertEqual(client.get_shutdown(), both_directions)
    self.assertRaises(ZeroReturnError, server.recv, 1024)
    self.assertEqual(server.get_shutdown(), both_directions)
Jean-Paul Calderone9485f2c2010-07-29 22:38:42 -04002236
2237
def test_shutdown_closed(self):
    """
    :py:obj:`Connection.shutdown` propagates the error from the low level
    write call as a :py:obj:`SysCallError` when the underlying socket has
    already been shut down.
    """
    server, client = self._loopback()
    server.sock_shutdown(2)
    failure = self.assertRaises(SysCallError, server.shutdown)
    # The errno reported for the failed write differs on Windows.
    expected_errno = ESHUTDOWN if platform == "win32" else EPIPE
    self.assertEqual(failure.args[0], expected_errno)
2250
2251
def test_set_shutdown(self):
    """
    :py:obj:`Connection.set_shutdown` sets the state of the SSL connection
    shutdown process, which :py:obj:`Connection.get_shutdown` then reports.
    """
    sock = socket()
    try:
        connection = Connection(Context(TLSv1_METHOD), sock)
        connection.set_shutdown(RECEIVED_SHUTDOWN)
        self.assertEqual(connection.get_shutdown(), RECEIVED_SHUTDOWN)
    finally:
        # Release the file descriptor even when the assertion fails.
        sock.close()
2260
2261
if not PY3:
    def test_set_shutdown_long(self):
        """
        On Python 2 :py:obj:`Connection.set_shutdown` accepts an argument
        of type :py:obj:`long` as well as :py:obj:`int`.
        """
        sock = socket()
        try:
            connection = Connection(Context(TLSv1_METHOD), sock)
            connection.set_shutdown(long(RECEIVED_SHUTDOWN))
            self.assertEqual(connection.get_shutdown(), RECEIVED_SHUTDOWN)
        finally:
            # Release the file descriptor even when the assertion fails.
            sock.close()
2271
2272
def test_app_data_wrong_args(self):
    """
    :py:obj:`Connection.get_app_data` raises :py:obj:`TypeError` if called
    with any arguments, and :py:obj:`Connection.set_app_data` raises
    :py:obj:`TypeError` if called with other than one argument.
    """
    connection = Connection(Context(TLSv1_METHOD), None)
    self.assertRaises(TypeError, connection.get_app_data, None)
    for arguments in ((), (None, None)):
        self.assertRaises(TypeError, connection.set_app_data, *arguments)
2283
2284
def test_app_data(self):
    """
    An arbitrary object handed to :py:obj:`Connection.set_app_data` is the
    very same object later returned by :py:obj:`Connection.get_app_data`.
    """
    marker = object()
    connection = Connection(Context(TLSv1_METHOD), None)
    connection.set_app_data(marker)
    self.assertIdentical(connection.get_app_data(), marker)
2295
2296
def test_makefile(self):
    """
    Calling :py:obj:`Connection.makefile` raises
    :py:obj:`NotImplementedError` because the method is not implemented.
    """
    makefile = Connection(Context(TLSv1_METHOD), None).makefile
    self.assertRaises(NotImplementedError, makefile)
2304
2305
def test_get_peer_cert_chain_wrong_args(self):
    """
    Passing any argument to :py:obj:`Connection.get_peer_cert_chain`
    results in a :py:obj:`TypeError`.
    """
    get_chain = Connection(Context(TLSv1_METHOD), None).get_peer_cert_chain
    for unexpected in (1, "foo", object(), []):
        self.assertRaises(TypeError, get_chain, unexpected)
2316
2317
def test_get_peer_cert_chain(self):
    """
    On the client side, :py:obj:`Connection.get_peer_cert_chain` returns
    the list of certificates the connected server presented: the server
    certificate first, followed by the extra chain certificates.
    """
    # Only the server key and the three certificates are needed here.
    (_, cacert), (_, icert), (skey, scert) = _create_certificate_chain()

    # Server side: present the leaf certificate plus the rest of the chain.
    serverContext = Context(TLSv1_METHOD)
    serverContext.use_privatekey(skey)
    serverContext.use_certificate(scert)
    for extra in (icert, cacert):
        serverContext.add_extra_chain_cert(extra)
    server = Connection(serverContext, None)
    server.set_accept_state()

    # Client side.
    clientContext = Context(TLSv1_METHOD)
    clientContext.set_verify(VERIFY_NONE, verify_cb)
    client = Connection(clientContext, None)
    client.set_connect_state()

    self._interactInMemory(client, server)

    peer_chain = client.get_peer_cert_chain()
    expected_names = [
        "Server Certificate",
        "Intermediate Certificate",
        "Authority Certificate",
    ]
    self.assertEqual(len(peer_chain), 3)
    for expected, certificate in zip(expected_names, peer_chain):
        self.assertEqual(expected, certificate.get_subject().CN)
2350
2351
def test_get_peer_cert_chain_none(self):
    """
    When the peer sends no certificate chain,
    :py:obj:`Connection.get_peer_cert_chain` returns :py:obj:`None`.
    """
    serverContext = Context(TLSv1_METHOD)
    serverContext.use_privatekey(
        load_privatekey(FILETYPE_PEM, server_key_pem))
    serverContext.use_certificate(
        load_certificate(FILETYPE_PEM, server_cert_pem))

    server = Connection(serverContext, None)
    server.set_accept_state()

    # The client context is given no certificate to present.
    client = Connection(Context(TLSv1_METHOD), None)
    client.set_connect_state()

    self._interactInMemory(client, server)
    self.assertIdentical(None, server.get_peer_cert_chain())
Jean-Paul Calderone20222ae2011-05-19 21:43:46 -04002366
2367
def test_get_session_wrong_args(self):
    """
    Passing any argument to :py:obj:`Connection.get_session` results in a
    :py:obj:`TypeError`.
    """
    get_session = Connection(Context(TLSv1_METHOD), None).get_session
    for unexpected in (123, "hello", object()):
        self.assertRaises(TypeError, get_session, unexpected)
2378
2379
def test_get_session_unconnected(self):
    """
    A :py:obj:`Connection` which has not been connected has no session:
    :py:obj:`Connection.get_session` returns :py:obj:`None`.
    """
    unconnected = Connection(Context(TLSv1_METHOD), None)
    self.assertIdentical(None, unconnected.get_session())
2389
2390
def test_server_get_session(self):
    """
    :py:obj:`Connection.get_session`, called on the server side of a
    connection, returns a :py:class:`Session` instance representing the
    SSL session for that connection.
    """
    server_side, _ = self._loopback()
    self.assertIsInstance(server_side.get_session(), Session)
Jean-Paul Calderone64eaffc2012-02-13 11:53:49 -05002400
2401
def test_client_get_session(self):
    """
    :py:obj:`Connection.get_session`, called on the client side of a
    connection, returns a :py:class:`Session` instance representing the
    SSL session for that connection.
    """
    _, client_side = self._loopback()
    self.assertIsInstance(client_side.get_session(), Session)
Jean-Paul Calderone64eaffc2012-02-13 11:53:49 -05002411
2412
def test_set_session_wrong_args(self):
    """
    :py:obj:`Connection.set_session` raises :py:obj:`TypeError` if called
    with other than one argument or with an object which is not an
    instance of :py:class:`Session`.
    """
    set_session = Connection(Context(TLSv1_METHOD), None).set_session
    # Each entry is one complete positional argument list to reject.
    rejected = [
        (),
        (123,),
        ("hello",),
        (object(),),
        (Session(), Session()),
    ]
    for arguments in rejected:
        self.assertRaises(TypeError, set_session, *arguments)
2427
2428
def test_client_set_session(self):
    """
    A :py:class:`Session` passed to :py:obj:`Connection.set_session` before
    the connection is established causes an attempt to re-use that session
    when the SSL handshake is performed.
    """
    ctx = Context(TLSv1_METHOD)
    ctx.use_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem))
    ctx.use_certificate(load_certificate(FILETYPE_PEM, server_cert_pem))
    ctx.set_session_id("unity-test")

    def makeServer(socket):
        # Both handshakes below share the one server context.
        accepting = Connection(ctx, socket)
        accepting.set_accept_state()
        return accepting

    originalServer, originalClient = self._loopback(
        serverFactory=makeServer)
    originalSession = originalClient.get_session()

    def makeClient(socket):
        # Ask for the session from the first handshake to be resumed.
        resuming = self._loopbackClientFactory(socket)
        resuming.set_session(originalSession)
        return resuming

    resumedServer, resumedClient = self._loopback(
        serverFactory=makeServer, clientFactory=makeClient)

    # This is a proxy: in general, we have no access to any unique
    # identifier for the session (new enough versions of OpenSSL expose a
    # hash which could be usable, but "new enough" is very, very new).
    # Instead, exploit the fact that the master key is re-used if the
    # session is re-used.  As long as the master key for the two
    # connections is the same, the session was re-used!
    self.assertEqual(
        originalServer.master_key(), resumedServer.master_key())
2468
2469
def test_set_session_wrong_method(self):
    """
    :py:class:`OpenSSL.SSL.Error` is raised when
    :py:obj:`Connection.set_session` is given a :py:class:`Session` which
    is associated with a context using a different SSL method than the one
    the :py:obj:`Connection` is using.
    """
    ctx = Context(TLSv1_METHOD)
    ctx.use_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem))
    ctx.use_certificate(load_certificate(FILETYPE_PEM, server_cert_pem))
    ctx.set_session_id("unity-test")

    def makeServer(socket):
        accepting = Connection(ctx, socket)
        accepting.set_accept_state()
        return accepting

    originalServer, originalClient = self._loopback(
        serverFactory=makeServer)
    originalSession = originalClient.get_session()

    def makeClient(socket):
        # Intentionally use a different, incompatible method here.
        mismatched = Connection(Context(SSLv3_METHOD), socket)
        mismatched.set_connect_state()
        mismatched.set_session(originalSession)
        return mismatched

    self.assertRaises(
        Error,
        self._loopback, clientFactory=makeClient, serverFactory=makeServer)
2503
2504
def test_wantWriteError(self):
    """
    :py:obj:`Connection` methods which generate output raise
    :py:obj:`OpenSSL.SSL.WantWriteError` if writing to the connection's BIO
    fails, indicating a should-write state.
    """
    client_socket, server_socket = socket_pair()
    try:
        # Fill up the client's send buffer so Connection won't be able to
        # write anything.  Only write a single byte at a time so we can be
        # sure we completely fill the buffer.  Even though the socket API
        # is allowed to signal a short write via its return value it seems
        # this doesn't always happen on all platforms (FreeBSD and OS X in
        # particular) for the very last bit of available buffer space.
        msg = b"x"
        for _ in range(1024 * 1024 * 4):
            try:
                client_socket.send(msg)
            except error as e:
                if e.errno == EWOULDBLOCK:
                    break
                raise
        else:
            # The loop ran to completion without the buffer ever filling.
            self.fail(
                "Failed to fill socket buffer, cannot test BIO want write")

        ctx = Context(TLSv1_METHOD)
        conn = Connection(ctx, client_socket)
        # Clients speak first, so make it an SSL client.
        conn.set_connect_state()
        self.assertRaises(WantWriteError, conn.do_handshake)
    finally:
        # Release both ends of the pair even when the test fails.
        client_socket.close()
        server_socket.close()
2535
2536 # XXX want_read
2537
def test_get_finished_before_connect(self):
    """
    Before the TLS handshake is completed,
    :py:obj:`Connection.get_finished` returns :py:obj:`None`.
    """
    unconnected = Connection(Context(TLSv1_METHOD), None)
    self.assertEqual(unconnected.get_finished(), None)
Fedor Brunner416f4a12014-03-28 13:18:38 +01002546
Jean-Paul Calderone5b05b482014-03-30 11:28:54 -04002547
def test_get_peer_finished_before_connect(self):
    """
    Before the TLS handshake is completed,
    :py:obj:`Connection.get_peer_finished` returns :py:obj:`None`.
    """
    unconnected = Connection(Context(TLSv1_METHOD), None)
    self.assertEqual(unconnected.get_peer_finished(), None)
2556
Jean-Paul Calderone5b05b482014-03-30 11:28:54 -04002557
def test_get_finished(self):
    """
    After the TLS handshake, :py:obj:`Connection.get_finished` returns a
    non-empty value: the TLS Finished message sent by this side of the
    connection.  Finished messages are sent during the TLS handshake.
    """
    server, client = self._loopback()

    finished = server.get_finished()
    self.assertNotEqual(finished, None)
    self.assertTrue(len(finished) > 0)
Fedor Brunner416f4a12014-03-28 13:18:38 +01002569
Jean-Paul Calderoned979f232014-03-30 11:58:00 -04002570
def test_get_peer_finished(self):
    """
    After the TLS handshake, :py:obj:`Connection.get_peer_finished` returns
    a non-empty value: the TLS Finished message received from the peer.
    Finished messages are sent during the TLS handshake.
    """
    server, client = self._loopback()

    peer_finished = server.get_peer_finished()
    self.assertNotEqual(peer_finished, None)
    self.assertTrue(len(peer_finished) > 0)
2581
Jean-Paul Calderone5b05b482014-03-30 11:28:54 -04002582
def test_tls_finished_message_symmetry(self):
    """
    The TLS Finished message sent by the server must be the TLS Finished
    message received by the client.

    The TLS Finished message sent by the client must be the TLS Finished
    message received by the server.
    """
    server, client = self._loopback()

    # Check each direction: what one side sent is what the other received.
    for sender, receiver in ((server, client), (client, server)):
        self.assertEqual(
            sender.get_finished(), receiver.get_peer_finished())
Jean-Paul Calderone68649052009-07-17 21:14:27 -04002595
Jean-Paul Calderone5b05b482014-03-30 11:28:54 -04002596
def test_get_cipher_name_before_connect(self):
    """
    If no connection has been established,
    :py:obj:`Connection.get_cipher_name` returns :py:obj:`None`.
    """
    unconnected = Connection(Context(TLSv1_METHOD), None)
    self.assertIdentical(unconnected.get_cipher_name(), None)
Fedor Brunner2cffdbc2014-03-10 10:35:23 +01002605
Jean-Paul Calderonedbd76272014-03-29 18:09:40 -04002606
def test_get_cipher_name(self):
    """
    On both sides of an established connection,
    :py:obj:`Connection.get_cipher_name` returns the same
    :py:class:`unicode` string giving the name of the currently used
    cipher.
    """
    server, client = self._loopback()
    server_cipher_name = server.get_cipher_name()
    client_cipher_name = client.get_cipher_name()

    for cipher_name in (server_cipher_name, client_cipher_name):
        self.assertIsInstance(cipher_name, text_type)

    self.assertEqual(server_cipher_name, client_cipher_name)
2620
Jean-Paul Calderonedbd76272014-03-29 18:09:40 -04002621
def test_get_cipher_version_before_connect(self):
    """
    If no connection has been established,
    :py:obj:`Connection.get_cipher_version` returns :py:obj:`None`.
    """
    unconnected = Connection(Context(TLSv1_METHOD), None)
    self.assertIdentical(unconnected.get_cipher_version(), None)
Fedor Brunner2cffdbc2014-03-10 10:35:23 +01002630
Jean-Paul Calderonedbd76272014-03-29 18:09:40 -04002631
def test_get_cipher_version(self):
    """
    On both sides of an established connection,
    :py:obj:`Connection.get_cipher_version` returns the same
    :py:class:`unicode` string giving the protocol name of the currently
    used cipher.
    """
    server, client = self._loopback()
    server_cipher_version = server.get_cipher_version()
    client_cipher_version = client.get_cipher_version()

    for cipher_version in (server_cipher_version, client_cipher_version):
        self.assertIsInstance(cipher_version, text_type)

    self.assertEqual(server_cipher_version, client_cipher_version)
2645
Jean-Paul Calderonedbd76272014-03-29 18:09:40 -04002646
def test_get_cipher_bits_before_connect(self):
    """
    If no connection has been established,
    :py:obj:`Connection.get_cipher_bits` returns :py:obj:`None`.
    """
    unconnected = Connection(Context(TLSv1_METHOD), None)
    self.assertIdentical(unconnected.get_cipher_bits(), None)
Fedor Brunner2cffdbc2014-03-10 10:35:23 +01002655
Jean-Paul Calderonedbd76272014-03-29 18:09:40 -04002656
def test_get_cipher_bits(self):
    """
    On both sides of an established connection,
    :py:obj:`Connection.get_cipher_bits` returns the same :py:obj:`int`
    giving the number of secret bits of the currently used cipher.
    """
    server, client = self._loopback()
    server_cipher_bits = server.get_cipher_bits()
    client_cipher_bits = client.get_cipher_bits()

    for cipher_bits in (server_cipher_bits, client_cipher_bits):
        self.assertIsInstance(cipher_bits, int)

    self.assertEqual(server_cipher_bits, client_cipher_bits)
Jean-Paul Calderone68649052009-07-17 21:14:27 -04002670
Jean-Paul Calderonedbd76272014-03-29 18:09:40 -04002671
2672
class ConnectionGetCipherListTests(TestCase):
    """
    Tests for :py:obj:`Connection.get_cipher_list`.
    """
    def test_wrong_args(self):
        """
        :py:obj:`Connection.get_cipher_list` raises :py:obj:`TypeError` if
        called with any arguments.
        """
        connection = Connection(Context(TLSv1_METHOD), None)
        self.assertRaises(TypeError, connection.get_cipher_list, None)


    def test_result(self):
        """
        :py:obj:`Connection.get_cipher_list` returns a :py:obj:`list` of
        native :py:obj:`str` objects giving the names of the ciphers which
        might be used.
        """
        connection = Connection(Context(TLSv1_METHOD), None)
        ciphers = connection.get_cipher_list()
        self.assertIsInstance(ciphers, list)
        for cipher in ciphers:
            # str is the native string type: bytes on Python 2, text on
            # Python 3.
            self.assertIsInstance(cipher, str)
2696
2697
2698
class ConnectionSendTests(TestCase, _LoopbackMixin):
    """
    Tests for :py:obj:`Connection.send`
    """
    def test_wrong_args(self):
        """
        When called with arguments other than string argument for its first
        parameter or more than two arguments, :py:obj:`Connection.send`
        raises :py:obj:`TypeError`.
        """
        connection = Connection(Context(TLSv1_METHOD), None)
        self.assertRaises(TypeError, connection.send)
        self.assertRaises(TypeError, connection.send, object())
        self.assertRaises(
            TypeError, connection.send, "foo", object(), "bar")


    def test_short_bytes(self):
        """
        When passed a short byte string, :py:obj:`Connection.send`
        transmits all of it and returns the number of bytes sent.
        """
        server, client = self._loopback()
        count = server.send(b('xy'))
        self.assertEqual(count, 2)
        self.assertEqual(client.recv(2), b('xy'))

    # The memoryview test is only defined when the running Python provides
    # the memoryview builtin.
    try:
        memoryview
    except NameError:
        "cannot test sending memoryview without memoryview"
    else:
        def test_short_memoryview(self):
            """
            When passed a memoryview onto a small number of bytes,
            :py:obj:`Connection.send` transmits all of them and returns the
            number of bytes sent.
            """
            server, client = self._loopback()
            count = server.send(memoryview(b('xy')))
            self.assertEqual(count, 2)
            self.assertEqual(client.recv(2), b('xy'))


    # The buffer test is only defined when the running Python provides the
    # buffer builtin.
    try:
        buffer
    except NameError:
        "cannot test sending buffer without buffer"
    else:
        def test_short_buffer(self):
            """
            When passed a buffer containing a small number of bytes,
            :py:obj:`Connection.send` transmits all of them and returns the
            number of bytes sent.
            """
            server, client = self._loopback()
            count = server.send(buffer(b('xy')))
            self.assertEqual(count, 2)
            self.assertEqual(client.recv(2), b('xy'))
2757
2758
Jean-Paul Calderone691e6c92011-01-21 22:04:35 -05002759
Jean-Paul Calderone6c840102015-03-15 17:32:15 -04002760def _make_memoryview(size):
2761 """
2762 Create a new ``memoryview`` wrapped around a ``bytearray`` of the given
2763 size.
2764 """
2765 return memoryview(bytearray(size))
2766
2767
2768
class ConnectionRecvIntoTests(TestCase, _LoopbackMixin):
    """
    Tests for :py:obj:`Connection.recv_into`

    Each ``_*_test`` helper takes a ``factory``: a callable which is passed a
    size and returns the output buffer to receive into (``bytearray`` or
    ``_make_memoryview``).  The ``test_*`` methods run each helper once per
    buffer type.
    """
    def _no_length_test(self, factory):
        """
        Assert that when the given buffer is passed to
        ``Connection.recv_into``, whatever bytes are available to be received
        that fit into that buffer are written into that buffer.
        """
        output_buffer = factory(5)

        server, client = self._loopback()
        server.send(b('xy'))

        self.assertEqual(client.recv_into(output_buffer), 2)
        self.assertEqual(output_buffer, bytearray(b('xy\x00\x00\x00')))


    def test_bytearray_no_length(self):
        """
        :py:obj:`Connection.recv_into` can be passed a ``bytearray`` instance
        and data in the receive buffer is written to it.
        """
        self._no_length_test(bytearray)


    def _respects_length_test(self, factory):
        """
        Assert that when the given buffer is passed to ``Connection.recv_into``
        along with a value for ``nbytes`` that is less than the size of that
        buffer, only ``nbytes`` bytes are written into the buffer.
        """
        output_buffer = factory(10)

        server, client = self._loopback()
        server.send(b('abcdefghij'))

        self.assertEqual(client.recv_into(output_buffer, 5), 5)
        self.assertEqual(
            output_buffer, bytearray(b('abcde\x00\x00\x00\x00\x00'))
        )


    def test_bytearray_respects_length(self):
        """
        When called with a ``bytearray`` instance,
        :py:obj:`Connection.recv_into` respects the ``nbytes`` parameter and
        doesn't copy in more than that number of bytes.
        """
        self._respects_length_test(bytearray)


    def _doesnt_overfill_test(self, factory):
        """
        Assert that if there are more bytes available to be read from the
        receive buffer than would fit into the buffer passed to
        :py:obj:`Connection.recv_into`, only as many as fit are written into
        it.
        """
        output_buffer = factory(5)

        server, client = self._loopback()
        server.send(b('abcdefghij'))

        self.assertEqual(client.recv_into(output_buffer), 5)
        self.assertEqual(output_buffer, bytearray(b('abcde')))
        # The bytes that did not fit must still be readable afterwards.
        rest = client.recv(5)
        self.assertEqual(b('fghij'), rest)


    def test_bytearray_doesnt_overfill(self):
        """
        When called with a ``bytearray`` instance,
        :py:obj:`Connection.recv_into` respects the size of the array and
        doesn't write more bytes into it than will fit.
        """
        self._doesnt_overfill_test(bytearray)


    def _really_doesnt_overfill_test(self, factory):
        """
        Assert that if the value given by ``nbytes`` is greater than the actual
        size of the output buffer passed to :py:obj:`Connection.recv_into`, the
        behavior is as if no value was given for ``nbytes`` at all.
        """
        output_buffer = factory(5)

        server, client = self._loopback()
        server.send(b('abcdefghij'))

        # 50 is deliberately far larger than the 5 byte output buffer.
        self.assertEqual(client.recv_into(output_buffer, 50), 5)
        self.assertEqual(output_buffer, bytearray(b('abcde')))
        rest = client.recv(5)
        self.assertEqual(b('fghij'), rest)


    def test_bytearray_really_doesnt_overfill(self):
        """
        When called with a ``bytearray`` instance and an ``nbytes`` value that
        is too large, :py:obj:`Connection.recv_into` respects the size of the
        array and not the ``nbytes`` value and doesn't write more bytes into
        the buffer than will fit.
        """
        # Must use the "really" helper: it is the only one which passes an
        # oversized ``nbytes`` to recv_into.
        self._really_doesnt_overfill_test(bytearray)


    # The memoryview variants are only defined when the interpreter provides
    # the ``memoryview`` builtin.
    try:
        memoryview
    except NameError:
        "cannot test recv_into memoryview without memoryview"
    else:
        def test_memoryview_no_length(self):
            """
            :py:obj:`Connection.recv_into` can be passed a ``memoryview``
            instance and data in the receive buffer is written to it.
            """
            self._no_length_test(_make_memoryview)


        def test_memoryview_respects_length(self):
            """
            When called with a ``memoryview`` instance,
            :py:obj:`Connection.recv_into` respects the ``nbytes`` parameter
            and doesn't copy more than that number of bytes in.
            """
            self._respects_length_test(_make_memoryview)


        def test_memoryview_doesnt_overfill(self):
            """
            When called with a ``memoryview`` instance,
            :py:obj:`Connection.recv_into` respects the size of the array and
            doesn't write more bytes into it than will fit.
            """
            self._doesnt_overfill_test(_make_memoryview)


        def test_memoryview_really_doesnt_overfill(self):
            """
            When called with a ``memoryview`` instance and an ``nbytes`` value
            that is too large, :py:obj:`Connection.recv_into` respects the size
            of the array and not the ``nbytes`` value and doesn't write more
            bytes into the buffer than will fit.
            """
            # Must use the "really" helper so that the oversized ``nbytes``
            # code path is actually exercised for memoryview buffers.
            self._really_doesnt_overfill_test(_make_memoryview)
Jean-Paul Calderone4bec59a2015-03-15 17:25:57 -04002915
2916
Cory Benfield62d10332014-06-15 10:03:41 +01002917
class ConnectionSendallTests(TestCase, _LoopbackMixin):
    """
    Tests for :py:obj:`Connection.sendall`.
    """
    def test_wrong_args(self):
        """
        When called with arguments other than a string argument for its first
        parameter or with more than two arguments, :py:obj:`Connection.sendall`
        raises :py:obj:`TypeError`.
        """
        connection = Connection(Context(TLSv1_METHOD), None)
        self.assertRaises(TypeError, connection.sendall)
        self.assertRaises(TypeError, connection.sendall, object())
        self.assertRaises(
            TypeError, connection.sendall, "foo", object(), "bar")


    def test_short(self):
        """
        :py:obj:`Connection.sendall` transmits all of the bytes in the string
        passed to it.
        """
        server, client = self._loopback()
        server.sendall(b('x'))
        # assertEqual rather than the deprecated assertEquals alias, which
        # newer unittest releases no longer provide.
        self.assertEqual(client.recv(1), b('x'))


    # Only defined when the interpreter provides the ``memoryview`` builtin.
    try:
        memoryview
    except NameError:
        "cannot test sending memoryview without memoryview"
    else:
        def test_short_memoryview(self):
            """
            When passed a memoryview onto a small number of bytes,
            :py:obj:`Connection.sendall` transmits all of them.
            """
            server, client = self._loopback()
            server.sendall(memoryview(b('x')))
            self.assertEqual(client.recv(1), b('x'))


    # Only defined when the interpreter provides the ``buffer`` builtin.
    try:
        buffer
    except NameError:
        "cannot test sending buffers without buffers"
    else:
        def test_short_buffers(self):
            """
            When passed a buffer containing a small number of bytes,
            :py:obj:`Connection.sendall` transmits all of them.
            """
            server, client = self._loopback()
            server.sendall(buffer(b('x')))
            self.assertEqual(client.recv(1), b('x'))


    def test_long(self):
        """
        :py:obj:`Connection.sendall` transmits all of the bytes in the string
        passed to it even if this requires multiple calls of an underlying
        write function.
        """
        server, client = self._loopback()
        # Should be enough, underlying SSL_write should only do 16k at a time.
        # On Windows, after 32k of bytes the write will block (forever - because
        # no one is yet reading).
        message = b('x') * (1024 * 32 - 1) + b('y')
        server.sendall(message)
        # Drain the client side in 1k reads until the whole message arrived.
        accum = []
        received = 0
        while received < len(message):
            data = client.recv(1024)
            accum.append(data)
            received += len(data)
        self.assertEqual(message, b('').join(accum))


    def test_closed(self):
        """
        If the underlying socket is closed, :py:obj:`Connection.sendall`
        propagates the write error from the low level write call.
        """
        server, client = self._loopback()
        # 2 is the numeric value of socket.SHUT_RDWR.
        server.sock_shutdown(2)
        exc = self.assertRaises(SysCallError, server.sendall, b"hello, world")
        # The errno carried by the exception is platform dependent.
        if platform == "win32":
            self.assertEqual(exc.args[0], ESHUTDOWN)
        else:
            self.assertEqual(exc.args[0], EPIPE)
Jean-Paul Calderone974bdc02010-07-28 19:06:10 -04003007
Jean-Paul Calderone7ca48b52010-07-28 18:57:21 -04003008
Jean-Paul Calderone1d69a722010-07-29 09:05:53 -04003009
class ConnectionRenegotiateTests(TestCase, _LoopbackMixin):
    """
    Tests for SSL renegotiation APIs.
    """
    def test_renegotiate_wrong_args(self):
        """
        :py:obj:`Connection.renegotiate` raises :py:obj:`TypeError` if called
        with any arguments.
        """
        connection = Connection(Context(TLSv1_METHOD), None)
        self.assertRaises(TypeError, connection.renegotiate, None)


    def test_total_renegotiations_wrong_args(self):
        """
        :py:obj:`Connection.total_renegotiations` raises :py:obj:`TypeError` if
        called with any arguments.
        """
        connection = Connection(Context(TLSv1_METHOD), None)
        self.assertRaises(TypeError, connection.total_renegotiations, None)


    def test_total_renegotiations(self):
        """
        :py:obj:`Connection.total_renegotiations` returns :py:obj:`0` before
        any renegotiations have happened.
        """
        connection = Connection(Context(TLSv1_METHOD), None)
        # assertEqual rather than the deprecated assertEquals alias, which
        # newer unittest releases no longer provide.
        self.assertEqual(connection.total_renegotiations(), 0)
3039
3040
Jean-Paul Calderone8ea22522010-07-29 09:39:39 -04003041# def test_renegotiate(self):
3042# """
3043# """
3044# server, client = self._loopback()
3045
3046# server.send("hello world")
3047# self.assertEquals(client.recv(len("hello world")), "hello world")
3048
3049# self.assertEquals(server.total_renegotiations(), 0)
3050# self.assertTrue(server.renegotiate())
3051
3052# server.setblocking(False)
3053# client.setblocking(False)
3054# while server.renegotiate_pending():
3055# client.do_handshake()
3056# server.do_handshake()
3057
3058# self.assertEquals(server.total_renegotiations(), 1)
3059
3060
3061
3062
class ErrorTests(TestCase):
    """
    Unit tests covering the :py:obj:`OpenSSL.SSL.Error` exception class.
    """
    def test_type(self):
        """
        :py:obj:`Error` derives from :py:obj:`Exception` and is named
        ``Error``.
        """
        is_exception_type = issubclass(Error, Exception)
        self.assertTrue(is_exception_type)
        self.assertEqual(Error.__name__, 'Error')
Rick Deane15b1472009-07-09 15:53:42 -05003073
3074
3075
class ConstantsTests(TestCase):
    """
    Tests pinning the numeric values of constants exposed by
    :py:obj:`OpenSSL.SSL`.

    The constants are OpenSSL-defined flags meant to be handed back to
    OpenSSL APIs, so their values are the only thing that can usefully be
    asserted about them.
    """
    # unittest.TestCase has no skip mechanism, so tests for constants which
    # are None are simply not defined.
    if OP_NO_QUERY_MTU is not None:
        def test_op_no_query_mtu(self):
            """
            :py:obj:`OpenSSL.SSL.OP_NO_QUERY_MTU` is 0x1000, matching
            :py:const:`SSL_OP_NO_QUERY_MTU` from :file:`openssl/ssl.h`.
            """
            self.assertEqual(OP_NO_QUERY_MTU, 0x1000)
    else:
        "OP_NO_QUERY_MTU unavailable - OpenSSL version may be too old"


    if OP_COOKIE_EXCHANGE is not None:
        def test_op_cookie_exchange(self):
            """
            :py:obj:`OpenSSL.SSL.OP_COOKIE_EXCHANGE` is 0x2000, matching
            :py:const:`SSL_OP_COOKIE_EXCHANGE` from :file:`openssl/ssl.h`.
            """
            self.assertEqual(OP_COOKIE_EXCHANGE, 0x2000)
    else:
        "OP_COOKIE_EXCHANGE unavailable - OpenSSL version may be too old"


    if OP_NO_TICKET is not None:
        def test_op_no_ticket(self):
            """
            :py:obj:`OpenSSL.SSL.OP_NO_TICKET` is 0x4000, matching
            :py:const:`SSL_OP_NO_TICKET` from :file:`openssl/ssl.h`.
            """
            self.assertEqual(OP_NO_TICKET, 0x4000)
    else:
        "OP_NO_TICKET unavailable - OpenSSL version may be too old"


    if OP_NO_COMPRESSION is not None:
        def test_op_no_compression(self):
            """
            :py:obj:`OpenSSL.SSL.OP_NO_COMPRESSION` is 0x20000, matching
            :py:const:`SSL_OP_NO_COMPRESSION` from :file:`openssl/ssl.h`.
            """
            self.assertEqual(OP_NO_COMPRESSION, 0x20000)
    else:
        "OP_NO_COMPRESSION unavailable - OpenSSL version may be too old"


    def test_sess_cache_off(self):
        """
        :py:obj:`OpenSSL.SSL.SESS_CACHE_OFF` is 0x0, matching
        :py:obj:`SSL_SESS_CACHE_OFF` from ``openssl/ssl.h``.
        """
        self.assertEqual(0x0, SESS_CACHE_OFF)


    def test_sess_cache_client(self):
        """
        :py:obj:`OpenSSL.SSL.SESS_CACHE_CLIENT` is 0x1, matching
        :py:obj:`SSL_SESS_CACHE_CLIENT` from ``openssl/ssl.h``.
        """
        self.assertEqual(0x1, SESS_CACHE_CLIENT)


    def test_sess_cache_server(self):
        """
        :py:obj:`OpenSSL.SSL.SESS_CACHE_SERVER` is 0x2, matching
        :py:obj:`SSL_SESS_CACHE_SERVER` from ``openssl/ssl.h``.
        """
        self.assertEqual(0x2, SESS_CACHE_SERVER)


    def test_sess_cache_both(self):
        """
        :py:obj:`OpenSSL.SSL.SESS_CACHE_BOTH` is 0x3, matching
        :py:obj:`SSL_SESS_CACHE_BOTH` from ``openssl/ssl.h``.
        """
        self.assertEqual(0x3, SESS_CACHE_BOTH)


    def test_sess_cache_no_auto_clear(self):
        """
        :py:obj:`OpenSSL.SSL.SESS_CACHE_NO_AUTO_CLEAR` is 0x80, matching
        :py:obj:`SSL_SESS_CACHE_NO_AUTO_CLEAR` from ``openssl/ssl.h``.
        """
        self.assertEqual(0x80, SESS_CACHE_NO_AUTO_CLEAR)


    def test_sess_cache_no_internal_lookup(self):
        """
        :py:obj:`OpenSSL.SSL.SESS_CACHE_NO_INTERNAL_LOOKUP` is 0x100, matching
        :py:obj:`SSL_SESS_CACHE_NO_INTERNAL_LOOKUP` from ``openssl/ssl.h``.
        """
        self.assertEqual(0x100, SESS_CACHE_NO_INTERNAL_LOOKUP)


    def test_sess_cache_no_internal_store(self):
        """
        :py:obj:`OpenSSL.SSL.SESS_CACHE_NO_INTERNAL_STORE` is 0x200, matching
        :py:obj:`SSL_SESS_CACHE_NO_INTERNAL_STORE` from ``openssl/ssl.h``.
        """
        self.assertEqual(0x200, SESS_CACHE_NO_INTERNAL_STORE)


    def test_sess_cache_no_internal(self):
        """
        :py:obj:`OpenSSL.SSL.SESS_CACHE_NO_INTERNAL` is 0x300, matching
        :py:obj:`SSL_SESS_CACHE_NO_INTERNAL` from ``openssl/ssl.h``.
        """
        self.assertEqual(0x300, SESS_CACHE_NO_INTERNAL)
3195
3196
Jean-Paul Calderoneeeee26a2009-04-27 11:24:30 -04003197
Jean-Paul Calderonebf37f0f2010-07-31 14:56:20 -04003198class MemoryBIOTests(TestCase, _LoopbackMixin):
Rick Deanb71c0d22009-04-01 14:09:23 -05003199 """
Jonathan Ballet648875f2011-07-16 14:14:58 +09003200 Tests for :py:obj:`OpenSSL.SSL.Connection` using a memory BIO.
Rick Deanb71c0d22009-04-01 14:09:23 -05003201 """
def _server(self, sock):
    """
    Build a server-side SSL :py:obj:`Connection` around :py:obj:`sock` and
    return it, already switched to accept state.
    """
    # Context setup boilerplate: TLSv1, peer verification, and the server's
    # own key and certificate.
    ctx = Context(TLSv1_METHOD)
    ctx.set_options(OP_NO_SSLv2 | OP_NO_SSLv3 | OP_SINGLE_DH_USE)
    verify_mode = (
        VERIFY_PEER | VERIFY_FAIL_IF_NO_PEER_CERT | VERIFY_CLIENT_ONCE)
    ctx.set_verify(verify_mode, verify_cb)
    store = ctx.get_cert_store()
    private_key = load_privatekey(FILETYPE_PEM, server_key_pem)
    ctx.use_privatekey(private_key)
    certificate = load_certificate(FILETYPE_PEM, server_cert_pem)
    ctx.use_certificate(certificate)
    ctx.check_privatekey()
    # Add the root certificate to the context's certificate store.
    store.add_cert(load_certificate(FILETYPE_PEM, root_cert_pem))
    # Passing None as the second argument indicates that a memory BIO
    # should be created instead of wrapping a socket.
    conn = Connection(ctx, sock)
    conn.set_accept_state()
    return conn
3222
3223
def _client(self, sock):
    """
    Build a client-side SSL :py:obj:`Connection` around :py:obj:`sock` and
    return it, already switched to connect state.
    """
    # Context setup boilerplate: TLSv1, peer verification, and the client's
    # own key and certificate.
    ctx = Context(TLSv1_METHOD)
    ctx.set_options(OP_NO_SSLv2 | OP_NO_SSLv3 | OP_SINGLE_DH_USE)
    verify_mode = (
        VERIFY_PEER | VERIFY_FAIL_IF_NO_PEER_CERT | VERIFY_CLIENT_ONCE)
    ctx.set_verify(verify_mode, verify_cb)
    store = ctx.get_cert_store()
    private_key = load_privatekey(FILETYPE_PEM, client_key_pem)
    ctx.use_privatekey(private_key)
    certificate = load_certificate(FILETYPE_PEM, client_cert_pem)
    ctx.use_certificate(certificate)
    ctx.check_privatekey()
    # Add the root certificate to the context's certificate store.
    store.add_cert(load_certificate(FILETYPE_PEM, root_cert_pem))
    conn = Connection(ctx, sock)
    conn.set_connect_state()
    return conn
3242
3243
def test_memoryConnect(self):
    """
    Two :py:obj:`Connection`s which use memory BIOs can be manually
    connected by reading from the output of each and writing those bytes to
    the input of the other and in this way establish a connection and
    exchange application-level bytes with each other.
    """
    server_conn = self._server(None)
    client_conn = self._client(None)

    # There should be no key or nonces yet.
    self.assertIdentical(server_conn.master_key(), None)
    self.assertIdentical(server_conn.client_random(), None)
    self.assertIdentical(server_conn.server_random(), None)

    # First, the handshake needs to happen.  We'll deliver bytes back and
    # forth between the client and server until neither of them feels like
    # speaking any more.
    self.assertIdentical(
        self._interactInMemory(client_conn, server_conn), None)

    # Now that the handshake is done, there should be a key and nonces.
    self.assertNotIdentical(server_conn.master_key(), None)
    self.assertNotIdentical(server_conn.client_random(), None)
    self.assertNotIdentical(server_conn.server_random(), None)
    # Both ends must agree on each nonce, and the two nonces must differ.
    # assertEqual/assertNotEqual are used instead of the deprecated
    # assertEquals/assertNotEquals aliases.
    self.assertEqual(
        server_conn.client_random(), client_conn.client_random())
    self.assertEqual(
        server_conn.server_random(), client_conn.server_random())
    self.assertNotEqual(
        server_conn.client_random(), server_conn.server_random())
    self.assertNotEqual(
        client_conn.client_random(), client_conn.server_random())

    # Here are the bytes we'll try to send.
    important_message = b('One if by land, two if by sea.')

    server_conn.write(important_message)
    self.assertEqual(
        self._interactInMemory(client_conn, server_conn),
        (client_conn, important_message))

    # And the reversed message in the opposite direction.
    client_conn.write(important_message[::-1])
    self.assertEqual(
        self._interactInMemory(client_conn, server_conn),
        (server_conn, important_message[::-1]))
Rick Deanb71c0d22009-04-01 14:09:23 -05003286
3287
def test_socketConnect(self):
    """
    The same exchange as :py:obj:`test_memoryConnect`, but over an actual
    socket.

    The point is to rule out the memory BIO code as the cause of problems
    seen while passing data over a :py:obj:`Connection`: no memory BIO is
    involved here, so a failure must originate elsewhere.  It is not a
    memory BIO test, but this is a convenient place to keep it.
    """
    server_conn, client_conn = self._loopback()

    outbound = b("Help me Obi Wan Kenobi, you're my only hope.")
    client_conn.send(outbound)
    self.assertEqual(server_conn.recv(1024), outbound)

    # Again in the other direction, just for fun.
    reply = outbound[::-1]
    server_conn.send(reply)
    self.assertEqual(client_conn.recv(1024), reply)
3310
3311
def test_socketOverridesMemory(self):
    """
    ``bio_read``, ``bio_write`` and ``bio_shutdown`` raise
    :py:obj:`TypeError` when called on an
    :py:obj:`OpenSSL.SSL.Connection` that wraps a socket.
    """
    socket_backed = Connection(Context(SSLv3_METHOD), socket())
    self.assertRaises(TypeError, socket_backed.bio_read, 100)
    self.assertRaises(TypeError, socket_backed.bio_write, "foo")
    self.assertRaises(TypeError, socket_backed.bio_shutdown)
Jean-Paul Calderoneaff0fc42009-04-27 17:13:34 -04003323
3324
def test_outgoingOverflow(self):
    """
    If more bytes than can be written to the memory BIO are passed to
    :py:obj:`Connection.send` at once, the number of bytes which were
    written is returned and that many bytes from the beginning of the
    input can be read from the other end of the connection.
    """
    server = self._server(None)
    client = self._client(None)

    # Let the two connections talk to each other first (presumably this
    # completes the handshake) so that application data can be sent.
    self._interactInMemory(client, server)

    size = 2 ** 15
    sent = client.send(b"x" * size)
    # Sanity check.  We're trying to test what happens when the entire
    # input can't be sent.  If the entire input was sent, this test is
    # meaningless.
    self.assertTrue(sent < size)

    receiver, received = self._interactInMemory(client, server)
    self.assertIdentical(receiver, server)

    # All of the sent bytes are expected to arrive in a single read.
    # NOTE(review): the previous comment credited _loopback with passing
    # 2 ** 16 to recv, but this test goes through _interactInMemory -
    # confirm the read size used there is larger than 2 ** 15.
    self.assertEqual(len(received), sent)
Jean-Paul Calderone3ad85d42009-04-30 20:24:35 -04003350
3351
def test_shutdown(self):
    """
    :py:obj:`Connection.bio_shutdown` signals the end of the data stream
    from which the :py:obj:`Connection` reads.
    """
    server = self._server(None)
    server.bio_shutdown()
    # This relies on the TestCase's assertRaises returning the exception
    # instance that was raised.
    e = self.assertRaises(Error, server.recv, 1024)
    # We don't want WantReadError or ZeroReturnError or anything - it's a
    # handshake failure, so the class must be exactly Error, not a
    # subclass of it.
    self.assertEqual(e.__class__, Error)
Jean-Paul Calderone0b88b6a2009-07-05 12:44:41 -04003363
3364
def test_unexpectedEndOfFile(self):
    """
    When the underlying socket is shut down without an orderly SSL
    shutdown having taken place, reading from the peer raises
    :py:obj:`OpenSSL.SSL.SysCallError` whose arguments are ``-1`` and the
    message "Unexpected EOF".
    """
    serverSide, clientSide = self._loopback()
    # Shut down the transport directly, bypassing the SSL-level shutdown.
    clientSide.sock_shutdown(SHUT_RDWR)
    failure = self.assertRaises(SysCallError, serverSide.recv, 1024)
    expectedArgs = (-1, "Unexpected EOF")
    self.assertEqual(failure.args, expectedArgs)
3375
3376
def _check_client_ca_list(self, func):
    """
    Verify what :py:obj:`get_client_ca_list` returns on the server and
    client connections before configuration, after configuration and
    after the two connections have interacted.

    :param func: A function which will be called with the server context
        before the client and server are connected to each other.  This
        function should specify a list of CAs for the server to send to
        the client and return that same list.  The list will be used to
        verify that :py:obj:`get_client_ca_list` returns the proper value
        at various times.
    """
    serverConn = self._server(None)
    clientConn = self._client(None)

    def assertCAs(onClient, onServer):
        # Check the client first, then the server.
        self.assertEqual(clientConn.get_client_ca_list(), onClient)
        self.assertEqual(serverConn.get_client_ca_list(), onServer)

    # Nothing has been configured yet.
    assertCAs([], [])

    # Once the server context is configured only the server knows the
    # names.
    names = func(serverConn.get_context())
    assertCAs([], names)

    # After the two sides have interacted, both report the names.
    self._interactInMemory(clientConn, serverConn)
    assertCAs(names, names)
Ziga Seilnacht679c4262009-09-01 01:32:29 +02003399
3400
def test_set_client_ca_list_errors(self):
    """
    :py:obj:`Context.set_client_ca_list` raises :py:obj:`TypeError` when
    given something which is not a list, or a list holding objects other
    than X509Names.  An empty list is accepted and :py:obj:`None` is
    returned.
    """
    context = Context(TLSv1_METHOD)
    for invalid in ("spam", ["spam"]):
        self.assertRaises(TypeError, context.set_client_ca_list, invalid)
    result = context.set_client_ca_list([])
    self.assertIdentical(result, None)
Ziga Seilnacht679c4262009-09-01 01:32:29 +02003410
3411
def test_set_empty_ca_list(self):
    """
    Passing an empty list to :py:obj:`Context.set_client_ca_list`
    configures the context to send no CA names to the client, and
    :py:obj:`Connection.get_client_ca_list` returns an empty list on both
    the server and the client once the connection is set up.
    """
    def configure(context):
        # The expected value is a separate list from the one handed to
        # the context.
        expected = []
        context.set_client_ca_list([])
        return expected

    self._check_client_ca_list(configure)
3423
3424
def test_set_one_ca_list(self):
    """
    Passing a list holding a single X509Name to
    :py:obj:`Context.set_client_ca_list` configures the context to send
    that CA name to the client, and
    :py:obj:`Connection.get_client_ca_list` returns a list containing
    that X509Name on both sides once the connection is set up.
    """
    rootCert = load_certificate(FILETYPE_PEM, root_cert_pem)
    rootName = rootCert.get_subject()

    def configure(context):
        context.set_client_ca_list([rootName])
        return [rootName]

    self._check_client_ca_list(configure)
3439
3440
def test_set_multiple_ca_list(self):
    """
    Passing a list holding several X509Name objects to
    :py:obj:`Context.set_client_ca_list` configures the context to send
    those CA names to the client, and
    :py:obj:`Connection.get_client_ca_list` returns a list containing
    those X509Names on both sides once the connection is set up.
    """
    serverCert = load_certificate(FILETYPE_PEM, server_cert_pem)
    # NOTE(review): the "client" certificate is loaded from
    # server_cert_pem as well, so both names describe the same subject -
    # confirm whether a distinct client certificate was intended.
    clientCert = load_certificate(FILETYPE_PEM, server_cert_pem)

    serverName = serverCert.get_subject()
    clientName = clientCert.get_subject()

    def configure(context):
        # The very same list object is both configured and returned.
        caNames = [serverName, clientName]
        context.set_client_ca_list(caNames)
        return caNames

    self._check_client_ca_list(configure)
3460
3461
def test_reset_ca_list(self):
    """
    When :py:obj:`Context.set_client_ca_list` is called more than once,
    only the X509Names given to the last call are used as the CA names
    sent to the client.
    """
    rootCert = load_certificate(FILETYPE_PEM, root_cert_pem)
    serverCert = load_certificate(FILETYPE_PEM, server_cert_pem)
    # NOTE(review): the "client" certificate is loaded from
    # server_cert_pem as well - confirm whether a distinct client
    # certificate was intended.
    clientCert = load_certificate(FILETYPE_PEM, server_cert_pem)

    rootName = rootCert.get_subject()
    serverName = serverCert.get_subject()
    clientName = clientCert.get_subject()

    def configure(context):
        # The second call is expected to discard what the first one set.
        context.set_client_ca_list([serverName, clientName])
        context.set_client_ca_list([rootName])
        return [rootName]

    self._check_client_ca_list(configure)
Ziga Seilnacht679c4262009-09-01 01:32:29 +02003481
Jean-Paul Calderone911c9112009-10-24 11:12:00 -04003482
def test_mutated_ca_list(self):
    """
    If the list passed to :py:obj:`Context.set_client_ca_list` is mutated
    afterwards, this does not affect the list of CA names sent to the
    client.
    """
    cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
    secert = load_certificate(FILETYPE_PEM, server_cert_pem)

    cadesc = cacert.get_subject()
    sedesc = secert.get_subject()

    def mutated_ca(ctx):
        names = [cadesc]
        # Hand the context the very list that is mutated below.
        # Previously a separate, fresh list was passed here, so the later
        # append could never have been observed by the context and the
        # test proved nothing.
        ctx.set_client_ca_list(names)
        names.append(sedesc)
        # Only the name present at the time of the call is expected.
        return [cadesc]

    self._check_client_ca_list(mutated_ca)
Ziga Seilnacht679c4262009-09-01 01:32:29 +02003501
3502
def test_add_client_ca_errors(self):
    """
    :py:obj:`Context.add_client_ca` raises :py:obj:`TypeError` when it is
    called with no arguments, with an object which is not an X509, or
    with more than one argument.
    """
    context = Context(TLSv1_METHOD)
    rootCert = load_certificate(FILETYPE_PEM, root_cert_pem)
    badArguments = [
        (),
        ("spam",),
        (rootCert, rootCert),
    ]
    for arguments in badArguments:
        self.assertRaises(TypeError, context.add_client_ca, *arguments)
Ziga Seilnacht679c4262009-09-01 01:32:29 +02003513
3514
def test_one_add_client_ca(self):
    """
    :py:obj:`Context.add_client_ca` adds the subject of the given
    certificate as a CA name to be sent to the client.
    """
    rootCert = load_certificate(FILETYPE_PEM, root_cert_pem)
    rootName = rootCert.get_subject()

    def configure(context):
        # The certificate itself is added; its subject name is what is
        # expected back.
        context.add_client_ca(rootCert)
        return [rootName]

    self._check_client_ca_list(configure)
3526
3527
def test_multiple_add_client_ca(self):
    """
    Calling :py:obj:`Context.add_client_ca` once per X509 object results
    in all of their subject names being sent to the client, in the order
    they were added.
    """
    rootCert = load_certificate(FILETYPE_PEM, root_cert_pem)
    serverCert = load_certificate(FILETYPE_PEM, server_cert_pem)

    rootName = rootCert.get_subject()
    serverName = serverCert.get_subject()

    def configure(context):
        for certificate in (rootCert, serverCert):
            context.add_client_ca(certificate)
        return [rootName, serverName]

    self._check_client_ca_list(configure)
3544
3545
def test_set_and_add_client_ca(self):
    """
    When :py:obj:`Context.set_client_ca_list` is followed by
    :py:obj:`Context.add_client_ca`, the CA names from the first call are
    used together with the CA name from the second call.
    """
    rootCert = load_certificate(FILETYPE_PEM, root_cert_pem)
    serverCert = load_certificate(FILETYPE_PEM, server_cert_pem)
    # NOTE(review): the "client" certificate is loaded from
    # server_cert_pem as well, so its name equals the server name -
    # confirm whether a distinct client certificate was intended.
    clientCert = load_certificate(FILETYPE_PEM, server_cert_pem)

    rootName = rootCert.get_subject()
    serverName = serverCert.get_subject()
    clientName = clientCert.get_subject()

    def configure(context):
        context.set_client_ca_list([rootName, serverName])
        context.add_client_ca(clientCert)
        return [rootName, serverName, clientName]

    self._check_client_ca_list(configure)
Ziga Seilnacht679c4262009-09-01 01:32:29 +02003565
Jean-Paul Calderone055a9172009-10-24 13:45:11 -04003566
def test_set_after_add_client_ca(self):
    """
    When :py:obj:`Context.set_client_ca_list` is called after
    :py:obj:`Context.add_client_ca`, the CA name given to the earlier
    call is replaced by the names given to the later call.  Names added
    with :py:obj:`Context.add_client_ca` after that are appended.
    """
    rootCert = load_certificate(FILETYPE_PEM, root_cert_pem)
    serverCert = load_certificate(FILETYPE_PEM, server_cert_pem)
    # NOTE(review): the "client" certificate is loaded from
    # server_cert_pem as well - confirm whether a distinct client
    # certificate was intended.
    clientCert = load_certificate(FILETYPE_PEM, server_cert_pem)

    rootName = rootCert.get_subject()
    serverName = serverCert.get_subject()

    def configure(context):
        # Added first, then expected to be wiped out by the set call.
        context.add_client_ca(clientCert)
        context.set_client_ca_list([rootName])
        context.add_client_ca(serverCert)
        return [rootName, serverName]

    self._check_client_ca_list(configure)
Ziga Seilnacht679c4262009-09-01 01:32:29 +02003586
Jean-Paul Calderone0b88b6a2009-07-05 12:44:41 -04003587
Jean-Paul Calderoned899af02013-03-19 22:10:37 -07003588
class ConnectionBIOTests(TestCase):
    """
    Tests for :py:obj:`Connection.bio_read` and :py:obj:`Connection.bio_write`.
    """
    def _pendingHandshake(self):
        """
        Create a :py:obj:`Connection` without a socket, put it in connect
        state and start a handshake on it.

        :py:obj:`WantReadError` raised by the handshake attempt is ignored;
        the tests using this helper then read from the connection with
        :py:obj:`Connection.bio_read`.

        :return: The :py:obj:`Connection` on which the handshake was
            started.
        """
        conn = Connection(Context(TLSv1_METHOD), None)
        conn.set_connect_state()
        try:
            conn.do_handshake()
        except WantReadError:
            pass
        return conn


    def test_wantReadError(self):
        """
        :py:obj:`Connection.bio_read` raises
        :py:obj:`OpenSSL.SSL.WantReadError` if there are no bytes available
        to be read from the BIO.
        """
        untouched = Connection(Context(TLSv1_METHOD), None)
        self.assertRaises(WantReadError, untouched.bio_read, 1024)


    def test_buffer_size(self):
        """
        :py:obj:`Connection.bio_read` accepts an integer giving the maximum
        number of bytes to read and return.
        """
        conn = self._pendingHandshake()
        data = conn.bio_read(2)
        self.assertEqual(2, len(data))


    if not PY3:
        def test_buffer_size_long(self):
            """
            On Python 2 :py:obj:`Connection.bio_read` accepts values of
            type :py:obj:`long` as well as :py:obj:`int`.
            """
            conn = self._pendingHandshake()
            data = conn.bio_read(long(2))
            self.assertEqual(2, len(data))
3634
3635
3636
Jean-Paul Calderoned899af02013-03-19 22:10:37 -07003637
class InfoConstantTests(TestCase):
    """
    Tests for assorted constants exposed for use in info callbacks.
    """
    def test_integers(self):
        """
        Every one of the info constants is an :py:obj:`int`.

        This is a very weak test.  It would be nice to have one that
        actually verifies that as certain info events happen, the value
        passed to the info callback matches up with the constant exposed by
        OpenSSL.SSL.
        """
        stateConstants = (
            SSL_ST_CONNECT, SSL_ST_ACCEPT, SSL_ST_MASK, SSL_ST_INIT,
            SSL_ST_BEFORE, SSL_ST_OK, SSL_ST_RENEGOTIATE,
        )
        callbackConstants = (
            SSL_CB_LOOP, SSL_CB_EXIT, SSL_CB_READ, SSL_CB_WRITE,
            SSL_CB_ALERT, SSL_CB_READ_ALERT, SSL_CB_WRITE_ALERT,
            SSL_CB_ACCEPT_LOOP, SSL_CB_ACCEPT_EXIT, SSL_CB_CONNECT_LOOP,
            SSL_CB_CONNECT_EXIT, SSL_CB_HANDSHAKE_START,
            SSL_CB_HANDSHAKE_DONE,
        )
        for const in stateConstants + callbackConstants:
            self.assertTrue(isinstance(const, int))
3659
Ziga Seilnacht44611bf2009-08-31 20:49:30 +02003660
# Run the test suite when this module is executed as a script.
if __name__ == "__main__":
    main()