Patch #1121142: Implement ZipFile.open.
diff --git a/Doc/lib/libzipfile.tex b/Doc/lib/libzipfile.tex
index 06a236c..bfe5966 100644
--- a/Doc/lib/libzipfile.tex
+++ b/Doc/lib/libzipfile.tex
@@ -141,6 +141,32 @@
Return a list of archive members by name.
\end{methoddesc}
+\begin{methoddesc}{open}{name\optional{, mode\optional{, pwd}}}
+ Extract a member from the archive as a file-like object (ZipExtFile).
+ \var{name} is the name of the file in the archive. The \var{mode}
+ parameter, if included, must be one of the following: \code{'r'} (the
+ default), \code{'U'}, or \code{'rU'}. Choosing \code{'U'} or
+ \code{'rU'} will enable universal newline support in the read-only
+ object. \var{pwd} is the password used for encrypted files.
+ \begin{notice}
+ The file-like object is read-only and provides the following methods:
+ \method{read()}, \method{readline()}, \method{readlines()},
+ \method{__iter__()}, \method{next()}.
+ \end{notice}
+ \begin{notice}
+ If the ZipFile was created by passing in a file-like object as the
+ first argument to the constructor, then the object returned by
+ \method{open()} shares the ZipFile's file pointer. Under these
+ circumstances, the object returned by \method{open()} should not
+ be used after any additional operations are performed on the
+ ZipFile object. If the ZipFile was created by passing in a string
+ (the filename) as the first argument to the constructor, then
+ \method{open()} will create a new file object that will be held
+ by the ZipExtFile, allowing it to operate independently of the
+ ZipFile.
+ \end{notice}
+\end{methoddesc}
+
\begin{methoddesc}{printdir}{}
Print a table of contents for the archive to \code{sys.stdout}.
\end{methoddesc}
diff --git a/Lib/test/test_zipfile.py b/Lib/test/test_zipfile.py
index c17039f..e0b255c 100644
--- a/Lib/test/test_zipfile.py
+++ b/Lib/test/test_zipfile.py
@@ -4,26 +4,29 @@
except ImportError:
zlib = None
-import zipfile, os, unittest, sys, shutil
+import zipfile, os, unittest, sys, shutil, struct
from StringIO import StringIO
from tempfile import TemporaryFile
+from random import randint, random
from test.test_support import TESTFN, run_unittest
TESTFN2 = TESTFN + "2"
+FIXEDTEST_SIZE = 10
class TestsWithSourceFile(unittest.TestCase):
def setUp(self):
- line_gen = ("Test of zipfile line %d." % i for i in range(0, 1000))
- self.data = '\n'.join(line_gen)
+ self.line_gen = ["Zipfile test line %d. random float: %f" % (i, random())
+ for i in xrange(FIXEDTEST_SIZE)]  # a list, not a generator: the readline/readlines/iter tests re-iterate it after the join below
+ self.data = '\n'.join(self.line_gen) + '\n'
# Make a source file with some lines
fp = open(TESTFN, "wb")
fp.write(self.data)
fp.close()
- def zipTest(self, f, compression):
+ def makeTestArchive(self, f, compression):
# Create the ZIP archive
zipfp = zipfile.ZipFile(f, "w", compression)
zipfp.write(TESTFN, "another"+os.extsep+"name")
@@ -31,6 +34,9 @@
zipfp.writestr("strfile", self.data)
zipfp.close()
+ def zipTest(self, f, compression):
+ self.makeTestArchive(f, compression)
+
# Read the ZIP archive
zipfp = zipfile.ZipFile(f, "r", compression)
self.assertEqual(zipfp.read(TESTFN), self.data)
@@ -85,22 +91,144 @@
# Check that testzip doesn't raise an exception
zipfp.testzip()
-
-
zipfp.close()
-
-
-
def testStored(self):
for f in (TESTFN2, TemporaryFile(), StringIO()):
self.zipTest(f, zipfile.ZIP_STORED)
+ def zipOpenTest(self, f, compression):
+ self.makeTestArchive(f, compression)
+
+ # Read the ZIP archive
+ zipfp = zipfile.ZipFile(f, "r", compression)
+ zipdata1 = []
+ zipopen1 = zipfp.open(TESTFN)
+ while 1:
+ read_data = zipopen1.read(256)
+ if not read_data:
+ break
+ zipdata1.append(read_data)
+
+ zipdata2 = []
+ zipopen2 = zipfp.open("another"+os.extsep+"name")
+ while 1:
+ read_data = zipopen2.read(256)
+ if not read_data:
+ break
+ zipdata2.append(read_data)
+
+ self.assertEqual(''.join(zipdata1), self.data)
+ self.assertEqual(''.join(zipdata2), self.data)
+ zipfp.close()
+
+ def testOpenStored(self):
+ for f in (TESTFN2, TemporaryFile(), StringIO()):
+ self.zipOpenTest(f, zipfile.ZIP_STORED)
+
+ def zipRandomOpenTest(self, f, compression):
+ self.makeTestArchive(f, compression)
+
+ # Read the ZIP archive
+ zipfp = zipfile.ZipFile(f, "r", compression)
+ zipdata1 = []
+ zipopen1 = zipfp.open(TESTFN)
+ while 1:
+ read_data = zipopen1.read(randint(1, 1024))
+ if not read_data:
+ break
+ zipdata1.append(read_data)
+
+ self.assertEqual(''.join(zipdata1), self.data)
+ zipfp.close()
+
+ def testRandomOpenStored(self):
+ for f in (TESTFN2, TemporaryFile(), StringIO()):
+ self.zipRandomOpenTest(f, zipfile.ZIP_STORED)
+
+ def zipReadlineTest(self, f, compression):
+ self.makeTestArchive(f, compression)
+
+ # Read the ZIP archive
+ zipfp = zipfile.ZipFile(f, "r")
+ zipopen = zipfp.open(TESTFN)
+ for line in self.line_gen:
+ linedata = zipopen.readline()
+ self.assertEqual(linedata, line + '\n')
+
+ zipfp.close()
+
+ def zipReadlinesTest(self, f, compression):
+ self.makeTestArchive(f, compression)
+
+ # Read the ZIP archive
+ zipfp = zipfile.ZipFile(f, "r")
+ ziplines = zipfp.open(TESTFN).readlines()
+ for line, zipline in zip(self.line_gen, ziplines):
+ self.assertEqual(zipline, line + '\n')
+
+ zipfp.close()
+
+ def zipIterlinesTest(self, f, compression):
+ self.makeTestArchive(f, compression)
+
+ # Read the ZIP archive
+ zipfp = zipfile.ZipFile(f, "r")
+ for line, zipline in zip(self.line_gen, zipfp.open(TESTFN)):
+ self.assertEqual(zipline, line + '\n')
+
+ zipfp.close()
+
+ def testReadlineStored(self):
+ for f in (TESTFN2, TemporaryFile(), StringIO()):
+ self.zipReadlineTest(f, zipfile.ZIP_STORED)
+
+ def testReadlinesStored(self):
+ for f in (TESTFN2, TemporaryFile(), StringIO()):
+ self.zipReadlinesTest(f, zipfile.ZIP_STORED)
+
+ def testIterlinesStored(self):
+ for f in (TESTFN2, TemporaryFile(), StringIO()):
+ self.zipIterlinesTest(f, zipfile.ZIP_STORED)
+
if zlib:
def testDeflated(self):
for f in (TESTFN2, TemporaryFile(), StringIO()):
self.zipTest(f, zipfile.ZIP_DEFLATED)
+ def testOpenDeflated(self):
+ for f in (TESTFN2, TemporaryFile(), StringIO()):
+ self.zipOpenTest(f, zipfile.ZIP_DEFLATED)
+
+ def testRandomOpenDeflated(self):
+ for f in (TESTFN2, TemporaryFile(), StringIO()):
+ self.zipRandomOpenTest(f, zipfile.ZIP_DEFLATED)
+
+ def testReadlineDeflated(self):
+ for f in (TESTFN2, TemporaryFile(), StringIO()):
+ self.zipReadlineTest(f, zipfile.ZIP_DEFLATED)
+
+ def testReadlinesDeflated(self):
+ for f in (TESTFN2, TemporaryFile(), StringIO()):
+ self.zipReadlinesTest(f, zipfile.ZIP_DEFLATED)
+
+ def testIterlinesDeflated(self):
+ for f in (TESTFN2, TemporaryFile(), StringIO()):
+ self.zipIterlinesTest(f, zipfile.ZIP_DEFLATED)
+
+ def testLowCompression(self):
+ # Checks for cases where compressed data is larger than original
+ # Create the ZIP archive
+ zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED)
+ zipfp.writestr("strfile", '12')
+ zipfp.close()
+
+ # Get an open object for strfile
+ zipfp = zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_DEFLATED)
+ openobj = zipfp.open("strfile")
+ self.assertEqual(openobj.read(1), '1')
+ self.assertEqual(openobj.read(1), '2')
+
def testAbsoluteArcnames(self):
zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED)
zipfp.write(TESTFN, "/absolute")
@@ -110,7 +238,6 @@
self.assertEqual(zipfp.namelist(), ["absolute"])
zipfp.close()
-
def tearDown(self):
os.remove(TESTFN)
os.remove(TESTFN2)
@@ -123,7 +250,7 @@
self._limit = zipfile.ZIP64_LIMIT
zipfile.ZIP64_LIMIT = 5
- line_gen = ("Test of zipfile line %d." % i for i in range(0, 1000))
+ line_gen = ("Test of zipfile line %d." % i for i in range(0, FIXEDTEST_SIZE))
self.data = '\n'.join(line_gen)
# Make a source file with some lines
@@ -344,6 +471,26 @@
except zipfile.BadZipfile:
os.unlink(TESTFN)
+ def testIsZipErroneousFile(self):
+ # This test checks that the is_zipfile function correctly identifies
+ # a file that is not a zip file
+ fp = open(TESTFN, "w")
+ fp.write("this is not a legal zip file\n")
+ fp.close()
+ chk = zipfile.is_zipfile(TESTFN)
+ os.unlink(TESTFN)
+ self.assert_(chk is False)
+
+ def testIsZipValidFile(self):
+ # This test checks that the is_zipfile function correctly identifies
+ # a file that is a zip file
+ zipf = zipfile.ZipFile(TESTFN, mode="w")
+ zipf.writestr("foo.txt", "O, for a Muse of Fire!")
+ zipf.close()
+ chk = zipfile.is_zipfile(TESTFN)
+ os.unlink(TESTFN)
+ self.assert_(chk is True)
+
def testNonExistentFileRaisesIOError(self):
# make sure we don't raise an AttributeError when a partially-constructed
# ZipFile instance is finalized; this tests for regression on SF tracker
@@ -371,7 +518,6 @@
# and report that the first file in the archive was corrupt.
self.assertRaises(RuntimeError, zipf.testzip)
-
class DecryptionTests(unittest.TestCase):
# This test checks that ZIP decryption works. Since the library does not
# support encryption at the moment, we use a pre-generated encrypted
@@ -411,9 +557,255 @@
self.zip.setpassword("python")
self.assertEquals(self.zip.read("test.txt"), self.plain)
+
+class TestsWithRandomBinaryFiles(unittest.TestCase):
+ def setUp(self):
+ datacount = randint(16, 64)*1024 + randint(1, 1024)
+ self.data = ''.join((struct.pack('<f', random()*randint(-1000, 1000)) for i in xrange(datacount)))
+
+ # Make a source file holding the random binary data (packed floats)
+ fp = open(TESTFN, "wb")
+ fp.write(self.data)
+ fp.close()
+
+ def makeTestArchive(self, f, compression):
+ # Create the ZIP archive
+ zipfp = zipfile.ZipFile(f, "w", compression)
+ zipfp.write(TESTFN, "another"+os.extsep+"name")
+ zipfp.write(TESTFN, TESTFN)
+ zipfp.close()
+
+ def zipTest(self, f, compression):
+ self.makeTestArchive(f, compression)
+
+ # Read the ZIP archive
+ zipfp = zipfile.ZipFile(f, "r", compression)
+ testdata = zipfp.read(TESTFN)
+ self.assertEqual(len(testdata), len(self.data))
+ self.assertEqual(testdata, self.data)
+ self.assertEqual(zipfp.read("another"+os.extsep+"name"), self.data)
+ zipfp.close()
+
+ def testStored(self):
+ for f in (TESTFN2, TemporaryFile(), StringIO()):
+ self.zipTest(f, zipfile.ZIP_STORED)
+
+ def zipOpenTest(self, f, compression):
+ self.makeTestArchive(f, compression)
+
+ # Read the ZIP archive
+ zipfp = zipfile.ZipFile(f, "r", compression)
+ zipdata1 = []
+ zipopen1 = zipfp.open(TESTFN)
+ while 1:
+ read_data = zipopen1.read(256)
+ if not read_data:
+ break
+ zipdata1.append(read_data)
+
+ zipdata2 = []
+ zipopen2 = zipfp.open("another"+os.extsep+"name")
+ while 1:
+ read_data = zipopen2.read(256)
+ if not read_data:
+ break
+ zipdata2.append(read_data)
+
+ testdata1 = ''.join(zipdata1)
+ self.assertEqual(len(testdata1), len(self.data))
+ self.assertEqual(testdata1, self.data)
+
+ testdata2 = ''.join(zipdata2)  # second member must be verified too
+ self.assertEqual(len(testdata2), len(self.data))
+ self.assertEqual(testdata2, self.data)
+ zipfp.close()
+
+ def testOpenStored(self):
+ for f in (TESTFN2, TemporaryFile(), StringIO()):
+ self.zipOpenTest(f, zipfile.ZIP_STORED)
+
+ def zipRandomOpenTest(self, f, compression):
+ self.makeTestArchive(f, compression)
+
+ # Read the ZIP archive
+ zipfp = zipfile.ZipFile(f, "r", compression)
+ zipdata1 = []
+ zipopen1 = zipfp.open(TESTFN)
+ while 1:
+ read_data = zipopen1.read(randint(1, 1024))
+ if not read_data:
+ break
+ zipdata1.append(read_data)
+
+ testdata = ''.join(zipdata1)
+ self.assertEqual(len(testdata), len(self.data))
+ self.assertEqual(testdata, self.data)
+ zipfp.close()
+
+ def testRandomOpenStored(self):
+ for f in (TESTFN2, TemporaryFile(), StringIO()):
+ self.zipRandomOpenTest(f, zipfile.ZIP_STORED)
+
+class TestsWithMultipleOpens(unittest.TestCase):
+ def setUp(self):
+ # Create the ZIP archive
+ zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED)
+ zipfp.writestr('ones', '1'*FIXEDTEST_SIZE)
+ zipfp.writestr('twos', '2'*FIXEDTEST_SIZE)
+ zipfp.close()
+
+ def testSameFile(self):
+ # Verify that (when the ZipFile is in control of creating file objects)
+ # multiple open() calls can be made without interfering with each other.
+ zipf = zipfile.ZipFile(TESTFN2, mode="r")
+ zopen1 = zipf.open('ones')
+ zopen2 = zipf.open('ones')
+ data1 = zopen1.read(500)
+ data2 = zopen2.read(500)
+ data1 += zopen1.read(500)
+ data2 += zopen2.read(500)
+ self.assertEqual(data1, data2)
+ zipf.close()
+
+ def testDifferentFile(self):
+ # Verify that (when the ZipFile is in control of creating file objects)
+ # multiple open() calls can be made without interfering with each other.
+ zipf = zipfile.ZipFile(TESTFN2, mode="r")
+ zopen1 = zipf.open('ones')
+ zopen2 = zipf.open('twos')
+ data1 = zopen1.read(500)
+ data2 = zopen2.read(500)
+ data1 += zopen1.read(500)
+ data2 += zopen2.read(500)
+ self.assertEqual(data1, '1'*FIXEDTEST_SIZE)
+ self.assertEqual(data2, '2'*FIXEDTEST_SIZE)
+ zipf.close()
+
+ def testInterleaved(self):
+ # Verify that (when the ZipFile is in control of creating file objects)
+ # multiple open() calls can be made without interfering with each other.
+ zipf = zipfile.ZipFile(TESTFN2, mode="r")
+ zopen1 = zipf.open('ones')
+ data1 = zopen1.read(500)
+ zopen2 = zipf.open('twos')
+ data2 = zopen2.read(500)
+ data1 += zopen1.read(500)
+ data2 += zopen2.read(500)
+ self.assertEqual(data1, '1'*FIXEDTEST_SIZE)
+ self.assertEqual(data2, '2'*FIXEDTEST_SIZE)
+ zipf.close()
+
+ def tearDown(self):
+ os.remove(TESTFN2)
+
+
+class UniversalNewlineTests(unittest.TestCase):
+ def setUp(self):
+ self.line_gen = ["Test of zipfile line %d." % i for i in xrange(FIXEDTEST_SIZE)]
+ self.seps = ('\r', '\r\n', '\n')
+ self.arcdata, self.arcfiles = {}, {}
+ for n, s in enumerate(self.seps):
+ self.arcdata[s] = s.join(self.line_gen) + s
+ self.arcfiles[s] = '%s-%d' % (TESTFN, n)
+ file(self.arcfiles[s], "wb").write(self.arcdata[s])
+
+ def makeTestArchive(self, f, compression):
+ # Create the ZIP archive
+ zipfp = zipfile.ZipFile(f, "w", compression)
+ for fn in self.arcfiles.values():
+ zipfp.write(fn, fn)
+ zipfp.close()
+
+ def readTest(self, f, compression):
+ self.makeTestArchive(f, compression)
+
+ # Read the ZIP archive
+ zipfp = zipfile.ZipFile(f, "r")
+ for sep, fn in self.arcfiles.items():
+ zipdata = zipfp.open(fn, "rU").read()
+ self.assertEqual(self.arcdata[sep], zipdata)
+
+ zipfp.close()
+
+ def readlineTest(self, f, compression):
+ self.makeTestArchive(f, compression)
+
+ # Read the ZIP archive
+ zipfp = zipfile.ZipFile(f, "r")
+ for sep, fn in self.arcfiles.items():
+ zipopen = zipfp.open(fn, "rU")
+ for line in self.line_gen:
+ linedata = zipopen.readline()
+ self.assertEqual(linedata, line + '\n')
+
+ zipfp.close()
+
+ def readlinesTest(self, f, compression):
+ self.makeTestArchive(f, compression)
+
+ # Read the ZIP archive
+ zipfp = zipfile.ZipFile(f, "r")
+ for sep, fn in self.arcfiles.items():
+ ziplines = zipfp.open(fn, "rU").readlines()
+ for line, zipline in zip(self.line_gen, ziplines):
+ self.assertEqual(zipline, line + '\n')
+
+ zipfp.close()
+
+ def iterlinesTest(self, f, compression):
+ self.makeTestArchive(f, compression)
+
+ # Read the ZIP archive
+ zipfp = zipfile.ZipFile(f, "r")
+ for sep, fn in self.arcfiles.items():
+ for line, zipline in zip(self.line_gen, zipfp.open(fn, "rU")):
+ self.assertEqual(zipline, line + '\n')
+
+ zipfp.close()
+
+ def testReadStored(self):
+ for f in (TESTFN2, TemporaryFile(), StringIO()):
+ self.readTest(f, zipfile.ZIP_STORED)
+
+ def testReadlineStored(self):
+ for f in (TESTFN2, TemporaryFile(), StringIO()):
+ self.readlineTest(f, zipfile.ZIP_STORED)
+
+ def testReadlinesStored(self):
+ for f in (TESTFN2, TemporaryFile(), StringIO()):
+ self.readlinesTest(f, zipfile.ZIP_STORED)
+
+ def testIterlinesStored(self):
+ for f in (TESTFN2, TemporaryFile(), StringIO()):
+ self.iterlinesTest(f, zipfile.ZIP_STORED)
+
+ if zlib:
+ def testReadDeflated(self):
+ for f in (TESTFN2, TemporaryFile(), StringIO()):
+ self.readTest(f, zipfile.ZIP_DEFLATED)
+
+ def testReadlineDeflated(self):
+ for f in (TESTFN2, TemporaryFile(), StringIO()):
+ self.readlineTest(f, zipfile.ZIP_DEFLATED)
+
+ def testReadlinesDeflated(self):
+ for f in (TESTFN2, TemporaryFile(), StringIO()):
+ self.readlinesTest(f, zipfile.ZIP_DEFLATED)
+
+ def testIterlinesDeflated(self):
+ for f in (TESTFN2, TemporaryFile(), StringIO()):
+ self.iterlinesTest(f, zipfile.ZIP_DEFLATED)
+
+ def tearDown(self):
+ for sep, fn in self.arcfiles.items():
+ os.remove(fn)
+
+
def test_main():
run_unittest(TestsWithSourceFile, TestZip64InSmallFiles, OtherTests,
- PyZipFileTests, DecryptionTests)
+ PyZipFileTests, DecryptionTests, TestsWithMultipleOpens,
+ UniversalNewlineTests, TestsWithRandomBinaryFiles)
+
#run_unittest(TestZip64InSmallFiles)
if __name__ == "__main__":
diff --git a/Lib/zipfile.py b/Lib/zipfile.py
index 6e59242..8ac9cac 100644
--- a/Lib/zipfile.py
+++ b/Lib/zipfile.py
@@ -355,6 +355,200 @@
self._UpdateKeys(c)
return c
+class ZipExtFile:
+ """File-like object for reading an archive member.
+ Is returned by ZipFile.open().
+ """
+
+ def __init__(self, fileobj, zipinfo, decrypt=None):
+ self.fileobj = fileobj
+ self.decrypter = decrypt
+ self.bytes_read = 0L
+ self.rawbuffer = ''
+ self.readbuffer = ''
+ self.linebuffer = ''
+ self.eof = False
+ self.univ_newlines = False
+ self.nlSeps = ("\n", )
+ self.lastdiscard = ''
+
+ self.compress_type = zipinfo.compress_type
+ self.compress_size = zipinfo.compress_size
+
+ self.closed = False
+ self.mode = "r"
+ self.name = zipinfo.filename
+
+ # read from compressed files in 64k blocks
+ self.compreadsize = 64*1024
+ if self.compress_type == ZIP_DEFLATED:
+ self.dc = zlib.decompressobj(-15)
+
+ def set_univ_newlines(self, univ_newlines):
+ self.univ_newlines = univ_newlines
+
+ # pick line separator char(s) based on universal newlines flag
+ self.nlSeps = ("\n", )
+ if self.univ_newlines:
+ self.nlSeps = ("\r\n", "\r", "\n")
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ nextline = self.readline()
+ if not nextline:
+ raise StopIteration()
+
+ return nextline
+
+ def close(self):
+ self.closed = True
+
+ def _checkfornewline(self):
+ nl, nllen = -1, -1
+ if self.linebuffer:
+ # ugly check for cases where half of an \r\n pair was
+ # read on the last pass, and the \r was discarded. In this
+ # case we just throw away the \n at the start of the buffer.
+ if (self.lastdiscard, self.linebuffer[0]) == ('\r','\n'):
+ self.linebuffer = self.linebuffer[1:]
+
+ for sep in self.nlSeps:
+ nl = self.linebuffer.find(sep)
+ if nl >= 0:
+ nllen = len(sep)
+ return nl, nllen
+
+ return nl, nllen
+
+ def readline(self, size = -1):
+ """Read a line with approx. size. If size is negative,
+ read a whole line.
+ """
+ if size < 0:
+ size = sys.maxint
+ elif size == 0:
+ return ''
+
+ # check for a newline already in buffer
+ nl, nllen = self._checkfornewline()
+
+ if nl >= 0:
+ # the next line was already in the buffer
+ nl = min(nl, size)
+ else:
+ # no line break in buffer - try to read more
+ size -= len(self.linebuffer)
+ while nl < 0 and size > 0:
+ buf = self.read(min(size, 100))
+ if not buf:
+ break
+ self.linebuffer += buf
+ size -= len(buf)
+
+ # check for a newline in buffer
+ nl, nllen = self._checkfornewline()
+
+ # we either ran out of bytes in the file, or
+ # met the specified size limit without finding a newline,
+ # so return current buffer
+ if nl < 0:
+ s = self.linebuffer
+ self.linebuffer = ''
+ return s
+
+ buf = self.linebuffer[:nl]
+ self.lastdiscard = self.linebuffer[nl:nl + nllen]
+ self.linebuffer = self.linebuffer[nl + nllen:]
+
+ # line is always returned with \n as newline char (except possibly
+ # for a final incomplete line in the file, which is handled above).
+ return buf + "\n"
+
+ def readlines(self, sizehint = -1):
+ """Return a list with all (following) lines. The sizehint parameter
+ is ignored in this implementation.
+ """
+ result = []
+ while True:
+ line = self.readline()
+ if not line: break
+ result.append(line)
+ return result
+
+ def read(self, size = None):
+ # act like file() obj and return empty string if size is 0
+ if size == 0:
+ return ''
+
+ # determine read size
+ bytesToRead = self.compress_size - self.bytes_read
+
+ # adjust read size for encrypted files since the first 12 bytes
+ # are for the encryption/password information
+ if self.decrypter is not None:
+ bytesToRead -= 12
+
+ if size is not None and size >= 0:
+ if self.compress_type == ZIP_STORED:
+ lr = len(self.readbuffer)
+ bytesToRead = min(bytesToRead, size - lr)
+ elif self.compress_type == ZIP_DEFLATED:
+ if len(self.readbuffer) > size:
+ # the user has requested fewer bytes than we've already
+ # pulled through the decompressor; don't read any more
+ bytesToRead = 0
+ else:
+ # user will use up the buffer, so read some more
+ lr = len(self.rawbuffer)
+ bytesToRead = min(bytesToRead, self.compreadsize - lr)
+
+ # avoid reading past end of file contents
+ if bytesToRead + self.bytes_read > self.compress_size:
+ bytesToRead = self.compress_size - self.bytes_read
+
+ # try to read from file (if necessary)
+ if bytesToRead > 0:
+ bytes = self.fileobj.read(bytesToRead)
+ self.bytes_read += len(bytes)
+ self.rawbuffer += bytes
+
+ # handle contents of raw buffer
+ if self.rawbuffer:
+ newdata = self.rawbuffer
+ self.rawbuffer = ''
+
+ # decrypt new data if we were given an object to handle that
+ if newdata and self.decrypter is not None:
+ newdata = ''.join(map(self.decrypter, newdata))
+
+ # decompress newly read data if necessary
+ if newdata and self.compress_type == ZIP_DEFLATED:
+ newdata = self.dc.decompress(newdata)
+ self.rawbuffer = self.dc.unconsumed_tail
+ if self.eof and len(self.rawbuffer) == 0:
+ # we're out of raw bytes (both from the file and
+ # the local buffer); flush just to make sure the
+ # decompressor is done
+ newdata += self.dc.flush()
+ # prevent decompressor from being used again
+ self.dc = None
+
+ self.readbuffer += newdata
+
+
+ # return what the user asked for
+ if size is None or len(self.readbuffer) <= size:
+ bytes = self.readbuffer
+ self.readbuffer = ''
+ else:
+ bytes = self.readbuffer[:size]
+ self.readbuffer = self.readbuffer[size:]
+
+ return bytes
+
+
class ZipFile:
""" Class with methods to open, read, write, close, list zip files.
@@ -534,73 +728,75 @@
def read(self, name, pwd=None):
"""Return file bytes (as a string) for name."""
- if self.mode not in ("r", "a"):
- raise RuntimeError, 'read() requires mode "r" or "a"'
+ return self.open(name, "r", pwd).read()
+
+ def open(self, name, mode="r", pwd=None):
+ """Return file-like object for 'name'."""
+ if mode not in ("r", "U", "rU"):
+ raise RuntimeError, 'open() requires mode "r", "U", or "rU"'
if not self.fp:
raise RuntimeError, \
"Attempt to read ZIP archive that was already closed"
- zinfo = self.getinfo(name)
- is_encrypted = zinfo.flag_bits & 0x1
- if is_encrypted:
- if not pwd:
- pwd = self.pwd
- if not pwd:
- raise RuntimeError, "File %s is encrypted, " \
- "password required for extraction" % name
- filepos = self.fp.tell()
- self.fp.seek(zinfo.header_offset, 0)
+ # Only open a new file for instances where we were not
+ # given a file object in the constructor
+ if self._filePassed:
+ zef_file = self.fp
+ else:
+ zef_file = open(self.filename, 'rb')
+
+ # Get info object for name
+ zinfo = self.getinfo(name)
+
+ filepos = zef_file.tell()
+
+ zef_file.seek(zinfo.header_offset, 0)
# Skip the file header:
- fheader = self.fp.read(30)
+ fheader = zef_file.read(30)
if fheader[0:4] != stringFileHeader:
raise BadZipfile, "Bad magic number for file header"
fheader = struct.unpack(structFileHeader, fheader)
- fname = self.fp.read(fheader[_FH_FILENAME_LENGTH])
+ fname = zef_file.read(fheader[_FH_FILENAME_LENGTH])
if fheader[_FH_EXTRA_FIELD_LENGTH]:
- self.fp.read(fheader[_FH_EXTRA_FIELD_LENGTH])
+ zef_file.read(fheader[_FH_EXTRA_FIELD_LENGTH])
if fname != zinfo.orig_filename:
raise BadZipfile, \
'File name in directory "%s" and header "%s" differ.' % (
zinfo.orig_filename, fname)
- bytes = self.fp.read(zinfo.compress_size)
- # Go with decryption
+ # check for encrypted flag & handle password
+ is_encrypted = zinfo.flag_bits & 0x1
+ zd = None
if is_encrypted:
+ if not pwd:
+ pwd = self.pwd
+ if not pwd:
+ raise RuntimeError, "File %s is encrypted, " \
+ "password required for extraction" % name
+
zd = _ZipDecrypter(pwd)
# The first 12 bytes in the cypher stream is an encryption header
# used to strengthen the algorithm. The first 11 bytes are
# completely random, while the 12th contains the MSB of the CRC,
# and is used to check the correctness of the password.
+ bytes = zef_file.read(12)
h = map(zd, bytes[0:12])
if ord(h[11]) != ((zinfo.CRC>>24)&255):
raise RuntimeError, "Bad password for file %s" % name
- bytes = "".join(map(zd, bytes[12:]))
- # Go with decompression
- self.fp.seek(filepos, 0)
- if zinfo.compress_type == ZIP_STORED:
- pass
- elif zinfo.compress_type == ZIP_DEFLATED:
- if not zlib:
- raise RuntimeError, \
- "De-compression requires the (missing) zlib module"
- # zlib compress/decompress code by Jeremy Hylton of CNRI
- dc = zlib.decompressobj(-15)
- bytes = dc.decompress(bytes)
- # need to feed in unused pad byte so that zlib won't choke
- ex = dc.decompress('Z') + dc.flush()
- if ex:
- bytes = bytes + ex
+
+ # build and return a ZipExtFile
+ if zd is None:
+ zef = ZipExtFile(zef_file, zinfo)
else:
- raise BadZipfile, \
- "Unsupported compression method %d for file %s" % \
- (zinfo.compress_type, name)
- crc = binascii.crc32(bytes)
- if crc != zinfo.CRC:
- raise BadZipfile, "Bad CRC-32 for file %s" % name
- return bytes
+ zef = ZipExtFile(zef_file, zinfo, zd)
+
+ # set universal newlines on ZipExtFile if necessary
+ if "U" in mode:
+ zef.set_univ_newlines(True)
+ return zef
def _writecheck(self, zinfo):
"""Check for errors before writing a file to the archive."""
diff --git a/Misc/NEWS b/Misc/NEWS
index b0bc889..894f1cb 100644
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -139,6 +139,8 @@
Library
-------
+- Patch #1121142: Implement ZipFile.open.
+
- Taught setup.py how to locate Berkeley DB on Macs using MacPorts.
- Added heapq.merge() for merging sorted input streams.