blob: 737ecbd2b16d7872590e4fa0caa6d2ac0538acbf [file] [log] [blame]
Guido van Rossumcd16bf62007-06-13 18:07:49 +00001#!/usr/bin/env python
2
3import sys
4import threading
5import urlparse
6import urllib2
7import BaseHTTPServer
8import unittest
9import hashlib
10from test import test_support
11
12# Loopback http server infrastructure
13
class LoopbackHttpServer(BaseHTTPServer.HTTPServer):
    """HTTPServer variant tuned for loopback tests.

    Both the listening socket and every accepted connection get a
    timeout, so a test that goes wrong stalls for a bounded time
    instead of blocking forever.
    """

    def __init__(self, server_address, RequestHandlerClass):
        """Create the server and put a 1 second timeout on its
        listening socket.
        """
        BaseHTTPServer.HTTPServer.__init__(
            self, server_address, RequestHandlerClass)

        # A short accept() timeout lets whoever drives the server
        # regain control regularly, which makes stopping it easy.
        listening_socket = self.socket
        listening_socket.settimeout(1.0)

    def get_request(self):
        """BaseHTTPServer method, overridden.

        Accepts one connection, gives it a 10 second timeout and
        returns the (connection, client_address) pair.
        """
        connection, peer = self.socket.accept()

        # Loopback traffic is fast, so a finite timeout should never
        # trigger in practice; it only makes deadlocks less likely.
        connection.settimeout(10.0)

        return (connection, peer)
39
class LoopbackHttpServerThread(threading.Thread):
    """Stoppable thread that runs a loopback http server.

    Public attributes:
      ready -- threading.Event set once startup has either succeeded
               or failed.
      error -- None, or the exception raised while starting the
               server.  It is stored *before* ``ready`` is set, so a
               caller that waits on ``ready`` always sees the final
               value.
    """

    def __init__(self, port, RequestHandlerClass):
        """Prepare a server thread for 127.0.0.1:port.

        Nothing is bound until the thread is started.
        """
        threading.Thread.__init__(self)
        self._RequestHandlerClass = RequestHandlerClass
        # Not called ``_stop``: newer threading.Thread implementations
        # define an internal ``_stop`` of their own, and shadowing it
        # with a boolean breaks join().
        self._stop_requested = False
        self._port = port
        self._server_address = ('127.0.0.1', self._port)
        self.ready = threading.Event()
        self.error = None

    def stop(self):
        """Stops the webserver if it's currently running."""

        # The serving loop polls this flag between requests.
        self._stop_requested = True

        self.join()

    def run(self):
        """Start the server, signal ``ready``, then serve requests
        until stop() is called.

        Any exception raised during startup is stored in ``error``
        and re-raised.
        """
        protocol = "HTTP/1.0"

        try:
            self._RequestHandlerClass.protocol_version = protocol
            httpd = LoopbackHttpServer(self._server_address,
                                       self._RequestHandlerClass)
        except:
            # Fail "gracefully" if we are unable to start.  Record the
            # error before waking the waiter, otherwise the waiter can
            # observe error == None for a failed start.  The bare
            # except is intentional: the exception is re-raised.
            self.error = sys.exc_info()[1]
            self.ready.set()
            raise

        self.ready.set()
        try:
            while not self._stop_requested:
                httpd.handle_request()
        finally:
            # Release the listening socket so the port can be reused
            # by the next server.
            httpd.server_close()
