blob: 1b4e3b7414a132e20741b08cd4b1314181df035b [file] [log] [blame]
Jean-Paul Calderone8b63d452008-03-21 18:31:12 -04001# Copyright (C) Jean-Paul Calderone 2008, All rights reserved
2
Jean-Paul Calderone30c09ea2008-03-21 17:04:05 -04003"""
4Unit tests for L{OpenSSL.SSL}.
5"""
6
Jean-Paul Calderone52f0d8b2009-03-07 09:10:19 -05007from sys import platform
Jean-Paul Calderone5ef86512008-04-26 19:06:28 -04008from socket import socket
Jean-Paul Calderone1cb5d022008-09-07 20:58:50 -04009from os import makedirs, symlink
10from os.path import join
Jean-Paul Calderone0b88b6a2009-07-05 12:44:41 -040011from unittest import main
Jean-Paul Calderone460cc1f2009-03-07 11:31:12 -050012
Jean-Paul Calderone5ef86512008-04-26 19:06:28 -040013from OpenSSL.crypto import TYPE_RSA, FILETYPE_PEM, PKey, dump_privatekey, load_certificate, load_privatekey
Rick Deane15b1472009-07-09 15:53:42 -050014from OpenSSL.SSL import WantReadError, Context, ContextType, Connection, ConnectionType, Error
Jean-Paul Calderone30c09ea2008-03-21 17:04:05 -040015from OpenSSL.SSL import SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, TLSv1_METHOD
Rick Deanb71c0d22009-04-01 14:09:23 -050016from OpenSSL.SSL import OP_NO_SSLv2, OP_NO_SSLv3, OP_SINGLE_DH_USE
17from OpenSSL.SSL import VERIFY_PEER, VERIFY_FAIL_IF_NO_PEER_CERT, VERIFY_CLIENT_ONCE
Jean-Paul Calderone0b88b6a2009-07-05 12:44:41 -040018from OpenSSL.test.util import TestCase
Jean-Paul Calderone18808652009-07-05 12:54:05 -040019from OpenSSL.test.test_crypto import cleartextCertificatePEM, cleartextPrivateKeyPEM
# These option constants may be missing from OpenSSL.SSL (the messages in
# ConstantsTests suggest an OpenSSL version that is too old).  Fall back to
# None so ConstantsTests can decide whether to define the matching test.
try:
    from OpenSSL.SSL import OP_NO_QUERY_MTU
except ImportError:
    OP_NO_QUERY_MTU = None
try:
    from OpenSSL.SSL import OP_COOKIE_EXCHANGE
except ImportError:
    OP_COOKIE_EXCHANGE = None
try:
    from OpenSSL.SSL import OP_NO_TICKET
except ImportError:
    OP_NO_TICKET = None
