blob: e2574253d961f94ab82326dea6edc3ca45c108e6 [file] [log] [blame]
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00001# Copyright (C) 2003 Python Software Foundation
2
3import unittest
Berker Peksag884afd92014-12-10 02:50:32 +02004import unittest.mock
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00005import shutil
6import tempfile
Johannes Gijsbers8e6f2de2004-11-23 09:27:27 +00007import sys
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +00008import stat
Brett Cannon1c3fa182004-06-19 21:11:35 +00009import os
10import os.path
Antoine Pitrouc041ab62012-01-02 19:18:02 +010011import errno
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040012import functools
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -070013import pathlib
Antoine Pitroubcf2b592012-02-08 23:28:36 +010014import subprocess
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020015import random
16import string
17import contextlib
18import io
Serhiy Storchaka527ef072015-09-06 18:33:19 +030019from shutil import (make_archive,
Tarek Ziadé396fad72010-02-23 05:30:31 +000020 register_archive_format, unregister_archive_format,
Tarek Ziadé6ac91722010-04-28 17:51:36 +000021 get_archive_formats, Error, unpack_archive,
22 register_unpack_format, RegistryError,
Hynek Schlawack48653762012-10-07 12:49:58 +020023 unregister_unpack_format, get_unpack_formats,
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020024 SameFileError, _GiveupOnFastCopy)
Tarek Ziadé396fad72010-02-23 05:30:31 +000025import tarfile
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +020026import zipfile
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020027try:
28 import posix
29except ImportError:
30 posix = None
Tarek Ziadé396fad72010-02-23 05:30:31 +000031
32from test import support
Hai Shi0c4f0f32020-06-30 21:46:31 +080033from test.support import os_helper
34from test.support.os_helper import TESTFN, FakePath
Serhiy Storchaka11213772014-08-06 18:50:19 +030035
# Additional scratch names built from the shared TESTFN base name.
TESTFN2 = TESTFN + "2"
TESTFN_SRC = TESTFN + "_SRC"
TESTFN_DST = TESTFN + "_DST"

# Platform flags derived from sys.platform.
MACOS = sys.platform.startswith("darwin")
AIX = sys.platform.startswith('aix')

# UID_GID_SUPPORT records whether both grp and pwd could be imported.
try:
    import grp
    import pwd
except ImportError:
    UID_GID_SUPPORT = False
else:
    UID_GID_SUPPORT = True

# _winapi is left as None when the module cannot be imported.
try:
    import _winapi
except ImportError:
    _winapi = None
52
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040053def _fake_rename(*args, **kwargs):
54 # Pretend the destination path is on a different filesystem.
Antoine Pitrouc041ab62012-01-02 19:18:02 +010055 raise OSError(getattr(errno, 'EXDEV', 18), "Invalid cross-device link")
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040056
def mock_rename(func):
    """Decorator that runs *func* with os.rename swapped for _fake_rename.

    The previous os.rename is reinstated whether *func* returns or raises.
    """
    @functools.wraps(func)
    def wrap(*args, **kwargs):
        saved_rename = os.rename
        try:
            os.rename = _fake_rename
            return func(*args, **kwargs)
        finally:
            os.rename = saved_rename
    return wrap
67
def write_file(path, content, binary=False):
    """Store *content* in the file at *path*.

    A tuple *path* is first combined with os.path.join.  With a true
    *binary* the file is opened in binary mode; otherwise it is opened
    as UTF-8 text.
    """
    if isinstance(path, tuple):
        path = os.path.join(*path)
    if binary:
        open_kwargs = {'mode': 'wb', 'encoding': None}
    else:
        open_kwargs = {'mode': 'w', 'encoding': "utf-8"}
    with open(path, **open_kwargs) as fp:
        fp.write(content)
81
def write_test_file(path, size):
    """Create a file of exactly *size* bytes filled with random ASCII letters.

    One random buffer of at most 8192 bytes is generated and reused for
    every piece written.  An AssertionError is raised if the resulting file
    does not have the requested size.
    """
    def chunks(total, step):
        """Yield piece sizes of *step* bytes, then any smaller remainder."""
        assert total >= step
        while total > step:
            yield step
            total -= step
        if total:
            yield total

    bufsize = min(size, 8192)
    chunk = b"".join([random.choice(string.ascii_letters).encode()
                      for i in range(bufsize)])
    with open(path, 'wb') as f:
        for csize in chunks(size, bufsize):
            # Honour the piece size: the last piece can be shorter than the
            # buffer when *size* is not a multiple of the buffer length.
            f.write(chunk[:csize])
    assert os.path.getsize(path) == size
99
def read_file(path, binary=False):
    """Return everything stored in the file at *path*.

    A tuple *path* is first combined with os.path.join.  With a true
    *binary* the file is opened in binary mode; otherwise it is opened
    as UTF-8 text.
    """
    if isinstance(path, tuple):
        path = os.path.join(*path)
    if binary:
        open_kwargs = {'mode': 'rb', 'encoding': None}
    else:
        open_kwargs = {'mode': 'r', 'encoding': "utf-8"}
    with open(path, **open_kwargs) as fp:
        data = fp.read()
    return data
113
def rlistdir(path):
    """Return a recursive listing of *path* as a flat list of relative names.

    Entries of each directory appear in sorted order.  A real directory is
    listed as 'name/' followed by its own entries prefixed with 'name/';
    anything else, including a symlink to a directory, is listed by its
    plain name and not descended into.
    """
    listing = []
    for entry in sorted(os.listdir(path)):
        full = os.path.join(path, entry)
        if not os.path.isdir(full) or os.path.islink(full):
            listing.append(entry)
            continue
        prefix = entry + '/'
        listing.append(prefix)
        listing.extend(prefix + child for child in rlistdir(full))
    return listing
125
def supports_file2file_sendfile():
    """Return True if os.sendfile() can copy from one regular file to another.

    Two temporary files are created in the current working directory for
    the probe and unlinked again before returning.
    """
    # ...apparently Linux and Solaris are the only ones
    if not hasattr(os, "sendfile"):
        return False
    src_path = dst_path = None
    try:
        with tempfile.NamedTemporaryFile("wb", dir=os.getcwd(),
                                         delete=False) as seed:
            src_path = seed.name
            seed.write(b"0123456789")

        with open(src_path, "rb") as src, \
             tempfile.NamedTemporaryFile("wb", dir=os.getcwd(),
                                         delete=False) as dst:
            dst_path = dst.name
            in_fd = src.fileno()
            out_fd = dst.fileno()
            try:
                os.sendfile(out_fd, in_fd, 0, 2)
            except OSError:
                return False
            return True
    finally:
        # Only unlink files that were actually created.
        for leftover in (src_path, dst_path):
            if leftover is not None:
                os_helper.unlink(leftover)
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +0200153
154
155SUPPORTS_SENDFILE = supports_file2file_sendfile()
156
# AIX 32-bit mode, by default, lacks enough memory for the xz/lzma compiler
# test.  The AIX command 'dump -o program' gives XCOFF header information;
# the second word of the last line is the maxdata value.  In 32-bit mode,
# maxdata must be at least 0x20000000 for the xz test to succeed.
def _maxdataOK():
    """Return False only on 32-bit AIX whose maxdata is below 0x20000000.

    On every other platform (AIX is false or sys.maxsize is not 2**31 - 1)
    the result is True without running any command.
    """
    if AIX and sys.maxsize == 2147483647:
        # Local import: shlex is needed on this AIX-only path alone.
        import shlex
        # Quote the interpreter path so that spaces or shell metacharacters
        # in it cannot break the command line handed to the shell.
        hdrs = subprocess.getoutput(
            "/usr/bin/dump -o %s" % shlex.quote(sys.executable))
        # maxdata is the second whitespace-separated word of the last line.
        maxdata = hdrs.split("\n")[-1].split()[1]
        return int(maxdata, 16) >= 0x20000000
    else:
        return True
Éric Araujoa7e33a12011-08-12 19:51:35 +0200168
Tarek Ziadé396fad72010-02-23 05:30:31 +0000169
class BaseTest:
    """Mixin that provides a self-cleaning temporary directory helper."""

    def mkdtemp(self, prefix=None):
        """Make a temporary directory inside the current working directory.

        Removal of the directory is registered with addCleanup.

        Returns the path of the new directory.
        """
        tmp_path = tempfile.mkdtemp(prefix=prefix, dir=os.getcwd())
        self.addCleanup(os_helper.rmtree, tmp_path)
        return tmp_path
