blob: b9fdfd1350a0961e943368d63bd9e52bd15a097f [file] [log] [blame]
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00001# Copyright (C) 2003 Python Software Foundation
2
3import unittest
Berker Peksag884afd92014-12-10 02:50:32 +02004import unittest.mock
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00005import shutil
6import tempfile
Johannes Gijsbers8e6f2de2004-11-23 09:27:27 +00007import sys
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +00008import stat
Brett Cannon1c3fa182004-06-19 21:11:35 +00009import os
10import os.path
Antoine Pitrouc041ab62012-01-02 19:18:02 +010011import errno
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040012import functools
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -070013import pathlib
Antoine Pitroubcf2b592012-02-08 23:28:36 +010014import subprocess
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020015import random
16import string
17import contextlib
18import io
Serhiy Storchaka527ef072015-09-06 18:33:19 +030019from shutil import (make_archive,
Tarek Ziadé396fad72010-02-23 05:30:31 +000020 register_archive_format, unregister_archive_format,
Tarek Ziadé6ac91722010-04-28 17:51:36 +000021 get_archive_formats, Error, unpack_archive,
22 register_unpack_format, RegistryError,
Hynek Schlawack48653762012-10-07 12:49:58 +020023 unregister_unpack_format, get_unpack_formats,
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020024 SameFileError, _GiveupOnFastCopy)
Tarek Ziadé396fad72010-02-23 05:30:31 +000025import tarfile
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +020026import zipfile
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020027try:
28 import posix
29except ImportError:
30 posix = None
Tarek Ziadé396fad72010-02-23 05:30:31 +000031
32from test import support
Serhiy Storchakab21d1552018-03-02 11:53:51 +020033from test.support import TESTFN, FakePath
Serhiy Storchaka11213772014-08-06 18:50:19 +030034
# Second scratch path, derived from the shared test file name.
TESTFN2 = TESTFN + "2"

# Platform flags used to select or skip platform-specific tests.
MACOS = sys.platform.startswith("darwin")
AIX = sys.platform.startswith('aix')

# grp and pwd are only importable on platforms with user/group databases.
try:
    import grp
    import pwd
except ImportError:
    UID_GID_SUPPORT = False
else:
    UID_GID_SUPPORT = True

# _winapi only exists on Windows; tests use it as a "running on Windows" flag.
try:
    import _winapi
except ImportError:
    _winapi = None
49
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040050def _fake_rename(*args, **kwargs):
51 # Pretend the destination path is on a different filesystem.
Antoine Pitrouc041ab62012-01-02 19:18:02 +010052 raise OSError(getattr(errno, 'EXDEV', 18), "Invalid cross-device link")
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040053
def mock_rename(func):
    """Decorator that runs *func* with os.rename replaced by _fake_rename.

    The real os.rename is put back when *func* returns or raises.
    """
    @functools.wraps(func)
    def wrap(*args, **kwargs):
        saved_rename = os.rename
        os.rename = _fake_rename
        try:
            return func(*args, **kwargs)
        finally:
            os.rename = saved_rename
    return wrap
64
def write_file(path, content, binary=False):
    """Write *content* to the file located at *path*.

    *path* may be a string or a tuple of path components; a tuple is
    joined with os.path.join.  When *binary* is true the file is opened
    in binary mode, otherwise in text mode.
    """
    target = os.path.join(*path) if isinstance(path, tuple) else path
    mode = 'wb' if binary else 'w'
    with open(target, mode) as fp:
        fp.write(content)
76
def write_test_file(path, size):
    """Create a test file of exactly *size* bytes with random text content.

    The content is one random buffer of ASCII letters (at most 8192
    bytes) repeated as often as needed; the last repetition is truncated
    so the file ends up exactly *size* bytes long.
    """
    def chunks(total, step):
        # Yield the sizes of the successive writes: *step* bytes each,
        # followed by whatever remainder is left.
        assert total >= step
        while total > step:
            yield step
            total -= step
        if total:
            yield total

    bufsize = min(size, 8192)
    chunk = ''.join(random.choices(string.ascii_letters, k=bufsize)).encode()
    with open(path, 'wb') as f:
        for csize in chunks(size, bufsize):
            # Only write csize bytes: the final chunk may be shorter than
            # the buffer when size is not a multiple of bufsize.
            f.write(chunk[:csize])
    assert os.path.getsize(path) == size
94
def read_file(path, binary=False):
    """Return the contents of the file located at *path*.

    *path* may be a string or a tuple of path components; a tuple is
    joined with os.path.join.  When *binary* is true the file is opened
    in binary mode, otherwise in text mode.
    """
    target = os.path.join(*path) if isinstance(path, tuple) else path
    mode = 'rb' if binary else 'r'
    with open(target, mode) as fp:
        return fp.read()
106
def rlistdir(path):
    """Return a recursive listing of *path* as relative '/'-joined names.

    Entries of each directory are visited in sorted order.  A real
    directory is reported as 'name/' followed by its own entries prefixed
    with 'name/'.  Anything else, including a symlink to a directory, is
    reported by its bare name and is not descended into.
    """
    listing = []
    for entry in sorted(os.listdir(path)):
        full = os.path.join(path, entry)
        if os.path.islink(full) or not os.path.isdir(full):
            listing.append(entry)
            continue
        prefix = entry + '/'
        listing.append(prefix)
        listing.extend(prefix + sub for sub in rlistdir(full))
    return listing
118
def supports_file2file_sendfile():
    """Return True if os.sendfile() can copy from one regular file to another.

    Two scratch files are created in the current working directory, a
    two-byte sendfile() between them is attempted, and both files are
    removed again before returning.
    """
    # ...apparently Linux and Solaris are the only ones
    if not hasattr(os, "sendfile"):
        return False
    cwd = os.getcwd()
    # Names of the scratch files created so far, in creation order.
    leftovers = []
    try:
        with tempfile.NamedTemporaryFile("wb", dir=cwd, delete=False) as tmp:
            leftovers.append(tmp.name)
            tmp.write(b"0123456789")

        with open(leftovers[0], "rb") as src, \
                tempfile.NamedTemporaryFile("wb", dir=cwd, delete=False) as dst:
            leftovers.append(dst.name)
            try:
                os.sendfile(dst.fileno(), src.fileno(), 0, 2)
            except OSError:
                return False
            return True
    finally:
        for name in leftovers:
            support.unlink(name)


SUPPORTS_SENDFILE = supports_file2file_sendfile()
149
# AIX 32-bit mode, by default, lacks enough memory for the xz/lzma compiler test.
# The AIX command 'dump -o program' gives XCOFF header information.
# The second word of the last line is the maxdata value.
# In 32-bit mode, maxdata must be at least 0x20000000 for the xz test to succeed.
def _maxdataOK():
    """Return True unless this is a 32-bit AIX build with too small a maxdata.

    On 32-bit AIX the maxdata value is read from the output of
    '/usr/bin/dump -o' run on the interpreter executable (second word of
    the last line, parsed as hexadecimal) and compared against
    0x20000000.  Everywhere else the result is always True.
    """
    if not (AIX and sys.maxsize == 2147483647):
        return True
    hdrs = subprocess.getoutput("/usr/bin/dump -o %s" % sys.executable)
    last_line = hdrs.split("\n")[-1]
    maxdata = last_line.split()[1]
    return int(maxdata, 16) >= 0x20000000
Éric Araujoa7e33a12011-08-12 19:51:35 +0200161
Tarek Ziadé396fad72010-02-23 05:30:31 +0000162
class BaseTest:
    # Mixin for the TestCase classes of this file: provides temporary
    # directories that are removed automatically after each test.

    def mkdtemp(self, prefix=None):
        """Create a temporary directory that will be cleaned up.

        The directory is created inside the current working directory and
        its removal is registered as a test cleanup.

        Returns the path of the directory.
        """
        tmpdir = tempfile.mkdtemp(prefix=prefix, dir=os.getcwd())
        self.addCleanup(support.rmtree, tmpdir)
        return tmpdir
Tarek Ziadé5340db32010-04-19 22:30:51 +0000173
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300174
class TestRmTree(BaseTest, unittest.TestCase):
    # Tests for shutil.rmtree(): error reporting through onerror and
    # ignore_errors, and refusal to follow symlinks and junctions.

    def test_rmtree_works_on_bytes(self):
        # rmtree() must accept a bytes path.
        tmp = self.mkdtemp()
        victim = os.path.join(tmp, 'killme')
        os.mkdir(victim)
        write_file(os.path.join(victim, 'somefile'), 'foo')
        victim = os.fsencode(victim)
        self.assertIsInstance(victim, bytes)
        shutil.rmtree(victim)

    @support.skip_unless_symlink
    def test_rmtree_fails_on_symlink(self):
        # rmtree() called directly on a symlink must fail and leave both
        # the link and its target in place.
        tmp = self.mkdtemp()
        dir_ = os.path.join(tmp, 'dir')
        os.mkdir(dir_)
        link = os.path.join(tmp, 'link')
        os.symlink(dir_, link)
        self.assertRaises(OSError, shutil.rmtree, link)
        self.assertTrue(os.path.exists(dir_))
        self.assertTrue(os.path.lexists(link))
        errors = []
        def onerror(*args):
            errors.append(args)
        shutil.rmtree(link, onerror=onerror)
        self.assertEqual(len(errors), 1)
        self.assertIs(errors[0][0], os.path.islink)
        self.assertEqual(errors[0][1], link)
        self.assertIsInstance(errors[0][2][1], OSError)

    @support.skip_unless_symlink
    def test_rmtree_works_on_symlinks(self):
        tmp = self.mkdtemp()
        dir1 = os.path.join(tmp, 'dir1')
        dir2 = os.path.join(dir1, 'dir2')
        dir3 = os.path.join(tmp, 'dir3')
        for d in dir1, dir2, dir3:
            os.mkdir(d)
        file1 = os.path.join(tmp, 'file1')
        write_file(file1, 'foo')
        link1 = os.path.join(dir1, 'link1')
        os.symlink(dir2, link1)
        link2 = os.path.join(dir1, 'link2')
        os.symlink(dir3, link2)
        link3 = os.path.join(dir1, 'link3')
        os.symlink(file1, link3)
        # make sure symlinks are removed but not followed
        shutil.rmtree(dir1)
        self.assertFalse(os.path.exists(dir1))
        self.assertTrue(os.path.exists(dir3))
        self.assertTrue(os.path.exists(file1))

    @unittest.skipUnless(_winapi, 'only relevant on Windows')
    def test_rmtree_fails_on_junctions(self):
        # Same as test_rmtree_fails_on_symlink, for Windows junctions.
        tmp = self.mkdtemp()
        dir_ = os.path.join(tmp, 'dir')
        os.mkdir(dir_)
        link = os.path.join(tmp, 'link')
        _winapi.CreateJunction(dir_, link)
        self.addCleanup(support.unlink, link)
        self.assertRaises(OSError, shutil.rmtree, link)
        self.assertTrue(os.path.exists(dir_))
        self.assertTrue(os.path.lexists(link))
        errors = []
        def onerror(*args):
            errors.append(args)
        shutil.rmtree(link, onerror=onerror)
        self.assertEqual(len(errors), 1)
        self.assertIs(errors[0][0], os.path.islink)
        self.assertEqual(errors[0][1], link)
        self.assertIsInstance(errors[0][2][1], OSError)

    @unittest.skipUnless(_winapi, 'only relevant on Windows')
    def test_rmtree_works_on_junctions(self):
        tmp = self.mkdtemp()
        dir1 = os.path.join(tmp, 'dir1')
        dir2 = os.path.join(dir1, 'dir2')
        dir3 = os.path.join(tmp, 'dir3')
        for d in dir1, dir2, dir3:
            os.mkdir(d)
        file1 = os.path.join(tmp, 'file1')
        write_file(file1, 'foo')
        link1 = os.path.join(dir1, 'link1')
        _winapi.CreateJunction(dir2, link1)
        link2 = os.path.join(dir1, 'link2')
        _winapi.CreateJunction(dir3, link2)
        link3 = os.path.join(dir1, 'link3')
        _winapi.CreateJunction(file1, link3)
        # make sure junctions are removed but not followed
        shutil.rmtree(dir1)
        self.assertFalse(os.path.exists(dir1))
        self.assertTrue(os.path.exists(dir3))
        self.assertTrue(os.path.exists(file1))

    def test_rmtree_errors(self):
        # filename is guaranteed not to exist
        filename = tempfile.mktemp(dir=self.mkdtemp())
        self.assertRaises(FileNotFoundError, shutil.rmtree, filename)
        # test that ignore_errors option is honored
        shutil.rmtree(filename, ignore_errors=True)

        # existing file
        tmpdir = self.mkdtemp()
        write_file((tmpdir, "tstfile"), "")
        filename = os.path.join(tmpdir, "tstfile")
        with self.assertRaises(NotADirectoryError) as cm:
            shutil.rmtree(filename)
        # The reason for this rather odd construct is that Windows sprinkles
        # a \*.* at the end of file names. But only sometimes on some buildbots
        possible_args = [filename, os.path.join(filename, '*.*')]
        self.assertIn(cm.exception.filename, possible_args)
        self.assertTrue(os.path.exists(filename))
        # test that ignore_errors option is honored
        shutil.rmtree(filename, ignore_errors=True)
        self.assertTrue(os.path.exists(filename))
        errors = []
        def onerror(*args):
            errors.append(args)
        shutil.rmtree(filename, onerror=onerror)
        # Two failures are expected: scanning the "directory", then
        # removing it.
        self.assertEqual(len(errors), 2)
        self.assertIs(errors[0][0], os.scandir)
        self.assertEqual(errors[0][1], filename)
        self.assertIsInstance(errors[0][2][1], NotADirectoryError)
        self.assertIn(errors[0][2][1].filename, possible_args)
        self.assertIs(errors[1][0], os.rmdir)
        self.assertEqual(errors[1][1], filename)
        self.assertIsInstance(errors[1][2][1], NotADirectoryError)
        self.assertIn(errors[1][2][1].filename, possible_args)

    @unittest.skipIf(sys.platform[:6] == 'cygwin',
                     "This test can't be run on Cygwin (issue #1071513).")
    @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
                     "This test can't be run reliably as root (issue #1076467).")
    def test_on_error(self):
        # errorState counts the onerror calls checked so far; see
        # check_args_to_onerror().
        self.errorState = 0
        os.mkdir(TESTFN)
        self.addCleanup(shutil.rmtree, TESTFN)

        self.child_file_path = os.path.join(TESTFN, 'a')
        self.child_dir_path = os.path.join(TESTFN, 'b')
        support.create_empty_file(self.child_file_path)
        os.mkdir(self.child_dir_path)
        old_dir_mode = os.stat(TESTFN).st_mode
        old_child_file_mode = os.stat(self.child_file_path).st_mode
        old_child_dir_mode = os.stat(self.child_dir_path).st_mode
        # Make unwritable.
        new_mode = stat.S_IREAD|stat.S_IEXEC
        os.chmod(self.child_file_path, new_mode)
        os.chmod(self.child_dir_path, new_mode)
        os.chmod(TESTFN, new_mode)

        # Cleanups run in reverse order of registration, so the modes are
        # restored before the rmtree() cleanup registered above runs.
        self.addCleanup(os.chmod, TESTFN, old_dir_mode)
        self.addCleanup(os.chmod, self.child_file_path, old_child_file_mode)
        self.addCleanup(os.chmod, self.child_dir_path, old_child_dir_mode)

        shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
        # Test whether onerror has actually been called.
        self.assertEqual(self.errorState, 3,
                         "Expected call to onerror function did not happen.")

    def check_args_to_onerror(self, func, arg, exc):
        # onerror callback used by test_on_error, which deliberately runs
        # rmtree on a directory that is chmod 500, which will fail.
        # This function is run when shutil.rmtree fails.
        # 99.9% of the time it initially fails to remove
        # a file in the directory, so the first time through
        # func is os.unlink.
        # However, some Linux machines running ZFS on
        # FUSE experienced a failure earlier in the process
        # while listing the directory. The first failure may legally
        # be either.
        if self.errorState < 2:
            if func is os.unlink:
                self.assertEqual(arg, self.child_file_path)
            elif func is os.rmdir:
                self.assertEqual(arg, self.child_dir_path)
            else:
                # test_rmtree_errors shows that listing failures are
                # reported with os.scandir; os.listdir is still accepted.
                self.assertIn(func, (os.listdir, os.scandir))
                self.assertIn(arg, [TESTFN, self.child_dir_path])
            self.assertTrue(issubclass(exc[0], OSError))
            self.errorState += 1
        else:
            # Final failure: removing the top-level directory itself.
            self.assertIs(func, os.rmdir)
            self.assertEqual(arg, TESTFN)
            self.assertTrue(issubclass(exc[0], OSError))
            self.errorState = 3

    def test_rmtree_does_not_choke_on_failing_lstat(self):
        # Saved before the try block so the finally clause can always
        # restore it.
        orig_lstat = os.lstat
        def raiser(fn, *args, **kwargs):
            # Fail for every path except the top-level directory.
            if fn != TESTFN:
                raise OSError()
            else:
                return orig_lstat(fn)
        # Do not leave TESTFN behind for later tests if rmtree() fails.
        self.addCleanup(support.rmtree, TESTFN)
        os.lstat = raiser
        try:
            os.mkdir(TESTFN)
            write_file((TESTFN, 'foo'), 'foo')
            shutil.rmtree(TESTFN)
        finally:
            os.lstat = orig_lstat

    def test_rmtree_uses_safe_fd_version_if_available(self):
        # Recompute here the condition under which the fd-based
        # implementation is expected to be in use.
        _use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <=
                             os.supports_dir_fd and
                             os.listdir in os.supports_fd and
                             os.stat in os.supports_follow_symlinks)
        if _use_fd_functions:
            self.assertTrue(shutil._use_fd_functions)
            self.assertTrue(shutil.rmtree.avoids_symlink_attacks)
            tmp_dir = self.mkdtemp()
            d = os.path.join(tmp_dir, 'a')
            os.mkdir(d)
            real_rmtree = shutil._rmtree_safe_fd
            class Called(Exception): pass
            def _raiser(*args, **kwargs):
                raise Called
            try:
                # rmtree() must go through _rmtree_safe_fd.
                shutil._rmtree_safe_fd = _raiser
                self.assertRaises(Called, shutil.rmtree, d)
            finally:
                shutil._rmtree_safe_fd = real_rmtree
        else:
            self.assertFalse(shutil._use_fd_functions)
            self.assertFalse(shutil.rmtree.avoids_symlink_attacks)

    def test_rmtree_dont_delete_file(self):
        # When called on a file instead of a directory, don't delete it.
        handle, path = tempfile.mkstemp(dir=self.mkdtemp())
        os.close(handle)
        self.assertRaises(NotADirectoryError, shutil.rmtree, path)
        # Raises FileNotFoundError if rmtree() deleted the file after all.
        os.remove(path)

    @support.skip_unless_symlink
    def test_rmtree_on_symlink(self):
        # bug 1669.
        os.mkdir(TESTFN)
        try:
            src = os.path.join(TESTFN, 'cheese')
            dst = os.path.join(TESTFN, 'shop')
            os.mkdir(src)
            os.symlink(src, dst)
            self.assertRaises(OSError, shutil.rmtree, dst)
            shutil.rmtree(dst, ignore_errors=True)
        finally:
            shutil.rmtree(TESTFN, ignore_errors=True)

    @unittest.skipUnless(_winapi, 'only relevant on Windows')
    def test_rmtree_on_junction(self):
        os.mkdir(TESTFN)
        try:
            src = os.path.join(TESTFN, 'cheese')
            dst = os.path.join(TESTFN, 'shop')
            os.mkdir(src)
            open(os.path.join(src, 'spam'), 'wb').close()
            _winapi.CreateJunction(src, dst)
            self.assertRaises(OSError, shutil.rmtree, dst)
            shutil.rmtree(dst, ignore_errors=True)
        finally:
            shutil.rmtree(TESTFN, ignore_errors=True)
