blob: 2dc06ead1a1e3babb8abcc85b778862b942bb7ad [file] [log] [blame]
Neal Norwitzb9ef4ae2003-01-05 23:19:43 +00001import sys
2import os
3import shutil
4
5import unittest
6import tarfile
7
8from test import test_support
9
10# Check for our compression modules.
11try:
12 import gzip
13except ImportError:
14 gzip = None
15try:
16 import bz2
17except ImportError:
18 bz2 = None
19
20def path(path):
21 return test_support.findfile(path)
22
23testtar = path("testtar.tar")
24tempdir = path("testtar.dir")
25tempname = path("testtar.tmp")
26membercount = 10
27
28def tarname(comp=""):
29 if not comp:
30 return testtar
31 return "%s.%s" % (testtar, comp)
32
33def dirname():
34 if not os.path.exists(tempdir):
35 os.mkdir(tempdir)
36 return tempdir
37
38def tmpname():
39 return tempname
40
41
42class BaseTest(unittest.TestCase):
43 comp = ''
44 mode = 'r'
45 sep = ':'
46
47 def setUp(self):
48 mode = self.mode + self.sep + self.comp
49 self.tar = tarfile.open(tarname(self.comp), mode)
50
51 def tearDown(self):
52 self.tar.close()
53
54class ReadTest(BaseTest):
55
56 def test(self):
57 """Test member extraction.
58 """
59 members = 0
60 for tarinfo in self.tar:
61 members += 1
62 if not tarinfo.isreg():
63 continue
64 f = self.tar.extractfile(tarinfo)
65 self.assert_(len(f.read()) == tarinfo.size,
66 "size read does not match expected size")
67 f.close()
68
69 self.assert_(members == membercount,
70 "could not find all members")
71
72 def test_sparse(self):
73 """Test sparse member extraction.
74 """
75 if self.sep != "|":
76 f1 = self.tar.extractfile("S-SPARSE")
77 f2 = self.tar.extractfile("S-SPARSE-WITH-NULLS")
78 self.assert_(f1.read() == f2.read(),
79 "_FileObject failed on sparse file member")
80
81 def test_readlines(self):
82 """Test readlines() method of _FileObject.
83 """
84 if self.sep != "|":
85 filename = "0-REGTYPE-TEXT"
86 self.tar.extract(filename, dirname())
87 lines1 = file(os.path.join(dirname(), filename), "r").readlines()
88 lines2 = self.tar.extractfile(filename).readlines()
89 self.assert_(lines1 == lines2,
90 "_FileObject.readline() does not work correctly")
91
92 def test_seek(self):
93 """Test seek() method of _FileObject, incl. random reading.
94 """
95 if self.sep != "|":
96 filename = "0-REGTYPE"
97 self.tar.extract(filename, dirname())
98 data = file(os.path.join(dirname(), filename), "rb").read()
99
100 tarinfo = self.tar.getmember(filename)
101 fobj = self.tar.extractfile(tarinfo)
102
103 text = fobj.read()
104 fobj.seek(0)
105 self.assert_(0 == fobj.tell(),
106 "seek() to file's start failed")
107 fobj.seek(2048, 0)
108 self.assert_(2048 == fobj.tell(),
109 "seek() to absolute position failed")
110 fobj.seek(-1024, 1)
111 self.assert_(1024 == fobj.tell(),
112 "seek() to negative relative position failed")
113 fobj.seek(1024, 1)
114 self.assert_(2048 == fobj.tell(),
115 "seek() to positive relative position failed")
116 s = fobj.read(10)
117 self.assert_(s == data[2048:2058],
118 "read() after seek failed")
119 fobj.seek(0, 2)
120 self.assert_(tarinfo.size == fobj.tell(),
121 "seek() to file's end failed")
122 self.assert_(fobj.read() == "",
123 "read() at file's end did not return empty string")
124 fobj.seek(-tarinfo.size, 2)
125 self.assert_(0 == fobj.tell(),
126 "relative seek() to file's start failed")
127 fobj.seek(512)
128 s1 = fobj.readlines()
129 fobj.seek(512)
130 s2 = fobj.readlines()
131 self.assert_(s1 == s2,
132 "readlines() after seek failed")
133 fobj.close()
134
135class ReadStreamTest(ReadTest):
136 sep = "|"
137
138 def test(self):
139 """Test member extraction, and for StreamError when
140 seeking backwards.
141 """
142 ReadTest.test(self)
143 tarinfo = self.tar.getmembers()[0]
144 f = self.tar.extractfile(tarinfo)
145 self.assertRaises(tarfile.StreamError, f.read)
146
147 def test_stream(self):
148 """Compare the normal tar and the stream tar.
149 """
150 stream = self.tar
151 tar = tarfile.open(tarname(), 'r')
152
153 while 1:
154 t1 = tar.next()
155 t2 = stream.next()
156 if t1 is None:
157 break
158 self.assert_(t2 is not None, "stream.next() failed.")
159
160 if t2.islnk() or t2.issym():
161 self.assertRaises(tarfile.StreamError, stream.extractfile, t2)
162 continue
163 v1 = tar.extractfile(t1)
164 v2 = stream.extractfile(t2)
165 if v1 is None:
166 continue
167 self.assert_(v2 is not None, "stream.extractfile() failed")
168 self.assert_(v1.read() == v2.read(), "stream extraction failed")
169
170 stream.close()
171
172class WriteTest(BaseTest):
173 mode = 'w'
174
175 def setUp(self):
176 mode = self.mode + self.sep + self.comp
177 self.src = tarfile.open(tarname(self.comp), 'r')
178 self.dst = tarfile.open(tmpname(), mode)
179
180 def tearDown(self):
181 self.src.close()
182 self.dst.close()
183
184 def test_posix(self):
185 self.dst.posix = 1
186 self._test()
187
188 def test_nonposix(self):
189 self.dst.posix = 0
190 self._test()
191
192 def _test(self):
193 for tarinfo in self.src:
194 if not tarinfo.isreg():
195 continue
196 f = self.src.extractfile(tarinfo)
197 if self.dst.posix and len(tarinfo.name) > tarfile.LENGTH_NAME:
198 self.assertRaises(ValueError, self.dst.addfile,
199 tarinfo, f)
200 else:
201 self.dst.addfile(tarinfo, f)
202
203class WriteStreamTest(WriteTest):
204 sep = '|'
205
206# Gzip TestCases
207class ReadTestGzip(ReadTest):
208 comp = "gz"
209class ReadStreamTestGzip(ReadStreamTest):
210 comp = "gz"
211class WriteTestGzip(WriteTest):
212 comp = "gz"
213class WriteStreamTestGzip(WriteStreamTest):
214 comp = "gz"
215
216if bz2:
217 # Bzip2 TestCases
218 class ReadTestBzip2(ReadTestGzip):
219 comp = "bz2"
220 class ReadStreamTestBzip2(ReadStreamTestGzip):
221 comp = "bz2"
222 class WriteTestBzip2(WriteTest):
223 comp = "bz2"
224 class WriteStreamTestBzip2(WriteStreamTestGzip):
225 comp = "bz2"
226
227# If importing gzip failed, discard the Gzip TestCases.
228if not gzip:
229 del ReadTestGzip
230 del ReadStreamTestGzip
231 del WriteTestGzip
232 del WriteStreamTestGzip
233
234if __name__ == "__main__":
235 if gzip:
236 # create testtar.tar.gz
237 gzip.open(tarname("gz"), "wb").write(file(tarname(), "rb").read())
238 if bz2:
239 # create testtar.tar.bz2
240 bz2.BZ2File(tarname("bz2"), "wb").write(file(tarname(), "rb").read())
241
242 try:
243 unittest.main()
244 finally:
245 if gzip:
246 os.remove(tarname("gz"))
247 if bz2:
248 os.remove(tarname("bz2"))
249 if os.path.exists(tempdir):
250 shutil.rmtree(tempdir)
251 if os.path.exists(tempname):
252 os.remove(tempname)
253