blob: dd9c684d146e4ac3c5c9e999c669805638cb34cb [file] [log] [blame]
Phillip J. Eby069159b2006-04-18 04:05:34 +00001"""Utilities for extracting common archive formats"""
2
3
# Names exported by ``from <this module> import *``.
__all__ = [
    "unpack_archive",
    "unpack_zipfile",
    "unpack_tarfile",
    "default_filter",
    "UnrecognizedFormat",
    "extraction_drivers",
    "unpack_directory",
]
8
9import zipfile, tarfile, os, shutil
10from pkg_resources import ensure_directory
11from distutils.errors import DistutilsError
12
class UnrecognizedFormat(DistutilsError):
    """Raised when a file is not in a format that a driver can unpack"""
15
def default_filter(src, dst):
    """The default progress/filter callback; accepts every entry unchanged

    `src` is the '/'-separated path of the entry inside the archive and `dst`
    is the filesystem path it is about to be extracted to.  This default
    returns `dst` as-is, which tells the unpack functions to extract the
    entry to its proposed location.  (A custom callback may instead return a
    different path, or a false value such as ``None`` to skip the entry.)
    """
    return dst
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def unpack_archive(filename, extract_dir, progress_filter=default_filter,
                   drivers=None):
    """Unpack `filename` to `extract_dir`, or raise ``UnrecognizedFormat``

    `progress_filter` is a function taking two arguments: a source path
    internal to the archive ('/'-separated), and a filesystem path where it
    will be extracted.  The callback must return the desired extract path
    (which may be the same as the one passed in), or else ``None`` to skip
    that file or directory.  The callback can thus be used to report on the
    progress of the extraction, as well as to filter the items extracted or
    alter their extraction paths.

    `drivers`, if supplied, must be a non-empty sequence of functions with the
    same signature as this function (minus the `drivers` argument), that raise
    ``UnrecognizedFormat`` if they do not support extracting the designated
    archive type.  The `drivers` are tried in sequence until one is found that
    does not raise an error, or until all are exhausted (in which case
    ``UnrecognizedFormat`` is raised).  If you do not supply a sequence of
    drivers, the module's ``extraction_drivers`` constant will be used, which
    means that ``unpack_directory``, ``unpack_zipfile`` and
    ``unpack_tarfile`` will be tried, in that order.

    Returns ``None`` once a driver has accepted the archive.  Any exception
    other than ``UnrecognizedFormat`` raised by a driver propagates to the
    caller without further drivers being tried.
    """
    for driver in drivers or extraction_drivers:
        try:
            driver(filename, extract_dir, progress_filter)
        except UnrecognizedFormat:
            # This driver can't handle the file; give the next one a chance.
            continue
        return

    # Every driver rejected the file.  The 1-tuple keeps the formatting
    # correct even if `filename` is itself a tuple.
    raise UnrecognizedFormat(
        "Not a recognized archive type: %s" % (filename,)
    )
