blob: 208718bb1281050c0570a9c024cc156a46291f0f [file] [log] [blame]
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00001# Copyright (C) 2003 Python Software Foundation
2
3import unittest
Berker Peksag884afd92014-12-10 02:50:32 +02004import unittest.mock
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00005import shutil
6import tempfile
Johannes Gijsbers8e6f2de2004-11-23 09:27:27 +00007import sys
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +00008import stat
Brett Cannon1c3fa182004-06-19 21:11:35 +00009import os
10import os.path
Antoine Pitrouc041ab62012-01-02 19:18:02 +010011import errno
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040012import functools
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -070013import pathlib
Antoine Pitroubcf2b592012-02-08 23:28:36 +010014import subprocess
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020015import random
16import string
17import contextlib
18import io
Serhiy Storchaka527ef072015-09-06 18:33:19 +030019from shutil import (make_archive,
Tarek Ziadé396fad72010-02-23 05:30:31 +000020 register_archive_format, unregister_archive_format,
Tarek Ziadé6ac91722010-04-28 17:51:36 +000021 get_archive_formats, Error, unpack_archive,
22 register_unpack_format, RegistryError,
Hynek Schlawack48653762012-10-07 12:49:58 +020023 unregister_unpack_format, get_unpack_formats,
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020024 SameFileError, _GiveupOnFastCopy)
Tarek Ziadé396fad72010-02-23 05:30:31 +000025import tarfile
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +020026import zipfile
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020027try:
28 import posix
29except ImportError:
30 posix = None
Tarek Ziadé396fad72010-02-23 05:30:31 +000031
32from test import support
Serhiy Storchakab21d1552018-03-02 11:53:51 +020033from test.support import TESTFN, FakePath
Serhiy Storchaka11213772014-08-06 18:50:19 +030034
Antoine Pitrou7fff0962009-05-01 21:09:44 +000035TESTFN2 = TESTFN + "2"
Victor Stinner937ee9e2018-06-26 02:11:06 +020036MACOS = sys.platform.startswith("darwin")
Michael Feltef110b12019-02-18 12:02:44 +010037AIX = sys.platform[:3] == 'aix'
Tarek Ziadé396fad72010-02-23 05:30:31 +000038try:
39 import grp
40 import pwd
41 UID_GID_SUPPORT = True
42except ImportError:
43 UID_GID_SUPPORT = False
44
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040045def _fake_rename(*args, **kwargs):
46 # Pretend the destination path is on a different filesystem.
Antoine Pitrouc041ab62012-01-02 19:18:02 +010047 raise OSError(getattr(errno, 'EXDEV', 18), "Invalid cross-device link")
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040048
49def mock_rename(func):
50 @functools.wraps(func)
51 def wrap(*args, **kwargs):
52 try:
53 builtin_rename = os.rename
54 os.rename = _fake_rename
55 return func(*args, **kwargs)
56 finally:
57 os.rename = builtin_rename
58 return wrap
59
Éric Araujoa7e33a12011-08-12 19:51:35 +020060def write_file(path, content, binary=False):
61 """Write *content* to a file located at *path*.
62
63 If *path* is a tuple instead of a string, os.path.join will be used to
64 make a path. If *binary* is true, the file will be opened in binary
65 mode.
66 """
67 if isinstance(path, tuple):
68 path = os.path.join(*path)
69 with open(path, 'wb' if binary else 'w') as fp:
70 fp.write(content)
71
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020072def write_test_file(path, size):
73 """Create a test file with an arbitrary size and random text content."""
74 def chunks(total, step):
75 assert total >= step
76 while total > step:
77 yield step
78 total -= step
79 if total:
80 yield total
81
82 bufsize = min(size, 8192)
83 chunk = b"".join([random.choice(string.ascii_letters).encode()
84 for i in range(bufsize)])
85 with open(path, 'wb') as f:
86 for csize in chunks(size, bufsize):
87 f.write(chunk)
88 assert os.path.getsize(path) == size
89
Éric Araujoa7e33a12011-08-12 19:51:35 +020090def read_file(path, binary=False):
91 """Return contents from a file located at *path*.
92
93 If *path* is a tuple instead of a string, os.path.join will be used to
94 make a path. If *binary* is true, the file will be opened in binary
95 mode.
96 """
97 if isinstance(path, tuple):
98 path = os.path.join(*path)
99 with open(path, 'rb' if binary else 'r') as fp:
100 return fp.read()
101
Serhiy Storchaka527ef072015-09-06 18:33:19 +0300102def rlistdir(path):
103 res = []
104 for name in sorted(os.listdir(path)):
105 p = os.path.join(path, name)
106 if os.path.isdir(p) and not os.path.islink(p):
107 res.append(name + '/')
108 for n in rlistdir(p):
109 res.append(name + '/' + n)
110 else:
111 res.append(name)
112 return res
113
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +0200114def supports_file2file_sendfile():
115 # ...apparently Linux and Solaris are the only ones
116 if not hasattr(os, "sendfile"):
117 return False
118 srcname = None
119 dstname = None
120 try:
121 with tempfile.NamedTemporaryFile("wb", delete=False) as f:
122 srcname = f.name
123 f.write(b"0123456789")
124
125 with open(srcname, "rb") as src:
126 with tempfile.NamedTemporaryFile("wb", delete=False) as dst:
127 dstname = f.name
128 infd = src.fileno()
129 outfd = dst.fileno()
130 try:
131 os.sendfile(outfd, infd, 0, 2)
132 except OSError:
133 return False
134 else:
135 return True
136 finally:
137 if srcname is not None:
138 support.unlink(srcname)
139 if dstname is not None:
140 support.unlink(dstname)
141
142
143SUPPORTS_SENDFILE = supports_file2file_sendfile()
144
Michael Feltef110b12019-02-18 12:02:44 +0100145# AIX 32-bit mode, by default, lacks enough memory for the xz/lzma compiler test
146# The AIX command 'dump -o program' gives XCOFF header information
147# The second word of the last line in the maxdata value
148# when 32-bit maxdata must be greater than 0x1000000 for the xz test to succeed
149def _maxdataOK():
150 if AIX and sys.maxsize == 2147483647:
151 hdrs=subprocess.getoutput("/usr/bin/dump -o %s" % sys.executable)
152 maxdata=hdrs.split("\n")[-1].split()[1]
153 return int(maxdata,16) >= 0x20000000
154 else:
155 return True
Éric Araujoa7e33a12011-08-12 19:51:35 +0200156
Barry Warsaw7fc2cca2003-01-24 17:34:13 +0000157class TestShutil(unittest.TestCase):
Tarek Ziadé396fad72010-02-23 05:30:31 +0000158
159 def setUp(self):
160 super(TestShutil, self).setUp()
161 self.tempdirs = []
162
163 def tearDown(self):
164 super(TestShutil, self).tearDown()
165 while self.tempdirs:
166 d = self.tempdirs.pop()
167 shutil.rmtree(d, os.name in ('nt', 'cygwin'))
168
Tarek Ziadé396fad72010-02-23 05:30:31 +0000169
170 def mkdtemp(self):
171 """Create a temporary directory that will be cleaned up.
172
173 Returns the path of the directory.
174 """
175 d = tempfile.mkdtemp()
176 self.tempdirs.append(d)
177 return d
Tarek Ziadé5340db32010-04-19 22:30:51 +0000178
Hynek Schlawack3b527782012-06-25 13:27:31 +0200179 def test_rmtree_works_on_bytes(self):
180 tmp = self.mkdtemp()
181 victim = os.path.join(tmp, 'killme')
182 os.mkdir(victim)
183 write_file(os.path.join(victim, 'somefile'), 'foo')
184 victim = os.fsencode(victim)
185 self.assertIsInstance(victim, bytes)
Steve Dowere58571b2016-09-08 11:11:13 -0700186 shutil.rmtree(victim)
Hynek Schlawack3b527782012-06-25 13:27:31 +0200187
Hynek Schlawacka75cd1c2012-06-28 12:07:29 +0200188 @support.skip_unless_symlink
189 def test_rmtree_fails_on_symlink(self):
190 tmp = self.mkdtemp()
191 dir_ = os.path.join(tmp, 'dir')
192 os.mkdir(dir_)
193 link = os.path.join(tmp, 'link')
194 os.symlink(dir_, link)
195 self.assertRaises(OSError, shutil.rmtree, link)
196 self.assertTrue(os.path.exists(dir_))
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100197 self.assertTrue(os.path.lexists(link))
198 errors = []
199 def onerror(*args):
200 errors.append(args)
201 shutil.rmtree(link, onerror=onerror)
202 self.assertEqual(len(errors), 1)
203 self.assertIs(errors[0][0], os.path.islink)
204 self.assertEqual(errors[0][1], link)
205 self.assertIsInstance(errors[0][2][1], OSError)
Hynek Schlawacka75cd1c2012-06-28 12:07:29 +0200206
207 @support.skip_unless_symlink
208 def test_rmtree_works_on_symlinks(self):
209 tmp = self.mkdtemp()
210 dir1 = os.path.join(tmp, 'dir1')
211 dir2 = os.path.join(dir1, 'dir2')
212 dir3 = os.path.join(tmp, 'dir3')
213 for d in dir1, dir2, dir3:
214 os.mkdir(d)
215 file1 = os.path.join(tmp, 'file1')
216 write_file(file1, 'foo')
217 link1 = os.path.join(dir1, 'link1')
218 os.symlink(dir2, link1)
219 link2 = os.path.join(dir1, 'link2')
220 os.symlink(dir3, link2)
221 link3 = os.path.join(dir1, 'link3')
222 os.symlink(file1, link3)
223 # make sure symlinks are removed but not followed
224 shutil.rmtree(dir1)
225 self.assertFalse(os.path.exists(dir1))
226 self.assertTrue(os.path.exists(dir3))
227 self.assertTrue(os.path.exists(file1))
228
Barry Warsaw7fc2cca2003-01-24 17:34:13 +0000229 def test_rmtree_errors(self):
230 # filename is guaranteed not to exist
231 filename = tempfile.mktemp()
Hynek Schlawackb5501102012-12-10 09:11:25 +0100232 self.assertRaises(FileNotFoundError, shutil.rmtree, filename)
233 # test that ignore_errors option is honored
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100234 shutil.rmtree(filename, ignore_errors=True)
235
236 # existing file
237 tmpdir = self.mkdtemp()
Hynek Schlawackb5501102012-12-10 09:11:25 +0100238 write_file((tmpdir, "tstfile"), "")
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100239 filename = os.path.join(tmpdir, "tstfile")
Hynek Schlawackb5501102012-12-10 09:11:25 +0100240 with self.assertRaises(NotADirectoryError) as cm:
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100241 shutil.rmtree(filename)
Hynek Schlawack87f9b462012-12-10 16:29:57 +0100242 # The reason for this rather odd construct is that Windows sprinkles
243 # a \*.* at the end of file names. But only sometimes on some buildbots
244 possible_args = [filename, os.path.join(filename, '*.*')]
245 self.assertIn(cm.exception.filename, possible_args)
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100246 self.assertTrue(os.path.exists(filename))
Hynek Schlawackb5501102012-12-10 09:11:25 +0100247 # test that ignore_errors option is honored
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100248 shutil.rmtree(filename, ignore_errors=True)
249 self.assertTrue(os.path.exists(filename))
250 errors = []
251 def onerror(*args):
252 errors.append(args)
253 shutil.rmtree(filename, onerror=onerror)
254 self.assertEqual(len(errors), 2)
Serhiy Storchakad4d79bc2017-11-04 14:16:35 +0200255 self.assertIs(errors[0][0], os.scandir)
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100256 self.assertEqual(errors[0][1], filename)
Hynek Schlawackb5501102012-12-10 09:11:25 +0100257 self.assertIsInstance(errors[0][2][1], NotADirectoryError)
Hynek Schlawack87f9b462012-12-10 16:29:57 +0100258 self.assertIn(errors[0][2][1].filename, possible_args)
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100259 self.assertIs(errors[1][0], os.rmdir)
260 self.assertEqual(errors[1][1], filename)
Hynek Schlawackb5501102012-12-10 09:11:25 +0100261 self.assertIsInstance(errors[1][2][1], NotADirectoryError)
Hynek Schlawack87f9b462012-12-10 16:29:57 +0100262 self.assertIn(errors[1][2][1].filename, possible_args)
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000263
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000264
Serhiy Storchaka43767632013-11-03 21:31:38 +0200265 @unittest.skipIf(sys.platform[:6] == 'cygwin',
266 "This test can't be run on Cygwin (issue #1071513).")
267 @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
268 "This test can't be run reliably as root (issue #1076467).")
269 def test_on_error(self):
270 self.errorState = 0
271 os.mkdir(TESTFN)
272 self.addCleanup(shutil.rmtree, TESTFN)
Antoine Pitrou2f8a75c2012-06-23 21:28:15 +0200273
Serhiy Storchaka43767632013-11-03 21:31:38 +0200274 self.child_file_path = os.path.join(TESTFN, 'a')
275 self.child_dir_path = os.path.join(TESTFN, 'b')
276 support.create_empty_file(self.child_file_path)
277 os.mkdir(self.child_dir_path)
278 old_dir_mode = os.stat(TESTFN).st_mode
279 old_child_file_mode = os.stat(self.child_file_path).st_mode
280 old_child_dir_mode = os.stat(self.child_dir_path).st_mode
281 # Make unwritable.
282 new_mode = stat.S_IREAD|stat.S_IEXEC
283 os.chmod(self.child_file_path, new_mode)
284 os.chmod(self.child_dir_path, new_mode)
285 os.chmod(TESTFN, new_mode)
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000286
Serhiy Storchaka43767632013-11-03 21:31:38 +0200287 self.addCleanup(os.chmod, TESTFN, old_dir_mode)
288 self.addCleanup(os.chmod, self.child_file_path, old_child_file_mode)
289 self.addCleanup(os.chmod, self.child_dir_path, old_child_dir_mode)
Antoine Pitrou2f8a75c2012-06-23 21:28:15 +0200290
Serhiy Storchaka43767632013-11-03 21:31:38 +0200291 shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
292 # Test whether onerror has actually been called.
293 self.assertEqual(self.errorState, 3,
294 "Expected call to onerror function did not happen.")
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000295
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000296 def check_args_to_onerror(self, func, arg, exc):
Benjamin Peterson25c95f12009-05-08 20:42:26 +0000297 # test_rmtree_errors deliberately runs rmtree
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200298 # on a directory that is chmod 500, which will fail.
Benjamin Peterson25c95f12009-05-08 20:42:26 +0000299 # This function is run when shutil.rmtree fails.
300 # 99.9% of the time it initially fails to remove
301 # a file in the directory, so the first time through
302 # func is os.remove.
303 # However, some Linux machines running ZFS on
304 # FUSE experienced a failure earlier in the process
305 # at os.listdir. The first failure may legally
306 # be either.
Antoine Pitrou4f6e3f72012-06-23 22:05:11 +0200307 if self.errorState < 2:
Hynek Schlawack2100b422012-06-23 20:28:32 +0200308 if func is os.unlink:
Antoine Pitrou4f6e3f72012-06-23 22:05:11 +0200309 self.assertEqual(arg, self.child_file_path)
310 elif func is os.rmdir:
311 self.assertEqual(arg, self.child_dir_path)
Benjamin Peterson25c95f12009-05-08 20:42:26 +0000312 else:
Antoine Pitrou4f6e3f72012-06-23 22:05:11 +0200313 self.assertIs(func, os.listdir)
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200314 self.assertIn(arg, [TESTFN, self.child_dir_path])
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000315 self.assertTrue(issubclass(exc[0], OSError))
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200316 self.errorState += 1
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000317 else:
318 self.assertEqual(func, os.rmdir)
319 self.assertEqual(arg, TESTFN)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000320 self.assertTrue(issubclass(exc[0], OSError))
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200321 self.errorState = 3
322
323 def test_rmtree_does_not_choke_on_failing_lstat(self):
324 try:
325 orig_lstat = os.lstat
Hynek Schlawacka75cd1c2012-06-28 12:07:29 +0200326 def raiser(fn, *args, **kwargs):
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200327 if fn != TESTFN:
328 raise OSError()
329 else:
330 return orig_lstat(fn)
331 os.lstat = raiser
332
333 os.mkdir(TESTFN)
334 write_file((TESTFN, 'foo'), 'foo')
335 shutil.rmtree(TESTFN)
336 finally:
337 os.lstat = orig_lstat
Barry Warsaw7fc2cca2003-01-24 17:34:13 +0000338
Antoine Pitrou78091e62011-12-29 18:54:15 +0100339 @support.skip_unless_symlink
340 def test_copymode_follow_symlinks(self):
341 tmp_dir = self.mkdtemp()
342 src = os.path.join(tmp_dir, 'foo')
343 dst = os.path.join(tmp_dir, 'bar')
344 src_link = os.path.join(tmp_dir, 'baz')
345 dst_link = os.path.join(tmp_dir, 'quux')
346 write_file(src, 'foo')
347 write_file(dst, 'foo')
348 os.symlink(src, src_link)
349 os.symlink(dst, dst_link)
350 os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
351 # file to file
352 os.chmod(dst, stat.S_IRWXO)
353 self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
354 shutil.copymode(src, dst)
355 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
Antoine Pitrou3f48ac92014-01-01 02:50:45 +0100356 # On Windows, os.chmod does not follow symlinks (issue #15411)
357 if os.name != 'nt':
358 # follow src link
359 os.chmod(dst, stat.S_IRWXO)
360 shutil.copymode(src_link, dst)
361 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
362 # follow dst link
363 os.chmod(dst, stat.S_IRWXO)
364 shutil.copymode(src, dst_link)
365 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
366 # follow both links
367 os.chmod(dst, stat.S_IRWXO)
368 shutil.copymode(src_link, dst_link)
369 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100370
371 @unittest.skipUnless(hasattr(os, 'lchmod'), 'requires os.lchmod')
372 @support.skip_unless_symlink
373 def test_copymode_symlink_to_symlink(self):
374 tmp_dir = self.mkdtemp()
375 src = os.path.join(tmp_dir, 'foo')
376 dst = os.path.join(tmp_dir, 'bar')
377 src_link = os.path.join(tmp_dir, 'baz')
378 dst_link = os.path.join(tmp_dir, 'quux')
379 write_file(src, 'foo')
380 write_file(dst, 'foo')
381 os.symlink(src, src_link)
382 os.symlink(dst, dst_link)
383 os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
384 os.chmod(dst, stat.S_IRWXU)
385 os.lchmod(src_link, stat.S_IRWXO|stat.S_IRWXG)
386 # link to link
387 os.lchmod(dst_link, stat.S_IRWXO)
Larry Hastingsb4038062012-07-15 10:57:38 -0700388 shutil.copymode(src_link, dst_link, follow_symlinks=False)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100389 self.assertEqual(os.lstat(src_link).st_mode,
390 os.lstat(dst_link).st_mode)
391 self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
392 # src link - use chmod
393 os.lchmod(dst_link, stat.S_IRWXO)
Larry Hastingsb4038062012-07-15 10:57:38 -0700394 shutil.copymode(src_link, dst, follow_symlinks=False)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100395 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
396 # dst link - use chmod
397 os.lchmod(dst_link, stat.S_IRWXO)
Larry Hastingsb4038062012-07-15 10:57:38 -0700398 shutil.copymode(src, dst_link, follow_symlinks=False)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100399 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
400
401 @unittest.skipIf(hasattr(os, 'lchmod'), 'requires os.lchmod to be missing')
402 @support.skip_unless_symlink
403 def test_copymode_symlink_to_symlink_wo_lchmod(self):
404 tmp_dir = self.mkdtemp()
405 src = os.path.join(tmp_dir, 'foo')
406 dst = os.path.join(tmp_dir, 'bar')
407 src_link = os.path.join(tmp_dir, 'baz')
408 dst_link = os.path.join(tmp_dir, 'quux')
409 write_file(src, 'foo')
410 write_file(dst, 'foo')
411 os.symlink(src, src_link)
412 os.symlink(dst, dst_link)
Larry Hastingsb4038062012-07-15 10:57:38 -0700413 shutil.copymode(src_link, dst_link, follow_symlinks=False) # silent fail
Antoine Pitrou78091e62011-12-29 18:54:15 +0100414
415 @support.skip_unless_symlink
416 def test_copystat_symlinks(self):
417 tmp_dir = self.mkdtemp()
418 src = os.path.join(tmp_dir, 'foo')
419 dst = os.path.join(tmp_dir, 'bar')
420 src_link = os.path.join(tmp_dir, 'baz')
421 dst_link = os.path.join(tmp_dir, 'qux')
422 write_file(src, 'foo')
423 src_stat = os.stat(src)
424 os.utime(src, (src_stat.st_atime,
425 src_stat.st_mtime - 42.0)) # ensure different mtimes
426 write_file(dst, 'bar')
427 self.assertNotEqual(os.stat(src).st_mtime, os.stat(dst).st_mtime)
428 os.symlink(src, src_link)
429 os.symlink(dst, dst_link)
430 if hasattr(os, 'lchmod'):
431 os.lchmod(src_link, stat.S_IRWXO)
432 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
433 os.lchflags(src_link, stat.UF_NODUMP)
434 src_link_stat = os.lstat(src_link)
435 # follow
436 if hasattr(os, 'lchmod'):
Larry Hastingsb4038062012-07-15 10:57:38 -0700437 shutil.copystat(src_link, dst_link, follow_symlinks=True)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100438 self.assertNotEqual(src_link_stat.st_mode, os.stat(dst).st_mode)
439 # don't follow
Larry Hastingsb4038062012-07-15 10:57:38 -0700440 shutil.copystat(src_link, dst_link, follow_symlinks=False)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100441 dst_link_stat = os.lstat(dst_link)
Larry Hastings9cf065c2012-06-22 16:30:09 -0700442 if os.utime in os.supports_follow_symlinks:
Antoine Pitrou78091e62011-12-29 18:54:15 +0100443 for attr in 'st_atime', 'st_mtime':
444 # The modification times may be truncated in the new file.
445 self.assertLessEqual(getattr(src_link_stat, attr),
446 getattr(dst_link_stat, attr) + 1)
447 if hasattr(os, 'lchmod'):
448 self.assertEqual(src_link_stat.st_mode, dst_link_stat.st_mode)
449 if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
450 self.assertEqual(src_link_stat.st_flags, dst_link_stat.st_flags)
451 # tell to follow but dst is not a link
Larry Hastingsb4038062012-07-15 10:57:38 -0700452 shutil.copystat(src_link, dst, follow_symlinks=False)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100453 self.assertTrue(abs(os.stat(src).st_mtime - os.stat(dst).st_mtime) <
454 00000.1)
455
Ned Deilybaf75712012-05-10 17:05:19 -0700456 @unittest.skipUnless(hasattr(os, 'chflags') and
457 hasattr(errno, 'EOPNOTSUPP') and
458 hasattr(errno, 'ENOTSUP'),
459 "requires os.chflags, EOPNOTSUPP & ENOTSUP")
460 def test_copystat_handles_harmless_chflags_errors(self):
461 tmpdir = self.mkdtemp()
462 file1 = os.path.join(tmpdir, 'file1')
463 file2 = os.path.join(tmpdir, 'file2')
464 write_file(file1, 'xxx')
465 write_file(file2, 'xxx')
466
467 def make_chflags_raiser(err):
468 ex = OSError()
469
Larry Hastings90867a52012-06-22 17:01:41 -0700470 def _chflags_raiser(path, flags, *, follow_symlinks=True):
Ned Deilybaf75712012-05-10 17:05:19 -0700471 ex.errno = err
472 raise ex
473 return _chflags_raiser
474 old_chflags = os.chflags
475 try:
476 for err in errno.EOPNOTSUPP, errno.ENOTSUP:
477 os.chflags = make_chflags_raiser(err)
478 shutil.copystat(file1, file2)
479 # assert others errors break it
480 os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP)
481 self.assertRaises(OSError, shutil.copystat, file1, file2)
482 finally:
483 os.chflags = old_chflags
484
Antoine Pitrou424246f2012-05-12 19:02:01 +0200485 @support.skip_unless_xattr
486 def test_copyxattr(self):
487 tmp_dir = self.mkdtemp()
488 src = os.path.join(tmp_dir, 'foo')
489 write_file(src, 'foo')
490 dst = os.path.join(tmp_dir, 'bar')
491 write_file(dst, 'bar')
492
493 # no xattr == no problem
494 shutil._copyxattr(src, dst)
495 # common case
496 os.setxattr(src, 'user.foo', b'42')
497 os.setxattr(src, 'user.bar', b'43')
498 shutil._copyxattr(src, dst)
Gregory P. Smith1093bf22014-01-17 12:01:22 -0800499 self.assertEqual(sorted(os.listxattr(src)), sorted(os.listxattr(dst)))
Antoine Pitrou424246f2012-05-12 19:02:01 +0200500 self.assertEqual(
501 os.getxattr(src, 'user.foo'),
502 os.getxattr(dst, 'user.foo'))
503 # check errors don't affect other attrs
504 os.remove(dst)
505 write_file(dst, 'bar')
506 os_error = OSError(errno.EPERM, 'EPERM')
507
Larry Hastings9cf065c2012-06-22 16:30:09 -0700508 def _raise_on_user_foo(fname, attr, val, **kwargs):
Antoine Pitrou424246f2012-05-12 19:02:01 +0200509 if attr == 'user.foo':
510 raise os_error
511 else:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700512 orig_setxattr(fname, attr, val, **kwargs)
Antoine Pitrou424246f2012-05-12 19:02:01 +0200513 try:
514 orig_setxattr = os.setxattr
515 os.setxattr = _raise_on_user_foo
516 shutil._copyxattr(src, dst)
Antoine Pitrou61597d32012-05-12 23:37:35 +0200517 self.assertIn('user.bar', os.listxattr(dst))
Antoine Pitrou424246f2012-05-12 19:02:01 +0200518 finally:
519 os.setxattr = orig_setxattr
Hynek Schlawack0beab052013-02-05 08:22:44 +0100520 # the source filesystem not supporting xattrs should be ok, too.
521 def _raise_on_src(fname, *, follow_symlinks=True):
522 if fname == src:
523 raise OSError(errno.ENOTSUP, 'Operation not supported')
524 return orig_listxattr(fname, follow_symlinks=follow_symlinks)
525 try:
526 orig_listxattr = os.listxattr
527 os.listxattr = _raise_on_src
528 shutil._copyxattr(src, dst)
529 finally:
530 os.listxattr = orig_listxattr
Antoine Pitrou424246f2012-05-12 19:02:01 +0200531
Larry Hastingsad5ae042012-07-14 17:55:11 -0700532 # test that shutil.copystat copies xattrs
533 src = os.path.join(tmp_dir, 'the_original')
Olexa Bilaniuk79efbb72019-05-09 22:22:06 -0500534 srcro = os.path.join(tmp_dir, 'the_original_ro')
Larry Hastingsad5ae042012-07-14 17:55:11 -0700535 write_file(src, src)
Olexa Bilaniuk79efbb72019-05-09 22:22:06 -0500536 write_file(srcro, srcro)
Larry Hastingsad5ae042012-07-14 17:55:11 -0700537 os.setxattr(src, 'user.the_value', b'fiddly')
Olexa Bilaniuk79efbb72019-05-09 22:22:06 -0500538 os.setxattr(srcro, 'user.the_value', b'fiddly')
539 os.chmod(srcro, 0o444)
Larry Hastingsad5ae042012-07-14 17:55:11 -0700540 dst = os.path.join(tmp_dir, 'the_copy')
Olexa Bilaniuk79efbb72019-05-09 22:22:06 -0500541 dstro = os.path.join(tmp_dir, 'the_copy_ro')
Larry Hastingsad5ae042012-07-14 17:55:11 -0700542 write_file(dst, dst)
Olexa Bilaniuk79efbb72019-05-09 22:22:06 -0500543 write_file(dstro, dstro)
Larry Hastingsad5ae042012-07-14 17:55:11 -0700544 shutil.copystat(src, dst)
Olexa Bilaniuk79efbb72019-05-09 22:22:06 -0500545 shutil.copystat(srcro, dstro)
Hynek Schlawackc2d481f2012-07-16 17:11:10 +0200546 self.assertEqual(os.getxattr(dst, 'user.the_value'), b'fiddly')
Olexa Bilaniuk79efbb72019-05-09 22:22:06 -0500547 self.assertEqual(os.getxattr(dstro, 'user.the_value'), b'fiddly')
Larry Hastingsad5ae042012-07-14 17:55:11 -0700548
Antoine Pitrou424246f2012-05-12 19:02:01 +0200549 @support.skip_unless_symlink
550 @support.skip_unless_xattr
551 @unittest.skipUnless(hasattr(os, 'geteuid') and os.geteuid() == 0,
552 'root privileges required')
553 def test_copyxattr_symlinks(self):
554 # On Linux, it's only possible to access non-user xattr for symlinks;
555 # which in turn require root privileges. This test should be expanded
556 # as soon as other platforms gain support for extended attributes.
557 tmp_dir = self.mkdtemp()
558 src = os.path.join(tmp_dir, 'foo')
559 src_link = os.path.join(tmp_dir, 'baz')
560 write_file(src, 'foo')
561 os.symlink(src, src_link)
562 os.setxattr(src, 'trusted.foo', b'42')
Larry Hastings9cf065c2012-06-22 16:30:09 -0700563 os.setxattr(src_link, 'trusted.foo', b'43', follow_symlinks=False)
Antoine Pitrou424246f2012-05-12 19:02:01 +0200564 dst = os.path.join(tmp_dir, 'bar')
565 dst_link = os.path.join(tmp_dir, 'qux')
566 write_file(dst, 'bar')
567 os.symlink(dst, dst_link)
Larry Hastingsb4038062012-07-15 10:57:38 -0700568 shutil._copyxattr(src_link, dst_link, follow_symlinks=False)
Larry Hastings9cf065c2012-06-22 16:30:09 -0700569 self.assertEqual(os.getxattr(dst_link, 'trusted.foo', follow_symlinks=False), b'43')
Antoine Pitrou424246f2012-05-12 19:02:01 +0200570 self.assertRaises(OSError, os.getxattr, dst, 'trusted.foo')
Larry Hastingsb4038062012-07-15 10:57:38 -0700571 shutil._copyxattr(src_link, dst, follow_symlinks=False)
Antoine Pitrou424246f2012-05-12 19:02:01 +0200572 self.assertEqual(os.getxattr(dst, 'trusted.foo'), b'43')
573
Antoine Pitrou78091e62011-12-29 18:54:15 +0100574 @support.skip_unless_symlink
575 def test_copy_symlinks(self):
576 tmp_dir = self.mkdtemp()
577 src = os.path.join(tmp_dir, 'foo')
578 dst = os.path.join(tmp_dir, 'bar')
579 src_link = os.path.join(tmp_dir, 'baz')
580 write_file(src, 'foo')
581 os.symlink(src, src_link)
582 if hasattr(os, 'lchmod'):
583 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
584 # don't follow
Larry Hastingsb4038062012-07-15 10:57:38 -0700585 shutil.copy(src_link, dst, follow_symlinks=True)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100586 self.assertFalse(os.path.islink(dst))
587 self.assertEqual(read_file(src), read_file(dst))
588 os.remove(dst)
589 # follow
Larry Hastingsb4038062012-07-15 10:57:38 -0700590 shutil.copy(src_link, dst, follow_symlinks=False)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100591 self.assertTrue(os.path.islink(dst))
592 self.assertEqual(os.readlink(dst), os.readlink(src_link))
593 if hasattr(os, 'lchmod'):
594 self.assertEqual(os.lstat(src_link).st_mode,
595 os.lstat(dst).st_mode)
596
597 @support.skip_unless_symlink
598 def test_copy2_symlinks(self):
599 tmp_dir = self.mkdtemp()
600 src = os.path.join(tmp_dir, 'foo')
601 dst = os.path.join(tmp_dir, 'bar')
602 src_link = os.path.join(tmp_dir, 'baz')
603 write_file(src, 'foo')
604 os.symlink(src, src_link)
605 if hasattr(os, 'lchmod'):
606 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
607 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
608 os.lchflags(src_link, stat.UF_NODUMP)
609 src_stat = os.stat(src)
610 src_link_stat = os.lstat(src_link)
611 # follow
Larry Hastingsb4038062012-07-15 10:57:38 -0700612 shutil.copy2(src_link, dst, follow_symlinks=True)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100613 self.assertFalse(os.path.islink(dst))
614 self.assertEqual(read_file(src), read_file(dst))
615 os.remove(dst)
616 # don't follow
Larry Hastingsb4038062012-07-15 10:57:38 -0700617 shutil.copy2(src_link, dst, follow_symlinks=False)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100618 self.assertTrue(os.path.islink(dst))
619 self.assertEqual(os.readlink(dst), os.readlink(src_link))
620 dst_stat = os.lstat(dst)
Larry Hastings9cf065c2012-06-22 16:30:09 -0700621 if os.utime in os.supports_follow_symlinks:
Antoine Pitrou78091e62011-12-29 18:54:15 +0100622 for attr in 'st_atime', 'st_mtime':
623 # The modification times may be truncated in the new file.
624 self.assertLessEqual(getattr(src_link_stat, attr),
625 getattr(dst_stat, attr) + 1)
626 if hasattr(os, 'lchmod'):
627 self.assertEqual(src_link_stat.st_mode, dst_stat.st_mode)
628 self.assertNotEqual(src_stat.st_mode, dst_stat.st_mode)
629 if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
630 self.assertEqual(src_link_stat.st_flags, dst_stat.st_flags)
631
Antoine Pitrou424246f2012-05-12 19:02:01 +0200632 @support.skip_unless_xattr
633 def test_copy2_xattr(self):
634 tmp_dir = self.mkdtemp()
635 src = os.path.join(tmp_dir, 'foo')
636 dst = os.path.join(tmp_dir, 'bar')
637 write_file(src, 'foo')
638 os.setxattr(src, 'user.foo', b'42')
639 shutil.copy2(src, dst)
640 self.assertEqual(
641 os.getxattr(src, 'user.foo'),
642 os.getxattr(dst, 'user.foo'))
643 os.remove(dst)
644
Antoine Pitrou78091e62011-12-29 18:54:15 +0100645 @support.skip_unless_symlink
646 def test_copyfile_symlinks(self):
647 tmp_dir = self.mkdtemp()
648 src = os.path.join(tmp_dir, 'src')
649 dst = os.path.join(tmp_dir, 'dst')
650 dst_link = os.path.join(tmp_dir, 'dst_link')
651 link = os.path.join(tmp_dir, 'link')
652 write_file(src, 'foo')
653 os.symlink(src, link)
654 # don't follow
Larry Hastingsb4038062012-07-15 10:57:38 -0700655 shutil.copyfile(link, dst_link, follow_symlinks=False)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100656 self.assertTrue(os.path.islink(dst_link))
657 self.assertEqual(os.readlink(link), os.readlink(dst_link))
658 # follow
659 shutil.copyfile(link, dst)
660 self.assertFalse(os.path.islink(dst))
661
Hynek Schlawack2100b422012-06-23 20:28:32 +0200662 def test_rmtree_uses_safe_fd_version_if_available(self):
Hynek Schlawackd0f6e0a2012-06-29 08:28:20 +0200663 _use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <=
664 os.supports_dir_fd and
665 os.listdir in os.supports_fd and
666 os.stat in os.supports_follow_symlinks)
667 if _use_fd_functions:
Hynek Schlawack2100b422012-06-23 20:28:32 +0200668 self.assertTrue(shutil._use_fd_functions)
Nick Coghlan5b0eca12012-06-24 16:43:06 +1000669 self.assertTrue(shutil.rmtree.avoids_symlink_attacks)
Hynek Schlawack2100b422012-06-23 20:28:32 +0200670 tmp_dir = self.mkdtemp()
671 d = os.path.join(tmp_dir, 'a')
672 os.mkdir(d)
673 try:
674 real_rmtree = shutil._rmtree_safe_fd
675 class Called(Exception): pass
676 def _raiser(*args, **kwargs):
677 raise Called
678 shutil._rmtree_safe_fd = _raiser
679 self.assertRaises(Called, shutil.rmtree, d)
680 finally:
681 shutil._rmtree_safe_fd = real_rmtree
682 else:
683 self.assertFalse(shutil._use_fd_functions)
Nick Coghlan5b0eca12012-06-24 16:43:06 +1000684 self.assertFalse(shutil.rmtree.avoids_symlink_attacks)
Hynek Schlawack2100b422012-06-23 20:28:32 +0200685
Johannes Gijsbersd60e92a2004-09-11 21:26:21 +0000686 def test_rmtree_dont_delete_file(self):
687 # When called on a file instead of a directory, don't delete it.
688 handle, path = tempfile.mkstemp()
Victor Stinnerbf816222011-06-30 23:25:47 +0200689 os.close(handle)
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200690 self.assertRaises(NotADirectoryError, shutil.rmtree, path)
Johannes Gijsbersd60e92a2004-09-11 21:26:21 +0000691 os.remove(path)
692
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000693 def test_copytree_simple(self):
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000694 src_dir = tempfile.mkdtemp()
695 dst_dir = os.path.join(tempfile.mkdtemp(), 'destination')
Éric Araujoa7e33a12011-08-12 19:51:35 +0200696 self.addCleanup(shutil.rmtree, src_dir)
697 self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
698 write_file((src_dir, 'test.txt'), '123')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000699 os.mkdir(os.path.join(src_dir, 'test_dir'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200700 write_file((src_dir, 'test_dir', 'test.txt'), '456')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000701
Éric Araujoa7e33a12011-08-12 19:51:35 +0200702 shutil.copytree(src_dir, dst_dir)
703 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test.txt')))
704 self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'test_dir')))
705 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test_dir',
706 'test.txt')))
707 actual = read_file((dst_dir, 'test.txt'))
708 self.assertEqual(actual, '123')
709 actual = read_file((dst_dir, 'test_dir', 'test.txt'))
710 self.assertEqual(actual, '456')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000711
jab9e00d9e2018-12-28 13:03:40 -0500712 def test_copytree_dirs_exist_ok(self):
713 src_dir = tempfile.mkdtemp()
714 dst_dir = tempfile.mkdtemp()
715 self.addCleanup(shutil.rmtree, src_dir)
716 self.addCleanup(shutil.rmtree, dst_dir)
717
718 write_file((src_dir, 'nonexisting.txt'), '123')
719 os.mkdir(os.path.join(src_dir, 'existing_dir'))
720 os.mkdir(os.path.join(dst_dir, 'existing_dir'))
721 write_file((dst_dir, 'existing_dir', 'existing.txt'), 'will be replaced')
722 write_file((src_dir, 'existing_dir', 'existing.txt'), 'has been replaced')
723
724 shutil.copytree(src_dir, dst_dir, dirs_exist_ok=True)
725 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'nonexisting.txt')))
726 self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'existing_dir')))
727 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'existing_dir',
728 'existing.txt')))
729 actual = read_file((dst_dir, 'nonexisting.txt'))
730 self.assertEqual(actual, '123')
731 actual = read_file((dst_dir, 'existing_dir', 'existing.txt'))
732 self.assertEqual(actual, 'has been replaced')
733
734 with self.assertRaises(FileExistsError):
735 shutil.copytree(src_dir, dst_dir, dirs_exist_ok=False)
736
Antoine Pitrou78091e62011-12-29 18:54:15 +0100737 @support.skip_unless_symlink
738 def test_copytree_symlinks(self):
739 tmp_dir = self.mkdtemp()
740 src_dir = os.path.join(tmp_dir, 'src')
741 dst_dir = os.path.join(tmp_dir, 'dst')
742 sub_dir = os.path.join(src_dir, 'sub')
743 os.mkdir(src_dir)
744 os.mkdir(sub_dir)
745 write_file((src_dir, 'file.txt'), 'foo')
746 src_link = os.path.join(sub_dir, 'link')
747 dst_link = os.path.join(dst_dir, 'sub/link')
748 os.symlink(os.path.join(src_dir, 'file.txt'),
749 src_link)
750 if hasattr(os, 'lchmod'):
751 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
752 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
753 os.lchflags(src_link, stat.UF_NODUMP)
754 src_stat = os.lstat(src_link)
755 shutil.copytree(src_dir, dst_dir, symlinks=True)
756 self.assertTrue(os.path.islink(os.path.join(dst_dir, 'sub', 'link')))
757 self.assertEqual(os.readlink(os.path.join(dst_dir, 'sub', 'link')),
758 os.path.join(src_dir, 'file.txt'))
759 dst_stat = os.lstat(dst_link)
760 if hasattr(os, 'lchmod'):
761 self.assertEqual(dst_stat.st_mode, src_stat.st_mode)
762 if hasattr(os, 'lchflags'):
763 self.assertEqual(dst_stat.st_flags, src_stat.st_flags)
764
Georg Brandl2ee470f2008-07-16 12:55:28 +0000765 def test_copytree_with_exclude(self):
Georg Brandl2ee470f2008-07-16 12:55:28 +0000766 # creating data
767 join = os.path.join
768 exists = os.path.exists
769 src_dir = tempfile.mkdtemp()
Georg Brandl2ee470f2008-07-16 12:55:28 +0000770 try:
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000771 dst_dir = join(tempfile.mkdtemp(), 'destination')
Éric Araujoa7e33a12011-08-12 19:51:35 +0200772 write_file((src_dir, 'test.txt'), '123')
773 write_file((src_dir, 'test.tmp'), '123')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000774 os.mkdir(join(src_dir, 'test_dir'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200775 write_file((src_dir, 'test_dir', 'test.txt'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000776 os.mkdir(join(src_dir, 'test_dir2'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200777 write_file((src_dir, 'test_dir2', 'test.txt'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000778 os.mkdir(join(src_dir, 'test_dir2', 'subdir'))
779 os.mkdir(join(src_dir, 'test_dir2', 'subdir2'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200780 write_file((src_dir, 'test_dir2', 'subdir', 'test.txt'), '456')
781 write_file((src_dir, 'test_dir2', 'subdir2', 'test.py'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000782
783 # testing glob-like patterns
784 try:
785 patterns = shutil.ignore_patterns('*.tmp', 'test_dir2')
786 shutil.copytree(src_dir, dst_dir, ignore=patterns)
787 # checking the result: some elements should not be copied
788 self.assertTrue(exists(join(dst_dir, 'test.txt')))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200789 self.assertFalse(exists(join(dst_dir, 'test.tmp')))
790 self.assertFalse(exists(join(dst_dir, 'test_dir2')))
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000791 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200792 shutil.rmtree(dst_dir)
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000793 try:
794 patterns = shutil.ignore_patterns('*.tmp', 'subdir*')
795 shutil.copytree(src_dir, dst_dir, ignore=patterns)
796 # checking the result: some elements should not be copied
Éric Araujoa7e33a12011-08-12 19:51:35 +0200797 self.assertFalse(exists(join(dst_dir, 'test.tmp')))
798 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2')))
799 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000800 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200801 shutil.rmtree(dst_dir)
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000802
803 # testing callable-style
804 try:
805 def _filter(src, names):
806 res = []
807 for name in names:
808 path = os.path.join(src, name)
809
810 if (os.path.isdir(path) and
811 path.split()[-1] == 'subdir'):
812 res.append(name)
813 elif os.path.splitext(path)[-1] in ('.py'):
814 res.append(name)
815 return res
816
817 shutil.copytree(src_dir, dst_dir, ignore=_filter)
818
819 # checking the result: some elements should not be copied
Éric Araujoa7e33a12011-08-12 19:51:35 +0200820 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2',
821 'test.py')))
822 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000823
824 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200825 shutil.rmtree(dst_dir)
Georg Brandl2ee470f2008-07-16 12:55:28 +0000826 finally:
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000827 shutil.rmtree(src_dir)
828 shutil.rmtree(os.path.dirname(dst_dir))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000829
Antoine Pitrouac601602013-08-16 19:35:02 +0200830 def test_copytree_retains_permissions(self):
831 tmp_dir = tempfile.mkdtemp()
832 src_dir = os.path.join(tmp_dir, 'source')
833 os.mkdir(src_dir)
834 dst_dir = os.path.join(tmp_dir, 'destination')
835 self.addCleanup(shutil.rmtree, tmp_dir)
836
837 os.chmod(src_dir, 0o777)
838 write_file((src_dir, 'permissive.txt'), '123')
839 os.chmod(os.path.join(src_dir, 'permissive.txt'), 0o777)
840 write_file((src_dir, 'restrictive.txt'), '456')
841 os.chmod(os.path.join(src_dir, 'restrictive.txt'), 0o600)
842 restrictive_subdir = tempfile.mkdtemp(dir=src_dir)
843 os.chmod(restrictive_subdir, 0o600)
844
845 shutil.copytree(src_dir, dst_dir)
Brett Cannon9c7eb552013-08-23 14:38:11 -0400846 self.assertEqual(os.stat(src_dir).st_mode, os.stat(dst_dir).st_mode)
847 self.assertEqual(os.stat(os.path.join(src_dir, 'permissive.txt')).st_mode,
Antoine Pitrouac601602013-08-16 19:35:02 +0200848 os.stat(os.path.join(dst_dir, 'permissive.txt')).st_mode)
Brett Cannon9c7eb552013-08-23 14:38:11 -0400849 self.assertEqual(os.stat(os.path.join(src_dir, 'restrictive.txt')).st_mode,
Antoine Pitrouac601602013-08-16 19:35:02 +0200850 os.stat(os.path.join(dst_dir, 'restrictive.txt')).st_mode)
851 restrictive_subdir_dst = os.path.join(dst_dir,
852 os.path.split(restrictive_subdir)[1])
Brett Cannon9c7eb552013-08-23 14:38:11 -0400853 self.assertEqual(os.stat(restrictive_subdir).st_mode,
Antoine Pitrouac601602013-08-16 19:35:02 +0200854 os.stat(restrictive_subdir_dst).st_mode)
855
Berker Peksag884afd92014-12-10 02:50:32 +0200856 @unittest.mock.patch('os.chmod')
857 def test_copytree_winerror(self, mock_patch):
858 # When copying to VFAT, copystat() raises OSError. On Windows, the
859 # exception object has a meaningful 'winerror' attribute, but not
860 # on other operating systems. Do not assume 'winerror' is set.
861 src_dir = tempfile.mkdtemp()
862 dst_dir = os.path.join(tempfile.mkdtemp(), 'destination')
863 self.addCleanup(shutil.rmtree, src_dir)
864 self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
865
866 mock_patch.side_effect = PermissionError('ka-boom')
867 with self.assertRaises(shutil.Error):
868 shutil.copytree(src_dir, dst_dir)
869
Giampaolo Rodolac606a9c2019-02-26 12:04:41 +0100870 def test_copytree_custom_copy_function(self):
871 # See: https://bugs.python.org/issue35648
872 def custom_cpfun(a, b):
873 flag.append(None)
874 self.assertIsInstance(a, str)
875 self.assertIsInstance(b, str)
876 self.assertEqual(a, os.path.join(src, 'foo'))
877 self.assertEqual(b, os.path.join(dst, 'foo'))
878
879 flag = []
880 src = tempfile.mkdtemp()
881 dst = tempfile.mktemp()
882 self.addCleanup(shutil.rmtree, src)
883 with open(os.path.join(src, 'foo'), 'w') as f:
884 f.close()
885 shutil.copytree(src, dst, copy_function=custom_cpfun)
886 self.assertEqual(len(flag), 1)
887
Zachary Ware9fe6d862013-12-08 00:20:35 -0600888 @unittest.skipIf(os.name == 'nt', 'temporarily disabled on Windows')
Hirokazu Yamamoto26681452010-12-05 02:04:16 +0000889 @unittest.skipUnless(hasattr(os, 'link'), 'requires os.link')
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000890 def test_dont_copy_file_onto_link_to_itself(self):
891 # bug 851123.
892 os.mkdir(TESTFN)
893 src = os.path.join(TESTFN, 'cheese')
894 dst = os.path.join(TESTFN, 'shop')
895 try:
Hirokazu Yamamoto26681452010-12-05 02:04:16 +0000896 with open(src, 'w') as f:
897 f.write('cheddar')
xdegaye92c2ca72017-11-12 17:31:07 +0100898 try:
899 os.link(src, dst)
900 except PermissionError as e:
901 self.skipTest('os.link(): %s' % e)
Hynek Schlawack48653762012-10-07 12:49:58 +0200902 self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
Hirokazu Yamamoto26681452010-12-05 02:04:16 +0000903 with open(src, 'r') as f:
904 self.assertEqual(f.read(), 'cheddar')
905 os.remove(dst)
906 finally:
907 shutil.rmtree(TESTFN, ignore_errors=True)
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000908
Brian Curtin3b4499c2010-12-28 14:31:47 +0000909 @support.skip_unless_symlink
Hirokazu Yamamoto26681452010-12-05 02:04:16 +0000910 def test_dont_copy_file_onto_symlink_to_itself(self):
911 # bug 851123.
912 os.mkdir(TESTFN)
913 src = os.path.join(TESTFN, 'cheese')
914 dst = os.path.join(TESTFN, 'shop')
915 try:
916 with open(src, 'w') as f:
917 f.write('cheddar')
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000918 # Using `src` here would mean we end up with a symlink pointing
919 # to TESTFN/TESTFN/cheese, while it should point at
920 # TESTFN/cheese.
921 os.symlink('cheese', dst)
Hynek Schlawack48653762012-10-07 12:49:58 +0200922 self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
Antoine Pitrou92f60ed2010-10-14 22:11:44 +0000923 with open(src, 'r') as f:
924 self.assertEqual(f.read(), 'cheddar')
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000925 os.remove(dst)
926 finally:
Hirokazu Yamamoto26681452010-12-05 02:04:16 +0000927 shutil.rmtree(TESTFN, ignore_errors=True)
Johannes Gijsbers68128712004-08-14 13:57:08 +0000928
Brian Curtin3b4499c2010-12-28 14:31:47 +0000929 @support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +0000930 def test_rmtree_on_symlink(self):
931 # bug 1669.
932 os.mkdir(TESTFN)
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000933 try:
Brian Curtind40e6f72010-07-08 21:39:08 +0000934 src = os.path.join(TESTFN, 'cheese')
935 dst = os.path.join(TESTFN, 'shop')
936 os.mkdir(src)
937 os.symlink(src, dst)
938 self.assertRaises(OSError, shutil.rmtree, dst)
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200939 shutil.rmtree(dst, ignore_errors=True)
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000940 finally:
Brian Curtind40e6f72010-07-08 21:39:08 +0000941 shutil.rmtree(TESTFN, ignore_errors=True)
942
Serhiy Storchaka43767632013-11-03 21:31:38 +0200943 # Issue #3002: copyfile and copytree block indefinitely on named pipes
944 @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
945 def test_copyfile_named_pipe(self):
xdegaye92c2ca72017-11-12 17:31:07 +0100946 try:
947 os.mkfifo(TESTFN)
948 except PermissionError as e:
949 self.skipTest('os.mkfifo(): %s' % e)
Serhiy Storchaka43767632013-11-03 21:31:38 +0200950 try:
951 self.assertRaises(shutil.SpecialFileError,
952 shutil.copyfile, TESTFN, TESTFN2)
953 self.assertRaises(shutil.SpecialFileError,
954 shutil.copyfile, __file__, TESTFN)
955 finally:
956 os.remove(TESTFN)
Antoine Pitrou7fff0962009-05-01 21:09:44 +0000957
Serhiy Storchaka43767632013-11-03 21:31:38 +0200958 @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
959 @support.skip_unless_symlink
960 def test_copytree_named_pipe(self):
961 os.mkdir(TESTFN)
962 try:
963 subdir = os.path.join(TESTFN, "subdir")
964 os.mkdir(subdir)
965 pipe = os.path.join(subdir, "mypipe")
xdegaye92c2ca72017-11-12 17:31:07 +0100966 try:
967 os.mkfifo(pipe)
968 except PermissionError as e:
969 self.skipTest('os.mkfifo(): %s' % e)
Antoine Pitrou7fff0962009-05-01 21:09:44 +0000970 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +0200971 shutil.copytree(TESTFN, TESTFN2)
972 except shutil.Error as e:
973 errors = e.args[0]
974 self.assertEqual(len(errors), 1)
975 src, dst, error_msg = errors[0]
976 self.assertEqual("`%s` is a named pipe" % pipe, error_msg)
977 else:
978 self.fail("shutil.Error should have been raised")
979 finally:
980 shutil.rmtree(TESTFN, ignore_errors=True)
981 shutil.rmtree(TESTFN2, ignore_errors=True)
Antoine Pitrou7fff0962009-05-01 21:09:44 +0000982
Tarek Ziadé5340db32010-04-19 22:30:51 +0000983 def test_copytree_special_func(self):
984
985 src_dir = self.mkdtemp()
986 dst_dir = os.path.join(self.mkdtemp(), 'destination')
Éric Araujoa7e33a12011-08-12 19:51:35 +0200987 write_file((src_dir, 'test.txt'), '123')
Tarek Ziadé5340db32010-04-19 22:30:51 +0000988 os.mkdir(os.path.join(src_dir, 'test_dir'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200989 write_file((src_dir, 'test_dir', 'test.txt'), '456')
Tarek Ziadé5340db32010-04-19 22:30:51 +0000990
991 copied = []
992 def _copy(src, dst):
993 copied.append((src, dst))
994
995 shutil.copytree(src_dir, dst_dir, copy_function=_copy)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000996 self.assertEqual(len(copied), 2)
Tarek Ziadé5340db32010-04-19 22:30:51 +0000997
Brian Curtin3b4499c2010-12-28 14:31:47 +0000998 @support.skip_unless_symlink
Tarek Ziadéfb437512010-04-20 08:57:33 +0000999 def test_copytree_dangling_symlinks(self):
1000
1001 # a dangling symlink raises an error at the end
1002 src_dir = self.mkdtemp()
1003 dst_dir = os.path.join(self.mkdtemp(), 'destination')
1004 os.symlink('IDONTEXIST', os.path.join(src_dir, 'test.txt'))
1005 os.mkdir(os.path.join(src_dir, 'test_dir'))
Éric Araujoa7e33a12011-08-12 19:51:35 +02001006 write_file((src_dir, 'test_dir', 'test.txt'), '456')
Tarek Ziadéfb437512010-04-20 08:57:33 +00001007 self.assertRaises(Error, shutil.copytree, src_dir, dst_dir)
1008
1009 # a dangling symlink is ignored with the proper flag
1010 dst_dir = os.path.join(self.mkdtemp(), 'destination2')
1011 shutil.copytree(src_dir, dst_dir, ignore_dangling_symlinks=True)
1012 self.assertNotIn('test.txt', os.listdir(dst_dir))
1013
1014 # a dangling symlink is copied if symlinks=True
1015 dst_dir = os.path.join(self.mkdtemp(), 'destination3')
1016 shutil.copytree(src_dir, dst_dir, symlinks=True)
1017 self.assertIn('test.txt', os.listdir(dst_dir))
1018
Berker Peksag5a294d82015-07-25 14:53:48 +03001019 @support.skip_unless_symlink
1020 def test_copytree_symlink_dir(self):
1021 src_dir = self.mkdtemp()
1022 dst_dir = os.path.join(self.mkdtemp(), 'destination')
1023 os.mkdir(os.path.join(src_dir, 'real_dir'))
1024 with open(os.path.join(src_dir, 'real_dir', 'test.txt'), 'w'):
1025 pass
1026 os.symlink(os.path.join(src_dir, 'real_dir'),
1027 os.path.join(src_dir, 'link_to_dir'),
1028 target_is_directory=True)
1029
1030 shutil.copytree(src_dir, dst_dir, symlinks=False)
1031 self.assertFalse(os.path.islink(os.path.join(dst_dir, 'link_to_dir')))
1032 self.assertIn('test.txt', os.listdir(os.path.join(dst_dir, 'link_to_dir')))
1033
1034 dst_dir = os.path.join(self.mkdtemp(), 'destination2')
1035 shutil.copytree(src_dir, dst_dir, symlinks=True)
1036 self.assertTrue(os.path.islink(os.path.join(dst_dir, 'link_to_dir')))
1037 self.assertIn('test.txt', os.listdir(os.path.join(dst_dir, 'link_to_dir')))
1038
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001039 def _copy_file(self, method):
1040 fname = 'test.txt'
1041 tmpdir = self.mkdtemp()
Éric Araujoa7e33a12011-08-12 19:51:35 +02001042 write_file((tmpdir, fname), 'xxx')
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001043 file1 = os.path.join(tmpdir, fname)
1044 tmpdir2 = self.mkdtemp()
1045 method(file1, tmpdir2)
1046 file2 = os.path.join(tmpdir2, fname)
1047 return (file1, file2)
1048
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001049 def test_copy(self):
1050 # Ensure that the copied file exists and has the same mode bits.
1051 file1, file2 = self._copy_file(shutil.copy)
1052 self.assertTrue(os.path.exists(file2))
1053 self.assertEqual(os.stat(file1).st_mode, os.stat(file2).st_mode)
1054
Senthil Kumaran0c2dba52011-07-03 18:21:38 -07001055 @unittest.skipUnless(hasattr(os, 'utime'), 'requires os.utime')
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001056 def test_copy2(self):
1057 # Ensure that the copied file exists and has the same mode and
1058 # modification time bits.
1059 file1, file2 = self._copy_file(shutil.copy2)
1060 self.assertTrue(os.path.exists(file2))
1061 file1_stat = os.stat(file1)
1062 file2_stat = os.stat(file2)
1063 self.assertEqual(file1_stat.st_mode, file2_stat.st_mode)
1064 for attr in 'st_atime', 'st_mtime':
1065 # The modification times may be truncated in the new file.
1066 self.assertLessEqual(getattr(file1_stat, attr),
1067 getattr(file2_stat, attr) + 1)
1068 if hasattr(os, 'chflags') and hasattr(file1_stat, 'st_flags'):
1069 self.assertEqual(getattr(file1_stat, 'st_flags'),
1070 getattr(file2_stat, 'st_flags'))
1071
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001072 @support.requires_zlib
Tarek Ziadé396fad72010-02-23 05:30:31 +00001073 def test_make_tarball(self):
1074 # creating something to tar
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001075 root_dir, base_dir = self._create_files('')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001076
1077 tmpdir2 = self.mkdtemp()
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001078 # force shutil to create the directory
1079 os.rmdir(tmpdir2)
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001080 # working with relative paths
1081 work_dir = os.path.dirname(tmpdir2)
1082 rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001083
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001084 with support.change_cwd(work_dir):
Serhiy Storchaka5558d4f2015-09-08 09:59:02 +03001085 base_name = os.path.abspath(rel_base_name)
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001086 tarball = make_archive(rel_base_name, 'gztar', root_dir, '.')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001087
1088 # check if the compressed tarball was created
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001089 self.assertEqual(tarball, base_name + '.tar.gz')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001090 self.assertTrue(os.path.isfile(tarball))
1091 self.assertTrue(tarfile.is_tarfile(tarball))
1092 with tarfile.open(tarball, 'r:gz') as tf:
1093 self.assertCountEqual(tf.getnames(),
1094 ['.', './sub', './sub2',
1095 './file1', './file2', './sub/file3'])
Tarek Ziadé396fad72010-02-23 05:30:31 +00001096
1097 # trying an uncompressed one
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001098 with support.change_cwd(work_dir):
1099 tarball = make_archive(rel_base_name, 'tar', root_dir, '.')
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001100 self.assertEqual(tarball, base_name + '.tar')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001101 self.assertTrue(os.path.isfile(tarball))
1102 self.assertTrue(tarfile.is_tarfile(tarball))
1103 with tarfile.open(tarball, 'r') as tf:
1104 self.assertCountEqual(tf.getnames(),
1105 ['.', './sub', './sub2',
1106 './file1', './file2', './sub/file3'])
Tarek Ziadé396fad72010-02-23 05:30:31 +00001107
1108 def _tarinfo(self, path):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001109 with tarfile.open(path) as tar:
Tarek Ziadé396fad72010-02-23 05:30:31 +00001110 names = tar.getnames()
1111 names.sort()
1112 return tuple(names)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001113
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001114 def _create_files(self, base_dir='dist'):
Tarek Ziadé396fad72010-02-23 05:30:31 +00001115 # creating something to tar
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001116 root_dir = self.mkdtemp()
1117 dist = os.path.join(root_dir, base_dir)
1118 os.makedirs(dist, exist_ok=True)
Éric Araujoa7e33a12011-08-12 19:51:35 +02001119 write_file((dist, 'file1'), 'xxx')
1120 write_file((dist, 'file2'), 'xxx')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001121 os.mkdir(os.path.join(dist, 'sub'))
Éric Araujoa7e33a12011-08-12 19:51:35 +02001122 write_file((dist, 'sub', 'file3'), 'xxx')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001123 os.mkdir(os.path.join(dist, 'sub2'))
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001124 if base_dir:
1125 write_file((root_dir, 'outer'), 'xxx')
1126 return root_dir, base_dir
Tarek Ziadé396fad72010-02-23 05:30:31 +00001127
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001128 @support.requires_zlib
Serhiy Storchakab42de2f2015-11-21 14:09:26 +02001129 @unittest.skipUnless(shutil.which('tar'),
Tarek Ziadé396fad72010-02-23 05:30:31 +00001130 'Need the tar command to run')
1131 def test_tarfile_vs_tar(self):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001132 root_dir, base_dir = self._create_files()
1133 base_name = os.path.join(self.mkdtemp(), 'archive')
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001134 tarball = make_archive(base_name, 'gztar', root_dir, base_dir)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001135
1136 # check if the compressed tarball was created
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001137 self.assertEqual(tarball, base_name + '.tar.gz')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001138 self.assertTrue(os.path.isfile(tarball))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001139
1140 # now create another tarball using `tar`
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001141 tarball2 = os.path.join(root_dir, 'archive2.tar')
1142 tar_cmd = ['tar', '-cf', 'archive2.tar', base_dir]
Serhiy Storchakab42de2f2015-11-21 14:09:26 +02001143 subprocess.check_call(tar_cmd, cwd=root_dir,
1144 stdout=subprocess.DEVNULL)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001145
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001146 self.assertTrue(os.path.isfile(tarball2))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001147 # let's compare both tarballs
Ezio Melottib3aedd42010-11-20 19:04:17 +00001148 self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001149
1150 # trying an uncompressed one
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001151 tarball = make_archive(base_name, 'tar', root_dir, base_dir)
1152 self.assertEqual(tarball, base_name + '.tar')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001153 self.assertTrue(os.path.isfile(tarball))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001154
1155 # now for a dry_run
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001156 tarball = make_archive(base_name, 'tar', root_dir, base_dir,
1157 dry_run=True)
1158 self.assertEqual(tarball, base_name + '.tar')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001159 self.assertTrue(os.path.isfile(tarball))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001160
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001161 @support.requires_zlib
Tarek Ziadé396fad72010-02-23 05:30:31 +00001162 def test_make_zipfile(self):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001163 # creating something to zip
1164 root_dir, base_dir = self._create_files()
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001165
1166 tmpdir2 = self.mkdtemp()
1167 # force shutil to create the directory
1168 os.rmdir(tmpdir2)
1169 # working with relative paths
1170 work_dir = os.path.dirname(tmpdir2)
1171 rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001172
1173 with support.change_cwd(work_dir):
Serhiy Storchaka5558d4f2015-09-08 09:59:02 +03001174 base_name = os.path.abspath(rel_base_name)
Serhiy Storchaka666de772016-10-23 15:55:09 +03001175 res = make_archive(rel_base_name, 'zip', root_dir)
1176
1177 self.assertEqual(res, base_name + '.zip')
1178 self.assertTrue(os.path.isfile(res))
1179 self.assertTrue(zipfile.is_zipfile(res))
1180 with zipfile.ZipFile(res) as zf:
1181 self.assertCountEqual(zf.namelist(),
1182 ['dist/', 'dist/sub/', 'dist/sub2/',
1183 'dist/file1', 'dist/file2', 'dist/sub/file3',
1184 'outer'])
1185
1186 with support.change_cwd(work_dir):
1187 base_name = os.path.abspath(rel_base_name)
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001188 res = make_archive(rel_base_name, 'zip', root_dir, base_dir)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001189
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001190 self.assertEqual(res, base_name + '.zip')
1191 self.assertTrue(os.path.isfile(res))
1192 self.assertTrue(zipfile.is_zipfile(res))
1193 with zipfile.ZipFile(res) as zf:
1194 self.assertCountEqual(zf.namelist(),
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001195 ['dist/', 'dist/sub/', 'dist/sub2/',
1196 'dist/file1', 'dist/file2', 'dist/sub/file3'])
Tarek Ziadé396fad72010-02-23 05:30:31 +00001197
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001198 @support.requires_zlib
Serhiy Storchakab42de2f2015-11-21 14:09:26 +02001199 @unittest.skipUnless(shutil.which('zip'),
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001200 'Need the zip command to run')
1201 def test_zipfile_vs_zip(self):
1202 root_dir, base_dir = self._create_files()
1203 base_name = os.path.join(self.mkdtemp(), 'archive')
1204 archive = make_archive(base_name, 'zip', root_dir, base_dir)
1205
1206 # check if ZIP file was created
1207 self.assertEqual(archive, base_name + '.zip')
1208 self.assertTrue(os.path.isfile(archive))
1209
1210 # now create another ZIP file using `zip`
1211 archive2 = os.path.join(root_dir, 'archive2.zip')
1212 zip_cmd = ['zip', '-q', '-r', 'archive2.zip', base_dir]
Serhiy Storchakab42de2f2015-11-21 14:09:26 +02001213 subprocess.check_call(zip_cmd, cwd=root_dir,
1214 stdout=subprocess.DEVNULL)
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001215
1216 self.assertTrue(os.path.isfile(archive2))
1217 # let's compare both ZIP files
1218 with zipfile.ZipFile(archive) as zf:
1219 names = zf.namelist()
1220 with zipfile.ZipFile(archive2) as zf:
1221 names2 = zf.namelist()
1222 self.assertEqual(sorted(names), sorted(names2))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001223
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001224 @support.requires_zlib
Serhiy Storchaka8bc792a2015-11-22 14:49:58 +02001225 @unittest.skipUnless(shutil.which('unzip'),
1226 'Need the unzip command to run')
1227 def test_unzip_zipfile(self):
1228 root_dir, base_dir = self._create_files()
1229 base_name = os.path.join(self.mkdtemp(), 'archive')
1230 archive = make_archive(base_name, 'zip', root_dir, base_dir)
1231
1232 # check if ZIP file was created
1233 self.assertEqual(archive, base_name + '.zip')
1234 self.assertTrue(os.path.isfile(archive))
1235
1236 # now check the ZIP file using `unzip -t`
1237 zip_cmd = ['unzip', '-t', archive]
1238 with support.change_cwd(root_dir):
1239 try:
1240 subprocess.check_output(zip_cmd, stderr=subprocess.STDOUT)
1241 except subprocess.CalledProcessError as exc:
1242 details = exc.output.decode(errors="replace")
Benjamin Petersona710ebd2018-09-13 10:08:46 -07001243 if 'unrecognized option: t' in details:
Benjamin Petersone78734d2018-09-13 10:57:23 -07001244 self.skipTest("unzip doesn't support -t")
Serhiy Storchaka8bc792a2015-11-22 14:49:58 +02001245 msg = "{}\n\n**Unzip Output**\n{}"
1246 self.fail(msg.format(exc, details))
1247
Tarek Ziadé396fad72010-02-23 05:30:31 +00001248 def test_make_archive(self):
1249 tmpdir = self.mkdtemp()
1250 base_name = os.path.join(tmpdir, 'archive')
1251 self.assertRaises(ValueError, make_archive, base_name, 'xxx')
1252
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001253 @support.requires_zlib
Tarek Ziadé396fad72010-02-23 05:30:31 +00001254 def test_make_archive_owner_group(self):
1255 # testing make_archive with owner and group, with various combinations
1256 # this works even if there's not gid/uid support
1257 if UID_GID_SUPPORT:
1258 group = grp.getgrgid(0)[0]
1259 owner = pwd.getpwuid(0)[0]
1260 else:
1261 group = owner = 'root'
1262
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001263 root_dir, base_dir = self._create_files()
1264 base_name = os.path.join(self.mkdtemp(), 'archive')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001265 res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner,
1266 group=group)
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001267 self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001268
1269 res = make_archive(base_name, 'zip', root_dir, base_dir)
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001270 self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001271
1272 res = make_archive(base_name, 'tar', root_dir, base_dir,
1273 owner=owner, group=group)
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001274 self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001275
1276 res = make_archive(base_name, 'tar', root_dir, base_dir,
1277 owner='kjhkjhkjg', group='oihohoh')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001278 self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001279
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001280
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001281 @support.requires_zlib
Tarek Ziadé396fad72010-02-23 05:30:31 +00001282 @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
1283 def test_tarfile_root_owner(self):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001284 root_dir, base_dir = self._create_files()
1285 base_name = os.path.join(self.mkdtemp(), 'archive')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001286 group = grp.getgrgid(0)[0]
1287 owner = pwd.getpwuid(0)[0]
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001288 with support.change_cwd(root_dir):
1289 archive_name = make_archive(base_name, 'gztar', root_dir, 'dist',
1290 owner=owner, group=group)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001291
1292 # check if the compressed tarball was created
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001293 self.assertTrue(os.path.isfile(archive_name))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001294
1295 # now checks the rights
1296 archive = tarfile.open(archive_name)
1297 try:
1298 for member in archive.getmembers():
Ezio Melottib3aedd42010-11-20 19:04:17 +00001299 self.assertEqual(member.uid, 0)
1300 self.assertEqual(member.gid, 0)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001301 finally:
1302 archive.close()
1303
1304 def test_make_archive_cwd(self):
1305 current_dir = os.getcwd()
1306 def _breaks(*args, **kw):
1307 raise RuntimeError()
1308
1309 register_archive_format('xxx', _breaks, [], 'xxx file')
1310 try:
1311 try:
1312 make_archive('xxx', 'xxx', root_dir=self.mkdtemp())
1313 except Exception:
1314 pass
Ezio Melottib3aedd42010-11-20 19:04:17 +00001315 self.assertEqual(os.getcwd(), current_dir)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001316 finally:
1317 unregister_archive_format('xxx')
1318
Serhiy Storchaka9a4fc192014-11-28 00:48:46 +02001319 def test_make_tarfile_in_curdir(self):
1320 # Issue #21280
1321 root_dir = self.mkdtemp()
1322 with support.change_cwd(root_dir):
1323 self.assertEqual(make_archive('test', 'tar'), 'test.tar')
1324 self.assertTrue(os.path.isfile('test.tar'))
1325
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001326 @support.requires_zlib
Serhiy Storchaka9a4fc192014-11-28 00:48:46 +02001327 def test_make_zipfile_in_curdir(self):
1328 # Issue #21280
1329 root_dir = self.mkdtemp()
1330 with support.change_cwd(root_dir):
1331 self.assertEqual(make_archive('test', 'zip'), 'test.zip')
1332 self.assertTrue(os.path.isfile('test.zip'))
1333
Tarek Ziadé396fad72010-02-23 05:30:31 +00001334 def test_register_archive_format(self):
1335
1336 self.assertRaises(TypeError, register_archive_format, 'xxx', 1)
1337 self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
1338 1)
1339 self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
1340 [(1, 2), (1, 2, 3)])
1341
1342 register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file')
1343 formats = [name for name, params in get_archive_formats()]
1344 self.assertIn('xxx', formats)
1345
1346 unregister_archive_format('xxx')
1347 formats = [name for name, params in get_archive_formats()]
1348 self.assertNotIn('xxx', formats)
1349
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001350 def check_unpack_archive(self, format):
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001351 self.check_unpack_archive_with_converter(format, lambda path: path)
1352 self.check_unpack_archive_with_converter(format, pathlib.Path)
Serhiy Storchakab21d1552018-03-02 11:53:51 +02001353 self.check_unpack_archive_with_converter(format, FakePath)
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001354
1355 def check_unpack_archive_with_converter(self, format, converter):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001356 root_dir, base_dir = self._create_files()
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001357 expected = rlistdir(root_dir)
1358 expected.remove('outer')
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001359
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001360 base_name = os.path.join(self.mkdtemp(), 'archive')
1361 filename = make_archive(base_name, format, root_dir, base_dir)
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001362
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001363 # let's try to unpack it now
1364 tmpdir2 = self.mkdtemp()
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001365 unpack_archive(converter(filename), converter(tmpdir2))
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001366 self.assertEqual(rlistdir(tmpdir2), expected)
1367
1368 # and again, this time with the format specified
1369 tmpdir3 = self.mkdtemp()
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001370 unpack_archive(converter(filename), converter(tmpdir3), format=format)
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001371 self.assertEqual(rlistdir(tmpdir3), expected)
1372
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001373 self.assertRaises(shutil.ReadError, unpack_archive, converter(TESTFN))
1374 self.assertRaises(ValueError, unpack_archive, converter(TESTFN), format='xxx')
Nick Coghlanabf202d2011-03-16 13:52:20 -04001375
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001376 def test_unpack_archive_tar(self):
1377 self.check_unpack_archive('tar')
1378
1379 @support.requires_zlib
1380 def test_unpack_archive_gztar(self):
1381 self.check_unpack_archive('gztar')
1382
1383 @support.requires_bz2
1384 def test_unpack_archive_bztar(self):
1385 self.check_unpack_archive('bztar')
1386
1387 @support.requires_lzma
Michael Feltef110b12019-02-18 12:02:44 +01001388 @unittest.skipIf(AIX and not _maxdataOK(), "AIX MAXDATA must be 0x20000000 or larger")
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001389 def test_unpack_archive_xztar(self):
1390 self.check_unpack_archive('xztar')
1391
1392 @support.requires_zlib
1393 def test_unpack_archive_zip(self):
1394 self.check_unpack_archive('zip')
1395
Martin Pantereb995702016-07-28 01:11:04 +00001396 def test_unpack_registry(self):
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001397
1398 formats = get_unpack_formats()
1399
1400 def _boo(filename, extract_dir, extra):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001401 self.assertEqual(extra, 1)
1402 self.assertEqual(filename, 'stuff.boo')
1403 self.assertEqual(extract_dir, 'xx')
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001404
1405 register_unpack_format('Boo', ['.boo', '.b2'], _boo, [('extra', 1)])
1406 unpack_archive('stuff.boo', 'xx')
1407
1408 # trying to register a .boo unpacker again
1409 self.assertRaises(RegistryError, register_unpack_format, 'Boo2',
1410 ['.boo'], _boo)
1411
1412 # should work now
1413 unregister_unpack_format('Boo')
1414 register_unpack_format('Boo2', ['.boo'], _boo)
1415 self.assertIn(('Boo2', ['.boo'], ''), get_unpack_formats())
1416 self.assertNotIn(('Boo', ['.boo'], ''), get_unpack_formats())
1417
1418 # let's leave a clean state
1419 unregister_unpack_format('Boo2')
Ezio Melottib3aedd42010-11-20 19:04:17 +00001420 self.assertEqual(get_unpack_formats(), formats)
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001421
Giampaolo Rodola'210e7ca2011-07-01 13:55:36 +02001422 @unittest.skipUnless(hasattr(shutil, 'disk_usage'),
1423 "disk_usage not available on this platform")
1424 def test_disk_usage(self):
Gregory P. Smith529746c2017-07-06 17:11:27 -07001425 usage = shutil.disk_usage(os.path.dirname(__file__))
Victor Stinnerdc525f42018-12-11 12:05:21 +01001426 for attr in ('total', 'used', 'free'):
1427 self.assertIsInstance(getattr(usage, attr), int)
Éric Araujo2ee61882011-07-02 16:45:45 +02001428 self.assertGreater(usage.total, 0)
1429 self.assertGreater(usage.used, 0)
1430 self.assertGreaterEqual(usage.free, 0)
1431 self.assertGreaterEqual(usage.total, usage.used)
1432 self.assertGreater(usage.total, usage.free)
Giampaolo Rodola'210e7ca2011-07-01 13:55:36 +02001433
Victor Stinnerdc525f42018-12-11 12:05:21 +01001434 # bpo-32557: Check that disk_usage() also accepts a filename
1435 shutil.disk_usage(__file__)
1436
Sandro Tosid902a142011-08-22 23:28:27 +02001437 @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
1438 @unittest.skipUnless(hasattr(os, 'chown'), 'requires os.chown')
1439 def test_chown(self):
1440
1441 # cleaned-up automatically by TestShutil.tearDown method
1442 dirname = self.mkdtemp()
1443 filename = tempfile.mktemp(dir=dirname)
1444 write_file(filename, 'testing chown function')
1445
1446 with self.assertRaises(ValueError):
1447 shutil.chown(filename)
1448
1449 with self.assertRaises(LookupError):
Raymond Hettinger15f44ab2016-08-30 10:47:49 -07001450 shutil.chown(filename, user='non-existing username')
Sandro Tosid902a142011-08-22 23:28:27 +02001451
1452 with self.assertRaises(LookupError):
Raymond Hettinger15f44ab2016-08-30 10:47:49 -07001453 shutil.chown(filename, group='non-existing groupname')
Sandro Tosid902a142011-08-22 23:28:27 +02001454
1455 with self.assertRaises(TypeError):
1456 shutil.chown(filename, b'spam')
1457
1458 with self.assertRaises(TypeError):
1459 shutil.chown(filename, 3.14)
1460
1461 uid = os.getuid()
1462 gid = os.getgid()
1463
1464 def check_chown(path, uid=None, gid=None):
1465 s = os.stat(filename)
1466 if uid is not None:
1467 self.assertEqual(uid, s.st_uid)
1468 if gid is not None:
1469 self.assertEqual(gid, s.st_gid)
1470
1471 shutil.chown(filename, uid, gid)
1472 check_chown(filename, uid, gid)
1473 shutil.chown(filename, uid)
1474 check_chown(filename, uid)
1475 shutil.chown(filename, user=uid)
1476 check_chown(filename, uid)
1477 shutil.chown(filename, group=gid)
Sandro Tosi91f948a2011-08-22 23:55:39 +02001478 check_chown(filename, gid=gid)
Sandro Tosid902a142011-08-22 23:28:27 +02001479
1480 shutil.chown(dirname, uid, gid)
1481 check_chown(dirname, uid, gid)
1482 shutil.chown(dirname, uid)
1483 check_chown(dirname, uid)
1484 shutil.chown(dirname, user=uid)
1485 check_chown(dirname, uid)
1486 shutil.chown(dirname, group=gid)
Sandro Tosi91f948a2011-08-22 23:55:39 +02001487 check_chown(dirname, gid=gid)
Sandro Tosid902a142011-08-22 23:28:27 +02001488
1489 user = pwd.getpwuid(uid)[0]
1490 group = grp.getgrgid(gid)[0]
1491 shutil.chown(filename, user, group)
1492 check_chown(filename, uid, gid)
1493 shutil.chown(dirname, user, group)
1494 check_chown(dirname, uid, gid)
1495
Brian Curtin0d0a1de2012-06-18 18:41:07 -05001496 def test_copy_return_value(self):
1497 # copy and copy2 both return their destination path.
1498 for fn in (shutil.copy, shutil.copy2):
1499 src_dir = self.mkdtemp()
1500 dst_dir = self.mkdtemp()
1501 src = os.path.join(src_dir, 'foo')
1502 write_file(src, 'foo')
1503 rv = fn(src, dst_dir)
1504 self.assertEqual(rv, os.path.join(dst_dir, 'foo'))
1505 rv = fn(src, os.path.join(dst_dir, 'bar'))
1506 self.assertEqual(rv, os.path.join(dst_dir, 'bar'))
1507
1508 def test_copyfile_return_value(self):
1509 # copytree returns its destination path.
1510 src_dir = self.mkdtemp()
1511 dst_dir = self.mkdtemp()
1512 dst_file = os.path.join(dst_dir, 'bar')
1513 src_file = os.path.join(src_dir, 'foo')
1514 write_file(src_file, 'foo')
1515 rv = shutil.copyfile(src_file, dst_file)
1516 self.assertTrue(os.path.exists(rv))
1517 self.assertEqual(read_file(src_file), read_file(dst_file))
1518
Hynek Schlawack48653762012-10-07 12:49:58 +02001519 def test_copyfile_same_file(self):
1520 # copyfile() should raise SameFileError if the source and destination
1521 # are the same.
1522 src_dir = self.mkdtemp()
1523 src_file = os.path.join(src_dir, 'foo')
1524 write_file(src_file, 'foo')
1525 self.assertRaises(SameFileError, shutil.copyfile, src_file, src_file)
Hynek Schlawack27ddb572012-10-28 13:59:27 +01001526 # But Error should work too, to stay backward compatible.
1527 self.assertRaises(Error, shutil.copyfile, src_file, src_file)
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02001528 # Make sure file is not corrupted.
1529 self.assertEqual(read_file(src_file), 'foo')
Hynek Schlawack48653762012-10-07 12:49:58 +02001530
Brian Curtin0d0a1de2012-06-18 18:41:07 -05001531 def test_copytree_return_value(self):
1532 # copytree returns its destination path.
1533 src_dir = self.mkdtemp()
1534 dst_dir = src_dir + "dest"
Larry Hastings5b2f9c02012-06-25 23:50:01 -07001535 self.addCleanup(shutil.rmtree, dst_dir, True)
Brian Curtin0d0a1de2012-06-18 18:41:07 -05001536 src = os.path.join(src_dir, 'foo')
1537 write_file(src, 'foo')
1538 rv = shutil.copytree(src_dir, dst_dir)
1539 self.assertEqual(['foo'], os.listdir(rv))
1540
Christian Heimes9bd667a2008-01-20 15:14:11 +00001541
Brian Curtinc57a3452012-06-22 16:00:30 -05001542class TestWhich(unittest.TestCase):
1543
1544 def setUp(self):
Serhiy Storchaka014791f2013-01-21 15:00:27 +02001545 self.temp_dir = tempfile.mkdtemp(prefix="Tmp")
Larry Hastings5b2f9c02012-06-25 23:50:01 -07001546 self.addCleanup(shutil.rmtree, self.temp_dir, True)
Brian Curtinc57a3452012-06-22 16:00:30 -05001547 # Give the temp_file an ".exe" suffix for all.
1548 # It's needed on Windows and not harmful on other platforms.
1549 self.temp_file = tempfile.NamedTemporaryFile(dir=self.temp_dir,
Serhiy Storchaka014791f2013-01-21 15:00:27 +02001550 prefix="Tmp",
1551 suffix=".Exe")
Brian Curtinc57a3452012-06-22 16:00:30 -05001552 os.chmod(self.temp_file.name, stat.S_IXUSR)
1553 self.addCleanup(self.temp_file.close)
1554 self.dir, self.file = os.path.split(self.temp_file.name)
Cheryl Sabella5680f652019-02-13 06:25:10 -05001555 self.env_path = self.dir
1556 self.curdir = os.curdir
1557 self.ext = ".EXE"
Brian Curtinc57a3452012-06-22 16:00:30 -05001558
1559 def test_basic(self):
1560 # Given an EXE in a directory, it should be returned.
1561 rv = shutil.which(self.file, path=self.dir)
1562 self.assertEqual(rv, self.temp_file.name)
1563
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001564 def test_absolute_cmd(self):
Brian Curtinc57a3452012-06-22 16:00:30 -05001565 # When given the fully qualified path to an executable that exists,
1566 # it should be returned.
1567 rv = shutil.which(self.temp_file.name, path=self.temp_dir)
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001568 self.assertEqual(rv, self.temp_file.name)
1569
1570 def test_relative_cmd(self):
1571 # When given the relative path with a directory part to an executable
1572 # that exists, it should be returned.
1573 base_dir, tail_dir = os.path.split(self.dir)
1574 relpath = os.path.join(tail_dir, self.file)
Nick Coghlan55175962013-07-28 22:11:50 +10001575 with support.change_cwd(path=base_dir):
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001576 rv = shutil.which(relpath, path=self.temp_dir)
1577 self.assertEqual(rv, relpath)
1578 # But it shouldn't be searched in PATH directories (issue #16957).
Nick Coghlan55175962013-07-28 22:11:50 +10001579 with support.change_cwd(path=self.dir):
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001580 rv = shutil.which(relpath, path=base_dir)
1581 self.assertIsNone(rv)
1582
1583 def test_cwd(self):
1584 # Issue #16957
1585 base_dir = os.path.dirname(self.dir)
Nick Coghlan55175962013-07-28 22:11:50 +10001586 with support.change_cwd(path=self.dir):
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001587 rv = shutil.which(self.file, path=base_dir)
1588 if sys.platform == "win32":
1589 # Windows: current directory implicitly on PATH
Cheryl Sabella5680f652019-02-13 06:25:10 -05001590 self.assertEqual(rv, os.path.join(self.curdir, self.file))
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001591 else:
1592 # Other platforms: shouldn't match in the current directory.
1593 self.assertIsNone(rv)
Brian Curtinc57a3452012-06-22 16:00:30 -05001594
Serhiy Storchaka12516e22013-05-28 15:50:15 +03001595 @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
1596 'non-root user required')
Brian Curtinc57a3452012-06-22 16:00:30 -05001597 def test_non_matching_mode(self):
1598 # Set the file read-only and ask for writeable files.
1599 os.chmod(self.temp_file.name, stat.S_IREAD)
Serhiy Storchaka12516e22013-05-28 15:50:15 +03001600 if os.access(self.temp_file.name, os.W_OK):
1601 self.skipTest("can't set the file read-only")
Brian Curtinc57a3452012-06-22 16:00:30 -05001602 rv = shutil.which(self.file, path=self.dir, mode=os.W_OK)
1603 self.assertIsNone(rv)
1604
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001605 def test_relative_path(self):
Antoine Pitrou07c24d12012-06-22 23:33:05 +02001606 base_dir, tail_dir = os.path.split(self.dir)
Nick Coghlan55175962013-07-28 22:11:50 +10001607 with support.change_cwd(path=base_dir):
Antoine Pitrou07c24d12012-06-22 23:33:05 +02001608 rv = shutil.which(self.file, path=tail_dir)
1609 self.assertEqual(rv, os.path.join(tail_dir, self.file))
Antoine Pitrou07c24d12012-06-22 23:33:05 +02001610
Brian Curtinc57a3452012-06-22 16:00:30 -05001611 def test_nonexistent_file(self):
1612 # Return None when no matching executable file is found on the path.
1613 rv = shutil.which("foo.exe", path=self.dir)
1614 self.assertIsNone(rv)
1615
1616 @unittest.skipUnless(sys.platform == "win32",
1617 "pathext check is Windows-only")
1618 def test_pathext_checking(self):
1619 # Ask for the file without the ".exe" extension, then ensure that
1620 # it gets found properly with the extension.
Serhiy Storchakad70127a2013-01-24 20:03:49 +02001621 rv = shutil.which(self.file[:-4], path=self.dir)
Cheryl Sabella5680f652019-02-13 06:25:10 -05001622 self.assertEqual(rv, self.temp_file.name[:-4] + self.ext)
Brian Curtinc57a3452012-06-22 16:00:30 -05001623
Barry Warsaw618738b2013-04-16 11:05:03 -04001624 def test_environ_path(self):
1625 with support.EnvironmentVarGuard() as env:
Cheryl Sabella5680f652019-02-13 06:25:10 -05001626 env['PATH'] = self.env_path
Barry Warsaw618738b2013-04-16 11:05:03 -04001627 rv = shutil.which(self.file)
1628 self.assertEqual(rv, self.temp_file.name)
1629
Victor Stinner228a3c92019-04-17 16:26:36 +02001630 def test_environ_path_empty(self):
1631 # PATH='': no match
1632 with support.EnvironmentVarGuard() as env:
1633 env['PATH'] = ''
1634 with unittest.mock.patch('os.confstr', return_value=self.dir, \
1635 create=True), \
1636 support.swap_attr(os, 'defpath', self.dir), \
1637 support.change_cwd(self.dir):
1638 rv = shutil.which(self.file)
1639 self.assertIsNone(rv)
1640
1641 def test_environ_path_cwd(self):
1642 expected_cwd = os.path.basename(self.temp_file.name)
1643 if sys.platform == "win32":
1644 curdir = os.curdir
1645 if isinstance(expected_cwd, bytes):
1646 curdir = os.fsencode(curdir)
1647 expected_cwd = os.path.join(curdir, expected_cwd)
1648
1649 # PATH=':': explicitly looks in the current directory
1650 with support.EnvironmentVarGuard() as env:
1651 env['PATH'] = os.pathsep
1652 with unittest.mock.patch('os.confstr', return_value=self.dir, \
1653 create=True), \
1654 support.swap_attr(os, 'defpath', self.dir):
1655 rv = shutil.which(self.file)
1656 self.assertIsNone(rv)
1657
1658 # look in current directory
1659 with support.change_cwd(self.dir):
1660 rv = shutil.which(self.file)
1661 self.assertEqual(rv, expected_cwd)
1662
1663 def test_environ_path_missing(self):
1664 with support.EnvironmentVarGuard() as env:
1665 env.pop('PATH', None)
1666
1667 # without confstr
1668 with unittest.mock.patch('os.confstr', side_effect=ValueError, \
1669 create=True), \
1670 support.swap_attr(os, 'defpath', self.dir):
1671 rv = shutil.which(self.file)
1672 self.assertEqual(rv, self.temp_file.name)
1673
1674 # with confstr
1675 with unittest.mock.patch('os.confstr', return_value=self.dir, \
1676 create=True), \
1677 support.swap_attr(os, 'defpath', ''):
1678 rv = shutil.which(self.file)
1679 self.assertEqual(rv, self.temp_file.name)
1680
Barry Warsaw618738b2013-04-16 11:05:03 -04001681 def test_empty_path(self):
1682 base_dir = os.path.dirname(self.dir)
Nick Coghlan55175962013-07-28 22:11:50 +10001683 with support.change_cwd(path=self.dir), \
Barry Warsaw618738b2013-04-16 11:05:03 -04001684 support.EnvironmentVarGuard() as env:
Cheryl Sabella5680f652019-02-13 06:25:10 -05001685 env['PATH'] = self.env_path
Barry Warsaw618738b2013-04-16 11:05:03 -04001686 rv = shutil.which(self.file, path='')
1687 self.assertIsNone(rv)
1688
1689 def test_empty_path_no_PATH(self):
1690 with support.EnvironmentVarGuard() as env:
1691 env.pop('PATH', None)
1692 rv = shutil.which(self.file)
1693 self.assertIsNone(rv)
1694
Victor Stinner228a3c92019-04-17 16:26:36 +02001695 @unittest.skipUnless(sys.platform == "win32", 'test specific to Windows')
1696 def test_pathext(self):
1697 ext = ".xyz"
1698 temp_filexyz = tempfile.NamedTemporaryFile(dir=self.temp_dir,
1699 prefix="Tmp2", suffix=ext)
1700 os.chmod(temp_filexyz.name, stat.S_IXUSR)
1701 self.addCleanup(temp_filexyz.close)
1702
1703 # strip path and extension
1704 program = os.path.basename(temp_filexyz.name)
1705 program = os.path.splitext(program)[0]
1706
1707 with support.EnvironmentVarGuard() as env:
1708 env['PATHEXT'] = ext
1709 rv = shutil.which(program, path=self.temp_dir)
1710 self.assertEqual(rv, temp_filexyz.name)
1711
Brian Curtinc57a3452012-06-22 16:00:30 -05001712
Cheryl Sabella5680f652019-02-13 06:25:10 -05001713class TestWhichBytes(TestWhich):
1714 def setUp(self):
1715 TestWhich.setUp(self)
1716 self.dir = os.fsencode(self.dir)
1717 self.file = os.fsencode(self.file)
1718 self.temp_file.name = os.fsencode(self.temp_file.name)
1719 self.curdir = os.fsencode(self.curdir)
1720 self.ext = os.fsencode(self.ext)
1721
1722
Christian Heimesada8c3b2008-03-18 18:26:33 +00001723class TestMove(unittest.TestCase):
1724
1725 def setUp(self):
1726 filename = "foo"
1727 self.src_dir = tempfile.mkdtemp()
1728 self.dst_dir = tempfile.mkdtemp()
1729 self.src_file = os.path.join(self.src_dir, filename)
1730 self.dst_file = os.path.join(self.dst_dir, filename)
Christian Heimesada8c3b2008-03-18 18:26:33 +00001731 with open(self.src_file, "wb") as f:
1732 f.write(b"spam")
1733
1734 def tearDown(self):
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001735 for d in (self.src_dir, self.dst_dir):
Christian Heimesada8c3b2008-03-18 18:26:33 +00001736 try:
1737 if d:
1738 shutil.rmtree(d)
1739 except:
1740 pass
1741
1742 def _check_move_file(self, src, dst, real_dst):
Antoine Pitrou92f60ed2010-10-14 22:11:44 +00001743 with open(src, "rb") as f:
1744 contents = f.read()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001745 shutil.move(src, dst)
Antoine Pitrou92f60ed2010-10-14 22:11:44 +00001746 with open(real_dst, "rb") as f:
1747 self.assertEqual(contents, f.read())
Christian Heimesada8c3b2008-03-18 18:26:33 +00001748 self.assertFalse(os.path.exists(src))
1749
1750 def _check_move_dir(self, src, dst, real_dst):
1751 contents = sorted(os.listdir(src))
1752 shutil.move(src, dst)
1753 self.assertEqual(contents, sorted(os.listdir(real_dst)))
1754 self.assertFalse(os.path.exists(src))
1755
1756 def test_move_file(self):
1757 # Move a file to another location on the same filesystem.
1758 self._check_move_file(self.src_file, self.dst_file, self.dst_file)
1759
1760 def test_move_file_to_dir(self):
1761 # Move a file inside an existing dir on the same filesystem.
1762 self._check_move_file(self.src_file, self.dst_dir, self.dst_file)
1763
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001764 @mock_rename
Christian Heimesada8c3b2008-03-18 18:26:33 +00001765 def test_move_file_other_fs(self):
1766 # Move a file to an existing dir on another filesystem.
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001767 self.test_move_file()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001768
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001769 @mock_rename
Christian Heimesada8c3b2008-03-18 18:26:33 +00001770 def test_move_file_to_dir_other_fs(self):
1771 # Move a file to another location on another filesystem.
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001772 self.test_move_file_to_dir()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001773
1774 def test_move_dir(self):
1775 # Move a dir to another location on the same filesystem.
1776 dst_dir = tempfile.mktemp()
1777 try:
1778 self._check_move_dir(self.src_dir, dst_dir, dst_dir)
1779 finally:
1780 try:
1781 shutil.rmtree(dst_dir)
1782 except:
1783 pass
1784
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001785 @mock_rename
Christian Heimesada8c3b2008-03-18 18:26:33 +00001786 def test_move_dir_other_fs(self):
1787 # Move a dir to another location on another filesystem.
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001788 self.test_move_dir()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001789
1790 def test_move_dir_to_dir(self):
1791 # Move a dir inside an existing dir on the same filesystem.
1792 self._check_move_dir(self.src_dir, self.dst_dir,
1793 os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
1794
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001795 @mock_rename
Christian Heimesada8c3b2008-03-18 18:26:33 +00001796 def test_move_dir_to_dir_other_fs(self):
1797 # Move a dir inside an existing dir on another filesystem.
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001798 self.test_move_dir_to_dir()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001799
Serhiy Storchaka3a308b92014-02-11 10:30:59 +02001800 def test_move_dir_sep_to_dir(self):
1801 self._check_move_dir(self.src_dir + os.path.sep, self.dst_dir,
1802 os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
1803
1804 @unittest.skipUnless(os.path.altsep, 'requires os.path.altsep')
1805 def test_move_dir_altsep_to_dir(self):
1806 self._check_move_dir(self.src_dir + os.path.altsep, self.dst_dir,
1807 os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
1808
Christian Heimesada8c3b2008-03-18 18:26:33 +00001809 def test_existing_file_inside_dest_dir(self):
1810 # A file with the same name inside the destination dir already exists.
1811 with open(self.dst_file, "wb"):
1812 pass
1813 self.assertRaises(shutil.Error, shutil.move, self.src_file, self.dst_dir)
1814
1815 def test_dont_move_dir_in_itself(self):
1816 # Moving a dir inside itself raises an Error.
1817 dst = os.path.join(self.src_dir, "bar")
1818 self.assertRaises(shutil.Error, shutil.move, self.src_dir, dst)
1819
Antoine Pitrou0dcc3cd2009-01-29 20:26:59 +00001820 def test_destinsrc_false_negative(self):
1821 os.mkdir(TESTFN)
1822 try:
1823 for src, dst in [('srcdir', 'srcdir/dest')]:
1824 src = os.path.join(TESTFN, src)
1825 dst = os.path.join(TESTFN, dst)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001826 self.assertTrue(shutil._destinsrc(src, dst),
Benjamin Peterson247a9b82009-02-20 04:09:19 +00001827 msg='_destinsrc() wrongly concluded that '
Antoine Pitrou0dcc3cd2009-01-29 20:26:59 +00001828 'dst (%s) is not in src (%s)' % (dst, src))
1829 finally:
1830 shutil.rmtree(TESTFN, ignore_errors=True)
Christian Heimesada8c3b2008-03-18 18:26:33 +00001831
Antoine Pitrou0dcc3cd2009-01-29 20:26:59 +00001832 def test_destinsrc_false_positive(self):
1833 os.mkdir(TESTFN)
1834 try:
1835 for src, dst in [('srcdir', 'src/dest'), ('srcdir', 'srcdir.new')]:
1836 src = os.path.join(TESTFN, src)
1837 dst = os.path.join(TESTFN, dst)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001838 self.assertFalse(shutil._destinsrc(src, dst),
Benjamin Peterson247a9b82009-02-20 04:09:19 +00001839 msg='_destinsrc() wrongly concluded that '
Antoine Pitrou0dcc3cd2009-01-29 20:26:59 +00001840 'dst (%s) is in src (%s)' % (dst, src))
1841 finally:
1842 shutil.rmtree(TESTFN, ignore_errors=True)
Christian Heimes9bd667a2008-01-20 15:14:11 +00001843
Antoine Pitrou0a08d7a2012-01-06 20:16:19 +01001844 @support.skip_unless_symlink
1845 @mock_rename
1846 def test_move_file_symlink(self):
1847 dst = os.path.join(self.src_dir, 'bar')
1848 os.symlink(self.src_file, dst)
1849 shutil.move(dst, self.dst_file)
1850 self.assertTrue(os.path.islink(self.dst_file))
1851 self.assertTrue(os.path.samefile(self.src_file, self.dst_file))
1852
1853 @support.skip_unless_symlink
1854 @mock_rename
1855 def test_move_file_symlink_to_dir(self):
1856 filename = "bar"
1857 dst = os.path.join(self.src_dir, filename)
1858 os.symlink(self.src_file, dst)
1859 shutil.move(dst, self.dst_dir)
1860 final_link = os.path.join(self.dst_dir, filename)
1861 self.assertTrue(os.path.islink(final_link))
1862 self.assertTrue(os.path.samefile(self.src_file, final_link))
1863
1864 @support.skip_unless_symlink
1865 @mock_rename
1866 def test_move_dangling_symlink(self):
1867 src = os.path.join(self.src_dir, 'baz')
1868 dst = os.path.join(self.src_dir, 'bar')
1869 os.symlink(src, dst)
1870 dst_link = os.path.join(self.dst_dir, 'quux')
1871 shutil.move(dst, dst_link)
1872 self.assertTrue(os.path.islink(dst_link))
Antoine Pitrou3f48ac92014-01-01 02:50:45 +01001873 # On Windows, os.path.realpath does not follow symlinks (issue #9949)
1874 if os.name == 'nt':
1875 self.assertEqual(os.path.realpath(src), os.readlink(dst_link))
1876 else:
1877 self.assertEqual(os.path.realpath(src), os.path.realpath(dst_link))
Antoine Pitrou0a08d7a2012-01-06 20:16:19 +01001878
1879 @support.skip_unless_symlink
1880 @mock_rename
1881 def test_move_dir_symlink(self):
1882 src = os.path.join(self.src_dir, 'baz')
1883 dst = os.path.join(self.src_dir, 'bar')
1884 os.mkdir(src)
1885 os.symlink(src, dst)
1886 dst_link = os.path.join(self.dst_dir, 'quux')
1887 shutil.move(dst, dst_link)
1888 self.assertTrue(os.path.islink(dst_link))
1889 self.assertTrue(os.path.samefile(src, dst_link))
1890
Brian Curtin0d0a1de2012-06-18 18:41:07 -05001891 def test_move_return_value(self):
1892 rv = shutil.move(self.src_file, self.dst_dir)
1893 self.assertEqual(rv,
1894 os.path.join(self.dst_dir, os.path.basename(self.src_file)))
1895
1896 def test_move_as_rename_return_value(self):
1897 rv = shutil.move(self.src_file, os.path.join(self.dst_dir, 'bar'))
1898 self.assertEqual(rv, os.path.join(self.dst_dir, 'bar'))
1899
R David Murray6ffface2014-06-11 14:40:13 -04001900 @mock_rename
1901 def test_move_file_special_function(self):
1902 moved = []
1903 def _copy(src, dst):
1904 moved.append((src, dst))
1905 shutil.move(self.src_file, self.dst_dir, copy_function=_copy)
1906 self.assertEqual(len(moved), 1)
1907
1908 @mock_rename
1909 def test_move_dir_special_function(self):
1910 moved = []
1911 def _copy(src, dst):
1912 moved.append((src, dst))
1913 support.create_empty_file(os.path.join(self.src_dir, 'child'))
1914 support.create_empty_file(os.path.join(self.src_dir, 'child1'))
1915 shutil.move(self.src_dir, self.dst_dir, copy_function=_copy)
1916 self.assertEqual(len(moved), 3)
1917
Tarek Ziadé5340db32010-04-19 22:30:51 +00001918
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001919class TestCopyFile(unittest.TestCase):
1920
1921 _delete = False
1922
1923 class Faux(object):
1924 _entered = False
1925 _exited_with = None
1926 _raised = False
1927 def __init__(self, raise_in_exit=False, suppress_at_exit=True):
1928 self._raise_in_exit = raise_in_exit
1929 self._suppress_at_exit = suppress_at_exit
1930 def read(self, *args):
1931 return ''
1932 def __enter__(self):
1933 self._entered = True
1934 def __exit__(self, exc_type, exc_val, exc_tb):
1935 self._exited_with = exc_type, exc_val, exc_tb
1936 if self._raise_in_exit:
1937 self._raised = True
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001938 raise OSError("Cannot close")
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001939 return self._suppress_at_exit
1940
1941 def tearDown(self):
1942 if self._delete:
1943 del shutil.open
1944
1945 def _set_shutil_open(self, func):
1946 shutil.open = func
1947 self._delete = True
1948
1949 def test_w_source_open_fails(self):
1950 def _open(filename, mode='r'):
1951 if filename == 'srcfile':
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001952 raise OSError('Cannot open "srcfile"')
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001953 assert 0 # shouldn't reach here.
1954
1955 self._set_shutil_open(_open)
1956
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001957 self.assertRaises(OSError, shutil.copyfile, 'srcfile', 'destfile')
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001958
Victor Stinner937ee9e2018-06-26 02:11:06 +02001959 @unittest.skipIf(MACOS, "skipped on macOS")
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001960 def test_w_dest_open_fails(self):
1961
1962 srcfile = self.Faux()
1963
1964 def _open(filename, mode='r'):
1965 if filename == 'srcfile':
1966 return srcfile
1967 if filename == 'destfile':
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001968 raise OSError('Cannot open "destfile"')
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001969 assert 0 # shouldn't reach here.
1970
1971 self._set_shutil_open(_open)
1972
1973 shutil.copyfile('srcfile', 'destfile')
1974 self.assertTrue(srcfile._entered)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001975 self.assertTrue(srcfile._exited_with[0] is OSError)
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001976 self.assertEqual(srcfile._exited_with[1].args,
1977 ('Cannot open "destfile"',))
1978
Victor Stinner937ee9e2018-06-26 02:11:06 +02001979 @unittest.skipIf(MACOS, "skipped on macOS")
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001980 def test_w_dest_close_fails(self):
1981
1982 srcfile = self.Faux()
1983 destfile = self.Faux(True)
1984
1985 def _open(filename, mode='r'):
1986 if filename == 'srcfile':
1987 return srcfile
1988 if filename == 'destfile':
1989 return destfile
1990 assert 0 # shouldn't reach here.
1991
1992 self._set_shutil_open(_open)
1993
1994 shutil.copyfile('srcfile', 'destfile')
1995 self.assertTrue(srcfile._entered)
1996 self.assertTrue(destfile._entered)
1997 self.assertTrue(destfile._raised)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001998 self.assertTrue(srcfile._exited_with[0] is OSError)
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001999 self.assertEqual(srcfile._exited_with[1].args,
2000 ('Cannot close',))
2001
Victor Stinner937ee9e2018-06-26 02:11:06 +02002002 @unittest.skipIf(MACOS, "skipped on macOS")
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002003 def test_w_source_close_fails(self):
2004
2005 srcfile = self.Faux(True)
2006 destfile = self.Faux()
2007
2008 def _open(filename, mode='r'):
2009 if filename == 'srcfile':
2010 return srcfile
2011 if filename == 'destfile':
2012 return destfile
2013 assert 0 # shouldn't reach here.
2014
2015 self._set_shutil_open(_open)
2016
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002017 self.assertRaises(OSError,
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002018 shutil.copyfile, 'srcfile', 'destfile')
2019 self.assertTrue(srcfile._entered)
2020 self.assertTrue(destfile._entered)
2021 self.assertFalse(destfile._raised)
2022 self.assertTrue(srcfile._exited_with[0] is None)
2023 self.assertTrue(srcfile._raised)
2024
Ronald Oussorenf51738b2011-05-06 10:23:04 +02002025 def test_move_dir_caseinsensitive(self):
2026 # Renames a folder to the same name
2027 # but a different case.
2028
2029 self.src_dir = tempfile.mkdtemp()
Larry Hastings5b2f9c02012-06-25 23:50:01 -07002030 self.addCleanup(shutil.rmtree, self.src_dir, True)
Ronald Oussorenf51738b2011-05-06 10:23:04 +02002031 dst_dir = os.path.join(
2032 os.path.dirname(self.src_dir),
2033 os.path.basename(self.src_dir).upper())
2034 self.assertNotEqual(self.src_dir, dst_dir)
2035
2036 try:
2037 shutil.move(self.src_dir, dst_dir)
2038 self.assertTrue(os.path.isdir(dst_dir))
2039 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +02002040 os.rmdir(dst_dir)
Ronald Oussorenf51738b2011-05-06 10:23:04 +02002041
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002042
Giampaolo Rodolac7f02a92018-06-19 08:27:29 -07002043class TestCopyFileObj(unittest.TestCase):
2044 FILESIZE = 2 * 1024 * 1024
2045
2046 @classmethod
2047 def setUpClass(cls):
2048 write_test_file(TESTFN, cls.FILESIZE)
2049
2050 @classmethod
2051 def tearDownClass(cls):
2052 support.unlink(TESTFN)
2053 support.unlink(TESTFN2)
2054
2055 def tearDown(self):
2056 support.unlink(TESTFN2)
2057
2058 @contextlib.contextmanager
2059 def get_files(self):
2060 with open(TESTFN, "rb") as src:
2061 with open(TESTFN2, "wb") as dst:
2062 yield (src, dst)
2063
2064 def assert_files_eq(self, src, dst):
2065 with open(src, 'rb') as fsrc:
2066 with open(dst, 'rb') as fdst:
2067 self.assertEqual(fsrc.read(), fdst.read())
2068
2069 def test_content(self):
2070 with self.get_files() as (src, dst):
2071 shutil.copyfileobj(src, dst)
2072 self.assert_files_eq(TESTFN, TESTFN2)
2073
2074 def test_file_not_closed(self):
2075 with self.get_files() as (src, dst):
2076 shutil.copyfileobj(src, dst)
2077 assert not src.closed
2078 assert not dst.closed
2079
2080 def test_file_offset(self):
2081 with self.get_files() as (src, dst):
2082 shutil.copyfileobj(src, dst)
2083 self.assertEqual(src.tell(), self.FILESIZE)
2084 self.assertEqual(dst.tell(), self.FILESIZE)
2085
2086 @unittest.skipIf(os.name != 'nt', "Windows only")
2087 def test_win_impl(self):
2088 # Make sure alternate Windows implementation is called.
2089 with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
2090 shutil.copyfile(TESTFN, TESTFN2)
2091 assert m.called
2092
2093 # File size is 2 MiB but max buf size should be 1 MiB.
2094 self.assertEqual(m.call_args[0][2], 1 * 1024 * 1024)
2095
2096 # If file size < 1 MiB memoryview() length must be equal to
2097 # the actual file size.
2098 with tempfile.NamedTemporaryFile(delete=False) as f:
2099 f.write(b'foo')
2100 fname = f.name
2101 self.addCleanup(support.unlink, fname)
2102 with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
2103 shutil.copyfile(fname, TESTFN2)
2104 self.assertEqual(m.call_args[0][2], 3)
2105
2106 # Empty files should not rely on readinto() variant.
2107 with tempfile.NamedTemporaryFile(delete=False) as f:
2108 pass
2109 fname = f.name
2110 self.addCleanup(support.unlink, fname)
2111 with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
2112 shutil.copyfile(fname, TESTFN2)
2113 assert not m.called
2114 self.assert_files_eq(fname, TESTFN2)
2115
2116
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002117class _ZeroCopyFileTest(object):
2118 """Tests common to all zero-copy APIs."""
2119 FILESIZE = (10 * 1024 * 1024) # 10 MiB
2120 FILEDATA = b""
2121 PATCHPOINT = ""
2122
2123 @classmethod
2124 def setUpClass(cls):
2125 write_test_file(TESTFN, cls.FILESIZE)
2126 with open(TESTFN, 'rb') as f:
2127 cls.FILEDATA = f.read()
2128 assert len(cls.FILEDATA) == cls.FILESIZE
2129
2130 @classmethod
2131 def tearDownClass(cls):
2132 support.unlink(TESTFN)
2133
2134 def tearDown(self):
2135 support.unlink(TESTFN2)
2136
2137 @contextlib.contextmanager
2138 def get_files(self):
2139 with open(TESTFN, "rb") as src:
2140 with open(TESTFN2, "wb") as dst:
2141 yield (src, dst)
2142
2143 def zerocopy_fun(self, *args, **kwargs):
2144 raise NotImplementedError("must be implemented in subclass")
2145
2146 def reset(self):
2147 self.tearDown()
2148 self.tearDownClass()
2149 self.setUpClass()
2150 self.setUp()
2151
2152 # ---
2153
2154 def test_regular_copy(self):
2155 with self.get_files() as (src, dst):
2156 self.zerocopy_fun(src, dst)
2157 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2158 # Make sure the fallback function is not called.
2159 with self.get_files() as (src, dst):
2160 with unittest.mock.patch('shutil.copyfileobj') as m:
2161 shutil.copyfile(TESTFN, TESTFN2)
2162 assert not m.called
2163
2164 def test_same_file(self):
2165 self.addCleanup(self.reset)
2166 with self.get_files() as (src, dst):
2167 with self.assertRaises(Exception):
2168 self.zerocopy_fun(src, src)
2169 # Make sure src file is not corrupted.
2170 self.assertEqual(read_file(TESTFN, binary=True), self.FILEDATA)
2171
2172 def test_non_existent_src(self):
2173 name = tempfile.mktemp()
2174 with self.assertRaises(FileNotFoundError) as cm:
2175 shutil.copyfile(name, "new")
2176 self.assertEqual(cm.exception.filename, name)
2177
2178 def test_empty_file(self):
2179 srcname = TESTFN + 'src'
2180 dstname = TESTFN + 'dst'
2181 self.addCleanup(lambda: support.unlink(srcname))
2182 self.addCleanup(lambda: support.unlink(dstname))
2183 with open(srcname, "wb"):
2184 pass
2185
2186 with open(srcname, "rb") as src:
2187 with open(dstname, "wb") as dst:
2188 self.zerocopy_fun(src, dst)
2189
2190 self.assertEqual(read_file(dstname, binary=True), b"")
2191
2192 def test_unhandled_exception(self):
2193 with unittest.mock.patch(self.PATCHPOINT,
2194 side_effect=ZeroDivisionError):
2195 self.assertRaises(ZeroDivisionError,
2196 shutil.copyfile, TESTFN, TESTFN2)
2197
2198 def test_exception_on_first_call(self):
2199 # Emulate a case where the first call to the zero-copy
2200 # function raises an exception in which case the function is
2201 # supposed to give up immediately.
2202 with unittest.mock.patch(self.PATCHPOINT,
2203 side_effect=OSError(errno.EINVAL, "yo")):
2204 with self.get_files() as (src, dst):
2205 with self.assertRaises(_GiveupOnFastCopy):
2206 self.zerocopy_fun(src, dst)
2207
2208 def test_filesystem_full(self):
2209 # Emulate a case where filesystem is full and sendfile() fails
2210 # on first call.
2211 with unittest.mock.patch(self.PATCHPOINT,
2212 side_effect=OSError(errno.ENOSPC, "yo")):
2213 with self.get_files() as (src, dst):
2214 self.assertRaises(OSError, self.zerocopy_fun, src, dst)
2215
2216
2217@unittest.skipIf(not SUPPORTS_SENDFILE, 'os.sendfile() not supported')
2218class TestZeroCopySendfile(_ZeroCopyFileTest, unittest.TestCase):
2219 PATCHPOINT = "os.sendfile"
2220
2221 def zerocopy_fun(self, fsrc, fdst):
2222 return shutil._fastcopy_sendfile(fsrc, fdst)
2223
2224 def test_non_regular_file_src(self):
2225 with io.BytesIO(self.FILEDATA) as src:
2226 with open(TESTFN2, "wb") as dst:
2227 with self.assertRaises(_GiveupOnFastCopy):
2228 self.zerocopy_fun(src, dst)
2229 shutil.copyfileobj(src, dst)
2230
2231 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2232
2233 def test_non_regular_file_dst(self):
2234 with open(TESTFN, "rb") as src:
2235 with io.BytesIO() as dst:
2236 with self.assertRaises(_GiveupOnFastCopy):
2237 self.zerocopy_fun(src, dst)
2238 shutil.copyfileobj(src, dst)
2239 dst.seek(0)
2240 self.assertEqual(dst.read(), self.FILEDATA)
2241
2242 def test_exception_on_second_call(self):
2243 def sendfile(*args, **kwargs):
2244 if not flag:
2245 flag.append(None)
2246 return orig_sendfile(*args, **kwargs)
2247 else:
2248 raise OSError(errno.EBADF, "yo")
2249
2250 flag = []
2251 orig_sendfile = os.sendfile
2252 with unittest.mock.patch('os.sendfile', create=True,
2253 side_effect=sendfile):
2254 with self.get_files() as (src, dst):
2255 with self.assertRaises(OSError) as cm:
2256 shutil._fastcopy_sendfile(src, dst)
2257 assert flag
2258 self.assertEqual(cm.exception.errno, errno.EBADF)
2259
2260 def test_cant_get_size(self):
2261 # Emulate a case where src file size cannot be determined.
2262 # Internally bufsize will be set to a small value and
2263 # sendfile() will be called repeatedly.
2264 with unittest.mock.patch('os.fstat', side_effect=OSError) as m:
2265 with self.get_files() as (src, dst):
2266 shutil._fastcopy_sendfile(src, dst)
2267 assert m.called
2268 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2269
2270 def test_small_chunks(self):
2271 # Force internal file size detection to be smaller than the
2272 # actual file size. We want to force sendfile() to be called
2273 # multiple times, also in order to emulate a src fd which gets
2274 # bigger while it is being copied.
2275 mock = unittest.mock.Mock()
2276 mock.st_size = 65536 + 1
2277 with unittest.mock.patch('os.fstat', return_value=mock) as m:
2278 with self.get_files() as (src, dst):
2279 shutil._fastcopy_sendfile(src, dst)
2280 assert m.called
2281 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2282
2283 def test_big_chunk(self):
2284 # Force internal file size detection to be +100MB bigger than
2285 # the actual file size. Make sure sendfile() does not rely on
2286 # file size value except for (maybe) a better throughput /
2287 # performance.
2288 mock = unittest.mock.Mock()
2289 mock.st_size = self.FILESIZE + (100 * 1024 * 1024)
2290 with unittest.mock.patch('os.fstat', return_value=mock) as m:
2291 with self.get_files() as (src, dst):
2292 shutil._fastcopy_sendfile(src, dst)
2293 assert m.called
2294 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2295
2296 def test_blocksize_arg(self):
2297 with unittest.mock.patch('os.sendfile',
2298 side_effect=ZeroDivisionError) as m:
2299 self.assertRaises(ZeroDivisionError,
2300 shutil.copyfile, TESTFN, TESTFN2)
2301 blocksize = m.call_args[0][3]
2302 # Make sure file size and the block size arg passed to
2303 # sendfile() are the same.
2304 self.assertEqual(blocksize, os.path.getsize(TESTFN))
2305 # ...unless we're dealing with a small file.
2306 support.unlink(TESTFN2)
2307 write_file(TESTFN2, b"hello", binary=True)
2308 self.addCleanup(support.unlink, TESTFN2 + '3')
2309 self.assertRaises(ZeroDivisionError,
2310 shutil.copyfile, TESTFN2, TESTFN2 + '3')
2311 blocksize = m.call_args[0][3]
2312 self.assertEqual(blocksize, 2 ** 23)
2313
2314 def test_file2file_not_supported(self):
2315 # Emulate a case where sendfile() only support file->socket
2316 # fds. In such a case copyfile() is supposed to skip the
2317 # fast-copy attempt from then on.
Giampaolo Rodola413d9552019-05-30 14:05:41 +08002318 assert shutil._USE_CP_SENDFILE
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002319 try:
2320 with unittest.mock.patch(
2321 self.PATCHPOINT,
2322 side_effect=OSError(errno.ENOTSOCK, "yo")) as m:
2323 with self.get_files() as (src, dst):
2324 with self.assertRaises(_GiveupOnFastCopy):
2325 shutil._fastcopy_sendfile(src, dst)
2326 assert m.called
Giampaolo Rodola413d9552019-05-30 14:05:41 +08002327 assert not shutil._USE_CP_SENDFILE
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002328
2329 with unittest.mock.patch(self.PATCHPOINT) as m:
2330 shutil.copyfile(TESTFN, TESTFN2)
2331 assert not m.called
2332 finally:
Giampaolo Rodola413d9552019-05-30 14:05:41 +08002333 shutil._USE_CP_SENDFILE = True
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002334
2335
Victor Stinner937ee9e2018-06-26 02:11:06 +02002336@unittest.skipIf(not MACOS, 'macOS only')
Giampaolo Rodolac7f02a92018-06-19 08:27:29 -07002337class TestZeroCopyMACOS(_ZeroCopyFileTest, unittest.TestCase):
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002338 PATCHPOINT = "posix._fcopyfile"
2339
2340 def zerocopy_fun(self, src, dst):
Giampaolo Rodolac7f02a92018-06-19 08:27:29 -07002341 return shutil._fastcopy_fcopyfile(src, dst, posix._COPYFILE_DATA)
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002342
2343
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002344class TermsizeTests(unittest.TestCase):
2345 def test_does_not_crash(self):
2346 """Check if get_terminal_size() returns a meaningful value.
2347
2348 There's no easy portable way to actually check the size of the
2349 terminal, so let's check if it returns something sensible instead.
2350 """
2351 size = shutil.get_terminal_size()
Antoine Pitroucfade362012-02-08 23:48:59 +01002352 self.assertGreaterEqual(size.columns, 0)
2353 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002354
2355 def test_os_environ_first(self):
2356 "Check if environment variables have precedence"
2357
2358 with support.EnvironmentVarGuard() as env:
2359 env['COLUMNS'] = '777'
Serhiy Storchakad30829d2016-04-24 09:58:43 +03002360 del env['LINES']
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002361 size = shutil.get_terminal_size()
2362 self.assertEqual(size.columns, 777)
2363
2364 with support.EnvironmentVarGuard() as env:
Serhiy Storchakad30829d2016-04-24 09:58:43 +03002365 del env['COLUMNS']
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002366 env['LINES'] = '888'
2367 size = shutil.get_terminal_size()
2368 self.assertEqual(size.lines, 888)
2369
Serhiy Storchakad30829d2016-04-24 09:58:43 +03002370 def test_bad_environ(self):
2371 with support.EnvironmentVarGuard() as env:
2372 env['COLUMNS'] = 'xxx'
2373 env['LINES'] = 'yyy'
2374 size = shutil.get_terminal_size()
2375 self.assertGreaterEqual(size.columns, 0)
2376 self.assertGreaterEqual(size.lines, 0)
2377
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002378 @unittest.skipUnless(os.isatty(sys.__stdout__.fileno()), "not on tty")
Victor Stinner119ebb72016-04-19 22:24:56 +02002379 @unittest.skipUnless(hasattr(os, 'get_terminal_size'),
2380 'need os.get_terminal_size()')
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002381 def test_stty_match(self):
2382 """Check if stty returns the same results ignoring env
2383
2384 This test will fail if stdin and stdout are connected to
2385 different terminals with different sizes. Nevertheless, such
2386 situations should be pretty rare.
2387 """
2388 try:
2389 size = subprocess.check_output(['stty', 'size']).decode().split()
Xavier de Gaye38c8b7d2016-11-14 17:14:42 +01002390 except (FileNotFoundError, PermissionError,
2391 subprocess.CalledProcessError):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002392 self.skipTest("stty invocation failed")
2393 expected = (int(size[1]), int(size[0])) # reversed order
2394
2395 with support.EnvironmentVarGuard() as env:
2396 del env['LINES']
2397 del env['COLUMNS']
2398 actual = shutil.get_terminal_size()
2399
2400 self.assertEqual(expected, actual)
Ronald Oussorenf51738b2011-05-06 10:23:04 +02002401
Serhiy Storchakad30829d2016-04-24 09:58:43 +03002402 def test_fallback(self):
2403 with support.EnvironmentVarGuard() as env:
2404 del env['LINES']
2405 del env['COLUMNS']
2406
2407 # sys.__stdout__ has no fileno()
2408 with support.swap_attr(sys, '__stdout__', None):
2409 size = shutil.get_terminal_size(fallback=(10, 20))
2410 self.assertEqual(size.columns, 10)
2411 self.assertEqual(size.lines, 20)
2412
2413 # sys.__stdout__ is not a terminal on Unix
2414 # or fileno() not in (0, 1, 2) on Windows
2415 with open(os.devnull, 'w') as f, \
2416 support.swap_attr(sys, '__stdout__', f):
2417 size = shutil.get_terminal_size(fallback=(30, 40))
2418 self.assertEqual(size.columns, 30)
2419 self.assertEqual(size.lines, 40)
2420
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002421
Berker Peksag8083cd62014-11-01 11:04:06 +02002422class PublicAPITests(unittest.TestCase):
2423 """Ensures that the correct values are exposed in the public API."""
2424
2425 def test_module_all_attribute(self):
2426 self.assertTrue(hasattr(shutil, '__all__'))
2427 target_api = ['copyfileobj', 'copyfile', 'copymode', 'copystat',
2428 'copy', 'copy2', 'copytree', 'move', 'rmtree', 'Error',
2429 'SpecialFileError', 'ExecError', 'make_archive',
2430 'get_archive_formats', 'register_archive_format',
2431 'unregister_archive_format', 'get_unpack_formats',
2432 'register_unpack_format', 'unregister_unpack_format',
2433 'unpack_archive', 'ignore_patterns', 'chown', 'which',
2434 'get_terminal_size', 'SameFileError']
2435 if hasattr(os, 'statvfs') or os.name == 'nt':
2436 target_api.append('disk_usage')
2437 self.assertEqual(set(shutil.__all__), set(target_api))
2438
2439
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00002440if __name__ == '__main__':
Brett Cannon3e9a9ae2013-06-12 21:25:59 -04002441 unittest.main()