437
438
439class TestCopyTree(BaseTest, unittest.TestCase):
440
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000441 def test_copytree_simple(self):
Steve Dowerabde52c2019-11-15 09:49:21 -0800442 src_dir = self.mkdtemp()
443 dst_dir = os.path.join(self.mkdtemp(), 'destination')
Éric Araujoa7e33a12011-08-12 19:51:35 +0200444 self.addCleanup(shutil.rmtree, src_dir)
445 self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
446 write_file((src_dir, 'test.txt'), '123')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000447 os.mkdir(os.path.join(src_dir, 'test_dir'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200448 write_file((src_dir, 'test_dir', 'test.txt'), '456')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000449
Éric Araujoa7e33a12011-08-12 19:51:35 +0200450 shutil.copytree(src_dir, dst_dir)
451 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test.txt')))
452 self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'test_dir')))
453 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test_dir',
454 'test.txt')))
455 actual = read_file((dst_dir, 'test.txt'))
456 self.assertEqual(actual, '123')
457 actual = read_file((dst_dir, 'test_dir', 'test.txt'))
458 self.assertEqual(actual, '456')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000459
jab9e00d9e2018-12-28 13:03:40 -0500460 def test_copytree_dirs_exist_ok(self):
Steve Dowerabde52c2019-11-15 09:49:21 -0800461 src_dir = self.mkdtemp()
462 dst_dir = self.mkdtemp()
jab9e00d9e2018-12-28 13:03:40 -0500463 self.addCleanup(shutil.rmtree, src_dir)
464 self.addCleanup(shutil.rmtree, dst_dir)
465
466 write_file((src_dir, 'nonexisting.txt'), '123')
467 os.mkdir(os.path.join(src_dir, 'existing_dir'))
468 os.mkdir(os.path.join(dst_dir, 'existing_dir'))
469 write_file((dst_dir, 'existing_dir', 'existing.txt'), 'will be replaced')
470 write_file((src_dir, 'existing_dir', 'existing.txt'), 'has been replaced')
471
472 shutil.copytree(src_dir, dst_dir, dirs_exist_ok=True)
473 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'nonexisting.txt')))
474 self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'existing_dir')))
475 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'existing_dir',
476 'existing.txt')))
477 actual = read_file((dst_dir, 'nonexisting.txt'))
478 self.assertEqual(actual, '123')
479 actual = read_file((dst_dir, 'existing_dir', 'existing.txt'))
480 self.assertEqual(actual, 'has been replaced')
481
482 with self.assertRaises(FileExistsError):
483 shutil.copytree(src_dir, dst_dir, dirs_exist_ok=False)
484
Antoine Pitrou78091e62011-12-29 18:54:15 +0100485 @support.skip_unless_symlink
486 def test_copytree_symlinks(self):
487 tmp_dir = self.mkdtemp()
488 src_dir = os.path.join(tmp_dir, 'src')
489 dst_dir = os.path.join(tmp_dir, 'dst')
490 sub_dir = os.path.join(src_dir, 'sub')
491 os.mkdir(src_dir)
492 os.mkdir(sub_dir)
493 write_file((src_dir, 'file.txt'), 'foo')
494 src_link = os.path.join(sub_dir, 'link')
495 dst_link = os.path.join(dst_dir, 'sub/link')
496 os.symlink(os.path.join(src_dir, 'file.txt'),
497 src_link)
498 if hasattr(os, 'lchmod'):
499 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
500 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
501 os.lchflags(src_link, stat.UF_NODUMP)
502 src_stat = os.lstat(src_link)
503 shutil.copytree(src_dir, dst_dir, symlinks=True)
504 self.assertTrue(os.path.islink(os.path.join(dst_dir, 'sub', 'link')))
Steve Dowerdf2d4a62019-08-21 15:27:33 -0700505 actual = os.readlink(os.path.join(dst_dir, 'sub', 'link'))
506 # Bad practice to blindly strip the prefix as it may be required to
507 # correctly refer to the file, but we're only comparing paths here.
508 if os.name == 'nt' and actual.startswith('\\\\?\\'):
509 actual = actual[4:]
510 self.assertEqual(actual, os.path.join(src_dir, 'file.txt'))
Antoine Pitrou78091e62011-12-29 18:54:15 +0100511 dst_stat = os.lstat(dst_link)
512 if hasattr(os, 'lchmod'):
513 self.assertEqual(dst_stat.st_mode, src_stat.st_mode)
514 if hasattr(os, 'lchflags'):
515 self.assertEqual(dst_stat.st_flags, src_stat.st_flags)
516
Georg Brandl2ee470f2008-07-16 12:55:28 +0000517 def test_copytree_with_exclude(self):
Georg Brandl2ee470f2008-07-16 12:55:28 +0000518 # creating data
519 join = os.path.join
520 exists = os.path.exists
Steve Dowerabde52c2019-11-15 09:49:21 -0800521 src_dir = self.mkdtemp()
Georg Brandl2ee470f2008-07-16 12:55:28 +0000522 try:
Steve Dowerabde52c2019-11-15 09:49:21 -0800523 dst_dir = join(self.mkdtemp(), 'destination')
Éric Araujoa7e33a12011-08-12 19:51:35 +0200524 write_file((src_dir, 'test.txt'), '123')
525 write_file((src_dir, 'test.tmp'), '123')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000526 os.mkdir(join(src_dir, 'test_dir'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200527 write_file((src_dir, 'test_dir', 'test.txt'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000528 os.mkdir(join(src_dir, 'test_dir2'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200529 write_file((src_dir, 'test_dir2', 'test.txt'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000530 os.mkdir(join(src_dir, 'test_dir2', 'subdir'))
531 os.mkdir(join(src_dir, 'test_dir2', 'subdir2'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200532 write_file((src_dir, 'test_dir2', 'subdir', 'test.txt'), '456')
533 write_file((src_dir, 'test_dir2', 'subdir2', 'test.py'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000534
535 # testing glob-like patterns
536 try:
537 patterns = shutil.ignore_patterns('*.tmp', 'test_dir2')
538 shutil.copytree(src_dir, dst_dir, ignore=patterns)
539 # checking the result: some elements should not be copied
540 self.assertTrue(exists(join(dst_dir, 'test.txt')))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200541 self.assertFalse(exists(join(dst_dir, 'test.tmp')))
542 self.assertFalse(exists(join(dst_dir, 'test_dir2')))
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000543 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200544 shutil.rmtree(dst_dir)
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000545 try:
546 patterns = shutil.ignore_patterns('*.tmp', 'subdir*')
547 shutil.copytree(src_dir, dst_dir, ignore=patterns)
548 # checking the result: some elements should not be copied
Éric Araujoa7e33a12011-08-12 19:51:35 +0200549 self.assertFalse(exists(join(dst_dir, 'test.tmp')))
550 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2')))
551 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000552 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200553 shutil.rmtree(dst_dir)
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000554
555 # testing callable-style
556 try:
557 def _filter(src, names):
558 res = []
559 for name in names:
560 path = os.path.join(src, name)
561
562 if (os.path.isdir(path) and
563 path.split()[-1] == 'subdir'):
564 res.append(name)
565 elif os.path.splitext(path)[-1] in ('.py'):
566 res.append(name)
567 return res
568
569 shutil.copytree(src_dir, dst_dir, ignore=_filter)
570
571 # checking the result: some elements should not be copied
Éric Araujoa7e33a12011-08-12 19:51:35 +0200572 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2',
573 'test.py')))
574 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000575
576 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200577 shutil.rmtree(dst_dir)
Georg Brandl2ee470f2008-07-16 12:55:28 +0000578 finally:
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000579 shutil.rmtree(src_dir)
580 shutil.rmtree(os.path.dirname(dst_dir))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000581
mbarkhau88704332020-01-24 14:51:16 +0000582 def test_copytree_arg_types_of_ignore(self):
583 join = os.path.join
584 exists = os.path.exists
585
586 tmp_dir = self.mkdtemp()
587 src_dir = join(tmp_dir, "source")
588
589 os.mkdir(join(src_dir))
590 os.mkdir(join(src_dir, 'test_dir'))
591 os.mkdir(os.path.join(src_dir, 'test_dir', 'subdir'))
592 write_file((src_dir, 'test_dir', 'subdir', 'test.txt'), '456')
593
594 invokations = []
595
596 def _ignore(src, names):
597 invokations.append(src)
598 self.assertIsInstance(src, str)
599 self.assertIsInstance(names, list)
600 self.assertEqual(len(names), len(set(names)))
601 for name in names:
602 self.assertIsInstance(name, str)
603 return []
604
605 dst_dir = join(self.mkdtemp(), 'destination')
606 shutil.copytree(src_dir, dst_dir, ignore=_ignore)
607 self.assertTrue(exists(join(dst_dir, 'test_dir', 'subdir',
608 'test.txt')))
609
610 dst_dir = join(self.mkdtemp(), 'destination')
611 shutil.copytree(pathlib.Path(src_dir), dst_dir, ignore=_ignore)
612 self.assertTrue(exists(join(dst_dir, 'test_dir', 'subdir',
613 'test.txt')))
614
615 dst_dir = join(self.mkdtemp(), 'destination')
616 src_dir_entry = list(os.scandir(tmp_dir))[0]
617 self.assertIsInstance(src_dir_entry, os.DirEntry)
618 shutil.copytree(src_dir_entry, dst_dir, ignore=_ignore)
619 self.assertTrue(exists(join(dst_dir, 'test_dir', 'subdir',
620 'test.txt')))
621
622 self.assertEqual(len(invokations), 9)
623
Antoine Pitrouac601602013-08-16 19:35:02 +0200624 def test_copytree_retains_permissions(self):
Steve Dowerabde52c2019-11-15 09:49:21 -0800625 tmp_dir = self.mkdtemp()
Antoine Pitrouac601602013-08-16 19:35:02 +0200626 src_dir = os.path.join(tmp_dir, 'source')
627 os.mkdir(src_dir)
628 dst_dir = os.path.join(tmp_dir, 'destination')
629 self.addCleanup(shutil.rmtree, tmp_dir)
630
631 os.chmod(src_dir, 0o777)
632 write_file((src_dir, 'permissive.txt'), '123')
633 os.chmod(os.path.join(src_dir, 'permissive.txt'), 0o777)
634 write_file((src_dir, 'restrictive.txt'), '456')
635 os.chmod(os.path.join(src_dir, 'restrictive.txt'), 0o600)
636 restrictive_subdir = tempfile.mkdtemp(dir=src_dir)
Steve Dowerabde52c2019-11-15 09:49:21 -0800637 self.addCleanup(support.rmtree, restrictive_subdir)
Antoine Pitrouac601602013-08-16 19:35:02 +0200638 os.chmod(restrictive_subdir, 0o600)
639
640 shutil.copytree(src_dir, dst_dir)
Brett Cannon9c7eb552013-08-23 14:38:11 -0400641 self.assertEqual(os.stat(src_dir).st_mode, os.stat(dst_dir).st_mode)
642 self.assertEqual(os.stat(os.path.join(src_dir, 'permissive.txt')).st_mode,
Antoine Pitrouac601602013-08-16 19:35:02 +0200643 os.stat(os.path.join(dst_dir, 'permissive.txt')).st_mode)
Brett Cannon9c7eb552013-08-23 14:38:11 -0400644 self.assertEqual(os.stat(os.path.join(src_dir, 'restrictive.txt')).st_mode,
Antoine Pitrouac601602013-08-16 19:35:02 +0200645 os.stat(os.path.join(dst_dir, 'restrictive.txt')).st_mode)
646 restrictive_subdir_dst = os.path.join(dst_dir,
647 os.path.split(restrictive_subdir)[1])
Brett Cannon9c7eb552013-08-23 14:38:11 -0400648 self.assertEqual(os.stat(restrictive_subdir).st_mode,
Antoine Pitrouac601602013-08-16 19:35:02 +0200649 os.stat(restrictive_subdir_dst).st_mode)
650
Berker Peksag884afd92014-12-10 02:50:32 +0200651 @unittest.mock.patch('os.chmod')
652 def test_copytree_winerror(self, mock_patch):
653 # When copying to VFAT, copystat() raises OSError. On Windows, the
654 # exception object has a meaningful 'winerror' attribute, but not
655 # on other operating systems. Do not assume 'winerror' is set.
Steve Dowerabde52c2019-11-15 09:49:21 -0800656 src_dir = self.mkdtemp()
657 dst_dir = os.path.join(self.mkdtemp(), 'destination')
Berker Peksag884afd92014-12-10 02:50:32 +0200658 self.addCleanup(shutil.rmtree, src_dir)
659 self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
660
661 mock_patch.side_effect = PermissionError('ka-boom')
662 with self.assertRaises(shutil.Error):
663 shutil.copytree(src_dir, dst_dir)
664
Giampaolo Rodolac606a9c2019-02-26 12:04:41 +0100665 def test_copytree_custom_copy_function(self):
666 # See: https://bugs.python.org/issue35648
667 def custom_cpfun(a, b):
668 flag.append(None)
669 self.assertIsInstance(a, str)
670 self.assertIsInstance(b, str)
671 self.assertEqual(a, os.path.join(src, 'foo'))
672 self.assertEqual(b, os.path.join(dst, 'foo'))
673
674 flag = []
Steve Dowerabde52c2019-11-15 09:49:21 -0800675 src = self.mkdtemp()
676 dst = tempfile.mktemp(dir=self.mkdtemp())
Giampaolo Rodolac606a9c2019-02-26 12:04:41 +0100677 with open(os.path.join(src, 'foo'), 'w') as f:
678 f.close()
679 shutil.copytree(src, dst, copy_function=custom_cpfun)
680 self.assertEqual(len(flag), 1)
681
# Issue #3002: copyfile and copytree block indefinitely on named pipes
@unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
@support.skip_unless_symlink
def test_copytree_named_pipe(self):
    # A FIFO inside the source tree must be reported through shutil.Error
    # instead of being opened for reading.
    os.mkdir(TESTFN)
    try:
        subdir = os.path.join(TESTFN, "subdir")
        os.mkdir(subdir)
        pipe = os.path.join(subdir, "mypipe")
        try:
            os.mkfifo(pipe)
        except PermissionError as e:
            self.skipTest('os.mkfifo(): %s' % e)
        with self.assertRaises(shutil.Error) as cm:
            shutil.copytree(TESTFN, TESTFN2)
        # The first argument of shutil.Error is a list of
        # (src, dst, message) triples; only the pipe may be listed.
        errors = cm.exception.args[0]
        self.assertEqual(len(errors), 1)
        _, _, error_msg = errors[0]
        self.assertEqual("`%s` is a named pipe" % pipe, error_msg)
    finally:
        shutil.rmtree(TESTFN, ignore_errors=True)
        shutil.rmtree(TESTFN2, ignore_errors=True)
707
def test_copytree_special_func(self):
    # copytree() calls the given copy_function once for each regular file
    # in the tree (one at the top level, one in the sub-directory).
    source_root = self.mkdtemp()
    target_root = os.path.join(self.mkdtemp(), 'destination')
    write_file((source_root, 'test.txt'), '123')
    os.mkdir(os.path.join(source_root, 'test_dir'))
    write_file((source_root, 'test_dir', 'test.txt'), '456')

    calls = []
    shutil.copytree(source_root, target_root,
                    copy_function=lambda src, dst: calls.append((src, dst)))
    self.assertEqual(len(calls), 2)
721
@support.skip_unless_symlink
def test_copytree_dangling_symlinks(self):
    tree = self.mkdtemp()
    target = os.path.join(self.mkdtemp(), 'destination')
    os.symlink('IDONTEXIST', os.path.join(tree, 'test.txt'))
    os.mkdir(os.path.join(tree, 'test_dir'))
    write_file((tree, 'test_dir', 'test.txt'), '456')

    # By default the broken link is reported as an error once the rest of
    # the tree has been processed.
    with self.assertRaises(Error):
        shutil.copytree(tree, target)

    # ignore_dangling_symlinks=True silently leaves the link out.
    target = os.path.join(self.mkdtemp(), 'destination2')
    shutil.copytree(tree, target, ignore_dangling_symlinks=True)
    self.assertNotIn('test.txt', os.listdir(target))

    # symlinks=True recreates the link itself, broken or not.
    target = os.path.join(self.mkdtemp(), 'destination3')
    shutil.copytree(tree, target, symlinks=True)
    self.assertIn('test.txt', os.listdir(target))
741
@support.skip_unless_symlink
def test_copytree_symlink_dir(self):
    # A symlink to a directory is copied as a real directory when
    # symlinks=False and stays a symlink when symlinks=True.
    tree = self.mkdtemp()
    target = os.path.join(self.mkdtemp(), 'destination')
    real_dir = os.path.join(tree, 'real_dir')
    os.mkdir(real_dir)
    # Create an empty file inside the real directory.
    with open(os.path.join(tree, 'real_dir', 'test.txt'), 'w'):
        pass
    os.symlink(real_dir, os.path.join(tree, 'link_to_dir'),
               target_is_directory=True)

    shutil.copytree(tree, target, symlinks=False)
    copied_link = os.path.join(target, 'link_to_dir')
    self.assertFalse(os.path.islink(copied_link))
    self.assertIn('test.txt', os.listdir(copied_link))

    target = os.path.join(self.mkdtemp(), 'destination2')
    shutil.copytree(tree, target, symlinks=True)
    copied_link = os.path.join(target, 'link_to_dir')
    self.assertTrue(os.path.islink(copied_link))
    self.assertIn('test.txt', os.listdir(copied_link))
761
def test_copytree_return_value(self):
    # The value returned by copytree() is the destination directory, so
    # listing it must show the copied file.
    source_root = self.mkdtemp()
    target_root = source_root + "dest"
    # The target lives outside any mkdtemp() directory: remove it here.
    self.addCleanup(shutil.rmtree, target_root, True)
    write_file(os.path.join(source_root, 'foo'), 'foo')
    returned = shutil.copytree(source_root, target_root)
    self.assertEqual(['foo'], os.listdir(returned))
771
def test_copytree_subdirectory(self):
    # Issue 38688: copying a tree into one of its own sub-directories
    # must terminate and copy the original content only.
    top = self.mkdtemp()
    self.addCleanup(shutil.rmtree, top, ignore_errors=True)
    source_root = os.path.join(top, "t", "pg")
    target_root = os.path.join(source_root, "somevendor", "1.0")
    os.makedirs(source_root)
    write_file(os.path.join(source_root, 'pol'), 'pol')
    returned = shutil.copytree(source_root, target_root)
    self.assertEqual(['pol'], os.listdir(returned))
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300783
784class TestCopy(BaseTest, unittest.TestCase):
785
786 ### shutil.copymode
787
@support.skip_unless_symlink
def test_copymode_follow_symlinks(self):
    # With default arguments copymode() resolves symlinks on either side,
    # so the mode always lands on the regular file behind the link.
    workdir = self.mkdtemp()
    src, dst, src_link, dst_link = (
        os.path.join(workdir, name)
        for name in ('foo', 'bar', 'baz', 'quux'))
    for regular in (src, dst):
        write_file(regular, 'foo')
    os.symlink(src, src_link)
    os.symlink(dst, dst_link)
    os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)

    # file to file
    os.chmod(dst, stat.S_IRWXO)
    self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
    shutil.copymode(src, dst)
    self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)

    # On Windows, os.chmod does not follow symlinks (issue #15411)
    if os.name == 'nt':
        return
    # Link as source, link as destination, then links on both sides.
    for source, target in ((src_link, dst),
                           (src, dst_link),
                           (src_link, dst_link)):
        os.chmod(dst, stat.S_IRWXO)
        shutil.copymode(source, target)
        self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
819
@unittest.skipUnless(hasattr(os, 'lchmod'), 'requires os.lchmod')
@support.skip_unless_symlink
def test_copymode_symlink_to_symlink(self):
    # follow_symlinks=False copies the mode between the links themselves
    # when both arguments are links; with a link on one side only, the
    # regular files end up with equal modes.
    workdir = self.mkdtemp()
    src, dst, src_link, dst_link = (
        os.path.join(workdir, name)
        for name in ('foo', 'bar', 'baz', 'quux'))
    for regular in (src, dst):
        write_file(regular, 'foo')
    os.symlink(src, src_link)
    os.symlink(dst, dst_link)
    os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
    os.chmod(dst, stat.S_IRWXU)
    os.lchmod(src_link, stat.S_IRWXO|stat.S_IRWXG)

    # link to link: only the links' own modes are synchronised
    os.lchmod(dst_link, stat.S_IRWXO)
    shutil.copymode(src_link, dst_link, follow_symlinks=False)
    self.assertEqual(os.lstat(src_link).st_mode,
                     os.lstat(dst_link).st_mode)
    self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)

    # link on the source side only, then on the destination side only
    for source, target in ((src_link, dst), (src, dst_link)):
        os.lchmod(dst_link, stat.S_IRWXO)
        shutil.copymode(source, target, follow_symlinks=False)
        self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
849
@unittest.skipIf(hasattr(os, 'lchmod'), 'requires os.lchmod to be missing')
@support.skip_unless_symlink
def test_copymode_symlink_to_symlink_wo_lchmod(self):
    # Without os.lchmod a link-to-link copymode() cannot do anything; the
    # call is only expected not to raise.
    workdir = self.mkdtemp()
    src, dst, src_link, dst_link = (
        os.path.join(workdir, name)
        for name in ('foo', 'bar', 'baz', 'quux'))
    for regular in (src, dst):
        write_file(regular, 'foo')
    os.symlink(src, src_link)
    os.symlink(dst, dst_link)
    shutil.copymode(src_link, dst_link, follow_symlinks=False)  # silent fail
863
864 ### shutil.copystat
865
@support.skip_unless_symlink
def test_copystat_symlinks(self):
    # Exercise copystat() on symlinks, both following them and operating
    # on the links themselves.
    tmp_dir = self.mkdtemp()
    src = os.path.join(tmp_dir, 'foo')
    dst = os.path.join(tmp_dir, 'bar')
    src_link = os.path.join(tmp_dir, 'baz')
    dst_link = os.path.join(tmp_dir, 'qux')
    write_file(src, 'foo')
    src_stat = os.stat(src)
    os.utime(src, (src_stat.st_atime,
                   src_stat.st_mtime - 42.0))  # ensure different mtimes
    write_file(dst, 'bar')
    self.assertNotEqual(os.stat(src).st_mtime, os.stat(dst).st_mtime)
    os.symlink(src, src_link)
    os.symlink(dst, dst_link)
    if hasattr(os, 'lchmod'):
        os.lchmod(src_link, stat.S_IRWXO)
    if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
        os.lchflags(src_link, stat.UF_NODUMP)
    src_link_stat = os.lstat(src_link)
    # follow: the link's own mode must not leak onto the target file
    if hasattr(os, 'lchmod'):
        shutil.copystat(src_link, dst_link, follow_symlinks=True)
        self.assertNotEqual(src_link_stat.st_mode, os.stat(dst).st_mode)
    # don't follow
    shutil.copystat(src_link, dst_link, follow_symlinks=False)
    dst_link_stat = os.lstat(dst_link)
    if os.utime in os.supports_follow_symlinks:
        for attr in 'st_atime', 'st_mtime':
            # The modification times may be truncated in the new file.
            self.assertLessEqual(getattr(src_link_stat, attr),
                                 getattr(dst_link_stat, attr) + 1)
    if hasattr(os, 'lchmod'):
        self.assertEqual(src_link_stat.st_mode, dst_link_stat.st_mode)
    if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
        self.assertEqual(src_link_stat.st_flags, dst_link_stat.st_flags)
    # tell not to follow, but dst is not a link: the mtime of the file
    # behind src_link is expected to arrive on dst
    shutil.copystat(src_link, dst, follow_symlinks=False)
    # assertLess reports both values when the check fails.
    self.assertLess(abs(os.stat(src).st_mtime - os.stat(dst).st_mtime),
                    0.1)
906
@unittest.skipUnless(hasattr(os, 'chflags') and
                     hasattr(errno, 'EOPNOTSUPP') and
                     hasattr(errno, 'ENOTSUP'),
                     "requires os.chflags, EOPNOTSUPP & ENOTSUP")
def test_copystat_handles_harmless_chflags_errors(self):
    # copystat() must swallow EOPNOTSUPP/ENOTSUP raised by os.chflags but
    # let any other OSError through.
    workdir = self.mkdtemp()
    file1 = os.path.join(workdir, 'file1')
    file2 = os.path.join(workdir, 'file2')
    for path in (file1, file2):
        write_file(path, 'xxx')

    def make_chflags_raiser(err):
        # Build an os.chflags replacement that always fails with *err*.
        failure = OSError()

        def _chflags_raiser(path, flags, *, follow_symlinks=True):
            failure.errno = err
            raise failure
        return _chflags_raiser

    # patch.object() restores the real os.chflags when each block exits.
    for harmless in (errno.EOPNOTSUPP, errno.ENOTSUP):
        with unittest.mock.patch.object(os, 'chflags',
                                        make_chflags_raiser(harmless)):
            shutil.copystat(file1, file2)
    # assert others errors break it
    unexpected = errno.EOPNOTSUPP + errno.ENOTSUP
    with unittest.mock.patch.object(os, 'chflags',
                                    make_chflags_raiser(unexpected)):
        with self.assertRaises(OSError):
            shutil.copystat(file1, file2)
935
936 ### shutil.copyxattr
937
@support.skip_unless_xattr
def test_copyxattr(self):
    workdir = self.mkdtemp()
    src = os.path.join(workdir, 'foo')
    write_file(src, 'foo')
    dst = os.path.join(workdir, 'bar')
    write_file(dst, 'bar')

    # Nothing to copy yet: must simply succeed.
    shutil._copyxattr(src, dst)
    # Ordinary case: every attribute of src appears on dst.
    os.setxattr(src, 'user.foo', b'42')
    os.setxattr(src, 'user.bar', b'43')
    shutil._copyxattr(src, dst)
    self.assertEqual(sorted(os.listxattr(src)), sorted(os.listxattr(dst)))
    self.assertEqual(
        os.getxattr(src, 'user.foo'),
        os.getxattr(dst, 'user.foo'))

    # A failure on one attribute must not prevent copying the others.
    os.remove(dst)
    write_file(dst, 'bar')
    os_error = OSError(errno.EPERM, 'EPERM')
    orig_setxattr = os.setxattr

    def _raise_on_user_foo(fname, attr, val, **kwargs):
        if attr == 'user.foo':
            raise os_error
        orig_setxattr(fname, attr, val, **kwargs)

    # patch.object() puts the real os.setxattr back on exit.
    with unittest.mock.patch.object(os, 'setxattr', _raise_on_user_foo):
        shutil._copyxattr(src, dst)
        self.assertIn('user.bar', os.listxattr(dst))

    # the source filesystem not supporting xattrs should be ok, too.
    orig_listxattr = os.listxattr

    def _raise_on_src(fname, *, follow_symlinks=True):
        if fname == src:
            raise OSError(errno.ENOTSUP, 'Operation not supported')
        return orig_listxattr(fname, follow_symlinks=follow_symlinks)

    with unittest.mock.patch.object(os, 'listxattr', _raise_on_src):
        shutil._copyxattr(src, dst)

    # test that shutil.copystat copies xattrs, including from a read-only
    # source file
    src = os.path.join(workdir, 'the_original')
    srcro = os.path.join(workdir, 'the_original_ro')
    write_file(src, src)
    write_file(srcro, srcro)
    os.setxattr(src, 'user.the_value', b'fiddly')
    os.setxattr(srcro, 'user.the_value', b'fiddly')
    os.chmod(srcro, 0o444)
    dst = os.path.join(workdir, 'the_copy')
    dstro = os.path.join(workdir, 'the_copy_ro')
    write_file(dst, dst)
    write_file(dstro, dstro)
    shutil.copystat(src, dst)
    shutil.copystat(srcro, dstro)
    for copied in (dst, dstro):
        self.assertEqual(os.getxattr(copied, 'user.the_value'), b'fiddly')
1001
@support.skip_unless_symlink
@support.skip_unless_xattr
@unittest.skipUnless(hasattr(os, 'geteuid') and os.geteuid() == 0,
                     'root privileges required')
def test_copyxattr_symlinks(self):
    # On Linux, it's only possible to access non-user xattr for symlinks;
    # which in turn require root privileges. This test should be expanded
    # as soon as other platforms gain support for extended attributes.
    workdir = self.mkdtemp()
    src = os.path.join(workdir, 'foo')
    src_link = os.path.join(workdir, 'baz')
    write_file(src, 'foo')
    os.symlink(src, src_link)
    # Different values on the file and on the link tell them apart later.
    os.setxattr(src, 'trusted.foo', b'42')
    os.setxattr(src_link, 'trusted.foo', b'43', follow_symlinks=False)
    dst = os.path.join(workdir, 'bar')
    dst_link = os.path.join(workdir, 'qux')
    write_file(dst, 'bar')
    os.symlink(dst, dst_link)

    # link -> link: the attribute lands on the link, not on its target
    shutil._copyxattr(src_link, dst_link, follow_symlinks=False)
    link_value = os.getxattr(dst_link, 'trusted.foo', follow_symlinks=False)
    self.assertEqual(link_value, b'43')
    with self.assertRaises(OSError):
        os.getxattr(dst, 'trusted.foo')

    # link -> regular file: the link's own value is copied to the file
    shutil._copyxattr(src_link, dst, follow_symlinks=False)
    self.assertEqual(os.getxattr(dst, 'trusted.foo'), b'43')
1026
1027 ### shutil.copy
1028
def _copy_file(self, method):
    # Create 'test.txt' in a fresh directory, copy it with *method* into a
    # second fresh directory and return the (source, expected copy) paths.
    fname = 'test.txt'
    source_dir = self.mkdtemp()
    write_file((source_dir, fname), 'xxx')
    source = os.path.join(source_dir, fname)
    target_dir = self.mkdtemp()
    method(source, target_dir)
    return (source, os.path.join(target_dir, fname))
1038
def test_copy(self):
    # shutil.copy() must create the target and give it the source's mode.
    original, duplicate = self._copy_file(shutil.copy)
    self.assertTrue(os.path.exists(duplicate))
    self.assertEqual(os.stat(original).st_mode, os.stat(duplicate).st_mode)
1044
@support.skip_unless_symlink
def test_copy_symlinks(self):
    workdir = self.mkdtemp()
    src = os.path.join(workdir, 'foo')
    dst = os.path.join(workdir, 'bar')
    src_link = os.path.join(workdir, 'baz')
    write_file(src, 'foo')
    os.symlink(src, src_link)
    can_lchmod = hasattr(os, 'lchmod')
    if can_lchmod:
        os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)

    # follow the link: dst becomes a regular copy of the link's target
    shutil.copy(src_link, dst, follow_symlinks=True)
    self.assertFalse(os.path.islink(dst))
    self.assertEqual(read_file(src), read_file(dst))
    os.remove(dst)

    # don't follow: dst becomes a link pointing at the same target
    shutil.copy(src_link, dst, follow_symlinks=False)
    self.assertTrue(os.path.islink(dst))
    self.assertEqual(os.readlink(dst), os.readlink(src_link))
    if can_lchmod:
        self.assertEqual(os.lstat(src_link).st_mode,
                         os.lstat(dst).st_mode)
1067
1068 ### shutil.copy2
1069
@unittest.skipUnless(hasattr(os, 'utime'), 'requires os.utime')
def test_copy2(self):
    # shutil.copy2() must create the target with the source's mode,
    # timestamps and, where supported, file flags.
    original, duplicate = self._copy_file(shutil.copy2)
    self.assertTrue(os.path.exists(duplicate))
    original_stat = os.stat(original)
    duplicate_stat = os.stat(duplicate)
    self.assertEqual(original_stat.st_mode, duplicate_stat.st_mode)
    for attr in ('st_atime', 'st_mtime'):
        # The modification times may be truncated in the new file.
        self.assertLessEqual(getattr(original_stat, attr),
                             getattr(duplicate_stat, attr) + 1)
    if hasattr(os, 'chflags') and hasattr(original_stat, 'st_flags'):
        self.assertEqual(original_stat.st_flags, duplicate_stat.st_flags)
1086
@support.skip_unless_symlink
def test_copy2_symlinks(self):
    workdir = self.mkdtemp()
    src = os.path.join(workdir, 'foo')
    dst = os.path.join(workdir, 'bar')
    src_link = os.path.join(workdir, 'baz')
    write_file(src, 'foo')
    os.symlink(src, src_link)
    can_lchmod = hasattr(os, 'lchmod')
    if can_lchmod:
        os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
    if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
        os.lchflags(src_link, stat.UF_NODUMP)
    src_stat = os.stat(src)
    src_link_stat = os.lstat(src_link)

    # follow: the result is a regular file holding the target's content
    shutil.copy2(src_link, dst, follow_symlinks=True)
    self.assertFalse(os.path.islink(dst))
    self.assertEqual(read_file(src), read_file(dst))
    os.remove(dst)

    # don't follow: the result is a link carrying the link's metadata
    shutil.copy2(src_link, dst, follow_symlinks=False)
    self.assertTrue(os.path.islink(dst))
    self.assertEqual(os.readlink(dst), os.readlink(src_link))
    dst_stat = os.lstat(dst)
    if os.utime in os.supports_follow_symlinks:
        for attr in ('st_atime', 'st_mtime'):
            # The modification times may be truncated in the new file.
            self.assertLessEqual(getattr(src_link_stat, attr),
                                 getattr(dst_stat, attr) + 1)
    if can_lchmod:
        self.assertEqual(src_link_stat.st_mode, dst_stat.st_mode)
        self.assertNotEqual(src_stat.st_mode, dst_stat.st_mode)
    if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
        self.assertEqual(src_link_stat.st_flags, dst_stat.st_flags)
1121
@support.skip_unless_xattr
def test_copy2_xattr(self):
    # shutil.copy2() must carry extended attributes over to the copy.
    workdir = self.mkdtemp()
    original = os.path.join(workdir, 'foo')
    duplicate = os.path.join(workdir, 'bar')
    write_file(original, 'foo')
    os.setxattr(original, 'user.foo', b'42')
    shutil.copy2(original, duplicate)
    expected = os.getxattr(original, 'user.foo')
    self.assertEqual(expected, os.getxattr(duplicate, 'user.foo'))
    os.remove(duplicate)
1134
def test_copy_return_value(self):
    # copy() and copy2() return the path of the file they created, both
    # when the destination is a directory and when it is a file name.
    for copier in (shutil.copy, shutil.copy2):
        source_dir = self.mkdtemp()
        target_dir = self.mkdtemp()
        source = os.path.join(source_dir, 'foo')
        write_file(source, 'foo')
        # Destination is an existing directory.
        self.assertEqual(copier(source, target_dir),
                         os.path.join(target_dir, 'foo'))
        # Destination is an explicit file name.
        explicit = os.path.join(target_dir, 'bar')
        self.assertEqual(copier(source, explicit),
                         os.path.join(target_dir, 'bar'))
1146
1147 ### shutil.copyfile
1148
@support.skip_unless_symlink
def test_copyfile_symlinks(self):
    workdir = self.mkdtemp()
    src, dst, dst_link, link = (
        os.path.join(workdir, name)
        for name in ('src', 'dst', 'dst_link', 'link'))
    write_file(src, 'foo')
    os.symlink(src, link)

    # follow_symlinks=False duplicates the link itself
    shutil.copyfile(link, dst_link, follow_symlinks=False)
    self.assertTrue(os.path.islink(dst_link))
    self.assertEqual(os.readlink(link), os.readlink(dst_link))

    # the default follows the link and writes a regular file
    shutil.copyfile(link, dst)
    self.assertFalse(os.path.islink(dst))
1165
@unittest.skipUnless(hasattr(os, 'link'), 'requires os.link')
def test_dont_copy_file_onto_link_to_itself(self):
    # bug 851123: copying a file onto a hard link to itself must be
    # refused and must leave the content untouched.
    os.mkdir(TESTFN)
    src = os.path.join(TESTFN, 'cheese')
    dst = os.path.join(TESTFN, 'shop')
    try:
        with open(src, 'w') as stream:
            stream.write('cheddar')
        try:
            os.link(src, dst)
        except PermissionError as e:
            self.skipTest('os.link(): %s' % e)
        with self.assertRaises(shutil.SameFileError):
            shutil.copyfile(src, dst)
        with open(src, 'r') as stream:
            content = stream.read()
        self.assertEqual(content, 'cheddar')
        os.remove(dst)
    finally:
        shutil.rmtree(TESTFN, ignore_errors=True)
Tarek Ziadé51a6f722010-04-23 13:03:09 +00001185
@support.skip_unless_symlink
def test_dont_copy_file_onto_symlink_to_itself(self):
    # bug 851123: copying a file onto a symlink to itself must be refused
    # and must leave the content untouched.
    os.mkdir(TESTFN)
    src = os.path.join(TESTFN, 'cheese')
    dst = os.path.join(TESTFN, 'shop')
    try:
        with open(src, 'w') as stream:
            stream.write('cheddar')
        # Using `src` here would mean we end up with a symlink pointing
        # to TESTFN/TESTFN/cheese, while it should point at
        # TESTFN/cheese.
        os.symlink('cheese', dst)
        with self.assertRaises(shutil.SameFileError):
            shutil.copyfile(src, dst)
        with open(src, 'r') as stream:
            content = stream.read()
        self.assertEqual(content, 'cheddar')
        os.remove(dst)
    finally:
        shutil.rmtree(TESTFN, ignore_errors=True)
Johannes Gijsbers68128712004-08-14 13:57:08 +00001205
# Issue #3002: copyfile and copytree block indefinitely on named pipes
@unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
def test_copyfile_named_pipe(self):
    # copyfile() must refuse a FIFO both as source and as destination.
    try:
        os.mkfifo(TESTFN)
    except PermissionError as e:
        self.skipTest('os.mkfifo(): %s' % e)
    try:
        for source, target in ((TESTFN, TESTFN2), (__file__, TESTFN)):
            with self.assertRaises(shutil.SpecialFileError):
                shutil.copyfile(source, target)
    finally:
        os.remove(TESTFN)
Antoine Pitrou7fff0962009-05-01 21:09:44 +00001220
def test_copyfile_return_value(self):
    # copyfile returns its destination path.
    src_dir = self.mkdtemp()
    dst_dir = self.mkdtemp()
    dst_file = os.path.join(dst_dir, 'bar')
    src_file = os.path.join(src_dir, 'foo')
    write_file(src_file, 'foo')
    rv = shutil.copyfile(src_file, dst_file)
    # Checking that rv exists is not enough: any existing path would
    # pass.  Pin the returned value to the destination that was given.
    self.assertEqual(rv, dst_file)
    self.assertTrue(os.path.exists(rv))
    self.assertEqual(read_file(src_file), read_file(dst_file))
Tarek Ziadé5340db32010-04-19 22:30:51 +00001231
def test_copyfile_same_file(self):
    # Copying a file onto itself is rejected with SameFileError.  Plain
    # Error has to match as well so that older callers keep working.
    workdir = self.mkdtemp()
    path = os.path.join(workdir, 'foo')
    write_file(path, 'foo')
    for expected in (SameFileError, Error):
        with self.assertRaises(expected):
            shutil.copyfile(path, path)
    # The refused copies must not have damaged the file.
    self.assertEqual(read_file(path), 'foo')
Tarek Ziadéfb437512010-04-20 08:57:33 +00001243
Tarek Ziadéfb437512010-04-20 08:57:33 +00001244
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001245class TestArchives(BaseTest, unittest.TestCase):
Tarek Ziadéfb437512010-04-20 08:57:33 +00001246
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001247 ### shutil.make_archive
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001248
@support.requires_zlib
def test_make_tarball(self):
    # Build the file tree that gets archived.
    root_dir, base_dir = self._create_files('')

    tmpdir2 = self.mkdtemp()
    # Remove the directory so that shutil has to create it again.
    os.rmdir(tmpdir2)
    # The archive name is given relative to work_dir.
    work_dir = os.path.dirname(tmpdir2)
    rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')
    expected_names = ['.', './sub', './sub2',
                      './file1', './file2', './sub/file3']

    with support.change_cwd(work_dir):
        base_name = os.path.abspath(rel_base_name)

    # First a gzip-compressed tarball, then an uncompressed one.
    for fmt, suffix, mode in (('gztar', '.tar.gz', 'r:gz'),
                              ('tar', '.tar', 'r')):
        with support.change_cwd(work_dir):
            tarball = make_archive(rel_base_name, fmt, root_dir, '.')
        self.assertEqual(tarball, base_name + suffix)
        self.assertTrue(os.path.isfile(tarball))
        self.assertTrue(tarfile.is_tarfile(tarball))
        with tarfile.open(tarball, mode) as tf:
            self.assertCountEqual(tf.getnames(), expected_names)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001284
def _tarinfo(self, path):
    # Return the member names of the tar archive at *path* as a sorted
    # tuple.
    with tarfile.open(path) as archive:
        return tuple(sorted(archive.getnames()))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001290
def _create_files(self, base_dir='dist'):
    # Populate a fresh temporary root with a small tree under *base_dir*:
    # file1, file2, sub/file3 and an empty sub2.  When base_dir is not
    # empty, a file 'outer' is also written next to it.  Returns the pair
    # (root_dir, base_dir).
    root_dir = self.mkdtemp()
    tree = os.path.join(root_dir, base_dir)
    os.makedirs(tree, exist_ok=True)
    for name in ('file1', 'file2'):
        write_file((tree, name), 'xxx')
    os.mkdir(os.path.join(tree, 'sub'))
    write_file((tree, 'sub', 'file3'), 'xxx')
    os.mkdir(os.path.join(tree, 'sub2'))
    if base_dir:
        write_file((root_dir, 'outer'), 'xxx')
    return root_dir, base_dir
Tarek Ziadé396fad72010-02-23 05:30:31 +00001304
@support.requires_zlib
@unittest.skipUnless(shutil.which('tar'),
                     'Need the tar command to run')
def test_tarfile_vs_tar(self):
    # Compare the archive written by make_archive() with the one written
    # by the external `tar` command for the same tree.
    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')
    tarball = make_archive(base_name, 'gztar', root_dir, base_dir)

    # check if the compressed tarball was created
    self.assertEqual(tarball, base_name + '.tar.gz')
    self.assertTrue(os.path.isfile(tarball))

    # now create another tarball using `tar`
    tarball2 = os.path.join(root_dir, 'archive2.tar')
    tar_cmd = ['tar', '-cf', 'archive2.tar', base_dir]
    subprocess.check_call(tar_cmd, cwd=root_dir,
                          stdout=subprocess.DEVNULL)

    self.assertTrue(os.path.isfile(tarball2))
    # let's compare both tarballs
    self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2))

    # trying an uncompressed one
    tarball = make_archive(base_name, 'tar', root_dir, base_dir)
    self.assertEqual(tarball, base_name + '.tar')
    self.assertTrue(os.path.isfile(tarball))

    # now for a dry_run: remove the archive written just above, otherwise
    # its presence says nothing about what the dry run did
    os.remove(tarball)
    tarball = make_archive(base_name, 'tar', root_dir, base_dir,
                           dry_run=True)
    # The would-be path is still returned, but no file is written.
    self.assertEqual(tarball, base_name + '.tar')
    self.assertFalse(os.path.exists(tarball))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001337
@support.requires_zlib
def test_make_zipfile(self):
    # Build the file tree that gets zipped.
    root_dir, base_dir = self._create_files()

    tmpdir2 = self.mkdtemp()
    # Remove the directory so that shutil has to create it again.
    os.rmdir(tmpdir2)
    # The archive name is given relative to work_dir.
    work_dir = os.path.dirname(tmpdir2)
    rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')

    dist_entries = ['dist/', 'dist/sub/', 'dist/sub2/',
                    'dist/file1', 'dist/file2', 'dist/sub/file3']
    # Without base_dir the whole root is archived, 'outer' included; with
    # base_dir only the 'dist' tree is.
    cases = (
        ((), dist_entries + ['outer']),
        ((base_dir,), dist_entries),
    )
    for extra_args, expected_names in cases:
        with support.change_cwd(work_dir):
            base_name = os.path.abspath(rel_base_name)
            res = make_archive(rel_base_name, 'zip', root_dir, *extra_args)

        self.assertEqual(res, base_name + '.zip')
        self.assertTrue(os.path.isfile(res))
        self.assertTrue(zipfile.is_zipfile(res))
        with zipfile.ZipFile(res) as zf:
            self.assertCountEqual(zf.namelist(), expected_names)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001374
@support.requires_zlib
@unittest.skipUnless(shutil.which('zip'),
                     'Need the zip command to run')
def test_zipfile_vs_zip(self):
    # make_archive() and the external `zip` command must list the same
    # members for the same tree.
    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')
    archive = make_archive(base_name, 'zip', root_dir, base_dir)

    # check if ZIP file was created
    self.assertEqual(archive, base_name + '.zip')
    self.assertTrue(os.path.isfile(archive))

    # now create another ZIP file using `zip`
    archive2 = os.path.join(root_dir, 'archive2.zip')
    subprocess.check_call(['zip', '-q', '-r', 'archive2.zip', base_dir],
                          cwd=root_dir, stdout=subprocess.DEVNULL)
    self.assertTrue(os.path.isfile(archive2))

    # let's compare both ZIP files
    listings = []
    for path in (archive, archive2):
        with zipfile.ZipFile(path) as zf:
            listings.append(sorted(zf.namelist()))
    self.assertEqual(listings[0], listings[1])
Tarek Ziadé396fad72010-02-23 05:30:31 +00001400
@support.requires_zlib
@unittest.skipUnless(shutil.which('unzip'),
                     'Need the unzip command to run')
def test_unzip_zipfile(self):
    # An archive written by make_archive() must pass `unzip -t`.
    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')
    archive = make_archive(base_name, 'zip', root_dir, base_dir)

    # check if ZIP file was created
    self.assertEqual(archive, base_name + '.zip')
    self.assertTrue(os.path.isfile(archive))

    # now check the ZIP file using `unzip -t`
    with support.change_cwd(root_dir):
        try:
            subprocess.check_output(['unzip', '-t', archive],
                                    stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as exc:
            details = exc.output.decode(errors="replace")
            # Some unzip builds lack -t: skip instead of failing.
            if 'unrecognized option: t' in details:
                self.skipTest("unzip doesn't support -t")
            self.fail("{}\n\n**Unzip Output**\n{}".format(exc, details))
1424
def test_make_archive(self):
    """make_archive() raises ValueError for the unregistered format 'xxx'."""
    base_name = os.path.join(self.mkdtemp(), 'archive')
    with self.assertRaises(ValueError):
        make_archive(base_name, 'xxx')
1429
@support.requires_zlib
def test_make_archive_owner_group(self):
    """Build zip and tar archives with several owner/group combinations."""
    # testing make_archive with owner and group, with various combinations
    # this works even if there's not gid/uid support
    if UID_GID_SUPPORT:
        group = grp.getgrgid(0)[0]
        owner = pwd.getpwuid(0)[0]
    else:
        group = owner = 'root'

    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')

    # (format, extra keyword arguments) in the order they must be exercised.
    attempts = [
        ('zip', {'owner': owner, 'group': group}),
        ('zip', {}),
        ('tar', {'owner': owner, 'group': group}),
        ('tar', {'owner': 'kjhkjhkjg', 'group': 'oihohoh'}),
    ]
    for fmt, ownership in attempts:
        res = make_archive(base_name, fmt, root_dir, base_dir, **ownership)
        self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001456
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001457
@support.requires_zlib
@unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
def test_tarfile_root_owner(self):
    """A gztar made with the uid-0/gid-0 names stores uid 0 and gid 0."""
    root_dir, _ = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')
    group = grp.getgrgid(0)[0]
    owner = pwd.getpwuid(0)[0]
    with support.change_cwd(root_dir):
        archive_name = make_archive(base_name, 'gztar', root_dir, 'dist',
                                    owner=owner, group=group)

    # check if the compressed tarball was created
    self.assertTrue(os.path.isfile(archive_name))

    # now checks the rights
    with tarfile.open(archive_name) as archive:
        for member in archive.getmembers():
            self.assertEqual(member.uid, 0)
            self.assertEqual(member.gid, 0)
1480
def test_make_archive_cwd(self):
    """A failing archiver must not leave the working directory changed."""
    cwd_before = os.getcwd()

    def _breaks(*args, **kw):
        raise RuntimeError()

    register_archive_format('xxx', _breaks, [], 'xxx file')
    try:
        # The archiver's failure is expected; only the cwd is checked here.
        with contextlib.suppress(Exception):
            make_archive('xxx', 'xxx', root_dir=self.mkdtemp())
        self.assertEqual(os.getcwd(), cwd_before)
    finally:
        unregister_archive_format('xxx')
1495
def test_make_tarfile_in_curdir(self):
    """make_archive() with a bare base name writes a tar into the cwd."""
    # Issue #21280
    with support.change_cwd(self.mkdtemp()):
        created = make_archive('test', 'tar')
        self.assertEqual(created, 'test.tar')
        self.assertTrue(os.path.isfile('test.tar'))
1502
@support.requires_zlib
def test_make_zipfile_in_curdir(self):
    """make_archive() with a bare base name writes a zip into the cwd."""
    # Issue #21280
    with support.change_cwd(self.mkdtemp()):
        created = make_archive('test', 'zip')
        self.assertEqual(created, 'test.zip')
        self.assertTrue(os.path.isfile('test.zip'))
1510
Tarek Ziadé396fad72010-02-23 05:30:31 +00001511 def test_register_archive_format(self):
1512
1513 self.assertRaises(TypeError, register_archive_format, 'xxx', 1)
1514 self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
1515 1)
1516 self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
1517 [(1, 2), (1, 2, 3)])
1518
1519 register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file')
1520 formats = [name for name, params in get_archive_formats()]
1521 self.assertIn('xxx', formats)
1522
1523 unregister_archive_format('xxx')
1524 formats = [name for name, params in get_archive_formats()]
1525 self.assertNotIn('xxx', formats)
1526
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001527 ### shutil.unpack_archive
1528
def check_unpack_archive(self, format):
    """Run the unpack check with unchanged, pathlib.Path and FakePath paths."""
    for converter in (lambda path: path, pathlib.Path, FakePath):
        self.check_unpack_archive_with_converter(format, converter)
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001533
def check_unpack_archive_with_converter(self, format, converter):
    """Archive the sample tree, unpack it twice and compare the listings.

    `converter` is applied to every path handed to unpack_archive().
    """
    root_dir, base_dir = self._create_files()
    expected = rlistdir(root_dir)
    expected.remove('outer')

    base_name = os.path.join(self.mkdtemp(), 'archive')
    filename = make_archive(base_name, format, root_dir, base_dir)

    # let's try to unpack it now, first letting unpack_archive() pick the
    # format by itself and again, this time with the format specified
    for extra in ({}, {'format': format}):
        target = self.mkdtemp()
        unpack_archive(converter(filename), converter(target), **extra)
        self.assertEqual(rlistdir(target), expected)

    # Convert outside assertRaises so a converter error is not swallowed.
    bogus = converter(TESTFN)
    with self.assertRaises(shutil.ReadError):
        unpack_archive(bogus)
    bogus = converter(TESTFN)
    with self.assertRaises(ValueError):
        unpack_archive(bogus, format='xxx')
Nick Coghlanabf202d2011-03-16 13:52:20 -04001554
def test_unpack_archive_tar(self):
    """Run the shared unpack_archive() check for the 'tar' format."""
    fmt = 'tar'
    self.check_unpack_archive(fmt)
1557
@support.requires_zlib
def test_unpack_archive_gztar(self):
    """Run the shared unpack_archive() check for the 'gztar' format."""
    fmt = 'gztar'
    self.check_unpack_archive(fmt)
1561
@support.requires_bz2
def test_unpack_archive_bztar(self):
    """Run the shared unpack_archive() check for the 'bztar' format."""
    fmt = 'bztar'
    self.check_unpack_archive(fmt)
1565
@support.requires_lzma
@unittest.skipIf(AIX and not _maxdataOK(), "AIX MAXDATA must be 0x20000000 or larger")
def test_unpack_archive_xztar(self):
    """Run the shared unpack_archive() check for the 'xztar' format."""
    fmt = 'xztar'
    self.check_unpack_archive(fmt)
1570
@support.requires_zlib
def test_unpack_archive_zip(self):
    """Run the shared unpack_archive() check for the 'zip' format."""
    fmt = 'zip'
    self.check_unpack_archive(fmt)
1574
Martin Pantereb995702016-07-28 01:11:04 +00001575 def test_unpack_registry(self):
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001576
1577 formats = get_unpack_formats()
1578
1579 def _boo(filename, extract_dir, extra):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001580 self.assertEqual(extra, 1)
1581 self.assertEqual(filename, 'stuff.boo')
1582 self.assertEqual(extract_dir, 'xx')
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001583
1584 register_unpack_format('Boo', ['.boo', '.b2'], _boo, [('extra', 1)])
1585 unpack_archive('stuff.boo', 'xx')
1586
1587 # trying to register a .boo unpacker again
1588 self.assertRaises(RegistryError, register_unpack_format, 'Boo2',
1589 ['.boo'], _boo)
1590
1591 # should work now
1592 unregister_unpack_format('Boo')
1593 register_unpack_format('Boo2', ['.boo'], _boo)
1594 self.assertIn(('Boo2', ['.boo'], ''), get_unpack_formats())
1595 self.assertNotIn(('Boo', ['.boo'], ''), get_unpack_formats())
1596
1597 # let's leave a clean state
1598 unregister_unpack_format('Boo2')
Ezio Melottib3aedd42010-11-20 19:04:17 +00001599 self.assertEqual(get_unpack_formats(), formats)
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001600
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001601
class TestMisc(BaseTest, unittest.TestCase):
    """Tests for shutil.disk_usage() and shutil.chown()."""

    @unittest.skipUnless(hasattr(shutil, 'disk_usage'),
                         "disk_usage not available on this platform")
    def test_disk_usage(self):
        """disk_usage() returns consistent int total/used/free values."""
        usage = shutil.disk_usage(os.path.dirname(__file__))
        for attr in ('total', 'used', 'free'):
            self.assertIsInstance(getattr(usage, attr), int)
        self.assertGreater(usage.total, 0)
        self.assertGreater(usage.used, 0)
        self.assertGreaterEqual(usage.free, 0)
        self.assertGreaterEqual(usage.total, usage.used)
        self.assertGreater(usage.total, usage.free)

        # bpo-32557: Check that disk_usage() also accepts a filename
        shutil.disk_usage(__file__)

    @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
    @unittest.skipUnless(hasattr(os, 'chown'), 'requires os.chown')
    def test_chown(self):
        """chown() validates its arguments and applies uid/gid to a path."""
        dirname = self.mkdtemp()
        filename = tempfile.mktemp(dir=dirname)
        write_file(filename, 'testing chown function')

        with self.assertRaises(ValueError):
            shutil.chown(filename)

        with self.assertRaises(LookupError):
            shutil.chown(filename, user='non-existing username')

        with self.assertRaises(LookupError):
            shutil.chown(filename, group='non-existing groupname')

        for bad_user in (b'spam', 3.14):
            with self.assertRaises(TypeError):
                shutil.chown(filename, bad_user)

        uid = os.getuid()
        gid = os.getgid()

        def check_chown(path, uid=None, gid=None):
            # Stat the path under test; the directory checks below must
            # inspect the directory itself, not the file created above.
            s = os.stat(path)
            if uid is not None:
                self.assertEqual(uid, s.st_uid)
            if gid is not None:
                self.assertEqual(gid, s.st_gid)

        # Same sequence of calls for the file, then for the directory.
        for target in (filename, dirname):
            shutil.chown(target, uid, gid)
            check_chown(target, uid, gid)
            shutil.chown(target, uid)
            check_chown(target, uid)
            shutil.chown(target, user=uid)
            check_chown(target, uid)
            shutil.chown(target, group=gid)
            check_chown(target, gid=gid)

        try:
            user = pwd.getpwuid(uid)[0]
            group = grp.getgrgid(gid)[0]
        except KeyError:
            # On some systems uid/gid cannot be resolved.
            pass
        else:
            for target in (filename, dirname):
                shutil.chown(target, user, group)
                check_chown(target, uid, gid)
Sandro Tosid902a142011-08-22 23:28:27 +02001680
Brian Curtin0d0a1de2012-06-18 18:41:07 -05001681
class TestWhich(BaseTest, unittest.TestCase):
    """Tests for shutil.which() against a temporary executable file.

    setUp() publishes the fixture through attributes (dir, file, temp_file,
    env_path, curdir, ext).  TestWhichBytes re-encodes several of them as
    bytes, so test methods read these attributes instead of recomputing
    the values.
    """

    def setUp(self):
        self.temp_dir = self.mkdtemp(prefix="Tmp")
        # Give the temp_file an ".exe" suffix for all.
        # It's needed on Windows and not harmful on other platforms.
        self.temp_file = tempfile.NamedTemporaryFile(dir=self.temp_dir,
                                                     prefix="Tmp",
                                                     suffix=".Exe")
        os.chmod(self.temp_file.name, stat.S_IXUSR)
        self.addCleanup(self.temp_file.close)
        self.dir, self.file = os.path.split(self.temp_file.name)
        self.env_path = self.dir
        self.curdir = os.curdir
        self.ext = ".EXE"

    def test_basic(self):
        # Given an EXE in a directory, it should be returned.
        rv = shutil.which(self.file, path=self.dir)
        self.assertEqual(rv, self.temp_file.name)

    def test_absolute_cmd(self):
        # When given the fully qualified path to an executable that exists,
        # it should be returned.
        rv = shutil.which(self.temp_file.name, path=self.temp_dir)
        self.assertEqual(rv, self.temp_file.name)

    def test_relative_cmd(self):
        # When given the relative path with a directory part to an executable
        # that exists, it should be returned.
        base_dir, tail_dir = os.path.split(self.dir)
        relpath = os.path.join(tail_dir, self.file)
        with support.change_cwd(path=base_dir):
            rv = shutil.which(relpath, path=self.temp_dir)
            self.assertEqual(rv, relpath)
        # But it shouldn't be searched in PATH directories (issue #16957).
        with support.change_cwd(path=self.dir):
            rv = shutil.which(relpath, path=base_dir)
            self.assertIsNone(rv)

    def test_cwd(self):
        # Issue #16957
        base_dir = os.path.dirname(self.dir)
        with support.change_cwd(path=self.dir):
            rv = shutil.which(self.file, path=base_dir)
            if sys.platform == "win32":
                # Windows: current directory implicitly on PATH
                self.assertEqual(rv, os.path.join(self.curdir, self.file))
            else:
                # Other platforms: shouldn't match in the current directory.
                self.assertIsNone(rv)

    @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
                     'non-root user required')
    def test_non_matching_mode(self):
        # Set the file read-only and ask for writeable files.
        os.chmod(self.temp_file.name, stat.S_IREAD)
        if os.access(self.temp_file.name, os.W_OK):
            self.skipTest("can't set the file read-only")
        rv = shutil.which(self.file, path=self.dir, mode=os.W_OK)
        self.assertIsNone(rv)

    def test_relative_path(self):
        # A relative `path` entry is resolved against the working directory.
        base_dir, tail_dir = os.path.split(self.dir)
        with support.change_cwd(path=base_dir):
            rv = shutil.which(self.file, path=tail_dir)
            self.assertEqual(rv, os.path.join(tail_dir, self.file))

    def test_nonexistent_file(self):
        # Return None when no matching executable file is found on the path.
        rv = shutil.which("foo.exe", path=self.dir)
        self.assertIsNone(rv)

    @unittest.skipUnless(sys.platform == "win32",
                         "pathext check is Windows-only")
    def test_pathext_checking(self):
        # Ask for the file without the ".exe" extension, then ensure that
        # it gets found properly with the extension.
        rv = shutil.which(self.file[:-4], path=self.dir)
        self.assertEqual(rv, self.temp_file.name[:-4] + self.ext)

    def test_environ_path(self):
        # With no `path` argument the PATH environment variable is searched.
        with support.EnvironmentVarGuard() as env:
            env['PATH'] = self.env_path
            rv = shutil.which(self.file)
            self.assertEqual(rv, self.temp_file.name)

    def test_environ_path_empty(self):
        # PATH='': no match
        with support.EnvironmentVarGuard() as env:
            env['PATH'] = ''
            with unittest.mock.patch('os.confstr', return_value=self.dir,
                                     create=True), \
                 support.swap_attr(os, 'defpath', self.dir), \
                 support.change_cwd(self.dir):
                rv = shutil.which(self.file)
                self.assertIsNone(rv)

    def test_environ_path_cwd(self):
        expected_cwd = os.path.basename(self.temp_file.name)
        if sys.platform == "win32":
            # self.curdir has the same str/bytes type as the file name
            # (TestWhichBytes encodes both), matching test_cwd above.
            expected_cwd = os.path.join(self.curdir, expected_cwd)

        # PATH=':': explicitly looks in the current directory
        with support.EnvironmentVarGuard() as env:
            env['PATH'] = os.pathsep
            with unittest.mock.patch('os.confstr', return_value=self.dir,
                                     create=True), \
                 support.swap_attr(os, 'defpath', self.dir):
                rv = shutil.which(self.file)
                self.assertIsNone(rv)

                # look in current directory
                with support.change_cwd(self.dir):
                    rv = shutil.which(self.file)
                    self.assertEqual(rv, expected_cwd)

    def test_environ_path_missing(self):
        # With PATH unset, a fallback search path must still find the file.
        with support.EnvironmentVarGuard() as env:
            env.pop('PATH', None)

            # without confstr
            with unittest.mock.patch('os.confstr', side_effect=ValueError,
                                     create=True), \
                 support.swap_attr(os, 'defpath', self.dir):
                rv = shutil.which(self.file)
            self.assertEqual(rv, self.temp_file.name)

            # with confstr
            with unittest.mock.patch('os.confstr', return_value=self.dir,
                                     create=True), \
                 support.swap_attr(os, 'defpath', ''):
                rv = shutil.which(self.file)
            self.assertEqual(rv, self.temp_file.name)

    def test_empty_path(self):
        # An explicit empty `path` argument finds nothing, even though the
        # file sits in the working directory and in PATH.
        with support.change_cwd(path=self.dir), \
             support.EnvironmentVarGuard() as env:
            env['PATH'] = self.env_path
            rv = shutil.which(self.file, path='')
            self.assertIsNone(rv)

    def test_empty_path_no_PATH(self):
        # With PATH unset and no `path` argument the file is not found.
        with support.EnvironmentVarGuard() as env:
            env.pop('PATH', None)
            rv = shutil.which(self.file)
            self.assertIsNone(rv)

    @unittest.skipUnless(sys.platform == "win32", 'test specific to Windows')
    def test_pathext(self):
        ext = ".xyz"
        temp_filexyz = tempfile.NamedTemporaryFile(dir=self.temp_dir,
                                                   prefix="Tmp2", suffix=ext)
        os.chmod(temp_filexyz.name, stat.S_IXUSR)
        self.addCleanup(temp_filexyz.close)

        # strip path and extension
        program = os.path.basename(temp_filexyz.name)
        program = os.path.splitext(program)[0]

        with support.EnvironmentVarGuard() as env:
            env['PATHEXT'] = ext
            rv = shutil.which(program, path=self.temp_dir)
            self.assertEqual(rv, temp_filexyz.name)
1850
Brian Curtinc57a3452012-06-22 16:00:30 -05001851
class TestWhichBytes(TestWhich):
    """Re-run the TestWhich cases with bytes file and directory names."""

    def setUp(self):
        super().setUp()
        # Re-encode the fixture values the inherited tests pass to which().
        self.temp_file.name = os.fsencode(self.temp_file.name)
        for attr in ('dir', 'file', 'curdir', 'ext'):
            setattr(self, attr, os.fsencode(getattr(self, attr)))
1860
1861
class TestMove(BaseTest, unittest.TestCase):
    """Tests for shutil.move() on files, directories and symlinks."""

    def setUp(self):
        name = "foo"
        self.src_dir = self.mkdtemp()
        self.dst_dir = self.mkdtemp()
        self.src_file = os.path.join(self.src_dir, name)
        self.dst_file = os.path.join(self.dst_dir, name)
        with open(self.src_file, "wb") as f:
            f.write(b"spam")

    def _check_move_file(self, src, dst, real_dst):
        """Move file src to dst; real_dst must hold its bytes afterwards."""
        with open(src, "rb") as f:
            before = f.read()
        shutil.move(src, dst)
        with open(real_dst, "rb") as f:
            after = f.read()
        self.assertEqual(before, after)
        self.assertFalse(os.path.exists(src))

    def _check_move_dir(self, src, dst, real_dst):
        """Move dir src to dst; real_dst must list the same entries."""
        before = sorted(os.listdir(src))
        shutil.move(src, dst)
        after = sorted(os.listdir(real_dst))
        self.assertEqual(before, after)
        self.assertFalse(os.path.exists(src))

    def test_move_file(self):
        # Move a file to another location on the same filesystem.
        self._check_move_file(self.src_file, self.dst_file, self.dst_file)

    def test_move_file_to_dir(self):
        # Move a file inside an existing dir on the same filesystem.
        self._check_move_file(self.src_file, self.dst_dir, self.dst_file)

    def test_move_file_to_dir_pathlike_src(self):
        # Move a pathlike file to another location on the same filesystem.
        self._check_move_file(pathlib.Path(self.src_file),
                              self.dst_dir, self.dst_file)

    def test_move_file_to_dir_pathlike_dst(self):
        # Move a file to another pathlike location on the same filesystem.
        self._check_move_file(self.src_file,
                              pathlib.Path(self.dst_dir), self.dst_file)

    @mock_rename
    def test_move_file_other_fs(self):
        # Move a file to an existing dir on another filesystem.
        self.test_move_file()

    @mock_rename
    def test_move_file_to_dir_other_fs(self):
        # Move a file to another location on another filesystem.
        self.test_move_file_to_dir()

    def test_move_dir(self):
        # Move a dir to another location on the same filesystem.
        target = tempfile.mktemp(dir=self.mkdtemp())
        try:
            self._check_move_dir(self.src_dir, target, target)
        finally:
            support.rmtree(target)

    @mock_rename
    def test_move_dir_other_fs(self):
        # Move a dir to another location on another filesystem.
        self.test_move_dir()

    def test_move_dir_to_dir(self):
        # Move a dir inside an existing dir on the same filesystem.
        landed = os.path.join(self.dst_dir, os.path.basename(self.src_dir))
        self._check_move_dir(self.src_dir, self.dst_dir, landed)

    @mock_rename
    def test_move_dir_to_dir_other_fs(self):
        # Move a dir inside an existing dir on another filesystem.
        self.test_move_dir_to_dir()

    def test_move_dir_sep_to_dir(self):
        # Same as test_move_dir_to_dir, with a trailing separator on src.
        landed = os.path.join(self.dst_dir, os.path.basename(self.src_dir))
        self._check_move_dir(self.src_dir + os.path.sep, self.dst_dir, landed)

    @unittest.skipUnless(os.path.altsep, 'requires os.path.altsep')
    def test_move_dir_altsep_to_dir(self):
        # Same as above, with a trailing alternate separator on src.
        landed = os.path.join(self.dst_dir, os.path.basename(self.src_dir))
        self._check_move_dir(self.src_dir + os.path.altsep, self.dst_dir,
                             landed)

    def test_existing_file_inside_dest_dir(self):
        # A file with the same name inside the destination dir already exists.
        with open(self.dst_file, "wb"):
            pass
        with self.assertRaises(shutil.Error):
            shutil.move(self.src_file, self.dst_dir)

    def test_dont_move_dir_in_itself(self):
        # Moving a dir inside itself raises an Error.
        inside = os.path.join(self.src_dir, "bar")
        with self.assertRaises(shutil.Error):
            shutil.move(self.src_dir, inside)

    def test_destinsrc_false_negative(self):
        # _destinsrc() must report a destination nested inside the source.
        os.mkdir(TESTFN)
        try:
            for src_name, dst_name in [('srcdir', 'srcdir/dest')]:
                src = os.path.join(TESTFN, src_name)
                dst = os.path.join(TESTFN, dst_name)
                self.assertTrue(shutil._destinsrc(src, dst),
                                msg='_destinsrc() wrongly concluded that '
                                'dst (%s) is not in src (%s)' % (dst, src))
        finally:
            support.rmtree(TESTFN)

    def test_destinsrc_false_positive(self):
        # _destinsrc() must not be fooled by a shared name prefix.
        os.mkdir(TESTFN)
        try:
            for src_name, dst_name in [('srcdir', 'src/dest'),
                                       ('srcdir', 'srcdir.new')]:
                src = os.path.join(TESTFN, src_name)
                dst = os.path.join(TESTFN, dst_name)
                self.assertFalse(shutil._destinsrc(src, dst),
                                 msg='_destinsrc() wrongly concluded that '
                                 'dst (%s) is in src (%s)' % (dst, src))
        finally:
            support.rmtree(TESTFN)

    @support.skip_unless_symlink
    @mock_rename
    def test_move_file_symlink(self):
        # Moving a symlink to a file yields a symlink to the same file.
        link = os.path.join(self.src_dir, 'bar')
        os.symlink(self.src_file, link)
        shutil.move(link, self.dst_file)
        self.assertTrue(os.path.islink(self.dst_file))
        self.assertTrue(os.path.samefile(self.src_file, self.dst_file))

    @support.skip_unless_symlink
    @mock_rename
    def test_move_file_symlink_to_dir(self):
        # Moving a symlink into a directory keeps it a symlink there.
        filename = "bar"
        link = os.path.join(self.src_dir, filename)
        os.symlink(self.src_file, link)
        shutil.move(link, self.dst_dir)
        final_link = os.path.join(self.dst_dir, filename)
        self.assertTrue(os.path.islink(final_link))
        self.assertTrue(os.path.samefile(self.src_file, final_link))

    @support.skip_unless_symlink
    @mock_rename
    def test_move_dangling_symlink(self):
        # A symlink whose target does not exist can still be moved.
        missing = os.path.join(self.src_dir, 'baz')
        link = os.path.join(self.src_dir, 'bar')
        os.symlink(missing, link)
        dst_link = os.path.join(self.dst_dir, 'quux')
        shutil.move(link, dst_link)
        self.assertTrue(os.path.islink(dst_link))
        self.assertEqual(os.path.realpath(missing), os.path.realpath(dst_link))

    @support.skip_unless_symlink
    @mock_rename
    def test_move_dir_symlink(self):
        # Moving a symlink to a directory yields a symlink to that directory.
        real_dir = os.path.join(self.src_dir, 'baz')
        link = os.path.join(self.src_dir, 'bar')
        os.mkdir(real_dir)
        os.symlink(real_dir, link)
        dst_link = os.path.join(self.dst_dir, 'quux')
        shutil.move(link, dst_link)
        self.assertTrue(os.path.islink(dst_link))
        self.assertTrue(os.path.samefile(real_dir, dst_link))

    def test_move_return_value(self):
        # move() into a directory returns the path of the moved file.
        rv = shutil.move(self.src_file, self.dst_dir)
        expected = os.path.join(self.dst_dir,
                                os.path.basename(self.src_file))
        self.assertEqual(rv, expected)

    def test_move_as_rename_return_value(self):
        # move() onto a new name returns that new path.
        new_path = os.path.join(self.dst_dir, 'bar')
        rv = shutil.move(self.src_file, new_path)
        self.assertEqual(rv, os.path.join(self.dst_dir, 'bar'))

    @mock_rename
    def test_move_file_special_function(self):
        # copy_function is called once for a single file.
        calls = []

        def _copy(src, dst):
            calls.append((src, dst))

        shutil.move(self.src_file, self.dst_dir, copy_function=_copy)
        self.assertEqual(len(calls), 1)

    @mock_rename
    def test_move_dir_special_function(self):
        # copy_function is called once per file: "foo" plus the two children.
        calls = []

        def _copy(src, dst):
            calls.append((src, dst))

        for child in ('child', 'child1'):
            support.create_empty_file(os.path.join(self.src_dir, child))
        shutil.move(self.src_dir, self.dst_dir, copy_function=_copy)
        self.assertEqual(len(calls), 3)

    def test_move_dir_caseinsensitive(self):
        # Renames a folder to the same name
        # but a different case.

        self.src_dir = self.mkdtemp()
        parent, leaf = os.path.split(self.src_dir)
        dst_dir = os.path.join(parent, leaf.upper())
        self.assertNotEqual(self.src_dir, dst_dir)

        try:
            shutil.move(self.src_dir, dst_dir)
            self.assertTrue(os.path.isdir(dst_dir))
        finally:
            os.rmdir(dst_dir)
2067
Tarek Ziadé5340db32010-04-19 22:30:51 +00002068
class TestCopyFile(unittest.TestCase):
    """Check shutil.copyfile() when opening or closing a file fails.

    Each test swaps the ``open`` attribute of the shutil module for a local
    stub which either raises OSError or hands back a Faux instance.
    """

    class Faux(object):
        """Stand-in for a file object used as a context manager.

        It records that it was entered and what ``__exit__`` received.
        With raise_in_exit=True, ``__exit__`` raises OSError("Cannot close");
        otherwise ``__exit__`` returns suppress_at_exit.
        """
        _entered = False
        _exited_with = None
        _raised = False

        def __init__(self, raise_in_exit=False, suppress_at_exit=True):
            self._raise_in_exit = raise_in_exit
            self._suppress_at_exit = suppress_at_exit

        def read(self, *args):
            return ''

        def __enter__(self):
            # NOTE: returns None rather than self; kept unchanged so the
            # expectations of the tests below stay the same.
            self._entered = True

        def __exit__(self, exc_type, exc_val, exc_tb):
            self._exited_with = exc_type, exc_val, exc_tb
            if self._raise_in_exit:
                self._raised = True
                raise OSError("Cannot close")
            return self._suppress_at_exit

    def test_w_source_open_fails(self):
        def _open(filename, mode='r'):
            if filename == 'srcfile':
                raise OSError('Cannot open "srcfile"')
            # Explicit raise instead of ``assert 0`` so the guard is not
            # stripped when Python runs with -O.
            raise AssertionError("unexpected open() of %r" % (filename,))

        with support.swap_attr(shutil, 'open', _open):
            with self.assertRaises(OSError):
                shutil.copyfile('srcfile', 'destfile')

    @unittest.skipIf(MACOS, "skipped on macOS")
    def test_w_dest_open_fails(self):
        srcfile = self.Faux()

        def _open(filename, mode='r'):
            if filename == 'srcfile':
                return srcfile
            if filename == 'destfile':
                raise OSError('Cannot open "destfile"')
            raise AssertionError("unexpected open() of %r" % (filename,))

        with support.swap_attr(shutil, 'open', _open):
            shutil.copyfile('srcfile', 'destfile')
        self.assertTrue(srcfile._entered)
        # The source's __exit__ must have seen the destination open error.
        self.assertIs(srcfile._exited_with[0], OSError)
        self.assertEqual(srcfile._exited_with[1].args,
                         ('Cannot open "destfile"',))

    @unittest.skipIf(MACOS, "skipped on macOS")
    def test_w_dest_close_fails(self):
        srcfile = self.Faux()
        destfile = self.Faux(True)

        def _open(filename, mode='r'):
            if filename == 'srcfile':
                return srcfile
            if filename == 'destfile':
                return destfile
            raise AssertionError("unexpected open() of %r" % (filename,))

        with support.swap_attr(shutil, 'open', _open):
            shutil.copyfile('srcfile', 'destfile')
        self.assertTrue(srcfile._entered)
        self.assertTrue(destfile._entered)
        self.assertTrue(destfile._raised)
        # The error raised while closing the destination must reach the
        # source's __exit__.
        self.assertIs(srcfile._exited_with[0], OSError)
        self.assertEqual(srcfile._exited_with[1].args,
                         ('Cannot close',))

    @unittest.skipIf(MACOS, "skipped on macOS")
    def test_w_source_close_fails(self):
        srcfile = self.Faux(True)
        destfile = self.Faux()

        def _open(filename, mode='r'):
            if filename == 'srcfile':
                return srcfile
            if filename == 'destfile':
                return destfile
            raise AssertionError("unexpected open() of %r" % (filename,))

        with support.swap_attr(shutil, 'open', _open):
            with self.assertRaises(OSError):
                shutil.copyfile('srcfile', 'destfile')
        self.assertTrue(srcfile._entered)
        self.assertTrue(destfile._entered)
        self.assertFalse(destfile._raised)
        # The source's __exit__ saw no pending exception and raised itself.
        self.assertIsNone(srcfile._exited_with[0])
        self.assertTrue(srcfile._raised)
2159
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002160
class TestCopyFileObj(unittest.TestCase):
    """Tests for shutil.copyfileobj() using real files.

    TESTFN is written once per class via write_test_file(TESTFN, FILESIZE).
    The tests copy into TESTFN2, which is removed after every test.
    """
    FILESIZE = 2 * 1024 * 1024

    @classmethod
    def setUpClass(cls):
        write_test_file(TESTFN, cls.FILESIZE)

    @classmethod
    def tearDownClass(cls):
        support.unlink(TESTFN)
        support.unlink(TESTFN2)

    def tearDown(self):
        support.unlink(TESTFN2)

    @contextlib.contextmanager
    def get_files(self):
        """Yield (src, dst): TESTFN open for reading, TESTFN2 for writing."""
        with open(TESTFN, "rb") as src:
            with open(TESTFN2, "wb") as dst:
                yield (src, dst)

    def assert_files_eq(self, src, dst):
        """Fail unless the files named src and dst have equal contents."""
        with open(src, 'rb') as fsrc:
            with open(dst, 'rb') as fdst:
                self.assertEqual(fsrc.read(), fdst.read())

    def test_content(self):
        with self.get_files() as (src, dst):
            shutil.copyfileobj(src, dst)
        # Compare only after both files have been closed again.
        self.assert_files_eq(TESTFN, TESTFN2)

    def test_file_not_closed(self):
        with self.get_files() as (src, dst):
            shutil.copyfileobj(src, dst)
            # unittest assertions are used instead of bare ``assert`` so
            # the checks still run under ``python -O``.
            self.assertFalse(src.closed)
            self.assertFalse(dst.closed)

    def test_file_offset(self):
        with self.get_files() as (src, dst):
            shutil.copyfileobj(src, dst)
            self.assertEqual(src.tell(), self.FILESIZE)
            self.assertEqual(dst.tell(), self.FILESIZE)

    @unittest.skipIf(os.name != 'nt', "Windows only")
    def test_win_impl(self):
        # Make sure alternate Windows implementation is called.
        with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
            shutil.copyfile(TESTFN, TESTFN2)
        self.assertTrue(m.called)

        # File size is 2 MiB but max buf size should be 1 MiB.
        self.assertEqual(m.call_args[0][2], 1 * 1024 * 1024)

        # If file size < 1 MiB memoryview() length must be equal to
        # the actual file size.
        with tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False) as f:
            f.write(b'foo')
        fname = f.name
        self.addCleanup(support.unlink, fname)
        with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
            shutil.copyfile(fname, TESTFN2)
        self.assertEqual(m.call_args[0][2], 3)

        # Empty files should not rely on readinto() variant.
        with tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False) as f:
            pass
        fname = f.name
        self.addCleanup(support.unlink, fname)
        with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
            shutil.copyfile(fname, TESTFN2)
            self.assertFalse(m.called)
        self.assert_files_eq(fname, TESTFN2)
