blob: 8254c407442fb0c8cd40e64a732a1205364ba121 [file] [log] [blame]
# Test the support for SSL and sockets
2
3import sys
4import unittest
5from test import test_support
6import socket
7import errno
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00008import subprocess
9import time
10import os
11import pprint
12import urllib
13import shutil
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000014import traceback
15
# Optionally test SSL support, if we have it in the tested platform.
# test_main() checks skip_expected and skips the module when the ssl
# module could not be imported.
skip_expected = False
try:
    import ssl
except ImportError:
    skip_expected = True

# Path of the key/certificate PEM file; assigned by test_main().
CERTFILE = None

# Port used by the local test servers; test_main() replaces this initial
# value with a port that findtestsocket() reports as bindable.
TESTPORT = 10025
Neal Norwitz3e533c22007-08-27 01:03:18 +000026
def handle_error(prefix):
    """Write *prefix* plus the current exception's traceback to stdout.

    Meant to be called from inside an ``except`` block.  Nothing is
    written unless ``test_support.verbose`` is set.
    """
    if test_support.verbose:
        # Only pay for formatting the traceback when it will be shown.
        exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
        sys.stdout.write(prefix + exc_format)
Neal Norwitz3e533c22007-08-27 01:03:18 +000031
32
class BasicTests(unittest.TestCase):
    """SSL tests that do not need the local threaded echo server."""

    def testSSLconnect(self):
        """Connect to pop.gmail.com:995 with and without cert verification."""
        with test_support.transient_internet():
            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                                cert_reqs=ssl.CERT_NONE)
            try:
                s.connect(("pop.gmail.com", 995))
                c = s.getpeercert()
                if c:
                    # With CERT_NONE no peer certificate is expected.
                    raise test_support.TestFailed(
                        "Peer cert %s shouldn't be here!" % pprint.pformat(c))
            finally:
                s.close()

            # this should fail because we have no verification certs
            # NOTE(review): a connect that unexpectedly succeeds is not
            # reported as a failure here -- confirm whether it should be.
            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                                cert_reqs=ssl.CERT_REQUIRED)
            try:
                s.connect(("pop.gmail.com", 995))
            except ssl.SSLError:
                pass
            finally:
                s.close()

    def testCrucialConstants(self):
        """Check that the protocol and cert-requirement constants exist."""
        # Plain attribute access: a missing constant raises AttributeError.
        ssl.PROTOCOL_SSLv2
        ssl.PROTOCOL_SSLv23
        ssl.PROTOCOL_SSLv3
        ssl.PROTOCOL_TLSv1
        ssl.CERT_NONE
        ssl.CERT_OPTIONAL
        ssl.CERT_REQUIRED

    def testRAND(self):
        """Exercise RAND_status(), RAND_egd() and RAND_add()."""
        v = ssl.RAND_status()
        if test_support.verbose:
            sys.stdout.write("\n RAND_status is %d (%s)\n"
                             % (v, "sufficient randomness" if v
                                else "insufficient randomness"))
        try:
            # An integer argument is expected to be rejected.
            ssl.RAND_egd(1)
        except TypeError:
            pass
        else:
            sys.stdout.write("didn't raise TypeError\n")
        ssl.RAND_add("this is a random string", 75.0)

    def testParseCert(self):
        """Run the certificate parsing code on CERTFILE."""
        # note that this uses an 'unofficial' function in _ssl.c,
        # provided solely for this test, to exercise the certificate
        # parsing code
        p = ssl._ssl._test_decode_cert(CERTFILE, False)
        if test_support.verbose:
            sys.stdout.write("\n" + pprint.pformat(p) + "\n")
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000086
Bill Janssen98d19da2007-09-10 21:51:02 +000087try:
88 import threading
89except ImportError:
90 _have_threads = False
91else:
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000092
Bill Janssen98d19da2007-09-10 21:51:02 +000093 _have_threads = True
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000094
Bill Janssen98d19da2007-09-10 21:51:02 +000095 class ThreadedEchoServer(threading.Thread):
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000096
Bill Janssen98d19da2007-09-10 21:51:02 +000097 class ConnectionHandler(threading.Thread):
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000098
Bill Janssen98d19da2007-09-10 21:51:02 +000099 """A mildly complicated class, because we want it to work both
100 with and without the SSL wrapper around the socket connection, so
101 that we can test the STARTTLS functionality."""
102
103 def __init__(self, server, connsock):
104 self.server = server
105 self.running = False
106 self.sock = connsock
107 self.sock.setblocking(1)
108 self.sslconn = None
109 threading.Thread.__init__(self)
110 self.setDaemon(True)
111
112 def wrap_conn (self):
113 try:
114 self.sslconn = ssl.wrap_socket(self.sock, server_side=True,
115 certfile=self.server.certificate,
116 ssl_version=self.server.protocol,
117 ca_certs=self.server.cacerts,
118 cert_reqs=self.server.certreqs)
119 except:
120 if self.server.chatty:
121 handle_error("\n server: bad connection attempt from " +
122 str(self.sock.getpeername()) + ":\n")
123 if not self.server.expect_bad_connects:
124 # here, we want to stop the server, because this shouldn't
125 # happen in the context of our test case
126 self.running = False
127 # normally, we'd just stop here, but for the test
128 # harness, we want to stop the server
129 self.server.stop()
130 return False
131
132 else:
133 if self.server.certreqs == ssl.CERT_REQUIRED:
134 cert = self.sslconn.getpeercert()
135 if test_support.verbose and self.server.chatty:
136 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
137 cert_binary = self.sslconn.getpeercert(True)
138 if test_support.verbose and self.server.chatty:
139 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
140 cipher = self.sslconn.cipher()
141 if test_support.verbose and self.server.chatty:
142 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
143 return True
144
145 def read(self):
146 if self.sslconn:
147 return self.sslconn.read()
148 else:
149 return self.sock.recv(1024)
150
151 def write(self, bytes):
152 if self.sslconn:
153 return self.sslconn.write(bytes)
154 else:
155 return self.sock.send(bytes)
156
157 def close(self):
158 if self.sslconn:
159 self.sslconn.close()
160 else:
161 self.sock.close()
162
163 def run (self):
164 self.running = True
165 if not self.server.starttls_server:
166 if not self.wrap_conn():
167 return
168 while self.running:
169 try:
170 msg = self.read()
171 if not msg:
172 # eof, so quit this handler
173 self.running = False
174 self.close()
175 elif msg.strip() == 'over':
176 if test_support.verbose and self.server.connectionchatty:
177 sys.stdout.write(" server: client closed connection\n")
178 self.close()
179 return
180 elif self.server.starttls_server and msg.strip() == 'STARTTLS':
181 if test_support.verbose and self.server.connectionchatty:
182 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
183 self.write("OK\n")
184 if not self.wrap_conn():
185 return
186 else:
187 if (test_support.verbose and
188 self.server.connectionchatty):
189 ctype = (self.sslconn and "encrypted") or "unencrypted"
190 sys.stdout.write(" server: read %s (%s), sending back %s (%s)...\n"
191 % (repr(msg), ctype, repr(msg.lower()), ctype))
192 self.write(msg.lower())
193 except ssl.SSLError:
194 if self.server.chatty:
195 handle_error("Test server failure:\n")
196 self.close()
197 self.running = False
198 # normally, we'd just stop here, but for the test
199 # harness, we want to stop the server
200 self.server.stop()
201 except:
202 handle_error('')
203
204 def __init__(self, port, certificate, ssl_version=None,
205 certreqs=None, cacerts=None, expect_bad_connects=False,
206 chatty=True, connectionchatty=False, starttls_server=False):
207 if ssl_version is None:
208 ssl_version = ssl.PROTOCOL_TLSv1
209 if certreqs is None:
210 certreqs = ssl.CERT_NONE
211 self.certificate = certificate
212 self.protocol = ssl_version
213 self.certreqs = certreqs
214 self.cacerts = cacerts
215 self.expect_bad_connects = expect_bad_connects
216 self.chatty = chatty
217 self.connectionchatty = connectionchatty
218 self.starttls_server = starttls_server
219 self.sock = socket.socket()
220 self.flag = None
221 if hasattr(socket, 'SO_REUSEADDR'):
222 self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
223 if hasattr(socket, 'SO_REUSEPORT'):
224 self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
225 self.sock.bind(('127.0.0.1', port))
226 self.active = False
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000227 threading.Thread.__init__(self)
Bill Janssen98d19da2007-09-10 21:51:02 +0000228 self.setDaemon(False)
229
230 def start (self, flag=None):
231 self.flag = flag
232 threading.Thread.start(self)
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000233
234 def run (self):
Bill Janssen98d19da2007-09-10 21:51:02 +0000235 self.sock.settimeout(0.5)
236 self.sock.listen(5)
237 self.active = True
238 if self.flag:
239 # signal an event
240 self.flag.set()
241 while self.active:
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000242 try:
Bill Janssen98d19da2007-09-10 21:51:02 +0000243 newconn, connaddr = self.sock.accept()
244 if test_support.verbose and self.chatty:
245 sys.stdout.write(' server: new connection from '
246 + str(connaddr) + '\n')
247 handler = self.ConnectionHandler(self, newconn)
248 handler.start()
249 except socket.timeout:
250 pass
251 except KeyboardInterrupt:
252 self.stop()
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000253 except:
Bill Janssen98d19da2007-09-10 21:51:02 +0000254 if self.chatty:
255 handle_error("Test server failure:\n")
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000256
Bill Janssen98d19da2007-09-10 21:51:02 +0000257 def stop (self):
258 self.active = False
259 self.sock.close()
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000260
Bill Janssen98d19da2007-09-10 21:51:02 +0000261 def badCertTest (certfile):
262 server = ThreadedEchoServer(TESTPORT, CERTFILE,
263 certreqs=ssl.CERT_REQUIRED,
264 cacerts=CERTFILE, chatty=False)
265 flag = threading.Event()
266 server.start(flag)
267 # wait for it to start
268 flag.wait()
269 # try to connect
270 try:
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000271 try:
Bill Janssen98d19da2007-09-10 21:51:02 +0000272 s = ssl.wrap_socket(socket.socket(),
273 certfile=certfile,
274 ssl_version=ssl.PROTOCOL_TLSv1)
275 s.connect(('127.0.0.1', TESTPORT))
276 except ssl.SSLError, x:
Neal Norwitz9eb9b102007-08-27 01:15:33 +0000277 if test_support.verbose:
Bill Janssen98d19da2007-09-10 21:51:02 +0000278 sys.stdout.write("\nSSLError is %s\n" % x[1])
279 else:
280 raise test_support.TestFailed(
281 "Use of invalid cert should have failed!")
282 finally:
283 server.stop()
284 server.join()
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000285
Bill Janssen98d19da2007-09-10 21:51:02 +0000286 def serverParamsTest (certfile, protocol, certreqs, cacertsfile,
287 client_certfile, client_protocol=None, indata="FOO\n",
288 chatty=True, connectionchatty=False):
289
290 server = ThreadedEchoServer(TESTPORT, certfile,
291 certreqs=certreqs,
292 ssl_version=protocol,
293 cacerts=cacertsfile,
294 chatty=chatty,
295 connectionchatty=connectionchatty)
296 flag = threading.Event()
297 server.start(flag)
298 # wait for it to start
299 flag.wait()
300 # try to connect
301 if client_protocol is None:
302 client_protocol = protocol
303 try:
304 try:
305 s = ssl.wrap_socket(socket.socket(),
306 certfile=client_certfile,
307 ca_certs=cacertsfile,
308 cert_reqs=certreqs,
309 ssl_version=client_protocol)
310 s.connect(('127.0.0.1', TESTPORT))
311 except ssl.SSLError, x:
312 raise test_support.TestFailed("Unexpected SSL error: " + str(x))
313 except Exception, x:
314 raise test_support.TestFailed("Unexpected exception: " + str(x))
315 else:
316 if connectionchatty:
317 if test_support.verbose:
318 sys.stdout.write(
319 " client: sending %s...\n" % (repr(indata)))
320 s.write(indata)
321 outdata = s.read()
322 if connectionchatty:
323 if test_support.verbose:
324 sys.stdout.write(" client: read %s\n" % repr(outdata))
325 if outdata != indata.lower():
326 raise test_support.TestFailed(
327 "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
328 % (outdata[:min(len(outdata),20)], len(outdata),
329 indata[:min(len(indata),20)].lower(), len(indata)))
330 s.write("over\n")
331 if connectionchatty:
332 if test_support.verbose:
333 sys.stdout.write(" client: closing connection.\n")
334 s.ssl_shutdown()
335 s.close()
336 finally:
337 server.stop()
338 server.join()
339
340 def tryProtocolCombo (server_protocol,
341 client_protocol,
342 expectedToWork,
343 certsreqs=ssl.CERT_NONE):
344
345 if certsreqs == ssl.CERT_NONE:
346 certtype = "CERT_NONE"
347 elif certsreqs == ssl.CERT_OPTIONAL:
348 certtype = "CERT_OPTIONAL"
349 elif certsreqs == ssl.CERT_REQUIRED:
350 certtype = "CERT_REQUIRED"
351 if test_support.verbose:
352 formatstr = (expectedToWork and " %s->%s %s\n") or " {%s->%s} %s\n"
353 sys.stdout.write(formatstr %
354 (ssl.get_protocol_name(client_protocol),
355 ssl.get_protocol_name(server_protocol),
356 certtype))
357 try:
358 serverParamsTest(CERTFILE, server_protocol, certsreqs,
359 CERTFILE, CERTFILE, client_protocol, chatty=False)
360 except test_support.TestFailed:
361 if expectedToWork:
362 raise
363 else:
364 if not expectedToWork:
365 raise test_support.TestFailed(
366 "Client protocol %s succeeded with server protocol %s!"
367 % (ssl.get_protocol_name(client_protocol),
368 ssl.get_protocol_name(server_protocol)))
369
370
371 class ConnectedTests(unittest.TestCase):
372
373 def testRudeShutdown(self):
374
375 listener_ready = threading.Event()
376 listener_gone = threading.Event()
377
378 # `listener` runs in a thread. It opens a socket listening on
379 # PORT, and sits in an accept() until the main thread connects.
380 # Then it rudely closes the socket, and sets Event `listener_gone`
381 # to let the main thread know the socket is gone.
382 def listener():
383 s = socket.socket()
Bill Janssen119c7a62007-09-10 23:41:24 +0000384 if hasattr(socket, 'SO_REUSEADDR'):
385 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
Bill Janssen98d19da2007-09-10 21:51:02 +0000386 if hasattr(socket, 'SO_REUSEPORT'):
387 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
Bill Janssen119c7a62007-09-10 23:41:24 +0000388 port = test_support.bind_port(s, '127.0.0.1', TESTPORT)
Bill Janssen98d19da2007-09-10 21:51:02 +0000389 s.listen(5)
390 listener_ready.set()
391 s.accept()
392 s = None # reclaim the socket object, which also closes it
393 listener_gone.set()
394
395 def connector():
396 listener_ready.wait()
397 s = socket.socket()
Bill Janssen119c7a62007-09-10 23:41:24 +0000398 s.connect(('127.0.0.1', TESTPORT))
Bill Janssen98d19da2007-09-10 21:51:02 +0000399 listener_gone.wait()
400 try:
401 ssl_sock = ssl.wrap_socket(s)
402 except socket.sslerror:
403 pass
404 else:
405 raise test_support.TestFailed(
406 'connecting to closed SSL socket should have failed')
407
408 t = threading.Thread(target=listener)
409 t.start()
410 connector()
411 t.join()
412
413 def testEcho (self):
414
415 if test_support.verbose:
416 sys.stdout.write("\n")
417 serverParamsTest(CERTFILE, ssl.PROTOCOL_TLSv1, ssl.CERT_NONE,
418 CERTFILE, CERTFILE, ssl.PROTOCOL_TLSv1,
419 chatty=True, connectionchatty=True)
420
421 def testReadCert(self):
422
423 if test_support.verbose:
424 sys.stdout.write("\n")
425 s2 = socket.socket()
426 server = ThreadedEchoServer(TESTPORT, CERTFILE,
427 certreqs=ssl.CERT_NONE,
428 ssl_version=ssl.PROTOCOL_SSLv23,
429 cacerts=CERTFILE,
430 chatty=False)
431 flag = threading.Event()
432 server.start(flag)
433 # wait for it to start
434 flag.wait()
435 # try to connect
436 try:
437 try:
438 s = ssl.wrap_socket(socket.socket(),
439 certfile=CERTFILE,
440 ca_certs=CERTFILE,
441 cert_reqs=ssl.CERT_REQUIRED,
442 ssl_version=ssl.PROTOCOL_SSLv23)
443 s.connect(('127.0.0.1', TESTPORT))
444 except ssl.SSLError, x:
445 raise test_support.TestFailed(
446 "Unexpected SSL error: " + str(x))
447 except Exception, x:
448 raise test_support.TestFailed(
449 "Unexpected exception: " + str(x))
450 else:
451 if not s:
452 raise test_support.TestFailed(
453 "Can't SSL-handshake with test server")
454 cert = s.getpeercert()
455 if not cert:
456 raise test_support.TestFailed(
457 "Can't get peer certificate.")
458 cipher = s.cipher()
459 if test_support.verbose:
460 sys.stdout.write(pprint.pformat(cert) + '\n')
461 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
462 if not cert.has_key('subject'):
463 raise test_support.TestFailed(
464 "No subject field in certificate: %s." %
465 pprint.pformat(cert))
466 if ((('organizationName', 'Python Software Foundation'),)
467 not in cert['subject']):
468 raise test_support.TestFailed(
469 "Missing or invalid 'organizationName' field in certificate subject; "
470 "should be 'Python Software Foundation'.");
471 s.close()
472 finally:
473 server.stop()
474 server.join()
475
476 def testNULLcert(self):
477 badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
478 "nullcert.pem"))
479 def testMalformedCert(self):
480 badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
481 "badcert.pem"))
482 def testMalformedKey(self):
483 badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
484 "badkey.pem"))
485
486 def testProtocolSSL2(self):
487 if test_support.verbose:
488 sys.stdout.write("\n")
489 tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
490 tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
491 tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
492 tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True)
493 tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
494 tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
495
496 def testProtocolSSL23(self):
497 if test_support.verbose:
498 sys.stdout.write("\n")
499 try:
500 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True)
501 except test_support.TestFailed, x:
502 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
503 if test_support.verbose:
504 sys.stdout.write(
505 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
506 % str(x))
507 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True)
508 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True)
509 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True)
510
511 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL)
512 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL)
513 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL)
514
515 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED)
516 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED)
517 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED)
518
519 def testProtocolSSL3(self):
520 if test_support.verbose:
521 sys.stdout.write("\n")
522 tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True)
523 tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL)
524 tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED)
525 tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
526 tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False)
527 tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
528
529 def testProtocolTLS1(self):
530 if test_support.verbose:
531 sys.stdout.write("\n")
532 tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True)
533 tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL)
534 tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED)
535 tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
536 tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
537 tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False)
538
539 def testSTARTTLS (self):
540
541 msgs = ("msg 1", "MSG 2", "STARTTLS", "MSG 3", "msg 4")
542
543 server = ThreadedEchoServer(TESTPORT, CERTFILE,
544 ssl_version=ssl.PROTOCOL_TLSv1,
545 starttls_server=True,
546 chatty=True,
547 connectionchatty=True)
548 flag = threading.Event()
549 server.start(flag)
550 # wait for it to start
551 flag.wait()
552 # try to connect
553 wrapped = False
554 try:
555 try:
556 s = socket.socket()
557 s.setblocking(1)
558 s.connect(('127.0.0.1', TESTPORT))
559 except Exception, x:
560 raise test_support.TestFailed("Unexpected exception: " + str(x))
561 else:
562 if test_support.verbose:
563 sys.stdout.write("\n")
564 for indata in msgs:
565 if test_support.verbose:
566 sys.stdout.write(" client: sending %s...\n" % repr(indata))
567 if wrapped:
568 conn.write(indata)
569 outdata = conn.read()
570 else:
571 s.send(indata)
572 outdata = s.recv(1024)
573 if indata == "STARTTLS" and outdata.strip().lower().startswith("ok"):
574 if test_support.verbose:
575 sys.stdout.write(" client: read %s from server, starting TLS...\n" % repr(outdata))
576 conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
577
578 wrapped = True
579 else:
580 if test_support.verbose:
581 sys.stdout.write(" client: read %s from server\n" % repr(outdata))
582 if test_support.verbose:
583 sys.stdout.write(" client: closing connection.\n")
584 if wrapped:
585 conn.write("over\n")
586 conn.ssl_shutdown()
587 else:
588 s.send("over\n")
589 s.close()
590 finally:
591 server.stop()
592 server.join()
593
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000594
# openssl "req" configuration used by create_cert_files().  The
# %(state)s, %(city)s, %(organization)s, %(unit)s and %(common-name)s
# placeholders are filled in with the % operator and a dict.
CERTFILE_CONFIG_TEMPLATE = """
# create RSA certs - Server

[ req ]
default_bits = 1024
encrypt_key = yes
distinguished_name = req_dn
x509_extensions = cert_type

[ req_dn ]
countryName = Country Name (2 letter code)
countryName_default = US
countryName_min = 2
countryName_max = 2

stateOrProvinceName = State or Province Name (full name)
stateOrProvinceName_default = %(state)s

localityName = Locality Name (eg, city)
localityName_default = %(city)s

0.organizationName = Organization Name (eg, company)
0.organizationName_default = %(organization)s

organizationalUnitName = Organizational Unit Name (eg, section)
organizationalUnitName_default = %(unit)s

0.commonName = Common Name (FQDN of your server)
0.commonName_default = %(common-name)s

# To create a certificate for more than one name uncomment:
# 1.commonName = DNS alias of your server
# 2.commonName = DNS alias of your server
# ...
# See http://home.netscape.com/eng/security/ssl_2.0_certificate.html
# to see how Netscape understands commonName.

[ cert_type ]
nsCertType = server
"""
635
def create_cert_files(hostname=None):

    """This is the routine that was run to create the certificate
    and private key contained in keycert.pem.

    A temporary directory is created, an openssl configuration built
    from CERTFILE_CONFIG_TEMPLATE is written into it, and ``openssl req``
    is run to produce a self-signed certificate with its key in
    "cert.pem".  *hostname* is used as the certificate's common name;
    when omitted, socket.getfqdn() is used.

    Returns a tuple (directory, certificate path); the path is None
    when the certificate could not be created.
    """

    import os
    import socket
    import subprocess
    import tempfile

    d = tempfile.mkdtemp()
    # now create a configuration file for the CA signing cert
    fqdn = hostname or socket.getfqdn()
    crtfile = os.path.join(d, "cert.pem")
    conffile = os.path.join(d, "ca.conf")
    with open(conffile, "w") as fp:
        fp.write(CERTFILE_CONFIG_TEMPLATE %
                 {'state': "Delaware",
                  'city': "Wilmington",
                  'organization': "Python Software Foundation",
                  'unit': "SSL",
                  'common-name': fqdn,
                  })
    # An argument list needs no shell quoting, so directory names with
    # spaces work, and the exit status is available on every platform.
    command = ["openssl", "req", "-batch", "-new", "-x509",
               "-days", "2000", "-nodes", "-config", conffile,
               "-keyout", crtfile, "-out", crtfile]
    try:
        try:
            proc = subprocess.Popen(command,
                                    stdin=subprocess.PIPE,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.STDOUT)
            # The command's output is read and discarded.
            proc.communicate()
            status = proc.returncode
        except OSError:
            # openssl could not be started; 127 is the status a shell
            # reports for a command it cannot find.
            status = 127
    finally:
        # now we have a self-signed server cert in crtfile
        os.unlink(conffile)
    if (status or
        not os.path.exists(crtfile) or os.path.getsize(crtfile) == 0):
        if test_support.verbose:
            sys.stdout.write("Unable to create certificate for test, "
                             + "error status %d\n" % status)
        crtfile = None
    elif test_support.verbose:
        with open(crtfile, 'r') as certfp:
            sys.stdout.write(certfp.read() + '\n')
    return d, crtfile