Jean-Paul Calderone5ef86512008-04-26 19:06:28 -040032
Jean-Paul Calderone30c09ea2008-03-21 17:04:05 -040033
class ContextTests(TestCase):
    """
    Unit tests for L{OpenSSL.SSL.Context}.
    """
    def _writeFile(self, path, contents):
        """
        Write C{contents} to a new file at C{path}, closing the file even if
        the write raises.
        """
        fObj = open(path, 'w')
        try:
            fObj.write(contents)
        finally:
            fObj.close()


    def test_method(self):
        """
        L{Context} can be instantiated with one of L{SSLv2_METHOD},
        L{SSLv3_METHOD}, L{SSLv23_METHOD}, or L{TLSv1_METHOD}.  An empty
        string is rejected with L{TypeError} and the integer C{10} with
        L{ValueError}.
        """
        for meth in [SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, TLSv1_METHOD]:
            Context(meth)
        self.assertRaises(TypeError, Context, "")
        self.assertRaises(ValueError, Context, 10)


    def test_type(self):
        """
        L{Context} must be of type L{ContextType}.
        """
        ctx = Context(TLSv1_METHOD)
        self.assertTrue(isinstance(ctx, ContextType))
        self.assertEqual(type(ContextType).__name__, 'type')
        self.assertEqual(type(ctx).__name__, 'Context')
        self.assertEqual(type(ctx), ContextType)
        self.assertEqual(type(ctx), Context)


    def test_use_privatekey(self):
        """
        L{Context.use_privatekey} takes an L{OpenSSL.crypto.PKey} instance
        and raises L{TypeError} when given a string instead.
        """
        key = PKey()
        key.generate_key(TYPE_RSA, 128)
        ctx = Context(TLSv1_METHOD)
        ctx.use_privatekey(key)
        self.assertRaises(TypeError, ctx.use_privatekey, "")


    def test_set_passwd_cb(self):
        """
        L{Context.set_passwd_cb} accepts a callable which will be invoked when
        a private key is loaded from an encrypted PEM.
        """
        key = PKey()
        key.generate_key(TYPE_RSA, 128)
        pemFile = self.mktemp()
        passphrase = "foobar"
        self._writeFile(
            pemFile,
            dump_privatekey(FILETYPE_PEM, key, "blowfish", passphrase))

        calledWith = []
        def passphraseCallback(maxlen, verify, extra):
            calledWith.append((maxlen, verify, extra))
            return passphrase
        context = Context(TLSv1_METHOD)
        context.set_passwd_cb(passphraseCallback)
        context.use_privatekey_file(pemFile)
        # assertEqual, not assertTrue: assertTrue(len(calledWith), 1) would
        # treat 1 as the failure message and never compare the count.
        self.assertEqual(len(calledWith), 1)
        self.assertTrue(isinstance(calledWith[0][0], int))
        self.assertTrue(isinstance(calledWith[0][1], int))
        self.assertEqual(calledWith[0][2], None)


    def test_set_info_callback(self):
        """
        L{Context.set_info_callback} accepts a callable which will be invoked
        when certain information about an SSL connection is available.
        """
        # Every socket opened below is recorded here so that all of them are
        # closed even if the handshake fails part way through.
        sockets = []
        try:
            port = socket()
            sockets.append(port)
            port.bind(('', 0))
            port.listen(1)

            client = socket()
            sockets.append(client)
            client.setblocking(False)
            client.connect_ex(port.getsockname())

            clientSSL = Connection(Context(TLSv1_METHOD), client)
            clientSSL.set_connect_state()

            called = []
            def info(conn, where, ret):
                called.append((conn, where, ret))
            context = Context(TLSv1_METHOD)
            context.set_info_callback(info)
            context.use_certificate(
                load_certificate(FILETYPE_PEM, cleartextCertificatePEM))
            context.use_privatekey(
                load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM))

            server, ignored = port.accept()
            sockets.append(server)
            server.setblocking(False)

            serverSSL = Connection(context, server)
            serverSSL.set_accept_state()

            # Drive both sides of the handshake until the callback fires.
            while not called:
                for conn in clientSSL, serverSSL:
                    try:
                        conn.do_handshake()
                    except WantReadError:
                        pass

            # Kind of lame.  Just make sure it got called somehow.
            self.assertTrue(called)
        finally:
            for skt in sockets:
                skt.close()


    def _load_verify_locations_test(self, *args):
        """
        Connect a client and a server L{Connection} over local sockets and
        check the certificate the client receives.

        The client context has C{load_verify_locations(*args)} applied and
        requires the peer certificate to verify; the server presents
        C{cleartextCertificatePEM}.  After the handshake rounds, the subject
        CN of the client's peer certificate must be C{'Testing Root CA'}.

        @param args: The arguments to pass to
            L{Context.load_verify_locations}.
        """
        # Collected so that every socket is closed, pass or fail.
        sockets = []
        try:
            port = socket()
            sockets.append(port)
            port.bind(('', 0))
            port.listen(1)

            client = socket()
            sockets.append(client)
            client.setblocking(False)
            client.connect_ex(port.getsockname())

            clientContext = Context(TLSv1_METHOD)
            clientContext.load_verify_locations(*args)
            # Require that the server certificate verify properly or the
            # connection will fail.
            clientContext.set_verify(
                VERIFY_PEER,
                lambda conn, cert, errno, depth, preverify_ok: preverify_ok)

            clientSSL = Connection(clientContext, client)
            clientSSL.set_connect_state()

            server, _ = port.accept()
            sockets.append(server)
            server.setblocking(False)

            serverContext = Context(TLSv1_METHOD)
            serverContext.use_certificate(
                load_certificate(FILETYPE_PEM, cleartextCertificatePEM))
            serverContext.use_privatekey(
                load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM))

            serverSSL = Connection(serverContext, server)
            serverSSL.set_accept_state()

            # A fixed three rounds of alternating handshake attempts.
            for i in range(3):
                for conn in clientSSL, serverSSL:
                    try:
                        # Without load_verify_locations above, the handshake
                        # will fail:
                        # Error: [('SSL routines',
                        #          'SSL3_GET_SERVER_CERTIFICATE',
                        #          'certificate verify failed')]
                        conn.do_handshake()
                    except WantReadError:
                        pass

            cert = clientSSL.get_peer_certificate()
            self.assertEqual(cert.get_subject().CN, 'Testing Root CA')
        finally:
            for skt in sockets:
                skt.close()


    def test_load_verify_file(self):
        """
        L{Context.load_verify_locations} accepts a file name and uses the
        certificates within for verification purposes.
        """
        cafile = self.mktemp()
        self._writeFile(cafile, cleartextCertificatePEM)

        self._load_verify_locations_test(cafile)


    def test_load_verify_invalid_file(self):
        """
        L{Context.load_verify_locations} raises L{Error} when passed a
        non-existent cafile.
        """
        clientContext = Context(TLSv1_METHOD)
        self.assertRaises(
            Error, clientContext.load_verify_locations, self.mktemp())


    def test_load_verify_directory(self):
        """
        L{Context.load_verify_locations} accepts a directory name and uses
        the certificates within for verification purposes.
        """
        capath = self.mktemp()
        makedirs(capath)
        cafile = join(capath, 'cert.pem')
        self._writeFile(cafile, cleartextCertificatePEM)

        # Hash value computed manually with c_rehash to avoid depending on
        # c_rehash in the test suite.
        symlink('cert.pem', join(capath, 'c7adac82.0'))

        self._load_verify_locations_test(None, capath)


    def test_set_default_verify_paths(self):
        """
        L{Context.set_default_verify_paths} causes the platform-specific CA
        certificate locations to be used for verification purposes.
        """
        # Testing this requires a server with a certificate signed by one of
        # the CAs in the platform CA location.  Getting one of those costs
        # money.  Fortunately (or unfortunately, depending on your
        # perspective), it's easy to think of a public server on the
        # internet which has such a certificate.  Connecting to the network
        # in a unit test is bad, but it's the only way I can think of to
        # really test this. -exarkun

        # Arg, verisign.com doesn't speak TLSv1
        context = Context(SSLv3_METHOD)
        context.set_default_verify_paths()
        context.set_verify(
            VERIFY_PEER,
            lambda conn, cert, errno, depth, preverify_ok: preverify_ok)

        client = socket()
        try:
            client.connect(('verisign.com', 443))
            clientSSL = Connection(context, client)
            clientSSL.set_connect_state()
            clientSSL.do_handshake()
            clientSSL.send('GET / HTTP/1.0\r\n\r\n')
            self.assertTrue(clientSSL.recv(1024))
        finally:
            # Do not leave the network socket open if any step above fails.
            client.close()
    if platform == "darwin":
        test_set_default_verify_paths.todo = (
            "set_default_verify_paths appears not to work on OS X - a "
            "problem with the supplied OpenSSL, perhaps?")


    def test_set_default_verify_paths_signature(self):
        """
        L{Context.set_default_verify_paths} takes no arguments and raises
        L{TypeError} if given any.
        """
        context = Context(TLSv1_METHOD)
        self.assertRaises(TypeError, context.set_default_verify_paths, None)
        self.assertRaises(TypeError, context.set_default_verify_paths, 1)
        self.assertRaises(TypeError, context.set_default_verify_paths, "")