79
80# Authentication infrastructure
81
class DigestAuthHandler:
    """Handler for performing digest authentication.

    Acts as the proxy side of HTTP digest authentication: it issues
    ``407`` challenges carrying single-use nonces and checks the
    ``Proxy-Authorization`` header of incoming requests against a
    user -> password mapping.
    """

    # Fields a Proxy-Authorization header must carry before the
    # response digest can be recomputed.
    _REQUIRED_FIELDS = ("username", "realm", "nonce", "nc",
                        "cnonce", "qop", "response")

    def __init__(self):
        self._request_num = 0
        self._nonces = []
        self._users = {}
        self._realm_name = "Test Realm"
        self._qop = "auth"

    def set_qop(self, qop):
        """Set the qop value advertised in challenges."""
        self._qop = qop

    def set_users(self, users):
        """Set the user -> password mapping (must be a dict).

        An empty mapping disables authentication.
        """
        assert isinstance(users, dict)
        self._users = users

    def set_realm(self, realm):
        """Set the realm name advertised in challenges."""
        self._realm_name = realm

    def _md5_hex(self, text):
        """Return the hexadecimal MD5 digest of text."""
        try:
            digest = hashlib.md5(text)
        except TypeError:
            # Interpreters whose str is unicode text refuse to hash
            # it directly; hash its UTF-8 encoding instead.
            digest = hashlib.md5(text.encode("utf-8"))
        return digest.hexdigest()

    def _generate_nonce(self):
        """Create a new nonce, remember it as outstanding and return
        it.
        """
        self._request_num += 1
        nonce = self._md5_hex(str(self._request_num))
        self._nonces.append(nonce)
        return nonce

    def _split_auth_params(self, params):
        """Split params on the commas that lie outside double quotes.

        Backslash-escaped quotes inside a quoted value are not
        interpreted.
        """
        pieces = []
        current = []
        in_quotes = False
        for char in params:
            if char == '"':
                in_quotes = not in_quotes
            if char == "," and not in_quotes:
                pieces.append("".join(current))
                current = []
            else:
                current.append(char)
        pieces.append("".join(current))
        return pieces

    def _create_auth_dict(self, auth_str):
        """Parse an authorization header value into a dict.

        Everything up to the first space (the scheme) is dropped;
        the rest is read as comma-separated ``name=value`` pairs.
        Surrounding double quotes are removed from values.  Parts
        without an ``=`` are ignored.
        """
        first_space_index = auth_str.find(" ")
        params = auth_str[first_space_index + 1:]

        auth_dict = {}
        for part in self._split_auth_params(params):
            if "=" not in part:
                continue
            # Split on the first "=" only: values such as URIs with a
            # query string legitimately contain "=".
            name, value = part.split("=", 1)
            value = value.strip()
            if len(value) >= 2 and value[0] == '"' and value[-1] == '"':
                value = value[1:-1]
            auth_dict[name.strip()] = value
        return auth_dict

    def _validate_auth(self, auth_dict, password, method, uri):
        """Return True if auth_dict["response"] equals the digest
        computed from password, method and uri.

        Returns False when a required field is missing.  The "uri"
        entry of auth_dict is not used; the uri argument is.
        """
        for field in self._REQUIRED_FIELDS:
            if field not in auth_dict:
                return False

        HA1 = self._md5_hex("%s:%s:%s" % (auth_dict["username"],
                                          auth_dict["realm"],
                                          password))
        HA2 = self._md5_hex("%s:%s" % (method, uri))
        response = self._md5_hex("%s:%s:%s:%s:%s:%s" % (
            HA1,
            auth_dict["nonce"],
            auth_dict["nc"],
            auth_dict["cnonce"],
            auth_dict["qop"],
            HA2))

        return response == auth_dict["response"]

    def _return_auth_challenge(self, request_handler):
        """Send a 407 response carrying a fresh digest challenge.

        Always returns False so that callers can write
        ``return self._return_auth_challenge(...)``.
        """
        request_handler.send_response(407, "Proxy Authentication Required")
        request_handler.send_header("Content-Type", "text/html")
        request_handler.send_header(
            'Proxy-Authenticate',
            'Digest realm="%s", qop="%s", nonce="%s"' %
            (self._realm_name, self._qop, self._generate_nonce()))
        # XXX: Not sure if we're supposed to add a
        # 'Connection: close' header here or not.
        request_handler.end_headers()
        request_handler.wfile.write("Proxy Authentication Required.")
        return False

    def handle_request(self, request_handler):
        """Performs digest authentication on the given HTTP request
        handler.  Returns True if authentication was successful, False
        otherwise.

        If no users have been set, then digest auth is effectively
        disabled and this method will always return True.

        When False is returned a 407 challenge has already been
        written to request_handler.  The handler must provide
        ``headers``, ``command``, ``path`` and ``short_path``.
        """

        if not self._users:
            return True

        if 'Proxy-Authorization' not in request_handler.headers:
            return self._return_auth_challenge(request_handler)

        auth_dict = self._create_auth_dict(
            request_handler.headers['Proxy-Authorization'])

        username = auth_dict.get("username")
        if username not in self._users:
            return self._return_auth_challenge(request_handler)
        password = self._users[username]

        nonce = auth_dict.get("nonce")
        if nonce not in self._nonces:
            return self._return_auth_challenge(request_handler)
        # Nonces are single use: consume it before validating.
        self._nonces.remove(nonce)

        # MSIE uses short_path in its validation, but Python's
        # urllib2 uses the full path, so we're going to see if
        # either of them works here.
        for path in (request_handler.path, request_handler.short_path):
            if self._validate_auth(auth_dict,
                                   password,
                                   request_handler.command,
                                   path):
                return True

        return self._return_auth_challenge(request_handler)
