blob: d394ceddd080e7d4ef696df397b3a5950fa1f644 [file] [log] [blame]
Skip Montanaro89feabc2003-03-30 04:54:24 +00001import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002from test import support
Skip Montanaro89feabc2003-03-30 04:54:24 +00003
Antoine Pitroua98d26a2011-05-22 17:35:17 +02004import contextlib
Skip Montanaro89feabc2003-03-30 04:54:24 +00005import socket
Stéphane Wirtela40681d2019-02-22 14:45:36 +01006import urllib.parse
Jeremy Hylton1afc1692008-06-18 20:49:58 +00007import urllib.request
Brett Cannona71319e2003-05-14 02:18:31 +00008import os
Barry Warsaw820c1202008-06-12 04:06:45 +00009import email.message
Senthil Kumaranf6c456d2010-05-01 08:29:18 +000010import time
Skip Montanaro89feabc2003-03-30 04:54:24 +000011
Christian Heimesaf98da12008-01-27 15:18:18 +000012
# Declare that everything in this module depends on the 'network' test
# resource.  NOTE(review): presumably test.support refuses to run the module
# when that resource is not enabled -- confirm against test.support.requires.
support.requires('network')
14
Senthil Kumaran1bd7d292017-05-15 23:08:07 -070015
class URLTimeoutTest(unittest.TestCase):
    """Read a URL while a process-wide default socket timeout is in force.

    setUp installs TIMEOUT as the default socket timeout and tearDown puts
    back whatever default was active beforehand, so the global setting does
    not leak into (or get reset for) other tests in the same process.
    """
    # XXX this test doesn't seem to test anything useful.

    TIMEOUT = 30.0

    def setUp(self):
        # Remember the current default so tearDown can restore it rather
        # than unconditionally resetting it to None.
        self._saved_timeout = socket.getdefaulttimeout()
        socket.setdefaulttimeout(self.TIMEOUT)

    def tearDown(self):
        socket.setdefaulttimeout(self._saved_timeout)

    def testURLread(self):
        domain = urllib.parse.urlparse(support.TEST_HTTP_URL).netloc
        with support.transient_internet(domain):
            # Use the response as a context manager so the underlying
            # connection is closed even if read() raises.
            with urllib.request.urlopen(support.TEST_HTTP_URL) as f:
                f.read()
Skip Montanaro89feabc2003-03-30 04:54:24 +000032
Antoine Pitroua98d26a2011-05-22 17:35:17 +020033
class urlopenNetworkTests(unittest.TestCase):
    """Tests urllib.request.urlopen using the network.

    These tests are not exhaustive.  Assuming that testing using files does a
    good job overall of some of the basic interface features.  There are no
    tests exercising the optional 'data' and 'proxies' arguments.  No tests
    for transparent redirection have been written.

    setUp is not used for always constructing a connection to
    http://www.pythontest.net/ since there are a few tests that don't use
    that address and making a connection is expensive enough to warrant
    minimizing unneeded connections.

    """

    url = 'http://www.pythontest.net/'

    @contextlib.contextmanager
    def urlopen(self, *args, **kwargs):
        """Context manager yielding the result of urllib.request.urlopen().

        All arguments are forwarded unchanged to urllib.request.urlopen().
        The first positional argument (the URL) is also handed to
        support.transient_internet() as the resource.  The opened object is
        closed when the ``with`` block exits, whether or not it raised.
        """
        resource = args[0]
        with support.transient_internet(resource):
            r = urllib.request.urlopen(*args, **kwargs)
            try:
                yield r
            finally:
                r.close()

    def test_basic(self):
        # Simple test expected to pass.
        with self.urlopen(self.url) as open_url:
            for attr in ("read", "readline", "readlines", "fileno", "close",
                         "info", "geturl"):
                self.assertTrue(hasattr(open_url, attr), "object returned from "
                                "urlopen lacks the %s attribute" % attr)
            self.assertTrue(open_url.read(), "calling 'read' failed")

    def test_readlines(self):
        # Test both readline and readlines.
        with self.urlopen(self.url) as open_url:
            # The response is a binary stream, so readline() yields bytes;
            # the failure message says so explicitly.
            self.assertIsInstance(open_url.readline(), bytes,
                                  "readline did not return bytes")
            self.assertIsInstance(open_url.readlines(), list,
                                  "readlines did not return a list")

    def test_info(self):
        # Test 'info'.
        with self.urlopen(self.url) as open_url:
            info_obj = open_url.info()
            self.assertIsInstance(info_obj, email.message.Message,
                                  "object returned by 'info' is not an "
                                  "instance of email.message.Message")
            self.assertEqual(info_obj.get_content_subtype(), "html")

    def test_geturl(self):
        # Make sure same URL as opened is returned by geturl.
        with self.urlopen(self.url) as open_url:
            gotten_url = open_url.geturl()
            self.assertEqual(gotten_url, self.url)

    def test_getcode(self):
        # test getcode() with the fancy opener to get 404 error codes
        URL = self.url + "XXXinvalidXXX"
        with support.transient_internet(URL):
            # Only the open() call is expected to emit the warning.
            with self.assertWarns(DeprecationWarning):
                open_url = urllib.request.FancyURLopener().open(URL)
            try:
                code = open_url.getcode()
            finally:
                open_url.close()
            self.assertEqual(code, 404)

    def test_bad_address(self):
        # Make sure proper exception is raised when connecting to a bogus
        # address.

        # Given that both VeriSign and various ISPs have in
        # the past or are presently hijacking various invalid
        # domain name requests in an attempt to boost traffic
        # to their own sites, finding a domain name to use
        # for this test is difficult.  RFC2606 leads one to
        # believe that '.invalid' should work, but experience
        # seemed to indicate otherwise.  Single character
        # TLDs are likely to remain invalid, so this seems to
        # be the best choice.  The trailing '.' prevents a
        # related problem: the normal DNS resolver appends
        # the domain names from the search path if there is
        # no '.' at the end, and if one of those domains
        # implements a '*' rule, a result is returned.
        # However, none of this will prevent the test from
        # failing if the ISP hijacks all invalid domain
        # requests.  The real solution would be to be able to
        # parameterize the framework with a mock resolver.
        bogus_domain = "sadflkjsasf.i.nvali.d."
        try:
            socket.gethostbyname(bogus_domain)
        except OSError:
            # socket.gaierror is too narrow, since getaddrinfo() may also
            # fail with EAI_SYSTEM and ETIMEDOUT (seen on Ubuntu 13.04),
            # i.e. Python's TimeoutError.
            pass
        else:
            # This happens with some overzealous DNS providers such as OpenDNS
            self.skipTest("%r should not resolve for test to work" % bogus_domain)
        failure_explanation = ('opening an invalid URL did not raise OSError; '
                               'can be caused by a broken DNS server '
                               '(e.g. returns 404 or hijacks page)')
        with self.assertRaises(OSError, msg=failure_explanation):
            urllib.request.urlopen("http://{}/".format(bogus_domain))
