blob: a126262140fc533a9abb2989ad7b495c05012ba0 [file] [log] [blame]
Guido van Rossumcd16bf62007-06-13 18:07:49 +00001#!/usr/bin/env python
2
3import sys
4import threading
5import urlparse
6import urllib2
7import BaseHTTPServer
8import unittest
9import hashlib
10from test import test_support
11
12# Loopback http server infrastructure
13
14class LoopbackHttpServer(BaseHTTPServer.HTTPServer):
15 """HTTP server w/ a few modifications that make it useful for
16 loopback testing purposes.
17 """
18
19 def __init__(self, server_address, RequestHandlerClass):
20 BaseHTTPServer.HTTPServer.__init__(self,
21 server_address,
22 RequestHandlerClass)
23
24 # Set the timeout of our listening socket really low so
25 # that we can stop the server easily.
26 self.socket.settimeout(1.0)
27
28 def get_request(self):
29 """BaseHTTPServer method, overridden."""
30
31 request, client_address = self.socket.accept()
32
33 # It's a loopback connection, so setting the timeout
34 # really low shouldn't affect anything, but should make
35 # deadlocks less likely to occur.
36 request.settimeout(10.0)
37
38 return (request, client_address)
39
40class LoopbackHttpServerThread(threading.Thread):
41 """Stoppable thread that runs a loopback http server."""
42
Guido van Rossum806c2462007-08-06 23:33:07 +000043 def __init__(self, request_handler):
Guido van Rossumcd16bf62007-06-13 18:07:49 +000044 threading.Thread.__init__(self)
Guido van Rossumcd16bf62007-06-13 18:07:49 +000045 self._stop = False
Guido van Rossumcd16bf62007-06-13 18:07:49 +000046 self.ready = threading.Event()
Guido van Rossum806c2462007-08-06 23:33:07 +000047 request_handler.protocol_version = "HTTP/1.0"
48 self.httpd = LoopbackHttpServer(('127.0.0.1', 0),
49 request_handler)
50 #print "Serving HTTP on %s port %s" % (self.httpd.server_name,
51 # self.httpd.server_port)
52 self.port = self.httpd.server_port
Guido van Rossumcd16bf62007-06-13 18:07:49 +000053
54 def stop(self):
55 """Stops the webserver if it's currently running."""
56
57 # Set the stop flag.
58 self._stop = True
59
60 self.join()
61
62 def run(self):
Guido van Rossumcd16bf62007-06-13 18:07:49 +000063 self.ready.set()
64 while not self._stop:
Guido van Rossum806c2462007-08-06 23:33:07 +000065 self.httpd.handle_request()
Guido van Rossumcd16bf62007-06-13 18:07:49 +000066
67# Authentication infrastructure
68
69class DigestAuthHandler:
70 """Handler for performing digest authentication."""
71
72 def __init__(self):
73 self._request_num = 0
74 self._nonces = []
75 self._users = {}
76 self._realm_name = "Test Realm"
77 self._qop = "auth"
78
79 def set_qop(self, qop):
80 self._qop = qop
81
82 def set_users(self, users):
83 assert isinstance(users, dict)
84 self._users = users
85
86 def set_realm(self, realm):
87 self._realm_name = realm
88
89 def _generate_nonce(self):
90 self._request_num += 1
91 nonce = hashlib.md5(str(self._request_num)).hexdigest()
92 self._nonces.append(nonce)
93 return nonce
94
95 def _create_auth_dict(self, auth_str):
96 first_space_index = auth_str.find(" ")
97 auth_str = auth_str[first_space_index+1:]
98
99 parts = auth_str.split(",")
100
101 auth_dict = {}
102 for part in parts:
103 name, value = part.split("=")
104 name = name.strip()
105 if value[0] == '"' and value[-1] == '"':
106 value = value[1:-1]
107 else:
108 value = value.strip()
109 auth_dict[name] = value
110 return auth_dict
111
112 def _validate_auth(self, auth_dict, password, method, uri):
113 final_dict = {}
114 final_dict.update(auth_dict)
115 final_dict["password"] = password
116 final_dict["method"] = method
117 final_dict["uri"] = uri
118 HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict
119 HA1 = hashlib.md5(HA1_str).hexdigest()
120 HA2_str = "%(method)s:%(uri)s" % final_dict
121 HA2 = hashlib.md5(HA2_str).hexdigest()
122 final_dict["HA1"] = HA1
123 final_dict["HA2"] = HA2
124 response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \
125 "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict
126 response = hashlib.md5(response_str).hexdigest()
127
128 return response == auth_dict["response"]
129
130 def _return_auth_challenge(self, request_handler):
131 request_handler.send_response(407, "Proxy Authentication Required")
132 request_handler.send_header("Content-Type", "text/html")
133 request_handler.send_header(
134 'Proxy-Authenticate', 'Digest realm="%s", '
135 'qop="%s",'
136 'nonce="%s", ' % \
137 (self._realm_name, self._qop, self._generate_nonce()))
138 # XXX: Not sure if we're supposed to add this next header or
139 # not.
140 #request_handler.send_header('Connection', 'close')
141 request_handler.end_headers()
142 request_handler.wfile.write("Proxy Authentication Required.")
143 return False
144
145 def handle_request(self, request_handler):
146 """Performs digest authentication on the given HTTP request
147 handler. Returns True if authentication was successful, False
148 otherwise.
149
150 If no users have been set, then digest auth is effectively
151 disabled and this method will always return True.
152 """
153
154 if len(self._users) == 0:
155 return True
156
157 if 'Proxy-Authorization' not in request_handler.headers:
158 return self._return_auth_challenge(request_handler)
159 else:
160 auth_dict = self._create_auth_dict(
161 request_handler.headers['Proxy-Authorization']
162 )
163 if auth_dict["username"] in self._users:
164 password = self._users[ auth_dict["username"] ]
165 else:
166 return self._return_auth_challenge(request_handler)
167 if not auth_dict.get("nonce") in self._nonces:
168 return self._return_auth_challenge(request_handler)
169 else:
170 self._nonces.remove(auth_dict["nonce"])
171
172 auth_validated = False
173
174 # MSIE uses short_path in its validation, but Python's
175 # urllib2 uses the full path, so we're going to see if
176 # either of them works here.
177
178 for path in [request_handler.path, request_handler.short_path]:
179 if self._validate_auth(auth_dict,
180 password,
181 request_handler.command,
182 path):
183 auth_validated = True
184
185 if not auth_validated:
186 return self._return_auth_challenge(request_handler)
187 return True
188
189# Proxy test infrastructure
190
191class FakeProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
192 """This is a 'fake proxy' that makes it look like the entire
193 internet has gone down due to a sudden zombie invasion. It main
194 utility is in providing us with authentication support for
195 testing.
196 """
197
198 digest_auth_handler = DigestAuthHandler()
199
200 def log_message(self, format, *args):
201 # Uncomment the next line for debugging.
202 #sys.stderr.write(format % args)
203 pass
204
205 def do_GET(self):
206 (scm, netloc, path, params, query, fragment) = urlparse.urlparse(
207 self.path, 'http')
208 self.short_path = path
209 if self.digest_auth_handler.handle_request(self):
210 self.send_response(200, "OK")
211 self.send_header("Content-Type", "text/html")
212 self.end_headers()
213 self.wfile.write("You've reached %s!<BR>" % self.path)
214 self.wfile.write("Our apologies, but our server is down due to "
215 "a sudden zombie invasion.")
216
217# Test cases
218
219class ProxyAuthTests(unittest.TestCase):
220 URL = "http://www.foo.com"
221
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000222 USER = "tester"
223 PASSWD = "test123"
224 REALM = "TestRealm"
225
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000226 def setUp(self):
227 FakeProxyHandler.digest_auth_handler.set_users({
228 self.USER : self.PASSWD
229 })
230 FakeProxyHandler.digest_auth_handler.set_realm(self.REALM)
231
Guido van Rossum806c2462007-08-06 23:33:07 +0000232 self.server = LoopbackHttpServerThread(FakeProxyHandler)
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000233 self.server.start()
234 self.server.ready.wait()
Guido van Rossum806c2462007-08-06 23:33:07 +0000235 proxy_url = "http://127.0.0.1:%d" % self.server.port
236 handler = urllib2.ProxyHandler({"http" : proxy_url})
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000237 self._digest_auth_handler = urllib2.ProxyDigestAuthHandler()
238 self.opener = urllib2.build_opener(handler, self._digest_auth_handler)
239
240 def tearDown(self):
241 self.server.stop()
242
243 def test_proxy_with_bad_password_raises_httperror(self):
244 self._digest_auth_handler.add_password(self.REALM, self.URL,
245 self.USER, self.PASSWD+"bad")
246 FakeProxyHandler.digest_auth_handler.set_qop("auth")
247 self.assertRaises(urllib2.HTTPError,
248 self.opener.open,
249 self.URL)
250
251 def test_proxy_with_no_password_raises_httperror(self):
252 FakeProxyHandler.digest_auth_handler.set_qop("auth")
253 self.assertRaises(urllib2.HTTPError,
254 self.opener.open,
255 self.URL)
256
257 def test_proxy_qop_auth_works(self):
258 self._digest_auth_handler.add_password(self.REALM, self.URL,
259 self.USER, self.PASSWD)
260 FakeProxyHandler.digest_auth_handler.set_qop("auth")
261 result = self.opener.open(self.URL)
262 while result.read():
263 pass
264 result.close()
265
266 def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
267 self._digest_auth_handler.add_password(self.REALM, self.URL,
268 self.USER, self.PASSWD)
269 FakeProxyHandler.digest_auth_handler.set_qop("auth-int")
270 try:
271 result = self.opener.open(self.URL)
272 except urllib2.URLError:
273 # It's okay if we don't support auth-int, but we certainly
274 # shouldn't receive any kind of exception here other than
275 # a URLError.
276 result = None
277 if result:
278 while result.read():
279 pass
280 result.close()
281
282def test_main():
283 # We will NOT depend on the network resource flag
284 # (Lib/test/regrtest.py -u network) since all tests here are only
285 # localhost. However, if this is a bad rationale, then uncomment
286 # the next line.
287 #test_support.requires("network")
288
289 test_support.run_unittest(ProxyAuthTests)
290
291if __name__ == "__main__":
292 test_main()