Jean-Paul Calderone | 8b63d45 | 2008-03-21 18:31:12 -0400 | [diff] [blame] | 1 | # Copyright (C) Jean-Paul Calderone 2008, All rights reserved |
| 2 | |
Jean-Paul Calderone | 30c09ea | 2008-03-21 17:04:05 -0400 | [diff] [blame] | 3 | """ |
| 4 | Unit tests for L{OpenSSL.SSL}. |
| 5 | """ |
| 6 | |
| 7 | from unittest import TestCase |
Jean-Paul Calderone | 828c9cb | 2008-04-26 18:06:54 -0400 | [diff] [blame] | 8 | from tempfile import mktemp |
Jean-Paul Calderone | 5ef8651 | 2008-04-26 19:06:28 -0400 | [diff] [blame] | 9 | from socket import socket |
Jean-Paul Calderone | 30c09ea | 2008-03-21 17:04:05 -0400 | [diff] [blame] | 10 | |
Jean-Paul Calderone | 5ef8651 | 2008-04-26 19:06:28 -0400 | [diff] [blame] | 11 | from OpenSSL.crypto import TYPE_RSA, FILETYPE_PEM, PKey, dump_privatekey, load_certificate, load_privatekey |
| 12 | from OpenSSL.SSL import WantReadError, Context, Connection |
Jean-Paul Calderone | 30c09ea | 2008-03-21 17:04:05 -0400 | [diff] [blame] | 13 | from OpenSSL.SSL import SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, TLSv1_METHOD |
| 14 | |
Jean-Paul Calderone | 6540775 | 2008-04-26 19:59:01 -0400 | [diff] [blame] | 15 | from OpenSSL.test.test_crypto import _Python23TestCaseHelper, cleartextCertificatePEM, cleartextPrivateKeyPEM |
Jean-Paul Calderone | 5ef8651 | 2008-04-26 19:06:28 -0400 | [diff] [blame] | 16 | |
Jean-Paul Calderone | 30c09ea | 2008-03-21 17:04:05 -0400 | [diff] [blame] | 17 | |
Jean-Paul Calderone | 6540775 | 2008-04-26 19:59:01 -0400 | [diff] [blame] | 18 | class ContextTests(TestCase, _Python23TestCaseHelper): |
Jean-Paul Calderone | 30c09ea | 2008-03-21 17:04:05 -0400 | [diff] [blame] | 19 | """ |
| 20 | Unit tests for L{OpenSSL.SSL.Context}. |
| 21 | """ |
Jean-Paul Calderone | 828c9cb | 2008-04-26 18:06:54 -0400 | [diff] [blame] | 22 | def mktemp(self): |
| 23 | """ |
| 24 | Pathetic substitute for twisted.trial.unittest.TestCase.mktemp. |
| 25 | """ |
| 26 | return mktemp(dir=".") |
| 27 | |
| 28 | |
Jean-Paul Calderone | 30c09ea | 2008-03-21 17:04:05 -0400 | [diff] [blame] | 29 | def test_method(self): |
| 30 | """ |
| 31 | L{Context} can be instantiated with one of L{SSLv2_METHOD}, |
| 32 | L{SSLv3_METHOD}, L{SSLv23_METHOD}, or L{TLSv1_METHOD}. |
| 33 | """ |
| 34 | for meth in [SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, TLSv1_METHOD]: |
| 35 | Context(meth) |
| 36 | self.assertRaises(TypeError, Context, "") |
| 37 | self.assertRaises(ValueError, Context, 10) |
| 38 | |
| 39 | |
| 40 | def test_use_privatekey(self): |
| 41 | """ |
| 42 | L{Context.use_privatekey} takes an L{OpenSSL.crypto.PKey} instance. |
| 43 | """ |
| 44 | key = PKey() |
| 45 | key.generate_key(TYPE_RSA, 128) |
| 46 | ctx = Context(TLSv1_METHOD) |
| 47 | ctx.use_privatekey(key) |
| 48 | self.assertRaises(TypeError, ctx.use_privatekey, "") |
Jean-Paul Calderone | 828c9cb | 2008-04-26 18:06:54 -0400 | [diff] [blame] | 49 | |
| 50 | |
| 51 | def test_set_passwd_cb(self): |
| 52 | """ |
| 53 | L{Context.set_passwd_cb} accepts a callable which will be invoked when |
| 54 | a private key is loaded from an encrypted PEM. |
| 55 | """ |
| 56 | key = PKey() |
| 57 | key.generate_key(TYPE_RSA, 128) |
| 58 | pemFile = self.mktemp() |
| 59 | fObj = file(pemFile, 'w') |
| 60 | passphrase = "foobar" |
| 61 | fObj.write(dump_privatekey(FILETYPE_PEM, key, "blowfish", passphrase)) |
| 62 | fObj.close() |
| 63 | |
| 64 | calledWith = [] |
| 65 | def passphraseCallback(maxlen, verify, extra): |
| 66 | calledWith.append((maxlen, verify, extra)) |
| 67 | return passphrase |
| 68 | context = Context(TLSv1_METHOD) |
| 69 | context.set_passwd_cb(passphraseCallback) |
| 70 | context.use_privatekey_file(pemFile) |
| 71 | self.assertTrue(len(calledWith), 1) |
| 72 | self.assertTrue(isinstance(calledWith[0][0], int)) |
| 73 | self.assertTrue(isinstance(calledWith[0][1], int)) |
| 74 | self.assertEqual(calledWith[0][2], None) |
Jean-Paul Calderone | 5ef8651 | 2008-04-26 19:06:28 -0400 | [diff] [blame] | 75 | |
| 76 | |
| 77 | def test_set_info_callback(self): |
| 78 | """ |
| 79 | L{Context.set_info_callback} accepts a callable which will be invoked |
| 80 | when certain information about an SSL connection is available. |
| 81 | """ |
| 82 | port = socket() |
| 83 | port.bind(('', 0)) |
| 84 | port.listen(1) |
| 85 | |
| 86 | client = socket() |
| 87 | client.setblocking(False) |
| 88 | client.connect_ex(port.getsockname()) |
| 89 | |
| 90 | clientSSL = Connection(Context(TLSv1_METHOD), client) |
| 91 | clientSSL.set_connect_state() |
| 92 | |
| 93 | called = [] |
| 94 | def info(conn, where, ret): |
| 95 | called.append((conn, where, ret)) |
| 96 | context = Context(TLSv1_METHOD) |
| 97 | context.set_info_callback(info) |
| 98 | context.use_certificate( |
| 99 | load_certificate(FILETYPE_PEM, cleartextCertificatePEM)) |
| 100 | context.use_privatekey( |
| 101 | load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM)) |
| 102 | |
| 103 | server, ignored = port.accept() |
| 104 | server.setblocking(False) |
| 105 | |
| 106 | serverSSL = Connection(context, server) |
| 107 | serverSSL.set_accept_state() |
| 108 | |
| 109 | while not called: |
| 110 | for ssl in clientSSL, serverSSL: |
| 111 | try: |
| 112 | ssl.do_handshake() |
| 113 | except WantReadError: |
| 114 | pass |
| 115 | |
| 116 | # Kind of lame. Just make sure it got called somehow. |
| 117 | self.assertTrue(called) |