Jean-Paul Calderone327d8f92008-12-28 21:55:56 -0500271
272
273
class ConnectionTests(TestCase):
    """
    Unit tests for L{OpenSSL.SSL.Connection}.
    """
    def test_type(self):
        """
        A L{Connection} instance is an instance of L{ConnectionType}, its
        type is named C{'Connection'} and is equal to both L{ConnectionType}
        and L{Connection}.  L{Error} is named C{'Error'} and its type is
        named C{'type'}.
        """
        connection = Connection(Context(TLSv1_METHOD), None)
        connectionType = type(connection)

        self.assertTrue(isinstance(connection, ConnectionType))
        self.assertEqual(type(ConnectionType).__name__, 'type')
        self.assertEqual(connectionType.__name__, 'Connection')
        self.assertEqual(connectionType, ConnectionType)
        self.assertEqual(connectionType, Connection)

        # The exception class exported next to Connection gets the same
        # kind of check.
        self.assertEqual(Error.__name__, 'Error')
        self.assertEqual(type(Error).__name__, 'type')
292
293
294
class ConstantsTests(TestCase):
    """
    Tests for the values of constants exposed in L{OpenSSL.SSL}.

    The constants are flag values defined by OpenSSL for use with OpenSSL
    APIs, so the only thing checked about each one is its numeric value.
    """
    # unittest.TestCase has no skip mechanism, so the test for a constant
    # which could not be imported (and is therefore None) is simply not
    # defined.  The bare strings below are no-op statements recording why.
    if OP_NO_QUERY_MTU is None:
        "OP_NO_QUERY_MTU unavailable - OpenSSL version may be too old"
    else:
        def test_op_no_query_mtu(self):
            """
            L{OpenSSL.SSL.OP_NO_QUERY_MTU} equals 0x1000, the value
            I{openssl/ssl.h} gives I{SSL_OP_NO_QUERY_MTU}.
            """
            self.assertEqual(OP_NO_QUERY_MTU, 0x1000)


    if OP_COOKIE_EXCHANGE is None:
        "OP_COOKIE_EXCHANGE unavailable - OpenSSL version may be too old"
    else:
        def test_op_cookie_exchange(self):
            """
            L{OpenSSL.SSL.OP_COOKIE_EXCHANGE} equals 0x2000, the value
            I{openssl/ssl.h} gives I{SSL_OP_COOKIE_EXCHANGE}.
            """
            self.assertEqual(OP_COOKIE_EXCHANGE, 0x2000)


    if OP_NO_TICKET is None:
        "OP_NO_TICKET unavailable - OpenSSL version may be too old"
    else:
        def test_op_no_ticket(self):
            """
            L{OpenSSL.SSL.OP_NO_TICKET} equals 0x4000, the value
            I{openssl/ssl.h} gives I{SSL_OP_NO_TICKET}.
            """
            self.assertEqual(OP_NO_TICKET, 0x4000)