Tarek Ziadé5340db32010-04-19 22:30:51 +0000180
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300181
182class TestRmTree(BaseTest, unittest.TestCase):
183
Hynek Schlawack3b527782012-06-25 13:27:31 +0200184 def test_rmtree_works_on_bytes(self):
185 tmp = self.mkdtemp()
186 victim = os.path.join(tmp, 'killme')
187 os.mkdir(victim)
188 write_file(os.path.join(victim, 'somefile'), 'foo')
189 victim = os.fsencode(victim)
190 self.assertIsInstance(victim, bytes)
Steve Dowere58571b2016-09-08 11:11:13 -0700191 shutil.rmtree(victim)
Hynek Schlawack3b527782012-06-25 13:27:31 +0200192
Hai Shi0c4f0f32020-06-30 21:46:31 +0800193 @os_helper.skip_unless_symlink
Hynek Schlawacka75cd1c2012-06-28 12:07:29 +0200194 def test_rmtree_fails_on_symlink(self):
195 tmp = self.mkdtemp()
196 dir_ = os.path.join(tmp, 'dir')
197 os.mkdir(dir_)
198 link = os.path.join(tmp, 'link')
199 os.symlink(dir_, link)
200 self.assertRaises(OSError, shutil.rmtree, link)
201 self.assertTrue(os.path.exists(dir_))
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100202 self.assertTrue(os.path.lexists(link))
203 errors = []
204 def onerror(*args):
205 errors.append(args)
206 shutil.rmtree(link, onerror=onerror)
207 self.assertEqual(len(errors), 1)
208 self.assertIs(errors[0][0], os.path.islink)
209 self.assertEqual(errors[0][1], link)
210 self.assertIsInstance(errors[0][2][1], OSError)
Hynek Schlawacka75cd1c2012-06-28 12:07:29 +0200211
Hai Shi0c4f0f32020-06-30 21:46:31 +0800212 @os_helper.skip_unless_symlink
Hynek Schlawacka75cd1c2012-06-28 12:07:29 +0200213 def test_rmtree_works_on_symlinks(self):
214 tmp = self.mkdtemp()
215 dir1 = os.path.join(tmp, 'dir1')
216 dir2 = os.path.join(dir1, 'dir2')
217 dir3 = os.path.join(tmp, 'dir3')
218 for d in dir1, dir2, dir3:
219 os.mkdir(d)
220 file1 = os.path.join(tmp, 'file1')
221 write_file(file1, 'foo')
222 link1 = os.path.join(dir1, 'link1')
223 os.symlink(dir2, link1)
224 link2 = os.path.join(dir1, 'link2')
225 os.symlink(dir3, link2)
226 link3 = os.path.join(dir1, 'link3')
227 os.symlink(file1, link3)
228 # make sure symlinks are removed but not followed
229 shutil.rmtree(dir1)
230 self.assertFalse(os.path.exists(dir1))
231 self.assertTrue(os.path.exists(dir3))
232 self.assertTrue(os.path.exists(file1))
233
Steve Dowerdf2d4a62019-08-21 15:27:33 -0700234 @unittest.skipUnless(_winapi, 'only relevant on Windows')
235 def test_rmtree_fails_on_junctions(self):
236 tmp = self.mkdtemp()
237 dir_ = os.path.join(tmp, 'dir')
238 os.mkdir(dir_)
239 link = os.path.join(tmp, 'link')
240 _winapi.CreateJunction(dir_, link)
Hai Shi0c4f0f32020-06-30 21:46:31 +0800241 self.addCleanup(os_helper.unlink, link)
Steve Dowerdf2d4a62019-08-21 15:27:33 -0700242 self.assertRaises(OSError, shutil.rmtree, link)
243 self.assertTrue(os.path.exists(dir_))
244 self.assertTrue(os.path.lexists(link))
245 errors = []
246 def onerror(*args):
247 errors.append(args)
248 shutil.rmtree(link, onerror=onerror)
249 self.assertEqual(len(errors), 1)
250 self.assertIs(errors[0][0], os.path.islink)
251 self.assertEqual(errors[0][1], link)
252 self.assertIsInstance(errors[0][2][1], OSError)
253
254 @unittest.skipUnless(_winapi, 'only relevant on Windows')
255 def test_rmtree_works_on_junctions(self):
256 tmp = self.mkdtemp()
257 dir1 = os.path.join(tmp, 'dir1')
258 dir2 = os.path.join(dir1, 'dir2')
259 dir3 = os.path.join(tmp, 'dir3')
260 for d in dir1, dir2, dir3:
261 os.mkdir(d)
262 file1 = os.path.join(tmp, 'file1')
263 write_file(file1, 'foo')
264 link1 = os.path.join(dir1, 'link1')
265 _winapi.CreateJunction(dir2, link1)
266 link2 = os.path.join(dir1, 'link2')
267 _winapi.CreateJunction(dir3, link2)
268 link3 = os.path.join(dir1, 'link3')
269 _winapi.CreateJunction(file1, link3)
270 # make sure junctions are removed but not followed
271 shutil.rmtree(dir1)
272 self.assertFalse(os.path.exists(dir1))
273 self.assertTrue(os.path.exists(dir3))
274 self.assertTrue(os.path.exists(file1))
275
Barry Warsaw7fc2cca2003-01-24 17:34:13 +0000276 def test_rmtree_errors(self):
277 # filename is guaranteed not to exist
Steve Dowerabde52c2019-11-15 09:49:21 -0800278 filename = tempfile.mktemp(dir=self.mkdtemp())
Hynek Schlawackb5501102012-12-10 09:11:25 +0100279 self.assertRaises(FileNotFoundError, shutil.rmtree, filename)
280 # test that ignore_errors option is honored
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100281 shutil.rmtree(filename, ignore_errors=True)
282
283 # existing file
284 tmpdir = self.mkdtemp()
Hynek Schlawackb5501102012-12-10 09:11:25 +0100285 write_file((tmpdir, "tstfile"), "")
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100286 filename = os.path.join(tmpdir, "tstfile")
Hynek Schlawackb5501102012-12-10 09:11:25 +0100287 with self.assertRaises(NotADirectoryError) as cm:
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100288 shutil.rmtree(filename)
Daniel Hahler37a6d5f2020-12-21 07:38:02 +0100289 self.assertEqual(cm.exception.filename, filename)
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100290 self.assertTrue(os.path.exists(filename))
Hynek Schlawackb5501102012-12-10 09:11:25 +0100291 # test that ignore_errors option is honored
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100292 shutil.rmtree(filename, ignore_errors=True)
293 self.assertTrue(os.path.exists(filename))
294 errors = []
295 def onerror(*args):
296 errors.append(args)
297 shutil.rmtree(filename, onerror=onerror)
298 self.assertEqual(len(errors), 2)
Serhiy Storchakad4d79bc2017-11-04 14:16:35 +0200299 self.assertIs(errors[0][0], os.scandir)
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100300 self.assertEqual(errors[0][1], filename)
Hynek Schlawackb5501102012-12-10 09:11:25 +0100301 self.assertIsInstance(errors[0][2][1], NotADirectoryError)
Daniel Hahler37a6d5f2020-12-21 07:38:02 +0100302 self.assertEqual(errors[0][2][1].filename, filename)
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100303 self.assertIs(errors[1][0], os.rmdir)
304 self.assertEqual(errors[1][1], filename)
Hynek Schlawackb5501102012-12-10 09:11:25 +0100305 self.assertIsInstance(errors[1][2][1], NotADirectoryError)
Daniel Hahler37a6d5f2020-12-21 07:38:02 +0100306 self.assertEqual(errors[1][2][1].filename, filename)
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000307
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000308
Serhiy Storchaka43767632013-11-03 21:31:38 +0200309 @unittest.skipIf(sys.platform[:6] == 'cygwin',
310 "This test can't be run on Cygwin (issue #1071513).")
311 @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
312 "This test can't be run reliably as root (issue #1076467).")
313 def test_on_error(self):
314 self.errorState = 0
315 os.mkdir(TESTFN)
316 self.addCleanup(shutil.rmtree, TESTFN)
Antoine Pitrou2f8a75c2012-06-23 21:28:15 +0200317
Serhiy Storchaka43767632013-11-03 21:31:38 +0200318 self.child_file_path = os.path.join(TESTFN, 'a')
319 self.child_dir_path = os.path.join(TESTFN, 'b')
Hai Shi0c4f0f32020-06-30 21:46:31 +0800320 os_helper.create_empty_file(self.child_file_path)
Serhiy Storchaka43767632013-11-03 21:31:38 +0200321 os.mkdir(self.child_dir_path)
322 old_dir_mode = os.stat(TESTFN).st_mode
323 old_child_file_mode = os.stat(self.child_file_path).st_mode
324 old_child_dir_mode = os.stat(self.child_dir_path).st_mode
325 # Make unwritable.
326 new_mode = stat.S_IREAD|stat.S_IEXEC
327 os.chmod(self.child_file_path, new_mode)
328 os.chmod(self.child_dir_path, new_mode)
329 os.chmod(TESTFN, new_mode)
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000330
Serhiy Storchaka43767632013-11-03 21:31:38 +0200331 self.addCleanup(os.chmod, TESTFN, old_dir_mode)
332 self.addCleanup(os.chmod, self.child_file_path, old_child_file_mode)
333 self.addCleanup(os.chmod, self.child_dir_path, old_child_dir_mode)
Antoine Pitrou2f8a75c2012-06-23 21:28:15 +0200334
Serhiy Storchaka43767632013-11-03 21:31:38 +0200335 shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
336 # Test whether onerror has actually been called.
337 self.assertEqual(self.errorState, 3,
338 "Expected call to onerror function did not happen.")
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000339
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000340 def check_args_to_onerror(self, func, arg, exc):
Benjamin Peterson25c95f12009-05-08 20:42:26 +0000341 # test_rmtree_errors deliberately runs rmtree
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200342 # on a directory that is chmod 500, which will fail.
Benjamin Peterson25c95f12009-05-08 20:42:26 +0000343 # This function is run when shutil.rmtree fails.
344 # 99.9% of the time it initially fails to remove
345 # a file in the directory, so the first time through
346 # func is os.remove.
347 # However, some Linux machines running ZFS on
348 # FUSE experienced a failure earlier in the process
349 # at os.listdir. The first failure may legally
350 # be either.
Antoine Pitrou4f6e3f72012-06-23 22:05:11 +0200351 if self.errorState < 2:
Hynek Schlawack2100b422012-06-23 20:28:32 +0200352 if func is os.unlink:
Antoine Pitrou4f6e3f72012-06-23 22:05:11 +0200353 self.assertEqual(arg, self.child_file_path)
354 elif func is os.rmdir:
355 self.assertEqual(arg, self.child_dir_path)
Benjamin Peterson25c95f12009-05-08 20:42:26 +0000356 else:
Antoine Pitrou4f6e3f72012-06-23 22:05:11 +0200357 self.assertIs(func, os.listdir)
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200358 self.assertIn(arg, [TESTFN, self.child_dir_path])
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000359 self.assertTrue(issubclass(exc[0], OSError))
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200360 self.errorState += 1
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000361 else:
362 self.assertEqual(func, os.rmdir)
363 self.assertEqual(arg, TESTFN)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000364 self.assertTrue(issubclass(exc[0], OSError))
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200365 self.errorState = 3
366
367 def test_rmtree_does_not_choke_on_failing_lstat(self):
368 try:
369 orig_lstat = os.lstat
Hynek Schlawacka75cd1c2012-06-28 12:07:29 +0200370 def raiser(fn, *args, **kwargs):
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200371 if fn != TESTFN:
372 raise OSError()
373 else:
374 return orig_lstat(fn)
375 os.lstat = raiser
376
377 os.mkdir(TESTFN)
378 write_file((TESTFN, 'foo'), 'foo')
379 shutil.rmtree(TESTFN)
380 finally:
381 os.lstat = orig_lstat
Barry Warsaw7fc2cca2003-01-24 17:34:13 +0000382
Hynek Schlawack2100b422012-06-23 20:28:32 +0200383 def test_rmtree_uses_safe_fd_version_if_available(self):
Hynek Schlawackd0f6e0a2012-06-29 08:28:20 +0200384 _use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <=
385 os.supports_dir_fd and
386 os.listdir in os.supports_fd and
387 os.stat in os.supports_follow_symlinks)
388 if _use_fd_functions:
Hynek Schlawack2100b422012-06-23 20:28:32 +0200389 self.assertTrue(shutil._use_fd_functions)
Nick Coghlan5b0eca12012-06-24 16:43:06 +1000390 self.assertTrue(shutil.rmtree.avoids_symlink_attacks)
Hynek Schlawack2100b422012-06-23 20:28:32 +0200391 tmp_dir = self.mkdtemp()
392 d = os.path.join(tmp_dir, 'a')
393 os.mkdir(d)
394 try:
395 real_rmtree = shutil._rmtree_safe_fd
396 class Called(Exception): pass
397 def _raiser(*args, **kwargs):
398 raise Called
399 shutil._rmtree_safe_fd = _raiser
400 self.assertRaises(Called, shutil.rmtree, d)
401 finally:
402 shutil._rmtree_safe_fd = real_rmtree
403 else:
404 self.assertFalse(shutil._use_fd_functions)
Nick Coghlan5b0eca12012-06-24 16:43:06 +1000405 self.assertFalse(shutil.rmtree.avoids_symlink_attacks)
Hynek Schlawack2100b422012-06-23 20:28:32 +0200406
Johannes Gijsbersd60e92a2004-09-11 21:26:21 +0000407 def test_rmtree_dont_delete_file(self):
408 # When called on a file instead of a directory, don't delete it.
Steve Dowerabde52c2019-11-15 09:49:21 -0800409 handle, path = tempfile.mkstemp(dir=self.mkdtemp())
Victor Stinnerbf816222011-06-30 23:25:47 +0200410 os.close(handle)
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200411 self.assertRaises(NotADirectoryError, shutil.rmtree, path)
Johannes Gijsbersd60e92a2004-09-11 21:26:21 +0000412 os.remove(path)
413
Hai Shi0c4f0f32020-06-30 21:46:31 +0800414 @os_helper.skip_unless_symlink
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300415 def test_rmtree_on_symlink(self):
416 # bug 1669.
417 os.mkdir(TESTFN)
418 try:
419 src = os.path.join(TESTFN, 'cheese')
420 dst = os.path.join(TESTFN, 'shop')
421 os.mkdir(src)
422 os.symlink(src, dst)
423 self.assertRaises(OSError, shutil.rmtree, dst)
424 shutil.rmtree(dst, ignore_errors=True)
425 finally:
426 shutil.rmtree(TESTFN, ignore_errors=True)
427
428 @unittest.skipUnless(_winapi, 'only relevant on Windows')
429 def test_rmtree_on_junction(self):
430 os.mkdir(TESTFN)
431 try:
432 src = os.path.join(TESTFN, 'cheese')
433 dst = os.path.join(TESTFN, 'shop')
434 os.mkdir(src)
435 open(os.path.join(src, 'spam'), 'wb').close()
436 _winapi.CreateJunction(src, dst)
437 self.assertRaises(OSError, shutil.rmtree, dst)
438 shutil.rmtree(dst, ignore_errors=True)
439 finally:
440 shutil.rmtree(TESTFN, ignore_errors=True)
441
442
443class TestCopyTree(BaseTest, unittest.TestCase):
444
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000445 def test_copytree_simple(self):
Steve Dowerabde52c2019-11-15 09:49:21 -0800446 src_dir = self.mkdtemp()
447 dst_dir = os.path.join(self.mkdtemp(), 'destination')
Éric Araujoa7e33a12011-08-12 19:51:35 +0200448 self.addCleanup(shutil.rmtree, src_dir)
449 self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
450 write_file((src_dir, 'test.txt'), '123')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000451 os.mkdir(os.path.join(src_dir, 'test_dir'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200452 write_file((src_dir, 'test_dir', 'test.txt'), '456')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000453
Éric Araujoa7e33a12011-08-12 19:51:35 +0200454 shutil.copytree(src_dir, dst_dir)
455 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test.txt')))
456 self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'test_dir')))
457 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test_dir',
458 'test.txt')))
459 actual = read_file((dst_dir, 'test.txt'))
460 self.assertEqual(actual, '123')
461 actual = read_file((dst_dir, 'test_dir', 'test.txt'))
462 self.assertEqual(actual, '456')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000463
jab9e00d9e2018-12-28 13:03:40 -0500464 def test_copytree_dirs_exist_ok(self):
Steve Dowerabde52c2019-11-15 09:49:21 -0800465 src_dir = self.mkdtemp()
466 dst_dir = self.mkdtemp()
jab9e00d9e2018-12-28 13:03:40 -0500467 self.addCleanup(shutil.rmtree, src_dir)
468 self.addCleanup(shutil.rmtree, dst_dir)
469
470 write_file((src_dir, 'nonexisting.txt'), '123')
471 os.mkdir(os.path.join(src_dir, 'existing_dir'))
472 os.mkdir(os.path.join(dst_dir, 'existing_dir'))
473 write_file((dst_dir, 'existing_dir', 'existing.txt'), 'will be replaced')
474 write_file((src_dir, 'existing_dir', 'existing.txt'), 'has been replaced')
475
476 shutil.copytree(src_dir, dst_dir, dirs_exist_ok=True)
477 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'nonexisting.txt')))
478 self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'existing_dir')))
479 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'existing_dir',
480 'existing.txt')))
481 actual = read_file((dst_dir, 'nonexisting.txt'))
482 self.assertEqual(actual, '123')
483 actual = read_file((dst_dir, 'existing_dir', 'existing.txt'))
484 self.assertEqual(actual, 'has been replaced')
485
486 with self.assertRaises(FileExistsError):
487 shutil.copytree(src_dir, dst_dir, dirs_exist_ok=False)
488
Hai Shi0c4f0f32020-06-30 21:46:31 +0800489 @os_helper.skip_unless_symlink
Antoine Pitrou78091e62011-12-29 18:54:15 +0100490 def test_copytree_symlinks(self):
491 tmp_dir = self.mkdtemp()
492 src_dir = os.path.join(tmp_dir, 'src')
493 dst_dir = os.path.join(tmp_dir, 'dst')
494 sub_dir = os.path.join(src_dir, 'sub')
495 os.mkdir(src_dir)
496 os.mkdir(sub_dir)
497 write_file((src_dir, 'file.txt'), 'foo')
498 src_link = os.path.join(sub_dir, 'link')
499 dst_link = os.path.join(dst_dir, 'sub/link')
500 os.symlink(os.path.join(src_dir, 'file.txt'),
501 src_link)
502 if hasattr(os, 'lchmod'):
503 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
504 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
505 os.lchflags(src_link, stat.UF_NODUMP)
506 src_stat = os.lstat(src_link)
507 shutil.copytree(src_dir, dst_dir, symlinks=True)
508 self.assertTrue(os.path.islink(os.path.join(dst_dir, 'sub', 'link')))
Steve Dowerdf2d4a62019-08-21 15:27:33 -0700509 actual = os.readlink(os.path.join(dst_dir, 'sub', 'link'))
510 # Bad practice to blindly strip the prefix as it may be required to
511 # correctly refer to the file, but we're only comparing paths here.
512 if os.name == 'nt' and actual.startswith('\\\\?\\'):
513 actual = actual[4:]
514 self.assertEqual(actual, os.path.join(src_dir, 'file.txt'))
Antoine Pitrou78091e62011-12-29 18:54:15 +0100515 dst_stat = os.lstat(dst_link)
516 if hasattr(os, 'lchmod'):
517 self.assertEqual(dst_stat.st_mode, src_stat.st_mode)
518 if hasattr(os, 'lchflags'):
519 self.assertEqual(dst_stat.st_flags, src_stat.st_flags)
520
Georg Brandl2ee470f2008-07-16 12:55:28 +0000521 def test_copytree_with_exclude(self):
Georg Brandl2ee470f2008-07-16 12:55:28 +0000522 # creating data
523 join = os.path.join
524 exists = os.path.exists
Steve Dowerabde52c2019-11-15 09:49:21 -0800525 src_dir = self.mkdtemp()
Georg Brandl2ee470f2008-07-16 12:55:28 +0000526 try:
Steve Dowerabde52c2019-11-15 09:49:21 -0800527 dst_dir = join(self.mkdtemp(), 'destination')
Éric Araujoa7e33a12011-08-12 19:51:35 +0200528 write_file((src_dir, 'test.txt'), '123')
529 write_file((src_dir, 'test.tmp'), '123')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000530 os.mkdir(join(src_dir, 'test_dir'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200531 write_file((src_dir, 'test_dir', 'test.txt'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000532 os.mkdir(join(src_dir, 'test_dir2'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200533 write_file((src_dir, 'test_dir2', 'test.txt'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000534 os.mkdir(join(src_dir, 'test_dir2', 'subdir'))
535 os.mkdir(join(src_dir, 'test_dir2', 'subdir2'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200536 write_file((src_dir, 'test_dir2', 'subdir', 'test.txt'), '456')
537 write_file((src_dir, 'test_dir2', 'subdir2', 'test.py'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000538
539 # testing glob-like patterns
540 try:
541 patterns = shutil.ignore_patterns('*.tmp', 'test_dir2')
542 shutil.copytree(src_dir, dst_dir, ignore=patterns)
543 # checking the result: some elements should not be copied
544 self.assertTrue(exists(join(dst_dir, 'test.txt')))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200545 self.assertFalse(exists(join(dst_dir, 'test.tmp')))
546 self.assertFalse(exists(join(dst_dir, 'test_dir2')))
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000547 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200548 shutil.rmtree(dst_dir)
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000549 try:
550 patterns = shutil.ignore_patterns('*.tmp', 'subdir*')
551 shutil.copytree(src_dir, dst_dir, ignore=patterns)
552 # checking the result: some elements should not be copied
Éric Araujoa7e33a12011-08-12 19:51:35 +0200553 self.assertFalse(exists(join(dst_dir, 'test.tmp')))
554 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2')))
555 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000556 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200557 shutil.rmtree(dst_dir)
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000558
559 # testing callable-style
560 try:
561 def _filter(src, names):
562 res = []
563 for name in names:
564 path = os.path.join(src, name)
565
566 if (os.path.isdir(path) and
567 path.split()[-1] == 'subdir'):
568 res.append(name)
569 elif os.path.splitext(path)[-1] in ('.py'):
570 res.append(name)
571 return res
572
573 shutil.copytree(src_dir, dst_dir, ignore=_filter)
574
575 # checking the result: some elements should not be copied
Éric Araujoa7e33a12011-08-12 19:51:35 +0200576 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2',
577 'test.py')))
578 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000579
580 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200581 shutil.rmtree(dst_dir)
Georg Brandl2ee470f2008-07-16 12:55:28 +0000582 finally:
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000583 shutil.rmtree(src_dir)
584 shutil.rmtree(os.path.dirname(dst_dir))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000585
mbarkhau88704332020-01-24 14:51:16 +0000586 def test_copytree_arg_types_of_ignore(self):
587 join = os.path.join
588 exists = os.path.exists
589
590 tmp_dir = self.mkdtemp()
591 src_dir = join(tmp_dir, "source")
592
593 os.mkdir(join(src_dir))
594 os.mkdir(join(src_dir, 'test_dir'))
595 os.mkdir(os.path.join(src_dir, 'test_dir', 'subdir'))
596 write_file((src_dir, 'test_dir', 'subdir', 'test.txt'), '456')
597
598 invokations = []
599
600 def _ignore(src, names):
601 invokations.append(src)
602 self.assertIsInstance(src, str)
603 self.assertIsInstance(names, list)
604 self.assertEqual(len(names), len(set(names)))
605 for name in names:
606 self.assertIsInstance(name, str)
607 return []
608
609 dst_dir = join(self.mkdtemp(), 'destination')
610 shutil.copytree(src_dir, dst_dir, ignore=_ignore)
611 self.assertTrue(exists(join(dst_dir, 'test_dir', 'subdir',
612 'test.txt')))
613
614 dst_dir = join(self.mkdtemp(), 'destination')
615 shutil.copytree(pathlib.Path(src_dir), dst_dir, ignore=_ignore)
616 self.assertTrue(exists(join(dst_dir, 'test_dir', 'subdir',
617 'test.txt')))
618
619 dst_dir = join(self.mkdtemp(), 'destination')
620 src_dir_entry = list(os.scandir(tmp_dir))[0]
621 self.assertIsInstance(src_dir_entry, os.DirEntry)
622 shutil.copytree(src_dir_entry, dst_dir, ignore=_ignore)
623 self.assertTrue(exists(join(dst_dir, 'test_dir', 'subdir',
624 'test.txt')))
625
626 self.assertEqual(len(invokations), 9)
627
Antoine Pitrouac601602013-08-16 19:35:02 +0200628 def test_copytree_retains_permissions(self):
Steve Dowerabde52c2019-11-15 09:49:21 -0800629 tmp_dir = self.mkdtemp()
Antoine Pitrouac601602013-08-16 19:35:02 +0200630 src_dir = os.path.join(tmp_dir, 'source')
631 os.mkdir(src_dir)
632 dst_dir = os.path.join(tmp_dir, 'destination')
633 self.addCleanup(shutil.rmtree, tmp_dir)
634
635 os.chmod(src_dir, 0o777)
636 write_file((src_dir, 'permissive.txt'), '123')
637 os.chmod(os.path.join(src_dir, 'permissive.txt'), 0o777)
638 write_file((src_dir, 'restrictive.txt'), '456')
639 os.chmod(os.path.join(src_dir, 'restrictive.txt'), 0o600)
640 restrictive_subdir = tempfile.mkdtemp(dir=src_dir)
Hai Shi0c4f0f32020-06-30 21:46:31 +0800641 self.addCleanup(os_helper.rmtree, restrictive_subdir)
Antoine Pitrouac601602013-08-16 19:35:02 +0200642 os.chmod(restrictive_subdir, 0o600)
643
644 shutil.copytree(src_dir, dst_dir)
Brett Cannon9c7eb552013-08-23 14:38:11 -0400645 self.assertEqual(os.stat(src_dir).st_mode, os.stat(dst_dir).st_mode)
646 self.assertEqual(os.stat(os.path.join(src_dir, 'permissive.txt')).st_mode,
Antoine Pitrouac601602013-08-16 19:35:02 +0200647 os.stat(os.path.join(dst_dir, 'permissive.txt')).st_mode)
Brett Cannon9c7eb552013-08-23 14:38:11 -0400648 self.assertEqual(os.stat(os.path.join(src_dir, 'restrictive.txt')).st_mode,
Antoine Pitrouac601602013-08-16 19:35:02 +0200649 os.stat(os.path.join(dst_dir, 'restrictive.txt')).st_mode)
650 restrictive_subdir_dst = os.path.join(dst_dir,
651 os.path.split(restrictive_subdir)[1])
Brett Cannon9c7eb552013-08-23 14:38:11 -0400652 self.assertEqual(os.stat(restrictive_subdir).st_mode,
Antoine Pitrouac601602013-08-16 19:35:02 +0200653 os.stat(restrictive_subdir_dst).st_mode)
654
Berker Peksag884afd92014-12-10 02:50:32 +0200655 @unittest.mock.patch('os.chmod')
656 def test_copytree_winerror(self, mock_patch):
657 # When copying to VFAT, copystat() raises OSError. On Windows, the
658 # exception object has a meaningful 'winerror' attribute, but not
659 # on other operating systems. Do not assume 'winerror' is set.
Steve Dowerabde52c2019-11-15 09:49:21 -0800660 src_dir = self.mkdtemp()
661 dst_dir = os.path.join(self.mkdtemp(), 'destination')
Berker Peksag884afd92014-12-10 02:50:32 +0200662 self.addCleanup(shutil.rmtree, src_dir)
663 self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
664
665 mock_patch.side_effect = PermissionError('ka-boom')
666 with self.assertRaises(shutil.Error):
667 shutil.copytree(src_dir, dst_dir)
668
Giampaolo Rodolac606a9c2019-02-26 12:04:41 +0100669 def test_copytree_custom_copy_function(self):
670 # See: https://bugs.python.org/issue35648
671 def custom_cpfun(a, b):
672 flag.append(None)
673 self.assertIsInstance(a, str)
674 self.assertIsInstance(b, str)
675 self.assertEqual(a, os.path.join(src, 'foo'))
676 self.assertEqual(b, os.path.join(dst, 'foo'))
677
678 flag = []
Steve Dowerabde52c2019-11-15 09:49:21 -0800679 src = self.mkdtemp()
680 dst = tempfile.mktemp(dir=self.mkdtemp())
Inada Naokic8e5eb92021-04-05 13:11:23 +0900681 with open(os.path.join(src, 'foo'), 'w', encoding='utf-8') as f:
Giampaolo Rodolac606a9c2019-02-26 12:04:41 +0100682 f.close()
683 shutil.copytree(src, dst, copy_function=custom_cpfun)
684 self.assertEqual(len(flag), 1)
685
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300686 # Issue #3002: copyfile and copytree block indefinitely on named pipes
687 @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
Hai Shi0c4f0f32020-06-30 21:46:31 +0800688 @os_helper.skip_unless_symlink
pxinwr6a273fd2020-11-29 06:06:36 +0800689 @unittest.skipIf(sys.platform == "vxworks",
690 "fifo requires special path on VxWorks")
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300691 def test_copytree_named_pipe(self):
692 os.mkdir(TESTFN)
693 try:
694 subdir = os.path.join(TESTFN, "subdir")
695 os.mkdir(subdir)
696 pipe = os.path.join(subdir, "mypipe")
697 try:
698 os.mkfifo(pipe)
699 except PermissionError as e:
700 self.skipTest('os.mkfifo(): %s' % e)
701 try:
702 shutil.copytree(TESTFN, TESTFN2)
703 except shutil.Error as e:
704 errors = e.args[0]
705 self.assertEqual(len(errors), 1)
706 src, dst, error_msg = errors[0]
707 self.assertEqual("`%s` is a named pipe" % pipe, error_msg)
708 else:
709 self.fail("shutil.Error should have been raised")
710 finally:
711 shutil.rmtree(TESTFN, ignore_errors=True)
712 shutil.rmtree(TESTFN2, ignore_errors=True)
713
714 def test_copytree_special_func(self):
715 src_dir = self.mkdtemp()
716 dst_dir = os.path.join(self.mkdtemp(), 'destination')
717 write_file((src_dir, 'test.txt'), '123')
718 os.mkdir(os.path.join(src_dir, 'test_dir'))
719 write_file((src_dir, 'test_dir', 'test.txt'), '456')
720
721 copied = []
722 def _copy(src, dst):
723 copied.append((src, dst))
724
725 shutil.copytree(src_dir, dst_dir, copy_function=_copy)
726 self.assertEqual(len(copied), 2)
727
Hai Shi0c4f0f32020-06-30 21:46:31 +0800728 @os_helper.skip_unless_symlink
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300729 def test_copytree_dangling_symlinks(self):
730 # a dangling symlink raises an error at the end
731 src_dir = self.mkdtemp()
732 dst_dir = os.path.join(self.mkdtemp(), 'destination')
733 os.symlink('IDONTEXIST', os.path.join(src_dir, 'test.txt'))
734 os.mkdir(os.path.join(src_dir, 'test_dir'))
735 write_file((src_dir, 'test_dir', 'test.txt'), '456')
736 self.assertRaises(Error, shutil.copytree, src_dir, dst_dir)
737
738 # a dangling symlink is ignored with the proper flag
739 dst_dir = os.path.join(self.mkdtemp(), 'destination2')
740 shutil.copytree(src_dir, dst_dir, ignore_dangling_symlinks=True)
741 self.assertNotIn('test.txt', os.listdir(dst_dir))
742
743 # a dangling symlink is copied if symlinks=True
744 dst_dir = os.path.join(self.mkdtemp(), 'destination3')
745 shutil.copytree(src_dir, dst_dir, symlinks=True)
746 self.assertIn('test.txt', os.listdir(dst_dir))
747
Hai Shi0c4f0f32020-06-30 21:46:31 +0800748 @os_helper.skip_unless_symlink
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300749 def test_copytree_symlink_dir(self):
750 src_dir = self.mkdtemp()
751 dst_dir = os.path.join(self.mkdtemp(), 'destination')
752 os.mkdir(os.path.join(src_dir, 'real_dir'))
Inada Naokic8e5eb92021-04-05 13:11:23 +0900753 with open(os.path.join(src_dir, 'real_dir', 'test.txt'), 'wb'):
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300754 pass
755 os.symlink(os.path.join(src_dir, 'real_dir'),
756 os.path.join(src_dir, 'link_to_dir'),
757 target_is_directory=True)
758
759 shutil.copytree(src_dir, dst_dir, symlinks=False)
760 self.assertFalse(os.path.islink(os.path.join(dst_dir, 'link_to_dir')))
761 self.assertIn('test.txt', os.listdir(os.path.join(dst_dir, 'link_to_dir')))
762
763 dst_dir = os.path.join(self.mkdtemp(), 'destination2')
764 shutil.copytree(src_dir, dst_dir, symlinks=True)
765 self.assertTrue(os.path.islink(os.path.join(dst_dir, 'link_to_dir')))
766 self.assertIn('test.txt', os.listdir(os.path.join(dst_dir, 'link_to_dir')))
767
768 def test_copytree_return_value(self):
769 # copytree returns its destination path.
770 src_dir = self.mkdtemp()
771 dst_dir = src_dir + "dest"
772 self.addCleanup(shutil.rmtree, dst_dir, True)
773 src = os.path.join(src_dir, 'foo')
774 write_file(src, 'foo')
775 rv = shutil.copytree(src_dir, dst_dir)
776 self.assertEqual(['foo'], os.listdir(rv))
777
Bruno P. Kinoshita9bbcbc92019-11-27 14:10:37 +1300778 def test_copytree_subdirectory(self):
779 # copytree where dst is a subdirectory of src, see Issue 38688
780 base_dir = self.mkdtemp()
781 self.addCleanup(shutil.rmtree, base_dir, ignore_errors=True)
782 src_dir = os.path.join(base_dir, "t", "pg")
783 dst_dir = os.path.join(src_dir, "somevendor", "1.0")
784 os.makedirs(src_dir)
785 src = os.path.join(src_dir, 'pol')
786 write_file(src, 'pol')
787 rv = shutil.copytree(src_dir, dst_dir)
788 self.assertEqual(['pol'], os.listdir(rv))
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300789
class TestCopy(BaseTest, unittest.TestCase):
    # Tests for shutil.copymode(), copystat(), _copyxattr(), copy(), copy2()
    # and copyfile().  Explanations are written as comments, not docstrings,
    # like the existing notes in these tests.

    ### shutil.copymode

    @os_helper.skip_unless_symlink
    def test_copymode_follow_symlinks(self):
        # With the default follow_symlinks, copymode() acts on the files the
        # links point to, whichever side is given as a symlink.
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'foo')
        dst = os.path.join(tmp_dir, 'bar')
        src_link = os.path.join(tmp_dir, 'baz')
        dst_link = os.path.join(tmp_dir, 'quux')
        write_file(src, 'foo')
        write_file(dst, 'foo')
        os.symlink(src, src_link)
        os.symlink(dst, dst_link)
        os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
        # file to file
        os.chmod(dst, stat.S_IRWXO)
        self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
        shutil.copymode(src, dst)
        self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
        # On Windows, os.chmod does not follow symlinks (issue #15411)
        if os.name != 'nt':
            # follow src link
            os.chmod(dst, stat.S_IRWXO)
            shutil.copymode(src_link, dst)
            self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
            # follow dst link
            os.chmod(dst, stat.S_IRWXO)
            shutil.copymode(src, dst_link)
            self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
            # follow both links
            os.chmod(dst, stat.S_IRWXO)
            shutil.copymode(src_link, dst_link)
            self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)

    @unittest.skipUnless(hasattr(os, 'lchmod'), 'requires os.lchmod')
    @os_helper.skip_unless_symlink
    def test_copymode_symlink_to_symlink(self):
        # With follow_symlinks=False and both arguments being links, the
        # mode of the link itself is copied, not that of its target.
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'foo')
        dst = os.path.join(tmp_dir, 'bar')
        src_link = os.path.join(tmp_dir, 'baz')
        dst_link = os.path.join(tmp_dir, 'quux')
        write_file(src, 'foo')
        write_file(dst, 'foo')
        os.symlink(src, src_link)
        os.symlink(dst, dst_link)
        os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
        os.chmod(dst, stat.S_IRWXU)
        os.lchmod(src_link, stat.S_IRWXO|stat.S_IRWXG)
        # link to link
        os.lchmod(dst_link, stat.S_IRWXO)
        shutil.copymode(src_link, dst_link, follow_symlinks=False)
        self.assertEqual(os.lstat(src_link).st_mode,
                         os.lstat(dst_link).st_mode)
        # The link targets must be left with their differing modes.
        self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
        # src link - use chmod
        os.lchmod(dst_link, stat.S_IRWXO)
        shutil.copymode(src_link, dst, follow_symlinks=False)
        self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
        # dst link - use chmod
        os.lchmod(dst_link, stat.S_IRWXO)
        shutil.copymode(src, dst_link, follow_symlinks=False)
        self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)

    @unittest.skipIf(hasattr(os, 'lchmod'), 'requires os.lchmod to be missing')
    @os_helper.skip_unless_symlink
    def test_copymode_symlink_to_symlink_wo_lchmod(self):
        # Without os.lchmod, a link-to-link copymode() must not raise.
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'foo')
        dst = os.path.join(tmp_dir, 'bar')
        src_link = os.path.join(tmp_dir, 'baz')
        dst_link = os.path.join(tmp_dir, 'quux')
        write_file(src, 'foo')
        write_file(dst, 'foo')
        os.symlink(src, src_link)
        os.symlink(dst, dst_link)
        shutil.copymode(src_link, dst_link, follow_symlinks=False)  # silent fail

    ### shutil.copystat

    @os_helper.skip_unless_symlink
    def test_copystat_symlinks(self):
        # Exercise copystat() on symlinks with and without follow_symlinks.
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'foo')
        dst = os.path.join(tmp_dir, 'bar')
        src_link = os.path.join(tmp_dir, 'baz')
        dst_link = os.path.join(tmp_dir, 'qux')
        write_file(src, 'foo')
        src_stat = os.stat(src)
        os.utime(src, (src_stat.st_atime,
                       src_stat.st_mtime - 42.0))  # ensure different mtimes
        write_file(dst, 'bar')
        self.assertNotEqual(os.stat(src).st_mtime, os.stat(dst).st_mtime)
        os.symlink(src, src_link)
        os.symlink(dst, dst_link)
        if hasattr(os, 'lchmod'):
            os.lchmod(src_link, stat.S_IRWXO)
        if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
            os.lchflags(src_link, stat.UF_NODUMP)
        src_link_stat = os.lstat(src_link)
        # follow
        if hasattr(os, 'lchmod'):
            shutil.copystat(src_link, dst_link, follow_symlinks=True)
            self.assertNotEqual(src_link_stat.st_mode, os.stat(dst).st_mode)
        # don't follow
        shutil.copystat(src_link, dst_link, follow_symlinks=False)
        dst_link_stat = os.lstat(dst_link)
        if os.utime in os.supports_follow_symlinks:
            for attr in 'st_atime', 'st_mtime':
                # The modification times may be truncated in the new file.
                self.assertLessEqual(getattr(src_link_stat, attr),
                                     getattr(dst_link_stat, attr) + 1)
        if hasattr(os, 'lchmod'):
            self.assertEqual(src_link_stat.st_mode, dst_link_stat.st_mode)
        if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
            self.assertEqual(src_link_stat.st_flags, dst_link_stat.st_flags)
        # ask not to follow, but dst is not a link: the stats of the file
        # behind src_link are expected to end up on dst
        shutil.copystat(src_link, dst, follow_symlinks=False)
        self.assertLess(abs(os.stat(src).st_mtime - os.stat(dst).st_mtime),
                        0.1)

    @unittest.skipUnless(hasattr(os, 'chflags') and
                         hasattr(errno, 'EOPNOTSUPP') and
                         hasattr(errno, 'ENOTSUP'),
                         "requires os.chflags, EOPNOTSUPP & ENOTSUP")
    def test_copystat_handles_harmless_chflags_errors(self):
        # copystat() must swallow EOPNOTSUPP/ENOTSUP from os.chflags() but
        # let any other OSError through.
        tmpdir = self.mkdtemp()
        file1 = os.path.join(tmpdir, 'file1')
        file2 = os.path.join(tmpdir, 'file2')
        write_file(file1, 'xxx')
        write_file(file2, 'xxx')

        def make_chflags_raiser(err):
            # Build an os.chflags() stand-in that always raises OSError
            # with errno set to *err*.
            ex = OSError()

            def _chflags_raiser(path, flags, *, follow_symlinks=True):
                ex.errno = err
                raise ex
            return _chflags_raiser
        old_chflags = os.chflags
        try:
            for err in errno.EOPNOTSUPP, errno.ENOTSUP:
                os.chflags = make_chflags_raiser(err)
                shutil.copystat(file1, file2)
            # assert others errors break it
            os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP)
            self.assertRaises(OSError, shutil.copystat, file1, file2)
        finally:
            os.chflags = old_chflags

    ### shutil.copyxattr

    @os_helper.skip_unless_xattr
    def test_copyxattr(self):
        # Exercise shutil._copyxattr() and xattr copying via copystat().
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'foo')
        write_file(src, 'foo')
        dst = os.path.join(tmp_dir, 'bar')
        write_file(dst, 'bar')

        # no xattr == no problem
        shutil._copyxattr(src, dst)
        # common case
        os.setxattr(src, 'user.foo', b'42')
        os.setxattr(src, 'user.bar', b'43')
        shutil._copyxattr(src, dst)
        self.assertEqual(sorted(os.listxattr(src)), sorted(os.listxattr(dst)))
        self.assertEqual(
                os.getxattr(src, 'user.foo'),
                os.getxattr(dst, 'user.foo'))
        # check errors don't affect other attrs
        os.remove(dst)
        write_file(dst, 'bar')
        os_error = OSError(errno.EPERM, 'EPERM')

        def _raise_on_user_foo(fname, attr, val, **kwargs):
            if attr == 'user.foo':
                raise os_error
            else:
                orig_setxattr(fname, attr, val, **kwargs)
        # Capture the original before entering the try block so that the
        # finally clause can always restore it.
        orig_setxattr = os.setxattr
        os.setxattr = _raise_on_user_foo
        try:
            shutil._copyxattr(src, dst)
            self.assertIn('user.bar', os.listxattr(dst))
        finally:
            os.setxattr = orig_setxattr

        # the source filesystem not supporting xattrs should be ok, too.
        def _raise_on_src(fname, *, follow_symlinks=True):
            if fname == src:
                raise OSError(errno.ENOTSUP, 'Operation not supported')
            return orig_listxattr(fname, follow_symlinks=follow_symlinks)
        orig_listxattr = os.listxattr
        os.listxattr = _raise_on_src
        try:
            shutil._copyxattr(src, dst)
        finally:
            os.listxattr = orig_listxattr

        # test that shutil.copystat copies xattrs
        src = os.path.join(tmp_dir, 'the_original')
        srcro = os.path.join(tmp_dir, 'the_original_ro')
        write_file(src, src)
        write_file(srcro, srcro)
        os.setxattr(src, 'user.the_value', b'fiddly')
        os.setxattr(srcro, 'user.the_value', b'fiddly')
        os.chmod(srcro, 0o444)
        dst = os.path.join(tmp_dir, 'the_copy')
        dstro = os.path.join(tmp_dir, 'the_copy_ro')
        write_file(dst, dst)
        write_file(dstro, dstro)
        shutil.copystat(src, dst)
        shutil.copystat(srcro, dstro)
        self.assertEqual(os.getxattr(dst, 'user.the_value'), b'fiddly')
        self.assertEqual(os.getxattr(dstro, 'user.the_value'), b'fiddly')

    @os_helper.skip_unless_symlink
    @os_helper.skip_unless_xattr
    @unittest.skipUnless(hasattr(os, 'geteuid') and os.geteuid() == 0,
                         'root privileges required')
    def test_copyxattr_symlinks(self):
        # On Linux, it's only possible to access non-user xattr for symlinks;
        # which in turn require root privileges. This test should be expanded
        # as soon as other platforms gain support for extended attributes.
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'foo')
        src_link = os.path.join(tmp_dir, 'baz')
        write_file(src, 'foo')
        os.symlink(src, src_link)
        os.setxattr(src, 'trusted.foo', b'42')
        os.setxattr(src_link, 'trusted.foo', b'43', follow_symlinks=False)
        dst = os.path.join(tmp_dir, 'bar')
        dst_link = os.path.join(tmp_dir, 'qux')
        write_file(dst, 'bar')
        os.symlink(dst, dst_link)
        # Link to link: the attribute set on the link itself is copied and
        # the file behind dst_link stays without it.
        shutil._copyxattr(src_link, dst_link, follow_symlinks=False)
        self.assertEqual(os.getxattr(dst_link, 'trusted.foo', follow_symlinks=False), b'43')
        self.assertRaises(OSError, os.getxattr, dst, 'trusted.foo')
        # Link to regular file: the link's own attribute value is copied.
        shutil._copyxattr(src_link, dst, follow_symlinks=False)
        self.assertEqual(os.getxattr(dst, 'trusted.foo'), b'43')

    ### shutil.copy

    def _copy_file(self, method):
        # Create 'test.txt' in a fresh temporary directory, copy it with
        # *method* into a second temporary directory, and return the pair
        # (source path, expected destination path).
        fname = 'test.txt'
        tmpdir = self.mkdtemp()
        write_file((tmpdir, fname), 'xxx')
        file1 = os.path.join(tmpdir, fname)
        tmpdir2 = self.mkdtemp()
        method(file1, tmpdir2)
        file2 = os.path.join(tmpdir2, fname)
        return (file1, file2)

    def test_copy(self):
        # Ensure that the copied file exists and has the same mode bits.
        file1, file2 = self._copy_file(shutil.copy)
        self.assertTrue(os.path.exists(file2))
        self.assertEqual(os.stat(file1).st_mode, os.stat(file2).st_mode)

    @os_helper.skip_unless_symlink
    def test_copy_symlinks(self):
        # copy() either copies the file behind a symlink or recreates the
        # symlink, depending on follow_symlinks.
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'foo')
        dst = os.path.join(tmp_dir, 'bar')
        src_link = os.path.join(tmp_dir, 'baz')
        write_file(src, 'foo')
        os.symlink(src, src_link)
        if hasattr(os, 'lchmod'):
            os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
        # follow
        shutil.copy(src_link, dst, follow_symlinks=True)
        self.assertFalse(os.path.islink(dst))
        self.assertEqual(read_file(src), read_file(dst))
        os.remove(dst)
        # don't follow
        shutil.copy(src_link, dst, follow_symlinks=False)
        self.assertTrue(os.path.islink(dst))
        self.assertEqual(os.readlink(dst), os.readlink(src_link))
        if hasattr(os, 'lchmod'):
            self.assertEqual(os.lstat(src_link).st_mode,
                             os.lstat(dst).st_mode)

    ### shutil.copy2

    @unittest.skipUnless(hasattr(os, 'utime'), 'requires os.utime')
    def test_copy2(self):
        # Ensure that the copied file exists and has the same mode and
        # modification time bits.
        file1, file2 = self._copy_file(shutil.copy2)
        self.assertTrue(os.path.exists(file2))
        file1_stat = os.stat(file1)
        file2_stat = os.stat(file2)
        self.assertEqual(file1_stat.st_mode, file2_stat.st_mode)
        for attr in 'st_atime', 'st_mtime':
            # The modification times may be truncated in the new file.
            self.assertLessEqual(getattr(file1_stat, attr),
                                 getattr(file2_stat, attr) + 1)
        if hasattr(os, 'chflags') and hasattr(file1_stat, 'st_flags'):
            self.assertEqual(file1_stat.st_flags, file2_stat.st_flags)

    @os_helper.skip_unless_symlink
    def test_copy2_symlinks(self):
        # copy2() either copies the file behind a symlink or recreates the
        # symlink together with the link's own stat data.
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'foo')
        dst = os.path.join(tmp_dir, 'bar')
        src_link = os.path.join(tmp_dir, 'baz')
        write_file(src, 'foo')
        os.symlink(src, src_link)
        if hasattr(os, 'lchmod'):
            os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
        if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
            os.lchflags(src_link, stat.UF_NODUMP)
        src_stat = os.stat(src)
        src_link_stat = os.lstat(src_link)
        # follow
        shutil.copy2(src_link, dst, follow_symlinks=True)
        self.assertFalse(os.path.islink(dst))
        self.assertEqual(read_file(src), read_file(dst))
        os.remove(dst)
        # don't follow
        shutil.copy2(src_link, dst, follow_symlinks=False)
        self.assertTrue(os.path.islink(dst))
        self.assertEqual(os.readlink(dst), os.readlink(src_link))
        dst_stat = os.lstat(dst)
        if os.utime in os.supports_follow_symlinks:
            for attr in 'st_atime', 'st_mtime':
                # The modification times may be truncated in the new file.
                self.assertLessEqual(getattr(src_link_stat, attr),
                                     getattr(dst_stat, attr) + 1)
        if hasattr(os, 'lchmod'):
            self.assertEqual(src_link_stat.st_mode, dst_stat.st_mode)
            self.assertNotEqual(src_stat.st_mode, dst_stat.st_mode)
        if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
            self.assertEqual(src_link_stat.st_flags, dst_stat.st_flags)

    @os_helper.skip_unless_xattr
    def test_copy2_xattr(self):
        # copy2() must carry extended attributes over to the copy.
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'foo')
        dst = os.path.join(tmp_dir, 'bar')
        write_file(src, 'foo')
        os.setxattr(src, 'user.foo', b'42')
        shutil.copy2(src, dst)
        self.assertEqual(
                os.getxattr(src, 'user.foo'),
                os.getxattr(dst, 'user.foo'))
        os.remove(dst)

    def test_copy_return_value(self):
        # copy and copy2 both return their destination path.
        for fn in (shutil.copy, shutil.copy2):
            src_dir = self.mkdtemp()
            dst_dir = self.mkdtemp()
            src = os.path.join(src_dir, 'foo')
            write_file(src, 'foo')
            # Destination given as a directory ...
            rv = fn(src, dst_dir)
            self.assertEqual(rv, os.path.join(dst_dir, 'foo'))
            # ... and as an explicit file name.
            rv = fn(src, os.path.join(dst_dir, 'bar'))
            self.assertEqual(rv, os.path.join(dst_dir, 'bar'))

    ### shutil.copyfile

    @os_helper.skip_unless_symlink
    def test_copyfile_symlinks(self):
        # copyfile() recreates a symlink with follow_symlinks=False and
        # copies the file behind it by default.
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'src')
        dst = os.path.join(tmp_dir, 'dst')
        dst_link = os.path.join(tmp_dir, 'dst_link')
        link = os.path.join(tmp_dir, 'link')
        write_file(src, 'foo')
        os.symlink(src, link)
        # don't follow
        shutil.copyfile(link, dst_link, follow_symlinks=False)
        self.assertTrue(os.path.islink(dst_link))
        self.assertEqual(os.readlink(link), os.readlink(dst_link))
        # follow
        shutil.copyfile(link, dst)
        self.assertFalse(os.path.islink(dst))

    @unittest.skipUnless(hasattr(os, 'link'), 'requires os.link')
    def test_dont_copy_file_onto_link_to_itself(self):
        # bug 851123.
        # Copying a file onto a hard link to itself must raise
        # SameFileError and leave the contents untouched.
        os.mkdir(TESTFN)
        src = os.path.join(TESTFN, 'cheese')
        dst = os.path.join(TESTFN, 'shop')
        try:
            with open(src, 'w', encoding='utf-8') as f:
                f.write('cheddar')
            try:
                os.link(src, dst)
            except PermissionError as e:
                self.skipTest('os.link(): %s' % e)
            self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
            with open(src, 'r', encoding='utf-8') as f:
                self.assertEqual(f.read(), 'cheddar')
            os.remove(dst)
        finally:
            shutil.rmtree(TESTFN, ignore_errors=True)

    @os_helper.skip_unless_symlink
    def test_dont_copy_file_onto_symlink_to_itself(self):
        # bug 851123.
        # Copying a file onto a symlink to itself must raise SameFileError
        # and leave the contents untouched.
        os.mkdir(TESTFN)
        src = os.path.join(TESTFN, 'cheese')
        dst = os.path.join(TESTFN, 'shop')
        try:
            with open(src, 'w', encoding='utf-8') as f:
                f.write('cheddar')
            # Using `src` here would mean we end up with a symlink pointing
            # to TESTFN/TESTFN/cheese, while it should point at
            # TESTFN/cheese.
            os.symlink('cheese', dst)
            self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
            with open(src, 'r', encoding='utf-8') as f:
                self.assertEqual(f.read(), 'cheddar')
            os.remove(dst)
        finally:
            shutil.rmtree(TESTFN, ignore_errors=True)

    # Issue #3002: copyfile and copytree block indefinitely on named pipes
    @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
    @unittest.skipIf(sys.platform == "vxworks",
                     "fifo requires special path on VxWorks")
    def test_copyfile_named_pipe(self):
        # copyfile() must raise SpecialFileError when either the source or
        # the destination is a named pipe.
        try:
            os.mkfifo(TESTFN)
        except PermissionError as e:
            self.skipTest('os.mkfifo(): %s' % e)
        try:
            self.assertRaises(shutil.SpecialFileError,
                              shutil.copyfile, TESTFN, TESTFN2)
            self.assertRaises(shutil.SpecialFileError,
                              shutil.copyfile, __file__, TESTFN)
        finally:
            os.remove(TESTFN)

    def test_copyfile_return_value(self):
        # copyfile returns its destination path.
        src_dir = self.mkdtemp()
        dst_dir = self.mkdtemp()
        dst_file = os.path.join(dst_dir, 'bar')
        src_file = os.path.join(src_dir, 'foo')
        write_file(src_file, 'foo')
        rv = shutil.copyfile(src_file, dst_file)
        self.assertEqual(rv, dst_file)
        self.assertTrue(os.path.exists(rv))
        self.assertEqual(read_file(src_file), read_file(dst_file))

    def test_copyfile_same_file(self):
        # copyfile() should raise SameFileError if the source and destination
        # are the same.
        src_dir = self.mkdtemp()
        src_file = os.path.join(src_dir, 'foo')
        write_file(src_file, 'foo')
        self.assertRaises(SameFileError, shutil.copyfile, src_file, src_file)
        # But Error should work too, to stay backward compatible.
        self.assertRaises(Error, shutil.copyfile, src_file, src_file)
        # Make sure file is not corrupted.
        self.assertEqual(read_file(src_file), 'foo')

    @unittest.skipIf(MACOS or _winapi,
                     'On MACOS and Windows the errors are not confusing (though different)')
    def test_copyfile_nonexistent_dir(self):
        # Issue 43219
        # A destination with a trailing slash that does not exist must
        # raise FileNotFoundError.
        src_dir = self.mkdtemp()
        src_file = os.path.join(src_dir, 'foo')
        dst = os.path.join(src_dir, 'does_not_exist/')
        write_file(src_file, 'foo')
        self.assertRaises(FileNotFoundError, shutil.copyfile, src_file, dst)
