blob: 6bf1820f22cd5eefd0ac290a9e7f1e4ae7f4fc78 [file] [log] [blame]
Guido van Rossumcd16bf62007-06-13 18:07:49 +00001#!/usr/bin/env python
2
Christian Heimesbbe741d2008-03-28 10:53:29 +00003import mimetools
Guido van Rossumcd16bf62007-06-13 18:07:49 +00004import threading
5import urlparse
6import urllib2
7import BaseHTTPServer
8import unittest
9import hashlib
Benjamin Petersonee8712c2008-05-20 21:35:26 +000010from test import support
Guido van Rossumcd16bf62007-06-13 18:07:49 +000011
12# Loopback http server infrastructure
13
14class LoopbackHttpServer(BaseHTTPServer.HTTPServer):
15 """HTTP server w/ a few modifications that make it useful for
16 loopback testing purposes.
17 """
18
19 def __init__(self, server_address, RequestHandlerClass):
20 BaseHTTPServer.HTTPServer.__init__(self,
21 server_address,
22 RequestHandlerClass)
23
24 # Set the timeout of our listening socket really low so
25 # that we can stop the server easily.
26 self.socket.settimeout(1.0)
27
28 def get_request(self):
29 """BaseHTTPServer method, overridden."""
30
31 request, client_address = self.socket.accept()
32
33 # It's a loopback connection, so setting the timeout
34 # really low shouldn't affect anything, but should make
35 # deadlocks less likely to occur.
36 request.settimeout(10.0)
37
38 return (request, client_address)
39
40class LoopbackHttpServerThread(threading.Thread):
41 """Stoppable thread that runs a loopback http server."""
42
Guido van Rossum806c2462007-08-06 23:33:07 +000043 def __init__(self, request_handler):
Guido van Rossumcd16bf62007-06-13 18:07:49 +000044 threading.Thread.__init__(self)
Guido van Rossum4566c712007-08-21 03:36:47 +000045 self._stop_server = False
Guido van Rossumcd16bf62007-06-13 18:07:49 +000046 self.ready = threading.Event()
Guido van Rossum806c2462007-08-06 23:33:07 +000047 request_handler.protocol_version = "HTTP/1.0"
48 self.httpd = LoopbackHttpServer(('127.0.0.1', 0),
49 request_handler)
50 #print "Serving HTTP on %s port %s" % (self.httpd.server_name,
51 # self.httpd.server_port)
52 self.port = self.httpd.server_port
Guido van Rossumcd16bf62007-06-13 18:07:49 +000053
54 def stop(self):
55 """Stops the webserver if it's currently running."""
56
57 # Set the stop flag.
Guido van Rossum4566c712007-08-21 03:36:47 +000058 self._stop_server = True
Guido van Rossumcd16bf62007-06-13 18:07:49 +000059
60 self.join()
61
62 def run(self):
Guido van Rossumcd16bf62007-06-13 18:07:49 +000063 self.ready.set()
Guido van Rossum4566c712007-08-21 03:36:47 +000064 while not self._stop_server:
Guido van Rossum806c2462007-08-06 23:33:07 +000065 self.httpd.handle_request()
Guido van Rossumcd16bf62007-06-13 18:07:49 +000066
67# Authentication infrastructure
68
69class DigestAuthHandler:
70 """Handler for performing digest authentication."""
71
72 def __init__(self):
73 self._request_num = 0
74 self._nonces = []
75 self._users = {}
76 self._realm_name = "Test Realm"
77 self._qop = "auth"
78
79 def set_qop(self, qop):
80 self._qop = qop
81
82 def set_users(self, users):
83 assert isinstance(users, dict)
84 self._users = users
85
86 def set_realm(self, realm):
87 self._realm_name = realm
88
89 def _generate_nonce(self):
90 self._request_num += 1
Guido van Rossum81360142007-08-29 14:26:52 +000091 nonce = hashlib.md5(str(self._request_num).encode("ascii")).hexdigest()
Guido van Rossumcd16bf62007-06-13 18:07:49 +000092 self._nonces.append(nonce)
93 return nonce
94
95 def _create_auth_dict(self, auth_str):
96 first_space_index = auth_str.find(" ")
97 auth_str = auth_str[first_space_index+1:]
98
99 parts = auth_str.split(",")
100
101 auth_dict = {}
102 for part in parts:
103 name, value = part.split("=")
104 name = name.strip()
105 if value[0] == '"' and value[-1] == '"':
106 value = value[1:-1]
107 else:
108 value = value.strip()
109 auth_dict[name] = value
110 return auth_dict
111
112 def _validate_auth(self, auth_dict, password, method, uri):
113 final_dict = {}
114 final_dict.update(auth_dict)
115 final_dict["password"] = password
116 final_dict["method"] = method
117 final_dict["uri"] = uri
118 HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict
Guido van Rossum81360142007-08-29 14:26:52 +0000119 HA1 = hashlib.md5(HA1_str.encode("ascii")).hexdigest()
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000120 HA2_str = "%(method)s:%(uri)s" % final_dict
Guido van Rossum81360142007-08-29 14:26:52 +0000121 HA2 = hashlib.md5(HA2_str.encode("ascii")).hexdigest()
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000122 final_dict["HA1"] = HA1
123 final_dict["HA2"] = HA2
124 response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \
125 "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict
Guido van Rossum81360142007-08-29 14:26:52 +0000126 response = hashlib.md5(response_str.encode("ascii")).hexdigest()
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000127
128 return response == auth_dict["response"]
129
130 def _return_auth_challenge(self, request_handler):
131 request_handler.send_response(407, "Proxy Authentication Required")
132 request_handler.send_header("Content-Type", "text/html")
133 request_handler.send_header(
134 'Proxy-Authenticate', 'Digest realm="%s", '
135 'qop="%s",'
136 'nonce="%s", ' % \
137 (self._realm_name, self._qop, self._generate_nonce()))
138 # XXX: Not sure if we're supposed to add this next header or
139 # not.
140 #request_handler.send_header('Connection', 'close')
141 request_handler.end_headers()
Guido van Rossum8a392d72007-11-21 22:09:45 +0000142 request_handler.wfile.write(b"Proxy Authentication Required.")
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000143 return False
144
145 def handle_request(self, request_handler):
146 """Performs digest authentication on the given HTTP request
147 handler. Returns True if authentication was successful, False
148 otherwise.
149
150 If no users have been set, then digest auth is effectively
151 disabled and this method will always return True.
152 """
153
154 if len(self._users) == 0:
155 return True
156
157 if 'Proxy-Authorization' not in request_handler.headers:
158 return self._return_auth_challenge(request_handler)
159 else:
160 auth_dict = self._create_auth_dict(
161 request_handler.headers['Proxy-Authorization']
162 )
163 if auth_dict["username"] in self._users:
164 password = self._users[ auth_dict["username"] ]
165 else:
166 return self._return_auth_challenge(request_handler)
167 if not auth_dict.get("nonce") in self._nonces:
168 return self._return_auth_challenge(request_handler)
169 else:
170 self._nonces.remove(auth_dict["nonce"])
171
172 auth_validated = False
173
174 # MSIE uses short_path in its validation, but Python's
175 # urllib2 uses the full path, so we're going to see if
176 # either of them works here.
177
178 for path in [request_handler.path, request_handler.short_path]:
179 if self._validate_auth(auth_dict,
180 password,
181 request_handler.command,
182 path):
183 auth_validated = True
184
185 if not auth_validated:
186 return self._return_auth_challenge(request_handler)
187 return True
188
189# Proxy test infrastructure
190
191class FakeProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
192 """This is a 'fake proxy' that makes it look like the entire
193 internet has gone down due to a sudden zombie invasion. It main
194 utility is in providing us with authentication support for
195 testing.
196 """
197
198 digest_auth_handler = DigestAuthHandler()
199
200 def log_message(self, format, *args):
201 # Uncomment the next line for debugging.
202 #sys.stderr.write(format % args)
203 pass
204
205 def do_GET(self):
206 (scm, netloc, path, params, query, fragment) = urlparse.urlparse(
207 self.path, 'http')
208 self.short_path = path
209 if self.digest_auth_handler.handle_request(self):
210 self.send_response(200, "OK")
211 self.send_header("Content-Type", "text/html")
212 self.end_headers()
Guido van Rossum8a392d72007-11-21 22:09:45 +0000213 self.wfile.write(bytes("You've reached %s!<BR>" % self.path,
214 "ascii"))
215 self.wfile.write(b"Our apologies, but our server is down due to "
216 b"a sudden zombie invasion.")
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000217
218# Test cases
219
220class ProxyAuthTests(unittest.TestCase):
Christian Heimesbbe741d2008-03-28 10:53:29 +0000221 URL = "http://localhost"
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000222
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000223 USER = "tester"
224 PASSWD = "test123"
225 REALM = "TestRealm"
226
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000227 def setUp(self):
228 FakeProxyHandler.digest_auth_handler.set_users({
229 self.USER : self.PASSWD
230 })
231 FakeProxyHandler.digest_auth_handler.set_realm(self.REALM)
232
Guido van Rossum806c2462007-08-06 23:33:07 +0000233 self.server = LoopbackHttpServerThread(FakeProxyHandler)
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000234 self.server.start()
235 self.server.ready.wait()
Guido van Rossum806c2462007-08-06 23:33:07 +0000236 proxy_url = "http://127.0.0.1:%d" % self.server.port
237 handler = urllib2.ProxyHandler({"http" : proxy_url})
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000238 self._digest_auth_handler = urllib2.ProxyDigestAuthHandler()
239 self.opener = urllib2.build_opener(handler, self._digest_auth_handler)
240
241 def tearDown(self):
242 self.server.stop()
243
244 def test_proxy_with_bad_password_raises_httperror(self):
245 self._digest_auth_handler.add_password(self.REALM, self.URL,
246 self.USER, self.PASSWD+"bad")
247 FakeProxyHandler.digest_auth_handler.set_qop("auth")
248 self.assertRaises(urllib2.HTTPError,
249 self.opener.open,
250 self.URL)
251
252 def test_proxy_with_no_password_raises_httperror(self):
253 FakeProxyHandler.digest_auth_handler.set_qop("auth")
254 self.assertRaises(urllib2.HTTPError,
255 self.opener.open,
256 self.URL)
257
258 def test_proxy_qop_auth_works(self):
259 self._digest_auth_handler.add_password(self.REALM, self.URL,
260 self.USER, self.PASSWD)
261 FakeProxyHandler.digest_auth_handler.set_qop("auth")
262 result = self.opener.open(self.URL)
263 while result.read():
264 pass
265 result.close()
266
267 def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
268 self._digest_auth_handler.add_password(self.REALM, self.URL,
269 self.USER, self.PASSWD)
270 FakeProxyHandler.digest_auth_handler.set_qop("auth-int")
271 try:
272 result = self.opener.open(self.URL)
273 except urllib2.URLError:
274 # It's okay if we don't support auth-int, but we certainly
275 # shouldn't receive any kind of exception here other than
276 # a URLError.
277 result = None
278 if result:
279 while result.read():
280 pass
281 result.close()
282
Christian Heimesbbe741d2008-03-28 10:53:29 +0000283
284def GetRequestHandler(responses):
285
286 class FakeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
287
288 server_version = "TestHTTP/"
289 requests = []
290 headers_received = []
291 port = 80
292
293 def do_GET(self):
294 body = self.send_head()
295 if body:
296 self.wfile.write(body)
297
298 def do_POST(self):
299 content_length = self.headers['Content-Length']
300 post_data = self.rfile.read(int(content_length))
301 self.do_GET()
302 self.requests.append(post_data)
303
304 def send_head(self):
305 FakeHTTPRequestHandler.headers_received = self.headers
306 self.requests.append(self.path)
307 response_code, headers, body = responses.pop(0)
308
309 self.send_response(response_code)
310
311 for (header, value) in headers:
312 self.send_header(header, value % self.port)
313 if body:
314 self.send_header('Content-type', 'text/plain')
315 self.end_headers()
316 return body
317 self.end_headers()
318
319 def log_message(self, *args):
320 pass
321
322
323 return FakeHTTPRequestHandler
324
325
326class TestUrlopen(unittest.TestCase):
327 """Tests urllib2.urlopen using the network.
328
329 These tests are not exhaustive. Assuming that testing using files does a
330 good job overall of some of the basic interface features. There are no
331 tests exercising the optional 'data' and 'proxies' arguments. No tests
332 for transparent redirection have been written.
333 """
334
335 def start_server(self, responses):
336 handler = GetRequestHandler(responses)
337
338 self.server = LoopbackHttpServerThread(handler)
339 self.server.start()
340 self.server.ready.wait()
341 port = self.server.port
342 handler.port = port
343 return handler
344
345
346 def test_redirection(self):
347 expected_response = b'We got here...'
348 responses = [
349 (302, [('Location', 'http://localhost:%s/somewhere_else')], ''),
350 (200, [], expected_response)
351 ]
352
353 handler = self.start_server(responses)
354
355 try:
356 f = urllib2.urlopen('http://localhost:%s/' % handler.port)
357 data = f.read()
358 f.close()
359
360 self.assertEquals(data, expected_response)
361 self.assertEquals(handler.requests, ['/', '/somewhere_else'])
362 finally:
363 self.server.stop()
364
365
366 def test_404(self):
367 expected_response = b'Bad bad bad...'
368 handler = self.start_server([(404, [], expected_response)])
369
370 try:
371 try:
372 urllib2.urlopen('http://localhost:%s/weeble' % handler.port)
373 except urllib2.URLError as f:
374 data = f.read()
375 f.close()
376 else:
377 self.fail('404 should raise URLError')
378
379 self.assertEquals(data, expected_response)
380 self.assertEquals(handler.requests, ['/weeble'])
381 finally:
382 self.server.stop()
383
384
385 def test_200(self):
386 expected_response = b'pycon 2008...'
387 handler = self.start_server([(200, [], expected_response)])
388
389 try:
390 f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port)
391 data = f.read()
392 f.close()
393
394 self.assertEquals(data, expected_response)
395 self.assertEquals(handler.requests, ['/bizarre'])
396 finally:
397 self.server.stop()
398
399 def test_200_with_parameters(self):
400 expected_response = b'pycon 2008...'
401 handler = self.start_server([(200, [], expected_response)])
402
403 try:
404 f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port, b'get=with_feeling')
405 data = f.read()
406 f.close()
407
408 self.assertEquals(data, expected_response)
409 self.assertEquals(handler.requests, ['/bizarre', b'get=with_feeling'])
410 finally:
411 self.server.stop()
412
413
414 def test_sending_headers(self):
415 handler = self.start_server([(200, [], b"we don't care")])
416
417 try:
418 req = urllib2.Request("http://localhost:%s/" % handler.port,
419 headers={'Range': 'bytes=20-39'})
420 urllib2.urlopen(req)
421 self.assertEqual(handler.headers_received['Range'], 'bytes=20-39')
422 finally:
423 self.server.stop()
424
425 def test_basic(self):
426 handler = self.start_server([(200, [], b"we don't care")])
427
428 try:
429 open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
430 for attr in ("read", "close", "info", "geturl"):
431 self.assert_(hasattr(open_url, attr), "object returned from "
432 "urlopen lacks the %s attribute" % attr)
433 try:
434 self.assert_(open_url.read(), "calling 'read' failed")
435 finally:
436 open_url.close()
437 finally:
438 self.server.stop()
439
440 def test_info(self):
441 handler = self.start_server([(200, [], b"we don't care")])
442
443 try:
444 open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
445 info_obj = open_url.info()
446 self.assert_(isinstance(info_obj, mimetools.Message),
447 "object returned by 'info' is not an instance of "
448 "mimetools.Message")
449 self.assertEqual(info_obj.getsubtype(), "plain")
450 finally:
451 self.server.stop()
452
453 def test_geturl(self):
454 # Make sure same URL as opened is returned by geturl.
455 handler = self.start_server([(200, [], b"we don't care")])
456
457 try:
458 open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
459 url = open_url.geturl()
460 self.assertEqual(url, "http://localhost:%s" % handler.port)
461 finally:
462 self.server.stop()
463
464
465 def test_bad_address(self):
466 # Make sure proper exception is raised when connecting to a bogus
467 # address.
468 self.assertRaises(IOError,
469 # SF patch 809915: In Sep 2003, VeriSign started
470 # highjacking invalid .com and .net addresses to
471 # boost traffic to their own site. This test
472 # started failing then. One hopes the .invalid
473 # domain will be spared to serve its defined
474 # purpose.
475 # urllib2.urlopen, "http://www.sadflkjsasadf.com/")
476 urllib2.urlopen, "http://www.python.invalid./")
477
478
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000479def test_main():
480 # We will NOT depend on the network resource flag
481 # (Lib/test/regrtest.py -u network) since all tests here are only
482 # localhost. However, if this is a bad rationale, then uncomment
483 # the next line.
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000484 #support.requires("network")
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000485
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000486 support.run_unittest(ProxyAuthTests)
487 support.run_unittest(TestUrlopen)
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000488
489if __name__ == "__main__":
490 test_main()