blob: dd5589b2e3a5c92b1d9638032e420c2b70eb41ed [file] [log] [blame]
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00001# Copyright (C) 2003 Python Software Foundation
2
3import unittest
Berker Peksag884afd92014-12-10 02:50:32 +02004import unittest.mock
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00005import shutil
6import tempfile
Johannes Gijsbers8e6f2de2004-11-23 09:27:27 +00007import sys
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +00008import stat
Brett Cannon1c3fa182004-06-19 21:11:35 +00009import os
10import os.path
Antoine Pitrouc041ab62012-01-02 19:18:02 +010011import errno
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040012import functools
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -070013import pathlib
Antoine Pitroubcf2b592012-02-08 23:28:36 +010014import subprocess
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020015import random
16import string
17import contextlib
18import io
Serhiy Storchaka527ef072015-09-06 18:33:19 +030019from shutil import (make_archive,
Tarek Ziadé396fad72010-02-23 05:30:31 +000020 register_archive_format, unregister_archive_format,
Tarek Ziadé6ac91722010-04-28 17:51:36 +000021 get_archive_formats, Error, unpack_archive,
22 register_unpack_format, RegistryError,
Hynek Schlawack48653762012-10-07 12:49:58 +020023 unregister_unpack_format, get_unpack_formats,
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020024 SameFileError, _GiveupOnFastCopy)
Tarek Ziadé396fad72010-02-23 05:30:31 +000025import tarfile
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +020026import zipfile
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020027try:
28 import posix
29except ImportError:
30 posix = None
Tarek Ziadé396fad72010-02-23 05:30:31 +000031
32from test import support
Serhiy Storchakab21d1552018-03-02 11:53:51 +020033from test.support import TESTFN, FakePath
Serhiy Storchaka11213772014-08-06 18:50:19 +030034
Antoine Pitrou7fff0962009-05-01 21:09:44 +000035TESTFN2 = TESTFN + "2"
Victor Stinner937ee9e2018-06-26 02:11:06 +020036MACOS = sys.platform.startswith("darwin")
Michael Feltef110b12019-02-18 12:02:44 +010037AIX = sys.platform[:3] == 'aix'
Tarek Ziadé396fad72010-02-23 05:30:31 +000038try:
39 import grp
40 import pwd
41 UID_GID_SUPPORT = True
42except ImportError:
43 UID_GID_SUPPORT = False
44
Steve Dowerdf2d4a62019-08-21 15:27:33 -070045try:
46 import _winapi
47except ImportError:
48 _winapi = None
49
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040050def _fake_rename(*args, **kwargs):
51 # Pretend the destination path is on a different filesystem.
Antoine Pitrouc041ab62012-01-02 19:18:02 +010052 raise OSError(getattr(errno, 'EXDEV', 18), "Invalid cross-device link")
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040053
54def mock_rename(func):
55 @functools.wraps(func)
56 def wrap(*args, **kwargs):
57 try:
58 builtin_rename = os.rename
59 os.rename = _fake_rename
60 return func(*args, **kwargs)
61 finally:
62 os.rename = builtin_rename
63 return wrap
64
Éric Araujoa7e33a12011-08-12 19:51:35 +020065def write_file(path, content, binary=False):
66 """Write *content* to a file located at *path*.
67
68 If *path* is a tuple instead of a string, os.path.join will be used to
69 make a path. If *binary* is true, the file will be opened in binary
70 mode.
71 """
72 if isinstance(path, tuple):
73 path = os.path.join(*path)
74 with open(path, 'wb' if binary else 'w') as fp:
75 fp.write(content)
76
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020077def write_test_file(path, size):
78 """Create a test file with an arbitrary size and random text content."""
79 def chunks(total, step):
80 assert total >= step
81 while total > step:
82 yield step
83 total -= step
84 if total:
85 yield total
86
87 bufsize = min(size, 8192)
88 chunk = b"".join([random.choice(string.ascii_letters).encode()
89 for i in range(bufsize)])
90 with open(path, 'wb') as f:
91 for csize in chunks(size, bufsize):
92 f.write(chunk)
93 assert os.path.getsize(path) == size
94
Éric Araujoa7e33a12011-08-12 19:51:35 +020095def read_file(path, binary=False):
96 """Return contents from a file located at *path*.
97
98 If *path* is a tuple instead of a string, os.path.join will be used to
99 make a path. If *binary* is true, the file will be opened in binary
100 mode.
101 """
102 if isinstance(path, tuple):
103 path = os.path.join(*path)
104 with open(path, 'rb' if binary else 'r') as fp:
105 return fp.read()
106
Serhiy Storchaka527ef072015-09-06 18:33:19 +0300107def rlistdir(path):
108 res = []
109 for name in sorted(os.listdir(path)):
110 p = os.path.join(path, name)
111 if os.path.isdir(p) and not os.path.islink(p):
112 res.append(name + '/')
113 for n in rlistdir(p):
114 res.append(name + '/' + n)
115 else:
116 res.append(name)
117 return res
118
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +0200119def supports_file2file_sendfile():
120 # ...apparently Linux and Solaris are the only ones
121 if not hasattr(os, "sendfile"):
122 return False
123 srcname = None
124 dstname = None
125 try:
Steve Dowerabde52c2019-11-15 09:49:21 -0800126 with tempfile.NamedTemporaryFile("wb", dir=os.getcwd(), delete=False) as f:
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +0200127 srcname = f.name
128 f.write(b"0123456789")
129
130 with open(srcname, "rb") as src:
Steve Dowerabde52c2019-11-15 09:49:21 -0800131 with tempfile.NamedTemporaryFile("wb", dir=os.getcwd(), delete=False) as dst:
Victor Stinner4c26abd2019-06-27 01:39:53 +0200132 dstname = dst.name
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +0200133 infd = src.fileno()
134 outfd = dst.fileno()
135 try:
136 os.sendfile(outfd, infd, 0, 2)
137 except OSError:
138 return False
139 else:
140 return True
141 finally:
142 if srcname is not None:
143 support.unlink(srcname)
144 if dstname is not None:
145 support.unlink(dstname)
146
147
148SUPPORTS_SENDFILE = supports_file2file_sendfile()
149
Michael Feltef110b12019-02-18 12:02:44 +0100150# AIX 32-bit mode, by default, lacks enough memory for the xz/lzma compiler test
151# The AIX command 'dump -o program' gives XCOFF header information
152# The second word of the last line in the maxdata value
153# when 32-bit maxdata must be greater than 0x1000000 for the xz test to succeed
154def _maxdataOK():
155 if AIX and sys.maxsize == 2147483647:
156 hdrs=subprocess.getoutput("/usr/bin/dump -o %s" % sys.executable)
157 maxdata=hdrs.split("\n")[-1].split()[1]
158 return int(maxdata,16) >= 0x20000000
159 else:
160 return True
Éric Araujoa7e33a12011-08-12 19:51:35 +0200161
Tarek Ziadé396fad72010-02-23 05:30:31 +0000162
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300163class BaseTest:
Tarek Ziadé396fad72010-02-23 05:30:31 +0000164
Steve Dowerabde52c2019-11-15 09:49:21 -0800165 def mkdtemp(self, prefix=None):
Tarek Ziadé396fad72010-02-23 05:30:31 +0000166 """Create a temporary directory that will be cleaned up.
167
168 Returns the path of the directory.
169 """
Steve Dowerabde52c2019-11-15 09:49:21 -0800170 d = tempfile.mkdtemp(prefix=prefix, dir=os.getcwd())
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300171 self.addCleanup(support.rmtree, d)
Tarek Ziadé396fad72010-02-23 05:30:31 +0000172 return d
Tarek Ziadé5340db32010-04-19 22:30:51 +0000173
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300174
175class TestRmTree(BaseTest, unittest.TestCase):
176
Hynek Schlawack3b527782012-06-25 13:27:31 +0200177 def test_rmtree_works_on_bytes(self):
178 tmp = self.mkdtemp()
179 victim = os.path.join(tmp, 'killme')
180 os.mkdir(victim)
181 write_file(os.path.join(victim, 'somefile'), 'foo')
182 victim = os.fsencode(victim)
183 self.assertIsInstance(victim, bytes)
Steve Dowere58571b2016-09-08 11:11:13 -0700184 shutil.rmtree(victim)
Hynek Schlawack3b527782012-06-25 13:27:31 +0200185
Hynek Schlawacka75cd1c2012-06-28 12:07:29 +0200186 @support.skip_unless_symlink
187 def test_rmtree_fails_on_symlink(self):
188 tmp = self.mkdtemp()
189 dir_ = os.path.join(tmp, 'dir')
190 os.mkdir(dir_)
191 link = os.path.join(tmp, 'link')
192 os.symlink(dir_, link)
193 self.assertRaises(OSError, shutil.rmtree, link)
194 self.assertTrue(os.path.exists(dir_))
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100195 self.assertTrue(os.path.lexists(link))
196 errors = []
197 def onerror(*args):
198 errors.append(args)
199 shutil.rmtree(link, onerror=onerror)
200 self.assertEqual(len(errors), 1)
201 self.assertIs(errors[0][0], os.path.islink)
202 self.assertEqual(errors[0][1], link)
203 self.assertIsInstance(errors[0][2][1], OSError)
Hynek Schlawacka75cd1c2012-06-28 12:07:29 +0200204
205 @support.skip_unless_symlink
206 def test_rmtree_works_on_symlinks(self):
207 tmp = self.mkdtemp()
208 dir1 = os.path.join(tmp, 'dir1')
209 dir2 = os.path.join(dir1, 'dir2')
210 dir3 = os.path.join(tmp, 'dir3')
211 for d in dir1, dir2, dir3:
212 os.mkdir(d)
213 file1 = os.path.join(tmp, 'file1')
214 write_file(file1, 'foo')
215 link1 = os.path.join(dir1, 'link1')
216 os.symlink(dir2, link1)
217 link2 = os.path.join(dir1, 'link2')
218 os.symlink(dir3, link2)
219 link3 = os.path.join(dir1, 'link3')
220 os.symlink(file1, link3)
221 # make sure symlinks are removed but not followed
222 shutil.rmtree(dir1)
223 self.assertFalse(os.path.exists(dir1))
224 self.assertTrue(os.path.exists(dir3))
225 self.assertTrue(os.path.exists(file1))
226
Steve Dowerdf2d4a62019-08-21 15:27:33 -0700227 @unittest.skipUnless(_winapi, 'only relevant on Windows')
228 def test_rmtree_fails_on_junctions(self):
229 tmp = self.mkdtemp()
230 dir_ = os.path.join(tmp, 'dir')
231 os.mkdir(dir_)
232 link = os.path.join(tmp, 'link')
233 _winapi.CreateJunction(dir_, link)
Steve Dowerabde52c2019-11-15 09:49:21 -0800234 self.addCleanup(support.unlink, link)
Steve Dowerdf2d4a62019-08-21 15:27:33 -0700235 self.assertRaises(OSError, shutil.rmtree, link)
236 self.assertTrue(os.path.exists(dir_))
237 self.assertTrue(os.path.lexists(link))
238 errors = []
239 def onerror(*args):
240 errors.append(args)
241 shutil.rmtree(link, onerror=onerror)
242 self.assertEqual(len(errors), 1)
243 self.assertIs(errors[0][0], os.path.islink)
244 self.assertEqual(errors[0][1], link)
245 self.assertIsInstance(errors[0][2][1], OSError)
246
247 @unittest.skipUnless(_winapi, 'only relevant on Windows')
248 def test_rmtree_works_on_junctions(self):
249 tmp = self.mkdtemp()
250 dir1 = os.path.join(tmp, 'dir1')
251 dir2 = os.path.join(dir1, 'dir2')
252 dir3 = os.path.join(tmp, 'dir3')
253 for d in dir1, dir2, dir3:
254 os.mkdir(d)
255 file1 = os.path.join(tmp, 'file1')
256 write_file(file1, 'foo')
257 link1 = os.path.join(dir1, 'link1')
258 _winapi.CreateJunction(dir2, link1)
259 link2 = os.path.join(dir1, 'link2')
260 _winapi.CreateJunction(dir3, link2)
261 link3 = os.path.join(dir1, 'link3')
262 _winapi.CreateJunction(file1, link3)
263 # make sure junctions are removed but not followed
264 shutil.rmtree(dir1)
265 self.assertFalse(os.path.exists(dir1))
266 self.assertTrue(os.path.exists(dir3))
267 self.assertTrue(os.path.exists(file1))
268
Barry Warsaw7fc2cca2003-01-24 17:34:13 +0000269 def test_rmtree_errors(self):
270 # filename is guaranteed not to exist
Steve Dowerabde52c2019-11-15 09:49:21 -0800271 filename = tempfile.mktemp(dir=self.mkdtemp())
Hynek Schlawackb5501102012-12-10 09:11:25 +0100272 self.assertRaises(FileNotFoundError, shutil.rmtree, filename)
273 # test that ignore_errors option is honored
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100274 shutil.rmtree(filename, ignore_errors=True)
275
276 # existing file
277 tmpdir = self.mkdtemp()
Hynek Schlawackb5501102012-12-10 09:11:25 +0100278 write_file((tmpdir, "tstfile"), "")
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100279 filename = os.path.join(tmpdir, "tstfile")
Hynek Schlawackb5501102012-12-10 09:11:25 +0100280 with self.assertRaises(NotADirectoryError) as cm:
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100281 shutil.rmtree(filename)
Hynek Schlawack87f9b462012-12-10 16:29:57 +0100282 # The reason for this rather odd construct is that Windows sprinkles
283 # a \*.* at the end of file names. But only sometimes on some buildbots
284 possible_args = [filename, os.path.join(filename, '*.*')]
285 self.assertIn(cm.exception.filename, possible_args)
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100286 self.assertTrue(os.path.exists(filename))
Hynek Schlawackb5501102012-12-10 09:11:25 +0100287 # test that ignore_errors option is honored
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100288 shutil.rmtree(filename, ignore_errors=True)
289 self.assertTrue(os.path.exists(filename))
290 errors = []
291 def onerror(*args):
292 errors.append(args)
293 shutil.rmtree(filename, onerror=onerror)
294 self.assertEqual(len(errors), 2)
Serhiy Storchakad4d79bc2017-11-04 14:16:35 +0200295 self.assertIs(errors[0][0], os.scandir)
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100296 self.assertEqual(errors[0][1], filename)
Hynek Schlawackb5501102012-12-10 09:11:25 +0100297 self.assertIsInstance(errors[0][2][1], NotADirectoryError)
Hynek Schlawack87f9b462012-12-10 16:29:57 +0100298 self.assertIn(errors[0][2][1].filename, possible_args)
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100299 self.assertIs(errors[1][0], os.rmdir)
300 self.assertEqual(errors[1][1], filename)
Hynek Schlawackb5501102012-12-10 09:11:25 +0100301 self.assertIsInstance(errors[1][2][1], NotADirectoryError)
Hynek Schlawack87f9b462012-12-10 16:29:57 +0100302 self.assertIn(errors[1][2][1].filename, possible_args)
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000303
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000304
Serhiy Storchaka43767632013-11-03 21:31:38 +0200305 @unittest.skipIf(sys.platform[:6] == 'cygwin',
306 "This test can't be run on Cygwin (issue #1071513).")
307 @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
308 "This test can't be run reliably as root (issue #1076467).")
309 def test_on_error(self):
310 self.errorState = 0
311 os.mkdir(TESTFN)
312 self.addCleanup(shutil.rmtree, TESTFN)
Antoine Pitrou2f8a75c2012-06-23 21:28:15 +0200313
Serhiy Storchaka43767632013-11-03 21:31:38 +0200314 self.child_file_path = os.path.join(TESTFN, 'a')
315 self.child_dir_path = os.path.join(TESTFN, 'b')
316 support.create_empty_file(self.child_file_path)
317 os.mkdir(self.child_dir_path)
318 old_dir_mode = os.stat(TESTFN).st_mode
319 old_child_file_mode = os.stat(self.child_file_path).st_mode
320 old_child_dir_mode = os.stat(self.child_dir_path).st_mode
321 # Make unwritable.
322 new_mode = stat.S_IREAD|stat.S_IEXEC
323 os.chmod(self.child_file_path, new_mode)
324 os.chmod(self.child_dir_path, new_mode)
325 os.chmod(TESTFN, new_mode)
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000326
Serhiy Storchaka43767632013-11-03 21:31:38 +0200327 self.addCleanup(os.chmod, TESTFN, old_dir_mode)
328 self.addCleanup(os.chmod, self.child_file_path, old_child_file_mode)
329 self.addCleanup(os.chmod, self.child_dir_path, old_child_dir_mode)
Antoine Pitrou2f8a75c2012-06-23 21:28:15 +0200330
Serhiy Storchaka43767632013-11-03 21:31:38 +0200331 shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
332 # Test whether onerror has actually been called.
333 self.assertEqual(self.errorState, 3,
334 "Expected call to onerror function did not happen.")
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000335
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000336 def check_args_to_onerror(self, func, arg, exc):
Benjamin Peterson25c95f12009-05-08 20:42:26 +0000337 # test_rmtree_errors deliberately runs rmtree
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200338 # on a directory that is chmod 500, which will fail.
Benjamin Peterson25c95f12009-05-08 20:42:26 +0000339 # This function is run when shutil.rmtree fails.
340 # 99.9% of the time it initially fails to remove
341 # a file in the directory, so the first time through
342 # func is os.remove.
343 # However, some Linux machines running ZFS on
344 # FUSE experienced a failure earlier in the process
345 # at os.listdir. The first failure may legally
346 # be either.
Antoine Pitrou4f6e3f72012-06-23 22:05:11 +0200347 if self.errorState < 2:
Hynek Schlawack2100b422012-06-23 20:28:32 +0200348 if func is os.unlink:
Antoine Pitrou4f6e3f72012-06-23 22:05:11 +0200349 self.assertEqual(arg, self.child_file_path)
350 elif func is os.rmdir:
351 self.assertEqual(arg, self.child_dir_path)
Benjamin Peterson25c95f12009-05-08 20:42:26 +0000352 else:
Antoine Pitrou4f6e3f72012-06-23 22:05:11 +0200353 self.assertIs(func, os.listdir)
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200354 self.assertIn(arg, [TESTFN, self.child_dir_path])
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000355 self.assertTrue(issubclass(exc[0], OSError))
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200356 self.errorState += 1
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000357 else:
358 self.assertEqual(func, os.rmdir)
359 self.assertEqual(arg, TESTFN)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000360 self.assertTrue(issubclass(exc[0], OSError))
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200361 self.errorState = 3
362
363 def test_rmtree_does_not_choke_on_failing_lstat(self):
364 try:
365 orig_lstat = os.lstat
Hynek Schlawacka75cd1c2012-06-28 12:07:29 +0200366 def raiser(fn, *args, **kwargs):
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200367 if fn != TESTFN:
368 raise OSError()
369 else:
370 return orig_lstat(fn)
371 os.lstat = raiser
372
373 os.mkdir(TESTFN)
374 write_file((TESTFN, 'foo'), 'foo')
375 shutil.rmtree(TESTFN)
376 finally:
377 os.lstat = orig_lstat
Barry Warsaw7fc2cca2003-01-24 17:34:13 +0000378
Hynek Schlawack2100b422012-06-23 20:28:32 +0200379 def test_rmtree_uses_safe_fd_version_if_available(self):
Hynek Schlawackd0f6e0a2012-06-29 08:28:20 +0200380 _use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <=
381 os.supports_dir_fd and
382 os.listdir in os.supports_fd and
383 os.stat in os.supports_follow_symlinks)
384 if _use_fd_functions:
Hynek Schlawack2100b422012-06-23 20:28:32 +0200385 self.assertTrue(shutil._use_fd_functions)
Nick Coghlan5b0eca12012-06-24 16:43:06 +1000386 self.assertTrue(shutil.rmtree.avoids_symlink_attacks)
Hynek Schlawack2100b422012-06-23 20:28:32 +0200387 tmp_dir = self.mkdtemp()
388 d = os.path.join(tmp_dir, 'a')
389 os.mkdir(d)
390 try:
391 real_rmtree = shutil._rmtree_safe_fd
392 class Called(Exception): pass
393 def _raiser(*args, **kwargs):
394 raise Called
395 shutil._rmtree_safe_fd = _raiser
396 self.assertRaises(Called, shutil.rmtree, d)
397 finally:
398 shutil._rmtree_safe_fd = real_rmtree
399 else:
400 self.assertFalse(shutil._use_fd_functions)
Nick Coghlan5b0eca12012-06-24 16:43:06 +1000401 self.assertFalse(shutil.rmtree.avoids_symlink_attacks)
Hynek Schlawack2100b422012-06-23 20:28:32 +0200402
Johannes Gijsbersd60e92a2004-09-11 21:26:21 +0000403 def test_rmtree_dont_delete_file(self):
404 # When called on a file instead of a directory, don't delete it.
Steve Dowerabde52c2019-11-15 09:49:21 -0800405 handle, path = tempfile.mkstemp(dir=self.mkdtemp())
Victor Stinnerbf816222011-06-30 23:25:47 +0200406 os.close(handle)
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200407 self.assertRaises(NotADirectoryError, shutil.rmtree, path)
Johannes Gijsbersd60e92a2004-09-11 21:26:21 +0000408 os.remove(path)
409
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300410 @support.skip_unless_symlink
411 def test_rmtree_on_symlink(self):
412 # bug 1669.
413 os.mkdir(TESTFN)
414 try:
415 src = os.path.join(TESTFN, 'cheese')
416 dst = os.path.join(TESTFN, 'shop')
417 os.mkdir(src)
418 os.symlink(src, dst)
419 self.assertRaises(OSError, shutil.rmtree, dst)
420 shutil.rmtree(dst, ignore_errors=True)
421 finally:
422 shutil.rmtree(TESTFN, ignore_errors=True)
423
424 @unittest.skipUnless(_winapi, 'only relevant on Windows')
425 def test_rmtree_on_junction(self):
426 os.mkdir(TESTFN)
427 try:
428 src = os.path.join(TESTFN, 'cheese')
429 dst = os.path.join(TESTFN, 'shop')
430 os.mkdir(src)
431 open(os.path.join(src, 'spam'), 'wb').close()
432 _winapi.CreateJunction(src, dst)
433 self.assertRaises(OSError, shutil.rmtree, dst)
434 shutil.rmtree(dst, ignore_errors=True)
435 finally:
436 shutil.rmtree(TESTFN, ignore_errors=True)
437
438
439class TestCopyTree(BaseTest, unittest.TestCase):
440
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000441 def test_copytree_simple(self):
Steve Dowerabde52c2019-11-15 09:49:21 -0800442 src_dir = self.mkdtemp()
443 dst_dir = os.path.join(self.mkdtemp(), 'destination')
Éric Araujoa7e33a12011-08-12 19:51:35 +0200444 self.addCleanup(shutil.rmtree, src_dir)
445 self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
446 write_file((src_dir, 'test.txt'), '123')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000447 os.mkdir(os.path.join(src_dir, 'test_dir'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200448 write_file((src_dir, 'test_dir', 'test.txt'), '456')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000449
Éric Araujoa7e33a12011-08-12 19:51:35 +0200450 shutil.copytree(src_dir, dst_dir)
451 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test.txt')))
452 self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'test_dir')))
453 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test_dir',
454 'test.txt')))
455 actual = read_file((dst_dir, 'test.txt'))
456 self.assertEqual(actual, '123')
457 actual = read_file((dst_dir, 'test_dir', 'test.txt'))
458 self.assertEqual(actual, '456')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000459
jab9e00d9e2018-12-28 13:03:40 -0500460 def test_copytree_dirs_exist_ok(self):
Steve Dowerabde52c2019-11-15 09:49:21 -0800461 src_dir = self.mkdtemp()
462 dst_dir = self.mkdtemp()
jab9e00d9e2018-12-28 13:03:40 -0500463 self.addCleanup(shutil.rmtree, src_dir)
464 self.addCleanup(shutil.rmtree, dst_dir)
465
466 write_file((src_dir, 'nonexisting.txt'), '123')
467 os.mkdir(os.path.join(src_dir, 'existing_dir'))
468 os.mkdir(os.path.join(dst_dir, 'existing_dir'))
469 write_file((dst_dir, 'existing_dir', 'existing.txt'), 'will be replaced')
470 write_file((src_dir, 'existing_dir', 'existing.txt'), 'has been replaced')
471
472 shutil.copytree(src_dir, dst_dir, dirs_exist_ok=True)
473 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'nonexisting.txt')))
474 self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'existing_dir')))
475 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'existing_dir',
476 'existing.txt')))
477 actual = read_file((dst_dir, 'nonexisting.txt'))
478 self.assertEqual(actual, '123')
479 actual = read_file((dst_dir, 'existing_dir', 'existing.txt'))
480 self.assertEqual(actual, 'has been replaced')
481
482 with self.assertRaises(FileExistsError):
483 shutil.copytree(src_dir, dst_dir, dirs_exist_ok=False)
484
Antoine Pitrou78091e62011-12-29 18:54:15 +0100485 @support.skip_unless_symlink
486 def test_copytree_symlinks(self):
487 tmp_dir = self.mkdtemp()
488 src_dir = os.path.join(tmp_dir, 'src')
489 dst_dir = os.path.join(tmp_dir, 'dst')
490 sub_dir = os.path.join(src_dir, 'sub')
491 os.mkdir(src_dir)
492 os.mkdir(sub_dir)
493 write_file((src_dir, 'file.txt'), 'foo')
494 src_link = os.path.join(sub_dir, 'link')
495 dst_link = os.path.join(dst_dir, 'sub/link')
496 os.symlink(os.path.join(src_dir, 'file.txt'),
497 src_link)
498 if hasattr(os, 'lchmod'):
499 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
500 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
501 os.lchflags(src_link, stat.UF_NODUMP)
502 src_stat = os.lstat(src_link)
503 shutil.copytree(src_dir, dst_dir, symlinks=True)
504 self.assertTrue(os.path.islink(os.path.join(dst_dir, 'sub', 'link')))
Steve Dowerdf2d4a62019-08-21 15:27:33 -0700505 actual = os.readlink(os.path.join(dst_dir, 'sub', 'link'))
506 # Bad practice to blindly strip the prefix as it may be required to
507 # correctly refer to the file, but we're only comparing paths here.
508 if os.name == 'nt' and actual.startswith('\\\\?\\'):
509 actual = actual[4:]
510 self.assertEqual(actual, os.path.join(src_dir, 'file.txt'))
Antoine Pitrou78091e62011-12-29 18:54:15 +0100511 dst_stat = os.lstat(dst_link)
512 if hasattr(os, 'lchmod'):
513 self.assertEqual(dst_stat.st_mode, src_stat.st_mode)
514 if hasattr(os, 'lchflags'):
515 self.assertEqual(dst_stat.st_flags, src_stat.st_flags)
516
Georg Brandl2ee470f2008-07-16 12:55:28 +0000517 def test_copytree_with_exclude(self):
Georg Brandl2ee470f2008-07-16 12:55:28 +0000518 # creating data
519 join = os.path.join
520 exists = os.path.exists
Steve Dowerabde52c2019-11-15 09:49:21 -0800521 src_dir = self.mkdtemp()
Georg Brandl2ee470f2008-07-16 12:55:28 +0000522 try:
Steve Dowerabde52c2019-11-15 09:49:21 -0800523 dst_dir = join(self.mkdtemp(), 'destination')
Éric Araujoa7e33a12011-08-12 19:51:35 +0200524 write_file((src_dir, 'test.txt'), '123')
525 write_file((src_dir, 'test.tmp'), '123')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000526 os.mkdir(join(src_dir, 'test_dir'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200527 write_file((src_dir, 'test_dir', 'test.txt'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000528 os.mkdir(join(src_dir, 'test_dir2'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200529 write_file((src_dir, 'test_dir2', 'test.txt'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000530 os.mkdir(join(src_dir, 'test_dir2', 'subdir'))
531 os.mkdir(join(src_dir, 'test_dir2', 'subdir2'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200532 write_file((src_dir, 'test_dir2', 'subdir', 'test.txt'), '456')
533 write_file((src_dir, 'test_dir2', 'subdir2', 'test.py'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000534
535 # testing glob-like patterns
536 try:
537 patterns = shutil.ignore_patterns('*.tmp', 'test_dir2')
538 shutil.copytree(src_dir, dst_dir, ignore=patterns)
539 # checking the result: some elements should not be copied
540 self.assertTrue(exists(join(dst_dir, 'test.txt')))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200541 self.assertFalse(exists(join(dst_dir, 'test.tmp')))
542 self.assertFalse(exists(join(dst_dir, 'test_dir2')))
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000543 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200544 shutil.rmtree(dst_dir)
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000545 try:
546 patterns = shutil.ignore_patterns('*.tmp', 'subdir*')
547 shutil.copytree(src_dir, dst_dir, ignore=patterns)
548 # checking the result: some elements should not be copied
Éric Araujoa7e33a12011-08-12 19:51:35 +0200549 self.assertFalse(exists(join(dst_dir, 'test.tmp')))
550 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2')))
551 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000552 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200553 shutil.rmtree(dst_dir)
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000554
555 # testing callable-style
556 try:
557 def _filter(src, names):
558 res = []
559 for name in names:
560 path = os.path.join(src, name)
561
562 if (os.path.isdir(path) and
563 path.split()[-1] == 'subdir'):
564 res.append(name)
565 elif os.path.splitext(path)[-1] in ('.py'):
566 res.append(name)
567 return res
568
569 shutil.copytree(src_dir, dst_dir, ignore=_filter)
570
571 # checking the result: some elements should not be copied
Éric Araujoa7e33a12011-08-12 19:51:35 +0200572 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2',
573 'test.py')))
574 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000575
576 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200577 shutil.rmtree(dst_dir)
Georg Brandl2ee470f2008-07-16 12:55:28 +0000578 finally:
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000579 shutil.rmtree(src_dir)
580 shutil.rmtree(os.path.dirname(dst_dir))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000581
Antoine Pitrouac601602013-08-16 19:35:02 +0200582 def test_copytree_retains_permissions(self):
Steve Dowerabde52c2019-11-15 09:49:21 -0800583 tmp_dir = self.mkdtemp()
Antoine Pitrouac601602013-08-16 19:35:02 +0200584 src_dir = os.path.join(tmp_dir, 'source')
585 os.mkdir(src_dir)
586 dst_dir = os.path.join(tmp_dir, 'destination')
587 self.addCleanup(shutil.rmtree, tmp_dir)
588
589 os.chmod(src_dir, 0o777)
590 write_file((src_dir, 'permissive.txt'), '123')
591 os.chmod(os.path.join(src_dir, 'permissive.txt'), 0o777)
592 write_file((src_dir, 'restrictive.txt'), '456')
593 os.chmod(os.path.join(src_dir, 'restrictive.txt'), 0o600)
594 restrictive_subdir = tempfile.mkdtemp(dir=src_dir)
Steve Dowerabde52c2019-11-15 09:49:21 -0800595 self.addCleanup(support.rmtree, restrictive_subdir)
Antoine Pitrouac601602013-08-16 19:35:02 +0200596 os.chmod(restrictive_subdir, 0o600)
597
598 shutil.copytree(src_dir, dst_dir)
Brett Cannon9c7eb552013-08-23 14:38:11 -0400599 self.assertEqual(os.stat(src_dir).st_mode, os.stat(dst_dir).st_mode)
600 self.assertEqual(os.stat(os.path.join(src_dir, 'permissive.txt')).st_mode,
Antoine Pitrouac601602013-08-16 19:35:02 +0200601 os.stat(os.path.join(dst_dir, 'permissive.txt')).st_mode)
Brett Cannon9c7eb552013-08-23 14:38:11 -0400602 self.assertEqual(os.stat(os.path.join(src_dir, 'restrictive.txt')).st_mode,
Antoine Pitrouac601602013-08-16 19:35:02 +0200603 os.stat(os.path.join(dst_dir, 'restrictive.txt')).st_mode)
604 restrictive_subdir_dst = os.path.join(dst_dir,
605 os.path.split(restrictive_subdir)[1])
Brett Cannon9c7eb552013-08-23 14:38:11 -0400606 self.assertEqual(os.stat(restrictive_subdir).st_mode,
Antoine Pitrouac601602013-08-16 19:35:02 +0200607 os.stat(restrictive_subdir_dst).st_mode)
608
Berker Peksag884afd92014-12-10 02:50:32 +0200609 @unittest.mock.patch('os.chmod')
610 def test_copytree_winerror(self, mock_patch):
611 # When copying to VFAT, copystat() raises OSError. On Windows, the
612 # exception object has a meaningful 'winerror' attribute, but not
613 # on other operating systems. Do not assume 'winerror' is set.
Steve Dowerabde52c2019-11-15 09:49:21 -0800614 src_dir = self.mkdtemp()
615 dst_dir = os.path.join(self.mkdtemp(), 'destination')
Berker Peksag884afd92014-12-10 02:50:32 +0200616 self.addCleanup(shutil.rmtree, src_dir)
617 self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
618
619 mock_patch.side_effect = PermissionError('ka-boom')
620 with self.assertRaises(shutil.Error):
621 shutil.copytree(src_dir, dst_dir)
622
Giampaolo Rodolac606a9c2019-02-26 12:04:41 +0100623 def test_copytree_custom_copy_function(self):
624 # See: https://bugs.python.org/issue35648
625 def custom_cpfun(a, b):
626 flag.append(None)
627 self.assertIsInstance(a, str)
628 self.assertIsInstance(b, str)
629 self.assertEqual(a, os.path.join(src, 'foo'))
630 self.assertEqual(b, os.path.join(dst, 'foo'))
631
632 flag = []
Steve Dowerabde52c2019-11-15 09:49:21 -0800633 src = self.mkdtemp()
634 dst = tempfile.mktemp(dir=self.mkdtemp())
Giampaolo Rodolac606a9c2019-02-26 12:04:41 +0100635 with open(os.path.join(src, 'foo'), 'w') as f:
636 f.close()
637 shutil.copytree(src, dst, copy_function=custom_cpfun)
638 self.assertEqual(len(flag), 1)
639
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300640 # Issue #3002: copyfile and copytree block indefinitely on named pipes
641 @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
642 @support.skip_unless_symlink
643 def test_copytree_named_pipe(self):
644 os.mkdir(TESTFN)
645 try:
646 subdir = os.path.join(TESTFN, "subdir")
647 os.mkdir(subdir)
648 pipe = os.path.join(subdir, "mypipe")
649 try:
650 os.mkfifo(pipe)
651 except PermissionError as e:
652 self.skipTest('os.mkfifo(): %s' % e)
653 try:
654 shutil.copytree(TESTFN, TESTFN2)
655 except shutil.Error as e:
656 errors = e.args[0]
657 self.assertEqual(len(errors), 1)
658 src, dst, error_msg = errors[0]
659 self.assertEqual("`%s` is a named pipe" % pipe, error_msg)
660 else:
661 self.fail("shutil.Error should have been raised")
662 finally:
663 shutil.rmtree(TESTFN, ignore_errors=True)
664 shutil.rmtree(TESTFN2, ignore_errors=True)
665
666 def test_copytree_special_func(self):
667 src_dir = self.mkdtemp()
668 dst_dir = os.path.join(self.mkdtemp(), 'destination')
669 write_file((src_dir, 'test.txt'), '123')
670 os.mkdir(os.path.join(src_dir, 'test_dir'))
671 write_file((src_dir, 'test_dir', 'test.txt'), '456')
672
673 copied = []
674 def _copy(src, dst):
675 copied.append((src, dst))
676
677 shutil.copytree(src_dir, dst_dir, copy_function=_copy)
678 self.assertEqual(len(copied), 2)
679
680 @support.skip_unless_symlink
681 def test_copytree_dangling_symlinks(self):
682 # a dangling symlink raises an error at the end
683 src_dir = self.mkdtemp()
684 dst_dir = os.path.join(self.mkdtemp(), 'destination')
685 os.symlink('IDONTEXIST', os.path.join(src_dir, 'test.txt'))
686 os.mkdir(os.path.join(src_dir, 'test_dir'))
687 write_file((src_dir, 'test_dir', 'test.txt'), '456')
688 self.assertRaises(Error, shutil.copytree, src_dir, dst_dir)
689
690 # a dangling symlink is ignored with the proper flag
691 dst_dir = os.path.join(self.mkdtemp(), 'destination2')
692 shutil.copytree(src_dir, dst_dir, ignore_dangling_symlinks=True)
693 self.assertNotIn('test.txt', os.listdir(dst_dir))
694
695 # a dangling symlink is copied if symlinks=True
696 dst_dir = os.path.join(self.mkdtemp(), 'destination3')
697 shutil.copytree(src_dir, dst_dir, symlinks=True)
698 self.assertIn('test.txt', os.listdir(dst_dir))
699
700 @support.skip_unless_symlink
701 def test_copytree_symlink_dir(self):
702 src_dir = self.mkdtemp()
703 dst_dir = os.path.join(self.mkdtemp(), 'destination')
704 os.mkdir(os.path.join(src_dir, 'real_dir'))
705 with open(os.path.join(src_dir, 'real_dir', 'test.txt'), 'w'):
706 pass
707 os.symlink(os.path.join(src_dir, 'real_dir'),
708 os.path.join(src_dir, 'link_to_dir'),
709 target_is_directory=True)
710
711 shutil.copytree(src_dir, dst_dir, symlinks=False)
712 self.assertFalse(os.path.islink(os.path.join(dst_dir, 'link_to_dir')))
713 self.assertIn('test.txt', os.listdir(os.path.join(dst_dir, 'link_to_dir')))
714
715 dst_dir = os.path.join(self.mkdtemp(), 'destination2')
716 shutil.copytree(src_dir, dst_dir, symlinks=True)
717 self.assertTrue(os.path.islink(os.path.join(dst_dir, 'link_to_dir')))
718 self.assertIn('test.txt', os.listdir(os.path.join(dst_dir, 'link_to_dir')))
719
720 def test_copytree_return_value(self):
721 # copytree returns its destination path.
722 src_dir = self.mkdtemp()
723 dst_dir = src_dir + "dest"
724 self.addCleanup(shutil.rmtree, dst_dir, True)
725 src = os.path.join(src_dir, 'foo')
726 write_file(src, 'foo')
727 rv = shutil.copytree(src_dir, dst_dir)
728 self.assertEqual(['foo'], os.listdir(rv))
729
730
731class TestCopy(BaseTest, unittest.TestCase):
732
733 ### shutil.copymode
734
735 @support.skip_unless_symlink
736 def test_copymode_follow_symlinks(self):
737 tmp_dir = self.mkdtemp()
738 src = os.path.join(tmp_dir, 'foo')
739 dst = os.path.join(tmp_dir, 'bar')
740 src_link = os.path.join(tmp_dir, 'baz')
741 dst_link = os.path.join(tmp_dir, 'quux')
742 write_file(src, 'foo')
743 write_file(dst, 'foo')
744 os.symlink(src, src_link)
745 os.symlink(dst, dst_link)
746 os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
747 # file to file
748 os.chmod(dst, stat.S_IRWXO)
749 self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
750 shutil.copymode(src, dst)
751 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
752 # On Windows, os.chmod does not follow symlinks (issue #15411)
753 if os.name != 'nt':
754 # follow src link
755 os.chmod(dst, stat.S_IRWXO)
756 shutil.copymode(src_link, dst)
757 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
758 # follow dst link
759 os.chmod(dst, stat.S_IRWXO)
760 shutil.copymode(src, dst_link)
761 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
762 # follow both links
763 os.chmod(dst, stat.S_IRWXO)
764 shutil.copymode(src_link, dst_link)
765 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
766
767 @unittest.skipUnless(hasattr(os, 'lchmod'), 'requires os.lchmod')
768 @support.skip_unless_symlink
769 def test_copymode_symlink_to_symlink(self):
770 tmp_dir = self.mkdtemp()
771 src = os.path.join(tmp_dir, 'foo')
772 dst = os.path.join(tmp_dir, 'bar')
773 src_link = os.path.join(tmp_dir, 'baz')
774 dst_link = os.path.join(tmp_dir, 'quux')
775 write_file(src, 'foo')
776 write_file(dst, 'foo')
777 os.symlink(src, src_link)
778 os.symlink(dst, dst_link)
779 os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
780 os.chmod(dst, stat.S_IRWXU)
781 os.lchmod(src_link, stat.S_IRWXO|stat.S_IRWXG)
782 # link to link
783 os.lchmod(dst_link, stat.S_IRWXO)
784 shutil.copymode(src_link, dst_link, follow_symlinks=False)
785 self.assertEqual(os.lstat(src_link).st_mode,
786 os.lstat(dst_link).st_mode)
787 self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
788 # src link - use chmod
789 os.lchmod(dst_link, stat.S_IRWXO)
790 shutil.copymode(src_link, dst, follow_symlinks=False)
791 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
792 # dst link - use chmod
793 os.lchmod(dst_link, stat.S_IRWXO)
794 shutil.copymode(src, dst_link, follow_symlinks=False)
795 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
796
797 @unittest.skipIf(hasattr(os, 'lchmod'), 'requires os.lchmod to be missing')
798 @support.skip_unless_symlink
799 def test_copymode_symlink_to_symlink_wo_lchmod(self):
800 tmp_dir = self.mkdtemp()
801 src = os.path.join(tmp_dir, 'foo')
802 dst = os.path.join(tmp_dir, 'bar')
803 src_link = os.path.join(tmp_dir, 'baz')
804 dst_link = os.path.join(tmp_dir, 'quux')
805 write_file(src, 'foo')
806 write_file(dst, 'foo')
807 os.symlink(src, src_link)
808 os.symlink(dst, dst_link)
809 shutil.copymode(src_link, dst_link, follow_symlinks=False) # silent fail
810
811 ### shutil.copystat
812
813 @support.skip_unless_symlink
814 def test_copystat_symlinks(self):
815 tmp_dir = self.mkdtemp()
816 src = os.path.join(tmp_dir, 'foo')
817 dst = os.path.join(tmp_dir, 'bar')
818 src_link = os.path.join(tmp_dir, 'baz')
819 dst_link = os.path.join(tmp_dir, 'qux')
820 write_file(src, 'foo')
821 src_stat = os.stat(src)
822 os.utime(src, (src_stat.st_atime,
823 src_stat.st_mtime - 42.0)) # ensure different mtimes
824 write_file(dst, 'bar')
825 self.assertNotEqual(os.stat(src).st_mtime, os.stat(dst).st_mtime)
826 os.symlink(src, src_link)
827 os.symlink(dst, dst_link)
828 if hasattr(os, 'lchmod'):
829 os.lchmod(src_link, stat.S_IRWXO)
830 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
831 os.lchflags(src_link, stat.UF_NODUMP)
832 src_link_stat = os.lstat(src_link)
833 # follow
834 if hasattr(os, 'lchmod'):
835 shutil.copystat(src_link, dst_link, follow_symlinks=True)
836 self.assertNotEqual(src_link_stat.st_mode, os.stat(dst).st_mode)
837 # don't follow
838 shutil.copystat(src_link, dst_link, follow_symlinks=False)
839 dst_link_stat = os.lstat(dst_link)
840 if os.utime in os.supports_follow_symlinks:
841 for attr in 'st_atime', 'st_mtime':
842 # The modification times may be truncated in the new file.
843 self.assertLessEqual(getattr(src_link_stat, attr),
844 getattr(dst_link_stat, attr) + 1)
845 if hasattr(os, 'lchmod'):
846 self.assertEqual(src_link_stat.st_mode, dst_link_stat.st_mode)
847 if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
848 self.assertEqual(src_link_stat.st_flags, dst_link_stat.st_flags)
849 # tell to follow but dst is not a link
850 shutil.copystat(src_link, dst, follow_symlinks=False)
851 self.assertTrue(abs(os.stat(src).st_mtime - os.stat(dst).st_mtime) <
852 00000.1)
853
854 @unittest.skipUnless(hasattr(os, 'chflags') and
855 hasattr(errno, 'EOPNOTSUPP') and
856 hasattr(errno, 'ENOTSUP'),
857 "requires os.chflags, EOPNOTSUPP & ENOTSUP")
858 def test_copystat_handles_harmless_chflags_errors(self):
859 tmpdir = self.mkdtemp()
860 file1 = os.path.join(tmpdir, 'file1')
861 file2 = os.path.join(tmpdir, 'file2')
862 write_file(file1, 'xxx')
863 write_file(file2, 'xxx')
864
865 def make_chflags_raiser(err):
866 ex = OSError()
867
868 def _chflags_raiser(path, flags, *, follow_symlinks=True):
869 ex.errno = err
870 raise ex
871 return _chflags_raiser
872 old_chflags = os.chflags
873 try:
874 for err in errno.EOPNOTSUPP, errno.ENOTSUP:
875 os.chflags = make_chflags_raiser(err)
876 shutil.copystat(file1, file2)
877 # assert others errors break it
878 os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP)
879 self.assertRaises(OSError, shutil.copystat, file1, file2)
880 finally:
881 os.chflags = old_chflags
882
883 ### shutil.copyxattr
884
885 @support.skip_unless_xattr
886 def test_copyxattr(self):
887 tmp_dir = self.mkdtemp()
888 src = os.path.join(tmp_dir, 'foo')
889 write_file(src, 'foo')
890 dst = os.path.join(tmp_dir, 'bar')
891 write_file(dst, 'bar')
892
893 # no xattr == no problem
894 shutil._copyxattr(src, dst)
895 # common case
896 os.setxattr(src, 'user.foo', b'42')
897 os.setxattr(src, 'user.bar', b'43')
898 shutil._copyxattr(src, dst)
899 self.assertEqual(sorted(os.listxattr(src)), sorted(os.listxattr(dst)))
900 self.assertEqual(
901 os.getxattr(src, 'user.foo'),
902 os.getxattr(dst, 'user.foo'))
903 # check errors don't affect other attrs
904 os.remove(dst)
905 write_file(dst, 'bar')
906 os_error = OSError(errno.EPERM, 'EPERM')
907
908 def _raise_on_user_foo(fname, attr, val, **kwargs):
909 if attr == 'user.foo':
910 raise os_error
911 else:
912 orig_setxattr(fname, attr, val, **kwargs)
913 try:
914 orig_setxattr = os.setxattr
915 os.setxattr = _raise_on_user_foo
916 shutil._copyxattr(src, dst)
917 self.assertIn('user.bar', os.listxattr(dst))
918 finally:
919 os.setxattr = orig_setxattr
920 # the source filesystem not supporting xattrs should be ok, too.
921 def _raise_on_src(fname, *, follow_symlinks=True):
922 if fname == src:
923 raise OSError(errno.ENOTSUP, 'Operation not supported')
924 return orig_listxattr(fname, follow_symlinks=follow_symlinks)
925 try:
926 orig_listxattr = os.listxattr
927 os.listxattr = _raise_on_src
928 shutil._copyxattr(src, dst)
929 finally:
930 os.listxattr = orig_listxattr
931
932 # test that shutil.copystat copies xattrs
933 src = os.path.join(tmp_dir, 'the_original')
934 srcro = os.path.join(tmp_dir, 'the_original_ro')
935 write_file(src, src)
936 write_file(srcro, srcro)
937 os.setxattr(src, 'user.the_value', b'fiddly')
938 os.setxattr(srcro, 'user.the_value', b'fiddly')
939 os.chmod(srcro, 0o444)
940 dst = os.path.join(tmp_dir, 'the_copy')
941 dstro = os.path.join(tmp_dir, 'the_copy_ro')
942 write_file(dst, dst)
943 write_file(dstro, dstro)
944 shutil.copystat(src, dst)
945 shutil.copystat(srcro, dstro)
946 self.assertEqual(os.getxattr(dst, 'user.the_value'), b'fiddly')
947 self.assertEqual(os.getxattr(dstro, 'user.the_value'), b'fiddly')
948
949 @support.skip_unless_symlink
950 @support.skip_unless_xattr
951 @unittest.skipUnless(hasattr(os, 'geteuid') and os.geteuid() == 0,
952 'root privileges required')
953 def test_copyxattr_symlinks(self):
954 # On Linux, it's only possible to access non-user xattr for symlinks;
955 # which in turn require root privileges. This test should be expanded
956 # as soon as other platforms gain support for extended attributes.
957 tmp_dir = self.mkdtemp()
958 src = os.path.join(tmp_dir, 'foo')
959 src_link = os.path.join(tmp_dir, 'baz')
960 write_file(src, 'foo')
961 os.symlink(src, src_link)
962 os.setxattr(src, 'trusted.foo', b'42')
963 os.setxattr(src_link, 'trusted.foo', b'43', follow_symlinks=False)
964 dst = os.path.join(tmp_dir, 'bar')
965 dst_link = os.path.join(tmp_dir, 'qux')
966 write_file(dst, 'bar')
967 os.symlink(dst, dst_link)
968 shutil._copyxattr(src_link, dst_link, follow_symlinks=False)
969 self.assertEqual(os.getxattr(dst_link, 'trusted.foo', follow_symlinks=False), b'43')
970 self.assertRaises(OSError, os.getxattr, dst, 'trusted.foo')
971 shutil._copyxattr(src_link, dst, follow_symlinks=False)
972 self.assertEqual(os.getxattr(dst, 'trusted.foo'), b'43')
973
974 ### shutil.copy
975
976 def _copy_file(self, method):
977 fname = 'test.txt'
978 tmpdir = self.mkdtemp()
979 write_file((tmpdir, fname), 'xxx')
980 file1 = os.path.join(tmpdir, fname)
981 tmpdir2 = self.mkdtemp()
982 method(file1, tmpdir2)
983 file2 = os.path.join(tmpdir2, fname)
984 return (file1, file2)
985
986 def test_copy(self):
987 # Ensure that the copied file exists and has the same mode bits.
988 file1, file2 = self._copy_file(shutil.copy)
989 self.assertTrue(os.path.exists(file2))
990 self.assertEqual(os.stat(file1).st_mode, os.stat(file2).st_mode)
991
992 @support.skip_unless_symlink
993 def test_copy_symlinks(self):
994 tmp_dir = self.mkdtemp()
995 src = os.path.join(tmp_dir, 'foo')
996 dst = os.path.join(tmp_dir, 'bar')
997 src_link = os.path.join(tmp_dir, 'baz')
998 write_file(src, 'foo')
999 os.symlink(src, src_link)
1000 if hasattr(os, 'lchmod'):
1001 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
1002 # don't follow
1003 shutil.copy(src_link, dst, follow_symlinks=True)
1004 self.assertFalse(os.path.islink(dst))
1005 self.assertEqual(read_file(src), read_file(dst))
1006 os.remove(dst)
1007 # follow
1008 shutil.copy(src_link, dst, follow_symlinks=False)
1009 self.assertTrue(os.path.islink(dst))
1010 self.assertEqual(os.readlink(dst), os.readlink(src_link))
1011 if hasattr(os, 'lchmod'):
1012 self.assertEqual(os.lstat(src_link).st_mode,
1013 os.lstat(dst).st_mode)
1014
1015 ### shutil.copy2
1016
1017 @unittest.skipUnless(hasattr(os, 'utime'), 'requires os.utime')
1018 def test_copy2(self):
1019 # Ensure that the copied file exists and has the same mode and
1020 # modification time bits.
1021 file1, file2 = self._copy_file(shutil.copy2)
1022 self.assertTrue(os.path.exists(file2))
1023 file1_stat = os.stat(file1)
1024 file2_stat = os.stat(file2)
1025 self.assertEqual(file1_stat.st_mode, file2_stat.st_mode)
1026 for attr in 'st_atime', 'st_mtime':
1027 # The modification times may be truncated in the new file.
1028 self.assertLessEqual(getattr(file1_stat, attr),
1029 getattr(file2_stat, attr) + 1)
1030 if hasattr(os, 'chflags') and hasattr(file1_stat, 'st_flags'):
1031 self.assertEqual(getattr(file1_stat, 'st_flags'),
1032 getattr(file2_stat, 'st_flags'))
1033
1034 @support.skip_unless_symlink
1035 def test_copy2_symlinks(self):
1036 tmp_dir = self.mkdtemp()
1037 src = os.path.join(tmp_dir, 'foo')
1038 dst = os.path.join(tmp_dir, 'bar')
1039 src_link = os.path.join(tmp_dir, 'baz')
1040 write_file(src, 'foo')
1041 os.symlink(src, src_link)
1042 if hasattr(os, 'lchmod'):
1043 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
1044 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
1045 os.lchflags(src_link, stat.UF_NODUMP)
1046 src_stat = os.stat(src)
1047 src_link_stat = os.lstat(src_link)
1048 # follow
1049 shutil.copy2(src_link, dst, follow_symlinks=True)
1050 self.assertFalse(os.path.islink(dst))
1051 self.assertEqual(read_file(src), read_file(dst))
1052 os.remove(dst)
1053 # don't follow
1054 shutil.copy2(src_link, dst, follow_symlinks=False)
1055 self.assertTrue(os.path.islink(dst))
1056 self.assertEqual(os.readlink(dst), os.readlink(src_link))
1057 dst_stat = os.lstat(dst)
1058 if os.utime in os.supports_follow_symlinks:
1059 for attr in 'st_atime', 'st_mtime':
1060 # The modification times may be truncated in the new file.
1061 self.assertLessEqual(getattr(src_link_stat, attr),
1062 getattr(dst_stat, attr) + 1)
1063 if hasattr(os, 'lchmod'):
1064 self.assertEqual(src_link_stat.st_mode, dst_stat.st_mode)
1065 self.assertNotEqual(src_stat.st_mode, dst_stat.st_mode)
1066 if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
1067 self.assertEqual(src_link_stat.st_flags, dst_stat.st_flags)
1068
1069 @support.skip_unless_xattr
1070 def test_copy2_xattr(self):
1071 tmp_dir = self.mkdtemp()
1072 src = os.path.join(tmp_dir, 'foo')
1073 dst = os.path.join(tmp_dir, 'bar')
1074 write_file(src, 'foo')
1075 os.setxattr(src, 'user.foo', b'42')
1076 shutil.copy2(src, dst)
1077 self.assertEqual(
1078 os.getxattr(src, 'user.foo'),
1079 os.getxattr(dst, 'user.foo'))
1080 os.remove(dst)
1081
1082 def test_copy_return_value(self):
1083 # copy and copy2 both return their destination path.
1084 for fn in (shutil.copy, shutil.copy2):
1085 src_dir = self.mkdtemp()
1086 dst_dir = self.mkdtemp()
1087 src = os.path.join(src_dir, 'foo')
1088 write_file(src, 'foo')
1089 rv = fn(src, dst_dir)
1090 self.assertEqual(rv, os.path.join(dst_dir, 'foo'))
1091 rv = fn(src, os.path.join(dst_dir, 'bar'))
1092 self.assertEqual(rv, os.path.join(dst_dir, 'bar'))
1093
1094 ### shutil.copyfile
1095
1096 @support.skip_unless_symlink
1097 def test_copyfile_symlinks(self):
1098 tmp_dir = self.mkdtemp()
1099 src = os.path.join(tmp_dir, 'src')
1100 dst = os.path.join(tmp_dir, 'dst')
1101 dst_link = os.path.join(tmp_dir, 'dst_link')
1102 link = os.path.join(tmp_dir, 'link')
1103 write_file(src, 'foo')
1104 os.symlink(src, link)
1105 # don't follow
1106 shutil.copyfile(link, dst_link, follow_symlinks=False)
1107 self.assertTrue(os.path.islink(dst_link))
1108 self.assertEqual(os.readlink(link), os.readlink(dst_link))
1109 # follow
1110 shutil.copyfile(link, dst)
1111 self.assertFalse(os.path.islink(dst))
1112
Hirokazu Yamamoto26681452010-12-05 02:04:16 +00001113 @unittest.skipUnless(hasattr(os, 'link'), 'requires os.link')
Tarek Ziadé51a6f722010-04-23 13:03:09 +00001114 def test_dont_copy_file_onto_link_to_itself(self):
1115 # bug 851123.
1116 os.mkdir(TESTFN)
1117 src = os.path.join(TESTFN, 'cheese')
1118 dst = os.path.join(TESTFN, 'shop')
1119 try:
Hirokazu Yamamoto26681452010-12-05 02:04:16 +00001120 with open(src, 'w') as f:
1121 f.write('cheddar')
xdegaye92c2ca72017-11-12 17:31:07 +01001122 try:
1123 os.link(src, dst)
1124 except PermissionError as e:
1125 self.skipTest('os.link(): %s' % e)
Hynek Schlawack48653762012-10-07 12:49:58 +02001126 self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
Hirokazu Yamamoto26681452010-12-05 02:04:16 +00001127 with open(src, 'r') as f:
1128 self.assertEqual(f.read(), 'cheddar')
1129 os.remove(dst)
1130 finally:
1131 shutil.rmtree(TESTFN, ignore_errors=True)
Tarek Ziadé51a6f722010-04-23 13:03:09 +00001132
Brian Curtin3b4499c2010-12-28 14:31:47 +00001133 @support.skip_unless_symlink
Hirokazu Yamamoto26681452010-12-05 02:04:16 +00001134 def test_dont_copy_file_onto_symlink_to_itself(self):
1135 # bug 851123.
1136 os.mkdir(TESTFN)
1137 src = os.path.join(TESTFN, 'cheese')
1138 dst = os.path.join(TESTFN, 'shop')
1139 try:
1140 with open(src, 'w') as f:
1141 f.write('cheddar')
Tarek Ziadé51a6f722010-04-23 13:03:09 +00001142 # Using `src` here would mean we end up with a symlink pointing
1143 # to TESTFN/TESTFN/cheese, while it should point at
1144 # TESTFN/cheese.
1145 os.symlink('cheese', dst)
Hynek Schlawack48653762012-10-07 12:49:58 +02001146 self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
Antoine Pitrou92f60ed2010-10-14 22:11:44 +00001147 with open(src, 'r') as f:
1148 self.assertEqual(f.read(), 'cheddar')
Tarek Ziadé51a6f722010-04-23 13:03:09 +00001149 os.remove(dst)
1150 finally:
Hirokazu Yamamoto26681452010-12-05 02:04:16 +00001151 shutil.rmtree(TESTFN, ignore_errors=True)
Johannes Gijsbers68128712004-08-14 13:57:08 +00001152
Serhiy Storchaka43767632013-11-03 21:31:38 +02001153 # Issue #3002: copyfile and copytree block indefinitely on named pipes
1154 @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
1155 def test_copyfile_named_pipe(self):
xdegaye92c2ca72017-11-12 17:31:07 +01001156 try:
1157 os.mkfifo(TESTFN)
1158 except PermissionError as e:
1159 self.skipTest('os.mkfifo(): %s' % e)
Serhiy Storchaka43767632013-11-03 21:31:38 +02001160 try:
1161 self.assertRaises(shutil.SpecialFileError,
1162 shutil.copyfile, TESTFN, TESTFN2)
1163 self.assertRaises(shutil.SpecialFileError,
1164 shutil.copyfile, __file__, TESTFN)
1165 finally:
1166 os.remove(TESTFN)
Antoine Pitrou7fff0962009-05-01 21:09:44 +00001167
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001168 def test_copyfile_return_value(self):
1169 # copytree returns its destination path.
Tarek Ziadé5340db32010-04-19 22:30:51 +00001170 src_dir = self.mkdtemp()
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001171 dst_dir = self.mkdtemp()
1172 dst_file = os.path.join(dst_dir, 'bar')
1173 src_file = os.path.join(src_dir, 'foo')
1174 write_file(src_file, 'foo')
1175 rv = shutil.copyfile(src_file, dst_file)
1176 self.assertTrue(os.path.exists(rv))
1177 self.assertEqual(read_file(src_file), read_file(dst_file))
Tarek Ziadé5340db32010-04-19 22:30:51 +00001178
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001179 def test_copyfile_same_file(self):
1180 # copyfile() should raise SameFileError if the source and destination
1181 # are the same.
Tarek Ziadéfb437512010-04-20 08:57:33 +00001182 src_dir = self.mkdtemp()
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001183 src_file = os.path.join(src_dir, 'foo')
1184 write_file(src_file, 'foo')
1185 self.assertRaises(SameFileError, shutil.copyfile, src_file, src_file)
1186 # But Error should work too, to stay backward compatible.
1187 self.assertRaises(Error, shutil.copyfile, src_file, src_file)
1188 # Make sure file is not corrupted.
1189 self.assertEqual(read_file(src_file), 'foo')
Tarek Ziadéfb437512010-04-20 08:57:33 +00001190
Tarek Ziadéfb437512010-04-20 08:57:33 +00001191
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001192class TestArchives(BaseTest, unittest.TestCase):
Tarek Ziadéfb437512010-04-20 08:57:33 +00001193
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001194 ### shutil.make_archive
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001195
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001196 @support.requires_zlib
Tarek Ziadé396fad72010-02-23 05:30:31 +00001197 def test_make_tarball(self):
1198 # creating something to tar
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001199 root_dir, base_dir = self._create_files('')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001200
1201 tmpdir2 = self.mkdtemp()
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001202 # force shutil to create the directory
1203 os.rmdir(tmpdir2)
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001204 # working with relative paths
1205 work_dir = os.path.dirname(tmpdir2)
1206 rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001207
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001208 with support.change_cwd(work_dir):
Serhiy Storchaka5558d4f2015-09-08 09:59:02 +03001209 base_name = os.path.abspath(rel_base_name)
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001210 tarball = make_archive(rel_base_name, 'gztar', root_dir, '.')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001211
1212 # check if the compressed tarball was created
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001213 self.assertEqual(tarball, base_name + '.tar.gz')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001214 self.assertTrue(os.path.isfile(tarball))
1215 self.assertTrue(tarfile.is_tarfile(tarball))
1216 with tarfile.open(tarball, 'r:gz') as tf:
1217 self.assertCountEqual(tf.getnames(),
1218 ['.', './sub', './sub2',
1219 './file1', './file2', './sub/file3'])
Tarek Ziadé396fad72010-02-23 05:30:31 +00001220
1221 # trying an uncompressed one
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001222 with support.change_cwd(work_dir):
1223 tarball = make_archive(rel_base_name, 'tar', root_dir, '.')
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001224 self.assertEqual(tarball, base_name + '.tar')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001225 self.assertTrue(os.path.isfile(tarball))
1226 self.assertTrue(tarfile.is_tarfile(tarball))
1227 with tarfile.open(tarball, 'r') as tf:
1228 self.assertCountEqual(tf.getnames(),
1229 ['.', './sub', './sub2',
1230 './file1', './file2', './sub/file3'])
Tarek Ziadé396fad72010-02-23 05:30:31 +00001231
1232 def _tarinfo(self, path):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001233 with tarfile.open(path) as tar:
Tarek Ziadé396fad72010-02-23 05:30:31 +00001234 names = tar.getnames()
1235 names.sort()
1236 return tuple(names)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001237
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001238 def _create_files(self, base_dir='dist'):
Tarek Ziadé396fad72010-02-23 05:30:31 +00001239 # creating something to tar
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001240 root_dir = self.mkdtemp()
1241 dist = os.path.join(root_dir, base_dir)
1242 os.makedirs(dist, exist_ok=True)
Éric Araujoa7e33a12011-08-12 19:51:35 +02001243 write_file((dist, 'file1'), 'xxx')
1244 write_file((dist, 'file2'), 'xxx')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001245 os.mkdir(os.path.join(dist, 'sub'))
Éric Araujoa7e33a12011-08-12 19:51:35 +02001246 write_file((dist, 'sub', 'file3'), 'xxx')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001247 os.mkdir(os.path.join(dist, 'sub2'))
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001248 if base_dir:
1249 write_file((root_dir, 'outer'), 'xxx')
1250 return root_dir, base_dir
Tarek Ziadé396fad72010-02-23 05:30:31 +00001251
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001252 @support.requires_zlib
Serhiy Storchakab42de2f2015-11-21 14:09:26 +02001253 @unittest.skipUnless(shutil.which('tar'),
Tarek Ziadé396fad72010-02-23 05:30:31 +00001254 'Need the tar command to run')
1255 def test_tarfile_vs_tar(self):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001256 root_dir, base_dir = self._create_files()
1257 base_name = os.path.join(self.mkdtemp(), 'archive')
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001258 tarball = make_archive(base_name, 'gztar', root_dir, base_dir)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001259
1260 # check if the compressed tarball was created
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001261 self.assertEqual(tarball, base_name + '.tar.gz')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001262 self.assertTrue(os.path.isfile(tarball))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001263
1264 # now create another tarball using `tar`
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001265 tarball2 = os.path.join(root_dir, 'archive2.tar')
1266 tar_cmd = ['tar', '-cf', 'archive2.tar', base_dir]
Serhiy Storchakab42de2f2015-11-21 14:09:26 +02001267 subprocess.check_call(tar_cmd, cwd=root_dir,
1268 stdout=subprocess.DEVNULL)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001269
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001270 self.assertTrue(os.path.isfile(tarball2))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001271 # let's compare both tarballs
Ezio Melottib3aedd42010-11-20 19:04:17 +00001272 self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001273
1274 # trying an uncompressed one
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001275 tarball = make_archive(base_name, 'tar', root_dir, base_dir)
1276 self.assertEqual(tarball, base_name + '.tar')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001277 self.assertTrue(os.path.isfile(tarball))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001278
1279 # now for a dry_run
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001280 tarball = make_archive(base_name, 'tar', root_dir, base_dir,
1281 dry_run=True)
1282 self.assertEqual(tarball, base_name + '.tar')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001283 self.assertTrue(os.path.isfile(tarball))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001284
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001285 @support.requires_zlib
Tarek Ziadé396fad72010-02-23 05:30:31 +00001286 def test_make_zipfile(self):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001287 # creating something to zip
1288 root_dir, base_dir = self._create_files()
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001289
1290 tmpdir2 = self.mkdtemp()
1291 # force shutil to create the directory
1292 os.rmdir(tmpdir2)
1293 # working with relative paths
1294 work_dir = os.path.dirname(tmpdir2)
1295 rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001296
1297 with support.change_cwd(work_dir):
Serhiy Storchaka5558d4f2015-09-08 09:59:02 +03001298 base_name = os.path.abspath(rel_base_name)
Serhiy Storchaka666de772016-10-23 15:55:09 +03001299 res = make_archive(rel_base_name, 'zip', root_dir)
1300
1301 self.assertEqual(res, base_name + '.zip')
1302 self.assertTrue(os.path.isfile(res))
1303 self.assertTrue(zipfile.is_zipfile(res))
1304 with zipfile.ZipFile(res) as zf:
1305 self.assertCountEqual(zf.namelist(),
1306 ['dist/', 'dist/sub/', 'dist/sub2/',
1307 'dist/file1', 'dist/file2', 'dist/sub/file3',
1308 'outer'])
1309
1310 with support.change_cwd(work_dir):
1311 base_name = os.path.abspath(rel_base_name)
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001312 res = make_archive(rel_base_name, 'zip', root_dir, base_dir)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001313
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001314 self.assertEqual(res, base_name + '.zip')
1315 self.assertTrue(os.path.isfile(res))
1316 self.assertTrue(zipfile.is_zipfile(res))
1317 with zipfile.ZipFile(res) as zf:
1318 self.assertCountEqual(zf.namelist(),
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001319 ['dist/', 'dist/sub/', 'dist/sub2/',
1320 'dist/file1', 'dist/file2', 'dist/sub/file3'])
Tarek Ziadé396fad72010-02-23 05:30:31 +00001321
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001322 @support.requires_zlib
Serhiy Storchakab42de2f2015-11-21 14:09:26 +02001323 @unittest.skipUnless(shutil.which('zip'),
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001324 'Need the zip command to run')
1325 def test_zipfile_vs_zip(self):
1326 root_dir, base_dir = self._create_files()
1327 base_name = os.path.join(self.mkdtemp(), 'archive')
1328 archive = make_archive(base_name, 'zip', root_dir, base_dir)
1329
1330 # check if ZIP file was created
1331 self.assertEqual(archive, base_name + '.zip')
1332 self.assertTrue(os.path.isfile(archive))
1333
1334 # now create another ZIP file using `zip`
1335 archive2 = os.path.join(root_dir, 'archive2.zip')
1336 zip_cmd = ['zip', '-q', '-r', 'archive2.zip', base_dir]
Serhiy Storchakab42de2f2015-11-21 14:09:26 +02001337 subprocess.check_call(zip_cmd, cwd=root_dir,
1338 stdout=subprocess.DEVNULL)
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001339
1340 self.assertTrue(os.path.isfile(archive2))
1341 # let's compare both ZIP files
1342 with zipfile.ZipFile(archive) as zf:
1343 names = zf.namelist()
1344 with zipfile.ZipFile(archive2) as zf:
1345 names2 = zf.namelist()
1346 self.assertEqual(sorted(names), sorted(names2))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001347
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001348 @support.requires_zlib
Serhiy Storchaka8bc792a2015-11-22 14:49:58 +02001349 @unittest.skipUnless(shutil.which('unzip'),
1350 'Need the unzip command to run')
1351 def test_unzip_zipfile(self):
1352 root_dir, base_dir = self._create_files()
1353 base_name = os.path.join(self.mkdtemp(), 'archive')
1354 archive = make_archive(base_name, 'zip', root_dir, base_dir)
1355
1356 # check if ZIP file was created
1357 self.assertEqual(archive, base_name + '.zip')
1358 self.assertTrue(os.path.isfile(archive))
1359
1360 # now check the ZIP file using `unzip -t`
1361 zip_cmd = ['unzip', '-t', archive]
1362 with support.change_cwd(root_dir):
1363 try:
1364 subprocess.check_output(zip_cmd, stderr=subprocess.STDOUT)
1365 except subprocess.CalledProcessError as exc:
1366 details = exc.output.decode(errors="replace")
Benjamin Petersona710ebd2018-09-13 10:08:46 -07001367 if 'unrecognized option: t' in details:
Benjamin Petersone78734d2018-09-13 10:57:23 -07001368 self.skipTest("unzip doesn't support -t")
Serhiy Storchaka8bc792a2015-11-22 14:49:58 +02001369 msg = "{}\n\n**Unzip Output**\n{}"
1370 self.fail(msg.format(exc, details))
1371
Tarek Ziadé396fad72010-02-23 05:30:31 +00001372 def test_make_archive(self):
1373 tmpdir = self.mkdtemp()
1374 base_name = os.path.join(tmpdir, 'archive')
1375 self.assertRaises(ValueError, make_archive, base_name, 'xxx')
1376
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001377 @support.requires_zlib
Tarek Ziadé396fad72010-02-23 05:30:31 +00001378 def test_make_archive_owner_group(self):
1379 # testing make_archive with owner and group, with various combinations
1380 # this works even if there's not gid/uid support
1381 if UID_GID_SUPPORT:
1382 group = grp.getgrgid(0)[0]
1383 owner = pwd.getpwuid(0)[0]
1384 else:
1385 group = owner = 'root'
1386
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001387 root_dir, base_dir = self._create_files()
1388 base_name = os.path.join(self.mkdtemp(), 'archive')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001389 res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner,
1390 group=group)
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001391 self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001392
1393 res = make_archive(base_name, 'zip', root_dir, base_dir)
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001394 self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001395
1396 res = make_archive(base_name, 'tar', root_dir, base_dir,
1397 owner=owner, group=group)
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001398 self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001399
1400 res = make_archive(base_name, 'tar', root_dir, base_dir,
1401 owner='kjhkjhkjg', group='oihohoh')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001402 self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001403
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001404
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001405 @support.requires_zlib
Tarek Ziadé396fad72010-02-23 05:30:31 +00001406 @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
1407 def test_tarfile_root_owner(self):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001408 root_dir, base_dir = self._create_files()
1409 base_name = os.path.join(self.mkdtemp(), 'archive')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001410 group = grp.getgrgid(0)[0]
1411 owner = pwd.getpwuid(0)[0]
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001412 with support.change_cwd(root_dir):
1413 archive_name = make_archive(base_name, 'gztar', root_dir, 'dist',
1414 owner=owner, group=group)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001415
1416 # check if the compressed tarball was created
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001417 self.assertTrue(os.path.isfile(archive_name))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001418
1419 # now checks the rights
1420 archive = tarfile.open(archive_name)
1421 try:
1422 for member in archive.getmembers():
Ezio Melottib3aedd42010-11-20 19:04:17 +00001423 self.assertEqual(member.uid, 0)
1424 self.assertEqual(member.gid, 0)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001425 finally:
1426 archive.close()
1427
1428 def test_make_archive_cwd(self):
1429 current_dir = os.getcwd()
1430 def _breaks(*args, **kw):
1431 raise RuntimeError()
1432
1433 register_archive_format('xxx', _breaks, [], 'xxx file')
1434 try:
1435 try:
1436 make_archive('xxx', 'xxx', root_dir=self.mkdtemp())
1437 except Exception:
1438 pass
Ezio Melottib3aedd42010-11-20 19:04:17 +00001439 self.assertEqual(os.getcwd(), current_dir)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001440 finally:
1441 unregister_archive_format('xxx')
1442
Serhiy Storchaka9a4fc192014-11-28 00:48:46 +02001443 def test_make_tarfile_in_curdir(self):
1444 # Issue #21280
1445 root_dir = self.mkdtemp()
1446 with support.change_cwd(root_dir):
1447 self.assertEqual(make_archive('test', 'tar'), 'test.tar')
1448 self.assertTrue(os.path.isfile('test.tar'))
1449
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001450 @support.requires_zlib
Serhiy Storchaka9a4fc192014-11-28 00:48:46 +02001451 def test_make_zipfile_in_curdir(self):
1452 # Issue #21280
1453 root_dir = self.mkdtemp()
1454 with support.change_cwd(root_dir):
1455 self.assertEqual(make_archive('test', 'zip'), 'test.zip')
1456 self.assertTrue(os.path.isfile('test.zip'))
1457
Tarek Ziadé396fad72010-02-23 05:30:31 +00001458 def test_register_archive_format(self):
1459
1460 self.assertRaises(TypeError, register_archive_format, 'xxx', 1)
1461 self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
1462 1)
1463 self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
1464 [(1, 2), (1, 2, 3)])
1465
1466 register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file')
1467 formats = [name for name, params in get_archive_formats()]
1468 self.assertIn('xxx', formats)
1469
1470 unregister_archive_format('xxx')
1471 formats = [name for name, params in get_archive_formats()]
1472 self.assertNotIn('xxx', formats)
1473
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001474 ### shutil.unpack_archive
1475
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001476 def check_unpack_archive(self, format):
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001477 self.check_unpack_archive_with_converter(format, lambda path: path)
1478 self.check_unpack_archive_with_converter(format, pathlib.Path)
Serhiy Storchakab21d1552018-03-02 11:53:51 +02001479 self.check_unpack_archive_with_converter(format, FakePath)
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001480
1481 def check_unpack_archive_with_converter(self, format, converter):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001482 root_dir, base_dir = self._create_files()
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001483 expected = rlistdir(root_dir)
1484 expected.remove('outer')
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001485
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001486 base_name = os.path.join(self.mkdtemp(), 'archive')
1487 filename = make_archive(base_name, format, root_dir, base_dir)
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001488
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001489 # let's try to unpack it now
1490 tmpdir2 = self.mkdtemp()
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001491 unpack_archive(converter(filename), converter(tmpdir2))
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001492 self.assertEqual(rlistdir(tmpdir2), expected)
1493
1494 # and again, this time with the format specified
1495 tmpdir3 = self.mkdtemp()
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001496 unpack_archive(converter(filename), converter(tmpdir3), format=format)
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001497 self.assertEqual(rlistdir(tmpdir3), expected)
1498
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001499 self.assertRaises(shutil.ReadError, unpack_archive, converter(TESTFN))
1500 self.assertRaises(ValueError, unpack_archive, converter(TESTFN), format='xxx')
Nick Coghlanabf202d2011-03-16 13:52:20 -04001501
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001502 def test_unpack_archive_tar(self):
1503 self.check_unpack_archive('tar')
1504
1505 @support.requires_zlib
1506 def test_unpack_archive_gztar(self):
1507 self.check_unpack_archive('gztar')
1508
1509 @support.requires_bz2
1510 def test_unpack_archive_bztar(self):
1511 self.check_unpack_archive('bztar')
1512
1513 @support.requires_lzma
Michael Feltef110b12019-02-18 12:02:44 +01001514 @unittest.skipIf(AIX and not _maxdataOK(), "AIX MAXDATA must be 0x20000000 or larger")
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001515 def test_unpack_archive_xztar(self):
1516 self.check_unpack_archive('xztar')
1517
1518 @support.requires_zlib
1519 def test_unpack_archive_zip(self):
1520 self.check_unpack_archive('zip')
1521
Martin Pantereb995702016-07-28 01:11:04 +00001522 def test_unpack_registry(self):
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001523
1524 formats = get_unpack_formats()
1525
1526 def _boo(filename, extract_dir, extra):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001527 self.assertEqual(extra, 1)
1528 self.assertEqual(filename, 'stuff.boo')
1529 self.assertEqual(extract_dir, 'xx')
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001530
1531 register_unpack_format('Boo', ['.boo', '.b2'], _boo, [('extra', 1)])
1532 unpack_archive('stuff.boo', 'xx')
1533
1534 # trying to register a .boo unpacker again
1535 self.assertRaises(RegistryError, register_unpack_format, 'Boo2',
1536 ['.boo'], _boo)
1537
1538 # should work now
1539 unregister_unpack_format('Boo')
1540 register_unpack_format('Boo2', ['.boo'], _boo)
1541 self.assertIn(('Boo2', ['.boo'], ''), get_unpack_formats())
1542 self.assertNotIn(('Boo', ['.boo'], ''), get_unpack_formats())
1543
1544 # let's leave a clean state
1545 unregister_unpack_format('Boo2')
Ezio Melottib3aedd42010-11-20 19:04:17 +00001546 self.assertEqual(get_unpack_formats(), formats)
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001547
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001548
1549class TestMisc(BaseTest, unittest.TestCase):
1550
Giampaolo Rodola'210e7ca2011-07-01 13:55:36 +02001551 @unittest.skipUnless(hasattr(shutil, 'disk_usage'),
1552 "disk_usage not available on this platform")
1553 def test_disk_usage(self):
Gregory P. Smith529746c2017-07-06 17:11:27 -07001554 usage = shutil.disk_usage(os.path.dirname(__file__))
Victor Stinnerdc525f42018-12-11 12:05:21 +01001555 for attr in ('total', 'used', 'free'):
1556 self.assertIsInstance(getattr(usage, attr), int)
Éric Araujo2ee61882011-07-02 16:45:45 +02001557 self.assertGreater(usage.total, 0)
1558 self.assertGreater(usage.used, 0)
1559 self.assertGreaterEqual(usage.free, 0)
1560 self.assertGreaterEqual(usage.total, usage.used)
1561 self.assertGreater(usage.total, usage.free)
Giampaolo Rodola'210e7ca2011-07-01 13:55:36 +02001562
Victor Stinnerdc525f42018-12-11 12:05:21 +01001563 # bpo-32557: Check that disk_usage() also accepts a filename
1564 shutil.disk_usage(__file__)
1565
Sandro Tosid902a142011-08-22 23:28:27 +02001566 @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
1567 @unittest.skipUnless(hasattr(os, 'chown'), 'requires os.chown')
1568 def test_chown(self):
Sandro Tosid902a142011-08-22 23:28:27 +02001569 dirname = self.mkdtemp()
1570 filename = tempfile.mktemp(dir=dirname)
1571 write_file(filename, 'testing chown function')
1572
1573 with self.assertRaises(ValueError):
1574 shutil.chown(filename)
1575
1576 with self.assertRaises(LookupError):
Raymond Hettinger15f44ab2016-08-30 10:47:49 -07001577 shutil.chown(filename, user='non-existing username')
Sandro Tosid902a142011-08-22 23:28:27 +02001578
1579 with self.assertRaises(LookupError):
Raymond Hettinger15f44ab2016-08-30 10:47:49 -07001580 shutil.chown(filename, group='non-existing groupname')
Sandro Tosid902a142011-08-22 23:28:27 +02001581
1582 with self.assertRaises(TypeError):
1583 shutil.chown(filename, b'spam')
1584
1585 with self.assertRaises(TypeError):
1586 shutil.chown(filename, 3.14)
1587
1588 uid = os.getuid()
1589 gid = os.getgid()
1590
1591 def check_chown(path, uid=None, gid=None):
1592 s = os.stat(filename)
1593 if uid is not None:
1594 self.assertEqual(uid, s.st_uid)
1595 if gid is not None:
1596 self.assertEqual(gid, s.st_gid)
1597
1598 shutil.chown(filename, uid, gid)
1599 check_chown(filename, uid, gid)
1600 shutil.chown(filename, uid)
1601 check_chown(filename, uid)
1602 shutil.chown(filename, user=uid)
1603 check_chown(filename, uid)
1604 shutil.chown(filename, group=gid)
Sandro Tosi91f948a2011-08-22 23:55:39 +02001605 check_chown(filename, gid=gid)
Sandro Tosid902a142011-08-22 23:28:27 +02001606
1607 shutil.chown(dirname, uid, gid)
1608 check_chown(dirname, uid, gid)
1609 shutil.chown(dirname, uid)
1610 check_chown(dirname, uid)
1611 shutil.chown(dirname, user=uid)
1612 check_chown(dirname, uid)
1613 shutil.chown(dirname, group=gid)
Sandro Tosi91f948a2011-08-22 23:55:39 +02001614 check_chown(dirname, gid=gid)
Sandro Tosid902a142011-08-22 23:28:27 +02001615
1616 user = pwd.getpwuid(uid)[0]
1617 group = grp.getgrgid(gid)[0]
1618 shutil.chown(filename, user, group)
1619 check_chown(filename, uid, gid)
1620 shutil.chown(dirname, user, group)
1621 check_chown(dirname, uid, gid)
1622
Brian Curtin0d0a1de2012-06-18 18:41:07 -05001623
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001624class TestWhich(BaseTest, unittest.TestCase):
Brian Curtinc57a3452012-06-22 16:00:30 -05001625
1626 def setUp(self):
Steve Dowerabde52c2019-11-15 09:49:21 -08001627 self.temp_dir = self.mkdtemp(prefix="Tmp")
Brian Curtinc57a3452012-06-22 16:00:30 -05001628 # Give the temp_file an ".exe" suffix for all.
1629 # It's needed on Windows and not harmful on other platforms.
1630 self.temp_file = tempfile.NamedTemporaryFile(dir=self.temp_dir,
Serhiy Storchaka014791f2013-01-21 15:00:27 +02001631 prefix="Tmp",
1632 suffix=".Exe")
Brian Curtinc57a3452012-06-22 16:00:30 -05001633 os.chmod(self.temp_file.name, stat.S_IXUSR)
1634 self.addCleanup(self.temp_file.close)
1635 self.dir, self.file = os.path.split(self.temp_file.name)
Cheryl Sabella5680f652019-02-13 06:25:10 -05001636 self.env_path = self.dir
1637 self.curdir = os.curdir
1638 self.ext = ".EXE"
Brian Curtinc57a3452012-06-22 16:00:30 -05001639
1640 def test_basic(self):
1641 # Given an EXE in a directory, it should be returned.
1642 rv = shutil.which(self.file, path=self.dir)
1643 self.assertEqual(rv, self.temp_file.name)
1644
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001645 def test_absolute_cmd(self):
Brian Curtinc57a3452012-06-22 16:00:30 -05001646 # When given the fully qualified path to an executable that exists,
1647 # it should be returned.
1648 rv = shutil.which(self.temp_file.name, path=self.temp_dir)
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001649 self.assertEqual(rv, self.temp_file.name)
1650
1651 def test_relative_cmd(self):
1652 # When given the relative path with a directory part to an executable
1653 # that exists, it should be returned.
1654 base_dir, tail_dir = os.path.split(self.dir)
1655 relpath = os.path.join(tail_dir, self.file)
Nick Coghlan55175962013-07-28 22:11:50 +10001656 with support.change_cwd(path=base_dir):
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001657 rv = shutil.which(relpath, path=self.temp_dir)
1658 self.assertEqual(rv, relpath)
1659 # But it shouldn't be searched in PATH directories (issue #16957).
Nick Coghlan55175962013-07-28 22:11:50 +10001660 with support.change_cwd(path=self.dir):
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001661 rv = shutil.which(relpath, path=base_dir)
1662 self.assertIsNone(rv)
1663
1664 def test_cwd(self):
1665 # Issue #16957
1666 base_dir = os.path.dirname(self.dir)
Nick Coghlan55175962013-07-28 22:11:50 +10001667 with support.change_cwd(path=self.dir):
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001668 rv = shutil.which(self.file, path=base_dir)
1669 if sys.platform == "win32":
1670 # Windows: current directory implicitly on PATH
Cheryl Sabella5680f652019-02-13 06:25:10 -05001671 self.assertEqual(rv, os.path.join(self.curdir, self.file))
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001672 else:
1673 # Other platforms: shouldn't match in the current directory.
1674 self.assertIsNone(rv)
Brian Curtinc57a3452012-06-22 16:00:30 -05001675
Serhiy Storchaka12516e22013-05-28 15:50:15 +03001676 @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
1677 'non-root user required')
Brian Curtinc57a3452012-06-22 16:00:30 -05001678 def test_non_matching_mode(self):
1679 # Set the file read-only and ask for writeable files.
1680 os.chmod(self.temp_file.name, stat.S_IREAD)
Serhiy Storchaka12516e22013-05-28 15:50:15 +03001681 if os.access(self.temp_file.name, os.W_OK):
1682 self.skipTest("can't set the file read-only")
Brian Curtinc57a3452012-06-22 16:00:30 -05001683 rv = shutil.which(self.file, path=self.dir, mode=os.W_OK)
1684 self.assertIsNone(rv)
1685
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001686 def test_relative_path(self):
Antoine Pitrou07c24d12012-06-22 23:33:05 +02001687 base_dir, tail_dir = os.path.split(self.dir)
Nick Coghlan55175962013-07-28 22:11:50 +10001688 with support.change_cwd(path=base_dir):
Antoine Pitrou07c24d12012-06-22 23:33:05 +02001689 rv = shutil.which(self.file, path=tail_dir)
1690 self.assertEqual(rv, os.path.join(tail_dir, self.file))
Antoine Pitrou07c24d12012-06-22 23:33:05 +02001691
Brian Curtinc57a3452012-06-22 16:00:30 -05001692 def test_nonexistent_file(self):
1693 # Return None when no matching executable file is found on the path.
1694 rv = shutil.which("foo.exe", path=self.dir)
1695 self.assertIsNone(rv)
1696
1697 @unittest.skipUnless(sys.platform == "win32",
1698 "pathext check is Windows-only")
1699 def test_pathext_checking(self):
1700 # Ask for the file without the ".exe" extension, then ensure that
1701 # it gets found properly with the extension.
Serhiy Storchakad70127a2013-01-24 20:03:49 +02001702 rv = shutil.which(self.file[:-4], path=self.dir)
Cheryl Sabella5680f652019-02-13 06:25:10 -05001703 self.assertEqual(rv, self.temp_file.name[:-4] + self.ext)
Brian Curtinc57a3452012-06-22 16:00:30 -05001704
Barry Warsaw618738b2013-04-16 11:05:03 -04001705 def test_environ_path(self):
1706 with support.EnvironmentVarGuard() as env:
Cheryl Sabella5680f652019-02-13 06:25:10 -05001707 env['PATH'] = self.env_path
Barry Warsaw618738b2013-04-16 11:05:03 -04001708 rv = shutil.which(self.file)
1709 self.assertEqual(rv, self.temp_file.name)
1710
Victor Stinner228a3c92019-04-17 16:26:36 +02001711 def test_environ_path_empty(self):
1712 # PATH='': no match
1713 with support.EnvironmentVarGuard() as env:
1714 env['PATH'] = ''
1715 with unittest.mock.patch('os.confstr', return_value=self.dir, \
1716 create=True), \
1717 support.swap_attr(os, 'defpath', self.dir), \
1718 support.change_cwd(self.dir):
1719 rv = shutil.which(self.file)
1720 self.assertIsNone(rv)
1721
1722 def test_environ_path_cwd(self):
1723 expected_cwd = os.path.basename(self.temp_file.name)
1724 if sys.platform == "win32":
1725 curdir = os.curdir
1726 if isinstance(expected_cwd, bytes):
1727 curdir = os.fsencode(curdir)
1728 expected_cwd = os.path.join(curdir, expected_cwd)
1729
1730 # PATH=':': explicitly looks in the current directory
1731 with support.EnvironmentVarGuard() as env:
1732 env['PATH'] = os.pathsep
1733 with unittest.mock.patch('os.confstr', return_value=self.dir, \
1734 create=True), \
1735 support.swap_attr(os, 'defpath', self.dir):
1736 rv = shutil.which(self.file)
1737 self.assertIsNone(rv)
1738
1739 # look in current directory
1740 with support.change_cwd(self.dir):
1741 rv = shutil.which(self.file)
1742 self.assertEqual(rv, expected_cwd)
1743
1744 def test_environ_path_missing(self):
1745 with support.EnvironmentVarGuard() as env:
1746 env.pop('PATH', None)
1747
1748 # without confstr
1749 with unittest.mock.patch('os.confstr', side_effect=ValueError, \
1750 create=True), \
1751 support.swap_attr(os, 'defpath', self.dir):
1752 rv = shutil.which(self.file)
1753 self.assertEqual(rv, self.temp_file.name)
1754
1755 # with confstr
1756 with unittest.mock.patch('os.confstr', return_value=self.dir, \
1757 create=True), \
1758 support.swap_attr(os, 'defpath', ''):
1759 rv = shutil.which(self.file)
1760 self.assertEqual(rv, self.temp_file.name)
1761
Barry Warsaw618738b2013-04-16 11:05:03 -04001762 def test_empty_path(self):
1763 base_dir = os.path.dirname(self.dir)
Nick Coghlan55175962013-07-28 22:11:50 +10001764 with support.change_cwd(path=self.dir), \
Barry Warsaw618738b2013-04-16 11:05:03 -04001765 support.EnvironmentVarGuard() as env:
Cheryl Sabella5680f652019-02-13 06:25:10 -05001766 env['PATH'] = self.env_path
Barry Warsaw618738b2013-04-16 11:05:03 -04001767 rv = shutil.which(self.file, path='')
1768 self.assertIsNone(rv)
1769
1770 def test_empty_path_no_PATH(self):
1771 with support.EnvironmentVarGuard() as env:
1772 env.pop('PATH', None)
1773 rv = shutil.which(self.file)
1774 self.assertIsNone(rv)
1775
Victor Stinner228a3c92019-04-17 16:26:36 +02001776 @unittest.skipUnless(sys.platform == "win32", 'test specific to Windows')
1777 def test_pathext(self):
1778 ext = ".xyz"
1779 temp_filexyz = tempfile.NamedTemporaryFile(dir=self.temp_dir,
1780 prefix="Tmp2", suffix=ext)
1781 os.chmod(temp_filexyz.name, stat.S_IXUSR)
1782 self.addCleanup(temp_filexyz.close)
1783
1784 # strip path and extension
1785 program = os.path.basename(temp_filexyz.name)
1786 program = os.path.splitext(program)[0]
1787
1788 with support.EnvironmentVarGuard() as env:
1789 env['PATHEXT'] = ext
1790 rv = shutil.which(program, path=self.temp_dir)
1791 self.assertEqual(rv, temp_filexyz.name)
1792
Brian Curtinc57a3452012-06-22 16:00:30 -05001793
Cheryl Sabella5680f652019-02-13 06:25:10 -05001794class TestWhichBytes(TestWhich):
1795 def setUp(self):
1796 TestWhich.setUp(self)
1797 self.dir = os.fsencode(self.dir)
1798 self.file = os.fsencode(self.file)
1799 self.temp_file.name = os.fsencode(self.temp_file.name)
1800 self.curdir = os.fsencode(self.curdir)
1801 self.ext = os.fsencode(self.ext)
1802
1803
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001804class TestMove(BaseTest, unittest.TestCase):
Christian Heimesada8c3b2008-03-18 18:26:33 +00001805
1806 def setUp(self):
1807 filename = "foo"
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001808 self.src_dir = self.mkdtemp()
1809 self.dst_dir = self.mkdtemp()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001810 self.src_file = os.path.join(self.src_dir, filename)
1811 self.dst_file = os.path.join(self.dst_dir, filename)
Christian Heimesada8c3b2008-03-18 18:26:33 +00001812 with open(self.src_file, "wb") as f:
1813 f.write(b"spam")
1814
Christian Heimesada8c3b2008-03-18 18:26:33 +00001815 def _check_move_file(self, src, dst, real_dst):
Antoine Pitrou92f60ed2010-10-14 22:11:44 +00001816 with open(src, "rb") as f:
1817 contents = f.read()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001818 shutil.move(src, dst)
Antoine Pitrou92f60ed2010-10-14 22:11:44 +00001819 with open(real_dst, "rb") as f:
1820 self.assertEqual(contents, f.read())
Christian Heimesada8c3b2008-03-18 18:26:33 +00001821 self.assertFalse(os.path.exists(src))
1822
1823 def _check_move_dir(self, src, dst, real_dst):
1824 contents = sorted(os.listdir(src))
1825 shutil.move(src, dst)
1826 self.assertEqual(contents, sorted(os.listdir(real_dst)))
1827 self.assertFalse(os.path.exists(src))
1828
1829 def test_move_file(self):
1830 # Move a file to another location on the same filesystem.
1831 self._check_move_file(self.src_file, self.dst_file, self.dst_file)
1832
1833 def test_move_file_to_dir(self):
1834 # Move a file inside an existing dir on the same filesystem.
1835 self._check_move_file(self.src_file, self.dst_dir, self.dst_file)
1836
Maxwell A McKinnoncf57cab2019-09-30 19:41:16 -07001837 def test_move_file_to_dir_pathlike_src(self):
1838 # Move a pathlike file to another location on the same filesystem.
1839 src = pathlib.Path(self.src_file)
1840 self._check_move_file(src, self.dst_dir, self.dst_file)
1841
1842 def test_move_file_to_dir_pathlike_dst(self):
1843 # Move a file to another pathlike location on the same filesystem.
1844 dst = pathlib.Path(self.dst_dir)
1845 self._check_move_file(self.src_file, dst, self.dst_file)
1846
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001847 @mock_rename
Christian Heimesada8c3b2008-03-18 18:26:33 +00001848 def test_move_file_other_fs(self):
1849 # Move a file to an existing dir on another filesystem.
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001850 self.test_move_file()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001851
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001852 @mock_rename
Christian Heimesada8c3b2008-03-18 18:26:33 +00001853 def test_move_file_to_dir_other_fs(self):
1854 # Move a file to another location on another filesystem.
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001855 self.test_move_file_to_dir()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001856
1857 def test_move_dir(self):
1858 # Move a dir to another location on the same filesystem.
Steve Dowerabde52c2019-11-15 09:49:21 -08001859 dst_dir = tempfile.mktemp(dir=self.mkdtemp())
Christian Heimesada8c3b2008-03-18 18:26:33 +00001860 try:
1861 self._check_move_dir(self.src_dir, dst_dir, dst_dir)
1862 finally:
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001863 support.rmtree(dst_dir)
Christian Heimesada8c3b2008-03-18 18:26:33 +00001864
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001865 @mock_rename
Christian Heimesada8c3b2008-03-18 18:26:33 +00001866 def test_move_dir_other_fs(self):
1867 # Move a dir to another location on another filesystem.
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001868 self.test_move_dir()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001869
1870 def test_move_dir_to_dir(self):
1871 # Move a dir inside an existing dir on the same filesystem.
1872 self._check_move_dir(self.src_dir, self.dst_dir,
1873 os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
1874
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001875 @mock_rename
Christian Heimesada8c3b2008-03-18 18:26:33 +00001876 def test_move_dir_to_dir_other_fs(self):
1877 # Move a dir inside an existing dir on another filesystem.
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001878 self.test_move_dir_to_dir()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001879
Serhiy Storchaka3a308b92014-02-11 10:30:59 +02001880 def test_move_dir_sep_to_dir(self):
1881 self._check_move_dir(self.src_dir + os.path.sep, self.dst_dir,
1882 os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
1883
1884 @unittest.skipUnless(os.path.altsep, 'requires os.path.altsep')
1885 def test_move_dir_altsep_to_dir(self):
1886 self._check_move_dir(self.src_dir + os.path.altsep, self.dst_dir,
1887 os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
1888
Christian Heimesada8c3b2008-03-18 18:26:33 +00001889 def test_existing_file_inside_dest_dir(self):
1890 # A file with the same name inside the destination dir already exists.
1891 with open(self.dst_file, "wb"):
1892 pass
1893 self.assertRaises(shutil.Error, shutil.move, self.src_file, self.dst_dir)
1894
1895 def test_dont_move_dir_in_itself(self):
1896 # Moving a dir inside itself raises an Error.
1897 dst = os.path.join(self.src_dir, "bar")
1898 self.assertRaises(shutil.Error, shutil.move, self.src_dir, dst)
1899
Antoine Pitrou0dcc3cd2009-01-29 20:26:59 +00001900 def test_destinsrc_false_negative(self):
1901 os.mkdir(TESTFN)
1902 try:
1903 for src, dst in [('srcdir', 'srcdir/dest')]:
1904 src = os.path.join(TESTFN, src)
1905 dst = os.path.join(TESTFN, dst)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001906 self.assertTrue(shutil._destinsrc(src, dst),
Benjamin Peterson247a9b82009-02-20 04:09:19 +00001907 msg='_destinsrc() wrongly concluded that '
Antoine Pitrou0dcc3cd2009-01-29 20:26:59 +00001908 'dst (%s) is not in src (%s)' % (dst, src))
1909 finally:
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001910 support.rmtree(TESTFN)
Christian Heimesada8c3b2008-03-18 18:26:33 +00001911
Antoine Pitrou0dcc3cd2009-01-29 20:26:59 +00001912 def test_destinsrc_false_positive(self):
1913 os.mkdir(TESTFN)
1914 try:
1915 for src, dst in [('srcdir', 'src/dest'), ('srcdir', 'srcdir.new')]:
1916 src = os.path.join(TESTFN, src)
1917 dst = os.path.join(TESTFN, dst)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001918 self.assertFalse(shutil._destinsrc(src, dst),
Benjamin Peterson247a9b82009-02-20 04:09:19 +00001919 msg='_destinsrc() wrongly concluded that '
Antoine Pitrou0dcc3cd2009-01-29 20:26:59 +00001920 'dst (%s) is in src (%s)' % (dst, src))
1921 finally:
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001922 support.rmtree(TESTFN)
Christian Heimes9bd667a2008-01-20 15:14:11 +00001923
Antoine Pitrou0a08d7a2012-01-06 20:16:19 +01001924 @support.skip_unless_symlink
1925 @mock_rename
1926 def test_move_file_symlink(self):
1927 dst = os.path.join(self.src_dir, 'bar')
1928 os.symlink(self.src_file, dst)
1929 shutil.move(dst, self.dst_file)
1930 self.assertTrue(os.path.islink(self.dst_file))
1931 self.assertTrue(os.path.samefile(self.src_file, self.dst_file))
1932
1933 @support.skip_unless_symlink
1934 @mock_rename
1935 def test_move_file_symlink_to_dir(self):
1936 filename = "bar"
1937 dst = os.path.join(self.src_dir, filename)
1938 os.symlink(self.src_file, dst)
1939 shutil.move(dst, self.dst_dir)
1940 final_link = os.path.join(self.dst_dir, filename)
1941 self.assertTrue(os.path.islink(final_link))
1942 self.assertTrue(os.path.samefile(self.src_file, final_link))
1943
1944 @support.skip_unless_symlink
1945 @mock_rename
1946 def test_move_dangling_symlink(self):
1947 src = os.path.join(self.src_dir, 'baz')
1948 dst = os.path.join(self.src_dir, 'bar')
1949 os.symlink(src, dst)
1950 dst_link = os.path.join(self.dst_dir, 'quux')
1951 shutil.move(dst, dst_link)
1952 self.assertTrue(os.path.islink(dst_link))
Steve Dower75e06492019-08-21 13:43:06 -07001953 self.assertEqual(os.path.realpath(src), os.path.realpath(dst_link))
Antoine Pitrou0a08d7a2012-01-06 20:16:19 +01001954
1955 @support.skip_unless_symlink
1956 @mock_rename
1957 def test_move_dir_symlink(self):
1958 src = os.path.join(self.src_dir, 'baz')
1959 dst = os.path.join(self.src_dir, 'bar')
1960 os.mkdir(src)
1961 os.symlink(src, dst)
1962 dst_link = os.path.join(self.dst_dir, 'quux')
1963 shutil.move(dst, dst_link)
1964 self.assertTrue(os.path.islink(dst_link))
1965 self.assertTrue(os.path.samefile(src, dst_link))
1966
Brian Curtin0d0a1de2012-06-18 18:41:07 -05001967 def test_move_return_value(self):
1968 rv = shutil.move(self.src_file, self.dst_dir)
1969 self.assertEqual(rv,
1970 os.path.join(self.dst_dir, os.path.basename(self.src_file)))
1971
1972 def test_move_as_rename_return_value(self):
1973 rv = shutil.move(self.src_file, os.path.join(self.dst_dir, 'bar'))
1974 self.assertEqual(rv, os.path.join(self.dst_dir, 'bar'))
1975
R David Murray6ffface2014-06-11 14:40:13 -04001976 @mock_rename
1977 def test_move_file_special_function(self):
1978 moved = []
1979 def _copy(src, dst):
1980 moved.append((src, dst))
1981 shutil.move(self.src_file, self.dst_dir, copy_function=_copy)
1982 self.assertEqual(len(moved), 1)
1983
1984 @mock_rename
1985 def test_move_dir_special_function(self):
1986 moved = []
1987 def _copy(src, dst):
1988 moved.append((src, dst))
1989 support.create_empty_file(os.path.join(self.src_dir, 'child'))
1990 support.create_empty_file(os.path.join(self.src_dir, 'child1'))
1991 shutil.move(self.src_dir, self.dst_dir, copy_function=_copy)
1992 self.assertEqual(len(moved), 3)
1993
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001994 def test_move_dir_caseinsensitive(self):
1995 # Renames a folder to the same name
1996 # but a different case.
1997
1998 self.src_dir = self.mkdtemp()
1999 dst_dir = os.path.join(
2000 os.path.dirname(self.src_dir),
2001 os.path.basename(self.src_dir).upper())
2002 self.assertNotEqual(self.src_dir, dst_dir)
2003
2004 try:
2005 shutil.move(self.src_dir, dst_dir)
2006 self.assertTrue(os.path.isdir(dst_dir))
2007 finally:
2008 os.rmdir(dst_dir)
2009
Tarek Ziadé5340db32010-04-19 22:30:51 +00002010
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002011class TestCopyFile(unittest.TestCase):
2012
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002013 class Faux(object):
2014 _entered = False
2015 _exited_with = None
2016 _raised = False
2017 def __init__(self, raise_in_exit=False, suppress_at_exit=True):
2018 self._raise_in_exit = raise_in_exit
2019 self._suppress_at_exit = suppress_at_exit
2020 def read(self, *args):
2021 return ''
2022 def __enter__(self):
2023 self._entered = True
2024 def __exit__(self, exc_type, exc_val, exc_tb):
2025 self._exited_with = exc_type, exc_val, exc_tb
2026 if self._raise_in_exit:
2027 self._raised = True
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002028 raise OSError("Cannot close")
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002029 return self._suppress_at_exit
2030
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002031 def test_w_source_open_fails(self):
2032 def _open(filename, mode='r'):
2033 if filename == 'srcfile':
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002034 raise OSError('Cannot open "srcfile"')
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002035 assert 0 # shouldn't reach here.
2036
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03002037 with support.swap_attr(shutil, 'open', _open):
2038 with self.assertRaises(OSError):
2039 shutil.copyfile('srcfile', 'destfile')
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002040
Victor Stinner937ee9e2018-06-26 02:11:06 +02002041 @unittest.skipIf(MACOS, "skipped on macOS")
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002042 def test_w_dest_open_fails(self):
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002043 srcfile = self.Faux()
2044
2045 def _open(filename, mode='r'):
2046 if filename == 'srcfile':
2047 return srcfile
2048 if filename == 'destfile':
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002049 raise OSError('Cannot open "destfile"')
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002050 assert 0 # shouldn't reach here.
2051
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03002052 with support.swap_attr(shutil, 'open', _open):
2053 shutil.copyfile('srcfile', 'destfile')
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002054 self.assertTrue(srcfile._entered)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002055 self.assertTrue(srcfile._exited_with[0] is OSError)
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002056 self.assertEqual(srcfile._exited_with[1].args,
2057 ('Cannot open "destfile"',))
2058
Victor Stinner937ee9e2018-06-26 02:11:06 +02002059 @unittest.skipIf(MACOS, "skipped on macOS")
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002060 def test_w_dest_close_fails(self):
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002061 srcfile = self.Faux()
2062 destfile = self.Faux(True)
2063
2064 def _open(filename, mode='r'):
2065 if filename == 'srcfile':
2066 return srcfile
2067 if filename == 'destfile':
2068 return destfile
2069 assert 0 # shouldn't reach here.
2070
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03002071 with support.swap_attr(shutil, 'open', _open):
2072 shutil.copyfile('srcfile', 'destfile')
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002073 self.assertTrue(srcfile._entered)
2074 self.assertTrue(destfile._entered)
2075 self.assertTrue(destfile._raised)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002076 self.assertTrue(srcfile._exited_with[0] is OSError)
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002077 self.assertEqual(srcfile._exited_with[1].args,
2078 ('Cannot close',))
2079
Victor Stinner937ee9e2018-06-26 02:11:06 +02002080 @unittest.skipIf(MACOS, "skipped on macOS")
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002081 def test_w_source_close_fails(self):
2082
2083 srcfile = self.Faux(True)
2084 destfile = self.Faux()
2085
2086 def _open(filename, mode='r'):
2087 if filename == 'srcfile':
2088 return srcfile
2089 if filename == 'destfile':
2090 return destfile
2091 assert 0 # shouldn't reach here.
2092
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03002093 with support.swap_attr(shutil, 'open', _open):
2094 with self.assertRaises(OSError):
2095 shutil.copyfile('srcfile', 'destfile')
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002096 self.assertTrue(srcfile._entered)
2097 self.assertTrue(destfile._entered)
2098 self.assertFalse(destfile._raised)
2099 self.assertTrue(srcfile._exited_with[0] is None)
2100 self.assertTrue(srcfile._raised)
2101
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002102
Giampaolo Rodolac7f02a92018-06-19 08:27:29 -07002103class TestCopyFileObj(unittest.TestCase):
2104 FILESIZE = 2 * 1024 * 1024
2105
2106 @classmethod
2107 def setUpClass(cls):
2108 write_test_file(TESTFN, cls.FILESIZE)
2109
2110 @classmethod
2111 def tearDownClass(cls):
2112 support.unlink(TESTFN)
2113 support.unlink(TESTFN2)
2114
2115 def tearDown(self):
2116 support.unlink(TESTFN2)
2117
2118 @contextlib.contextmanager
2119 def get_files(self):
2120 with open(TESTFN, "rb") as src:
2121 with open(TESTFN2, "wb") as dst:
2122 yield (src, dst)
2123
2124 def assert_files_eq(self, src, dst):
2125 with open(src, 'rb') as fsrc:
2126 with open(dst, 'rb') as fdst:
2127 self.assertEqual(fsrc.read(), fdst.read())
2128
2129 def test_content(self):
2130 with self.get_files() as (src, dst):
2131 shutil.copyfileobj(src, dst)
2132 self.assert_files_eq(TESTFN, TESTFN2)
2133
2134 def test_file_not_closed(self):
2135 with self.get_files() as (src, dst):
2136 shutil.copyfileobj(src, dst)
2137 assert not src.closed
2138 assert not dst.closed
2139
2140 def test_file_offset(self):
2141 with self.get_files() as (src, dst):
2142 shutil.copyfileobj(src, dst)
2143 self.assertEqual(src.tell(), self.FILESIZE)
2144 self.assertEqual(dst.tell(), self.FILESIZE)
2145
2146 @unittest.skipIf(os.name != 'nt', "Windows only")
2147 def test_win_impl(self):
2148 # Make sure alternate Windows implementation is called.
2149 with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
2150 shutil.copyfile(TESTFN, TESTFN2)
2151 assert m.called
2152
2153 # File size is 2 MiB but max buf size should be 1 MiB.
2154 self.assertEqual(m.call_args[0][2], 1 * 1024 * 1024)
2155
2156 # If file size < 1 MiB memoryview() length must be equal to
2157 # the actual file size.
Steve Dowerabde52c2019-11-15 09:49:21 -08002158 with tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False) as f:
Giampaolo Rodolac7f02a92018-06-19 08:27:29 -07002159 f.write(b'foo')
2160 fname = f.name
2161 self.addCleanup(support.unlink, fname)
2162 with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
2163 shutil.copyfile(fname, TESTFN2)
2164 self.assertEqual(m.call_args[0][2], 3)
2165
2166 # Empty files should not rely on readinto() variant.
Steve Dowerabde52c2019-11-15 09:49:21 -08002167 with tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False) as f:
Giampaolo Rodolac7f02a92018-06-19 08:27:29 -07002168 pass
2169 fname = f.name
2170 self.addCleanup(support.unlink, fname)
2171 with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
2172 shutil.copyfile(fname, TESTFN2)
2173 assert not m.called
2174 self.assert_files_eq(fname, TESTFN2)
2175
2176
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002177class _ZeroCopyFileTest(object):
2178 """Tests common to all zero-copy APIs."""
2179 FILESIZE = (10 * 1024 * 1024) # 10 MiB
2180 FILEDATA = b""
2181 PATCHPOINT = ""
2182
2183 @classmethod
2184 def setUpClass(cls):
2185 write_test_file(TESTFN, cls.FILESIZE)
2186 with open(TESTFN, 'rb') as f:
2187 cls.FILEDATA = f.read()
2188 assert len(cls.FILEDATA) == cls.FILESIZE
2189
2190 @classmethod
2191 def tearDownClass(cls):
2192 support.unlink(TESTFN)
2193
2194 def tearDown(self):
2195 support.unlink(TESTFN2)
2196
2197 @contextlib.contextmanager
2198 def get_files(self):
2199 with open(TESTFN, "rb") as src:
2200 with open(TESTFN2, "wb") as dst:
2201 yield (src, dst)
2202
2203 def zerocopy_fun(self, *args, **kwargs):
2204 raise NotImplementedError("must be implemented in subclass")
2205
2206 def reset(self):
2207 self.tearDown()
2208 self.tearDownClass()
2209 self.setUpClass()
2210 self.setUp()
2211
2212 # ---
2213
2214 def test_regular_copy(self):
2215 with self.get_files() as (src, dst):
2216 self.zerocopy_fun(src, dst)
2217 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2218 # Make sure the fallback function is not called.
2219 with self.get_files() as (src, dst):
2220 with unittest.mock.patch('shutil.copyfileobj') as m:
2221 shutil.copyfile(TESTFN, TESTFN2)
2222 assert not m.called
2223
2224 def test_same_file(self):
2225 self.addCleanup(self.reset)
2226 with self.get_files() as (src, dst):
2227 with self.assertRaises(Exception):
2228 self.zerocopy_fun(src, src)
2229 # Make sure src file is not corrupted.
2230 self.assertEqual(read_file(TESTFN, binary=True), self.FILEDATA)
2231
2232 def test_non_existent_src(self):
Steve Dowerabde52c2019-11-15 09:49:21 -08002233 name = tempfile.mktemp(dir=os.getcwd())
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002234 with self.assertRaises(FileNotFoundError) as cm:
2235 shutil.copyfile(name, "new")
2236 self.assertEqual(cm.exception.filename, name)
2237
2238 def test_empty_file(self):
2239 srcname = TESTFN + 'src'
2240 dstname = TESTFN + 'dst'
2241 self.addCleanup(lambda: support.unlink(srcname))
2242 self.addCleanup(lambda: support.unlink(dstname))
2243 with open(srcname, "wb"):
2244 pass
2245
2246 with open(srcname, "rb") as src:
2247 with open(dstname, "wb") as dst:
2248 self.zerocopy_fun(src, dst)
2249
2250 self.assertEqual(read_file(dstname, binary=True), b"")
2251
2252 def test_unhandled_exception(self):
2253 with unittest.mock.patch(self.PATCHPOINT,
2254 side_effect=ZeroDivisionError):
2255 self.assertRaises(ZeroDivisionError,
2256 shutil.copyfile, TESTFN, TESTFN2)
2257
2258 def test_exception_on_first_call(self):
2259 # Emulate a case where the first call to the zero-copy
2260 # function raises an exception in which case the function is
2261 # supposed to give up immediately.
2262 with unittest.mock.patch(self.PATCHPOINT,
2263 side_effect=OSError(errno.EINVAL, "yo")):
2264 with self.get_files() as (src, dst):
2265 with self.assertRaises(_GiveupOnFastCopy):
2266 self.zerocopy_fun(src, dst)
2267
2268 def test_filesystem_full(self):
2269 # Emulate a case where filesystem is full and sendfile() fails
2270 # on first call.
2271 with unittest.mock.patch(self.PATCHPOINT,
2272 side_effect=OSError(errno.ENOSPC, "yo")):
2273 with self.get_files() as (src, dst):
2274 self.assertRaises(OSError, self.zerocopy_fun, src, dst)
2275
2276
2277@unittest.skipIf(not SUPPORTS_SENDFILE, 'os.sendfile() not supported')
2278class TestZeroCopySendfile(_ZeroCopyFileTest, unittest.TestCase):
2279 PATCHPOINT = "os.sendfile"
2280
2281 def zerocopy_fun(self, fsrc, fdst):
2282 return shutil._fastcopy_sendfile(fsrc, fdst)
2283
2284 def test_non_regular_file_src(self):
2285 with io.BytesIO(self.FILEDATA) as src:
2286 with open(TESTFN2, "wb") as dst:
2287 with self.assertRaises(_GiveupOnFastCopy):
2288 self.zerocopy_fun(src, dst)
2289 shutil.copyfileobj(src, dst)
2290
2291 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2292
2293 def test_non_regular_file_dst(self):
2294 with open(TESTFN, "rb") as src:
2295 with io.BytesIO() as dst:
2296 with self.assertRaises(_GiveupOnFastCopy):
2297 self.zerocopy_fun(src, dst)
2298 shutil.copyfileobj(src, dst)
2299 dst.seek(0)
2300 self.assertEqual(dst.read(), self.FILEDATA)
2301
2302 def test_exception_on_second_call(self):
2303 def sendfile(*args, **kwargs):
2304 if not flag:
2305 flag.append(None)
2306 return orig_sendfile(*args, **kwargs)
2307 else:
2308 raise OSError(errno.EBADF, "yo")
2309
2310 flag = []
2311 orig_sendfile = os.sendfile
2312 with unittest.mock.patch('os.sendfile', create=True,
2313 side_effect=sendfile):
2314 with self.get_files() as (src, dst):
2315 with self.assertRaises(OSError) as cm:
2316 shutil._fastcopy_sendfile(src, dst)
2317 assert flag
2318 self.assertEqual(cm.exception.errno, errno.EBADF)
2319
2320 def test_cant_get_size(self):
2321 # Emulate a case where src file size cannot be determined.
2322 # Internally bufsize will be set to a small value and
2323 # sendfile() will be called repeatedly.
2324 with unittest.mock.patch('os.fstat', side_effect=OSError) as m:
2325 with self.get_files() as (src, dst):
2326 shutil._fastcopy_sendfile(src, dst)
2327 assert m.called
2328 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2329
2330 def test_small_chunks(self):
2331 # Force internal file size detection to be smaller than the
2332 # actual file size. We want to force sendfile() to be called
2333 # multiple times, also in order to emulate a src fd which gets
2334 # bigger while it is being copied.
2335 mock = unittest.mock.Mock()
2336 mock.st_size = 65536 + 1
2337 with unittest.mock.patch('os.fstat', return_value=mock) as m:
2338 with self.get_files() as (src, dst):
2339 shutil._fastcopy_sendfile(src, dst)
2340 assert m.called
2341 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2342
2343 def test_big_chunk(self):
2344 # Force internal file size detection to be +100MB bigger than
2345 # the actual file size. Make sure sendfile() does not rely on
2346 # file size value except for (maybe) a better throughput /
2347 # performance.
2348 mock = unittest.mock.Mock()
2349 mock.st_size = self.FILESIZE + (100 * 1024 * 1024)
2350 with unittest.mock.patch('os.fstat', return_value=mock) as m:
2351 with self.get_files() as (src, dst):
2352 shutil._fastcopy_sendfile(src, dst)
2353 assert m.called
2354 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2355
2356 def test_blocksize_arg(self):
2357 with unittest.mock.patch('os.sendfile',
2358 side_effect=ZeroDivisionError) as m:
2359 self.assertRaises(ZeroDivisionError,
2360 shutil.copyfile, TESTFN, TESTFN2)
2361 blocksize = m.call_args[0][3]
2362 # Make sure file size and the block size arg passed to
2363 # sendfile() are the same.
2364 self.assertEqual(blocksize, os.path.getsize(TESTFN))
2365 # ...unless we're dealing with a small file.
2366 support.unlink(TESTFN2)
2367 write_file(TESTFN2, b"hello", binary=True)
2368 self.addCleanup(support.unlink, TESTFN2 + '3')
2369 self.assertRaises(ZeroDivisionError,
2370 shutil.copyfile, TESTFN2, TESTFN2 + '3')
2371 blocksize = m.call_args[0][3]
2372 self.assertEqual(blocksize, 2 ** 23)
2373
2374 def test_file2file_not_supported(self):
2375 # Emulate a case where sendfile() only support file->socket
2376 # fds. In such a case copyfile() is supposed to skip the
2377 # fast-copy attempt from then on.
Giampaolo Rodola413d9552019-05-30 14:05:41 +08002378 assert shutil._USE_CP_SENDFILE
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002379 try:
2380 with unittest.mock.patch(
2381 self.PATCHPOINT,
2382 side_effect=OSError(errno.ENOTSOCK, "yo")) as m:
2383 with self.get_files() as (src, dst):
2384 with self.assertRaises(_GiveupOnFastCopy):
2385 shutil._fastcopy_sendfile(src, dst)
2386 assert m.called
Giampaolo Rodola413d9552019-05-30 14:05:41 +08002387 assert not shutil._USE_CP_SENDFILE
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002388
2389 with unittest.mock.patch(self.PATCHPOINT) as m:
2390 shutil.copyfile(TESTFN, TESTFN2)
2391 assert not m.called
2392 finally:
Giampaolo Rodola413d9552019-05-30 14:05:41 +08002393 shutil._USE_CP_SENDFILE = True
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002394
2395
Victor Stinner937ee9e2018-06-26 02:11:06 +02002396@unittest.skipIf(not MACOS, 'macOS only')
Giampaolo Rodolac7f02a92018-06-19 08:27:29 -07002397class TestZeroCopyMACOS(_ZeroCopyFileTest, unittest.TestCase):
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002398 PATCHPOINT = "posix._fcopyfile"
2399
2400 def zerocopy_fun(self, src, dst):
Giampaolo Rodolac7f02a92018-06-19 08:27:29 -07002401 return shutil._fastcopy_fcopyfile(src, dst, posix._COPYFILE_DATA)
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002402
2403
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03002404class TestGetTerminalSize(unittest.TestCase):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002405 def test_does_not_crash(self):
2406 """Check if get_terminal_size() returns a meaningful value.
2407
2408 There's no easy portable way to actually check the size of the
2409 terminal, so let's check if it returns something sensible instead.
2410 """
2411 size = shutil.get_terminal_size()
Antoine Pitroucfade362012-02-08 23:48:59 +01002412 self.assertGreaterEqual(size.columns, 0)
2413 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002414
2415 def test_os_environ_first(self):
2416 "Check if environment variables have precedence"
2417
2418 with support.EnvironmentVarGuard() as env:
2419 env['COLUMNS'] = '777'
Serhiy Storchakad30829d2016-04-24 09:58:43 +03002420 del env['LINES']
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002421 size = shutil.get_terminal_size()
2422 self.assertEqual(size.columns, 777)
2423
2424 with support.EnvironmentVarGuard() as env:
Serhiy Storchakad30829d2016-04-24 09:58:43 +03002425 del env['COLUMNS']
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002426 env['LINES'] = '888'
2427 size = shutil.get_terminal_size()
2428 self.assertEqual(size.lines, 888)
2429
Serhiy Storchakad30829d2016-04-24 09:58:43 +03002430 def test_bad_environ(self):
2431 with support.EnvironmentVarGuard() as env:
2432 env['COLUMNS'] = 'xxx'
2433 env['LINES'] = 'yyy'
2434 size = shutil.get_terminal_size()
2435 self.assertGreaterEqual(size.columns, 0)
2436 self.assertGreaterEqual(size.lines, 0)
2437
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002438 @unittest.skipUnless(os.isatty(sys.__stdout__.fileno()), "not on tty")
Victor Stinner119ebb72016-04-19 22:24:56 +02002439 @unittest.skipUnless(hasattr(os, 'get_terminal_size'),
2440 'need os.get_terminal_size()')
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002441 def test_stty_match(self):
2442 """Check if stty returns the same results ignoring env
2443
2444 This test will fail if stdin and stdout are connected to
2445 different terminals with different sizes. Nevertheless, such
2446 situations should be pretty rare.
2447 """
2448 try:
2449 size = subprocess.check_output(['stty', 'size']).decode().split()
Xavier de Gaye38c8b7d2016-11-14 17:14:42 +01002450 except (FileNotFoundError, PermissionError,
2451 subprocess.CalledProcessError):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002452 self.skipTest("stty invocation failed")
2453 expected = (int(size[1]), int(size[0])) # reversed order
2454
2455 with support.EnvironmentVarGuard() as env:
2456 del env['LINES']
2457 del env['COLUMNS']
2458 actual = shutil.get_terminal_size()
2459
2460 self.assertEqual(expected, actual)
Ronald Oussorenf51738b2011-05-06 10:23:04 +02002461
Serhiy Storchakad30829d2016-04-24 09:58:43 +03002462 def test_fallback(self):
2463 with support.EnvironmentVarGuard() as env:
2464 del env['LINES']
2465 del env['COLUMNS']
2466
2467 # sys.__stdout__ has no fileno()
2468 with support.swap_attr(sys, '__stdout__', None):
2469 size = shutil.get_terminal_size(fallback=(10, 20))
2470 self.assertEqual(size.columns, 10)
2471 self.assertEqual(size.lines, 20)
2472
2473 # sys.__stdout__ is not a terminal on Unix
2474 # or fileno() not in (0, 1, 2) on Windows
2475 with open(os.devnull, 'w') as f, \
2476 support.swap_attr(sys, '__stdout__', f):
2477 size = shutil.get_terminal_size(fallback=(30, 40))
2478 self.assertEqual(size.columns, 30)
2479 self.assertEqual(size.lines, 40)
2480
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002481
Berker Peksag8083cd62014-11-01 11:04:06 +02002482class PublicAPITests(unittest.TestCase):
2483 """Ensures that the correct values are exposed in the public API."""
2484
2485 def test_module_all_attribute(self):
2486 self.assertTrue(hasattr(shutil, '__all__'))
2487 target_api = ['copyfileobj', 'copyfile', 'copymode', 'copystat',
2488 'copy', 'copy2', 'copytree', 'move', 'rmtree', 'Error',
2489 'SpecialFileError', 'ExecError', 'make_archive',
2490 'get_archive_formats', 'register_archive_format',
2491 'unregister_archive_format', 'get_unpack_formats',
2492 'register_unpack_format', 'unregister_unpack_format',
2493 'unpack_archive', 'ignore_patterns', 'chown', 'which',
2494 'get_terminal_size', 'SameFileError']
2495 if hasattr(os, 'statvfs') or os.name == 'nt':
2496 target_api.append('disk_usage')
2497 self.assertEqual(set(shutil.__all__), set(target_api))
2498
2499
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00002500if __name__ == '__main__':
Brett Cannon3e9a9ae2013-06-12 21:25:59 -04002501 unittest.main()