1260
Tarek Ziadéfb437512010-04-20 08:57:33 +00001261
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001262class TestArchives(BaseTest, unittest.TestCase):
Tarek Ziadéfb437512010-04-20 08:57:33 +00001263
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001264 ### shutil.make_archive
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001265
Hai Shia3ec3ad2020-05-19 06:02:57 +08001266 @support.requires_zlib()
Tarek Ziadé396fad72010-02-23 05:30:31 +00001267 def test_make_tarball(self):
1268 # creating something to tar
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001269 root_dir, base_dir = self._create_files('')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001270
1271 tmpdir2 = self.mkdtemp()
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001272 # force shutil to create the directory
1273 os.rmdir(tmpdir2)
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001274 # working with relative paths
1275 work_dir = os.path.dirname(tmpdir2)
1276 rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001277
Hai Shi0c4f0f32020-06-30 21:46:31 +08001278 with os_helper.change_cwd(work_dir):
Serhiy Storchaka5558d4f2015-09-08 09:59:02 +03001279 base_name = os.path.abspath(rel_base_name)
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001280 tarball = make_archive(rel_base_name, 'gztar', root_dir, '.')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001281
1282 # check if the compressed tarball was created
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001283 self.assertEqual(tarball, base_name + '.tar.gz')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001284 self.assertTrue(os.path.isfile(tarball))
1285 self.assertTrue(tarfile.is_tarfile(tarball))
1286 with tarfile.open(tarball, 'r:gz') as tf:
1287 self.assertCountEqual(tf.getnames(),
1288 ['.', './sub', './sub2',
1289 './file1', './file2', './sub/file3'])
Tarek Ziadé396fad72010-02-23 05:30:31 +00001290
1291 # trying an uncompressed one
Hai Shi0c4f0f32020-06-30 21:46:31 +08001292 with os_helper.change_cwd(work_dir):
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001293 tarball = make_archive(rel_base_name, 'tar', root_dir, '.')
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001294 self.assertEqual(tarball, base_name + '.tar')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001295 self.assertTrue(os.path.isfile(tarball))
1296 self.assertTrue(tarfile.is_tarfile(tarball))
1297 with tarfile.open(tarball, 'r') as tf:
1298 self.assertCountEqual(tf.getnames(),
1299 ['.', './sub', './sub2',
1300 './file1', './file2', './sub/file3'])
Tarek Ziadé396fad72010-02-23 05:30:31 +00001301
1302 def _tarinfo(self, path):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001303 with tarfile.open(path) as tar:
Tarek Ziadé396fad72010-02-23 05:30:31 +00001304 names = tar.getnames()
1305 names.sort()
1306 return tuple(names)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001307
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001308 def _create_files(self, base_dir='dist'):
Tarek Ziadé396fad72010-02-23 05:30:31 +00001309 # creating something to tar
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001310 root_dir = self.mkdtemp()
1311 dist = os.path.join(root_dir, base_dir)
1312 os.makedirs(dist, exist_ok=True)
Éric Araujoa7e33a12011-08-12 19:51:35 +02001313 write_file((dist, 'file1'), 'xxx')
1314 write_file((dist, 'file2'), 'xxx')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001315 os.mkdir(os.path.join(dist, 'sub'))
Éric Araujoa7e33a12011-08-12 19:51:35 +02001316 write_file((dist, 'sub', 'file3'), 'xxx')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001317 os.mkdir(os.path.join(dist, 'sub2'))
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001318 if base_dir:
1319 write_file((root_dir, 'outer'), 'xxx')
1320 return root_dir, base_dir
Tarek Ziadé396fad72010-02-23 05:30:31 +00001321
Hai Shia3ec3ad2020-05-19 06:02:57 +08001322 @support.requires_zlib()
Serhiy Storchakab42de2f2015-11-21 14:09:26 +02001323 @unittest.skipUnless(shutil.which('tar'),
Tarek Ziadé396fad72010-02-23 05:30:31 +00001324 'Need the tar command to run')
1325 def test_tarfile_vs_tar(self):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001326 root_dir, base_dir = self._create_files()
1327 base_name = os.path.join(self.mkdtemp(), 'archive')
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001328 tarball = make_archive(base_name, 'gztar', root_dir, base_dir)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001329
1330 # check if the compressed tarball was created
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001331 self.assertEqual(tarball, base_name + '.tar.gz')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001332 self.assertTrue(os.path.isfile(tarball))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001333
1334 # now create another tarball using `tar`
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001335 tarball2 = os.path.join(root_dir, 'archive2.tar')
1336 tar_cmd = ['tar', '-cf', 'archive2.tar', base_dir]
Serhiy Storchakab42de2f2015-11-21 14:09:26 +02001337 subprocess.check_call(tar_cmd, cwd=root_dir,
1338 stdout=subprocess.DEVNULL)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001339
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001340 self.assertTrue(os.path.isfile(tarball2))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001341 # let's compare both tarballs
Ezio Melottib3aedd42010-11-20 19:04:17 +00001342 self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001343
1344 # trying an uncompressed one
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001345 tarball = make_archive(base_name, 'tar', root_dir, base_dir)
1346 self.assertEqual(tarball, base_name + '.tar')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001347 self.assertTrue(os.path.isfile(tarball))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001348
1349 # now for a dry_run
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001350 tarball = make_archive(base_name, 'tar', root_dir, base_dir,
1351 dry_run=True)
1352 self.assertEqual(tarball, base_name + '.tar')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001353 self.assertTrue(os.path.isfile(tarball))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001354
@support.requires_zlib()
def test_make_zipfile(self):
    # Build a source tree to archive.
    root_dir, base_dir = self._create_files()

    # Reserve a unique directory name, then delete the directory so that
    # shutil itself has to create it.
    target_dir = self.mkdtemp()
    os.rmdir(target_dir)
    # The archive name is passed relative to the parent of target_dir.
    parent_dir, target_name = os.path.split(target_dir)
    relative_name = os.path.join(target_name, 'archive')

    dist_members = ['dist/', 'dist/sub/', 'dist/sub2/',
                    'dist/file1', 'dist/file2', 'dist/sub/file3']
    # First archive everything under root_dir (which also holds 'outer'),
    # then restrict the archive to base_dir.
    scenarios = [
        ((root_dir,), dist_members + ['outer']),
        ((root_dir, base_dir), dist_members),
    ]
    for dir_args, expected_members in scenarios:
        with os_helper.change_cwd(parent_dir):
            absolute_name = os.path.abspath(relative_name)
            archive_path = make_archive(relative_name, 'zip', *dir_args)

        self.assertEqual(archive_path, absolute_name + '.zip')
        self.assertTrue(os.path.isfile(archive_path))
        self.assertTrue(zipfile.is_zipfile(archive_path))
        with zipfile.ZipFile(archive_path) as archive:
            self.assertCountEqual(archive.namelist(), expected_members)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001391
