blob: 7669b94ac35a688e3244641ea8449253865ea9f9 [file] [log] [blame]
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00001# Copyright (C) 2003 Python Software Foundation
2
3import unittest
Berker Peksag884afd92014-12-10 02:50:32 +02004import unittest.mock
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00005import shutil
6import tempfile
Johannes Gijsbers8e6f2de2004-11-23 09:27:27 +00007import sys
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +00008import stat
Brett Cannon1c3fa182004-06-19 21:11:35 +00009import os
10import os.path
Antoine Pitrouc041ab62012-01-02 19:18:02 +010011import errno
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040012import functools
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -070013import pathlib
Antoine Pitroubcf2b592012-02-08 23:28:36 +010014import subprocess
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020015import random
16import string
17import contextlib
18import io
Serhiy Storchaka527ef072015-09-06 18:33:19 +030019from shutil import (make_archive,
Tarek Ziadé396fad72010-02-23 05:30:31 +000020 register_archive_format, unregister_archive_format,
Tarek Ziadé6ac91722010-04-28 17:51:36 +000021 get_archive_formats, Error, unpack_archive,
22 register_unpack_format, RegistryError,
Hynek Schlawack48653762012-10-07 12:49:58 +020023 unregister_unpack_format, get_unpack_formats,
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020024 SameFileError, _GiveupOnFastCopy)
Tarek Ziadé396fad72010-02-23 05:30:31 +000025import tarfile
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +020026import zipfile
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020027try:
28 import posix
29except ImportError:
30 posix = None
Tarek Ziadé396fad72010-02-23 05:30:31 +000031
32from test import support
Hai Shi0c4f0f32020-06-30 21:46:31 +080033from test.support import os_helper
34from test.support.os_helper import TESTFN, FakePath
Serhiy Storchaka11213772014-08-06 18:50:19 +030035
Antoine Pitrou7fff0962009-05-01 21:09:44 +000036TESTFN2 = TESTFN + "2"
Winson Luk132131b2021-03-02 15:53:15 -050037TESTFN_SRC = TESTFN + "_SRC"
38TESTFN_DST = TESTFN + "_DST"
Victor Stinner937ee9e2018-06-26 02:11:06 +020039MACOS = sys.platform.startswith("darwin")
Miss Islington (bot)574da462021-07-20 11:53:31 -070040SOLARIS = sys.platform.startswith("sunos")
Michael Feltef110b12019-02-18 12:02:44 +010041AIX = sys.platform[:3] == 'aix'
Tarek Ziadé396fad72010-02-23 05:30:31 +000042try:
43 import grp
44 import pwd
45 UID_GID_SUPPORT = True
46except ImportError:
47 UID_GID_SUPPORT = False
48
Steve Dowerdf2d4a62019-08-21 15:27:33 -070049try:
50 import _winapi
51except ImportError:
52 _winapi = None
53
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040054def _fake_rename(*args, **kwargs):
55 # Pretend the destination path is on a different filesystem.
Antoine Pitrouc041ab62012-01-02 19:18:02 +010056 raise OSError(getattr(errno, 'EXDEV', 18), "Invalid cross-device link")
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040057
58def mock_rename(func):
59 @functools.wraps(func)
60 def wrap(*args, **kwargs):
61 try:
62 builtin_rename = os.rename
63 os.rename = _fake_rename
64 return func(*args, **kwargs)
65 finally:
66 os.rename = builtin_rename
67 return wrap
68
Éric Araujoa7e33a12011-08-12 19:51:35 +020069def write_file(path, content, binary=False):
70 """Write *content* to a file located at *path*.
71
72 If *path* is a tuple instead of a string, os.path.join will be used to
73 make a path. If *binary* is true, the file will be opened in binary
74 mode.
75 """
76 if isinstance(path, tuple):
77 path = os.path.join(*path)
Inada Naokic8e5eb92021-04-05 13:11:23 +090078 mode = 'wb' if binary else 'w'
79 encoding = None if binary else "utf-8"
80 with open(path, mode, encoding=encoding) as fp:
Éric Araujoa7e33a12011-08-12 19:51:35 +020081 fp.write(content)
82
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020083def write_test_file(path, size):
84 """Create a test file with an arbitrary size and random text content."""
85 def chunks(total, step):
86 assert total >= step
87 while total > step:
88 yield step
89 total -= step
90 if total:
91 yield total
92
93 bufsize = min(size, 8192)
94 chunk = b"".join([random.choice(string.ascii_letters).encode()
95 for i in range(bufsize)])
96 with open(path, 'wb') as f:
97 for csize in chunks(size, bufsize):
98 f.write(chunk)
99 assert os.path.getsize(path) == size
100
Éric Araujoa7e33a12011-08-12 19:51:35 +0200101def read_file(path, binary=False):
102 """Return contents from a file located at *path*.
103
104 If *path* is a tuple instead of a string, os.path.join will be used to
105 make a path. If *binary* is true, the file will be opened in binary
106 mode.
107 """
108 if isinstance(path, tuple):
109 path = os.path.join(*path)
Inada Naokic8e5eb92021-04-05 13:11:23 +0900110 mode = 'rb' if binary else 'r'
111 encoding = None if binary else "utf-8"
112 with open(path, mode, encoding=encoding) as fp:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200113 return fp.read()
114
Serhiy Storchaka527ef072015-09-06 18:33:19 +0300115def rlistdir(path):
116 res = []
117 for name in sorted(os.listdir(path)):
118 p = os.path.join(path, name)
119 if os.path.isdir(p) and not os.path.islink(p):
120 res.append(name + '/')
121 for n in rlistdir(p):
122 res.append(name + '/' + n)
123 else:
124 res.append(name)
125 return res
126
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +0200127def supports_file2file_sendfile():
128 # ...apparently Linux and Solaris are the only ones
129 if not hasattr(os, "sendfile"):
130 return False
131 srcname = None
132 dstname = None
133 try:
Steve Dowerabde52c2019-11-15 09:49:21 -0800134 with tempfile.NamedTemporaryFile("wb", dir=os.getcwd(), delete=False) as f:
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +0200135 srcname = f.name
136 f.write(b"0123456789")
137
138 with open(srcname, "rb") as src:
Steve Dowerabde52c2019-11-15 09:49:21 -0800139 with tempfile.NamedTemporaryFile("wb", dir=os.getcwd(), delete=False) as dst:
Victor Stinner4c26abd2019-06-27 01:39:53 +0200140 dstname = dst.name
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +0200141 infd = src.fileno()
142 outfd = dst.fileno()
143 try:
144 os.sendfile(outfd, infd, 0, 2)
145 except OSError:
146 return False
147 else:
148 return True
149 finally:
150 if srcname is not None:
Hai Shi0c4f0f32020-06-30 21:46:31 +0800151 os_helper.unlink(srcname)
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +0200152 if dstname is not None:
Hai Shi0c4f0f32020-06-30 21:46:31 +0800153 os_helper.unlink(dstname)
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +0200154
155
156SUPPORTS_SENDFILE = supports_file2file_sendfile()
157
Michael Feltef110b12019-02-18 12:02:44 +0100158# AIX 32-bit mode, by default, lacks enough memory for the xz/lzma compiler test
159# The AIX command 'dump -o program' gives XCOFF header information
160# The second word of the last line in the maxdata value
161# when 32-bit maxdata must be greater than 0x1000000 for the xz test to succeed
162def _maxdataOK():
163 if AIX and sys.maxsize == 2147483647:
164 hdrs=subprocess.getoutput("/usr/bin/dump -o %s" % sys.executable)
165 maxdata=hdrs.split("\n")[-1].split()[1]
166 return int(maxdata,16) >= 0x20000000
167 else:
168 return True
Éric Araujoa7e33a12011-08-12 19:51:35 +0200169
Tarek Ziadé396fad72010-02-23 05:30:31 +0000170
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300171class BaseTest:
Tarek Ziadé396fad72010-02-23 05:30:31 +0000172
Steve Dowerabde52c2019-11-15 09:49:21 -0800173 def mkdtemp(self, prefix=None):
Tarek Ziadé396fad72010-02-23 05:30:31 +0000174 """Create a temporary directory that will be cleaned up.
175
176 Returns the path of the directory.
177 """
Steve Dowerabde52c2019-11-15 09:49:21 -0800178 d = tempfile.mkdtemp(prefix=prefix, dir=os.getcwd())
Hai Shi0c4f0f32020-06-30 21:46:31 +0800179 self.addCleanup(os_helper.rmtree, d)
Tarek Ziadé396fad72010-02-23 05:30:31 +0000180 return d
Tarek Ziadé5340db32010-04-19 22:30:51 +0000181
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300182
183class TestRmTree(BaseTest, unittest.TestCase):
184
Hynek Schlawack3b527782012-06-25 13:27:31 +0200185 def test_rmtree_works_on_bytes(self):
186 tmp = self.mkdtemp()
187 victim = os.path.join(tmp, 'killme')
188 os.mkdir(victim)
189 write_file(os.path.join(victim, 'somefile'), 'foo')
190 victim = os.fsencode(victim)
191 self.assertIsInstance(victim, bytes)
Steve Dowere58571b2016-09-08 11:11:13 -0700192 shutil.rmtree(victim)
Hynek Schlawack3b527782012-06-25 13:27:31 +0200193
Hai Shi0c4f0f32020-06-30 21:46:31 +0800194 @os_helper.skip_unless_symlink
Hynek Schlawacka75cd1c2012-06-28 12:07:29 +0200195 def test_rmtree_fails_on_symlink(self):
196 tmp = self.mkdtemp()
197 dir_ = os.path.join(tmp, 'dir')
198 os.mkdir(dir_)
199 link = os.path.join(tmp, 'link')
200 os.symlink(dir_, link)
201 self.assertRaises(OSError, shutil.rmtree, link)
202 self.assertTrue(os.path.exists(dir_))
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100203 self.assertTrue(os.path.lexists(link))
204 errors = []
205 def onerror(*args):
206 errors.append(args)
207 shutil.rmtree(link, onerror=onerror)
208 self.assertEqual(len(errors), 1)
209 self.assertIs(errors[0][0], os.path.islink)
210 self.assertEqual(errors[0][1], link)
211 self.assertIsInstance(errors[0][2][1], OSError)
Hynek Schlawacka75cd1c2012-06-28 12:07:29 +0200212
Hai Shi0c4f0f32020-06-30 21:46:31 +0800213 @os_helper.skip_unless_symlink
Hynek Schlawacka75cd1c2012-06-28 12:07:29 +0200214 def test_rmtree_works_on_symlinks(self):
215 tmp = self.mkdtemp()
216 dir1 = os.path.join(tmp, 'dir1')
217 dir2 = os.path.join(dir1, 'dir2')
218 dir3 = os.path.join(tmp, 'dir3')
219 for d in dir1, dir2, dir3:
220 os.mkdir(d)
221 file1 = os.path.join(tmp, 'file1')
222 write_file(file1, 'foo')
223 link1 = os.path.join(dir1, 'link1')
224 os.symlink(dir2, link1)
225 link2 = os.path.join(dir1, 'link2')
226 os.symlink(dir3, link2)
227 link3 = os.path.join(dir1, 'link3')
228 os.symlink(file1, link3)
229 # make sure symlinks are removed but not followed
230 shutil.rmtree(dir1)
231 self.assertFalse(os.path.exists(dir1))
232 self.assertTrue(os.path.exists(dir3))
233 self.assertTrue(os.path.exists(file1))
234
Steve Dowerdf2d4a62019-08-21 15:27:33 -0700235 @unittest.skipUnless(_winapi, 'only relevant on Windows')
236 def test_rmtree_fails_on_junctions(self):
237 tmp = self.mkdtemp()
238 dir_ = os.path.join(tmp, 'dir')
239 os.mkdir(dir_)
240 link = os.path.join(tmp, 'link')
241 _winapi.CreateJunction(dir_, link)
Hai Shi0c4f0f32020-06-30 21:46:31 +0800242 self.addCleanup(os_helper.unlink, link)
Steve Dowerdf2d4a62019-08-21 15:27:33 -0700243 self.assertRaises(OSError, shutil.rmtree, link)
244 self.assertTrue(os.path.exists(dir_))
245 self.assertTrue(os.path.lexists(link))
246 errors = []
247 def onerror(*args):
248 errors.append(args)
249 shutil.rmtree(link, onerror=onerror)
250 self.assertEqual(len(errors), 1)
251 self.assertIs(errors[0][0], os.path.islink)
252 self.assertEqual(errors[0][1], link)
253 self.assertIsInstance(errors[0][2][1], OSError)
254
255 @unittest.skipUnless(_winapi, 'only relevant on Windows')
256 def test_rmtree_works_on_junctions(self):
257 tmp = self.mkdtemp()
258 dir1 = os.path.join(tmp, 'dir1')
259 dir2 = os.path.join(dir1, 'dir2')
260 dir3 = os.path.join(tmp, 'dir3')
261 for d in dir1, dir2, dir3:
262 os.mkdir(d)
263 file1 = os.path.join(tmp, 'file1')
264 write_file(file1, 'foo')
265 link1 = os.path.join(dir1, 'link1')
266 _winapi.CreateJunction(dir2, link1)
267 link2 = os.path.join(dir1, 'link2')
268 _winapi.CreateJunction(dir3, link2)
269 link3 = os.path.join(dir1, 'link3')
270 _winapi.CreateJunction(file1, link3)
271 # make sure junctions are removed but not followed
272 shutil.rmtree(dir1)
273 self.assertFalse(os.path.exists(dir1))
274 self.assertTrue(os.path.exists(dir3))
275 self.assertTrue(os.path.exists(file1))
276
Barry Warsaw7fc2cca2003-01-24 17:34:13 +0000277 def test_rmtree_errors(self):
278 # filename is guaranteed not to exist
Steve Dowerabde52c2019-11-15 09:49:21 -0800279 filename = tempfile.mktemp(dir=self.mkdtemp())
Hynek Schlawackb5501102012-12-10 09:11:25 +0100280 self.assertRaises(FileNotFoundError, shutil.rmtree, filename)
281 # test that ignore_errors option is honored
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100282 shutil.rmtree(filename, ignore_errors=True)
283
284 # existing file
285 tmpdir = self.mkdtemp()
Hynek Schlawackb5501102012-12-10 09:11:25 +0100286 write_file((tmpdir, "tstfile"), "")
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100287 filename = os.path.join(tmpdir, "tstfile")
Hynek Schlawackb5501102012-12-10 09:11:25 +0100288 with self.assertRaises(NotADirectoryError) as cm:
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100289 shutil.rmtree(filename)
Daniel Hahler37a6d5f2020-12-21 07:38:02 +0100290 self.assertEqual(cm.exception.filename, filename)
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100291 self.assertTrue(os.path.exists(filename))
Hynek Schlawackb5501102012-12-10 09:11:25 +0100292 # test that ignore_errors option is honored
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100293 shutil.rmtree(filename, ignore_errors=True)
294 self.assertTrue(os.path.exists(filename))
295 errors = []
296 def onerror(*args):
297 errors.append(args)
298 shutil.rmtree(filename, onerror=onerror)
299 self.assertEqual(len(errors), 2)
Serhiy Storchakad4d79bc2017-11-04 14:16:35 +0200300 self.assertIs(errors[0][0], os.scandir)
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100301 self.assertEqual(errors[0][1], filename)
Hynek Schlawackb5501102012-12-10 09:11:25 +0100302 self.assertIsInstance(errors[0][2][1], NotADirectoryError)
Daniel Hahler37a6d5f2020-12-21 07:38:02 +0100303 self.assertEqual(errors[0][2][1].filename, filename)
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100304 self.assertIs(errors[1][0], os.rmdir)
305 self.assertEqual(errors[1][1], filename)
Hynek Schlawackb5501102012-12-10 09:11:25 +0100306 self.assertIsInstance(errors[1][2][1], NotADirectoryError)
Daniel Hahler37a6d5f2020-12-21 07:38:02 +0100307 self.assertEqual(errors[1][2][1].filename, filename)
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000308
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000309
Serhiy Storchaka43767632013-11-03 21:31:38 +0200310 @unittest.skipIf(sys.platform[:6] == 'cygwin',
311 "This test can't be run on Cygwin (issue #1071513).")
312 @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
313 "This test can't be run reliably as root (issue #1076467).")
314 def test_on_error(self):
315 self.errorState = 0
316 os.mkdir(TESTFN)
317 self.addCleanup(shutil.rmtree, TESTFN)
Antoine Pitrou2f8a75c2012-06-23 21:28:15 +0200318
Serhiy Storchaka43767632013-11-03 21:31:38 +0200319 self.child_file_path = os.path.join(TESTFN, 'a')
320 self.child_dir_path = os.path.join(TESTFN, 'b')
Hai Shi0c4f0f32020-06-30 21:46:31 +0800321 os_helper.create_empty_file(self.child_file_path)
Serhiy Storchaka43767632013-11-03 21:31:38 +0200322 os.mkdir(self.child_dir_path)
323 old_dir_mode = os.stat(TESTFN).st_mode
324 old_child_file_mode = os.stat(self.child_file_path).st_mode
325 old_child_dir_mode = os.stat(self.child_dir_path).st_mode
326 # Make unwritable.
327 new_mode = stat.S_IREAD|stat.S_IEXEC
328 os.chmod(self.child_file_path, new_mode)
329 os.chmod(self.child_dir_path, new_mode)
330 os.chmod(TESTFN, new_mode)
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000331
Serhiy Storchaka43767632013-11-03 21:31:38 +0200332 self.addCleanup(os.chmod, TESTFN, old_dir_mode)
333 self.addCleanup(os.chmod, self.child_file_path, old_child_file_mode)
334 self.addCleanup(os.chmod, self.child_dir_path, old_child_dir_mode)
Antoine Pitrou2f8a75c2012-06-23 21:28:15 +0200335
Serhiy Storchaka43767632013-11-03 21:31:38 +0200336 shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
337 # Test whether onerror has actually been called.
338 self.assertEqual(self.errorState, 3,
339 "Expected call to onerror function did not happen.")
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000340
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000341 def check_args_to_onerror(self, func, arg, exc):
Benjamin Peterson25c95f12009-05-08 20:42:26 +0000342 # test_rmtree_errors deliberately runs rmtree
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200343 # on a directory that is chmod 500, which will fail.
Benjamin Peterson25c95f12009-05-08 20:42:26 +0000344 # This function is run when shutil.rmtree fails.
345 # 99.9% of the time it initially fails to remove
346 # a file in the directory, so the first time through
347 # func is os.remove.
348 # However, some Linux machines running ZFS on
349 # FUSE experienced a failure earlier in the process
350 # at os.listdir. The first failure may legally
351 # be either.
Antoine Pitrou4f6e3f72012-06-23 22:05:11 +0200352 if self.errorState < 2:
Hynek Schlawack2100b422012-06-23 20:28:32 +0200353 if func is os.unlink:
Antoine Pitrou4f6e3f72012-06-23 22:05:11 +0200354 self.assertEqual(arg, self.child_file_path)
355 elif func is os.rmdir:
356 self.assertEqual(arg, self.child_dir_path)
Benjamin Peterson25c95f12009-05-08 20:42:26 +0000357 else:
Antoine Pitrou4f6e3f72012-06-23 22:05:11 +0200358 self.assertIs(func, os.listdir)
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200359 self.assertIn(arg, [TESTFN, self.child_dir_path])
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000360 self.assertTrue(issubclass(exc[0], OSError))
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200361 self.errorState += 1
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000362 else:
363 self.assertEqual(func, os.rmdir)
364 self.assertEqual(arg, TESTFN)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000365 self.assertTrue(issubclass(exc[0], OSError))
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200366 self.errorState = 3
367
368 def test_rmtree_does_not_choke_on_failing_lstat(self):
369 try:
370 orig_lstat = os.lstat
Hynek Schlawacka75cd1c2012-06-28 12:07:29 +0200371 def raiser(fn, *args, **kwargs):
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200372 if fn != TESTFN:
373 raise OSError()
374 else:
375 return orig_lstat(fn)
376 os.lstat = raiser
377
378 os.mkdir(TESTFN)
379 write_file((TESTFN, 'foo'), 'foo')
380 shutil.rmtree(TESTFN)
381 finally:
382 os.lstat = orig_lstat
Barry Warsaw7fc2cca2003-01-24 17:34:13 +0000383
Hynek Schlawack2100b422012-06-23 20:28:32 +0200384 def test_rmtree_uses_safe_fd_version_if_available(self):
Hynek Schlawackd0f6e0a2012-06-29 08:28:20 +0200385 _use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <=
386 os.supports_dir_fd and
387 os.listdir in os.supports_fd and
388 os.stat in os.supports_follow_symlinks)
389 if _use_fd_functions:
Hynek Schlawack2100b422012-06-23 20:28:32 +0200390 self.assertTrue(shutil._use_fd_functions)
Nick Coghlan5b0eca12012-06-24 16:43:06 +1000391 self.assertTrue(shutil.rmtree.avoids_symlink_attacks)
Hynek Schlawack2100b422012-06-23 20:28:32 +0200392 tmp_dir = self.mkdtemp()
393 d = os.path.join(tmp_dir, 'a')
394 os.mkdir(d)
395 try:
396 real_rmtree = shutil._rmtree_safe_fd
397 class Called(Exception): pass
398 def _raiser(*args, **kwargs):
399 raise Called
400 shutil._rmtree_safe_fd = _raiser
401 self.assertRaises(Called, shutil.rmtree, d)
402 finally:
403 shutil._rmtree_safe_fd = real_rmtree
404 else:
405 self.assertFalse(shutil._use_fd_functions)
Nick Coghlan5b0eca12012-06-24 16:43:06 +1000406 self.assertFalse(shutil.rmtree.avoids_symlink_attacks)
Hynek Schlawack2100b422012-06-23 20:28:32 +0200407
Johannes Gijsbersd60e92a2004-09-11 21:26:21 +0000408 def test_rmtree_dont_delete_file(self):
409 # When called on a file instead of a directory, don't delete it.
Steve Dowerabde52c2019-11-15 09:49:21 -0800410 handle, path = tempfile.mkstemp(dir=self.mkdtemp())
Victor Stinnerbf816222011-06-30 23:25:47 +0200411 os.close(handle)
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200412 self.assertRaises(NotADirectoryError, shutil.rmtree, path)
Johannes Gijsbersd60e92a2004-09-11 21:26:21 +0000413 os.remove(path)
414
Hai Shi0c4f0f32020-06-30 21:46:31 +0800415 @os_helper.skip_unless_symlink
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300416 def test_rmtree_on_symlink(self):
417 # bug 1669.
418 os.mkdir(TESTFN)
419 try:
420 src = os.path.join(TESTFN, 'cheese')
421 dst = os.path.join(TESTFN, 'shop')
422 os.mkdir(src)
423 os.symlink(src, dst)
424 self.assertRaises(OSError, shutil.rmtree, dst)
425 shutil.rmtree(dst, ignore_errors=True)
426 finally:
427 shutil.rmtree(TESTFN, ignore_errors=True)
428
429 @unittest.skipUnless(_winapi, 'only relevant on Windows')
430 def test_rmtree_on_junction(self):
431 os.mkdir(TESTFN)
432 try:
433 src = os.path.join(TESTFN, 'cheese')
434 dst = os.path.join(TESTFN, 'shop')
435 os.mkdir(src)
436 open(os.path.join(src, 'spam'), 'wb').close()
437 _winapi.CreateJunction(src, dst)
438 self.assertRaises(OSError, shutil.rmtree, dst)
439 shutil.rmtree(dst, ignore_errors=True)
440 finally:
441 shutil.rmtree(TESTFN, ignore_errors=True)
442
443
444class TestCopyTree(BaseTest, unittest.TestCase):
445
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000446 def test_copytree_simple(self):
Steve Dowerabde52c2019-11-15 09:49:21 -0800447 src_dir = self.mkdtemp()
448 dst_dir = os.path.join(self.mkdtemp(), 'destination')
Éric Araujoa7e33a12011-08-12 19:51:35 +0200449 self.addCleanup(shutil.rmtree, src_dir)
450 self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
451 write_file((src_dir, 'test.txt'), '123')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000452 os.mkdir(os.path.join(src_dir, 'test_dir'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200453 write_file((src_dir, 'test_dir', 'test.txt'), '456')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000454
Éric Araujoa7e33a12011-08-12 19:51:35 +0200455 shutil.copytree(src_dir, dst_dir)
456 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test.txt')))
457 self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'test_dir')))
458 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test_dir',
459 'test.txt')))
460 actual = read_file((dst_dir, 'test.txt'))
461 self.assertEqual(actual, '123')
462 actual = read_file((dst_dir, 'test_dir', 'test.txt'))
463 self.assertEqual(actual, '456')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000464
jab9e00d9e2018-12-28 13:03:40 -0500465 def test_copytree_dirs_exist_ok(self):
Steve Dowerabde52c2019-11-15 09:49:21 -0800466 src_dir = self.mkdtemp()
467 dst_dir = self.mkdtemp()
jab9e00d9e2018-12-28 13:03:40 -0500468 self.addCleanup(shutil.rmtree, src_dir)
469 self.addCleanup(shutil.rmtree, dst_dir)
470
471 write_file((src_dir, 'nonexisting.txt'), '123')
472 os.mkdir(os.path.join(src_dir, 'existing_dir'))
473 os.mkdir(os.path.join(dst_dir, 'existing_dir'))
474 write_file((dst_dir, 'existing_dir', 'existing.txt'), 'will be replaced')
475 write_file((src_dir, 'existing_dir', 'existing.txt'), 'has been replaced')
476
477 shutil.copytree(src_dir, dst_dir, dirs_exist_ok=True)
478 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'nonexisting.txt')))
479 self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'existing_dir')))
480 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'existing_dir',
481 'existing.txt')))
482 actual = read_file((dst_dir, 'nonexisting.txt'))
483 self.assertEqual(actual, '123')
484 actual = read_file((dst_dir, 'existing_dir', 'existing.txt'))
485 self.assertEqual(actual, 'has been replaced')
486
487 with self.assertRaises(FileExistsError):
488 shutil.copytree(src_dir, dst_dir, dirs_exist_ok=False)
489
Hai Shi0c4f0f32020-06-30 21:46:31 +0800490 @os_helper.skip_unless_symlink
Antoine Pitrou78091e62011-12-29 18:54:15 +0100491 def test_copytree_symlinks(self):
492 tmp_dir = self.mkdtemp()
493 src_dir = os.path.join(tmp_dir, 'src')
494 dst_dir = os.path.join(tmp_dir, 'dst')
495 sub_dir = os.path.join(src_dir, 'sub')
496 os.mkdir(src_dir)
497 os.mkdir(sub_dir)
498 write_file((src_dir, 'file.txt'), 'foo')
499 src_link = os.path.join(sub_dir, 'link')
500 dst_link = os.path.join(dst_dir, 'sub/link')
501 os.symlink(os.path.join(src_dir, 'file.txt'),
502 src_link)
503 if hasattr(os, 'lchmod'):
504 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
505 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
506 os.lchflags(src_link, stat.UF_NODUMP)
507 src_stat = os.lstat(src_link)
508 shutil.copytree(src_dir, dst_dir, symlinks=True)
509 self.assertTrue(os.path.islink(os.path.join(dst_dir, 'sub', 'link')))
Steve Dowerdf2d4a62019-08-21 15:27:33 -0700510 actual = os.readlink(os.path.join(dst_dir, 'sub', 'link'))
511 # Bad practice to blindly strip the prefix as it may be required to
512 # correctly refer to the file, but we're only comparing paths here.
513 if os.name == 'nt' and actual.startswith('\\\\?\\'):
514 actual = actual[4:]
515 self.assertEqual(actual, os.path.join(src_dir, 'file.txt'))
Antoine Pitrou78091e62011-12-29 18:54:15 +0100516 dst_stat = os.lstat(dst_link)
517 if hasattr(os, 'lchmod'):
518 self.assertEqual(dst_stat.st_mode, src_stat.st_mode)
519 if hasattr(os, 'lchflags'):
520 self.assertEqual(dst_stat.st_flags, src_stat.st_flags)
521
Georg Brandl2ee470f2008-07-16 12:55:28 +0000522 def test_copytree_with_exclude(self):
Georg Brandl2ee470f2008-07-16 12:55:28 +0000523 # creating data
524 join = os.path.join
525 exists = os.path.exists
Steve Dowerabde52c2019-11-15 09:49:21 -0800526 src_dir = self.mkdtemp()
Georg Brandl2ee470f2008-07-16 12:55:28 +0000527 try:
Steve Dowerabde52c2019-11-15 09:49:21 -0800528 dst_dir = join(self.mkdtemp(), 'destination')
Éric Araujoa7e33a12011-08-12 19:51:35 +0200529 write_file((src_dir, 'test.txt'), '123')
530 write_file((src_dir, 'test.tmp'), '123')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000531 os.mkdir(join(src_dir, 'test_dir'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200532 write_file((src_dir, 'test_dir', 'test.txt'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000533 os.mkdir(join(src_dir, 'test_dir2'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200534 write_file((src_dir, 'test_dir2', 'test.txt'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000535 os.mkdir(join(src_dir, 'test_dir2', 'subdir'))
536 os.mkdir(join(src_dir, 'test_dir2', 'subdir2'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200537 write_file((src_dir, 'test_dir2', 'subdir', 'test.txt'), '456')
538 write_file((src_dir, 'test_dir2', 'subdir2', 'test.py'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000539
540 # testing glob-like patterns
541 try:
542 patterns = shutil.ignore_patterns('*.tmp', 'test_dir2')
543 shutil.copytree(src_dir, dst_dir, ignore=patterns)
544 # checking the result: some elements should not be copied
545 self.assertTrue(exists(join(dst_dir, 'test.txt')))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200546 self.assertFalse(exists(join(dst_dir, 'test.tmp')))
547 self.assertFalse(exists(join(dst_dir, 'test_dir2')))
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000548 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200549 shutil.rmtree(dst_dir)
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000550 try:
551 patterns = shutil.ignore_patterns('*.tmp', 'subdir*')
552 shutil.copytree(src_dir, dst_dir, ignore=patterns)
553 # checking the result: some elements should not be copied
Éric Araujoa7e33a12011-08-12 19:51:35 +0200554 self.assertFalse(exists(join(dst_dir, 'test.tmp')))
555 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2')))
556 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000557 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200558 shutil.rmtree(dst_dir)
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000559
560 # testing callable-style
561 try:
562 def _filter(src, names):
563 res = []
564 for name in names:
565 path = os.path.join(src, name)
566
567 if (os.path.isdir(path) and
568 path.split()[-1] == 'subdir'):
569 res.append(name)
570 elif os.path.splitext(path)[-1] in ('.py'):
571 res.append(name)
572 return res
573
574 shutil.copytree(src_dir, dst_dir, ignore=_filter)
575
576 # checking the result: some elements should not be copied
Éric Araujoa7e33a12011-08-12 19:51:35 +0200577 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2',
578 'test.py')))
579 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000580
581 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200582 shutil.rmtree(dst_dir)
Georg Brandl2ee470f2008-07-16 12:55:28 +0000583 finally:
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000584 shutil.rmtree(src_dir)
585 shutil.rmtree(os.path.dirname(dst_dir))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000586
mbarkhau88704332020-01-24 14:51:16 +0000587 def test_copytree_arg_types_of_ignore(self):
588 join = os.path.join
589 exists = os.path.exists
590
591 tmp_dir = self.mkdtemp()
592 src_dir = join(tmp_dir, "source")
593
594 os.mkdir(join(src_dir))
595 os.mkdir(join(src_dir, 'test_dir'))
596 os.mkdir(os.path.join(src_dir, 'test_dir', 'subdir'))
597 write_file((src_dir, 'test_dir', 'subdir', 'test.txt'), '456')
598
599 invokations = []
600
601 def _ignore(src, names):
602 invokations.append(src)
603 self.assertIsInstance(src, str)
604 self.assertIsInstance(names, list)
605 self.assertEqual(len(names), len(set(names)))
606 for name in names:
607 self.assertIsInstance(name, str)
608 return []
609
610 dst_dir = join(self.mkdtemp(), 'destination')
611 shutil.copytree(src_dir, dst_dir, ignore=_ignore)
612 self.assertTrue(exists(join(dst_dir, 'test_dir', 'subdir',
613 'test.txt')))
614
615 dst_dir = join(self.mkdtemp(), 'destination')
616 shutil.copytree(pathlib.Path(src_dir), dst_dir, ignore=_ignore)
617 self.assertTrue(exists(join(dst_dir, 'test_dir', 'subdir',
618 'test.txt')))
619
620 dst_dir = join(self.mkdtemp(), 'destination')
621 src_dir_entry = list(os.scandir(tmp_dir))[0]
622 self.assertIsInstance(src_dir_entry, os.DirEntry)
623 shutil.copytree(src_dir_entry, dst_dir, ignore=_ignore)
624 self.assertTrue(exists(join(dst_dir, 'test_dir', 'subdir',
625 'test.txt')))
626
627 self.assertEqual(len(invokations), 9)
628
Antoine Pitrouac601602013-08-16 19:35:02 +0200629 def test_copytree_retains_permissions(self):
Steve Dowerabde52c2019-11-15 09:49:21 -0800630 tmp_dir = self.mkdtemp()
Antoine Pitrouac601602013-08-16 19:35:02 +0200631 src_dir = os.path.join(tmp_dir, 'source')
632 os.mkdir(src_dir)
633 dst_dir = os.path.join(tmp_dir, 'destination')
634 self.addCleanup(shutil.rmtree, tmp_dir)
635
636 os.chmod(src_dir, 0o777)
637 write_file((src_dir, 'permissive.txt'), '123')
638 os.chmod(os.path.join(src_dir, 'permissive.txt'), 0o777)
639 write_file((src_dir, 'restrictive.txt'), '456')
640 os.chmod(os.path.join(src_dir, 'restrictive.txt'), 0o600)
641 restrictive_subdir = tempfile.mkdtemp(dir=src_dir)
Hai Shi0c4f0f32020-06-30 21:46:31 +0800642 self.addCleanup(os_helper.rmtree, restrictive_subdir)
Antoine Pitrouac601602013-08-16 19:35:02 +0200643 os.chmod(restrictive_subdir, 0o600)
644
645 shutil.copytree(src_dir, dst_dir)
Brett Cannon9c7eb552013-08-23 14:38:11 -0400646 self.assertEqual(os.stat(src_dir).st_mode, os.stat(dst_dir).st_mode)
647 self.assertEqual(os.stat(os.path.join(src_dir, 'permissive.txt')).st_mode,
Antoine Pitrouac601602013-08-16 19:35:02 +0200648 os.stat(os.path.join(dst_dir, 'permissive.txt')).st_mode)
Brett Cannon9c7eb552013-08-23 14:38:11 -0400649 self.assertEqual(os.stat(os.path.join(src_dir, 'restrictive.txt')).st_mode,
Antoine Pitrouac601602013-08-16 19:35:02 +0200650 os.stat(os.path.join(dst_dir, 'restrictive.txt')).st_mode)
651 restrictive_subdir_dst = os.path.join(dst_dir,
652 os.path.split(restrictive_subdir)[1])
Brett Cannon9c7eb552013-08-23 14:38:11 -0400653 self.assertEqual(os.stat(restrictive_subdir).st_mode,
Antoine Pitrouac601602013-08-16 19:35:02 +0200654 os.stat(restrictive_subdir_dst).st_mode)
655
Berker Peksag884afd92014-12-10 02:50:32 +0200656 @unittest.mock.patch('os.chmod')
657 def test_copytree_winerror(self, mock_patch):
658 # When copying to VFAT, copystat() raises OSError. On Windows, the
659 # exception object has a meaningful 'winerror' attribute, but not
660 # on other operating systems. Do not assume 'winerror' is set.
Steve Dowerabde52c2019-11-15 09:49:21 -0800661 src_dir = self.mkdtemp()
662 dst_dir = os.path.join(self.mkdtemp(), 'destination')
Berker Peksag884afd92014-12-10 02:50:32 +0200663 self.addCleanup(shutil.rmtree, src_dir)
664 self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
665
666 mock_patch.side_effect = PermissionError('ka-boom')
667 with self.assertRaises(shutil.Error):
668 shutil.copytree(src_dir, dst_dir)
669
Giampaolo Rodolac606a9c2019-02-26 12:04:41 +0100670 def test_copytree_custom_copy_function(self):
671 # See: https://bugs.python.org/issue35648
672 def custom_cpfun(a, b):
673 flag.append(None)
674 self.assertIsInstance(a, str)
675 self.assertIsInstance(b, str)
676 self.assertEqual(a, os.path.join(src, 'foo'))
677 self.assertEqual(b, os.path.join(dst, 'foo'))
678
679 flag = []
Steve Dowerabde52c2019-11-15 09:49:21 -0800680 src = self.mkdtemp()
681 dst = tempfile.mktemp(dir=self.mkdtemp())
Inada Naokic8e5eb92021-04-05 13:11:23 +0900682 with open(os.path.join(src, 'foo'), 'w', encoding='utf-8') as f:
Giampaolo Rodolac606a9c2019-02-26 12:04:41 +0100683 f.close()
684 shutil.copytree(src, dst, copy_function=custom_cpfun)
685 self.assertEqual(len(flag), 1)
686
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300687 # Issue #3002: copyfile and copytree block indefinitely on named pipes
688 @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
Hai Shi0c4f0f32020-06-30 21:46:31 +0800689 @os_helper.skip_unless_symlink
pxinwr6a273fd2020-11-29 06:06:36 +0800690 @unittest.skipIf(sys.platform == "vxworks",
691 "fifo requires special path on VxWorks")
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300692 def test_copytree_named_pipe(self):
693 os.mkdir(TESTFN)
694 try:
695 subdir = os.path.join(TESTFN, "subdir")
696 os.mkdir(subdir)
697 pipe = os.path.join(subdir, "mypipe")
698 try:
699 os.mkfifo(pipe)
700 except PermissionError as e:
701 self.skipTest('os.mkfifo(): %s' % e)
702 try:
703 shutil.copytree(TESTFN, TESTFN2)
704 except shutil.Error as e:
705 errors = e.args[0]
706 self.assertEqual(len(errors), 1)
707 src, dst, error_msg = errors[0]
708 self.assertEqual("`%s` is a named pipe" % pipe, error_msg)
709 else:
710 self.fail("shutil.Error should have been raised")
711 finally:
712 shutil.rmtree(TESTFN, ignore_errors=True)
713 shutil.rmtree(TESTFN2, ignore_errors=True)
714
715 def test_copytree_special_func(self):
716 src_dir = self.mkdtemp()
717 dst_dir = os.path.join(self.mkdtemp(), 'destination')
718 write_file((src_dir, 'test.txt'), '123')
719 os.mkdir(os.path.join(src_dir, 'test_dir'))
720 write_file((src_dir, 'test_dir', 'test.txt'), '456')
721
722 copied = []
723 def _copy(src, dst):
724 copied.append((src, dst))
725
726 shutil.copytree(src_dir, dst_dir, copy_function=_copy)
727 self.assertEqual(len(copied), 2)
728
Hai Shi0c4f0f32020-06-30 21:46:31 +0800729 @os_helper.skip_unless_symlink
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300730 def test_copytree_dangling_symlinks(self):
731 # a dangling symlink raises an error at the end
732 src_dir = self.mkdtemp()
733 dst_dir = os.path.join(self.mkdtemp(), 'destination')
734 os.symlink('IDONTEXIST', os.path.join(src_dir, 'test.txt'))
735 os.mkdir(os.path.join(src_dir, 'test_dir'))
736 write_file((src_dir, 'test_dir', 'test.txt'), '456')
737 self.assertRaises(Error, shutil.copytree, src_dir, dst_dir)
738
739 # a dangling symlink is ignored with the proper flag
740 dst_dir = os.path.join(self.mkdtemp(), 'destination2')
741 shutil.copytree(src_dir, dst_dir, ignore_dangling_symlinks=True)
742 self.assertNotIn('test.txt', os.listdir(dst_dir))
743
744 # a dangling symlink is copied if symlinks=True
745 dst_dir = os.path.join(self.mkdtemp(), 'destination3')
746 shutil.copytree(src_dir, dst_dir, symlinks=True)
747 self.assertIn('test.txt', os.listdir(dst_dir))
748
Hai Shi0c4f0f32020-06-30 21:46:31 +0800749 @os_helper.skip_unless_symlink
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300750 def test_copytree_symlink_dir(self):
751 src_dir = self.mkdtemp()
752 dst_dir = os.path.join(self.mkdtemp(), 'destination')
753 os.mkdir(os.path.join(src_dir, 'real_dir'))
Inada Naokic8e5eb92021-04-05 13:11:23 +0900754 with open(os.path.join(src_dir, 'real_dir', 'test.txt'), 'wb'):
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300755 pass
756 os.symlink(os.path.join(src_dir, 'real_dir'),
757 os.path.join(src_dir, 'link_to_dir'),
758 target_is_directory=True)
759
760 shutil.copytree(src_dir, dst_dir, symlinks=False)
761 self.assertFalse(os.path.islink(os.path.join(dst_dir, 'link_to_dir')))
762 self.assertIn('test.txt', os.listdir(os.path.join(dst_dir, 'link_to_dir')))
763
764 dst_dir = os.path.join(self.mkdtemp(), 'destination2')
765 shutil.copytree(src_dir, dst_dir, symlinks=True)
766 self.assertTrue(os.path.islink(os.path.join(dst_dir, 'link_to_dir')))
767 self.assertIn('test.txt', os.listdir(os.path.join(dst_dir, 'link_to_dir')))
768
769 def test_copytree_return_value(self):
770 # copytree returns its destination path.
771 src_dir = self.mkdtemp()
772 dst_dir = src_dir + "dest"
773 self.addCleanup(shutil.rmtree, dst_dir, True)
774 src = os.path.join(src_dir, 'foo')
775 write_file(src, 'foo')
776 rv = shutil.copytree(src_dir, dst_dir)
777 self.assertEqual(['foo'], os.listdir(rv))
778
Bruno P. Kinoshita9bbcbc92019-11-27 14:10:37 +1300779 def test_copytree_subdirectory(self):
780 # copytree where dst is a subdirectory of src, see Issue 38688
781 base_dir = self.mkdtemp()
782 self.addCleanup(shutil.rmtree, base_dir, ignore_errors=True)
783 src_dir = os.path.join(base_dir, "t", "pg")
784 dst_dir = os.path.join(src_dir, "somevendor", "1.0")
785 os.makedirs(src_dir)
786 src = os.path.join(src_dir, 'pol')
787 write_file(src, 'pol')
788 rv = shutil.copytree(src_dir, dst_dir)
789 self.assertEqual(['pol'], os.listdir(rv))
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300790
791class TestCopy(BaseTest, unittest.TestCase):
792
793 ### shutil.copymode
794
Hai Shi0c4f0f32020-06-30 21:46:31 +0800795 @os_helper.skip_unless_symlink
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300796 def test_copymode_follow_symlinks(self):
797 tmp_dir = self.mkdtemp()
798 src = os.path.join(tmp_dir, 'foo')
799 dst = os.path.join(tmp_dir, 'bar')
800 src_link = os.path.join(tmp_dir, 'baz')
801 dst_link = os.path.join(tmp_dir, 'quux')
802 write_file(src, 'foo')
803 write_file(dst, 'foo')
804 os.symlink(src, src_link)
805 os.symlink(dst, dst_link)
806 os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
807 # file to file
808 os.chmod(dst, stat.S_IRWXO)
809 self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
810 shutil.copymode(src, dst)
811 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
812 # On Windows, os.chmod does not follow symlinks (issue #15411)
813 if os.name != 'nt':
814 # follow src link
815 os.chmod(dst, stat.S_IRWXO)
816 shutil.copymode(src_link, dst)
817 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
818 # follow dst link
819 os.chmod(dst, stat.S_IRWXO)
820 shutil.copymode(src, dst_link)
821 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
822 # follow both links
823 os.chmod(dst, stat.S_IRWXO)
824 shutil.copymode(src_link, dst_link)
825 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
826
827 @unittest.skipUnless(hasattr(os, 'lchmod'), 'requires os.lchmod')
Hai Shi0c4f0f32020-06-30 21:46:31 +0800828 @os_helper.skip_unless_symlink
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300829 def test_copymode_symlink_to_symlink(self):
830 tmp_dir = self.mkdtemp()
831 src = os.path.join(tmp_dir, 'foo')
832 dst = os.path.join(tmp_dir, 'bar')
833 src_link = os.path.join(tmp_dir, 'baz')
834 dst_link = os.path.join(tmp_dir, 'quux')
835 write_file(src, 'foo')
836 write_file(dst, 'foo')
837 os.symlink(src, src_link)
838 os.symlink(dst, dst_link)
839 os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
840 os.chmod(dst, stat.S_IRWXU)
841 os.lchmod(src_link, stat.S_IRWXO|stat.S_IRWXG)
842 # link to link
843 os.lchmod(dst_link, stat.S_IRWXO)
844 shutil.copymode(src_link, dst_link, follow_symlinks=False)
845 self.assertEqual(os.lstat(src_link).st_mode,
846 os.lstat(dst_link).st_mode)
847 self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
848 # src link - use chmod
849 os.lchmod(dst_link, stat.S_IRWXO)
850 shutil.copymode(src_link, dst, follow_symlinks=False)
851 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
852 # dst link - use chmod
853 os.lchmod(dst_link, stat.S_IRWXO)
854 shutil.copymode(src, dst_link, follow_symlinks=False)
855 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
856
857 @unittest.skipIf(hasattr(os, 'lchmod'), 'requires os.lchmod to be missing')
Hai Shi0c4f0f32020-06-30 21:46:31 +0800858 @os_helper.skip_unless_symlink
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300859 def test_copymode_symlink_to_symlink_wo_lchmod(self):
860 tmp_dir = self.mkdtemp()
861 src = os.path.join(tmp_dir, 'foo')
862 dst = os.path.join(tmp_dir, 'bar')
863 src_link = os.path.join(tmp_dir, 'baz')
864 dst_link = os.path.join(tmp_dir, 'quux')
865 write_file(src, 'foo')
866 write_file(dst, 'foo')
867 os.symlink(src, src_link)
868 os.symlink(dst, dst_link)
869 shutil.copymode(src_link, dst_link, follow_symlinks=False) # silent fail
870
871 ### shutil.copystat
872
Hai Shi0c4f0f32020-06-30 21:46:31 +0800873 @os_helper.skip_unless_symlink
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300874 def test_copystat_symlinks(self):
875 tmp_dir = self.mkdtemp()
876 src = os.path.join(tmp_dir, 'foo')
877 dst = os.path.join(tmp_dir, 'bar')
878 src_link = os.path.join(tmp_dir, 'baz')
879 dst_link = os.path.join(tmp_dir, 'qux')
880 write_file(src, 'foo')
881 src_stat = os.stat(src)
882 os.utime(src, (src_stat.st_atime,
883 src_stat.st_mtime - 42.0)) # ensure different mtimes
884 write_file(dst, 'bar')
885 self.assertNotEqual(os.stat(src).st_mtime, os.stat(dst).st_mtime)
886 os.symlink(src, src_link)
887 os.symlink(dst, dst_link)
888 if hasattr(os, 'lchmod'):
889 os.lchmod(src_link, stat.S_IRWXO)
890 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
891 os.lchflags(src_link, stat.UF_NODUMP)
892 src_link_stat = os.lstat(src_link)
893 # follow
894 if hasattr(os, 'lchmod'):
895 shutil.copystat(src_link, dst_link, follow_symlinks=True)
896 self.assertNotEqual(src_link_stat.st_mode, os.stat(dst).st_mode)
897 # don't follow
898 shutil.copystat(src_link, dst_link, follow_symlinks=False)
899 dst_link_stat = os.lstat(dst_link)
900 if os.utime in os.supports_follow_symlinks:
901 for attr in 'st_atime', 'st_mtime':
902 # The modification times may be truncated in the new file.
903 self.assertLessEqual(getattr(src_link_stat, attr),
904 getattr(dst_link_stat, attr) + 1)
905 if hasattr(os, 'lchmod'):
906 self.assertEqual(src_link_stat.st_mode, dst_link_stat.st_mode)
907 if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
908 self.assertEqual(src_link_stat.st_flags, dst_link_stat.st_flags)
909 # tell to follow but dst is not a link
910 shutil.copystat(src_link, dst, follow_symlinks=False)
911 self.assertTrue(abs(os.stat(src).st_mtime - os.stat(dst).st_mtime) <
912 00000.1)
913
914 @unittest.skipUnless(hasattr(os, 'chflags') and
915 hasattr(errno, 'EOPNOTSUPP') and
916 hasattr(errno, 'ENOTSUP'),
917 "requires os.chflags, EOPNOTSUPP & ENOTSUP")
918 def test_copystat_handles_harmless_chflags_errors(self):
919 tmpdir = self.mkdtemp()
920 file1 = os.path.join(tmpdir, 'file1')
921 file2 = os.path.join(tmpdir, 'file2')
922 write_file(file1, 'xxx')
923 write_file(file2, 'xxx')
924
925 def make_chflags_raiser(err):
926 ex = OSError()
927
928 def _chflags_raiser(path, flags, *, follow_symlinks=True):
929 ex.errno = err
930 raise ex
931 return _chflags_raiser
932 old_chflags = os.chflags
933 try:
934 for err in errno.EOPNOTSUPP, errno.ENOTSUP:
935 os.chflags = make_chflags_raiser(err)
936 shutil.copystat(file1, file2)
937 # assert others errors break it
938 os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP)
939 self.assertRaises(OSError, shutil.copystat, file1, file2)
940 finally:
941 os.chflags = old_chflags
942
943 ### shutil.copyxattr
944
Hai Shi0c4f0f32020-06-30 21:46:31 +0800945 @os_helper.skip_unless_xattr
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300946 def test_copyxattr(self):
947 tmp_dir = self.mkdtemp()
948 src = os.path.join(tmp_dir, 'foo')
949 write_file(src, 'foo')
950 dst = os.path.join(tmp_dir, 'bar')
951 write_file(dst, 'bar')
952
953 # no xattr == no problem
954 shutil._copyxattr(src, dst)
955 # common case
956 os.setxattr(src, 'user.foo', b'42')
957 os.setxattr(src, 'user.bar', b'43')
958 shutil._copyxattr(src, dst)
959 self.assertEqual(sorted(os.listxattr(src)), sorted(os.listxattr(dst)))
960 self.assertEqual(
961 os.getxattr(src, 'user.foo'),
962 os.getxattr(dst, 'user.foo'))
963 # check errors don't affect other attrs
964 os.remove(dst)
965 write_file(dst, 'bar')
966 os_error = OSError(errno.EPERM, 'EPERM')
967
968 def _raise_on_user_foo(fname, attr, val, **kwargs):
969 if attr == 'user.foo':
970 raise os_error
971 else:
972 orig_setxattr(fname, attr, val, **kwargs)
973 try:
974 orig_setxattr = os.setxattr
975 os.setxattr = _raise_on_user_foo
976 shutil._copyxattr(src, dst)
977 self.assertIn('user.bar', os.listxattr(dst))
978 finally:
979 os.setxattr = orig_setxattr
980 # the source filesystem not supporting xattrs should be ok, too.
981 def _raise_on_src(fname, *, follow_symlinks=True):
982 if fname == src:
983 raise OSError(errno.ENOTSUP, 'Operation not supported')
984 return orig_listxattr(fname, follow_symlinks=follow_symlinks)
985 try:
986 orig_listxattr = os.listxattr
987 os.listxattr = _raise_on_src
988 shutil._copyxattr(src, dst)
989 finally:
990 os.listxattr = orig_listxattr
991
992 # test that shutil.copystat copies xattrs
993 src = os.path.join(tmp_dir, 'the_original')
994 srcro = os.path.join(tmp_dir, 'the_original_ro')
995 write_file(src, src)
996 write_file(srcro, srcro)
997 os.setxattr(src, 'user.the_value', b'fiddly')
998 os.setxattr(srcro, 'user.the_value', b'fiddly')
999 os.chmod(srcro, 0o444)
1000 dst = os.path.join(tmp_dir, 'the_copy')
1001 dstro = os.path.join(tmp_dir, 'the_copy_ro')
1002 write_file(dst, dst)
1003 write_file(dstro, dstro)
1004 shutil.copystat(src, dst)
1005 shutil.copystat(srcro, dstro)
1006 self.assertEqual(os.getxattr(dst, 'user.the_value'), b'fiddly')
1007 self.assertEqual(os.getxattr(dstro, 'user.the_value'), b'fiddly')
1008
Hai Shi0c4f0f32020-06-30 21:46:31 +08001009 @os_helper.skip_unless_symlink
1010 @os_helper.skip_unless_xattr
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001011 @unittest.skipUnless(hasattr(os, 'geteuid') and os.geteuid() == 0,
1012 'root privileges required')
1013 def test_copyxattr_symlinks(self):
1014 # On Linux, it's only possible to access non-user xattr for symlinks;
1015 # which in turn require root privileges. This test should be expanded
1016 # as soon as other platforms gain support for extended attributes.
1017 tmp_dir = self.mkdtemp()
1018 src = os.path.join(tmp_dir, 'foo')
1019 src_link = os.path.join(tmp_dir, 'baz')
1020 write_file(src, 'foo')
1021 os.symlink(src, src_link)
1022 os.setxattr(src, 'trusted.foo', b'42')
1023 os.setxattr(src_link, 'trusted.foo', b'43', follow_symlinks=False)
1024 dst = os.path.join(tmp_dir, 'bar')
1025 dst_link = os.path.join(tmp_dir, 'qux')
1026 write_file(dst, 'bar')
1027 os.symlink(dst, dst_link)
1028 shutil._copyxattr(src_link, dst_link, follow_symlinks=False)
1029 self.assertEqual(os.getxattr(dst_link, 'trusted.foo', follow_symlinks=False), b'43')
1030 self.assertRaises(OSError, os.getxattr, dst, 'trusted.foo')
1031 shutil._copyxattr(src_link, dst, follow_symlinks=False)
1032 self.assertEqual(os.getxattr(dst, 'trusted.foo'), b'43')
1033
1034 ### shutil.copy
1035
1036 def _copy_file(self, method):
1037 fname = 'test.txt'
1038 tmpdir = self.mkdtemp()
1039 write_file((tmpdir, fname), 'xxx')
1040 file1 = os.path.join(tmpdir, fname)
1041 tmpdir2 = self.mkdtemp()
1042 method(file1, tmpdir2)
1043 file2 = os.path.join(tmpdir2, fname)
1044 return (file1, file2)
1045
1046 def test_copy(self):
1047 # Ensure that the copied file exists and has the same mode bits.
1048 file1, file2 = self._copy_file(shutil.copy)
1049 self.assertTrue(os.path.exists(file2))
1050 self.assertEqual(os.stat(file1).st_mode, os.stat(file2).st_mode)
1051
Hai Shi0c4f0f32020-06-30 21:46:31 +08001052 @os_helper.skip_unless_symlink
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001053 def test_copy_symlinks(self):
1054 tmp_dir = self.mkdtemp()
1055 src = os.path.join(tmp_dir, 'foo')
1056 dst = os.path.join(tmp_dir, 'bar')
1057 src_link = os.path.join(tmp_dir, 'baz')
1058 write_file(src, 'foo')
1059 os.symlink(src, src_link)
1060 if hasattr(os, 'lchmod'):
1061 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
1062 # don't follow
1063 shutil.copy(src_link, dst, follow_symlinks=True)
1064 self.assertFalse(os.path.islink(dst))
1065 self.assertEqual(read_file(src), read_file(dst))
1066 os.remove(dst)
1067 # follow
1068 shutil.copy(src_link, dst, follow_symlinks=False)
1069 self.assertTrue(os.path.islink(dst))
1070 self.assertEqual(os.readlink(dst), os.readlink(src_link))
1071 if hasattr(os, 'lchmod'):
1072 self.assertEqual(os.lstat(src_link).st_mode,
1073 os.lstat(dst).st_mode)
1074
1075 ### shutil.copy2
1076
1077 @unittest.skipUnless(hasattr(os, 'utime'), 'requires os.utime')
1078 def test_copy2(self):
1079 # Ensure that the copied file exists and has the same mode and
1080 # modification time bits.
1081 file1, file2 = self._copy_file(shutil.copy2)
1082 self.assertTrue(os.path.exists(file2))
1083 file1_stat = os.stat(file1)
1084 file2_stat = os.stat(file2)
1085 self.assertEqual(file1_stat.st_mode, file2_stat.st_mode)
1086 for attr in 'st_atime', 'st_mtime':
1087 # The modification times may be truncated in the new file.
1088 self.assertLessEqual(getattr(file1_stat, attr),
1089 getattr(file2_stat, attr) + 1)
1090 if hasattr(os, 'chflags') and hasattr(file1_stat, 'st_flags'):
1091 self.assertEqual(getattr(file1_stat, 'st_flags'),
1092 getattr(file2_stat, 'st_flags'))
1093
Hai Shi0c4f0f32020-06-30 21:46:31 +08001094 @os_helper.skip_unless_symlink
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001095 def test_copy2_symlinks(self):
1096 tmp_dir = self.mkdtemp()
1097 src = os.path.join(tmp_dir, 'foo')
1098 dst = os.path.join(tmp_dir, 'bar')
1099 src_link = os.path.join(tmp_dir, 'baz')
1100 write_file(src, 'foo')
1101 os.symlink(src, src_link)
1102 if hasattr(os, 'lchmod'):
1103 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
1104 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
1105 os.lchflags(src_link, stat.UF_NODUMP)
1106 src_stat = os.stat(src)
1107 src_link_stat = os.lstat(src_link)
1108 # follow
1109 shutil.copy2(src_link, dst, follow_symlinks=True)
1110 self.assertFalse(os.path.islink(dst))
1111 self.assertEqual(read_file(src), read_file(dst))
1112 os.remove(dst)
1113 # don't follow
1114 shutil.copy2(src_link, dst, follow_symlinks=False)
1115 self.assertTrue(os.path.islink(dst))
1116 self.assertEqual(os.readlink(dst), os.readlink(src_link))
1117 dst_stat = os.lstat(dst)
1118 if os.utime in os.supports_follow_symlinks:
1119 for attr in 'st_atime', 'st_mtime':
1120 # The modification times may be truncated in the new file.
1121 self.assertLessEqual(getattr(src_link_stat, attr),
1122 getattr(dst_stat, attr) + 1)
1123 if hasattr(os, 'lchmod'):
1124 self.assertEqual(src_link_stat.st_mode, dst_stat.st_mode)
1125 self.assertNotEqual(src_stat.st_mode, dst_stat.st_mode)
1126 if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
1127 self.assertEqual(src_link_stat.st_flags, dst_stat.st_flags)
1128
Hai Shi0c4f0f32020-06-30 21:46:31 +08001129 @os_helper.skip_unless_xattr
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001130 def test_copy2_xattr(self):
1131 tmp_dir = self.mkdtemp()
1132 src = os.path.join(tmp_dir, 'foo')
1133 dst = os.path.join(tmp_dir, 'bar')
1134 write_file(src, 'foo')
1135 os.setxattr(src, 'user.foo', b'42')
1136 shutil.copy2(src, dst)
1137 self.assertEqual(
1138 os.getxattr(src, 'user.foo'),
1139 os.getxattr(dst, 'user.foo'))
1140 os.remove(dst)
1141
1142 def test_copy_return_value(self):
1143 # copy and copy2 both return their destination path.
1144 for fn in (shutil.copy, shutil.copy2):
1145 src_dir = self.mkdtemp()
1146 dst_dir = self.mkdtemp()
1147 src = os.path.join(src_dir, 'foo')
1148 write_file(src, 'foo')
1149 rv = fn(src, dst_dir)
1150 self.assertEqual(rv, os.path.join(dst_dir, 'foo'))
1151 rv = fn(src, os.path.join(dst_dir, 'bar'))
1152 self.assertEqual(rv, os.path.join(dst_dir, 'bar'))
1153
Miss Islington (bot)41d48bc2021-09-21 15:14:40 -07001154 def test_copy_dir(self):
1155 self._test_copy_dir(shutil.copy)
1156
1157 def test_copy2_dir(self):
1158 self._test_copy_dir(shutil.copy2)
1159
1160 def _test_copy_dir(self, copy_func):
1161 src_dir = self.mkdtemp()
1162 src_file = os.path.join(src_dir, 'foo')
1163 dir2 = self.mkdtemp()
1164 dst = os.path.join(src_dir, 'does_not_exist/')
1165 write_file(src_file, 'foo')
1166 if sys.platform == "win32":
1167 err = PermissionError
1168 else:
1169 err = IsADirectoryError
1170 self.assertRaises(err, copy_func, dir2, src_dir)
1171
1172 # raise *err* because of src rather than FileNotFoundError because of dst
1173 self.assertRaises(err, copy_func, dir2, dst)
1174 copy_func(src_file, dir2) # should not raise exceptions
1175
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001176 ### shutil.copyfile
1177
Hai Shi0c4f0f32020-06-30 21:46:31 +08001178 @os_helper.skip_unless_symlink
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001179 def test_copyfile_symlinks(self):
1180 tmp_dir = self.mkdtemp()
1181 src = os.path.join(tmp_dir, 'src')
1182 dst = os.path.join(tmp_dir, 'dst')
1183 dst_link = os.path.join(tmp_dir, 'dst_link')
1184 link = os.path.join(tmp_dir, 'link')
1185 write_file(src, 'foo')
1186 os.symlink(src, link)
1187 # don't follow
1188 shutil.copyfile(link, dst_link, follow_symlinks=False)
1189 self.assertTrue(os.path.islink(dst_link))
1190 self.assertEqual(os.readlink(link), os.readlink(dst_link))
1191 # follow
1192 shutil.copyfile(link, dst)
1193 self.assertFalse(os.path.islink(dst))
1194
Hirokazu Yamamoto26681452010-12-05 02:04:16 +00001195 @unittest.skipUnless(hasattr(os, 'link'), 'requires os.link')
Tarek Ziadé51a6f722010-04-23 13:03:09 +00001196 def test_dont_copy_file_onto_link_to_itself(self):
1197 # bug 851123.
1198 os.mkdir(TESTFN)
1199 src = os.path.join(TESTFN, 'cheese')
1200 dst = os.path.join(TESTFN, 'shop')
1201 try:
Inada Naokic8e5eb92021-04-05 13:11:23 +09001202 with open(src, 'w', encoding='utf-8') as f:
Hirokazu Yamamoto26681452010-12-05 02:04:16 +00001203 f.write('cheddar')
xdegaye92c2ca72017-11-12 17:31:07 +01001204 try:
1205 os.link(src, dst)
1206 except PermissionError as e:
1207 self.skipTest('os.link(): %s' % e)
Hynek Schlawack48653762012-10-07 12:49:58 +02001208 self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
Inada Naokic8e5eb92021-04-05 13:11:23 +09001209 with open(src, 'r', encoding='utf-8') as f:
Hirokazu Yamamoto26681452010-12-05 02:04:16 +00001210 self.assertEqual(f.read(), 'cheddar')
1211 os.remove(dst)
1212 finally:
1213 shutil.rmtree(TESTFN, ignore_errors=True)
Tarek Ziadé51a6f722010-04-23 13:03:09 +00001214
Hai Shi0c4f0f32020-06-30 21:46:31 +08001215 @os_helper.skip_unless_symlink
Hirokazu Yamamoto26681452010-12-05 02:04:16 +00001216 def test_dont_copy_file_onto_symlink_to_itself(self):
1217 # bug 851123.
1218 os.mkdir(TESTFN)
1219 src = os.path.join(TESTFN, 'cheese')
1220 dst = os.path.join(TESTFN, 'shop')
1221 try:
Inada Naokic8e5eb92021-04-05 13:11:23 +09001222 with open(src, 'w', encoding='utf-8') as f:
Hirokazu Yamamoto26681452010-12-05 02:04:16 +00001223 f.write('cheddar')
Tarek Ziadé51a6f722010-04-23 13:03:09 +00001224 # Using `src` here would mean we end up with a symlink pointing
1225 # to TESTFN/TESTFN/cheese, while it should point at
1226 # TESTFN/cheese.
1227 os.symlink('cheese', dst)
Hynek Schlawack48653762012-10-07 12:49:58 +02001228 self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
Inada Naokic8e5eb92021-04-05 13:11:23 +09001229 with open(src, 'r', encoding='utf-8') as f:
Antoine Pitrou92f60ed2010-10-14 22:11:44 +00001230 self.assertEqual(f.read(), 'cheddar')
Tarek Ziadé51a6f722010-04-23 13:03:09 +00001231 os.remove(dst)
1232 finally:
Hirokazu Yamamoto26681452010-12-05 02:04:16 +00001233 shutil.rmtree(TESTFN, ignore_errors=True)
Johannes Gijsbers68128712004-08-14 13:57:08 +00001234
Serhiy Storchaka43767632013-11-03 21:31:38 +02001235 # Issue #3002: copyfile and copytree block indefinitely on named pipes
1236 @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
pxinwr6a273fd2020-11-29 06:06:36 +08001237 @unittest.skipIf(sys.platform == "vxworks",
1238 "fifo requires special path on VxWorks")
Serhiy Storchaka43767632013-11-03 21:31:38 +02001239 def test_copyfile_named_pipe(self):
xdegaye92c2ca72017-11-12 17:31:07 +01001240 try:
1241 os.mkfifo(TESTFN)
1242 except PermissionError as e:
1243 self.skipTest('os.mkfifo(): %s' % e)
Serhiy Storchaka43767632013-11-03 21:31:38 +02001244 try:
1245 self.assertRaises(shutil.SpecialFileError,
1246 shutil.copyfile, TESTFN, TESTFN2)
1247 self.assertRaises(shutil.SpecialFileError,
1248 shutil.copyfile, __file__, TESTFN)
1249 finally:
1250 os.remove(TESTFN)
Antoine Pitrou7fff0962009-05-01 21:09:44 +00001251
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001252 def test_copyfile_return_value(self):
1253 # copytree returns its destination path.
Tarek Ziadé5340db32010-04-19 22:30:51 +00001254 src_dir = self.mkdtemp()
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001255 dst_dir = self.mkdtemp()
1256 dst_file = os.path.join(dst_dir, 'bar')
1257 src_file = os.path.join(src_dir, 'foo')
1258 write_file(src_file, 'foo')
1259 rv = shutil.copyfile(src_file, dst_file)
1260 self.assertTrue(os.path.exists(rv))
1261 self.assertEqual(read_file(src_file), read_file(dst_file))
Tarek Ziadé5340db32010-04-19 22:30:51 +00001262
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001263 def test_copyfile_same_file(self):
1264 # copyfile() should raise SameFileError if the source and destination
1265 # are the same.
Tarek Ziadéfb437512010-04-20 08:57:33 +00001266 src_dir = self.mkdtemp()
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001267 src_file = os.path.join(src_dir, 'foo')
1268 write_file(src_file, 'foo')
1269 self.assertRaises(SameFileError, shutil.copyfile, src_file, src_file)
1270 # But Error should work too, to stay backward compatible.
1271 self.assertRaises(Error, shutil.copyfile, src_file, src_file)
1272 # Make sure file is not corrupted.
1273 self.assertEqual(read_file(src_file), 'foo')
Tarek Ziadéfb437512010-04-20 08:57:33 +00001274
Miss Islington (bot)574da462021-07-20 11:53:31 -07001275 @unittest.skipIf(MACOS or SOLARIS or _winapi, 'On MACOS, Solaris and Windows the errors are not confusing (though different)')
Miss Islington (bot)15772592021-07-09 21:07:35 -07001276 def test_copyfile_nonexistent_dir(self):
1277 # Issue 43219
1278 src_dir = self.mkdtemp()
1279 src_file = os.path.join(src_dir, 'foo')
1280 dst = os.path.join(src_dir, 'does_not_exist/')
1281 write_file(src_file, 'foo')
1282 self.assertRaises(FileNotFoundError, shutil.copyfile, src_file, dst)
1283
Miss Islington (bot)41d48bc2021-09-21 15:14:40 -07001284 def test_copyfile_copy_dir(self):
1285 # Issue 45234
1286 # test copy() and copyfile() raising proper exceptions when src and/or
1287 # dst are directories
1288 src_dir = self.mkdtemp()
1289 src_file = os.path.join(src_dir, 'foo')
1290 dir2 = self.mkdtemp()
1291 dst = os.path.join(src_dir, 'does_not_exist/')
1292 write_file(src_file, 'foo')
1293 if sys.platform == "win32":
1294 err = PermissionError
1295 else:
1296 err = IsADirectoryError
1297
1298 self.assertRaises(err, shutil.copyfile, src_dir, dst)
1299 self.assertRaises(err, shutil.copyfile, src_file, src_dir)
1300 self.assertRaises(err, shutil.copyfile, dir2, src_dir)
1301
Tarek Ziadéfb437512010-04-20 08:57:33 +00001302
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001303class TestArchives(BaseTest, unittest.TestCase):
Tarek Ziadéfb437512010-04-20 08:57:33 +00001304
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001305 ### shutil.make_archive
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001306
Hai Shia3ec3ad2020-05-19 06:02:57 +08001307 @support.requires_zlib()
Tarek Ziadé396fad72010-02-23 05:30:31 +00001308 def test_make_tarball(self):
1309 # creating something to tar
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001310 root_dir, base_dir = self._create_files('')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001311
1312 tmpdir2 = self.mkdtemp()
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001313 # force shutil to create the directory
1314 os.rmdir(tmpdir2)
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001315 # working with relative paths
1316 work_dir = os.path.dirname(tmpdir2)
1317 rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001318
Hai Shi0c4f0f32020-06-30 21:46:31 +08001319 with os_helper.change_cwd(work_dir):
Serhiy Storchaka5558d4f2015-09-08 09:59:02 +03001320 base_name = os.path.abspath(rel_base_name)
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001321 tarball = make_archive(rel_base_name, 'gztar', root_dir, '.')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001322
1323 # check if the compressed tarball was created
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001324 self.assertEqual(tarball, base_name + '.tar.gz')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001325 self.assertTrue(os.path.isfile(tarball))
1326 self.assertTrue(tarfile.is_tarfile(tarball))
1327 with tarfile.open(tarball, 'r:gz') as tf:
1328 self.assertCountEqual(tf.getnames(),
1329 ['.', './sub', './sub2',
1330 './file1', './file2', './sub/file3'])
Tarek Ziadé396fad72010-02-23 05:30:31 +00001331
1332 # trying an uncompressed one
Hai Shi0c4f0f32020-06-30 21:46:31 +08001333 with os_helper.change_cwd(work_dir):
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001334 tarball = make_archive(rel_base_name, 'tar', root_dir, '.')
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001335 self.assertEqual(tarball, base_name + '.tar')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001336 self.assertTrue(os.path.isfile(tarball))
1337 self.assertTrue(tarfile.is_tarfile(tarball))
1338 with tarfile.open(tarball, 'r') as tf:
1339 self.assertCountEqual(tf.getnames(),
1340 ['.', './sub', './sub2',
1341 './file1', './file2', './sub/file3'])
Tarek Ziadé396fad72010-02-23 05:30:31 +00001342
1343 def _tarinfo(self, path):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001344 with tarfile.open(path) as tar:
Tarek Ziadé396fad72010-02-23 05:30:31 +00001345 names = tar.getnames()
1346 names.sort()
1347 return tuple(names)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001348
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001349 def _create_files(self, base_dir='dist'):
Tarek Ziadé396fad72010-02-23 05:30:31 +00001350 # creating something to tar
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001351 root_dir = self.mkdtemp()
1352 dist = os.path.join(root_dir, base_dir)
1353 os.makedirs(dist, exist_ok=True)
Éric Araujoa7e33a12011-08-12 19:51:35 +02001354 write_file((dist, 'file1'), 'xxx')
1355 write_file((dist, 'file2'), 'xxx')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001356 os.mkdir(os.path.join(dist, 'sub'))
Éric Araujoa7e33a12011-08-12 19:51:35 +02001357 write_file((dist, 'sub', 'file3'), 'xxx')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001358 os.mkdir(os.path.join(dist, 'sub2'))
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001359 if base_dir:
1360 write_file((root_dir, 'outer'), 'xxx')
1361 return root_dir, base_dir
Tarek Ziadé396fad72010-02-23 05:30:31 +00001362
Hai Shia3ec3ad2020-05-19 06:02:57 +08001363 @support.requires_zlib()
Serhiy Storchakab42de2f2015-11-21 14:09:26 +02001364 @unittest.skipUnless(shutil.which('tar'),
Tarek Ziadé396fad72010-02-23 05:30:31 +00001365 'Need the tar command to run')
1366 def test_tarfile_vs_tar(self):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001367 root_dir, base_dir = self._create_files()
1368 base_name = os.path.join(self.mkdtemp(), 'archive')
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001369 tarball = make_archive(base_name, 'gztar', root_dir, base_dir)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001370
1371 # check if the compressed tarball was created
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001372 self.assertEqual(tarball, base_name + '.tar.gz')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001373 self.assertTrue(os.path.isfile(tarball))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001374
1375 # now create another tarball using `tar`
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001376 tarball2 = os.path.join(root_dir, 'archive2.tar')
1377 tar_cmd = ['tar', '-cf', 'archive2.tar', base_dir]
Serhiy Storchakab42de2f2015-11-21 14:09:26 +02001378 subprocess.check_call(tar_cmd, cwd=root_dir,
1379 stdout=subprocess.DEVNULL)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001380
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001381 self.assertTrue(os.path.isfile(tarball2))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001382 # let's compare both tarballs
Ezio Melottib3aedd42010-11-20 19:04:17 +00001383 self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001384
1385 # trying an uncompressed one
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001386 tarball = make_archive(base_name, 'tar', root_dir, base_dir)
1387 self.assertEqual(tarball, base_name + '.tar')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001388 self.assertTrue(os.path.isfile(tarball))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001389
1390 # now for a dry_run
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001391 tarball = make_archive(base_name, 'tar', root_dir, base_dir,
1392 dry_run=True)
1393 self.assertEqual(tarball, base_name + '.tar')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001394 self.assertTrue(os.path.isfile(tarball))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001395
Hai Shia3ec3ad2020-05-19 06:02:57 +08001396 @support.requires_zlib()
Tarek Ziadé396fad72010-02-23 05:30:31 +00001397 def test_make_zipfile(self):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001398 # creating something to zip
1399 root_dir, base_dir = self._create_files()
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001400
1401 tmpdir2 = self.mkdtemp()
1402 # force shutil to create the directory
1403 os.rmdir(tmpdir2)
1404 # working with relative paths
1405 work_dir = os.path.dirname(tmpdir2)
1406 rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001407
Hai Shi0c4f0f32020-06-30 21:46:31 +08001408 with os_helper.change_cwd(work_dir):
Serhiy Storchaka5558d4f2015-09-08 09:59:02 +03001409 base_name = os.path.abspath(rel_base_name)
Serhiy Storchaka666de772016-10-23 15:55:09 +03001410 res = make_archive(rel_base_name, 'zip', root_dir)
1411
1412 self.assertEqual(res, base_name + '.zip')
1413 self.assertTrue(os.path.isfile(res))
1414 self.assertTrue(zipfile.is_zipfile(res))
1415 with zipfile.ZipFile(res) as zf:
1416 self.assertCountEqual(zf.namelist(),
1417 ['dist/', 'dist/sub/', 'dist/sub2/',
1418 'dist/file1', 'dist/file2', 'dist/sub/file3',
1419 'outer'])
1420
Hai Shi0c4f0f32020-06-30 21:46:31 +08001421 with os_helper.change_cwd(work_dir):
Serhiy Storchaka666de772016-10-23 15:55:09 +03001422 base_name = os.path.abspath(rel_base_name)
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001423 res = make_archive(rel_base_name, 'zip', root_dir, base_dir)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001424
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001425 self.assertEqual(res, base_name + '.zip')
1426 self.assertTrue(os.path.isfile(res))
1427 self.assertTrue(zipfile.is_zipfile(res))
1428 with zipfile.ZipFile(res) as zf:
1429 self.assertCountEqual(zf.namelist(),
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001430 ['dist/', 'dist/sub/', 'dist/sub2/',
1431 'dist/file1', 'dist/file2', 'dist/sub/file3'])
Tarek Ziadé396fad72010-02-23 05:30:31 +00001432
Hai Shia3ec3ad2020-05-19 06:02:57 +08001433 @support.requires_zlib()
Serhiy Storchakab42de2f2015-11-21 14:09:26 +02001434 @unittest.skipUnless(shutil.which('zip'),
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001435 'Need the zip command to run')
1436 def test_zipfile_vs_zip(self):
1437 root_dir, base_dir = self._create_files()
1438 base_name = os.path.join(self.mkdtemp(), 'archive')
1439 archive = make_archive(base_name, 'zip', root_dir, base_dir)
1440
1441 # check if ZIP file was created
1442 self.assertEqual(archive, base_name + '.zip')
1443 self.assertTrue(os.path.isfile(archive))
1444
1445 # now create another ZIP file using `zip`
1446 archive2 = os.path.join(root_dir, 'archive2.zip')
1447 zip_cmd = ['zip', '-q', '-r', 'archive2.zip', base_dir]
Serhiy Storchakab42de2f2015-11-21 14:09:26 +02001448 subprocess.check_call(zip_cmd, cwd=root_dir,
1449 stdout=subprocess.DEVNULL)
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001450
1451 self.assertTrue(os.path.isfile(archive2))
1452 # let's compare both ZIP files
1453 with zipfile.ZipFile(archive) as zf:
1454 names = zf.namelist()
1455 with zipfile.ZipFile(archive2) as zf:
1456 names2 = zf.namelist()
1457 self.assertEqual(sorted(names), sorted(names2))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001458
Hai Shia3ec3ad2020-05-19 06:02:57 +08001459 @support.requires_zlib()
Serhiy Storchaka8bc792a2015-11-22 14:49:58 +02001460 @unittest.skipUnless(shutil.which('unzip'),
1461 'Need the unzip command to run')
1462 def test_unzip_zipfile(self):
1463 root_dir, base_dir = self._create_files()
1464 base_name = os.path.join(self.mkdtemp(), 'archive')
1465 archive = make_archive(base_name, 'zip', root_dir, base_dir)
1466
1467 # check if ZIP file was created
1468 self.assertEqual(archive, base_name + '.zip')
1469 self.assertTrue(os.path.isfile(archive))
1470
1471 # now check the ZIP file using `unzip -t`
1472 zip_cmd = ['unzip', '-t', archive]
Hai Shi0c4f0f32020-06-30 21:46:31 +08001473 with os_helper.change_cwd(root_dir):
Serhiy Storchaka8bc792a2015-11-22 14:49:58 +02001474 try:
1475 subprocess.check_output(zip_cmd, stderr=subprocess.STDOUT)
1476 except subprocess.CalledProcessError as exc:
1477 details = exc.output.decode(errors="replace")
Benjamin Petersona710ebd2018-09-13 10:08:46 -07001478 if 'unrecognized option: t' in details:
Benjamin Petersone78734d2018-09-13 10:57:23 -07001479 self.skipTest("unzip doesn't support -t")
Serhiy Storchaka8bc792a2015-11-22 14:49:58 +02001480 msg = "{}\n\n**Unzip Output**\n{}"
1481 self.fail(msg.format(exc, details))
1482
Tarek Ziadé396fad72010-02-23 05:30:31 +00001483 def test_make_archive(self):
1484 tmpdir = self.mkdtemp()
1485 base_name = os.path.join(tmpdir, 'archive')
1486 self.assertRaises(ValueError, make_archive, base_name, 'xxx')
1487
Hai Shia3ec3ad2020-05-19 06:02:57 +08001488 @support.requires_zlib()
Tarek Ziadé396fad72010-02-23 05:30:31 +00001489 def test_make_archive_owner_group(self):
1490 # testing make_archive with owner and group, with various combinations
1491 # this works even if there's not gid/uid support
1492 if UID_GID_SUPPORT:
1493 group = grp.getgrgid(0)[0]
1494 owner = pwd.getpwuid(0)[0]
1495 else:
1496 group = owner = 'root'
1497
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001498 root_dir, base_dir = self._create_files()
1499 base_name = os.path.join(self.mkdtemp(), 'archive')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001500 res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner,
1501 group=group)
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001502 self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001503
1504 res = make_archive(base_name, 'zip', root_dir, base_dir)
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001505 self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001506
1507 res = make_archive(base_name, 'tar', root_dir, base_dir,
1508 owner=owner, group=group)
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001509 self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001510
1511 res = make_archive(base_name, 'tar', root_dir, base_dir,
1512 owner='kjhkjhkjg', group='oihohoh')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001513 self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001514
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001515
Hai Shia3ec3ad2020-05-19 06:02:57 +08001516 @support.requires_zlib()
Tarek Ziadé396fad72010-02-23 05:30:31 +00001517 @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
1518 def test_tarfile_root_owner(self):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001519 root_dir, base_dir = self._create_files()
1520 base_name = os.path.join(self.mkdtemp(), 'archive')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001521 group = grp.getgrgid(0)[0]
1522 owner = pwd.getpwuid(0)[0]
Hai Shi0c4f0f32020-06-30 21:46:31 +08001523 with os_helper.change_cwd(root_dir):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001524 archive_name = make_archive(base_name, 'gztar', root_dir, 'dist',
1525 owner=owner, group=group)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001526
1527 # check if the compressed tarball was created
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001528 self.assertTrue(os.path.isfile(archive_name))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001529
1530 # now checks the rights
1531 archive = tarfile.open(archive_name)
1532 try:
1533 for member in archive.getmembers():
Ezio Melottib3aedd42010-11-20 19:04:17 +00001534 self.assertEqual(member.uid, 0)
1535 self.assertEqual(member.gid, 0)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001536 finally:
1537 archive.close()
1538
1539 def test_make_archive_cwd(self):
1540 current_dir = os.getcwd()
1541 def _breaks(*args, **kw):
1542 raise RuntimeError()
1543
1544 register_archive_format('xxx', _breaks, [], 'xxx file')
1545 try:
1546 try:
1547 make_archive('xxx', 'xxx', root_dir=self.mkdtemp())
1548 except Exception:
1549 pass
Ezio Melottib3aedd42010-11-20 19:04:17 +00001550 self.assertEqual(os.getcwd(), current_dir)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001551 finally:
1552 unregister_archive_format('xxx')
1553
Serhiy Storchaka9a4fc192014-11-28 00:48:46 +02001554 def test_make_tarfile_in_curdir(self):
1555 # Issue #21280
1556 root_dir = self.mkdtemp()
Hai Shi0c4f0f32020-06-30 21:46:31 +08001557 with os_helper.change_cwd(root_dir):
Serhiy Storchaka9a4fc192014-11-28 00:48:46 +02001558 self.assertEqual(make_archive('test', 'tar'), 'test.tar')
1559 self.assertTrue(os.path.isfile('test.tar'))
1560
Hai Shia3ec3ad2020-05-19 06:02:57 +08001561 @support.requires_zlib()
Serhiy Storchaka9a4fc192014-11-28 00:48:46 +02001562 def test_make_zipfile_in_curdir(self):
1563 # Issue #21280
1564 root_dir = self.mkdtemp()
Hai Shi0c4f0f32020-06-30 21:46:31 +08001565 with os_helper.change_cwd(root_dir):
Serhiy Storchaka9a4fc192014-11-28 00:48:46 +02001566 self.assertEqual(make_archive('test', 'zip'), 'test.zip')
1567 self.assertTrue(os.path.isfile('test.zip'))
1568
Tarek Ziadé396fad72010-02-23 05:30:31 +00001569 def test_register_archive_format(self):
1570
1571 self.assertRaises(TypeError, register_archive_format, 'xxx', 1)
1572 self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
1573 1)
1574 self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
1575 [(1, 2), (1, 2, 3)])
1576
1577 register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file')
1578 formats = [name for name, params in get_archive_formats()]
1579 self.assertIn('xxx', formats)
1580
1581 unregister_archive_format('xxx')
1582 formats = [name for name, params in get_archive_formats()]
1583 self.assertNotIn('xxx', formats)
1584
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001585 ### shutil.unpack_archive
1586
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001587 def check_unpack_archive(self, format):
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001588 self.check_unpack_archive_with_converter(format, lambda path: path)
1589 self.check_unpack_archive_with_converter(format, pathlib.Path)
Serhiy Storchakab21d1552018-03-02 11:53:51 +02001590 self.check_unpack_archive_with_converter(format, FakePath)
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001591
1592 def check_unpack_archive_with_converter(self, format, converter):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001593 root_dir, base_dir = self._create_files()
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001594 expected = rlistdir(root_dir)
1595 expected.remove('outer')
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001596
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001597 base_name = os.path.join(self.mkdtemp(), 'archive')
1598 filename = make_archive(base_name, format, root_dir, base_dir)
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001599
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001600 # let's try to unpack it now
1601 tmpdir2 = self.mkdtemp()
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001602 unpack_archive(converter(filename), converter(tmpdir2))
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001603 self.assertEqual(rlistdir(tmpdir2), expected)
1604
1605 # and again, this time with the format specified
1606 tmpdir3 = self.mkdtemp()
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001607 unpack_archive(converter(filename), converter(tmpdir3), format=format)
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001608 self.assertEqual(rlistdir(tmpdir3), expected)
1609
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001610 self.assertRaises(shutil.ReadError, unpack_archive, converter(TESTFN))
1611 self.assertRaises(ValueError, unpack_archive, converter(TESTFN), format='xxx')
Nick Coghlanabf202d2011-03-16 13:52:20 -04001612
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001613 def test_unpack_archive_tar(self):
1614 self.check_unpack_archive('tar')
1615
Hai Shia3ec3ad2020-05-19 06:02:57 +08001616 @support.requires_zlib()
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001617 def test_unpack_archive_gztar(self):
1618 self.check_unpack_archive('gztar')
1619
Hai Shia3ec3ad2020-05-19 06:02:57 +08001620 @support.requires_bz2()
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001621 def test_unpack_archive_bztar(self):
1622 self.check_unpack_archive('bztar')
1623
Hai Shia3ec3ad2020-05-19 06:02:57 +08001624 @support.requires_lzma()
Michael Feltef110b12019-02-18 12:02:44 +01001625 @unittest.skipIf(AIX and not _maxdataOK(), "AIX MAXDATA must be 0x20000000 or larger")
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001626 def test_unpack_archive_xztar(self):
1627 self.check_unpack_archive('xztar')
1628
Hai Shia3ec3ad2020-05-19 06:02:57 +08001629 @support.requires_zlib()
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001630 def test_unpack_archive_zip(self):
1631 self.check_unpack_archive('zip')
1632
Martin Pantereb995702016-07-28 01:11:04 +00001633 def test_unpack_registry(self):
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001634
1635 formats = get_unpack_formats()
1636
1637 def _boo(filename, extract_dir, extra):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001638 self.assertEqual(extra, 1)
1639 self.assertEqual(filename, 'stuff.boo')
1640 self.assertEqual(extract_dir, 'xx')
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001641
1642 register_unpack_format('Boo', ['.boo', '.b2'], _boo, [('extra', 1)])
1643 unpack_archive('stuff.boo', 'xx')
1644
1645 # trying to register a .boo unpacker again
1646 self.assertRaises(RegistryError, register_unpack_format, 'Boo2',
1647 ['.boo'], _boo)
1648
1649 # should work now
1650 unregister_unpack_format('Boo')
1651 register_unpack_format('Boo2', ['.boo'], _boo)
1652 self.assertIn(('Boo2', ['.boo'], ''), get_unpack_formats())
1653 self.assertNotIn(('Boo', ['.boo'], ''), get_unpack_formats())
1654
1655 # let's leave a clean state
1656 unregister_unpack_format('Boo2')
Ezio Melottib3aedd42010-11-20 19:04:17 +00001657 self.assertEqual(get_unpack_formats(), formats)
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001658
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001659
1660class TestMisc(BaseTest, unittest.TestCase):
1661
Giampaolo Rodola'210e7ca2011-07-01 13:55:36 +02001662 @unittest.skipUnless(hasattr(shutil, 'disk_usage'),
1663 "disk_usage not available on this platform")
1664 def test_disk_usage(self):
Gregory P. Smith529746c2017-07-06 17:11:27 -07001665 usage = shutil.disk_usage(os.path.dirname(__file__))
Victor Stinnerdc525f42018-12-11 12:05:21 +01001666 for attr in ('total', 'used', 'free'):
1667 self.assertIsInstance(getattr(usage, attr), int)
Éric Araujo2ee61882011-07-02 16:45:45 +02001668 self.assertGreater(usage.total, 0)
1669 self.assertGreater(usage.used, 0)
1670 self.assertGreaterEqual(usage.free, 0)
1671 self.assertGreaterEqual(usage.total, usage.used)
1672 self.assertGreater(usage.total, usage.free)
Giampaolo Rodola'210e7ca2011-07-01 13:55:36 +02001673
Victor Stinnerdc525f42018-12-11 12:05:21 +01001674 # bpo-32557: Check that disk_usage() also accepts a filename
1675 shutil.disk_usage(__file__)
1676
Sandro Tosid902a142011-08-22 23:28:27 +02001677 @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
1678 @unittest.skipUnless(hasattr(os, 'chown'), 'requires os.chown')
1679 def test_chown(self):
Sandro Tosid902a142011-08-22 23:28:27 +02001680 dirname = self.mkdtemp()
1681 filename = tempfile.mktemp(dir=dirname)
1682 write_file(filename, 'testing chown function')
1683
1684 with self.assertRaises(ValueError):
1685 shutil.chown(filename)
1686
1687 with self.assertRaises(LookupError):
Raymond Hettinger15f44ab2016-08-30 10:47:49 -07001688 shutil.chown(filename, user='non-existing username')
Sandro Tosid902a142011-08-22 23:28:27 +02001689
1690 with self.assertRaises(LookupError):
Raymond Hettinger15f44ab2016-08-30 10:47:49 -07001691 shutil.chown(filename, group='non-existing groupname')
Sandro Tosid902a142011-08-22 23:28:27 +02001692
1693 with self.assertRaises(TypeError):
1694 shutil.chown(filename, b'spam')
1695
1696 with self.assertRaises(TypeError):
1697 shutil.chown(filename, 3.14)
1698
1699 uid = os.getuid()
1700 gid = os.getgid()
1701
1702 def check_chown(path, uid=None, gid=None):
1703 s = os.stat(filename)
1704 if uid is not None:
1705 self.assertEqual(uid, s.st_uid)
1706 if gid is not None:
1707 self.assertEqual(gid, s.st_gid)
1708
1709 shutil.chown(filename, uid, gid)
1710 check_chown(filename, uid, gid)
1711 shutil.chown(filename, uid)
1712 check_chown(filename, uid)
1713 shutil.chown(filename, user=uid)
1714 check_chown(filename, uid)
1715 shutil.chown(filename, group=gid)
Sandro Tosi91f948a2011-08-22 23:55:39 +02001716 check_chown(filename, gid=gid)
Sandro Tosid902a142011-08-22 23:28:27 +02001717
1718 shutil.chown(dirname, uid, gid)
1719 check_chown(dirname, uid, gid)
1720 shutil.chown(dirname, uid)
1721 check_chown(dirname, uid)
1722 shutil.chown(dirname, user=uid)
1723 check_chown(dirname, uid)
1724 shutil.chown(dirname, group=gid)
Sandro Tosi91f948a2011-08-22 23:55:39 +02001725 check_chown(dirname, gid=gid)
Sandro Tosid902a142011-08-22 23:28:27 +02001726
Matthias Braun52268942020-03-17 09:51:44 -07001727 try:
1728 user = pwd.getpwuid(uid)[0]
1729 group = grp.getgrgid(gid)[0]
1730 except KeyError:
1731 # On some systems uid/gid cannot be resolved.
1732 pass
1733 else:
1734 shutil.chown(filename, user, group)
1735 check_chown(filename, uid, gid)
1736 shutil.chown(dirname, user, group)
1737 check_chown(dirname, uid, gid)
Sandro Tosid902a142011-08-22 23:28:27 +02001738
Brian Curtin0d0a1de2012-06-18 18:41:07 -05001739
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001740class TestWhich(BaseTest, unittest.TestCase):
Brian Curtinc57a3452012-06-22 16:00:30 -05001741
1742 def setUp(self):
Steve Dowerabde52c2019-11-15 09:49:21 -08001743 self.temp_dir = self.mkdtemp(prefix="Tmp")
Brian Curtinc57a3452012-06-22 16:00:30 -05001744 # Give the temp_file an ".exe" suffix for all.
1745 # It's needed on Windows and not harmful on other platforms.
1746 self.temp_file = tempfile.NamedTemporaryFile(dir=self.temp_dir,
Serhiy Storchaka014791f2013-01-21 15:00:27 +02001747 prefix="Tmp",
1748 suffix=".Exe")
Brian Curtinc57a3452012-06-22 16:00:30 -05001749 os.chmod(self.temp_file.name, stat.S_IXUSR)
1750 self.addCleanup(self.temp_file.close)
1751 self.dir, self.file = os.path.split(self.temp_file.name)
Cheryl Sabella5680f652019-02-13 06:25:10 -05001752 self.env_path = self.dir
1753 self.curdir = os.curdir
1754 self.ext = ".EXE"
Brian Curtinc57a3452012-06-22 16:00:30 -05001755
1756 def test_basic(self):
1757 # Given an EXE in a directory, it should be returned.
1758 rv = shutil.which(self.file, path=self.dir)
1759 self.assertEqual(rv, self.temp_file.name)
1760
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001761 def test_absolute_cmd(self):
Brian Curtinc57a3452012-06-22 16:00:30 -05001762 # When given the fully qualified path to an executable that exists,
1763 # it should be returned.
1764 rv = shutil.which(self.temp_file.name, path=self.temp_dir)
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001765 self.assertEqual(rv, self.temp_file.name)
1766
1767 def test_relative_cmd(self):
1768 # When given the relative path with a directory part to an executable
1769 # that exists, it should be returned.
1770 base_dir, tail_dir = os.path.split(self.dir)
1771 relpath = os.path.join(tail_dir, self.file)
Hai Shi0c4f0f32020-06-30 21:46:31 +08001772 with os_helper.change_cwd(path=base_dir):
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001773 rv = shutil.which(relpath, path=self.temp_dir)
1774 self.assertEqual(rv, relpath)
1775 # But it shouldn't be searched in PATH directories (issue #16957).
Hai Shi0c4f0f32020-06-30 21:46:31 +08001776 with os_helper.change_cwd(path=self.dir):
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001777 rv = shutil.which(relpath, path=base_dir)
1778 self.assertIsNone(rv)
1779
1780 def test_cwd(self):
1781 # Issue #16957
1782 base_dir = os.path.dirname(self.dir)
Hai Shi0c4f0f32020-06-30 21:46:31 +08001783 with os_helper.change_cwd(path=self.dir):
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001784 rv = shutil.which(self.file, path=base_dir)
1785 if sys.platform == "win32":
1786 # Windows: current directory implicitly on PATH
Cheryl Sabella5680f652019-02-13 06:25:10 -05001787 self.assertEqual(rv, os.path.join(self.curdir, self.file))
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001788 else:
1789 # Other platforms: shouldn't match in the current directory.
1790 self.assertIsNone(rv)
Brian Curtinc57a3452012-06-22 16:00:30 -05001791
Serhiy Storchaka12516e22013-05-28 15:50:15 +03001792 @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
1793 'non-root user required')
Brian Curtinc57a3452012-06-22 16:00:30 -05001794 def test_non_matching_mode(self):
1795 # Set the file read-only and ask for writeable files.
1796 os.chmod(self.temp_file.name, stat.S_IREAD)
Serhiy Storchaka12516e22013-05-28 15:50:15 +03001797 if os.access(self.temp_file.name, os.W_OK):
1798 self.skipTest("can't set the file read-only")
Brian Curtinc57a3452012-06-22 16:00:30 -05001799 rv = shutil.which(self.file, path=self.dir, mode=os.W_OK)
1800 self.assertIsNone(rv)
1801
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001802 def test_relative_path(self):
Antoine Pitrou07c24d12012-06-22 23:33:05 +02001803 base_dir, tail_dir = os.path.split(self.dir)
Hai Shi0c4f0f32020-06-30 21:46:31 +08001804 with os_helper.change_cwd(path=base_dir):
Antoine Pitrou07c24d12012-06-22 23:33:05 +02001805 rv = shutil.which(self.file, path=tail_dir)
1806 self.assertEqual(rv, os.path.join(tail_dir, self.file))
Antoine Pitrou07c24d12012-06-22 23:33:05 +02001807
Brian Curtinc57a3452012-06-22 16:00:30 -05001808 def test_nonexistent_file(self):
1809 # Return None when no matching executable file is found on the path.
1810 rv = shutil.which("foo.exe", path=self.dir)
1811 self.assertIsNone(rv)
1812
1813 @unittest.skipUnless(sys.platform == "win32",
1814 "pathext check is Windows-only")
1815 def test_pathext_checking(self):
1816 # Ask for the file without the ".exe" extension, then ensure that
1817 # it gets found properly with the extension.
Serhiy Storchakad70127a2013-01-24 20:03:49 +02001818 rv = shutil.which(self.file[:-4], path=self.dir)
Cheryl Sabella5680f652019-02-13 06:25:10 -05001819 self.assertEqual(rv, self.temp_file.name[:-4] + self.ext)
Brian Curtinc57a3452012-06-22 16:00:30 -05001820
Barry Warsaw618738b2013-04-16 11:05:03 -04001821 def test_environ_path(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001822 with os_helper.EnvironmentVarGuard() as env:
Cheryl Sabella5680f652019-02-13 06:25:10 -05001823 env['PATH'] = self.env_path
Barry Warsaw618738b2013-04-16 11:05:03 -04001824 rv = shutil.which(self.file)
1825 self.assertEqual(rv, self.temp_file.name)
1826
Victor Stinner228a3c92019-04-17 16:26:36 +02001827 def test_environ_path_empty(self):
1828 # PATH='': no match
Hai Shi0c4f0f32020-06-30 21:46:31 +08001829 with os_helper.EnvironmentVarGuard() as env:
Victor Stinner228a3c92019-04-17 16:26:36 +02001830 env['PATH'] = ''
1831 with unittest.mock.patch('os.confstr', return_value=self.dir, \
1832 create=True), \
1833 support.swap_attr(os, 'defpath', self.dir), \
Hai Shi0c4f0f32020-06-30 21:46:31 +08001834 os_helper.change_cwd(self.dir):
Victor Stinner228a3c92019-04-17 16:26:36 +02001835 rv = shutil.which(self.file)
1836 self.assertIsNone(rv)
1837
1838 def test_environ_path_cwd(self):
1839 expected_cwd = os.path.basename(self.temp_file.name)
1840 if sys.platform == "win32":
1841 curdir = os.curdir
1842 if isinstance(expected_cwd, bytes):
1843 curdir = os.fsencode(curdir)
1844 expected_cwd = os.path.join(curdir, expected_cwd)
1845
1846 # PATH=':': explicitly looks in the current directory
Hai Shi0c4f0f32020-06-30 21:46:31 +08001847 with os_helper.EnvironmentVarGuard() as env:
Victor Stinner228a3c92019-04-17 16:26:36 +02001848 env['PATH'] = os.pathsep
1849 with unittest.mock.patch('os.confstr', return_value=self.dir, \
1850 create=True), \
1851 support.swap_attr(os, 'defpath', self.dir):
1852 rv = shutil.which(self.file)
1853 self.assertIsNone(rv)
1854
1855 # look in current directory
Hai Shi0c4f0f32020-06-30 21:46:31 +08001856 with os_helper.change_cwd(self.dir):
Victor Stinner228a3c92019-04-17 16:26:36 +02001857 rv = shutil.which(self.file)
1858 self.assertEqual(rv, expected_cwd)
1859
1860 def test_environ_path_missing(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001861 with os_helper.EnvironmentVarGuard() as env:
Victor Stinner228a3c92019-04-17 16:26:36 +02001862 env.pop('PATH', None)
1863
1864 # without confstr
1865 with unittest.mock.patch('os.confstr', side_effect=ValueError, \
1866 create=True), \
1867 support.swap_attr(os, 'defpath', self.dir):
1868 rv = shutil.which(self.file)
1869 self.assertEqual(rv, self.temp_file.name)
1870
1871 # with confstr
1872 with unittest.mock.patch('os.confstr', return_value=self.dir, \
1873 create=True), \
1874 support.swap_attr(os, 'defpath', ''):
1875 rv = shutil.which(self.file)
1876 self.assertEqual(rv, self.temp_file.name)
1877
Barry Warsaw618738b2013-04-16 11:05:03 -04001878 def test_empty_path(self):
1879 base_dir = os.path.dirname(self.dir)
Hai Shi0c4f0f32020-06-30 21:46:31 +08001880 with os_helper.change_cwd(path=self.dir), \
1881 os_helper.EnvironmentVarGuard() as env:
Cheryl Sabella5680f652019-02-13 06:25:10 -05001882 env['PATH'] = self.env_path
Barry Warsaw618738b2013-04-16 11:05:03 -04001883 rv = shutil.which(self.file, path='')
1884 self.assertIsNone(rv)
1885
1886 def test_empty_path_no_PATH(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08001887 with os_helper.EnvironmentVarGuard() as env:
Barry Warsaw618738b2013-04-16 11:05:03 -04001888 env.pop('PATH', None)
1889 rv = shutil.which(self.file)
1890 self.assertIsNone(rv)
1891
Victor Stinner228a3c92019-04-17 16:26:36 +02001892 @unittest.skipUnless(sys.platform == "win32", 'test specific to Windows')
1893 def test_pathext(self):
1894 ext = ".xyz"
1895 temp_filexyz = tempfile.NamedTemporaryFile(dir=self.temp_dir,
1896 prefix="Tmp2", suffix=ext)
1897 os.chmod(temp_filexyz.name, stat.S_IXUSR)
1898 self.addCleanup(temp_filexyz.close)
1899
1900 # strip path and extension
1901 program = os.path.basename(temp_filexyz.name)
1902 program = os.path.splitext(program)[0]
1903
Hai Shi0c4f0f32020-06-30 21:46:31 +08001904 with os_helper.EnvironmentVarGuard() as env:
Victor Stinner228a3c92019-04-17 16:26:36 +02001905 env['PATHEXT'] = ext
1906 rv = shutil.which(program, path=self.temp_dir)
1907 self.assertEqual(rv, temp_filexyz.name)
1908
Christopher Marchfelderda6f0982020-10-23 12:08:24 +02001909 # Issue 40592: See https://bugs.python.org/issue40592
1910 @unittest.skipUnless(sys.platform == "win32", 'test specific to Windows')
1911 def test_pathext_with_empty_str(self):
1912 ext = ".xyz"
1913 temp_filexyz = tempfile.NamedTemporaryFile(dir=self.temp_dir,
1914 prefix="Tmp2", suffix=ext)
1915 self.addCleanup(temp_filexyz.close)
1916
1917 # strip path and extension
1918 program = os.path.basename(temp_filexyz.name)
1919 program = os.path.splitext(program)[0]
1920
1921 with os_helper.EnvironmentVarGuard() as env:
1922 env['PATHEXT'] = f"{ext};" # note the ;
1923 rv = shutil.which(program, path=self.temp_dir)
1924 self.assertEqual(rv, temp_filexyz.name)
1925
Brian Curtinc57a3452012-06-22 16:00:30 -05001926
Cheryl Sabella5680f652019-02-13 06:25:10 -05001927class TestWhichBytes(TestWhich):
1928 def setUp(self):
1929 TestWhich.setUp(self)
1930 self.dir = os.fsencode(self.dir)
1931 self.file = os.fsencode(self.file)
1932 self.temp_file.name = os.fsencode(self.temp_file.name)
1933 self.curdir = os.fsencode(self.curdir)
1934 self.ext = os.fsencode(self.ext)
1935
1936
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001937class TestMove(BaseTest, unittest.TestCase):
Christian Heimesada8c3b2008-03-18 18:26:33 +00001938
1939 def setUp(self):
1940 filename = "foo"
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001941 self.src_dir = self.mkdtemp()
1942 self.dst_dir = self.mkdtemp()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001943 self.src_file = os.path.join(self.src_dir, filename)
1944 self.dst_file = os.path.join(self.dst_dir, filename)
Christian Heimesada8c3b2008-03-18 18:26:33 +00001945 with open(self.src_file, "wb") as f:
1946 f.write(b"spam")
1947
Christian Heimesada8c3b2008-03-18 18:26:33 +00001948 def _check_move_file(self, src, dst, real_dst):
Antoine Pitrou92f60ed2010-10-14 22:11:44 +00001949 with open(src, "rb") as f:
1950 contents = f.read()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001951 shutil.move(src, dst)
Antoine Pitrou92f60ed2010-10-14 22:11:44 +00001952 with open(real_dst, "rb") as f:
1953 self.assertEqual(contents, f.read())
Christian Heimesada8c3b2008-03-18 18:26:33 +00001954 self.assertFalse(os.path.exists(src))
1955
1956 def _check_move_dir(self, src, dst, real_dst):
1957 contents = sorted(os.listdir(src))
1958 shutil.move(src, dst)
1959 self.assertEqual(contents, sorted(os.listdir(real_dst)))
1960 self.assertFalse(os.path.exists(src))
1961
1962 def test_move_file(self):
1963 # Move a file to another location on the same filesystem.
1964 self._check_move_file(self.src_file, self.dst_file, self.dst_file)
1965
1966 def test_move_file_to_dir(self):
1967 # Move a file inside an existing dir on the same filesystem.
1968 self._check_move_file(self.src_file, self.dst_dir, self.dst_file)
1969
Maxwell A McKinnoncf57cab2019-09-30 19:41:16 -07001970 def test_move_file_to_dir_pathlike_src(self):
1971 # Move a pathlike file to another location on the same filesystem.
1972 src = pathlib.Path(self.src_file)
1973 self._check_move_file(src, self.dst_dir, self.dst_file)
1974
1975 def test_move_file_to_dir_pathlike_dst(self):
1976 # Move a file to another pathlike location on the same filesystem.
1977 dst = pathlib.Path(self.dst_dir)
1978 self._check_move_file(self.src_file, dst, self.dst_file)
1979
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001980 @mock_rename
Christian Heimesada8c3b2008-03-18 18:26:33 +00001981 def test_move_file_other_fs(self):
1982 # Move a file to an existing dir on another filesystem.
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001983 self.test_move_file()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001984
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001985 @mock_rename
Christian Heimesada8c3b2008-03-18 18:26:33 +00001986 def test_move_file_to_dir_other_fs(self):
1987 # Move a file to another location on another filesystem.
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001988 self.test_move_file_to_dir()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001989
1990 def test_move_dir(self):
1991 # Move a dir to another location on the same filesystem.
Steve Dowerabde52c2019-11-15 09:49:21 -08001992 dst_dir = tempfile.mktemp(dir=self.mkdtemp())
Christian Heimesada8c3b2008-03-18 18:26:33 +00001993 try:
1994 self._check_move_dir(self.src_dir, dst_dir, dst_dir)
1995 finally:
Hai Shi0c4f0f32020-06-30 21:46:31 +08001996 os_helper.rmtree(dst_dir)
Christian Heimesada8c3b2008-03-18 18:26:33 +00001997
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001998 @mock_rename
Christian Heimesada8c3b2008-03-18 18:26:33 +00001999 def test_move_dir_other_fs(self):
2000 # Move a dir to another location on another filesystem.
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04002001 self.test_move_dir()
Christian Heimesada8c3b2008-03-18 18:26:33 +00002002
2003 def test_move_dir_to_dir(self):
2004 # Move a dir inside an existing dir on the same filesystem.
2005 self._check_move_dir(self.src_dir, self.dst_dir,
2006 os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
2007
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04002008 @mock_rename
Christian Heimesada8c3b2008-03-18 18:26:33 +00002009 def test_move_dir_to_dir_other_fs(self):
2010 # Move a dir inside an existing dir on another filesystem.
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04002011 self.test_move_dir_to_dir()
Christian Heimesada8c3b2008-03-18 18:26:33 +00002012
Serhiy Storchaka3a308b92014-02-11 10:30:59 +02002013 def test_move_dir_sep_to_dir(self):
2014 self._check_move_dir(self.src_dir + os.path.sep, self.dst_dir,
2015 os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
2016
2017 @unittest.skipUnless(os.path.altsep, 'requires os.path.altsep')
2018 def test_move_dir_altsep_to_dir(self):
2019 self._check_move_dir(self.src_dir + os.path.altsep, self.dst_dir,
2020 os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
2021
Christian Heimesada8c3b2008-03-18 18:26:33 +00002022 def test_existing_file_inside_dest_dir(self):
2023 # A file with the same name inside the destination dir already exists.
2024 with open(self.dst_file, "wb"):
2025 pass
2026 self.assertRaises(shutil.Error, shutil.move, self.src_file, self.dst_dir)
2027
2028 def test_dont_move_dir_in_itself(self):
2029 # Moving a dir inside itself raises an Error.
2030 dst = os.path.join(self.src_dir, "bar")
2031 self.assertRaises(shutil.Error, shutil.move, self.src_dir, dst)
2032
Antoine Pitrou0dcc3cd2009-01-29 20:26:59 +00002033 def test_destinsrc_false_negative(self):
2034 os.mkdir(TESTFN)
2035 try:
2036 for src, dst in [('srcdir', 'srcdir/dest')]:
2037 src = os.path.join(TESTFN, src)
2038 dst = os.path.join(TESTFN, dst)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002039 self.assertTrue(shutil._destinsrc(src, dst),
Benjamin Peterson247a9b82009-02-20 04:09:19 +00002040 msg='_destinsrc() wrongly concluded that '
Antoine Pitrou0dcc3cd2009-01-29 20:26:59 +00002041 'dst (%s) is not in src (%s)' % (dst, src))
2042 finally:
Hai Shi0c4f0f32020-06-30 21:46:31 +08002043 os_helper.rmtree(TESTFN)
Christian Heimesada8c3b2008-03-18 18:26:33 +00002044
Antoine Pitrou0dcc3cd2009-01-29 20:26:59 +00002045 def test_destinsrc_false_positive(self):
2046 os.mkdir(TESTFN)
2047 try:
2048 for src, dst in [('srcdir', 'src/dest'), ('srcdir', 'srcdir.new')]:
2049 src = os.path.join(TESTFN, src)
2050 dst = os.path.join(TESTFN, dst)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002051 self.assertFalse(shutil._destinsrc(src, dst),
Benjamin Peterson247a9b82009-02-20 04:09:19 +00002052 msg='_destinsrc() wrongly concluded that '
Antoine Pitrou0dcc3cd2009-01-29 20:26:59 +00002053 'dst (%s) is in src (%s)' % (dst, src))
2054 finally:
Hai Shi0c4f0f32020-06-30 21:46:31 +08002055 os_helper.rmtree(TESTFN)
Christian Heimes9bd667a2008-01-20 15:14:11 +00002056
Hai Shi0c4f0f32020-06-30 21:46:31 +08002057 @os_helper.skip_unless_symlink
Antoine Pitrou0a08d7a2012-01-06 20:16:19 +01002058 @mock_rename
2059 def test_move_file_symlink(self):
2060 dst = os.path.join(self.src_dir, 'bar')
2061 os.symlink(self.src_file, dst)
2062 shutil.move(dst, self.dst_file)
2063 self.assertTrue(os.path.islink(self.dst_file))
2064 self.assertTrue(os.path.samefile(self.src_file, self.dst_file))
2065
Hai Shi0c4f0f32020-06-30 21:46:31 +08002066 @os_helper.skip_unless_symlink
Antoine Pitrou0a08d7a2012-01-06 20:16:19 +01002067 @mock_rename
2068 def test_move_file_symlink_to_dir(self):
2069 filename = "bar"
2070 dst = os.path.join(self.src_dir, filename)
2071 os.symlink(self.src_file, dst)
2072 shutil.move(dst, self.dst_dir)
2073 final_link = os.path.join(self.dst_dir, filename)
2074 self.assertTrue(os.path.islink(final_link))
2075 self.assertTrue(os.path.samefile(self.src_file, final_link))
2076
Hai Shi0c4f0f32020-06-30 21:46:31 +08002077 @os_helper.skip_unless_symlink
Antoine Pitrou0a08d7a2012-01-06 20:16:19 +01002078 @mock_rename
2079 def test_move_dangling_symlink(self):
2080 src = os.path.join(self.src_dir, 'baz')
2081 dst = os.path.join(self.src_dir, 'bar')
2082 os.symlink(src, dst)
2083 dst_link = os.path.join(self.dst_dir, 'quux')
2084 shutil.move(dst, dst_link)
2085 self.assertTrue(os.path.islink(dst_link))
Steve Dower75e06492019-08-21 13:43:06 -07002086 self.assertEqual(os.path.realpath(src), os.path.realpath(dst_link))
Antoine Pitrou0a08d7a2012-01-06 20:16:19 +01002087
Hai Shi0c4f0f32020-06-30 21:46:31 +08002088 @os_helper.skip_unless_symlink
Antoine Pitrou0a08d7a2012-01-06 20:16:19 +01002089 @mock_rename
2090 def test_move_dir_symlink(self):
2091 src = os.path.join(self.src_dir, 'baz')
2092 dst = os.path.join(self.src_dir, 'bar')
2093 os.mkdir(src)
2094 os.symlink(src, dst)
2095 dst_link = os.path.join(self.dst_dir, 'quux')
2096 shutil.move(dst, dst_link)
2097 self.assertTrue(os.path.islink(dst_link))
2098 self.assertTrue(os.path.samefile(src, dst_link))
2099
Brian Curtin0d0a1de2012-06-18 18:41:07 -05002100 def test_move_return_value(self):
2101 rv = shutil.move(self.src_file, self.dst_dir)
2102 self.assertEqual(rv,
2103 os.path.join(self.dst_dir, os.path.basename(self.src_file)))
2104
2105 def test_move_as_rename_return_value(self):
2106 rv = shutil.move(self.src_file, os.path.join(self.dst_dir, 'bar'))
2107 self.assertEqual(rv, os.path.join(self.dst_dir, 'bar'))
2108
R David Murray6ffface2014-06-11 14:40:13 -04002109 @mock_rename
2110 def test_move_file_special_function(self):
2111 moved = []
2112 def _copy(src, dst):
2113 moved.append((src, dst))
2114 shutil.move(self.src_file, self.dst_dir, copy_function=_copy)
2115 self.assertEqual(len(moved), 1)
2116
2117 @mock_rename
2118 def test_move_dir_special_function(self):
2119 moved = []
2120 def _copy(src, dst):
2121 moved.append((src, dst))
Hai Shi0c4f0f32020-06-30 21:46:31 +08002122 os_helper.create_empty_file(os.path.join(self.src_dir, 'child'))
2123 os_helper.create_empty_file(os.path.join(self.src_dir, 'child1'))
R David Murray6ffface2014-06-11 14:40:13 -04002124 shutil.move(self.src_dir, self.dst_dir, copy_function=_copy)
2125 self.assertEqual(len(moved), 3)
2126
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03002127 def test_move_dir_caseinsensitive(self):
2128 # Renames a folder to the same name
2129 # but a different case.
2130
2131 self.src_dir = self.mkdtemp()
2132 dst_dir = os.path.join(
2133 os.path.dirname(self.src_dir),
2134 os.path.basename(self.src_dir).upper())
2135 self.assertNotEqual(self.src_dir, dst_dir)
2136
2137 try:
2138 shutil.move(self.src_dir, dst_dir)
2139 self.assertTrue(os.path.isdir(dst_dir))
2140 finally:
2141 os.rmdir(dst_dir)
2142
Tarek Ziadé5340db32010-04-19 22:30:51 +00002143
Winson Luk132131b2021-03-02 15:53:15 -05002144 @unittest.skipUnless(hasattr(os, 'geteuid') and os.geteuid() == 0
2145 and hasattr(os, 'lchflags')
2146 and hasattr(stat, 'SF_IMMUTABLE')
2147 and hasattr(stat, 'UF_OPAQUE'),
2148 'root privileges required')
2149 def test_move_dir_permission_denied(self):
2150 # bpo-42782: shutil.move should not create destination directories
2151 # if the source directory cannot be removed.
2152 try:
2153 os.mkdir(TESTFN_SRC)
2154 os.lchflags(TESTFN_SRC, stat.SF_IMMUTABLE)
2155
2156 # Testing on an empty immutable directory
2157 # TESTFN_DST should not exist if shutil.move failed
2158 self.assertRaises(PermissionError, shutil.move, TESTFN_SRC, TESTFN_DST)
2159 self.assertFalse(TESTFN_DST in os.listdir())
2160
2161 # Create a file and keep the directory immutable
2162 os.lchflags(TESTFN_SRC, stat.UF_OPAQUE)
2163 os_helper.create_empty_file(os.path.join(TESTFN_SRC, 'child'))
2164 os.lchflags(TESTFN_SRC, stat.SF_IMMUTABLE)
2165
2166 # Testing on a non-empty immutable directory
2167 # TESTFN_DST should not exist if shutil.move failed
2168 self.assertRaises(PermissionError, shutil.move, TESTFN_SRC, TESTFN_DST)
2169 self.assertFalse(TESTFN_DST in os.listdir())
2170 finally:
2171 if os.path.exists(TESTFN_SRC):
2172 os.lchflags(TESTFN_SRC, stat.UF_OPAQUE)
2173 os_helper.rmtree(TESTFN_SRC)
2174 if os.path.exists(TESTFN_DST):
2175 os.lchflags(TESTFN_DST, stat.UF_OPAQUE)
2176 os_helper.rmtree(TESTFN_DST)
2177
2178
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002179class TestCopyFile(unittest.TestCase):
2180
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002181 class Faux(object):
2182 _entered = False
2183 _exited_with = None
2184 _raised = False
2185 def __init__(self, raise_in_exit=False, suppress_at_exit=True):
2186 self._raise_in_exit = raise_in_exit
2187 self._suppress_at_exit = suppress_at_exit
2188 def read(self, *args):
2189 return ''
2190 def __enter__(self):
2191 self._entered = True
2192 def __exit__(self, exc_type, exc_val, exc_tb):
2193 self._exited_with = exc_type, exc_val, exc_tb
2194 if self._raise_in_exit:
2195 self._raised = True
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002196 raise OSError("Cannot close")
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002197 return self._suppress_at_exit
2198
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002199 def test_w_source_open_fails(self):
2200 def _open(filename, mode='r'):
2201 if filename == 'srcfile':
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002202 raise OSError('Cannot open "srcfile"')
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002203 assert 0 # shouldn't reach here.
2204
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03002205 with support.swap_attr(shutil, 'open', _open):
2206 with self.assertRaises(OSError):
2207 shutil.copyfile('srcfile', 'destfile')
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002208
Victor Stinner937ee9e2018-06-26 02:11:06 +02002209 @unittest.skipIf(MACOS, "skipped on macOS")
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002210 def test_w_dest_open_fails(self):
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002211 srcfile = self.Faux()
2212
2213 def _open(filename, mode='r'):
2214 if filename == 'srcfile':
2215 return srcfile
2216 if filename == 'destfile':
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002217 raise OSError('Cannot open "destfile"')
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002218 assert 0 # shouldn't reach here.
2219
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03002220 with support.swap_attr(shutil, 'open', _open):
2221 shutil.copyfile('srcfile', 'destfile')
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002222 self.assertTrue(srcfile._entered)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002223 self.assertTrue(srcfile._exited_with[0] is OSError)
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002224 self.assertEqual(srcfile._exited_with[1].args,
2225 ('Cannot open "destfile"',))
2226
Victor Stinner937ee9e2018-06-26 02:11:06 +02002227 @unittest.skipIf(MACOS, "skipped on macOS")
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002228 def test_w_dest_close_fails(self):
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002229 srcfile = self.Faux()
2230 destfile = self.Faux(True)
2231
2232 def _open(filename, mode='r'):
2233 if filename == 'srcfile':
2234 return srcfile
2235 if filename == 'destfile':
2236 return destfile
2237 assert 0 # shouldn't reach here.
2238
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03002239 with support.swap_attr(shutil, 'open', _open):
2240 shutil.copyfile('srcfile', 'destfile')
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002241 self.assertTrue(srcfile._entered)
2242 self.assertTrue(destfile._entered)
2243 self.assertTrue(destfile._raised)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002244 self.assertTrue(srcfile._exited_with[0] is OSError)
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002245 self.assertEqual(srcfile._exited_with[1].args,
2246 ('Cannot close',))
2247
Victor Stinner937ee9e2018-06-26 02:11:06 +02002248 @unittest.skipIf(MACOS, "skipped on macOS")
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002249 def test_w_source_close_fails(self):
2250
2251 srcfile = self.Faux(True)
2252 destfile = self.Faux()
2253
2254 def _open(filename, mode='r'):
2255 if filename == 'srcfile':
2256 return srcfile
2257 if filename == 'destfile':
2258 return destfile
2259 assert 0 # shouldn't reach here.
2260
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03002261 with support.swap_attr(shutil, 'open', _open):
2262 with self.assertRaises(OSError):
2263 shutil.copyfile('srcfile', 'destfile')
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002264 self.assertTrue(srcfile._entered)
2265 self.assertTrue(destfile._entered)
2266 self.assertFalse(destfile._raised)
2267 self.assertTrue(srcfile._exited_with[0] is None)
2268 self.assertTrue(srcfile._raised)
2269
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002270
Giampaolo Rodolac7f02a92018-06-19 08:27:29 -07002271class TestCopyFileObj(unittest.TestCase):
2272 FILESIZE = 2 * 1024 * 1024
2273
2274 @classmethod
2275 def setUpClass(cls):
2276 write_test_file(TESTFN, cls.FILESIZE)
2277
2278 @classmethod
2279 def tearDownClass(cls):
Hai Shi0c4f0f32020-06-30 21:46:31 +08002280 os_helper.unlink(TESTFN)
2281 os_helper.unlink(TESTFN2)
Giampaolo Rodolac7f02a92018-06-19 08:27:29 -07002282
2283 def tearDown(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08002284 os_helper.unlink(TESTFN2)
Giampaolo Rodolac7f02a92018-06-19 08:27:29 -07002285
2286 @contextlib.contextmanager
2287 def get_files(self):
2288 with open(TESTFN, "rb") as src:
2289 with open(TESTFN2, "wb") as dst:
2290 yield (src, dst)
2291
2292 def assert_files_eq(self, src, dst):
2293 with open(src, 'rb') as fsrc:
2294 with open(dst, 'rb') as fdst:
2295 self.assertEqual(fsrc.read(), fdst.read())
2296
2297 def test_content(self):
2298 with self.get_files() as (src, dst):
2299 shutil.copyfileobj(src, dst)
2300 self.assert_files_eq(TESTFN, TESTFN2)
2301
2302 def test_file_not_closed(self):
2303 with self.get_files() as (src, dst):
2304 shutil.copyfileobj(src, dst)
2305 assert not src.closed
2306 assert not dst.closed
2307
2308 def test_file_offset(self):
2309 with self.get_files() as (src, dst):
2310 shutil.copyfileobj(src, dst)
2311 self.assertEqual(src.tell(), self.FILESIZE)
2312 self.assertEqual(dst.tell(), self.FILESIZE)
2313
2314 @unittest.skipIf(os.name != 'nt', "Windows only")
2315 def test_win_impl(self):
2316 # Make sure alternate Windows implementation is called.
2317 with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
2318 shutil.copyfile(TESTFN, TESTFN2)
2319 assert m.called
2320
2321 # File size is 2 MiB but max buf size should be 1 MiB.
2322 self.assertEqual(m.call_args[0][2], 1 * 1024 * 1024)
2323
2324 # If file size < 1 MiB memoryview() length must be equal to
2325 # the actual file size.
Steve Dowerabde52c2019-11-15 09:49:21 -08002326 with tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False) as f:
Giampaolo Rodolac7f02a92018-06-19 08:27:29 -07002327 f.write(b'foo')
2328 fname = f.name
Hai Shi0c4f0f32020-06-30 21:46:31 +08002329 self.addCleanup(os_helper.unlink, fname)
Giampaolo Rodolac7f02a92018-06-19 08:27:29 -07002330 with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
2331 shutil.copyfile(fname, TESTFN2)
2332 self.assertEqual(m.call_args[0][2], 3)
2333
2334 # Empty files should not rely on readinto() variant.
Steve Dowerabde52c2019-11-15 09:49:21 -08002335 with tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False) as f:
Giampaolo Rodolac7f02a92018-06-19 08:27:29 -07002336 pass
2337 fname = f.name
Hai Shi0c4f0f32020-06-30 21:46:31 +08002338 self.addCleanup(os_helper.unlink, fname)
Giampaolo Rodolac7f02a92018-06-19 08:27:29 -07002339 with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
2340 shutil.copyfile(fname, TESTFN2)
2341 assert not m.called
2342 self.assert_files_eq(fname, TESTFN2)
2343
2344
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002345class _ZeroCopyFileTest(object):
2346 """Tests common to all zero-copy APIs."""
2347 FILESIZE = (10 * 1024 * 1024) # 10 MiB
2348 FILEDATA = b""
2349 PATCHPOINT = ""
2350
2351 @classmethod
2352 def setUpClass(cls):
2353 write_test_file(TESTFN, cls.FILESIZE)
2354 with open(TESTFN, 'rb') as f:
2355 cls.FILEDATA = f.read()
2356 assert len(cls.FILEDATA) == cls.FILESIZE
2357
2358 @classmethod
2359 def tearDownClass(cls):
Hai Shi0c4f0f32020-06-30 21:46:31 +08002360 os_helper.unlink(TESTFN)
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002361
2362 def tearDown(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08002363 os_helper.unlink(TESTFN2)
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002364
2365 @contextlib.contextmanager
2366 def get_files(self):
2367 with open(TESTFN, "rb") as src:
2368 with open(TESTFN2, "wb") as dst:
2369 yield (src, dst)
2370
2371 def zerocopy_fun(self, *args, **kwargs):
2372 raise NotImplementedError("must be implemented in subclass")
2373
2374 def reset(self):
2375 self.tearDown()
2376 self.tearDownClass()
2377 self.setUpClass()
2378 self.setUp()
2379
2380 # ---
2381
2382 def test_regular_copy(self):
2383 with self.get_files() as (src, dst):
2384 self.zerocopy_fun(src, dst)
2385 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2386 # Make sure the fallback function is not called.
2387 with self.get_files() as (src, dst):
2388 with unittest.mock.patch('shutil.copyfileobj') as m:
2389 shutil.copyfile(TESTFN, TESTFN2)
2390 assert not m.called
2391
2392 def test_same_file(self):
2393 self.addCleanup(self.reset)
2394 with self.get_files() as (src, dst):
2395 with self.assertRaises(Exception):
2396 self.zerocopy_fun(src, src)
2397 # Make sure src file is not corrupted.
2398 self.assertEqual(read_file(TESTFN, binary=True), self.FILEDATA)
2399
2400 def test_non_existent_src(self):
Steve Dowerabde52c2019-11-15 09:49:21 -08002401 name = tempfile.mktemp(dir=os.getcwd())
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002402 with self.assertRaises(FileNotFoundError) as cm:
2403 shutil.copyfile(name, "new")
2404 self.assertEqual(cm.exception.filename, name)
2405
2406 def test_empty_file(self):
2407 srcname = TESTFN + 'src'
2408 dstname = TESTFN + 'dst'
Hai Shi0c4f0f32020-06-30 21:46:31 +08002409 self.addCleanup(lambda: os_helper.unlink(srcname))
2410 self.addCleanup(lambda: os_helper.unlink(dstname))
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002411 with open(srcname, "wb"):
2412 pass
2413
2414 with open(srcname, "rb") as src:
2415 with open(dstname, "wb") as dst:
2416 self.zerocopy_fun(src, dst)
2417
2418 self.assertEqual(read_file(dstname, binary=True), b"")
2419
2420 def test_unhandled_exception(self):
2421 with unittest.mock.patch(self.PATCHPOINT,
2422 side_effect=ZeroDivisionError):
2423 self.assertRaises(ZeroDivisionError,
2424 shutil.copyfile, TESTFN, TESTFN2)
2425
2426 def test_exception_on_first_call(self):
2427 # Emulate a case where the first call to the zero-copy
2428 # function raises an exception in which case the function is
2429 # supposed to give up immediately.
2430 with unittest.mock.patch(self.PATCHPOINT,
2431 side_effect=OSError(errno.EINVAL, "yo")):
2432 with self.get_files() as (src, dst):
2433 with self.assertRaises(_GiveupOnFastCopy):
2434 self.zerocopy_fun(src, dst)
2435
2436 def test_filesystem_full(self):
2437 # Emulate a case where filesystem is full and sendfile() fails
2438 # on first call.
2439 with unittest.mock.patch(self.PATCHPOINT,
2440 side_effect=OSError(errno.ENOSPC, "yo")):
2441 with self.get_files() as (src, dst):
2442 self.assertRaises(OSError, self.zerocopy_fun, src, dst)
2443
2444
2445@unittest.skipIf(not SUPPORTS_SENDFILE, 'os.sendfile() not supported')
2446class TestZeroCopySendfile(_ZeroCopyFileTest, unittest.TestCase):
2447 PATCHPOINT = "os.sendfile"
2448
2449 def zerocopy_fun(self, fsrc, fdst):
2450 return shutil._fastcopy_sendfile(fsrc, fdst)
2451
2452 def test_non_regular_file_src(self):
2453 with io.BytesIO(self.FILEDATA) as src:
2454 with open(TESTFN2, "wb") as dst:
2455 with self.assertRaises(_GiveupOnFastCopy):
2456 self.zerocopy_fun(src, dst)
2457 shutil.copyfileobj(src, dst)
2458
2459 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2460
2461 def test_non_regular_file_dst(self):
2462 with open(TESTFN, "rb") as src:
2463 with io.BytesIO() as dst:
2464 with self.assertRaises(_GiveupOnFastCopy):
2465 self.zerocopy_fun(src, dst)
2466 shutil.copyfileobj(src, dst)
2467 dst.seek(0)
2468 self.assertEqual(dst.read(), self.FILEDATA)
2469
2470 def test_exception_on_second_call(self):
2471 def sendfile(*args, **kwargs):
2472 if not flag:
2473 flag.append(None)
2474 return orig_sendfile(*args, **kwargs)
2475 else:
2476 raise OSError(errno.EBADF, "yo")
2477
2478 flag = []
2479 orig_sendfile = os.sendfile
2480 with unittest.mock.patch('os.sendfile', create=True,
2481 side_effect=sendfile):
2482 with self.get_files() as (src, dst):
2483 with self.assertRaises(OSError) as cm:
2484 shutil._fastcopy_sendfile(src, dst)
2485 assert flag
2486 self.assertEqual(cm.exception.errno, errno.EBADF)
2487
2488 def test_cant_get_size(self):
2489 # Emulate a case where src file size cannot be determined.
2490 # Internally bufsize will be set to a small value and
2491 # sendfile() will be called repeatedly.
2492 with unittest.mock.patch('os.fstat', side_effect=OSError) as m:
2493 with self.get_files() as (src, dst):
2494 shutil._fastcopy_sendfile(src, dst)
2495 assert m.called
2496 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2497
2498 def test_small_chunks(self):
2499 # Force internal file size detection to be smaller than the
2500 # actual file size. We want to force sendfile() to be called
2501 # multiple times, also in order to emulate a src fd which gets
2502 # bigger while it is being copied.
2503 mock = unittest.mock.Mock()
2504 mock.st_size = 65536 + 1
2505 with unittest.mock.patch('os.fstat', return_value=mock) as m:
2506 with self.get_files() as (src, dst):
2507 shutil._fastcopy_sendfile(src, dst)
2508 assert m.called
2509 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2510
2511 def test_big_chunk(self):
2512 # Force internal file size detection to be +100MB bigger than
2513 # the actual file size. Make sure sendfile() does not rely on
2514 # file size value except for (maybe) a better throughput /
2515 # performance.
2516 mock = unittest.mock.Mock()
2517 mock.st_size = self.FILESIZE + (100 * 1024 * 1024)
2518 with unittest.mock.patch('os.fstat', return_value=mock) as m:
2519 with self.get_files() as (src, dst):
2520 shutil._fastcopy_sendfile(src, dst)
2521 assert m.called
2522 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2523
2524 def test_blocksize_arg(self):
2525 with unittest.mock.patch('os.sendfile',
2526 side_effect=ZeroDivisionError) as m:
2527 self.assertRaises(ZeroDivisionError,
2528 shutil.copyfile, TESTFN, TESTFN2)
2529 blocksize = m.call_args[0][3]
2530 # Make sure file size and the block size arg passed to
2531 # sendfile() are the same.
2532 self.assertEqual(blocksize, os.path.getsize(TESTFN))
2533 # ...unless we're dealing with a small file.
Hai Shi0c4f0f32020-06-30 21:46:31 +08002534 os_helper.unlink(TESTFN2)
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002535 write_file(TESTFN2, b"hello", binary=True)
Hai Shi0c4f0f32020-06-30 21:46:31 +08002536 self.addCleanup(os_helper.unlink, TESTFN2 + '3')
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002537 self.assertRaises(ZeroDivisionError,
2538 shutil.copyfile, TESTFN2, TESTFN2 + '3')
2539 blocksize = m.call_args[0][3]
2540 self.assertEqual(blocksize, 2 ** 23)
2541
2542 def test_file2file_not_supported(self):
2543 # Emulate a case where sendfile() only support file->socket
2544 # fds. In such a case copyfile() is supposed to skip the
2545 # fast-copy attempt from then on.
Giampaolo Rodola413d9552019-05-30 14:05:41 +08002546 assert shutil._USE_CP_SENDFILE
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002547 try:
2548 with unittest.mock.patch(
2549 self.PATCHPOINT,
2550 side_effect=OSError(errno.ENOTSOCK, "yo")) as m:
2551 with self.get_files() as (src, dst):
2552 with self.assertRaises(_GiveupOnFastCopy):
2553 shutil._fastcopy_sendfile(src, dst)
2554 assert m.called
Giampaolo Rodola413d9552019-05-30 14:05:41 +08002555 assert not shutil._USE_CP_SENDFILE
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002556
2557 with unittest.mock.patch(self.PATCHPOINT) as m:
2558 shutil.copyfile(TESTFN, TESTFN2)
2559 assert not m.called
2560 finally:
Giampaolo Rodola413d9552019-05-30 14:05:41 +08002561 shutil._USE_CP_SENDFILE = True
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002562
2563
Victor Stinner937ee9e2018-06-26 02:11:06 +02002564@unittest.skipIf(not MACOS, 'macOS only')
Giampaolo Rodolac7f02a92018-06-19 08:27:29 -07002565class TestZeroCopyMACOS(_ZeroCopyFileTest, unittest.TestCase):
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002566 PATCHPOINT = "posix._fcopyfile"
2567
2568 def zerocopy_fun(self, src, dst):
Giampaolo Rodolac7f02a92018-06-19 08:27:29 -07002569 return shutil._fastcopy_fcopyfile(src, dst, posix._COPYFILE_DATA)
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002570
2571
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03002572class TestGetTerminalSize(unittest.TestCase):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002573 def test_does_not_crash(self):
2574 """Check if get_terminal_size() returns a meaningful value.
2575
2576 There's no easy portable way to actually check the size of the
2577 terminal, so let's check if it returns something sensible instead.
2578 """
2579 size = shutil.get_terminal_size()
Antoine Pitroucfade362012-02-08 23:48:59 +01002580 self.assertGreaterEqual(size.columns, 0)
2581 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002582
2583 def test_os_environ_first(self):
2584 "Check if environment variables have precedence"
2585
Hai Shi0c4f0f32020-06-30 21:46:31 +08002586 with os_helper.EnvironmentVarGuard() as env:
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002587 env['COLUMNS'] = '777'
Serhiy Storchakad30829d2016-04-24 09:58:43 +03002588 del env['LINES']
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002589 size = shutil.get_terminal_size()
2590 self.assertEqual(size.columns, 777)
2591
Hai Shi0c4f0f32020-06-30 21:46:31 +08002592 with os_helper.EnvironmentVarGuard() as env:
Serhiy Storchakad30829d2016-04-24 09:58:43 +03002593 del env['COLUMNS']
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002594 env['LINES'] = '888'
2595 size = shutil.get_terminal_size()
2596 self.assertEqual(size.lines, 888)
2597
Serhiy Storchakad30829d2016-04-24 09:58:43 +03002598 def test_bad_environ(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08002599 with os_helper.EnvironmentVarGuard() as env:
Serhiy Storchakad30829d2016-04-24 09:58:43 +03002600 env['COLUMNS'] = 'xxx'
2601 env['LINES'] = 'yyy'
2602 size = shutil.get_terminal_size()
2603 self.assertGreaterEqual(size.columns, 0)
2604 self.assertGreaterEqual(size.lines, 0)
2605
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002606 @unittest.skipUnless(os.isatty(sys.__stdout__.fileno()), "not on tty")
Victor Stinner119ebb72016-04-19 22:24:56 +02002607 @unittest.skipUnless(hasattr(os, 'get_terminal_size'),
2608 'need os.get_terminal_size()')
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002609 def test_stty_match(self):
2610 """Check if stty returns the same results ignoring env
2611
2612 This test will fail if stdin and stdout are connected to
2613 different terminals with different sizes. Nevertheless, such
2614 situations should be pretty rare.
2615 """
2616 try:
2617 size = subprocess.check_output(['stty', 'size']).decode().split()
Xavier de Gaye38c8b7d2016-11-14 17:14:42 +01002618 except (FileNotFoundError, PermissionError,
2619 subprocess.CalledProcessError):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002620 self.skipTest("stty invocation failed")
2621 expected = (int(size[1]), int(size[0])) # reversed order
2622
Hai Shi0c4f0f32020-06-30 21:46:31 +08002623 with os_helper.EnvironmentVarGuard() as env:
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002624 del env['LINES']
2625 del env['COLUMNS']
2626 actual = shutil.get_terminal_size()
2627
2628 self.assertEqual(expected, actual)
Ronald Oussorenf51738b2011-05-06 10:23:04 +02002629
Serhiy Storchakad30829d2016-04-24 09:58:43 +03002630 def test_fallback(self):
Hai Shi0c4f0f32020-06-30 21:46:31 +08002631 with os_helper.EnvironmentVarGuard() as env:
Serhiy Storchakad30829d2016-04-24 09:58:43 +03002632 del env['LINES']
2633 del env['COLUMNS']
2634
2635 # sys.__stdout__ has no fileno()
2636 with support.swap_attr(sys, '__stdout__', None):
2637 size = shutil.get_terminal_size(fallback=(10, 20))
2638 self.assertEqual(size.columns, 10)
2639 self.assertEqual(size.lines, 20)
2640
2641 # sys.__stdout__ is not a terminal on Unix
2642 # or fileno() not in (0, 1, 2) on Windows
Inada Naokic8e5eb92021-04-05 13:11:23 +09002643 with open(os.devnull, 'w', encoding='utf-8') as f, \
Serhiy Storchakad30829d2016-04-24 09:58:43 +03002644 support.swap_attr(sys, '__stdout__', f):
2645 size = shutil.get_terminal_size(fallback=(30, 40))
2646 self.assertEqual(size.columns, 30)
2647 self.assertEqual(size.lines, 40)
2648
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002649
Berker Peksag8083cd62014-11-01 11:04:06 +02002650class PublicAPITests(unittest.TestCase):
2651 """Ensures that the correct values are exposed in the public API."""
2652
2653 def test_module_all_attribute(self):
2654 self.assertTrue(hasattr(shutil, '__all__'))
2655 target_api = ['copyfileobj', 'copyfile', 'copymode', 'copystat',
2656 'copy', 'copy2', 'copytree', 'move', 'rmtree', 'Error',
2657 'SpecialFileError', 'ExecError', 'make_archive',
2658 'get_archive_formats', 'register_archive_format',
2659 'unregister_archive_format', 'get_unpack_formats',
2660 'register_unpack_format', 'unregister_unpack_format',
2661 'unpack_archive', 'ignore_patterns', 'chown', 'which',
2662 'get_terminal_size', 'SameFileError']
2663 if hasattr(os, 'statvfs') or os.name == 'nt':
2664 target_api.append('disk_usage')
2665 self.assertEqual(set(shutil.__all__), set(target_api))
2666
2667
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00002668if __name__ == '__main__':
Brett Cannon3e9a9ae2013-06-12 21:25:59 -04002669 unittest.main()