blob: baa19433b33733ed71c510d2acde01bc364995e0 [file] [log] [blame]
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00001# Copyright (C) 2003 Python Software Foundation
2
3import unittest
4import shutil
5import tempfile
Johannes Gijsbers8e6f2de2004-11-23 09:27:27 +00006import sys
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +00007import stat
Brett Cannon1c3fa182004-06-19 21:11:35 +00008import os
9import os.path
Benjamin Petersonee8712c2008-05-20 21:35:26 +000010from test import support
11from test.support import TESTFN
Tarek Ziadé396fad72010-02-23 05:30:31 +000012from os.path import splitdrive
13from distutils.spawn import find_executable, spawn
14from shutil import (_make_tarball, _make_zipfile, make_archive,
15 register_archive_format, unregister_archive_format,
Tarek Ziadé6ac91722010-04-28 17:51:36 +000016 get_archive_formats, Error, unpack_archive,
17 register_unpack_format, RegistryError,
18 unregister_unpack_format, get_unpack_formats)
Tarek Ziadé396fad72010-02-23 05:30:31 +000019import tarfile
20import warnings
21
22from test import support
23from test.support import TESTFN, check_warnings, captured_stdout
24
Tarek Ziadéffa155a2010-04-29 13:34:35 +000025try:
26 import bz2
27 BZ2_SUPPORTED = True
28except ImportError:
29 BZ2_SUPPORTED = False
30
Antoine Pitrou7fff0962009-05-01 21:09:44 +000031TESTFN2 = TESTFN + "2"
Barry Warsaw7fc2cca2003-01-24 17:34:13 +000032
Tarek Ziadé396fad72010-02-23 05:30:31 +000033try:
34 import grp
35 import pwd
36 UID_GID_SUPPORT = True
37except ImportError:
38 UID_GID_SUPPORT = False
39
40try:
41 import zlib
42except ImportError:
43 zlib = None
44
45try:
46 import zipfile
47 ZIP_SUPPORT = True
48except ImportError:
49 ZIP_SUPPORT = find_executable('zip')
50
Barry Warsaw7fc2cca2003-01-24 17:34:13 +000051class TestShutil(unittest.TestCase):
Tarek Ziadé396fad72010-02-23 05:30:31 +000052
53 def setUp(self):
54 super(TestShutil, self).setUp()
55 self.tempdirs = []
56
57 def tearDown(self):
58 super(TestShutil, self).tearDown()
59 while self.tempdirs:
60 d = self.tempdirs.pop()
61 shutil.rmtree(d, os.name in ('nt', 'cygwin'))
62
63 def write_file(self, path, content='xxx'):
64 """Writes a file in the given path.
65
66
67 path can be a string or a sequence.
68 """
69 if isinstance(path, (list, tuple)):
70 path = os.path.join(*path)
71 f = open(path, 'w')
72 try:
73 f.write(content)
74 finally:
75 f.close()
76
77 def mkdtemp(self):
78 """Create a temporary directory that will be cleaned up.
79
80 Returns the path of the directory.
81 """
82 d = tempfile.mkdtemp()
83 self.tempdirs.append(d)
84 return d
Tarek Ziadé5340db32010-04-19 22:30:51 +000085
Barry Warsaw7fc2cca2003-01-24 17:34:13 +000086 def test_rmtree_errors(self):
87 # filename is guaranteed not to exist
88 filename = tempfile.mktemp()
89 self.assertRaises(OSError, shutil.rmtree, filename)
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +000090
Johannes Gijsbersb8b09d02004-12-06 20:50:15 +000091 # See bug #1071513 for why we don't run this on cygwin
92 # and bug #1076467 for why we don't run this as root.
93 if (hasattr(os, 'chmod') and sys.platform[:6] != 'cygwin'
Johannes Gijsbers6b220b02004-12-12 15:52:57 +000094 and not (hasattr(os, 'geteuid') and os.geteuid() == 0)):
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +000095 def test_on_error(self):
96 self.errorState = 0
97 os.mkdir(TESTFN)
Tim Peters4590c002004-11-01 02:40:52 +000098 self.childpath = os.path.join(TESTFN, 'a')
99 f = open(self.childpath, 'w')
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000100 f.close()
Tim Peters4590c002004-11-01 02:40:52 +0000101 old_dir_mode = os.stat(TESTFN).st_mode
102 old_child_mode = os.stat(self.childpath).st_mode
103 # Make unwritable.
104 os.chmod(self.childpath, stat.S_IREAD)
105 os.chmod(TESTFN, stat.S_IREAD)
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000106
107 shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
Johannes Gijsbers8e6f2de2004-11-23 09:27:27 +0000108 # Test whether onerror has actually been called.
Johannes Gijsbersb8b09d02004-12-06 20:50:15 +0000109 self.assertEqual(self.errorState, 2,
110 "Expected call to onerror function did not happen.")
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000111
Tim Peters4590c002004-11-01 02:40:52 +0000112 # Make writable again.
113 os.chmod(TESTFN, old_dir_mode)
114 os.chmod(self.childpath, old_child_mode)
115
116 # Clean up.
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000117 shutil.rmtree(TESTFN)
118
119 def check_args_to_onerror(self, func, arg, exc):
Benjamin Peterson25c95f12009-05-08 20:42:26 +0000120 # test_rmtree_errors deliberately runs rmtree
121 # on a directory that is chmod 400, which will fail.
122 # This function is run when shutil.rmtree fails.
123 # 99.9% of the time it initially fails to remove
124 # a file in the directory, so the first time through
125 # func is os.remove.
126 # However, some Linux machines running ZFS on
127 # FUSE experienced a failure earlier in the process
128 # at os.listdir. The first failure may legally
129 # be either.
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000130 if self.errorState == 0:
Benjamin Peterson25c95f12009-05-08 20:42:26 +0000131 if func is os.remove:
132 self.assertEqual(arg, self.childpath)
133 else:
134 self.assertIs(func, os.listdir,
135 "func must be either os.remove or os.listdir")
136 self.assertEqual(arg, TESTFN)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000137 self.assertTrue(issubclass(exc[0], OSError))
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000138 self.errorState = 1
139 else:
140 self.assertEqual(func, os.rmdir)
141 self.assertEqual(arg, TESTFN)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000142 self.assertTrue(issubclass(exc[0], OSError))
Johannes Gijsbers8e6f2de2004-11-23 09:27:27 +0000143 self.errorState = 2
Barry Warsaw7fc2cca2003-01-24 17:34:13 +0000144
Johannes Gijsbersd60e92a2004-09-11 21:26:21 +0000145 def test_rmtree_dont_delete_file(self):
146 # When called on a file instead of a directory, don't delete it.
147 handle, path = tempfile.mkstemp()
148 os.fdopen(handle).close()
149 self.assertRaises(OSError, shutil.rmtree, path)
150 os.remove(path)
151
Tarek Ziadé5340db32010-04-19 22:30:51 +0000152 def _write_data(self, path, data):
153 f = open(path, "w")
154 f.write(data)
155 f.close()
156
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000157 def test_copytree_simple(self):
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000158
159 def read_data(path):
160 f = open(path)
161 data = f.read()
162 f.close()
163 return data
164
165 src_dir = tempfile.mkdtemp()
166 dst_dir = os.path.join(tempfile.mkdtemp(), 'destination')
Tarek Ziadé5340db32010-04-19 22:30:51 +0000167 self._write_data(os.path.join(src_dir, 'test.txt'), '123')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000168 os.mkdir(os.path.join(src_dir, 'test_dir'))
Tarek Ziadé5340db32010-04-19 22:30:51 +0000169 self._write_data(os.path.join(src_dir, 'test_dir', 'test.txt'), '456')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000170
171 try:
172 shutil.copytree(src_dir, dst_dir)
173 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test.txt')))
174 self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'test_dir')))
175 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test_dir',
176 'test.txt')))
177 actual = read_data(os.path.join(dst_dir, 'test.txt'))
178 self.assertEqual(actual, '123')
179 actual = read_data(os.path.join(dst_dir, 'test_dir', 'test.txt'))
180 self.assertEqual(actual, '456')
181 finally:
182 for path in (
183 os.path.join(src_dir, 'test.txt'),
184 os.path.join(dst_dir, 'test.txt'),
185 os.path.join(src_dir, 'test_dir', 'test.txt'),
186 os.path.join(dst_dir, 'test_dir', 'test.txt'),
187 ):
188 if os.path.exists(path):
189 os.remove(path)
Christian Heimese052dd82007-11-20 03:20:04 +0000190 for path in (src_dir,
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000191 os.path.dirname(dst_dir)
Christian Heimese052dd82007-11-20 03:20:04 +0000192 ):
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000193 if os.path.exists(path):
Christian Heimes94140152007-11-20 01:45:17 +0000194 shutil.rmtree(path)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000195
Georg Brandl2ee470f2008-07-16 12:55:28 +0000196 def test_copytree_with_exclude(self):
197
Georg Brandl2ee470f2008-07-16 12:55:28 +0000198 def read_data(path):
199 f = open(path)
200 data = f.read()
201 f.close()
202 return data
203
204 # creating data
205 join = os.path.join
206 exists = os.path.exists
207 src_dir = tempfile.mkdtemp()
Georg Brandl2ee470f2008-07-16 12:55:28 +0000208 try:
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000209 dst_dir = join(tempfile.mkdtemp(), 'destination')
Tarek Ziadé5340db32010-04-19 22:30:51 +0000210 self._write_data(join(src_dir, 'test.txt'), '123')
211 self._write_data(join(src_dir, 'test.tmp'), '123')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000212 os.mkdir(join(src_dir, 'test_dir'))
Tarek Ziadé5340db32010-04-19 22:30:51 +0000213 self._write_data(join(src_dir, 'test_dir', 'test.txt'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000214 os.mkdir(join(src_dir, 'test_dir2'))
Tarek Ziadé5340db32010-04-19 22:30:51 +0000215 self._write_data(join(src_dir, 'test_dir2', 'test.txt'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000216 os.mkdir(join(src_dir, 'test_dir2', 'subdir'))
217 os.mkdir(join(src_dir, 'test_dir2', 'subdir2'))
Tarek Ziadé5340db32010-04-19 22:30:51 +0000218 self._write_data(join(src_dir, 'test_dir2', 'subdir', 'test.txt'),
219 '456')
220 self._write_data(join(src_dir, 'test_dir2', 'subdir2', 'test.py'),
221 '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000222
223
224 # testing glob-like patterns
225 try:
226 patterns = shutil.ignore_patterns('*.tmp', 'test_dir2')
227 shutil.copytree(src_dir, dst_dir, ignore=patterns)
228 # checking the result: some elements should not be copied
229 self.assertTrue(exists(join(dst_dir, 'test.txt')))
230 self.assertTrue(not exists(join(dst_dir, 'test.tmp')))
231 self.assertTrue(not exists(join(dst_dir, 'test_dir2')))
232 finally:
233 if os.path.exists(dst_dir):
234 shutil.rmtree(dst_dir)
235 try:
236 patterns = shutil.ignore_patterns('*.tmp', 'subdir*')
237 shutil.copytree(src_dir, dst_dir, ignore=patterns)
238 # checking the result: some elements should not be copied
239 self.assertTrue(not exists(join(dst_dir, 'test.tmp')))
240 self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir2')))
241 self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir')))
242 finally:
243 if os.path.exists(dst_dir):
244 shutil.rmtree(dst_dir)
245
246 # testing callable-style
247 try:
248 def _filter(src, names):
249 res = []
250 for name in names:
251 path = os.path.join(src, name)
252
253 if (os.path.isdir(path) and
254 path.split()[-1] == 'subdir'):
255 res.append(name)
256 elif os.path.splitext(path)[-1] in ('.py'):
257 res.append(name)
258 return res
259
260 shutil.copytree(src_dir, dst_dir, ignore=_filter)
261
262 # checking the result: some elements should not be copied
263 self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir2',
264 'test.py')))
265 self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir')))
266
267 finally:
268 if os.path.exists(dst_dir):
269 shutil.rmtree(dst_dir)
Georg Brandl2ee470f2008-07-16 12:55:28 +0000270 finally:
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000271 shutil.rmtree(src_dir)
272 shutil.rmtree(os.path.dirname(dst_dir))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000273
Brian Curtind40e6f72010-07-08 21:39:08 +0000274 @support.skip_unless_symlink
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000275 def test_dont_copy_file_onto_link_to_itself(self):
276 # bug 851123.
277 os.mkdir(TESTFN)
278 src = os.path.join(TESTFN, 'cheese')
279 dst = os.path.join(TESTFN, 'shop')
280 try:
281 f = open(src, 'w')
282 f.write('cheddar')
283 f.close()
284
Brian Curtind40e6f72010-07-08 21:39:08 +0000285 if hasattr(os, "link"):
286 os.link(src, dst)
287 self.assertRaises(shutil.Error, shutil.copyfile, src, dst)
288 self.assertEqual(open(src,'r').read(), 'cheddar')
289 os.remove(dst)
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000290
291 # Using `src` here would mean we end up with a symlink pointing
292 # to TESTFN/TESTFN/cheese, while it should point at
293 # TESTFN/cheese.
294 os.symlink('cheese', dst)
295 self.assertRaises(shutil.Error, shutil.copyfile, src, dst)
296 self.assertEqual(open(src,'r').read(), 'cheddar')
297 os.remove(dst)
298 finally:
Johannes Gijsbers46f14592004-08-14 13:30:02 +0000299 try:
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000300 shutil.rmtree(TESTFN)
301 except OSError:
302 pass
Johannes Gijsbers68128712004-08-14 13:57:08 +0000303
Brian Curtind40e6f72010-07-08 21:39:08 +0000304 @support.skip_unless_symlink
305 def test_rmtree_on_symlink(self):
306 # bug 1669.
307 os.mkdir(TESTFN)
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000308 try:
Brian Curtind40e6f72010-07-08 21:39:08 +0000309 src = os.path.join(TESTFN, 'cheese')
310 dst = os.path.join(TESTFN, 'shop')
311 os.mkdir(src)
312 os.symlink(src, dst)
313 self.assertRaises(OSError, shutil.rmtree, dst)
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000314 finally:
Brian Curtind40e6f72010-07-08 21:39:08 +0000315 shutil.rmtree(TESTFN, ignore_errors=True)
316
317 if hasattr(os, "mkfifo"):
318 # Issue #3002: copyfile and copytree block indefinitely on named pipes
319 def test_copyfile_named_pipe(self):
320 os.mkfifo(TESTFN)
321 try:
322 self.assertRaises(shutil.SpecialFileError,
323 shutil.copyfile, TESTFN, TESTFN2)
324 self.assertRaises(shutil.SpecialFileError,
325 shutil.copyfile, __file__, TESTFN)
326 finally:
327 os.remove(TESTFN)
Antoine Pitrou7fff0962009-05-01 21:09:44 +0000328
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000329 @unittest.skipUnless(hasattr(os, 'mkfifo'), 'requires os.mkfifo')
330 def test_copytree_named_pipe(self):
331 os.mkdir(TESTFN)
332 try:
333 subdir = os.path.join(TESTFN, "subdir")
334 os.mkdir(subdir)
335 pipe = os.path.join(subdir, "mypipe")
336 os.mkfifo(pipe)
Antoine Pitrou7fff0962009-05-01 21:09:44 +0000337 try:
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000338 shutil.copytree(TESTFN, TESTFN2)
339 except shutil.Error as e:
340 errors = e.args[0]
341 self.assertEqual(len(errors), 1)
342 src, dst, error_msg = errors[0]
343 self.assertEqual("`%s` is a named pipe" % pipe, error_msg)
344 else:
345 self.fail("shutil.Error should have been raised")
346 finally:
347 shutil.rmtree(TESTFN, ignore_errors=True)
348 shutil.rmtree(TESTFN2, ignore_errors=True)
Antoine Pitrou7fff0962009-05-01 21:09:44 +0000349
Tarek Ziadé5340db32010-04-19 22:30:51 +0000350 def test_copytree_special_func(self):
351
352 src_dir = self.mkdtemp()
353 dst_dir = os.path.join(self.mkdtemp(), 'destination')
354 self._write_data(os.path.join(src_dir, 'test.txt'), '123')
355 os.mkdir(os.path.join(src_dir, 'test_dir'))
356 self._write_data(os.path.join(src_dir, 'test_dir', 'test.txt'), '456')
357
358 copied = []
359 def _copy(src, dst):
360 copied.append((src, dst))
361
362 shutil.copytree(src_dir, dst_dir, copy_function=_copy)
363 self.assertEquals(len(copied), 2)
364
Brian Curtind40e6f72010-07-08 21:39:08 +0000365 @support.skip_unless_symlink
Tarek Ziadéfb437512010-04-20 08:57:33 +0000366 def test_copytree_dangling_symlinks(self):
367
368 # a dangling symlink raises an error at the end
369 src_dir = self.mkdtemp()
370 dst_dir = os.path.join(self.mkdtemp(), 'destination')
371 os.symlink('IDONTEXIST', os.path.join(src_dir, 'test.txt'))
372 os.mkdir(os.path.join(src_dir, 'test_dir'))
373 self._write_data(os.path.join(src_dir, 'test_dir', 'test.txt'), '456')
374 self.assertRaises(Error, shutil.copytree, src_dir, dst_dir)
375
376 # a dangling symlink is ignored with the proper flag
377 dst_dir = os.path.join(self.mkdtemp(), 'destination2')
378 shutil.copytree(src_dir, dst_dir, ignore_dangling_symlinks=True)
379 self.assertNotIn('test.txt', os.listdir(dst_dir))
380
381 # a dangling symlink is copied if symlinks=True
382 dst_dir = os.path.join(self.mkdtemp(), 'destination3')
383 shutil.copytree(src_dir, dst_dir, symlinks=True)
384 self.assertIn('test.txt', os.listdir(dst_dir))
385
Tarek Ziadé396fad72010-02-23 05:30:31 +0000386 @unittest.skipUnless(zlib, "requires zlib")
387 def test_make_tarball(self):
388 # creating something to tar
389 tmpdir = self.mkdtemp()
390 self.write_file([tmpdir, 'file1'], 'xxx')
391 self.write_file([tmpdir, 'file2'], 'xxx')
392 os.mkdir(os.path.join(tmpdir, 'sub'))
393 self.write_file([tmpdir, 'sub', 'file3'], 'xxx')
394
395 tmpdir2 = self.mkdtemp()
396 unittest.skipUnless(splitdrive(tmpdir)[0] == splitdrive(tmpdir2)[0],
397 "source and target should be on same drive")
398
399 base_name = os.path.join(tmpdir2, 'archive')
400
401 # working with relative paths to avoid tar warnings
402 old_dir = os.getcwd()
403 os.chdir(tmpdir)
404 try:
405 _make_tarball(splitdrive(base_name)[1], '.')
406 finally:
407 os.chdir(old_dir)
408
409 # check if the compressed tarball was created
410 tarball = base_name + '.tar.gz'
411 self.assertTrue(os.path.exists(tarball))
412
413 # trying an uncompressed one
414 base_name = os.path.join(tmpdir2, 'archive')
415 old_dir = os.getcwd()
416 os.chdir(tmpdir)
417 try:
418 _make_tarball(splitdrive(base_name)[1], '.', compress=None)
419 finally:
420 os.chdir(old_dir)
421 tarball = base_name + '.tar'
422 self.assertTrue(os.path.exists(tarball))
423
424 def _tarinfo(self, path):
425 tar = tarfile.open(path)
426 try:
427 names = tar.getnames()
428 names.sort()
429 return tuple(names)
430 finally:
431 tar.close()
432
433 def _create_files(self):
434 # creating something to tar
435 tmpdir = self.mkdtemp()
436 dist = os.path.join(tmpdir, 'dist')
437 os.mkdir(dist)
438 self.write_file([dist, 'file1'], 'xxx')
439 self.write_file([dist, 'file2'], 'xxx')
440 os.mkdir(os.path.join(dist, 'sub'))
441 self.write_file([dist, 'sub', 'file3'], 'xxx')
442 os.mkdir(os.path.join(dist, 'sub2'))
443 tmpdir2 = self.mkdtemp()
444 base_name = os.path.join(tmpdir2, 'archive')
445 return tmpdir, tmpdir2, base_name
446
447 @unittest.skipUnless(zlib, "Requires zlib")
448 @unittest.skipUnless(find_executable('tar') and find_executable('gzip'),
449 'Need the tar command to run')
450 def test_tarfile_vs_tar(self):
451 tmpdir, tmpdir2, base_name = self._create_files()
452 old_dir = os.getcwd()
453 os.chdir(tmpdir)
454 try:
455 _make_tarball(base_name, 'dist')
456 finally:
457 os.chdir(old_dir)
458
459 # check if the compressed tarball was created
460 tarball = base_name + '.tar.gz'
461 self.assertTrue(os.path.exists(tarball))
462
463 # now create another tarball using `tar`
464 tarball2 = os.path.join(tmpdir, 'archive2.tar.gz')
465 tar_cmd = ['tar', '-cf', 'archive2.tar', 'dist']
466 gzip_cmd = ['gzip', '-f9', 'archive2.tar']
467 old_dir = os.getcwd()
468 os.chdir(tmpdir)
469 try:
470 with captured_stdout() as s:
471 spawn(tar_cmd)
472 spawn(gzip_cmd)
473 finally:
474 os.chdir(old_dir)
475
476 self.assertTrue(os.path.exists(tarball2))
477 # let's compare both tarballs
478 self.assertEquals(self._tarinfo(tarball), self._tarinfo(tarball2))
479
480 # trying an uncompressed one
481 base_name = os.path.join(tmpdir2, 'archive')
482 old_dir = os.getcwd()
483 os.chdir(tmpdir)
484 try:
485 _make_tarball(base_name, 'dist', compress=None)
486 finally:
487 os.chdir(old_dir)
488 tarball = base_name + '.tar'
489 self.assertTrue(os.path.exists(tarball))
490
491 # now for a dry_run
492 base_name = os.path.join(tmpdir2, 'archive')
493 old_dir = os.getcwd()
494 os.chdir(tmpdir)
495 try:
496 _make_tarball(base_name, 'dist', compress=None, dry_run=True)
497 finally:
498 os.chdir(old_dir)
499 tarball = base_name + '.tar'
500 self.assertTrue(os.path.exists(tarball))
501
Tarek Ziadé396fad72010-02-23 05:30:31 +0000502 @unittest.skipUnless(zlib, "Requires zlib")
503 @unittest.skipUnless(ZIP_SUPPORT, 'Need zip support to run')
504 def test_make_zipfile(self):
505 # creating something to tar
506 tmpdir = self.mkdtemp()
507 self.write_file([tmpdir, 'file1'], 'xxx')
508 self.write_file([tmpdir, 'file2'], 'xxx')
509
510 tmpdir2 = self.mkdtemp()
511 base_name = os.path.join(tmpdir2, 'archive')
512 _make_zipfile(base_name, tmpdir)
513
514 # check if the compressed tarball was created
515 tarball = base_name + '.zip'
516
517
518 def test_make_archive(self):
519 tmpdir = self.mkdtemp()
520 base_name = os.path.join(tmpdir, 'archive')
521 self.assertRaises(ValueError, make_archive, base_name, 'xxx')
522
523 @unittest.skipUnless(zlib, "Requires zlib")
524 def test_make_archive_owner_group(self):
525 # testing make_archive with owner and group, with various combinations
526 # this works even if there's not gid/uid support
527 if UID_GID_SUPPORT:
528 group = grp.getgrgid(0)[0]
529 owner = pwd.getpwuid(0)[0]
530 else:
531 group = owner = 'root'
532
533 base_dir, root_dir, base_name = self._create_files()
534 base_name = os.path.join(self.mkdtemp() , 'archive')
535 res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner,
536 group=group)
537 self.assertTrue(os.path.exists(res))
538
539 res = make_archive(base_name, 'zip', root_dir, base_dir)
540 self.assertTrue(os.path.exists(res))
541
542 res = make_archive(base_name, 'tar', root_dir, base_dir,
543 owner=owner, group=group)
544 self.assertTrue(os.path.exists(res))
545
546 res = make_archive(base_name, 'tar', root_dir, base_dir,
547 owner='kjhkjhkjg', group='oihohoh')
548 self.assertTrue(os.path.exists(res))
549
Tarek Ziadé6ac91722010-04-28 17:51:36 +0000550
Tarek Ziadé396fad72010-02-23 05:30:31 +0000551 @unittest.skipUnless(zlib, "Requires zlib")
552 @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
553 def test_tarfile_root_owner(self):
554 tmpdir, tmpdir2, base_name = self._create_files()
555 old_dir = os.getcwd()
556 os.chdir(tmpdir)
557 group = grp.getgrgid(0)[0]
558 owner = pwd.getpwuid(0)[0]
559 try:
560 archive_name = _make_tarball(base_name, 'dist', compress=None,
561 owner=owner, group=group)
562 finally:
563 os.chdir(old_dir)
564
565 # check if the compressed tarball was created
566 self.assertTrue(os.path.exists(archive_name))
567
568 # now checks the rights
569 archive = tarfile.open(archive_name)
570 try:
571 for member in archive.getmembers():
572 self.assertEquals(member.uid, 0)
573 self.assertEquals(member.gid, 0)
574 finally:
575 archive.close()
576
577 def test_make_archive_cwd(self):
578 current_dir = os.getcwd()
579 def _breaks(*args, **kw):
580 raise RuntimeError()
581
582 register_archive_format('xxx', _breaks, [], 'xxx file')
583 try:
584 try:
585 make_archive('xxx', 'xxx', root_dir=self.mkdtemp())
586 except Exception:
587 pass
588 self.assertEquals(os.getcwd(), current_dir)
589 finally:
590 unregister_archive_format('xxx')
591
592 def test_register_archive_format(self):
593
594 self.assertRaises(TypeError, register_archive_format, 'xxx', 1)
595 self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
596 1)
597 self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
598 [(1, 2), (1, 2, 3)])
599
600 register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file')
601 formats = [name for name, params in get_archive_formats()]
602 self.assertIn('xxx', formats)
603
604 unregister_archive_format('xxx')
605 formats = [name for name, params in get_archive_formats()]
606 self.assertNotIn('xxx', formats)
607
Tarek Ziadé6ac91722010-04-28 17:51:36 +0000608 def _compare_dirs(self, dir1, dir2):
609 # check that dir1 and dir2 are equivalent,
610 # return the diff
611 diff = []
612 for root, dirs, files in os.walk(dir1):
613 for file_ in files:
614 path = os.path.join(root, file_)
615 target_path = os.path.join(dir2, os.path.split(path)[-1])
616 if not os.path.exists(target_path):
617 diff.append(file_)
618 return diff
619
620 @unittest.skipUnless(zlib, "Requires zlib")
621 def test_unpack_archive(self):
Tarek Ziadéffa155a2010-04-29 13:34:35 +0000622 formats = ['tar', 'gztar', 'zip']
623 if BZ2_SUPPORTED:
624 formats.append('bztar')
Tarek Ziadé6ac91722010-04-28 17:51:36 +0000625
Tarek Ziadéffa155a2010-04-29 13:34:35 +0000626 for format in formats:
Tarek Ziadé6ac91722010-04-28 17:51:36 +0000627 tmpdir = self.mkdtemp()
628 base_dir, root_dir, base_name = self._create_files()
629 tmpdir2 = self.mkdtemp()
630 filename = make_archive(base_name, format, root_dir, base_dir)
631
632 # let's try to unpack it now
633 unpack_archive(filename, tmpdir2)
634 diff = self._compare_dirs(tmpdir, tmpdir2)
635 self.assertEquals(diff, [])
636
637 def test_unpack_registery(self):
638
639 formats = get_unpack_formats()
640
641 def _boo(filename, extract_dir, extra):
642 self.assertEquals(extra, 1)
643 self.assertEquals(filename, 'stuff.boo')
644 self.assertEquals(extract_dir, 'xx')
645
646 register_unpack_format('Boo', ['.boo', '.b2'], _boo, [('extra', 1)])
647 unpack_archive('stuff.boo', 'xx')
648
649 # trying to register a .boo unpacker again
650 self.assertRaises(RegistryError, register_unpack_format, 'Boo2',
651 ['.boo'], _boo)
652
653 # should work now
654 unregister_unpack_format('Boo')
655 register_unpack_format('Boo2', ['.boo'], _boo)
656 self.assertIn(('Boo2', ['.boo'], ''), get_unpack_formats())
657 self.assertNotIn(('Boo', ['.boo'], ''), get_unpack_formats())
658
659 # let's leave a clean state
660 unregister_unpack_format('Boo2')
661 self.assertEquals(get_unpack_formats(), formats)
662
Christian Heimes9bd667a2008-01-20 15:14:11 +0000663
Christian Heimesada8c3b2008-03-18 18:26:33 +0000664class TestMove(unittest.TestCase):
665
666 def setUp(self):
667 filename = "foo"
668 self.src_dir = tempfile.mkdtemp()
669 self.dst_dir = tempfile.mkdtemp()
670 self.src_file = os.path.join(self.src_dir, filename)
671 self.dst_file = os.path.join(self.dst_dir, filename)
672 # Try to create a dir in the current directory, hoping that it is
673 # not located on the same filesystem as the system tmp dir.
674 try:
675 self.dir_other_fs = tempfile.mkdtemp(
676 dir=os.path.dirname(__file__))
677 self.file_other_fs = os.path.join(self.dir_other_fs,
678 filename)
679 except OSError:
680 self.dir_other_fs = None
681 with open(self.src_file, "wb") as f:
682 f.write(b"spam")
683
684 def tearDown(self):
685 for d in (self.src_dir, self.dst_dir, self.dir_other_fs):
686 try:
687 if d:
688 shutil.rmtree(d)
689 except:
690 pass
691
692 def _check_move_file(self, src, dst, real_dst):
693 contents = open(src, "rb").read()
694 shutil.move(src, dst)
695 self.assertEqual(contents, open(real_dst, "rb").read())
696 self.assertFalse(os.path.exists(src))
697
698 def _check_move_dir(self, src, dst, real_dst):
699 contents = sorted(os.listdir(src))
700 shutil.move(src, dst)
701 self.assertEqual(contents, sorted(os.listdir(real_dst)))
702 self.assertFalse(os.path.exists(src))
703
704 def test_move_file(self):
705 # Move a file to another location on the same filesystem.
706 self._check_move_file(self.src_file, self.dst_file, self.dst_file)
707
708 def test_move_file_to_dir(self):
709 # Move a file inside an existing dir on the same filesystem.
710 self._check_move_file(self.src_file, self.dst_dir, self.dst_file)
711
712 def test_move_file_other_fs(self):
713 # Move a file to an existing dir on another filesystem.
714 if not self.dir_other_fs:
715 # skip
716 return
717 self._check_move_file(self.src_file, self.file_other_fs,
718 self.file_other_fs)
719
720 def test_move_file_to_dir_other_fs(self):
721 # Move a file to another location on another filesystem.
722 if not self.dir_other_fs:
723 # skip
724 return
725 self._check_move_file(self.src_file, self.dir_other_fs,
726 self.file_other_fs)
727
728 def test_move_dir(self):
729 # Move a dir to another location on the same filesystem.
730 dst_dir = tempfile.mktemp()
731 try:
732 self._check_move_dir(self.src_dir, dst_dir, dst_dir)
733 finally:
734 try:
735 shutil.rmtree(dst_dir)
736 except:
737 pass
738
739 def test_move_dir_other_fs(self):
740 # Move a dir to another location on another filesystem.
741 if not self.dir_other_fs:
742 # skip
743 return
744 dst_dir = tempfile.mktemp(dir=self.dir_other_fs)
745 try:
746 self._check_move_dir(self.src_dir, dst_dir, dst_dir)
747 finally:
748 try:
749 shutil.rmtree(dst_dir)
750 except:
751 pass
752
753 def test_move_dir_to_dir(self):
754 # Move a dir inside an existing dir on the same filesystem.
755 self._check_move_dir(self.src_dir, self.dst_dir,
756 os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
757
758 def test_move_dir_to_dir_other_fs(self):
759 # Move a dir inside an existing dir on another filesystem.
760 if not self.dir_other_fs:
761 # skip
762 return
763 self._check_move_dir(self.src_dir, self.dir_other_fs,
764 os.path.join(self.dir_other_fs, os.path.basename(self.src_dir)))
765
766 def test_existing_file_inside_dest_dir(self):
767 # A file with the same name inside the destination dir already exists.
768 with open(self.dst_file, "wb"):
769 pass
770 self.assertRaises(shutil.Error, shutil.move, self.src_file, self.dst_dir)
771
772 def test_dont_move_dir_in_itself(self):
773 # Moving a dir inside itself raises an Error.
774 dst = os.path.join(self.src_dir, "bar")
775 self.assertRaises(shutil.Error, shutil.move, self.src_dir, dst)
776
Antoine Pitrou0dcc3cd2009-01-29 20:26:59 +0000777 def test_destinsrc_false_negative(self):
778 os.mkdir(TESTFN)
779 try:
780 for src, dst in [('srcdir', 'srcdir/dest')]:
781 src = os.path.join(TESTFN, src)
782 dst = os.path.join(TESTFN, dst)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000783 self.assertTrue(shutil._destinsrc(src, dst),
Benjamin Peterson247a9b82009-02-20 04:09:19 +0000784 msg='_destinsrc() wrongly concluded that '
Antoine Pitrou0dcc3cd2009-01-29 20:26:59 +0000785 'dst (%s) is not in src (%s)' % (dst, src))
786 finally:
787 shutil.rmtree(TESTFN, ignore_errors=True)
Christian Heimesada8c3b2008-03-18 18:26:33 +0000788
Antoine Pitrou0dcc3cd2009-01-29 20:26:59 +0000789 def test_destinsrc_false_positive(self):
790 os.mkdir(TESTFN)
791 try:
792 for src, dst in [('srcdir', 'src/dest'), ('srcdir', 'srcdir.new')]:
793 src = os.path.join(TESTFN, src)
794 dst = os.path.join(TESTFN, dst)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000795 self.assertFalse(shutil._destinsrc(src, dst),
Benjamin Peterson247a9b82009-02-20 04:09:19 +0000796 msg='_destinsrc() wrongly concluded that '
Antoine Pitrou0dcc3cd2009-01-29 20:26:59 +0000797 'dst (%s) is in src (%s)' % (dst, src))
798 finally:
799 shutil.rmtree(TESTFN, ignore_errors=True)
Christian Heimes9bd667a2008-01-20 15:14:11 +0000800
Tarek Ziadé5340db32010-04-19 22:30:51 +0000801
Tarek Ziadéae4d5c62010-05-05 22:27:31 +0000802class TestCopyFile(unittest.TestCase):
803
804 _delete = False
805
806 class Faux(object):
807 _entered = False
808 _exited_with = None
809 _raised = False
810 def __init__(self, raise_in_exit=False, suppress_at_exit=True):
811 self._raise_in_exit = raise_in_exit
812 self._suppress_at_exit = suppress_at_exit
813 def read(self, *args):
814 return ''
815 def __enter__(self):
816 self._entered = True
817 def __exit__(self, exc_type, exc_val, exc_tb):
818 self._exited_with = exc_type, exc_val, exc_tb
819 if self._raise_in_exit:
820 self._raised = True
821 raise IOError("Cannot close")
822 return self._suppress_at_exit
823
824 def tearDown(self):
825 if self._delete:
826 del shutil.open
827
828 def _set_shutil_open(self, func):
829 shutil.open = func
830 self._delete = True
831
832 def test_w_source_open_fails(self):
833 def _open(filename, mode='r'):
834 if filename == 'srcfile':
835 raise IOError('Cannot open "srcfile"')
836 assert 0 # shouldn't reach here.
837
838 self._set_shutil_open(_open)
839
840 self.assertRaises(IOError, shutil.copyfile, 'srcfile', 'destfile')
841
842 def test_w_dest_open_fails(self):
843
844 srcfile = self.Faux()
845
846 def _open(filename, mode='r'):
847 if filename == 'srcfile':
848 return srcfile
849 if filename == 'destfile':
850 raise IOError('Cannot open "destfile"')
851 assert 0 # shouldn't reach here.
852
853 self._set_shutil_open(_open)
854
855 shutil.copyfile('srcfile', 'destfile')
856 self.assertTrue(srcfile._entered)
857 self.assertTrue(srcfile._exited_with[0] is IOError)
858 self.assertEqual(srcfile._exited_with[1].args,
859 ('Cannot open "destfile"',))
860
861 def test_w_dest_close_fails(self):
862
863 srcfile = self.Faux()
864 destfile = self.Faux(True)
865
866 def _open(filename, mode='r'):
867 if filename == 'srcfile':
868 return srcfile
869 if filename == 'destfile':
870 return destfile
871 assert 0 # shouldn't reach here.
872
873 self._set_shutil_open(_open)
874
875 shutil.copyfile('srcfile', 'destfile')
876 self.assertTrue(srcfile._entered)
877 self.assertTrue(destfile._entered)
878 self.assertTrue(destfile._raised)
879 self.assertTrue(srcfile._exited_with[0] is IOError)
880 self.assertEqual(srcfile._exited_with[1].args,
881 ('Cannot close',))
882
883 def test_w_source_close_fails(self):
884
885 srcfile = self.Faux(True)
886 destfile = self.Faux()
887
888 def _open(filename, mode='r'):
889 if filename == 'srcfile':
890 return srcfile
891 if filename == 'destfile':
892 return destfile
893 assert 0 # shouldn't reach here.
894
895 self._set_shutil_open(_open)
896
897 self.assertRaises(IOError,
898 shutil.copyfile, 'srcfile', 'destfile')
899 self.assertTrue(srcfile._entered)
900 self.assertTrue(destfile._entered)
901 self.assertFalse(destfile._raised)
902 self.assertTrue(srcfile._exited_with[0] is None)
903 self.assertTrue(srcfile._raised)
904
905
Barry Warsaw7fc2cca2003-01-24 17:34:13 +0000906def test_main():
Tarek Ziadéae4d5c62010-05-05 22:27:31 +0000907 support.run_unittest(TestShutil, TestMove, TestCopyFile)
Barry Warsaw7fc2cca2003-01-24 17:34:13 +0000908
Barry Warsaw7fc2cca2003-01-24 17:34:13 +0000909if __name__ == '__main__':
Walter Dörwald21d3a322003-05-01 17:45:56 +0000910 test_main()