@support.requires_zlib()
@unittest.skipUnless(shutil.which('zip'),
                     'Need the zip command to run')
def test_zipfile_vs_zip(self):
    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')
    shutil_zip = make_archive(base_name, 'zip', root_dir, base_dir)

    # make_archive() must report the file it actually wrote.
    self.assertEqual(shutil_zip, base_name + '.zip')
    self.assertTrue(os.path.isfile(shutil_zip))

    # Archive the same tree with the external `zip` program.
    subprocess.check_call(['zip', '-q', '-r', 'archive2.zip', base_dir],
                          cwd=root_dir, stdout=subprocess.DEVNULL)
    external_zip = os.path.join(root_dir, 'archive2.zip')
    self.assertTrue(os.path.isfile(external_zip))

    def sorted_members(path):
        # Member names of the ZIP file at *path*, in sorted order.
        with zipfile.ZipFile(path) as zf:
            return sorted(zf.namelist())

    # Both archives must list the same members, regardless of order.
    self.assertEqual(sorted_members(shutil_zip),
                     sorted_members(external_zip))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001417
@support.requires_zlib()
@unittest.skipUnless(shutil.which('unzip'),
                     'Need the unzip command to run')
def test_unzip_zipfile(self):
    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')
    archive = make_archive(base_name, 'zip', root_dir, base_dir)

    # make_archive() must report the file it actually wrote.
    self.assertEqual(archive, base_name + '.zip')
    self.assertTrue(os.path.isfile(archive))

    # Let the external `unzip -t` check the archive.
    with os_helper.change_cwd(root_dir):
        try:
            subprocess.check_output(['unzip', '-t', archive],
                                    stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as exc:
            details = exc.output.decode(errors="replace")
            # An unzip without -t support is a reason to skip, not to fail.
            if 'unrecognized option: t' in details:
                self.skipTest("unzip doesn't support -t")
            self.fail("{}\n\n**Unzip Output**\n{}".format(exc, details))
1441
def test_make_archive(self):
    # An archive format name that is not registered raises ValueError.
    base_name = os.path.join(self.mkdtemp(), 'archive')
    with self.assertRaises(ValueError):
        make_archive(base_name, 'xxx')
1446
@support.requires_zlib()
def test_make_archive_owner_group(self):
    # make_archive() with several owner/group combinations; this must
    # work even when there is no uid/gid support.
    if UID_GID_SUPPORT:
        group = grp.getgrgid(0)[0]
        owner = pwd.getpwuid(0)[0]
    else:
        group = owner = 'root'

    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')

    # (format, ownership keyword arguments); the last entry uses names
    # that are not expected to exist on the system.
    attempts = [
        ('zip', {'owner': owner, 'group': group}),
        ('zip', {}),
        ('tar', {'owner': owner, 'group': group}),
        ('tar', {'owner': 'kjhkjhkjg', 'group': 'oihohoh'}),
    ]
    for archive_format, ownership in attempts:
        created = make_archive(base_name, archive_format, root_dir,
                               base_dir, **ownership)
        self.assertTrue(os.path.isfile(created))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001473
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001474
@support.requires_zlib()
@unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
def test_tarfile_root_owner(self):
    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')
    # Names of the group and user whose numeric id is 0.
    group = grp.getgrgid(0)[0]
    owner = pwd.getpwuid(0)[0]
    with os_helper.change_cwd(root_dir):
        archive_name = make_archive(base_name, 'gztar', root_dir, 'dist',
                                    owner=owner, group=group)

    # The compressed tarball must exist ...
    self.assertTrue(os.path.isfile(archive_name))

    # ... and every member must be recorded with uid 0 and gid 0.
    with tarfile.open(archive_name) as archive:
        for member in archive.getmembers():
            self.assertEqual(member.uid, 0)
            self.assertEqual(member.gid, 0)
1497
def test_make_archive_cwd(self):
    # make_archive() must leave the working directory unchanged even
    # when the archiver it dispatches to raises.
    starting_dir = os.getcwd()

    def failing_archiver(*args, **kw):
        raise RuntimeError()

    register_archive_format('xxx', failing_archiver, [], 'xxx file')
    try:
        # Whatever is raised here is irrelevant; only the cwd matters.
        with contextlib.suppress(Exception):
            make_archive('xxx', 'xxx', root_dir=self.mkdtemp())
        self.assertEqual(os.getcwd(), starting_dir)
    finally:
        unregister_archive_format('xxx')
1512
def test_make_tarfile_in_curdir(self):
    # Issue #21280: a bare base name creates the archive in the cwd.
    with os_helper.change_cwd(self.mkdtemp()):
        created = make_archive('test', 'tar')
        self.assertEqual(created, 'test.tar')
        self.assertTrue(os.path.isfile('test.tar'))
1519
@support.requires_zlib()
def test_make_zipfile_in_curdir(self):
    # Issue #21280: a bare base name creates the archive in the cwd.
    with os_helper.change_cwd(self.mkdtemp()):
        created = make_archive('test', 'zip')
        self.assertEqual(created, 'test.zip')
        self.assertTrue(os.path.isfile('test.zip'))
1527
Tarek Ziadé396fad72010-02-23 05:30:31 +00001528 def test_register_archive_format(self):
1529
1530 self.assertRaises(TypeError, register_archive_format, 'xxx', 1)
1531 self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
1532 1)
1533 self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
1534 [(1, 2), (1, 2, 3)])
1535
1536 register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file')
1537 formats = [name for name, params in get_archive_formats()]
1538 self.assertIn('xxx', formats)
1539
1540 unregister_archive_format('xxx')
1541 formats = [name for name, params in get_archive_formats()]
1542 self.assertNotIn('xxx', formats)
1543
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001544 ### shutil.unpack_archive
1545
def check_unpack_archive(self, format):
    # Run the unpack checks three times: with paths passed through
    # unchanged, wrapped in pathlib.Path, and wrapped in FakePath.
    def unchanged(path):
        return path

    for converter in (unchanged, pathlib.Path, FakePath):
        self.check_unpack_archive_with_converter(format, converter)
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001550
def check_unpack_archive_with_converter(self, format, converter):
    # Archive base_dir in the given format, unpack it again and compare
    # the extracted tree with the source tree.  Every path handed to
    # unpack_archive() is first passed through *converter*.
    root_dir, base_dir = self._create_files()
    # 'outer' is in root_dir but is not expected in the unpacked tree.
    expected = rlistdir(root_dir)
    expected.remove('outer')

    base_name = os.path.join(self.mkdtemp(), 'archive')
    filename = make_archive(base_name, format, root_dir, base_dir)

    # Unpack once relying on format detection, then once more with the
    # format given explicitly.
    for extra in ({}, {'format': format}):
        destination = self.mkdtemp()
        unpack_archive(converter(filename), converter(destination), **extra)
        self.assertEqual(rlistdir(destination), expected)

    # TESTFN is not an archive unpack_archive() can recognise.
    unrecognised = converter(TESTFN)
    with self.assertRaises(shutil.ReadError):
        unpack_archive(unrecognised)
    # An unknown explicit format name is rejected.
    unrecognised = converter(TESTFN)
    with self.assertRaises(ValueError):
        unpack_archive(unrecognised, format='xxx')