Brett Cannona71319e2003-05-14 02:18:31 +0000142
Antoine Pitroua98d26a2011-05-22 17:35:17 +0200143
class urlretrieveNetworkTests(unittest.TestCase):
    """Tests urllib.request.urlretrieve using the network."""

    # URL retrieved by every test in this class.  Declared before the
    # methods that read it so the dependency is visible up front.
    logo = "http://www.pythontest.net/"

    @contextlib.contextmanager
    def urlretrieve(self, *args, **kwargs):
        """Context manager yielding ``(file_location, info)`` from urlretrieve.

        All arguments are forwarded unchanged to urllib.request.urlretrieve().
        The first positional argument (the URL) is also handed to
        support.transient_internet() as the resource.  The retrieved file is
        removed with support.unlink() when the ``with`` block exits.
        """
        resource = args[0]
        with support.transient_internet(resource):
            file_location, info = urllib.request.urlretrieve(*args, **kwargs)
            try:
                yield file_location, info
            finally:
                support.unlink(file_location)

    def test_basic(self):
        # Test basic functionality.
        with self.urlretrieve(self.logo) as (file_location, info):
            self.assertTrue(os.path.exists(file_location), "file location returned by"
                            " urlretrieve is not a valid path")
            with open(file_location, 'rb') as f:
                self.assertTrue(f.read(), "reading from the file location returned"
                                " by urlretrieve failed")

    def test_specified_path(self):
        # Make sure that specifying the location of the file to write to works.
        with self.urlretrieve(self.logo,
                              support.TESTFN) as (file_location, info):
            self.assertEqual(file_location, support.TESTFN)
            self.assertTrue(os.path.exists(file_location))
            with open(file_location, 'rb') as f:
                self.assertTrue(f.read(), "reading from temporary file failed")

    def test_header(self):
        # Make sure header returned as 2nd value from urlretrieve is good.
        with self.urlretrieve(self.logo) as (file_location, info):
            self.assertIsInstance(info, email.message.Message,
                                  "info is not an instance of email.message.Message")

    def test_data_header(self):
        # The Date header of the response must parse as an HTTP date.
        with self.urlretrieve(self.logo) as (file_location, fileheaders):
            datevalue = fileheaders.get('Date')
            # get() returns None for a missing header; report that as a test
            # failure instead of letting strptime() raise TypeError.
            self.assertIsNotNone(datevalue, "response has no Date header")
            dateformat = '%a, %d %b %Y %H:%M:%S GMT'
            try:
                time.strptime(datevalue, dateformat)
            except ValueError:
                self.fail('Date value not in %r format' % dateformat)

    def test_reporthook(self):
        # Record every reporthook call and check the sequence is consistent
        # with the Content-Length announced by the server.
        records = []

        def recording_reporthook(blocks, block_size, total_size):
            records.append((blocks, block_size, total_size))

        with self.urlretrieve(self.logo, reporthook=recording_reporthook) as (
                file_location, fileheaders):
            expected_size = int(fileheaders['Content-Length'])

        records_repr = repr(records)  # For use in error messages.
        self.assertGreater(len(records), 1, msg="There should always be two "
                           "calls; the first one before the transfer starts.")
        self.assertEqual(records[0][0], 0)
        self.assertGreater(records[0][1], 0,
                           msg="block size can't be 0 in %s" % records_repr)
        self.assertEqual(records[0][2], expected_size)
        self.assertEqual(records[-1][2], expected_size)

        block_sizes = {block_size for _, block_size, _ in records}
        self.assertEqual({records[0][1]}, block_sizes,
                         msg="block sizes in %s must be equal" % records_repr)
        self.assertGreaterEqual(records[-1][0]*records[0][1], expected_size,
                                msg="number of blocks * block size must be"
                                " >= total size in %s" % records_repr)
217
Brett Cannona71319e2003-05-14 02:18:31 +0000218
# Run this module's tests when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()