blob: 0b697c533165233b128882feb54581650f49e054 [file] [log] [blame]
Jean-Paul Calderone8b63d452008-03-21 18:31:12 -04001# Copyright (C) Jean-Paul Calderone 2008, All rights reserved
2
Jean-Paul Calderone30c09ea2008-03-21 17:04:05 -04003"""
4Unit tests for L{OpenSSL.SSL}.
5"""
6
Jean-Paul Calderonebf37f0f2010-07-31 14:56:20 -04007from twisted.internet.ssl import *
8
Jean-Paul Calderone55d91f42010-07-29 19:22:17 -04009from errno import ECONNREFUSED, EINPROGRESS
Jean-Paul Calderone52f0d8b2009-03-07 09:10:19 -050010from sys import platform
Jean-Paul Calderone8bdeba22010-07-29 09:45:07 -040011from socket import error, socket
Jean-Paul Calderonea65cf6c2009-07-19 10:26:52 -040012from os import makedirs
Jean-Paul Calderone1cb5d022008-09-07 20:58:50 -040013from os.path import join
Jean-Paul Calderone0b88b6a2009-07-05 12:44:41 -040014from unittest import main
Jean-Paul Calderone460cc1f2009-03-07 11:31:12 -050015
Jean-Paul Calderone16cf03d2010-09-08 18:53:39 -040016from OpenSSL.crypto import TYPE_RSA, FILETYPE_PEM, FILETYPE_ASN1, PKey, X509Req, X509, X509Extension, dump_privatekey, load_certificate, dump_certificate, load_certificate_request, dump_certificate_request, load_privatekey, X509_verify_cert_error_string
Jean-Paul Calderone9485f2c2010-07-29 22:38:42 -040017from OpenSSL.SSL import SysCallError, WantReadError, WantWriteError, ZeroReturnError, Context, ContextType, Connection, ConnectionType, Error
Jean-Paul Calderonee4f6b472010-07-29 22:50:58 -040018from OpenSSL.SSL import SENT_SHUTDOWN, RECEIVED_SHUTDOWN
Jean-Paul Calderone30c09ea2008-03-21 17:04:05 -040019from OpenSSL.SSL import SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, TLSv1_METHOD
Jean-Paul Calderone68649052009-07-17 21:14:27 -040020from OpenSSL.SSL import OP_NO_SSLv2, OP_NO_SSLv3, OP_SINGLE_DH_USE
Rick Deanb71c0d22009-04-01 14:09:23 -050021from OpenSSL.SSL import VERIFY_PEER, VERIFY_FAIL_IF_NO_PEER_CERT, VERIFY_CLIENT_ONCE
Jean-Paul Calderone0b88b6a2009-07-05 12:44:41 -040022from OpenSSL.test.util import TestCase
Jean-Paul Calderone18808652009-07-05 12:54:05 -040023from OpenSSL.test.test_crypto import cleartextCertificatePEM, cleartextPrivateKeyPEM
Rick Dean94e46fd2009-07-18 14:51:24 -050024from OpenSSL.test.test_crypto import client_cert_pem, client_key_pem, server_cert_pem, server_key_pem, root_cert_pem
Jean-Paul Calderone4bccf5e2008-12-28 22:50:42 -050025try:
26 from OpenSSL.SSL import OP_NO_QUERY_MTU
27except ImportError:
28 OP_NO_QUERY_MTU = None
29try:
30 from OpenSSL.SSL import OP_COOKIE_EXCHANGE
31except ImportError:
32 OP_COOKIE_EXCHANGE = None
33try:
34 from OpenSSL.SSL import OP_NO_TICKET
35except ImportError:
36 OP_NO_TICKET = None
Jean-Paul Calderone5ef86512008-04-26 19:06:28 -040037
Jean-Paul Calderone30c09ea2008-03-21 17:04:05 -040038
Jean-Paul Calderonebf37f0f2010-07-31 14:56:20 -040039def verify_cb(conn, cert, errnum, depth, ok):
Jean-Paul Calderone16cf03d2010-09-08 18:53:39 -040040 # print conn, cert, X509_verify_cert_error_string(errnum), depth, ok
Jean-Paul Calderonebf37f0f2010-07-31 14:56:20 -040041 return ok
42
Rick Deanb1ccd562009-07-09 23:52:39 -050043def socket_pair():
Jean-Paul Calderone1a9613b2009-07-16 12:13:36 -040044 """
Jean-Paul Calderone68649052009-07-17 21:14:27 -040045 Establish and return a pair of network sockets connected to each other.
Jean-Paul Calderone1a9613b2009-07-16 12:13:36 -040046 """
47 # Connect a pair of sockets
Rick Deanb1ccd562009-07-09 23:52:39 -050048 port = socket()
49 port.bind(('', 0))
50 port.listen(1)
51 client = socket()
52 client.setblocking(False)
Jean-Paul Calderonef23c5d92009-07-23 17:58:15 -040053 client.connect_ex(("127.0.0.1", port.getsockname()[1]))
Jean-Paul Calderone94b24a82009-07-16 19:11:38 -040054 client.setblocking(True)
Rick Deanb1ccd562009-07-09 23:52:39 -050055 server = port.accept()[0]
Rick Deanb1ccd562009-07-09 23:52:39 -050056
Jean-Paul Calderone1a9613b2009-07-16 12:13:36 -040057 # Let's pass some unencrypted data to make sure our socket connection is
58 # fine. Just one byte, so we don't have to worry about buffers getting
59 # filled up or fragmentation.
60 server.send("x")
61 assert client.recv(1024) == "x"
62 client.send("y")
63 assert server.recv(1024) == "y"
Rick Deanb1ccd562009-07-09 23:52:39 -050064
Jean-Paul Calderone7ca48b52010-07-28 18:57:21 -040065 # Most of our callers want non-blocking sockets, make it easy for them.
Jean-Paul Calderone94b24a82009-07-16 19:11:38 -040066 server.setblocking(False)
67 client.setblocking(False)
68
Rick Deanb1ccd562009-07-09 23:52:39 -050069 return (server, client)
70
71
Jean-Paul Calderone94b24a82009-07-16 19:11:38 -040072
Jean-Paul Calderonebf37f0f2010-07-31 14:56:20 -040073class _LoopbackMixin:
74 def _loopback(self):
75 (server, client) = socket_pair()
76
77 ctx = Context(TLSv1_METHOD)
78 ctx.use_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem))
79 ctx.use_certificate(load_certificate(FILETYPE_PEM, server_cert_pem))
80 server = Connection(ctx, server)
81 server.set_accept_state()
82 client = Connection(Context(TLSv1_METHOD), client)
83 client.set_connect_state()
84
85 for i in range(3):
86 for conn in [client, server]:
87 try:
88 conn.do_handshake()
89 except WantReadError:
90 pass
91
92 server.setblocking(True)
93 client.setblocking(True)
94 return server, client
95
96
97 def _interactInMemory(self, client_conn, server_conn):
98 """
99 Try to read application bytes from each of the two L{Connection}
100 objects. Copy bytes back and forth between their send/receive buffers
101 for as long as there is anything to copy. When there is nothing more
102 to copy, return C{None}. If one of them actually manages to deliver
103 some application bytes, return a two-tuple of the connection from which
104 the bytes were read and the bytes themselves.
105 """
106 wrote = True
107 while wrote:
108 # Loop until neither side has anything to say
109 wrote = False
110
111 # Copy stuff from each side's send buffer to the other side's
112 # receive buffer.
113 for (read, write) in [(client_conn, server_conn),
114 (server_conn, client_conn)]:
115
116 # Give the side a chance to generate some more bytes, or
117 # succeed.
118 try:
119 bytes = read.recv(2 ** 16)
120 except WantReadError:
121 # It didn't succeed, so we'll hope it generated some
122 # output.
123 pass
124 else:
125 # It did succeed, so we'll stop now and let the caller deal
126 # with it.
127 return (read, bytes)
128
129 while True:
130 # Keep copying as long as there's more stuff there.
131 try:
132 dirty = read.bio_read(4096)
133 except WantReadError:
134 # Okay, nothing more waiting to be sent. Stop
135 # processing this send buffer.
136 break
137 else:
138 # Keep track of the fact that someone generated some
139 # output.
140 wrote = True
141 write.bio_write(dirty)
142
143
144
145class ContextTests(TestCase, _LoopbackMixin):
Jean-Paul Calderone30c09ea2008-03-21 17:04:05 -0400146 """
147 Unit tests for L{OpenSSL.SSL.Context}.
148 """
149 def test_method(self):
150 """
151 L{Context} can be instantiated with one of L{SSLv2_METHOD},
152 L{SSLv3_METHOD}, L{SSLv23_METHOD}, or L{TLSv1_METHOD}.
153 """
154 for meth in [SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, TLSv1_METHOD]:
155 Context(meth)
156 self.assertRaises(TypeError, Context, "")
157 self.assertRaises(ValueError, Context, 10)
158
159
Rick Deane15b1472009-07-09 15:53:42 -0500160 def test_type(self):
161 """
Jean-Paul Calderone68649052009-07-17 21:14:27 -0400162 L{Context} and L{ContextType} refer to the same type object and can be
163 used to create instances of that type.
Rick Deane15b1472009-07-09 15:53:42 -0500164 """
Jean-Paul Calderone68649052009-07-17 21:14:27 -0400165 self.assertIdentical(Context, ContextType)
166 self.assertConsistentType(Context, 'Context', TLSv1_METHOD)
Rick Deane15b1472009-07-09 15:53:42 -0500167
168
Jean-Paul Calderone30c09ea2008-03-21 17:04:05 -0400169 def test_use_privatekey(self):
170 """
171 L{Context.use_privatekey} takes an L{OpenSSL.crypto.PKey} instance.
172 """
173 key = PKey()
174 key.generate_key(TYPE_RSA, 128)
175 ctx = Context(TLSv1_METHOD)
176 ctx.use_privatekey(key)
177 self.assertRaises(TypeError, ctx.use_privatekey, "")
Jean-Paul Calderone828c9cb2008-04-26 18:06:54 -0400178
179
Jean-Paul Calderonebd479162010-07-30 18:03:25 -0400180 def test_set_app_data_wrong_args(self):
181 """
182 L{Context.set_app_data} raises L{TypeError} if called with other than
183 one argument.
184 """
185 context = Context(TLSv1_METHOD)
186 self.assertRaises(TypeError, context.set_app_data)
187 self.assertRaises(TypeError, context.set_app_data, None, None)
188
189
190 def test_get_app_data_wrong_args(self):
191 """
192 L{Context.get_app_data} raises L{TypeError} if called with any
193 arguments.
194 """
195 context = Context(TLSv1_METHOD)
196 self.assertRaises(TypeError, context.get_app_data, None)
197
198
199 def test_app_data(self):
200 """
201 L{Context.set_app_data} stores an object for later retrieval using
202 L{Context.get_app_data}.
203 """
204 app_data = object()
205 context = Context(TLSv1_METHOD)
206 context.set_app_data(app_data)
207 self.assertIdentical(context.get_app_data(), app_data)
208
209
Jean-Paul Calderone40569ca2010-07-30 18:04:58 -0400210 def test_set_options_wrong_args(self):
211 """
212 L{Context.set_options} raises L{TypeError} if called with the wrong
213 number of arguments or a non-C{int} argument.
214 """
215 context = Context(TLSv1_METHOD)
216 self.assertRaises(TypeError, context.set_options)
217 self.assertRaises(TypeError, context.set_options, None)
218 self.assertRaises(TypeError, context.set_options, 1, None)
219
220
Jean-Paul Calderone5ebb44a2010-07-30 18:09:41 -0400221 def test_set_timeout_wrong_args(self):
222 """
223 L{Context.set_timeout} raises L{TypeError} if called with the wrong
224 number of arguments or a non-C{int} argument.
225 """
226 context = Context(TLSv1_METHOD)
227 self.assertRaises(TypeError, context.set_timeout)
228 self.assertRaises(TypeError, context.set_timeout, None)
229 self.assertRaises(TypeError, context.set_timeout, 1, None)
230
231
232 def test_get_timeout_wrong_args(self):
233 """
234 L{Context.get_timeout} raises L{TypeError} if called with any arguments.
235 """
236 context = Context(TLSv1_METHOD)
237 self.assertRaises(TypeError, context.get_timeout, None)
238
239
240 def test_timeout(self):
241 """
242 L{Context.set_timeout} sets the session timeout for all connections
243 created using the context object. L{Context.get_timeout} retrieves this
244 value.
245 """
246 context = Context(TLSv1_METHOD)
247 context.set_timeout(1234)
248 self.assertEquals(context.get_timeout(), 1234)
249
250
Jean-Paul Calderonee2b69512010-07-30 18:22:06 -0400251 def test_set_verify_depth_wrong_args(self):
252 """
253 L{Context.set_verify_depth} raises L{TypeError} if called with the wrong
254 number of arguments or a non-C{int} argument.
255 """
256 context = Context(TLSv1_METHOD)
257 self.assertRaises(TypeError, context.set_verify_depth)
258 self.assertRaises(TypeError, context.set_verify_depth, None)
259 self.assertRaises(TypeError, context.set_verify_depth, 1, None)
260
261
262 def test_get_verify_depth_wrong_args(self):
263 """
264 L{Context.get_verify_depth} raises L{TypeError} if called with any arguments.
265 """
266 context = Context(TLSv1_METHOD)
267 self.assertRaises(TypeError, context.get_verify_depth, None)
268
269
270 def test_verify_depth(self):
271 """
272 L{Context.set_verify_depth} sets the number of certificates in a chain
273 to follow before giving up. The value can be retrieved with
274 L{Context.get_verify_depth}.
275 """
276 context = Context(TLSv1_METHOD)
277 context.set_verify_depth(11)
278 self.assertEquals(context.get_verify_depth(), 11)
279
280
Jean-Paul Calderone389d76d2010-07-30 17:57:53 -0400281 def _write_encrypted_pem(self, passphrase):
282 key = PKey()
283 key.generate_key(TYPE_RSA, 128)
284 pemFile = self.mktemp()
285 fObj = file(pemFile, 'w')
286 fObj.write(dump_privatekey(FILETYPE_PEM, key, "blowfish", passphrase))
287 fObj.close()
288 return pemFile
289
290
Jean-Paul Calderonef4480622010-08-02 18:25:03 -0400291 def test_set_passwd_cb_wrong_args(self):
292 """
293 L{Context.set_passwd_cb} raises L{TypeError} if called with the
294 wrong arguments or with a non-callable first argument.
295 """
296 context = Context(TLSv1_METHOD)
297 self.assertRaises(TypeError, context.set_passwd_cb)
298 self.assertRaises(TypeError, context.set_passwd_cb, None)
299 self.assertRaises(TypeError, context.set_passwd_cb, lambda: None, None, None)
300
301
Jean-Paul Calderone828c9cb2008-04-26 18:06:54 -0400302 def test_set_passwd_cb(self):
303 """
304 L{Context.set_passwd_cb} accepts a callable which will be invoked when
305 a private key is loaded from an encrypted PEM.
306 """
Jean-Paul Calderone828c9cb2008-04-26 18:06:54 -0400307 passphrase = "foobar"
Jean-Paul Calderone389d76d2010-07-30 17:57:53 -0400308 pemFile = self._write_encrypted_pem(passphrase)
Jean-Paul Calderone828c9cb2008-04-26 18:06:54 -0400309 calledWith = []
310 def passphraseCallback(maxlen, verify, extra):
311 calledWith.append((maxlen, verify, extra))
312 return passphrase
313 context = Context(TLSv1_METHOD)
314 context.set_passwd_cb(passphraseCallback)
315 context.use_privatekey_file(pemFile)
316 self.assertTrue(len(calledWith), 1)
317 self.assertTrue(isinstance(calledWith[0][0], int))
318 self.assertTrue(isinstance(calledWith[0][1], int))
319 self.assertEqual(calledWith[0][2], None)
Jean-Paul Calderone5ef86512008-04-26 19:06:28 -0400320
321
Jean-Paul Calderone389d76d2010-07-30 17:57:53 -0400322 def test_passwd_callback_exception(self):
323 """
324 L{Context.use_privatekey_file} propagates any exception raised by the
325 passphrase callback.
326 """
327 pemFile = self._write_encrypted_pem("monkeys are nice")
328 def passphraseCallback(maxlen, verify, extra):
329 raise RuntimeError("Sorry, I am a fail.")
330
331 context = Context(TLSv1_METHOD)
332 context.set_passwd_cb(passphraseCallback)
333 self.assertRaises(RuntimeError, context.use_privatekey_file, pemFile)
334
335
336 def test_passwd_callback_false(self):
337 """
338 L{Context.use_privatekey_file} raises L{OpenSSL.SSL.Error} if the
339 passphrase callback returns a false value.
340 """
341 pemFile = self._write_encrypted_pem("monkeys are nice")
342 def passphraseCallback(maxlen, verify, extra):
343 return None
344
345 context = Context(TLSv1_METHOD)
346 context.set_passwd_cb(passphraseCallback)
347 self.assertRaises(Error, context.use_privatekey_file, pemFile)
348
349
350 def test_passwd_callback_non_string(self):
351 """
352 L{Context.use_privatekey_file} raises L{OpenSSL.SSL.Error} if the
353 passphrase callback returns a true non-string value.
354 """
355 pemFile = self._write_encrypted_pem("monkeys are nice")
356 def passphraseCallback(maxlen, verify, extra):
357 return 10
358
359 context = Context(TLSv1_METHOD)
360 context.set_passwd_cb(passphraseCallback)
361 self.assertRaises(Error, context.use_privatekey_file, pemFile)
362
363
364 def test_passwd_callback_too_long(self):
365 """
366 If the passphrase returned by the passphrase callback returns a string
367 longer than the indicated maximum length, it is truncated.
368 """
369 # A priori knowledge!
370 passphrase = "x" * 1024
371 pemFile = self._write_encrypted_pem(passphrase)
372 def passphraseCallback(maxlen, verify, extra):
373 assert maxlen == 1024
374 return passphrase + "y"
375
376 context = Context(TLSv1_METHOD)
377 context.set_passwd_cb(passphraseCallback)
378 # This shall succeed because the truncated result is the correct
379 # passphrase.
380 context.use_privatekey_file(pemFile)
381
382
Jean-Paul Calderone5ef86512008-04-26 19:06:28 -0400383 def test_set_info_callback(self):
384 """
385 L{Context.set_info_callback} accepts a callable which will be invoked
386 when certain information about an SSL connection is available.
387 """
Rick Deanb1ccd562009-07-09 23:52:39 -0500388 (server, client) = socket_pair()
Jean-Paul Calderone5ef86512008-04-26 19:06:28 -0400389
390 clientSSL = Connection(Context(TLSv1_METHOD), client)
391 clientSSL.set_connect_state()
392
393 called = []
394 def info(conn, where, ret):
395 called.append((conn, where, ret))
396 context = Context(TLSv1_METHOD)
397 context.set_info_callback(info)
398 context.use_certificate(
399 load_certificate(FILETYPE_PEM, cleartextCertificatePEM))
400 context.use_privatekey(
401 load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM))
402
Jean-Paul Calderone5ef86512008-04-26 19:06:28 -0400403 serverSSL = Connection(context, server)
404 serverSSL.set_accept_state()
405
406 while not called:
407 for ssl in clientSSL, serverSSL:
408 try:
409 ssl.do_handshake()
410 except WantReadError:
411 pass
412
413 # Kind of lame. Just make sure it got called somehow.
414 self.assertTrue(called)
Jean-Paul Calderonee1bd4322008-09-07 20:17:17 -0400415
416
Jean-Paul Calderone1cb5d022008-09-07 20:58:50 -0400417 def _load_verify_locations_test(self, *args):
Rick Deanb1ccd562009-07-09 23:52:39 -0500418 (server, client) = socket_pair()
Jean-Paul Calderonee1bd4322008-09-07 20:17:17 -0400419
Jean-Paul Calderonee1bd4322008-09-07 20:17:17 -0400420 clientContext = Context(TLSv1_METHOD)
Jean-Paul Calderone1cb5d022008-09-07 20:58:50 -0400421 clientContext.load_verify_locations(*args)
Jean-Paul Calderonee1bd4322008-09-07 20:17:17 -0400422 # Require that the server certificate verify properly or the
423 # connection will fail.
424 clientContext.set_verify(
425 VERIFY_PEER,
426 lambda conn, cert, errno, depth, preverify_ok: preverify_ok)
427
428 clientSSL = Connection(clientContext, client)
429 clientSSL.set_connect_state()
430
Jean-Paul Calderonee1bd4322008-09-07 20:17:17 -0400431 serverContext = Context(TLSv1_METHOD)
432 serverContext.use_certificate(
433 load_certificate(FILETYPE_PEM, cleartextCertificatePEM))
434 serverContext.use_privatekey(
435 load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM))
436
437 serverSSL = Connection(serverContext, server)
438 serverSSL.set_accept_state()
439
440 for i in range(3):
441 for ssl in clientSSL, serverSSL:
442 try:
443 # Without load_verify_locations above, the handshake
444 # will fail:
445 # Error: [('SSL routines', 'SSL3_GET_SERVER_CERTIFICATE',
446 # 'certificate verify failed')]
447 ssl.do_handshake()
448 except WantReadError:
449 pass
450
451 cert = clientSSL.get_peer_certificate()
Jean-Paul Calderone20131f52009-04-01 12:05:45 -0400452 self.assertEqual(cert.get_subject().CN, 'Testing Root CA')
Jean-Paul Calderone5075fce2008-09-07 20:18:55 -0400453
Jean-Paul Calderone12608a82009-11-07 10:35:15 -0500454
Jean-Paul Calderone1cb5d022008-09-07 20:58:50 -0400455 def test_load_verify_file(self):
456 """
457 L{Context.load_verify_locations} accepts a file name and uses the
458 certificates within for verification purposes.
459 """
460 cafile = self.mktemp()
461 fObj = file(cafile, 'w')
462 fObj.write(cleartextCertificatePEM)
463 fObj.close()
464
465 self._load_verify_locations_test(cafile)
466
Jean-Paul Calderone5075fce2008-09-07 20:18:55 -0400467
468 def test_load_verify_invalid_file(self):
469 """
470 L{Context.load_verify_locations} raises L{Error} when passed a
471 non-existent cafile.
472 """
473 clientContext = Context(TLSv1_METHOD)
474 self.assertRaises(
475 Error, clientContext.load_verify_locations, self.mktemp())
Jean-Paul Calderone1cb5d022008-09-07 20:58:50 -0400476
477
478 def test_load_verify_directory(self):
479 """
480 L{Context.load_verify_locations} accepts a directory name and uses
481 the certificates within for verification purposes.
482 """
483 capath = self.mktemp()
484 makedirs(capath)
Jean-Paul Calderonea65cf6c2009-07-19 10:26:52 -0400485 # Hash value computed manually with c_rehash to avoid depending on
486 # c_rehash in the test suite.
487 cafile = join(capath, 'c7adac82.0')
Jean-Paul Calderone1cb5d022008-09-07 20:58:50 -0400488 fObj = file(cafile, 'w')
489 fObj.write(cleartextCertificatePEM)
490 fObj.close()
491
Jean-Paul Calderone1cb5d022008-09-07 20:58:50 -0400492 self._load_verify_locations_test(None, capath)
493
494
Jean-Paul Calderonef4480622010-08-02 18:25:03 -0400495 def test_load_verify_locations_wrong_args(self):
496 """
497 L{Context.load_verify_locations} raises L{TypeError} if called with
498 the wrong number of arguments or with non-C{str} arguments.
499 """
500 context = Context(TLSv1_METHOD)
501 self.assertRaises(TypeError, context.load_verify_locations)
502 self.assertRaises(TypeError, context.load_verify_locations, object())
503 self.assertRaises(TypeError, context.load_verify_locations, object(), object())
504 self.assertRaises(TypeError, context.load_verify_locations, None, None, None)
505
506
Jean-Paul Calderone7ca48b52010-07-28 18:57:21 -0400507 if platform == "win32":
508 "set_default_verify_paths appears not to work on Windows. "
Jean-Paul Calderone28fb8f02009-07-24 18:01:31 -0400509 "See LP#404343 and LP#404344."
510 else:
511 def test_set_default_verify_paths(self):
512 """
513 L{Context.set_default_verify_paths} causes the platform-specific CA
514 certificate locations to be used for verification purposes.
515 """
516 # Testing this requires a server with a certificate signed by one of
517 # the CAs in the platform CA location. Getting one of those costs
518 # money. Fortunately (or unfortunately, depending on your
519 # perspective), it's easy to think of a public server on the
520 # internet which has such a certificate. Connecting to the network
521 # in a unit test is bad, but it's the only way I can think of to
522 # really test this. -exarkun
Jean-Paul Calderone1cb5d022008-09-07 20:58:50 -0400523
Jean-Paul Calderone28fb8f02009-07-24 18:01:31 -0400524 # Arg, verisign.com doesn't speak TLSv1
525 context = Context(SSLv3_METHOD)
526 context.set_default_verify_paths()
527 context.set_verify(
Ziga Seilnacht44611bf2009-08-31 20:49:30 +0200528 VERIFY_PEER,
Jean-Paul Calderone28fb8f02009-07-24 18:01:31 -0400529 lambda conn, cert, errno, depth, preverify_ok: preverify_ok)
Jean-Paul Calderone1cb5d022008-09-07 20:58:50 -0400530
Jean-Paul Calderone28fb8f02009-07-24 18:01:31 -0400531 client = socket()
532 client.connect(('verisign.com', 443))
533 clientSSL = Connection(context, client)
534 clientSSL.set_connect_state()
535 clientSSL.do_handshake()
536 clientSSL.send('GET / HTTP/1.0\r\n\r\n')
537 self.assertTrue(clientSSL.recv(1024))
Jean-Paul Calderone9eadb962008-09-07 21:20:44 -0400538
539
540 def test_set_default_verify_paths_signature(self):
541 """
542 L{Context.set_default_verify_paths} takes no arguments and raises
543 L{TypeError} if given any.
544 """
545 context = Context(TLSv1_METHOD)
546 self.assertRaises(TypeError, context.set_default_verify_paths, None)
547 self.assertRaises(TypeError, context.set_default_verify_paths, 1)
548 self.assertRaises(TypeError, context.set_default_verify_paths, "")
Jean-Paul Calderone327d8f92008-12-28 21:55:56 -0500549
Jean-Paul Calderonef4480622010-08-02 18:25:03 -0400550
Jean-Paul Calderone12608a82009-11-07 10:35:15 -0500551 def test_add_extra_chain_cert_invalid_cert(self):
552 """
553 L{Context.add_extra_chain_cert} raises L{TypeError} if called with
554 other than one argument or if called with an object which is not an
555 instance of L{X509}.
556 """
557 context = Context(TLSv1_METHOD)
558 self.assertRaises(TypeError, context.add_extra_chain_cert)
559 self.assertRaises(TypeError, context.add_extra_chain_cert, object())
560 self.assertRaises(TypeError, context.add_extra_chain_cert, object(), object())
561
562
Jean-Paul Calderonef4480622010-08-02 18:25:03 -0400563 def _create_certificate_chain(self):
Jean-Paul Calderone12608a82009-11-07 10:35:15 -0500564 """
Jean-Paul Calderonef4480622010-08-02 18:25:03 -0400565 Construct and return a chain of certificates.
Jean-Paul Calderonebf37f0f2010-07-31 14:56:20 -0400566
567 1. A new self-signed certificate authority certificate (cacert)
568 2. A new intermediate certificate signed by cacert (icert)
569 3. A new server certificate signed by icert (scert)
Jean-Paul Calderone12608a82009-11-07 10:35:15 -0500570 """
Jean-Paul Calderone16cf03d2010-09-08 18:53:39 -0400571 caext = X509Extension('basicConstraints', False, 'CA:true')
Jean-Paul Calderone327d8f92008-12-28 21:55:56 -0500572
Jean-Paul Calderonebf37f0f2010-07-31 14:56:20 -0400573 # Step 1
574 cakey = PKey()
575 cakey.generate_key(TYPE_RSA, 512)
576 cacert = X509()
Jean-Paul Calderone16cf03d2010-09-08 18:53:39 -0400577 cacert.get_subject().commonName = "Authority Certificate"
Jean-Paul Calderonebf37f0f2010-07-31 14:56:20 -0400578 cacert.set_issuer(cacert.get_subject())
579 cacert.set_pubkey(cakey)
Jean-Paul Calderone16cf03d2010-09-08 18:53:39 -0400580 cacert.set_notBefore("20000101000000Z")
581 cacert.set_notAfter("20200101000000Z")
Jean-Paul Calderonebf37f0f2010-07-31 14:56:20 -0400582 cacert.add_extensions([caext])
Jean-Paul Calderone16cf03d2010-09-08 18:53:39 -0400583 cacert.set_serial_number(0)
584 cacert.sign(cakey, "sha1")
Jean-Paul Calderone327d8f92008-12-28 21:55:56 -0500585
Jean-Paul Calderonebf37f0f2010-07-31 14:56:20 -0400586 # Step 2
587 ikey = PKey()
588 ikey.generate_key(TYPE_RSA, 512)
589 icert = X509()
590 icert.get_subject().commonName = "Intermediate Certificate"
591 icert.set_issuer(cacert.get_subject())
592 icert.set_pubkey(ikey)
Jean-Paul Calderone16cf03d2010-09-08 18:53:39 -0400593 icert.set_notBefore("20000101000000Z")
594 icert.set_notAfter("20200101000000Z")
Jean-Paul Calderonebf37f0f2010-07-31 14:56:20 -0400595 icert.add_extensions([caext])
Jean-Paul Calderone16cf03d2010-09-08 18:53:39 -0400596 icert.set_serial_number(0)
597 icert.sign(cakey, "sha1")
Jean-Paul Calderone7ca48b52010-07-28 18:57:21 -0400598
Jean-Paul Calderonebf37f0f2010-07-31 14:56:20 -0400599 # Step 3
600 skey = PKey()
601 skey.generate_key(TYPE_RSA, 512)
602 scert = X509()
603 scert.get_subject().commonName = "Server Certificate"
604 scert.set_issuer(icert.get_subject())
605 scert.set_pubkey(skey)
Jean-Paul Calderone16cf03d2010-09-08 18:53:39 -0400606 scert.set_notBefore("20000101000000Z")
607 scert.set_notAfter("20200101000000Z")
608 scert.add_extensions([X509Extension('basicConstraints', True, 'CA:false')])
609 scert.set_serial_number(0)
610 scert.sign(ikey, "sha1")
Jean-Paul Calderone9485f2c2010-07-29 22:38:42 -0400611
Jean-Paul Calderonef4480622010-08-02 18:25:03 -0400612 return [(cakey, cacert), (ikey, icert), (skey, scert)]
Jean-Paul Calderonebf37f0f2010-07-31 14:56:20 -0400613
Jean-Paul Calderonef4480622010-08-02 18:25:03 -0400614
615 def _handshake_test(self, serverContext, clientContext):
616 """
617 Verify that a client and server created with the given contexts can
618 successfully handshake and communicate.
619 """
620 serverSocket, clientSocket = socket_pair()
621
Jean-Paul Calderonebf37f0f2010-07-31 14:56:20 -0400622 server = Connection(serverContext, serverSocket)
Jean-Paul Calderone9485f2c2010-07-29 22:38:42 -0400623 server.set_accept_state()
Jean-Paul Calderonebf37f0f2010-07-31 14:56:20 -0400624
Jean-Paul Calderonef4480622010-08-02 18:25:03 -0400625 client = Connection(clientContext, clientSocket)
626 client.set_connect_state()
627
628 # Make them talk to each other.
629 # self._interactInMemory(client, server)
630 for i in range(3):
631 for s in [client, server]:
632 try:
633 s.do_handshake()
634 except WantReadError:
635 pass
636
637
638 def test_add_extra_chain_cert(self):
639 """
640 L{Context.add_extra_chain_cert} accepts an L{X509} instance to add to
641 the certificate chain.
642
643 See L{_create_certificate_chain} for the details of the certificate
644 chain tested.
645
646 The chain is tested by starting a server with scert and connecting
647 to it with a client which trusts cacert and requires verification to
648 succeed.
649 """
650 chain = self._create_certificate_chain()
651 [(cakey, cacert), (ikey, icert), (skey, scert)] = chain
652
Jean-Paul Calderonebf37f0f2010-07-31 14:56:20 -0400653 # Dump the CA certificate to a file because that's the only way to load
654 # it as a trusted CA in the client context.
655 for cert, name in [(cacert, 'ca.pem'), (icert, 'i.pem'), (scert, 's.pem')]:
656 fObj = file(name, 'w')
657 fObj.write(dump_certificate(FILETYPE_PEM, cert))
658 fObj.close()
Jean-Paul Calderone16cf03d2010-09-08 18:53:39 -0400659 fObj = file(name.replace('pem', 'asn1'), 'w')
660 fObj.write(dump_certificate(FILETYPE_ASN1, cert))
661 fObj.close()
Jean-Paul Calderonebf37f0f2010-07-31 14:56:20 -0400662
663 for key, name in [(cakey, 'ca.key'), (ikey, 'i.key'), (skey, 's.key')]:
664 fObj = file(name, 'w')
665 fObj.write(dump_privatekey(FILETYPE_PEM, key))
666 fObj.close()
667
Jean-Paul Calderonef4480622010-08-02 18:25:03 -0400668 # Create the server context
669 serverContext = Context(TLSv1_METHOD)
670 serverContext.use_privatekey(skey)
671 serverContext.use_certificate(scert)
Jean-Paul Calderone16cf03d2010-09-08 18:53:39 -0400672 # The client already has cacert, we only need to give them icert.
Jean-Paul Calderonef4480622010-08-02 18:25:03 -0400673 serverContext.add_extra_chain_cert(icert)
674
Jean-Paul Calderonebf37f0f2010-07-31 14:56:20 -0400675 # Create the client
676 clientContext = Context(TLSv1_METHOD)
677 clientContext.set_verify(
678 VERIFY_PEER | VERIFY_FAIL_IF_NO_PEER_CERT, verify_cb)
679 clientContext.load_verify_locations('ca.pem')
Jean-Paul Calderone9485f2c2010-07-29 22:38:42 -0400680
Jean-Paul Calderonef4480622010-08-02 18:25:03 -0400681 # Try it out.
682 self._handshake_test(serverContext, clientContext)
683
684
Jean-Paul Calderonef4480622010-08-02 18:25:03 -0400685 def test_use_certificate_chain_file(self):
686 """
687 L{Context.use_certificate_chain_file} reads a certificate chain from
688 the specified file.
689
690 The chain is tested by starting a server with scert and connecting
691 to it with a client which trusts cacert and requires verification to
692 succeed.
693 """
694 chain = self._create_certificate_chain()
695 [(cakey, cacert), (ikey, icert), (skey, scert)] = chain
696
697 # Write out the chain file.
698 chainFile = self.mktemp()
699 fObj = file(chainFile, 'w')
700 # Most specific to least general.
701 fObj.write(dump_certificate(FILETYPE_PEM, scert))
702 fObj.write(dump_certificate(FILETYPE_PEM, icert))
703 fObj.write(dump_certificate(FILETYPE_PEM, cacert))
704 fObj.close()
705
706 serverContext = Context(TLSv1_METHOD)
707 serverContext.use_certificate_chain_file(chainFile)
708 serverContext.use_privatekey(skey)
709
710 fObj = file('ca.pem', 'w')
711 fObj.write(dump_certificate(FILETYPE_PEM, cacert))
712 fObj.close()
713
714 clientContext = Context(TLSv1_METHOD)
715 clientContext.set_verify(
716 VERIFY_PEER | VERIFY_FAIL_IF_NO_PEER_CERT, verify_cb)
717 clientContext.load_verify_locations('ca.pem')
718
719 self._handshake_test(serverContext, clientContext)
720
Jean-Paul Calderonee0d79362010-08-03 18:46:46 -0400721 # XXX load_client_ca
722 # XXX set_session_id
723 # XXX get_verify
724 # XXX load_temp_dh
725 # XXX set_cipher_list
Jean-Paul Calderone9485f2c2010-07-29 22:38:42 -0400726
Jean-Paul Calderone9485f2c2010-07-29 22:38:42 -0400727
728
729class ConnectionTests(TestCase, _LoopbackMixin):
Rick Deane15b1472009-07-09 15:53:42 -0500730 """
731 Unit tests for L{OpenSSL.SSL.Connection}.
732 """
733 def test_type(self):
734 """
Jean-Paul Calderone68649052009-07-17 21:14:27 -0400735 L{Connection} and L{ConnectionType} refer to the same type object and
736 can be used to create instances of that type.
Rick Deane15b1472009-07-09 15:53:42 -0500737 """
Jean-Paul Calderone68649052009-07-17 21:14:27 -0400738 self.assertIdentical(Connection, ConnectionType)
Rick Deane15b1472009-07-09 15:53:42 -0500739 ctx = Context(TLSv1_METHOD)
Jean-Paul Calderone68649052009-07-17 21:14:27 -0400740 self.assertConsistentType(Connection, 'Connection', ctx, None)
Rick Deane15b1472009-07-09 15:53:42 -0500741
Jean-Paul Calderone68649052009-07-17 21:14:27 -0400742
Jean-Paul Calderone4fd058a2009-11-22 11:46:42 -0500743 def test_get_context(self):
744 """
745 L{Connection.get_context} returns the L{Context} instance used to
746 construct the L{Connection} instance.
747 """
748 context = Context(TLSv1_METHOD)
749 connection = Connection(context, None)
750 self.assertIdentical(connection.get_context(), context)
751
752
753 def test_get_context_wrong_args(self):
754 """
755 L{Connection.get_context} raises L{TypeError} if called with any
756 arguments.
757 """
758 connection = Connection(Context(TLSv1_METHOD), None)
759 self.assertRaises(TypeError, connection.get_context, None)
760
761
Jean-Paul Calderone1d69a722010-07-29 09:05:53 -0400762 def test_pending(self):
Jean-Paul Calderone93dba222010-09-08 22:59:37 -0400763 """
764 L{Connection.pending} returns the number of bytes available for
765 immediate read.
766 """
Jean-Paul Calderone1d69a722010-07-29 09:05:53 -0400767 connection = Connection(Context(TLSv1_METHOD), None)
768 self.assertEquals(connection.pending(), 0)
769
770
771 def test_pending_wrong_args(self):
Jean-Paul Calderone93dba222010-09-08 22:59:37 -0400772 """
773 L{Connection.pending} raises L{TypeError} if called with any arguments.
774 """
Jean-Paul Calderone1d69a722010-07-29 09:05:53 -0400775 connection = Connection(Context(TLSv1_METHOD), None)
776 self.assertRaises(TypeError, connection.pending, None)
777
778
Jean-Paul Calderonecfecc242010-07-29 22:47:06 -0400779 def test_connect_wrong_args(self):
Jean-Paul Calderone93dba222010-09-08 22:59:37 -0400780 """
781 L{Connection.connect} raises L{TypeError} if called with a non-address
782 argument or with the wrong number of arguments.
783 """
Jean-Paul Calderonecfecc242010-07-29 22:47:06 -0400784 connection = Connection(Context(TLSv1_METHOD), socket())
785 self.assertRaises(TypeError, connection.connect, None)
Jean-Paul Calderone93dba222010-09-08 22:59:37 -0400786 self.assertRaises(TypeError, connection.connect)
787 self.assertRaises(TypeError, connection.connect, ("127.0.0.1", 1), None)
Jean-Paul Calderonecfecc242010-07-29 22:47:06 -0400788
789
Jean-Paul Calderone8bdeba22010-07-29 09:45:07 -0400790 def test_connect_refused(self):
Jean-Paul Calderone93dba222010-09-08 22:59:37 -0400791 """
792 L{Connection.connect} raises L{socket.error} if the underlying socket
793 connect method raises it.
794 """
Jean-Paul Calderone8bdeba22010-07-29 09:45:07 -0400795 client = socket()
796 context = Context(TLSv1_METHOD)
797 clientSSL = Connection(context, client)
798 exc = self.assertRaises(error, clientSSL.connect, ("127.0.0.1", 1))
Jean-Paul Calderone35adf9d2010-07-29 18:40:44 -0400799 self.assertEquals(exc.args[0], ECONNREFUSED)
Jean-Paul Calderone8bdeba22010-07-29 09:45:07 -0400800
801
802 def test_connect(self):
Jean-Paul Calderone93dba222010-09-08 22:59:37 -0400803 """
804 L{Connection.connect} establishes a connection to the specified address.
805 """
Jean-Paul Calderone0bcb0e02010-07-29 09:49:59 -0400806 port = socket()
807 port.bind(('', 0))
808 port.listen(3)
809
810 clientSSL = Connection(Context(TLSv1_METHOD), socket())
811 clientSSL.connect(port.getsockname())
812
813
Jean-Paul Calderone55d91f42010-07-29 19:22:17 -0400814 def test_connect_ex(self):
Jean-Paul Calderone93dba222010-09-08 22:59:37 -0400815 """
816 If there is a connection error, L{Connection.connect_ex} returns the
817 errno instead of raising an exception.
818 """
Jean-Paul Calderone55d91f42010-07-29 19:22:17 -0400819 port = socket()
820 port.bind(('', 0))
821 port.listen(3)
822
823 clientSSL = Connection(Context(TLSv1_METHOD), socket())
824 clientSSL.setblocking(False)
825 self.assertEquals(
826 clientSSL.connect_ex(port.getsockname()), EINPROGRESS)
827
828
Jean-Paul Calderonecfecc242010-07-29 22:47:06 -0400829 def test_accept_wrong_args(self):
Jean-Paul Calderone93dba222010-09-08 22:59:37 -0400830 """
831 L{Connection.accept} raises L{TypeError} if called with any arguments.
832 """
Jean-Paul Calderonecfecc242010-07-29 22:47:06 -0400833 connection = Connection(Context(TLSv1_METHOD), socket())
834 self.assertRaises(TypeError, connection.accept, None)
835
836
Jean-Paul Calderone0bcb0e02010-07-29 09:49:59 -0400837 def test_accept(self):
Jean-Paul Calderone93dba222010-09-08 22:59:37 -0400838 """
839 L{Connection.accept} accepts a pending connection attempt and returns a
840 tuple of a new L{Connection} (the accepted client) and the address the
841 connection originated from.
842 """
Jean-Paul Calderone8bdeba22010-07-29 09:45:07 -0400843 ctx = Context(TLSv1_METHOD)
844 ctx.use_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem))
845 ctx.use_certificate(load_certificate(FILETYPE_PEM, server_cert_pem))
Jean-Paul Calderone0bcb0e02010-07-29 09:49:59 -0400846 port = socket()
847 portSSL = Connection(ctx, port)
848 portSSL.bind(('', 0))
849 portSSL.listen(3)
Jean-Paul Calderone8bdeba22010-07-29 09:45:07 -0400850
Jean-Paul Calderone0bcb0e02010-07-29 09:49:59 -0400851 clientSSL = Connection(Context(TLSv1_METHOD), socket())
852 clientSSL.connect(portSSL.getsockname())
Jean-Paul Calderone8bdeba22010-07-29 09:45:07 -0400853
Jean-Paul Calderone0bcb0e02010-07-29 09:49:59 -0400854 serverSSL, address = portSSL.accept()
855
856 self.assertTrue(isinstance(serverSSL, Connection))
857 self.assertIdentical(serverSSL.get_context(), ctx)
858 self.assertEquals(address, clientSSL.getsockname())
Jean-Paul Calderone8bdeba22010-07-29 09:45:07 -0400859
860
Jean-Paul Calderonecfecc242010-07-29 22:47:06 -0400861 def test_shutdown_wrong_args(self):
Jean-Paul Calderone93dba222010-09-08 22:59:37 -0400862 """
863 L{Connection.shutdown} raises L{TypeError} if called with the wrong
864 number of arguments or with arguments other than integers.
865 """
Jean-Paul Calderonecfecc242010-07-29 22:47:06 -0400866 connection = Connection(Context(TLSv1_METHOD), None)
867 self.assertRaises(TypeError, connection.shutdown, None)
Jean-Paul Calderone1d3e0222010-07-29 22:52:45 -0400868 self.assertRaises(TypeError, connection.get_shutdown, None)
869 self.assertRaises(TypeError, connection.set_shutdown)
870 self.assertRaises(TypeError, connection.set_shutdown, None)
871 self.assertRaises(TypeError, connection.set_shutdown, 0, 1)
Jean-Paul Calderonecfecc242010-07-29 22:47:06 -0400872
873
Jean-Paul Calderone9485f2c2010-07-29 22:38:42 -0400874 def test_shutdown(self):
Jean-Paul Calderone93dba222010-09-08 22:59:37 -0400875 """
876 L{Connection.shutdown} performs an SSL-level connection shutdown.
877 """
Jean-Paul Calderone9485f2c2010-07-29 22:38:42 -0400878 server, client = self._loopback()
Jean-Paul Calderonee4f6b472010-07-29 22:50:58 -0400879 self.assertFalse(server.shutdown())
880 self.assertEquals(server.get_shutdown(), SENT_SHUTDOWN)
Jean-Paul Calderone9485f2c2010-07-29 22:38:42 -0400881 self.assertRaises(ZeroReturnError, client.recv, 1024)
Jean-Paul Calderonee4f6b472010-07-29 22:50:58 -0400882 self.assertEquals(client.get_shutdown(), RECEIVED_SHUTDOWN)
883 client.shutdown()
884 self.assertEquals(client.get_shutdown(), SENT_SHUTDOWN|RECEIVED_SHUTDOWN)
885 self.assertRaises(ZeroReturnError, server.recv, 1024)
886 self.assertEquals(server.get_shutdown(), SENT_SHUTDOWN|RECEIVED_SHUTDOWN)
Jean-Paul Calderone9485f2c2010-07-29 22:38:42 -0400887
888
Jean-Paul Calderonec89eef22010-07-29 22:51:58 -0400889 def test_set_shutdown(self):
Jean-Paul Calderone93dba222010-09-08 22:59:37 -0400890 """
891 L{Connection.set_shutdown} sets the state of the SSL connection shutdown
892 process.
893 """
Jean-Paul Calderonec89eef22010-07-29 22:51:58 -0400894 connection = Connection(Context(TLSv1_METHOD), socket())
895 connection.set_shutdown(RECEIVED_SHUTDOWN)
896 self.assertEquals(connection.get_shutdown(), RECEIVED_SHUTDOWN)
897
898
Jean-Paul Calderonecfecc242010-07-29 22:47:06 -0400899 def test_app_data_wrong_args(self):
Jean-Paul Calderone93dba222010-09-08 22:59:37 -0400900 """
901 L{Connection.set_app_data} raises L{TypeError} if called with other than
902 one argument. L{Connection.get_app_data} raises L{TypeError} if called
903 with any arguments.
904 """
Jean-Paul Calderonecfecc242010-07-29 22:47:06 -0400905 conn = Connection(Context(TLSv1_METHOD), None)
906 self.assertRaises(TypeError, conn.get_app_data, None)
907 self.assertRaises(TypeError, conn.set_app_data)
908 self.assertRaises(TypeError, conn.set_app_data, None, None)
909
910
Jean-Paul Calderone1bd8c792010-07-29 09:51:06 -0400911 def test_app_data(self):
Jean-Paul Calderone93dba222010-09-08 22:59:37 -0400912 """
913 Any object can be set as app data by passing it to
914 L{Connection.set_app_data} and later retrieved with
915 L{Connection.get_app_data}.
916 """
Jean-Paul Calderone1bd8c792010-07-29 09:51:06 -0400917 conn = Connection(Context(TLSv1_METHOD), None)
918 app_data = object()
919 conn.set_app_data(app_data)
920 self.assertIdentical(conn.get_app_data(), app_data)
921
922
Jean-Paul Calderonecfecc242010-07-29 22:47:06 -0400923 def test_makefile(self):
Jean-Paul Calderone93dba222010-09-08 22:59:37 -0400924 """
925 L{Connection.makefile} is not implemented and calling that method raises
926 L{NotImplementedError}.
927 """
Jean-Paul Calderonecfecc242010-07-29 22:47:06 -0400928 conn = Connection(Context(TLSv1_METHOD), None)
929 self.assertRaises(NotImplementedError, conn.makefile)
930
931
Jean-Paul Calderone68649052009-07-17 21:14:27 -0400932
Jean-Paul Calderonef135e622010-07-28 19:14:16 -0400933class ConnectionGetCipherListTests(TestCase):
934 def test_wrongargs(self):
935 connection = Connection(Context(TLSv1_METHOD), None)
936 self.assertRaises(TypeError, connection.get_cipher_list, None)
937
938
939 def test_result(self):
940 connection = Connection(Context(TLSv1_METHOD), None)
941 ciphers = connection.get_cipher_list()
942 self.assertTrue(isinstance(ciphers, list))
943 for cipher in ciphers:
944 self.assertTrue(isinstance(cipher, str))
945
946
947
Jean-Paul Calderone8ea22522010-07-29 09:39:39 -0400948class ConnectionSendallTests(TestCase, _LoopbackMixin):
949 """
950 Tests for L{Connection.sendall}.
951 """
952 def test_wrongargs(self):
953 """
954 When called with arguments other than a single string,
955 L{Connection.sendall} raises L{TypeError}.
956 """
957 connection = Connection(Context(TLSv1_METHOD), None)
958 self.assertRaises(TypeError, connection.sendall)
959 self.assertRaises(TypeError, connection.sendall, object())
960 self.assertRaises(TypeError, connection.sendall, "foo", "bar")
961
962
Jean-Paul Calderone7ca48b52010-07-28 18:57:21 -0400963 def test_short(self):
964 """
965 L{Connection.sendall} transmits all of the bytes in the string passed to
966 it.
967 """
968 server, client = self._loopback()
969 server.sendall('x')
970 self.assertEquals(client.recv(1), 'x')
971
972
973 def test_long(self):
974 server, client = self._loopback()
975 message ='x' * 1024 * 128 + 'y'
976 server.sendall(message)
977 accum = []
978 received = 0
979 while received < len(message):
980 bytes = client.recv(1024)
981 accum.append(bytes)
982 received += len(bytes)
983 self.assertEquals(message, ''.join(accum))
984
985
Jean-Paul Calderone974bdc02010-07-28 19:06:10 -0400986 def test_closed(self):
987 """
988 If the underlying socket is closed, L{Connection.sendall} propagates the
989 write error from the low level write call.
990 """
991 server, client = self._loopback()
992 client.close()
993 server.sendall("hello, world")
994 self.assertRaises(SysCallError, server.sendall, "hello, world")
995
Jean-Paul Calderone7ca48b52010-07-28 18:57:21 -0400996
Jean-Paul Calderone1d69a722010-07-29 09:05:53 -0400997
Jean-Paul Calderone8ea22522010-07-29 09:39:39 -0400998class ConnectionRenegotiateTests(TestCase, _LoopbackMixin):
999 """
1000 Tests for SSL renegotiation APIs.
1001 """
1002 def test_renegotiate_wrong_args(self):
1003 connection = Connection(Context(TLSv1_METHOD), None)
1004 self.assertRaises(TypeError, connection.renegotiate, None)
1005
1006
Jean-Paul Calderonecfecc242010-07-29 22:47:06 -04001007 def test_total_renegotiations_wrong_args(self):
1008 connection = Connection(Context(TLSv1_METHOD), None)
1009 self.assertRaises(TypeError, connection.total_renegotiations, None)
1010
1011
1012 def test_total_renegotiations(self):
1013 connection = Connection(Context(TLSv1_METHOD), None)
1014 self.assertEquals(connection.total_renegotiations(), 0)
1015
1016
Jean-Paul Calderone8ea22522010-07-29 09:39:39 -04001017# def test_renegotiate(self):
1018# """
1019# """
1020# server, client = self._loopback()
1021
1022# server.send("hello world")
1023# self.assertEquals(client.recv(len("hello world")), "hello world")
1024
1025# self.assertEquals(server.total_renegotiations(), 0)
1026# self.assertTrue(server.renegotiate())
1027
1028# server.setblocking(False)
1029# client.setblocking(False)
1030# while server.renegotiate_pending():
1031# client.do_handshake()
1032# server.do_handshake()
1033
1034# self.assertEquals(server.total_renegotiations(), 1)
1035
1036
1037
1038
Jean-Paul Calderone68649052009-07-17 21:14:27 -04001039class ErrorTests(TestCase):
1040 """
1041 Unit tests for L{OpenSSL.SSL.Error}.
1042 """
1043 def test_type(self):
1044 """
1045 L{Error} is an exception type.
1046 """
1047 self.assertTrue(issubclass(Error, Exception))
Rick Deane15b1472009-07-09 15:53:42 -05001048 self.assertEqual(Error.__name__, 'Error')
Rick Deane15b1472009-07-09 15:53:42 -05001049
1050
1051
Jean-Paul Calderone327d8f92008-12-28 21:55:56 -05001052class ConstantsTests(TestCase):
1053 """
1054 Tests for the values of constants exposed in L{OpenSSL.SSL}.
1055
1056 These are values defined by OpenSSL intended only to be used as flags to
1057 OpenSSL APIs. The only assertions it seems can be made about them is
1058 their values.
1059 """
Jean-Paul Calderoned811b682008-12-28 22:59:15 -05001060 # unittest.TestCase has no skip mechanism
1061 if OP_NO_QUERY_MTU is not None:
1062 def test_op_no_query_mtu(self):
1063 """
1064 The value of L{OpenSSL.SSL.OP_NO_QUERY_MTU} is 0x1000, the value of
1065 I{SSL_OP_NO_QUERY_MTU} defined by I{openssl/ssl.h}.
1066 """
1067 self.assertEqual(OP_NO_QUERY_MTU, 0x1000)
1068 else:
1069 "OP_NO_QUERY_MTU unavailable - OpenSSL version may be too old"
Jean-Paul Calderone327d8f92008-12-28 21:55:56 -05001070
1071
Jean-Paul Calderoned811b682008-12-28 22:59:15 -05001072 if OP_COOKIE_EXCHANGE is not None:
1073 def test_op_cookie_exchange(self):
1074 """
1075 The value of L{OpenSSL.SSL.OP_COOKIE_EXCHANGE} is 0x2000, the value
1076 of I{SSL_OP_COOKIE_EXCHANGE} defined by I{openssl/ssl.h}.
1077 """
1078 self.assertEqual(OP_COOKIE_EXCHANGE, 0x2000)
1079 else:
1080 "OP_COOKIE_EXCHANGE unavailable - OpenSSL version may be too old"
Jean-Paul Calderone327d8f92008-12-28 21:55:56 -05001081
1082
Jean-Paul Calderoned811b682008-12-28 22:59:15 -05001083 if OP_NO_TICKET is not None:
1084 def test_op_no_ticket(self):
1085 """
1086 The value of L{OpenSSL.SSL.OP_NO_TICKET} is 0x4000, the value of
1087 I{SSL_OP_NO_TICKET} defined by I{openssl/ssl.h}.
1088 """
1089 self.assertEqual(OP_NO_TICKET, 0x4000)
1090 else:
1091 "OP_NO_TICKET unavailable - OpenSSL version may be too old"
Rick Dean5b7b6372009-04-01 11:34:06 -05001092
1093
Jean-Paul Calderoneeeee26a2009-04-27 11:24:30 -04001094
Jean-Paul Calderonebf37f0f2010-07-31 14:56:20 -04001095class MemoryBIOTests(TestCase, _LoopbackMixin):
Rick Deanb71c0d22009-04-01 14:09:23 -05001096 """
Jean-Paul Calderone958299e2009-04-27 12:59:12 -04001097 Tests for L{OpenSSL.SSL.Connection} using a memory BIO.
Rick Deanb71c0d22009-04-01 14:09:23 -05001098 """
Jean-Paul Calderonece8324d2009-07-16 12:22:52 -04001099 def _server(self, sock):
1100 """
1101 Create a new server-side SSL L{Connection} object wrapped around
1102 C{sock}.
1103 """
Jean-Paul Calderoneaff0fc42009-04-27 17:13:34 -04001104 # Create the server side Connection. This is mostly setup boilerplate
1105 # - use TLSv1, use a particular certificate, etc.
1106 server_ctx = Context(TLSv1_METHOD)
1107 server_ctx.set_options(OP_NO_SSLv2 | OP_NO_SSLv3 | OP_SINGLE_DH_USE )
1108 server_ctx.set_verify(VERIFY_PEER|VERIFY_FAIL_IF_NO_PEER_CERT|VERIFY_CLIENT_ONCE, verify_cb)
1109 server_store = server_ctx.get_cert_store()
1110 server_ctx.use_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem))
1111 server_ctx.use_certificate(load_certificate(FILETYPE_PEM, server_cert_pem))
1112 server_ctx.check_privatekey()
1113 server_store.add_cert(load_certificate(FILETYPE_PEM, root_cert_pem))
Rick Deanb1ccd562009-07-09 23:52:39 -05001114 # Here the Connection is actually created. If None is passed as the 2nd
1115 # parameter, it indicates a memory BIO should be created.
1116 server_conn = Connection(server_ctx, sock)
Jean-Paul Calderoneaff0fc42009-04-27 17:13:34 -04001117 server_conn.set_accept_state()
1118 return server_conn
1119
1120
Jean-Paul Calderonece8324d2009-07-16 12:22:52 -04001121 def _client(self, sock):
1122 """
1123 Create a new client-side SSL L{Connection} object wrapped around
1124 C{sock}.
1125 """
1126 # Now create the client side Connection. Similar boilerplate to the
1127 # above.
Jean-Paul Calderoneaff0fc42009-04-27 17:13:34 -04001128 client_ctx = Context(TLSv1_METHOD)
1129 client_ctx.set_options(OP_NO_SSLv2 | OP_NO_SSLv3 | OP_SINGLE_DH_USE )
1130 client_ctx.set_verify(VERIFY_PEER|VERIFY_FAIL_IF_NO_PEER_CERT|VERIFY_CLIENT_ONCE, verify_cb)
1131 client_store = client_ctx.get_cert_store()
1132 client_ctx.use_privatekey(load_privatekey(FILETYPE_PEM, client_key_pem))
1133 client_ctx.use_certificate(load_certificate(FILETYPE_PEM, client_cert_pem))
1134 client_ctx.check_privatekey()
1135 client_store.add_cert(load_certificate(FILETYPE_PEM, root_cert_pem))
Rick Deanb1ccd562009-07-09 23:52:39 -05001136 client_conn = Connection(client_ctx, sock)
Jean-Paul Calderoneaff0fc42009-04-27 17:13:34 -04001137 client_conn.set_connect_state()
1138 return client_conn
1139
1140
Jean-Paul Calderonece8324d2009-07-16 12:22:52 -04001141 def test_memoryConnect(self):
Jean-Paul Calderone958299e2009-04-27 12:59:12 -04001142 """
1143 Two L{Connection}s which use memory BIOs can be manually connected by
1144 reading from the output of each and writing those bytes to the input of
1145 the other and in this way establish a connection and exchange
1146 application-level bytes with each other.
1147 """
Jean-Paul Calderonece8324d2009-07-16 12:22:52 -04001148 server_conn = self._server(None)
1149 client_conn = self._client(None)
Rick Deanb71c0d22009-04-01 14:09:23 -05001150
Jean-Paul Calderone958299e2009-04-27 12:59:12 -04001151 # There should be no key or nonces yet.
1152 self.assertIdentical(server_conn.master_key(), None)
1153 self.assertIdentical(server_conn.client_random(), None)
1154 self.assertIdentical(server_conn.server_random(), None)
Rick Deanb71c0d22009-04-01 14:09:23 -05001155
Jean-Paul Calderone958299e2009-04-27 12:59:12 -04001156 # First, the handshake needs to happen. We'll deliver bytes back and
1157 # forth between the client and server until neither of them feels like
1158 # speaking any more.
Jean-Paul Calderonebf37f0f2010-07-31 14:56:20 -04001159 self.assertIdentical(
1160 self._interactInMemory(client_conn, server_conn), None)
Jean-Paul Calderone958299e2009-04-27 12:59:12 -04001161
1162 # Now that the handshake is done, there should be a key and nonces.
1163 self.assertNotIdentical(server_conn.master_key(), None)
1164 self.assertNotIdentical(server_conn.client_random(), None)
1165 self.assertNotIdentical(server_conn.server_random(), None)
Jean-Paul Calderone6c051e62009-07-05 13:50:34 -04001166 self.assertEquals(server_conn.client_random(), client_conn.client_random())
1167 self.assertEquals(server_conn.server_random(), client_conn.server_random())
1168 self.assertNotEquals(server_conn.client_random(), server_conn.server_random())
1169 self.assertNotEquals(client_conn.client_random(), client_conn.server_random())
Jean-Paul Calderone958299e2009-04-27 12:59:12 -04001170
1171 # Here are the bytes we'll try to send.
Rick Deanb71c0d22009-04-01 14:09:23 -05001172 important_message = 'One if by land, two if by sea.'
1173
Jean-Paul Calderone958299e2009-04-27 12:59:12 -04001174 server_conn.write(important_message)
1175 self.assertEquals(
Jean-Paul Calderonebf37f0f2010-07-31 14:56:20 -04001176 self._interactInMemory(client_conn, server_conn),
Jean-Paul Calderone958299e2009-04-27 12:59:12 -04001177 (client_conn, important_message))
1178
1179 client_conn.write(important_message[::-1])
1180 self.assertEquals(
Jean-Paul Calderonebf37f0f2010-07-31 14:56:20 -04001181 self._interactInMemory(client_conn, server_conn),
Jean-Paul Calderone958299e2009-04-27 12:59:12 -04001182 (server_conn, important_message[::-1]))
Rick Deanb71c0d22009-04-01 14:09:23 -05001183
1184
Jean-Paul Calderonece8324d2009-07-16 12:22:52 -04001185 def test_socketConnect(self):
Rick Deanb1ccd562009-07-09 23:52:39 -05001186 """
Jean-Paul Calderonece8324d2009-07-16 12:22:52 -04001187 Just like L{test_memoryConnect} but with an actual socket.
1188
1189 This is primarily to rule out the memory BIO code as the source of
1190 any problems encountered while passing data over a L{Connection} (if
1191 this test fails, there must be a problem outside the memory BIO
1192 code, as no memory BIO is involved here). Even though this isn't a
1193 memory BIO test, it's convenient to have it here.
Rick Deanb1ccd562009-07-09 23:52:39 -05001194 """
1195 (server, client) = socket_pair()
1196
1197 # Let the encryption begin...
1198 client_conn = self._client(client)
Rick Deanb1ccd562009-07-09 23:52:39 -05001199 server_conn = self._server(server)
Jean-Paul Calderone7903cbd2009-07-16 12:23:34 -04001200
Rick Deanb1ccd562009-07-09 23:52:39 -05001201 # Establish the connection
1202 established = False
1203 while not established:
1204 established = True # assume the best
1205 for ssl in client_conn, server_conn:
1206 try:
Ziga Seilnacht44611bf2009-08-31 20:49:30 +02001207 # Generally a recv() or send() could also work instead
1208 # of do_handshake(), and we would stop on the first
Rick Deanb1ccd562009-07-09 23:52:39 -05001209 # non-exception.
1210 ssl.do_handshake()
1211 except WantReadError:
1212 established = False
1213
1214 important_message = "Help me Obi Wan Kenobi, you're my only hope."
1215 client_conn.send(important_message)
1216 msg = server_conn.recv(1024)
1217 self.assertEqual(msg, important_message)
1218
1219 # Again in the other direction, just for fun.
1220 important_message = important_message[::-1]
1221 server_conn.send(important_message)
1222 msg = client_conn.recv(1024)
1223 self.assertEqual(msg, important_message)
1224
1225
Jean-Paul Calderonefc4ed0f2009-04-27 11:51:27 -04001226 def test_socketOverridesMemory(self):
Rick Deanb71c0d22009-04-01 14:09:23 -05001227 """
1228 Test that L{OpenSSL.SSL.bio_read} and L{OpenSSL.SSL.bio_write} don't
1229 work on L{OpenSSL.SSL.Connection}() that use sockets.
1230 """
1231 context = Context(SSLv3_METHOD)
1232 client = socket()
1233 clientSSL = Connection(context, client)
1234 self.assertRaises( TypeError, clientSSL.bio_read, 100)
1235 self.assertRaises( TypeError, clientSSL.bio_write, "foo")
Jean-Paul Calderone07acf3f2009-05-05 13:23:28 -04001236 self.assertRaises( TypeError, clientSSL.bio_shutdown )
Jean-Paul Calderoneaff0fc42009-04-27 17:13:34 -04001237
1238
1239 def test_outgoingOverflow(self):
1240 """
1241 If more bytes than can be written to the memory BIO are passed to
1242 L{Connection.send} at once, the number of bytes which were written is
1243 returned and that many bytes from the beginning of the input can be
1244 read from the other end of the connection.
1245 """
Jean-Paul Calderonece8324d2009-07-16 12:22:52 -04001246 server = self._server(None)
1247 client = self._client(None)
Jean-Paul Calderoneaff0fc42009-04-27 17:13:34 -04001248
Jean-Paul Calderonebf37f0f2010-07-31 14:56:20 -04001249 self._interactInMemory(client, server)
Jean-Paul Calderoneaff0fc42009-04-27 17:13:34 -04001250
1251 size = 2 ** 15
1252 sent = client.send("x" * size)
1253 # Sanity check. We're trying to test what happens when the entire
1254 # input can't be sent. If the entire input was sent, this test is
1255 # meaningless.
1256 self.assertTrue(sent < size)
1257
Jean-Paul Calderonebf37f0f2010-07-31 14:56:20 -04001258 receiver, received = self._interactInMemory(client, server)
Jean-Paul Calderoneaff0fc42009-04-27 17:13:34 -04001259 self.assertIdentical(receiver, server)
1260
1261 # We can rely on all of these bytes being received at once because
1262 # _loopback passes 2 ** 16 to recv - more than 2 ** 15.
1263 self.assertEquals(len(received), sent)
Jean-Paul Calderone3ad85d42009-04-30 20:24:35 -04001264
1265
1266 def test_shutdown(self):
1267 """
1268 L{Connection.bio_shutdown} signals the end of the data stream from
1269 which the L{Connection} reads.
1270 """
Jean-Paul Calderonece8324d2009-07-16 12:22:52 -04001271 server = self._server(None)
Jean-Paul Calderone3ad85d42009-04-30 20:24:35 -04001272 server.bio_shutdown()
1273 e = self.assertRaises(Error, server.recv, 1024)
1274 # We don't want WantReadError or ZeroReturnError or anything - it's a
1275 # handshake failure.
1276 self.assertEquals(e.__class__, Error)
Jean-Paul Calderone0b88b6a2009-07-05 12:44:41 -04001277
1278
Ziga Seilnachtf93bf102009-10-23 09:51:07 +02001279 def _check_client_ca_list(self, func):
Jean-Paul Calderone911c9112009-10-24 11:12:00 -04001280 """
1281 Verify the return value of the C{get_client_ca_list} method for server and client connections.
1282
1283 @param func: A function which will be called with the server context
1284 before the client and server are connected to each other. This
1285 function should specify a list of CAs for the server to send to the
1286 client and return that same list. The list will be used to verify
1287 that C{get_client_ca_list} returns the proper value at various
1288 times.
1289 """
Ziga Seilnacht679c4262009-09-01 01:32:29 +02001290 server = self._server(None)
1291 client = self._client(None)
Ziga Seilnachtf93bf102009-10-23 09:51:07 +02001292 self.assertEqual(client.get_client_ca_list(), [])
1293 self.assertEqual(server.get_client_ca_list(), [])
Ziga Seilnacht679c4262009-09-01 01:32:29 +02001294 ctx = server.get_context()
1295 expected = func(ctx)
Ziga Seilnachtf93bf102009-10-23 09:51:07 +02001296 self.assertEqual(client.get_client_ca_list(), [])
1297 self.assertEqual(server.get_client_ca_list(), expected)
Jean-Paul Calderonebf37f0f2010-07-31 14:56:20 -04001298 self._interactInMemory(client, server)
Ziga Seilnachtf93bf102009-10-23 09:51:07 +02001299 self.assertEqual(client.get_client_ca_list(), expected)
1300 self.assertEqual(server.get_client_ca_list(), expected)
Ziga Seilnacht679c4262009-09-01 01:32:29 +02001301
1302
Jean-Paul Calderone911c9112009-10-24 11:12:00 -04001303 def test_set_client_ca_list_errors(self):
Ziga Seilnacht679c4262009-09-01 01:32:29 +02001304 """
Jean-Paul Calderone911c9112009-10-24 11:12:00 -04001305 L{Context.set_client_ca_list} raises a L{TypeError} if called with a
1306 non-list or a list that contains objects other than X509Names.
Ziga Seilnacht679c4262009-09-01 01:32:29 +02001307 """
1308 ctx = Context(TLSv1_METHOD)
Ziga Seilnachtf93bf102009-10-23 09:51:07 +02001309 self.assertRaises(TypeError, ctx.set_client_ca_list, "spam")
1310 self.assertRaises(TypeError, ctx.set_client_ca_list, ["spam"])
1311 self.assertIdentical(ctx.set_client_ca_list([]), None)
Ziga Seilnacht679c4262009-09-01 01:32:29 +02001312
1313
Jean-Paul Calderone911c9112009-10-24 11:12:00 -04001314 def test_set_empty_ca_list(self):
Ziga Seilnacht679c4262009-09-01 01:32:29 +02001315 """
Jean-Paul Calderone911c9112009-10-24 11:12:00 -04001316 If passed an empty list, L{Context.set_client_ca_list} configures the
1317 context to send no CA names to the client and, on both the server and
1318 client sides, L{Connection.get_client_ca_list} returns an empty list
1319 after the connection is set up.
1320 """
1321 def no_ca(ctx):
1322 ctx.set_client_ca_list([])
1323 return []
1324 self._check_client_ca_list(no_ca)
1325
1326
1327 def test_set_one_ca_list(self):
1328 """
1329 If passed a list containing a single X509Name,
1330 L{Context.set_client_ca_list} configures the context to send that CA
1331 name to the client and, on both the server and client sides,
1332 L{Connection.get_client_ca_list} returns a list containing that
1333 X509Name after the connection is set up.
1334 """
1335 cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
1336 cadesc = cacert.get_subject()
1337 def single_ca(ctx):
1338 ctx.set_client_ca_list([cadesc])
1339 return [cadesc]
1340 self._check_client_ca_list(single_ca)
1341
1342
1343 def test_set_multiple_ca_list(self):
1344 """
1345 If passed a list containing multiple X509Name objects,
1346 L{Context.set_client_ca_list} configures the context to send those CA
1347 names to the client and, on both the server and client sides,
1348 L{Connection.get_client_ca_list} returns a list containing those
1349 X509Names after the connection is set up.
1350 """
1351 secert = load_certificate(FILETYPE_PEM, server_cert_pem)
1352 clcert = load_certificate(FILETYPE_PEM, server_cert_pem)
1353
1354 sedesc = secert.get_subject()
1355 cldesc = clcert.get_subject()
1356
1357 def multiple_ca(ctx):
1358 L = [sedesc, cldesc]
1359 ctx.set_client_ca_list(L)
1360 return L
1361 self._check_client_ca_list(multiple_ca)
1362
1363
1364 def test_reset_ca_list(self):
1365 """
1366 If called multiple times, only the X509Names passed to the final call
1367 of L{Context.set_client_ca_list} are used to configure the CA names
1368 sent to the client.
Ziga Seilnacht679c4262009-09-01 01:32:29 +02001369 """
1370 cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
1371 secert = load_certificate(FILETYPE_PEM, server_cert_pem)
1372 clcert = load_certificate(FILETYPE_PEM, server_cert_pem)
1373
1374 cadesc = cacert.get_subject()
1375 sedesc = secert.get_subject()
1376 cldesc = clcert.get_subject()
1377
Ziga Seilnachtf93bf102009-10-23 09:51:07 +02001378 def changed_ca(ctx):
1379 ctx.set_client_ca_list([sedesc, cldesc])
1380 ctx.set_client_ca_list([cadesc])
Ziga Seilnacht679c4262009-09-01 01:32:29 +02001381 return [cadesc]
Ziga Seilnachtf93bf102009-10-23 09:51:07 +02001382 self._check_client_ca_list(changed_ca)
Ziga Seilnacht679c4262009-09-01 01:32:29 +02001383
Jean-Paul Calderone911c9112009-10-24 11:12:00 -04001384
1385 def test_mutated_ca_list(self):
1386 """
1387 If the list passed to L{Context.set_client_ca_list} is mutated
1388 afterwards, this does not affect the list of CA names sent to the
1389 client.
1390 """
1391 cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
1392 secert = load_certificate(FILETYPE_PEM, server_cert_pem)
1393
1394 cadesc = cacert.get_subject()
1395 sedesc = secert.get_subject()
1396
Ziga Seilnachtf93bf102009-10-23 09:51:07 +02001397 def mutated_ca(ctx):
Ziga Seilnacht679c4262009-09-01 01:32:29 +02001398 L = [cadesc]
Ziga Seilnachtf93bf102009-10-23 09:51:07 +02001399 ctx.set_client_ca_list([cadesc])
Ziga Seilnacht679c4262009-09-01 01:32:29 +02001400 L.append(sedesc)
1401 return [cadesc]
Ziga Seilnachtf93bf102009-10-23 09:51:07 +02001402 self._check_client_ca_list(mutated_ca)
Ziga Seilnacht679c4262009-09-01 01:32:29 +02001403
1404
Jean-Paul Calderone911c9112009-10-24 11:12:00 -04001405 def test_add_client_ca_errors(self):
Ziga Seilnacht679c4262009-09-01 01:32:29 +02001406 """
Jean-Paul Calderone911c9112009-10-24 11:12:00 -04001407 L{Context.add_client_ca} raises L{TypeError} if called with a non-X509
1408 object or with a number of arguments other than one.
Ziga Seilnacht679c4262009-09-01 01:32:29 +02001409 """
1410 ctx = Context(TLSv1_METHOD)
1411 cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
Jean-Paul Calderone911c9112009-10-24 11:12:00 -04001412 self.assertRaises(TypeError, ctx.add_client_ca)
Ziga Seilnachtf93bf102009-10-23 09:51:07 +02001413 self.assertRaises(TypeError, ctx.add_client_ca, "spam")
Jean-Paul Calderone911c9112009-10-24 11:12:00 -04001414 self.assertRaises(TypeError, ctx.add_client_ca, cacert, cacert)
Ziga Seilnacht679c4262009-09-01 01:32:29 +02001415
1416
Jean-Paul Calderone055a9172009-10-24 13:45:11 -04001417 def test_one_add_client_ca(self):
Ziga Seilnacht679c4262009-09-01 01:32:29 +02001418 """
Jean-Paul Calderone055a9172009-10-24 13:45:11 -04001419 A certificate's subject can be added as a CA to be sent to the client
1420 with L{Context.add_client_ca}.
1421 """
1422 cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
1423 cadesc = cacert.get_subject()
1424 def single_ca(ctx):
1425 ctx.add_client_ca(cacert)
1426 return [cadesc]
1427 self._check_client_ca_list(single_ca)
1428
1429
1430 def test_multiple_add_client_ca(self):
1431 """
1432 Multiple CA names can be sent to the client by calling
1433 L{Context.add_client_ca} with multiple X509 objects.
1434 """
1435 cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
1436 secert = load_certificate(FILETYPE_PEM, server_cert_pem)
1437
1438 cadesc = cacert.get_subject()
1439 sedesc = secert.get_subject()
1440
1441 def multiple_ca(ctx):
1442 ctx.add_client_ca(cacert)
1443 ctx.add_client_ca(secert)
1444 return [cadesc, sedesc]
1445 self._check_client_ca_list(multiple_ca)
1446
1447
1448 def test_set_and_add_client_ca(self):
1449 """
1450 A call to L{Context.set_client_ca_list} followed by a call to
1451 L{Context.add_client_ca} results in using the CA names from the first
1452 call and the CA name from the second call.
Ziga Seilnacht679c4262009-09-01 01:32:29 +02001453 """
1454 cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
1455 secert = load_certificate(FILETYPE_PEM, server_cert_pem)
1456 clcert = load_certificate(FILETYPE_PEM, server_cert_pem)
1457
1458 cadesc = cacert.get_subject()
1459 sedesc = secert.get_subject()
1460 cldesc = clcert.get_subject()
1461
Ziga Seilnachtf93bf102009-10-23 09:51:07 +02001462 def mixed_set_add_ca(ctx):
1463 ctx.set_client_ca_list([cadesc, sedesc])
1464 ctx.add_client_ca(clcert)
Ziga Seilnacht679c4262009-09-01 01:32:29 +02001465 return [cadesc, sedesc, cldesc]
Ziga Seilnachtf93bf102009-10-23 09:51:07 +02001466 self._check_client_ca_list(mixed_set_add_ca)
Ziga Seilnacht679c4262009-09-01 01:32:29 +02001467
Jean-Paul Calderone055a9172009-10-24 13:45:11 -04001468
1469 def test_set_after_add_client_ca(self):
1470 """
1471 A call to L{Context.set_client_ca_list} after a call to
1472 L{Context.add_client_ca} replaces the CA name specified by the former
1473 call with the names specified by the latter cal.
1474 """
1475 cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
1476 secert = load_certificate(FILETYPE_PEM, server_cert_pem)
1477 clcert = load_certificate(FILETYPE_PEM, server_cert_pem)
1478
1479 cadesc = cacert.get_subject()
1480 sedesc = secert.get_subject()
1481 cldesc = clcert.get_subject()
1482
Ziga Seilnachtf93bf102009-10-23 09:51:07 +02001483 def set_replaces_add_ca(ctx):
1484 ctx.add_client_ca(clcert)
1485 ctx.set_client_ca_list([cadesc])
1486 ctx.add_client_ca(secert)
Ziga Seilnacht679c4262009-09-01 01:32:29 +02001487 return [cadesc, sedesc]
Ziga Seilnachtf93bf102009-10-23 09:51:07 +02001488 self._check_client_ca_list(set_replaces_add_ca)
Ziga Seilnacht679c4262009-09-01 01:32:29 +02001489
Jean-Paul Calderone0b88b6a2009-07-05 12:44:41 -04001490
Ziga Seilnacht44611bf2009-08-31 20:49:30 +02001491
Jean-Paul Calderone0b88b6a2009-07-05 12:44:41 -04001492if __name__ == '__main__':
1493 main()