blob: 7e78b4c9cff4f02888aecf2ad3d7d77a603f77cb [file] [log] [blame]
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00001# Test the support for SSL and sockets
2
3import sys
4import unittest
5from test import test_support
6import socket
7import errno
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +00008import subprocess
9import time
10import os
11import pprint
Bill Janssen296a59d2007-09-16 22:06:00 +000012import urllib, urlparse
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000013import shutil
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000014import traceback
15
Bill Janssen296a59d2007-09-16 22:06:00 +000016from BaseHTTPServer import HTTPServer
17from SimpleHTTPServer import SimpleHTTPRequestHandler
18
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000019# Optionally test SSL support, if we have it in the tested platform
20skip_expected = False
21try:
22 import ssl
23except ImportError:
24 skip_expected = True
25
26CERTFILE = None
Bill Janssen296a59d2007-09-16 22:06:00 +000027SVN_PYTHON_ORG_ROOT_CERT = None
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000028
Bill Janssen98d19da2007-09-10 21:51:02 +000029TESTPORT = 10025
Neal Norwitz3e533c22007-08-27 01:03:18 +000030
31def handle_error(prefix):
32 exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
Bill Janssen98d19da2007-09-10 21:51:02 +000033 if test_support.verbose:
34 sys.stdout.write(prefix + exc_format)
Neal Norwitz3e533c22007-08-27 01:03:18 +000035
36
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000037class BasicTests(unittest.TestCase):
38
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000039 def testSSLconnect(self):
40 import os
Bill Janssen296a59d2007-09-16 22:06:00 +000041 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
42 cert_reqs=ssl.CERT_NONE)
43 s.connect(("svn.python.org", 443))
44 c = s.getpeercert()
45 if c:
46 raise test_support.TestFailed("Peer cert %s shouldn't be here!")
47 s.close()
48
49 # this should fail because we have no verification certs
50 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
51 cert_reqs=ssl.CERT_REQUIRED)
52 try:
53 s.connect(("svn.python.org", 443))
54 except ssl.SSLError:
55 pass
56 finally:
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000057 s.close()
58
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000059
Bill Janssen98d19da2007-09-10 21:51:02 +000060 def testCrucialConstants(self):
61 ssl.PROTOCOL_SSLv2
62 ssl.PROTOCOL_SSLv23
63 ssl.PROTOCOL_SSLv3
64 ssl.PROTOCOL_TLSv1
65 ssl.CERT_NONE
66 ssl.CERT_OPTIONAL
67 ssl.CERT_REQUIRED
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000068
Bill Janssen98d19da2007-09-10 21:51:02 +000069 def testRAND(self):
70 v = ssl.RAND_status()
71 if test_support.verbose:
72 sys.stdout.write("\n RAND_status is %d (%s)\n"
73 % (v, (v and "sufficient randomness") or
74 "insufficient randomness"))
Guido van Rossume4729332007-08-26 19:35:09 +000075 try:
Bill Janssen98d19da2007-09-10 21:51:02 +000076 ssl.RAND_egd(1)
77 except TypeError:
78 pass
Guido van Rossume4729332007-08-26 19:35:09 +000079 else:
Bill Janssen98d19da2007-09-10 21:51:02 +000080 print "didn't raise TypeError"
81 ssl.RAND_add("this is a random string", 75.0)
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000082
Bill Janssen98d19da2007-09-10 21:51:02 +000083 def testParseCert(self):
84 # note that this uses an 'unofficial' function in _ssl.c,
85 # provided solely for this test, to exercise the certificate
86 # parsing code
87 p = ssl._ssl._test_decode_cert(CERTFILE, False)
88 if test_support.verbose:
89 sys.stdout.write("\n" + pprint.pformat(p) + "\n")
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +000090
Bill Janssen296a59d2007-09-16 22:06:00 +000091 def testDERtoPEM(self):
92
93 pem = open(SVN_PYTHON_ORG_ROOT_CERT, 'r').read()
94 d1 = ssl.PEM_cert_to_DER_cert(pem)
95 p2 = ssl.DER_cert_to_PEM_cert(d1)
96 d2 = ssl.PEM_cert_to_DER_cert(p2)
97 if (d1 != d2):
98 raise test_support.TestFailed("PEM-to-DER or DER-to-PEM translation failed")
99
100
101class NetworkTests(unittest.TestCase):
102
103 def testConnect(self):
104 import os
105 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
106 cert_reqs=ssl.CERT_NONE)
107 s.connect(("svn.python.org", 443))
108 c = s.getpeercert()
109 if c:
110 raise test_support.TestFailed("Peer cert %s shouldn't be here!")
111 s.close()
112
113 # this should fail because we have no verification certs
114 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
115 cert_reqs=ssl.CERT_REQUIRED)
116 try:
117 s.connect(("svn.python.org", 443))
118 except ssl.SSLError:
119 pass
120 finally:
121 s.close()
122
123 # this should succeed because we specify the root cert
124 s = ssl.wrap_socket(socket.socket(socket.AF_INET),
125 cert_reqs=ssl.CERT_REQUIRED,
126 ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
127 try:
128 s.connect(("svn.python.org", 443))
129 except ssl.SSLError, x:
130 raise test_support.TestFailed("Unexpected exception %s" % x)
131 finally:
132 s.close()
133
134 def testFetchServerCert(self):
135
136 pem = ssl.get_server_certificate(("svn.python.org", 443))
137 if not pem:
138 raise test_support.TestFailed("No server certificate on svn.python.org:443!")
139
140 try:
141 pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=CERTFILE)
142 except ssl.SSLError:
143 #should fail
144 pass
145 else:
146 raise test_support.TestFailed("Got server certificate %s for svn.python.org!" % pem)
147
148 pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
149 if not pem:
150 raise test_support.TestFailed("No server certificate on svn.python.org:443!")
151 if test_support.verbose:
152 sys.stdout.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem)
153
154
Bill Janssen98d19da2007-09-10 21:51:02 +0000155try:
156 import threading
157except ImportError:
158 _have_threads = False
159else:
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000160
Bill Janssen98d19da2007-09-10 21:51:02 +0000161 _have_threads = True
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000162
Bill Janssen98d19da2007-09-10 21:51:02 +0000163 class ThreadedEchoServer(threading.Thread):
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000164
Bill Janssen98d19da2007-09-10 21:51:02 +0000165 class ConnectionHandler(threading.Thread):
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000166
Bill Janssen98d19da2007-09-10 21:51:02 +0000167 """A mildly complicated class, because we want it to work both
168 with and without the SSL wrapper around the socket connection, so
169 that we can test the STARTTLS functionality."""
170
171 def __init__(self, server, connsock):
172 self.server = server
173 self.running = False
174 self.sock = connsock
175 self.sock.setblocking(1)
176 self.sslconn = None
177 threading.Thread.__init__(self)
178 self.setDaemon(True)
179
180 def wrap_conn (self):
181 try:
182 self.sslconn = ssl.wrap_socket(self.sock, server_side=True,
183 certfile=self.server.certificate,
184 ssl_version=self.server.protocol,
185 ca_certs=self.server.cacerts,
186 cert_reqs=self.server.certreqs)
187 except:
188 if self.server.chatty:
189 handle_error("\n server: bad connection attempt from " +
190 str(self.sock.getpeername()) + ":\n")
191 if not self.server.expect_bad_connects:
192 # here, we want to stop the server, because this shouldn't
193 # happen in the context of our test case
194 self.running = False
195 # normally, we'd just stop here, but for the test
196 # harness, we want to stop the server
197 self.server.stop()
198 return False
199
200 else:
201 if self.server.certreqs == ssl.CERT_REQUIRED:
202 cert = self.sslconn.getpeercert()
203 if test_support.verbose and self.server.chatty:
204 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
205 cert_binary = self.sslconn.getpeercert(True)
206 if test_support.verbose and self.server.chatty:
207 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
208 cipher = self.sslconn.cipher()
209 if test_support.verbose and self.server.chatty:
210 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
211 return True
212
213 def read(self):
214 if self.sslconn:
215 return self.sslconn.read()
216 else:
217 return self.sock.recv(1024)
218
219 def write(self, bytes):
220 if self.sslconn:
221 return self.sslconn.write(bytes)
222 else:
223 return self.sock.send(bytes)
224
225 def close(self):
226 if self.sslconn:
227 self.sslconn.close()
228 else:
229 self.sock.close()
230
231 def run (self):
232 self.running = True
233 if not self.server.starttls_server:
234 if not self.wrap_conn():
235 return
236 while self.running:
237 try:
238 msg = self.read()
239 if not msg:
240 # eof, so quit this handler
241 self.running = False
242 self.close()
243 elif msg.strip() == 'over':
244 if test_support.verbose and self.server.connectionchatty:
245 sys.stdout.write(" server: client closed connection\n")
246 self.close()
247 return
248 elif self.server.starttls_server and msg.strip() == 'STARTTLS':
249 if test_support.verbose and self.server.connectionchatty:
250 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
251 self.write("OK\n")
252 if not self.wrap_conn():
253 return
254 else:
255 if (test_support.verbose and
256 self.server.connectionchatty):
257 ctype = (self.sslconn and "encrypted") or "unencrypted"
258 sys.stdout.write(" server: read %s (%s), sending back %s (%s)...\n"
259 % (repr(msg), ctype, repr(msg.lower()), ctype))
260 self.write(msg.lower())
261 except ssl.SSLError:
262 if self.server.chatty:
263 handle_error("Test server failure:\n")
264 self.close()
265 self.running = False
266 # normally, we'd just stop here, but for the test
267 # harness, we want to stop the server
268 self.server.stop()
269 except:
270 handle_error('')
271
272 def __init__(self, port, certificate, ssl_version=None,
273 certreqs=None, cacerts=None, expect_bad_connects=False,
274 chatty=True, connectionchatty=False, starttls_server=False):
275 if ssl_version is None:
276 ssl_version = ssl.PROTOCOL_TLSv1
277 if certreqs is None:
278 certreqs = ssl.CERT_NONE
279 self.certificate = certificate
280 self.protocol = ssl_version
281 self.certreqs = certreqs
282 self.cacerts = cacerts
283 self.expect_bad_connects = expect_bad_connects
284 self.chatty = chatty
285 self.connectionchatty = connectionchatty
286 self.starttls_server = starttls_server
287 self.sock = socket.socket()
288 self.flag = None
289 if hasattr(socket, 'SO_REUSEADDR'):
290 self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
291 if hasattr(socket, 'SO_REUSEPORT'):
292 self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
293 self.sock.bind(('127.0.0.1', port))
294 self.active = False
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000295 threading.Thread.__init__(self)
Bill Janssen98d19da2007-09-10 21:51:02 +0000296 self.setDaemon(False)
297
298 def start (self, flag=None):
299 self.flag = flag
300 threading.Thread.start(self)
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000301
302 def run (self):
Bill Janssen98d19da2007-09-10 21:51:02 +0000303 self.sock.settimeout(0.5)
304 self.sock.listen(5)
305 self.active = True
306 if self.flag:
307 # signal an event
308 self.flag.set()
309 while self.active:
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000310 try:
Bill Janssen98d19da2007-09-10 21:51:02 +0000311 newconn, connaddr = self.sock.accept()
312 if test_support.verbose and self.chatty:
313 sys.stdout.write(' server: new connection from '
314 + str(connaddr) + '\n')
315 handler = self.ConnectionHandler(self, newconn)
316 handler.start()
317 except socket.timeout:
318 pass
319 except KeyboardInterrupt:
320 self.stop()
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000321 except:
Bill Janssen98d19da2007-09-10 21:51:02 +0000322 if self.chatty:
323 handle_error("Test server failure:\n")
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000324
Bill Janssen98d19da2007-09-10 21:51:02 +0000325 def stop (self):
326 self.active = False
327 self.sock.close()
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000328
Bill Janssen296a59d2007-09-16 22:06:00 +0000329
330 class AsyncoreHTTPSServer(threading.Thread):
331
332 class HTTPSServer(HTTPServer):
333
334 def __init__(self, server_address, RequestHandlerClass, certfile):
335
336 HTTPServer.__init__(self, server_address, RequestHandlerClass)
337 # we assume the certfile contains both private key and certificate
338 self.certfile = certfile
339 self.active = False
340 self.allow_reuse_address = True
341
342 def get_request (self):
343 # override this to wrap socket with SSL
344 sock, addr = self.socket.accept()
345 sslconn = ssl.wrap_socket(sock, server_side=True,
346 certfile=self.certfile)
347 return sslconn, addr
348
349 # The methods overridden below this are mainly so that we
350 # can run it in a thread and be able to stop it from another
351 # You probably wouldn't need them in other uses.
352
353 def server_activate(self):
354 # We want to run this in a thread for testing purposes,
355 # so we override this to set timeout, so that we get
356 # a chance to stop the server
357 self.socket.settimeout(0.5)
358 HTTPServer.server_activate(self)
359
360 def serve_forever(self):
361 # We want this to run in a thread, so we use a slightly
362 # modified version of "forever".
363 self.active = True
364 while self.active:
365 try:
366 self.handle_request()
367 except socket.timeout:
368 pass
369 except KeyboardInterrupt:
370 self.server_close()
371 return
372 except:
373 sys.stdout.write(''.join(traceback.format_exception(*sys.exc_info())));
374
375 def server_close(self):
376 # Again, we want this to run in a thread, so we need to override
377 # close to clear the "active" flag, so that serve_forever() will
378 # terminate.
379 HTTPServer.server_close(self)
380 self.active = False
381
382 class RootedHTTPRequestHandler(SimpleHTTPRequestHandler):
383
384 # need to override translate_path to get a known root,
385 # instead of using os.curdir, since the test could be
386 # run from anywhere
387
388 server_version = "TestHTTPS/1.0"
389
390 root = None
391
392 def translate_path(self, path):
393 """Translate a /-separated PATH to the local filename syntax.
394
395 Components that mean special things to the local file system
396 (e.g. drive or directory names) are ignored. (XXX They should
397 probably be diagnosed.)
398
399 """
400 # abandon query parameters
401 path = urlparse.urlparse(path)[2]
402 path = os.path.normpath(urllib.unquote(path))
403 words = path.split('/')
404 words = filter(None, words)
405 path = self.root
406 for word in words:
407 drive, word = os.path.splitdrive(word)
408 head, word = os.path.split(word)
409 if word in self.root: continue
410 path = os.path.join(path, word)
411 return path
412
413 def log_message(self, format, *args):
414
415 # we override this to suppress logging unless "verbose"
416
417 if test_support.verbose:
418 sys.stdout.write(" server (%s, %d, %s):\n [%s] %s\n" %
419 (self.server.server_name,
420 self.server.server_port,
421 self.request.cipher(),
422 self.log_date_time_string(),
423 format%args))
424
425
426 def __init__(self, port, certfile):
427 self.flag = None
428 self.active = False
429 self.RootedHTTPRequestHandler.root = os.path.split(CERTFILE)[0]
430 self.server = self.HTTPSServer(
431 ('', port), self.RootedHTTPRequestHandler, certfile)
432 threading.Thread.__init__(self)
433 self.setDaemon(True)
434
435 def __str__(self):
436 return '<%s %s:%d>' % (self.__class__.__name__,
437 self.server.server_name,
438 self.server.server_port)
439
440 def start (self, flag=None):
441 self.flag = flag
442 threading.Thread.start(self)
443
444 def run (self):
445 self.active = True
446 if self.flag:
447 self.flag.set()
448 self.server.serve_forever()
449 self.active = False
450
451 def stop (self):
452 self.active = False
453 self.server.server_close()
454
455
Bill Janssen98d19da2007-09-10 21:51:02 +0000456 def badCertTest (certfile):
457 server = ThreadedEchoServer(TESTPORT, CERTFILE,
458 certreqs=ssl.CERT_REQUIRED,
459 cacerts=CERTFILE, chatty=False)
460 flag = threading.Event()
461 server.start(flag)
462 # wait for it to start
463 flag.wait()
464 # try to connect
465 try:
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000466 try:
Bill Janssen98d19da2007-09-10 21:51:02 +0000467 s = ssl.wrap_socket(socket.socket(),
468 certfile=certfile,
469 ssl_version=ssl.PROTOCOL_TLSv1)
470 s.connect(('127.0.0.1', TESTPORT))
471 except ssl.SSLError, x:
Neal Norwitz9eb9b102007-08-27 01:15:33 +0000472 if test_support.verbose:
Bill Janssen98d19da2007-09-10 21:51:02 +0000473 sys.stdout.write("\nSSLError is %s\n" % x[1])
474 else:
475 raise test_support.TestFailed(
476 "Use of invalid cert should have failed!")
477 finally:
478 server.stop()
479 server.join()
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000480
Bill Janssen98d19da2007-09-10 21:51:02 +0000481 def serverParamsTest (certfile, protocol, certreqs, cacertsfile,
482 client_certfile, client_protocol=None, indata="FOO\n",
483 chatty=True, connectionchatty=False):
484
485 server = ThreadedEchoServer(TESTPORT, certfile,
486 certreqs=certreqs,
487 ssl_version=protocol,
488 cacerts=cacertsfile,
489 chatty=chatty,
490 connectionchatty=connectionchatty)
491 flag = threading.Event()
492 server.start(flag)
493 # wait for it to start
494 flag.wait()
495 # try to connect
496 if client_protocol is None:
497 client_protocol = protocol
498 try:
499 try:
500 s = ssl.wrap_socket(socket.socket(),
501 certfile=client_certfile,
502 ca_certs=cacertsfile,
503 cert_reqs=certreqs,
504 ssl_version=client_protocol)
505 s.connect(('127.0.0.1', TESTPORT))
506 except ssl.SSLError, x:
507 raise test_support.TestFailed("Unexpected SSL error: " + str(x))
508 except Exception, x:
509 raise test_support.TestFailed("Unexpected exception: " + str(x))
510 else:
511 if connectionchatty:
512 if test_support.verbose:
513 sys.stdout.write(
514 " client: sending %s...\n" % (repr(indata)))
515 s.write(indata)
516 outdata = s.read()
517 if connectionchatty:
518 if test_support.verbose:
519 sys.stdout.write(" client: read %s\n" % repr(outdata))
520 if outdata != indata.lower():
521 raise test_support.TestFailed(
522 "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
523 % (outdata[:min(len(outdata),20)], len(outdata),
524 indata[:min(len(indata),20)].lower(), len(indata)))
525 s.write("over\n")
526 if connectionchatty:
527 if test_support.verbose:
528 sys.stdout.write(" client: closing connection.\n")
Bill Janssen98d19da2007-09-10 21:51:02 +0000529 s.close()
530 finally:
531 server.stop()
532 server.join()
533
534 def tryProtocolCombo (server_protocol,
535 client_protocol,
536 expectedToWork,
Bill Janssene3f1d7d2007-09-11 01:09:19 +0000537 certsreqs=None):
538
539 if certsreqs == None:
540 certsreqs = ssl.CERT_NONE
Bill Janssen98d19da2007-09-10 21:51:02 +0000541
542 if certsreqs == ssl.CERT_NONE:
543 certtype = "CERT_NONE"
544 elif certsreqs == ssl.CERT_OPTIONAL:
545 certtype = "CERT_OPTIONAL"
546 elif certsreqs == ssl.CERT_REQUIRED:
547 certtype = "CERT_REQUIRED"
548 if test_support.verbose:
549 formatstr = (expectedToWork and " %s->%s %s\n") or " {%s->%s} %s\n"
550 sys.stdout.write(formatstr %
551 (ssl.get_protocol_name(client_protocol),
552 ssl.get_protocol_name(server_protocol),
553 certtype))
554 try:
555 serverParamsTest(CERTFILE, server_protocol, certsreqs,
556 CERTFILE, CERTFILE, client_protocol, chatty=False)
557 except test_support.TestFailed:
558 if expectedToWork:
559 raise
560 else:
561 if not expectedToWork:
562 raise test_support.TestFailed(
563 "Client protocol %s succeeded with server protocol %s!"
564 % (ssl.get_protocol_name(client_protocol),
565 ssl.get_protocol_name(server_protocol)))
566
567
568 class ConnectedTests(unittest.TestCase):
569
570 def testRudeShutdown(self):
571
572 listener_ready = threading.Event()
573 listener_gone = threading.Event()
574
575 # `listener` runs in a thread. It opens a socket listening on
576 # PORT, and sits in an accept() until the main thread connects.
577 # Then it rudely closes the socket, and sets Event `listener_gone`
578 # to let the main thread know the socket is gone.
579 def listener():
580 s = socket.socket()
Bill Janssen119c7a62007-09-10 23:41:24 +0000581 if hasattr(socket, 'SO_REUSEADDR'):
582 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
Bill Janssen98d19da2007-09-10 21:51:02 +0000583 if hasattr(socket, 'SO_REUSEPORT'):
584 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
Bill Janssen296a59d2007-09-16 22:06:00 +0000585 s.bind(('127.0.0.1', TESTPORT))
Bill Janssen98d19da2007-09-10 21:51:02 +0000586 s.listen(5)
587 listener_ready.set()
588 s.accept()
589 s = None # reclaim the socket object, which also closes it
590 listener_gone.set()
591
592 def connector():
593 listener_ready.wait()
594 s = socket.socket()
Bill Janssen119c7a62007-09-10 23:41:24 +0000595 s.connect(('127.0.0.1', TESTPORT))
Bill Janssen98d19da2007-09-10 21:51:02 +0000596 listener_gone.wait()
597 try:
598 ssl_sock = ssl.wrap_socket(s)
599 except socket.sslerror:
600 pass
601 else:
602 raise test_support.TestFailed(
603 'connecting to closed SSL socket should have failed')
604
605 t = threading.Thread(target=listener)
606 t.start()
607 connector()
608 t.join()
609
610 def testEcho (self):
611
612 if test_support.verbose:
613 sys.stdout.write("\n")
614 serverParamsTest(CERTFILE, ssl.PROTOCOL_TLSv1, ssl.CERT_NONE,
615 CERTFILE, CERTFILE, ssl.PROTOCOL_TLSv1,
616 chatty=True, connectionchatty=True)
617
618 def testReadCert(self):
619
620 if test_support.verbose:
621 sys.stdout.write("\n")
622 s2 = socket.socket()
623 server = ThreadedEchoServer(TESTPORT, CERTFILE,
624 certreqs=ssl.CERT_NONE,
625 ssl_version=ssl.PROTOCOL_SSLv23,
626 cacerts=CERTFILE,
627 chatty=False)
628 flag = threading.Event()
629 server.start(flag)
630 # wait for it to start
631 flag.wait()
632 # try to connect
633 try:
634 try:
635 s = ssl.wrap_socket(socket.socket(),
636 certfile=CERTFILE,
637 ca_certs=CERTFILE,
638 cert_reqs=ssl.CERT_REQUIRED,
639 ssl_version=ssl.PROTOCOL_SSLv23)
640 s.connect(('127.0.0.1', TESTPORT))
641 except ssl.SSLError, x:
642 raise test_support.TestFailed(
643 "Unexpected SSL error: " + str(x))
644 except Exception, x:
645 raise test_support.TestFailed(
646 "Unexpected exception: " + str(x))
647 else:
648 if not s:
649 raise test_support.TestFailed(
650 "Can't SSL-handshake with test server")
651 cert = s.getpeercert()
652 if not cert:
653 raise test_support.TestFailed(
654 "Can't get peer certificate.")
655 cipher = s.cipher()
656 if test_support.verbose:
657 sys.stdout.write(pprint.pformat(cert) + '\n')
658 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
659 if not cert.has_key('subject'):
660 raise test_support.TestFailed(
661 "No subject field in certificate: %s." %
662 pprint.pformat(cert))
663 if ((('organizationName', 'Python Software Foundation'),)
664 not in cert['subject']):
665 raise test_support.TestFailed(
666 "Missing or invalid 'organizationName' field in certificate subject; "
667 "should be 'Python Software Foundation'.");
668 s.close()
669 finally:
670 server.stop()
671 server.join()
672
673 def testNULLcert(self):
674 badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
675 "nullcert.pem"))
676 def testMalformedCert(self):
677 badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
678 "badcert.pem"))
679 def testMalformedKey(self):
680 badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
681 "badkey.pem"))
682
683 def testProtocolSSL2(self):
684 if test_support.verbose:
685 sys.stdout.write("\n")
686 tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
687 tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
688 tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
689 tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True)
690 tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
691 tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
692
693 def testProtocolSSL23(self):
694 if test_support.verbose:
695 sys.stdout.write("\n")
696 try:
697 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True)
698 except test_support.TestFailed, x:
699 # this fails on some older versions of OpenSSL (0.9.7l, for instance)
700 if test_support.verbose:
701 sys.stdout.write(
702 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
703 % str(x))
704 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True)
705 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True)
706 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True)
707
708 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL)
709 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL)
710 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL)
711
712 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED)
713 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED)
714 tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED)
715
716 def testProtocolSSL3(self):
717 if test_support.verbose:
718 sys.stdout.write("\n")
719 tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True)
720 tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL)
721 tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED)
722 tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
723 tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False)
724 tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
725
726 def testProtocolTLS1(self):
727 if test_support.verbose:
728 sys.stdout.write("\n")
729 tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True)
730 tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL)
731 tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED)
732 tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
733 tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
734 tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False)
735
736 def testSTARTTLS (self):
737
738 msgs = ("msg 1", "MSG 2", "STARTTLS", "MSG 3", "msg 4")
739
740 server = ThreadedEchoServer(TESTPORT, CERTFILE,
741 ssl_version=ssl.PROTOCOL_TLSv1,
742 starttls_server=True,
743 chatty=True,
744 connectionchatty=True)
745 flag = threading.Event()
746 server.start(flag)
747 # wait for it to start
748 flag.wait()
749 # try to connect
750 wrapped = False
751 try:
752 try:
753 s = socket.socket()
754 s.setblocking(1)
755 s.connect(('127.0.0.1', TESTPORT))
756 except Exception, x:
757 raise test_support.TestFailed("Unexpected exception: " + str(x))
758 else:
759 if test_support.verbose:
760 sys.stdout.write("\n")
761 for indata in msgs:
762 if test_support.verbose:
Bill Janssen296a59d2007-09-16 22:06:00 +0000763 sys.stdout.write(
764 " client: sending %s...\n" % repr(indata))
Bill Janssen98d19da2007-09-10 21:51:02 +0000765 if wrapped:
766 conn.write(indata)
767 outdata = conn.read()
768 else:
769 s.send(indata)
770 outdata = s.recv(1024)
Bill Janssen296a59d2007-09-16 22:06:00 +0000771 if (indata == "STARTTLS" and
772 outdata.strip().lower().startswith("ok")):
Bill Janssen98d19da2007-09-10 21:51:02 +0000773 if test_support.verbose:
Bill Janssen296a59d2007-09-16 22:06:00 +0000774 sys.stdout.write(
775 " client: read %s from server, starting TLS...\n"
776 % repr(outdata))
Bill Janssen98d19da2007-09-10 21:51:02 +0000777 conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
778
779 wrapped = True
780 else:
781 if test_support.verbose:
Bill Janssen296a59d2007-09-16 22:06:00 +0000782 sys.stdout.write(
783 " client: read %s from server\n" % repr(outdata))
Bill Janssen98d19da2007-09-10 21:51:02 +0000784 if test_support.verbose:
785 sys.stdout.write(" client: closing connection.\n")
786 if wrapped:
787 conn.write("over\n")
Bill Janssen98d19da2007-09-10 21:51:02 +0000788 else:
789 s.send("over\n")
790 s.close()
791 finally:
792 server.stop()
793 server.join()
794
Bill Janssen296a59d2007-09-16 22:06:00 +0000795 def testAsyncore(self):
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000796
Bill Janssen296a59d2007-09-16 22:06:00 +0000797 server = AsyncoreHTTPSServer(TESTPORT, CERTFILE)
798 flag = threading.Event()
799 server.start(flag)
800 # wait for it to start
801 flag.wait()
802 # try to connect
803 try:
804 if test_support.verbose:
805 sys.stdout.write('\n')
Bill Janssenbf10c472007-09-16 23:16:46 +0000806 d1 = open(CERTFILE, 'rb').read()
Bill Janssen296a59d2007-09-16 22:06:00 +0000807 d2 = ''
808 # now fetch the same data from the HTTPS server
809 url = 'https://127.0.0.1:%d/%s' % (
810 TESTPORT, os.path.split(CERTFILE)[1])
811 f = urllib.urlopen(url)
812 dlen = f.info().getheader("content-length")
813 if dlen and (int(dlen) > 0):
814 d2 = f.read(int(dlen))
815 if test_support.verbose:
816 sys.stdout.write(
817 " client: read %d bytes from remote server '%s'\n"
818 % (len(d2), server))
819 f.close()
820 except:
821 msg = ''.join(traceback.format_exception(*sys.exc_info()))
822 if test_support.verbose:
823 sys.stdout.write('\n' + msg)
824 raise test_support.TestFailed(msg)
825 else:
826 if not (d1 == d2):
827 raise test_support.TestFailed(
828 "Couldn't fetch data from HTTPS server")
829 finally:
830 server.stop()
831 server.join()
Neal Norwitz7fc8e292007-08-26 18:50:39 +0000832
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000833
Bill Janssen119c7a62007-09-10 23:41:24 +0000834def findtestsocket(start, end):
835 def testbind(i):
836 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
837 try:
838 s.bind(("127.0.0.1", i))
839 except:
840 return 0
841 else:
842 return 1
843 finally:
844 s.close()
845
846 for i in range(start, end):
847 if testbind(i) and testbind(i+1):
848 return i
849 return 0
850
851
Neal Norwitz9eb9b102007-08-27 01:15:33 +0000852def test_main(verbose=False):
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000853 if skip_expected:
Bill Janssenffe576d2007-09-05 00:46:27 +0000854 raise test_support.TestSkipped("No SSL support")
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000855
Bill Janssen296a59d2007-09-16 22:06:00 +0000856 global CERTFILE, TESTPORT, SVN_PYTHON_ORG_ROOT_CERT
Guido van Rossumba8c5652007-08-27 17:19:42 +0000857 CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
Bill Janssen296a59d2007-09-16 22:06:00 +0000858 "keycert.pem")
859 SVN_PYTHON_ORG_ROOT_CERT = os.path.join(
860 os.path.dirname(__file__) or os.curdir,
861 "https_svn_python_org_root.pem")
862
863 if (not os.path.exists(CERTFILE) or
864 not os.path.exists(SVN_PYTHON_ORG_ROOT_CERT)):
Bill Janssen98d19da2007-09-10 21:51:02 +0000865 raise test_support.TestFailed("Can't read certificate files!")
Bill Janssen119c7a62007-09-10 23:41:24 +0000866 TESTPORT = findtestsocket(10025, 12000)
867 if not TESTPORT:
868 raise test_support.TestFailed("Can't find open port to test servers on!")
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000869
870 tests = [BasicTests]
871
Bill Janssen296a59d2007-09-16 22:06:00 +0000872 if test_support.is_resource_enabled('network'):
873 tests.append(NetworkTests)
874
Bill Janssen98d19da2007-09-10 21:51:02 +0000875 if _have_threads:
876 thread_info = test_support.threading_setup()
Bill Janssen296a59d2007-09-16 22:06:00 +0000877 if thread_info and test_support.is_resource_enabled('network'):
Bill Janssen98d19da2007-09-10 21:51:02 +0000878 tests.append(ConnectedTests)
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000879
Bill Janssen98d19da2007-09-10 21:51:02 +0000880 test_support.run_unittest(*tests)
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000881
Bill Janssen98d19da2007-09-10 21:51:02 +0000882 if _have_threads:
883 test_support.threading_cleanup(*thread_info)
Guido van Rossum4f2c3dd2007-08-25 15:08:43 +0000884
885if __name__ == "__main__":
886 test_main()