Nick Coghlanabf202d2011-03-16 13:52:20 -04001571
def test_unpack_archive_tar(self):
    # Round-trip through an uncompressed tar archive.
    self.check_unpack_archive(format='tar')
1574
@support.requires_zlib()
def test_unpack_archive_gztar(self):
    # Round-trip through the 'gztar' format.
    self.check_unpack_archive(format='gztar')
1578
@support.requires_bz2()
def test_unpack_archive_bztar(self):
    # Round-trip through the 'bztar' format.
    self.check_unpack_archive(format='bztar')
1582
@support.requires_lzma()
@unittest.skipIf(AIX and not _maxdataOK(),
                 "AIX MAXDATA must be 0x20000000 or larger")
def test_unpack_archive_xztar(self):
    # Round-trip through the 'xztar' format.
    self.check_unpack_archive(format='xztar')
1587
@support.requires_zlib()
def test_unpack_archive_zip(self):
    # Round-trip through the 'zip' format.
    self.check_unpack_archive(format='zip')
1591
Martin Pantereb995702016-07-28 01:11:04 +00001592 def test_unpack_registry(self):
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001593
1594 formats = get_unpack_formats()
1595
1596 def _boo(filename, extract_dir, extra):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001597 self.assertEqual(extra, 1)
1598 self.assertEqual(filename, 'stuff.boo')
1599 self.assertEqual(extract_dir, 'xx')
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001600
1601 register_unpack_format('Boo', ['.boo', '.b2'], _boo, [('extra', 1)])
1602 unpack_archive('stuff.boo', 'xx')
1603
1604 # trying to register a .boo unpacker again
1605 self.assertRaises(RegistryError, register_unpack_format, 'Boo2',
1606 ['.boo'], _boo)
1607
1608 # should work now
1609 unregister_unpack_format('Boo')
1610 register_unpack_format('Boo2', ['.boo'], _boo)
1611 self.assertIn(('Boo2', ['.boo'], ''), get_unpack_formats())
1612 self.assertNotIn(('Boo', ['.boo'], ''), get_unpack_formats())
1613
1614 # let's leave a clean state
1615 unregister_unpack_format('Boo2')
Ezio Melottib3aedd42010-11-20 19:04:17 +00001616 self.assertEqual(get_unpack_formats(), formats)
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001617
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001618
class TestMisc(BaseTest, unittest.TestCase):
    # Tests for shutil.disk_usage() and shutil.chown().

    @unittest.skipUnless(hasattr(shutil, 'disk_usage'),
                         "disk_usage not available on this platform")
    def test_disk_usage(self):
        usage = shutil.disk_usage(os.path.dirname(__file__))
        for attr in ('total', 'used', 'free'):
            self.assertIsInstance(getattr(usage, attr), int)
        self.assertGreater(usage.total, 0)
        self.assertGreater(usage.used, 0)
        self.assertGreaterEqual(usage.free, 0)
        self.assertGreaterEqual(usage.total, usage.used)
        self.assertGreater(usage.total, usage.free)

        # bpo-32557: Check that disk_usage() also accepts a filename
        shutil.disk_usage(__file__)

    @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
    @unittest.skipUnless(hasattr(os, 'chown'), 'requires os.chown')
    def test_chown(self):
        dirname = self.mkdtemp()
        filename = tempfile.mktemp(dir=dirname)
        write_file(filename, 'testing chown function')

        # Neither user nor group given.
        with self.assertRaises(ValueError):
            shutil.chown(filename)

        # Unknown user / group names.
        with self.assertRaises(LookupError):
            shutil.chown(filename, user='non-existing username')

        with self.assertRaises(LookupError):
            shutil.chown(filename, group='non-existing groupname')

        # Values that are neither a str name nor an int id.
        with self.assertRaises(TypeError):
            shutil.chown(filename, b'spam')

        with self.assertRaises(TypeError):
            shutil.chown(filename, 3.14)

        uid = os.getuid()
        gid = os.getgid()

        def check_chown(path, uid=None, gid=None):
            # Compare the ownership of *path* itself with the expected
            # ids.  (This used to stat the enclosing `filename`, so the
            # directory checks below never looked at the directory.)
            s = os.stat(path)
            if uid is not None:
                self.assertEqual(uid, s.st_uid)
            if gid is not None:
                self.assertEqual(gid, s.st_gid)

        shutil.chown(filename, uid, gid)
        check_chown(filename, uid, gid)
        shutil.chown(filename, uid)
        check_chown(filename, uid)
        shutil.chown(filename, user=uid)
        check_chown(filename, uid)
        shutil.chown(filename, group=gid)
        check_chown(filename, gid=gid)

        shutil.chown(dirname, uid, gid)
        check_chown(dirname, uid, gid)
        shutil.chown(dirname, uid)
        check_chown(dirname, uid)
        shutil.chown(dirname, user=uid)
        check_chown(dirname, uid)
        shutil.chown(dirname, group=gid)
        check_chown(dirname, gid=gid)

        try:
            user = pwd.getpwuid(uid)[0]
            group = grp.getgrgid(gid)[0]
        except KeyError:
            # On some systems uid/gid cannot be resolved.
            pass
        else:
            # The same calls again, this time by name instead of by id.
            shutil.chown(filename, user, group)
            check_chown(filename, uid, gid)
            shutil.chown(dirname, user, group)
            check_chown(dirname, uid, gid)