Rick Dean5b7b6372009-04-01 11:34:06 -0500335
336
Jean-Paul Calderoneeeee26a2009-04-27 11:24:30 -0400337
# PEM certificate fixture.  MemoryBIOTests adds it to the certificate store
# of both the server and the client context.
root_cert_pem = """-----BEGIN CERTIFICATE-----
MIIC7TCCAlagAwIBAgIIPQzE4MbeufQwDQYJKoZIhvcNAQEFBQAwWDELMAkGA1UE
BhMCVVMxCzAJBgNVBAgTAklMMRAwDgYDVQQHEwdDaGljYWdvMRAwDgYDVQQKEwdU
ZXN0aW5nMRgwFgYDVQQDEw9UZXN0aW5nIFJvb3QgQ0EwIhgPMjAwOTAzMjUxMjM2
NThaGA8yMDE3MDYxMTEyMzY1OFowWDELMAkGA1UEBhMCVVMxCzAJBgNVBAgTAklM
MRAwDgYDVQQHEwdDaGljYWdvMRAwDgYDVQQKEwdUZXN0aW5nMRgwFgYDVQQDEw9U
ZXN0aW5nIFJvb3QgQ0EwgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAPmaQumL
urpE527uSEHdL1pqcDRmWzu+98Y6YHzT/J7KWEamyMCNZ6fRW1JCR782UQ8a07fy
2xXsKy4WdKaxyG8CcatwmXvpvRQ44dSANMihHELpANTdyVp6DCysED6wkQFurHlF
1dshEaJw8b/ypDhmbVIo6Ci1xvCJqivbLFnbAgMBAAGjgbswgbgwHQYDVR0OBBYE
FINVdy1eIfFJDAkk51QJEo3IfgSuMIGIBgNVHSMEgYAwfoAUg1V3LV4h8UkMCSTn
VAkSjch+BK6hXKRaMFgxCzAJBgNVBAYTAlVTMQswCQYDVQQIEwJJTDEQMA4GA1UE
BxMHQ2hpY2FnbzEQMA4GA1UEChMHVGVzdGluZzEYMBYGA1UEAxMPVGVzdGluZyBS
b290IENBggg9DMTgxt659DAMBgNVHRMEBTADAQH/MA0GCSqGSIb3DQEBBQUAA4GB
AGGCDazMJGoWNBpc03u6+smc95dEead2KlZXBATOdFT1VesY3+nUOqZhEhTGlDMi
hkgaZnzoIq/Uamidegk4hirsCT/R+6vsKAAxNTcBjUeZjlykCJWy5ojShGftXIKY
w/njVbKMXrvc83qmTdGl3TAM0fxQIpqgcglFLveEBgzn
-----END CERTIFICATE-----
"""
357
# PEM RSA private key fixture; by its name the key belonging to
# root_cert_pem (presumably - confirm).  Nothing in the visible tests
# loads it.
root_key_pem = """-----BEGIN RSA PRIVATE KEY-----
MIICXQIBAAKBgQD5mkLpi7q6ROdu7khB3S9aanA0Zls7vvfGOmB80/yeylhGpsjA
jWen0VtSQke/NlEPGtO38tsV7CsuFnSmschvAnGrcJl76b0UOOHUgDTIoRxC6QDU
3claegwsrBA+sJEBbqx5RdXbIRGicPG/8qQ4Zm1SKOgotcbwiaor2yxZ2wIDAQAB
AoGBAPCgMpmLxzwDaUmcFbTJUvlLW1hoxNNYSu2jIZm1k/hRAcE60JYwvBkgz3UB
yMEh0AtLxYe0bFk6EHah11tMUPgscbCq73snJ++8koUw+csk22G65hOs51bVb7Aa
6JBe67oLzdtvgCUFAA2qfrKzWRZzAdhUirQUZgySZk+Xq1pBAkEA/kZG0A6roTSM
BVnx7LnPfsycKUsTumorpXiylZJjTi9XtmzxhrYN6wgZlDOOwOLgSQhszGpxVoMD
u3gByT1b2QJBAPtL3mSKdvwRu/+40zaZLwvSJRxaj0mcE4BJOS6Oqs/hS1xRlrNk
PpQ7WJ4yM6ZOLnXzm2mKyxm50Mv64109FtMCQQDOqS2KkjHaLowTGVxwC0DijMfr
I9Lf8sSQk32J5VWCySWf5gGTfEnpmUa41gKTMJIbqZZLucNuDcOtzUaeWZlZAkA8
ttXigLnCqR486JDPTi9ZscoZkZ+w7y6e/hH8t6d5Vjt48JVyfjPIaJY+km58LcN3
6AWSeGAdtRFHVzR7oHjVAkB4hutvxiOeiIVQNBhM6RSI9aBPMI21DoX2JRoxvNW2
cbvAhow217X9V0dVerEOKxnNYspXRrh36h7k4mQA+sDq
-----END RSA PRIVATE KEY-----
"""
374
# PEM certificate fixture which MemoryBIOTests._server installs on the
# server context with use_certificate.
server_cert_pem = """-----BEGIN CERTIFICATE-----
MIICKDCCAZGgAwIBAgIJAJn/HpR21r/8MA0GCSqGSIb3DQEBBQUAMFgxCzAJBgNV
BAYTAlVTMQswCQYDVQQIEwJJTDEQMA4GA1UEBxMHQ2hpY2FnbzEQMA4GA1UEChMH
VGVzdGluZzEYMBYGA1UEAxMPVGVzdGluZyBSb290IENBMCIYDzIwMDkwMzI1MTIz
NzUzWhgPMjAxNzA2MTExMjM3NTNaMBgxFjAUBgNVBAMTDWxvdmVseSBzZXJ2ZXIw
gZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAL6m+G653V0tpBC/OKl22VxOi2Cv
lK4TYu9LHSDP9uDVTe7V5D5Tl6qzFoRRx5pfmnkqT5B+W9byp2NU3FC5hLm5zSAr
b45meUhjEJ/ifkZgbNUjHdBIGP9MAQUHZa5WKdkGIJvGAvs8UzUqlr4TBWQIB24+
lJ+Ukk/CRgasrYwdAgMBAAGjNjA0MB0GA1UdDgQWBBS4kC7Ij0W1TZXZqXQFAM2e
gKEG2DATBgNVHSUEDDAKBggrBgEFBQcDATANBgkqhkiG9w0BAQUFAAOBgQBh30Li
dJ+NlxIOx5343WqIBka3UbsOb2kxWrbkVCrvRapCMLCASO4FqiKWM+L0VDBprqIp
2mgpFQ6FHpoIENGvJhdEKpptQ5i7KaGhnDNTfdy3x1+h852G99f1iyj0RmbuFcM8
uzujnS8YXWvM7DM1Ilozk4MzPug8jzFp5uhKCQ==
-----END CERTIFICATE-----
"""
390
# PEM RSA private key fixture which MemoryBIOTests._server installs with
# use_privatekey; check_privatekey is then called on the context holding
# this key and server_cert_pem.
server_key_pem = """-----BEGIN RSA PRIVATE KEY-----
MIICWwIBAAKBgQC+pvhuud1dLaQQvzipdtlcTotgr5SuE2LvSx0gz/bg1U3u1eQ+
U5eqsxaEUceaX5p5Kk+QflvW8qdjVNxQuYS5uc0gK2+OZnlIYxCf4n5GYGzVIx3Q
SBj/TAEFB2WuVinZBiCbxgL7PFM1Kpa+EwVkCAduPpSflJJPwkYGrK2MHQIDAQAB
AoGAbwuZ0AR6JveahBaczjfnSpiFHf+mve2UxoQdpyr6ROJ4zg/PLW5K/KXrC48G
j6f3tXMrfKHcpEoZrQWUfYBRCUsGD5DCazEhD8zlxEHahIsqpwA0WWssJA2VOLEN
j6DuV2pCFbw67rfTBkTSo32ahfXxEKev5KswZk0JIzH3ooECQQDgzS9AI89h0gs8
Dt+1m11Rzqo3vZML7ZIyGApUzVan+a7hbc33nbGRkAXjHaUBJO31it/H6dTO+uwX
msWwNG5ZAkEA2RyFKs5xR5USTFaKLWCgpH/ydV96KPOpBND7TKQx62snDenFNNbn
FwwOhpahld+vqhYk+pfuWWUpQciE+Bu7ZQJASjfT4sQv4qbbKK/scePicnDdx9th
4e1EeB9xwb+tXXXUo/6Bor/AcUNwfiQ6Zt9PZOK9sR3lMZSsP7rMi7kzuQJABie6
1sXXjFH7nNJvRG4S39cIxq8YRYTy68II/dlB2QzGpKxV/POCxbJ/zu0CU79tuYK7
NaeNCFfH3aeTrX0LyQJAMBWjWmeKM2G2sCExheeQK0ROnaBC8itCECD4Jsve4nqf
r50+LF74iLXFwqysVCebPKMOpDWp/qQ1BbJQIPs7/A==
-----END RSA PRIVATE KEY-----
"""
407
# PEM certificate fixture which MemoryBIOTests._client installs on the
# client context with use_certificate.
client_cert_pem = """-----BEGIN CERTIFICATE-----
MIICJjCCAY+gAwIBAgIJAKxpFI5lODkjMA0GCSqGSIb3DQEBBQUAMFgxCzAJBgNV
BAYTAlVTMQswCQYDVQQIEwJJTDEQMA4GA1UEBxMHQ2hpY2FnbzEQMA4GA1UEChMH
VGVzdGluZzEYMBYGA1UEAxMPVGVzdGluZyBSb290IENBMCIYDzIwMDkwMzI1MTIz
ODA1WhgPMjAxNzA2MTExMjM4MDVaMBYxFDASBgNVBAMTC3VnbHkgY2xpZW50MIGf
MA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDAZh/SRtNm5ntMT4qb6YzEpTroMlq2
rn+GrRHRiZ+xkCw/CGNhbtPir7/QxaUj26BSmQrHw1bGKEbPsWiW7bdXSespl+xK
iku4G/KvnnmWdeJHqsiXeUZtqurMELcPQAw9xPHEuhqqUJvvEoMTsnCEqGM+7Dtb
oCRajYyHfluARQIDAQABozYwNDAdBgNVHQ4EFgQUNQB+qkaOaEVecf1J3TTUtAff
0fAwEwYDVR0lBAwwCgYIKwYBBQUHAwIwDQYJKoZIhvcNAQEFBQADgYEAyv/Jh7gM
Q3OHvmsFEEvRI+hsW8y66zK4K5de239Y44iZrFYkt7Q5nBPMEWDj4F2hLYWL/qtI
9Zdr0U4UDCU9SmmGYh4o7R4TZ5pGFvBYvjhHbkSFYFQXZxKUi+WUxplP6I0wr2KJ
PSTJCjJOn3xo2NTKRgV1gaoTf2EhL+RG8TQ=
-----END CERTIFICATE-----
"""
423
# PEM RSA private key fixture which MemoryBIOTests._client installs with
# use_privatekey; check_privatekey is then called on the context holding
# this key and client_cert_pem.
client_key_pem = """-----BEGIN RSA PRIVATE KEY-----
MIICXgIBAAKBgQDAZh/SRtNm5ntMT4qb6YzEpTroMlq2rn+GrRHRiZ+xkCw/CGNh
btPir7/QxaUj26BSmQrHw1bGKEbPsWiW7bdXSespl+xKiku4G/KvnnmWdeJHqsiX
eUZtqurMELcPQAw9xPHEuhqqUJvvEoMTsnCEqGM+7DtboCRajYyHfluARQIDAQAB
AoGATkZ+NceY5Glqyl4mD06SdcKfV65814vg2EL7V9t8+/mi9rYL8KztSXGlQWPX
zuHgtRoMl78yQ4ZJYOBVo+nsx8KZNRCEBlE19bamSbQLCeQMenWnpeYyQUZ908gF
h6L9qsFVJepgA9RDgAjyDoS5CaWCdCCPCH2lDkdcqC54SVUCQQDseuduc4wi8h4t
V8AahUn9fn9gYfhoNuM0gdguTA0nPLVWz4hy1yJiWYQe0H7NLNNTmCKiLQaJpAbb
TC6vE8C7AkEA0Ee8CMJUc20BnGEmxwgWcVuqFWaKCo8jTH1X38FlATUsyR3krjW2
dL3yDD9NwHxsYP7nTKp/U8MV7U9IBn4y/wJBAJl7H0/BcLeRmuJk7IqJ7b635iYB
D/9beFUw3MUXmQXZUfyYz39xf6CDZsu1GEdEC5haykeln3Of4M9d/4Kj+FcCQQCY
si6xwT7GzMDkk/ko684AV3KPc/h6G0yGtFIrMg7J3uExpR/VdH2KgwMkZXisSMvw
JJEQjOMCVsEJlRk54WWjAkEAzoZNH6UhDdBK5F38rVt/y4SEHgbSfJHIAmPS32Kq
f6GGcfNpip0Uk7q7udTKuX7Q/buZi/C4YW7u3VKAquv9NA==
-----END RSA PRIVATE KEY-----
"""
440
def verify_cb(conn, cert, errnum, depth, ok):
    """
    Verification callback which returns C{ok} unchanged and ignores every
    other argument.

    @param conn: Ignored.
    @param cert: Ignored.
    @param errnum: Ignored.
    @param depth: Ignored.
    @param ok: The value to hand back.
    @return: C{ok}, exactly as it was passed in.
    """
    return ok
