blob: d064790d6bf3e8e5e0a3ed1568882b92fadcb40c [file] [log] [blame]
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
5from test import test_support
6import socket
7import errno
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00008import subprocess
9import time
10import os
11import pprint
12import urllib
13import shutil
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000014import traceback
15
16# Optionally test SSL support, if we have it in the tested platform
17skip_expected = False
18try:
19 import ssl
20except ImportError:
21 skip_expected = True
22
23CERTFILE = None
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000024
Bill Janssen98d19da2007-09-10 21:51:02 +000025TESTPORT = 10025
Neal Norwitz3e533c22007-08-27 01:03:18 +000026
27def handle_error(prefix):
28 exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
Bill Janssen98d19da2007-09-10 21:51:02 +000029 if test_support.verbose:
30 sys.stdout.write(prefix + exc_format)
Neal Norwitz3e533c22007-08-27 01:03:18 +000031
32
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000033class BasicTests(unittest.TestCase):
34
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000035 def testSSLconnect(self):
36 import os
37 with test_support.transient_internet():
Bill Janssen98d19da2007-09-10 21:51:02 +000038 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
39 cert_reqs=ssl.CERT_NONE)
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000040 s.connect(("pop.gmail.com", 995))
41 c = s.getpeercert()
42 if c:
43 raise test_support.TestFailed("Peer cert %s shouldn't be here!")
44 s.close()
45
46 # this should fail because we have no verification certs
Bill Janssen98d19da2007-09-10 21:51:02 +000047 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
48 cert_reqs=ssl.CERT_REQUIRED)
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000049 try:
50 s.connect(("pop.gmail.com", 995))
Bill Janssen98d19da2007-09-10 21:51:02 +000051 except ssl.SSLError:
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000052 pass
53 finally:
54 s.close()
55
Bill Janssen98d19da2007-09-10 21:51:02 +000056 def testCrucialConstants(self):
57 ssl.PROTOCOL_SSLv2
58 ssl.PROTOCOL_SSLv23
59 ssl.PROTOCOL_SSLv3
60 ssl.PROTOCOL_TLSv1
61 ssl.CERT_NONE
62 ssl.CERT_OPTIONAL
63 ssl.CERT_REQUIRED
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000064
Bill Janssen98d19da2007-09-10 21:51:02 +000065 def testRAND(self):
66 v = ssl.RAND_status()
67 if test_support.verbose:
68 sys.stdout.write("\n RAND_status is %d (%s)\n"
69 % (v, (v and "sufficient randomness") or
70 "insufficient randomness"))
Guido van Rossume4729332007-08-26 19:35:09 +000071 try:
Bill Janssen98d19da2007-09-10 21:51:02 +000072 ssl.RAND_egd(1)
73 except TypeError:
74 pass
Guido van Rossume4729332007-08-26 19:35:09 +000075 else:
Bill Janssen98d19da2007-09-10 21:51:02 +000076 print "didn't raise TypeError"
77 ssl.RAND_add("this is a random string", 75.0)
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000078
Bill Janssen98d19da2007-09-10 21:51:02 +000079 def testParseCert(self):
80 # note that this uses an 'unofficial' function in _ssl.c,
81 # provided solely for this test, to exercise the certificate
82 # parsing code
83 p = ssl._ssl._test_decode_cert(CERTFILE, False)
84 if test_support.verbose:
85 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000086
Bill Janssen98d19da2007-09-10 21:51:02 +000087try:
88 import threading
89except ImportError:
90 _have_threads = False
91else:
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000092
Bill Janssen98d19da2007-09-10 21:51:02 +000093 _have_threads = True
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000094
Bill Janssen98d19da2007-09-10 21:51:02 +000095 class ThreadedEchoServer(threading.Thread):
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000096
Bill Janssen98d19da2007-09-10 21:51:02 +000097 class ConnectionHandler(threading.Thread):
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000098
Bill Janssen98d19da2007-09-10 21:51:02 +000099 """A mildly complicated class, because we want it to work both
100 with and without the SSL wrapper around the socket connection, so
101 that we can test the STARTTLS functionality."""
102
103 def __init__(self, server, connsock):
104 self.server = server
105 self.running = False
106 self.sock = connsock
107 self.sock.setblocking(1)
108 self.sslconn = None
109 threading.Thread.__init__(self)
110 self.setDaemon(True)
111
112 def wrap_conn (self):
113 try:
114 self.sslconn = ssl.wrap_socket(self.sock, server_side=True,
115 certfile=self.server.certificate,
116 ssl_version=self.server.protocol,
117 ca_certs=self.server.cacerts,
118 cert_reqs=self.server.certreqs)
119 except:
120 if self.server.chatty:
121 handle_error("\n server: bad connection attempt from " +
122 str(self.sock.getpeername()) + ":\n")
123 if not self.server.expect_bad_connects:
124 # here, we want to stop the server, because this shouldn't
125 # happen in the context of our test case
126 self.running = False
127 # normally, we'd just stop here, but for the test
128 # harness, we want to stop the server
129 self.server.stop()
130 return False
131
132 else:
133 if self.server.certreqs == ssl.CERT_REQUIRED:
134 cert = self.sslconn.getpeercert()
135 if test_support.verbose and self.server.chatty:
136 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
137 cert_binary = self.sslconn.getpeercert(True)
138 if test_support.verbose and self.server.chatty:
139 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
140 cipher = self.sslconn.cipher()
141 if test_support.verbose and self.server.chatty:
142 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
143 return True
144
145 def read(self):
146 if self.sslconn:
147 return self.sslconn.read()
148 else:
149 return self.sock.recv(1024)
150
151 def write(self, bytes):
152 if self.sslconn:
153 return self.sslconn.write(bytes)
154 else:
155 return self.sock.send(bytes)
156
157 def close(self):
158 if self.sslconn:
159 self.sslconn.close()
160 else:
161 self.sock.close()
162
163 def run (self):
164 self.running = True
165 if not self.server.starttls_server:
166 if not self.wrap_conn():
167 return
168 while self.running:
169 try:
170 msg = self.read()
171 if not msg:
172 # eof, so quit this handler
173 self.running = False
174 self.close()
175 elif msg.strip() == 'over':
176 if test_support.verbose and self.server.connectionchatty:
177 sys.stdout.write(" server: client closed connection\n")
178 self.close()
179 return
180 elif self.server.starttls_server and msg.strip() == 'STARTTLS':
181 if test_support.verbose and self.server.connectionchatty:
182 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
183 self.write("OK\n")
184 if not self.wrap_conn():
185 return
186 else:
187 if (test_support.verbose and
188 self.server.connectionchatty):
189 ctype = (self.sslconn and "encrypted") or "unencrypted"
190 sys.stdout.write(" server: read %s (%s), sending back %s (%s)...\n"
191 % (repr(msg), ctype, repr(msg.lower()), ctype))
192 self.write(msg.lower())
193 except ssl.SSLError:
194 if self.server.chatty:
195 handle_error("Test server failure:\n")
196 self.close()
197 self.running = False
198 # normally, we'd just stop here, but for the test
199 # harness, we want to stop the server
200 self.server.stop()
201 except:
202 handle_error('')
203
204 def __init__(self, port, certificate, ssl_version=None,
205 certreqs=None, cacerts=None, expect_bad_connects=False,
206 chatty=True, connectionchatty=False, starttls_server=False):
207 if ssl_version is None:
208 ssl_version = ssl.PROTOCOL_TLSv1
209 if certreqs is None:
210 certreqs = ssl.CERT_NONE
211 self.certificate = certificate
212 self.protocol = ssl_version
213 self.certreqs = certreqs
214 self.cacerts = cacerts
215 self.expect_bad_connects = expect_bad_connects
216 self.chatty = chatty
217 self.connectionchatty = connectionchatty
218 self.starttls_server = starttls_server
219 self.sock = socket.socket()
220 self.flag = None
221 if hasattr(socket, 'SO_REUSEADDR'):
222 self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
223 if hasattr(socket, 'SO_REUSEPORT'):
224 self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
225 self.sock.bind(('127.0.0.1', port))
226 self.active = False
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000227 threading.Thread.__init__(self)
Bill Janssen98d19da2007-09-10 21:51:02 +0000228 self.setDaemon(False)
229
230 def start (self, flag=None):
231 self.flag = flag
232 threading.Thread.start(self)
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000233
234 def run (self):
Bill Janssen98d19da2007-09-10 21:51:02 +0000235 self.sock.settimeout(0.5)
236 self.sock.listen(5)
237 self.active = True
238 if self.flag:
239 # signal an event
240 self.flag.set()
241 while self.active:
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000242 try:
Bill Janssen98d19da2007-09-10 21:51:02 +0000243 newconn, connaddr = self.sock.accept()
244 if test_support.verbose and self.chatty:
245 sys.stdout.write(' server: new connection from '
246 + str(connaddr) + '\n')
247 handler = self.ConnectionHandler(self, newconn)
248 handler.start()
249 except socket.timeout:
250 pass
251 except KeyboardInterrupt:
252 self.stop()
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000253 except:
Bill Janssen98d19da2007-09-10 21:51:02 +0000254 if self.chatty:
255 handle_error("Test server failure:\n")
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000256
Bill Janssen98d19da2007-09-10 21:51:02 +0000257 def stop (self):
258 self.active = False
259 self.sock.close()
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000260
Bill Janssen98d19da2007-09-10 21:51:02 +0000261 def badCertTest (certfile):
262 server = ThreadedEchoServer(TESTPORT, CERTFILE,
263 certreqs=ssl.CERT_REQUIRED,
264 cacerts=CERTFILE, chatty=False)
265 flag = threading.Event()
266 server.start(flag)
267 # wait for it to start
268 flag.wait()
269 # try to connect
270 try:
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000271 try:
Bill Janssen98d19da2007-09-10 21:51:02 +0000272 s = ssl.wrap_socket(socket.socket(),
273 certfile=certfile,
274 ssl_version=ssl.PROTOCOL_TLSv1)
275 s.connect(('127.0.0.1', TESTPORT))
276 except ssl.SSLError, x:
Neal Norwitz9eb9b102007-08-27 01:15:33 +0000277 if test_support.verbose:
Bill Janssen98d19da2007-09-10 21:51:02 +0000278 sys.stdout.write("\nSSLError is %s\n" % x[1])
279 else:
280 raise test_support.TestFailed(
281 "Use of invalid cert should have failed!")
282 finally:
283 server.stop()
284 server.join()
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000285
Bill Janssen98d19da2007-09-10 21:51:02 +0000286 def serverParamsTest (certfile, protocol, certreqs, cacertsfile,
287 client_certfile, client_protocol=None, indata="FOO\n",
288 chatty=True, connectionchatty=False):
289
290 server = ThreadedEchoServer(TESTPORT, certfile,
291 certreqs=certreqs,
292 ssl_version=protocol,
293 cacerts=cacertsfile,
294 chatty=chatty,
295 connectionchatty=connectionchatty)
296 flag = threading.Event()
297 server.start(flag)
298 # wait for it to start
299 flag.wait()
300 # try to connect
301 if client_protocol is None:
302 client_protocol = protocol
303 try:
304 try:
305 s = ssl.wrap_socket(socket.socket(),
306 certfile=client_certfile,
307 ca_certs=cacertsfile,
308 cert_reqs=certreqs,
309 ssl_version=client_protocol)
310 s.connect(('127.0.0.1', TESTPORT))
311 except ssl.SSLError, x:
312 raise test_support.TestFailed("Unexpected SSL error: " + str(x))
313 except Exception, x:
314 raise test_support.TestFailed("Unexpected exception: " + str(x))
315 else:
316 if connectionchatty:
317 if test_support.verbose:
318 sys.stdout.write(
319 " client: sending %s...\n" % (repr(indata)))
320 s.write(indata)
321 outdata = s.read()
322 if connectionchatty:
323 if test_support.verbose:
324 sys.stdout.write(" client: read %s\n" % repr(outdata))
325 if outdata != indata.lower():
326 raise test_support.TestFailed(
327 "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
328 % (outdata[:min(len(outdata),20)], len(outdata),
329 indata[:min(len(indata),20)].lower(), len(indata)))
330 s.write("over\n")
331 if connectionchatty:
332 if test_support.verbose:
333 sys.stdout.write(" client: closing connection.\n")
334 s.ssl_shutdown()
335 s.close()
336 finally:
337 server.stop()
338 server.join()
339
340 def tryProtocolCombo (server_protocol,
341 client_protocol,
342 expectedToWork,
Bill Janssene3f1d7d2007-09-11 01:09:19 +0000343 certsreqs=None):
344
345 if certsreqs == None:
346 certsreqs = ssl.CERT_NONE
Bill Janssen98d19da2007-09-10 21:51:02 +0000347
348 if certsreqs == ssl.CERT_NONE:
349 certtype = "CERT_NONE"
350 elif certsreqs == ssl.CERT_OPTIONAL:
351 certtype = "CERT_OPTIONAL"
352 elif certsreqs == ssl.CERT_REQUIRED:
353 certtype = "CERT_REQUIRED"
354 if test_support.verbose:
355 formatstr = (expectedToWork and " %s->%s %s\n") or " {%s->%s} %s\n"
356 sys.stdout.write(formatstr %
357 (ssl.get_protocol_name(client_protocol),
358 ssl.get_protocol_name(server_protocol),
359 certtype))
360 try:
361 serverParamsTest(CERTFILE, server_protocol, certsreqs,
362 CERTFILE, CERTFILE, client_protocol, chatty=False)
363 except test_support.TestFailed:
364 if expectedToWork:
365 raise
366 else:
367 if not expectedToWork:
368 raise test_support.TestFailed(
369 "Client protocol %s succeeded with server protocol %s!"
370 % (ssl.get_protocol_name(client_protocol),
371 ssl.get_protocol_name(server_protocol)))
372
373
374 class ConnectedTests(unittest.TestCase):
375
376 def testRudeShutdown(self):
377
378 listener_ready = threading.Event()
379 listener_gone = threading.Event()
380
381 # `listener` runs in a thread. It opens a socket listening on
382 # PORT, and sits in an accept() until the main thread connects.
383 # Then it rudely closes the socket, and sets Event `listener_gone`
384 # to let the main thread know the socket is gone.
385 def listener():
386 s = socket.socket()
Bill Janssen119c7a62007-09-10 23:41:24 +0000387 if hasattr(socket, 'SO_REUSEADDR'):
388 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
Bill Janssen98d19da2007-09-10 21:51:02 +0000389 if hasattr(socket, 'SO_REUSEPORT'):
390 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
Bill Janssen119c7a62007-09-10 23:41:24 +0000391 port = test_support.bind_port(s, '127.0.0.1', TESTPORT)
Bill Janssen98d19da2007-09-10 21:51:02 +0000392 s.listen(5)
393 listener_ready.set()
394 s.accept()
395 s = None # reclaim the socket object, which also closes it
396 listener_gone.set()
397
398 def connector():
399 listener_ready.wait()
400 s = socket.socket()
Bill Janssen119c7a62007-09-10 23:41:24 +0000401 s.connect(('127.0.0.1', TESTPORT))
Bill Janssen98d19da2007-09-10 21:51:02 +0000402 listener_gone.wait()
403 try:
404 ssl_sock = ssl.wrap_socket(s)
405 except socket.sslerror:
406 pass
407 else:
408 raise test_support.TestFailed(
409 'connecting to closed SSL socket should have failed')
410
411 t = threading.Thread(target=listener)
412 t.start()
413 connector()
414 t.join()
415
416 def testEcho (self):
417
418 if test_support.verbose:
419 sys.stdout.write("\n")
420 serverParamsTest(CERTFILE, ssl.PROTOCOL_TLSv1, ssl.CERT_NONE,
421 CERTFILE, CERTFILE, ssl.PROTOCOL_TLSv1,
422 chatty=True, connectionchatty=True)
423
424 def testReadCert(self):
425
426 if test_support.verbose:
427 sys.stdout.write("\n")
428 s2 = socket.socket()
429 server = ThreadedEchoServer(TESTPORT, CERTFILE,
430 certreqs=ssl.CERT_NONE,
431 ssl_version=ssl.PROTOCOL_SSLv23,
432 cacerts=CERTFILE,
433 chatty=False)
434 flag = threading.Event()
435 server.start(flag)
436 # wait for it to start
437 flag.wait()
438 # try to connect
439 try:
440 try:
441 s = ssl.wrap_socket(socket.socket(),
442 certfile=CERTFILE,
443 ca_certs=CERTFILE,
444 cert_reqs=ssl.CERT_REQUIRED,
445 ssl_version=ssl.PROTOCOL_SSLv23)
446 s.connect(('127.0.0.1', TESTPORT))
447 except ssl.SSLError, x:
448 raise test_support.TestFailed(
449 "Unexpected SSL error: " + str(x))
450 except Exception, x:
451 raise test_support.TestFailed(
452 "Unexpected exception: " + str(x))
453 else:
454 if not s:
455 raise test_support.TestFailed(
456 "Can't SSL-handshake with test server")
457 cert = s.getpeercert()
458 if not cert:
459 raise test_support.TestFailed(
460 "Can't get peer certificate.")
461 cipher = s.cipher()
462 if test_support.verbose:
463 sys.stdout.write(pprint.pformat(cert) + '\n')
464 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
465 if not cert.has_key('subject'):
466 raise test_support.TestFailed(
467 "No subject field in certificate: %s." %
468 pprint.pformat(cert))
469 if ((('organizationName', 'Python Software Foundation'),)
470 not in cert['subject']):
471 raise test_support.TestFailed(
472 "Missing or invalid 'organizationName' field in certificate subject; "
473 "should be 'Python Software Foundation'.");
474 s.close()
475 finally:
476 server.stop()
477 server.join()
478
479 def testNULLcert(self):
480 badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
481 "nullcert.pem"))
482 def testMalformedCert(self):
483 badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
484 "badcert.pem"))
485 def testMalformedKey(self):
486 badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
487 "badkey.pem"))
488
489 def testProtocolSSL2(self):
490 if test_support.verbose:
491 sys.stdout.write("\n")
492 tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
493 tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
494 tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
495 tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True)
496 tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
497 tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
498
499 def testProtocolSSL23(self):
500 if test_support.verbose:
501 sys.stdout.write("\n")
502 try:
503 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True)
504 except test_support.TestFailed, x:
505 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
506 if test_support.verbose:
507 sys.stdout.write(
508 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
509 % str(x))
510 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True)
511 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True)
512 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True)
513
514 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL)
515 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL)
516 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL)
517
518 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED)
519 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED)
520 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED)
521
522 def testProtocolSSL3(self):
523 if test_support.verbose:
524 sys.stdout.write("\n")
525 tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True)
526 tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL)
527 tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED)
528 tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
529 tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False)
530 tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
531
532 def testProtocolTLS1(self):
533 if test_support.verbose:
534 sys.stdout.write("\n")
535 tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True)
536 tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL)
537 tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED)
538 tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
539 tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
540 tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False)
541
542 def testSTARTTLS (self):
543
544 msgs = ("msg 1", "MSG 2", "STARTTLS", "MSG 3", "msg 4")
545
546 server = ThreadedEchoServer(TESTPORT, CERTFILE,
547 ssl_version=ssl.PROTOCOL_TLSv1,
548 starttls_server=True,
549 chatty=True,
550 connectionchatty=True)
551 flag = threading.Event()
552 server.start(flag)
553 # wait for it to start
554 flag.wait()
555 # try to connect
556 wrapped = False
557 try:
558 try:
559 s = socket.socket()
560 s.setblocking(1)
561 s.connect(('127.0.0.1', TESTPORT))
562 except Exception, x:
563 raise test_support.TestFailed("Unexpected exception: " + str(x))
564 else:
565 if test_support.verbose:
566 sys.stdout.write("\n")
567 for indata in msgs:
568 if test_support.verbose:
569 sys.stdout.write(" client: sending %s...\n" % repr(indata))
570 if wrapped:
571 conn.write(indata)
572 outdata = conn.read()
573 else:
574 s.send(indata)
575 outdata = s.recv(1024)
576 if indata == "STARTTLS" and outdata.strip().lower().startswith("ok"):
577 if test_support.verbose:
578 sys.stdout.write(" client: read %s from server, starting TLS...\n" % repr(outdata))
579 conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
580
581 wrapped = True
582 else:
583 if test_support.verbose:
584 sys.stdout.write(" client: read %s from server\n" % repr(outdata))
585 if test_support.verbose:
586 sys.stdout.write(" client: closing connection.\n")
587 if wrapped:
588 conn.write("over\n")
589 conn.ssl_shutdown()
590 else:
591 s.send("over\n")
592 s.close()
593 finally:
594 server.stop()
595 server.join()
596
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000597
598CERTFILE_CONFIG_TEMPLATE = """
599# create RSA certs - Server
600
601[ req ]
602default_bits = 1024
603encrypt_key = yes
604distinguished_name = req_dn
605x509_extensions = cert_type
606
607[ req_dn ]
608countryName = Country Name (2 letter code)
609countryName_default = US
610countryName_min = 2
611countryName_max = 2
612
613stateOrProvinceName = State or Province Name (full name)
614stateOrProvinceName_default = %(state)s
615
616localityName = Locality Name (eg, city)
617localityName_default = %(city)s
618
6190.organizationName = Organization Name (eg, company)
6200.organizationName_default = %(organization)s
621
622organizationalUnitName = Organizational Unit Name (eg, section)
623organizationalUnitName_default = %(unit)s
624
6250.commonName = Common Name (FQDN of your server)
6260.commonName_default = %(common-name)s
627
628# To create a certificate for more than one name uncomment:
629# 1.commonName = DNS alias of your server
630# 2.commonName = DNS alias of your server
631# ...
632# See http://home.netscape.com/eng/security/ssl_2.0_certificate.html
633# to see how Netscape understands commonName.
634
635[ cert_type ]
636nsCertType = server
637"""
638
Guido van Rossumba8c5652007-08-27 17:19:42 +0000639def create_cert_files(hostname=None):
640
641 """This is the routine that was run to create the certificate
642 and private key contained in keycert.pem."""
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000643
644 import tempfile, socket, os
645 d = tempfile.mkdtemp()
646 # now create a configuration file for the CA signing cert
Guido van Rossumba8c5652007-08-27 17:19:42 +0000647 fqdn = hostname or socket.getfqdn()
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000648 crtfile = os.path.join(d, "cert.pem")
649 conffile = os.path.join(d, "ca.conf")
650 fp = open(conffile, "w")
651 fp.write(CERTFILE_CONFIG_TEMPLATE %
652 {'state': "Delaware",
653 'city': "Wilmington",
654 'organization': "Python Software Foundation",
655 'unit': "SSL",
656 'common-name': fqdn,
657 })
658 fp.close()
Neal Norwitz7fc8e292007-08-26 18:50:39 +0000659 error = os.system(
Guido van Rossumba8c5652007-08-27 17:19:42 +0000660 "openssl req -batch -new -x509 -days 2000 -nodes -config %s "
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000661 "-keyout \"%s\" -out \"%s\" > /dev/null < /dev/null 2>&1" %
662 (conffile, crtfile, crtfile))
663 # now we have a self-signed server cert in crtfile
664 os.unlink(conffile)
Neal Norwitzf6f525b2007-08-27 00:58:33 +0000665 if (os.WEXITSTATUS(error) or
666 not os.path.exists(crtfile) or os.path.getsize(crtfile) == 0):
Neal Norwitz9eb9b102007-08-27 01:15:33 +0000667 if test_support.verbose:
Guido van Rossumba8c5652007-08-27 17:19:42 +0000668 sys.stdout.write("Unable to create certificate for test, "
669 + "error status %d\n" % (error >> 8))
Neal Norwitz9eb9b102007-08-27 01:15:33 +0000670 crtfile = None
671 elif test_support.verbose:
672 sys.stdout.write(open(crtfile, 'r').read() + '\n')
Neal Norwitz7fc8e292007-08-26 18:50:39 +0000673 return d, crtfile
674
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000675
Bill Janssen119c7a62007-09-10 23:41:24 +0000676def findtestsocket(start, end):
677 def testbind(i):
678 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
679 try:
680 s.bind(("127.0.0.1", i))
681 except:
682 return 0
683 else:
684 return 1
685 finally:
686 s.close()
687
688 for i in range(start, end):
689 if testbind(i) and testbind(i+1):
690 return i
691 return 0
692
693
Neal Norwitz9eb9b102007-08-27 01:15:33 +0000694def test_main(verbose=False):
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000695 if skip_expected:
Bill Janssenffe576d2007-09-05 00:46:27 +0000696 raise test_support.TestSkipped("No SSL support")
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000697
Bill Janssen119c7a62007-09-10 23:41:24 +0000698 global CERTFILE, TESTPORT
Guido van Rossumba8c5652007-08-27 17:19:42 +0000699 CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
Bill Janssen98d19da2007-09-10 21:51:02 +0000700 "keycert.pem")
701 if (not os.path.exists(CERTFILE)):
702 raise test_support.TestFailed("Can't read certificate files!")
Bill Janssen119c7a62007-09-10 23:41:24 +0000703 TESTPORT = findtestsocket(10025, 12000)
704 if not TESTPORT:
705 raise test_support.TestFailed("Can't find open port to test servers on!")
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000706
707 tests = [BasicTests]
708
Bill Janssen98d19da2007-09-10 21:51:02 +0000709 if _have_threads:
710 thread_info = test_support.threading_setup()
711 if CERTFILE and thread_info and test_support.is_resource_enabled('network'):
712 tests.append(ConnectedTests)
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000713
Bill Janssen98d19da2007-09-10 21:51:02 +0000714 test_support.run_unittest(*tests)
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000715
Bill Janssen98d19da2007-09-10 21:51:02 +0000716 if _have_threads:
717 test_support.threading_cleanup(*thread_info)
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000718
719if __name__ == "__main__":
720 test_main()