| import sys |
| import os |
| import shutil |
| import tempfile |
| import StringIO |
| |
| import unittest |
| import tarfile |
| |
| from test import test_support |
| |
| # Check for our compression modules. |
| try: |
| import gzip |
| gzip.GzipFile |
| except (ImportError, AttributeError): |
| gzip = None |
| try: |
| import bz2 |
| except ImportError: |
| bz2 = None |
| |
| def path(path): |
| return test_support.findfile(path) |
| |
| testtar = path("testtar.tar") |
| tempdir = os.path.join(tempfile.gettempdir(), "testtar" + os.extsep + "dir") |
| tempname = test_support.TESTFN |
| membercount = 12 |
| |
| def tarname(comp=""): |
| if not comp: |
| return testtar |
| return os.path.join(tempdir, "%s%s%s" % (testtar, os.extsep, comp)) |
| |
| def dirname(): |
| if not os.path.exists(tempdir): |
| os.mkdir(tempdir) |
| return tempdir |
| |
| def tmpname(): |
| return tempname |
| |
| |
| class BaseTest(unittest.TestCase): |
| comp = '' |
| mode = 'r' |
| sep = ':' |
| |
| def setUp(self): |
| mode = self.mode + self.sep + self.comp |
| self.tar = tarfile.open(tarname(self.comp), mode) |
| |
| def tearDown(self): |
| self.tar.close() |
| |
| class ReadTest(BaseTest): |
| |
| def test(self): |
| """Test member extraction. |
| """ |
| members = 0 |
| for tarinfo in self.tar: |
| members += 1 |
| if not tarinfo.isreg(): |
| continue |
| f = self.tar.extractfile(tarinfo) |
| self.assert_(len(f.read()) == tarinfo.size, |
| "size read does not match expected size") |
| f.close() |
| |
| self.assert_(members == membercount, |
| "could not find all members") |
| |
| def test_sparse(self): |
| """Test sparse member extraction. |
| """ |
| if self.sep != "|": |
| f1 = self.tar.extractfile("S-SPARSE") |
| f2 = self.tar.extractfile("S-SPARSE-WITH-NULLS") |
| self.assert_(f1.read() == f2.read(), |
| "_FileObject failed on sparse file member") |
| |
| def test_readlines(self): |
| """Test readlines() method of _FileObject. |
| """ |
| if self.sep != "|": |
| filename = "0-REGTYPE-TEXT" |
| self.tar.extract(filename, dirname()) |
| f = open(os.path.join(dirname(), filename), "rU") |
| lines1 = f.readlines() |
| f.close() |
| lines2 = self.tar.extractfile(filename).readlines() |
| self.assert_(lines1 == lines2, |
| "_FileObject.readline() does not work correctly") |
| |
| def test_iter(self): |
| # Test iteration over ExFileObject. |
| if self.sep != "|": |
| filename = "0-REGTYPE-TEXT" |
| self.tar.extract(filename, dirname()) |
| f = open(os.path.join(dirname(), filename), "rU") |
| lines1 = f.readlines() |
| f.close() |
| lines2 = [line for line in self.tar.extractfile(filename)] |
| self.assert_(lines1 == lines2, |
| "ExFileObject iteration does not work correctly") |
| |
| def test_seek(self): |
| """Test seek() method of _FileObject, incl. random reading. |
| """ |
| if self.sep != "|": |
| filename = "0-REGTYPE" |
| self.tar.extract(filename, dirname()) |
| f = open(os.path.join(dirname(), filename), "rb") |
| data = f.read() |
| f.close() |
| |
| tarinfo = self.tar.getmember(filename) |
| fobj = self.tar.extractfile(tarinfo) |
| |
| text = fobj.read() |
| fobj.seek(0) |
| self.assert_(0 == fobj.tell(), |
| "seek() to file's start failed") |
| fobj.seek(2048, 0) |
| self.assert_(2048 == fobj.tell(), |
| "seek() to absolute position failed") |
| fobj.seek(-1024, 1) |
| self.assert_(1024 == fobj.tell(), |
| "seek() to negative relative position failed") |
| fobj.seek(1024, 1) |
| self.assert_(2048 == fobj.tell(), |
| "seek() to positive relative position failed") |
| s = fobj.read(10) |
| self.assert_(s == data[2048:2058], |
| "read() after seek failed") |
| fobj.seek(0, 2) |
| self.assert_(tarinfo.size == fobj.tell(), |
| "seek() to file's end failed") |
| self.assert_(fobj.read() == "", |
| "read() at file's end did not return empty string") |
| fobj.seek(-tarinfo.size, 2) |
| self.assert_(0 == fobj.tell(), |
| "relative seek() to file's start failed") |
| fobj.seek(512) |
| s1 = fobj.readlines() |
| fobj.seek(512) |
| s2 = fobj.readlines() |
| self.assert_(s1 == s2, |
| "readlines() after seek failed") |
| fobj.close() |
| |
| def test_old_dirtype(self): |
| """Test old style dirtype member (bug #1336623). |
| """ |
| # Old tars create directory members using a REGTYPE |
| # header with a "/" appended to the filename field. |
| |
| # Create an old tar style directory entry. |
| filename = tmpname() |
| tarinfo = tarfile.TarInfo("directory/") |
| tarinfo.type = tarfile.REGTYPE |
| |
| fobj = open(filename, "w") |
| fobj.write(tarinfo.tobuf()) |
| fobj.close() |
| |
| try: |
| # Test if it is still a directory entry when |
| # read back. |
| tar = tarfile.open(filename) |
| tarinfo = tar.getmembers()[0] |
| tar.close() |
| |
| self.assert_(tarinfo.type == tarfile.DIRTYPE) |
| self.assert_(tarinfo.name.endswith("/")) |
| finally: |
| try: |
| os.unlink(filename) |
| except: |
| pass |
| |
| class ReadStreamTest(ReadTest): |
| sep = "|" |
| |
| def test(self): |
| """Test member extraction, and for StreamError when |
| seeking backwards. |
| """ |
| ReadTest.test(self) |
| tarinfo = self.tar.getmembers()[0] |
| f = self.tar.extractfile(tarinfo) |
| self.assertRaises(tarfile.StreamError, f.read) |
| |
| def test_stream(self): |
| """Compare the normal tar and the stream tar. |
| """ |
| stream = self.tar |
| tar = tarfile.open(tarname(), 'r') |
| |
| while 1: |
| t1 = tar.next() |
| t2 = stream.next() |
| if t1 is None: |
| break |
| self.assert_(t2 is not None, "stream.next() failed.") |
| |
| if t2.islnk() or t2.issym(): |
| self.assertRaises(tarfile.StreamError, stream.extractfile, t2) |
| continue |
| v1 = tar.extractfile(t1) |
| v2 = stream.extractfile(t2) |
| if v1 is None: |
| continue |
| self.assert_(v2 is not None, "stream.extractfile() failed") |
| self.assert_(v1.read() == v2.read(), "stream extraction failed") |
| |
| tar.close() |
| stream.close() |
| |
| class ReadDetectTest(ReadTest): |
| |
| def setUp(self): |
| self.tar = tarfile.open(tarname(self.comp), self.mode) |
| |
| class ReadDetectFileobjTest(ReadTest): |
| |
| def setUp(self): |
| name = tarname(self.comp) |
| self.tar = tarfile.open(name, mode=self.mode, |
| fileobj=open(name, "rb")) |
| |
| class ReadAsteriskTest(ReadTest): |
| |
| def setUp(self): |
| mode = self.mode + self.sep + "*" |
| self.tar = tarfile.open(tarname(self.comp), mode) |
| |
| class ReadStreamAsteriskTest(ReadStreamTest): |
| |
| def setUp(self): |
| mode = self.mode + self.sep + "*" |
| self.tar = tarfile.open(tarname(self.comp), mode) |
| |
| class WriteTest(BaseTest): |
| mode = 'w' |
| |
| def setUp(self): |
| mode = self.mode + self.sep + self.comp |
| self.src = tarfile.open(tarname(self.comp), 'r') |
| self.dstname = tmpname() |
| self.dst = tarfile.open(self.dstname, mode) |
| |
| def tearDown(self): |
| self.src.close() |
| self.dst.close() |
| |
| def test_posix(self): |
| self.dst.posix = 1 |
| self._test() |
| |
| def test_nonposix(self): |
| self.dst.posix = 0 |
| self._test() |
| |
| def test_small(self): |
| self.dst.add(os.path.join(os.path.dirname(__file__),"cfgparser.1")) |
| self.dst.close() |
| self.assertNotEqual(os.stat(self.dstname).st_size, 0) |
| |
| def _test(self): |
| for tarinfo in self.src: |
| if not tarinfo.isreg(): |
| continue |
| f = self.src.extractfile(tarinfo) |
| if self.dst.posix and len(tarinfo.name) > tarfile.LENGTH_NAME and "/" not in tarinfo.name: |
| self.assertRaises(ValueError, self.dst.addfile, |
| tarinfo, f) |
| else: |
| self.dst.addfile(tarinfo, f) |
| |
| class WriteSize0Test(BaseTest): |
| mode = 'w' |
| |
| def setUp(self): |
| self.tmpdir = dirname() |
| self.dstname = tmpname() |
| self.dst = tarfile.open(self.dstname, "w") |
| |
| def tearDown(self): |
| self.dst.close() |
| |
| def test_file(self): |
| path = os.path.join(self.tmpdir, "file") |
| f = open(path, "w") |
| f.close() |
| tarinfo = self.dst.gettarinfo(path) |
| self.assertEqual(tarinfo.size, 0) |
| f = open(path, "w") |
| f.write("aaa") |
| f.close() |
| tarinfo = self.dst.gettarinfo(path) |
| self.assertEqual(tarinfo.size, 3) |
| |
| def test_directory(self): |
| path = os.path.join(self.tmpdir, "directory") |
| if os.path.exists(path): |
| # This shouldn't be necessary, but is <wink> if a previous |
| # run was killed in mid-stream. |
| shutil.rmtree(path) |
| os.mkdir(path) |
| tarinfo = self.dst.gettarinfo(path) |
| self.assertEqual(tarinfo.size, 0) |
| |
| def test_symlink(self): |
| if hasattr(os, "symlink"): |
| path = os.path.join(self.tmpdir, "symlink") |
| os.symlink("link_target", path) |
| tarinfo = self.dst.gettarinfo(path) |
| self.assertEqual(tarinfo.size, 0) |
| |
| |
| class WriteStreamTest(WriteTest): |
| sep = '|' |
| |
| def test_padding(self): |
| self.dst.close() |
| |
| if self.comp == "gz": |
| f = gzip.GzipFile(self.dstname) |
| s = f.read() |
| f.close() |
| elif self.comp == "bz2": |
| f = bz2.BZ2Decompressor() |
| s = file(self.dstname).read() |
| s = f.decompress(s) |
| self.assertEqual(len(f.unused_data), 0, "trailing data") |
| else: |
| f = file(self.dstname) |
| s = f.read() |
| f.close() |
| |
| self.assertEqual(s.count("\0"), tarfile.RECORDSIZE, |
| "incorrect zero padding") |
| |
| |
| class WriteGNULongTest(unittest.TestCase): |
| """This testcase checks for correct creation of GNU Longname |
| and Longlink extensions. |
| |
| It creates a tarfile and adds empty members with either |
| long names, long linknames or both and compares the size |
| of the tarfile with the expected size. |
| |
| It checks for SF bug #812325 in TarFile._create_gnulong(). |
| |
| While I was writing this testcase, I noticed a second bug |
| in the same method: |
| Long{names,links} weren't null-terminated which lead to |
| bad tarfiles when their length was a multiple of 512. This |
| is tested as well. |
| """ |
| |
| def setUp(self): |
| self.tar = tarfile.open(tmpname(), "w") |
| self.tar.posix = False |
| |
| def tearDown(self): |
| self.tar.close() |
| |
| def _length(self, s): |
| blocks, remainder = divmod(len(s) + 1, 512) |
| if remainder: |
| blocks += 1 |
| return blocks * 512 |
| |
| def _calc_size(self, name, link=None): |
| # initial tar header |
| count = 512 |
| |
| if len(name) > tarfile.LENGTH_NAME: |
| # gnu longname extended header + longname |
| count += 512 |
| count += self._length(name) |
| |
| if link is not None and len(link) > tarfile.LENGTH_LINK: |
| # gnu longlink extended header + longlink |
| count += 512 |
| count += self._length(link) |
| |
| return count |
| |
| def _test(self, name, link=None): |
| tarinfo = tarfile.TarInfo(name) |
| if link: |
| tarinfo.linkname = link |
| tarinfo.type = tarfile.LNKTYPE |
| |
| self.tar.addfile(tarinfo) |
| |
| v1 = self._calc_size(name, link) |
| v2 = self.tar.offset |
| self.assertEqual(v1, v2, "GNU longname/longlink creation failed") |
| |
| def test_longname_1023(self): |
| self._test(("longnam/" * 127) + "longnam") |
| |
| def test_longname_1024(self): |
| self._test(("longnam/" * 127) + "longname") |
| |
| def test_longname_1025(self): |
| self._test(("longnam/" * 127) + "longname_") |
| |
| def test_longlink_1023(self): |
| self._test("name", ("longlnk/" * 127) + "longlnk") |
| |
| def test_longlink_1024(self): |
| self._test("name", ("longlnk/" * 127) + "longlink") |
| |
| def test_longlink_1025(self): |
| self._test("name", ("longlnk/" * 127) + "longlink_") |
| |
| def test_longnamelink_1023(self): |
| self._test(("longnam/" * 127) + "longnam", |
| ("longlnk/" * 127) + "longlnk") |
| |
| def test_longnamelink_1024(self): |
| self._test(("longnam/" * 127) + "longname", |
| ("longlnk/" * 127) + "longlink") |
| |
| def test_longnamelink_1025(self): |
| self._test(("longnam/" * 127) + "longname_", |
| ("longlnk/" * 127) + "longlink_") |
| |
| class ReadGNULongTest(unittest.TestCase): |
| |
| def setUp(self): |
| self.tar = tarfile.open(tarname()) |
| |
| def tearDown(self): |
| self.tar.close() |
| |
| def test_1471427(self): |
| """Test reading of longname (bug #1471427). |
| """ |
| name = "test/" * 20 + "0-REGTYPE" |
| try: |
| tarinfo = self.tar.getmember(name) |
| except KeyError: |
| tarinfo = None |
| self.assert_(tarinfo is not None, "longname not found") |
| self.assert_(tarinfo.type != tarfile.DIRTYPE, "read longname as dirtype") |
| |
| def test_read_name(self): |
| name = ("0-LONGNAME-" * 10)[:101] |
| try: |
| tarinfo = self.tar.getmember(name) |
| except KeyError: |
| tarinfo = None |
| self.assert_(tarinfo is not None, "longname not found") |
| |
| def test_read_link(self): |
| link = ("1-LONGLINK-" * 10)[:101] |
| name = ("0-LONGNAME-" * 10)[:101] |
| try: |
| tarinfo = self.tar.getmember(link) |
| except KeyError: |
| tarinfo = None |
| self.assert_(tarinfo is not None, "longlink not found") |
| self.assert_(tarinfo.linkname == name, "linkname wrong") |
| |
| def test_truncated_longname(self): |
| f = open(tarname()) |
| fobj = StringIO.StringIO(f.read(1024)) |
| f.close() |
| tar = tarfile.open(name="foo.tar", fileobj=fobj) |
| self.assert_(len(tar.getmembers()) == 0, "") |
| tar.close() |
| |
| |
| class ExtractHardlinkTest(BaseTest): |
| |
| def test_hardlink(self): |
| """Test hardlink extraction (bug #857297) |
| """ |
| # Prevent errors from being caught |
| self.tar.errorlevel = 1 |
| |
| self.tar.extract("0-REGTYPE", dirname()) |
| try: |
| # Extract 1-LNKTYPE which is a hardlink to 0-REGTYPE |
| self.tar.extract("1-LNKTYPE", dirname()) |
| except EnvironmentError, e: |
| import errno |
| if e.errno == errno.ENOENT: |
| self.fail("hardlink not extracted properly") |
| |
| class CreateHardlinkTest(BaseTest): |
| """Test the creation of LNKTYPE (hardlink) members in an archive. |
| In this respect tarfile.py mimics the behaviour of GNU tar: If |
| a file has a st_nlink > 1, it will be added a REGTYPE member |
| only the first time. |
| """ |
| |
| def setUp(self): |
| self.tar = tarfile.open(tmpname(), "w") |
| |
| self.foo = os.path.join(dirname(), "foo") |
| self.bar = os.path.join(dirname(), "bar") |
| |
| if os.path.exists(self.foo): |
| os.remove(self.foo) |
| if os.path.exists(self.bar): |
| os.remove(self.bar) |
| |
| f = open(self.foo, "w") |
| f.write("foo") |
| f.close() |
| self.tar.add(self.foo) |
| |
| def test_add_twice(self): |
| # If st_nlink == 1 then the same file will be added as |
| # REGTYPE every time. |
| tarinfo = self.tar.gettarinfo(self.foo) |
| self.assertEqual(tarinfo.type, tarfile.REGTYPE, |
| "add file as regular failed") |
| |
| def test_add_hardlink(self): |
| # If st_nlink > 1 then the same file will be added as |
| # LNKTYPE. |
| os.link(self.foo, self.bar) |
| tarinfo = self.tar.gettarinfo(self.foo) |
| self.assertEqual(tarinfo.type, tarfile.LNKTYPE, |
| "add file as hardlink failed") |
| |
| tarinfo = self.tar.gettarinfo(self.bar) |
| self.assertEqual(tarinfo.type, tarfile.LNKTYPE, |
| "add file as hardlink failed") |
| |
| def test_dereference_hardlink(self): |
| self.tar.dereference = True |
| os.link(self.foo, self.bar) |
| tarinfo = self.tar.gettarinfo(self.bar) |
| self.assertEqual(tarinfo.type, tarfile.REGTYPE, |
| "dereferencing hardlink failed") |
| |
| |
| # Gzip TestCases |
| class ReadTestGzip(ReadTest): |
| comp = "gz" |
| class ReadStreamTestGzip(ReadStreamTest): |
| comp = "gz" |
| class WriteTestGzip(WriteTest): |
| comp = "gz" |
| class WriteStreamTestGzip(WriteStreamTest): |
| comp = "gz" |
| class ReadDetectTestGzip(ReadDetectTest): |
| comp = "gz" |
| class ReadDetectFileobjTestGzip(ReadDetectFileobjTest): |
| comp = "gz" |
| class ReadAsteriskTestGzip(ReadAsteriskTest): |
| comp = "gz" |
| class ReadStreamAsteriskTestGzip(ReadStreamAsteriskTest): |
| comp = "gz" |
| |
| # Filemode test cases |
| |
| class FileModeTest(unittest.TestCase): |
| def test_modes(self): |
| self.assertEqual(tarfile.filemode(0755), '-rwxr-xr-x') |
| self.assertEqual(tarfile.filemode(07111), '---s--s--t') |
| |
| |
| if bz2: |
| # Bzip2 TestCases |
| class ReadTestBzip2(ReadTestGzip): |
| comp = "bz2" |
| class ReadStreamTestBzip2(ReadStreamTestGzip): |
| comp = "bz2" |
| class WriteTestBzip2(WriteTest): |
| comp = "bz2" |
| class WriteStreamTestBzip2(WriteStreamTestGzip): |
| comp = "bz2" |
| class ReadDetectTestBzip2(ReadDetectTest): |
| comp = "bz2" |
| class ReadDetectFileobjTestBzip2(ReadDetectFileobjTest): |
| comp = "bz2" |
| class ReadAsteriskTestBzip2(ReadAsteriskTest): |
| comp = "bz2" |
| class ReadStreamAsteriskTestBzip2(ReadStreamAsteriskTest): |
| comp = "bz2" |
| |
| # If importing gzip failed, discard the Gzip TestCases. |
| if not gzip: |
| del ReadTestGzip |
| del ReadStreamTestGzip |
| del WriteTestGzip |
| del WriteStreamTestGzip |
| |
| def test_main(): |
| # Create archive. |
| f = open(tarname(), "rb") |
| fguts = f.read() |
| f.close() |
| if gzip: |
| # create testtar.tar.gz |
| tar = gzip.open(tarname("gz"), "wb") |
| tar.write(fguts) |
| tar.close() |
| if bz2: |
| # create testtar.tar.bz2 |
| tar = bz2.BZ2File(tarname("bz2"), "wb") |
| tar.write(fguts) |
| tar.close() |
| |
| tests = [ |
| FileModeTest, |
| ReadTest, |
| ReadStreamTest, |
| ReadDetectTest, |
| ReadDetectFileobjTest, |
| ReadAsteriskTest, |
| ReadStreamAsteriskTest, |
| WriteTest, |
| WriteSize0Test, |
| WriteStreamTest, |
| WriteGNULongTest, |
| ReadGNULongTest, |
| ] |
| |
| if hasattr(os, "link"): |
| tests.append(ExtractHardlinkTest) |
| tests.append(CreateHardlinkTest) |
| |
| if gzip: |
| tests.extend([ |
| ReadTestGzip, ReadStreamTestGzip, |
| WriteTestGzip, WriteStreamTestGzip, |
| ReadDetectTestGzip, ReadDetectFileobjTestGzip, |
| ReadAsteriskTestGzip, ReadStreamAsteriskTestGzip |
| ]) |
| |
| if bz2: |
| tests.extend([ |
| ReadTestBzip2, ReadStreamTestBzip2, |
| WriteTestBzip2, WriteStreamTestBzip2, |
| ReadDetectTestBzip2, ReadDetectFileobjTestBzip2, |
| ReadAsteriskTestBzip2, ReadStreamAsteriskTestBzip2 |
| ]) |
| try: |
| test_support.run_unittest(*tests) |
| finally: |
| if gzip: |
| os.remove(tarname("gz")) |
| if bz2: |
| os.remove(tarname("bz2")) |
| if os.path.exists(dirname()): |
| shutil.rmtree(dirname()) |
| if os.path.exists(tmpname()): |
| os.remove(tmpname()) |
| |
| if __name__ == "__main__": |
| test_main() |