2233
2234
class _ZeroCopyFileTest(object):
    """Tests common to all zero-copy APIs.

    Subclasses set PATCHPOINT to the dotted name of the function to mock
    and implement zerocopy_fun() to call the fast-copy routine under test.
    """
    FILESIZE = (10 * 1024 * 1024)  # 10 MiB
    FILEDATA = b""
    PATCHPOINT = ""

    @classmethod
    def setUpClass(cls):
        write_test_file(TESTFN, cls.FILESIZE)
        with open(TESTFN, 'rb') as f:
            cls.FILEDATA = f.read()
            # Explicit raise rather than ``assert`` so the sanity check
            # on the fixture survives ``python -O``.
            if len(cls.FILEDATA) != cls.FILESIZE:
                raise AssertionError(
                    "test file has %d bytes, expected %d"
                    % (len(cls.FILEDATA), cls.FILESIZE))

    @classmethod
    def tearDownClass(cls):
        support.unlink(TESTFN)

    def tearDown(self):
        support.unlink(TESTFN2)

    @contextlib.contextmanager
    def get_files(self):
        """Yield (src, dst): TESTFN open for reading, TESTFN2 for writing."""
        with open(TESTFN, "rb") as src:
            with open(TESTFN2, "wb") as dst:
                yield (src, dst)

    def zerocopy_fun(self, *args, **kwargs):
        raise NotImplementedError("must be implemented in subclass")

    def reset(self):
        """Tear the fixtures down and build them again from scratch."""
        self.tearDown()
        self.tearDownClass()
        self.setUpClass()
        self.setUp()

    # ---

    def test_regular_copy(self):
        with self.get_files() as (src, dst):
            self.zerocopy_fun(src, dst)
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
        # Make sure the fallback function is not called.
        with self.get_files() as (src, dst):
            with unittest.mock.patch('shutil.copyfileobj') as m:
                shutil.copyfile(TESTFN, TESTFN2)
            self.assertFalse(m.called)

    def test_same_file(self):
        # Rebuild the fixtures afterwards in case the source got damaged.
        self.addCleanup(self.reset)
        with self.get_files() as (src, dst):
            with self.assertRaises(Exception):
                self.zerocopy_fun(src, src)
        # Make sure src file is not corrupted.
        self.assertEqual(read_file(TESTFN, binary=True), self.FILEDATA)

    def test_non_existent_src(self):
        name = tempfile.mktemp(dir=os.getcwd())
        with self.assertRaises(FileNotFoundError) as cm:
            shutil.copyfile(name, "new")
        self.assertEqual(cm.exception.filename, name)

    def test_empty_file(self):
        srcname = TESTFN + 'src'
        dstname = TESTFN + 'dst'
        self.addCleanup(lambda: support.unlink(srcname))
        self.addCleanup(lambda: support.unlink(dstname))
        # Create an empty source file.
        with open(srcname, "wb"):
            pass

        with open(srcname, "rb") as src:
            with open(dstname, "wb") as dst:
                self.zerocopy_fun(src, dst)

        self.assertEqual(read_file(dstname, binary=True), b"")

    def test_unhandled_exception(self):
        with unittest.mock.patch(self.PATCHPOINT,
                                 side_effect=ZeroDivisionError):
            self.assertRaises(ZeroDivisionError,
                              shutil.copyfile, TESTFN, TESTFN2)

    def test_exception_on_first_call(self):
        # Emulate a case where the first call to the zero-copy
        # function raises an exception in which case the function is
        # supposed to give up immediately.
        with unittest.mock.patch(self.PATCHPOINT,
                                 side_effect=OSError(errno.EINVAL, "yo")):
            with self.get_files() as (src, dst):
                with self.assertRaises(_GiveupOnFastCopy):
                    self.zerocopy_fun(src, dst)

    def test_filesystem_full(self):
        # Emulate a case where filesystem is full and sendfile() fails
        # on first call.
        with unittest.mock.patch(self.PATCHPOINT,
                                 side_effect=OSError(errno.ENOSPC, "yo")):
            with self.get_files() as (src, dst):
                self.assertRaises(OSError, self.zerocopy_fun, src, dst)
