blob: 7fbb359f8afa8a9b0792d7fe09a29bf9a9a83340 [file] [log] [blame]
Jean-Paul Calderone8b63d452008-03-21 18:31:12 -04001# Copyright (C) Jean-Paul Calderone 2008, All rights reserved
2
Jean-Paul Calderone30c09ea2008-03-21 17:04:05 -04003"""
4Unit tests for L{OpenSSL.SSL}.
5"""
6
7from unittest import TestCase
Jean-Paul Calderone828c9cb2008-04-26 18:06:54 -04008from tempfile import mktemp
Jean-Paul Calderone5ef86512008-04-26 19:06:28 -04009from socket import socket
Jean-Paul Calderone1cb5d022008-09-07 20:58:50 -040010from os import makedirs, symlink
11from os.path import join
Jean-Paul Calderone30c09ea2008-03-21 17:04:05 -040012
Jean-Paul Calderone5ef86512008-04-26 19:06:28 -040013from OpenSSL.crypto import TYPE_RSA, FILETYPE_PEM, PKey, dump_privatekey, load_certificate, load_privatekey
Jean-Paul Calderone5075fce2008-09-07 20:18:55 -040014from OpenSSL.SSL import WantReadError, Context, Connection, Error
Jean-Paul Calderone30c09ea2008-03-21 17:04:05 -040015from OpenSSL.SSL import SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, TLSv1_METHOD
Jean-Paul Calderone4bccf5e2008-12-28 22:50:42 -050016from OpenSSL.SSL import VERIFY_PEER
Jean-Paul Calderone65407752008-04-26 19:59:01 -040017from OpenSSL.test.test_crypto import _Python23TestCaseHelper, cleartextCertificatePEM, cleartextPrivateKeyPEM
Jean-Paul Calderone4bccf5e2008-12-28 22:50:42 -050018try:
19 from OpenSSL.SSL import OP_NO_QUERY_MTU
20except ImportError:
21 OP_NO_QUERY_MTU = None
22try:
23 from OpenSSL.SSL import OP_COOKIE_EXCHANGE
24except ImportError:
25 OP_COOKIE_EXCHANGE = None
26try:
27 from OpenSSL.SSL import OP_NO_TICKET
28except ImportError:
29 OP_NO_TICKET = None
Jean-Paul Calderone5ef86512008-04-26 19:06:28 -040030
Jean-Paul Calderone30c09ea2008-03-21 17:04:05 -040031
class ContextTests(TestCase, _Python23TestCaseHelper):
    """
    Unit tests for L{OpenSSL.SSL.Context}.
    """
    def mktemp(self):
        """
        Pathetic substitute for twisted.trial.unittest.TestCase.mktemp.

        @return: A path name in the current working directory which did not
            exist when this method was called.  Nothing is created on disk.
        """
        return mktemp(dir=".")


    def _write_file(self, path, contents):
        """
        Write C{contents} to the file at C{path}, closing the file even if
        the write fails.

        @param path: The name of the file to create or truncate.
        @param contents: The string to write to the file.
        """
        fObj = open(path, 'w')
        try:
            fObj.write(contents)
        finally:
            fObj.close()


    def _connected_sockets(self):
        """
        Create a pair of connected, non-blocking TCP sockets.

        @return: A two-tuple C{(client, server)} of sockets connected to each
            other.  The caller is responsible for closing both of them.
        """
        port = socket()
        try:
            port.bind(('', 0))
            port.listen(1)

            # The client must be non-blocking before it connects so that
            # connect_ex returns immediately instead of waiting on accept.
            client = socket()
            client.setblocking(False)
            client.connect_ex(port.getsockname())

            server, ignored = port.accept()
            server.setblocking(False)
        finally:
            # The listening socket is only needed to establish the pair.
            port.close()
        return client, server


    def test_method(self):
        """
        L{Context} can be instantiated with one of L{SSLv2_METHOD},
        L{SSLv3_METHOD}, L{SSLv23_METHOD}, or L{TLSv1_METHOD}.
        """
        for meth in [SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, TLSv1_METHOD]:
            Context(meth)
        self.assertRaises(TypeError, Context, "")
        self.assertRaises(ValueError, Context, 10)


    def test_use_privatekey(self):
        """
        L{Context.use_privatekey} takes an L{OpenSSL.crypto.PKey} instance.
        """
        key = PKey()
        key.generate_key(TYPE_RSA, 128)
        ctx = Context(TLSv1_METHOD)
        ctx.use_privatekey(key)
        self.assertRaises(TypeError, ctx.use_privatekey, "")


    def test_set_passwd_cb(self):
        """
        L{Context.set_passwd_cb} accepts a callable which will be invoked when
        a private key is loaded from an encrypted PEM.
        """
        passphrase = "foobar"
        key = PKey()
        key.generate_key(TYPE_RSA, 128)
        pemFile = self.mktemp()
        self._write_file(
            pemFile, dump_privatekey(FILETYPE_PEM, key, "blowfish", passphrase))

        calledWith = []
        def passphraseCallback(maxlen, verify, extra):
            calledWith.append((maxlen, verify, extra))
            return passphrase
        context = Context(TLSv1_METHOD)
        context.set_passwd_cb(passphraseCallback)
        context.use_privatekey_file(pemFile)
        # assertEqual, not assertTrue: with assertTrue the second argument is
        # only the failure message and the call count is never checked.
        self.assertEqual(len(calledWith), 1)
        self.assertTrue(isinstance(calledWith[0][0], int))
        self.assertTrue(isinstance(calledWith[0][1], int))
        self.assertEqual(calledWith[0][2], None)


    def test_set_info_callback(self):
        """
        L{Context.set_info_callback} accepts a callable which will be invoked
        when certain information about an SSL connection is available.
        """
        client, server = self._connected_sockets()
        try:
            clientSSL = Connection(Context(TLSv1_METHOD), client)
            clientSSL.set_connect_state()

            called = []
            def info(conn, where, ret):
                called.append((conn, where, ret))
            context = Context(TLSv1_METHOD)
            context.set_info_callback(info)
            context.use_certificate(
                load_certificate(FILETYPE_PEM, cleartextCertificatePEM))
            context.use_privatekey(
                load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM))

            serverSSL = Connection(context, server)
            serverSSL.set_accept_state()

            # Drive both ends of the handshake until the callback fires.
            while not called:
                for connection in clientSSL, serverSSL:
                    try:
                        connection.do_handshake()
                    except WantReadError:
                        pass

            # Kind of lame.  Just make sure it got called somehow.
            self.assertTrue(called)
        finally:
            client.close()
            server.close()


    def _load_verify_locations_test(self, *args):
        """
        Perform a handshake between a client which verifies its peer using
        the CA locations given by C{args} and a server presenting
        C{cleartextCertificatePEM}, then check the certificate the client
        received.

        @param args: The positional arguments to pass on to
            L{Context.load_verify_locations}.
        """
        client, server = self._connected_sockets()
        try:
            clientContext = Context(TLSv1_METHOD)
            clientContext.load_verify_locations(*args)
            # Require that the server certificate verify properly or the
            # connection will fail.
            clientContext.set_verify(
                VERIFY_PEER,
                lambda conn, cert, errno, depth, preverify_ok: preverify_ok)

            clientSSL = Connection(clientContext, client)
            clientSSL.set_connect_state()

            serverContext = Context(TLSv1_METHOD)
            serverContext.use_certificate(
                load_certificate(FILETYPE_PEM, cleartextCertificatePEM))
            serverContext.use_privatekey(
                load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM))

            serverSSL = Connection(serverContext, server)
            serverSSL.set_accept_state()

            for i in range(3):
                for connection in clientSSL, serverSSL:
                    try:
                        # Without load_verify_locations above, the handshake
                        # will fail:
                        # Error: [('SSL routines', 'SSL3_GET_SERVER_CERTIFICATE',
                        #          'certificate verify failed')]
                        connection.do_handshake()
                    except WantReadError:
                        pass

            cert = clientSSL.get_peer_certificate()
            self.assertEqual(cert.get_subject().CN, 'pyopenssl.sf.net')
        finally:
            client.close()
            server.close()


    def test_load_verify_file(self):
        """
        L{Context.load_verify_locations} accepts a file name and uses the
        certificates within for verification purposes.
        """
        cafile = self.mktemp()
        self._write_file(cafile, cleartextCertificatePEM)

        self._load_verify_locations_test(cafile)


    def test_load_verify_invalid_file(self):
        """
        L{Context.load_verify_locations} raises L{Error} when passed a
        non-existent cafile.
        """
        clientContext = Context(TLSv1_METHOD)
        self.assertRaises(
            Error, clientContext.load_verify_locations, self.mktemp())


    def test_load_verify_directory(self):
        """
        L{Context.load_verify_locations} accepts a directory name and uses
        the certificates within for verification purposes.
        """
        capath = self.mktemp()
        makedirs(capath)
        self._write_file(join(capath, 'cert.pem'), cleartextCertificatePEM)

        # Hash value computed manually with c_rehash to avoid depending on
        # c_rehash in the test suite.
        symlink('cert.pem', join(capath, '07497d9e.0'))

        self._load_verify_locations_test(None, capath)


    def test_set_default_verify_paths(self):
        """
        L{Context.set_default_verify_paths} causes the platform-specific CA
        certificate locations to be used for verification purposes.
        """
        # Testing this requires a server with a certificate signed by one of
        # the CAs in the platform CA location.  Getting one of those costs
        # money.  Fortunately (or unfortunately, depending on your
        # perspective), it's easy to think of a public server on the
        # internet which has such a certificate.  Connecting to the network
        # in a unit test is bad, but it's the only way I can think of to
        # really test this. -exarkun

        # Arg, verisign.com doesn't speak TLSv1
        context = Context(SSLv3_METHOD)
        context.set_default_verify_paths()
        context.set_verify(
            VERIFY_PEER,
            lambda conn, cert, errno, depth, preverify_ok: preverify_ok)

        client = socket()
        try:
            client.connect(('verisign.com', 443))
            clientSSL = Connection(context, client)
            clientSSL.set_connect_state()
            clientSSL.do_handshake()
            clientSSL.send('GET / HTTP/1.0\r\n\r\n')
            self.assertTrue(clientSSL.recv(1024))
        finally:
            client.close()


    def test_set_default_verify_paths_signature(self):
        """
        L{Context.set_default_verify_paths} takes no arguments and raises
        L{TypeError} if given any.
        """
        context = Context(TLSv1_METHOD)
        self.assertRaises(TypeError, context.set_default_verify_paths, None)
        self.assertRaises(TypeError, context.set_default_verify_paths, 1)
        self.assertRaises(TypeError, context.set_default_verify_paths, "")
