patch [ 810023 ] Fix for off-by-one bug in urllib.URLopener.retrieve
diff --git a/Lib/test/test_urllib.py b/Lib/test/test_urllib.py
index 1a32f53..3621476 100644
--- a/Lib/test/test_urllib.py
+++ b/Lib/test/test_urllib.py
@@ -6,6 +6,7 @@
from test import test_support
import os
import mimetools
+import tempfile
import StringIO
def hexescape(char):
@@ -125,15 +126,53 @@
"""Test urllib.urlretrieve() on local files"""
def setUp(self):
+ # Create a list of temporary files. Each item in the list is a file
+ # name (absolute path or relative to the current working directory).
+ # All files in this list will be deleted in the tearDown method. Note,
+ # this only helps to make sure temporary files get deleted, but it
+ # does nothing about trying to close files that may still be open. It
+ # is the responsibility of the developer to properly close files even
+ # when exceptional conditions occur.
+ self.tempFiles = []
+
# Create a temporary file.
+ self.registerFileForCleanUp(test_support.TESTFN)
self.text = 'testing urllib.urlretrieve'
- FILE = file(test_support.TESTFN, 'wb')
- FILE.write(self.text)
- FILE.close()
+ try:
+ FILE = file(test_support.TESTFN, 'wb')
+ FILE.write(self.text)
+ FILE.close()
+ finally:
+ try: FILE.close()
+ except: pass
def tearDown(self):
- # Delete the temporary file.
- os.remove(test_support.TESTFN)
+ # Delete the temporary files.
+ for each in self.tempFiles:
+ try: os.remove(each)
+ except: pass
+
+ def constructLocalFileUrl(self, filePath):
+ return "file://%s" % urllib.pathname2url(os.path.abspath(filePath))
+
+ def createNewTempFile(self, data=""):
+ """Creates a new temporary file containing the specified data,
+ registers the file for deletion during the test fixture tear down, and
+ returns the absolute path of the file."""
+
+ newFd, newFilePath = tempfile.mkstemp()
+ try:
+ self.registerFileForCleanUp(newFilePath)
+ newFile = os.fdopen(newFd, "wb")
+ newFile.write(data)
+ newFile.close()
+ finally:
+ try: newFile.close()
+ except: pass
+ return newFilePath
+
+ def registerFileForCleanUp(self, fileName):
+ self.tempFiles.append(fileName)
def test_basic(self):
# Make sure that a local file just gets its own location returned and
@@ -147,15 +186,19 @@
def test_copy(self):
# Test that setting the filename argument works.
second_temp = "%s.2" % test_support.TESTFN
- result = urllib.urlretrieve("file:%s" % test_support.TESTFN, second_temp)
+ self.registerFileForCleanUp(second_temp)
+ result = urllib.urlretrieve(self.constructLocalFileUrl(
+ test_support.TESTFN), second_temp)
self.assertEqual(second_temp, result[0])
self.assert_(os.path.exists(second_temp), "copy of the file was not "
"made")
FILE = file(second_temp, 'rb')
try:
text = FILE.read()
- finally:
FILE.close()
+ finally:
+ try: FILE.close()
+ except: pass
self.assertEqual(self.text, text)
def test_reporthook(self):
@@ -167,8 +210,49 @@
self.assertEqual(count, count_holder[0])
count_holder[0] = count_holder[0] + 1
second_temp = "%s.2" % test_support.TESTFN
- urllib.urlretrieve(test_support.TESTFN, second_temp, hooktester)
- os.remove(second_temp)
+ self.registerFileForCleanUp(second_temp)
+ urllib.urlretrieve(self.constructLocalFileUrl(test_support.TESTFN),
+ second_temp, hooktester)
+
+ def test_reporthook_0_bytes(self):
+ # Test on a zero-length file. Should call reporthook only once.
+ report = []
+ def hooktester(count, block_size, total_size, _report=report):
+ _report.append((count, block_size, total_size))
+ srcFileName = self.createNewTempFile()
+ urllib.urlretrieve(self.constructLocalFileUrl(srcFileName),
+ test_support.TESTFN, hooktester)
+ self.assertEqual(len(report), 1)
+ self.assertEqual(report[0][2], 0)
+
+ def test_reporthook_5_bytes(self):
+ # Test on 5 byte file. Should call reporthook only 2 times (once when
+ # the "network connection" is established and once when the block is
+ # read). Since the block size is 8192 bytes, only one block read is
+ # required to read the entire file.
+ report = []
+ def hooktester(count, block_size, total_size, _report=report):
+ _report.append((count, block_size, total_size))
+ srcFileName = self.createNewTempFile("x" * 5)
+ urllib.urlretrieve(self.constructLocalFileUrl(srcFileName),
+ test_support.TESTFN, hooktester)
+ self.assertEqual(len(report), 2)
+ self.assertEqual(report[0][1], 8192)
+ self.assertEqual(report[0][2], 5)
+
+ def test_reporthook_8193_bytes(self):
+ # Test on 8193 byte file. Should call reporthook only 3 times (once
+ # when the "network connection" is established, once for the next 8192
+ # bytes, and once for the last byte).
+ report = []
+ def hooktester(count, block_size, total_size, _report=report):
+ _report.append((count, block_size, total_size))
+ srcFileName = self.createNewTempFile("x" * 8193)
+ urllib.urlretrieve(self.constructLocalFileUrl(srcFileName),
+ test_support.TESTFN, hooktester)
+ self.assertEqual(len(report), 3)
+ self.assertEqual(report[0][1], 8192)
+ self.assertEqual(report[0][2], 8193)
class QuotingTests(unittest.TestCase):
"""Tests for urllib.quote() and urllib.quote_plus()