Sandro Tosid902a142011-08-22 23:28:27 +02001697
Brian Curtin0d0a1de2012-06-18 18:41:07 -05001698
class TestWhich(BaseTest, unittest.TestCase):
    # Tests for shutil.which().  setUp() creates one temporary file with
    # the owner-execute bit set; each test looks it up in a different way.

    def setUp(self):
        self.temp_dir = self.mkdtemp(prefix="Tmp")
        # Give the temp_file an ".exe" suffix for all.
        # It's needed on Windows and not harmful on other platforms.
        self.temp_file = tempfile.NamedTemporaryFile(dir=self.temp_dir,
                                                     prefix="Tmp",
                                                     suffix=".Exe")
        os.chmod(self.temp_file.name, stat.S_IXUSR)
        self.addCleanup(self.temp_file.close)
        self.dir, self.file = os.path.split(self.temp_file.name)
        self.env_path = self.dir
        self.curdir = os.curdir
        self.ext = ".EXE"

    def test_basic(self):
        # Given an EXE in a directory, it should be returned.
        rv = shutil.which(self.file, path=self.dir)
        self.assertEqual(rv, self.temp_file.name)

    def test_absolute_cmd(self):
        # When given the fully qualified path to an executable that exists,
        # it should be returned.
        rv = shutil.which(self.temp_file.name, path=self.temp_dir)
        self.assertEqual(rv, self.temp_file.name)

    def test_relative_cmd(self):
        # When given the relative path with a directory part to an executable
        # that exists, it should be returned.
        base_dir, tail_dir = os.path.split(self.dir)
        relpath = os.path.join(tail_dir, self.file)
        with os_helper.change_cwd(path=base_dir):
            rv = shutil.which(relpath, path=self.temp_dir)
            self.assertEqual(rv, relpath)
        # But it shouldn't be searched in PATH directories (issue #16957).
        with os_helper.change_cwd(path=self.dir):
            rv = shutil.which(relpath, path=base_dir)
            self.assertIsNone(rv)

    def test_cwd(self):
        # Issue #16957
        base_dir = os.path.dirname(self.dir)
        with os_helper.change_cwd(path=self.dir):
            rv = shutil.which(self.file, path=base_dir)
            if sys.platform == "win32":
                # Windows: current directory implicitly on PATH
                self.assertEqual(rv, os.path.join(self.curdir, self.file))
            else:
                # Other platforms: shouldn't match in the current directory.
                self.assertIsNone(rv)

    @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
                     'non-root user required')
    def test_non_matching_mode(self):
        # Set the file read-only and ask for writeable files.
        os.chmod(self.temp_file.name, stat.S_IREAD)
        if os.access(self.temp_file.name, os.W_OK):
            self.skipTest("can't set the file read-only")
        rv = shutil.which(self.file, path=self.dir, mode=os.W_OK)
        self.assertIsNone(rv)

    def test_relative_path(self):
        # A relative search path yields a relative result.
        base_dir, tail_dir = os.path.split(self.dir)
        with os_helper.change_cwd(path=base_dir):
            rv = shutil.which(self.file, path=tail_dir)
            self.assertEqual(rv, os.path.join(tail_dir, self.file))

    def test_nonexistent_file(self):
        # Return None when no matching executable file is found on the path.
        rv = shutil.which("foo.exe", path=self.dir)
        self.assertIsNone(rv)

    @unittest.skipUnless(sys.platform == "win32",
                         "pathext check is Windows-only")
    def test_pathext_checking(self):
        # Ask for the file without the ".exe" extension, then ensure that
        # it gets found properly with the extension.
        rv = shutil.which(self.file[:-4], path=self.dir)
        self.assertEqual(rv, self.temp_file.name[:-4] + self.ext)

    def test_environ_path(self):
        # With no explicit path argument, PATH from the environment is used.
        with os_helper.EnvironmentVarGuard() as env:
            env['PATH'] = self.env_path
            rv = shutil.which(self.file)
            self.assertEqual(rv, self.temp_file.name)

    def test_environ_path_empty(self):
        # PATH='': no match
        with os_helper.EnvironmentVarGuard() as env:
            env['PATH'] = ''
            with unittest.mock.patch('os.confstr', return_value=self.dir,
                                     create=True), \
                 support.swap_attr(os, 'defpath', self.dir), \
                 os_helper.change_cwd(self.dir):
                rv = shutil.which(self.file)
                self.assertIsNone(rv)

    def test_environ_path_cwd(self):
        expected_cwd = os.path.basename(self.temp_file.name)
        if sys.platform == "win32":
            curdir = os.curdir
            if isinstance(expected_cwd, bytes):
                curdir = os.fsencode(curdir)
            expected_cwd = os.path.join(curdir, expected_cwd)

        # PATH=':': explicitly looks in the current directory
        with os_helper.EnvironmentVarGuard() as env:
            env['PATH'] = os.pathsep
            with unittest.mock.patch('os.confstr', return_value=self.dir,
                                     create=True), \
                 support.swap_attr(os, 'defpath', self.dir):
                rv = shutil.which(self.file)
                self.assertIsNone(rv)

                # look in current directory
                with os_helper.change_cwd(self.dir):
                    rv = shutil.which(self.file)
                    self.assertEqual(rv, expected_cwd)

    def test_environ_path_missing(self):
        with os_helper.EnvironmentVarGuard() as env:
            env.pop('PATH', None)

            # without confstr
            with unittest.mock.patch('os.confstr', side_effect=ValueError,
                                     create=True), \
                 support.swap_attr(os, 'defpath', self.dir):
                rv = shutil.which(self.file)
            self.assertEqual(rv, self.temp_file.name)

            # with confstr
            with unittest.mock.patch('os.confstr', return_value=self.dir,
                                     create=True), \
                 support.swap_attr(os, 'defpath', ''):
                rv = shutil.which(self.file)
            self.assertEqual(rv, self.temp_file.name)

    def test_empty_path(self):
        # An explicit empty path argument finds nothing, even though PATH
        # and the current directory both point at the file's directory.
        with os_helper.change_cwd(path=self.dir), \
             os_helper.EnvironmentVarGuard() as env:
            env['PATH'] = self.env_path
            rv = shutil.which(self.file, path='')
            self.assertIsNone(rv)

    def test_empty_path_no_PATH(self):
        with os_helper.EnvironmentVarGuard() as env:
            env.pop('PATH', None)
            rv = shutil.which(self.file)
            self.assertIsNone(rv)

    @unittest.skipUnless(sys.platform == "win32", 'test specific to Windows')
    def test_pathext(self):
        ext = ".xyz"
        temp_filexyz = tempfile.NamedTemporaryFile(dir=self.temp_dir,
                                                   prefix="Tmp2", suffix=ext)
        os.chmod(temp_filexyz.name, stat.S_IXUSR)
        self.addCleanup(temp_filexyz.close)

        # strip path and extension
        program = os.path.basename(temp_filexyz.name)
        program = os.path.splitext(program)[0]

        with os_helper.EnvironmentVarGuard() as env:
            env['PATHEXT'] = ext
            rv = shutil.which(program, path=self.temp_dir)
            self.assertEqual(rv, temp_filexyz.name)

    # Issue 40592: See https://bugs.python.org/issue40592
    @unittest.skipUnless(sys.platform == "win32", 'test specific to Windows')
    def test_pathext_with_empty_str(self):
        ext = ".xyz"
        temp_filexyz = tempfile.NamedTemporaryFile(dir=self.temp_dir,
                                                   prefix="Tmp2", suffix=ext)
        self.addCleanup(temp_filexyz.close)

        # strip path and extension
        program = os.path.basename(temp_filexyz.name)
        program = os.path.splitext(program)[0]

        with os_helper.EnvironmentVarGuard() as env:
            env['PATHEXT'] = f"{ext};"  # note the ;
            rv = shutil.which(program, path=self.temp_dir)
            self.assertEqual(rv, temp_filexyz.name)
1884
Brian Curtinc57a3452012-06-22 16:00:30 -05001885
class TestWhichBytes(TestWhich):
    # Runs the inherited TestWhich tests with bytes fixtures.

    def setUp(self):
        super().setUp()
        # Encode the str fixtures with os.fsencode(); env_path is not
        # converted here and stays a str.
        for attr in ('dir', 'file', 'curdir', 'ext'):
            setattr(self, attr, os.fsencode(getattr(self, attr)))
        self.temp_file.name = os.fsencode(self.temp_file.name)
1893 self.ext = os.fsencode(self.ext)
1894
1895
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001896class TestMove(BaseTest, unittest.TestCase):
Christian Heimesada8c3b2008-03-18 18:26:33 +00001897
def setUp(self):
    # Two fresh directories; only the source one receives a file, and
    # dst_file names where that file would land in the destination.
    self.src_dir = self.mkdtemp()
    self.dst_dir = self.mkdtemp()
    self.src_file, self.dst_file = (
        os.path.join(directory, "foo")
        for directory in (self.src_dir, self.dst_dir)
    )
    with open(self.src_file, "wb") as stream:
        stream.write(b"spam")
1906
def _check_move_file(self, src, dst, real_dst):
    # Move file src to dst, then check that real_dst holds the original
    # bytes and that src no longer exists.
    with open(src, "rb") as stream:
        before = stream.read()
    shutil.move(src, dst)
    with open(real_dst, "rb") as stream:
        after = stream.read()
    self.assertEqual(before, after)
    self.assertFalse(os.path.exists(src))
1914
def _check_move_dir(self, src, dst, real_dst):
    # Move directory src to dst, then check that real_dst lists the same
    # entries src had and that src no longer exists.
    before = sorted(os.listdir(src))
    shutil.move(src, dst)
    after = sorted(os.listdir(real_dst))
    self.assertEqual(before, after)
    self.assertFalse(os.path.exists(src))
1920
def test_move_file(self):
    # The destination is a full file path; same filesystem.
    self._check_move_file(src=self.src_file, dst=self.dst_file,
                          real_dst=self.dst_file)
1924
def test_move_file_to_dir(self):
    # The destination is an existing directory; the file is expected
    # inside it under its own name.  Same filesystem.
    self._check_move_file(src=self.src_file, dst=self.dst_dir,
                          real_dst=self.dst_file)
1928
def test_move_file_to_dir_pathlike_src(self):
    # The source is given as a pathlib.Path instead of a str.
    self._check_move_file(pathlib.Path(self.src_file), self.dst_dir,
                          self.dst_file)
1933
def test_move_file_to_dir_pathlike_dst(self):
    # The destination directory is given as a pathlib.Path.
    self._check_move_file(self.src_file, pathlib.Path(self.dst_dir),
                          self.dst_file)
1938
@mock_rename
def test_move_file_other_fs(self):
    # Same scenario as test_move_file (destination is a file path), run
    # under the mock_rename decorator.
    # NOTE(review): presumably mock_rename makes os.rename fail so the
    # cross-filesystem code path is taken -- confirm at its definition.
    self.test_move_file()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001943
@mock_rename
def test_move_file_to_dir_other_fs(self):
    # Same scenario as test_move_file_to_dir (destination is an existing
    # directory), run under the mock_rename decorator.
    # NOTE(review): presumably mock_rename makes os.rename fail so the
    # cross-filesystem code path is taken -- confirm at its definition.
    self.test_move_file_to_dir()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001948
def test_move_dir(self):
    # Move a directory to a path that does not exist yet (same
    # filesystem).  mktemp() only generates the name.
    target = tempfile.mktemp(dir=self.mkdtemp())
    try:
        self._check_move_dir(src=self.src_dir, dst=target, real_dst=target)
    finally:
        os_helper.rmtree(target)
