blob: 88dc4d9e195ebe26eb71261f981c75fc5c3fe0e2 [file] [log] [blame]
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00001# Copyright (C) 2003 Python Software Foundation
2
3import unittest
Berker Peksag884afd92014-12-10 02:50:32 +02004import unittest.mock
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00005import shutil
6import tempfile
Johannes Gijsbers8e6f2de2004-11-23 09:27:27 +00007import sys
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +00008import stat
Brett Cannon1c3fa182004-06-19 21:11:35 +00009import os
10import os.path
Antoine Pitrouc041ab62012-01-02 19:18:02 +010011import errno
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040012import functools
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -070013import pathlib
Antoine Pitroubcf2b592012-02-08 23:28:36 +010014import subprocess
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020015import random
16import string
17import contextlib
18import io
Serhiy Storchaka527ef072015-09-06 18:33:19 +030019from shutil import (make_archive,
Tarek Ziadé396fad72010-02-23 05:30:31 +000020 register_archive_format, unregister_archive_format,
Tarek Ziadé6ac91722010-04-28 17:51:36 +000021 get_archive_formats, Error, unpack_archive,
22 register_unpack_format, RegistryError,
Hynek Schlawack48653762012-10-07 12:49:58 +020023 unregister_unpack_format, get_unpack_formats,
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020024 SameFileError, _GiveupOnFastCopy)
Tarek Ziadé396fad72010-02-23 05:30:31 +000025import tarfile
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +020026import zipfile
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020027try:
28 import posix
29except ImportError:
30 posix = None
Tarek Ziadé396fad72010-02-23 05:30:31 +000031
32from test import support
Serhiy Storchakab21d1552018-03-02 11:53:51 +020033from test.support import TESTFN, FakePath
Serhiy Storchaka11213772014-08-06 18:50:19 +030034
Antoine Pitrou7fff0962009-05-01 21:09:44 +000035TESTFN2 = TESTFN + "2"
Victor Stinner937ee9e2018-06-26 02:11:06 +020036MACOS = sys.platform.startswith("darwin")
Michael Feltef110b12019-02-18 12:02:44 +010037AIX = sys.platform[:3] == 'aix'
Tarek Ziadé396fad72010-02-23 05:30:31 +000038try:
39 import grp
40 import pwd
41 UID_GID_SUPPORT = True
42except ImportError:
43 UID_GID_SUPPORT = False
44
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040045def _fake_rename(*args, **kwargs):
46 # Pretend the destination path is on a different filesystem.
Antoine Pitrouc041ab62012-01-02 19:18:02 +010047 raise OSError(getattr(errno, 'EXDEV', 18), "Invalid cross-device link")
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040048
49def mock_rename(func):
50 @functools.wraps(func)
51 def wrap(*args, **kwargs):
52 try:
53 builtin_rename = os.rename
54 os.rename = _fake_rename
55 return func(*args, **kwargs)
56 finally:
57 os.rename = builtin_rename
58 return wrap
59
Éric Araujoa7e33a12011-08-12 19:51:35 +020060def write_file(path, content, binary=False):
61 """Write *content* to a file located at *path*.
62
63 If *path* is a tuple instead of a string, os.path.join will be used to
64 make a path. If *binary* is true, the file will be opened in binary
65 mode.
66 """
67 if isinstance(path, tuple):
68 path = os.path.join(*path)
69 with open(path, 'wb' if binary else 'w') as fp:
70 fp.write(content)
71
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020072def write_test_file(path, size):
73 """Create a test file with an arbitrary size and random text content."""
74 def chunks(total, step):
75 assert total >= step
76 while total > step:
77 yield step
78 total -= step
79 if total:
80 yield total
81
82 bufsize = min(size, 8192)
83 chunk = b"".join([random.choice(string.ascii_letters).encode()
84 for i in range(bufsize)])
85 with open(path, 'wb') as f:
86 for csize in chunks(size, bufsize):
87 f.write(chunk)
88 assert os.path.getsize(path) == size
89
Éric Araujoa7e33a12011-08-12 19:51:35 +020090def read_file(path, binary=False):
91 """Return contents from a file located at *path*.
92
93 If *path* is a tuple instead of a string, os.path.join will be used to
94 make a path. If *binary* is true, the file will be opened in binary
95 mode.
96 """
97 if isinstance(path, tuple):
98 path = os.path.join(*path)
99 with open(path, 'rb' if binary else 'r') as fp:
100 return fp.read()
101
Serhiy Storchaka527ef072015-09-06 18:33:19 +0300102def rlistdir(path):
103 res = []
104 for name in sorted(os.listdir(path)):
105 p = os.path.join(path, name)
106 if os.path.isdir(p) and not os.path.islink(p):
107 res.append(name + '/')
108 for n in rlistdir(p):
109 res.append(name + '/' + n)
110 else:
111 res.append(name)
112 return res
113
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +0200114def supports_file2file_sendfile():
115 # ...apparently Linux and Solaris are the only ones
116 if not hasattr(os, "sendfile"):
117 return False
118 srcname = None
119 dstname = None
120 try:
121 with tempfile.NamedTemporaryFile("wb", delete=False) as f:
122 srcname = f.name
123 f.write(b"0123456789")
124
125 with open(srcname, "rb") as src:
126 with tempfile.NamedTemporaryFile("wb", delete=False) as dst:
Victor Stinner4c26abd2019-06-27 01:39:53 +0200127 dstname = dst.name
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +0200128 infd = src.fileno()
129 outfd = dst.fileno()
130 try:
131 os.sendfile(outfd, infd, 0, 2)
132 except OSError:
133 return False
134 else:
135 return True
136 finally:
137 if srcname is not None:
138 support.unlink(srcname)
139 if dstname is not None:
140 support.unlink(dstname)
141
142
143SUPPORTS_SENDFILE = supports_file2file_sendfile()
144
Michael Feltef110b12019-02-18 12:02:44 +0100145# AIX 32-bit mode, by default, lacks enough memory for the xz/lzma compiler test
146# The AIX command 'dump -o program' gives XCOFF header information
147# The second word of the last line in the maxdata value
148# when 32-bit maxdata must be greater than 0x1000000 for the xz test to succeed
149def _maxdataOK():
150 if AIX and sys.maxsize == 2147483647:
151 hdrs=subprocess.getoutput("/usr/bin/dump -o %s" % sys.executable)
152 maxdata=hdrs.split("\n")[-1].split()[1]
153 return int(maxdata,16) >= 0x20000000
154 else:
155 return True
Éric Araujoa7e33a12011-08-12 19:51:35 +0200156
Barry Warsaw7fc2cca2003-01-24 17:34:13 +0000157class TestShutil(unittest.TestCase):
Tarek Ziadé396fad72010-02-23 05:30:31 +0000158
159 def setUp(self):
160 super(TestShutil, self).setUp()
161 self.tempdirs = []
162
163 def tearDown(self):
164 super(TestShutil, self).tearDown()
165 while self.tempdirs:
166 d = self.tempdirs.pop()
167 shutil.rmtree(d, os.name in ('nt', 'cygwin'))
168
Tarek Ziadé396fad72010-02-23 05:30:31 +0000169
170 def mkdtemp(self):
171 """Create a temporary directory that will be cleaned up.
172
173 Returns the path of the directory.
174 """
175 d = tempfile.mkdtemp()
176 self.tempdirs.append(d)
177 return d
Tarek Ziadé5340db32010-04-19 22:30:51 +0000178
Hynek Schlawack3b527782012-06-25 13:27:31 +0200179 def test_rmtree_works_on_bytes(self):
180 tmp = self.mkdtemp()
181 victim = os.path.join(tmp, 'killme')
182 os.mkdir(victim)
183 write_file(os.path.join(victim, 'somefile'), 'foo')
184 victim = os.fsencode(victim)
185 self.assertIsInstance(victim, bytes)
Steve Dowere58571b2016-09-08 11:11:13 -0700186 shutil.rmtree(victim)
Hynek Schlawack3b527782012-06-25 13:27:31 +0200187
Hynek Schlawacka75cd1c2012-06-28 12:07:29 +0200188 @support.skip_unless_symlink
189 def test_rmtree_fails_on_symlink(self):
190 tmp = self.mkdtemp()
191 dir_ = os.path.join(tmp, 'dir')
192 os.mkdir(dir_)
193 link = os.path.join(tmp, 'link')
194 os.symlink(dir_, link)
195 self.assertRaises(OSError, shutil.rmtree, link)
196 self.assertTrue(os.path.exists(dir_))
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100197 self.assertTrue(os.path.lexists(link))
198 errors = []
199 def onerror(*args):
200 errors.append(args)
201 shutil.rmtree(link, onerror=onerror)
202 self.assertEqual(len(errors), 1)
203 self.assertIs(errors[0][0], os.path.islink)
204 self.assertEqual(errors[0][1], link)
205 self.assertIsInstance(errors[0][2][1], OSError)
Hynek Schlawacka75cd1c2012-06-28 12:07:29 +0200206
207 @support.skip_unless_symlink
208 def test_rmtree_works_on_symlinks(self):
209 tmp = self.mkdtemp()
210 dir1 = os.path.join(tmp, 'dir1')
211 dir2 = os.path.join(dir1, 'dir2')
212 dir3 = os.path.join(tmp, 'dir3')
213 for d in dir1, dir2, dir3:
214 os.mkdir(d)
215 file1 = os.path.join(tmp, 'file1')
216 write_file(file1, 'foo')
217 link1 = os.path.join(dir1, 'link1')
218 os.symlink(dir2, link1)
219 link2 = os.path.join(dir1, 'link2')
220 os.symlink(dir3, link2)
221 link3 = os.path.join(dir1, 'link3')
222 os.symlink(file1, link3)
223 # make sure symlinks are removed but not followed
224 shutil.rmtree(dir1)
225 self.assertFalse(os.path.exists(dir1))
226 self.assertTrue(os.path.exists(dir3))
227 self.assertTrue(os.path.exists(file1))
228
Barry Warsaw7fc2cca2003-01-24 17:34:13 +0000229 def test_rmtree_errors(self):
230 # filename is guaranteed not to exist
231 filename = tempfile.mktemp()
Hynek Schlawackb5501102012-12-10 09:11:25 +0100232 self.assertRaises(FileNotFoundError, shutil.rmtree, filename)
233 # test that ignore_errors option is honored
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100234 shutil.rmtree(filename, ignore_errors=True)
235
236 # existing file
237 tmpdir = self.mkdtemp()
Hynek Schlawackb5501102012-12-10 09:11:25 +0100238 write_file((tmpdir, "tstfile"), "")
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100239 filename = os.path.join(tmpdir, "tstfile")
Hynek Schlawackb5501102012-12-10 09:11:25 +0100240 with self.assertRaises(NotADirectoryError) as cm:
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100241 shutil.rmtree(filename)
Hynek Schlawack87f9b462012-12-10 16:29:57 +0100242 # The reason for this rather odd construct is that Windows sprinkles
243 # a \*.* at the end of file names. But only sometimes on some buildbots
244 possible_args = [filename, os.path.join(filename, '*.*')]
245 self.assertIn(cm.exception.filename, possible_args)
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100246 self.assertTrue(os.path.exists(filename))
Hynek Schlawackb5501102012-12-10 09:11:25 +0100247 # test that ignore_errors option is honored
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100248 shutil.rmtree(filename, ignore_errors=True)
249 self.assertTrue(os.path.exists(filename))
250 errors = []
251 def onerror(*args):
252 errors.append(args)
253 shutil.rmtree(filename, onerror=onerror)
254 self.assertEqual(len(errors), 2)
Serhiy Storchakad4d79bc2017-11-04 14:16:35 +0200255 self.assertIs(errors[0][0], os.scandir)
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100256 self.assertEqual(errors[0][1], filename)
Hynek Schlawackb5501102012-12-10 09:11:25 +0100257 self.assertIsInstance(errors[0][2][1], NotADirectoryError)
Hynek Schlawack87f9b462012-12-10 16:29:57 +0100258 self.assertIn(errors[0][2][1].filename, possible_args)
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100259 self.assertIs(errors[1][0], os.rmdir)
260 self.assertEqual(errors[1][1], filename)
Hynek Schlawackb5501102012-12-10 09:11:25 +0100261 self.assertIsInstance(errors[1][2][1], NotADirectoryError)
Hynek Schlawack87f9b462012-12-10 16:29:57 +0100262 self.assertIn(errors[1][2][1].filename, possible_args)
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000263
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000264
Serhiy Storchaka43767632013-11-03 21:31:38 +0200265 @unittest.skipIf(sys.platform[:6] == 'cygwin',
266 "This test can't be run on Cygwin (issue #1071513).")
267 @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
268 "This test can't be run reliably as root (issue #1076467).")
269 def test_on_error(self):
270 self.errorState = 0
271 os.mkdir(TESTFN)
272 self.addCleanup(shutil.rmtree, TESTFN)
Antoine Pitrou2f8a75c2012-06-23 21:28:15 +0200273
Serhiy Storchaka43767632013-11-03 21:31:38 +0200274 self.child_file_path = os.path.join(TESTFN, 'a')
275 self.child_dir_path = os.path.join(TESTFN, 'b')
276 support.create_empty_file(self.child_file_path)
277 os.mkdir(self.child_dir_path)
278 old_dir_mode = os.stat(TESTFN).st_mode
279 old_child_file_mode = os.stat(self.child_file_path).st_mode
280 old_child_dir_mode = os.stat(self.child_dir_path).st_mode
281 # Make unwritable.
282 new_mode = stat.S_IREAD|stat.S_IEXEC
283 os.chmod(self.child_file_path, new_mode)
284 os.chmod(self.child_dir_path, new_mode)
285 os.chmod(TESTFN, new_mode)
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000286
Serhiy Storchaka43767632013-11-03 21:31:38 +0200287 self.addCleanup(os.chmod, TESTFN, old_dir_mode)
288 self.addCleanup(os.chmod, self.child_file_path, old_child_file_mode)
289 self.addCleanup(os.chmod, self.child_dir_path, old_child_dir_mode)
Antoine Pitrou2f8a75c2012-06-23 21:28:15 +0200290
Serhiy Storchaka43767632013-11-03 21:31:38 +0200291 shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
292 # Test whether onerror has actually been called.
293 self.assertEqual(self.errorState, 3,
294 "Expected call to onerror function did not happen.")
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000295
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000296 def check_args_to_onerror(self, func, arg, exc):
Benjamin Peterson25c95f12009-05-08 20:42:26 +0000297 # test_rmtree_errors deliberately runs rmtree
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200298 # on a directory that is chmod 500, which will fail.
Benjamin Peterson25c95f12009-05-08 20:42:26 +0000299 # This function is run when shutil.rmtree fails.
300 # 99.9% of the time it initially fails to remove
301 # a file in the directory, so the first time through
302 # func is os.remove.
303 # However, some Linux machines running ZFS on
304 # FUSE experienced a failure earlier in the process
305 # at os.listdir. The first failure may legally
306 # be either.
Antoine Pitrou4f6e3f72012-06-23 22:05:11 +0200307 if self.errorState < 2:
Hynek Schlawack2100b422012-06-23 20:28:32 +0200308 if func is os.unlink:
Antoine Pitrou4f6e3f72012-06-23 22:05:11 +0200309 self.assertEqual(arg, self.child_file_path)
310 elif func is os.rmdir:
311 self.assertEqual(arg, self.child_dir_path)
Benjamin Peterson25c95f12009-05-08 20:42:26 +0000312 else:
Antoine Pitrou4f6e3f72012-06-23 22:05:11 +0200313 self.assertIs(func, os.listdir)
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200314 self.assertIn(arg, [TESTFN, self.child_dir_path])
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000315 self.assertTrue(issubclass(exc[0], OSError))
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200316 self.errorState += 1
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000317 else:
318 self.assertEqual(func, os.rmdir)
319 self.assertEqual(arg, TESTFN)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000320 self.assertTrue(issubclass(exc[0], OSError))
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200321 self.errorState = 3
322
323 def test_rmtree_does_not_choke_on_failing_lstat(self):
324 try:
325 orig_lstat = os.lstat
Hynek Schlawacka75cd1c2012-06-28 12:07:29 +0200326 def raiser(fn, *args, **kwargs):
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200327 if fn != TESTFN:
328 raise OSError()
329 else:
330 return orig_lstat(fn)
331 os.lstat = raiser
332
333 os.mkdir(TESTFN)
334 write_file((TESTFN, 'foo'), 'foo')
335 shutil.rmtree(TESTFN)
336 finally:
337 os.lstat = orig_lstat
Barry Warsaw7fc2cca2003-01-24 17:34:13 +0000338
Antoine Pitrou78091e62011-12-29 18:54:15 +0100339 @support.skip_unless_symlink
340 def test_copymode_follow_symlinks(self):
341 tmp_dir = self.mkdtemp()
342 src = os.path.join(tmp_dir, 'foo')
343 dst = os.path.join(tmp_dir, 'bar')
344 src_link = os.path.join(tmp_dir, 'baz')
345 dst_link = os.path.join(tmp_dir, 'quux')
346 write_file(src, 'foo')
347 write_file(dst, 'foo')
348 os.symlink(src, src_link)
349 os.symlink(dst, dst_link)
350 os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
351 # file to file
352 os.chmod(dst, stat.S_IRWXO)
353 self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
354 shutil.copymode(src, dst)
355 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
Antoine Pitrou3f48ac92014-01-01 02:50:45 +0100356 # On Windows, os.chmod does not follow symlinks (issue #15411)
357 if os.name != 'nt':
358 # follow src link
359 os.chmod(dst, stat.S_IRWXO)
360 shutil.copymode(src_link, dst)
361 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
362 # follow dst link
363 os.chmod(dst, stat.S_IRWXO)
364 shutil.copymode(src, dst_link)
365 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
366 # follow both links
367 os.chmod(dst, stat.S_IRWXO)
368 shutil.copymode(src_link, dst_link)
369 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100370
371 @unittest.skipUnless(hasattr(os, 'lchmod'), 'requires os.lchmod')
372 @support.skip_unless_symlink
373 def test_copymode_symlink_to_symlink(self):
374 tmp_dir = self.mkdtemp()
375 src = os.path.join(tmp_dir, 'foo')
376 dst = os.path.join(tmp_dir, 'bar')
377 src_link = os.path.join(tmp_dir, 'baz')
378 dst_link = os.path.join(tmp_dir, 'quux')
379 write_file(src, 'foo')
380 write_file(dst, 'foo')
381 os.symlink(src, src_link)
382 os.symlink(dst, dst_link)
383 os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
384 os.chmod(dst, stat.S_IRWXU)
385 os.lchmod(src_link, stat.S_IRWXO|stat.S_IRWXG)
386 # link to link
387 os.lchmod(dst_link, stat.S_IRWXO)
Larry Hastingsb4038062012-07-15 10:57:38 -0700388 shutil.copymode(src_link, dst_link, follow_symlinks=False)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100389 self.assertEqual(os.lstat(src_link).st_mode,
390 os.lstat(dst_link).st_mode)
391 self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
392 # src link - use chmod
393 os.lchmod(dst_link, stat.S_IRWXO)
Larry Hastingsb4038062012-07-15 10:57:38 -0700394 shutil.copymode(src_link, dst, follow_symlinks=False)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100395 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
396 # dst link - use chmod
397 os.lchmod(dst_link, stat.S_IRWXO)
Larry Hastingsb4038062012-07-15 10:57:38 -0700398 shutil.copymode(src, dst_link, follow_symlinks=False)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100399 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
400
401 @unittest.skipIf(hasattr(os, 'lchmod'), 'requires os.lchmod to be missing')
402 @support.skip_unless_symlink
403 def test_copymode_symlink_to_symlink_wo_lchmod(self):
404 tmp_dir = self.mkdtemp()
405 src = os.path.join(tmp_dir, 'foo')
406 dst = os.path.join(tmp_dir, 'bar')
407 src_link = os.path.join(tmp_dir, 'baz')
408 dst_link = os.path.join(tmp_dir, 'quux')
409 write_file(src, 'foo')
410 write_file(dst, 'foo')
411 os.symlink(src, src_link)
412 os.symlink(dst, dst_link)
Larry Hastingsb4038062012-07-15 10:57:38 -0700413 shutil.copymode(src_link, dst_link, follow_symlinks=False) # silent fail
Antoine Pitrou78091e62011-12-29 18:54:15 +0100414
415 @support.skip_unless_symlink
416 def test_copystat_symlinks(self):
417 tmp_dir = self.mkdtemp()
418 src = os.path.join(tmp_dir, 'foo')
419 dst = os.path.join(tmp_dir, 'bar')
420 src_link = os.path.join(tmp_dir, 'baz')
421 dst_link = os.path.join(tmp_dir, 'qux')
422 write_file(src, 'foo')
423 src_stat = os.stat(src)
424 os.utime(src, (src_stat.st_atime,
425 src_stat.st_mtime - 42.0)) # ensure different mtimes
426 write_file(dst, 'bar')
427 self.assertNotEqual(os.stat(src).st_mtime, os.stat(dst).st_mtime)
428 os.symlink(src, src_link)
429 os.symlink(dst, dst_link)
430 if hasattr(os, 'lchmod'):
431 os.lchmod(src_link, stat.S_IRWXO)
432 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
433 os.lchflags(src_link, stat.UF_NODUMP)
434 src_link_stat = os.lstat(src_link)
435 # follow
436 if hasattr(os, 'lchmod'):
Larry Hastingsb4038062012-07-15 10:57:38 -0700437 shutil.copystat(src_link, dst_link, follow_symlinks=True)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100438 self.assertNotEqual(src_link_stat.st_mode, os.stat(dst).st_mode)
439 # don't follow
Larry Hastingsb4038062012-07-15 10:57:38 -0700440 shutil.copystat(src_link, dst_link, follow_symlinks=False)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100441 dst_link_stat = os.lstat(dst_link)
Larry Hastings9cf065c2012-06-22 16:30:09 -0700442 if os.utime in os.supports_follow_symlinks:
Antoine Pitrou78091e62011-12-29 18:54:15 +0100443 for attr in 'st_atime', 'st_mtime':
444 # The modification times may be truncated in the new file.
445 self.assertLessEqual(getattr(src_link_stat, attr),
446 getattr(dst_link_stat, attr) + 1)
447 if hasattr(os, 'lchmod'):
448 self.assertEqual(src_link_stat.st_mode, dst_link_stat.st_mode)
449 if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
450 self.assertEqual(src_link_stat.st_flags, dst_link_stat.st_flags)
451 # tell to follow but dst is not a link
Larry Hastingsb4038062012-07-15 10:57:38 -0700452 shutil.copystat(src_link, dst, follow_symlinks=False)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100453 self.assertTrue(abs(os.stat(src).st_mtime - os.stat(dst).st_mtime) <
454 00000.1)
455
Ned Deilybaf75712012-05-10 17:05:19 -0700456 @unittest.skipUnless(hasattr(os, 'chflags') and
457 hasattr(errno, 'EOPNOTSUPP') and
458 hasattr(errno, 'ENOTSUP'),
459 "requires os.chflags, EOPNOTSUPP & ENOTSUP")
460 def test_copystat_handles_harmless_chflags_errors(self):
461 tmpdir = self.mkdtemp()
462 file1 = os.path.join(tmpdir, 'file1')
463 file2 = os.path.join(tmpdir, 'file2')
464 write_file(file1, 'xxx')
465 write_file(file2, 'xxx')
466
467 def make_chflags_raiser(err):
468 ex = OSError()
469
Larry Hastings90867a52012-06-22 17:01:41 -0700470 def _chflags_raiser(path, flags, *, follow_symlinks=True):
Ned Deilybaf75712012-05-10 17:05:19 -0700471 ex.errno = err
472 raise ex
473 return _chflags_raiser
474 old_chflags = os.chflags
475 try:
476 for err in errno.EOPNOTSUPP, errno.ENOTSUP:
477 os.chflags = make_chflags_raiser(err)
478 shutil.copystat(file1, file2)
479 # assert others errors break it
480 os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP)
481 self.assertRaises(OSError, shutil.copystat, file1, file2)
482 finally:
483 os.chflags = old_chflags
484
Antoine Pitrou424246f2012-05-12 19:02:01 +0200485 @support.skip_unless_xattr
486 def test_copyxattr(self):
487 tmp_dir = self.mkdtemp()
488 src = os.path.join(tmp_dir, 'foo')
489 write_file(src, 'foo')
490 dst = os.path.join(tmp_dir, 'bar')
491 write_file(dst, 'bar')
492
493 # no xattr == no problem
494 shutil._copyxattr(src, dst)
495 # common case
496 os.setxattr(src, 'user.foo', b'42')
497 os.setxattr(src, 'user.bar', b'43')
498 shutil._copyxattr(src, dst)
Gregory P. Smith1093bf22014-01-17 12:01:22 -0800499 self.assertEqual(sorted(os.listxattr(src)), sorted(os.listxattr(dst)))
Antoine Pitrou424246f2012-05-12 19:02:01 +0200500 self.assertEqual(
501 os.getxattr(src, 'user.foo'),
502 os.getxattr(dst, 'user.foo'))
503 # check errors don't affect other attrs
504 os.remove(dst)
505 write_file(dst, 'bar')
506 os_error = OSError(errno.EPERM, 'EPERM')
507
Larry Hastings9cf065c2012-06-22 16:30:09 -0700508 def _raise_on_user_foo(fname, attr, val, **kwargs):
Antoine Pitrou424246f2012-05-12 19:02:01 +0200509 if attr == 'user.foo':
510 raise os_error
511 else:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700512 orig_setxattr(fname, attr, val, **kwargs)
Antoine Pitrou424246f2012-05-12 19:02:01 +0200513 try:
514 orig_setxattr = os.setxattr
515 os.setxattr = _raise_on_user_foo
516 shutil._copyxattr(src, dst)
Antoine Pitrou61597d32012-05-12 23:37:35 +0200517 self.assertIn('user.bar', os.listxattr(dst))
Antoine Pitrou424246f2012-05-12 19:02:01 +0200518 finally:
519 os.setxattr = orig_setxattr
Hynek Schlawack0beab052013-02-05 08:22:44 +0100520 # the source filesystem not supporting xattrs should be ok, too.
521 def _raise_on_src(fname, *, follow_symlinks=True):
522 if fname == src:
523 raise OSError(errno.ENOTSUP, 'Operation not supported')
524 return orig_listxattr(fname, follow_symlinks=follow_symlinks)
525 try:
526 orig_listxattr = os.listxattr
527 os.listxattr = _raise_on_src
528 shutil._copyxattr(src, dst)
529 finally:
530 os.listxattr = orig_listxattr
Antoine Pitrou424246f2012-05-12 19:02:01 +0200531
Larry Hastingsad5ae042012-07-14 17:55:11 -0700532 # test that shutil.copystat copies xattrs
533 src = os.path.join(tmp_dir, 'the_original')
Olexa Bilaniuk79efbb72019-05-09 22:22:06 -0500534 srcro = os.path.join(tmp_dir, 'the_original_ro')
Larry Hastingsad5ae042012-07-14 17:55:11 -0700535 write_file(src, src)
Olexa Bilaniuk79efbb72019-05-09 22:22:06 -0500536 write_file(srcro, srcro)
Larry Hastingsad5ae042012-07-14 17:55:11 -0700537 os.setxattr(src, 'user.the_value', b'fiddly')
Olexa Bilaniuk79efbb72019-05-09 22:22:06 -0500538 os.setxattr(srcro, 'user.the_value', b'fiddly')
539 os.chmod(srcro, 0o444)
Larry Hastingsad5ae042012-07-14 17:55:11 -0700540 dst = os.path.join(tmp_dir, 'the_copy')
Olexa Bilaniuk79efbb72019-05-09 22:22:06 -0500541 dstro = os.path.join(tmp_dir, 'the_copy_ro')
Larry Hastingsad5ae042012-07-14 17:55:11 -0700542 write_file(dst, dst)
Olexa Bilaniuk79efbb72019-05-09 22:22:06 -0500543 write_file(dstro, dstro)
Larry Hastingsad5ae042012-07-14 17:55:11 -0700544 shutil.copystat(src, dst)
Olexa Bilaniuk79efbb72019-05-09 22:22:06 -0500545 shutil.copystat(srcro, dstro)
Hynek Schlawackc2d481f2012-07-16 17:11:10 +0200546 self.assertEqual(os.getxattr(dst, 'user.the_value'), b'fiddly')
Olexa Bilaniuk79efbb72019-05-09 22:22:06 -0500547 self.assertEqual(os.getxattr(dstro, 'user.the_value'), b'fiddly')
Larry Hastingsad5ae042012-07-14 17:55:11 -0700548
Antoine Pitrou424246f2012-05-12 19:02:01 +0200549 @support.skip_unless_symlink
550 @support.skip_unless_xattr
551 @unittest.skipUnless(hasattr(os, 'geteuid') and os.geteuid() == 0,
552 'root privileges required')
553 def test_copyxattr_symlinks(self):
554 # On Linux, it's only possible to access non-user xattr for symlinks;
555 # which in turn require root privileges. This test should be expanded
556 # as soon as other platforms gain support for extended attributes.
557 tmp_dir = self.mkdtemp()
558 src = os.path.join(tmp_dir, 'foo')
559 src_link = os.path.join(tmp_dir, 'baz')
560 write_file(src, 'foo')
561 os.symlink(src, src_link)
562 os.setxattr(src, 'trusted.foo', b'42')
Larry Hastings9cf065c2012-06-22 16:30:09 -0700563 os.setxattr(src_link, 'trusted.foo', b'43', follow_symlinks=False)
Antoine Pitrou424246f2012-05-12 19:02:01 +0200564 dst = os.path.join(tmp_dir, 'bar')
565 dst_link = os.path.join(tmp_dir, 'qux')
566 write_file(dst, 'bar')
567 os.symlink(dst, dst_link)
Larry Hastingsb4038062012-07-15 10:57:38 -0700568 shutil._copyxattr(src_link, dst_link, follow_symlinks=False)
Larry Hastings9cf065c2012-06-22 16:30:09 -0700569 self.assertEqual(os.getxattr(dst_link, 'trusted.foo', follow_symlinks=False), b'43')
Antoine Pitrou424246f2012-05-12 19:02:01 +0200570 self.assertRaises(OSError, os.getxattr, dst, 'trusted.foo')
Larry Hastingsb4038062012-07-15 10:57:38 -0700571 shutil._copyxattr(src_link, dst, follow_symlinks=False)
Antoine Pitrou424246f2012-05-12 19:02:01 +0200572 self.assertEqual(os.getxattr(dst, 'trusted.foo'), b'43')
573
Antoine Pitrou78091e62011-12-29 18:54:15 +0100574 @support.skip_unless_symlink
575 def test_copy_symlinks(self):
576 tmp_dir = self.mkdtemp()
577 src = os.path.join(tmp_dir, 'foo')
578 dst = os.path.join(tmp_dir, 'bar')
579 src_link = os.path.join(tmp_dir, 'baz')
580 write_file(src, 'foo')
581 os.symlink(src, src_link)
582 if hasattr(os, 'lchmod'):
583 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
584 # don't follow
Larry Hastingsb4038062012-07-15 10:57:38 -0700585 shutil.copy(src_link, dst, follow_symlinks=True)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100586 self.assertFalse(os.path.islink(dst))
587 self.assertEqual(read_file(src), read_file(dst))
588 os.remove(dst)
589 # follow
Larry Hastingsb4038062012-07-15 10:57:38 -0700590 shutil.copy(src_link, dst, follow_symlinks=False)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100591 self.assertTrue(os.path.islink(dst))
592 self.assertEqual(os.readlink(dst), os.readlink(src_link))
593 if hasattr(os, 'lchmod'):
594 self.assertEqual(os.lstat(src_link).st_mode,
595 os.lstat(dst).st_mode)
596
597 @support.skip_unless_symlink
598 def test_copy2_symlinks(self):
599 tmp_dir = self.mkdtemp()
600 src = os.path.join(tmp_dir, 'foo')
601 dst = os.path.join(tmp_dir, 'bar')
602 src_link = os.path.join(tmp_dir, 'baz')
603 write_file(src, 'foo')
604 os.symlink(src, src_link)
605 if hasattr(os, 'lchmod'):
606 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
607 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
608 os.lchflags(src_link, stat.UF_NODUMP)
609 src_stat = os.stat(src)
610 src_link_stat = os.lstat(src_link)
611 # follow
Larry Hastingsb4038062012-07-15 10:57:38 -0700612 shutil.copy2(src_link, dst, follow_symlinks=True)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100613 self.assertFalse(os.path.islink(dst))
614 self.assertEqual(read_file(src), read_file(dst))
615 os.remove(dst)
616 # don't follow
Larry Hastingsb4038062012-07-15 10:57:38 -0700617 shutil.copy2(src_link, dst, follow_symlinks=False)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100618 self.assertTrue(os.path.islink(dst))
619 self.assertEqual(os.readlink(dst), os.readlink(src_link))
620 dst_stat = os.lstat(dst)
Larry Hastings9cf065c2012-06-22 16:30:09 -0700621 if os.utime in os.supports_follow_symlinks:
Antoine Pitrou78091e62011-12-29 18:54:15 +0100622 for attr in 'st_atime', 'st_mtime':
623 # The modification times may be truncated in the new file.
624 self.assertLessEqual(getattr(src_link_stat, attr),
625 getattr(dst_stat, attr) + 1)
626 if hasattr(os, 'lchmod'):
627 self.assertEqual(src_link_stat.st_mode, dst_stat.st_mode)
628 self.assertNotEqual(src_stat.st_mode, dst_stat.st_mode)
629 if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
630 self.assertEqual(src_link_stat.st_flags, dst_stat.st_flags)
631
Antoine Pitrou424246f2012-05-12 19:02:01 +0200632 @support.skip_unless_xattr
633 def test_copy2_xattr(self):
634 tmp_dir = self.mkdtemp()
635 src = os.path.join(tmp_dir, 'foo')
636 dst = os.path.join(tmp_dir, 'bar')
637 write_file(src, 'foo')
638 os.setxattr(src, 'user.foo', b'42')
639 shutil.copy2(src, dst)
640 self.assertEqual(
641 os.getxattr(src, 'user.foo'),
642 os.getxattr(dst, 'user.foo'))
643 os.remove(dst)
644
Antoine Pitrou78091e62011-12-29 18:54:15 +0100645 @support.skip_unless_symlink
646 def test_copyfile_symlinks(self):
647 tmp_dir = self.mkdtemp()
648 src = os.path.join(tmp_dir, 'src')
649 dst = os.path.join(tmp_dir, 'dst')
650 dst_link = os.path.join(tmp_dir, 'dst_link')
651 link = os.path.join(tmp_dir, 'link')
652 write_file(src, 'foo')
653 os.symlink(src, link)
654 # don't follow
Larry Hastingsb4038062012-07-15 10:57:38 -0700655 shutil.copyfile(link, dst_link, follow_symlinks=False)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100656 self.assertTrue(os.path.islink(dst_link))
657 self.assertEqual(os.readlink(link), os.readlink(dst_link))
658 # follow
659 shutil.copyfile(link, dst)
660 self.assertFalse(os.path.islink(dst))
661
Hynek Schlawack2100b422012-06-23 20:28:32 +0200662 def test_rmtree_uses_safe_fd_version_if_available(self):
Hynek Schlawackd0f6e0a2012-06-29 08:28:20 +0200663 _use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <=
664 os.supports_dir_fd and
665 os.listdir in os.supports_fd and
666 os.stat in os.supports_follow_symlinks)
667 if _use_fd_functions:
Hynek Schlawack2100b422012-06-23 20:28:32 +0200668 self.assertTrue(shutil._use_fd_functions)
Nick Coghlan5b0eca12012-06-24 16:43:06 +1000669 self.assertTrue(shutil.rmtree.avoids_symlink_attacks)
Hynek Schlawack2100b422012-06-23 20:28:32 +0200670 tmp_dir = self.mkdtemp()
671 d = os.path.join(tmp_dir, 'a')
672 os.mkdir(d)
673 try:
674 real_rmtree = shutil._rmtree_safe_fd
675 class Called(Exception): pass
676 def _raiser(*args, **kwargs):
677 raise Called
678 shutil._rmtree_safe_fd = _raiser
679 self.assertRaises(Called, shutil.rmtree, d)
680 finally:
681 shutil._rmtree_safe_fd = real_rmtree
682 else:
683 self.assertFalse(shutil._use_fd_functions)
Nick Coghlan5b0eca12012-06-24 16:43:06 +1000684 self.assertFalse(shutil.rmtree.avoids_symlink_attacks)
Hynek Schlawack2100b422012-06-23 20:28:32 +0200685
Johannes Gijsbersd60e92a2004-09-11 21:26:21 +0000686 def test_rmtree_dont_delete_file(self):
687 # When called on a file instead of a directory, don't delete it.
688 handle, path = tempfile.mkstemp()
Victor Stinnerbf816222011-06-30 23:25:47 +0200689 os.close(handle)
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200690 self.assertRaises(NotADirectoryError, shutil.rmtree, path)
Johannes Gijsbersd60e92a2004-09-11 21:26:21 +0000691 os.remove(path)
692
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000693 def test_copytree_simple(self):
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000694 src_dir = tempfile.mkdtemp()
695 dst_dir = os.path.join(tempfile.mkdtemp(), 'destination')
Éric Araujoa7e33a12011-08-12 19:51:35 +0200696 self.addCleanup(shutil.rmtree, src_dir)
697 self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
698 write_file((src_dir, 'test.txt'), '123')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000699 os.mkdir(os.path.join(src_dir, 'test_dir'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200700 write_file((src_dir, 'test_dir', 'test.txt'), '456')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000701
Éric Araujoa7e33a12011-08-12 19:51:35 +0200702 shutil.copytree(src_dir, dst_dir)
703 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test.txt')))
704 self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'test_dir')))
705 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test_dir',
706 'test.txt')))
707 actual = read_file((dst_dir, 'test.txt'))
708 self.assertEqual(actual, '123')
709 actual = read_file((dst_dir, 'test_dir', 'test.txt'))
710 self.assertEqual(actual, '456')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000711
jab9e00d9e2018-12-28 13:03:40 -0500712 def test_copytree_dirs_exist_ok(self):
713 src_dir = tempfile.mkdtemp()
714 dst_dir = tempfile.mkdtemp()
715 self.addCleanup(shutil.rmtree, src_dir)
716 self.addCleanup(shutil.rmtree, dst_dir)
717
718 write_file((src_dir, 'nonexisting.txt'), '123')
719 os.mkdir(os.path.join(src_dir, 'existing_dir'))
720 os.mkdir(os.path.join(dst_dir, 'existing_dir'))
721 write_file((dst_dir, 'existing_dir', 'existing.txt'), 'will be replaced')
722 write_file((src_dir, 'existing_dir', 'existing.txt'), 'has been replaced')
723
724 shutil.copytree(src_dir, dst_dir, dirs_exist_ok=True)
725 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'nonexisting.txt')))
726 self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'existing_dir')))
727 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'existing_dir',
728 'existing.txt')))
729 actual = read_file((dst_dir, 'nonexisting.txt'))
730 self.assertEqual(actual, '123')
731 actual = read_file((dst_dir, 'existing_dir', 'existing.txt'))
732 self.assertEqual(actual, 'has been replaced')
733
734 with self.assertRaises(FileExistsError):
735 shutil.copytree(src_dir, dst_dir, dirs_exist_ok=False)
736
Antoine Pitrou78091e62011-12-29 18:54:15 +0100737 @support.skip_unless_symlink
738 def test_copytree_symlinks(self):
739 tmp_dir = self.mkdtemp()
740 src_dir = os.path.join(tmp_dir, 'src')
741 dst_dir = os.path.join(tmp_dir, 'dst')
742 sub_dir = os.path.join(src_dir, 'sub')
743 os.mkdir(src_dir)
744 os.mkdir(sub_dir)
745 write_file((src_dir, 'file.txt'), 'foo')
746 src_link = os.path.join(sub_dir, 'link')
747 dst_link = os.path.join(dst_dir, 'sub/link')
748 os.symlink(os.path.join(src_dir, 'file.txt'),
749 src_link)
750 if hasattr(os, 'lchmod'):
751 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
752 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
753 os.lchflags(src_link, stat.UF_NODUMP)
754 src_stat = os.lstat(src_link)
755 shutil.copytree(src_dir, dst_dir, symlinks=True)
756 self.assertTrue(os.path.islink(os.path.join(dst_dir, 'sub', 'link')))
757 self.assertEqual(os.readlink(os.path.join(dst_dir, 'sub', 'link')),
758 os.path.join(src_dir, 'file.txt'))
759 dst_stat = os.lstat(dst_link)
760 if hasattr(os, 'lchmod'):
761 self.assertEqual(dst_stat.st_mode, src_stat.st_mode)
762 if hasattr(os, 'lchflags'):
763 self.assertEqual(dst_stat.st_flags, src_stat.st_flags)
764
Georg Brandl2ee470f2008-07-16 12:55:28 +0000765 def test_copytree_with_exclude(self):
Georg Brandl2ee470f2008-07-16 12:55:28 +0000766 # creating data
767 join = os.path.join
768 exists = os.path.exists
769 src_dir = tempfile.mkdtemp()
Georg Brandl2ee470f2008-07-16 12:55:28 +0000770 try:
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000771 dst_dir = join(tempfile.mkdtemp(), 'destination')
Éric Araujoa7e33a12011-08-12 19:51:35 +0200772 write_file((src_dir, 'test.txt'), '123')
773 write_file((src_dir, 'test.tmp'), '123')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000774 os.mkdir(join(src_dir, 'test_dir'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200775 write_file((src_dir, 'test_dir', 'test.txt'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000776 os.mkdir(join(src_dir, 'test_dir2'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200777 write_file((src_dir, 'test_dir2', 'test.txt'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000778 os.mkdir(join(src_dir, 'test_dir2', 'subdir'))
779 os.mkdir(join(src_dir, 'test_dir2', 'subdir2'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200780 write_file((src_dir, 'test_dir2', 'subdir', 'test.txt'), '456')
781 write_file((src_dir, 'test_dir2', 'subdir2', 'test.py'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000782
783 # testing glob-like patterns
784 try:
785 patterns = shutil.ignore_patterns('*.tmp', 'test_dir2')
786 shutil.copytree(src_dir, dst_dir, ignore=patterns)
787 # checking the result: some elements should not be copied
788 self.assertTrue(exists(join(dst_dir, 'test.txt')))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200789 self.assertFalse(exists(join(dst_dir, 'test.tmp')))
790 self.assertFalse(exists(join(dst_dir, 'test_dir2')))
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000791 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200792 shutil.rmtree(dst_dir)
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000793 try:
794 patterns = shutil.ignore_patterns('*.tmp', 'subdir*')
795 shutil.copytree(src_dir, dst_dir, ignore=patterns)
796 # checking the result: some elements should not be copied
Éric Araujoa7e33a12011-08-12 19:51:35 +0200797 self.assertFalse(exists(join(dst_dir, 'test.tmp')))
798 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2')))
799 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000800 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200801 shutil.rmtree(dst_dir)
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000802
803 # testing callable-style
804 try:
805 def _filter(src, names):
806 res = []
807 for name in names:
808 path = os.path.join(src, name)
809
810 if (os.path.isdir(path) and
811 path.split()[-1] == 'subdir'):
812 res.append(name)
813 elif os.path.splitext(path)[-1] in ('.py'):
814 res.append(name)
815 return res
816
817 shutil.copytree(src_dir, dst_dir, ignore=_filter)
818
819 # checking the result: some elements should not be copied
Éric Araujoa7e33a12011-08-12 19:51:35 +0200820 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2',
821 'test.py')))
822 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000823
824 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200825 shutil.rmtree(dst_dir)
Georg Brandl2ee470f2008-07-16 12:55:28 +0000826 finally:
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000827 shutil.rmtree(src_dir)
828 shutil.rmtree(os.path.dirname(dst_dir))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000829
Antoine Pitrouac601602013-08-16 19:35:02 +0200830 def test_copytree_retains_permissions(self):
831 tmp_dir = tempfile.mkdtemp()
832 src_dir = os.path.join(tmp_dir, 'source')
833 os.mkdir(src_dir)
834 dst_dir = os.path.join(tmp_dir, 'destination')
835 self.addCleanup(shutil.rmtree, tmp_dir)
836
837 os.chmod(src_dir, 0o777)
838 write_file((src_dir, 'permissive.txt'), '123')
839 os.chmod(os.path.join(src_dir, 'permissive.txt'), 0o777)
840 write_file((src_dir, 'restrictive.txt'), '456')
841 os.chmod(os.path.join(src_dir, 'restrictive.txt'), 0o600)
842 restrictive_subdir = tempfile.mkdtemp(dir=src_dir)
843 os.chmod(restrictive_subdir, 0o600)
844
845 shutil.copytree(src_dir, dst_dir)
Brett Cannon9c7eb552013-08-23 14:38:11 -0400846 self.assertEqual(os.stat(src_dir).st_mode, os.stat(dst_dir).st_mode)
847 self.assertEqual(os.stat(os.path.join(src_dir, 'permissive.txt')).st_mode,
Antoine Pitrouac601602013-08-16 19:35:02 +0200848 os.stat(os.path.join(dst_dir, 'permissive.txt')).st_mode)
Brett Cannon9c7eb552013-08-23 14:38:11 -0400849 self.assertEqual(os.stat(os.path.join(src_dir, 'restrictive.txt')).st_mode,
Antoine Pitrouac601602013-08-16 19:35:02 +0200850 os.stat(os.path.join(dst_dir, 'restrictive.txt')).st_mode)
851 restrictive_subdir_dst = os.path.join(dst_dir,
852 os.path.split(restrictive_subdir)[1])
Brett Cannon9c7eb552013-08-23 14:38:11 -0400853 self.assertEqual(os.stat(restrictive_subdir).st_mode,
Antoine Pitrouac601602013-08-16 19:35:02 +0200854 os.stat(restrictive_subdir_dst).st_mode)
855
Berker Peksag884afd92014-12-10 02:50:32 +0200856 @unittest.mock.patch('os.chmod')
857 def test_copytree_winerror(self, mock_patch):
858 # When copying to VFAT, copystat() raises OSError. On Windows, the
859 # exception object has a meaningful 'winerror' attribute, but not
860 # on other operating systems. Do not assume 'winerror' is set.
861 src_dir = tempfile.mkdtemp()
862 dst_dir = os.path.join(tempfile.mkdtemp(), 'destination')
863 self.addCleanup(shutil.rmtree, src_dir)
864 self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
865
866 mock_patch.side_effect = PermissionError('ka-boom')
867 with self.assertRaises(shutil.Error):
868 shutil.copytree(src_dir, dst_dir)
869
Giampaolo Rodolac606a9c2019-02-26 12:04:41 +0100870 def test_copytree_custom_copy_function(self):
871 # See: https://bugs.python.org/issue35648
872 def custom_cpfun(a, b):
873 flag.append(None)
874 self.assertIsInstance(a, str)
875 self.assertIsInstance(b, str)
876 self.assertEqual(a, os.path.join(src, 'foo'))
877 self.assertEqual(b, os.path.join(dst, 'foo'))
878
879 flag = []
880 src = tempfile.mkdtemp()
Victor Stinner4c26abd2019-06-27 01:39:53 +0200881 self.addCleanup(support.rmtree, src)
Giampaolo Rodolac606a9c2019-02-26 12:04:41 +0100882 dst = tempfile.mktemp()
Victor Stinner4c26abd2019-06-27 01:39:53 +0200883 self.addCleanup(support.rmtree, dst)
Giampaolo Rodolac606a9c2019-02-26 12:04:41 +0100884 with open(os.path.join(src, 'foo'), 'w') as f:
885 f.close()
886 shutil.copytree(src, dst, copy_function=custom_cpfun)
887 self.assertEqual(len(flag), 1)
888
Zachary Ware9fe6d862013-12-08 00:20:35 -0600889 @unittest.skipIf(os.name == 'nt', 'temporarily disabled on Windows')
Hirokazu Yamamoto26681452010-12-05 02:04:16 +0000890 @unittest.skipUnless(hasattr(os, 'link'), 'requires os.link')
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000891 def test_dont_copy_file_onto_link_to_itself(self):
892 # bug 851123.
893 os.mkdir(TESTFN)
894 src = os.path.join(TESTFN, 'cheese')
895 dst = os.path.join(TESTFN, 'shop')
896 try:
Hirokazu Yamamoto26681452010-12-05 02:04:16 +0000897 with open(src, 'w') as f:
898 f.write('cheddar')
xdegaye92c2ca72017-11-12 17:31:07 +0100899 try:
900 os.link(src, dst)
901 except PermissionError as e:
902 self.skipTest('os.link(): %s' % e)
Hynek Schlawack48653762012-10-07 12:49:58 +0200903 self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
Hirokazu Yamamoto26681452010-12-05 02:04:16 +0000904 with open(src, 'r') as f:
905 self.assertEqual(f.read(), 'cheddar')
906 os.remove(dst)
907 finally:
908 shutil.rmtree(TESTFN, ignore_errors=True)
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000909
Brian Curtin3b4499c2010-12-28 14:31:47 +0000910 @support.skip_unless_symlink
Hirokazu Yamamoto26681452010-12-05 02:04:16 +0000911 def test_dont_copy_file_onto_symlink_to_itself(self):
912 # bug 851123.
913 os.mkdir(TESTFN)
914 src = os.path.join(TESTFN, 'cheese')
915 dst = os.path.join(TESTFN, 'shop')
916 try:
917 with open(src, 'w') as f:
918 f.write('cheddar')
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000919 # Using `src` here would mean we end up with a symlink pointing
920 # to TESTFN/TESTFN/cheese, while it should point at
921 # TESTFN/cheese.
922 os.symlink('cheese', dst)
Hynek Schlawack48653762012-10-07 12:49:58 +0200923 self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
Antoine Pitrou92f60ed2010-10-14 22:11:44 +0000924 with open(src, 'r') as f:
925 self.assertEqual(f.read(), 'cheddar')
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000926 os.remove(dst)
927 finally:
Hirokazu Yamamoto26681452010-12-05 02:04:16 +0000928 shutil.rmtree(TESTFN, ignore_errors=True)
Johannes Gijsbers68128712004-08-14 13:57:08 +0000929
Brian Curtin3b4499c2010-12-28 14:31:47 +0000930 @support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +0000931 def test_rmtree_on_symlink(self):
932 # bug 1669.
933 os.mkdir(TESTFN)
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000934 try:
Brian Curtind40e6f72010-07-08 21:39:08 +0000935 src = os.path.join(TESTFN, 'cheese')
936 dst = os.path.join(TESTFN, 'shop')
937 os.mkdir(src)
938 os.symlink(src, dst)
939 self.assertRaises(OSError, shutil.rmtree, dst)
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200940 shutil.rmtree(dst, ignore_errors=True)
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000941 finally:
Brian Curtind40e6f72010-07-08 21:39:08 +0000942 shutil.rmtree(TESTFN, ignore_errors=True)
943
Serhiy Storchaka43767632013-11-03 21:31:38 +0200944 # Issue #3002: copyfile and copytree block indefinitely on named pipes
945 @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
946 def test_copyfile_named_pipe(self):
xdegaye92c2ca72017-11-12 17:31:07 +0100947 try:
948 os.mkfifo(TESTFN)
949 except PermissionError as e:
950 self.skipTest('os.mkfifo(): %s' % e)
Serhiy Storchaka43767632013-11-03 21:31:38 +0200951 try:
952 self.assertRaises(shutil.SpecialFileError,
953 shutil.copyfile, TESTFN, TESTFN2)
954 self.assertRaises(shutil.SpecialFileError,
955 shutil.copyfile, __file__, TESTFN)
956 finally:
957 os.remove(TESTFN)
Antoine Pitrou7fff0962009-05-01 21:09:44 +0000958
Serhiy Storchaka43767632013-11-03 21:31:38 +0200959 @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
960 @support.skip_unless_symlink
961 def test_copytree_named_pipe(self):
962 os.mkdir(TESTFN)
963 try:
964 subdir = os.path.join(TESTFN, "subdir")
965 os.mkdir(subdir)
966 pipe = os.path.join(subdir, "mypipe")
xdegaye92c2ca72017-11-12 17:31:07 +0100967 try:
968 os.mkfifo(pipe)
969 except PermissionError as e:
970 self.skipTest('os.mkfifo(): %s' % e)
Antoine Pitrou7fff0962009-05-01 21:09:44 +0000971 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +0200972 shutil.copytree(TESTFN, TESTFN2)
973 except shutil.Error as e:
974 errors = e.args[0]
975 self.assertEqual(len(errors), 1)
976 src, dst, error_msg = errors[0]
977 self.assertEqual("`%s` is a named pipe" % pipe, error_msg)
978 else:
979 self.fail("shutil.Error should have been raised")
980 finally:
981 shutil.rmtree(TESTFN, ignore_errors=True)
982 shutil.rmtree(TESTFN2, ignore_errors=True)
Antoine Pitrou7fff0962009-05-01 21:09:44 +0000983
Tarek Ziadé5340db32010-04-19 22:30:51 +0000984 def test_copytree_special_func(self):
985
986 src_dir = self.mkdtemp()
987 dst_dir = os.path.join(self.mkdtemp(), 'destination')
Éric Araujoa7e33a12011-08-12 19:51:35 +0200988 write_file((src_dir, 'test.txt'), '123')
Tarek Ziadé5340db32010-04-19 22:30:51 +0000989 os.mkdir(os.path.join(src_dir, 'test_dir'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200990 write_file((src_dir, 'test_dir', 'test.txt'), '456')
Tarek Ziadé5340db32010-04-19 22:30:51 +0000991
992 copied = []
993 def _copy(src, dst):
994 copied.append((src, dst))
995
996 shutil.copytree(src_dir, dst_dir, copy_function=_copy)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000997 self.assertEqual(len(copied), 2)
Tarek Ziadé5340db32010-04-19 22:30:51 +0000998
Brian Curtin3b4499c2010-12-28 14:31:47 +0000999 @support.skip_unless_symlink
Tarek Ziadéfb437512010-04-20 08:57:33 +00001000 def test_copytree_dangling_symlinks(self):
1001
1002 # a dangling symlink raises an error at the end
1003 src_dir = self.mkdtemp()
1004 dst_dir = os.path.join(self.mkdtemp(), 'destination')
1005 os.symlink('IDONTEXIST', os.path.join(src_dir, 'test.txt'))
1006 os.mkdir(os.path.join(src_dir, 'test_dir'))
Éric Araujoa7e33a12011-08-12 19:51:35 +02001007 write_file((src_dir, 'test_dir', 'test.txt'), '456')
Tarek Ziadéfb437512010-04-20 08:57:33 +00001008 self.assertRaises(Error, shutil.copytree, src_dir, dst_dir)
1009
1010 # a dangling symlink is ignored with the proper flag
1011 dst_dir = os.path.join(self.mkdtemp(), 'destination2')
1012 shutil.copytree(src_dir, dst_dir, ignore_dangling_symlinks=True)
1013 self.assertNotIn('test.txt', os.listdir(dst_dir))
1014
1015 # a dangling symlink is copied if symlinks=True
1016 dst_dir = os.path.join(self.mkdtemp(), 'destination3')
1017 shutil.copytree(src_dir, dst_dir, symlinks=True)
1018 self.assertIn('test.txt', os.listdir(dst_dir))
1019
Berker Peksag5a294d82015-07-25 14:53:48 +03001020 @support.skip_unless_symlink
1021 def test_copytree_symlink_dir(self):
1022 src_dir = self.mkdtemp()
1023 dst_dir = os.path.join(self.mkdtemp(), 'destination')
1024 os.mkdir(os.path.join(src_dir, 'real_dir'))
1025 with open(os.path.join(src_dir, 'real_dir', 'test.txt'), 'w'):
1026 pass
1027 os.symlink(os.path.join(src_dir, 'real_dir'),
1028 os.path.join(src_dir, 'link_to_dir'),
1029 target_is_directory=True)
1030
1031 shutil.copytree(src_dir, dst_dir, symlinks=False)
1032 self.assertFalse(os.path.islink(os.path.join(dst_dir, 'link_to_dir')))
1033 self.assertIn('test.txt', os.listdir(os.path.join(dst_dir, 'link_to_dir')))
1034
1035 dst_dir = os.path.join(self.mkdtemp(), 'destination2')
1036 shutil.copytree(src_dir, dst_dir, symlinks=True)
1037 self.assertTrue(os.path.islink(os.path.join(dst_dir, 'link_to_dir')))
1038 self.assertIn('test.txt', os.listdir(os.path.join(dst_dir, 'link_to_dir')))
1039
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001040 def _copy_file(self, method):
1041 fname = 'test.txt'
1042 tmpdir = self.mkdtemp()
Éric Araujoa7e33a12011-08-12 19:51:35 +02001043 write_file((tmpdir, fname), 'xxx')
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001044 file1 = os.path.join(tmpdir, fname)
1045 tmpdir2 = self.mkdtemp()
1046 method(file1, tmpdir2)
1047 file2 = os.path.join(tmpdir2, fname)
1048 return (file1, file2)
1049
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001050 def test_copy(self):
1051 # Ensure that the copied file exists and has the same mode bits.
1052 file1, file2 = self._copy_file(shutil.copy)
1053 self.assertTrue(os.path.exists(file2))
1054 self.assertEqual(os.stat(file1).st_mode, os.stat(file2).st_mode)
1055
Senthil Kumaran0c2dba52011-07-03 18:21:38 -07001056 @unittest.skipUnless(hasattr(os, 'utime'), 'requires os.utime')
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001057 def test_copy2(self):
1058 # Ensure that the copied file exists and has the same mode and
1059 # modification time bits.
1060 file1, file2 = self._copy_file(shutil.copy2)
1061 self.assertTrue(os.path.exists(file2))
1062 file1_stat = os.stat(file1)
1063 file2_stat = os.stat(file2)
1064 self.assertEqual(file1_stat.st_mode, file2_stat.st_mode)
1065 for attr in 'st_atime', 'st_mtime':
1066 # The modification times may be truncated in the new file.
1067 self.assertLessEqual(getattr(file1_stat, attr),
1068 getattr(file2_stat, attr) + 1)
1069 if hasattr(os, 'chflags') and hasattr(file1_stat, 'st_flags'):
1070 self.assertEqual(getattr(file1_stat, 'st_flags'),
1071 getattr(file2_stat, 'st_flags'))
1072
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001073 @support.requires_zlib
Tarek Ziadé396fad72010-02-23 05:30:31 +00001074 def test_make_tarball(self):
1075 # creating something to tar
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001076 root_dir, base_dir = self._create_files('')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001077
1078 tmpdir2 = self.mkdtemp()
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001079 # force shutil to create the directory
1080 os.rmdir(tmpdir2)
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001081 # working with relative paths
1082 work_dir = os.path.dirname(tmpdir2)
1083 rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001084
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001085 with support.change_cwd(work_dir):
Serhiy Storchaka5558d4f2015-09-08 09:59:02 +03001086 base_name = os.path.abspath(rel_base_name)
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001087 tarball = make_archive(rel_base_name, 'gztar', root_dir, '.')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001088
1089 # check if the compressed tarball was created
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001090 self.assertEqual(tarball, base_name + '.tar.gz')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001091 self.assertTrue(os.path.isfile(tarball))
1092 self.assertTrue(tarfile.is_tarfile(tarball))
1093 with tarfile.open(tarball, 'r:gz') as tf:
1094 self.assertCountEqual(tf.getnames(),
1095 ['.', './sub', './sub2',
1096 './file1', './file2', './sub/file3'])
Tarek Ziadé396fad72010-02-23 05:30:31 +00001097
1098 # trying an uncompressed one
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001099 with support.change_cwd(work_dir):
1100 tarball = make_archive(rel_base_name, 'tar', root_dir, '.')
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001101 self.assertEqual(tarball, base_name + '.tar')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001102 self.assertTrue(os.path.isfile(tarball))
1103 self.assertTrue(tarfile.is_tarfile(tarball))
1104 with tarfile.open(tarball, 'r') as tf:
1105 self.assertCountEqual(tf.getnames(),
1106 ['.', './sub', './sub2',
1107 './file1', './file2', './sub/file3'])
Tarek Ziadé396fad72010-02-23 05:30:31 +00001108
1109 def _tarinfo(self, path):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001110 with tarfile.open(path) as tar:
Tarek Ziadé396fad72010-02-23 05:30:31 +00001111 names = tar.getnames()
1112 names.sort()
1113 return tuple(names)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001114
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001115 def _create_files(self, base_dir='dist'):
Tarek Ziadé396fad72010-02-23 05:30:31 +00001116 # creating something to tar
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001117 root_dir = self.mkdtemp()
1118 dist = os.path.join(root_dir, base_dir)
1119 os.makedirs(dist, exist_ok=True)
Éric Araujoa7e33a12011-08-12 19:51:35 +02001120 write_file((dist, 'file1'), 'xxx')
1121 write_file((dist, 'file2'), 'xxx')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001122 os.mkdir(os.path.join(dist, 'sub'))
Éric Araujoa7e33a12011-08-12 19:51:35 +02001123 write_file((dist, 'sub', 'file3'), 'xxx')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001124 os.mkdir(os.path.join(dist, 'sub2'))
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001125 if base_dir:
1126 write_file((root_dir, 'outer'), 'xxx')
1127 return root_dir, base_dir
Tarek Ziadé396fad72010-02-23 05:30:31 +00001128
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001129 @support.requires_zlib
Serhiy Storchakab42de2f2015-11-21 14:09:26 +02001130 @unittest.skipUnless(shutil.which('tar'),
Tarek Ziadé396fad72010-02-23 05:30:31 +00001131 'Need the tar command to run')
1132 def test_tarfile_vs_tar(self):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001133 root_dir, base_dir = self._create_files()
1134 base_name = os.path.join(self.mkdtemp(), 'archive')
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001135 tarball = make_archive(base_name, 'gztar', root_dir, base_dir)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001136
1137 # check if the compressed tarball was created
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001138 self.assertEqual(tarball, base_name + '.tar.gz')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001139 self.assertTrue(os.path.isfile(tarball))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001140
1141 # now create another tarball using `tar`
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001142 tarball2 = os.path.join(root_dir, 'archive2.tar')
1143 tar_cmd = ['tar', '-cf', 'archive2.tar', base_dir]
Serhiy Storchakab42de2f2015-11-21 14:09:26 +02001144 subprocess.check_call(tar_cmd, cwd=root_dir,
1145 stdout=subprocess.DEVNULL)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001146
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001147 self.assertTrue(os.path.isfile(tarball2))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001148 # let's compare both tarballs
Ezio Melottib3aedd42010-11-20 19:04:17 +00001149 self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001150
1151 # trying an uncompressed one
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001152 tarball = make_archive(base_name, 'tar', root_dir, base_dir)
1153 self.assertEqual(tarball, base_name + '.tar')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001154 self.assertTrue(os.path.isfile(tarball))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001155
1156 # now for a dry_run
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001157 tarball = make_archive(base_name, 'tar', root_dir, base_dir,
1158 dry_run=True)
1159 self.assertEqual(tarball, base_name + '.tar')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001160 self.assertTrue(os.path.isfile(tarball))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001161
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001162 @support.requires_zlib
Tarek Ziadé396fad72010-02-23 05:30:31 +00001163 def test_make_zipfile(self):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001164 # creating something to zip
1165 root_dir, base_dir = self._create_files()
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001166
1167 tmpdir2 = self.mkdtemp()
1168 # force shutil to create the directory
1169 os.rmdir(tmpdir2)
1170 # working with relative paths
1171 work_dir = os.path.dirname(tmpdir2)
1172 rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001173
1174 with support.change_cwd(work_dir):
Serhiy Storchaka5558d4f2015-09-08 09:59:02 +03001175 base_name = os.path.abspath(rel_base_name)
Serhiy Storchaka666de772016-10-23 15:55:09 +03001176 res = make_archive(rel_base_name, 'zip', root_dir)
1177
1178 self.assertEqual(res, base_name + '.zip')
1179 self.assertTrue(os.path.isfile(res))
1180 self.assertTrue(zipfile.is_zipfile(res))
1181 with zipfile.ZipFile(res) as zf:
1182 self.assertCountEqual(zf.namelist(),
1183 ['dist/', 'dist/sub/', 'dist/sub2/',
1184 'dist/file1', 'dist/file2', 'dist/sub/file3',
1185 'outer'])
1186
1187 with support.change_cwd(work_dir):
1188 base_name = os.path.abspath(rel_base_name)
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001189 res = make_archive(rel_base_name, 'zip', root_dir, base_dir)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001190
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001191 self.assertEqual(res, base_name + '.zip')
1192 self.assertTrue(os.path.isfile(res))
1193 self.assertTrue(zipfile.is_zipfile(res))
1194 with zipfile.ZipFile(res) as zf:
1195 self.assertCountEqual(zf.namelist(),
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001196 ['dist/', 'dist/sub/', 'dist/sub2/',
1197 'dist/file1', 'dist/file2', 'dist/sub/file3'])
Tarek Ziadé396fad72010-02-23 05:30:31 +00001198
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001199 @support.requires_zlib
Serhiy Storchakab42de2f2015-11-21 14:09:26 +02001200 @unittest.skipUnless(shutil.which('zip'),
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001201 'Need the zip command to run')
1202 def test_zipfile_vs_zip(self):
1203 root_dir, base_dir = self._create_files()
1204 base_name = os.path.join(self.mkdtemp(), 'archive')
1205 archive = make_archive(base_name, 'zip', root_dir, base_dir)
1206
1207 # check if ZIP file was created
1208 self.assertEqual(archive, base_name + '.zip')
1209 self.assertTrue(os.path.isfile(archive))
1210
1211 # now create another ZIP file using `zip`
1212 archive2 = os.path.join(root_dir, 'archive2.zip')
1213 zip_cmd = ['zip', '-q', '-r', 'archive2.zip', base_dir]
Serhiy Storchakab42de2f2015-11-21 14:09:26 +02001214 subprocess.check_call(zip_cmd, cwd=root_dir,
1215 stdout=subprocess.DEVNULL)
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001216
1217 self.assertTrue(os.path.isfile(archive2))
1218 # let's compare both ZIP files
1219 with zipfile.ZipFile(archive) as zf:
1220 names = zf.namelist()
1221 with zipfile.ZipFile(archive2) as zf:
1222 names2 = zf.namelist()
1223 self.assertEqual(sorted(names), sorted(names2))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001224
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001225 @support.requires_zlib
Serhiy Storchaka8bc792a2015-11-22 14:49:58 +02001226 @unittest.skipUnless(shutil.which('unzip'),
1227 'Need the unzip command to run')
1228 def test_unzip_zipfile(self):
1229 root_dir, base_dir = self._create_files()
1230 base_name = os.path.join(self.mkdtemp(), 'archive')
1231 archive = make_archive(base_name, 'zip', root_dir, base_dir)
1232
1233 # check if ZIP file was created
1234 self.assertEqual(archive, base_name + '.zip')
1235 self.assertTrue(os.path.isfile(archive))
1236
1237 # now check the ZIP file using `unzip -t`
1238 zip_cmd = ['unzip', '-t', archive]
1239 with support.change_cwd(root_dir):
1240 try:
1241 subprocess.check_output(zip_cmd, stderr=subprocess.STDOUT)
1242 except subprocess.CalledProcessError as exc:
1243 details = exc.output.decode(errors="replace")
Benjamin Petersona710ebd2018-09-13 10:08:46 -07001244 if 'unrecognized option: t' in details:
Benjamin Petersone78734d2018-09-13 10:57:23 -07001245 self.skipTest("unzip doesn't support -t")
Serhiy Storchaka8bc792a2015-11-22 14:49:58 +02001246 msg = "{}\n\n**Unzip Output**\n{}"
1247 self.fail(msg.format(exc, details))
1248
Tarek Ziadé396fad72010-02-23 05:30:31 +00001249 def test_make_archive(self):
1250 tmpdir = self.mkdtemp()
1251 base_name = os.path.join(tmpdir, 'archive')
1252 self.assertRaises(ValueError, make_archive, base_name, 'xxx')
1253
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001254 @support.requires_zlib
Tarek Ziadé396fad72010-02-23 05:30:31 +00001255 def test_make_archive_owner_group(self):
1256 # testing make_archive with owner and group, with various combinations
1257 # this works even if there's not gid/uid support
1258 if UID_GID_SUPPORT:
1259 group = grp.getgrgid(0)[0]
1260 owner = pwd.getpwuid(0)[0]
1261 else:
1262 group = owner = 'root'
1263
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001264 root_dir, base_dir = self._create_files()
1265 base_name = os.path.join(self.mkdtemp(), 'archive')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001266 res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner,
1267 group=group)
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001268 self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001269
1270 res = make_archive(base_name, 'zip', root_dir, base_dir)
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001271 self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001272
1273 res = make_archive(base_name, 'tar', root_dir, base_dir,
1274 owner=owner, group=group)
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001275 self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001276
1277 res = make_archive(base_name, 'tar', root_dir, base_dir,
1278 owner='kjhkjhkjg', group='oihohoh')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001279 self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001280
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001281
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001282 @support.requires_zlib
Tarek Ziadé396fad72010-02-23 05:30:31 +00001283 @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
1284 def test_tarfile_root_owner(self):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001285 root_dir, base_dir = self._create_files()
1286 base_name = os.path.join(self.mkdtemp(), 'archive')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001287 group = grp.getgrgid(0)[0]
1288 owner = pwd.getpwuid(0)[0]
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001289 with support.change_cwd(root_dir):
1290 archive_name = make_archive(base_name, 'gztar', root_dir, 'dist',
1291 owner=owner, group=group)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001292
1293 # check if the compressed tarball was created
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001294 self.assertTrue(os.path.isfile(archive_name))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001295
1296 # now checks the rights
1297 archive = tarfile.open(archive_name)
1298 try:
1299 for member in archive.getmembers():
Ezio Melottib3aedd42010-11-20 19:04:17 +00001300 self.assertEqual(member.uid, 0)
1301 self.assertEqual(member.gid, 0)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001302 finally:
1303 archive.close()
1304
1305 def test_make_archive_cwd(self):
1306 current_dir = os.getcwd()
1307 def _breaks(*args, **kw):
1308 raise RuntimeError()
1309
1310 register_archive_format('xxx', _breaks, [], 'xxx file')
1311 try:
1312 try:
1313 make_archive('xxx', 'xxx', root_dir=self.mkdtemp())
1314 except Exception:
1315 pass
Ezio Melottib3aedd42010-11-20 19:04:17 +00001316 self.assertEqual(os.getcwd(), current_dir)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001317 finally:
1318 unregister_archive_format('xxx')
1319
Serhiy Storchaka9a4fc192014-11-28 00:48:46 +02001320 def test_make_tarfile_in_curdir(self):
1321 # Issue #21280
1322 root_dir = self.mkdtemp()
1323 with support.change_cwd(root_dir):
1324 self.assertEqual(make_archive('test', 'tar'), 'test.tar')
1325 self.assertTrue(os.path.isfile('test.tar'))
1326
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001327 @support.requires_zlib
Serhiy Storchaka9a4fc192014-11-28 00:48:46 +02001328 def test_make_zipfile_in_curdir(self):
1329 # Issue #21280
1330 root_dir = self.mkdtemp()
1331 with support.change_cwd(root_dir):
1332 self.assertEqual(make_archive('test', 'zip'), 'test.zip')
1333 self.assertTrue(os.path.isfile('test.zip'))
1334
Tarek Ziadé396fad72010-02-23 05:30:31 +00001335 def test_register_archive_format(self):
1336
1337 self.assertRaises(TypeError, register_archive_format, 'xxx', 1)
1338 self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
1339 1)
1340 self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
1341 [(1, 2), (1, 2, 3)])
1342
1343 register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file')
1344 formats = [name for name, params in get_archive_formats()]
1345 self.assertIn('xxx', formats)
1346
1347 unregister_archive_format('xxx')
1348 formats = [name for name, params in get_archive_formats()]
1349 self.assertNotIn('xxx', formats)
1350
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001351 def check_unpack_archive(self, format):
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001352 self.check_unpack_archive_with_converter(format, lambda path: path)
1353 self.check_unpack_archive_with_converter(format, pathlib.Path)
Serhiy Storchakab21d1552018-03-02 11:53:51 +02001354 self.check_unpack_archive_with_converter(format, FakePath)
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001355
1356 def check_unpack_archive_with_converter(self, format, converter):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001357 root_dir, base_dir = self._create_files()
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001358 expected = rlistdir(root_dir)
1359 expected.remove('outer')
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001360
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001361 base_name = os.path.join(self.mkdtemp(), 'archive')
1362 filename = make_archive(base_name, format, root_dir, base_dir)
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001363
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001364 # let's try to unpack it now
1365 tmpdir2 = self.mkdtemp()
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001366 unpack_archive(converter(filename), converter(tmpdir2))
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001367 self.assertEqual(rlistdir(tmpdir2), expected)
1368
1369 # and again, this time with the format specified
1370 tmpdir3 = self.mkdtemp()
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001371 unpack_archive(converter(filename), converter(tmpdir3), format=format)
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001372 self.assertEqual(rlistdir(tmpdir3), expected)
1373
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001374 self.assertRaises(shutil.ReadError, unpack_archive, converter(TESTFN))
1375 self.assertRaises(ValueError, unpack_archive, converter(TESTFN), format='xxx')
Nick Coghlanabf202d2011-03-16 13:52:20 -04001376
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001377 def test_unpack_archive_tar(self):
1378 self.check_unpack_archive('tar')
1379
1380 @support.requires_zlib
1381 def test_unpack_archive_gztar(self):
1382 self.check_unpack_archive('gztar')
1383
1384 @support.requires_bz2
1385 def test_unpack_archive_bztar(self):
1386 self.check_unpack_archive('bztar')
1387
1388 @support.requires_lzma
Michael Feltef110b12019-02-18 12:02:44 +01001389 @unittest.skipIf(AIX and not _maxdataOK(), "AIX MAXDATA must be 0x20000000 or larger")
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001390 def test_unpack_archive_xztar(self):
1391 self.check_unpack_archive('xztar')
1392
1393 @support.requires_zlib
1394 def test_unpack_archive_zip(self):
1395 self.check_unpack_archive('zip')
1396
Martin Pantereb995702016-07-28 01:11:04 +00001397 def test_unpack_registry(self):
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001398
1399 formats = get_unpack_formats()
1400
1401 def _boo(filename, extract_dir, extra):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001402 self.assertEqual(extra, 1)
1403 self.assertEqual(filename, 'stuff.boo')
1404 self.assertEqual(extract_dir, 'xx')
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001405
1406 register_unpack_format('Boo', ['.boo', '.b2'], _boo, [('extra', 1)])
1407 unpack_archive('stuff.boo', 'xx')
1408
1409 # trying to register a .boo unpacker again
1410 self.assertRaises(RegistryError, register_unpack_format, 'Boo2',
1411 ['.boo'], _boo)
1412
1413 # should work now
1414 unregister_unpack_format('Boo')
1415 register_unpack_format('Boo2', ['.boo'], _boo)
1416 self.assertIn(('Boo2', ['.boo'], ''), get_unpack_formats())
1417 self.assertNotIn(('Boo', ['.boo'], ''), get_unpack_formats())
1418
1419 # let's leave a clean state
1420 unregister_unpack_format('Boo2')
Ezio Melottib3aedd42010-11-20 19:04:17 +00001421 self.assertEqual(get_unpack_formats(), formats)
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001422
Giampaolo Rodola'210e7ca2011-07-01 13:55:36 +02001423 @unittest.skipUnless(hasattr(shutil, 'disk_usage'),
1424 "disk_usage not available on this platform")
1425 def test_disk_usage(self):
Gregory P. Smith529746c2017-07-06 17:11:27 -07001426 usage = shutil.disk_usage(os.path.dirname(__file__))
Victor Stinnerdc525f42018-12-11 12:05:21 +01001427 for attr in ('total', 'used', 'free'):
1428 self.assertIsInstance(getattr(usage, attr), int)
Éric Araujo2ee61882011-07-02 16:45:45 +02001429 self.assertGreater(usage.total, 0)
1430 self.assertGreater(usage.used, 0)
1431 self.assertGreaterEqual(usage.free, 0)
1432 self.assertGreaterEqual(usage.total, usage.used)
1433 self.assertGreater(usage.total, usage.free)
Giampaolo Rodola'210e7ca2011-07-01 13:55:36 +02001434
Victor Stinnerdc525f42018-12-11 12:05:21 +01001435 # bpo-32557: Check that disk_usage() also accepts a filename
1436 shutil.disk_usage(__file__)
1437
Sandro Tosid902a142011-08-22 23:28:27 +02001438 @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
1439 @unittest.skipUnless(hasattr(os, 'chown'), 'requires os.chown')
1440 def test_chown(self):
1441
1442 # cleaned-up automatically by TestShutil.tearDown method
1443 dirname = self.mkdtemp()
1444 filename = tempfile.mktemp(dir=dirname)
1445 write_file(filename, 'testing chown function')
1446
1447 with self.assertRaises(ValueError):
1448 shutil.chown(filename)
1449
1450 with self.assertRaises(LookupError):
Raymond Hettinger15f44ab2016-08-30 10:47:49 -07001451 shutil.chown(filename, user='non-existing username')
Sandro Tosid902a142011-08-22 23:28:27 +02001452
1453 with self.assertRaises(LookupError):
Raymond Hettinger15f44ab2016-08-30 10:47:49 -07001454 shutil.chown(filename, group='non-existing groupname')
Sandro Tosid902a142011-08-22 23:28:27 +02001455
1456 with self.assertRaises(TypeError):
1457 shutil.chown(filename, b'spam')
1458
1459 with self.assertRaises(TypeError):
1460 shutil.chown(filename, 3.14)
1461
1462 uid = os.getuid()
1463 gid = os.getgid()
1464
1465 def check_chown(path, uid=None, gid=None):
1466 s = os.stat(filename)
1467 if uid is not None:
1468 self.assertEqual(uid, s.st_uid)
1469 if gid is not None:
1470 self.assertEqual(gid, s.st_gid)
1471
1472 shutil.chown(filename, uid, gid)
1473 check_chown(filename, uid, gid)
1474 shutil.chown(filename, uid)
1475 check_chown(filename, uid)
1476 shutil.chown(filename, user=uid)
1477 check_chown(filename, uid)
1478 shutil.chown(filename, group=gid)
Sandro Tosi91f948a2011-08-22 23:55:39 +02001479 check_chown(filename, gid=gid)
Sandro Tosid902a142011-08-22 23:28:27 +02001480
1481 shutil.chown(dirname, uid, gid)
1482 check_chown(dirname, uid, gid)
1483 shutil.chown(dirname, uid)
1484 check_chown(dirname, uid)
1485 shutil.chown(dirname, user=uid)
1486 check_chown(dirname, uid)
1487 shutil.chown(dirname, group=gid)
Sandro Tosi91f948a2011-08-22 23:55:39 +02001488 check_chown(dirname, gid=gid)
Sandro Tosid902a142011-08-22 23:28:27 +02001489
1490 user = pwd.getpwuid(uid)[0]
1491 group = grp.getgrgid(gid)[0]
1492 shutil.chown(filename, user, group)
1493 check_chown(filename, uid, gid)
1494 shutil.chown(dirname, user, group)
1495 check_chown(dirname, uid, gid)
1496
Brian Curtin0d0a1de2012-06-18 18:41:07 -05001497 def test_copy_return_value(self):
1498 # copy and copy2 both return their destination path.
1499 for fn in (shutil.copy, shutil.copy2):
1500 src_dir = self.mkdtemp()
1501 dst_dir = self.mkdtemp()
1502 src = os.path.join(src_dir, 'foo')
1503 write_file(src, 'foo')
1504 rv = fn(src, dst_dir)
1505 self.assertEqual(rv, os.path.join(dst_dir, 'foo'))
1506 rv = fn(src, os.path.join(dst_dir, 'bar'))
1507 self.assertEqual(rv, os.path.join(dst_dir, 'bar'))
1508
1509 def test_copyfile_return_value(self):
1510 # copytree returns its destination path.
1511 src_dir = self.mkdtemp()
1512 dst_dir = self.mkdtemp()
1513 dst_file = os.path.join(dst_dir, 'bar')
1514 src_file = os.path.join(src_dir, 'foo')
1515 write_file(src_file, 'foo')
1516 rv = shutil.copyfile(src_file, dst_file)
1517 self.assertTrue(os.path.exists(rv))
1518 self.assertEqual(read_file(src_file), read_file(dst_file))
1519
Hynek Schlawack48653762012-10-07 12:49:58 +02001520 def test_copyfile_same_file(self):
1521 # copyfile() should raise SameFileError if the source and destination
1522 # are the same.
1523 src_dir = self.mkdtemp()
1524 src_file = os.path.join(src_dir, 'foo')
1525 write_file(src_file, 'foo')
1526 self.assertRaises(SameFileError, shutil.copyfile, src_file, src_file)
Hynek Schlawack27ddb572012-10-28 13:59:27 +01001527 # But Error should work too, to stay backward compatible.
1528 self.assertRaises(Error, shutil.copyfile, src_file, src_file)
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02001529 # Make sure file is not corrupted.
1530 self.assertEqual(read_file(src_file), 'foo')
Hynek Schlawack48653762012-10-07 12:49:58 +02001531
Brian Curtin0d0a1de2012-06-18 18:41:07 -05001532 def test_copytree_return_value(self):
1533 # copytree returns its destination path.
1534 src_dir = self.mkdtemp()
1535 dst_dir = src_dir + "dest"
Larry Hastings5b2f9c02012-06-25 23:50:01 -07001536 self.addCleanup(shutil.rmtree, dst_dir, True)
Brian Curtin0d0a1de2012-06-18 18:41:07 -05001537 src = os.path.join(src_dir, 'foo')
1538 write_file(src, 'foo')
1539 rv = shutil.copytree(src_dir, dst_dir)
1540 self.assertEqual(['foo'], os.listdir(rv))
1541
Christian Heimes9bd667a2008-01-20 15:14:11 +00001542
Brian Curtinc57a3452012-06-22 16:00:30 -05001543class TestWhich(unittest.TestCase):
1544
1545 def setUp(self):
Serhiy Storchaka014791f2013-01-21 15:00:27 +02001546 self.temp_dir = tempfile.mkdtemp(prefix="Tmp")
Larry Hastings5b2f9c02012-06-25 23:50:01 -07001547 self.addCleanup(shutil.rmtree, self.temp_dir, True)
Brian Curtinc57a3452012-06-22 16:00:30 -05001548 # Give the temp_file an ".exe" suffix for all.
1549 # It's needed on Windows and not harmful on other platforms.
1550 self.temp_file = tempfile.NamedTemporaryFile(dir=self.temp_dir,
Serhiy Storchaka014791f2013-01-21 15:00:27 +02001551 prefix="Tmp",
1552 suffix=".Exe")
Brian Curtinc57a3452012-06-22 16:00:30 -05001553 os.chmod(self.temp_file.name, stat.S_IXUSR)
1554 self.addCleanup(self.temp_file.close)
1555 self.dir, self.file = os.path.split(self.temp_file.name)
Cheryl Sabella5680f652019-02-13 06:25:10 -05001556 self.env_path = self.dir
1557 self.curdir = os.curdir
1558 self.ext = ".EXE"
Brian Curtinc57a3452012-06-22 16:00:30 -05001559
1560 def test_basic(self):
1561 # Given an EXE in a directory, it should be returned.
1562 rv = shutil.which(self.file, path=self.dir)
1563 self.assertEqual(rv, self.temp_file.name)
1564
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001565 def test_absolute_cmd(self):
Brian Curtinc57a3452012-06-22 16:00:30 -05001566 # When given the fully qualified path to an executable that exists,
1567 # it should be returned.
1568 rv = shutil.which(self.temp_file.name, path=self.temp_dir)
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001569 self.assertEqual(rv, self.temp_file.name)
1570
1571 def test_relative_cmd(self):
1572 # When given the relative path with a directory part to an executable
1573 # that exists, it should be returned.
1574 base_dir, tail_dir = os.path.split(self.dir)
1575 relpath = os.path.join(tail_dir, self.file)
Nick Coghlan55175962013-07-28 22:11:50 +10001576 with support.change_cwd(path=base_dir):
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001577 rv = shutil.which(relpath, path=self.temp_dir)
1578 self.assertEqual(rv, relpath)
1579 # But it shouldn't be searched in PATH directories (issue #16957).
Nick Coghlan55175962013-07-28 22:11:50 +10001580 with support.change_cwd(path=self.dir):
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001581 rv = shutil.which(relpath, path=base_dir)
1582 self.assertIsNone(rv)
1583
1584 def test_cwd(self):
1585 # Issue #16957
1586 base_dir = os.path.dirname(self.dir)
Nick Coghlan55175962013-07-28 22:11:50 +10001587 with support.change_cwd(path=self.dir):
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001588 rv = shutil.which(self.file, path=base_dir)
1589 if sys.platform == "win32":
1590 # Windows: current directory implicitly on PATH
Cheryl Sabella5680f652019-02-13 06:25:10 -05001591 self.assertEqual(rv, os.path.join(self.curdir, self.file))
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001592 else:
1593 # Other platforms: shouldn't match in the current directory.
1594 self.assertIsNone(rv)
Brian Curtinc57a3452012-06-22 16:00:30 -05001595
Serhiy Storchaka12516e22013-05-28 15:50:15 +03001596 @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
1597 'non-root user required')
Brian Curtinc57a3452012-06-22 16:00:30 -05001598 def test_non_matching_mode(self):
1599 # Set the file read-only and ask for writeable files.
1600 os.chmod(self.temp_file.name, stat.S_IREAD)
Serhiy Storchaka12516e22013-05-28 15:50:15 +03001601 if os.access(self.temp_file.name, os.W_OK):
1602 self.skipTest("can't set the file read-only")
Brian Curtinc57a3452012-06-22 16:00:30 -05001603 rv = shutil.which(self.file, path=self.dir, mode=os.W_OK)
1604 self.assertIsNone(rv)
1605
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001606 def test_relative_path(self):
Antoine Pitrou07c24d12012-06-22 23:33:05 +02001607 base_dir, tail_dir = os.path.split(self.dir)
Nick Coghlan55175962013-07-28 22:11:50 +10001608 with support.change_cwd(path=base_dir):
Antoine Pitrou07c24d12012-06-22 23:33:05 +02001609 rv = shutil.which(self.file, path=tail_dir)
1610 self.assertEqual(rv, os.path.join(tail_dir, self.file))
Antoine Pitrou07c24d12012-06-22 23:33:05 +02001611
Brian Curtinc57a3452012-06-22 16:00:30 -05001612 def test_nonexistent_file(self):
1613 # Return None when no matching executable file is found on the path.
1614 rv = shutil.which("foo.exe", path=self.dir)
1615 self.assertIsNone(rv)
1616
1617 @unittest.skipUnless(sys.platform == "win32",
1618 "pathext check is Windows-only")
1619 def test_pathext_checking(self):
1620 # Ask for the file without the ".exe" extension, then ensure that
1621 # it gets found properly with the extension.
Serhiy Storchakad70127a2013-01-24 20:03:49 +02001622 rv = shutil.which(self.file[:-4], path=self.dir)
Cheryl Sabella5680f652019-02-13 06:25:10 -05001623 self.assertEqual(rv, self.temp_file.name[:-4] + self.ext)
Brian Curtinc57a3452012-06-22 16:00:30 -05001624
Barry Warsaw618738b2013-04-16 11:05:03 -04001625 def test_environ_path(self):
1626 with support.EnvironmentVarGuard() as env:
Cheryl Sabella5680f652019-02-13 06:25:10 -05001627 env['PATH'] = self.env_path
Barry Warsaw618738b2013-04-16 11:05:03 -04001628 rv = shutil.which(self.file)
1629 self.assertEqual(rv, self.temp_file.name)
1630
Victor Stinner228a3c92019-04-17 16:26:36 +02001631 def test_environ_path_empty(self):
1632 # PATH='': no match
1633 with support.EnvironmentVarGuard() as env:
1634 env['PATH'] = ''
1635 with unittest.mock.patch('os.confstr', return_value=self.dir, \
1636 create=True), \
1637 support.swap_attr(os, 'defpath', self.dir), \
1638 support.change_cwd(self.dir):
1639 rv = shutil.which(self.file)
1640 self.assertIsNone(rv)
1641
1642 def test_environ_path_cwd(self):
1643 expected_cwd = os.path.basename(self.temp_file.name)
1644 if sys.platform == "win32":
1645 curdir = os.curdir
1646 if isinstance(expected_cwd, bytes):
1647 curdir = os.fsencode(curdir)
1648 expected_cwd = os.path.join(curdir, expected_cwd)
1649
1650 # PATH=':': explicitly looks in the current directory
1651 with support.EnvironmentVarGuard() as env:
1652 env['PATH'] = os.pathsep
1653 with unittest.mock.patch('os.confstr', return_value=self.dir, \
1654 create=True), \
1655 support.swap_attr(os, 'defpath', self.dir):
1656 rv = shutil.which(self.file)
1657 self.assertIsNone(rv)
1658
1659 # look in current directory
1660 with support.change_cwd(self.dir):
1661 rv = shutil.which(self.file)
1662 self.assertEqual(rv, expected_cwd)
1663
1664 def test_environ_path_missing(self):
1665 with support.EnvironmentVarGuard() as env:
1666 env.pop('PATH', None)
1667
1668 # without confstr
1669 with unittest.mock.patch('os.confstr', side_effect=ValueError, \
1670 create=True), \
1671 support.swap_attr(os, 'defpath', self.dir):
1672 rv = shutil.which(self.file)
1673 self.assertEqual(rv, self.temp_file.name)
1674
1675 # with confstr
1676 with unittest.mock.patch('os.confstr', return_value=self.dir, \
1677 create=True), \
1678 support.swap_attr(os, 'defpath', ''):
1679 rv = shutil.which(self.file)
1680 self.assertEqual(rv, self.temp_file.name)
1681
Barry Warsaw618738b2013-04-16 11:05:03 -04001682 def test_empty_path(self):
1683 base_dir = os.path.dirname(self.dir)
Nick Coghlan55175962013-07-28 22:11:50 +10001684 with support.change_cwd(path=self.dir), \
Barry Warsaw618738b2013-04-16 11:05:03 -04001685 support.EnvironmentVarGuard() as env:
Cheryl Sabella5680f652019-02-13 06:25:10 -05001686 env['PATH'] = self.env_path
Barry Warsaw618738b2013-04-16 11:05:03 -04001687 rv = shutil.which(self.file, path='')
1688 self.assertIsNone(rv)
1689
1690 def test_empty_path_no_PATH(self):
1691 with support.EnvironmentVarGuard() as env:
1692 env.pop('PATH', None)
1693 rv = shutil.which(self.file)
1694 self.assertIsNone(rv)
1695
Victor Stinner228a3c92019-04-17 16:26:36 +02001696 @unittest.skipUnless(sys.platform == "win32", 'test specific to Windows')
1697 def test_pathext(self):
1698 ext = ".xyz"
1699 temp_filexyz = tempfile.NamedTemporaryFile(dir=self.temp_dir,
1700 prefix="Tmp2", suffix=ext)
1701 os.chmod(temp_filexyz.name, stat.S_IXUSR)
1702 self.addCleanup(temp_filexyz.close)
1703
1704 # strip path and extension
1705 program = os.path.basename(temp_filexyz.name)
1706 program = os.path.splitext(program)[0]
1707
1708 with support.EnvironmentVarGuard() as env:
1709 env['PATHEXT'] = ext
1710 rv = shutil.which(program, path=self.temp_dir)
1711 self.assertEqual(rv, temp_filexyz.name)
1712
Brian Curtinc57a3452012-06-22 16:00:30 -05001713
Cheryl Sabella5680f652019-02-13 06:25:10 -05001714class TestWhichBytes(TestWhich):
1715 def setUp(self):
1716 TestWhich.setUp(self)
1717 self.dir = os.fsencode(self.dir)
1718 self.file = os.fsencode(self.file)
1719 self.temp_file.name = os.fsencode(self.temp_file.name)
1720 self.curdir = os.fsencode(self.curdir)
1721 self.ext = os.fsencode(self.ext)
1722
1723
Christian Heimesada8c3b2008-03-18 18:26:33 +00001724class TestMove(unittest.TestCase):
1725
1726 def setUp(self):
1727 filename = "foo"
1728 self.src_dir = tempfile.mkdtemp()
1729 self.dst_dir = tempfile.mkdtemp()
1730 self.src_file = os.path.join(self.src_dir, filename)
1731 self.dst_file = os.path.join(self.dst_dir, filename)
Christian Heimesada8c3b2008-03-18 18:26:33 +00001732 with open(self.src_file, "wb") as f:
1733 f.write(b"spam")
1734
1735 def tearDown(self):
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001736 for d in (self.src_dir, self.dst_dir):
Christian Heimesada8c3b2008-03-18 18:26:33 +00001737 try:
1738 if d:
1739 shutil.rmtree(d)
1740 except:
1741 pass
1742
1743 def _check_move_file(self, src, dst, real_dst):
Antoine Pitrou92f60ed2010-10-14 22:11:44 +00001744 with open(src, "rb") as f:
1745 contents = f.read()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001746 shutil.move(src, dst)
Antoine Pitrou92f60ed2010-10-14 22:11:44 +00001747 with open(real_dst, "rb") as f:
1748 self.assertEqual(contents, f.read())
Christian Heimesada8c3b2008-03-18 18:26:33 +00001749 self.assertFalse(os.path.exists(src))
1750
1751 def _check_move_dir(self, src, dst, real_dst):
1752 contents = sorted(os.listdir(src))
1753 shutil.move(src, dst)
1754 self.assertEqual(contents, sorted(os.listdir(real_dst)))
1755 self.assertFalse(os.path.exists(src))
1756
1757 def test_move_file(self):
1758 # Move a file to another location on the same filesystem.
1759 self._check_move_file(self.src_file, self.dst_file, self.dst_file)
1760
1761 def test_move_file_to_dir(self):
1762 # Move a file inside an existing dir on the same filesystem.
1763 self._check_move_file(self.src_file, self.dst_dir, self.dst_file)
1764
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001765 @mock_rename
Christian Heimesada8c3b2008-03-18 18:26:33 +00001766 def test_move_file_other_fs(self):
1767 # Move a file to an existing dir on another filesystem.
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001768 self.test_move_file()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001769
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001770 @mock_rename
Christian Heimesada8c3b2008-03-18 18:26:33 +00001771 def test_move_file_to_dir_other_fs(self):
1772 # Move a file to another location on another filesystem.
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001773 self.test_move_file_to_dir()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001774
1775 def test_move_dir(self):
1776 # Move a dir to another location on the same filesystem.
1777 dst_dir = tempfile.mktemp()
1778 try:
1779 self._check_move_dir(self.src_dir, dst_dir, dst_dir)
1780 finally:
1781 try:
1782 shutil.rmtree(dst_dir)
1783 except:
1784 pass
1785
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001786 @mock_rename
Christian Heimesada8c3b2008-03-18 18:26:33 +00001787 def test_move_dir_other_fs(self):
1788 # Move a dir to another location on another filesystem.
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001789 self.test_move_dir()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001790
1791 def test_move_dir_to_dir(self):
1792 # Move a dir inside an existing dir on the same filesystem.
1793 self._check_move_dir(self.src_dir, self.dst_dir,
1794 os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
1795
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001796 @mock_rename
Christian Heimesada8c3b2008-03-18 18:26:33 +00001797 def test_move_dir_to_dir_other_fs(self):
1798 # Move a dir inside an existing dir on another filesystem.
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001799 self.test_move_dir_to_dir()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001800
Serhiy Storchaka3a308b92014-02-11 10:30:59 +02001801 def test_move_dir_sep_to_dir(self):
1802 self._check_move_dir(self.src_dir + os.path.sep, self.dst_dir,
1803 os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
1804
1805 @unittest.skipUnless(os.path.altsep, 'requires os.path.altsep')
1806 def test_move_dir_altsep_to_dir(self):
1807 self._check_move_dir(self.src_dir + os.path.altsep, self.dst_dir,
1808 os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
1809
Christian Heimesada8c3b2008-03-18 18:26:33 +00001810 def test_existing_file_inside_dest_dir(self):
1811 # A file with the same name inside the destination dir already exists.
1812 with open(self.dst_file, "wb"):
1813 pass
1814 self.assertRaises(shutil.Error, shutil.move, self.src_file, self.dst_dir)
1815
1816 def test_dont_move_dir_in_itself(self):
1817 # Moving a dir inside itself raises an Error.
1818 dst = os.path.join(self.src_dir, "bar")
1819 self.assertRaises(shutil.Error, shutil.move, self.src_dir, dst)
1820
Antoine Pitrou0dcc3cd2009-01-29 20:26:59 +00001821 def test_destinsrc_false_negative(self):
1822 os.mkdir(TESTFN)
1823 try:
1824 for src, dst in [('srcdir', 'srcdir/dest')]:
1825 src = os.path.join(TESTFN, src)
1826 dst = os.path.join(TESTFN, dst)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001827 self.assertTrue(shutil._destinsrc(src, dst),
Benjamin Peterson247a9b82009-02-20 04:09:19 +00001828 msg='_destinsrc() wrongly concluded that '
Antoine Pitrou0dcc3cd2009-01-29 20:26:59 +00001829 'dst (%s) is not in src (%s)' % (dst, src))
1830 finally:
1831 shutil.rmtree(TESTFN, ignore_errors=True)
Christian Heimesada8c3b2008-03-18 18:26:33 +00001832
Antoine Pitrou0dcc3cd2009-01-29 20:26:59 +00001833 def test_destinsrc_false_positive(self):
1834 os.mkdir(TESTFN)
1835 try:
1836 for src, dst in [('srcdir', 'src/dest'), ('srcdir', 'srcdir.new')]:
1837 src = os.path.join(TESTFN, src)
1838 dst = os.path.join(TESTFN, dst)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001839 self.assertFalse(shutil._destinsrc(src, dst),
Benjamin Peterson247a9b82009-02-20 04:09:19 +00001840 msg='_destinsrc() wrongly concluded that '
Antoine Pitrou0dcc3cd2009-01-29 20:26:59 +00001841 'dst (%s) is in src (%s)' % (dst, src))
1842 finally:
1843 shutil.rmtree(TESTFN, ignore_errors=True)
Christian Heimes9bd667a2008-01-20 15:14:11 +00001844
Antoine Pitrou0a08d7a2012-01-06 20:16:19 +01001845 @support.skip_unless_symlink
1846 @mock_rename
1847 def test_move_file_symlink(self):
1848 dst = os.path.join(self.src_dir, 'bar')
1849 os.symlink(self.src_file, dst)
1850 shutil.move(dst, self.dst_file)
1851 self.assertTrue(os.path.islink(self.dst_file))
1852 self.assertTrue(os.path.samefile(self.src_file, self.dst_file))
1853
1854 @support.skip_unless_symlink
1855 @mock_rename
1856 def test_move_file_symlink_to_dir(self):
1857 filename = "bar"
1858 dst = os.path.join(self.src_dir, filename)
1859 os.symlink(self.src_file, dst)
1860 shutil.move(dst, self.dst_dir)
1861 final_link = os.path.join(self.dst_dir, filename)
1862 self.assertTrue(os.path.islink(final_link))
1863 self.assertTrue(os.path.samefile(self.src_file, final_link))
1864
1865 @support.skip_unless_symlink
1866 @mock_rename
1867 def test_move_dangling_symlink(self):
1868 src = os.path.join(self.src_dir, 'baz')
1869 dst = os.path.join(self.src_dir, 'bar')
1870 os.symlink(src, dst)
1871 dst_link = os.path.join(self.dst_dir, 'quux')
1872 shutil.move(dst, dst_link)
1873 self.assertTrue(os.path.islink(dst_link))
Steve Dower75e06492019-08-21 13:43:06 -07001874 self.assertEqual(os.path.realpath(src), os.path.realpath(dst_link))
Antoine Pitrou0a08d7a2012-01-06 20:16:19 +01001875
1876 @support.skip_unless_symlink
1877 @mock_rename
1878 def test_move_dir_symlink(self):
1879 src = os.path.join(self.src_dir, 'baz')
1880 dst = os.path.join(self.src_dir, 'bar')
1881 os.mkdir(src)
1882 os.symlink(src, dst)
1883 dst_link = os.path.join(self.dst_dir, 'quux')
1884 shutil.move(dst, dst_link)
1885 self.assertTrue(os.path.islink(dst_link))
1886 self.assertTrue(os.path.samefile(src, dst_link))
1887
Brian Curtin0d0a1de2012-06-18 18:41:07 -05001888 def test_move_return_value(self):
1889 rv = shutil.move(self.src_file, self.dst_dir)
1890 self.assertEqual(rv,
1891 os.path.join(self.dst_dir, os.path.basename(self.src_file)))
1892
1893 def test_move_as_rename_return_value(self):
1894 rv = shutil.move(self.src_file, os.path.join(self.dst_dir, 'bar'))
1895 self.assertEqual(rv, os.path.join(self.dst_dir, 'bar'))
1896
R David Murray6ffface2014-06-11 14:40:13 -04001897 @mock_rename
1898 def test_move_file_special_function(self):
1899 moved = []
1900 def _copy(src, dst):
1901 moved.append((src, dst))
1902 shutil.move(self.src_file, self.dst_dir, copy_function=_copy)
1903 self.assertEqual(len(moved), 1)
1904
1905 @mock_rename
1906 def test_move_dir_special_function(self):
1907 moved = []
1908 def _copy(src, dst):
1909 moved.append((src, dst))
1910 support.create_empty_file(os.path.join(self.src_dir, 'child'))
1911 support.create_empty_file(os.path.join(self.src_dir, 'child1'))
1912 shutil.move(self.src_dir, self.dst_dir, copy_function=_copy)
1913 self.assertEqual(len(moved), 3)
1914
Tarek Ziadé5340db32010-04-19 22:30:51 +00001915
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001916class TestCopyFile(unittest.TestCase):
1917
1918 _delete = False
1919
1920 class Faux(object):
1921 _entered = False
1922 _exited_with = None
1923 _raised = False
1924 def __init__(self, raise_in_exit=False, suppress_at_exit=True):
1925 self._raise_in_exit = raise_in_exit
1926 self._suppress_at_exit = suppress_at_exit
1927 def read(self, *args):
1928 return ''
1929 def __enter__(self):
1930 self._entered = True
1931 def __exit__(self, exc_type, exc_val, exc_tb):
1932 self._exited_with = exc_type, exc_val, exc_tb
1933 if self._raise_in_exit:
1934 self._raised = True
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001935 raise OSError("Cannot close")
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001936 return self._suppress_at_exit
1937
1938 def tearDown(self):
1939 if self._delete:
1940 del shutil.open
1941
1942 def _set_shutil_open(self, func):
1943 shutil.open = func
1944 self._delete = True
1945
1946 def test_w_source_open_fails(self):
1947 def _open(filename, mode='r'):
1948 if filename == 'srcfile':
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001949 raise OSError('Cannot open "srcfile"')
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001950 assert 0 # shouldn't reach here.
1951
1952 self._set_shutil_open(_open)
1953
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001954 self.assertRaises(OSError, shutil.copyfile, 'srcfile', 'destfile')
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001955
Victor Stinner937ee9e2018-06-26 02:11:06 +02001956 @unittest.skipIf(MACOS, "skipped on macOS")
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001957 def test_w_dest_open_fails(self):
1958
1959 srcfile = self.Faux()
1960
1961 def _open(filename, mode='r'):
1962 if filename == 'srcfile':
1963 return srcfile
1964 if filename == 'destfile':
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001965 raise OSError('Cannot open "destfile"')
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001966 assert 0 # shouldn't reach here.
1967
1968 self._set_shutil_open(_open)
1969
1970 shutil.copyfile('srcfile', 'destfile')
1971 self.assertTrue(srcfile._entered)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001972 self.assertTrue(srcfile._exited_with[0] is OSError)
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001973 self.assertEqual(srcfile._exited_with[1].args,
1974 ('Cannot open "destfile"',))
1975
Victor Stinner937ee9e2018-06-26 02:11:06 +02001976 @unittest.skipIf(MACOS, "skipped on macOS")
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001977 def test_w_dest_close_fails(self):
1978
1979 srcfile = self.Faux()
1980 destfile = self.Faux(True)
1981
1982 def _open(filename, mode='r'):
1983 if filename == 'srcfile':
1984 return srcfile
1985 if filename == 'destfile':
1986 return destfile
1987 assert 0 # shouldn't reach here.
1988
1989 self._set_shutil_open(_open)
1990
1991 shutil.copyfile('srcfile', 'destfile')
1992 self.assertTrue(srcfile._entered)
1993 self.assertTrue(destfile._entered)
1994 self.assertTrue(destfile._raised)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001995 self.assertTrue(srcfile._exited_with[0] is OSError)
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001996 self.assertEqual(srcfile._exited_with[1].args,
1997 ('Cannot close',))
1998
Victor Stinner937ee9e2018-06-26 02:11:06 +02001999 @unittest.skipIf(MACOS, "skipped on macOS")
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002000 def test_w_source_close_fails(self):
2001
2002 srcfile = self.Faux(True)
2003 destfile = self.Faux()
2004
2005 def _open(filename, mode='r'):
2006 if filename == 'srcfile':
2007 return srcfile
2008 if filename == 'destfile':
2009 return destfile
2010 assert 0 # shouldn't reach here.
2011
2012 self._set_shutil_open(_open)
2013
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002014 self.assertRaises(OSError,
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002015 shutil.copyfile, 'srcfile', 'destfile')
2016 self.assertTrue(srcfile._entered)
2017 self.assertTrue(destfile._entered)
2018 self.assertFalse(destfile._raised)
2019 self.assertTrue(srcfile._exited_with[0] is None)
2020 self.assertTrue(srcfile._raised)
2021
Ronald Oussorenf51738b2011-05-06 10:23:04 +02002022 def test_move_dir_caseinsensitive(self):
2023 # Renames a folder to the same name
2024 # but a different case.
2025
2026 self.src_dir = tempfile.mkdtemp()
Larry Hastings5b2f9c02012-06-25 23:50:01 -07002027 self.addCleanup(shutil.rmtree, self.src_dir, True)
Ronald Oussorenf51738b2011-05-06 10:23:04 +02002028 dst_dir = os.path.join(
2029 os.path.dirname(self.src_dir),
2030 os.path.basename(self.src_dir).upper())
2031 self.assertNotEqual(self.src_dir, dst_dir)
2032
2033 try:
2034 shutil.move(self.src_dir, dst_dir)
2035 self.assertTrue(os.path.isdir(dst_dir))
2036 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +02002037 os.rmdir(dst_dir)
Ronald Oussorenf51738b2011-05-06 10:23:04 +02002038
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002039
Giampaolo Rodolac7f02a92018-06-19 08:27:29 -07002040class TestCopyFileObj(unittest.TestCase):
2041 FILESIZE = 2 * 1024 * 1024
2042
2043 @classmethod
2044 def setUpClass(cls):
2045 write_test_file(TESTFN, cls.FILESIZE)
2046
2047 @classmethod
2048 def tearDownClass(cls):
2049 support.unlink(TESTFN)
2050 support.unlink(TESTFN2)
2051
2052 def tearDown(self):
2053 support.unlink(TESTFN2)
2054
2055 @contextlib.contextmanager
2056 def get_files(self):
2057 with open(TESTFN, "rb") as src:
2058 with open(TESTFN2, "wb") as dst:
2059 yield (src, dst)
2060
2061 def assert_files_eq(self, src, dst):
2062 with open(src, 'rb') as fsrc:
2063 with open(dst, 'rb') as fdst:
2064 self.assertEqual(fsrc.read(), fdst.read())
2065
2066 def test_content(self):
2067 with self.get_files() as (src, dst):
2068 shutil.copyfileobj(src, dst)
2069 self.assert_files_eq(TESTFN, TESTFN2)
2070
2071 def test_file_not_closed(self):
2072 with self.get_files() as (src, dst):
2073 shutil.copyfileobj(src, dst)
2074 assert not src.closed
2075 assert not dst.closed
2076
2077 def test_file_offset(self):
2078 with self.get_files() as (src, dst):
2079 shutil.copyfileobj(src, dst)
2080 self.assertEqual(src.tell(), self.FILESIZE)
2081 self.assertEqual(dst.tell(), self.FILESIZE)
2082
2083 @unittest.skipIf(os.name != 'nt', "Windows only")
2084 def test_win_impl(self):
2085 # Make sure alternate Windows implementation is called.
2086 with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
2087 shutil.copyfile(TESTFN, TESTFN2)
2088 assert m.called
2089
2090 # File size is 2 MiB but max buf size should be 1 MiB.
2091 self.assertEqual(m.call_args[0][2], 1 * 1024 * 1024)
2092
2093 # If file size < 1 MiB memoryview() length must be equal to
2094 # the actual file size.
2095 with tempfile.NamedTemporaryFile(delete=False) as f:
2096 f.write(b'foo')
2097 fname = f.name
2098 self.addCleanup(support.unlink, fname)
2099 with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
2100 shutil.copyfile(fname, TESTFN2)
2101 self.assertEqual(m.call_args[0][2], 3)
2102
2103 # Empty files should not rely on readinto() variant.
2104 with tempfile.NamedTemporaryFile(delete=False) as f:
2105 pass
2106 fname = f.name
2107 self.addCleanup(support.unlink, fname)
2108 with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
2109 shutil.copyfile(fname, TESTFN2)
2110 assert not m.called
2111 self.assert_files_eq(fname, TESTFN2)
2112
2113
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002114class _ZeroCopyFileTest(object):
2115 """Tests common to all zero-copy APIs."""
2116 FILESIZE = (10 * 1024 * 1024) # 10 MiB
2117 FILEDATA = b""
2118 PATCHPOINT = ""
2119
2120 @classmethod
2121 def setUpClass(cls):
2122 write_test_file(TESTFN, cls.FILESIZE)
2123 with open(TESTFN, 'rb') as f:
2124 cls.FILEDATA = f.read()
2125 assert len(cls.FILEDATA) == cls.FILESIZE
2126
2127 @classmethod
2128 def tearDownClass(cls):
2129 support.unlink(TESTFN)
2130
2131 def tearDown(self):
2132 support.unlink(TESTFN2)
2133
2134 @contextlib.contextmanager
2135 def get_files(self):
2136 with open(TESTFN, "rb") as src:
2137 with open(TESTFN2, "wb") as dst:
2138 yield (src, dst)
2139
2140 def zerocopy_fun(self, *args, **kwargs):
2141 raise NotImplementedError("must be implemented in subclass")
2142
2143 def reset(self):
2144 self.tearDown()
2145 self.tearDownClass()
2146 self.setUpClass()
2147 self.setUp()
2148
2149 # ---
2150
2151 def test_regular_copy(self):
2152 with self.get_files() as (src, dst):
2153 self.zerocopy_fun(src, dst)
2154 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2155 # Make sure the fallback function is not called.
2156 with self.get_files() as (src, dst):
2157 with unittest.mock.patch('shutil.copyfileobj') as m:
2158 shutil.copyfile(TESTFN, TESTFN2)
2159 assert not m.called
2160
2161 def test_same_file(self):
2162 self.addCleanup(self.reset)
2163 with self.get_files() as (src, dst):
2164 with self.assertRaises(Exception):
2165 self.zerocopy_fun(src, src)
2166 # Make sure src file is not corrupted.
2167 self.assertEqual(read_file(TESTFN, binary=True), self.FILEDATA)
2168
2169 def test_non_existent_src(self):
2170 name = tempfile.mktemp()
2171 with self.assertRaises(FileNotFoundError) as cm:
2172 shutil.copyfile(name, "new")
2173 self.assertEqual(cm.exception.filename, name)
2174
2175 def test_empty_file(self):
2176 srcname = TESTFN + 'src'
2177 dstname = TESTFN + 'dst'
2178 self.addCleanup(lambda: support.unlink(srcname))
2179 self.addCleanup(lambda: support.unlink(dstname))
2180 with open(srcname, "wb"):
2181 pass
2182
2183 with open(srcname, "rb") as src:
2184 with open(dstname, "wb") as dst:
2185 self.zerocopy_fun(src, dst)
2186
2187 self.assertEqual(read_file(dstname, binary=True), b"")
2188
2189 def test_unhandled_exception(self):
2190 with unittest.mock.patch(self.PATCHPOINT,
2191 side_effect=ZeroDivisionError):
2192 self.assertRaises(ZeroDivisionError,
2193 shutil.copyfile, TESTFN, TESTFN2)
2194
2195 def test_exception_on_first_call(self):
2196 # Emulate a case where the first call to the zero-copy
2197 # function raises an exception in which case the function is
2198 # supposed to give up immediately.
2199 with unittest.mock.patch(self.PATCHPOINT,
2200 side_effect=OSError(errno.EINVAL, "yo")):
2201 with self.get_files() as (src, dst):
2202 with self.assertRaises(_GiveupOnFastCopy):
2203 self.zerocopy_fun(src, dst)
2204
2205 def test_filesystem_full(self):
2206 # Emulate a case where filesystem is full and sendfile() fails
2207 # on first call.
2208 with unittest.mock.patch(self.PATCHPOINT,
2209 side_effect=OSError(errno.ENOSPC, "yo")):
2210 with self.get_files() as (src, dst):
2211 self.assertRaises(OSError, self.zerocopy_fun, src, dst)
2212
2213
2214@unittest.skipIf(not SUPPORTS_SENDFILE, 'os.sendfile() not supported')
2215class TestZeroCopySendfile(_ZeroCopyFileTest, unittest.TestCase):
2216 PATCHPOINT = "os.sendfile"
2217
2218 def zerocopy_fun(self, fsrc, fdst):
2219 return shutil._fastcopy_sendfile(fsrc, fdst)
2220
2221 def test_non_regular_file_src(self):
2222 with io.BytesIO(self.FILEDATA) as src:
2223 with open(TESTFN2, "wb") as dst:
2224 with self.assertRaises(_GiveupOnFastCopy):
2225 self.zerocopy_fun(src, dst)
2226 shutil.copyfileobj(src, dst)
2227
2228 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2229
2230 def test_non_regular_file_dst(self):
2231 with open(TESTFN, "rb") as src:
2232 with io.BytesIO() as dst:
2233 with self.assertRaises(_GiveupOnFastCopy):
2234 self.zerocopy_fun(src, dst)
2235 shutil.copyfileobj(src, dst)
2236 dst.seek(0)
2237 self.assertEqual(dst.read(), self.FILEDATA)
2238
2239 def test_exception_on_second_call(self):
2240 def sendfile(*args, **kwargs):
2241 if not flag:
2242 flag.append(None)
2243 return orig_sendfile(*args, **kwargs)
2244 else:
2245 raise OSError(errno.EBADF, "yo")
2246
2247 flag = []
2248 orig_sendfile = os.sendfile
2249 with unittest.mock.patch('os.sendfile', create=True,
2250 side_effect=sendfile):
2251 with self.get_files() as (src, dst):
2252 with self.assertRaises(OSError) as cm:
2253 shutil._fastcopy_sendfile(src, dst)
2254 assert flag
2255 self.assertEqual(cm.exception.errno, errno.EBADF)
2256
2257 def test_cant_get_size(self):
2258 # Emulate a case where src file size cannot be determined.
2259 # Internally bufsize will be set to a small value and
2260 # sendfile() will be called repeatedly.
2261 with unittest.mock.patch('os.fstat', side_effect=OSError) as m:
2262 with self.get_files() as (src, dst):
2263 shutil._fastcopy_sendfile(src, dst)
2264 assert m.called
2265 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2266
2267 def test_small_chunks(self):
2268 # Force internal file size detection to be smaller than the
2269 # actual file size. We want to force sendfile() to be called
2270 # multiple times, also in order to emulate a src fd which gets
2271 # bigger while it is being copied.
2272 mock = unittest.mock.Mock()
2273 mock.st_size = 65536 + 1
2274 with unittest.mock.patch('os.fstat', return_value=mock) as m:
2275 with self.get_files() as (src, dst):
2276 shutil._fastcopy_sendfile(src, dst)
2277 assert m.called
2278 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2279
2280 def test_big_chunk(self):
2281 # Force internal file size detection to be +100MB bigger than
2282 # the actual file size. Make sure sendfile() does not rely on
2283 # file size value except for (maybe) a better throughput /
2284 # performance.
2285 mock = unittest.mock.Mock()
2286 mock.st_size = self.FILESIZE + (100 * 1024 * 1024)
2287 with unittest.mock.patch('os.fstat', return_value=mock) as m:
2288 with self.get_files() as (src, dst):
2289 shutil._fastcopy_sendfile(src, dst)
2290 assert m.called
2291 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2292
2293 def test_blocksize_arg(self):
2294 with unittest.mock.patch('os.sendfile',
2295 side_effect=ZeroDivisionError) as m:
2296 self.assertRaises(ZeroDivisionError,
2297 shutil.copyfile, TESTFN, TESTFN2)
2298 blocksize = m.call_args[0][3]
2299 # Make sure file size and the block size arg passed to
2300 # sendfile() are the same.
2301 self.assertEqual(blocksize, os.path.getsize(TESTFN))
2302 # ...unless we're dealing with a small file.
2303 support.unlink(TESTFN2)
2304 write_file(TESTFN2, b"hello", binary=True)
2305 self.addCleanup(support.unlink, TESTFN2 + '3')
2306 self.assertRaises(ZeroDivisionError,
2307 shutil.copyfile, TESTFN2, TESTFN2 + '3')
2308 blocksize = m.call_args[0][3]
2309 self.assertEqual(blocksize, 2 ** 23)
2310
2311 def test_file2file_not_supported(self):
2312 # Emulate a case where sendfile() only support file->socket
2313 # fds. In such a case copyfile() is supposed to skip the
2314 # fast-copy attempt from then on.
Giampaolo Rodola413d9552019-05-30 14:05:41 +08002315 assert shutil._USE_CP_SENDFILE
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002316 try:
2317 with unittest.mock.patch(
2318 self.PATCHPOINT,
2319 side_effect=OSError(errno.ENOTSOCK, "yo")) as m:
2320 with self.get_files() as (src, dst):
2321 with self.assertRaises(_GiveupOnFastCopy):
2322 shutil._fastcopy_sendfile(src, dst)
2323 assert m.called
Giampaolo Rodola413d9552019-05-30 14:05:41 +08002324 assert not shutil._USE_CP_SENDFILE
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002325
2326 with unittest.mock.patch(self.PATCHPOINT) as m:
2327 shutil.copyfile(TESTFN, TESTFN2)
2328 assert not m.called
2329 finally:
Giampaolo Rodola413d9552019-05-30 14:05:41 +08002330 shutil._USE_CP_SENDFILE = True
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002331
2332
Victor Stinner937ee9e2018-06-26 02:11:06 +02002333@unittest.skipIf(not MACOS, 'macOS only')
Giampaolo Rodolac7f02a92018-06-19 08:27:29 -07002334class TestZeroCopyMACOS(_ZeroCopyFileTest, unittest.TestCase):
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002335 PATCHPOINT = "posix._fcopyfile"
2336
2337 def zerocopy_fun(self, src, dst):
Giampaolo Rodolac7f02a92018-06-19 08:27:29 -07002338 return shutil._fastcopy_fcopyfile(src, dst, posix._COPYFILE_DATA)
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002339
2340
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002341class TermsizeTests(unittest.TestCase):
2342 def test_does_not_crash(self):
2343 """Check if get_terminal_size() returns a meaningful value.
2344
2345 There's no easy portable way to actually check the size of the
2346 terminal, so let's check if it returns something sensible instead.
2347 """
2348 size = shutil.get_terminal_size()
Antoine Pitroucfade362012-02-08 23:48:59 +01002349 self.assertGreaterEqual(size.columns, 0)
2350 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002351
2352 def test_os_environ_first(self):
2353 "Check if environment variables have precedence"
2354
2355 with support.EnvironmentVarGuard() as env:
2356 env['COLUMNS'] = '777'
Serhiy Storchakad30829d2016-04-24 09:58:43 +03002357 del env['LINES']
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002358 size = shutil.get_terminal_size()
2359 self.assertEqual(size.columns, 777)
2360
2361 with support.EnvironmentVarGuard() as env:
Serhiy Storchakad30829d2016-04-24 09:58:43 +03002362 del env['COLUMNS']
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002363 env['LINES'] = '888'
2364 size = shutil.get_terminal_size()
2365 self.assertEqual(size.lines, 888)
2366
Serhiy Storchakad30829d2016-04-24 09:58:43 +03002367 def test_bad_environ(self):
2368 with support.EnvironmentVarGuard() as env:
2369 env['COLUMNS'] = 'xxx'
2370 env['LINES'] = 'yyy'
2371 size = shutil.get_terminal_size()
2372 self.assertGreaterEqual(size.columns, 0)
2373 self.assertGreaterEqual(size.lines, 0)
2374
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002375 @unittest.skipUnless(os.isatty(sys.__stdout__.fileno()), "not on tty")
Victor Stinner119ebb72016-04-19 22:24:56 +02002376 @unittest.skipUnless(hasattr(os, 'get_terminal_size'),
2377 'need os.get_terminal_size()')
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002378 def test_stty_match(self):
2379 """Check if stty returns the same results ignoring env
2380
2381 This test will fail if stdin and stdout are connected to
2382 different terminals with different sizes. Nevertheless, such
2383 situations should be pretty rare.
2384 """
2385 try:
2386 size = subprocess.check_output(['stty', 'size']).decode().split()
Xavier de Gaye38c8b7d2016-11-14 17:14:42 +01002387 except (FileNotFoundError, PermissionError,
2388 subprocess.CalledProcessError):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002389 self.skipTest("stty invocation failed")
2390 expected = (int(size[1]), int(size[0])) # reversed order
2391
2392 with support.EnvironmentVarGuard() as env:
2393 del env['LINES']
2394 del env['COLUMNS']
2395 actual = shutil.get_terminal_size()
2396
2397 self.assertEqual(expected, actual)
Ronald Oussorenf51738b2011-05-06 10:23:04 +02002398
Serhiy Storchakad30829d2016-04-24 09:58:43 +03002399 def test_fallback(self):
2400 with support.EnvironmentVarGuard() as env:
2401 del env['LINES']
2402 del env['COLUMNS']
2403
2404 # sys.__stdout__ has no fileno()
2405 with support.swap_attr(sys, '__stdout__', None):
2406 size = shutil.get_terminal_size(fallback=(10, 20))
2407 self.assertEqual(size.columns, 10)
2408 self.assertEqual(size.lines, 20)
2409
2410 # sys.__stdout__ is not a terminal on Unix
2411 # or fileno() not in (0, 1, 2) on Windows
2412 with open(os.devnull, 'w') as f, \
2413 support.swap_attr(sys, '__stdout__', f):
2414 size = shutil.get_terminal_size(fallback=(30, 40))
2415 self.assertEqual(size.columns, 30)
2416 self.assertEqual(size.lines, 40)
2417
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002418
Berker Peksag8083cd62014-11-01 11:04:06 +02002419class PublicAPITests(unittest.TestCase):
2420 """Ensures that the correct values are exposed in the public API."""
2421
2422 def test_module_all_attribute(self):
2423 self.assertTrue(hasattr(shutil, '__all__'))
2424 target_api = ['copyfileobj', 'copyfile', 'copymode', 'copystat',
2425 'copy', 'copy2', 'copytree', 'move', 'rmtree', 'Error',
2426 'SpecialFileError', 'ExecError', 'make_archive',
2427 'get_archive_formats', 'register_archive_format',
2428 'unregister_archive_format', 'get_unpack_formats',
2429 'register_unpack_format', 'unregister_unpack_format',
2430 'unpack_archive', 'ignore_patterns', 'chown', 'which',
2431 'get_terminal_size', 'SameFileError']
2432 if hasattr(os, 'statvfs') or os.name == 'nt':
2433 target_api.append('disk_usage')
2434 self.assertEqual(set(shutil.__all__), set(target_api))
2435
2436
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00002437if __name__ == '__main__':
Brett Cannon3e9a9ae2013-06-12 21:25:59 -04002438 unittest.main()