# blob: a0b77b5e43a4f12e74ef65006e170f01f35dea1d [file] [log] [blame]
import os
import posixpath
import pprint
import ssl
import sys
import threading
import urllib.parse
# Rename HTTPServer to _HTTPServer so as to avoid confusion with HTTPSServer.
from http.server import (HTTPServer as _HTTPServer,
                         SimpleHTTPRequestHandler, BaseHTTPRequestHandler)

from test import support

# Directory that contains this module.
here = os.path.dirname(__file__)

# Default host for the test servers, taken from test.support.
HOST = support.HOST
# Default PEM file; make_https_server() assumes it holds both the private
# key and the certificate.
CERTFILE = os.path.join(here, 'keycert.pem')

# This one's based on HTTPServer, which is based on socketserver.

class HTTPSServer(_HTTPServer):
    """HTTP server whose accepted connections are wrapped with SSL/TLS.

    Behaves like ``http.server.HTTPServer`` except that ``get_request()``
    returns a socket wrapped by the ``ssl.SSLContext`` given at construction.
    """

    def __init__(self, server_address, handler_class, context):
        """Create the server.

        server_address -- ``(host, port)`` pair passed to the base class.
        handler_class  -- request handler class passed to the base class.
        context        -- object providing ``wrap_socket(sock, server_side=True)``
                          (an ``ssl.SSLContext``), stored as ``self.context``.
        """
        super().__init__(server_address, handler_class)
        self.context = context

    def __str__(self):
        return '<%s %s:%s>' % (self.__class__.__name__,
                               self.server_name,
                               self.server_port)

    def get_request(self):
        """Accept one connection and return ``(ssl_socket, client_address)``.

        If wrapping fails with OSError, the accepted socket is closed and
        the error is re-raised.
        """
        # override this to wrap socket with SSL
        sock, addr = self.socket.accept()
        try:
            sslconn = self.context.wrap_socket(sock, server_side=True)
        except OSError as exc:
            # Do not leave the accepted socket open when wrapping fails.
            sock.close()
            # socketserver silently drops OSError raised by get_request(),
            # so report it here when running verbosely.
            if support.verbose:
                sys.stderr.write("Got an error:\n%s\n" % exc)
            raise
        return sslconn, addr

class RootedHTTPRequestHandler(SimpleHTTPRequestHandler):
    """File-serving request handler rooted at a known directory.

    translate_path() is overridden to resolve requests against ``root``
    instead of os.curdir, since the test could be run from anywhere.
    """

    server_version = "TestHTTPS/1.0"
    # Directory that request paths are resolved against.
    root = here
    # Avoid hanging when a request gets interrupted by the client
    timeout = 5

    def translate_path(self, path):
        """Translate a /-separated PATH to the local filename syntax.

        The query string is dropped, percent-escapes are decoded and the
        result is resolved below ``self.root``.  Components that mean
        special things to the local file system (drive names, directory
        prefixes, ``.`` and ``..``) are ignored, so the returned path never
        leaves ``self.root``.
        """
        # abandon query parameters
        path = urllib.parse.urlparse(path)[2]
        # Request paths always use '/', so normalise with posixpath; the
        # native os.path.normpath would rewrite the separators on Windows
        # and defeat the split below.
        path = posixpath.normpath(urllib.parse.unquote(path))
        result = self.root
        for word in filter(None, path.split('/')):
            drive, word = os.path.splitdrive(word)
            head, word = os.path.split(word)
            # normpath() keeps leading '..' components of a path that does
            # not start with '/'; skip them so a request cannot climb out
            # of the root directory.
            if not word or word in (os.curdir, os.pardir):
                continue
            result = os.path.join(result, word)
        return result

    def log_message(self, format, *args):
        # we override this to suppress logging unless "verbose"
        if support.verbose:
            sys.stdout.write(" server (%s:%d %s):\n [%s] %s\n" %
                             (self.server.server_address,
                              self.server.server_port,
                              self.request.cipher(),
                              self.log_date_time_string(),
                              format % args))


