blob: 9e1ce5bb9eda7316032232dd1320528251ddca95 [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Guido van Rossumcd16bf62007-06-13 18:07:49 +00002
Antoine Pitrou803e6d62010-10-13 10:36:15 +00003import os
Barry Warsaw820c1202008-06-12 04:06:45 +00004import email
Jeremy Hylton1afc1692008-06-18 20:49:58 +00005import urllib.parse
6import urllib.request
Georg Brandl24420152008-05-26 16:32:26 +00007import http.server
Guido van Rossumcd16bf62007-06-13 18:07:49 +00008import unittest
9import hashlib
Benjamin Petersonee8712c2008-05-20 21:35:26 +000010from test import support
Victor Stinner45df8202010-04-28 22:31:17 +000011threading = support.import_module('threading')
Guido van Rossumcd16bf62007-06-13 18:07:49 +000012
Antoine Pitrou803e6d62010-10-13 10:36:15 +000013
14here = os.path.dirname(__file__)
15# Self-signed cert file for 'localhost'
16CERT_localhost = os.path.join(here, 'keycert.pem')
17# Self-signed cert file for 'fakehostname'
18CERT_fakehostname = os.path.join(here, 'keycert2.pem')
19
Guido van Rossumcd16bf62007-06-13 18:07:49 +000020# Loopback http server infrastructure
21
Georg Brandl24420152008-05-26 16:32:26 +000022class LoopbackHttpServer(http.server.HTTPServer):
Guido van Rossumcd16bf62007-06-13 18:07:49 +000023 """HTTP server w/ a few modifications that make it useful for
24 loopback testing purposes.
25 """
26
27 def __init__(self, server_address, RequestHandlerClass):
Georg Brandl24420152008-05-26 16:32:26 +000028 http.server.HTTPServer.__init__(self,
29 server_address,
30 RequestHandlerClass)
Guido van Rossumcd16bf62007-06-13 18:07:49 +000031
32 # Set the timeout of our listening socket really low so
33 # that we can stop the server easily.
Antoine Pitrou803e6d62010-10-13 10:36:15 +000034 self.socket.settimeout(0.1)
Guido van Rossumcd16bf62007-06-13 18:07:49 +000035
36 def get_request(self):
Georg Brandl24420152008-05-26 16:32:26 +000037 """HTTPServer method, overridden."""
Guido van Rossumcd16bf62007-06-13 18:07:49 +000038
39 request, client_address = self.socket.accept()
40
41 # It's a loopback connection, so setting the timeout
42 # really low shouldn't affect anything, but should make
43 # deadlocks less likely to occur.
44 request.settimeout(10.0)
45
46 return (request, client_address)
47
48class LoopbackHttpServerThread(threading.Thread):
49 """Stoppable thread that runs a loopback http server."""
50
Guido van Rossum806c2462007-08-06 23:33:07 +000051 def __init__(self, request_handler):
Guido van Rossumcd16bf62007-06-13 18:07:49 +000052 threading.Thread.__init__(self)
Guido van Rossum4566c712007-08-21 03:36:47 +000053 self._stop_server = False
Guido van Rossumcd16bf62007-06-13 18:07:49 +000054 self.ready = threading.Event()
Guido van Rossum806c2462007-08-06 23:33:07 +000055 request_handler.protocol_version = "HTTP/1.0"
Jeremy Hylton1afc1692008-06-18 20:49:58 +000056 self.httpd = LoopbackHttpServer(("127.0.0.1", 0),
Guido van Rossum806c2462007-08-06 23:33:07 +000057 request_handler)
58 #print "Serving HTTP on %s port %s" % (self.httpd.server_name,
59 # self.httpd.server_port)
60 self.port = self.httpd.server_port
Guido van Rossumcd16bf62007-06-13 18:07:49 +000061
62 def stop(self):
63 """Stops the webserver if it's currently running."""
64
65 # Set the stop flag.
Guido van Rossum4566c712007-08-21 03:36:47 +000066 self._stop_server = True
Guido van Rossumcd16bf62007-06-13 18:07:49 +000067
68 self.join()
Antoine Pitroub6751dc2010-10-30 17:33:22 +000069 self.httpd.server_close()
Guido van Rossumcd16bf62007-06-13 18:07:49 +000070
71 def run(self):
Guido van Rossumcd16bf62007-06-13 18:07:49 +000072 self.ready.set()
Guido van Rossum4566c712007-08-21 03:36:47 +000073 while not self._stop_server:
Guido van Rossum806c2462007-08-06 23:33:07 +000074 self.httpd.handle_request()
Guido van Rossumcd16bf62007-06-13 18:07:49 +000075
76# Authentication infrastructure
77
78class DigestAuthHandler:
79 """Handler for performing digest authentication."""
80
81 def __init__(self):
82 self._request_num = 0
83 self._nonces = []
84 self._users = {}
85 self._realm_name = "Test Realm"
86 self._qop = "auth"
87
88 def set_qop(self, qop):
89 self._qop = qop
90
91 def set_users(self, users):
92 assert isinstance(users, dict)
93 self._users = users
94
95 def set_realm(self, realm):
96 self._realm_name = realm
97
98 def _generate_nonce(self):
99 self._request_num += 1
Guido van Rossum81360142007-08-29 14:26:52 +0000100 nonce = hashlib.md5(str(self._request_num).encode("ascii")).hexdigest()
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000101 self._nonces.append(nonce)
102 return nonce
103
104 def _create_auth_dict(self, auth_str):
105 first_space_index = auth_str.find(" ")
106 auth_str = auth_str[first_space_index+1:]
107
108 parts = auth_str.split(",")
109
110 auth_dict = {}
111 for part in parts:
112 name, value = part.split("=")
113 name = name.strip()
114 if value[0] == '"' and value[-1] == '"':
115 value = value[1:-1]
116 else:
117 value = value.strip()
118 auth_dict[name] = value
119 return auth_dict
120
121 def _validate_auth(self, auth_dict, password, method, uri):
122 final_dict = {}
123 final_dict.update(auth_dict)
124 final_dict["password"] = password
125 final_dict["method"] = method
126 final_dict["uri"] = uri
127 HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict
Guido van Rossum81360142007-08-29 14:26:52 +0000128 HA1 = hashlib.md5(HA1_str.encode("ascii")).hexdigest()
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000129 HA2_str = "%(method)s:%(uri)s" % final_dict
Guido van Rossum81360142007-08-29 14:26:52 +0000130 HA2 = hashlib.md5(HA2_str.encode("ascii")).hexdigest()
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000131 final_dict["HA1"] = HA1
132 final_dict["HA2"] = HA2
133 response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \
134 "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict
Guido van Rossum81360142007-08-29 14:26:52 +0000135 response = hashlib.md5(response_str.encode("ascii")).hexdigest()
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000136
137 return response == auth_dict["response"]
138
139 def _return_auth_challenge(self, request_handler):
140 request_handler.send_response(407, "Proxy Authentication Required")
141 request_handler.send_header("Content-Type", "text/html")
142 request_handler.send_header(
143 'Proxy-Authenticate', 'Digest realm="%s", '
144 'qop="%s",'
145 'nonce="%s", ' % \
146 (self._realm_name, self._qop, self._generate_nonce()))
147 # XXX: Not sure if we're supposed to add this next header or
148 # not.
149 #request_handler.send_header('Connection', 'close')
150 request_handler.end_headers()
Guido van Rossum8a392d72007-11-21 22:09:45 +0000151 request_handler.wfile.write(b"Proxy Authentication Required.")
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000152 return False
153
154 def handle_request(self, request_handler):
155 """Performs digest authentication on the given HTTP request
156 handler. Returns True if authentication was successful, False
157 otherwise.
158
159 If no users have been set, then digest auth is effectively
160 disabled and this method will always return True.
161 """
162
163 if len(self._users) == 0:
164 return True
165
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000166 if "Proxy-Authorization" not in request_handler.headers:
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000167 return self._return_auth_challenge(request_handler)
168 else:
169 auth_dict = self._create_auth_dict(
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000170 request_handler.headers["Proxy-Authorization"]
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000171 )
172 if auth_dict["username"] in self._users:
173 password = self._users[ auth_dict["username"] ]
174 else:
175 return self._return_auth_challenge(request_handler)
176 if not auth_dict.get("nonce") in self._nonces:
177 return self._return_auth_challenge(request_handler)
178 else:
179 self._nonces.remove(auth_dict["nonce"])
180
181 auth_validated = False
182
183 # MSIE uses short_path in its validation, but Python's
Florent Xicluna419e3842010-08-08 16:16:07 +0000184 # urllib.request uses the full path, so we're going to see if
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000185 # either of them works here.
186
187 for path in [request_handler.path, request_handler.short_path]:
188 if self._validate_auth(auth_dict,
189 password,
190 request_handler.command,
191 path):
192 auth_validated = True
193
194 if not auth_validated:
195 return self._return_auth_challenge(request_handler)
196 return True
197
198# Proxy test infrastructure
199
Georg Brandl24420152008-05-26 16:32:26 +0000200class FakeProxyHandler(http.server.BaseHTTPRequestHandler):
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000201 """This is a 'fake proxy' that makes it look like the entire
202 internet has gone down due to a sudden zombie invasion. It main
203 utility is in providing us with authentication support for
204 testing.
205 """
206
Collin Winter9a4414d2009-05-18 22:32:26 +0000207 def __init__(self, digest_auth_handler, *args, **kwargs):
208 # This has to be set before calling our parent's __init__(), which will
209 # try to call do_GET().
210 self.digest_auth_handler = digest_auth_handler
211 http.server.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000212
213 def log_message(self, format, *args):
214 # Uncomment the next line for debugging.
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000215 # sys.stderr.write(format % args)
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000216 pass
217
218 def do_GET(self):
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000219 (scm, netloc, path, params, query, fragment) = urllib.parse.urlparse(
220 self.path, "http")
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000221 self.short_path = path
222 if self.digest_auth_handler.handle_request(self):
223 self.send_response(200, "OK")
224 self.send_header("Content-Type", "text/html")
225 self.end_headers()
Guido van Rossum8a392d72007-11-21 22:09:45 +0000226 self.wfile.write(bytes("You've reached %s!<BR>" % self.path,
227 "ascii"))
228 self.wfile.write(b"Our apologies, but our server is down due to "
229 b"a sudden zombie invasion.")
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000230
231# Test cases
232
Antoine Pitrou803e6d62010-10-13 10:36:15 +0000233class ProxyAuthTests(unittest.TestCase):
Christian Heimesbbe741d2008-03-28 10:53:29 +0000234 URL = "http://localhost"
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000235
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000236 USER = "tester"
237 PASSWD = "test123"
238 REALM = "TestRealm"
239
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000240 def setUp(self):
Florent Xicluna9b86b9a2010-03-19 19:00:44 +0000241 super(ProxyAuthTests, self).setUp()
Collin Winter9a4414d2009-05-18 22:32:26 +0000242 self.digest_auth_handler = DigestAuthHandler()
243 self.digest_auth_handler.set_users({self.USER: self.PASSWD})
244 self.digest_auth_handler.set_realm(self.REALM)
245 def create_fake_proxy_handler(*args, **kwargs):
246 return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000247
Collin Winter9a4414d2009-05-18 22:32:26 +0000248 self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000249 self.server.start()
250 self.server.ready.wait()
Guido van Rossum806c2462007-08-06 23:33:07 +0000251 proxy_url = "http://127.0.0.1:%d" % self.server.port
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000252 handler = urllib.request.ProxyHandler({"http" : proxy_url})
Collin Winter9a4414d2009-05-18 22:32:26 +0000253 self.proxy_digest_handler = urllib.request.ProxyDigestAuthHandler()
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000254 self.opener = urllib.request.build_opener(
Collin Winter9a4414d2009-05-18 22:32:26 +0000255 handler, self.proxy_digest_handler)
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000256
257 def tearDown(self):
258 self.server.stop()
Florent Xicluna9b86b9a2010-03-19 19:00:44 +0000259 super(ProxyAuthTests, self).tearDown()
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000260
261 def test_proxy_with_bad_password_raises_httperror(self):
Collin Winter9a4414d2009-05-18 22:32:26 +0000262 self.proxy_digest_handler.add_password(self.REALM, self.URL,
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000263 self.USER, self.PASSWD+"bad")
Collin Winter9a4414d2009-05-18 22:32:26 +0000264 self.digest_auth_handler.set_qop("auth")
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000265 self.assertRaises(urllib.error.HTTPError,
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000266 self.opener.open,
267 self.URL)
268
269 def test_proxy_with_no_password_raises_httperror(self):
Collin Winter9a4414d2009-05-18 22:32:26 +0000270 self.digest_auth_handler.set_qop("auth")
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000271 self.assertRaises(urllib.error.HTTPError,
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000272 self.opener.open,
273 self.URL)
274
275 def test_proxy_qop_auth_works(self):
Collin Winter9a4414d2009-05-18 22:32:26 +0000276 self.proxy_digest_handler.add_password(self.REALM, self.URL,
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000277 self.USER, self.PASSWD)
Collin Winter9a4414d2009-05-18 22:32:26 +0000278 self.digest_auth_handler.set_qop("auth")
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000279 result = self.opener.open(self.URL)
280 while result.read():
281 pass
282 result.close()
283
284 def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
Collin Winter9a4414d2009-05-18 22:32:26 +0000285 self.proxy_digest_handler.add_password(self.REALM, self.URL,
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000286 self.USER, self.PASSWD)
Collin Winter9a4414d2009-05-18 22:32:26 +0000287 self.digest_auth_handler.set_qop("auth-int")
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000288 try:
289 result = self.opener.open(self.URL)
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000290 except urllib.error.URLError:
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000291 # It's okay if we don't support auth-int, but we certainly
292 # shouldn't receive any kind of exception here other than
293 # a URLError.
294 result = None
295 if result:
296 while result.read():
297 pass
298 result.close()
299
Christian Heimesbbe741d2008-03-28 10:53:29 +0000300
301def GetRequestHandler(responses):
302
Georg Brandl24420152008-05-26 16:32:26 +0000303 class FakeHTTPRequestHandler(http.server.BaseHTTPRequestHandler):
Christian Heimesbbe741d2008-03-28 10:53:29 +0000304
305 server_version = "TestHTTP/"
306 requests = []
307 headers_received = []
308 port = 80
309
310 def do_GET(self):
311 body = self.send_head()
Florent Xicluna37d3d9a2010-08-08 16:25:27 +0000312 while body:
313 done = self.wfile.write(body)
314 body = body[done:]
Christian Heimesbbe741d2008-03-28 10:53:29 +0000315
316 def do_POST(self):
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000317 content_length = self.headers["Content-Length"]
Christian Heimesbbe741d2008-03-28 10:53:29 +0000318 post_data = self.rfile.read(int(content_length))
319 self.do_GET()
320 self.requests.append(post_data)
321
322 def send_head(self):
323 FakeHTTPRequestHandler.headers_received = self.headers
324 self.requests.append(self.path)
325 response_code, headers, body = responses.pop(0)
326
327 self.send_response(response_code)
328
329 for (header, value) in headers:
Antoine Pitroub353c122009-02-11 00:39:14 +0000330 self.send_header(header, value % {'port':self.port})
Christian Heimesbbe741d2008-03-28 10:53:29 +0000331 if body:
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000332 self.send_header("Content-type", "text/plain")
Christian Heimesbbe741d2008-03-28 10:53:29 +0000333 self.end_headers()
334 return body
335 self.end_headers()
336
337 def log_message(self, *args):
338 pass
339
340
341 return FakeHTTPRequestHandler
342
343
Antoine Pitrou803e6d62010-10-13 10:36:15 +0000344class TestUrlopen(unittest.TestCase):
Florent Xicluna419e3842010-08-08 16:16:07 +0000345 """Tests urllib.request.urlopen using the network.
Christian Heimesbbe741d2008-03-28 10:53:29 +0000346
347 These tests are not exhaustive. Assuming that testing using files does a
348 good job overall of some of the basic interface features. There are no
349 tests exercising the optional 'data' and 'proxies' arguments. No tests
350 for transparent redirection have been written.
351 """
352
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000353 def setUp(self):
Florent Xicluna9b86b9a2010-03-19 19:00:44 +0000354 super(TestUrlopen, self).setUp()
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000355 self.server = None
356
357 def tearDown(self):
358 if self.server is not None:
359 self.server.stop()
Florent Xicluna9b86b9a2010-03-19 19:00:44 +0000360 super(TestUrlopen, self).tearDown()
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000361
Antoine Pitrou803e6d62010-10-13 10:36:15 +0000362 def urlopen(self, url, data=None, **kwargs):
Antoine Pitroub353c122009-02-11 00:39:14 +0000363 l = []
Antoine Pitrou803e6d62010-10-13 10:36:15 +0000364 f = urllib.request.urlopen(url, data, **kwargs)
Antoine Pitroub353c122009-02-11 00:39:14 +0000365 try:
366 # Exercise various methods
367 l.extend(f.readlines(200))
368 l.append(f.readline())
369 l.append(f.read(1024))
370 l.append(f.read())
371 finally:
372 f.close()
373 return b"".join(l)
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000374
375 def start_server(self, responses=None):
376 if responses is None:
377 responses = [(200, [], b"we don't care")]
Christian Heimesbbe741d2008-03-28 10:53:29 +0000378 handler = GetRequestHandler(responses)
379
380 self.server = LoopbackHttpServerThread(handler)
381 self.server.start()
382 self.server.ready.wait()
383 port = self.server.port
384 handler.port = port
385 return handler
386
Antoine Pitrou803e6d62010-10-13 10:36:15 +0000387 def start_https_server(self, responses=None, certfile=CERT_localhost):
388 if not hasattr(urllib.request, 'HTTPSHandler'):
389 self.skipTest('ssl support required')
390 from test.ssl_servers import make_https_server
391 if responses is None:
392 responses = [(200, [], b"we care a bit")]
393 handler = GetRequestHandler(responses)
394 server = make_https_server(self, certfile=certfile, handler_class=handler)
395 handler.port = server.port
396 return handler
397
Christian Heimesbbe741d2008-03-28 10:53:29 +0000398 def test_redirection(self):
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000399 expected_response = b"We got here..."
Christian Heimesbbe741d2008-03-28 10:53:29 +0000400 responses = [
Antoine Pitroub353c122009-02-11 00:39:14 +0000401 (302, [("Location", "http://localhost:%(port)s/somewhere_else")],
402 ""),
Christian Heimesbbe741d2008-03-28 10:53:29 +0000403 (200, [], expected_response)
404 ]
405
406 handler = self.start_server(responses)
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000407 data = self.urlopen("http://localhost:%s/" % handler.port)
Florent Xicluna419e3842010-08-08 16:16:07 +0000408 self.assertEqual(data, expected_response)
409 self.assertEqual(handler.requests, ["/", "/somewhere_else"])
Christian Heimesbbe741d2008-03-28 10:53:29 +0000410
Antoine Pitroub353c122009-02-11 00:39:14 +0000411 def test_chunked(self):
412 expected_response = b"hello world"
413 chunked_start = (
414 b'a\r\n'
415 b'hello worl\r\n'
416 b'1\r\n'
417 b'd\r\n'
418 b'0\r\n'
419 )
420 response = [(200, [("Transfer-Encoding", "chunked")], chunked_start)]
421 handler = self.start_server(response)
422 data = self.urlopen("http://localhost:%s/" % handler.port)
Florent Xicluna419e3842010-08-08 16:16:07 +0000423 self.assertEqual(data, expected_response)
Antoine Pitroub353c122009-02-11 00:39:14 +0000424
Christian Heimesbbe741d2008-03-28 10:53:29 +0000425 def test_404(self):
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000426 expected_response = b"Bad bad bad..."
Christian Heimesbbe741d2008-03-28 10:53:29 +0000427 handler = self.start_server([(404, [], expected_response)])
428
429 try:
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000430 self.urlopen("http://localhost:%s/weeble" % handler.port)
431 except urllib.error.URLError as f:
432 data = f.read()
433 f.close()
434 else:
435 self.fail("404 should raise URLError")
Christian Heimesbbe741d2008-03-28 10:53:29 +0000436
Florent Xicluna419e3842010-08-08 16:16:07 +0000437 self.assertEqual(data, expected_response)
438 self.assertEqual(handler.requests, ["/weeble"])
Christian Heimesbbe741d2008-03-28 10:53:29 +0000439
440 def test_200(self):
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000441 expected_response = b"pycon 2008..."
Christian Heimesbbe741d2008-03-28 10:53:29 +0000442 handler = self.start_server([(200, [], expected_response)])
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000443 data = self.urlopen("http://localhost:%s/bizarre" % handler.port)
Florent Xicluna419e3842010-08-08 16:16:07 +0000444 self.assertEqual(data, expected_response)
445 self.assertEqual(handler.requests, ["/bizarre"])
Christian Heimesbbe741d2008-03-28 10:53:29 +0000446
447 def test_200_with_parameters(self):
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000448 expected_response = b"pycon 2008..."
Christian Heimesbbe741d2008-03-28 10:53:29 +0000449 handler = self.start_server([(200, [], expected_response)])
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000450 data = self.urlopen("http://localhost:%s/bizarre" % handler.port,
451 b"get=with_feeling")
Florent Xicluna419e3842010-08-08 16:16:07 +0000452 self.assertEqual(data, expected_response)
453 self.assertEqual(handler.requests, ["/bizarre", b"get=with_feeling"])
Christian Heimesbbe741d2008-03-28 10:53:29 +0000454
Antoine Pitrou803e6d62010-10-13 10:36:15 +0000455 def test_https(self):
456 handler = self.start_https_server()
457 data = self.urlopen("https://localhost:%s/bizarre" % handler.port)
458 self.assertEqual(data, b"we care a bit")
459
460 def test_https_with_cafile(self):
461 handler = self.start_https_server(certfile=CERT_localhost)
462 import ssl
463 # Good cert
464 data = self.urlopen("https://localhost:%s/bizarre" % handler.port,
465 cafile=CERT_localhost)
466 self.assertEqual(data, b"we care a bit")
467 # Bad cert
468 with self.assertRaises(urllib.error.URLError) as cm:
469 self.urlopen("https://localhost:%s/bizarre" % handler.port,
470 cafile=CERT_fakehostname)
471 # Good cert, but mismatching hostname
472 handler = self.start_https_server(certfile=CERT_fakehostname)
473 with self.assertRaises(ssl.CertificateError) as cm:
474 self.urlopen("https://localhost:%s/bizarre" % handler.port,
475 cafile=CERT_fakehostname)
476
Christian Heimesbbe741d2008-03-28 10:53:29 +0000477 def test_sending_headers(self):
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000478 handler = self.start_server()
479 req = urllib.request.Request("http://localhost:%s/" % handler.port,
480 headers={"Range": "bytes=20-39"})
481 urllib.request.urlopen(req)
482 self.assertEqual(handler.headers_received["Range"], "bytes=20-39")
Christian Heimesbbe741d2008-03-28 10:53:29 +0000483
484 def test_basic(self):
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000485 handler = self.start_server()
486 open_url = urllib.request.urlopen("http://localhost:%s" % handler.port)
487 for attr in ("read", "close", "info", "geturl"):
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000488 self.assertTrue(hasattr(open_url, attr), "object returned from "
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000489 "urlopen lacks the %s attribute" % attr)
Christian Heimesbbe741d2008-03-28 10:53:29 +0000490 try:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000491 self.assertTrue(open_url.read(), "calling 'read' failed")
Christian Heimesbbe741d2008-03-28 10:53:29 +0000492 finally:
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000493 open_url.close()
Christian Heimesbbe741d2008-03-28 10:53:29 +0000494
495 def test_info(self):
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000496 handler = self.start_server()
Christian Heimesbbe741d2008-03-28 10:53:29 +0000497 try:
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000498 open_url = urllib.request.urlopen(
499 "http://localhost:%s" % handler.port)
Christian Heimesbbe741d2008-03-28 10:53:29 +0000500 info_obj = open_url.info()
Ezio Melottie9615932010-01-24 19:26:24 +0000501 self.assertIsInstance(info_obj, email.message.Message,
502 "object returned by 'info' is not an "
503 "instance of email.message.Message")
Barry Warsaw820c1202008-06-12 04:06:45 +0000504 self.assertEqual(info_obj.get_content_subtype(), "plain")
Christian Heimesbbe741d2008-03-28 10:53:29 +0000505 finally:
506 self.server.stop()
507
508 def test_geturl(self):
509 # Make sure same URL as opened is returned by geturl.
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000510 handler = self.start_server()
511 open_url = urllib.request.urlopen("http://localhost:%s" % handler.port)
512 url = open_url.geturl()
513 self.assertEqual(url, "http://localhost:%s" % handler.port)
Christian Heimesbbe741d2008-03-28 10:53:29 +0000514
515 def test_bad_address(self):
516 # Make sure proper exception is raised when connecting to a bogus
517 # address.
518 self.assertRaises(IOError,
R. David Murray8da3cac2009-09-29 14:01:08 +0000519 # Given that both VeriSign and various ISPs have in
520 # the past or are presently hijacking various invalid
521 # domain name requests in an attempt to boost traffic
522 # to their own sites, finding a domain name to use
523 # for this test is difficult. RFC2606 leads one to
524 # believe that '.invalid' should work, but experience
525 # seemed to indicate otherwise. Single character
526 # TLDs are likely to remain invalid, so this seems to
527 # be the best choice. The trailing '.' prevents a
528 # related problem: The normal DNS resolver appends
529 # the domain names from the search path if there is
530 # no '.' the end and, and if one of those domains
531 # implements a '*' rule a result is returned.
532 # However, none of this will prevent the test from
533 # failing if the ISP hijacks all invalid domain
534 # requests. The real solution would be to be able to
535 # parameterize the framework with a mock resolver.
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000536 urllib.request.urlopen,
R. David Murray8da3cac2009-09-29 14:01:08 +0000537 "http://sadflkjsasf.i.nvali.d./")
Christian Heimesbbe741d2008-03-28 10:53:29 +0000538
Florent Xicluna37d3d9a2010-08-08 16:25:27 +0000539 def test_iteration(self):
540 expected_response = b"pycon 2008..."
541 handler = self.start_server([(200, [], expected_response)])
542 data = urllib.request.urlopen("http://localhost:%s" % handler.port)
543 for line in data:
544 self.assertEqual(line, expected_response)
545
546 def test_line_iteration(self):
547 lines = [b"We\n", b"got\n", b"here\n", b"verylong " * 8192 + b"\n"]
548 expected_response = b"".join(lines)
549 handler = self.start_server([(200, [], expected_response)])
550 data = urllib.request.urlopen("http://localhost:%s" % handler.port)
551 for index, line in enumerate(data):
552 self.assertEqual(line, lines[index],
553 "Fetched line number %s doesn't match expected:\n"
554 " Expected length was %s, got %s" %
555 (index, len(lines[index]), len(line)))
556 self.assertEqual(index + 1, len(lines))
557
Antoine Pitrou803e6d62010-10-13 10:36:15 +0000558
559@support.reap_threads
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000560def test_main():
Collin Winter9a4414d2009-05-18 22:32:26 +0000561 support.run_unittest(ProxyAuthTests, TestUrlopen)
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000562
563if __name__ == "__main__":
564 test_main()