blob: de8a521033fd8e3f3ab9588b4f0af092265def9b [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Guido van Rossumcd16bf62007-06-13 18:07:49 +00002
Barry Warsaw820c1202008-06-12 04:06:45 +00003import email
Jeremy Hylton1afc1692008-06-18 20:49:58 +00004import urllib.parse
5import urllib.request
Georg Brandl24420152008-05-26 16:32:26 +00006import http.server
Guido van Rossumcd16bf62007-06-13 18:07:49 +00007import unittest
8import hashlib
Benjamin Petersonee8712c2008-05-20 21:35:26 +00009from test import support
Victor Stinner45df8202010-04-28 22:31:17 +000010threading = support.import_module('threading')
Guido van Rossumcd16bf62007-06-13 18:07:49 +000011
12# Loopback http server infrastructure
13
class LoopbackHttpServer(http.server.HTTPServer):
    """HTTPServer variant with socket timeouts suited to loopback tests.

    The listening socket gets a short timeout so the serving loop wakes up
    regularly, and every accepted connection gets its own timeout so a
    stalled peer cannot block the server forever.
    """

    def __init__(self, server_address, RequestHandlerClass):
        super().__init__(server_address, RequestHandlerClass)

        # Keep accept() from blocking for long: the thread driving this
        # server polls a stop flag between requests.
        self.socket.settimeout(1.0)

    def get_request(self):
        """Accept one connection and bound its blocking time.

        Overrides the HTTPServer hook; returns the usual
        ``(socket, client_address)`` pair.
        """
        connection, peer_address = self.socket.accept()

        # Loopback traffic is fast, so a bounded timeout on the connection
        # should never trigger in practice but makes deadlocks less likely.
        connection.settimeout(10.0)

        return (connection, peer_address)
39
class LoopbackHttpServerThread(threading.Thread):
    """Stoppable thread that runs a loopback http server.

    The server is bound to 127.0.0.1 on an OS-assigned port as soon as the
    thread object is constructed; the chosen port is exposed as ``port``.
    ``ready`` is an Event that is set once ``run()`` has begun.
    """

    def __init__(self, request_handler):
        threading.Thread.__init__(self)
        self._stop_server = False
        self.ready = threading.Event()
        # Sets the attribute on whatever callable was passed in (a handler
        # class, or a factory function wrapping one).
        request_handler.protocol_version = "HTTP/1.0"
        # Port 0 asks the OS for any free port.
        self.httpd = LoopbackHttpServer(("127.0.0.1", 0),
                                        request_handler)
        self.port = self.httpd.server_port

    def stop(self):
        """Stops the webserver if it's currently running.

        Blocks until the serving loop has exited, then closes the
        listening socket so it is not leaked between tests.
        """
        # Set the stop flag; run() checks it between requests.
        self._stop_server = True

        try:
            self.join()
        finally:
            # Closing an already-closed socket is harmless, so calling
            # stop() more than once is safe.
            self.httpd.server_close()

    def run(self):
        """Serve requests one at a time until stop() is called."""
        self.ready.set()
        while not self._stop_server:
            self.httpd.handle_request()
Guido van Rossumcd16bf62007-06-13 18:07:49 +000066
67# Authentication infrastructure
68
class DigestAuthHandler:
    """Handler for performing digest authentication.

    Issues single-use nonces in ``Proxy-Authenticate`` challenges and checks
    the ``Proxy-Authorization`` header of later requests against the
    configured users.
    """

    # Fields that must be present in a client's digest response before the
    # expected hash can be computed.
    _REQUIRED_FIELDS = ("username", "realm", "nonce", "nc", "cnonce", "qop",
                        "response")

    def __init__(self):
        self._request_num = 0
        self._nonces = []
        self._users = {}
        self._realm_name = "Test Realm"
        self._qop = "auth"

    def set_qop(self, qop):
        """Set the qop value advertised in challenges."""
        self._qop = qop

    def set_users(self, users):
        """Set the username -> password mapping (must be a dict)."""
        assert isinstance(users, dict)
        self._users = users

    def set_realm(self, realm):
        """Set the realm name advertised in challenges."""
        self._realm_name = realm

    def _generate_nonce(self):
        """Create, remember and return a new nonce.

        The nonce is the MD5 hex digest of a running request counter.
        """
        self._request_num += 1
        nonce = hashlib.md5(str(self._request_num).encode("ascii")).hexdigest()
        self._nonces.append(nonce)
        return nonce

    def _create_auth_dict(self, auth_str):
        """Parse an authorization header value into a dict.

        Everything up to and including the first space (the scheme token)
        is discarded; the rest is split on commas into ``name=value``
        pairs.  Surrounding whitespace and one pair of enclosing double
        quotes are removed from each value.  Segments without an ``=``
        (for example after a trailing comma) are ignored.

        Note: values containing a comma are not supported.
        """
        first_space_index = auth_str.find(" ")
        auth_str = auth_str[first_space_index + 1:]

        auth_dict = {}
        for part in auth_str.split(","):
            # Split on the first "=" only: values such as a URI with a
            # query string legitimately contain further "=" characters.
            name, separator, value = part.partition("=")
            if not separator:
                continue
            value = value.strip()
            if len(value) >= 2 and value[0] == '"' and value[-1] == '"':
                value = value[1:-1]
            auth_dict[name.strip()] = value
        return auth_dict

    def _validate_auth(self, auth_dict, password, method, uri):
        """Return True if auth_dict holds a correct digest response.

        The expected response is computed from the client-supplied fields
        together with the given password, request method and uri.  Returns
        False if the hashes differ or a required field is missing.
        """
        if any(field not in auth_dict for field in self._REQUIRED_FIELDS):
            return False

        final_dict = dict(auth_dict)
        final_dict["password"] = password
        final_dict["method"] = method
        final_dict["uri"] = uri
        HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict
        final_dict["HA1"] = hashlib.md5(HA1_str.encode("ascii")).hexdigest()
        HA2_str = "%(method)s:%(uri)s" % final_dict
        final_dict["HA2"] = hashlib.md5(HA2_str.encode("ascii")).hexdigest()
        response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \
                       "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict
        response = hashlib.md5(response_str.encode("ascii")).hexdigest()

        return response == auth_dict["response"]

    def _return_auth_challenge(self, request_handler):
        """Send a 407 response carrying a fresh digest challenge.

        Always returns False so callers can ``return`` the result directly
        to signal that authentication did not succeed.
        """
        request_handler.send_response(407, "Proxy Authentication Required")
        request_handler.send_header("Content-Type", "text/html")
        request_handler.send_header(
            'Proxy-Authenticate',
            'Digest realm="%s", qop="%s", nonce="%s"' %
            (self._realm_name, self._qop, self._generate_nonce()))
        # XXX: Not sure if a 'Connection: close' header should be sent
        # here as well.
        request_handler.end_headers()
        request_handler.wfile.write(b"Proxy Authentication Required.")
        return False

    def handle_request(self, request_handler):
        """Performs digest authentication on the given HTTP request
        handler.  Returns True if authentication was successful, False
        otherwise.

        If no users have been set, then digest auth is effectively
        disabled and this method will always return True.

        On failure a new challenge has already been written to the
        handler.  A nonce is consumed by the first request that presents
        it, whether or not that request then validates.
        """
        if not self._users:
            return True

        if "Proxy-Authorization" not in request_handler.headers:
            return self._return_auth_challenge(request_handler)

        auth_dict = self._create_auth_dict(
            request_handler.headers["Proxy-Authorization"])

        username = auth_dict.get("username")
        if username not in self._users:
            return self._return_auth_challenge(request_handler)
        password = self._users[username]

        nonce = auth_dict.get("nonce")
        if nonce not in self._nonces:
            return self._return_auth_challenge(request_handler)
        self._nonces.remove(nonce)

        # MSIE uses short_path in its validation, but Python's
        # urllib.request uses the full path, so accept either of them.
        candidate_paths = (request_handler.path, request_handler.short_path)
        auth_validated = any(
            self._validate_auth(auth_dict, password,
                                request_handler.command, path)
            for path in candidate_paths)

        if not auth_validated:
            return self._return_auth_challenge(request_handler)
        return True
188
189# Proxy test infrastructure
190
class FakeProxyHandler(http.server.BaseHTTPRequestHandler):
    """A 'fake proxy' that never forwards anything.

    Every authenticated GET is answered with a canned page claiming the
    internet has gone down due to a sudden zombie invasion.  Its main
    utility is in providing authentication support for testing.
    """

    def __init__(self, digest_auth_handler, *args, **kwargs):
        # Must be assigned before the base initializer runs, because that
        # initializer handles the request and so ends up calling do_GET().
        self.digest_auth_handler = digest_auth_handler
        super().__init__(*args, **kwargs)

    def log_message(self, format, *args):
        # Logging is silenced.  For debugging, write ``format % args`` to
        # sys.stderr here.
        pass

    def do_GET(self):
        """Answer a GET request once digest authentication has passed."""
        url_parts = urllib.parse.urlparse(self.path, "http")
        # The auth handler validates against both the full request path and
        # this path-only form.
        self.short_path = url_parts.path

        if not self.digest_auth_handler.handle_request(self):
            # The auth handler has already written its own response.
            return

        self.send_response(200, "OK")
        self.send_header("Content-Type", "text/html")
        self.end_headers()
        greeting = "You've reached %s!<BR>" % self.path
        self.wfile.write(greeting.encode("ascii"))
        self.wfile.write(b"Our apologies, but our server is down due to "
                         b"a sudden zombie invasion.")
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000221
222# Test cases
223
class BaseTestCase(unittest.TestCase):
    """Common base that brackets each test with the thread bookkeeping
    helpers from test.support."""

    def setUp(self):
        """Record the threading state before the test runs."""
        self._threads = support.threading_setup()

    def tearDown(self):
        """Hand the recorded state back to the cleanup helper."""
        saved_state = self._threads
        support.threading_cleanup(*saved_state)
Antoine Pitrouaefe6af2009-10-27 20:17:03 +0000230
231
class ProxyAuthTests(BaseTestCase):
    """Digest proxy authentication tests run against a local fake proxy."""

    URL = "http://localhost"

    USER = "tester"
    PASSWD = "test123"
    REALM = "TestRealm"

    def setUp(self):
        """Start the fake proxy and build an opener that routes through it."""
        super().setUp()

        # A no_proxy setting that covers localhost would make urllib skip
        # the fake proxy entirely, so neutralise it for the duration of the
        # test and restore the environment afterwards.
        import os

        def restore_environ(old_environ):
            os.environ.clear()
            os.environ.update(old_environ)

        self.addCleanup(restore_environ, os.environ.copy())
        os.environ['NO_PROXY'] = ''
        os.environ['no_proxy'] = ''

        self.digest_auth_handler = DigestAuthHandler()
        self.digest_auth_handler.set_users({self.USER: self.PASSWD})
        self.digest_auth_handler.set_realm(self.REALM)

        def create_fake_proxy_handler(*args, **kwargs):
            # The server instantiates its handler per request; bind our
            # shared auth handler as the first constructor argument.
            return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)

        self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
        self.server.start()
        self.server.ready.wait()
        proxy_url = "http://127.0.0.1:%d" % self.server.port
        handler = urllib.request.ProxyHandler({"http": proxy_url})
        self.proxy_digest_handler = urllib.request.ProxyDigestAuthHandler()
        self.opener = urllib.request.build_opener(
            handler, self.proxy_digest_handler)

    def tearDown(self):
        """Stop the fake proxy before the base class checks for threads."""
        self.server.stop()
        super().tearDown()

    def test_proxy_with_bad_password_raises_httperror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD + "bad")
        self.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib.error.HTTPError,
                          self.opener.open,
                          self.URL)

    def test_proxy_with_no_password_raises_httperror(self):
        self.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib.error.HTTPError,
                          self.opener.open,
                          self.URL)

    def test_proxy_qop_auth_works(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth")
        # The context manager closes the response even if read() raises.
        with self.opener.open(self.URL) as result:
            while result.read():
                pass

    def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth-int")
        try:
            result = self.opener.open(self.URL)
        except urllib.error.URLError:
            # It's okay if we don't support auth-int, but we certainly
            # shouldn't receive any kind of exception here other than
            # a URLError.
            result = None
        if result is not None:
            with result:
                while result.read():
                    pass
298
Christian Heimesbbe741d2008-03-28 10:53:29 +0000299
def GetRequestHandler(responses):
    """Build a request handler class that replays canned responses.

    ``responses`` is a list of ``(status_code, headers, body)`` tuples,
    where ``headers`` is a sequence of ``(name, value)`` pairs.  Each
    handled request removes and serves the first remaining tuple.  Header
    values are %-formatted with a mapping that provides ``port``.

    The returned class records what it saw in class-level attributes:
    ``requests`` (request paths, plus POST bodies) and
    ``headers_received`` (headers of the most recent request).
    """

    class FakeHTTPRequestHandler(http.server.BaseHTTPRequestHandler):

        server_version = "TestHTTP/"
        requests = []
        headers_received = []
        port = 80

        def do_GET(self):
            remaining = self.send_head()
            # write() may accept only part of the data; keep sending until
            # nothing is left.
            while remaining:
                written = self.wfile.write(remaining)
                remaining = remaining[written:]

        def do_POST(self):
            length = int(self.headers["Content-Length"])
            payload = self.rfile.read(length)
            # Answer exactly as for a GET, then log the posted data after
            # the path that send_head() recorded.
            self.do_GET()
            self.requests.append(payload)

        def send_head(self):
            """Send status line and headers; return the body, or None when
            the canned response has no body."""
            FakeHTTPRequestHandler.headers_received = self.headers
            self.requests.append(self.path)
            status, header_items, payload = responses.pop(0)

            self.send_response(status)

            substitutions = {'port': self.port}
            for name, template in header_items:
                self.send_header(name, template % substitutions)

            if payload:
                self.send_header("Content-type", "text/plain")
            self.end_headers()
            return payload if payload else None

        def log_message(self, *args):
            # Keep test output quiet.
            pass

    return FakeHTTPRequestHandler
341
342
class TestUrlopen(BaseTestCase):
    """Tests urllib.request.urlopen against a loopback HTTP server.

    These tests are not exhaustive.  Assuming that testing using files does a
    good job overall of some of the basic interface features.  There are no
    tests exercising the optional 'proxies' argument.
    """

    def setUp(self):
        super().setUp()
        # Set by start_server(); stays None for tests that need no server.
        self.server = None

    def tearDown(self):
        if self.server is not None:
            self.server.stop()
        super().tearDown()

    def urlopen(self, url, data=None):
        """Open url, read it through several different methods and return
        the concatenated bytes."""
        chunks = []
        with urllib.request.urlopen(url, data) as f:
            # Exercise various methods
            chunks.extend(f.readlines(200))
            chunks.append(f.readline())
            chunks.append(f.read(1024))
            chunks.append(f.read())
        return b"".join(chunks)

    def start_server(self, responses=None):
        """Start a loopback server replaying responses.

        responses is a list of (status, headers, body) tuples; a single
        200 response is used when it is omitted.  Returns the handler
        class, whose ``port`` attribute holds the server's port.
        """
        if responses is None:
            responses = [(200, [], b"we don't care")]
        handler = GetRequestHandler(responses)

        self.server = LoopbackHttpServerThread(handler)
        self.server.start()
        self.server.ready.wait()
        handler.port = self.server.port
        return handler

    def test_redirection(self):
        expected_response = b"We got here..."
        responses = [
            (302, [("Location", "http://localhost:%(port)s/somewhere_else")],
             ""),
            (200, [], expected_response)
        ]

        handler = self.start_server(responses)
        data = self.urlopen("http://localhost:%s/" % handler.port)
        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ["/", "/somewhere_else"])

    def test_chunked(self):
        expected_response = b"hello world"
        chunked_start = (
            b'a\r\n'
            b'hello worl\r\n'
            b'1\r\n'
            b'd\r\n'
            b'0\r\n'
        )
        response = [(200, [("Transfer-Encoding", "chunked")], chunked_start)]
        handler = self.start_server(response)
        data = self.urlopen("http://localhost:%s/" % handler.port)
        self.assertEqual(data, expected_response)

    def test_404(self):
        expected_response = b"Bad bad bad..."
        handler = self.start_server([(404, [], expected_response)])

        try:
            self.urlopen("http://localhost:%s/weeble" % handler.port)
        except urllib.error.URLError as f:
            # Close the error's response even if reading it fails.
            try:
                data = f.read()
            finally:
                f.close()
        else:
            self.fail("404 should raise URLError")

        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ["/weeble"])

    def test_200(self):
        expected_response = b"pycon 2008..."
        handler = self.start_server([(200, [], expected_response)])
        data = self.urlopen("http://localhost:%s/bizarre" % handler.port)
        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ["/bizarre"])

    def test_200_with_parameters(self):
        expected_response = b"pycon 2008..."
        handler = self.start_server([(200, [], expected_response)])
        data = self.urlopen("http://localhost:%s/bizarre" % handler.port,
                            b"get=with_feeling")
        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ["/bizarre", b"get=with_feeling"])

    def test_sending_headers(self):
        handler = self.start_server()
        req = urllib.request.Request("http://localhost:%s/" % handler.port,
                                     headers={"Range": "bytes=20-39"})
        with urllib.request.urlopen(req):
            pass
        self.assertEqual(handler.headers_received["Range"], "bytes=20-39")

    def test_basic(self):
        handler = self.start_server()
        with urllib.request.urlopen(
                "http://localhost:%s" % handler.port) as open_url:
            for attr in ("read", "close", "info", "geturl"):
                self.assertTrue(hasattr(open_url, attr), "object returned from "
                                "urlopen lacks the %s attribute" % attr)
            self.assertTrue(open_url.read(), "calling 'read' failed")

    def test_info(self):
        handler = self.start_server()
        # The server itself is stopped by tearDown().
        with urllib.request.urlopen(
                "http://localhost:%s" % handler.port) as open_url:
            info_obj = open_url.info()
        self.assertIsInstance(info_obj, email.message.Message,
                              "object returned by 'info' is not an "
                              "instance of email.message.Message")
        self.assertEqual(info_obj.get_content_subtype(), "plain")

    def test_geturl(self):
        # Make sure same URL as opened is returned by geturl.
        handler = self.start_server()
        with urllib.request.urlopen(
                "http://localhost:%s" % handler.port) as open_url:
            url = open_url.geturl()
        self.assertEqual(url, "http://localhost:%s" % handler.port)

    def test_bad_address(self):
        # Make sure proper exception is raised when connecting to a bogus
        # address.
        self.assertRaises(IOError,
                          # Given that both VeriSign and various ISPs have in
                          # the past or are presently hijacking various invalid
                          # domain name requests in an attempt to boost traffic
                          # to their own sites, finding a domain name to use
                          # for this test is difficult.  RFC2606 leads one to
                          # believe that '.invalid' should work, but experience
                          # seemed to indicate otherwise.  Single character
                          # TLDs are likely to remain invalid, so this seems to
                          # be the best choice.  The trailing '.' prevents a
                          # related problem: the normal DNS resolver appends
                          # the domain names from the search path if there is
                          # no '.' at the end, and if one of those domains
                          # implements a '*' rule a result is returned.
                          # However, none of this will prevent the test from
                          # failing if the ISP hijacks all invalid domain
                          # requests.  The real solution would be to be able to
                          # parameterize the framework with a mock resolver.
                          urllib.request.urlopen,
                          "http://sadflkjsasf.i.nvali.d./")

    def test_iteration(self):
        expected_response = b"pycon 2008..."
        handler = self.start_server([(200, [], expected_response)])
        with urllib.request.urlopen(
                "http://localhost:%s" % handler.port) as data:
            received = list(data)
        # Comparing the whole list also catches an empty response, which a
        # per-line assertion inside the loop would silently accept.
        self.assertEqual(received, [expected_response])

    def test_line_iteration(self):
        lines = [b"We\n", b"got\n", b"here\n", b"verylong " * 8192 + b"\n"]
        expected_response = b"".join(lines)
        handler = self.start_server([(200, [], expected_response)])
        fetched = 0
        with urllib.request.urlopen(
                "http://localhost:%s" % handler.port) as data:
            for index, line in enumerate(data):
                self.assertEqual(line, lines[index],
                                 "Fetched line number %s doesn't match expected:\n"
                                 "    Expected length was %s, got %s" %
                                 (index, len(lines[index]), len(line)))
                fetched = index + 1
        self.assertEqual(fetched, len(lines))
523
def test_main():
    """Run every test case class in this module via test.support."""
    suites = (ProxyAuthTests, TestUrlopen)
    support.run_unittest(*suites)


if __name__ == "__main__":
    test_main()