2333
2334
@unittest.skipIf(not SUPPORTS_SENDFILE, 'os.sendfile() not supported')
class TestZeroCopySendfile(_ZeroCopyFileTest, unittest.TestCase):
    """Run the shared zero-copy tests against shutil._fastcopy_sendfile()."""
    PATCHPOINT = "os.sendfile"

    def zerocopy_fun(self, fsrc, fdst):
        return shutil._fastcopy_sendfile(fsrc, fdst)

    def test_non_regular_file_src(self):
        with io.BytesIO(self.FILEDATA) as src:
            with open(TESTFN2, "wb") as dst:
                with self.assertRaises(_GiveupOnFastCopy):
                    self.zerocopy_fun(src, dst)
                # The plain copy must still work after the fast path gave up.
                shutil.copyfileobj(src, dst)

        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_non_regular_file_dst(self):
        with open(TESTFN, "rb") as src:
            with io.BytesIO() as dst:
                with self.assertRaises(_GiveupOnFastCopy):
                    self.zerocopy_fun(src, dst)
                shutil.copyfileobj(src, dst)
                dst.seek(0)
                self.assertEqual(dst.read(), self.FILEDATA)

    def test_exception_on_second_call(self):
        def sendfile(*args, **kwargs):
            # The first call goes to the real os.sendfile(); every later
            # call fails with EBADF.
            if not flag:
                flag.append(None)
                return orig_sendfile(*args, **kwargs)
            else:
                raise OSError(errno.EBADF, "yo")

        flag = []
        orig_sendfile = os.sendfile
        with unittest.mock.patch('os.sendfile', create=True,
                                 side_effect=sendfile):
            with self.get_files() as (src, dst):
                with self.assertRaises(OSError) as cm:
                    shutil._fastcopy_sendfile(src, dst)
        # unittest assertions replace bare ``assert`` statements throughout
        # this class so the checks are not dropped under ``python -O``.
        self.assertTrue(flag)
        self.assertEqual(cm.exception.errno, errno.EBADF)

    def test_cant_get_size(self):
        # Emulate a case where src file size cannot be determined.
        # Internally bufsize will be set to a small value and
        # sendfile() will be called repeatedly.
        with unittest.mock.patch('os.fstat', side_effect=OSError) as m:
            with self.get_files() as (src, dst):
                shutil._fastcopy_sendfile(src, dst)
                self.assertTrue(m.called)
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_small_chunks(self):
        # Force internal file size detection to be smaller than the
        # actual file size. We want to force sendfile() to be called
        # multiple times, also in order to emulate a src fd which gets
        # bigger while it is being copied.
        fake_stat = unittest.mock.Mock()
        fake_stat.st_size = 65536 + 1
        with unittest.mock.patch('os.fstat', return_value=fake_stat) as m:
            with self.get_files() as (src, dst):
                shutil._fastcopy_sendfile(src, dst)
                self.assertTrue(m.called)
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_big_chunk(self):
        # Force internal file size detection to be +100MB bigger than
        # the actual file size. Make sure sendfile() does not rely on
        # file size value except for (maybe) a better throughput /
        # performance.
        fake_stat = unittest.mock.Mock()
        fake_stat.st_size = self.FILESIZE + (100 * 1024 * 1024)
        with unittest.mock.patch('os.fstat', return_value=fake_stat) as m:
            with self.get_files() as (src, dst):
                shutil._fastcopy_sendfile(src, dst)
                self.assertTrue(m.called)
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_blocksize_arg(self):
        with unittest.mock.patch('os.sendfile',
                                 side_effect=ZeroDivisionError) as m:
            self.assertRaises(ZeroDivisionError,
                              shutil.copyfile, TESTFN, TESTFN2)
            blocksize = m.call_args[0][3]
            # Make sure file size and the block size arg passed to
            # sendfile() are the same.
            self.assertEqual(blocksize, os.path.getsize(TESTFN))
            # ...unless we're dealing with a small file.
            support.unlink(TESTFN2)
            write_file(TESTFN2, b"hello", binary=True)
            self.addCleanup(support.unlink, TESTFN2 + '3')
            self.assertRaises(ZeroDivisionError,
                              shutil.copyfile, TESTFN2, TESTFN2 + '3')
            blocksize = m.call_args[0][3]
            self.assertEqual(blocksize, 2 ** 23)

    def test_file2file_not_supported(self):
        # Emulate a case where sendfile() only support file->socket
        # fds. In such a case copyfile() is supposed to skip the
        # fast-copy attempt from then on.
        self.assertTrue(shutil._USE_CP_SENDFILE)
        try:
            with unittest.mock.patch(
                    self.PATCHPOINT,
                    side_effect=OSError(errno.ENOTSOCK, "yo")) as m:
                with self.get_files() as (src, dst):
                    with self.assertRaises(_GiveupOnFastCopy):
                        shutil._fastcopy_sendfile(src, dst)
                self.assertTrue(m.called)
            self.assertFalse(shutil._USE_CP_SENDFILE)

            with unittest.mock.patch(self.PATCHPOINT) as m:
                shutil.copyfile(TESTFN, TESTFN2)
                self.assertFalse(m.called)
        finally:
            # Restore the module-level switch for the remaining tests.
            shutil._USE_CP_SENDFILE = True
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002452
2453
@unittest.skipUnless(MACOS, 'macOS only')
class TestZeroCopyMACOS(_ZeroCopyFileTest, unittest.TestCase):
    """Run the shared zero-copy tests against shutil._fastcopy_fcopyfile()."""
    PATCHPOINT = "posix._fcopyfile"

    def zerocopy_fun(self, src, dst):
        flags = posix._COPYFILE_DATA
        return shutil._fastcopy_fcopyfile(src, dst, flags)
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002460
2461
class TestGetTerminalSize(unittest.TestCase):
    """Tests for shutil.get_terminal_size()."""

    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        term = shutil.get_terminal_size()
        self.assertGreaterEqual(term.columns, 0)
        self.assertGreaterEqual(term.lines, 0)

    def test_os_environ_first(self):
        "Check if environment variables have precedence"

        # COLUMNS set, LINES removed.
        with support.EnvironmentVarGuard() as environ:
            environ['COLUMNS'] = '777'
            del environ['LINES']
            term = shutil.get_terminal_size()
        self.assertEqual(term.columns, 777)

        # LINES set, COLUMNS removed.
        with support.EnvironmentVarGuard() as environ:
            del environ['COLUMNS']
            environ['LINES'] = '888'
            term = shutil.get_terminal_size()
        self.assertEqual(term.lines, 888)

    def test_bad_environ(self):
        # Non-numeric COLUMNS/LINES must not make the call fail.
        with support.EnvironmentVarGuard() as environ:
            environ['COLUMNS'] = 'xxx'
            environ['LINES'] = 'yyy'
            term = shutil.get_terminal_size()
        self.assertGreaterEqual(term.columns, 0)
        self.assertGreaterEqual(term.lines, 0)

    @unittest.skipUnless(os.isatty(sys.__stdout__.fileno()), "not on tty")
    @unittest.skipUnless(hasattr(os, 'get_terminal_size'),
                         'need os.get_terminal_size()')
    def test_stty_match(self):
        """Check if stty returns the same results ignoring env

        This test will fail if stdin and stdout are connected to
        different terminals with different sizes.  Nevertheless, such
        situations should be pretty rare.
        """
        stty_errors = (FileNotFoundError, PermissionError,
                       subprocess.CalledProcessError)
        try:
            output = subprocess.check_output(['stty', 'size'])
        except stty_errors:
            self.skipTest("stty invocation failed")
        fields = output.decode().split()
        # The two fields printed by stty are swapped to match the order
        # of the get_terminal_size() result.
        expected = (int(fields[1]), int(fields[0]))

        with support.EnvironmentVarGuard() as environ:
            del environ['LINES']
            del environ['COLUMNS']
            actual = shutil.get_terminal_size()

        self.assertEqual(expected, actual)

    def test_fallback(self):
        with support.EnvironmentVarGuard() as environ:
            del environ['LINES']
            del environ['COLUMNS']

            # sys.__stdout__ has no fileno()
            with support.swap_attr(sys, '__stdout__', None):
                term = shutil.get_terminal_size(fallback=(10, 20))
            self.assertEqual(term.columns, 10)
            self.assertEqual(term.lines, 20)

            # sys.__stdout__ is not a terminal on Unix
            # or fileno() not in (0, 1, 2) on Windows
            with open(os.devnull, 'w') as devnull:
                with support.swap_attr(sys, '__stdout__', devnull):
                    term = shutil.get_terminal_size(fallback=(30, 40))
            self.assertEqual(term.columns, 30)
            self.assertEqual(term.lines, 40)
2538
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002539
class PublicAPITests(unittest.TestCase):
    """Ensures that the correct values are exposed in the public API."""

    def test_module_all_attribute(self):
        # shutil.__all__ must exist and list exactly the expected names.
        self.assertTrue(hasattr(shutil, '__all__'))
        expected = {
            'copyfileobj', 'copyfile', 'copymode', 'copystat',
            'copy', 'copy2', 'copytree', 'move', 'rmtree', 'Error',
            'SpecialFileError', 'ExecError', 'make_archive',
            'get_archive_formats', 'register_archive_format',
            'unregister_archive_format', 'get_unpack_formats',
            'register_unpack_format', 'unregister_unpack_format',
            'unpack_archive', 'ignore_patterns', 'chown', 'which',
            'get_terminal_size', 'SameFileError',
        }
        # disk_usage is only expected on Windows or where os.statvfs exists.
        if os.name == 'nt' or hasattr(os, 'statvfs'):
            expected.add('disk_usage')
        self.assertEqual(set(shutil.__all__), expected)
2556
2557
# Run the whole test module when it is executed as a script.
if __name__ == '__main__':
    unittest.main()