671
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000672
def findtestsocket(start, end):
    """Find a port on 127.0.0.1 that can currently be bound.

    Ports are tried in order from *start* up to, but not including,
    *end*.  Returns the first port i for which both i and i + 1 could be
    bound, or 0 when there is none.  The probe sockets are closed again,
    so the port is not reserved for the caller.
    """
    def testbind(i):
        # True when a TCP socket can be bound to 127.0.0.1:i right now.
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s.bind(("127.0.0.1", i))
        except (socket.error, OverflowError):
            # OverflowError covers port numbers outside the valid range.
            return False
        else:
            return True
        finally:
            s.close()

    for i in range(start, end):
        if testbind(i) and testbind(i + 1):
            return i
    return 0
689
690
def test_main(verbose=False):
    """Set up CERTFILE and TESTPORT, then run the test classes.

    Raises test_support.TestSkipped when the ssl module is missing, and
    test_support.TestFailed when keycert.pem does not exist next to this
    file or no usable port is found.  ConnectedTests is run only when
    threads are available and the 'network' resource is enabled.

    NOTE(review): *verbose* is accepted but not used in this function.
    """
    global CERTFILE, TESTPORT

    if skip_expected:
        raise test_support.TestSkipped("No SSL support")

    CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
                            "keycert.pem")
    if not os.path.exists(CERTFILE):
        raise test_support.TestFailed("Can't read certificate files!")
    TESTPORT = findtestsocket(10025, 12000)
    if not TESTPORT:
        raise test_support.TestFailed("Can't find open port to test servers on!")

    tests = [BasicTests]

    thread_info = None
    if _have_threads:
        thread_info = test_support.threading_setup()
        if CERTFILE and thread_info and test_support.is_resource_enabled('network'):
            tests.append(ConnectedTests)

    try:
        test_support.run_unittest(*tests)
    finally:
        # Run the thread cleanup even when the test run raises.
        if _have_threads:
            test_support.threading_cleanup(*thread_info)
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000715
# Allow running this test module directly as a script.
if __name__ == "__main__":
    test_main()