443
class MemoryBIOTests(TestCase):
    """
    Tests for L{OpenSSL.SSL.Connection} using a memory BIO.
    """
    def _memoryConnection(self, key_pem, cert_pem):
        """
        Build the boilerplate shared by both ends of a test connection.

        A TLSv1 L{Context} is created which requires a verified peer
        certificate, uses the given key and certificate, and has
        C{root_cert_pem} in its certificate store.

        @param key_pem: The PEM private key to install on the context.
        @param cert_pem: The PEM certificate to install on the context.
        @return: A new L{Connection} for that context with no connect or
            accept state set yet.
        """
        ctx = Context(TLSv1_METHOD)
        ctx.set_options(OP_NO_SSLv2 | OP_NO_SSLv3 | OP_SINGLE_DH_USE)
        ctx.set_verify(
            VERIFY_PEER | VERIFY_FAIL_IF_NO_PEER_CERT | VERIFY_CLIENT_ONCE,
            verify_cb)
        store = ctx.get_cert_store()
        ctx.use_privatekey(load_privatekey(FILETYPE_PEM, key_pem))
        ctx.use_certificate(load_certificate(FILETYPE_PEM, cert_pem))
        ctx.check_privatekey()
        store.add_cert(load_certificate(FILETYPE_PEM, root_cert_pem))
        # None is passed as the 2nd parameter, indicating a memory BIO
        # should be created.
        return Connection(ctx, None)


    def _server(self):
        """
        Create the server side L{Connection}, in accept state, using
        C{server_key_pem} and C{server_cert_pem}.
        """
        server_conn = self._memoryConnection(server_key_pem, server_cert_pem)
        server_conn.set_accept_state()
        return server_conn


    def _client(self):
        """
        Create the client side L{Connection}, in connect state, using
        C{client_key_pem} and C{client_cert_pem}.
        """
        client_conn = self._memoryConnection(client_key_pem, client_cert_pem)
        client_conn.set_connect_state()
        return client_conn


    def _loopback(self, client_conn, server_conn):
        """
        Try to read application bytes from each of the two L{Connection}
        objects.  Copy bytes back and forth between their send/receive buffers
        for as long as there is anything to copy.  When there is nothing more
        to copy, return C{None}.  If one of them actually manages to deliver
        some application bytes, return a two-tuple of the connection from which
        the bytes were read and the bytes themselves.
        """
        wrote = True
        while wrote:
            # Loop until neither side has anything to say
            wrote = False

            # Copy stuff from each side's send buffer to the other side's
            # receive buffer.
            for (read, write) in [(client_conn, server_conn),
                                  (server_conn, client_conn)]:

                # Give the side a chance to generate some more bytes, or
                # succeed.
                try:
                    data = read.recv(2 ** 16)
                except WantReadError:
                    # It didn't succeed, so we'll hope it generated some
                    # output.
                    pass
                else:
                    # It did succeed, so we'll stop now and let the caller deal
                    # with it.
                    return (read, data)

                while True:
                    # Keep copying as long as there's more stuff there.
                    try:
                        dirty = read.bio_read(4096)
                    except WantReadError:
                        # Okay, nothing more waiting to be sent.  Stop
                        # processing this send buffer.
                        break
                    else:
                        # Keep track of the fact that someone generated some
                        # output.
                        wrote = True
                        write.bio_write(dirty)


    def test_connect(self):
        """
        Two L{Connection}s which use memory BIOs can be manually connected by
        reading from the output of each and writing those bytes to the input of
        the other and in this way establish a connection and exchange
        application-level bytes with each other.
        """
        server_conn = self._server()
        client_conn = self._client()

        # There should be no key or nonces yet.
        self.assertIdentical(server_conn.master_key(), None)
        self.assertIdentical(server_conn.client_random(), None)
        self.assertIdentical(server_conn.server_random(), None)

        # First, the handshake needs to happen.  We'll deliver bytes back and
        # forth between the client and server until neither of them feels like
        # speaking any more.
        self.assertIdentical(self._loopback(client_conn, server_conn), None)

        # Now that the handshake is done, there should be a key and nonces.
        self.assertNotIdentical(server_conn.master_key(), None)
        self.assertNotIdentical(server_conn.client_random(), None)
        self.assertNotIdentical(server_conn.server_random(), None)
        self.assertEqual(
            server_conn.client_random(), client_conn.client_random())
        self.assertEqual(
            server_conn.server_random(), client_conn.server_random())
        self.assertNotEqual(
            server_conn.client_random(), server_conn.server_random())
        self.assertNotEqual(
            client_conn.client_random(), client_conn.server_random())

        # Here are the bytes we'll try to send.
        important_message = 'One if by land, two if by sea.'

        server_conn.write(important_message)
        self.assertEqual(
            self._loopback(client_conn, server_conn),
            (client_conn, important_message))

        client_conn.write(important_message[::-1])
        self.assertEqual(
            self._loopback(client_conn, server_conn),
            (server_conn, important_message[::-1]))


    def test_socketOverridesMemory(self):
        """
        L{Connection.bio_read}, L{Connection.bio_write} and
        L{Connection.bio_shutdown} raise L{TypeError} on a L{Connection}
        which was created with a socket.
        """
        context = Context(SSLv3_METHOD)
        client = socket()
        try:
            clientSSL = Connection(context, client)
            self.assertRaises(TypeError, clientSSL.bio_read, 100)
            self.assertRaises(TypeError, clientSSL.bio_write, "foo")
            self.assertRaises(TypeError, clientSSL.bio_shutdown)
        finally:
            # The socket is never connected; close it so it is not leaked.
            client.close()


    def test_outgoingOverflow(self):
        """
        If more bytes than can be written to the memory BIO are passed to
        L{Connection.send} at once, the number of bytes which were written is
        returned and that many bytes from the beginning of the input can be
        read from the other end of the connection.
        """
        server = self._server()
        client = self._client()

        self._loopback(client, server)

        size = 2 ** 15
        sent = client.send("x" * size)
        # Sanity check.  We're trying to test what happens when the entire
        # input can't be sent.  If the entire input was sent, this test is
        # meaningless.
        self.assertTrue(sent < size)

        receiver, received = self._loopback(client, server)
        self.assertIdentical(receiver, server)

        # We can rely on all of these bytes being received at once because
        # _loopback passes 2 ** 16 to recv - more than 2 ** 15.
        self.assertEqual(len(received), sent)


    def test_shutdown(self):
        """
        L{Connection.bio_shutdown} signals the end of the data stream from
        which the L{Connection} reads.
        """
        server = self._server()
        server.bio_shutdown()
        e = self.assertRaises(Error, server.recv, 1024)
        # We don't want WantReadError or ZeroReturnError or anything - it's a
        # handshake failure.
        self.assertEqual(e.__class__, Error)
Jean-Paul Calderone0b88b6a2009-07-05 12:44:41 -0400623
624
625
# When this module is executed as a script, call main, which is imported
# from unittest at the top of the file.
if __name__ == '__main__':
    main()