blob: 663330ba93faf31b27fe7fecade8f4560727e5d8 [file] [log] [blame]
Guido van Rossumcd16bf62007-06-13 18:07:49 +00001#!/usr/bin/env python
2
Guido van Rossumcd16bf62007-06-13 18:07:49 +00003import threading
4import urlparse
5import urllib2
6import BaseHTTPServer
7import unittest
8import hashlib
9from test import test_support
10
11# Loopback http server infrastructure
12
13class LoopbackHttpServer(BaseHTTPServer.HTTPServer):
14 """HTTP server w/ a few modifications that make it useful for
15 loopback testing purposes.
16 """
17
18 def __init__(self, server_address, RequestHandlerClass):
19 BaseHTTPServer.HTTPServer.__init__(self,
20 server_address,
21 RequestHandlerClass)
22
23 # Set the timeout of our listening socket really low so
24 # that we can stop the server easily.
25 self.socket.settimeout(1.0)
26
27 def get_request(self):
28 """BaseHTTPServer method, overridden."""
29
30 request, client_address = self.socket.accept()
31
32 # It's a loopback connection, so setting the timeout
33 # really low shouldn't affect anything, but should make
34 # deadlocks less likely to occur.
35 request.settimeout(10.0)
36
37 return (request, client_address)
38
39class LoopbackHttpServerThread(threading.Thread):
40 """Stoppable thread that runs a loopback http server."""
41
Guido van Rossum806c2462007-08-06 23:33:07 +000042 def __init__(self, request_handler):
Guido van Rossumcd16bf62007-06-13 18:07:49 +000043 threading.Thread.__init__(self)
Guido van Rossum4566c712007-08-21 03:36:47 +000044 self._stop_server = False
Guido van Rossumcd16bf62007-06-13 18:07:49 +000045 self.ready = threading.Event()
Guido van Rossum806c2462007-08-06 23:33:07 +000046 request_handler.protocol_version = "HTTP/1.0"
47 self.httpd = LoopbackHttpServer(('127.0.0.1', 0),
48 request_handler)
49 #print "Serving HTTP on %s port %s" % (self.httpd.server_name,
50 # self.httpd.server_port)
51 self.port = self.httpd.server_port
Guido van Rossumcd16bf62007-06-13 18:07:49 +000052
53 def stop(self):
54 """Stops the webserver if it's currently running."""
55
56 # Set the stop flag.
Guido van Rossum4566c712007-08-21 03:36:47 +000057 self._stop_server = True
Guido van Rossumcd16bf62007-06-13 18:07:49 +000058
59 self.join()
60
61 def run(self):
Guido van Rossumcd16bf62007-06-13 18:07:49 +000062 self.ready.set()
Guido van Rossum4566c712007-08-21 03:36:47 +000063 while not self._stop_server:
Guido van Rossum806c2462007-08-06 23:33:07 +000064 self.httpd.handle_request()
Guido van Rossumcd16bf62007-06-13 18:07:49 +000065
66# Authentication infrastructure
67
68class DigestAuthHandler:
69 """Handler for performing digest authentication."""
70
71 def __init__(self):
72 self._request_num = 0
73 self._nonces = []
74 self._users = {}
75 self._realm_name = "Test Realm"
76 self._qop = "auth"
77
78 def set_qop(self, qop):
79 self._qop = qop
80
81 def set_users(self, users):
82 assert isinstance(users, dict)
83 self._users = users
84
85 def set_realm(self, realm):
86 self._realm_name = realm
87
88 def _generate_nonce(self):
89 self._request_num += 1
Guido van Rossum81360142007-08-29 14:26:52 +000090 nonce = hashlib.md5(str(self._request_num).encode("ascii")).hexdigest()
Guido van Rossumcd16bf62007-06-13 18:07:49 +000091 self._nonces.append(nonce)
92 return nonce
93
94 def _create_auth_dict(self, auth_str):
95 first_space_index = auth_str.find(" ")
96 auth_str = auth_str[first_space_index+1:]
97
98 parts = auth_str.split(",")
99
100 auth_dict = {}
101 for part in parts:
102 name, value = part.split("=")
103 name = name.strip()
104 if value[0] == '"' and value[-1] == '"':
105 value = value[1:-1]
106 else:
107 value = value.strip()
108 auth_dict[name] = value
109 return auth_dict
110
111 def _validate_auth(self, auth_dict, password, method, uri):
112 final_dict = {}
113 final_dict.update(auth_dict)
114 final_dict["password"] = password
115 final_dict["method"] = method
116 final_dict["uri"] = uri
117 HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict
Guido van Rossum81360142007-08-29 14:26:52 +0000118 HA1 = hashlib.md5(HA1_str.encode("ascii")).hexdigest()
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000119 HA2_str = "%(method)s:%(uri)s" % final_dict
Guido van Rossum81360142007-08-29 14:26:52 +0000120 HA2 = hashlib.md5(HA2_str.encode("ascii")).hexdigest()
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000121 final_dict["HA1"] = HA1
122 final_dict["HA2"] = HA2
123 response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \
124 "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict
Guido van Rossum81360142007-08-29 14:26:52 +0000125 response = hashlib.md5(response_str.encode("ascii")).hexdigest()
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000126
127 return response == auth_dict["response"]
128
129 def _return_auth_challenge(self, request_handler):
130 request_handler.send_response(407, "Proxy Authentication Required")
131 request_handler.send_header("Content-Type", "text/html")
132 request_handler.send_header(
133 'Proxy-Authenticate', 'Digest realm="%s", '
134 'qop="%s",'
135 'nonce="%s", ' % \
136 (self._realm_name, self._qop, self._generate_nonce()))
137 # XXX: Not sure if we're supposed to add this next header or
138 # not.
139 #request_handler.send_header('Connection', 'close')
140 request_handler.end_headers()
Guido van Rossum8a392d72007-11-21 22:09:45 +0000141 request_handler.wfile.write(b"Proxy Authentication Required.")
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000142 return False
143
144 def handle_request(self, request_handler):
145 """Performs digest authentication on the given HTTP request
146 handler. Returns True if authentication was successful, False
147 otherwise.
148
149 If no users have been set, then digest auth is effectively
150 disabled and this method will always return True.
151 """
152
153 if len(self._users) == 0:
154 return True
155
156 if 'Proxy-Authorization' not in request_handler.headers:
157 return self._return_auth_challenge(request_handler)
158 else:
159 auth_dict = self._create_auth_dict(
160 request_handler.headers['Proxy-Authorization']
161 )
162 if auth_dict["username"] in self._users:
163 password = self._users[ auth_dict["username"] ]
164 else:
165 return self._return_auth_challenge(request_handler)
166 if not auth_dict.get("nonce") in self._nonces:
167 return self._return_auth_challenge(request_handler)
168 else:
169 self._nonces.remove(auth_dict["nonce"])
170
171 auth_validated = False
172
173 # MSIE uses short_path in its validation, but Python's
174 # urllib2 uses the full path, so we're going to see if
175 # either of them works here.
176
177 for path in [request_handler.path, request_handler.short_path]:
178 if self._validate_auth(auth_dict,
179 password,
180 request_handler.command,
181 path):
182 auth_validated = True
183
184 if not auth_validated:
185 return self._return_auth_challenge(request_handler)
186 return True
187
188# Proxy test infrastructure
189
190class FakeProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
191 """This is a 'fake proxy' that makes it look like the entire
192 internet has gone down due to a sudden zombie invasion. It main
193 utility is in providing us with authentication support for
194 testing.
195 """
196
197 digest_auth_handler = DigestAuthHandler()
198
199 def log_message(self, format, *args):
200 # Uncomment the next line for debugging.
201 #sys.stderr.write(format % args)
202 pass
203
204 def do_GET(self):
205 (scm, netloc, path, params, query, fragment) = urlparse.urlparse(
206 self.path, 'http')
207 self.short_path = path
208 if self.digest_auth_handler.handle_request(self):
209 self.send_response(200, "OK")
210 self.send_header("Content-Type", "text/html")
211 self.end_headers()
Guido van Rossum8a392d72007-11-21 22:09:45 +0000212 self.wfile.write(bytes("You've reached %s!<BR>" % self.path,
213 "ascii"))
214 self.wfile.write(b"Our apologies, but our server is down due to "
215 b"a sudden zombie invasion.")
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000216
217# Test cases
218
219class ProxyAuthTests(unittest.TestCase):
220 URL = "http://www.foo.com"
221
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000222 USER = "tester"
223 PASSWD = "test123"
224 REALM = "TestRealm"
225
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000226 def setUp(self):
227 FakeProxyHandler.digest_auth_handler.set_users({
228 self.USER : self.PASSWD
229 })
230 FakeProxyHandler.digest_auth_handler.set_realm(self.REALM)
231
Guido van Rossum806c2462007-08-06 23:33:07 +0000232 self.server = LoopbackHttpServerThread(FakeProxyHandler)
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000233 self.server.start()
234 self.server.ready.wait()
Guido van Rossum806c2462007-08-06 23:33:07 +0000235 proxy_url = "http://127.0.0.1:%d" % self.server.port
236 handler = urllib2.ProxyHandler({"http" : proxy_url})
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000237 self._digest_auth_handler = urllib2.ProxyDigestAuthHandler()
238 self.opener = urllib2.build_opener(handler, self._digest_auth_handler)
239
240 def tearDown(self):
241 self.server.stop()
242
243 def test_proxy_with_bad_password_raises_httperror(self):
244 self._digest_auth_handler.add_password(self.REALM, self.URL,
245 self.USER, self.PASSWD+"bad")
246 FakeProxyHandler.digest_auth_handler.set_qop("auth")
247 self.assertRaises(urllib2.HTTPError,
248 self.opener.open,
249 self.URL)
250
251 def test_proxy_with_no_password_raises_httperror(self):
252 FakeProxyHandler.digest_auth_handler.set_qop("auth")
253 self.assertRaises(urllib2.HTTPError,
254 self.opener.open,
255 self.URL)
256
257 def test_proxy_qop_auth_works(self):
258 self._digest_auth_handler.add_password(self.REALM, self.URL,
259 self.USER, self.PASSWD)
260 FakeProxyHandler.digest_auth_handler.set_qop("auth")
261 result = self.opener.open(self.URL)
262 while result.read():
263 pass
264 result.close()
265
266 def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
267 self._digest_auth_handler.add_password(self.REALM, self.URL,
268 self.USER, self.PASSWD)
269 FakeProxyHandler.digest_auth_handler.set_qop("auth-int")
270 try:
271 result = self.opener.open(self.URL)
272 except urllib2.URLError:
273 # It's okay if we don't support auth-int, but we certainly
274 # shouldn't receive any kind of exception here other than
275 # a URLError.
276 result = None
277 if result:
278 while result.read():
279 pass
280 result.close()
281
282def test_main():
283 # We will NOT depend on the network resource flag
284 # (Lib/test/regrtest.py -u network) since all tests here are only
285 # localhost. However, if this is a bad rationale, then uncomment
286 # the next line.
287 #test_support.requires("network")
288
289 test_support.run_unittest(ProxyAuthTests)
290
291if __name__ == "__main__":
292 test_main()