class StatsRequestHandler(BaseHTTPRequestHandler):
    """Example HTTP request handler which returns SSL statistics on GET
    requests.
    """

    server_version = "StatsHTTPS/1.0"

    def do_GET(self, send_body=True):
        """Serve a GET request.

        Replies with the pretty-printed result of the connection context's
        ``session_stats()`` as UTF-8 plain text.  When *send_body* is false
        only the headers are sent.
        """
        # self.connection is the request socket; it is expected to be the
        # SSL socket produced by HTTPSServer.get_request(), which exposes
        # its SSLContext as ``.context``.  Using it avoids reaching through
        # the private ``rfile.raw._sock`` attribute.
        context = self.connection.context
        body = pprint.pformat(context.session_stats()).encode('utf-8')
        self.send_response(200)
        self.send_header("Content-type", "text/plain; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        if send_body:
            self.wfile.write(body)

    def do_HEAD(self):
        """Serve a HEAD request."""
        self.do_GET(send_body=False)

    def log_request(self, format, *args):
        # Suppress request logging unless "verbose".  All arguments are
        # forwarded unchanged to BaseHTTPRequestHandler.log_request().
        if support.verbose:
            BaseHTTPRequestHandler.log_request(self, format, *args)


class HTTPSServerThread(threading.Thread):
    """Daemon thread that runs an HTTPSServer until stop() is called."""

    def __init__(self, context, host=HOST, handler_class=None):
        """Create the server, without starting to serve.

        context       -- SSL context handed to HTTPSServer.
        host          -- address to bind to; port 0 is requested and the
                         port actually bound is stored in ``self.port``.
        handler_class -- request handler class; RootedHTTPRequestHandler
                         is used when this is None (or otherwise false).
        """
        self.flag = None
        self.server = HTTPSServer((host, 0),
                                  handler_class or RootedHTTPRequestHandler,
                                  context)
        self.port = self.server.server_port
        super().__init__()
        self.daemon = True

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self.server)

    def start(self, flag=None):
        """Start the thread; *flag*, if given, is set once run() begins."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        if self.flag:
            self.flag.set()
        try:
            self.server.serve_forever(0.05)
        finally:
            # Release the listening socket once serving ends, instead of
            # leaving it open until garbage collection.
            self.server.server_close()

    def stop(self):
        """Ask serve_forever() to return."""
        self.server.shutdown()


def make_https_server(case, certfile=CERTFILE, host=HOST, handler_class=None,
                      *, context=None):
    """Start an HTTPS server thread and register its shutdown with *case*.

    case          -- object providing ``addCleanup()`` (a test case); the
                     registered cleanup stops the server and joins the thread.
    certfile      -- PEM file loaded into the default context; ignored when
                     *context* is given.
    host          -- address the server binds to.
    handler_class -- request handler class forwarded to HTTPSServerThread.
    context       -- optional ready-made SSL context.  When omitted, a
                     PROTOCOL_TLSv1 context loaded with *certfile* is built,
                     as before.

    Returns the started HTTPSServerThread; the call blocks until the
    thread has begun running.
    """
    if context is None:
        # we assume the certfile contains both private key and certificate
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.load_cert_chain(certfile)
    server = HTTPSServerThread(context, host, handler_class)
    flag = threading.Event()
    server.start(flag)
    flag.wait()

    def cleanup():
        if support.verbose:
            sys.stdout.write('stopping HTTPS server\n')
        server.stop()
        if support.verbose:
            sys.stdout.write('joining HTTPS thread\n')
        server.join()

    case.addCleanup(cleanup)
    return server


if __name__ == "__main__":
    # Command-line entry point: serve the current directory (or the SSL
    # statistics page) over HTTPS using the bundled certificate.
    import argparse
    parser = argparse.ArgumentParser(
        description='Run a test HTTPS server. '
                    'By default, the current directory is served.')
    parser.add_argument('-p', '--port', type=int, default=4433,
                        help='port to listen on (default: %(default)s)')
    parser.add_argument('-q', '--quiet', dest='verbose', default=True,
                        action='store_false', help='be less verbose')
    parser.add_argument('-s', '--stats', dest='use_stats_handler',
                        default=False, action='store_true',
                        help='always return stats page')
    args = parser.parse_args()

    support.verbose = args.verbose
    if args.use_stats_handler:
        handler_class = StatsRequestHandler
    else:
        handler_class = RootedHTTPRequestHandler
        # Only the file-serving handler reads ``root``.
        handler_class.root = os.getcwd()
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.load_cert_chain(CERTFILE)

    server = HTTPSServer(("", args.port), handler_class, context)
    try:
        server.serve_forever(0.1)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the script; exit quietly.
        pass
    finally:
        # Always release the listening socket.
        server.server_close()