blob: 5ffd779d412b062e7dde49e51800b4d267ad98f3 [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Guido van Rossumcd16bf62007-06-13 18:07:49 +00002
Barry Warsaw820c1202008-06-12 04:06:45 +00003import email
Guido van Rossumcd16bf62007-06-13 18:07:49 +00004import threading
Jeremy Hylton1afc1692008-06-18 20:49:58 +00005import urllib.parse
6import urllib.request
Georg Brandl24420152008-05-26 16:32:26 +00007import http.server
Guido van Rossumcd16bf62007-06-13 18:07:49 +00008import unittest
9import hashlib
Benjamin Petersonee8712c2008-05-20 21:35:26 +000010from test import support
Guido van Rossumcd16bf62007-06-13 18:07:49 +000011
# Loopback HTTP server infrastructure
13
class LoopbackHttpServer(http.server.HTTPServer):
    """HTTPServer variant with socket timeouts suited to loopback tests.

    The listening socket is given a one-second timeout and every accepted
    connection a ten-second timeout.
    """

    def __init__(self, server_address, RequestHandlerClass):
        """Bind to *server_address* and shorten the listening timeout."""
        super().__init__(server_address, RequestHandlerClass)

        # A short timeout on the listening socket is what lets the server
        # be stopped easily.
        self.socket.settimeout(1.0)

    def get_request(self):
        """HTTPServer hook: accept one connection and set its timeout.

        Returns the ``(connection, client_address)`` pair from ``accept()``.
        """
        connection, peer = self.socket.accept()

        # Loopback traffic should never come close to this limit; it is
        # only here to make deadlocks less likely.
        connection.settimeout(10.0)

        return connection, peer
39
class LoopbackHttpServerThread(threading.Thread):
    """Stoppable thread that runs a loopback HTTP server.

    Attributes set by the constructor:
        ready: ``threading.Event`` set as soon as ``run()`` begins.
        httpd: the ``LoopbackHttpServer`` bound to 127.0.0.1 on an
            OS-assigned port.
        port:  the port number ``httpd`` is listening on.
    """

    def __init__(self, request_handler):
        """Create the server; *request_handler* is forced to HTTP/1.0."""
        threading.Thread.__init__(self)
        self._stop_server = False
        self.ready = threading.Event()
        request_handler.protocol_version = "HTTP/1.0"
        # Port 0 asks the OS for any free port; the real one is read back
        # from the server below.
        self.httpd = LoopbackHttpServer(("127.0.0.1", 0),
                                        request_handler)
        self.port = self.httpd.server_port

    def stop(self):
        """Stop the web server if it is currently running.

        Sets the stop flag and then joins the thread, so the call blocks
        until the serving loop has noticed the flag and exited.
        """
        self._stop_server = True
        self.join()

    def run(self):
        """Serve requests one at a time until the stop flag is set."""
        self.ready.set()
        try:
            while not self._stop_server:
                self.httpd.handle_request()
        finally:
            # Release the listening socket once serving is over; without
            # this every server thread leaks its socket.
            self.httpd.server_close()
Guido van Rossumcd16bf62007-06-13 18:07:49 +000066
# Authentication infrastructure
68
class DigestAuthHandler:
    """Server-side helper that performs HTTP digest proxy authentication.

    The handler issues single-use nonces in 407 challenges and checks the
    ``Proxy-Authorization`` header of follow-up requests against a
    user -> password mapping.
    """

    def __init__(self):
        self._request_num = 0
        self._nonces = []
        self._users = {}
        self._realm_name = "Test Realm"
        self._qop = "auth"

    def set_qop(self, qop):
        """Set the qop value advertised in challenges."""
        self._qop = qop

    def set_users(self, users):
        """Replace the user -> password mapping (must be a dict)."""
        assert isinstance(users, dict)
        self._users = users

    def set_realm(self, realm):
        """Set the realm name advertised in challenges."""
        self._realm_name = realm

    @staticmethod
    def _md5_hex(text):
        """Return the hex MD5 digest of the ASCII string *text*."""
        return hashlib.md5(text.encode("ascii")).hexdigest()

    def _generate_nonce(self):
        """Create, remember and return a fresh nonce.

        The nonce is the MD5 of a running request counter, so it is
        predictable; that is acceptable for a test fixture only.
        """
        self._request_num += 1
        nonce = self._md5_hex(str(self._request_num))
        self._nonces.append(nonce)
        return nonce

    def _create_auth_dict(self, auth_str):
        """Parse an authorization header value into a name -> value dict.

        Everything up to the first space (the scheme token) is dropped and
        the remainder is split on commas into ``name=value`` pairs.
        Surrounding whitespace and one pair of enclosing double quotes are
        removed from each value.  Segments without ``=`` are ignored.

        Commas inside quoted values are not supported.
        """
        first_space_index = auth_str.find(" ")
        auth_str = auth_str[first_space_index+1:]

        auth_dict = {}
        for part in auth_str.split(","):
            # Split on the first "=" only: values such as a URI with a
            # query string legitimately contain further "=" characters.
            name, separator, value = part.partition("=")
            if not separator:
                continue
            value = value.strip()
            if value[:1] == '"' and value[-1:] == '"':
                value = value[1:-1]
            auth_dict[name.strip()] = value
        return auth_dict

    def _validate_auth(self, auth_dict, password, method, uri):
        """Return True if auth_dict["response"] matches the expected digest.

        The digest is computed from the client-supplied fields together
        with the server-side *password*, *method* and *uri* (which override
        any same-named client fields).  A header lacking a required field
        is treated as a failed validation.
        """
        final_dict = dict(auth_dict)
        final_dict["password"] = password
        final_dict["method"] = method
        final_dict["uri"] = uri
        try:
            final_dict["HA1"] = self._md5_hex(
                "%(username)s:%(realm)s:%(password)s" % final_dict)
            final_dict["HA2"] = self._md5_hex(
                "%(method)s:%(uri)s" % final_dict)
            response = self._md5_hex(
                "%(HA1)s:%(nonce)s:%(nc)s:"
                "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict)
            return response == auth_dict["response"]
        except KeyError:
            # A required digest field is missing from the header.
            return False

    def _return_auth_challenge(self, request_handler):
        """Send a 407 challenge with a fresh nonce; always returns False."""
        request_handler.send_response(407, "Proxy Authentication Required")
        request_handler.send_header("Content-Type", "text/html")
        # The header layout (including the trailing ", ") is kept exactly
        # as it has always been sent.
        request_handler.send_header(
            'Proxy-Authenticate', 'Digest realm="%s", '
            'qop="%s",'
            'nonce="%s", ' % \
            (self._realm_name, self._qop, self._generate_nonce()))
        # NOTE: no "Connection: close" header is sent here; it is unclear
        # whether one is required.
        request_handler.end_headers()
        request_handler.wfile.write(b"Proxy Authentication Required.")
        return False

    def handle_request(self, request_handler):
        """Performs digest authentication on the given HTTP request
        handler.  Returns True if authentication was successful, False
        otherwise.

        If no users have been set, then digest auth is effectively
        disabled and this method will always return True.

        Whenever False is returned a 407 challenge has already been
        written to *request_handler*.
        """
        if not self._users:
            return True

        if "Proxy-Authorization" not in request_handler.headers:
            return self._return_auth_challenge(request_handler)

        auth_dict = self._create_auth_dict(
            request_handler.headers["Proxy-Authorization"])

        username = auth_dict.get("username")
        if username not in self._users:
            return self._return_auth_challenge(request_handler)
        password = self._users[username]

        nonce = auth_dict.get("nonce")
        if nonce not in self._nonces:
            return self._return_auth_challenge(request_handler)
        # Nonces are single-use.
        self._nonces.remove(nonce)

        # MSIE uses short_path in its validation, but Python's urllib
        # uses the full path, so accept whichever of the two validates.
        candidate_paths = (request_handler.path, request_handler.short_path)
        if any(self._validate_auth(auth_dict, password,
                                   request_handler.command, path)
               for path in candidate_paths):
            return True
        return self._return_auth_challenge(request_handler)
188
# Proxy test infrastructure
190
class FakeProxyHandler(http.server.BaseHTTPRequestHandler):
    """A 'fake proxy' that makes it look like the entire internet has gone
    down due to a sudden zombie invasion.  Its main utility is in
    providing us with authentication support for testing.
    """

    def __init__(self, digest_auth_handler, *args, **kwargs):
        # The attribute has to exist before the parent constructor runs,
        # because that constructor will try to call do_GET().
        self.digest_auth_handler = digest_auth_handler
        super().__init__(*args, **kwargs)

    def log_message(self, format, *args):
        """Discard log output (write ``format % args`` somewhere to debug)."""
        return None

    def do_GET(self):
        """Authenticate the request and, on success, send the canned page."""
        self.short_path = urllib.parse.urlparse(self.path, "http").path
        if not self.digest_auth_handler.handle_request(self):
            # The auth handler has already written its own response.
            return

        self.send_response(200, "OK")
        self.send_header("Content-Type", "text/html")
        self.end_headers()
        greeting = "You've reached %s!<BR>" % self.path
        self.wfile.write(greeting.encode("ascii"))
        self.wfile.write(b"Our apologies, but our server is down due to "
                         b"a sudden zombie invasion.")
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000221
# Test cases
223
class BaseTestCase(unittest.TestCase):
    """Shared fixture that brackets each test with the test.support
    threading setup/cleanup helpers.
    """

    def setUp(self):
        """Store whatever ``support.threading_setup()`` returns."""
        thread_state = support.threading_setup()
        self._threads = thread_state

    def tearDown(self):
        """Pass the stored state back to ``support.threading_cleanup()``."""
        saved_state = self._threads
        support.threading_cleanup(*saved_state)
Antoine Pitrouaefe6af2009-10-27 20:17:03 +0000230
231
class ProxyAuthTests(BaseTestCase):
    """Digest proxy-authentication tests run against a local fake proxy.

    setUp starts a LoopbackHttpServerThread serving FakeProxyHandler and
    builds an opener that routes "http" requests through it using a
    ProxyDigestAuthHandler.
    """

    URL = "http://localhost"

    USER = "tester"
    PASSWD = "test123"
    REALM = "TestRealm"

    def setUp(self):
        super(ProxyAuthTests, self).setUp()
        self.digest_auth_handler = DigestAuthHandler()
        self.digest_auth_handler.set_users({self.USER: self.PASSWD})
        self.digest_auth_handler.set_realm(self.REALM)

        # The server instantiates its handler with the standard
        # (request, client_address, server) arguments; this factory adds
        # the shared auth handler in front of them.
        def create_fake_proxy_handler(*args, **kwargs):
            return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)

        self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
        self.server.start()
        self.server.ready.wait()
        proxy_url = "http://127.0.0.1:%d" % self.server.port
        handler = urllib.request.ProxyHandler({"http" : proxy_url})
        self.proxy_digest_handler = urllib.request.ProxyDigestAuthHandler()
        self.opener = urllib.request.build_opener(
            handler, self.proxy_digest_handler)

    def tearDown(self):
        self.server.stop()
        super(ProxyAuthTests, self).tearDown()

    def _read_and_close(self, response):
        """Read *response* to exhaustion, closing it even if a read fails."""
        try:
            while response.read():
                pass
        finally:
            response.close()

    def test_proxy_with_bad_password_raises_httperror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD+"bad")
        self.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib.error.HTTPError,
                          self.opener.open,
                          self.URL)

    def test_proxy_with_no_password_raises_httperror(self):
        self.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib.error.HTTPError,
                          self.opener.open,
                          self.URL)

    def test_proxy_qop_auth_works(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth")
        self._read_and_close(self.opener.open(self.URL))

    def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth-int")
        try:
            result = self.opener.open(self.URL)
        except urllib.error.URLError:
            # It's okay if we don't support auth-int, but we certainly
            # shouldn't receive any kind of exception here other than
            # a URLError.
            result = None
        if result is not None:
            self._read_and_close(result)
298
Christian Heimesbbe741d2008-03-28 10:53:29 +0000299
def GetRequestHandler(responses):
    """Build a request-handler class that replays canned *responses*.

    *responses* is a list of ``(status_code, headers, body)`` tuples; one
    tuple is popped from the front for every request served.  ``headers``
    is an iterable of ``(name, value)`` pairs whose values are %-formatted
    with a ``{'port': ...}`` mapping before being sent.

    The returned class records every request path (and POST payload) in
    its ``requests`` list and the most recent request headers in
    ``headers_received``.
    """

    class FakeHTTPRequestHandler(http.server.BaseHTTPRequestHandler):

        server_version = "TestHTTP/"
        requests = []
        headers_received = []
        port = 80

        def do_GET(self):
            """Send the next canned response, including its body if any."""
            payload = self.send_head()
            if not payload:
                return
            self.wfile.write(payload)

        def do_POST(self):
            """Consume the request body, answer as for GET, then log it."""
            length = int(self.headers["Content-Length"])
            post_data = self.rfile.read(length)
            self.do_GET()
            self.requests.append(post_data)

        def send_head(self):
            """Emit status line and headers; return the body or None."""
            FakeHTTPRequestHandler.headers_received = self.headers
            self.requests.append(self.path)
            status, header_pairs, payload = responses.pop(0)

            self.send_response(status)

            substitutions = {'port': self.port}
            for name, template in header_pairs:
                self.send_header(name, template % substitutions)
            if not payload:
                self.end_headers()
                return None
            self.send_header("Content-type", "text/plain")
            self.end_headers()
            return payload

        def log_message(self, *args):
            """Keep the test output quiet."""
            pass

    return FakeHTTPRequestHandler
340
341
class TestUrlopen(BaseTestCase):
    """Tests urllib.request.urlopen against a local loopback HTTP server.

    These tests are not exhaustive; the basic interface features are
    assumed to be covered well enough by the tests that use files.  Each
    test starts a server that replays a list of canned responses.
    """

    def setUp(self):
        super(TestUrlopen, self).setUp()
        self.server = None

    def tearDown(self):
        if self.server is not None:
            self.server.stop()
        super(TestUrlopen, self).tearDown()

    def urlopen(self, url, data=None):
        """Fetch *url* and return the whole body as bytes.

        The body is deliberately read through several different methods
        so that each of them gets exercised.
        """
        chunks = []
        f = urllib.request.urlopen(url, data)
        try:
            # Exercise various methods
            chunks.extend(f.readlines(200))
            chunks.append(f.readline())
            chunks.append(f.read(1024))
            chunks.append(f.read())
        finally:
            f.close()
        return b"".join(chunks)

    def start_server(self, responses=None):
        """Start a loopback server that replays *responses*.

        Returns the handler class, whose ``port`` attribute is set to the
        port the server is listening on.  The server is stored on
        ``self.server`` so tearDown can stop it.
        """
        if responses is None:
            responses = [(200, [], b"we don't care")]
        handler = GetRequestHandler(responses)

        self.server = LoopbackHttpServerThread(handler)
        self.server.start()
        self.server.ready.wait()
        handler.port = self.server.port
        return handler

    def test_redirection(self):
        expected_response = b"We got here..."
        responses = [
            (302, [("Location", "http://localhost:%(port)s/somewhere_else")],
             ""),
            (200, [], expected_response)
        ]

        handler = self.start_server(responses)
        data = self.urlopen("http://localhost:%s/" % handler.port)
        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ["/", "/somewhere_else"])

    def test_chunked(self):
        expected_response = b"hello world"
        chunked_start = (
            b'a\r\n'
            b'hello worl\r\n'
            b'1\r\n'
            b'd\r\n'
            b'0\r\n'
        )
        response = [(200, [("Transfer-Encoding", "chunked")], chunked_start)]
        handler = self.start_server(response)
        data = self.urlopen("http://localhost:%s/" % handler.port)
        self.assertEqual(data, expected_response)

    def test_404(self):
        expected_response = b"Bad bad bad..."
        handler = self.start_server([(404, [], expected_response)])

        try:
            self.urlopen("http://localhost:%s/weeble" % handler.port)
        except urllib.error.URLError as f:
            # The error object doubles as the response; read its body.
            try:
                data = f.read()
            finally:
                f.close()
        else:
            self.fail("404 should raise URLError")

        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ["/weeble"])

    def test_200(self):
        expected_response = b"pycon 2008..."
        handler = self.start_server([(200, [], expected_response)])
        data = self.urlopen("http://localhost:%s/bizarre" % handler.port)
        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ["/bizarre"])

    def test_200_with_parameters(self):
        expected_response = b"pycon 2008..."
        handler = self.start_server([(200, [], expected_response)])
        data = self.urlopen("http://localhost:%s/bizarre" % handler.port,
                            b"get=with_feeling")
        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ["/bizarre", b"get=with_feeling"])

    def test_sending_headers(self):
        handler = self.start_server()
        req = urllib.request.Request("http://localhost:%s/" % handler.port,
                                     headers={"Range": "bytes=20-39"})
        # Only the request matters here; close the response right away so
        # its socket is not leaked.
        urllib.request.urlopen(req).close()
        self.assertEqual(handler.headers_received["Range"], "bytes=20-39")

    def test_basic(self):
        handler = self.start_server()
        open_url = urllib.request.urlopen("http://localhost:%s" % handler.port)
        try:
            for attr in ("read", "close", "info", "geturl"):
                self.assertTrue(hasattr(open_url, attr), "object returned from "
                                "urlopen lacks the %s attribute" % attr)
            self.assertTrue(open_url.read(), "calling 'read' failed")
        finally:
            open_url.close()

    def test_info(self):
        handler = self.start_server()
        open_url = urllib.request.urlopen(
            "http://localhost:%s" % handler.port)
        try:
            info_obj = open_url.info()
            self.assertIsInstance(info_obj, email.message.Message,
                                  "object returned by 'info' is not an "
                                  "instance of email.message.Message")
            self.assertEqual(info_obj.get_content_subtype(), "plain")
        finally:
            # The server itself is stopped by tearDown.
            open_url.close()

    def test_geturl(self):
        # Make sure same URL as opened is returned by geturl.
        handler = self.start_server()
        open_url = urllib.request.urlopen("http://localhost:%s" % handler.port)
        try:
            url = open_url.geturl()
        finally:
            open_url.close()
        self.assertEqual(url, "http://localhost:%s" % handler.port)

    def test_bad_address(self):
        # Make sure proper exception is raised when connecting to a bogus
        # address.
        #
        # Given that both VeriSign and various ISPs have in the past or are
        # presently hijacking various invalid domain name requests in an
        # attempt to boost traffic to their own sites, finding a domain
        # name to use for this test is difficult.  RFC2606 leads one to
        # believe that '.invalid' should work, but experience seemed to
        # indicate otherwise.  Single character TLDs are likely to remain
        # invalid, so this seems to be the best choice.  The trailing '.'
        # prevents a related problem: the normal DNS resolver appends the
        # domain names from the search path if there is no '.' at the end,
        # and if one of those domains implements a '*' rule a result is
        # returned.  However, none of this will prevent the test from
        # failing if the ISP hijacks all invalid domain requests.  The real
        # solution would be to be able to parameterize the framework with a
        # mock resolver.
        self.assertRaises(IOError,
                          urllib.request.urlopen,
                          "http://sadflkjsasf.i.nvali.d./")
Christian Heimesbbe741d2008-03-28 10:53:29 +0000503
def test_main():
    """Run both test-case classes through ``support.run_unittest``."""
    test_classes = (ProxyAuthTests, TestUrlopen)
    support.run_unittest(*test_classes)


if __name__ == "__main__":
    test_main()