Christian Heimesada8c3b2008-03-18 18:26:33 +00001956
@mock_rename
def test_move_dir_other_fs(self):
    # Same scenario as test_move_dir, run under the mock_rename decorator.
    # NOTE(review): presumably mock_rename makes os.rename fail so the
    # cross-filesystem code path is taken -- confirm at its definition.
    self.test_move_dir()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001961
def test_move_dir_to_dir(self):
    # Moving a directory onto an existing directory (same filesystem)
    # should place it inside, under its own base name.
    landed_at = os.path.join(self.dst_dir, os.path.basename(self.src_dir))
    self._check_move_dir(self.src_dir, self.dst_dir, landed_at)
1966
@mock_rename
def test_move_dir_to_dir_other_fs(self):
    # Same scenario as test_move_dir_to_dir, run under the mock_rename
    # decorator.
    # NOTE(review): presumably mock_rename makes os.rename fail so the
    # cross-filesystem code path is taken -- confirm at its definition.
    self.test_move_dir_to_dir()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001971
def test_move_dir_sep_to_dir(self):
    # A trailing os.path.sep on the source must not change where the
    # directory ends up.
    landed_at = os.path.join(self.dst_dir, os.path.basename(self.src_dir))
    self._check_move_dir(self.src_dir + os.path.sep, self.dst_dir,
                         landed_at)
1975
@unittest.skipUnless(os.path.altsep, 'requires os.path.altsep')
def test_move_dir_altsep_to_dir(self):
    # A trailing os.path.altsep on the source must not change where the
    # directory ends up.
    landed_at = os.path.join(self.dst_dir, os.path.basename(self.src_dir))
    self._check_move_dir(self.src_dir + os.path.altsep, self.dst_dir,
                         landed_at)
1980
def test_existing_file_inside_dest_dir(self):
    # Create an empty file at self.dst_file first, so that a file with the
    # same name already exists inside the destination directory; moving
    # src_file there must then raise shutil.Error.
    open(self.dst_file, "wb").close()
    with self.assertRaises(shutil.Error):
        shutil.move(self.src_file, self.dst_dir)
1986
def test_dont_move_dir_in_itself(self):
    # A directory cannot be moved to a path located inside itself:
    # shutil.move() has to refuse with shutil.Error.
    inner_path = os.path.join(self.src_dir, "bar")
    with self.assertRaises(shutil.Error):
        shutil.move(self.src_dir, inner_path)
1991
def test_destinsrc_false_negative(self):
    # shutil._destinsrc() must report True when dst lies inside src.
    os.mkdir(TESTFN)
    try:
        cases = [('srcdir', 'srcdir/dest')]
        for src_rel, dst_rel in cases:
            src = os.path.join(TESTFN, src_rel)
            dst = os.path.join(TESTFN, dst_rel)
            failure = ('_destinsrc() wrongly concluded that '
                       'dst (%s) is not in src (%s)' % (dst, src))
            self.assertTrue(shutil._destinsrc(src, dst), msg=failure)
    finally:
        os_helper.rmtree(TESTFN)
Christian Heimesada8c3b2008-03-18 18:26:33 +00002003
def test_destinsrc_false_positive(self):
    # shutil._destinsrc() must report False for destinations that merely
    # share a name prefix with src without being located inside it.
    os.mkdir(TESTFN)
    try:
        cases = [('srcdir', 'src/dest'), ('srcdir', 'srcdir.new')]
        for src_rel, dst_rel in cases:
            src = os.path.join(TESTFN, src_rel)
            dst = os.path.join(TESTFN, dst_rel)
            failure = ('_destinsrc() wrongly concluded that '
                       'dst (%s) is in src (%s)' % (dst, src))
            self.assertFalse(shutil._destinsrc(src, dst), msg=failure)
    finally:
        os_helper.rmtree(TESTFN)
Christian Heimes9bd667a2008-01-20 15:14:11 +00002015
@os_helper.skip_unless_symlink
@mock_rename
def test_move_file_symlink(self):
    # Move a symlink that points at src_file onto the dst_file path.
    link = os.path.join(self.src_dir, 'bar')
    os.symlink(self.src_file, link)
    shutil.move(link, self.dst_file)
    # The moved entry is still a symlink and still resolves to src_file.
    moved_is_link = os.path.islink(self.dst_file)
    self.assertTrue(moved_is_link)
    same_target = os.path.samefile(self.src_file, self.dst_file)
    self.assertTrue(same_target)
2024
@os_helper.skip_unless_symlink
@mock_rename
def test_move_file_symlink_to_dir(self):
    # Move a symlink that points at src_file into the directory dst_dir.
    link_name = "bar"
    link = os.path.join(self.src_dir, link_name)
    os.symlink(self.src_file, link)
    shutil.move(link, self.dst_dir)
    # The link is expected inside dst_dir under its own name, still being
    # a symlink that resolves to src_file.
    moved_link = os.path.join(self.dst_dir, link_name)
    self.assertTrue(os.path.islink(moved_link))
    same_target = os.path.samefile(self.src_file, moved_link)
    self.assertTrue(same_target)
2035
@os_helper.skip_unless_symlink
@mock_rename
def test_move_dangling_symlink(self):
    # 'baz' is not created by this test, so the symlink 'bar' dangles.
    missing_target = os.path.join(self.src_dir, 'baz')
    link = os.path.join(self.src_dir, 'bar')
    os.symlink(missing_target, link)
    moved_link = os.path.join(self.dst_dir, 'quux')
    shutil.move(link, moved_link)
    self.assertTrue(os.path.islink(moved_link))
    # After the move the link must resolve to the same (missing) path.
    self.assertEqual(os.path.realpath(missing_target),
                     os.path.realpath(moved_link))
Antoine Pitrou0a08d7a2012-01-06 20:16:19 +01002046
@os_helper.skip_unless_symlink
@mock_rename
def test_move_dir_symlink(self):
    # Move a symlink whose target is a real directory ('baz').
    real_dir = os.path.join(self.src_dir, 'baz')
    link = os.path.join(self.src_dir, 'bar')
    os.mkdir(real_dir)
    os.symlink(real_dir, link)
    moved_link = os.path.join(self.dst_dir, 'quux')
    shutil.move(link, moved_link)
    # The moved entry is still a symlink and still resolves to 'baz'.
    self.assertTrue(os.path.islink(moved_link))
    same_target = os.path.samefile(real_dir, moved_link)
    self.assertTrue(same_target)
2058
def test_move_return_value(self):
    # Moving a file into a directory returns the path of the file inside
    # that directory.
    file_name = os.path.basename(self.src_file)
    expected = os.path.join(self.dst_dir, file_name)
    returned = shutil.move(self.src_file, self.dst_dir)
    self.assertEqual(returned, expected)
2063
def test_move_as_rename_return_value(self):
    # Moving a file to an explicit new path returns exactly that path.
    new_path = os.path.join(self.dst_dir, 'bar')
    returned = shutil.move(self.src_file, new_path)
    self.assertEqual(returned, os.path.join(self.dst_dir, 'bar'))
2067
@mock_rename
def test_move_file_special_function(self):
    # A user supplied copy_function must be invoked exactly once when a
    # single file is moved.
    calls = []

    def recording_copy(src, dst):
        # Only record the request; nothing is actually copied.
        calls.append((src, dst))

    shutil.move(self.src_file, self.dst_dir, copy_function=recording_copy)
    self.assertEqual(len(calls), 1)
2075
@mock_rename
def test_move_dir_special_function(self):
    # When a directory is moved, the user supplied copy_function is
    # expected to be invoked three times after two extra files are added.
    calls = []

    def recording_copy(src, dst):
        # Only record the request; nothing is actually copied.
        calls.append((src, dst))

    for extra in ('child', 'child1'):
        os_helper.create_empty_file(os.path.join(self.src_dir, extra))
    shutil.move(self.src_dir, self.dst_dir, copy_function=recording_copy)
    self.assertEqual(len(calls), 3)
2085
def test_move_dir_caseinsensitive(self):
    # Rename a directory to a path that differs from the original only by
    # letter case (the last path component is upper-cased).
    self.src_dir = self.mkdtemp()
    parent, leaf = os.path.split(self.src_dir)
    dst_dir = os.path.join(parent, leaf.upper())
    self.assertNotEqual(self.src_dir, dst_dir)

    try:
        shutil.move(self.src_dir, dst_dir)
        moved = os.path.isdir(dst_dir)
        self.assertTrue(moved)
    finally:
        os.rmdir(dst_dir)
2101
Tarek Ziadé5340db32010-04-19 22:30:51 +00002102
@unittest.skipUnless(hasattr(os, 'geteuid') and os.geteuid() == 0
                     and hasattr(os, 'lchflags')
                     and hasattr(stat, 'SF_IMMUTABLE')
                     and hasattr(stat, 'UF_OPAQUE'),
                     'root privileges required')
def test_move_dir_permission_denied(self):
    # bpo-42782: shutil.move should not create destination directories
    # if the source directory cannot be removed.
    #
    # The source directory is flagged SF_IMMUTABLE with os.lchflags(), the
    # move is expected to fail with PermissionError, and afterwards
    # TESTFN_DST must not show up in the current directory listing.
    try:
        os.mkdir(TESTFN_SRC)
        os.lchflags(TESTFN_SRC, stat.SF_IMMUTABLE)

        # Testing on an empty immutable directory
        # TESTFN_DST should not exist if shutil.move failed
        self.assertRaises(PermissionError, shutil.move, TESTFN_SRC, TESTFN_DST)
        # assertNotIn reports the offending name and the listing on
        # failure, unlike assertFalse(x in y).
        self.assertNotIn(TESTFN_DST, os.listdir())

        # Replace the flags so a child file can be created, then make the
        # directory immutable again.
        os.lchflags(TESTFN_SRC, stat.UF_OPAQUE)
        os_helper.create_empty_file(os.path.join(TESTFN_SRC, 'child'))
        os.lchflags(TESTFN_SRC, stat.SF_IMMUTABLE)

        # Testing on a non-empty immutable directory
        # TESTFN_DST should not exist if shutil.move failed
        self.assertRaises(PermissionError, shutil.move, TESTFN_SRC, TESTFN_DST)
        self.assertNotIn(TESTFN_DST, os.listdir())
    finally:
        # Drop the immutable flag before deleting, otherwise the cleanup
        # itself would be refused.
        if os.path.exists(TESTFN_SRC):
            os.lchflags(TESTFN_SRC, stat.UF_OPAQUE)
            os_helper.rmtree(TESTFN_SRC)
        if os.path.exists(TESTFN_DST):
            os.lchflags(TESTFN_DST, stat.UF_OPAQUE)
            os_helper.rmtree(TESTFN_DST)
2136
2137
class TestCopyFile(unittest.TestCase):
    """Check how shutil.copyfile() treats its file objects when opening or
    closing one of them fails.

    In every test ``shutil.open`` is swapped for a stub that returns Faux
    objects (or raises) for the names 'srcfile' and 'destfile'.
    """

    class Faux(object):
        """Stand-in for a file object used as a context manager.

        Records whether it was entered and which arguments __exit__
        received.  With raise_in_exit=True, __exit__ raises
        OSError("Cannot close"); otherwise it returns suppress_at_exit.
        """
        _entered = False
        _exited_with = None
        _raised = False

        def __init__(self, raise_in_exit=False, suppress_at_exit=True):
            self._raise_in_exit = raise_in_exit
            self._suppress_at_exit = suppress_at_exit

        def read(self, *args):
            return ''

        def __enter__(self):
            # No explicit return: the ``with ... as`` target is None.
            self._entered = True

        def __exit__(self, exc_type, exc_val, exc_tb):
            self._exited_with = exc_type, exc_val, exc_tb
            if self._raise_in_exit:
                self._raised = True
                raise OSError("Cannot close")
            return self._suppress_at_exit

    def test_w_source_open_fails(self):
        # Opening the source fails: the OSError must reach the caller.
        def _open(filename, mode='r'):
            if filename == 'srcfile':
                raise OSError('Cannot open "srcfile"')
            # Explicit raise instead of ``assert 0``: asserts are removed
            # under -O and the stub would then silently return None.
            raise AssertionError('unexpected open(%r)' % (filename,))

        with support.swap_attr(shutil, 'open', _open):
            with self.assertRaises(OSError):
                shutil.copyfile('srcfile', 'destfile')

    @unittest.skipIf(MACOS, "skipped on macOS")
    def test_w_dest_open_fails(self):
        # Opening the destination fails after the source was entered: the
        # source's __exit__ must receive that OSError (Faux suppresses it).
        srcfile = self.Faux()

        def _open(filename, mode='r'):
            if filename == 'srcfile':
                return srcfile
            if filename == 'destfile':
                raise OSError('Cannot open "destfile"')
            raise AssertionError('unexpected open(%r)' % (filename,))

        with support.swap_attr(shutil, 'open', _open):
            shutil.copyfile('srcfile', 'destfile')
        self.assertTrue(srcfile._entered)
        self.assertIs(srcfile._exited_with[0], OSError)
        self.assertEqual(srcfile._exited_with[1].args,
                         ('Cannot open "destfile"',))

    @unittest.skipIf(MACOS, "skipped on macOS")
    def test_w_dest_close_fails(self):
        # Closing the destination raises: the source's __exit__ must see
        # the "Cannot close" OSError (and, being a Faux, suppresses it).
        srcfile = self.Faux()
        destfile = self.Faux(True)

        def _open(filename, mode='r'):
            if filename == 'srcfile':
                return srcfile
            if filename == 'destfile':
                return destfile
            raise AssertionError('unexpected open(%r)' % (filename,))

        with support.swap_attr(shutil, 'open', _open):
            shutil.copyfile('srcfile', 'destfile')
        self.assertTrue(srcfile._entered)
        self.assertTrue(destfile._entered)
        self.assertTrue(destfile._raised)
        self.assertIs(srcfile._exited_with[0], OSError)
        self.assertEqual(srcfile._exited_with[1].args,
                         ('Cannot close',))

    @unittest.skipIf(MACOS, "skipped on macOS")
    def test_w_source_close_fails(self):
        # Closing the source raises: the destination was closed cleanly,
        # the source's __exit__ was called without a pending exception,
        # and the OSError it raised propagates to the caller.
        srcfile = self.Faux(True)
        destfile = self.Faux()

        def _open(filename, mode='r'):
            if filename == 'srcfile':
                return srcfile
            if filename == 'destfile':
                return destfile
            raise AssertionError('unexpected open(%r)' % (filename,))

        with support.swap_attr(shutil, 'open', _open):
            with self.assertRaises(OSError):
                shutil.copyfile('srcfile', 'destfile')
        self.assertTrue(srcfile._entered)
        self.assertTrue(destfile._entered)
        self.assertFalse(destfile._raised)
        self.assertIsNone(srcfile._exited_with[0])
        self.assertTrue(srcfile._raised)
