Issue #9003: http.client.HTTPSConnection, urllib.request.HTTPSHandler and
urllib.request.urlopen now take optional arguments to allow for
server certificate checking, as recommended for public uses of HTTPS.
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py
index bf08f6f..ca74e71 100644
--- a/Lib/test/test_ssl.py
+++ b/Lib/test/test_ssl.py
@@ -11,15 +11,13 @@
import errno
import pprint
import tempfile
-import urllib.parse, urllib.request
+import urllib.request
import traceback
import asyncore
import weakref
import platform
import functools
-from http.server import HTTPServer, SimpleHTTPRequestHandler
-
# Optionally test SSL support, if we have it in the tested platform
skip_expected = False
try:
@@ -605,6 +603,8 @@
else:
_have_threads = True
+ from test.ssl_servers import make_https_server
+
class ThreadedEchoServer(threading.Thread):
class ConnectionHandler(threading.Thread):
@@ -774,98 +774,6 @@
def stop(self):
self.active = False
- class OurHTTPSServer(threading.Thread):
-
- # This one's based on HTTPServer, which is based on SocketServer
-
- class HTTPSServer(HTTPServer):
-
- def __init__(self, server_address, RequestHandlerClass, certfile):
- HTTPServer.__init__(self, server_address, RequestHandlerClass)
- # we assume the certfile contains both private key and certificate
- self.certfile = certfile
- self.allow_reuse_address = True
-
- def __str__(self):
- return ('<%s %s:%s>' %
- (self.__class__.__name__,
- self.server_name,
- self.server_port))
-
- def get_request(self):
- # override this to wrap socket with SSL
- sock, addr = self.socket.accept()
- sslconn = ssl.wrap_socket(sock, server_side=True,
- certfile=self.certfile)
- return sslconn, addr
-
- class RootedHTTPRequestHandler(SimpleHTTPRequestHandler):
- # need to override translate_path to get a known root,
- # instead of using os.curdir, since the test could be
- # run from anywhere
-
- server_version = "TestHTTPS/1.0"
-
- root = None
-
- def translate_path(self, path):
- """Translate a /-separated PATH to the local filename syntax.
-
- Components that mean special things to the local file system
- (e.g. drive or directory names) are ignored. (XXX They should
- probably be diagnosed.)
-
- """
- # abandon query parameters
- path = urllib.parse.urlparse(path)[2]
- path = os.path.normpath(urllib.parse.unquote(path))
- words = path.split('/')
- words = filter(None, words)
- path = self.root
- for word in words:
- drive, word = os.path.splitdrive(word)
- head, word = os.path.split(word)
- if word in self.root: continue
- path = os.path.join(path, word)
- return path
-
- def log_message(self, format, *args):
- # we override this to suppress logging unless "verbose"
-
- if support.verbose:
- sys.stdout.write(" server (%s:%d %s):\n [%s] %s\n" %
- (self.server.server_address,
- self.server.server_port,
- self.request.cipher(),
- self.log_date_time_string(),
- format%args))
-
-
- def __init__(self, certfile):
- self.flag = None
- self.RootedHTTPRequestHandler.root = os.path.split(CERTFILE)[0]
- self.server = self.HTTPSServer(
- (HOST, 0), self.RootedHTTPRequestHandler, certfile)
- self.port = self.server.server_port
- threading.Thread.__init__(self)
- self.daemon = True
-
- def __str__(self):
- return "<%s %s>" % (self.__class__.__name__, self.server)
-
- def start(self, flag=None):
- self.flag = flag
- threading.Thread.start(self)
-
- def run(self):
- if self.flag:
- self.flag.set()
- self.server.serve_forever(0.05)
-
- def stop(self):
- self.server.shutdown()
-
-
class AsyncoreEchoServer(threading.Thread):
# this one's based on asyncore.dispatcher
@@ -1349,22 +1257,18 @@
def test_socketserver(self):
"""Using a SocketServer to create and manage SSL connections."""
- server = OurHTTPSServer(CERTFILE)
- flag = threading.Event()
- server.start(flag)
- # wait for it to start
- flag.wait()
+ server = make_https_server(self, CERTFILE)
# try to connect
+ if support.verbose:
+ sys.stdout.write('\n')
+ with open(CERTFILE, 'rb') as f:
+ d1 = f.read()
+ d2 = ''
+ # now fetch the same data from the HTTPS server
+ url = 'https://%s:%d/%s' % (
+ HOST, server.port, os.path.split(CERTFILE)[1])
+ f = urllib.request.urlopen(url)
try:
- if support.verbose:
- sys.stdout.write('\n')
- with open(CERTFILE, 'rb') as f:
- d1 = f.read()
- d2 = ''
- # now fetch the same data from the HTTPS server
- url = 'https://%s:%d/%s' % (
- HOST, server.port, os.path.split(CERTFILE)[1])
- f = urllib.request.urlopen(url)
dlen = f.info().get("content-length")
if dlen and (int(dlen) > 0):
d2 = f.read(int(dlen))
@@ -1372,15 +1276,9 @@
sys.stdout.write(
" client: read %d bytes from remote server '%s'\n"
% (len(d2), server))
- f.close()
- self.assertEqual(d1, d2)
finally:
- if support.verbose:
- sys.stdout.write('stopping server\n')
- server.stop()
- if support.verbose:
- sys.stdout.write('joining thread\n')
- server.join()
+ f.close()
+ self.assertEqual(d1, d2)
def test_asyncore_server(self):
"""Check the example asyncore integration."""