Jean-Paul Calderone327d8f92008-12-28 21:55:56 -0500260
261
262
class ConstantsTests(TestCase):
    """
    Tests for the values of constants exposed in L{OpenSSL.SSL}.

    The constants are flags defined by OpenSSL and meant to be handed
    straight back to OpenSSL APIs, so the only thing which can usefully be
    asserted about them is their numeric value.
    """
    # unittest.TestCase has no skip mechanism, so a test method is only
    # defined when its constant was importable.  Otherwise a bare string
    # stands in for the test as a note explaining why it is absent.
    if OP_NO_QUERY_MTU is None:
        "OP_NO_QUERY_MTU unavailable - OpenSSL version may be too old"
    else:
        def test_op_no_query_mtu(self):
            """
            L{OpenSSL.SSL.OP_NO_QUERY_MTU} equals 0x1000, matching
            I{SSL_OP_NO_QUERY_MTU} from I{openssl/ssl.h}.
            """
            self.assertEqual(OP_NO_QUERY_MTU, 0x1000)


    if OP_COOKIE_EXCHANGE is None:
        "OP_COOKIE_EXCHANGE unavailable - OpenSSL version may be too old"
    else:
        def test_op_cookie_exchange(self):
            """
            L{OpenSSL.SSL.OP_COOKIE_EXCHANGE} equals 0x2000, matching
            I{SSL_OP_COOKIE_EXCHANGE} from I{openssl/ssl.h}.
            """
            self.assertEqual(OP_COOKIE_EXCHANGE, 0x2000)


    if OP_NO_TICKET is None:
        "OP_NO_TICKET unavailable - OpenSSL version may be too old"
    else:
        def test_op_no_ticket(self):
            """
            L{OpenSSL.SSL.OP_NO_TICKET} equals 0x4000, matching
            I{SSL_OP_NO_TICKET} from I{openssl/ssl.h}.
            """
            self.assertEqual(OP_NO_TICKET, 0x4000)