2228
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002229
class TestCopyFileObj(unittest.TestCase):
    """Tests for shutil.copyfileobj(), copying TESTFN (FILESIZE bytes,
    written once per class) into TESTFN2."""
    FILESIZE = 2 * 1024 * 1024

    @classmethod
    def setUpClass(cls):
        write_test_file(TESTFN, cls.FILESIZE)

    @classmethod
    def tearDownClass(cls):
        os_helper.unlink(TESTFN)
        os_helper.unlink(TESTFN2)

    def tearDown(self):
        os_helper.unlink(TESTFN2)

    @contextlib.contextmanager
    def get_files(self):
        # Yield (src, dst): TESTFN opened for binary reading and TESTFN2
        # opened for binary writing; both are closed on exit.
        with open(TESTFN, "rb") as src:
            with open(TESTFN2, "wb") as dst:
                yield (src, dst)

    def assert_files_eq(self, src, dst):
        # Fail unless the two paths hold byte-identical content.
        with open(src, 'rb') as fsrc:
            with open(dst, 'rb') as fdst:
                self.assertEqual(fsrc.read(), fdst.read())

    def test_content(self):
        with self.get_files() as (src, dst):
            shutil.copyfileobj(src, dst)
        self.assert_files_eq(TESTFN, TESTFN2)

    def test_file_not_closed(self):
        # copyfileobj() must leave both file objects open.  unittest
        # assertions are used because bare asserts vanish under -O.
        with self.get_files() as (src, dst):
            shutil.copyfileobj(src, dst)
            self.assertFalse(src.closed)
            self.assertFalse(dst.closed)

    def test_file_offset(self):
        # After the copy both file positions sit at the end of the data.
        with self.get_files() as (src, dst):
            shutil.copyfileobj(src, dst)
            self.assertEqual(src.tell(), self.FILESIZE)
            self.assertEqual(dst.tell(), self.FILESIZE)

    @unittest.skipIf(os.name != 'nt', "Windows only")
    def test_win_impl(self):
        # Make sure alternate Windows implementation is called.
        with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
            shutil.copyfile(TESTFN, TESTFN2)
        self.assertTrue(m.called)

        # File size is 2 MiB but max buf size should be 1 MiB.
        self.assertEqual(m.call_args[0][2], 1 * 1024 * 1024)

        # If file size < 1 MiB memoryview() length must be equal to
        # the actual file size.
        with tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False) as f:
            f.write(b'foo')
        fname = f.name
        self.addCleanup(os_helper.unlink, fname)
        with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
            shutil.copyfile(fname, TESTFN2)
        self.assertEqual(m.call_args[0][2], 3)

        # Empty files should not rely on readinto() variant.
        with tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False) as f:
            pass
        fname = f.name
        self.addCleanup(os_helper.unlink, fname)
        with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
            shutil.copyfile(fname, TESTFN2)
        self.assertFalse(m.called)
        self.assert_files_eq(fname, TESTFN2)
2302
2303
class _ZeroCopyFileTest(object):
    """Tests common to all zero-copy APIs."""
    FILESIZE = (10 * 1024 * 1024)  # 10 MiB
    FILEDATA = b""
    # Dotted name of the low-level function to mock; set by subclasses.
    PATCHPOINT = ""

    @classmethod
    def setUpClass(cls):
        write_test_file(TESTFN, cls.FILESIZE)
        with open(TESTFN, 'rb') as f:
            cls.FILEDATA = f.read()
        # Sanity check of the fixture.  Raised explicitly so it is not
        # dropped when Python runs with -O.
        if len(cls.FILEDATA) != cls.FILESIZE:
            raise AssertionError(
                'test file holds %d bytes, expected %d'
                % (len(cls.FILEDATA), cls.FILESIZE))

    @classmethod
    def tearDownClass(cls):
        os_helper.unlink(TESTFN)

    def tearDown(self):
        os_helper.unlink(TESTFN2)

    @contextlib.contextmanager
    def get_files(self):
        # Yield (src, dst): TESTFN opened for binary reading and TESTFN2
        # opened for binary writing; both are closed on exit.
        with open(TESTFN, "rb") as src:
            with open(TESTFN2, "wb") as dst:
                yield (src, dst)

    def zerocopy_fun(self, *args, **kwargs):
        raise NotImplementedError("must be implemented in subclass")

    def reset(self):
        # Tear everything down and rebuild the fixtures from scratch.
        self.tearDown()
        self.tearDownClass()
        self.setUpClass()
        self.setUp()

    # ---

    def test_regular_copy(self):
        with self.get_files() as (src, dst):
            self.zerocopy_fun(src, dst)
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
        # Make sure the fallback function is not called.
        with self.get_files() as (src, dst):
            with unittest.mock.patch('shutil.copyfileobj') as m:
                shutil.copyfile(TESTFN, TESTFN2)
            self.assertFalse(m.called)

    def test_same_file(self):
        # reset() is scheduled because this test may damage TESTFN.
        self.addCleanup(self.reset)
        with self.get_files() as (src, dst):
            with self.assertRaises(Exception):
                self.zerocopy_fun(src, src)
        # Make sure src file is not corrupted.
        self.assertEqual(read_file(TESTFN, binary=True), self.FILEDATA)

    def test_non_existent_src(self):
        name = tempfile.mktemp(dir=os.getcwd())
        with self.assertRaises(FileNotFoundError) as cm:
            shutil.copyfile(name, "new")
        self.assertEqual(cm.exception.filename, name)

    def test_empty_file(self):
        srcname = TESTFN + 'src'
        dstname = TESTFN + 'dst'
        self.addCleanup(os_helper.unlink, srcname)
        self.addCleanup(os_helper.unlink, dstname)
        # Create an empty source file.
        with open(srcname, "wb"):
            pass

        with open(srcname, "rb") as src:
            with open(dstname, "wb") as dst:
                self.zerocopy_fun(src, dst)

        self.assertEqual(read_file(dstname, binary=True), b"")

    def test_unhandled_exception(self):
        # An exception that is not an OSError must propagate unchanged
        # out of shutil.copyfile().
        with unittest.mock.patch(self.PATCHPOINT,
                                 side_effect=ZeroDivisionError):
            self.assertRaises(ZeroDivisionError,
                              shutil.copyfile, TESTFN, TESTFN2)

    def test_exception_on_first_call(self):
        # Emulate a case where the first call to the zero-copy
        # function raises an exception in which case the function is
        # supposed to give up immediately.
        with unittest.mock.patch(self.PATCHPOINT,
                                 side_effect=OSError(errno.EINVAL, "yo")):
            with self.get_files() as (src, dst):
                with self.assertRaises(_GiveupOnFastCopy):
                    self.zerocopy_fun(src, dst)

    def test_filesystem_full(self):
        # Emulate a case where filesystem is full and sendfile() fails
        # on first call.
        with unittest.mock.patch(self.PATCHPOINT,
                                 side_effect=OSError(errno.ENOSPC, "yo")):
            with self.get_files() as (src, dst):
                self.assertRaises(OSError, self.zerocopy_fun, src, dst)
2402
2403
@unittest.skipIf(not SUPPORTS_SENDFILE, 'os.sendfile() not supported')
class TestZeroCopySendfile(_ZeroCopyFileTest, unittest.TestCase):
    """Run the shared zero-copy tests against shutil._fastcopy_sendfile(),
    plus tests specific to os.sendfile()."""
    PATCHPOINT = "os.sendfile"

    def zerocopy_fun(self, fsrc, fdst):
        return shutil._fastcopy_sendfile(fsrc, fdst)

    def test_non_regular_file_src(self):
        # An in-memory source makes the fast path give up; the regular
        # copyfileobj() must then still copy everything.
        with io.BytesIO(self.FILEDATA) as src:
            with open(TESTFN2, "wb") as dst:
                with self.assertRaises(_GiveupOnFastCopy):
                    self.zerocopy_fun(src, dst)
                shutil.copyfileobj(src, dst)

        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_non_regular_file_dst(self):
        # Same as above with an in-memory destination.
        with open(TESTFN, "rb") as src:
            with io.BytesIO() as dst:
                with self.assertRaises(_GiveupOnFastCopy):
                    self.zerocopy_fun(src, dst)
                shutil.copyfileobj(src, dst)
                dst.seek(0)
                self.assertEqual(dst.read(), self.FILEDATA)

    def test_exception_on_second_call(self):
        # The first sendfile() call is forwarded to the real function,
        # every later call fails with EBADF; that error must propagate.
        def sendfile(*args, **kwargs):
            if not flag:
                flag.append(None)
                return orig_sendfile(*args, **kwargs)
            else:
                raise OSError(errno.EBADF, "yo")

        flag = []
        orig_sendfile = os.sendfile
        with unittest.mock.patch('os.sendfile', create=True,
                                 side_effect=sendfile):
            with self.get_files() as (src, dst):
                with self.assertRaises(OSError) as cm:
                    shutil._fastcopy_sendfile(src, dst)
        # unittest assertion rather than a bare assert, which -O removes.
        self.assertTrue(flag)
        self.assertEqual(cm.exception.errno, errno.EBADF)

    def test_cant_get_size(self):
        # Emulate a case where src file size cannot be determined.
        # Internally bufsize will be set to a small value and
        # sendfile() will be called repeatedly.
        with unittest.mock.patch('os.fstat', side_effect=OSError) as m:
            with self.get_files() as (src, dst):
                shutil._fastcopy_sendfile(src, dst)
                self.assertTrue(m.called)
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_small_chunks(self):
        # Force internal file size detection to be smaller than the
        # actual file size. We want to force sendfile() to be called
        # multiple times, also in order to emulate a src fd which gets
        # bigger while it is being copied.
        fake_stat = unittest.mock.Mock()
        fake_stat.st_size = 65536 + 1
        with unittest.mock.patch('os.fstat', return_value=fake_stat) as m:
            with self.get_files() as (src, dst):
                shutil._fastcopy_sendfile(src, dst)
                self.assertTrue(m.called)
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_big_chunk(self):
        # Force internal file size detection to be +100MB bigger than
        # the actual file size. Make sure sendfile() does not rely on
        # file size value except for (maybe) a better throughput /
        # performance.
        fake_stat = unittest.mock.Mock()
        fake_stat.st_size = self.FILESIZE + (100 * 1024 * 1024)
        with unittest.mock.patch('os.fstat', return_value=fake_stat) as m:
            with self.get_files() as (src, dst):
                shutil._fastcopy_sendfile(src, dst)
                self.assertTrue(m.called)
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_blocksize_arg(self):
        with unittest.mock.patch('os.sendfile',
                                 side_effect=ZeroDivisionError) as m:
            self.assertRaises(ZeroDivisionError,
                              shutil.copyfile, TESTFN, TESTFN2)
            blocksize = m.call_args[0][3]
            # Make sure file size and the block size arg passed to
            # sendfile() are the same.
            self.assertEqual(blocksize, os.path.getsize(TESTFN))
            # ...unless we're dealing with a small file.
            os_helper.unlink(TESTFN2)
            write_file(TESTFN2, b"hello", binary=True)
            self.addCleanup(os_helper.unlink, TESTFN2 + '3')
            self.assertRaises(ZeroDivisionError,
                              shutil.copyfile, TESTFN2, TESTFN2 + '3')
            blocksize = m.call_args[0][3]
            self.assertEqual(blocksize, 2 ** 23)

    def test_file2file_not_supported(self):
        # Emulate a case where sendfile() only support file->socket
        # fds. In such a case copyfile() is supposed to skip the
        # fast-copy attempt from then on.
        self.assertTrue(shutil._USE_CP_SENDFILE)
        try:
            with unittest.mock.patch(
                    self.PATCHPOINT,
                    side_effect=OSError(errno.ENOTSOCK, "yo")) as m:
                with self.get_files() as (src, dst):
                    with self.assertRaises(_GiveupOnFastCopy):
                        shutil._fastcopy_sendfile(src, dst)
                self.assertTrue(m.called)
            self.assertFalse(shutil._USE_CP_SENDFILE)

            with unittest.mock.patch(self.PATCHPOINT) as m:
                shutil.copyfile(TESTFN, TESTFN2)
                self.assertFalse(m.called)
        finally:
            # Restore the module-level switch for the other tests.
            shutil._USE_CP_SENDFILE = True
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002521
2522
@unittest.skipIf(not MACOS, 'macOS only')
class TestZeroCopyMACOS(_ZeroCopyFileTest, unittest.TestCase):
    """Run the shared zero-copy tests against shutil._fastcopy_fcopyfile()."""
    PATCHPOINT = "posix._fcopyfile"

    def zerocopy_fun(self, src, dst):
        # Copy through posix._fcopyfile with the _COPYFILE_DATA flag.
        flags = posix._COPYFILE_DATA
        return shutil._fastcopy_fcopyfile(src, dst, flags)
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002529
2530
class TestGetTerminalSize(unittest.TestCase):
    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        term = shutil.get_terminal_size()
        self.assertGreaterEqual(term.columns, 0)
        self.assertGreaterEqual(term.lines, 0)

    def test_os_environ_first(self):
        "Check if environment variables have precedence"

        # COLUMNS set, LINES removed.
        with os_helper.EnvironmentVarGuard() as environ:
            environ['COLUMNS'] = '777'
            del environ['LINES']
            term = shutil.get_terminal_size()
        self.assertEqual(term.columns, 777)

        # LINES set, COLUMNS removed.
        with os_helper.EnvironmentVarGuard() as environ:
            del environ['COLUMNS']
            environ['LINES'] = '888'
            term = shutil.get_terminal_size()
        self.assertEqual(term.lines, 888)

    def test_bad_environ(self):
        # Non-numeric COLUMNS/LINES must not break get_terminal_size().
        with os_helper.EnvironmentVarGuard() as environ:
            environ['COLUMNS'] = 'xxx'
            environ['LINES'] = 'yyy'
            term = shutil.get_terminal_size()
        self.assertGreaterEqual(term.columns, 0)
        self.assertGreaterEqual(term.lines, 0)

    @unittest.skipUnless(os.isatty(sys.__stdout__.fileno()), "not on tty")
    @unittest.skipUnless(hasattr(os, 'get_terminal_size'),
                         'need os.get_terminal_size()')
    def test_stty_match(self):
        """Check if stty returns the same results ignoring env

        This test will fail if stdin and stdout are connected to
        different terminals with different sizes.  Nevertheless, such
        situations should be pretty rare.
        """
        try:
            raw = subprocess.check_output(['stty', 'size'])
        except (FileNotFoundError, PermissionError,
                subprocess.CalledProcessError):
            self.skipTest("stty invocation failed")
        fields = raw.decode().split()
        # The two fields are swapped to build the expected pair.
        expected = (int(fields[1]), int(fields[0]))  # reversed order

        with os_helper.EnvironmentVarGuard() as environ:
            del environ['LINES']
            del environ['COLUMNS']
            actual = shutil.get_terminal_size()

        self.assertEqual(expected, actual)

    def test_fallback(self):
        with os_helper.EnvironmentVarGuard() as environ:
            del environ['LINES']
            del environ['COLUMNS']

            # sys.__stdout__ has no fileno()
            with support.swap_attr(sys, '__stdout__', None):
                term = shutil.get_terminal_size(fallback=(10, 20))
            self.assertEqual(term.columns, 10)
            self.assertEqual(term.lines, 20)

            # sys.__stdout__ is not a terminal on Unix
            # or fileno() not in (0, 1, 2) on Windows
            with open(os.devnull, 'w', encoding='utf-8') as null_stream:
                with support.swap_attr(sys, '__stdout__', null_stream):
                    term = shutil.get_terminal_size(fallback=(30, 40))
            self.assertEqual(term.columns, 30)
            self.assertEqual(term.lines, 40)
2607
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002608
class PublicAPITests(unittest.TestCase):
    """Ensures that the correct values are exposed in the public API."""

    def test_module_all_attribute(self):
        self.assertTrue(hasattr(shutil, '__all__'))
        expected_names = {
            'copyfileobj', 'copyfile', 'copymode', 'copystat',
            'copy', 'copy2', 'copytree', 'move', 'rmtree', 'Error',
            'SpecialFileError', 'ExecError', 'make_archive',
            'get_archive_formats', 'register_archive_format',
            'unregister_archive_format', 'get_unpack_formats',
            'register_unpack_format', 'unregister_unpack_format',
            'unpack_archive', 'ignore_patterns', 'chown', 'which',
            'get_terminal_size', 'SameFileError',
        }
        # disk_usage is only expected where os.statvfs exists or on
        # Windows.
        if hasattr(os, 'statvfs') or os.name == 'nt':
            expected_names.add('disk_usage')
        self.assertEqual(set(shutil.__all__), expected_names)
2625
2626
# Run the whole module through unittest when executed as a script.
if __name__ == "__main__":
    unittest.main()