blob: 08250c36e5d6a9b20e12789e4030e315e4770882 [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Guido van Rossumcd16bf62007-06-13 18:07:49 +00002
Antoine Pitrou803e6d62010-10-13 10:36:15 +00003import os
Barry Warsaw820c1202008-06-12 04:06:45 +00004import email
Jeremy Hylton1afc1692008-06-18 20:49:58 +00005import urllib.parse
6import urllib.request
Georg Brandl24420152008-05-26 16:32:26 +00007import http.server
Guido van Rossumcd16bf62007-06-13 18:07:49 +00008import unittest
9import hashlib
Benjamin Petersonee8712c2008-05-20 21:35:26 +000010from test import support
Victor Stinner45df8202010-04-28 22:31:17 +000011threading = support.import_module('threading')
Antoine Pitrouda232592013-02-05 21:20:51 +010012try:
13 import ssl
14except ImportError:
15 ssl = None
Antoine Pitrou803e6d62010-10-13 10:36:15 +000016
17here = os.path.dirname(__file__)
18# Self-signed cert file for 'localhost'
19CERT_localhost = os.path.join(here, 'keycert.pem')
20# Self-signed cert file for 'fakehostname'
21CERT_fakehostname = os.path.join(here, 'keycert2.pem')
22
Antoine Pitrouda232592013-02-05 21:20:51 +010023
Guido van Rossumcd16bf62007-06-13 18:07:49 +000024# Loopback http server infrastructure
25
Georg Brandl24420152008-05-26 16:32:26 +000026class LoopbackHttpServer(http.server.HTTPServer):
Guido van Rossumcd16bf62007-06-13 18:07:49 +000027 """HTTP server w/ a few modifications that make it useful for
28 loopback testing purposes.
29 """
30
31 def __init__(self, server_address, RequestHandlerClass):
Georg Brandl24420152008-05-26 16:32:26 +000032 http.server.HTTPServer.__init__(self,
33 server_address,
34 RequestHandlerClass)
Guido van Rossumcd16bf62007-06-13 18:07:49 +000035
36 # Set the timeout of our listening socket really low so
37 # that we can stop the server easily.
Antoine Pitrou803e6d62010-10-13 10:36:15 +000038 self.socket.settimeout(0.1)
Guido van Rossumcd16bf62007-06-13 18:07:49 +000039
40 def get_request(self):
Georg Brandl24420152008-05-26 16:32:26 +000041 """HTTPServer method, overridden."""
Guido van Rossumcd16bf62007-06-13 18:07:49 +000042
43 request, client_address = self.socket.accept()
44
45 # It's a loopback connection, so setting the timeout
46 # really low shouldn't affect anything, but should make
47 # deadlocks less likely to occur.
48 request.settimeout(10.0)
49
50 return (request, client_address)
51
52class LoopbackHttpServerThread(threading.Thread):
53 """Stoppable thread that runs a loopback http server."""
54
Guido van Rossum806c2462007-08-06 23:33:07 +000055 def __init__(self, request_handler):
Guido van Rossumcd16bf62007-06-13 18:07:49 +000056 threading.Thread.__init__(self)
Guido van Rossum4566c712007-08-21 03:36:47 +000057 self._stop_server = False
Guido van Rossumcd16bf62007-06-13 18:07:49 +000058 self.ready = threading.Event()
Guido van Rossum806c2462007-08-06 23:33:07 +000059 request_handler.protocol_version = "HTTP/1.0"
Jeremy Hylton1afc1692008-06-18 20:49:58 +000060 self.httpd = LoopbackHttpServer(("127.0.0.1", 0),
Guido van Rossum806c2462007-08-06 23:33:07 +000061 request_handler)
62 #print "Serving HTTP on %s port %s" % (self.httpd.server_name,
63 # self.httpd.server_port)
64 self.port = self.httpd.server_port
Guido van Rossumcd16bf62007-06-13 18:07:49 +000065
66 def stop(self):
67 """Stops the webserver if it's currently running."""
68
69 # Set the stop flag.
Guido van Rossum4566c712007-08-21 03:36:47 +000070 self._stop_server = True
Guido van Rossumcd16bf62007-06-13 18:07:49 +000071
72 self.join()
Antoine Pitroub6751dc2010-10-30 17:33:22 +000073 self.httpd.server_close()
Guido van Rossumcd16bf62007-06-13 18:07:49 +000074
75 def run(self):
Guido van Rossumcd16bf62007-06-13 18:07:49 +000076 self.ready.set()
Guido van Rossum4566c712007-08-21 03:36:47 +000077 while not self._stop_server:
Guido van Rossum806c2462007-08-06 23:33:07 +000078 self.httpd.handle_request()
Guido van Rossumcd16bf62007-06-13 18:07:49 +000079
80# Authentication infrastructure
81
82class DigestAuthHandler:
83 """Handler for performing digest authentication."""
84
85 def __init__(self):
86 self._request_num = 0
87 self._nonces = []
88 self._users = {}
89 self._realm_name = "Test Realm"
90 self._qop = "auth"
91
92 def set_qop(self, qop):
93 self._qop = qop
94
95 def set_users(self, users):
96 assert isinstance(users, dict)
97 self._users = users
98
99 def set_realm(self, realm):
100 self._realm_name = realm
101
102 def _generate_nonce(self):
103 self._request_num += 1
Guido van Rossum81360142007-08-29 14:26:52 +0000104 nonce = hashlib.md5(str(self._request_num).encode("ascii")).hexdigest()
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000105 self._nonces.append(nonce)
106 return nonce
107
108 def _create_auth_dict(self, auth_str):
109 first_space_index = auth_str.find(" ")
110 auth_str = auth_str[first_space_index+1:]
111
112 parts = auth_str.split(",")
113
114 auth_dict = {}
115 for part in parts:
116 name, value = part.split("=")
117 name = name.strip()
118 if value[0] == '"' and value[-1] == '"':
119 value = value[1:-1]
120 else:
121 value = value.strip()
122 auth_dict[name] = value
123 return auth_dict
124
125 def _validate_auth(self, auth_dict, password, method, uri):
126 final_dict = {}
127 final_dict.update(auth_dict)
128 final_dict["password"] = password
129 final_dict["method"] = method
130 final_dict["uri"] = uri
131 HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict
Guido van Rossum81360142007-08-29 14:26:52 +0000132 HA1 = hashlib.md5(HA1_str.encode("ascii")).hexdigest()
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000133 HA2_str = "%(method)s:%(uri)s" % final_dict
Guido van Rossum81360142007-08-29 14:26:52 +0000134 HA2 = hashlib.md5(HA2_str.encode("ascii")).hexdigest()
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000135 final_dict["HA1"] = HA1
136 final_dict["HA2"] = HA2
137 response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \
138 "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict
Guido van Rossum81360142007-08-29 14:26:52 +0000139 response = hashlib.md5(response_str.encode("ascii")).hexdigest()
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000140
141 return response == auth_dict["response"]
142
143 def _return_auth_challenge(self, request_handler):
144 request_handler.send_response(407, "Proxy Authentication Required")
145 request_handler.send_header("Content-Type", "text/html")
146 request_handler.send_header(
147 'Proxy-Authenticate', 'Digest realm="%s", '
148 'qop="%s",'
149 'nonce="%s", ' % \
150 (self._realm_name, self._qop, self._generate_nonce()))
151 # XXX: Not sure if we're supposed to add this next header or
152 # not.
153 #request_handler.send_header('Connection', 'close')
154 request_handler.end_headers()
Guido van Rossum8a392d72007-11-21 22:09:45 +0000155 request_handler.wfile.write(b"Proxy Authentication Required.")
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000156 return False
157
158 def handle_request(self, request_handler):
159 """Performs digest authentication on the given HTTP request
160 handler. Returns True if authentication was successful, False
161 otherwise.
162
163 If no users have been set, then digest auth is effectively
164 disabled and this method will always return True.
165 """
166
167 if len(self._users) == 0:
168 return True
169
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000170 if "Proxy-Authorization" not in request_handler.headers:
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000171 return self._return_auth_challenge(request_handler)
172 else:
173 auth_dict = self._create_auth_dict(
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000174 request_handler.headers["Proxy-Authorization"]
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000175 )
176 if auth_dict["username"] in self._users:
177 password = self._users[ auth_dict["username"] ]
178 else:
179 return self._return_auth_challenge(request_handler)
180 if not auth_dict.get("nonce") in self._nonces:
181 return self._return_auth_challenge(request_handler)
182 else:
183 self._nonces.remove(auth_dict["nonce"])
184
185 auth_validated = False
186
187 # MSIE uses short_path in its validation, but Python's
Florent Xicluna419e3842010-08-08 16:16:07 +0000188 # urllib.request uses the full path, so we're going to see if
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000189 # either of them works here.
190
191 for path in [request_handler.path, request_handler.short_path]:
192 if self._validate_auth(auth_dict,
193 password,
194 request_handler.command,
195 path):
196 auth_validated = True
197
198 if not auth_validated:
199 return self._return_auth_challenge(request_handler)
200 return True
201
202# Proxy test infrastructure
203
Georg Brandl24420152008-05-26 16:32:26 +0000204class FakeProxyHandler(http.server.BaseHTTPRequestHandler):
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000205 """This is a 'fake proxy' that makes it look like the entire
206 internet has gone down due to a sudden zombie invasion. It main
207 utility is in providing us with authentication support for
208 testing.
209 """
210
Collin Winter9a4414d2009-05-18 22:32:26 +0000211 def __init__(self, digest_auth_handler, *args, **kwargs):
212 # This has to be set before calling our parent's __init__(), which will
213 # try to call do_GET().
214 self.digest_auth_handler = digest_auth_handler
215 http.server.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000216
217 def log_message(self, format, *args):
218 # Uncomment the next line for debugging.
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000219 # sys.stderr.write(format % args)
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000220 pass
221
222 def do_GET(self):
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000223 (scm, netloc, path, params, query, fragment) = urllib.parse.urlparse(
224 self.path, "http")
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000225 self.short_path = path
226 if self.digest_auth_handler.handle_request(self):
227 self.send_response(200, "OK")
228 self.send_header("Content-Type", "text/html")
229 self.end_headers()
Guido van Rossum8a392d72007-11-21 22:09:45 +0000230 self.wfile.write(bytes("You've reached %s!<BR>" % self.path,
231 "ascii"))
232 self.wfile.write(b"Our apologies, but our server is down due to "
233 b"a sudden zombie invasion.")
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000234
235# Test cases
236
Antoine Pitrou803e6d62010-10-13 10:36:15 +0000237class ProxyAuthTests(unittest.TestCase):
Christian Heimesbbe741d2008-03-28 10:53:29 +0000238 URL = "http://localhost"
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000239
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000240 USER = "tester"
241 PASSWD = "test123"
242 REALM = "TestRealm"
243
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000244 def setUp(self):
Florent Xicluna9b86b9a2010-03-19 19:00:44 +0000245 super(ProxyAuthTests, self).setUp()
Collin Winter9a4414d2009-05-18 22:32:26 +0000246 self.digest_auth_handler = DigestAuthHandler()
247 self.digest_auth_handler.set_users({self.USER: self.PASSWD})
248 self.digest_auth_handler.set_realm(self.REALM)
249 def create_fake_proxy_handler(*args, **kwargs):
250 return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000251
Collin Winter9a4414d2009-05-18 22:32:26 +0000252 self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000253 self.server.start()
254 self.server.ready.wait()
Guido van Rossum806c2462007-08-06 23:33:07 +0000255 proxy_url = "http://127.0.0.1:%d" % self.server.port
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000256 handler = urllib.request.ProxyHandler({"http" : proxy_url})
Collin Winter9a4414d2009-05-18 22:32:26 +0000257 self.proxy_digest_handler = urllib.request.ProxyDigestAuthHandler()
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000258 self.opener = urllib.request.build_opener(
Collin Winter9a4414d2009-05-18 22:32:26 +0000259 handler, self.proxy_digest_handler)
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000260
261 def tearDown(self):
262 self.server.stop()
Florent Xicluna9b86b9a2010-03-19 19:00:44 +0000263 super(ProxyAuthTests, self).tearDown()
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000264
265 def test_proxy_with_bad_password_raises_httperror(self):
Collin Winter9a4414d2009-05-18 22:32:26 +0000266 self.proxy_digest_handler.add_password(self.REALM, self.URL,
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000267 self.USER, self.PASSWD+"bad")
Collin Winter9a4414d2009-05-18 22:32:26 +0000268 self.digest_auth_handler.set_qop("auth")
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000269 self.assertRaises(urllib.error.HTTPError,
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000270 self.opener.open,
271 self.URL)
272
273 def test_proxy_with_no_password_raises_httperror(self):
Collin Winter9a4414d2009-05-18 22:32:26 +0000274 self.digest_auth_handler.set_qop("auth")
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000275 self.assertRaises(urllib.error.HTTPError,
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000276 self.opener.open,
277 self.URL)
278
279 def test_proxy_qop_auth_works(self):
Collin Winter9a4414d2009-05-18 22:32:26 +0000280 self.proxy_digest_handler.add_password(self.REALM, self.URL,
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000281 self.USER, self.PASSWD)
Collin Winter9a4414d2009-05-18 22:32:26 +0000282 self.digest_auth_handler.set_qop("auth")
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000283 result = self.opener.open(self.URL)
284 while result.read():
285 pass
286 result.close()
287
288 def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
Collin Winter9a4414d2009-05-18 22:32:26 +0000289 self.proxy_digest_handler.add_password(self.REALM, self.URL,
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000290 self.USER, self.PASSWD)
Collin Winter9a4414d2009-05-18 22:32:26 +0000291 self.digest_auth_handler.set_qop("auth-int")
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000292 try:
293 result = self.opener.open(self.URL)
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000294 except urllib.error.URLError:
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000295 # It's okay if we don't support auth-int, but we certainly
296 # shouldn't receive any kind of exception here other than
297 # a URLError.
298 result = None
299 if result:
300 while result.read():
301 pass
302 result.close()
303
Christian Heimesbbe741d2008-03-28 10:53:29 +0000304
305def GetRequestHandler(responses):
306
Georg Brandl24420152008-05-26 16:32:26 +0000307 class FakeHTTPRequestHandler(http.server.BaseHTTPRequestHandler):
Christian Heimesbbe741d2008-03-28 10:53:29 +0000308
309 server_version = "TestHTTP/"
310 requests = []
311 headers_received = []
312 port = 80
313
314 def do_GET(self):
315 body = self.send_head()
Florent Xicluna37d3d9a2010-08-08 16:25:27 +0000316 while body:
317 done = self.wfile.write(body)
318 body = body[done:]
Christian Heimesbbe741d2008-03-28 10:53:29 +0000319
320 def do_POST(self):
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000321 content_length = self.headers["Content-Length"]
Christian Heimesbbe741d2008-03-28 10:53:29 +0000322 post_data = self.rfile.read(int(content_length))
323 self.do_GET()
324 self.requests.append(post_data)
325
326 def send_head(self):
327 FakeHTTPRequestHandler.headers_received = self.headers
328 self.requests.append(self.path)
329 response_code, headers, body = responses.pop(0)
330
331 self.send_response(response_code)
332
333 for (header, value) in headers:
Antoine Pitroub353c122009-02-11 00:39:14 +0000334 self.send_header(header, value % {'port':self.port})
Christian Heimesbbe741d2008-03-28 10:53:29 +0000335 if body:
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000336 self.send_header("Content-type", "text/plain")
Christian Heimesbbe741d2008-03-28 10:53:29 +0000337 self.end_headers()
338 return body
339 self.end_headers()
340
341 def log_message(self, *args):
342 pass
343
344
345 return FakeHTTPRequestHandler
346
347
Antoine Pitrou803e6d62010-10-13 10:36:15 +0000348class TestUrlopen(unittest.TestCase):
Florent Xicluna419e3842010-08-08 16:16:07 +0000349 """Tests urllib.request.urlopen using the network.
Christian Heimesbbe741d2008-03-28 10:53:29 +0000350
351 These tests are not exhaustive. Assuming that testing using files does a
352 good job overall of some of the basic interface features. There are no
353 tests exercising the optional 'data' and 'proxies' arguments. No tests
354 for transparent redirection have been written.
355 """
356
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000357 def setUp(self):
Florent Xicluna9b86b9a2010-03-19 19:00:44 +0000358 super(TestUrlopen, self).setUp()
Senthil Kumaran303eb472012-12-26 01:45:58 -0800359 # Ignore proxies for localhost tests.
Antoine Pitrouda232592013-02-05 21:20:51 +0100360 self.old_environ = os.environ.copy()
Senthil Kumaran303eb472012-12-26 01:45:58 -0800361 os.environ['NO_PROXY'] = '*'
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000362 self.server = None
363
364 def tearDown(self):
365 if self.server is not None:
366 self.server.stop()
Antoine Pitrouda232592013-02-05 21:20:51 +0100367 os.environ.clear()
368 os.environ.update(self.old_environ)
Florent Xicluna9b86b9a2010-03-19 19:00:44 +0000369 super(TestUrlopen, self).tearDown()
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000370
Antoine Pitrou803e6d62010-10-13 10:36:15 +0000371 def urlopen(self, url, data=None, **kwargs):
Antoine Pitroub353c122009-02-11 00:39:14 +0000372 l = []
Antoine Pitrou803e6d62010-10-13 10:36:15 +0000373 f = urllib.request.urlopen(url, data, **kwargs)
Antoine Pitroub353c122009-02-11 00:39:14 +0000374 try:
375 # Exercise various methods
376 l.extend(f.readlines(200))
377 l.append(f.readline())
378 l.append(f.read(1024))
379 l.append(f.read())
380 finally:
381 f.close()
382 return b"".join(l)
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000383
384 def start_server(self, responses=None):
385 if responses is None:
386 responses = [(200, [], b"we don't care")]
Christian Heimesbbe741d2008-03-28 10:53:29 +0000387 handler = GetRequestHandler(responses)
388
389 self.server = LoopbackHttpServerThread(handler)
390 self.server.start()
391 self.server.ready.wait()
392 port = self.server.port
393 handler.port = port
394 return handler
395
Antoine Pitrouda232592013-02-05 21:20:51 +0100396 def start_https_server(self, responses=None, **kwargs):
Antoine Pitrou803e6d62010-10-13 10:36:15 +0000397 if not hasattr(urllib.request, 'HTTPSHandler'):
398 self.skipTest('ssl support required')
399 from test.ssl_servers import make_https_server
400 if responses is None:
401 responses = [(200, [], b"we care a bit")]
402 handler = GetRequestHandler(responses)
Antoine Pitrouda232592013-02-05 21:20:51 +0100403 server = make_https_server(self, handler_class=handler, **kwargs)
Antoine Pitrou803e6d62010-10-13 10:36:15 +0000404 handler.port = server.port
405 return handler
406
Christian Heimesbbe741d2008-03-28 10:53:29 +0000407 def test_redirection(self):
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000408 expected_response = b"We got here..."
Christian Heimesbbe741d2008-03-28 10:53:29 +0000409 responses = [
Antoine Pitroub353c122009-02-11 00:39:14 +0000410 (302, [("Location", "http://localhost:%(port)s/somewhere_else")],
411 ""),
Christian Heimesbbe741d2008-03-28 10:53:29 +0000412 (200, [], expected_response)
413 ]
414
415 handler = self.start_server(responses)
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000416 data = self.urlopen("http://localhost:%s/" % handler.port)
Florent Xicluna419e3842010-08-08 16:16:07 +0000417 self.assertEqual(data, expected_response)
418 self.assertEqual(handler.requests, ["/", "/somewhere_else"])
Christian Heimesbbe741d2008-03-28 10:53:29 +0000419
Antoine Pitroub353c122009-02-11 00:39:14 +0000420 def test_chunked(self):
421 expected_response = b"hello world"
422 chunked_start = (
423 b'a\r\n'
424 b'hello worl\r\n'
425 b'1\r\n'
426 b'd\r\n'
427 b'0\r\n'
428 )
429 response = [(200, [("Transfer-Encoding", "chunked")], chunked_start)]
430 handler = self.start_server(response)
431 data = self.urlopen("http://localhost:%s/" % handler.port)
Florent Xicluna419e3842010-08-08 16:16:07 +0000432 self.assertEqual(data, expected_response)
Antoine Pitroub353c122009-02-11 00:39:14 +0000433
Christian Heimesbbe741d2008-03-28 10:53:29 +0000434 def test_404(self):
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000435 expected_response = b"Bad bad bad..."
Christian Heimesbbe741d2008-03-28 10:53:29 +0000436 handler = self.start_server([(404, [], expected_response)])
437
438 try:
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000439 self.urlopen("http://localhost:%s/weeble" % handler.port)
440 except urllib.error.URLError as f:
441 data = f.read()
442 f.close()
443 else:
444 self.fail("404 should raise URLError")
Christian Heimesbbe741d2008-03-28 10:53:29 +0000445
Florent Xicluna419e3842010-08-08 16:16:07 +0000446 self.assertEqual(data, expected_response)
447 self.assertEqual(handler.requests, ["/weeble"])
Christian Heimesbbe741d2008-03-28 10:53:29 +0000448
449 def test_200(self):
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000450 expected_response = b"pycon 2008..."
Christian Heimesbbe741d2008-03-28 10:53:29 +0000451 handler = self.start_server([(200, [], expected_response)])
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000452 data = self.urlopen("http://localhost:%s/bizarre" % handler.port)
Florent Xicluna419e3842010-08-08 16:16:07 +0000453 self.assertEqual(data, expected_response)
454 self.assertEqual(handler.requests, ["/bizarre"])
Christian Heimesbbe741d2008-03-28 10:53:29 +0000455
456 def test_200_with_parameters(self):
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000457 expected_response = b"pycon 2008..."
Christian Heimesbbe741d2008-03-28 10:53:29 +0000458 handler = self.start_server([(200, [], expected_response)])
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000459 data = self.urlopen("http://localhost:%s/bizarre" % handler.port,
460 b"get=with_feeling")
Florent Xicluna419e3842010-08-08 16:16:07 +0000461 self.assertEqual(data, expected_response)
462 self.assertEqual(handler.requests, ["/bizarre", b"get=with_feeling"])
Christian Heimesbbe741d2008-03-28 10:53:29 +0000463
Antoine Pitrou803e6d62010-10-13 10:36:15 +0000464 def test_https(self):
465 handler = self.start_https_server()
466 data = self.urlopen("https://localhost:%s/bizarre" % handler.port)
467 self.assertEqual(data, b"we care a bit")
468
469 def test_https_with_cafile(self):
470 handler = self.start_https_server(certfile=CERT_localhost)
471 import ssl
472 # Good cert
473 data = self.urlopen("https://localhost:%s/bizarre" % handler.port,
474 cafile=CERT_localhost)
475 self.assertEqual(data, b"we care a bit")
476 # Bad cert
477 with self.assertRaises(urllib.error.URLError) as cm:
478 self.urlopen("https://localhost:%s/bizarre" % handler.port,
479 cafile=CERT_fakehostname)
480 # Good cert, but mismatching hostname
481 handler = self.start_https_server(certfile=CERT_fakehostname)
482 with self.assertRaises(ssl.CertificateError) as cm:
483 self.urlopen("https://localhost:%s/bizarre" % handler.port,
484 cafile=CERT_fakehostname)
485
Antoine Pitroude9ac6c2012-05-16 21:40:01 +0200486 def test_https_with_cadefault(self):
487 handler = self.start_https_server(certfile=CERT_localhost)
488 # Self-signed cert should fail verification with system certificate store
489 with self.assertRaises(urllib.error.URLError) as cm:
490 self.urlopen("https://localhost:%s/bizarre" % handler.port,
491 cadefault=True)
492
Antoine Pitrouda232592013-02-05 21:20:51 +0100493 def test_https_sni(self):
494 if ssl is None:
495 self.skipTest("ssl module required")
496 if not ssl.HAS_SNI:
497 self.skipTest("SNI support required in OpenSSL")
498 sni_name = None
499 def cb_sni(ssl_sock, server_name, initial_context):
500 nonlocal sni_name
501 sni_name = server_name
502 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
503 context.set_servername_callback(cb_sni)
504 handler = self.start_https_server(context=context, certfile=CERT_localhost)
505 self.urlopen("https://localhost:%s" % handler.port)
506 self.assertEqual(sni_name, "localhost")
507
Christian Heimesbbe741d2008-03-28 10:53:29 +0000508 def test_sending_headers(self):
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000509 handler = self.start_server()
510 req = urllib.request.Request("http://localhost:%s/" % handler.port,
511 headers={"Range": "bytes=20-39"})
512 urllib.request.urlopen(req)
513 self.assertEqual(handler.headers_received["Range"], "bytes=20-39")
Christian Heimesbbe741d2008-03-28 10:53:29 +0000514
515 def test_basic(self):
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000516 handler = self.start_server()
517 open_url = urllib.request.urlopen("http://localhost:%s" % handler.port)
518 for attr in ("read", "close", "info", "geturl"):
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000519 self.assertTrue(hasattr(open_url, attr), "object returned from "
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000520 "urlopen lacks the %s attribute" % attr)
Christian Heimesbbe741d2008-03-28 10:53:29 +0000521 try:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000522 self.assertTrue(open_url.read(), "calling 'read' failed")
Christian Heimesbbe741d2008-03-28 10:53:29 +0000523 finally:
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000524 open_url.close()
Christian Heimesbbe741d2008-03-28 10:53:29 +0000525
526 def test_info(self):
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000527 handler = self.start_server()
Christian Heimesbbe741d2008-03-28 10:53:29 +0000528 try:
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000529 open_url = urllib.request.urlopen(
530 "http://localhost:%s" % handler.port)
Christian Heimesbbe741d2008-03-28 10:53:29 +0000531 info_obj = open_url.info()
Ezio Melottie9615932010-01-24 19:26:24 +0000532 self.assertIsInstance(info_obj, email.message.Message,
533 "object returned by 'info' is not an "
534 "instance of email.message.Message")
Barry Warsaw820c1202008-06-12 04:06:45 +0000535 self.assertEqual(info_obj.get_content_subtype(), "plain")
Christian Heimesbbe741d2008-03-28 10:53:29 +0000536 finally:
537 self.server.stop()
538
539 def test_geturl(self):
540 # Make sure same URL as opened is returned by geturl.
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000541 handler = self.start_server()
542 open_url = urllib.request.urlopen("http://localhost:%s" % handler.port)
543 url = open_url.geturl()
544 self.assertEqual(url, "http://localhost:%s" % handler.port)
Christian Heimesbbe741d2008-03-28 10:53:29 +0000545
546 def test_bad_address(self):
547 # Make sure proper exception is raised when connecting to a bogus
548 # address.
Ezio Melotti90984722013-03-30 01:28:40 +0200549
550 # as indicated by the comment below, this might fail with some ISP,
551 # so we run the test only when -unetwork/-uall is specified to
552 # mitigate the problem a bit (see #17564)
553 support.requires('network')
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200554 self.assertRaises(OSError,
R. David Murray8da3cac2009-09-29 14:01:08 +0000555 # Given that both VeriSign and various ISPs have in
556 # the past or are presently hijacking various invalid
557 # domain name requests in an attempt to boost traffic
558 # to their own sites, finding a domain name to use
559 # for this test is difficult. RFC2606 leads one to
560 # believe that '.invalid' should work, but experience
561 # seemed to indicate otherwise. Single character
562 # TLDs are likely to remain invalid, so this seems to
563 # be the best choice. The trailing '.' prevents a
564 # related problem: The normal DNS resolver appends
565 # the domain names from the search path if there is
566 # no '.' the end and, and if one of those domains
567 # implements a '*' rule a result is returned.
568 # However, none of this will prevent the test from
569 # failing if the ISP hijacks all invalid domain
570 # requests. The real solution would be to be able to
571 # parameterize the framework with a mock resolver.
Jeremy Hylton1afc1692008-06-18 20:49:58 +0000572 urllib.request.urlopen,
R. David Murray8da3cac2009-09-29 14:01:08 +0000573 "http://sadflkjsasf.i.nvali.d./")
Christian Heimesbbe741d2008-03-28 10:53:29 +0000574
Florent Xicluna37d3d9a2010-08-08 16:25:27 +0000575 def test_iteration(self):
576 expected_response = b"pycon 2008..."
577 handler = self.start_server([(200, [], expected_response)])
578 data = urllib.request.urlopen("http://localhost:%s" % handler.port)
579 for line in data:
580 self.assertEqual(line, expected_response)
581
582 def test_line_iteration(self):
583 lines = [b"We\n", b"got\n", b"here\n", b"verylong " * 8192 + b"\n"]
584 expected_response = b"".join(lines)
585 handler = self.start_server([(200, [], expected_response)])
586 data = urllib.request.urlopen("http://localhost:%s" % handler.port)
587 for index, line in enumerate(data):
588 self.assertEqual(line, lines[index],
589 "Fetched line number %s doesn't match expected:\n"
590 " Expected length was %s, got %s" %
591 (index, len(lines[index]), len(line)))
592 self.assertEqual(index + 1, len(lines))
593
Antoine Pitrou803e6d62010-10-13 10:36:15 +0000594
595@support.reap_threads
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000596def test_main():
Collin Winter9a4414d2009-05-18 22:32:26 +0000597 support.run_unittest(ProxyAuthTests, TestUrlopen)
Guido van Rossumcd16bf62007-06-13 18:07:49 +0000598
599if __name__ == "__main__":
600 test_main()