blob: 460b979ba93323d97c2995c89e59aeac2bbd613b [file] [log] [blame]
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00001# Copyright (C) 2003 Python Software Foundation
2
3import unittest
Berker Peksag884afd92014-12-10 02:50:32 +02004import unittest.mock
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00005import shutil
6import tempfile
Johannes Gijsbers8e6f2de2004-11-23 09:27:27 +00007import sys
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +00008import stat
Brett Cannon1c3fa182004-06-19 21:11:35 +00009import os
10import os.path
Antoine Pitrouc041ab62012-01-02 19:18:02 +010011import errno
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040012import functools
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -070013import pathlib
Antoine Pitroubcf2b592012-02-08 23:28:36 +010014import subprocess
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020015import random
16import string
17import contextlib
18import io
Serhiy Storchaka527ef072015-09-06 18:33:19 +030019from shutil import (make_archive,
Tarek Ziadé396fad72010-02-23 05:30:31 +000020 register_archive_format, unregister_archive_format,
Tarek Ziadé6ac91722010-04-28 17:51:36 +000021 get_archive_formats, Error, unpack_archive,
22 register_unpack_format, RegistryError,
Hynek Schlawack48653762012-10-07 12:49:58 +020023 unregister_unpack_format, get_unpack_formats,
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020024 SameFileError, _GiveupOnFastCopy)
Tarek Ziadé396fad72010-02-23 05:30:31 +000025import tarfile
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +020026import zipfile
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020027try:
28 import posix
29except ImportError:
30 posix = None
Tarek Ziadé396fad72010-02-23 05:30:31 +000031
32from test import support
Serhiy Storchakab21d1552018-03-02 11:53:51 +020033from test.support import TESTFN, FakePath
Serhiy Storchaka11213772014-08-06 18:50:19 +030034
Antoine Pitrou7fff0962009-05-01 21:09:44 +000035TESTFN2 = TESTFN + "2"
Victor Stinner937ee9e2018-06-26 02:11:06 +020036MACOS = sys.platform.startswith("darwin")
Michael Feltef110b12019-02-18 12:02:44 +010037AIX = sys.platform[:3] == 'aix'
Tarek Ziadé396fad72010-02-23 05:30:31 +000038try:
39 import grp
40 import pwd
41 UID_GID_SUPPORT = True
42except ImportError:
43 UID_GID_SUPPORT = False
44
Steve Dowerdf2d4a62019-08-21 15:27:33 -070045try:
46 import _winapi
47except ImportError:
48 _winapi = None
49
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040050def _fake_rename(*args, **kwargs):
51 # Pretend the destination path is on a different filesystem.
Antoine Pitrouc041ab62012-01-02 19:18:02 +010052 raise OSError(getattr(errno, 'EXDEV', 18), "Invalid cross-device link")
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040053
54def mock_rename(func):
55 @functools.wraps(func)
56 def wrap(*args, **kwargs):
57 try:
58 builtin_rename = os.rename
59 os.rename = _fake_rename
60 return func(*args, **kwargs)
61 finally:
62 os.rename = builtin_rename
63 return wrap
64
Éric Araujoa7e33a12011-08-12 19:51:35 +020065def write_file(path, content, binary=False):
66 """Write *content* to a file located at *path*.
67
68 If *path* is a tuple instead of a string, os.path.join will be used to
69 make a path. If *binary* is true, the file will be opened in binary
70 mode.
71 """
72 if isinstance(path, tuple):
73 path = os.path.join(*path)
74 with open(path, 'wb' if binary else 'w') as fp:
75 fp.write(content)
76
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020077def write_test_file(path, size):
78 """Create a test file with an arbitrary size and random text content."""
79 def chunks(total, step):
80 assert total >= step
81 while total > step:
82 yield step
83 total -= step
84 if total:
85 yield total
86
87 bufsize = min(size, 8192)
88 chunk = b"".join([random.choice(string.ascii_letters).encode()
89 for i in range(bufsize)])
90 with open(path, 'wb') as f:
91 for csize in chunks(size, bufsize):
92 f.write(chunk)
93 assert os.path.getsize(path) == size
94
Éric Araujoa7e33a12011-08-12 19:51:35 +020095def read_file(path, binary=False):
96 """Return contents from a file located at *path*.
97
98 If *path* is a tuple instead of a string, os.path.join will be used to
99 make a path. If *binary* is true, the file will be opened in binary
100 mode.
101 """
102 if isinstance(path, tuple):
103 path = os.path.join(*path)
104 with open(path, 'rb' if binary else 'r') as fp:
105 return fp.read()
106
Serhiy Storchaka527ef072015-09-06 18:33:19 +0300107def rlistdir(path):
108 res = []
109 for name in sorted(os.listdir(path)):
110 p = os.path.join(path, name)
111 if os.path.isdir(p) and not os.path.islink(p):
112 res.append(name + '/')
113 for n in rlistdir(p):
114 res.append(name + '/' + n)
115 else:
116 res.append(name)
117 return res
118
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +0200119def supports_file2file_sendfile():
120 # ...apparently Linux and Solaris are the only ones
121 if not hasattr(os, "sendfile"):
122 return False
123 srcname = None
124 dstname = None
125 try:
Steve Dowerabde52c2019-11-15 09:49:21 -0800126 with tempfile.NamedTemporaryFile("wb", dir=os.getcwd(), delete=False) as f:
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +0200127 srcname = f.name
128 f.write(b"0123456789")
129
130 with open(srcname, "rb") as src:
Steve Dowerabde52c2019-11-15 09:49:21 -0800131 with tempfile.NamedTemporaryFile("wb", dir=os.getcwd(), delete=False) as dst:
Victor Stinner4c26abd2019-06-27 01:39:53 +0200132 dstname = dst.name
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +0200133 infd = src.fileno()
134 outfd = dst.fileno()
135 try:
136 os.sendfile(outfd, infd, 0, 2)
137 except OSError:
138 return False
139 else:
140 return True
141 finally:
142 if srcname is not None:
143 support.unlink(srcname)
144 if dstname is not None:
145 support.unlink(dstname)
146
147
148SUPPORTS_SENDFILE = supports_file2file_sendfile()
149
Michael Feltef110b12019-02-18 12:02:44 +0100150# AIX 32-bit mode, by default, lacks enough memory for the xz/lzma compiler test
151# The AIX command 'dump -o program' gives XCOFF header information
152# The second word of the last line in the maxdata value
153# when 32-bit maxdata must be greater than 0x1000000 for the xz test to succeed
154def _maxdataOK():
155 if AIX and sys.maxsize == 2147483647:
156 hdrs=subprocess.getoutput("/usr/bin/dump -o %s" % sys.executable)
157 maxdata=hdrs.split("\n")[-1].split()[1]
158 return int(maxdata,16) >= 0x20000000
159 else:
160 return True
Éric Araujoa7e33a12011-08-12 19:51:35 +0200161
Tarek Ziadé396fad72010-02-23 05:30:31 +0000162
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300163class BaseTest:
Tarek Ziadé396fad72010-02-23 05:30:31 +0000164
Steve Dowerabde52c2019-11-15 09:49:21 -0800165 def mkdtemp(self, prefix=None):
Tarek Ziadé396fad72010-02-23 05:30:31 +0000166 """Create a temporary directory that will be cleaned up.
167
168 Returns the path of the directory.
169 """
Steve Dowerabde52c2019-11-15 09:49:21 -0800170 d = tempfile.mkdtemp(prefix=prefix, dir=os.getcwd())
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300171 self.addCleanup(support.rmtree, d)
Tarek Ziadé396fad72010-02-23 05:30:31 +0000172 return d
Tarek Ziadé5340db32010-04-19 22:30:51 +0000173
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300174
175class TestRmTree(BaseTest, unittest.TestCase):
176
Hynek Schlawack3b527782012-06-25 13:27:31 +0200177 def test_rmtree_works_on_bytes(self):
178 tmp = self.mkdtemp()
179 victim = os.path.join(tmp, 'killme')
180 os.mkdir(victim)
181 write_file(os.path.join(victim, 'somefile'), 'foo')
182 victim = os.fsencode(victim)
183 self.assertIsInstance(victim, bytes)
Steve Dowere58571b2016-09-08 11:11:13 -0700184 shutil.rmtree(victim)
Hynek Schlawack3b527782012-06-25 13:27:31 +0200185
Hynek Schlawacka75cd1c2012-06-28 12:07:29 +0200186 @support.skip_unless_symlink
187 def test_rmtree_fails_on_symlink(self):
188 tmp = self.mkdtemp()
189 dir_ = os.path.join(tmp, 'dir')
190 os.mkdir(dir_)
191 link = os.path.join(tmp, 'link')
192 os.symlink(dir_, link)
193 self.assertRaises(OSError, shutil.rmtree, link)
194 self.assertTrue(os.path.exists(dir_))
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100195 self.assertTrue(os.path.lexists(link))
196 errors = []
197 def onerror(*args):
198 errors.append(args)
199 shutil.rmtree(link, onerror=onerror)
200 self.assertEqual(len(errors), 1)
201 self.assertIs(errors[0][0], os.path.islink)
202 self.assertEqual(errors[0][1], link)
203 self.assertIsInstance(errors[0][2][1], OSError)
Hynek Schlawacka75cd1c2012-06-28 12:07:29 +0200204
205 @support.skip_unless_symlink
206 def test_rmtree_works_on_symlinks(self):
207 tmp = self.mkdtemp()
208 dir1 = os.path.join(tmp, 'dir1')
209 dir2 = os.path.join(dir1, 'dir2')
210 dir3 = os.path.join(tmp, 'dir3')
211 for d in dir1, dir2, dir3:
212 os.mkdir(d)
213 file1 = os.path.join(tmp, 'file1')
214 write_file(file1, 'foo')
215 link1 = os.path.join(dir1, 'link1')
216 os.symlink(dir2, link1)
217 link2 = os.path.join(dir1, 'link2')
218 os.symlink(dir3, link2)
219 link3 = os.path.join(dir1, 'link3')
220 os.symlink(file1, link3)
221 # make sure symlinks are removed but not followed
222 shutil.rmtree(dir1)
223 self.assertFalse(os.path.exists(dir1))
224 self.assertTrue(os.path.exists(dir3))
225 self.assertTrue(os.path.exists(file1))
226
Steve Dowerdf2d4a62019-08-21 15:27:33 -0700227 @unittest.skipUnless(_winapi, 'only relevant on Windows')
228 def test_rmtree_fails_on_junctions(self):
229 tmp = self.mkdtemp()
230 dir_ = os.path.join(tmp, 'dir')
231 os.mkdir(dir_)
232 link = os.path.join(tmp, 'link')
233 _winapi.CreateJunction(dir_, link)
Steve Dowerabde52c2019-11-15 09:49:21 -0800234 self.addCleanup(support.unlink, link)
Steve Dowerdf2d4a62019-08-21 15:27:33 -0700235 self.assertRaises(OSError, shutil.rmtree, link)
236 self.assertTrue(os.path.exists(dir_))
237 self.assertTrue(os.path.lexists(link))
238 errors = []
239 def onerror(*args):
240 errors.append(args)
241 shutil.rmtree(link, onerror=onerror)
242 self.assertEqual(len(errors), 1)
243 self.assertIs(errors[0][0], os.path.islink)
244 self.assertEqual(errors[0][1], link)
245 self.assertIsInstance(errors[0][2][1], OSError)
246
247 @unittest.skipUnless(_winapi, 'only relevant on Windows')
248 def test_rmtree_works_on_junctions(self):
249 tmp = self.mkdtemp()
250 dir1 = os.path.join(tmp, 'dir1')
251 dir2 = os.path.join(dir1, 'dir2')
252 dir3 = os.path.join(tmp, 'dir3')
253 for d in dir1, dir2, dir3:
254 os.mkdir(d)
255 file1 = os.path.join(tmp, 'file1')
256 write_file(file1, 'foo')
257 link1 = os.path.join(dir1, 'link1')
258 _winapi.CreateJunction(dir2, link1)
259 link2 = os.path.join(dir1, 'link2')
260 _winapi.CreateJunction(dir3, link2)
261 link3 = os.path.join(dir1, 'link3')
262 _winapi.CreateJunction(file1, link3)
263 # make sure junctions are removed but not followed
264 shutil.rmtree(dir1)
265 self.assertFalse(os.path.exists(dir1))
266 self.assertTrue(os.path.exists(dir3))
267 self.assertTrue(os.path.exists(file1))
268
Barry Warsaw7fc2cca2003-01-24 17:34:13 +0000269 def test_rmtree_errors(self):
270 # filename is guaranteed not to exist
Steve Dowerabde52c2019-11-15 09:49:21 -0800271 filename = tempfile.mktemp(dir=self.mkdtemp())
Hynek Schlawackb5501102012-12-10 09:11:25 +0100272 self.assertRaises(FileNotFoundError, shutil.rmtree, filename)
273 # test that ignore_errors option is honored
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100274 shutil.rmtree(filename, ignore_errors=True)
275
276 # existing file
277 tmpdir = self.mkdtemp()
Hynek Schlawackb5501102012-12-10 09:11:25 +0100278 write_file((tmpdir, "tstfile"), "")
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100279 filename = os.path.join(tmpdir, "tstfile")
Hynek Schlawackb5501102012-12-10 09:11:25 +0100280 with self.assertRaises(NotADirectoryError) as cm:
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100281 shutil.rmtree(filename)
Hynek Schlawack87f9b462012-12-10 16:29:57 +0100282 # The reason for this rather odd construct is that Windows sprinkles
283 # a \*.* at the end of file names. But only sometimes on some buildbots
284 possible_args = [filename, os.path.join(filename, '*.*')]
285 self.assertIn(cm.exception.filename, possible_args)
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100286 self.assertTrue(os.path.exists(filename))
Hynek Schlawackb5501102012-12-10 09:11:25 +0100287 # test that ignore_errors option is honored
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100288 shutil.rmtree(filename, ignore_errors=True)
289 self.assertTrue(os.path.exists(filename))
290 errors = []
291 def onerror(*args):
292 errors.append(args)
293 shutil.rmtree(filename, onerror=onerror)
294 self.assertEqual(len(errors), 2)
Serhiy Storchakad4d79bc2017-11-04 14:16:35 +0200295 self.assertIs(errors[0][0], os.scandir)
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100296 self.assertEqual(errors[0][1], filename)
Hynek Schlawackb5501102012-12-10 09:11:25 +0100297 self.assertIsInstance(errors[0][2][1], NotADirectoryError)
Hynek Schlawack87f9b462012-12-10 16:29:57 +0100298 self.assertIn(errors[0][2][1].filename, possible_args)
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100299 self.assertIs(errors[1][0], os.rmdir)
300 self.assertEqual(errors[1][1], filename)
Hynek Schlawackb5501102012-12-10 09:11:25 +0100301 self.assertIsInstance(errors[1][2][1], NotADirectoryError)
Hynek Schlawack87f9b462012-12-10 16:29:57 +0100302 self.assertIn(errors[1][2][1].filename, possible_args)
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000303
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000304
Serhiy Storchaka43767632013-11-03 21:31:38 +0200305 @unittest.skipIf(sys.platform[:6] == 'cygwin',
306 "This test can't be run on Cygwin (issue #1071513).")
307 @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
308 "This test can't be run reliably as root (issue #1076467).")
309 def test_on_error(self):
310 self.errorState = 0
311 os.mkdir(TESTFN)
312 self.addCleanup(shutil.rmtree, TESTFN)
Antoine Pitrou2f8a75c2012-06-23 21:28:15 +0200313
Serhiy Storchaka43767632013-11-03 21:31:38 +0200314 self.child_file_path = os.path.join(TESTFN, 'a')
315 self.child_dir_path = os.path.join(TESTFN, 'b')
316 support.create_empty_file(self.child_file_path)
317 os.mkdir(self.child_dir_path)
318 old_dir_mode = os.stat(TESTFN).st_mode
319 old_child_file_mode = os.stat(self.child_file_path).st_mode
320 old_child_dir_mode = os.stat(self.child_dir_path).st_mode
321 # Make unwritable.
322 new_mode = stat.S_IREAD|stat.S_IEXEC
323 os.chmod(self.child_file_path, new_mode)
324 os.chmod(self.child_dir_path, new_mode)
325 os.chmod(TESTFN, new_mode)
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000326
Serhiy Storchaka43767632013-11-03 21:31:38 +0200327 self.addCleanup(os.chmod, TESTFN, old_dir_mode)
328 self.addCleanup(os.chmod, self.child_file_path, old_child_file_mode)
329 self.addCleanup(os.chmod, self.child_dir_path, old_child_dir_mode)
Antoine Pitrou2f8a75c2012-06-23 21:28:15 +0200330
Serhiy Storchaka43767632013-11-03 21:31:38 +0200331 shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
332 # Test whether onerror has actually been called.
333 self.assertEqual(self.errorState, 3,
334 "Expected call to onerror function did not happen.")
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000335
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000336 def check_args_to_onerror(self, func, arg, exc):
Benjamin Peterson25c95f12009-05-08 20:42:26 +0000337 # test_rmtree_errors deliberately runs rmtree
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200338 # on a directory that is chmod 500, which will fail.
Benjamin Peterson25c95f12009-05-08 20:42:26 +0000339 # This function is run when shutil.rmtree fails.
340 # 99.9% of the time it initially fails to remove
341 # a file in the directory, so the first time through
342 # func is os.remove.
343 # However, some Linux machines running ZFS on
344 # FUSE experienced a failure earlier in the process
345 # at os.listdir. The first failure may legally
346 # be either.
Antoine Pitrou4f6e3f72012-06-23 22:05:11 +0200347 if self.errorState < 2:
Hynek Schlawack2100b422012-06-23 20:28:32 +0200348 if func is os.unlink:
Antoine Pitrou4f6e3f72012-06-23 22:05:11 +0200349 self.assertEqual(arg, self.child_file_path)
350 elif func is os.rmdir:
351 self.assertEqual(arg, self.child_dir_path)
Benjamin Peterson25c95f12009-05-08 20:42:26 +0000352 else:
Antoine Pitrou4f6e3f72012-06-23 22:05:11 +0200353 self.assertIs(func, os.listdir)
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200354 self.assertIn(arg, [TESTFN, self.child_dir_path])
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000355 self.assertTrue(issubclass(exc[0], OSError))
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200356 self.errorState += 1
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000357 else:
358 self.assertEqual(func, os.rmdir)
359 self.assertEqual(arg, TESTFN)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000360 self.assertTrue(issubclass(exc[0], OSError))
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200361 self.errorState = 3
362
363 def test_rmtree_does_not_choke_on_failing_lstat(self):
364 try:
365 orig_lstat = os.lstat
Hynek Schlawacka75cd1c2012-06-28 12:07:29 +0200366 def raiser(fn, *args, **kwargs):
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200367 if fn != TESTFN:
368 raise OSError()
369 else:
370 return orig_lstat(fn)
371 os.lstat = raiser
372
373 os.mkdir(TESTFN)
374 write_file((TESTFN, 'foo'), 'foo')
375 shutil.rmtree(TESTFN)
376 finally:
377 os.lstat = orig_lstat
Barry Warsaw7fc2cca2003-01-24 17:34:13 +0000378
Hynek Schlawack2100b422012-06-23 20:28:32 +0200379 def test_rmtree_uses_safe_fd_version_if_available(self):
Hynek Schlawackd0f6e0a2012-06-29 08:28:20 +0200380 _use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <=
381 os.supports_dir_fd and
382 os.listdir in os.supports_fd and
383 os.stat in os.supports_follow_symlinks)
384 if _use_fd_functions:
Hynek Schlawack2100b422012-06-23 20:28:32 +0200385 self.assertTrue(shutil._use_fd_functions)
Nick Coghlan5b0eca12012-06-24 16:43:06 +1000386 self.assertTrue(shutil.rmtree.avoids_symlink_attacks)
Hynek Schlawack2100b422012-06-23 20:28:32 +0200387 tmp_dir = self.mkdtemp()
388 d = os.path.join(tmp_dir, 'a')
389 os.mkdir(d)
390 try:
391 real_rmtree = shutil._rmtree_safe_fd
392 class Called(Exception): pass
393 def _raiser(*args, **kwargs):
394 raise Called
395 shutil._rmtree_safe_fd = _raiser
396 self.assertRaises(Called, shutil.rmtree, d)
397 finally:
398 shutil._rmtree_safe_fd = real_rmtree
399 else:
400 self.assertFalse(shutil._use_fd_functions)
Nick Coghlan5b0eca12012-06-24 16:43:06 +1000401 self.assertFalse(shutil.rmtree.avoids_symlink_attacks)
Hynek Schlawack2100b422012-06-23 20:28:32 +0200402
Johannes Gijsbersd60e92a2004-09-11 21:26:21 +0000403 def test_rmtree_dont_delete_file(self):
404 # When called on a file instead of a directory, don't delete it.
Steve Dowerabde52c2019-11-15 09:49:21 -0800405 handle, path = tempfile.mkstemp(dir=self.mkdtemp())
Victor Stinnerbf816222011-06-30 23:25:47 +0200406 os.close(handle)
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200407 self.assertRaises(NotADirectoryError, shutil.rmtree, path)
Johannes Gijsbersd60e92a2004-09-11 21:26:21 +0000408 os.remove(path)
409
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300410 @support.skip_unless_symlink
411 def test_rmtree_on_symlink(self):
412 # bug 1669.
413 os.mkdir(TESTFN)
414 try:
415 src = os.path.join(TESTFN, 'cheese')
416 dst = os.path.join(TESTFN, 'shop')
417 os.mkdir(src)
418 os.symlink(src, dst)
419 self.assertRaises(OSError, shutil.rmtree, dst)
420 shutil.rmtree(dst, ignore_errors=True)
421 finally:
422 shutil.rmtree(TESTFN, ignore_errors=True)
423
424 @unittest.skipUnless(_winapi, 'only relevant on Windows')
425 def test_rmtree_on_junction(self):
426 os.mkdir(TESTFN)
427 try:
428 src = os.path.join(TESTFN, 'cheese')
429 dst = os.path.join(TESTFN, 'shop')
430 os.mkdir(src)
431 open(os.path.join(src, 'spam'), 'wb').close()
432 _winapi.CreateJunction(src, dst)
433 self.assertRaises(OSError, shutil.rmtree, dst)
434 shutil.rmtree(dst, ignore_errors=True)
435 finally:
436 shutil.rmtree(TESTFN, ignore_errors=True)
437
438
439class TestCopyTree(BaseTest, unittest.TestCase):
440
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000441 def test_copytree_simple(self):
Steve Dowerabde52c2019-11-15 09:49:21 -0800442 src_dir = self.mkdtemp()
443 dst_dir = os.path.join(self.mkdtemp(), 'destination')
Éric Araujoa7e33a12011-08-12 19:51:35 +0200444 self.addCleanup(shutil.rmtree, src_dir)
445 self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
446 write_file((src_dir, 'test.txt'), '123')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000447 os.mkdir(os.path.join(src_dir, 'test_dir'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200448 write_file((src_dir, 'test_dir', 'test.txt'), '456')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000449
Éric Araujoa7e33a12011-08-12 19:51:35 +0200450 shutil.copytree(src_dir, dst_dir)
451 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test.txt')))
452 self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'test_dir')))
453 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test_dir',
454 'test.txt')))
455 actual = read_file((dst_dir, 'test.txt'))
456 self.assertEqual(actual, '123')
457 actual = read_file((dst_dir, 'test_dir', 'test.txt'))
458 self.assertEqual(actual, '456')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000459
jab9e00d9e2018-12-28 13:03:40 -0500460 def test_copytree_dirs_exist_ok(self):
Steve Dowerabde52c2019-11-15 09:49:21 -0800461 src_dir = self.mkdtemp()
462 dst_dir = self.mkdtemp()
jab9e00d9e2018-12-28 13:03:40 -0500463 self.addCleanup(shutil.rmtree, src_dir)
464 self.addCleanup(shutil.rmtree, dst_dir)
465
466 write_file((src_dir, 'nonexisting.txt'), '123')
467 os.mkdir(os.path.join(src_dir, 'existing_dir'))
468 os.mkdir(os.path.join(dst_dir, 'existing_dir'))
469 write_file((dst_dir, 'existing_dir', 'existing.txt'), 'will be replaced')
470 write_file((src_dir, 'existing_dir', 'existing.txt'), 'has been replaced')
471
472 shutil.copytree(src_dir, dst_dir, dirs_exist_ok=True)
473 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'nonexisting.txt')))
474 self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'existing_dir')))
475 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'existing_dir',
476 'existing.txt')))
477 actual = read_file((dst_dir, 'nonexisting.txt'))
478 self.assertEqual(actual, '123')
479 actual = read_file((dst_dir, 'existing_dir', 'existing.txt'))
480 self.assertEqual(actual, 'has been replaced')
481
482 with self.assertRaises(FileExistsError):
483 shutil.copytree(src_dir, dst_dir, dirs_exist_ok=False)
484
Antoine Pitrou78091e62011-12-29 18:54:15 +0100485 @support.skip_unless_symlink
486 def test_copytree_symlinks(self):
487 tmp_dir = self.mkdtemp()
488 src_dir = os.path.join(tmp_dir, 'src')
489 dst_dir = os.path.join(tmp_dir, 'dst')
490 sub_dir = os.path.join(src_dir, 'sub')
491 os.mkdir(src_dir)
492 os.mkdir(sub_dir)
493 write_file((src_dir, 'file.txt'), 'foo')
494 src_link = os.path.join(sub_dir, 'link')
495 dst_link = os.path.join(dst_dir, 'sub/link')
496 os.symlink(os.path.join(src_dir, 'file.txt'),
497 src_link)
498 if hasattr(os, 'lchmod'):
499 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
500 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
501 os.lchflags(src_link, stat.UF_NODUMP)
502 src_stat = os.lstat(src_link)
503 shutil.copytree(src_dir, dst_dir, symlinks=True)
504 self.assertTrue(os.path.islink(os.path.join(dst_dir, 'sub', 'link')))
Steve Dowerdf2d4a62019-08-21 15:27:33 -0700505 actual = os.readlink(os.path.join(dst_dir, 'sub', 'link'))
506 # Bad practice to blindly strip the prefix as it may be required to
507 # correctly refer to the file, but we're only comparing paths here.
508 if os.name == 'nt' and actual.startswith('\\\\?\\'):
509 actual = actual[4:]
510 self.assertEqual(actual, os.path.join(src_dir, 'file.txt'))
Antoine Pitrou78091e62011-12-29 18:54:15 +0100511 dst_stat = os.lstat(dst_link)
512 if hasattr(os, 'lchmod'):
513 self.assertEqual(dst_stat.st_mode, src_stat.st_mode)
514 if hasattr(os, 'lchflags'):
515 self.assertEqual(dst_stat.st_flags, src_stat.st_flags)
516
Georg Brandl2ee470f2008-07-16 12:55:28 +0000517 def test_copytree_with_exclude(self):
Georg Brandl2ee470f2008-07-16 12:55:28 +0000518 # creating data
519 join = os.path.join
520 exists = os.path.exists
Steve Dowerabde52c2019-11-15 09:49:21 -0800521 src_dir = self.mkdtemp()
Georg Brandl2ee470f2008-07-16 12:55:28 +0000522 try:
Steve Dowerabde52c2019-11-15 09:49:21 -0800523 dst_dir = join(self.mkdtemp(), 'destination')
Éric Araujoa7e33a12011-08-12 19:51:35 +0200524 write_file((src_dir, 'test.txt'), '123')
525 write_file((src_dir, 'test.tmp'), '123')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000526 os.mkdir(join(src_dir, 'test_dir'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200527 write_file((src_dir, 'test_dir', 'test.txt'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000528 os.mkdir(join(src_dir, 'test_dir2'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200529 write_file((src_dir, 'test_dir2', 'test.txt'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000530 os.mkdir(join(src_dir, 'test_dir2', 'subdir'))
531 os.mkdir(join(src_dir, 'test_dir2', 'subdir2'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200532 write_file((src_dir, 'test_dir2', 'subdir', 'test.txt'), '456')
533 write_file((src_dir, 'test_dir2', 'subdir2', 'test.py'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000534
535 # testing glob-like patterns
536 try:
537 patterns = shutil.ignore_patterns('*.tmp', 'test_dir2')
538 shutil.copytree(src_dir, dst_dir, ignore=patterns)
539 # checking the result: some elements should not be copied
540 self.assertTrue(exists(join(dst_dir, 'test.txt')))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200541 self.assertFalse(exists(join(dst_dir, 'test.tmp')))
542 self.assertFalse(exists(join(dst_dir, 'test_dir2')))
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000543 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200544 shutil.rmtree(dst_dir)
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000545 try:
546 patterns = shutil.ignore_patterns('*.tmp', 'subdir*')
547 shutil.copytree(src_dir, dst_dir, ignore=patterns)
548 # checking the result: some elements should not be copied
Éric Araujoa7e33a12011-08-12 19:51:35 +0200549 self.assertFalse(exists(join(dst_dir, 'test.tmp')))
550 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2')))
551 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000552 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200553 shutil.rmtree(dst_dir)
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000554
555 # testing callable-style
556 try:
557 def _filter(src, names):
558 res = []
559 for name in names:
560 path = os.path.join(src, name)
561
562 if (os.path.isdir(path) and
563 path.split()[-1] == 'subdir'):
564 res.append(name)
565 elif os.path.splitext(path)[-1] in ('.py'):
566 res.append(name)
567 return res
568
569 shutil.copytree(src_dir, dst_dir, ignore=_filter)
570
571 # checking the result: some elements should not be copied
Éric Araujoa7e33a12011-08-12 19:51:35 +0200572 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2',
573 'test.py')))
574 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000575
576 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200577 shutil.rmtree(dst_dir)
Georg Brandl2ee470f2008-07-16 12:55:28 +0000578 finally:
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000579 shutil.rmtree(src_dir)
580 shutil.rmtree(os.path.dirname(dst_dir))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000581
Antoine Pitrouac601602013-08-16 19:35:02 +0200582 def test_copytree_retains_permissions(self):
Steve Dowerabde52c2019-11-15 09:49:21 -0800583 tmp_dir = self.mkdtemp()
Antoine Pitrouac601602013-08-16 19:35:02 +0200584 src_dir = os.path.join(tmp_dir, 'source')
585 os.mkdir(src_dir)
586 dst_dir = os.path.join(tmp_dir, 'destination')
587 self.addCleanup(shutil.rmtree, tmp_dir)
588
589 os.chmod(src_dir, 0o777)
590 write_file((src_dir, 'permissive.txt'), '123')
591 os.chmod(os.path.join(src_dir, 'permissive.txt'), 0o777)
592 write_file((src_dir, 'restrictive.txt'), '456')
593 os.chmod(os.path.join(src_dir, 'restrictive.txt'), 0o600)
594 restrictive_subdir = tempfile.mkdtemp(dir=src_dir)
Steve Dowerabde52c2019-11-15 09:49:21 -0800595 self.addCleanup(support.rmtree, restrictive_subdir)
Antoine Pitrouac601602013-08-16 19:35:02 +0200596 os.chmod(restrictive_subdir, 0o600)
597
598 shutil.copytree(src_dir, dst_dir)
Brett Cannon9c7eb552013-08-23 14:38:11 -0400599 self.assertEqual(os.stat(src_dir).st_mode, os.stat(dst_dir).st_mode)
600 self.assertEqual(os.stat(os.path.join(src_dir, 'permissive.txt')).st_mode,
Antoine Pitrouac601602013-08-16 19:35:02 +0200601 os.stat(os.path.join(dst_dir, 'permissive.txt')).st_mode)
Brett Cannon9c7eb552013-08-23 14:38:11 -0400602 self.assertEqual(os.stat(os.path.join(src_dir, 'restrictive.txt')).st_mode,
Antoine Pitrouac601602013-08-16 19:35:02 +0200603 os.stat(os.path.join(dst_dir, 'restrictive.txt')).st_mode)
604 restrictive_subdir_dst = os.path.join(dst_dir,
605 os.path.split(restrictive_subdir)[1])
Brett Cannon9c7eb552013-08-23 14:38:11 -0400606 self.assertEqual(os.stat(restrictive_subdir).st_mode,
Antoine Pitrouac601602013-08-16 19:35:02 +0200607 os.stat(restrictive_subdir_dst).st_mode)
608
Berker Peksag884afd92014-12-10 02:50:32 +0200609 @unittest.mock.patch('os.chmod')
610 def test_copytree_winerror(self, mock_patch):
611 # When copying to VFAT, copystat() raises OSError. On Windows, the
612 # exception object has a meaningful 'winerror' attribute, but not
613 # on other operating systems. Do not assume 'winerror' is set.
Steve Dowerabde52c2019-11-15 09:49:21 -0800614 src_dir = self.mkdtemp()
615 dst_dir = os.path.join(self.mkdtemp(), 'destination')
Berker Peksag884afd92014-12-10 02:50:32 +0200616 self.addCleanup(shutil.rmtree, src_dir)
617 self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
618
619 mock_patch.side_effect = PermissionError('ka-boom')
620 with self.assertRaises(shutil.Error):
621 shutil.copytree(src_dir, dst_dir)
622
Giampaolo Rodolac606a9c2019-02-26 12:04:41 +0100623 def test_copytree_custom_copy_function(self):
624 # See: https://bugs.python.org/issue35648
625 def custom_cpfun(a, b):
626 flag.append(None)
627 self.assertIsInstance(a, str)
628 self.assertIsInstance(b, str)
629 self.assertEqual(a, os.path.join(src, 'foo'))
630 self.assertEqual(b, os.path.join(dst, 'foo'))
631
632 flag = []
Steve Dowerabde52c2019-11-15 09:49:21 -0800633 src = self.mkdtemp()
634 dst = tempfile.mktemp(dir=self.mkdtemp())
Giampaolo Rodolac606a9c2019-02-26 12:04:41 +0100635 with open(os.path.join(src, 'foo'), 'w') as f:
636 f.close()
637 shutil.copytree(src, dst, copy_function=custom_cpfun)
638 self.assertEqual(len(flag), 1)
639
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300640 # Issue #3002: copyfile and copytree block indefinitely on named pipes
641 @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
642 @support.skip_unless_symlink
643 def test_copytree_named_pipe(self):
644 os.mkdir(TESTFN)
645 try:
646 subdir = os.path.join(TESTFN, "subdir")
647 os.mkdir(subdir)
648 pipe = os.path.join(subdir, "mypipe")
649 try:
650 os.mkfifo(pipe)
651 except PermissionError as e:
652 self.skipTest('os.mkfifo(): %s' % e)
653 try:
654 shutil.copytree(TESTFN, TESTFN2)
655 except shutil.Error as e:
656 errors = e.args[0]
657 self.assertEqual(len(errors), 1)
658 src, dst, error_msg = errors[0]
659 self.assertEqual("`%s` is a named pipe" % pipe, error_msg)
660 else:
661 self.fail("shutil.Error should have been raised")
662 finally:
663 shutil.rmtree(TESTFN, ignore_errors=True)
664 shutil.rmtree(TESTFN2, ignore_errors=True)
665
666 def test_copytree_special_func(self):
667 src_dir = self.mkdtemp()
668 dst_dir = os.path.join(self.mkdtemp(), 'destination')
669 write_file((src_dir, 'test.txt'), '123')
670 os.mkdir(os.path.join(src_dir, 'test_dir'))
671 write_file((src_dir, 'test_dir', 'test.txt'), '456')
672
673 copied = []
674 def _copy(src, dst):
675 copied.append((src, dst))
676
677 shutil.copytree(src_dir, dst_dir, copy_function=_copy)
678 self.assertEqual(len(copied), 2)
679
680 @support.skip_unless_symlink
681 def test_copytree_dangling_symlinks(self):
682 # a dangling symlink raises an error at the end
683 src_dir = self.mkdtemp()
684 dst_dir = os.path.join(self.mkdtemp(), 'destination')
685 os.symlink('IDONTEXIST', os.path.join(src_dir, 'test.txt'))
686 os.mkdir(os.path.join(src_dir, 'test_dir'))
687 write_file((src_dir, 'test_dir', 'test.txt'), '456')
688 self.assertRaises(Error, shutil.copytree, src_dir, dst_dir)
689
690 # a dangling symlink is ignored with the proper flag
691 dst_dir = os.path.join(self.mkdtemp(), 'destination2')
692 shutil.copytree(src_dir, dst_dir, ignore_dangling_symlinks=True)
693 self.assertNotIn('test.txt', os.listdir(dst_dir))
694
695 # a dangling symlink is copied if symlinks=True
696 dst_dir = os.path.join(self.mkdtemp(), 'destination3')
697 shutil.copytree(src_dir, dst_dir, symlinks=True)
698 self.assertIn('test.txt', os.listdir(dst_dir))
699
700 @support.skip_unless_symlink
701 def test_copytree_symlink_dir(self):
702 src_dir = self.mkdtemp()
703 dst_dir = os.path.join(self.mkdtemp(), 'destination')
704 os.mkdir(os.path.join(src_dir, 'real_dir'))
705 with open(os.path.join(src_dir, 'real_dir', 'test.txt'), 'w'):
706 pass
707 os.symlink(os.path.join(src_dir, 'real_dir'),
708 os.path.join(src_dir, 'link_to_dir'),
709 target_is_directory=True)
710
711 shutil.copytree(src_dir, dst_dir, symlinks=False)
712 self.assertFalse(os.path.islink(os.path.join(dst_dir, 'link_to_dir')))
713 self.assertIn('test.txt', os.listdir(os.path.join(dst_dir, 'link_to_dir')))
714
715 dst_dir = os.path.join(self.mkdtemp(), 'destination2')
716 shutil.copytree(src_dir, dst_dir, symlinks=True)
717 self.assertTrue(os.path.islink(os.path.join(dst_dir, 'link_to_dir')))
718 self.assertIn('test.txt', os.listdir(os.path.join(dst_dir, 'link_to_dir')))
719
720 def test_copytree_return_value(self):
721 # copytree returns its destination path.
722 src_dir = self.mkdtemp()
723 dst_dir = src_dir + "dest"
724 self.addCleanup(shutil.rmtree, dst_dir, True)
725 src = os.path.join(src_dir, 'foo')
726 write_file(src, 'foo')
727 rv = shutil.copytree(src_dir, dst_dir)
728 self.assertEqual(['foo'], os.listdir(rv))
729
Bruno P. Kinoshita9bbcbc92019-11-27 14:10:37 +1300730 def test_copytree_subdirectory(self):
731 # copytree where dst is a subdirectory of src, see Issue 38688
732 base_dir = self.mkdtemp()
733 self.addCleanup(shutil.rmtree, base_dir, ignore_errors=True)
734 src_dir = os.path.join(base_dir, "t", "pg")
735 dst_dir = os.path.join(src_dir, "somevendor", "1.0")
736 os.makedirs(src_dir)
737 src = os.path.join(src_dir, 'pol')
738 write_file(src, 'pol')
739 rv = shutil.copytree(src_dir, dst_dir)
740 self.assertEqual(['pol'], os.listdir(rv))
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300741
742class TestCopy(BaseTest, unittest.TestCase):
743
744 ### shutil.copymode
745
746 @support.skip_unless_symlink
747 def test_copymode_follow_symlinks(self):
748 tmp_dir = self.mkdtemp()
749 src = os.path.join(tmp_dir, 'foo')
750 dst = os.path.join(tmp_dir, 'bar')
751 src_link = os.path.join(tmp_dir, 'baz')
752 dst_link = os.path.join(tmp_dir, 'quux')
753 write_file(src, 'foo')
754 write_file(dst, 'foo')
755 os.symlink(src, src_link)
756 os.symlink(dst, dst_link)
757 os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
758 # file to file
759 os.chmod(dst, stat.S_IRWXO)
760 self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
761 shutil.copymode(src, dst)
762 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
763 # On Windows, os.chmod does not follow symlinks (issue #15411)
764 if os.name != 'nt':
765 # follow src link
766 os.chmod(dst, stat.S_IRWXO)
767 shutil.copymode(src_link, dst)
768 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
769 # follow dst link
770 os.chmod(dst, stat.S_IRWXO)
771 shutil.copymode(src, dst_link)
772 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
773 # follow both links
774 os.chmod(dst, stat.S_IRWXO)
775 shutil.copymode(src_link, dst_link)
776 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
777
778 @unittest.skipUnless(hasattr(os, 'lchmod'), 'requires os.lchmod')
779 @support.skip_unless_symlink
780 def test_copymode_symlink_to_symlink(self):
781 tmp_dir = self.mkdtemp()
782 src = os.path.join(tmp_dir, 'foo')
783 dst = os.path.join(tmp_dir, 'bar')
784 src_link = os.path.join(tmp_dir, 'baz')
785 dst_link = os.path.join(tmp_dir, 'quux')
786 write_file(src, 'foo')
787 write_file(dst, 'foo')
788 os.symlink(src, src_link)
789 os.symlink(dst, dst_link)
790 os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
791 os.chmod(dst, stat.S_IRWXU)
792 os.lchmod(src_link, stat.S_IRWXO|stat.S_IRWXG)
793 # link to link
794 os.lchmod(dst_link, stat.S_IRWXO)
795 shutil.copymode(src_link, dst_link, follow_symlinks=False)
796 self.assertEqual(os.lstat(src_link).st_mode,
797 os.lstat(dst_link).st_mode)
798 self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
799 # src link - use chmod
800 os.lchmod(dst_link, stat.S_IRWXO)
801 shutil.copymode(src_link, dst, follow_symlinks=False)
802 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
803 # dst link - use chmod
804 os.lchmod(dst_link, stat.S_IRWXO)
805 shutil.copymode(src, dst_link, follow_symlinks=False)
806 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
807
808 @unittest.skipIf(hasattr(os, 'lchmod'), 'requires os.lchmod to be missing')
809 @support.skip_unless_symlink
810 def test_copymode_symlink_to_symlink_wo_lchmod(self):
811 tmp_dir = self.mkdtemp()
812 src = os.path.join(tmp_dir, 'foo')
813 dst = os.path.join(tmp_dir, 'bar')
814 src_link = os.path.join(tmp_dir, 'baz')
815 dst_link = os.path.join(tmp_dir, 'quux')
816 write_file(src, 'foo')
817 write_file(dst, 'foo')
818 os.symlink(src, src_link)
819 os.symlink(dst, dst_link)
820 shutil.copymode(src_link, dst_link, follow_symlinks=False) # silent fail
821
822 ### shutil.copystat
823
824 @support.skip_unless_symlink
825 def test_copystat_symlinks(self):
826 tmp_dir = self.mkdtemp()
827 src = os.path.join(tmp_dir, 'foo')
828 dst = os.path.join(tmp_dir, 'bar')
829 src_link = os.path.join(tmp_dir, 'baz')
830 dst_link = os.path.join(tmp_dir, 'qux')
831 write_file(src, 'foo')
832 src_stat = os.stat(src)
833 os.utime(src, (src_stat.st_atime,
834 src_stat.st_mtime - 42.0)) # ensure different mtimes
835 write_file(dst, 'bar')
836 self.assertNotEqual(os.stat(src).st_mtime, os.stat(dst).st_mtime)
837 os.symlink(src, src_link)
838 os.symlink(dst, dst_link)
839 if hasattr(os, 'lchmod'):
840 os.lchmod(src_link, stat.S_IRWXO)
841 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
842 os.lchflags(src_link, stat.UF_NODUMP)
843 src_link_stat = os.lstat(src_link)
844 # follow
845 if hasattr(os, 'lchmod'):
846 shutil.copystat(src_link, dst_link, follow_symlinks=True)
847 self.assertNotEqual(src_link_stat.st_mode, os.stat(dst).st_mode)
848 # don't follow
849 shutil.copystat(src_link, dst_link, follow_symlinks=False)
850 dst_link_stat = os.lstat(dst_link)
851 if os.utime in os.supports_follow_symlinks:
852 for attr in 'st_atime', 'st_mtime':
853 # The modification times may be truncated in the new file.
854 self.assertLessEqual(getattr(src_link_stat, attr),
855 getattr(dst_link_stat, attr) + 1)
856 if hasattr(os, 'lchmod'):
857 self.assertEqual(src_link_stat.st_mode, dst_link_stat.st_mode)
858 if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
859 self.assertEqual(src_link_stat.st_flags, dst_link_stat.st_flags)
860 # tell to follow but dst is not a link
861 shutil.copystat(src_link, dst, follow_symlinks=False)
862 self.assertTrue(abs(os.stat(src).st_mtime - os.stat(dst).st_mtime) <
863 00000.1)
864
865 @unittest.skipUnless(hasattr(os, 'chflags') and
866 hasattr(errno, 'EOPNOTSUPP') and
867 hasattr(errno, 'ENOTSUP'),
868 "requires os.chflags, EOPNOTSUPP & ENOTSUP")
869 def test_copystat_handles_harmless_chflags_errors(self):
870 tmpdir = self.mkdtemp()
871 file1 = os.path.join(tmpdir, 'file1')
872 file2 = os.path.join(tmpdir, 'file2')
873 write_file(file1, 'xxx')
874 write_file(file2, 'xxx')
875
876 def make_chflags_raiser(err):
877 ex = OSError()
878
879 def _chflags_raiser(path, flags, *, follow_symlinks=True):
880 ex.errno = err
881 raise ex
882 return _chflags_raiser
883 old_chflags = os.chflags
884 try:
885 for err in errno.EOPNOTSUPP, errno.ENOTSUP:
886 os.chflags = make_chflags_raiser(err)
887 shutil.copystat(file1, file2)
888 # assert others errors break it
889 os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP)
890 self.assertRaises(OSError, shutil.copystat, file1, file2)
891 finally:
892 os.chflags = old_chflags
893
894 ### shutil.copyxattr
895
896 @support.skip_unless_xattr
897 def test_copyxattr(self):
898 tmp_dir = self.mkdtemp()
899 src = os.path.join(tmp_dir, 'foo')
900 write_file(src, 'foo')
901 dst = os.path.join(tmp_dir, 'bar')
902 write_file(dst, 'bar')
903
904 # no xattr == no problem
905 shutil._copyxattr(src, dst)
906 # common case
907 os.setxattr(src, 'user.foo', b'42')
908 os.setxattr(src, 'user.bar', b'43')
909 shutil._copyxattr(src, dst)
910 self.assertEqual(sorted(os.listxattr(src)), sorted(os.listxattr(dst)))
911 self.assertEqual(
912 os.getxattr(src, 'user.foo'),
913 os.getxattr(dst, 'user.foo'))
914 # check errors don't affect other attrs
915 os.remove(dst)
916 write_file(dst, 'bar')
917 os_error = OSError(errno.EPERM, 'EPERM')
918
919 def _raise_on_user_foo(fname, attr, val, **kwargs):
920 if attr == 'user.foo':
921 raise os_error
922 else:
923 orig_setxattr(fname, attr, val, **kwargs)
924 try:
925 orig_setxattr = os.setxattr
926 os.setxattr = _raise_on_user_foo
927 shutil._copyxattr(src, dst)
928 self.assertIn('user.bar', os.listxattr(dst))
929 finally:
930 os.setxattr = orig_setxattr
931 # the source filesystem not supporting xattrs should be ok, too.
932 def _raise_on_src(fname, *, follow_symlinks=True):
933 if fname == src:
934 raise OSError(errno.ENOTSUP, 'Operation not supported')
935 return orig_listxattr(fname, follow_symlinks=follow_symlinks)
936 try:
937 orig_listxattr = os.listxattr
938 os.listxattr = _raise_on_src
939 shutil._copyxattr(src, dst)
940 finally:
941 os.listxattr = orig_listxattr
942
943 # test that shutil.copystat copies xattrs
944 src = os.path.join(tmp_dir, 'the_original')
945 srcro = os.path.join(tmp_dir, 'the_original_ro')
946 write_file(src, src)
947 write_file(srcro, srcro)
948 os.setxattr(src, 'user.the_value', b'fiddly')
949 os.setxattr(srcro, 'user.the_value', b'fiddly')
950 os.chmod(srcro, 0o444)
951 dst = os.path.join(tmp_dir, 'the_copy')
952 dstro = os.path.join(tmp_dir, 'the_copy_ro')
953 write_file(dst, dst)
954 write_file(dstro, dstro)
955 shutil.copystat(src, dst)
956 shutil.copystat(srcro, dstro)
957 self.assertEqual(os.getxattr(dst, 'user.the_value'), b'fiddly')
958 self.assertEqual(os.getxattr(dstro, 'user.the_value'), b'fiddly')
959
960 @support.skip_unless_symlink
961 @support.skip_unless_xattr
962 @unittest.skipUnless(hasattr(os, 'geteuid') and os.geteuid() == 0,
963 'root privileges required')
964 def test_copyxattr_symlinks(self):
965 # On Linux, it's only possible to access non-user xattr for symlinks;
966 # which in turn require root privileges. This test should be expanded
967 # as soon as other platforms gain support for extended attributes.
968 tmp_dir = self.mkdtemp()
969 src = os.path.join(tmp_dir, 'foo')
970 src_link = os.path.join(tmp_dir, 'baz')
971 write_file(src, 'foo')
972 os.symlink(src, src_link)
973 os.setxattr(src, 'trusted.foo', b'42')
974 os.setxattr(src_link, 'trusted.foo', b'43', follow_symlinks=False)
975 dst = os.path.join(tmp_dir, 'bar')
976 dst_link = os.path.join(tmp_dir, 'qux')
977 write_file(dst, 'bar')
978 os.symlink(dst, dst_link)
979 shutil._copyxattr(src_link, dst_link, follow_symlinks=False)
980 self.assertEqual(os.getxattr(dst_link, 'trusted.foo', follow_symlinks=False), b'43')
981 self.assertRaises(OSError, os.getxattr, dst, 'trusted.foo')
982 shutil._copyxattr(src_link, dst, follow_symlinks=False)
983 self.assertEqual(os.getxattr(dst, 'trusted.foo'), b'43')
984
985 ### shutil.copy
986
987 def _copy_file(self, method):
988 fname = 'test.txt'
989 tmpdir = self.mkdtemp()
990 write_file((tmpdir, fname), 'xxx')
991 file1 = os.path.join(tmpdir, fname)
992 tmpdir2 = self.mkdtemp()
993 method(file1, tmpdir2)
994 file2 = os.path.join(tmpdir2, fname)
995 return (file1, file2)
996
997 def test_copy(self):
998 # Ensure that the copied file exists and has the same mode bits.
999 file1, file2 = self._copy_file(shutil.copy)
1000 self.assertTrue(os.path.exists(file2))
1001 self.assertEqual(os.stat(file1).st_mode, os.stat(file2).st_mode)
1002
1003 @support.skip_unless_symlink
1004 def test_copy_symlinks(self):
1005 tmp_dir = self.mkdtemp()
1006 src = os.path.join(tmp_dir, 'foo')
1007 dst = os.path.join(tmp_dir, 'bar')
1008 src_link = os.path.join(tmp_dir, 'baz')
1009 write_file(src, 'foo')
1010 os.symlink(src, src_link)
1011 if hasattr(os, 'lchmod'):
1012 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
1013 # don't follow
1014 shutil.copy(src_link, dst, follow_symlinks=True)
1015 self.assertFalse(os.path.islink(dst))
1016 self.assertEqual(read_file(src), read_file(dst))
1017 os.remove(dst)
1018 # follow
1019 shutil.copy(src_link, dst, follow_symlinks=False)
1020 self.assertTrue(os.path.islink(dst))
1021 self.assertEqual(os.readlink(dst), os.readlink(src_link))
1022 if hasattr(os, 'lchmod'):
1023 self.assertEqual(os.lstat(src_link).st_mode,
1024 os.lstat(dst).st_mode)
1025
1026 ### shutil.copy2
1027
1028 @unittest.skipUnless(hasattr(os, 'utime'), 'requires os.utime')
1029 def test_copy2(self):
1030 # Ensure that the copied file exists and has the same mode and
1031 # modification time bits.
1032 file1, file2 = self._copy_file(shutil.copy2)
1033 self.assertTrue(os.path.exists(file2))
1034 file1_stat = os.stat(file1)
1035 file2_stat = os.stat(file2)
1036 self.assertEqual(file1_stat.st_mode, file2_stat.st_mode)
1037 for attr in 'st_atime', 'st_mtime':
1038 # The modification times may be truncated in the new file.
1039 self.assertLessEqual(getattr(file1_stat, attr),
1040 getattr(file2_stat, attr) + 1)
1041 if hasattr(os, 'chflags') and hasattr(file1_stat, 'st_flags'):
1042 self.assertEqual(getattr(file1_stat, 'st_flags'),
1043 getattr(file2_stat, 'st_flags'))
1044
1045 @support.skip_unless_symlink
1046 def test_copy2_symlinks(self):
1047 tmp_dir = self.mkdtemp()
1048 src = os.path.join(tmp_dir, 'foo')
1049 dst = os.path.join(tmp_dir, 'bar')
1050 src_link = os.path.join(tmp_dir, 'baz')
1051 write_file(src, 'foo')
1052 os.symlink(src, src_link)
1053 if hasattr(os, 'lchmod'):
1054 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
1055 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
1056 os.lchflags(src_link, stat.UF_NODUMP)
1057 src_stat = os.stat(src)
1058 src_link_stat = os.lstat(src_link)
1059 # follow
1060 shutil.copy2(src_link, dst, follow_symlinks=True)
1061 self.assertFalse(os.path.islink(dst))
1062 self.assertEqual(read_file(src), read_file(dst))
1063 os.remove(dst)
1064 # don't follow
1065 shutil.copy2(src_link, dst, follow_symlinks=False)
1066 self.assertTrue(os.path.islink(dst))
1067 self.assertEqual(os.readlink(dst), os.readlink(src_link))
1068 dst_stat = os.lstat(dst)
1069 if os.utime in os.supports_follow_symlinks:
1070 for attr in 'st_atime', 'st_mtime':
1071 # The modification times may be truncated in the new file.
1072 self.assertLessEqual(getattr(src_link_stat, attr),
1073 getattr(dst_stat, attr) + 1)
1074 if hasattr(os, 'lchmod'):
1075 self.assertEqual(src_link_stat.st_mode, dst_stat.st_mode)
1076 self.assertNotEqual(src_stat.st_mode, dst_stat.st_mode)
1077 if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
1078 self.assertEqual(src_link_stat.st_flags, dst_stat.st_flags)
1079
1080 @support.skip_unless_xattr
1081 def test_copy2_xattr(self):
1082 tmp_dir = self.mkdtemp()
1083 src = os.path.join(tmp_dir, 'foo')
1084 dst = os.path.join(tmp_dir, 'bar')
1085 write_file(src, 'foo')
1086 os.setxattr(src, 'user.foo', b'42')
1087 shutil.copy2(src, dst)
1088 self.assertEqual(
1089 os.getxattr(src, 'user.foo'),
1090 os.getxattr(dst, 'user.foo'))
1091 os.remove(dst)
1092
1093 def test_copy_return_value(self):
1094 # copy and copy2 both return their destination path.
1095 for fn in (shutil.copy, shutil.copy2):
1096 src_dir = self.mkdtemp()
1097 dst_dir = self.mkdtemp()
1098 src = os.path.join(src_dir, 'foo')
1099 write_file(src, 'foo')
1100 rv = fn(src, dst_dir)
1101 self.assertEqual(rv, os.path.join(dst_dir, 'foo'))
1102 rv = fn(src, os.path.join(dst_dir, 'bar'))
1103 self.assertEqual(rv, os.path.join(dst_dir, 'bar'))
1104
1105 ### shutil.copyfile
1106
1107 @support.skip_unless_symlink
1108 def test_copyfile_symlinks(self):
1109 tmp_dir = self.mkdtemp()
1110 src = os.path.join(tmp_dir, 'src')
1111 dst = os.path.join(tmp_dir, 'dst')
1112 dst_link = os.path.join(tmp_dir, 'dst_link')
1113 link = os.path.join(tmp_dir, 'link')
1114 write_file(src, 'foo')
1115 os.symlink(src, link)
1116 # don't follow
1117 shutil.copyfile(link, dst_link, follow_symlinks=False)
1118 self.assertTrue(os.path.islink(dst_link))
1119 self.assertEqual(os.readlink(link), os.readlink(dst_link))
1120 # follow
1121 shutil.copyfile(link, dst)
1122 self.assertFalse(os.path.islink(dst))
1123
Hirokazu Yamamoto26681452010-12-05 02:04:16 +00001124 @unittest.skipUnless(hasattr(os, 'link'), 'requires os.link')
Tarek Ziadé51a6f722010-04-23 13:03:09 +00001125 def test_dont_copy_file_onto_link_to_itself(self):
1126 # bug 851123.
1127 os.mkdir(TESTFN)
1128 src = os.path.join(TESTFN, 'cheese')
1129 dst = os.path.join(TESTFN, 'shop')
1130 try:
Hirokazu Yamamoto26681452010-12-05 02:04:16 +00001131 with open(src, 'w') as f:
1132 f.write('cheddar')
xdegaye92c2ca72017-11-12 17:31:07 +01001133 try:
1134 os.link(src, dst)
1135 except PermissionError as e:
1136 self.skipTest('os.link(): %s' % e)
Hynek Schlawack48653762012-10-07 12:49:58 +02001137 self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
Hirokazu Yamamoto26681452010-12-05 02:04:16 +00001138 with open(src, 'r') as f:
1139 self.assertEqual(f.read(), 'cheddar')
1140 os.remove(dst)
1141 finally:
1142 shutil.rmtree(TESTFN, ignore_errors=True)
Tarek Ziadé51a6f722010-04-23 13:03:09 +00001143
Brian Curtin3b4499c2010-12-28 14:31:47 +00001144 @support.skip_unless_symlink
Hirokazu Yamamoto26681452010-12-05 02:04:16 +00001145 def test_dont_copy_file_onto_symlink_to_itself(self):
1146 # bug 851123.
1147 os.mkdir(TESTFN)
1148 src = os.path.join(TESTFN, 'cheese')
1149 dst = os.path.join(TESTFN, 'shop')
1150 try:
1151 with open(src, 'w') as f:
1152 f.write('cheddar')
Tarek Ziadé51a6f722010-04-23 13:03:09 +00001153 # Using `src` here would mean we end up with a symlink pointing
1154 # to TESTFN/TESTFN/cheese, while it should point at
1155 # TESTFN/cheese.
1156 os.symlink('cheese', dst)
Hynek Schlawack48653762012-10-07 12:49:58 +02001157 self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
Antoine Pitrou92f60ed2010-10-14 22:11:44 +00001158 with open(src, 'r') as f:
1159 self.assertEqual(f.read(), 'cheddar')
Tarek Ziadé51a6f722010-04-23 13:03:09 +00001160 os.remove(dst)
1161 finally:
Hirokazu Yamamoto26681452010-12-05 02:04:16 +00001162 shutil.rmtree(TESTFN, ignore_errors=True)
Johannes Gijsbers68128712004-08-14 13:57:08 +00001163
Serhiy Storchaka43767632013-11-03 21:31:38 +02001164 # Issue #3002: copyfile and copytree block indefinitely on named pipes
1165 @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
1166 def test_copyfile_named_pipe(self):
xdegaye92c2ca72017-11-12 17:31:07 +01001167 try:
1168 os.mkfifo(TESTFN)
1169 except PermissionError as e:
1170 self.skipTest('os.mkfifo(): %s' % e)
Serhiy Storchaka43767632013-11-03 21:31:38 +02001171 try:
1172 self.assertRaises(shutil.SpecialFileError,
1173 shutil.copyfile, TESTFN, TESTFN2)
1174 self.assertRaises(shutil.SpecialFileError,
1175 shutil.copyfile, __file__, TESTFN)
1176 finally:
1177 os.remove(TESTFN)
Antoine Pitrou7fff0962009-05-01 21:09:44 +00001178
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001179 def test_copyfile_return_value(self):
1180 # copytree returns its destination path.
Tarek Ziadé5340db32010-04-19 22:30:51 +00001181 src_dir = self.mkdtemp()
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001182 dst_dir = self.mkdtemp()
1183 dst_file = os.path.join(dst_dir, 'bar')
1184 src_file = os.path.join(src_dir, 'foo')
1185 write_file(src_file, 'foo')
1186 rv = shutil.copyfile(src_file, dst_file)
1187 self.assertTrue(os.path.exists(rv))
1188 self.assertEqual(read_file(src_file), read_file(dst_file))
Tarek Ziadé5340db32010-04-19 22:30:51 +00001189
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001190 def test_copyfile_same_file(self):
1191 # copyfile() should raise SameFileError if the source and destination
1192 # are the same.
Tarek Ziadéfb437512010-04-20 08:57:33 +00001193 src_dir = self.mkdtemp()
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001194 src_file = os.path.join(src_dir, 'foo')
1195 write_file(src_file, 'foo')
1196 self.assertRaises(SameFileError, shutil.copyfile, src_file, src_file)
1197 # But Error should work too, to stay backward compatible.
1198 self.assertRaises(Error, shutil.copyfile, src_file, src_file)
1199 # Make sure file is not corrupted.
1200 self.assertEqual(read_file(src_file), 'foo')
Tarek Ziadéfb437512010-04-20 08:57:33 +00001201
Tarek Ziadéfb437512010-04-20 08:57:33 +00001202
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001203class TestArchives(BaseTest, unittest.TestCase):
Tarek Ziadéfb437512010-04-20 08:57:33 +00001204
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001205 ### shutil.make_archive
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001206
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001207 @support.requires_zlib
Tarek Ziadé396fad72010-02-23 05:30:31 +00001208 def test_make_tarball(self):
1209 # creating something to tar
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001210 root_dir, base_dir = self._create_files('')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001211
1212 tmpdir2 = self.mkdtemp()
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001213 # force shutil to create the directory
1214 os.rmdir(tmpdir2)
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001215 # working with relative paths
1216 work_dir = os.path.dirname(tmpdir2)
1217 rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001218
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001219 with support.change_cwd(work_dir):
Serhiy Storchaka5558d4f2015-09-08 09:59:02 +03001220 base_name = os.path.abspath(rel_base_name)
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001221 tarball = make_archive(rel_base_name, 'gztar', root_dir, '.')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001222
1223 # check if the compressed tarball was created
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001224 self.assertEqual(tarball, base_name + '.tar.gz')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001225 self.assertTrue(os.path.isfile(tarball))
1226 self.assertTrue(tarfile.is_tarfile(tarball))
1227 with tarfile.open(tarball, 'r:gz') as tf:
1228 self.assertCountEqual(tf.getnames(),
1229 ['.', './sub', './sub2',
1230 './file1', './file2', './sub/file3'])
Tarek Ziadé396fad72010-02-23 05:30:31 +00001231
1232 # trying an uncompressed one
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001233 with support.change_cwd(work_dir):
1234 tarball = make_archive(rel_base_name, 'tar', root_dir, '.')
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001235 self.assertEqual(tarball, base_name + '.tar')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001236 self.assertTrue(os.path.isfile(tarball))
1237 self.assertTrue(tarfile.is_tarfile(tarball))
1238 with tarfile.open(tarball, 'r') as tf:
1239 self.assertCountEqual(tf.getnames(),
1240 ['.', './sub', './sub2',
1241 './file1', './file2', './sub/file3'])
Tarek Ziadé396fad72010-02-23 05:30:31 +00001242
1243 def _tarinfo(self, path):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001244 with tarfile.open(path) as tar:
Tarek Ziadé396fad72010-02-23 05:30:31 +00001245 names = tar.getnames()
1246 names.sort()
1247 return tuple(names)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001248
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001249 def _create_files(self, base_dir='dist'):
Tarek Ziadé396fad72010-02-23 05:30:31 +00001250 # creating something to tar
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001251 root_dir = self.mkdtemp()
1252 dist = os.path.join(root_dir, base_dir)
1253 os.makedirs(dist, exist_ok=True)
Éric Araujoa7e33a12011-08-12 19:51:35 +02001254 write_file((dist, 'file1'), 'xxx')
1255 write_file((dist, 'file2'), 'xxx')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001256 os.mkdir(os.path.join(dist, 'sub'))
Éric Araujoa7e33a12011-08-12 19:51:35 +02001257 write_file((dist, 'sub', 'file3'), 'xxx')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001258 os.mkdir(os.path.join(dist, 'sub2'))
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001259 if base_dir:
1260 write_file((root_dir, 'outer'), 'xxx')
1261 return root_dir, base_dir
Tarek Ziadé396fad72010-02-23 05:30:31 +00001262
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001263 @support.requires_zlib
Serhiy Storchakab42de2f2015-11-21 14:09:26 +02001264 @unittest.skipUnless(shutil.which('tar'),
Tarek Ziadé396fad72010-02-23 05:30:31 +00001265 'Need the tar command to run')
1266 def test_tarfile_vs_tar(self):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001267 root_dir, base_dir = self._create_files()
1268 base_name = os.path.join(self.mkdtemp(), 'archive')
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001269 tarball = make_archive(base_name, 'gztar', root_dir, base_dir)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001270
1271 # check if the compressed tarball was created
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001272 self.assertEqual(tarball, base_name + '.tar.gz')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001273 self.assertTrue(os.path.isfile(tarball))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001274
1275 # now create another tarball using `tar`
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001276 tarball2 = os.path.join(root_dir, 'archive2.tar')
1277 tar_cmd = ['tar', '-cf', 'archive2.tar', base_dir]
Serhiy Storchakab42de2f2015-11-21 14:09:26 +02001278 subprocess.check_call(tar_cmd, cwd=root_dir,
1279 stdout=subprocess.DEVNULL)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001280
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001281 self.assertTrue(os.path.isfile(tarball2))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001282 # let's compare both tarballs
Ezio Melottib3aedd42010-11-20 19:04:17 +00001283 self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001284
1285 # trying an uncompressed one
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001286 tarball = make_archive(base_name, 'tar', root_dir, base_dir)
1287 self.assertEqual(tarball, base_name + '.tar')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001288 self.assertTrue(os.path.isfile(tarball))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001289
1290 # now for a dry_run
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001291 tarball = make_archive(base_name, 'tar', root_dir, base_dir,
1292 dry_run=True)
1293 self.assertEqual(tarball, base_name + '.tar')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001294 self.assertTrue(os.path.isfile(tarball))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001295
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001296 @support.requires_zlib
Tarek Ziadé396fad72010-02-23 05:30:31 +00001297 def test_make_zipfile(self):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001298 # creating something to zip
1299 root_dir, base_dir = self._create_files()
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001300
1301 tmpdir2 = self.mkdtemp()
1302 # force shutil to create the directory
1303 os.rmdir(tmpdir2)
1304 # working with relative paths
1305 work_dir = os.path.dirname(tmpdir2)
1306 rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001307
1308 with support.change_cwd(work_dir):
Serhiy Storchaka5558d4f2015-09-08 09:59:02 +03001309 base_name = os.path.abspath(rel_base_name)
Serhiy Storchaka666de772016-10-23 15:55:09 +03001310 res = make_archive(rel_base_name, 'zip', root_dir)
1311
1312 self.assertEqual(res, base_name + '.zip')
1313 self.assertTrue(os.path.isfile(res))
1314 self.assertTrue(zipfile.is_zipfile(res))
1315 with zipfile.ZipFile(res) as zf:
1316 self.assertCountEqual(zf.namelist(),
1317 ['dist/', 'dist/sub/', 'dist/sub2/',
1318 'dist/file1', 'dist/file2', 'dist/sub/file3',
1319 'outer'])
1320
1321 with support.change_cwd(work_dir):
1322 base_name = os.path.abspath(rel_base_name)
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001323 res = make_archive(rel_base_name, 'zip', root_dir, base_dir)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001324
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001325 self.assertEqual(res, base_name + '.zip')
1326 self.assertTrue(os.path.isfile(res))
1327 self.assertTrue(zipfile.is_zipfile(res))
1328 with zipfile.ZipFile(res) as zf:
1329 self.assertCountEqual(zf.namelist(),
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001330 ['dist/', 'dist/sub/', 'dist/sub2/',
1331 'dist/file1', 'dist/file2', 'dist/sub/file3'])
Tarek Ziadé396fad72010-02-23 05:30:31 +00001332
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001333 @support.requires_zlib
Serhiy Storchakab42de2f2015-11-21 14:09:26 +02001334 @unittest.skipUnless(shutil.which('zip'),
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001335 'Need the zip command to run')
1336 def test_zipfile_vs_zip(self):
1337 root_dir, base_dir = self._create_files()
1338 base_name = os.path.join(self.mkdtemp(), 'archive')
1339 archive = make_archive(base_name, 'zip', root_dir, base_dir)
1340
1341 # check if ZIP file was created
1342 self.assertEqual(archive, base_name + '.zip')
1343 self.assertTrue(os.path.isfile(archive))
1344
1345 # now create another ZIP file using `zip`
1346 archive2 = os.path.join(root_dir, 'archive2.zip')
1347 zip_cmd = ['zip', '-q', '-r', 'archive2.zip', base_dir]
Serhiy Storchakab42de2f2015-11-21 14:09:26 +02001348 subprocess.check_call(zip_cmd, cwd=root_dir,
1349 stdout=subprocess.DEVNULL)
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001350
1351 self.assertTrue(os.path.isfile(archive2))
1352 # let's compare both ZIP files
1353 with zipfile.ZipFile(archive) as zf:
1354 names = zf.namelist()
1355 with zipfile.ZipFile(archive2) as zf:
1356 names2 = zf.namelist()
1357 self.assertEqual(sorted(names), sorted(names2))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001358
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001359 @support.requires_zlib
Serhiy Storchaka8bc792a2015-11-22 14:49:58 +02001360 @unittest.skipUnless(shutil.which('unzip'),
1361 'Need the unzip command to run')
1362 def test_unzip_zipfile(self):
1363 root_dir, base_dir = self._create_files()
1364 base_name = os.path.join(self.mkdtemp(), 'archive')
1365 archive = make_archive(base_name, 'zip', root_dir, base_dir)
1366
1367 # check if ZIP file was created
1368 self.assertEqual(archive, base_name + '.zip')
1369 self.assertTrue(os.path.isfile(archive))
1370
1371 # now check the ZIP file using `unzip -t`
1372 zip_cmd = ['unzip', '-t', archive]
1373 with support.change_cwd(root_dir):
1374 try:
1375 subprocess.check_output(zip_cmd, stderr=subprocess.STDOUT)
1376 except subprocess.CalledProcessError as exc:
1377 details = exc.output.decode(errors="replace")
Benjamin Petersona710ebd2018-09-13 10:08:46 -07001378 if 'unrecognized option: t' in details:
Benjamin Petersone78734d2018-09-13 10:57:23 -07001379 self.skipTest("unzip doesn't support -t")
Serhiy Storchaka8bc792a2015-11-22 14:49:58 +02001380 msg = "{}\n\n**Unzip Output**\n{}"
1381 self.fail(msg.format(exc, details))
1382
Tarek Ziadé396fad72010-02-23 05:30:31 +00001383 def test_make_archive(self):
1384 tmpdir = self.mkdtemp()
1385 base_name = os.path.join(tmpdir, 'archive')
1386 self.assertRaises(ValueError, make_archive, base_name, 'xxx')
1387
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001388 @support.requires_zlib
Tarek Ziadé396fad72010-02-23 05:30:31 +00001389 def test_make_archive_owner_group(self):
1390 # testing make_archive with owner and group, with various combinations
1391 # this works even if there's not gid/uid support
1392 if UID_GID_SUPPORT:
1393 group = grp.getgrgid(0)[0]
1394 owner = pwd.getpwuid(0)[0]
1395 else:
1396 group = owner = 'root'
1397
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001398 root_dir, base_dir = self._create_files()
1399 base_name = os.path.join(self.mkdtemp(), 'archive')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001400 res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner,
1401 group=group)
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001402 self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001403
1404 res = make_archive(base_name, 'zip', root_dir, base_dir)
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001405 self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001406
1407 res = make_archive(base_name, 'tar', root_dir, base_dir,
1408 owner=owner, group=group)
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001409 self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001410
1411 res = make_archive(base_name, 'tar', root_dir, base_dir,
1412 owner='kjhkjhkjg', group='oihohoh')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001413 self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001414
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001415
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001416 @support.requires_zlib
Tarek Ziadé396fad72010-02-23 05:30:31 +00001417 @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
1418 def test_tarfile_root_owner(self):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001419 root_dir, base_dir = self._create_files()
1420 base_name = os.path.join(self.mkdtemp(), 'archive')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001421 group = grp.getgrgid(0)[0]
1422 owner = pwd.getpwuid(0)[0]
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001423 with support.change_cwd(root_dir):
1424 archive_name = make_archive(base_name, 'gztar', root_dir, 'dist',
1425 owner=owner, group=group)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001426
1427 # check if the compressed tarball was created
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001428 self.assertTrue(os.path.isfile(archive_name))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001429
1430 # now checks the rights
1431 archive = tarfile.open(archive_name)
1432 try:
1433 for member in archive.getmembers():
Ezio Melottib3aedd42010-11-20 19:04:17 +00001434 self.assertEqual(member.uid, 0)
1435 self.assertEqual(member.gid, 0)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001436 finally:
1437 archive.close()
1438
1439 def test_make_archive_cwd(self):
1440 current_dir = os.getcwd()
1441 def _breaks(*args, **kw):
1442 raise RuntimeError()
1443
1444 register_archive_format('xxx', _breaks, [], 'xxx file')
1445 try:
1446 try:
1447 make_archive('xxx', 'xxx', root_dir=self.mkdtemp())
1448 except Exception:
1449 pass
Ezio Melottib3aedd42010-11-20 19:04:17 +00001450 self.assertEqual(os.getcwd(), current_dir)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001451 finally:
1452 unregister_archive_format('xxx')
1453
Serhiy Storchaka9a4fc192014-11-28 00:48:46 +02001454 def test_make_tarfile_in_curdir(self):
1455 # Issue #21280
1456 root_dir = self.mkdtemp()
1457 with support.change_cwd(root_dir):
1458 self.assertEqual(make_archive('test', 'tar'), 'test.tar')
1459 self.assertTrue(os.path.isfile('test.tar'))
1460
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001461 @support.requires_zlib
Serhiy Storchaka9a4fc192014-11-28 00:48:46 +02001462 def test_make_zipfile_in_curdir(self):
1463 # Issue #21280
1464 root_dir = self.mkdtemp()
1465 with support.change_cwd(root_dir):
1466 self.assertEqual(make_archive('test', 'zip'), 'test.zip')
1467 self.assertTrue(os.path.isfile('test.zip'))
1468
Tarek Ziadé396fad72010-02-23 05:30:31 +00001469 def test_register_archive_format(self):
1470
1471 self.assertRaises(TypeError, register_archive_format, 'xxx', 1)
1472 self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
1473 1)
1474 self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
1475 [(1, 2), (1, 2, 3)])
1476
1477 register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file')
1478 formats = [name for name, params in get_archive_formats()]
1479 self.assertIn('xxx', formats)
1480
1481 unregister_archive_format('xxx')
1482 formats = [name for name, params in get_archive_formats()]
1483 self.assertNotIn('xxx', formats)
1484
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001485 ### shutil.unpack_archive
1486
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001487 def check_unpack_archive(self, format):
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001488 self.check_unpack_archive_with_converter(format, lambda path: path)
1489 self.check_unpack_archive_with_converter(format, pathlib.Path)
Serhiy Storchakab21d1552018-03-02 11:53:51 +02001490 self.check_unpack_archive_with_converter(format, FakePath)
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001491
1492 def check_unpack_archive_with_converter(self, format, converter):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001493 root_dir, base_dir = self._create_files()
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001494 expected = rlistdir(root_dir)
1495 expected.remove('outer')
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001496
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001497 base_name = os.path.join(self.mkdtemp(), 'archive')
1498 filename = make_archive(base_name, format, root_dir, base_dir)
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001499
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001500 # let's try to unpack it now
1501 tmpdir2 = self.mkdtemp()
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001502 unpack_archive(converter(filename), converter(tmpdir2))
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001503 self.assertEqual(rlistdir(tmpdir2), expected)
1504
1505 # and again, this time with the format specified
1506 tmpdir3 = self.mkdtemp()
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001507 unpack_archive(converter(filename), converter(tmpdir3), format=format)
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001508 self.assertEqual(rlistdir(tmpdir3), expected)
1509
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001510 self.assertRaises(shutil.ReadError, unpack_archive, converter(TESTFN))
1511 self.assertRaises(ValueError, unpack_archive, converter(TESTFN), format='xxx')
Nick Coghlanabf202d2011-03-16 13:52:20 -04001512
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001513 def test_unpack_archive_tar(self):
1514 self.check_unpack_archive('tar')
1515
1516 @support.requires_zlib
1517 def test_unpack_archive_gztar(self):
1518 self.check_unpack_archive('gztar')
1519
1520 @support.requires_bz2
1521 def test_unpack_archive_bztar(self):
1522 self.check_unpack_archive('bztar')
1523
1524 @support.requires_lzma
Michael Feltef110b12019-02-18 12:02:44 +01001525 @unittest.skipIf(AIX and not _maxdataOK(), "AIX MAXDATA must be 0x20000000 or larger")
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001526 def test_unpack_archive_xztar(self):
1527 self.check_unpack_archive('xztar')
1528
1529 @support.requires_zlib
1530 def test_unpack_archive_zip(self):
1531 self.check_unpack_archive('zip')
1532
Martin Pantereb995702016-07-28 01:11:04 +00001533 def test_unpack_registry(self):
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001534
1535 formats = get_unpack_formats()
1536
1537 def _boo(filename, extract_dir, extra):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001538 self.assertEqual(extra, 1)
1539 self.assertEqual(filename, 'stuff.boo')
1540 self.assertEqual(extract_dir, 'xx')
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001541
1542 register_unpack_format('Boo', ['.boo', '.b2'], _boo, [('extra', 1)])
1543 unpack_archive('stuff.boo', 'xx')
1544
1545 # trying to register a .boo unpacker again
1546 self.assertRaises(RegistryError, register_unpack_format, 'Boo2',
1547 ['.boo'], _boo)
1548
1549 # should work now
1550 unregister_unpack_format('Boo')
1551 register_unpack_format('Boo2', ['.boo'], _boo)
1552 self.assertIn(('Boo2', ['.boo'], ''), get_unpack_formats())
1553 self.assertNotIn(('Boo', ['.boo'], ''), get_unpack_formats())
1554
1555 # let's leave a clean state
1556 unregister_unpack_format('Boo2')
Ezio Melottib3aedd42010-11-20 19:04:17 +00001557 self.assertEqual(get_unpack_formats(), formats)
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001558
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001559
1560class TestMisc(BaseTest, unittest.TestCase):
1561
Giampaolo Rodola'210e7ca2011-07-01 13:55:36 +02001562 @unittest.skipUnless(hasattr(shutil, 'disk_usage'),
1563 "disk_usage not available on this platform")
1564 def test_disk_usage(self):
Gregory P. Smith529746c2017-07-06 17:11:27 -07001565 usage = shutil.disk_usage(os.path.dirname(__file__))
Victor Stinnerdc525f42018-12-11 12:05:21 +01001566 for attr in ('total', 'used', 'free'):
1567 self.assertIsInstance(getattr(usage, attr), int)
Éric Araujo2ee61882011-07-02 16:45:45 +02001568 self.assertGreater(usage.total, 0)
1569 self.assertGreater(usage.used, 0)
1570 self.assertGreaterEqual(usage.free, 0)
1571 self.assertGreaterEqual(usage.total, usage.used)
1572 self.assertGreater(usage.total, usage.free)
Giampaolo Rodola'210e7ca2011-07-01 13:55:36 +02001573
Victor Stinnerdc525f42018-12-11 12:05:21 +01001574 # bpo-32557: Check that disk_usage() also accepts a filename
1575 shutil.disk_usage(__file__)
1576
Sandro Tosid902a142011-08-22 23:28:27 +02001577 @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
1578 @unittest.skipUnless(hasattr(os, 'chown'), 'requires os.chown')
1579 def test_chown(self):
Sandro Tosid902a142011-08-22 23:28:27 +02001580 dirname = self.mkdtemp()
1581 filename = tempfile.mktemp(dir=dirname)
1582 write_file(filename, 'testing chown function')
1583
1584 with self.assertRaises(ValueError):
1585 shutil.chown(filename)
1586
1587 with self.assertRaises(LookupError):
Raymond Hettinger15f44ab2016-08-30 10:47:49 -07001588 shutil.chown(filename, user='non-existing username')
Sandro Tosid902a142011-08-22 23:28:27 +02001589
1590 with self.assertRaises(LookupError):
Raymond Hettinger15f44ab2016-08-30 10:47:49 -07001591 shutil.chown(filename, group='non-existing groupname')
Sandro Tosid902a142011-08-22 23:28:27 +02001592
1593 with self.assertRaises(TypeError):
1594 shutil.chown(filename, b'spam')
1595
1596 with self.assertRaises(TypeError):
1597 shutil.chown(filename, 3.14)
1598
1599 uid = os.getuid()
1600 gid = os.getgid()
1601
1602 def check_chown(path, uid=None, gid=None):
1603 s = os.stat(filename)
1604 if uid is not None:
1605 self.assertEqual(uid, s.st_uid)
1606 if gid is not None:
1607 self.assertEqual(gid, s.st_gid)
1608
1609 shutil.chown(filename, uid, gid)
1610 check_chown(filename, uid, gid)
1611 shutil.chown(filename, uid)
1612 check_chown(filename, uid)
1613 shutil.chown(filename, user=uid)
1614 check_chown(filename, uid)
1615 shutil.chown(filename, group=gid)
Sandro Tosi91f948a2011-08-22 23:55:39 +02001616 check_chown(filename, gid=gid)
Sandro Tosid902a142011-08-22 23:28:27 +02001617
1618 shutil.chown(dirname, uid, gid)
1619 check_chown(dirname, uid, gid)
1620 shutil.chown(dirname, uid)
1621 check_chown(dirname, uid)
1622 shutil.chown(dirname, user=uid)
1623 check_chown(dirname, uid)
1624 shutil.chown(dirname, group=gid)
Sandro Tosi91f948a2011-08-22 23:55:39 +02001625 check_chown(dirname, gid=gid)
Sandro Tosid902a142011-08-22 23:28:27 +02001626
1627 user = pwd.getpwuid(uid)[0]
1628 group = grp.getgrgid(gid)[0]
1629 shutil.chown(filename, user, group)
1630 check_chown(filename, uid, gid)
1631 shutil.chown(dirname, user, group)
1632 check_chown(dirname, uid, gid)
1633
Brian Curtin0d0a1de2012-06-18 18:41:07 -05001634
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001635class TestWhich(BaseTest, unittest.TestCase):
Brian Curtinc57a3452012-06-22 16:00:30 -05001636
1637 def setUp(self):
Steve Dowerabde52c2019-11-15 09:49:21 -08001638 self.temp_dir = self.mkdtemp(prefix="Tmp")
Brian Curtinc57a3452012-06-22 16:00:30 -05001639 # Give the temp_file an ".exe" suffix for all.
1640 # It's needed on Windows and not harmful on other platforms.
1641 self.temp_file = tempfile.NamedTemporaryFile(dir=self.temp_dir,
Serhiy Storchaka014791f2013-01-21 15:00:27 +02001642 prefix="Tmp",
1643 suffix=".Exe")
Brian Curtinc57a3452012-06-22 16:00:30 -05001644 os.chmod(self.temp_file.name, stat.S_IXUSR)
1645 self.addCleanup(self.temp_file.close)
1646 self.dir, self.file = os.path.split(self.temp_file.name)
Cheryl Sabella5680f652019-02-13 06:25:10 -05001647 self.env_path = self.dir
1648 self.curdir = os.curdir
1649 self.ext = ".EXE"
Brian Curtinc57a3452012-06-22 16:00:30 -05001650
1651 def test_basic(self):
1652 # Given an EXE in a directory, it should be returned.
1653 rv = shutil.which(self.file, path=self.dir)
1654 self.assertEqual(rv, self.temp_file.name)
1655
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001656 def test_absolute_cmd(self):
Brian Curtinc57a3452012-06-22 16:00:30 -05001657 # When given the fully qualified path to an executable that exists,
1658 # it should be returned.
1659 rv = shutil.which(self.temp_file.name, path=self.temp_dir)
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001660 self.assertEqual(rv, self.temp_file.name)
1661
1662 def test_relative_cmd(self):
1663 # When given the relative path with a directory part to an executable
1664 # that exists, it should be returned.
1665 base_dir, tail_dir = os.path.split(self.dir)
1666 relpath = os.path.join(tail_dir, self.file)
Nick Coghlan55175962013-07-28 22:11:50 +10001667 with support.change_cwd(path=base_dir):
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001668 rv = shutil.which(relpath, path=self.temp_dir)
1669 self.assertEqual(rv, relpath)
1670 # But it shouldn't be searched in PATH directories (issue #16957).
Nick Coghlan55175962013-07-28 22:11:50 +10001671 with support.change_cwd(path=self.dir):
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001672 rv = shutil.which(relpath, path=base_dir)
1673 self.assertIsNone(rv)
1674
1675 def test_cwd(self):
1676 # Issue #16957
1677 base_dir = os.path.dirname(self.dir)
Nick Coghlan55175962013-07-28 22:11:50 +10001678 with support.change_cwd(path=self.dir):
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001679 rv = shutil.which(self.file, path=base_dir)
1680 if sys.platform == "win32":
1681 # Windows: current directory implicitly on PATH
Cheryl Sabella5680f652019-02-13 06:25:10 -05001682 self.assertEqual(rv, os.path.join(self.curdir, self.file))
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001683 else:
1684 # Other platforms: shouldn't match in the current directory.
1685 self.assertIsNone(rv)
Brian Curtinc57a3452012-06-22 16:00:30 -05001686
Serhiy Storchaka12516e22013-05-28 15:50:15 +03001687 @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
1688 'non-root user required')
Brian Curtinc57a3452012-06-22 16:00:30 -05001689 def test_non_matching_mode(self):
1690 # Set the file read-only and ask for writeable files.
1691 os.chmod(self.temp_file.name, stat.S_IREAD)
Serhiy Storchaka12516e22013-05-28 15:50:15 +03001692 if os.access(self.temp_file.name, os.W_OK):
1693 self.skipTest("can't set the file read-only")
Brian Curtinc57a3452012-06-22 16:00:30 -05001694 rv = shutil.which(self.file, path=self.dir, mode=os.W_OK)
1695 self.assertIsNone(rv)
1696
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001697 def test_relative_path(self):
Antoine Pitrou07c24d12012-06-22 23:33:05 +02001698 base_dir, tail_dir = os.path.split(self.dir)
Nick Coghlan55175962013-07-28 22:11:50 +10001699 with support.change_cwd(path=base_dir):
Antoine Pitrou07c24d12012-06-22 23:33:05 +02001700 rv = shutil.which(self.file, path=tail_dir)
1701 self.assertEqual(rv, os.path.join(tail_dir, self.file))
Antoine Pitrou07c24d12012-06-22 23:33:05 +02001702
Brian Curtinc57a3452012-06-22 16:00:30 -05001703 def test_nonexistent_file(self):
1704 # Return None when no matching executable file is found on the path.
1705 rv = shutil.which("foo.exe", path=self.dir)
1706 self.assertIsNone(rv)
1707
1708 @unittest.skipUnless(sys.platform == "win32",
1709 "pathext check is Windows-only")
1710 def test_pathext_checking(self):
1711 # Ask for the file without the ".exe" extension, then ensure that
1712 # it gets found properly with the extension.
Serhiy Storchakad70127a2013-01-24 20:03:49 +02001713 rv = shutil.which(self.file[:-4], path=self.dir)
Cheryl Sabella5680f652019-02-13 06:25:10 -05001714 self.assertEqual(rv, self.temp_file.name[:-4] + self.ext)
Brian Curtinc57a3452012-06-22 16:00:30 -05001715
Barry Warsaw618738b2013-04-16 11:05:03 -04001716 def test_environ_path(self):
1717 with support.EnvironmentVarGuard() as env:
Cheryl Sabella5680f652019-02-13 06:25:10 -05001718 env['PATH'] = self.env_path
Barry Warsaw618738b2013-04-16 11:05:03 -04001719 rv = shutil.which(self.file)
1720 self.assertEqual(rv, self.temp_file.name)
1721
Victor Stinner228a3c92019-04-17 16:26:36 +02001722 def test_environ_path_empty(self):
1723 # PATH='': no match
1724 with support.EnvironmentVarGuard() as env:
1725 env['PATH'] = ''
1726 with unittest.mock.patch('os.confstr', return_value=self.dir, \
1727 create=True), \
1728 support.swap_attr(os, 'defpath', self.dir), \
1729 support.change_cwd(self.dir):
1730 rv = shutil.which(self.file)
1731 self.assertIsNone(rv)
1732
1733 def test_environ_path_cwd(self):
1734 expected_cwd = os.path.basename(self.temp_file.name)
1735 if sys.platform == "win32":
1736 curdir = os.curdir
1737 if isinstance(expected_cwd, bytes):
1738 curdir = os.fsencode(curdir)
1739 expected_cwd = os.path.join(curdir, expected_cwd)
1740
1741 # PATH=':': explicitly looks in the current directory
1742 with support.EnvironmentVarGuard() as env:
1743 env['PATH'] = os.pathsep
1744 with unittest.mock.patch('os.confstr', return_value=self.dir, \
1745 create=True), \
1746 support.swap_attr(os, 'defpath', self.dir):
1747 rv = shutil.which(self.file)
1748 self.assertIsNone(rv)
1749
1750 # look in current directory
1751 with support.change_cwd(self.dir):
1752 rv = shutil.which(self.file)
1753 self.assertEqual(rv, expected_cwd)
1754
1755 def test_environ_path_missing(self):
1756 with support.EnvironmentVarGuard() as env:
1757 env.pop('PATH', None)
1758
1759 # without confstr
1760 with unittest.mock.patch('os.confstr', side_effect=ValueError, \
1761 create=True), \
1762 support.swap_attr(os, 'defpath', self.dir):
1763 rv = shutil.which(self.file)
1764 self.assertEqual(rv, self.temp_file.name)
1765
1766 # with confstr
1767 with unittest.mock.patch('os.confstr', return_value=self.dir, \
1768 create=True), \
1769 support.swap_attr(os, 'defpath', ''):
1770 rv = shutil.which(self.file)
1771 self.assertEqual(rv, self.temp_file.name)
1772
Barry Warsaw618738b2013-04-16 11:05:03 -04001773 def test_empty_path(self):
1774 base_dir = os.path.dirname(self.dir)
Nick Coghlan55175962013-07-28 22:11:50 +10001775 with support.change_cwd(path=self.dir), \
Barry Warsaw618738b2013-04-16 11:05:03 -04001776 support.EnvironmentVarGuard() as env:
Cheryl Sabella5680f652019-02-13 06:25:10 -05001777 env['PATH'] = self.env_path
Barry Warsaw618738b2013-04-16 11:05:03 -04001778 rv = shutil.which(self.file, path='')
1779 self.assertIsNone(rv)
1780
1781 def test_empty_path_no_PATH(self):
1782 with support.EnvironmentVarGuard() as env:
1783 env.pop('PATH', None)
1784 rv = shutil.which(self.file)
1785 self.assertIsNone(rv)
1786
Victor Stinner228a3c92019-04-17 16:26:36 +02001787 @unittest.skipUnless(sys.platform == "win32", 'test specific to Windows')
1788 def test_pathext(self):
1789 ext = ".xyz"
1790 temp_filexyz = tempfile.NamedTemporaryFile(dir=self.temp_dir,
1791 prefix="Tmp2", suffix=ext)
1792 os.chmod(temp_filexyz.name, stat.S_IXUSR)
1793 self.addCleanup(temp_filexyz.close)
1794
1795 # strip path and extension
1796 program = os.path.basename(temp_filexyz.name)
1797 program = os.path.splitext(program)[0]
1798
1799 with support.EnvironmentVarGuard() as env:
1800 env['PATHEXT'] = ext
1801 rv = shutil.which(program, path=self.temp_dir)
1802 self.assertEqual(rv, temp_filexyz.name)
1803
Brian Curtinc57a3452012-06-22 16:00:30 -05001804
Cheryl Sabella5680f652019-02-13 06:25:10 -05001805class TestWhichBytes(TestWhich):
1806 def setUp(self):
1807 TestWhich.setUp(self)
1808 self.dir = os.fsencode(self.dir)
1809 self.file = os.fsencode(self.file)
1810 self.temp_file.name = os.fsencode(self.temp_file.name)
1811 self.curdir = os.fsencode(self.curdir)
1812 self.ext = os.fsencode(self.ext)
1813
1814
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001815class TestMove(BaseTest, unittest.TestCase):
Christian Heimesada8c3b2008-03-18 18:26:33 +00001816
1817 def setUp(self):
1818 filename = "foo"
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001819 self.src_dir = self.mkdtemp()
1820 self.dst_dir = self.mkdtemp()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001821 self.src_file = os.path.join(self.src_dir, filename)
1822 self.dst_file = os.path.join(self.dst_dir, filename)
Christian Heimesada8c3b2008-03-18 18:26:33 +00001823 with open(self.src_file, "wb") as f:
1824 f.write(b"spam")
1825
Christian Heimesada8c3b2008-03-18 18:26:33 +00001826 def _check_move_file(self, src, dst, real_dst):
Antoine Pitrou92f60ed2010-10-14 22:11:44 +00001827 with open(src, "rb") as f:
1828 contents = f.read()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001829 shutil.move(src, dst)
Antoine Pitrou92f60ed2010-10-14 22:11:44 +00001830 with open(real_dst, "rb") as f:
1831 self.assertEqual(contents, f.read())
Christian Heimesada8c3b2008-03-18 18:26:33 +00001832 self.assertFalse(os.path.exists(src))
1833
1834 def _check_move_dir(self, src, dst, real_dst):
1835 contents = sorted(os.listdir(src))
1836 shutil.move(src, dst)
1837 self.assertEqual(contents, sorted(os.listdir(real_dst)))
1838 self.assertFalse(os.path.exists(src))
1839
1840 def test_move_file(self):
1841 # Move a file to another location on the same filesystem.
1842 self._check_move_file(self.src_file, self.dst_file, self.dst_file)
1843
1844 def test_move_file_to_dir(self):
1845 # Move a file inside an existing dir on the same filesystem.
1846 self._check_move_file(self.src_file, self.dst_dir, self.dst_file)
1847
Maxwell A McKinnoncf57cab2019-09-30 19:41:16 -07001848 def test_move_file_to_dir_pathlike_src(self):
1849 # Move a pathlike file to another location on the same filesystem.
1850 src = pathlib.Path(self.src_file)
1851 self._check_move_file(src, self.dst_dir, self.dst_file)
1852
1853 def test_move_file_to_dir_pathlike_dst(self):
1854 # Move a file to another pathlike location on the same filesystem.
1855 dst = pathlib.Path(self.dst_dir)
1856 self._check_move_file(self.src_file, dst, self.dst_file)
1857
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001858 @mock_rename
Christian Heimesada8c3b2008-03-18 18:26:33 +00001859 def test_move_file_other_fs(self):
1860 # Move a file to an existing dir on another filesystem.
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001861 self.test_move_file()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001862
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001863 @mock_rename
Christian Heimesada8c3b2008-03-18 18:26:33 +00001864 def test_move_file_to_dir_other_fs(self):
1865 # Move a file to another location on another filesystem.
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001866 self.test_move_file_to_dir()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001867
1868 def test_move_dir(self):
1869 # Move a dir to another location on the same filesystem.
Steve Dowerabde52c2019-11-15 09:49:21 -08001870 dst_dir = tempfile.mktemp(dir=self.mkdtemp())
Christian Heimesada8c3b2008-03-18 18:26:33 +00001871 try:
1872 self._check_move_dir(self.src_dir, dst_dir, dst_dir)
1873 finally:
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001874 support.rmtree(dst_dir)
Christian Heimesada8c3b2008-03-18 18:26:33 +00001875
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001876 @mock_rename
Christian Heimesada8c3b2008-03-18 18:26:33 +00001877 def test_move_dir_other_fs(self):
1878 # Move a dir to another location on another filesystem.
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001879 self.test_move_dir()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001880
1881 def test_move_dir_to_dir(self):
1882 # Move a dir inside an existing dir on the same filesystem.
1883 self._check_move_dir(self.src_dir, self.dst_dir,
1884 os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
1885
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001886 @mock_rename
Christian Heimesada8c3b2008-03-18 18:26:33 +00001887 def test_move_dir_to_dir_other_fs(self):
1888 # Move a dir inside an existing dir on another filesystem.
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001889 self.test_move_dir_to_dir()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001890
Serhiy Storchaka3a308b92014-02-11 10:30:59 +02001891 def test_move_dir_sep_to_dir(self):
1892 self._check_move_dir(self.src_dir + os.path.sep, self.dst_dir,
1893 os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
1894
1895 @unittest.skipUnless(os.path.altsep, 'requires os.path.altsep')
1896 def test_move_dir_altsep_to_dir(self):
1897 self._check_move_dir(self.src_dir + os.path.altsep, self.dst_dir,
1898 os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
1899
Christian Heimesada8c3b2008-03-18 18:26:33 +00001900 def test_existing_file_inside_dest_dir(self):
1901 # A file with the same name inside the destination dir already exists.
1902 with open(self.dst_file, "wb"):
1903 pass
1904 self.assertRaises(shutil.Error, shutil.move, self.src_file, self.dst_dir)
1905
1906 def test_dont_move_dir_in_itself(self):
1907 # Moving a dir inside itself raises an Error.
1908 dst = os.path.join(self.src_dir, "bar")
1909 self.assertRaises(shutil.Error, shutil.move, self.src_dir, dst)
1910
Antoine Pitrou0dcc3cd2009-01-29 20:26:59 +00001911 def test_destinsrc_false_negative(self):
1912 os.mkdir(TESTFN)
1913 try:
1914 for src, dst in [('srcdir', 'srcdir/dest')]:
1915 src = os.path.join(TESTFN, src)
1916 dst = os.path.join(TESTFN, dst)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001917 self.assertTrue(shutil._destinsrc(src, dst),
Benjamin Peterson247a9b82009-02-20 04:09:19 +00001918 msg='_destinsrc() wrongly concluded that '
Antoine Pitrou0dcc3cd2009-01-29 20:26:59 +00001919 'dst (%s) is not in src (%s)' % (dst, src))
1920 finally:
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001921 support.rmtree(TESTFN)
Christian Heimesada8c3b2008-03-18 18:26:33 +00001922
Antoine Pitrou0dcc3cd2009-01-29 20:26:59 +00001923 def test_destinsrc_false_positive(self):
1924 os.mkdir(TESTFN)
1925 try:
1926 for src, dst in [('srcdir', 'src/dest'), ('srcdir', 'srcdir.new')]:
1927 src = os.path.join(TESTFN, src)
1928 dst = os.path.join(TESTFN, dst)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001929 self.assertFalse(shutil._destinsrc(src, dst),
Benjamin Peterson247a9b82009-02-20 04:09:19 +00001930 msg='_destinsrc() wrongly concluded that '
Antoine Pitrou0dcc3cd2009-01-29 20:26:59 +00001931 'dst (%s) is in src (%s)' % (dst, src))
1932 finally:
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001933 support.rmtree(TESTFN)
Christian Heimes9bd667a2008-01-20 15:14:11 +00001934
Antoine Pitrou0a08d7a2012-01-06 20:16:19 +01001935 @support.skip_unless_symlink
1936 @mock_rename
1937 def test_move_file_symlink(self):
1938 dst = os.path.join(self.src_dir, 'bar')
1939 os.symlink(self.src_file, dst)
1940 shutil.move(dst, self.dst_file)
1941 self.assertTrue(os.path.islink(self.dst_file))
1942 self.assertTrue(os.path.samefile(self.src_file, self.dst_file))
1943
1944 @support.skip_unless_symlink
1945 @mock_rename
1946 def test_move_file_symlink_to_dir(self):
1947 filename = "bar"
1948 dst = os.path.join(self.src_dir, filename)
1949 os.symlink(self.src_file, dst)
1950 shutil.move(dst, self.dst_dir)
1951 final_link = os.path.join(self.dst_dir, filename)
1952 self.assertTrue(os.path.islink(final_link))
1953 self.assertTrue(os.path.samefile(self.src_file, final_link))
1954
1955 @support.skip_unless_symlink
1956 @mock_rename
1957 def test_move_dangling_symlink(self):
1958 src = os.path.join(self.src_dir, 'baz')
1959 dst = os.path.join(self.src_dir, 'bar')
1960 os.symlink(src, dst)
1961 dst_link = os.path.join(self.dst_dir, 'quux')
1962 shutil.move(dst, dst_link)
1963 self.assertTrue(os.path.islink(dst_link))
Steve Dower75e06492019-08-21 13:43:06 -07001964 self.assertEqual(os.path.realpath(src), os.path.realpath(dst_link))
Antoine Pitrou0a08d7a2012-01-06 20:16:19 +01001965
1966 @support.skip_unless_symlink
1967 @mock_rename
1968 def test_move_dir_symlink(self):
1969 src = os.path.join(self.src_dir, 'baz')
1970 dst = os.path.join(self.src_dir, 'bar')
1971 os.mkdir(src)
1972 os.symlink(src, dst)
1973 dst_link = os.path.join(self.dst_dir, 'quux')
1974 shutil.move(dst, dst_link)
1975 self.assertTrue(os.path.islink(dst_link))
1976 self.assertTrue(os.path.samefile(src, dst_link))
1977
Brian Curtin0d0a1de2012-06-18 18:41:07 -05001978 def test_move_return_value(self):
1979 rv = shutil.move(self.src_file, self.dst_dir)
1980 self.assertEqual(rv,
1981 os.path.join(self.dst_dir, os.path.basename(self.src_file)))
1982
1983 def test_move_as_rename_return_value(self):
1984 rv = shutil.move(self.src_file, os.path.join(self.dst_dir, 'bar'))
1985 self.assertEqual(rv, os.path.join(self.dst_dir, 'bar'))
1986
R David Murray6ffface2014-06-11 14:40:13 -04001987 @mock_rename
1988 def test_move_file_special_function(self):
1989 moved = []
1990 def _copy(src, dst):
1991 moved.append((src, dst))
1992 shutil.move(self.src_file, self.dst_dir, copy_function=_copy)
1993 self.assertEqual(len(moved), 1)
1994
1995 @mock_rename
1996 def test_move_dir_special_function(self):
1997 moved = []
1998 def _copy(src, dst):
1999 moved.append((src, dst))
2000 support.create_empty_file(os.path.join(self.src_dir, 'child'))
2001 support.create_empty_file(os.path.join(self.src_dir, 'child1'))
2002 shutil.move(self.src_dir, self.dst_dir, copy_function=_copy)
2003 self.assertEqual(len(moved), 3)
2004
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03002005 def test_move_dir_caseinsensitive(self):
2006 # Renames a folder to the same name
2007 # but a different case.
2008
2009 self.src_dir = self.mkdtemp()
2010 dst_dir = os.path.join(
2011 os.path.dirname(self.src_dir),
2012 os.path.basename(self.src_dir).upper())
2013 self.assertNotEqual(self.src_dir, dst_dir)
2014
2015 try:
2016 shutil.move(self.src_dir, dst_dir)
2017 self.assertTrue(os.path.isdir(dst_dir))
2018 finally:
2019 os.rmdir(dst_dir)
2020
Tarek Ziadé5340db32010-04-19 22:30:51 +00002021
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002022class TestCopyFile(unittest.TestCase):
2023
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002024 class Faux(object):
2025 _entered = False
2026 _exited_with = None
2027 _raised = False
2028 def __init__(self, raise_in_exit=False, suppress_at_exit=True):
2029 self._raise_in_exit = raise_in_exit
2030 self._suppress_at_exit = suppress_at_exit
2031 def read(self, *args):
2032 return ''
2033 def __enter__(self):
2034 self._entered = True
2035 def __exit__(self, exc_type, exc_val, exc_tb):
2036 self._exited_with = exc_type, exc_val, exc_tb
2037 if self._raise_in_exit:
2038 self._raised = True
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002039 raise OSError("Cannot close")
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002040 return self._suppress_at_exit
2041
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002042 def test_w_source_open_fails(self):
2043 def _open(filename, mode='r'):
2044 if filename == 'srcfile':
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002045 raise OSError('Cannot open "srcfile"')
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002046 assert 0 # shouldn't reach here.
2047
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03002048 with support.swap_attr(shutil, 'open', _open):
2049 with self.assertRaises(OSError):
2050 shutil.copyfile('srcfile', 'destfile')
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002051
Victor Stinner937ee9e2018-06-26 02:11:06 +02002052 @unittest.skipIf(MACOS, "skipped on macOS")
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002053 def test_w_dest_open_fails(self):
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002054 srcfile = self.Faux()
2055
2056 def _open(filename, mode='r'):
2057 if filename == 'srcfile':
2058 return srcfile
2059 if filename == 'destfile':
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002060 raise OSError('Cannot open "destfile"')
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002061 assert 0 # shouldn't reach here.
2062
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03002063 with support.swap_attr(shutil, 'open', _open):
2064 shutil.copyfile('srcfile', 'destfile')
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002065 self.assertTrue(srcfile._entered)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002066 self.assertTrue(srcfile._exited_with[0] is OSError)
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002067 self.assertEqual(srcfile._exited_with[1].args,
2068 ('Cannot open "destfile"',))
2069
Victor Stinner937ee9e2018-06-26 02:11:06 +02002070 @unittest.skipIf(MACOS, "skipped on macOS")
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002071 def test_w_dest_close_fails(self):
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002072 srcfile = self.Faux()
2073 destfile = self.Faux(True)
2074
2075 def _open(filename, mode='r'):
2076 if filename == 'srcfile':
2077 return srcfile
2078 if filename == 'destfile':
2079 return destfile
2080 assert 0 # shouldn't reach here.
2081
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03002082 with support.swap_attr(shutil, 'open', _open):
2083 shutil.copyfile('srcfile', 'destfile')
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002084 self.assertTrue(srcfile._entered)
2085 self.assertTrue(destfile._entered)
2086 self.assertTrue(destfile._raised)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002087 self.assertTrue(srcfile._exited_with[0] is OSError)
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002088 self.assertEqual(srcfile._exited_with[1].args,
2089 ('Cannot close',))
2090
Victor Stinner937ee9e2018-06-26 02:11:06 +02002091 @unittest.skipIf(MACOS, "skipped on macOS")
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002092 def test_w_source_close_fails(self):
2093
2094 srcfile = self.Faux(True)
2095 destfile = self.Faux()
2096
2097 def _open(filename, mode='r'):
2098 if filename == 'srcfile':
2099 return srcfile
2100 if filename == 'destfile':
2101 return destfile
2102 assert 0 # shouldn't reach here.
2103
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03002104 with support.swap_attr(shutil, 'open', _open):
2105 with self.assertRaises(OSError):
2106 shutil.copyfile('srcfile', 'destfile')
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002107 self.assertTrue(srcfile._entered)
2108 self.assertTrue(destfile._entered)
2109 self.assertFalse(destfile._raised)
2110 self.assertTrue(srcfile._exited_with[0] is None)
2111 self.assertTrue(srcfile._raised)
2112
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002113
Giampaolo Rodolac7f02a92018-06-19 08:27:29 -07002114class TestCopyFileObj(unittest.TestCase):
2115 FILESIZE = 2 * 1024 * 1024
2116
2117 @classmethod
2118 def setUpClass(cls):
2119 write_test_file(TESTFN, cls.FILESIZE)
2120
2121 @classmethod
2122 def tearDownClass(cls):
2123 support.unlink(TESTFN)
2124 support.unlink(TESTFN2)
2125
2126 def tearDown(self):
2127 support.unlink(TESTFN2)
2128
2129 @contextlib.contextmanager
2130 def get_files(self):
2131 with open(TESTFN, "rb") as src:
2132 with open(TESTFN2, "wb") as dst:
2133 yield (src, dst)
2134
2135 def assert_files_eq(self, src, dst):
2136 with open(src, 'rb') as fsrc:
2137 with open(dst, 'rb') as fdst:
2138 self.assertEqual(fsrc.read(), fdst.read())
2139
2140 def test_content(self):
2141 with self.get_files() as (src, dst):
2142 shutil.copyfileobj(src, dst)
2143 self.assert_files_eq(TESTFN, TESTFN2)
2144
2145 def test_file_not_closed(self):
2146 with self.get_files() as (src, dst):
2147 shutil.copyfileobj(src, dst)
2148 assert not src.closed
2149 assert not dst.closed
2150
2151 def test_file_offset(self):
2152 with self.get_files() as (src, dst):
2153 shutil.copyfileobj(src, dst)
2154 self.assertEqual(src.tell(), self.FILESIZE)
2155 self.assertEqual(dst.tell(), self.FILESIZE)
2156
2157 @unittest.skipIf(os.name != 'nt', "Windows only")
2158 def test_win_impl(self):
2159 # Make sure alternate Windows implementation is called.
2160 with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
2161 shutil.copyfile(TESTFN, TESTFN2)
2162 assert m.called
2163
2164 # File size is 2 MiB but max buf size should be 1 MiB.
2165 self.assertEqual(m.call_args[0][2], 1 * 1024 * 1024)
2166
2167 # If file size < 1 MiB memoryview() length must be equal to
2168 # the actual file size.
Steve Dowerabde52c2019-11-15 09:49:21 -08002169 with tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False) as f:
Giampaolo Rodolac7f02a92018-06-19 08:27:29 -07002170 f.write(b'foo')
2171 fname = f.name
2172 self.addCleanup(support.unlink, fname)
2173 with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
2174 shutil.copyfile(fname, TESTFN2)
2175 self.assertEqual(m.call_args[0][2], 3)
2176
2177 # Empty files should not rely on readinto() variant.
Steve Dowerabde52c2019-11-15 09:49:21 -08002178 with tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False) as f:
Giampaolo Rodolac7f02a92018-06-19 08:27:29 -07002179 pass
2180 fname = f.name
2181 self.addCleanup(support.unlink, fname)
2182 with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
2183 shutil.copyfile(fname, TESTFN2)
2184 assert not m.called
2185 self.assert_files_eq(fname, TESTFN2)
2186
2187
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002188class _ZeroCopyFileTest(object):
2189 """Tests common to all zero-copy APIs."""
2190 FILESIZE = (10 * 1024 * 1024) # 10 MiB
2191 FILEDATA = b""
2192 PATCHPOINT = ""
2193
2194 @classmethod
2195 def setUpClass(cls):
2196 write_test_file(TESTFN, cls.FILESIZE)
2197 with open(TESTFN, 'rb') as f:
2198 cls.FILEDATA = f.read()
2199 assert len(cls.FILEDATA) == cls.FILESIZE
2200
2201 @classmethod
2202 def tearDownClass(cls):
2203 support.unlink(TESTFN)
2204
2205 def tearDown(self):
2206 support.unlink(TESTFN2)
2207
2208 @contextlib.contextmanager
2209 def get_files(self):
2210 with open(TESTFN, "rb") as src:
2211 with open(TESTFN2, "wb") as dst:
2212 yield (src, dst)
2213
2214 def zerocopy_fun(self, *args, **kwargs):
2215 raise NotImplementedError("must be implemented in subclass")
2216
2217 def reset(self):
2218 self.tearDown()
2219 self.tearDownClass()
2220 self.setUpClass()
2221 self.setUp()
2222
2223 # ---
2224
2225 def test_regular_copy(self):
2226 with self.get_files() as (src, dst):
2227 self.zerocopy_fun(src, dst)
2228 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2229 # Make sure the fallback function is not called.
2230 with self.get_files() as (src, dst):
2231 with unittest.mock.patch('shutil.copyfileobj') as m:
2232 shutil.copyfile(TESTFN, TESTFN2)
2233 assert not m.called
2234
2235 def test_same_file(self):
2236 self.addCleanup(self.reset)
2237 with self.get_files() as (src, dst):
2238 with self.assertRaises(Exception):
2239 self.zerocopy_fun(src, src)
2240 # Make sure src file is not corrupted.
2241 self.assertEqual(read_file(TESTFN, binary=True), self.FILEDATA)
2242
2243 def test_non_existent_src(self):
Steve Dowerabde52c2019-11-15 09:49:21 -08002244 name = tempfile.mktemp(dir=os.getcwd())
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002245 with self.assertRaises(FileNotFoundError) as cm:
2246 shutil.copyfile(name, "new")
2247 self.assertEqual(cm.exception.filename, name)
2248
2249 def test_empty_file(self):
2250 srcname = TESTFN + 'src'
2251 dstname = TESTFN + 'dst'
2252 self.addCleanup(lambda: support.unlink(srcname))
2253 self.addCleanup(lambda: support.unlink(dstname))
2254 with open(srcname, "wb"):
2255 pass
2256
2257 with open(srcname, "rb") as src:
2258 with open(dstname, "wb") as dst:
2259 self.zerocopy_fun(src, dst)
2260
2261 self.assertEqual(read_file(dstname, binary=True), b"")
2262
2263 def test_unhandled_exception(self):
2264 with unittest.mock.patch(self.PATCHPOINT,
2265 side_effect=ZeroDivisionError):
2266 self.assertRaises(ZeroDivisionError,
2267 shutil.copyfile, TESTFN, TESTFN2)
2268
2269 def test_exception_on_first_call(self):
2270 # Emulate a case where the first call to the zero-copy
2271 # function raises an exception in which case the function is
2272 # supposed to give up immediately.
2273 with unittest.mock.patch(self.PATCHPOINT,
2274 side_effect=OSError(errno.EINVAL, "yo")):
2275 with self.get_files() as (src, dst):
2276 with self.assertRaises(_GiveupOnFastCopy):
2277 self.zerocopy_fun(src, dst)
2278
2279 def test_filesystem_full(self):
2280 # Emulate a case where filesystem is full and sendfile() fails
2281 # on first call.
2282 with unittest.mock.patch(self.PATCHPOINT,
2283 side_effect=OSError(errno.ENOSPC, "yo")):
2284 with self.get_files() as (src, dst):
2285 self.assertRaises(OSError, self.zerocopy_fun, src, dst)
2286
2287
2288@unittest.skipIf(not SUPPORTS_SENDFILE, 'os.sendfile() not supported')
2289class TestZeroCopySendfile(_ZeroCopyFileTest, unittest.TestCase):
2290 PATCHPOINT = "os.sendfile"
2291
2292 def zerocopy_fun(self, fsrc, fdst):
2293 return shutil._fastcopy_sendfile(fsrc, fdst)
2294
2295 def test_non_regular_file_src(self):
2296 with io.BytesIO(self.FILEDATA) as src:
2297 with open(TESTFN2, "wb") as dst:
2298 with self.assertRaises(_GiveupOnFastCopy):
2299 self.zerocopy_fun(src, dst)
2300 shutil.copyfileobj(src, dst)
2301
2302 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2303
2304 def test_non_regular_file_dst(self):
2305 with open(TESTFN, "rb") as src:
2306 with io.BytesIO() as dst:
2307 with self.assertRaises(_GiveupOnFastCopy):
2308 self.zerocopy_fun(src, dst)
2309 shutil.copyfileobj(src, dst)
2310 dst.seek(0)
2311 self.assertEqual(dst.read(), self.FILEDATA)
2312
2313 def test_exception_on_second_call(self):
2314 def sendfile(*args, **kwargs):
2315 if not flag:
2316 flag.append(None)
2317 return orig_sendfile(*args, **kwargs)
2318 else:
2319 raise OSError(errno.EBADF, "yo")
2320
2321 flag = []
2322 orig_sendfile = os.sendfile
2323 with unittest.mock.patch('os.sendfile', create=True,
2324 side_effect=sendfile):
2325 with self.get_files() as (src, dst):
2326 with self.assertRaises(OSError) as cm:
2327 shutil._fastcopy_sendfile(src, dst)
2328 assert flag
2329 self.assertEqual(cm.exception.errno, errno.EBADF)
2330
2331 def test_cant_get_size(self):
2332 # Emulate a case where src file size cannot be determined.
2333 # Internally bufsize will be set to a small value and
2334 # sendfile() will be called repeatedly.
2335 with unittest.mock.patch('os.fstat', side_effect=OSError) as m:
2336 with self.get_files() as (src, dst):
2337 shutil._fastcopy_sendfile(src, dst)
2338 assert m.called
2339 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2340
2341 def test_small_chunks(self):
2342 # Force internal file size detection to be smaller than the
2343 # actual file size. We want to force sendfile() to be called
2344 # multiple times, also in order to emulate a src fd which gets
2345 # bigger while it is being copied.
2346 mock = unittest.mock.Mock()
2347 mock.st_size = 65536 + 1
2348 with unittest.mock.patch('os.fstat', return_value=mock) as m:
2349 with self.get_files() as (src, dst):
2350 shutil._fastcopy_sendfile(src, dst)
2351 assert m.called
2352 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2353
2354 def test_big_chunk(self):
2355 # Force internal file size detection to be +100MB bigger than
2356 # the actual file size. Make sure sendfile() does not rely on
2357 # file size value except for (maybe) a better throughput /
2358 # performance.
2359 mock = unittest.mock.Mock()
2360 mock.st_size = self.FILESIZE + (100 * 1024 * 1024)
2361 with unittest.mock.patch('os.fstat', return_value=mock) as m:
2362 with self.get_files() as (src, dst):
2363 shutil._fastcopy_sendfile(src, dst)
2364 assert m.called
2365 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2366
2367 def test_blocksize_arg(self):
2368 with unittest.mock.patch('os.sendfile',
2369 side_effect=ZeroDivisionError) as m:
2370 self.assertRaises(ZeroDivisionError,
2371 shutil.copyfile, TESTFN, TESTFN2)
2372 blocksize = m.call_args[0][3]
2373 # Make sure file size and the block size arg passed to
2374 # sendfile() are the same.
2375 self.assertEqual(blocksize, os.path.getsize(TESTFN))
2376 # ...unless we're dealing with a small file.
2377 support.unlink(TESTFN2)
2378 write_file(TESTFN2, b"hello", binary=True)
2379 self.addCleanup(support.unlink, TESTFN2 + '3')
2380 self.assertRaises(ZeroDivisionError,
2381 shutil.copyfile, TESTFN2, TESTFN2 + '3')
2382 blocksize = m.call_args[0][3]
2383 self.assertEqual(blocksize, 2 ** 23)
2384
2385 def test_file2file_not_supported(self):
2386 # Emulate a case where sendfile() only support file->socket
2387 # fds. In such a case copyfile() is supposed to skip the
2388 # fast-copy attempt from then on.
Giampaolo Rodola413d9552019-05-30 14:05:41 +08002389 assert shutil._USE_CP_SENDFILE
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002390 try:
2391 with unittest.mock.patch(
2392 self.PATCHPOINT,
2393 side_effect=OSError(errno.ENOTSOCK, "yo")) as m:
2394 with self.get_files() as (src, dst):
2395 with self.assertRaises(_GiveupOnFastCopy):
2396 shutil._fastcopy_sendfile(src, dst)
2397 assert m.called
Giampaolo Rodola413d9552019-05-30 14:05:41 +08002398 assert not shutil._USE_CP_SENDFILE
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002399
2400 with unittest.mock.patch(self.PATCHPOINT) as m:
2401 shutil.copyfile(TESTFN, TESTFN2)
2402 assert not m.called
2403 finally:
Giampaolo Rodola413d9552019-05-30 14:05:41 +08002404 shutil._USE_CP_SENDFILE = True
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002405
2406
Victor Stinner937ee9e2018-06-26 02:11:06 +02002407@unittest.skipIf(not MACOS, 'macOS only')
Giampaolo Rodolac7f02a92018-06-19 08:27:29 -07002408class TestZeroCopyMACOS(_ZeroCopyFileTest, unittest.TestCase):
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002409 PATCHPOINT = "posix._fcopyfile"
2410
2411 def zerocopy_fun(self, src, dst):
Giampaolo Rodolac7f02a92018-06-19 08:27:29 -07002412 return shutil._fastcopy_fcopyfile(src, dst, posix._COPYFILE_DATA)
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002413
2414
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03002415class TestGetTerminalSize(unittest.TestCase):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002416 def test_does_not_crash(self):
2417 """Check if get_terminal_size() returns a meaningful value.
2418
2419 There's no easy portable way to actually check the size of the
2420 terminal, so let's check if it returns something sensible instead.
2421 """
2422 size = shutil.get_terminal_size()
Antoine Pitroucfade362012-02-08 23:48:59 +01002423 self.assertGreaterEqual(size.columns, 0)
2424 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002425
2426 def test_os_environ_first(self):
2427 "Check if environment variables have precedence"
2428
2429 with support.EnvironmentVarGuard() as env:
2430 env['COLUMNS'] = '777'
Serhiy Storchakad30829d2016-04-24 09:58:43 +03002431 del env['LINES']
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002432 size = shutil.get_terminal_size()
2433 self.assertEqual(size.columns, 777)
2434
2435 with support.EnvironmentVarGuard() as env:
Serhiy Storchakad30829d2016-04-24 09:58:43 +03002436 del env['COLUMNS']
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002437 env['LINES'] = '888'
2438 size = shutil.get_terminal_size()
2439 self.assertEqual(size.lines, 888)
2440
Serhiy Storchakad30829d2016-04-24 09:58:43 +03002441 def test_bad_environ(self):
2442 with support.EnvironmentVarGuard() as env:
2443 env['COLUMNS'] = 'xxx'
2444 env['LINES'] = 'yyy'
2445 size = shutil.get_terminal_size()
2446 self.assertGreaterEqual(size.columns, 0)
2447 self.assertGreaterEqual(size.lines, 0)
2448
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002449 @unittest.skipUnless(os.isatty(sys.__stdout__.fileno()), "not on tty")
Victor Stinner119ebb72016-04-19 22:24:56 +02002450 @unittest.skipUnless(hasattr(os, 'get_terminal_size'),
2451 'need os.get_terminal_size()')
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002452 def test_stty_match(self):
2453 """Check if stty returns the same results ignoring env
2454
2455 This test will fail if stdin and stdout are connected to
2456 different terminals with different sizes. Nevertheless, such
2457 situations should be pretty rare.
2458 """
2459 try:
2460 size = subprocess.check_output(['stty', 'size']).decode().split()
Xavier de Gaye38c8b7d2016-11-14 17:14:42 +01002461 except (FileNotFoundError, PermissionError,
2462 subprocess.CalledProcessError):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002463 self.skipTest("stty invocation failed")
2464 expected = (int(size[1]), int(size[0])) # reversed order
2465
2466 with support.EnvironmentVarGuard() as env:
2467 del env['LINES']
2468 del env['COLUMNS']
2469 actual = shutil.get_terminal_size()
2470
2471 self.assertEqual(expected, actual)
Ronald Oussorenf51738b2011-05-06 10:23:04 +02002472
Serhiy Storchakad30829d2016-04-24 09:58:43 +03002473 def test_fallback(self):
2474 with support.EnvironmentVarGuard() as env:
2475 del env['LINES']
2476 del env['COLUMNS']
2477
2478 # sys.__stdout__ has no fileno()
2479 with support.swap_attr(sys, '__stdout__', None):
2480 size = shutil.get_terminal_size(fallback=(10, 20))
2481 self.assertEqual(size.columns, 10)
2482 self.assertEqual(size.lines, 20)
2483
2484 # sys.__stdout__ is not a terminal on Unix
2485 # or fileno() not in (0, 1, 2) on Windows
2486 with open(os.devnull, 'w') as f, \
2487 support.swap_attr(sys, '__stdout__', f):
2488 size = shutil.get_terminal_size(fallback=(30, 40))
2489 self.assertEqual(size.columns, 30)
2490 self.assertEqual(size.lines, 40)
2491
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002492
Berker Peksag8083cd62014-11-01 11:04:06 +02002493class PublicAPITests(unittest.TestCase):
2494 """Ensures that the correct values are exposed in the public API."""
2495
2496 def test_module_all_attribute(self):
2497 self.assertTrue(hasattr(shutil, '__all__'))
2498 target_api = ['copyfileobj', 'copyfile', 'copymode', 'copystat',
2499 'copy', 'copy2', 'copytree', 'move', 'rmtree', 'Error',
2500 'SpecialFileError', 'ExecError', 'make_archive',
2501 'get_archive_formats', 'register_archive_format',
2502 'unregister_archive_format', 'get_unpack_formats',
2503 'register_unpack_format', 'unregister_unpack_format',
2504 'unpack_archive', 'ignore_patterns', 'chown', 'which',
2505 'get_terminal_size', 'SameFileError']
2506 if hasattr(os, 'statvfs') or os.name == 'nt':
2507 target_api.append('disk_usage')
2508 self.assertEqual(set(shutil.__all__), set(target_api))
2509
2510
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00002511if __name__ == '__main__':
Brett Cannon3e9a9ae2013-06-12 21:25:59 -04002512 unittest.main()