blob: 7eedbdc8ac4c9408fe700322550c15abeeff6a6d [file] [log] [blame]
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
5from test import test_support
6import socket
7import errno
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00008import subprocess
9import time
10import os
11import pprint
12import urllib
13import shutil
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000014import traceback
15
# Optionally test SSL support, if we have it in the tested platform.
# skip_expected is checked by test_main(), which skips the whole module
# when the ssl module could not be imported.
try:
    import ssl
except ImportError:
    skip_expected = True
else:
    skip_expected = False

# Path of the certificate/key file used by the tests; test_main() fills it in.
CERTFILE = None

# Port on the loopback interface that the test servers bind to.
TESTPORT = 10025
Neal Norwitz3e533c22007-08-27 01:03:18 +000026
def handle_error(prefix):
    """Report the exception currently being handled on stdout.

    The formatted traceback, preceded by *prefix*, is written only when
    test_support.verbose is set; otherwise nothing is printed.
    """
    trace_text = ' '.join(traceback.format_exception(*sys.exc_info()))
    if not test_support.verbose:
        return
    sys.stdout.write(prefix + trace_text)
Neal Norwitz3e533c22007-08-27 01:03:18 +000031
32
class BasicTests(unittest.TestCase):
    """SSL tests that do not need a local server thread."""

    def testSSLconnect(self):
        """Connect to a public SSL server with and without verification."""
        with test_support.transient_internet():
            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                                cert_reqs=ssl.CERT_NONE)
            try:
                s.connect(("pop.gmail.com", 995))
                c = s.getpeercert()
                if c:
                    # include the unexpected certificate in the message
                    raise test_support.TestFailed(
                        "Peer cert %s shouldn't be here!" % pprint.pformat(c))
            finally:
                # release the socket even when the check above fails
                s.close()

            # this should fail because we have no verification certs
            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
                                cert_reqs=ssl.CERT_REQUIRED)
            try:
                s.connect(("pop.gmail.com", 995))
            except ssl.SSLError:
                pass
            finally:
                s.close()

    def testCrucialConstants(self):
        """Check that the protocol and cert-requirement constants exist.

        Each bare attribute access raises AttributeError if the constant
        is missing from the ssl module.
        """
        ssl.PROTOCOL_SSLv2
        ssl.PROTOCOL_SSLv23
        ssl.PROTOCOL_SSLv3
        ssl.PROTOCOL_TLSv1
        ssl.CERT_NONE
        ssl.CERT_OPTIONAL
        ssl.CERT_REQUIRED

    def testRAND(self):
        """Exercise the RAND_status, RAND_egd and RAND_add wrappers."""
        v = ssl.RAND_status()
        if test_support.verbose:
            sys.stdout.write("\n RAND_status is %d (%s)\n"
                             % (v, "sufficient randomness" if v
                                else "insufficient randomness"))
        # a non-string argument must be rejected; fail the test (rather
        # than just printing a note) when it is not
        self.assertRaises(TypeError, ssl.RAND_egd, 1)
        ssl.RAND_add("this is a random string", 75.0)

    def testParseCert(self):
        """Decode CERTFILE with the private certificate-parsing helper."""
        # note that this uses an 'unofficial' function in _ssl.c,
        # provided solely for this test, to exercise the certificate
        # parsing code
        p = ssl._ssl._test_decode_cert(CERTFILE, False)
        if test_support.verbose:
            sys.stdout.write("\n" + pprint.pformat(p) + "\n")
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000086
Bill Janssen98d19da2007-09-10 21:51:02 +000087try:
88 import threading
89except ImportError:
90 _have_threads = False
91else:
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000092
Bill Janssen98d19da2007-09-10 21:51:02 +000093 _have_threads = True
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000094
Bill Janssen98d19da2007-09-10 21:51:02 +000095 class ThreadedEchoServer(threading.Thread):
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000096
Bill Janssen98d19da2007-09-10 21:51:02 +000097 class ConnectionHandler(threading.Thread):
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000098
Bill Janssen98d19da2007-09-10 21:51:02 +000099 """A mildly complicated class, because we want it to work both
100 with and without the SSL wrapper around the socket connection, so
101 that we can test the STARTTLS functionality."""
102
103 def __init__(self, server, connsock):
104 self.server = server
105 self.running = False
106 self.sock = connsock
107 self.sock.setblocking(1)
108 self.sslconn = None
109 threading.Thread.__init__(self)
110 self.setDaemon(True)
111
112 def wrap_conn (self):
113 try:
114 self.sslconn = ssl.wrap_socket(self.sock, server_side=True,
115 certfile=self.server.certificate,
116 ssl_version=self.server.protocol,
117 ca_certs=self.server.cacerts,
118 cert_reqs=self.server.certreqs)
119 except:
120 if self.server.chatty:
121 handle_error("\n server: bad connection attempt from " +
122 str(self.sock.getpeername()) + ":\n")
123 if not self.server.expect_bad_connects:
124 # here, we want to stop the server, because this shouldn't
125 # happen in the context of our test case
126 self.running = False
127 # normally, we'd just stop here, but for the test
128 # harness, we want to stop the server
129 self.server.stop()
130 return False
131
132 else:
133 if self.server.certreqs == ssl.CERT_REQUIRED:
134 cert = self.sslconn.getpeercert()
135 if test_support.verbose and self.server.chatty:
136 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
137 cert_binary = self.sslconn.getpeercert(True)
138 if test_support.verbose and self.server.chatty:
139 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
140 cipher = self.sslconn.cipher()
141 if test_support.verbose and self.server.chatty:
142 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
143 return True
144
145 def read(self):
146 if self.sslconn:
147 return self.sslconn.read()
148 else:
149 return self.sock.recv(1024)
150
151 def write(self, bytes):
152 if self.sslconn:
153 return self.sslconn.write(bytes)
154 else:
155 return self.sock.send(bytes)
156
157 def close(self):
158 if self.sslconn:
159 self.sslconn.close()
160 else:
161 self.sock.close()
162
163 def run (self):
164 self.running = True
165 if not self.server.starttls_server:
166 if not self.wrap_conn():
167 return
168 while self.running:
169 try:
170 msg = self.read()
171 if not msg:
172 # eof, so quit this handler
173 self.running = False
174 self.close()
175 elif msg.strip() == 'over':
176 if test_support.verbose and self.server.connectionchatty:
177 sys.stdout.write(" server: client closed connection\n")
178 self.close()
179 return
180 elif self.server.starttls_server and msg.strip() == 'STARTTLS':
181 if test_support.verbose and self.server.connectionchatty:
182 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
183 self.write("OK\n")
184 if not self.wrap_conn():
185 return
186 else:
187 if (test_support.verbose and
188 self.server.connectionchatty):
189 ctype = (self.sslconn and "encrypted") or "unencrypted"
190 sys.stdout.write(" server: read %s (%s), sending back %s (%s)...\n"
191 % (repr(msg), ctype, repr(msg.lower()), ctype))
192 self.write(msg.lower())
193 except ssl.SSLError:
194 if self.server.chatty:
195 handle_error("Test server failure:\n")
196 self.close()
197 self.running = False
198 # normally, we'd just stop here, but for the test
199 # harness, we want to stop the server
200 self.server.stop()
201 except:
202 handle_error('')
203
204 def __init__(self, port, certificate, ssl_version=None,
205 certreqs=None, cacerts=None, expect_bad_connects=False,
206 chatty=True, connectionchatty=False, starttls_server=False):
207 if ssl_version is None:
208 ssl_version = ssl.PROTOCOL_TLSv1
209 if certreqs is None:
210 certreqs = ssl.CERT_NONE
211 self.certificate = certificate
212 self.protocol = ssl_version
213 self.certreqs = certreqs
214 self.cacerts = cacerts
215 self.expect_bad_connects = expect_bad_connects
216 self.chatty = chatty
217 self.connectionchatty = connectionchatty
218 self.starttls_server = starttls_server
219 self.sock = socket.socket()
220 self.flag = None
221 if hasattr(socket, 'SO_REUSEADDR'):
222 self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
223 if hasattr(socket, 'SO_REUSEPORT'):
224 self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
225 self.sock.bind(('127.0.0.1', port))
226 self.active = False
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000227 threading.Thread.__init__(self)
Bill Janssen98d19da2007-09-10 21:51:02 +0000228 self.setDaemon(False)
229
230 def start (self, flag=None):
231 self.flag = flag
232 threading.Thread.start(self)
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000233
234 def run (self):
Bill Janssen98d19da2007-09-10 21:51:02 +0000235 self.sock.settimeout(0.5)
236 self.sock.listen(5)
237 self.active = True
238 if self.flag:
239 # signal an event
240 self.flag.set()
241 while self.active:
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000242 try:
Bill Janssen98d19da2007-09-10 21:51:02 +0000243 newconn, connaddr = self.sock.accept()
244 if test_support.verbose and self.chatty:
245 sys.stdout.write(' server: new connection from '
246 + str(connaddr) + '\n')
247 handler = self.ConnectionHandler(self, newconn)
248 handler.start()
249 except socket.timeout:
250 pass
251 except KeyboardInterrupt:
252 self.stop()
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000253 except:
Bill Janssen98d19da2007-09-10 21:51:02 +0000254 if self.chatty:
255 handle_error("Test server failure:\n")
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000256
Bill Janssen98d19da2007-09-10 21:51:02 +0000257 def stop (self):
258 self.active = False
259 self.sock.close()
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000260
Bill Janssen98d19da2007-09-10 21:51:02 +0000261 def badCertTest (certfile):
262 server = ThreadedEchoServer(TESTPORT, CERTFILE,
263 certreqs=ssl.CERT_REQUIRED,
264 cacerts=CERTFILE, chatty=False)
265 flag = threading.Event()
266 server.start(flag)
267 # wait for it to start
268 flag.wait()
269 # try to connect
270 try:
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000271 try:
Bill Janssen98d19da2007-09-10 21:51:02 +0000272 s = ssl.wrap_socket(socket.socket(),
273 certfile=certfile,
274 ssl_version=ssl.PROTOCOL_TLSv1)
275 s.connect(('127.0.0.1', TESTPORT))
276 except ssl.SSLError, x:
Neal Norwitz9eb9b102007-08-27 01:15:33 +0000277 if test_support.verbose:
Bill Janssen98d19da2007-09-10 21:51:02 +0000278 sys.stdout.write("\nSSLError is %s\n" % x[1])
279 else:
280 raise test_support.TestFailed(
281 "Use of invalid cert should have failed!")
282 finally:
283 server.stop()
284 server.join()
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000285
Bill Janssen98d19da2007-09-10 21:51:02 +0000286 def serverParamsTest (certfile, protocol, certreqs, cacertsfile,
287 client_certfile, client_protocol=None, indata="FOO\n",
288 chatty=True, connectionchatty=False):
289
290 server = ThreadedEchoServer(TESTPORT, certfile,
291 certreqs=certreqs,
292 ssl_version=protocol,
293 cacerts=cacertsfile,
294 chatty=chatty,
295 connectionchatty=connectionchatty)
296 flag = threading.Event()
297 server.start(flag)
298 # wait for it to start
299 flag.wait()
300 # try to connect
301 if client_protocol is None:
302 client_protocol = protocol
303 try:
304 try:
305 s = ssl.wrap_socket(socket.socket(),
306 certfile=client_certfile,
307 ca_certs=cacertsfile,
308 cert_reqs=certreqs,
309 ssl_version=client_protocol)
310 s.connect(('127.0.0.1', TESTPORT))
311 except ssl.SSLError, x:
312 raise test_support.TestFailed("Unexpected SSL error: " + str(x))
313 except Exception, x:
314 raise test_support.TestFailed("Unexpected exception: " + str(x))
315 else:
316 if connectionchatty:
317 if test_support.verbose:
318 sys.stdout.write(
319 " client: sending %s...\n" % (repr(indata)))
320 s.write(indata)
321 outdata = s.read()
322 if connectionchatty:
323 if test_support.verbose:
324 sys.stdout.write(" client: read %s\n" % repr(outdata))
325 if outdata != indata.lower():
326 raise test_support.TestFailed(
327 "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
328 % (outdata[:min(len(outdata),20)], len(outdata),
329 indata[:min(len(indata),20)].lower(), len(indata)))
330 s.write("over\n")
331 if connectionchatty:
332 if test_support.verbose:
333 sys.stdout.write(" client: closing connection.\n")
334 s.ssl_shutdown()
335 s.close()
336 finally:
337 server.stop()
338 server.join()
339
340 def tryProtocolCombo (server_protocol,
341 client_protocol,
342 expectedToWork,
343 certsreqs=ssl.CERT_NONE):
344
345 if certsreqs == ssl.CERT_NONE:
346 certtype = "CERT_NONE"
347 elif certsreqs == ssl.CERT_OPTIONAL:
348 certtype = "CERT_OPTIONAL"
349 elif certsreqs == ssl.CERT_REQUIRED:
350 certtype = "CERT_REQUIRED"
351 if test_support.verbose:
352 formatstr = (expectedToWork and " %s->%s %s\n") or " {%s->%s} %s\n"
353 sys.stdout.write(formatstr %
354 (ssl.get_protocol_name(client_protocol),
355 ssl.get_protocol_name(server_protocol),
356 certtype))
357 try:
358 serverParamsTest(CERTFILE, server_protocol, certsreqs,
359 CERTFILE, CERTFILE, client_protocol, chatty=False)
360 except test_support.TestFailed:
361 if expectedToWork:
362 raise
363 else:
364 if not expectedToWork:
365 raise test_support.TestFailed(
366 "Client protocol %s succeeded with server protocol %s!"
367 % (ssl.get_protocol_name(client_protocol),
368 ssl.get_protocol_name(server_protocol)))
369
370
371 class ConnectedTests(unittest.TestCase):
372
373 def testRudeShutdown(self):
374
375 listener_ready = threading.Event()
376 listener_gone = threading.Event()
377
378 # `listener` runs in a thread. It opens a socket listening on
379 # PORT, and sits in an accept() until the main thread connects.
380 # Then it rudely closes the socket, and sets Event `listener_gone`
381 # to let the main thread know the socket is gone.
382 def listener():
383 s = socket.socket()
384 if hasattr(socket, 'SO_REUSEPORT'):
385 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
386 port = test_support.bind_port(s, 'localhost', TESTPORT)
387 s.listen(5)
388 listener_ready.set()
389 s.accept()
390 s = None # reclaim the socket object, which also closes it
391 listener_gone.set()
392
393 def connector():
394 listener_ready.wait()
395 s = socket.socket()
396 s.connect(('localhost', TESTPORT))
397 listener_gone.wait()
398 try:
399 ssl_sock = ssl.wrap_socket(s)
400 except socket.sslerror:
401 pass
402 else:
403 raise test_support.TestFailed(
404 'connecting to closed SSL socket should have failed')
405
406 t = threading.Thread(target=listener)
407 t.start()
408 connector()
409 t.join()
410
411 def testEcho (self):
412
413 if test_support.verbose:
414 sys.stdout.write("\n")
415 serverParamsTest(CERTFILE, ssl.PROTOCOL_TLSv1, ssl.CERT_NONE,
416 CERTFILE, CERTFILE, ssl.PROTOCOL_TLSv1,
417 chatty=True, connectionchatty=True)
418
419 def testReadCert(self):
420
421 if test_support.verbose:
422 sys.stdout.write("\n")
423 s2 = socket.socket()
424 server = ThreadedEchoServer(TESTPORT, CERTFILE,
425 certreqs=ssl.CERT_NONE,
426 ssl_version=ssl.PROTOCOL_SSLv23,
427 cacerts=CERTFILE,
428 chatty=False)
429 flag = threading.Event()
430 server.start(flag)
431 # wait for it to start
432 flag.wait()
433 # try to connect
434 try:
435 try:
436 s = ssl.wrap_socket(socket.socket(),
437 certfile=CERTFILE,
438 ca_certs=CERTFILE,
439 cert_reqs=ssl.CERT_REQUIRED,
440 ssl_version=ssl.PROTOCOL_SSLv23)
441 s.connect(('127.0.0.1', TESTPORT))
442 except ssl.SSLError, x:
443 raise test_support.TestFailed(
444 "Unexpected SSL error: " + str(x))
445 except Exception, x:
446 raise test_support.TestFailed(
447 "Unexpected exception: " + str(x))
448 else:
449 if not s:
450 raise test_support.TestFailed(
451 "Can't SSL-handshake with test server")
452 cert = s.getpeercert()
453 if not cert:
454 raise test_support.TestFailed(
455 "Can't get peer certificate.")
456 cipher = s.cipher()
457 if test_support.verbose:
458 sys.stdout.write(pprint.pformat(cert) + '\n')
459 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
460 if not cert.has_key('subject'):
461 raise test_support.TestFailed(
462 "No subject field in certificate: %s." %
463 pprint.pformat(cert))
464 if ((('organizationName', 'Python Software Foundation'),)
465 not in cert['subject']):
466 raise test_support.TestFailed(
467 "Missing or invalid 'organizationName' field in certificate subject; "
468 "should be 'Python Software Foundation'.");
469 s.close()
470 finally:
471 server.stop()
472 server.join()
473
474 def testNULLcert(self):
475 badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
476 "nullcert.pem"))
477 def testMalformedCert(self):
478 badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
479 "badcert.pem"))
480 def testMalformedKey(self):
481 badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
482 "badkey.pem"))
483
484 def testProtocolSSL2(self):
485 if test_support.verbose:
486 sys.stdout.write("\n")
487 tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
488 tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
489 tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
490 tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True)
491 tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
492 tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
493
494 def testProtocolSSL23(self):
495 if test_support.verbose:
496 sys.stdout.write("\n")
497 try:
498 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True)
499 except test_support.TestFailed, x:
500 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
501 if test_support.verbose:
502 sys.stdout.write(
503 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
504 % str(x))
505 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True)
506 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True)
507 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True)
508
509 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL)
510 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL)
511 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL)
512
513 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED)
514 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED)
515 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED)
516
517 def testProtocolSSL3(self):
518 if test_support.verbose:
519 sys.stdout.write("\n")
520 tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True)
521 tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL)
522 tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED)
523 tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
524 tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False)
525 tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
526
527 def testProtocolTLS1(self):
528 if test_support.verbose:
529 sys.stdout.write("\n")
530 tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True)
531 tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL)
532 tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED)
533 tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
534 tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
535 tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False)
536
537 def testSTARTTLS (self):
538
539 msgs = ("msg 1", "MSG 2", "STARTTLS", "MSG 3", "msg 4")
540
541 server = ThreadedEchoServer(TESTPORT, CERTFILE,
542 ssl_version=ssl.PROTOCOL_TLSv1,
543 starttls_server=True,
544 chatty=True,
545 connectionchatty=True)
546 flag = threading.Event()
547 server.start(flag)
548 # wait for it to start
549 flag.wait()
550 # try to connect
551 wrapped = False
552 try:
553 try:
554 s = socket.socket()
555 s.setblocking(1)
556 s.connect(('127.0.0.1', TESTPORT))
557 except Exception, x:
558 raise test_support.TestFailed("Unexpected exception: " + str(x))
559 else:
560 if test_support.verbose:
561 sys.stdout.write("\n")
562 for indata in msgs:
563 if test_support.verbose:
564 sys.stdout.write(" client: sending %s...\n" % repr(indata))
565 if wrapped:
566 conn.write(indata)
567 outdata = conn.read()
568 else:
569 s.send(indata)
570 outdata = s.recv(1024)
571 if indata == "STARTTLS" and outdata.strip().lower().startswith("ok"):
572 if test_support.verbose:
573 sys.stdout.write(" client: read %s from server, starting TLS...\n" % repr(outdata))
574 conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
575
576 wrapped = True
577 else:
578 if test_support.verbose:
579 sys.stdout.write(" client: read %s from server\n" % repr(outdata))
580 if test_support.verbose:
581 sys.stdout.write(" client: closing connection.\n")
582 if wrapped:
583 conn.write("over\n")
584 conn.ssl_shutdown()
585 else:
586 s.send("over\n")
587 s.close()
588 finally:
589 server.stop()
590 server.join()
591
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000592
# OpenSSL "req" configuration used by create_cert_files().  It is a
# %-format template: the %(state)s, %(city)s, %(organization)s, %(unit)s
# and %(common-name)s placeholders are filled in from a dictionary before
# the text is written to the config file handed to openssl.
CERTFILE_CONFIG_TEMPLATE = """
# create RSA certs - Server

[ req ]
default_bits = 1024
encrypt_key = yes
distinguished_name = req_dn
x509_extensions = cert_type

[ req_dn ]
countryName = Country Name (2 letter code)
countryName_default = US
countryName_min = 2
countryName_max = 2

stateOrProvinceName = State or Province Name (full name)
stateOrProvinceName_default = %(state)s

localityName = Locality Name (eg, city)
localityName_default = %(city)s

0.organizationName = Organization Name (eg, company)
0.organizationName_default = %(organization)s

organizationalUnitName = Organizational Unit Name (eg, section)
organizationalUnitName_default = %(unit)s

0.commonName = Common Name (FQDN of your server)
0.commonName_default = %(common-name)s

# To create a certificate for more than one name uncomment:
# 1.commonName = DNS alias of your server
# 2.commonName = DNS alias of your server
# ...
# See http://home.netscape.com/eng/security/ssl_2.0_certificate.html
# to see how Netscape understands commonName.

[ cert_type ]
nsCertType = server
"""
633
def create_cert_files(hostname=None):

    """This is the routine that was run to create the certificate
    and private key contained in keycert.pem.

    A self-signed certificate for *hostname* (default: this machine's
    fully qualified name) is generated with the openssl command-line tool
    in a fresh temporary directory.

    Returns a (directory, certfile) tuple.  certfile is None when openssl
    could not be run, exited with an error, or produced no output.  The
    directory is not removed here.
    """

    import os
    import socket
    import subprocess
    import tempfile

    d = tempfile.mkdtemp()
    # now create a configuration file for the CA signing cert
    fqdn = hostname or socket.getfqdn()
    crtfile = os.path.join(d, "cert.pem")
    conffile = os.path.join(d, "ca.conf")
    with open(conffile, "w") as fp:
        fp.write(CERTFILE_CONFIG_TEMPLATE %
                 {'state': "Delaware",
                  'city': "Wilmington",
                  'organization': "Python Software Foundation",
                  'unit': "SSL",
                  'common-name': fqdn,
                  })
    try:
        # An argument list (no shell) keeps paths containing spaces or
        # shell metacharacters intact, and os.devnull replaces the
        # Unix-only "/dev/null" redirections.
        with open(os.devnull, "r+") as devnull:
            try:
                error = subprocess.call(
                    ["openssl", "req", "-batch", "-new", "-x509",
                     "-days", "2000", "-nodes", "-config", conffile,
                     "-keyout", crtfile, "-out", crtfile],
                    stdin=devnull, stdout=devnull,
                    stderr=subprocess.STDOUT)
            except OSError:
                # openssl could not be started at all; report it with the
                # status a shell uses for "command not found"
                error = 127
    finally:
        os.unlink(conffile)
    # on success we now have a self-signed server cert in crtfile
    if (error or
        not os.path.exists(crtfile) or os.path.getsize(crtfile) == 0):
        if test_support.verbose:
            sys.stdout.write("Unable to create certificate for test, "
                             + "error status %d\n" % error)
        crtfile = None
    elif test_support.verbose:
        with open(crtfile, 'r') as fp:
            sys.stdout.write(fp.read() + '\n')
    return d, crtfile
669
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000670
def test_main(verbose=False):
    """Run the SSL test classes.

    BasicTests always runs.  ConnectedTests is added when threads are
    available and the 'network' resource is enabled.  Raises
    test_support.TestSkipped when the ssl module is missing and
    test_support.TestFailed when keycert.pem cannot be found next to this
    file.  *verbose* is accepted but not used by this function.
    """
    if skip_expected:
        raise test_support.TestSkipped("No SSL support")

    global CERTFILE
    CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
                            "keycert.pem")
    if not os.path.exists(CERTFILE):
        raise test_support.TestFailed("Can't read certificate files!")

    tests = [BasicTests]

    thread_info = None
    if _have_threads:
        thread_info = test_support.threading_setup()
        if CERTFILE and thread_info and test_support.is_resource_enabled('network'):
            tests.append(ConnectedTests)

    try:
        test_support.run_unittest(*tests)
    finally:
        # undo threading_setup() even when run_unittest() raises
        if _have_threads:
            test_support.threading_cleanup(*thread_info)


if __name__ == "__main__":
    test_main()