Patch #1121142: Implement ZipFile.open.
diff --git a/Lib/test/test_zipfile.py b/Lib/test/test_zipfile.py
index c17039f..e0b255c 100644
--- a/Lib/test/test_zipfile.py
+++ b/Lib/test/test_zipfile.py
@@ -4,26 +4,29 @@
except ImportError:
zlib = None
-import zipfile, os, unittest, sys, shutil
+import zipfile, os, unittest, sys, shutil, struct
from StringIO import StringIO
from tempfile import TemporaryFile
+from random import randint, random
from test.test_support import TESTFN, run_unittest
TESTFN2 = TESTFN + "2"
+FIXEDTEST_SIZE = 10
class TestsWithSourceFile(unittest.TestCase):
def setUp(self):
- line_gen = ("Test of zipfile line %d." % i for i in range(0, 1000))
- self.data = '\n'.join(line_gen)
+ self.line_gen = ("Zipfile test line %d. random float: %f" % (i, random())
+ for i in xrange(FIXEDTEST_SIZE))
+ self.data = '\n'.join(self.line_gen) + '\n'
# Make a source file with some lines
fp = open(TESTFN, "wb")
fp.write(self.data)
fp.close()
- def zipTest(self, f, compression):
+ def makeTestArchive(self, f, compression):
# Create the ZIP archive
zipfp = zipfile.ZipFile(f, "w", compression)
zipfp.write(TESTFN, "another"+os.extsep+"name")
@@ -31,6 +34,9 @@
zipfp.writestr("strfile", self.data)
zipfp.close()
+ def zipTest(self, f, compression):
+ self.makeTestArchive(f, compression)
+
# Read the ZIP archive
zipfp = zipfile.ZipFile(f, "r", compression)
self.assertEqual(zipfp.read(TESTFN), self.data)
@@ -85,22 +91,144 @@
# Check that testzip doesn't raise an exception
zipfp.testzip()
-
-
zipfp.close()
-
-
-
def testStored(self):
for f in (TESTFN2, TemporaryFile(), StringIO()):
self.zipTest(f, zipfile.ZIP_STORED)
+ def zipOpenTest(self, f, compression):
+ self.makeTestArchive(f, compression)
+
+ # Read the ZIP archive
+ zipfp = zipfile.ZipFile(f, "r", compression)
+ zipdata1 = []
+ zipopen1 = zipfp.open(TESTFN)
+ while 1:
+ read_data = zipopen1.read(256)
+ if not read_data:
+ break
+ zipdata1.append(read_data)
+
+ zipdata2 = []
+ zipopen2 = zipfp.open("another"+os.extsep+"name")
+ while 1:
+ read_data = zipopen2.read(256)
+ if not read_data:
+ break
+ zipdata2.append(read_data)
+
+ self.assertEqual(''.join(zipdata1), self.data)
+ self.assertEqual(''.join(zipdata2), self.data)
+ zipfp.close()
+
+ def testOpenStored(self):
+ for f in (TESTFN2, TemporaryFile(), StringIO()):
+ self.zipOpenTest(f, zipfile.ZIP_STORED)
+
+ def zipRandomOpenTest(self, f, compression):
+ self.makeTestArchive(f, compression)
+
+ # Read the ZIP archive
+ zipfp = zipfile.ZipFile(f, "r", compression)
+ zipdata1 = []
+ zipopen1 = zipfp.open(TESTFN)
+ while 1:
+ read_data = zipopen1.read(randint(1, 1024))
+ if not read_data:
+ break
+ zipdata1.append(read_data)
+
+ self.assertEqual(''.join(zipdata1), self.data)
+ zipfp.close()
+
+ def testRandomOpenStored(self):
+ for f in (TESTFN2, TemporaryFile(), StringIO()):
+ self.zipRandomOpenTest(f, zipfile.ZIP_STORED)
+
+ def zipReadlineTest(self, f, compression):
+ self.makeTestArchive(f, compression)
+
+ # Read the ZIP archive
+ zipfp = zipfile.ZipFile(f, "r")
+ zipopen = zipfp.open(TESTFN)
+ for line in self.line_gen:
+ linedata = zipopen.readline()
+ self.assertEqual(linedata, line + '\n')
+
+ zipfp.close()
+
+ def zipReadlinesTest(self, f, compression):
+ self.makeTestArchive(f, compression)
+
+ # Read the ZIP archive
+ zipfp = zipfile.ZipFile(f, "r")
+ ziplines = zipfp.open(TESTFN).readlines()
+ for line, zipline in zip(self.line_gen, ziplines):
+ self.assertEqual(zipline, line + '\n')
+
+ zipfp.close()
+
+ def zipIterlinesTest(self, f, compression):
+ self.makeTestArchive(f, compression)
+
+ # Read the ZIP archive
+ zipfp = zipfile.ZipFile(f, "r")
+ for line, zipline in zip(self.line_gen, zipfp.open(TESTFN)):
+ self.assertEqual(zipline, line + '\n')
+
+ zipfp.close()
+
+ def testReadlineStored(self):
+ for f in (TESTFN2, TemporaryFile(), StringIO()):
+ self.zipReadlineTest(f, zipfile.ZIP_STORED)
+
+ def testReadlinesStored(self):
+ for f in (TESTFN2, TemporaryFile(), StringIO()):
+ self.zipReadlinesTest(f, zipfile.ZIP_STORED)
+
+ def testIterlinesStored(self):
+ for f in (TESTFN2, TemporaryFile(), StringIO()):
+ self.zipIterlinesTest(f, zipfile.ZIP_STORED)
+
if zlib:
def testDeflated(self):
for f in (TESTFN2, TemporaryFile(), StringIO()):
self.zipTest(f, zipfile.ZIP_DEFLATED)
+ def testOpenDeflated(self):
+ for f in (TESTFN2, TemporaryFile(), StringIO()):
+ self.zipOpenTest(f, zipfile.ZIP_DEFLATED)
+
+ def testRandomOpenDeflated(self):
+ for f in (TESTFN2, TemporaryFile(), StringIO()):
+ self.zipRandomOpenTest(f, zipfile.ZIP_DEFLATED)
+
+ def testReadlineDeflated(self):
+ for f in (TESTFN2, TemporaryFile(), StringIO()):
+ self.zipReadlineTest(f, zipfile.ZIP_DEFLATED)
+
+ def testReadlinesDeflated(self):
+ for f in (TESTFN2, TemporaryFile(), StringIO()):
+ self.zipReadlinesTest(f, zipfile.ZIP_DEFLATED)
+
+ def testIterlinesDeflated(self):
+ for f in (TESTFN2, TemporaryFile(), StringIO()):
+ self.zipIterlinesTest(f, zipfile.ZIP_DEFLATED)
+
+ def testLowCompression(self):
+ # Checks for cases where compressed data is larger than original
+ # Create the ZIP archive
+ zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED)
+ zipfp.writestr("strfile", '12')
+ zipfp.close()
+
+ # Get an open object for strfile
+ zipfp = zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_DEFLATED)
+ openobj = zipfp.open("strfile")
+ self.assertEqual(openobj.read(1), '1')
+ self.assertEqual(openobj.read(1), '2')
+
def testAbsoluteArcnames(self):
zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED)
zipfp.write(TESTFN, "/absolute")
@@ -110,7 +238,6 @@
self.assertEqual(zipfp.namelist(), ["absolute"])
zipfp.close()
-
def tearDown(self):
os.remove(TESTFN)
os.remove(TESTFN2)
@@ -123,7 +250,7 @@
self._limit = zipfile.ZIP64_LIMIT
zipfile.ZIP64_LIMIT = 5
- line_gen = ("Test of zipfile line %d." % i for i in range(0, 1000))
+ line_gen = ("Test of zipfile line %d." % i for i in range(0, FIXEDTEST_SIZE))
self.data = '\n'.join(line_gen)
# Make a source file with some lines
@@ -344,6 +471,26 @@
except zipfile.BadZipfile:
os.unlink(TESTFN)
+ def testIsZipErroneousFile(self):
+ # This test checks that the is_zipfile function correctly identifies
+ # a file that is not a zip file
+ fp = open(TESTFN, "w")
+ fp.write("this is not a legal zip file\n")
+ fp.close()
+ chk = zipfile.is_zipfile(TESTFN)
+ os.unlink(TESTFN)
+ self.assert_(chk is False)
+
+ def testIsZipValidFile(self):
+ # This test checks that the is_zipfile function correctly identifies
+ # a file that is a zip file
+ zipf = zipfile.ZipFile(TESTFN, mode="w")
+ zipf.writestr("foo.txt", "O, for a Muse of Fire!")
+ zipf.close()
+ chk = zipfile.is_zipfile(TESTFN)
+ os.unlink(TESTFN)
+ self.assert_(chk is True)
+
def testNonExistentFileRaisesIOError(self):
# make sure we don't raise an AttributeError when a partially-constructed
# ZipFile instance is finalized; this tests for regression on SF tracker
@@ -371,7 +518,6 @@
# and report that the first file in the archive was corrupt.
self.assertRaises(RuntimeError, zipf.testzip)
-
class DecryptionTests(unittest.TestCase):
# This test checks that ZIP decryption works. Since the library does not
# support encryption at the moment, we use a pre-generated encrypted
@@ -411,9 +557,255 @@
self.zip.setpassword("python")
self.assertEquals(self.zip.read("test.txt"), self.plain)
+
+class TestsWithRandomBinaryFiles(unittest.TestCase):
+ def setUp(self):
+ datacount = randint(16, 64)*1024 + randint(1, 1024)
+ self.data = ''.join((struct.pack('<f', random()*randint(-1000, 1000)) for i in xrange(datacount)))
+
+ # Make a source file with some lines
+ fp = open(TESTFN, "wb")
+ fp.write(self.data)
+ fp.close()
+
+ def makeTestArchive(self, f, compression):
+ # Create the ZIP archive
+ zipfp = zipfile.ZipFile(f, "w", compression)
+ zipfp.write(TESTFN, "another"+os.extsep+"name")
+ zipfp.write(TESTFN, TESTFN)
+ zipfp.close()
+
+ def zipTest(self, f, compression):
+ self.makeTestArchive(f, compression)
+
+ # Read the ZIP archive
+ zipfp = zipfile.ZipFile(f, "r", compression)
+ testdata = zipfp.read(TESTFN)
+ self.assertEqual(len(testdata), len(self.data))
+ self.assertEqual(testdata, self.data)
+ self.assertEqual(zipfp.read("another"+os.extsep+"name"), self.data)
+ zipfp.close()
+
+ def testStored(self):
+ for f in (TESTFN2, TemporaryFile(), StringIO()):
+ self.zipTest(f, zipfile.ZIP_STORED)
+
+ def zipOpenTest(self, f, compression):
+ self.makeTestArchive(f, compression)
+
+ # Read the ZIP archive
+ zipfp = zipfile.ZipFile(f, "r", compression)
+ zipdata1 = []
+ zipopen1 = zipfp.open(TESTFN)
+ while 1:
+ read_data = zipopen1.read(256)
+ if not read_data:
+ break
+ zipdata1.append(read_data)
+
+ zipdata2 = []
+ zipopen2 = zipfp.open("another"+os.extsep+"name")
+ while 1:
+ read_data = zipopen2.read(256)
+ if not read_data:
+ break
+ zipdata2.append(read_data)
+
+ testdata1 = ''.join(zipdata1)
+ self.assertEqual(len(testdata1), len(self.data))
+ self.assertEqual(testdata1, self.data)
+
+ testdata2 = ''.join(zipdata2)
+ self.assertEqual(len(testdata1), len(self.data))
+ self.assertEqual(testdata1, self.data)
+ zipfp.close()
+
+ def testOpenStored(self):
+ for f in (TESTFN2, TemporaryFile(), StringIO()):
+ self.zipOpenTest(f, zipfile.ZIP_STORED)
+
+ def zipRandomOpenTest(self, f, compression):
+ self.makeTestArchive(f, compression)
+
+ # Read the ZIP archive
+ zipfp = zipfile.ZipFile(f, "r", compression)
+ zipdata1 = []
+ zipopen1 = zipfp.open(TESTFN)
+ while 1:
+ read_data = zipopen1.read(randint(1, 1024))
+ if not read_data:
+ break
+ zipdata1.append(read_data)
+
+ testdata = ''.join(zipdata1)
+ self.assertEqual(len(testdata), len(self.data))
+ self.assertEqual(testdata, self.data)
+ zipfp.close()
+
+ def testRandomOpenStored(self):
+ for f in (TESTFN2, TemporaryFile(), StringIO()):
+ self.zipRandomOpenTest(f, zipfile.ZIP_STORED)
+
+class TestsWithMultipleOpens(unittest.TestCase):
+ def setUp(self):
+ # Create the ZIP archive
+ zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED)
+ zipfp.writestr('ones', '1'*FIXEDTEST_SIZE)
+ zipfp.writestr('twos', '2'*FIXEDTEST_SIZE)
+ zipfp.close()
+
+ def testSameFile(self):
+ # Verify that (when the ZipFile is in control of creating file objects)
+ # multiple open() calls can be made without interfering with each other.
+ zipf = zipfile.ZipFile(TESTFN2, mode="r")
+ zopen1 = zipf.open('ones')
+ zopen2 = zipf.open('ones')
+ data1 = zopen1.read(500)
+ data2 = zopen2.read(500)
+ data1 += zopen1.read(500)
+ data2 += zopen2.read(500)
+ self.assertEqual(data1, data2)
+ zipf.close()
+
+ def testDifferentFile(self):
+ # Verify that (when the ZipFile is in control of creating file objects)
+ # multiple open() calls can be made without interfering with each other.
+ zipf = zipfile.ZipFile(TESTFN2, mode="r")
+ zopen1 = zipf.open('ones')
+ zopen2 = zipf.open('twos')
+ data1 = zopen1.read(500)
+ data2 = zopen2.read(500)
+ data1 += zopen1.read(500)
+ data2 += zopen2.read(500)
+ self.assertEqual(data1, '1'*FIXEDTEST_SIZE)
+ self.assertEqual(data2, '2'*FIXEDTEST_SIZE)
+ zipf.close()
+
+ def testInterleaved(self):
+ # Verify that (when the ZipFile is in control of creating file objects)
+ # multiple open() calls can be made without interfering with each other.
+ zipf = zipfile.ZipFile(TESTFN2, mode="r")
+ zopen1 = zipf.open('ones')
+ data1 = zopen1.read(500)
+ zopen2 = zipf.open('twos')
+ data2 = zopen2.read(500)
+ data1 += zopen1.read(500)
+ data2 += zopen2.read(500)
+ self.assertEqual(data1, '1'*FIXEDTEST_SIZE)
+ self.assertEqual(data2, '2'*FIXEDTEST_SIZE)
+ zipf.close()
+
+ def tearDown(self):
+ os.remove(TESTFN2)
+
+
+class UniversalNewlineTests(unittest.TestCase):
+ def setUp(self):
+ self.line_gen = ["Test of zipfile line %d." % i for i in xrange(FIXEDTEST_SIZE)]
+ self.seps = ('\r', '\r\n', '\n')
+ self.arcdata, self.arcfiles = {}, {}
+ for n, s in enumerate(self.seps):
+ self.arcdata[s] = s.join(self.line_gen) + s
+ self.arcfiles[s] = '%s-%d' % (TESTFN, n)
+ file(self.arcfiles[s], "wb").write(self.arcdata[s])
+
+ def makeTestArchive(self, f, compression):
+ # Create the ZIP archive
+ zipfp = zipfile.ZipFile(f, "w", compression)
+ for fn in self.arcfiles.values():
+ zipfp.write(fn, fn)
+ zipfp.close()
+
+ def readTest(self, f, compression):
+ self.makeTestArchive(f, compression)
+
+ # Read the ZIP archive
+ zipfp = zipfile.ZipFile(f, "r")
+ for sep, fn in self.arcfiles.items():
+ zipdata = zipfp.open(fn, "rU").read()
+ self.assertEqual(self.arcdata[sep], zipdata)
+
+ zipfp.close()
+
+ def readlineTest(self, f, compression):
+ self.makeTestArchive(f, compression)
+
+ # Read the ZIP archive
+ zipfp = zipfile.ZipFile(f, "r")
+ for sep, fn in self.arcfiles.items():
+ zipopen = zipfp.open(fn, "rU")
+ for line in self.line_gen:
+ linedata = zipopen.readline()
+ self.assertEqual(linedata, line + '\n')
+
+ zipfp.close()
+
+ def readlinesTest(self, f, compression):
+ self.makeTestArchive(f, compression)
+
+ # Read the ZIP archive
+ zipfp = zipfile.ZipFile(f, "r")
+ for sep, fn in self.arcfiles.items():
+ ziplines = zipfp.open(fn, "rU").readlines()
+ for line, zipline in zip(self.line_gen, ziplines):
+ self.assertEqual(zipline, line + '\n')
+
+ zipfp.close()
+
+ def iterlinesTest(self, f, compression):
+ self.makeTestArchive(f, compression)
+
+ # Read the ZIP archive
+ zipfp = zipfile.ZipFile(f, "r")
+ for sep, fn in self.arcfiles.items():
+ for line, zipline in zip(self.line_gen, zipfp.open(fn, "rU")):
+ self.assertEqual(zipline, line + '\n')
+
+ zipfp.close()
+
+ def testReadStored(self):
+ for f in (TESTFN2, TemporaryFile(), StringIO()):
+ self.readTest(f, zipfile.ZIP_STORED)
+
+ def testReadlineStored(self):
+ for f in (TESTFN2, TemporaryFile(), StringIO()):
+ self.readlineTest(f, zipfile.ZIP_STORED)
+
+ def testReadlinesStored(self):
+ for f in (TESTFN2, TemporaryFile(), StringIO()):
+ self.readlinesTest(f, zipfile.ZIP_STORED)
+
+ def testIterlinesStored(self):
+ for f in (TESTFN2, TemporaryFile(), StringIO()):
+ self.iterlinesTest(f, zipfile.ZIP_STORED)
+
+ if zlib:
+ def testReadDeflated(self):
+ for f in (TESTFN2, TemporaryFile(), StringIO()):
+ self.readTest(f, zipfile.ZIP_DEFLATED)
+
+ def testReadlineDeflated(self):
+ for f in (TESTFN2, TemporaryFile(), StringIO()):
+ self.readlineTest(f, zipfile.ZIP_DEFLATED)
+
+ def testReadlinesDeflated(self):
+ for f in (TESTFN2, TemporaryFile(), StringIO()):
+ self.readlinesTest(f, zipfile.ZIP_DEFLATED)
+
+ def testIterlinesDeflated(self):
+ for f in (TESTFN2, TemporaryFile(), StringIO()):
+ self.iterlinesTest(f, zipfile.ZIP_DEFLATED)
+
+ def tearDown(self):
+ for sep, fn in self.arcfiles.items():
+ os.remove(fn)
+
+
def test_main():
run_unittest(TestsWithSourceFile, TestZip64InSmallFiles, OtherTests,
- PyZipFileTests, DecryptionTests)
+ PyZipFileTests, DecryptionTests, TestsWithMultipleOpens,
+ UniversalNewlineTests, TestsWithRandomBinaryFiles)
+
#run_unittest(TestZip64InSmallFiles)
if __name__ == "__main__":