76
77
78
79
80
81
82
def unpack_directory(filename, extract_dir, progress_filter=default_filter):
    """'Unpack' a directory, using the same interface as for archives

    Walks the tree rooted at `filename` and copies each file (contents, then
    stat information) to the corresponding location under `extract_dir`.
    Each file is first offered to `progress_filter` as a '/'-separated path
    relative to `filename` plus its proposed target path; the callback's
    return value is used as the actual target, and a false return value
    skips the file.  Only files are passed to the callback, never the
    directories themselves.

    Raises ``UnrecognizedFormat`` if `filename` is not a directory
    """
    if not os.path.isdir(filename):
        raise UnrecognizedFormat("%s is not a directory" % (filename,))

    # Maps every directory seen so far to a pair of
    # ('/'-terminated archive-style prefix, destination directory).
    # os.walk() is top-down, so a directory's entry is always recorded by
    # its parent's iteration before the directory itself is visited.
    paths = {filename: ('', extract_dir)}
    for base, dirs, files in os.walk(filename):
        src, dst = paths[base]
        for d in dirs:
            paths[os.path.join(base, d)] = src + d + '/', os.path.join(dst, d)
        for f in files:
            target = progress_filter(src + f, os.path.join(dst, f))
            if not target:
                continue    # the callback asked for this file to be skipped
            # NOTE(review): ensure_directory() comes from pkg_resources and
            # is presumed to create the parent directory of `target`.
            ensure_directory(target)
            source = os.path.join(base, f)
            shutil.copyfile(source, target)
            shutil.copystat(source, target)
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
def unpack_zipfile(filename, extract_dir, progress_filter=default_filter):
    """Unpack zip `filename` to `extract_dir`

    Raises ``UnrecognizedFormat`` if `filename` is not a zipfile (as determined
    by ``zipfile.is_zipfile()``).  See ``unpack_archive()`` for an explanation
    of the `progress_filter` argument.

    Members whose names are absolute (start with '/') or contain a ``..``
    path component are silently skipped, so that an archive cannot write
    outside of `extract_dir`.
    """
    if not zipfile.is_zipfile(filename):
        raise UnrecognizedFormat("%s is not a zip file" % (filename,))

    z = zipfile.ZipFile(filename)
    try:
        for info in z.infolist():
            name = info.filename

            # Don't extract absolute paths or ones with a '..' component.
            # Compare whole components (not substrings) so that ordinary
            # names such as 'notes..txt' are still extracted; backslashes
            # are treated as separators for this check so Windows-style
            # traversal ('..\\x') is rejected too.
            if name.startswith('/'):
                continue
            if '..' in name.replace('\\', '/').split('/'):
                continue

            target = os.path.join(extract_dir, *name.split('/'))
            target = progress_filter(name, target)
            if not target:
                continue    # the callback asked for this entry to be skipped

            # Called for directory and file entries alike, exactly as before.
            # NOTE(review): ensure_directory() comes from pkg_resources and
            # is presumed to create the parent directory of `target`.
            ensure_directory(target)
            if name.endswith('/'):
                continue    # directory entry: nothing to write

            data = z.read(info.filename)
            f = open(target, 'wb')
            try:
                f.write(data)
            finally:
                f.close()
            # Release the buffer before the next member is read.
            del data
    finally:
        z.close()
163
164
def unpack_tarfile(filename, extract_dir, progress_filter=default_filter):
    """Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir`

    Raises ``UnrecognizedFormat`` if `filename` is not a tarfile (as determined
    by ``tarfile.open()``).  See ``unpack_archive()`` for an explanation
    of the `progress_filter` argument.

    Only regular files and directories are extracted; other member types
    are ignored.  Members whose names are absolute (start with '/') or
    contain a ``..`` path component are silently skipped, so that an archive
    cannot write outside of `extract_dir`.  File ownership is never changed.

    Returns ``True`` once the archive has been processed.
    """
    try:
        tarobj = tarfile.open(filename)
    except tarfile.TarError:
        raise UnrecognizedFormat(
            "%s is not a compressed or uncompressed tar file" % (filename,)
        )

    try:
        tarobj.chown = lambda *args: None   # don't do any chowning!
        for member in tarobj:
            if not (member.isfile() or member.isdir()):
                continue    # not a regular file or directory

            name = member.name
            # Don't extract absolute paths or ones with a '..' component.
            # Compare whole components (not substrings) so that ordinary
            # names such as 'notes..txt' are still extracted; backslashes
            # are treated as separators for this check so Windows-style
            # traversal ('..\\x') is rejected too.
            if name.startswith('/'):
                continue
            if '..' in name.replace('\\', '/').split('/'):
                continue

            dst = os.path.join(extract_dir, *name.split('/'))
            dst = progress_filter(name, dst)
            if not dst:
                continue    # the callback asked for this entry to be skipped

            if dst.endswith(os.sep):
                dst = dst[:-1]
            # XXX Ugh: private tarfile API, used because the target path
            # chosen by the callback may differ from the member's own name.
            tarobj._extract_member(member, dst)
        return True
    finally:
        tarobj.close()
196
197
198
199
# Drivers tried, in this order, by ``unpack_archive()`` when the caller does
# not supply its own `drivers` sequence.
extraction_drivers = (unpack_directory, unpack_zipfile, unpack_tarfile)