201
202# Proxy test infrastructure
203
class FakeProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """This is a 'fake proxy' that makes it look like the entire
    internet has gone down due to a sudden zombie invasion.  Its main
    utility is in providing us with authentication support for
    testing.
    """

    # Class attribute, so a single authenticator is shared by all
    # handler instances and can be configured through the class.
    digest_auth_handler = DigestAuthHandler()

    def log_message(self, format, *args):
        """Discard the log line.

        For debugging, write ``format % args`` to sys.stderr here.
        """

    def do_GET(self):
        """Answer a GET with a fixed apology page once the request
        has passed digest authentication.
        """
        url_parts = urlparse.urlparse(self.path, 'http')
        # Element 2 of the parse result is the path component; the
        # authenticator reads it back as ``short_path``.
        self.short_path = url_parts[2]

        if not self.digest_auth_handler.handle_request(self):
            return

        self.send_response(200, "OK")
        self.send_header("Content-Type", "text/html")
        self.end_headers()
        self.wfile.write("You've reached %s!<BR>" % self.path)
        self.wfile.write("Our apologies, but our server is down due to "
                         "a sudden zombie invasion.")
228 "a sudden zombie invasion.")
229
230# Test cases
231
class ProxyAuthTests(unittest.TestCase):
    """Exercise urllib2's proxy digest authentication against the
    loopback FakeProxyHandler server.
    """

    URL = "http://www.foo.com"

    PORT = 8080
    USER = "tester"
    PASSWD = "test123"
    REALM = "TestRealm"

    PROXY_URL = "http://127.0.0.1:%d" % PORT

    def setUp(self):
        """Configure the fake proxy, start it, and build an opener
        that routes http requests through it.
        """
        proxy_auth = FakeProxyHandler.digest_auth_handler
        proxy_auth.set_users({self.USER: self.PASSWD})
        proxy_auth.set_realm(self.REALM)

        self.server = LoopbackHttpServerThread(self.PORT, FakeProxyHandler)
        self.server.start()
        # Block until the server thread reports startup success or
        # failure.
        self.server.ready.wait()
        if self.server.error:
            raise self.server.error

        proxy_handler = urllib2.ProxyHandler({"http": self.PROXY_URL})
        self._digest_auth_handler = urllib2.ProxyDigestAuthHandler()
        self.opener = urllib2.build_opener(proxy_handler,
                                           self._digest_auth_handler)

    def tearDown(self):
        """Stop the proxy server thread."""
        self.server.stop()

    def _read_all_and_close(self, response):
        """Read response until it is exhausted, then close it."""
        while True:
            if not response.read():
                break
        response.close()

    def test_proxy_with_bad_password_raises_httperror(self):
        wrong_password = self.PASSWD + "bad"
        self._digest_auth_handler.add_password(self.REALM, self.URL,
                                               self.USER, wrong_password)
        FakeProxyHandler.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib2.HTTPError, self.opener.open, self.URL)

    def test_proxy_with_no_password_raises_httperror(self):
        FakeProxyHandler.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib2.HTTPError, self.opener.open, self.URL)

    def test_proxy_qop_auth_works(self):
        self._digest_auth_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        FakeProxyHandler.digest_auth_handler.set_qop("auth")
        response = self.opener.open(self.URL)
        self._read_all_and_close(response)

    def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
        self._digest_auth_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        FakeProxyHandler.digest_auth_handler.set_qop("auth-int")
        try:
            response = self.opener.open(self.URL)
        except urllib2.URLError:
            # It's okay if we don't support auth-int, but we certainly
            # shouldn't receive any kind of exception here other than
            # a URLError.
            return
        if response:
            self._read_all_and_close(response)
299
def test_main():
    """Run the ProxyAuthTests suite via test_support.run_unittest."""
    # We will NOT depend on the network resource flag
    # (Lib/test/regrtest.py -u network) since all tests here are only
    # localhost.  However, if this is a bad rationale, then uncomment
    # the next line.
    #test_support.requires("network")

    test_classes = (ProxyAuthTests,)
    test_support.run_unittest(*test_classes)
307 test_support.run_unittest(ProxyAuthTests)
308
# Run the test suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()