blob: dd42a10d25a438822d7f49d4f30eee2d5172f9dc [file] [log] [blame]
# Copyright (C) Jean-Paul Calderone 2008, All rights reserved

"""
Unit tests for L{OpenSSL.SSL}.
"""
6
7from unittest import TestCase
Jean-Paul Calderone828c9cb2008-04-26 18:06:54 -04008from tempfile import mktemp
Jean-Paul Calderone5ef86512008-04-26 19:06:28 -04009from socket import socket
Jean-Paul Calderone1cb5d022008-09-07 20:58:50 -040010from os import makedirs, symlink
11from os.path import join
Jean-Paul Calderone30c09ea2008-03-21 17:04:05 -040012
Jean-Paul Calderone5ef86512008-04-26 19:06:28 -040013from OpenSSL.crypto import TYPE_RSA, FILETYPE_PEM, PKey, dump_privatekey, load_certificate, load_privatekey
Jean-Paul Calderone5075fce2008-09-07 20:18:55 -040014from OpenSSL.SSL import WantReadError, Context, Connection, Error
Jean-Paul Calderone30c09ea2008-03-21 17:04:05 -040015from OpenSSL.SSL import SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, TLSv1_METHOD
Jean-Paul Calderonee1bd4322008-09-07 20:17:17 -040016from OpenSSL.SSL import VERIFY_PEER
Jean-Paul Calderone65407752008-04-26 19:59:01 -040017from OpenSSL.test.test_crypto import _Python23TestCaseHelper, cleartextCertificatePEM, cleartextPrivateKeyPEM
Jean-Paul Calderone5ef86512008-04-26 19:06:28 -040018
Jean-Paul Calderone30c09ea2008-03-21 17:04:05 -040019
class ContextTests(TestCase, _Python23TestCaseHelper):
    """
    Unit tests for L{OpenSSL.SSL.Context}.
    """
    def mktemp(self):
        """
        Pathetic substitute for twisted.trial.unittest.TestCase.mktemp.

        @return: A path name in the current directory, as produced by
            L{tempfile.mktemp}.  Nothing is created at that path.
        """
        return mktemp(dir=".")


    def _write_file(self, path, contents):
        """
        Write C{contents} to the file named C{path}, closing the file even
        if the write fails.

        @param path: The name of the file to create or overwrite.
        @param contents: The string to write to the file.
        """
        fObj = open(path, 'w')
        try:
            fObj.write(contents)
        finally:
            fObj.close()


    def _connected_sockets(self):
        """
        Create a listening socket on a port chosen by the platform, connect
        a new non-blocking socket to it, and accept that connection.

        The listening socket is closed before returning; the caller is
        responsible for closing the two returned sockets.

        @return: A two-tuple C{(client, server)} of connected sockets, both
            of which have been set non-blocking.
        """
        port = socket()
        try:
            port.bind(('', 0))
            port.listen(1)

            client = socket()
            client.setblocking(False)
            # The connection attempt cannot complete synchronously on a
            # non-blocking socket, so the result code is deliberately
            # ignored; accept() below waits for the connection.
            client.connect_ex(port.getsockname())

            server, ignored = port.accept()
            server.setblocking(False)
        finally:
            port.close()
        return client, server


    def test_method(self):
        """
        L{Context} can be instantiated with one of L{SSLv2_METHOD},
        L{SSLv3_METHOD}, L{SSLv23_METHOD}, or L{TLSv1_METHOD}.
        """
        for meth in [SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, TLSv1_METHOD]:
            Context(meth)
        self.assertRaises(TypeError, Context, "")
        self.assertRaises(ValueError, Context, 10)


    def test_use_privatekey(self):
        """
        L{Context.use_privatekey} takes an L{OpenSSL.crypto.PKey} instance.
        """
        key = PKey()
        key.generate_key(TYPE_RSA, 128)
        ctx = Context(TLSv1_METHOD)
        ctx.use_privatekey(key)
        self.assertRaises(TypeError, ctx.use_privatekey, "")


    def test_set_passwd_cb(self):
        """
        L{Context.set_passwd_cb} accepts a callable which will be invoked when
        a private key is loaded from an encrypted PEM.
        """
        key = PKey()
        key.generate_key(TYPE_RSA, 128)
        pemFile = self.mktemp()
        passphrase = "foobar"
        self._write_file(
            pemFile, dump_privatekey(FILETYPE_PEM, key, "blowfish", passphrase))

        calledWith = []
        def passphraseCallback(maxlen, verify, extra):
            calledWith.append((maxlen, verify, extra))
            return passphrase
        context = Context(TLSv1_METHOD)
        context.set_passwd_cb(passphraseCallback)
        context.use_privatekey_file(pemFile)
        # assertEqual, not assertTrue: with assertTrue the second argument
        # is only the failure message, so the call count was never checked.
        self.assertEqual(len(calledWith), 1)
        self.assertTrue(isinstance(calledWith[0][0], int))
        self.assertTrue(isinstance(calledWith[0][1], int))
        self.assertEqual(calledWith[0][2], None)


    def test_set_info_callback(self):
        """
        L{Context.set_info_callback} accepts a callable which will be invoked
        when certain information about an SSL connection is available.
        """
        client, server = self._connected_sockets()
        try:
            clientSSL = Connection(Context(TLSv1_METHOD), client)
            clientSSL.set_connect_state()

            called = []
            def info(conn, where, ret):
                called.append((conn, where, ret))
            # Only the server side context has the info callback installed.
            context = Context(TLSv1_METHOD)
            context.set_info_callback(info)
            context.use_certificate(
                load_certificate(FILETYPE_PEM, cleartextCertificatePEM))
            context.use_privatekey(
                load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM))

            serverSSL = Connection(context, server)
            serverSSL.set_accept_state()

            # Drive both ends of the handshake until the callback fires.
            while not called:
                for conn in clientSSL, serverSSL:
                    try:
                        conn.do_handshake()
                    except WantReadError:
                        pass

            # Kind of lame.  Just make sure it got called somehow.
            self.assertTrue(called)
        finally:
            client.close()
            server.close()


    def _load_verify_locations_test(self, *args):
        """
        Call L{Context.load_verify_locations} on a client context with
        C{args}, require peer verification on that context, and handshake
        against a server presenting C{cleartextCertificatePEM}.

        The test passes if the handshake raises nothing other than
        L{WantReadError} and the certificate the client received has the
        expected subject common name.

        @param args: The positional arguments to pass through to
            L{Context.load_verify_locations}.
        """
        client, server = self._connected_sockets()
        try:
            clientContext = Context(TLSv1_METHOD)
            clientContext.load_verify_locations(*args)
            # Require that the server certificate verify properly or the
            # connection will fail.
            clientContext.set_verify(
                VERIFY_PEER,
                lambda conn, cert, errno, depth, preverify_ok: preverify_ok)

            clientSSL = Connection(clientContext, client)
            clientSSL.set_connect_state()

            serverContext = Context(TLSv1_METHOD)
            serverContext.use_certificate(
                load_certificate(FILETYPE_PEM, cleartextCertificatePEM))
            serverContext.use_privatekey(
                load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM))

            serverSSL = Connection(serverContext, server)
            serverSSL.set_accept_state()

            # NOTE(review): three passes over both ends is presumably enough
            # for the handshake to finish over a local connection -- confirm.
            for _ in range(3):
                for conn in clientSSL, serverSSL:
                    try:
                        # Without load_verify_locations above, the handshake
                        # will fail:
                        # Error: [('SSL routines', 'SSL3_GET_SERVER_CERTIFICATE',
                        #          'certificate verify failed')]
                        conn.do_handshake()
                    except WantReadError:
                        pass

            cert = clientSSL.get_peer_certificate()
            self.assertEqual(cert.get_subject().CN, 'pyopenssl.sf.net')
        finally:
            client.close()
            server.close()


    def test_load_verify_file(self):
        """
        L{Context.load_verify_locations} accepts a file name and uses the
        certificates within for verification purposes.
        """
        cafile = self.mktemp()
        self._write_file(cafile, cleartextCertificatePEM)

        self._load_verify_locations_test(cafile)


    def test_load_verify_invalid_file(self):
        """
        L{Context.load_verify_locations} raises L{Error} when passed a
        non-existent cafile.
        """
        clientContext = Context(TLSv1_METHOD)
        self.assertRaises(
            Error, clientContext.load_verify_locations, self.mktemp())


    def test_load_verify_directory(self):
        """
        L{Context.load_verify_locations} accepts a directory name and uses
        the certificates within for verification purposes.
        """
        capath = self.mktemp()
        makedirs(capath)
        self._write_file(join(capath, 'cert.pem'), cleartextCertificatePEM)

        # Hash value computed manually with c_rehash to avoid depending on
        # c_rehash in the test suite.
        symlink('cert.pem', join(capath, '07497d9e.0'))

        self._load_verify_locations_test(None, capath)


    def test_set_default_verify_paths(self):
        """
        L{Context.set_default_verify_paths} causes the platform-specific CA
        certificate locations to be used for verification purposes.
        """
        # Testing this requires a server with a certificate signed by one of
        # the CAs in the platform CA location.  Getting one of those costs
        # money.  Fortunately (or unfortunately, depending on your
        # perspective), it's easy to think of a public server on the
        # internet which has such a certificate.  Connecting to the network
        # in a unit test is bad, but it's the only way I can think of to
        # really test this. -exarkun

        # Arg, verisign.com doesn't speak TLSv1
        context = Context(SSLv3_METHOD)
        context.set_default_verify_paths()
        context.set_verify(
            VERIFY_PEER,
            lambda conn, cert, errno, depth, preverify_ok: preverify_ok)

        client = socket()
        try:
            client.connect(('verisign.com', 443))
            clientSSL = Connection(context, client)
            clientSSL.set_connect_state()
            clientSSL.do_handshake()
            clientSSL.send('GET / HTTP/1.0\r\n\r\n')
            self.assertTrue(clientSSL.recv(1024))
        finally:
            client.close()