blob: 872b2be7bef3ec2a7e1ea856aec8959d77ec3809 [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Guido van Rossumcd16bf62007-06-13 18:07:49 +00002
Antoine Pitrou803e6d62010-10-13 10:36:15 +00003import os
Barry Warsaw820c1202008-06-12 04:06:45 +00004import email
Jeremy Hylton1afc1692008-06-18 20:49:58 +00005import urllib.parse
6import urllib.request
Georg Brandl24420152008-05-26 16:32:26 +00007import http.server
Guido van Rossumcd16bf62007-06-13 18:07:49 +00008import unittest
9import hashlib
Benjamin Petersonee8712c2008-05-20 21:35:26 +000010from test import support
Victor Stinner45df8202010-04-28 22:31:17 +000011threading = support.import_module('threading')
Guido van Rossumcd16bf62007-06-13 18:07:49 +000012
Antoine Pitrou803e6d62010-10-13 10:36:15 +000013
14here = os.path.dirname(__file__)
15# Self-signed cert file for 'localhost'
16CERT_localhost = os.path.join(here, 'keycert.pem')
17# Self-signed cert file for 'fakehostname'
18CERT_fakehostname = os.path.join(here, 'keycert2.pem')
19
Guido van Rossumcd16bf62007-06-13 18:07:49 +000020# Loopback http server infrastructure
21
Georg Brandl24420152008-05-26 16:32:26 +000022class LoopbackHttpServer(http.server.HTTPServer):
Guido van Rossumcd16bf62007-06-13 18:07:49 +000023 """HTTP server w/ a few modifications that make it useful for
24 loopback testing purposes.
25 """
26
27 def __init__(self, server_address, RequestHandlerClass):
Georg Brandl24420152008-05-26 16:32:26 +000028 http.server.HTTPServer.__init__(self,
29 server_address,
30 RequestHandlerClass)
Guido van Rossumcd16bf62007-06-13 18:07:49 +000031
32 # Set the timeout of our listening socket really low so
33 # that we can stop the server easily.
Antoine Pitrou803e6d62010-10-13 10:36:15 +000034 self.socket.settimeout(0.1)
Guido van Rossumcd16bf62007-06-13 18:07:49 +000035
36 def get_request(self):
Georg Brandl24420152008-05-26 16:32:26 +000037 """HTTPServer method, overridden."""
Guido van Rossumcd16bf62007-06-13 18:07:49 +000038
39 request, client_address = self.socket.accept()
40
41 # It's a loopback connection, so setting the timeout
42 # really low shouldn't affect anything, but should make
43 # deadlocks less likely to occur.
44 request.settimeout(10.0)
45
46 return (request, client_address)
47
48class LoopbackHttpServerThread(threading.Thread):
49 """Stoppable thread that runs a loopback http server."""
50
Guido van Rossum806c2462007-08-06 23:33:07 +000051 def __init__(self, request_handler):
Guido van Rossumcd16bf62007-06-13 18:07:49 +000052 threading.Thread.__init__(self)
Guido van Rossum4566c712007-08-21 03:36:47 +000053 self._stop_server = False
Guido van Rossumcd16bf62007-06-13 18:07:49 +000054 self.ready = threading.Event()
Guido van Rossum806c2462007-08-06 23:33:07 +000055 request_handler.protocol_version = "HTTP/1.0"
Jeremy Hylton1afc1692008-06-18 20:49:58 +000056 self.httpd = LoopbackHttpServer(("127.0.0.1", 0),
Guido van Rossum806c2462007-08-06 23:33:07 +000057 request_handler)
58 #print "Serving HTTP on %s port %s" % (self.httpd.server_name,
59 # self.httpd.server_port)
60 self.port = self.httpd.server_port
Guido van Rossumcd16bf62007-06-13 18:07:49 +000061
62 def stop(self):
63 """Stops the webserver if it's currently running."""
64
65 # Set the stop flag.
Guido van Rossum4566c712007-08-21 03:36:47 +000066 self._stop_server = True
Guido van Rossumcd16bf62007-06-13 18:07:49 +000067
68 self.join()
69
70 def run(self):
Guido van Rossumcd16bf62007-06-13 18:07:49 +000071 self.ready.set()
Guido van Rossum4566c712007-08-21 03:36:47 +000072 while not self._stop_server:
Guido van Rossum806c2462007-08-06 23:33:07 +000073 self.httpd.handle_request()
Guido van Rossumcd16bf62007-06-13 18:07:49 +000074
75# Authentication infrastructure
76
77class DigestAuthHandler:
78 """Handler for performing digest authentication."""
79
80 def __init__(self):
81 self._request_num = 0
82 self._nonces = []
83 self._users = {}
84 self._realm_name = "Test Realm"
85 self._qop = "auth"
86
87 def set_qop(self, qop):
88 self._qop = qop
89
90 def set_users(self, users):
91 assert isinstance(users, dict)
92 self._users = users
93
94 def set_realm(self, realm):
95 self._realm_name = realm
96
97 def _generate_nonce(self):
98 self._request_num += 1
Guido van Rossum81360142007-08-29 14:26:52 +000099 nonce = hashlib.md5(str(self._request_num).encode("ascii")).hexdigest()
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000100 self._nonces.append(nonce)
101 return nonce
102
103 def _create_auth_dict(self, auth_str):
104 first_space_index = auth_str.find(" ")
105 auth_str = auth_str[first_space_index+1:]
106
107 parts = auth_str.split(",")
108
109 auth_dict = {}
110 for part in parts:
111 name, value = part.split("=")
112 name = name.strip()
113 if value[0] == '"' and value[-1] == '"':
114 value = value[1:-1]
115 else:
116 value = value.strip()
117 auth_dict[name] = value
118 return auth_dict
119
120 def _validate_auth(self, auth_dict, password, method, uri):
121 final_dict = {}
122 final_dict.update(auth_dict)
123 final_dict["password"] = password
124 final_dict["method"] = method
125 final_dict["uri"] = uri
126 HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict
Guido van Rossum81360142007-08-29 14:26:52 +0000127 HA1 = hashlib.md5(HA1_str.encode("ascii")).hexdigest()
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000128 HA2_str = "%(method)s:%(uri)s" % final_dict
Guido van Rossum81360142007-08-29 14:26:52 +0000129 HA2 = hashlib.md5(HA2_str.encode("ascii")).hexdigest()
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000130 final_dict["HA1"] = HA1
131 final_dict["HA2"] = HA2
132 response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \
133 "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict
Guido van Rossum81360142007-08-29 14:26:52 +0000134 response = hashlib.md5(response_str.encode("ascii")).hexdigest()
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000135
136 return response == auth_dict["response"]
137
138 def _return_auth_challenge(self, request_handler):
139 request_handler.send_response(407, "Proxy Authentication Required")
140 request_handler.send_header("Content-Type", "text/html")
141 request_handler.send_header(
142 'Proxy-Authenticate', 'Digest realm="%s", '
143 'qop="%s",'
144 'nonce="%s", ' % \
145 (self._realm_name, self._qop, self._generate_nonce()))
146 # XXX: Not sure if we're supposed to add this next header or
147 # not.
148 #request_handler.send_header('Connection', 'close')
149 request_handler.end_headers()
Guido van Rossum8a392d72007-11-21 22:09:45 +0000150 request_handler.wfile.write(b"Proxy Authentication Required.")
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000151 return False
152
153 def handle_request(self, request_handler):
154 """Performs digest authentication on the given HTTP request
155 handler. Returns True if authentication was successful, False
156 otherwise.
157
158 If no users have been set, then digest auth is effectively
159 disabled and this method will always return True.
160 """
161
162 if len(self._users) == 0:
163 return True
164
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000165 if "Proxy-Authorization" not in request_handler.headers:
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000166 return self._return_auth_challenge(request_handler)
167 else:
168 auth_dict = self._create_auth_dict(
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000169 request_handler.headers["Proxy-Authorization"]
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000170 )
171 if auth_dict["username"] in self._users:
172 password = self._users[ auth_dict["username"] ]
173 else:
174 return self._return_auth_challenge(request_handler)
175 if not auth_dict.get("nonce") in self._nonces:
176 return self._return_auth_challenge(request_handler)
177 else:
178 self._nonces.remove(auth_dict["nonce"])
179
180 auth_validated = False
181
182 # MSIE uses short_path in its validation, but Python's
Florent Xicluna419e3842010-08-08 16:16:07 +0000183 # urllib.request uses the full path, so we're going to see if
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000184 # either of them works here.
185
186 for path in [request_handler.path, request_handler.short_path]:
187 if self._validate_auth(auth_dict,
188 password,
189 request_handler.command,
190 path):
191 auth_validated = True
192
193 if not auth_validated:
194 return self._return_auth_challenge(request_handler)
195 return True
196
197# Proxy test infrastructure
198
Georg Brandl24420152008-05-26 16:32:26 +0000199class FakeProxyHandler(http.server.BaseHTTPRequestHandler):
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000200 """This is a 'fake proxy' that makes it look like the entire
201 internet has gone down due to a sudden zombie invasion. It main
202 utility is in providing us with authentication support for
203 testing.
204 """
205
Collin Winter9a4414d2009-05-18 22:32:26 +0000206 def __init__(self, digest_auth_handler, *args, **kwargs):
207 # This has to be set before calling our parent's __init__(), which will
208 # try to call do_GET().
209 self.digest_auth_handler = digest_auth_handler
210 http.server.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000211
212 def log_message(self, format, *args):
213 # Uncomment the next line for debugging.
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000214 # sys.stderr.write(format % args)
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000215 pass
216
217 def do_GET(self):
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000218 (scm, netloc, path, params, query, fragment) = urllib.parse.urlparse(
219 self.path, "http")
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000220 self.short_path = path
221 if self.digest_auth_handler.handle_request(self):
222 self.send_response(200, "OK")
223 self.send_header("Content-Type", "text/html")
224 self.end_headers()
Guido van Rossum8a392d72007-11-21 22:09:45 +0000225 self.wfile.write(bytes("You've reached %s!<BR>" % self.path,
226 "ascii"))
227 self.wfile.write(b"Our apologies, but our server is down due to "
228 b"a sudden zombie invasion.")
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000229
230# Test cases
231
Antoine Pitrou803e6d62010-10-13 10:36:15 +0000232class ProxyAuthTests(unittest.TestCase):
Christian Heimesbbe741d2008-03-28 10:53:29 +0000233 URL = "http://localhost"
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000234
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000235 USER = "tester"
236 PASSWD = "test123"
237 REALM = "TestRealm"
238
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000239 def setUp(self):
Florent Xicluna9b86b9a2010-03-19 19:00:44 +0000240 super(ProxyAuthTests, self).setUp()
Collin Winter9a4414d2009-05-18 22:32:26 +0000241 self.digest_auth_handler = DigestAuthHandler()
242 self.digest_auth_handler.set_users({self.USER: self.PASSWD})
243 self.digest_auth_handler.set_realm(self.REALM)
244 def create_fake_proxy_handler(*args, **kwargs):
245 return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000246
Collin Winter9a4414d2009-05-18 22:32:26 +0000247 self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000248 self.server.start()
249 self.server.ready.wait()
Guido van Rossum806c2462007-08-06 23:33:07 +0000250 proxy_url = "http://127.0.0.1:%d" % self.server.port
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000251 handler = urllib.request.ProxyHandler({"http" : proxy_url})
Collin Winter9a4414d2009-05-18 22:32:26 +0000252 self.proxy_digest_handler = urllib.request.ProxyDigestAuthHandler()
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000253 self.opener = urllib.request.build_opener(
Collin Winter9a4414d2009-05-18 22:32:26 +0000254 handler, self.proxy_digest_handler)
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000255
256 def tearDown(self):
257 self.server.stop()
Florent Xicluna9b86b9a2010-03-19 19:00:44 +0000258 super(ProxyAuthTests, self).tearDown()
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000259
260 def test_proxy_with_bad_password_raises_httperror(self):
Collin Winter9a4414d2009-05-18 22:32:26 +0000261 self.proxy_digest_handler.add_password(self.REALM, self.URL,
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000262 self.USER, self.PASSWD+"bad")
Collin Winter9a4414d2009-05-18 22:32:26 +0000263 self.digest_auth_handler.set_qop("auth")
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000264 self.assertRaises(urllib.error.HTTPError,
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000265 self.opener.open,
266 self.URL)
267
268 def test_proxy_with_no_password_raises_httperror(self):
Collin Winter9a4414d2009-05-18 22:32:26 +0000269 self.digest_auth_handler.set_qop("auth")
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000270 self.assertRaises(urllib.error.HTTPError,
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000271 self.opener.open,
272 self.URL)
273
274 def test_proxy_qop_auth_works(self):
Collin Winter9a4414d2009-05-18 22:32:26 +0000275 self.proxy_digest_handler.add_password(self.REALM, self.URL,
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000276 self.USER, self.PASSWD)
Collin Winter9a4414d2009-05-18 22:32:26 +0000277 self.digest_auth_handler.set_qop("auth")
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000278 result = self.opener.open(self.URL)
279 while result.read():
280 pass
281 result.close()
282
283 def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
Collin Winter9a4414d2009-05-18 22:32:26 +0000284 self.proxy_digest_handler.add_password(self.REALM, self.URL,
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000285 self.USER, self.PASSWD)
Collin Winter9a4414d2009-05-18 22:32:26 +0000286 self.digest_auth_handler.set_qop("auth-int")
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000287 try:
288 result = self.opener.open(self.URL)
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000289 except urllib.error.URLError:
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000290 # It's okay if we don't support auth-int, but we certainly
291 # shouldn't receive any kind of exception here other than
292 # a URLError.
293 result = None
294 if result:
295 while result.read():
296 pass
297 result.close()
298
Christian Heimesbbe741d2008-03-28 10:53:29 +0000299
300def GetRequestHandler(responses):
301
Georg Brandl24420152008-05-26 16:32:26 +0000302 class FakeHTTPRequestHandler(http.server.BaseHTTPRequestHandler):
Christian Heimesbbe741d2008-03-28 10:53:29 +0000303
304 server_version = "TestHTTP/"
305 requests = []
306 headers_received = []
307 port = 80
308
309 def do_GET(self):
310 body = self.send_head()
Florent Xicluna37d3d9a2010-08-08 16:25:27 +0000311 while body:
312 done = self.wfile.write(body)
313 body = body[done:]
Christian Heimesbbe741d2008-03-28 10:53:29 +0000314
315 def do_POST(self):
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000316 content_length = self.headers["Content-Length"]
Christian Heimesbbe741d2008-03-28 10:53:29 +0000317 post_data = self.rfile.read(int(content_length))
318 self.do_GET()
319 self.requests.append(post_data)
320
321 def send_head(self):
322 FakeHTTPRequestHandler.headers_received = self.headers
323 self.requests.append(self.path)
324 response_code, headers, body = responses.pop(0)
325
326 self.send_response(response_code)
327
328 for (header, value) in headers:
Antoine Pitroub353c122009-02-11 00:39:14 +0000329 self.send_header(header, value % {'port':self.port})
Christian Heimesbbe741d2008-03-28 10:53:29 +0000330 if body:
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000331 self.send_header("Content-type", "text/plain")
Christian Heimesbbe741d2008-03-28 10:53:29 +0000332 self.end_headers()
333 return body
334 self.end_headers()
335
336 def log_message(self, *args):
337 pass
338
339
340 return FakeHTTPRequestHandler
341
342
Antoine Pitrou803e6d62010-10-13 10:36:15 +0000343class TestUrlopen(unittest.TestCase):
Florent Xicluna419e3842010-08-08 16:16:07 +0000344 """Tests urllib.request.urlopen using the network.
Christian Heimesbbe741d2008-03-28 10:53:29 +0000345
346 These tests are not exhaustive. Assuming that testing using files does a
347 good job overall of some of the basic interface features. There are no
348 tests exercising the optional 'data' and 'proxies' arguments. No tests
349 for transparent redirection have been written.
350 """
351
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000352 def setUp(self):
Florent Xicluna9b86b9a2010-03-19 19:00:44 +0000353 super(TestUrlopen, self).setUp()
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000354 self.server = None
355
356 def tearDown(self):
357 if self.server is not None:
358 self.server.stop()
Florent Xicluna9b86b9a2010-03-19 19:00:44 +0000359 super(TestUrlopen, self).tearDown()
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000360
Antoine Pitrou803e6d62010-10-13 10:36:15 +0000361 def urlopen(self, url, data=None, **kwargs):
Antoine Pitroub353c122009-02-11 00:39:14 +0000362 l = []
Antoine Pitrou803e6d62010-10-13 10:36:15 +0000363 f = urllib.request.urlopen(url, data, **kwargs)
Antoine Pitroub353c122009-02-11 00:39:14 +0000364 try:
365 # Exercise various methods
366 l.extend(f.readlines(200))
367 l.append(f.readline())
368 l.append(f.read(1024))
369 l.append(f.read())
370 finally:
371 f.close()
372 return b"".join(l)
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000373
374 def start_server(self, responses=None):
375 if responses is None:
376 responses = [(200, [], b"we don't care")]
Christian Heimesbbe741d2008-03-28 10:53:29 +0000377 handler = GetRequestHandler(responses)
378
379 self.server = LoopbackHttpServerThread(handler)
380 self.server.start()
381 self.server.ready.wait()
382 port = self.server.port
383 handler.port = port
384 return handler
385
Antoine Pitrou803e6d62010-10-13 10:36:15 +0000386 def start_https_server(self, responses=None, certfile=CERT_localhost):
387 if not hasattr(urllib.request, 'HTTPSHandler'):
388 self.skipTest('ssl support required')
389 from test.ssl_servers import make_https_server
390 if responses is None:
391 responses = [(200, [], b"we care a bit")]
392 handler = GetRequestHandler(responses)
393 server = make_https_server(self, certfile=certfile, handler_class=handler)
394 handler.port = server.port
395 return handler
396
Christian Heimesbbe741d2008-03-28 10:53:29 +0000397 def test_redirection(self):
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000398 expected_response = b"We got here..."
Christian Heimesbbe741d2008-03-28 10:53:29 +0000399 responses = [
Antoine Pitroub353c122009-02-11 00:39:14 +0000400 (302, [("Location", "http://localhost:%(port)s/somewhere_else")],
401 ""),
Christian Heimesbbe741d2008-03-28 10:53:29 +0000402 (200, [], expected_response)
403 ]
404
405 handler = self.start_server(responses)
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000406 data = self.urlopen("http://localhost:%s/" % handler.port)
Florent Xicluna419e3842010-08-08 16:16:07 +0000407 self.assertEqual(data, expected_response)
408 self.assertEqual(handler.requests, ["/", "/somewhere_else"])
Christian Heimesbbe741d2008-03-28 10:53:29 +0000409
Antoine Pitroub353c122009-02-11 00:39:14 +0000410 def test_chunked(self):
411 expected_response = b"hello world"
412 chunked_start = (
413 b'a\r\n'
414 b'hello worl\r\n'
415 b'1\r\n'
416 b'd\r\n'
417 b'0\r\n'
418 )
419 response = [(200, [("Transfer-Encoding", "chunked")], chunked_start)]
420 handler = self.start_server(response)
421 data = self.urlopen("http://localhost:%s/" % handler.port)
Florent Xicluna419e3842010-08-08 16:16:07 +0000422 self.assertEqual(data, expected_response)
Antoine Pitroub353c122009-02-11 00:39:14 +0000423
Christian Heimesbbe741d2008-03-28 10:53:29 +0000424 def test_404(self):
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000425 expected_response = b"Bad bad bad..."
Christian Heimesbbe741d2008-03-28 10:53:29 +0000426 handler = self.start_server([(404, [], expected_response)])
427
428 try:
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000429 self.urlopen("http://localhost:%s/weeble" % handler.port)
430 except urllib.error.URLError as f:
431 data = f.read()
432 f.close()
433 else:
434 self.fail("404 should raise URLError")
Christian Heimesbbe741d2008-03-28 10:53:29 +0000435
Florent Xicluna419e3842010-08-08 16:16:07 +0000436 self.assertEqual(data, expected_response)
437 self.assertEqual(handler.requests, ["/weeble"])
Christian Heimesbbe741d2008-03-28 10:53:29 +0000438
439 def test_200(self):
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000440 expected_response = b"pycon 2008..."
Christian Heimesbbe741d2008-03-28 10:53:29 +0000441 handler = self.start_server([(200, [], expected_response)])
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000442 data = self.urlopen("http://localhost:%s/bizarre" % handler.port)
Florent Xicluna419e3842010-08-08 16:16:07 +0000443 self.assertEqual(data, expected_response)
444 self.assertEqual(handler.requests, ["/bizarre"])
Christian Heimesbbe741d2008-03-28 10:53:29 +0000445
446 def test_200_with_parameters(self):
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000447 expected_response = b"pycon 2008..."
Christian Heimesbbe741d2008-03-28 10:53:29 +0000448 handler = self.start_server([(200, [], expected_response)])
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000449 data = self.urlopen("http://localhost:%s/bizarre" % handler.port,
450 b"get=with_feeling")
Florent Xicluna419e3842010-08-08 16:16:07 +0000451 self.assertEqual(data, expected_response)
452 self.assertEqual(handler.requests, ["/bizarre", b"get=with_feeling"])
Christian Heimesbbe741d2008-03-28 10:53:29 +0000453
Antoine Pitrou803e6d62010-10-13 10:36:15 +0000454 def test_https(self):
455 handler = self.start_https_server()
456 data = self.urlopen("https://localhost:%s/bizarre" % handler.port)
457 self.assertEqual(data, b"we care a bit")
458
459 def test_https_with_cafile(self):
460 handler = self.start_https_server(certfile=CERT_localhost)
461 import ssl
462 # Good cert
463 data = self.urlopen("https://localhost:%s/bizarre" % handler.port,
464 cafile=CERT_localhost)
465 self.assertEqual(data, b"we care a bit")
466 # Bad cert
467 with self.assertRaises(urllib.error.URLError) as cm:
468 self.urlopen("https://localhost:%s/bizarre" % handler.port,
469 cafile=CERT_fakehostname)
470 # Good cert, but mismatching hostname
471 handler = self.start_https_server(certfile=CERT_fakehostname)
472 with self.assertRaises(ssl.CertificateError) as cm:
473 self.urlopen("https://localhost:%s/bizarre" % handler.port,
474 cafile=CERT_fakehostname)
475
Christian Heimesbbe741d2008-03-28 10:53:29 +0000476 def test_sending_headers(self):
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000477 handler = self.start_server()
478 req = urllib.request.Request("http://localhost:%s/" % handler.port,
479 headers={"Range": "bytes=20-39"})
480 urllib.request.urlopen(req)
481 self.assertEqual(handler.headers_received["Range"], "bytes=20-39")
Christian Heimesbbe741d2008-03-28 10:53:29 +0000482
483 def test_basic(self):
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000484 handler = self.start_server()
485 open_url = urllib.request.urlopen("http://localhost:%s" % handler.port)
486 for attr in ("read", "close", "info", "geturl"):
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000487 self.assertTrue(hasattr(open_url, attr), "object returned from "
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000488 "urlopen lacks the %s attribute" % attr)
Christian Heimesbbe741d2008-03-28 10:53:29 +0000489 try:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000490 self.assertTrue(open_url.read(), "calling 'read' failed")
Christian Heimesbbe741d2008-03-28 10:53:29 +0000491 finally:
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000492 open_url.close()
Christian Heimesbbe741d2008-03-28 10:53:29 +0000493
494 def test_info(self):
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000495 handler = self.start_server()
Christian Heimesbbe741d2008-03-28 10:53:29 +0000496 try:
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000497 open_url = urllib.request.urlopen(
498 "http://localhost:%s" % handler.port)
Christian Heimesbbe741d2008-03-28 10:53:29 +0000499 info_obj = open_url.info()
Ezio Melottie9615932010-01-24 19:26:24 +0000500 self.assertIsInstance(info_obj, email.message.Message,
501 "object returned by 'info' is not an "
502 "instance of email.message.Message")
Barry Warsaw820c1202008-06-12 04:06:45 +0000503 self.assertEqual(info_obj.get_content_subtype(), "plain")
Christian Heimesbbe741d2008-03-28 10:53:29 +0000504 finally:
505 self.server.stop()
506
507 def test_geturl(self):
508 # Make sure same URL as opened is returned by geturl.
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000509 handler = self.start_server()
510 open_url = urllib.request.urlopen("http://localhost:%s" % handler.port)
511 url = open_url.geturl()
512 self.assertEqual(url, "http://localhost:%s" % handler.port)
Christian Heimesbbe741d2008-03-28 10:53:29 +0000513
514 def test_bad_address(self):
515 # Make sure proper exception is raised when connecting to a bogus
516 # address.
517 self.assertRaises(IOError,
R. David Murray8da3cac2009-09-29 14:01:08 +0000518 # Given that both VeriSign and various ISPs have in
519 # the past or are presently hijacking various invalid
520 # domain name requests in an attempt to boost traffic
521 # to their own sites, finding a domain name to use
522 # for this test is difficult. RFC2606 leads one to
523 # believe that '.invalid' should work, but experience
524 # seemed to indicate otherwise. Single character
525 # TLDs are likely to remain invalid, so this seems to
526 # be the best choice. The trailing '.' prevents a
527 # related problem: The normal DNS resolver appends
528 # the domain names from the search path if there is
529 # no '.' the end and, and if one of those domains
530 # implements a '*' rule a result is returned.
531 # However, none of this will prevent the test from
532 # failing if the ISP hijacks all invalid domain
533 # requests. The real solution would be to be able to
534 # parameterize the framework with a mock resolver.
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000535 urllib.request.urlopen,
R. David Murray8da3cac2009-09-29 14:01:08 +0000536 "http://sadflkjsasf.i.nvali.d./")
Christian Heimesbbe741d2008-03-28 10:53:29 +0000537
Florent Xicluna37d3d9a2010-08-08 16:25:27 +0000538 def test_iteration(self):
539 expected_response = b"pycon 2008..."
540 handler = self.start_server([(200, [], expected_response)])
541 data = urllib.request.urlopen("http://localhost:%s" % handler.port)
542 for line in data:
543 self.assertEqual(line, expected_response)
544
545 def test_line_iteration(self):
546 lines = [b"We\n", b"got\n", b"here\n", b"verylong " * 8192 + b"\n"]
547 expected_response = b"".join(lines)
548 handler = self.start_server([(200, [], expected_response)])
549 data = urllib.request.urlopen("http://localhost:%s" % handler.port)
550 for index, line in enumerate(data):
551 self.assertEqual(line, lines[index],
552 "Fetched line number %s doesn't match expected:\n"
553 " Expected length was %s, got %s" %
554 (index, len(lines[index]), len(line)))
555 self.assertEqual(index + 1, len(lines))
556
Antoine Pitrou803e6d62010-10-13 10:36:15 +0000557
558@support.reap_threads
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000559def test_main():
Collin Winter9a4414d2009-05-18 22:32:26 +0000560 support.run_unittest(ProxyAuthTests, TestUrlopen)
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000561
562if __name__ == "__main__":
563 test_main()