blob: 7bf60fd566e13f3c954e583186ec3bed2aefa614 [file] [log] [blame]
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00001# Copyright (C) 2003 Python Software Foundation
2
3import unittest
Berker Peksag884afd92014-12-10 02:50:32 +02004import unittest.mock
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00005import shutil
6import tempfile
Johannes Gijsbers8e6f2de2004-11-23 09:27:27 +00007import sys
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +00008import stat
Brett Cannon1c3fa182004-06-19 21:11:35 +00009import os
10import os.path
Antoine Pitrouc041ab62012-01-02 19:18:02 +010011import errno
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040012import functools
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -070013import pathlib
Antoine Pitroubcf2b592012-02-08 23:28:36 +010014import subprocess
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020015import random
16import string
17import contextlib
18import io
Serhiy Storchaka527ef072015-09-06 18:33:19 +030019from shutil import (make_archive,
Tarek Ziadé396fad72010-02-23 05:30:31 +000020 register_archive_format, unregister_archive_format,
Tarek Ziadé6ac91722010-04-28 17:51:36 +000021 get_archive_formats, Error, unpack_archive,
22 register_unpack_format, RegistryError,
Hynek Schlawack48653762012-10-07 12:49:58 +020023 unregister_unpack_format, get_unpack_formats,
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020024 SameFileError, _GiveupOnFastCopy)
Tarek Ziadé396fad72010-02-23 05:30:31 +000025import tarfile
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +020026import zipfile
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020027try:
28 import posix
29except ImportError:
30 posix = None
Tarek Ziadé396fad72010-02-23 05:30:31 +000031
32from test import support
Hai Shi0c4f0f32020-06-30 21:46:31 +080033from test.support import os_helper
34from test.support.os_helper import TESTFN, FakePath
Serhiy Storchaka11213772014-08-06 18:50:19 +030035
# Extra scratch names derived from the shared TESTFN test path.
TESTFN2 = TESTFN + "2"
TESTFN_SRC = TESTFN + "_SRC"
TESTFN_DST = TESTFN + "_DST"

# Platform flags, all derived from sys.platform in the same way.
MACOS = sys.platform.startswith("darwin")
SOLARIS = sys.platform.startswith("sunos")
AIX = sys.platform.startswith("aix")

# Record whether both grp and pwd could be imported.
try:
    import grp
    import pwd
    UID_GID_SUPPORT = True
except ImportError:
    UID_GID_SUPPORT = False

# _winapi is optional; it is None when the import fails.
try:
    import _winapi
except ImportError:
    _winapi = None
53
def _fake_rename(*args, **kwargs):
    """Stand-in for os.rename() that always fails.

    Accepts and ignores any arguments, then raises OSError carrying
    errno.EXDEV (or 18 when the errno module does not define EXDEV).
    """
    # Pretend the destination path is on a different filesystem.
    raise OSError(getattr(errno, 'EXDEV', 18), "Invalid cross-device link")
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040057
def mock_rename(func):
    """Decorator that runs *func* with os.rename replaced by _fake_rename.

    The real os.rename is put back when *func* returns or raises.  The
    wrapper passes all arguments through and returns *func*'s result.
    """
    @functools.wraps(func)
    def wrap(*args, **kwargs):
        # Save the real function before entering the try block so the
        # finally clause can never reference an unbound name.
        builtin_rename = os.rename
        os.rename = _fake_rename
        try:
            return func(*args, **kwargs)
        finally:
            os.rename = builtin_rename
    return wrap
68
def write_file(path, content, binary=False):
    """Write *content* to a file located at *path*.

    If *path* is a tuple instead of a string, os.path.join will be used to
    make a path.  If *binary* is true, the file will be opened in binary
    mode; otherwise it is opened in text mode with UTF-8 encoding.
    """
    if isinstance(path, tuple):
        path = os.path.join(*path)
    mode = 'wb' if binary else 'w'
    # open() rejects an encoding argument in binary mode, hence None there.
    encoding = None if binary else "utf-8"
    with open(path, mode, encoding=encoding) as fp:
        fp.write(content)
82
def write_test_file(path, size):
    """Create a test file with an arbitrary size and random text content.

    The file at *path* is written in pieces of at most 8192 bytes taken
    from one randomly generated buffer of ASCII letters, and ends up
    exactly *size* bytes long (checked with an assert).
    """
    def chunks(total, step):
        # Yield piece sizes: *step* repeatedly, then the remainder (if any).
        assert total >= step
        while total > step:
            yield step
            total -= step
        if total:
            yield total

    bufsize = min(size, 8192)
    chunk = "".join(random.choices(string.ascii_letters, k=bufsize)).encode()
    with open(path, 'wb') as f:
        for csize in chunks(size, bufsize):
            # Slice to the requested piece size; the last piece may be
            # shorter than the buffer when size is not a multiple of it.
            f.write(chunk[:csize])
    assert os.path.getsize(path) == size
100
def read_file(path, binary=False):
    """Return contents from a file located at *path*.

    If *path* is a tuple instead of a string, os.path.join will be used to
    make a path.  If *binary* is true, the file will be opened in binary
    mode and bytes are returned; otherwise the file is decoded as UTF-8
    and a str is returned.
    """
    if isinstance(path, tuple):
        path = os.path.join(*path)
    mode = 'rb' if binary else 'r'
    # open() rejects an encoding argument in binary mode, hence None there.
    encoding = None if binary else "utf-8"
    with open(path, mode, encoding=encoding) as fp:
        return fp.read()
114
def rlistdir(path):
    """Return a sorted, recursive listing of *path* as relative names.

    Real directories are listed with a trailing '/' and are followed by
    their own contents, each prefixed with the directory name and '/'.
    Symbolic links are listed by name only and never descended into.
    """
    res = []
    for name in sorted(os.listdir(path)):
        p = os.path.join(path, name)
        if os.path.isdir(p) and not os.path.islink(p):
            res.append(name + '/')
            res.extend(name + '/' + n for n in rlistdir(p))
        else:
            res.append(name)
    return res
126
def supports_file2file_sendfile():
    """Return True if os.sendfile() can copy between two regular files.

    Two temporary files are created in the current working directory and
    a 2-byte os.sendfile() call from one to the other is attempted.  An
    OSError from that call means "not supported".  Both files are removed
    before returning.
    """
    # ...apparently Linux and Solaris are the only ones
    if not hasattr(os, "sendfile"):
        return False
    srcname = None
    dstname = None
    try:
        with tempfile.NamedTemporaryFile("wb", dir=os.getcwd(),
                                         delete=False) as f:
            srcname = f.name
            f.write(b"0123456789")

        with open(srcname, "rb") as src, \
                tempfile.NamedTemporaryFile("wb", dir=os.getcwd(),
                                            delete=False) as dst:
            dstname = dst.name
            infd = src.fileno()
            outfd = dst.fileno()
            try:
                os.sendfile(outfd, infd, 0, 2)
            except OSError:
                return False
            else:
                return True
    finally:
        # delete=False above, so the files must be removed explicitly.
        if srcname is not None:
            os_helper.unlink(srcname)
        if dstname is not None:
            os_helper.unlink(dstname)


# Probed once, at import time.
SUPPORTS_SENDFILE = supports_file2file_sendfile()
157
# AIX 32-bit mode, by default, lacks enough memory for the xz/lzma compiler
# test.  The AIX command 'dump -o program' gives XCOFF header information;
# the second word of the last line is the maxdata value.
# NOTE(review): the check below requires maxdata >= 0x20000000, while this
# comment previously said "greater than 0x1000000" -- confirm which is meant.
def _maxdataOK():
    """Return False only on 32-bit AIX whose maxdata is below 0x20000000.

    On every other platform (or on 64-bit AIX) this returns True without
    running anything.  On 32-bit AIX it runs '/usr/bin/dump -o' on
    sys.executable and parses the maxdata value as hexadecimal.
    """
    if AIX and sys.maxsize == 2147483647:
        hdrs = subprocess.getoutput("/usr/bin/dump -o %s" % sys.executable)
        maxdata = hdrs.split("\n")[-1].split()[1]
        return int(maxdata, 16) >= 0x20000000
    return True
Éric Araujoa7e33a12011-08-12 19:51:35 +0200169
Tarek Ziadé396fad72010-02-23 05:30:31 +0000170
class BaseTest:
    # Mixin for the test cases in this file; it relies on the addCleanup()
    # method of the unittest.TestCase it is combined with.

    def mkdtemp(self, prefix=None):
        """Create a temporary directory that will be cleaned up.

        The directory is created inside the current working directory,
        optionally with the given name *prefix*.

        Returns the path of the directory.
        """
        d = tempfile.mkdtemp(prefix=prefix, dir=os.getcwd())
        self.addCleanup(os_helper.rmtree, d)
        return d
Tarek Ziadé5340db32010-04-19 22:30:51 +0000181
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300182
class TestRmTree(BaseTest, unittest.TestCase):
    # Tests for shutil.rmtree().

    def test_rmtree_works_on_bytes(self):
        # rmtree() must accept a bytes path as well as a str path.
        tmp = self.mkdtemp()
        victim = os.path.join(tmp, 'killme')
        os.mkdir(victim)
        write_file(os.path.join(victim, 'somefile'), 'foo')
        victim = os.fsencode(victim)
        self.assertIsInstance(victim, bytes)
        shutil.rmtree(victim)

    @os_helper.skip_unless_symlink
    def test_rmtree_fails_on_symlink(self):
        # Calling rmtree() directly on a symlink must fail and must leave
        # both the link and its target in place.
        tmp = self.mkdtemp()
        dir_ = os.path.join(tmp, 'dir')
        os.mkdir(dir_)
        link = os.path.join(tmp, 'link')
        os.symlink(dir_, link)
        self.assertRaises(OSError, shutil.rmtree, link)
        self.assertTrue(os.path.exists(dir_))
        self.assertTrue(os.path.lexists(link))
        errors = []
        def onerror(*args):
            errors.append(args)
        shutil.rmtree(link, onerror=onerror)
        self.assertEqual(len(errors), 1)
        self.assertIs(errors[0][0], os.path.islink)
        self.assertEqual(errors[0][1], link)
        self.assertIsInstance(errors[0][2][1], OSError)

    @os_helper.skip_unless_symlink
    def test_rmtree_works_on_symlinks(self):
        tmp = self.mkdtemp()
        dir1 = os.path.join(tmp, 'dir1')
        dir2 = os.path.join(dir1, 'dir2')
        dir3 = os.path.join(tmp, 'dir3')
        for d in dir1, dir2, dir3:
            os.mkdir(d)
        file1 = os.path.join(tmp, 'file1')
        write_file(file1, 'foo')
        link1 = os.path.join(dir1, 'link1')
        os.symlink(dir2, link1)
        link2 = os.path.join(dir1, 'link2')
        os.symlink(dir3, link2)
        link3 = os.path.join(dir1, 'link3')
        os.symlink(file1, link3)
        # make sure symlinks are removed but not followed
        shutil.rmtree(dir1)
        self.assertFalse(os.path.exists(dir1))
        self.assertTrue(os.path.exists(dir3))
        self.assertTrue(os.path.exists(file1))

    @unittest.skipUnless(_winapi, 'only relevant on Windows')
    def test_rmtree_fails_on_junctions(self):
        # Same expectations as test_rmtree_fails_on_symlink, but the link
        # is a junction created through _winapi.
        tmp = self.mkdtemp()
        dir_ = os.path.join(tmp, 'dir')
        os.mkdir(dir_)
        link = os.path.join(tmp, 'link')
        _winapi.CreateJunction(dir_, link)
        self.addCleanup(os_helper.unlink, link)
        self.assertRaises(OSError, shutil.rmtree, link)
        self.assertTrue(os.path.exists(dir_))
        self.assertTrue(os.path.lexists(link))
        errors = []
        def onerror(*args):
            errors.append(args)
        shutil.rmtree(link, onerror=onerror)
        self.assertEqual(len(errors), 1)
        self.assertIs(errors[0][0], os.path.islink)
        self.assertEqual(errors[0][1], link)
        self.assertIsInstance(errors[0][2][1], OSError)

    @unittest.skipUnless(_winapi, 'only relevant on Windows')
    def test_rmtree_works_on_junctions(self):
        tmp = self.mkdtemp()
        dir1 = os.path.join(tmp, 'dir1')
        dir2 = os.path.join(dir1, 'dir2')
        dir3 = os.path.join(tmp, 'dir3')
        for d in dir1, dir2, dir3:
            os.mkdir(d)
        file1 = os.path.join(tmp, 'file1')
        write_file(file1, 'foo')
        link1 = os.path.join(dir1, 'link1')
        _winapi.CreateJunction(dir2, link1)
        link2 = os.path.join(dir1, 'link2')
        _winapi.CreateJunction(dir3, link2)
        link3 = os.path.join(dir1, 'link3')
        _winapi.CreateJunction(file1, link3)
        # make sure junctions are removed but not followed
        shutil.rmtree(dir1)
        self.assertFalse(os.path.exists(dir1))
        self.assertTrue(os.path.exists(dir3))
        self.assertTrue(os.path.exists(file1))

    def test_rmtree_errors(self):
        # filename is guaranteed not to exist
        filename = tempfile.mktemp(dir=self.mkdtemp())
        self.assertRaises(FileNotFoundError, shutil.rmtree, filename)
        # test that ignore_errors option is honored
        shutil.rmtree(filename, ignore_errors=True)

        # existing file
        tmpdir = self.mkdtemp()
        write_file((tmpdir, "tstfile"), "")
        filename = os.path.join(tmpdir, "tstfile")
        with self.assertRaises(NotADirectoryError) as cm:
            shutil.rmtree(filename)
        self.assertEqual(cm.exception.filename, filename)
        self.assertTrue(os.path.exists(filename))
        # test that ignore_errors option is honored
        shutil.rmtree(filename, ignore_errors=True)
        self.assertTrue(os.path.exists(filename))
        errors = []
        def onerror(*args):
            errors.append(args)
        shutil.rmtree(filename, onerror=onerror)
        self.assertEqual(len(errors), 2)
        self.assertIs(errors[0][0], os.scandir)
        self.assertEqual(errors[0][1], filename)
        self.assertIsInstance(errors[0][2][1], NotADirectoryError)
        self.assertEqual(errors[0][2][1].filename, filename)
        self.assertIs(errors[1][0], os.rmdir)
        self.assertEqual(errors[1][1], filename)
        self.assertIsInstance(errors[1][2][1], NotADirectoryError)
        self.assertEqual(errors[1][2][1].filename, filename)

    @unittest.skipIf(sys.platform[:6] == 'cygwin',
                     "This test can't be run on Cygwin (issue #1071513).")
    @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
                     "This test can't be run reliably as root (issue #1076467).")
    def test_on_error(self):
        self.errorState = 0
        os.mkdir(TESTFN)
        self.addCleanup(shutil.rmtree, TESTFN)

        self.child_file_path = os.path.join(TESTFN, 'a')
        self.child_dir_path = os.path.join(TESTFN, 'b')
        os_helper.create_empty_file(self.child_file_path)
        os.mkdir(self.child_dir_path)
        old_dir_mode = os.stat(TESTFN).st_mode
        old_child_file_mode = os.stat(self.child_file_path).st_mode
        old_child_dir_mode = os.stat(self.child_dir_path).st_mode
        # Make unwritable.
        new_mode = stat.S_IREAD|stat.S_IEXEC
        os.chmod(self.child_file_path, new_mode)
        os.chmod(self.child_dir_path, new_mode)
        os.chmod(TESTFN, new_mode)

        # Cleanups run in reverse order, so the modes are restored before
        # the rmtree() cleanup registered above removes TESTFN.
        self.addCleanup(os.chmod, TESTFN, old_dir_mode)
        self.addCleanup(os.chmod, self.child_file_path, old_child_file_mode)
        self.addCleanup(os.chmod, self.child_dir_path, old_child_dir_mode)

        shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
        # Test whether onerror has actually been called.
        self.assertEqual(self.errorState, 3,
                         "Expected call to onerror function did not happen.")

    def check_args_to_onerror(self, func, arg, exc):
        # onerror callback used by test_on_error, which deliberately runs
        # rmtree on a directory that is chmod 500, which will fail.
        # This function is run when shutil.rmtree fails.
        # 99.9% of the time it initially fails to remove
        # a file in the directory, so the first time through
        # func is os.unlink.
        # However, some Linux machines running ZFS on
        # FUSE experienced a failure earlier in the process
        # at os.listdir.  The first failure may legally
        # be either.
        if self.errorState < 2:
            if func is os.unlink:
                self.assertEqual(arg, self.child_file_path)
            elif func is os.rmdir:
                self.assertEqual(arg, self.child_dir_path)
            else:
                self.assertIs(func, os.listdir)
                self.assertIn(arg, [TESTFN, self.child_dir_path])
            self.assertTrue(issubclass(exc[0], OSError))
            self.errorState += 1
        else:
            # Third call: removal of the top-level directory itself.
            self.assertEqual(func, os.rmdir)
            self.assertEqual(arg, TESTFN)
            self.assertTrue(issubclass(exc[0], OSError))
            self.errorState = 3

    def test_rmtree_does_not_choke_on_failing_lstat(self):
        # Saved before the try block so the finally clause can always
        # restore it.
        orig_lstat = os.lstat
        def raiser(fn, *args, **kwargs):
            # Fail for everything except the top-level directory.
            if fn != TESTFN:
                raise OSError()
            else:
                return orig_lstat(fn)
        try:
            os.lstat = raiser

            os.mkdir(TESTFN)
            write_file((TESTFN, 'foo'), 'foo')
            shutil.rmtree(TESTFN)
        finally:
            os.lstat = orig_lstat

    def test_rmtree_uses_safe_fd_version_if_available(self):
        _use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <=
                             os.supports_dir_fd and
                             os.listdir in os.supports_fd and
                             os.stat in os.supports_follow_symlinks)
        if _use_fd_functions:
            self.assertTrue(shutil._use_fd_functions)
            self.assertTrue(shutil.rmtree.avoids_symlink_attacks)
            tmp_dir = self.mkdtemp()
            d = os.path.join(tmp_dir, 'a')
            os.mkdir(d)

            class Called(Exception):
                pass

            def _raiser(*args, **kwargs):
                raise Called

            # Saved before the try block so the finally clause can always
            # restore it.
            real_rmtree = shutil._rmtree_safe_fd
            try:
                shutil._rmtree_safe_fd = _raiser
                self.assertRaises(Called, shutil.rmtree, d)
            finally:
                shutil._rmtree_safe_fd = real_rmtree
        else:
            self.assertFalse(shutil._use_fd_functions)
            self.assertFalse(shutil.rmtree.avoids_symlink_attacks)

    def test_rmtree_dont_delete_file(self):
        # When called on a file instead of a directory, don't delete it.
        handle, path = tempfile.mkstemp(dir=self.mkdtemp())
        os.close(handle)
        self.assertRaises(NotADirectoryError, shutil.rmtree, path)
        os.remove(path)

    @os_helper.skip_unless_symlink
    def test_rmtree_on_symlink(self):
        # bug 1669.
        os.mkdir(TESTFN)
        try:
            src = os.path.join(TESTFN, 'cheese')
            dst = os.path.join(TESTFN, 'shop')
            os.mkdir(src)
            os.symlink(src, dst)
            self.assertRaises(OSError, shutil.rmtree, dst)
            shutil.rmtree(dst, ignore_errors=True)
        finally:
            shutil.rmtree(TESTFN, ignore_errors=True)

    @unittest.skipUnless(_winapi, 'only relevant on Windows')
    def test_rmtree_on_junction(self):
        os.mkdir(TESTFN)
        try:
            src = os.path.join(TESTFN, 'cheese')
            dst = os.path.join(TESTFN, 'shop')
            os.mkdir(src)
            # Create an empty file inside the junction target.
            with open(os.path.join(src, 'spam'), 'wb'):
                pass
            _winapi.CreateJunction(src, dst)
            self.assertRaises(OSError, shutil.rmtree, dst)
            shutil.rmtree(dst, ignore_errors=True)
        finally:
            shutil.rmtree(TESTFN, ignore_errors=True)
442
443
444class TestCopyTree(BaseTest, unittest.TestCase):
445
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000446 def test_copytree_simple(self):
Steve Dowerabde52c2019-11-15 09:49:21 -0800447 src_dir = self.mkdtemp()
448 dst_dir = os.path.join(self.mkdtemp(), 'destination')
Éric Araujoa7e33a12011-08-12 19:51:35 +0200449 self.addCleanup(shutil.rmtree, src_dir)
450 self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
451 write_file((src_dir, 'test.txt'), '123')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000452 os.mkdir(os.path.join(src_dir, 'test_dir'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200453 write_file((src_dir, 'test_dir', 'test.txt'), '456')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000454
Éric Araujoa7e33a12011-08-12 19:51:35 +0200455 shutil.copytree(src_dir, dst_dir)
456 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test.txt')))
457 self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'test_dir')))
458 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test_dir',
459 'test.txt')))
460 actual = read_file((dst_dir, 'test.txt'))
461 self.assertEqual(actual, '123')
462 actual = read_file((dst_dir, 'test_dir', 'test.txt'))
463 self.assertEqual(actual, '456')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000464
jab9e00d9e2018-12-28 13:03:40 -0500465 def test_copytree_dirs_exist_ok(self):
Steve Dowerabde52c2019-11-15 09:49:21 -0800466 src_dir = self.mkdtemp()
467 dst_dir = self.mkdtemp()
jab9e00d9e2018-12-28 13:03:40 -0500468 self.addCleanup(shutil.rmtree, src_dir)
469 self.addCleanup(shutil.rmtree, dst_dir)
470
471 write_file((src_dir, 'nonexisting.txt'), '123')
472 os.mkdir(os.path.join(src_dir, 'existing_dir'))
473 os.mkdir(os.path.join(dst_dir, 'existing_dir'))
474 write_file((dst_dir, 'existing_dir', 'existing.txt'), 'will be replaced')
475 write_file((src_dir, 'existing_dir', 'existing.txt'), 'has been replaced')
476
477 shutil.copytree(src_dir, dst_dir, dirs_exist_ok=True)
478 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'nonexisting.txt')))
479 self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'existing_dir')))
480 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'existing_dir',
481 'existing.txt')))
482 actual = read_file((dst_dir, 'nonexisting.txt'))
483 self.assertEqual(actual, '123')
484 actual = read_file((dst_dir, 'existing_dir', 'existing.txt'))
485 self.assertEqual(actual, 'has been replaced')
486
487 with self.assertRaises(FileExistsError):
488 shutil.copytree(src_dir, dst_dir, dirs_exist_ok=False)
489
Hai Shi0c4f0f32020-06-30 21:46:31 +0800490 @os_helper.skip_unless_symlink
Antoine Pitrou78091e62011-12-29 18:54:15 +0100491 def test_copytree_symlinks(self):
492 tmp_dir = self.mkdtemp()
493 src_dir = os.path.join(tmp_dir, 'src')
494 dst_dir = os.path.join(tmp_dir, 'dst')
495 sub_dir = os.path.join(src_dir, 'sub')
496 os.mkdir(src_dir)
497 os.mkdir(sub_dir)
498 write_file((src_dir, 'file.txt'), 'foo')
499 src_link = os.path.join(sub_dir, 'link')
500 dst_link = os.path.join(dst_dir, 'sub/link')
501 os.symlink(os.path.join(src_dir, 'file.txt'),
502 src_link)
503 if hasattr(os, 'lchmod'):
504 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
505 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
506 os.lchflags(src_link, stat.UF_NODUMP)
507 src_stat = os.lstat(src_link)
508 shutil.copytree(src_dir, dst_dir, symlinks=True)
509 self.assertTrue(os.path.islink(os.path.join(dst_dir, 'sub', 'link')))
Steve Dowerdf2d4a62019-08-21 15:27:33 -0700510 actual = os.readlink(os.path.join(dst_dir, 'sub', 'link'))
511 # Bad practice to blindly strip the prefix as it may be required to
512 # correctly refer to the file, but we're only comparing paths here.
513 if os.name == 'nt' and actual.startswith('\\\\?\\'):
514 actual = actual[4:]
515 self.assertEqual(actual, os.path.join(src_dir, 'file.txt'))
Antoine Pitrou78091e62011-12-29 18:54:15 +0100516 dst_stat = os.lstat(dst_link)
517 if hasattr(os, 'lchmod'):
518 self.assertEqual(dst_stat.st_mode, src_stat.st_mode)
519 if hasattr(os, 'lchflags'):
520 self.assertEqual(dst_stat.st_flags, src_stat.st_flags)
521
Georg Brandl2ee470f2008-07-16 12:55:28 +0000522 def test_copytree_with_exclude(self):
Georg Brandl2ee470f2008-07-16 12:55:28 +0000523 # creating data
524 join = os.path.join
525 exists = os.path.exists
Steve Dowerabde52c2019-11-15 09:49:21 -0800526 src_dir = self.mkdtemp()
Georg Brandl2ee470f2008-07-16 12:55:28 +0000527 try:
Steve Dowerabde52c2019-11-15 09:49:21 -0800528 dst_dir = join(self.mkdtemp(), 'destination')
Éric Araujoa7e33a12011-08-12 19:51:35 +0200529 write_file((src_dir, 'test.txt'), '123')
530 write_file((src_dir, 'test.tmp'), '123')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000531 os.mkdir(join(src_dir, 'test_dir'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200532 write_file((src_dir, 'test_dir', 'test.txt'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000533 os.mkdir(join(src_dir, 'test_dir2'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200534 write_file((src_dir, 'test_dir2', 'test.txt'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000535 os.mkdir(join(src_dir, 'test_dir2', 'subdir'))
536 os.mkdir(join(src_dir, 'test_dir2', 'subdir2'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200537 write_file((src_dir, 'test_dir2', 'subdir', 'test.txt'), '456')
538 write_file((src_dir, 'test_dir2', 'subdir2', 'test.py'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000539
540 # testing glob-like patterns
541 try:
542 patterns = shutil.ignore_patterns('*.tmp', 'test_dir2')
543 shutil.copytree(src_dir, dst_dir, ignore=patterns)
544 # checking the result: some elements should not be copied
545 self.assertTrue(exists(join(dst_dir, 'test.txt')))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200546 self.assertFalse(exists(join(dst_dir, 'test.tmp')))
547 self.assertFalse(exists(join(dst_dir, 'test_dir2')))
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000548 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200549 shutil.rmtree(dst_dir)
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000550 try:
551 patterns = shutil.ignore_patterns('*.tmp', 'subdir*')
552 shutil.copytree(src_dir, dst_dir, ignore=patterns)
553 # checking the result: some elements should not be copied
Éric Araujoa7e33a12011-08-12 19:51:35 +0200554 self.assertFalse(exists(join(dst_dir, 'test.tmp')))
555 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2')))
556 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000557 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200558 shutil.rmtree(dst_dir)
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000559
560 # testing callable-style
561 try:
562 def _filter(src, names):
563 res = []
564 for name in names:
565 path = os.path.join(src, name)
566
567 if (os.path.isdir(path) and
568 path.split()[-1] == 'subdir'):
569 res.append(name)
570 elif os.path.splitext(path)[-1] in ('.py'):
571 res.append(name)
572 return res
573
574 shutil.copytree(src_dir, dst_dir, ignore=_filter)
575
576 # checking the result: some elements should not be copied
Éric Araujoa7e33a12011-08-12 19:51:35 +0200577 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2',
578 'test.py')))
579 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000580
581 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200582 shutil.rmtree(dst_dir)
Georg Brandl2ee470f2008-07-16 12:55:28 +0000583 finally:
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000584 shutil.rmtree(src_dir)
585 shutil.rmtree(os.path.dirname(dst_dir))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000586
mbarkhau88704332020-01-24 14:51:16 +0000587 def test_copytree_arg_types_of_ignore(self):
588 join = os.path.join
589 exists = os.path.exists
590
591 tmp_dir = self.mkdtemp()
592 src_dir = join(tmp_dir, "source")
593
594 os.mkdir(join(src_dir))
595 os.mkdir(join(src_dir, 'test_dir'))
596 os.mkdir(os.path.join(src_dir, 'test_dir', 'subdir'))
597 write_file((src_dir, 'test_dir', 'subdir', 'test.txt'), '456')
598
599 invokations = []
600
601 def _ignore(src, names):
602 invokations.append(src)
603 self.assertIsInstance(src, str)
604 self.assertIsInstance(names, list)
605 self.assertEqual(len(names), len(set(names)))
606 for name in names:
607 self.assertIsInstance(name, str)
608 return []
609
610 dst_dir = join(self.mkdtemp(), 'destination')
611 shutil.copytree(src_dir, dst_dir, ignore=_ignore)
612 self.assertTrue(exists(join(dst_dir, 'test_dir', 'subdir',
613 'test.txt')))
614
615 dst_dir = join(self.mkdtemp(), 'destination')
616 shutil.copytree(pathlib.Path(src_dir), dst_dir, ignore=_ignore)
617 self.assertTrue(exists(join(dst_dir, 'test_dir', 'subdir',
618 'test.txt')))
619
620 dst_dir = join(self.mkdtemp(), 'destination')
621 src_dir_entry = list(os.scandir(tmp_dir))[0]
622 self.assertIsInstance(src_dir_entry, os.DirEntry)
623 shutil.copytree(src_dir_entry, dst_dir, ignore=_ignore)
624 self.assertTrue(exists(join(dst_dir, 'test_dir', 'subdir',
625 'test.txt')))
626
627 self.assertEqual(len(invokations), 9)
628
Antoine Pitrouac601602013-08-16 19:35:02 +0200629 def test_copytree_retains_permissions(self):
Steve Dowerabde52c2019-11-15 09:49:21 -0800630 tmp_dir = self.mkdtemp()
Antoine Pitrouac601602013-08-16 19:35:02 +0200631 src_dir = os.path.join(tmp_dir, 'source')
632 os.mkdir(src_dir)
633 dst_dir = os.path.join(tmp_dir, 'destination')
634 self.addCleanup(shutil.rmtree, tmp_dir)
635
636 os.chmod(src_dir, 0o777)
637 write_file((src_dir, 'permissive.txt'), '123')
638 os.chmod(os.path.join(src_dir, 'permissive.txt'), 0o777)
639 write_file((src_dir, 'restrictive.txt'), '456')
640 os.chmod(os.path.join(src_dir, 'restrictive.txt'), 0o600)
641 restrictive_subdir = tempfile.mkdtemp(dir=src_dir)
Hai Shi0c4f0f32020-06-30 21:46:31 +0800642 self.addCleanup(os_helper.rmtree, restrictive_subdir)
Antoine Pitrouac601602013-08-16 19:35:02 +0200643 os.chmod(restrictive_subdir, 0o600)
644
645 shutil.copytree(src_dir, dst_dir)
Brett Cannon9c7eb552013-08-23 14:38:11 -0400646 self.assertEqual(os.stat(src_dir).st_mode, os.stat(dst_dir).st_mode)
647 self.assertEqual(os.stat(os.path.join(src_dir, 'permissive.txt')).st_mode,
Antoine Pitrouac601602013-08-16 19:35:02 +0200648 os.stat(os.path.join(dst_dir, 'permissive.txt')).st_mode)
Brett Cannon9c7eb552013-08-23 14:38:11 -0400649 self.assertEqual(os.stat(os.path.join(src_dir, 'restrictive.txt')).st_mode,
Antoine Pitrouac601602013-08-16 19:35:02 +0200650 os.stat(os.path.join(dst_dir, 'restrictive.txt')).st_mode)
651 restrictive_subdir_dst = os.path.join(dst_dir,
652 os.path.split(restrictive_subdir)[1])
Brett Cannon9c7eb552013-08-23 14:38:11 -0400653 self.assertEqual(os.stat(restrictive_subdir).st_mode,
Antoine Pitrouac601602013-08-16 19:35:02 +0200654 os.stat(restrictive_subdir_dst).st_mode)
655
@unittest.mock.patch('os.chmod')
def test_copytree_winerror(self, mock_patch):
    # copystat() raises OSError when copying to VFAT.  Only Windows fills
    # in a meaningful 'winerror' attribute on that exception, so
    # copytree() must not assume it is set.  The failure is simulated
    # here by making the patched os.chmod raise.
    source = self.mkdtemp()
    dest_parent = self.mkdtemp()
    target = os.path.join(dest_parent, 'destination')
    self.addCleanup(shutil.rmtree, source)
    self.addCleanup(shutil.rmtree, dest_parent)

    mock_patch.side_effect = PermissionError('ka-boom')
    self.assertRaises(shutil.Error, shutil.copytree, source, target)
669
def test_copytree_custom_copy_function(self):
    # See: https://bugs.python.org/issue35648
    # copytree() must pass plain str paths to a user supplied
    # copy_function and call it once for the single file in the tree.
    calls = []
    src = self.mkdtemp()
    # The destination must not exist yet.  Name it inside a fresh private
    # directory instead of using the deprecated, race-prone
    # tempfile.mktemp().
    dst = os.path.join(self.mkdtemp(), 'destination')

    def custom_cpfun(a, b):
        calls.append(None)
        self.assertIsInstance(a, str)
        self.assertIsInstance(b, str)
        self.assertEqual(a, os.path.join(src, 'foo'))
        self.assertEqual(b, os.path.join(dst, 'foo'))

    # Create an empty source file; leaving the with block closes it.
    with open(os.path.join(src, 'foo'), 'w', encoding='utf-8'):
        pass
    shutil.copytree(src, dst, copy_function=custom_cpfun)
    self.assertEqual(len(calls), 1)
686
# Issue #3002: copyfile and copytree block indefinitely on named pipes
@unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
@os_helper.skip_unless_symlink
@unittest.skipIf(sys.platform == "vxworks",
                 "fifo requires special path on VxWorks")
def test_copytree_named_pipe(self):
    # A FIFO inside the source tree must be reported through
    # shutil.Error, with exactly one (src, dst, message) entry.
    os.mkdir(TESTFN)
    try:
        nested = os.path.join(TESTFN, "subdir")
        os.mkdir(nested)
        pipe = os.path.join(nested, "mypipe")
        try:
            os.mkfifo(pipe)
        except PermissionError as e:
            self.skipTest('os.mkfifo(): %s' % e)

        raised = None
        try:
            shutil.copytree(TESTFN, TESTFN2)
        except shutil.Error as exc:
            raised = exc
        if raised is None:
            self.fail("shutil.Error should have been raised")

        failures = raised.args[0]
        self.assertEqual(len(failures), 1)
        _src, _dst, reason = failures[0]
        self.assertEqual("`%s` is a named pipe" % pipe, reason)
    finally:
        # Remove both trees whatever happened above.
        for tree in (TESTFN, TESTFN2):
            shutil.rmtree(tree, ignore_errors=True)
714
def test_copytree_special_func(self):
    # The tree holds two regular files (one of them in a subdirectory),
    # so a custom copy_function must be called exactly twice.
    src_dir = self.mkdtemp()
    dst_dir = os.path.join(self.mkdtemp(), 'destination')
    write_file((src_dir, 'test.txt'), '123')
    os.mkdir(os.path.join(src_dir, 'test_dir'))
    write_file((src_dir, 'test_dir', 'test.txt'), '456')

    seen = []

    def record(source, target):
        seen.append((source, target))

    shutil.copytree(src_dir, dst_dir, copy_function=record)
    self.assertEqual(len(seen), 2)
728
@os_helper.skip_unless_symlink
def test_copytree_dangling_symlinks(self):
    src_dir = self.mkdtemp()
    dst_dir = os.path.join(self.mkdtemp(), 'destination')
    os.symlink('IDONTEXIST', os.path.join(src_dir, 'test.txt'))
    os.mkdir(os.path.join(src_dir, 'test_dir'))
    write_file((src_dir, 'test_dir', 'test.txt'), '456')

    # a dangling symlink raises an error at the end
    with self.assertRaises(Error):
        shutil.copytree(src_dir, dst_dir)

    # a dangling symlink is ignored with the proper flag
    dst_dir = os.path.join(self.mkdtemp(), 'destination2')
    shutil.copytree(src_dir, dst_dir, ignore_dangling_symlinks=True)
    copied_names = os.listdir(dst_dir)
    self.assertNotIn('test.txt', copied_names)

    # a dangling symlink is copied if symlinks=True
    dst_dir = os.path.join(self.mkdtemp(), 'destination3')
    shutil.copytree(src_dir, dst_dir, symlinks=True)
    copied_names = os.listdir(dst_dir)
    self.assertIn('test.txt', copied_names)
748
@os_helper.skip_unless_symlink
def test_copytree_symlink_dir(self):
    # A symlink to a directory must come out as a real directory with
    # symlinks=False and as a symlink with symlinks=True; its content
    # is reachable either way.
    src_dir = self.mkdtemp()
    dst_dir = os.path.join(self.mkdtemp(), 'destination')
    real_dir = os.path.join(src_dir, 'real_dir')
    os.mkdir(real_dir)
    with open(os.path.join(real_dir, 'test.txt'), 'wb'):
        pass
    os.symlink(real_dir,
               os.path.join(src_dir, 'link_to_dir'),
               target_is_directory=True)

    shutil.copytree(src_dir, dst_dir, symlinks=False)
    copied_link = os.path.join(dst_dir, 'link_to_dir')
    self.assertFalse(os.path.islink(copied_link))
    self.assertIn('test.txt', os.listdir(copied_link))

    dst_dir = os.path.join(self.mkdtemp(), 'destination2')
    shutil.copytree(src_dir, dst_dir, symlinks=True)
    copied_link = os.path.join(dst_dir, 'link_to_dir')
    self.assertTrue(os.path.islink(copied_link))
    self.assertIn('test.txt', os.listdir(copied_link))
768
def test_copytree_return_value(self):
    # copytree returns its destination path.
    src_dir = self.mkdtemp()
    dst_dir = src_dir + "dest"
    self.addCleanup(shutil.rmtree, dst_dir, True)
    write_file(os.path.join(src_dir, 'foo'), 'foo')
    returned = shutil.copytree(src_dir, dst_dir)
    # Listing the returned path must show the single copied file.
    self.assertEqual(['foo'], os.listdir(returned))
778
def test_copytree_subdirectory(self):
    # copytree where dst is a subdirectory of src, see Issue 38688
    base_dir = self.mkdtemp()
    self.addCleanup(shutil.rmtree, base_dir, ignore_errors=True)
    src_dir = os.path.join(base_dir, "t", "pg")
    # The destination lives two levels below the source directory.
    dst_dir = os.path.join(src_dir, "somevendor", "1.0")
    os.makedirs(src_dir)
    write_file(os.path.join(src_dir, 'pol'), 'pol')
    returned = shutil.copytree(src_dir, dst_dir)
    self.assertEqual(['pol'], os.listdir(returned))
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300790
791class TestCopy(BaseTest, unittest.TestCase):
792
793 ### shutil.copymode
794
@os_helper.skip_unless_symlink
def test_copymode_follow_symlinks(self):
    tmp_dir = self.mkdtemp()
    src = os.path.join(tmp_dir, 'foo')
    dst = os.path.join(tmp_dir, 'bar')
    src_link = os.path.join(tmp_dir, 'baz')
    dst_link = os.path.join(tmp_dir, 'quux')
    for path in (src, dst):
        write_file(path, 'foo')
    os.symlink(src, src_link)
    os.symlink(dst, dst_link)
    os.chmod(src, stat.S_IRWXU | stat.S_IRWXG)

    # file to file
    os.chmod(dst, stat.S_IRWXO)
    self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
    shutil.copymode(src, dst)
    self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)

    # On Windows, os.chmod does not follow symlinks (issue #15411)
    if os.name == 'nt':
        return

    # In order: follow src link, follow dst link, follow both links.
    # dst's mode is reset before every copy so each case is checked
    # independently.
    link_cases = (
        (src_link, dst),
        (src, dst_link),
        (src_link, dst_link),
    )
    for source, target in link_cases:
        os.chmod(dst, stat.S_IRWXO)
        shutil.copymode(source, target)
        self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
826
@unittest.skipUnless(hasattr(os, 'lchmod'), 'requires os.lchmod')
@os_helper.skip_unless_symlink
def test_copymode_symlink_to_symlink(self):
    tmp_dir = self.mkdtemp()
    src = os.path.join(tmp_dir, 'foo')
    dst = os.path.join(tmp_dir, 'bar')
    src_link = os.path.join(tmp_dir, 'baz')
    dst_link = os.path.join(tmp_dir, 'quux')
    for path in (src, dst):
        write_file(path, 'foo')
    os.symlink(src, src_link)
    os.symlink(dst, dst_link)
    os.chmod(src, stat.S_IRWXU | stat.S_IRWXG)
    os.chmod(dst, stat.S_IRWXU)
    os.lchmod(src_link, stat.S_IRWXO | stat.S_IRWXG)

    # link to link: the links' own modes now match while the modes of
    # the files they point to still differ
    os.lchmod(dst_link, stat.S_IRWXO)
    shutil.copymode(src_link, dst_link, follow_symlinks=False)
    self.assertEqual(os.lstat(src_link).st_mode,
                     os.lstat(dst_link).st_mode)
    self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)

    # src link - use chmod
    os.lchmod(dst_link, stat.S_IRWXO)
    shutil.copymode(src_link, dst, follow_symlinks=False)
    self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)

    # dst link - use chmod
    os.lchmod(dst_link, stat.S_IRWXO)
    shutil.copymode(src, dst_link, follow_symlinks=False)
    self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
856
@unittest.skipIf(hasattr(os, 'lchmod'), 'requires os.lchmod to be missing')
@os_helper.skip_unless_symlink
def test_copymode_symlink_to_symlink_wo_lchmod(self):
    # Without os.lchmod, copying the mode between two symlinks with
    # follow_symlinks=False must simply not raise.
    tmp_dir = self.mkdtemp()
    src = os.path.join(tmp_dir, 'foo')
    dst = os.path.join(tmp_dir, 'bar')
    src_link = os.path.join(tmp_dir, 'baz')
    dst_link = os.path.join(tmp_dir, 'quux')
    for path in (src, dst):
        write_file(path, 'foo')
    os.symlink(src, src_link)
    os.symlink(dst, dst_link)
    shutil.copymode(src_link, dst_link, follow_symlinks=False)  # silent fail
870
871 ### shutil.copystat
872
@os_helper.skip_unless_symlink
def test_copystat_symlinks(self):
    tmp_dir = self.mkdtemp()
    src = os.path.join(tmp_dir, 'foo')
    dst = os.path.join(tmp_dir, 'bar')
    src_link = os.path.join(tmp_dir, 'baz')
    dst_link = os.path.join(tmp_dir, 'qux')
    write_file(src, 'foo')
    src_stat = os.stat(src)
    os.utime(src, (src_stat.st_atime,
                   src_stat.st_mtime - 42.0))  # ensure different mtimes
    write_file(dst, 'bar')
    self.assertNotEqual(os.stat(src).st_mtime, os.stat(dst).st_mtime)
    os.symlink(src, src_link)
    os.symlink(dst, dst_link)
    # Give the source link its own mode/flags where the platform allows
    # it, so link metadata can be told apart from file metadata.
    if hasattr(os, 'lchmod'):
        os.lchmod(src_link, stat.S_IRWXO)
    if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
        os.lchflags(src_link, stat.UF_NODUMP)
    src_link_stat = os.lstat(src_link)
    # follow
    if hasattr(os, 'lchmod'):
        shutil.copystat(src_link, dst_link, follow_symlinks=True)
        self.assertNotEqual(src_link_stat.st_mode, os.stat(dst).st_mode)
    # don't follow
    shutil.copystat(src_link, dst_link, follow_symlinks=False)
    dst_link_stat = os.lstat(dst_link)
    if os.utime in os.supports_follow_symlinks:
        for attr in 'st_atime', 'st_mtime':
            # The modification times may be truncated in the new file.
            self.assertLessEqual(getattr(src_link_stat, attr),
                                 getattr(dst_link_stat, attr) + 1)
    if hasattr(os, 'lchmod'):
        self.assertEqual(src_link_stat.st_mode, dst_link_stat.st_mode)
    if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
        self.assertEqual(src_link_stat.st_flags, dst_link_stat.st_flags)
    # follow_symlinks=False is requested but dst is not a link: the
    # timestamps of the file behind src_link must end up on dst.
    shutil.copystat(src_link, dst, follow_symlinks=False)
    self.assertLess(abs(os.stat(src).st_mtime - os.stat(dst).st_mtime),
                    0.1)
913
@unittest.skipUnless(hasattr(os, 'chflags') and
                     hasattr(errno, 'EOPNOTSUPP') and
                     hasattr(errno, 'ENOTSUP'),
                     "requires os.chflags, EOPNOTSUPP & ENOTSUP")
def test_copystat_handles_harmless_chflags_errors(self):
    tmpdir = self.mkdtemp()
    file1 = os.path.join(tmpdir, 'file1')
    file2 = os.path.join(tmpdir, 'file2')
    for path in (file1, file2):
        write_file(path, 'xxx')

    def make_chflags_raiser(err):
        # Build an os.chflags stand-in that always fails with errno *err*.
        failure = OSError()

        def _chflags_raiser(path, flags, *, follow_symlinks=True):
            failure.errno = err
            raise failure
        return _chflags_raiser

    # "Operation not supported" style errors are tolerated by copystat().
    for err in errno.EOPNOTSUPP, errno.ENOTSUP:
        with unittest.mock.patch.object(os, 'chflags',
                                        make_chflags_raiser(err)):
            shutil.copystat(file1, file2)

    # assert others errors break it
    other_errno = errno.EOPNOTSUPP + errno.ENOTSUP
    with unittest.mock.patch.object(os, 'chflags',
                                    make_chflags_raiser(other_errno)):
        self.assertRaises(OSError, shutil.copystat, file1, file2)
942
943 ### shutil.copyxattr
944
@os_helper.skip_unless_xattr
def test_copyxattr(self):
    tmp_dir = self.mkdtemp()
    src = os.path.join(tmp_dir, 'foo')
    write_file(src, 'foo')
    dst = os.path.join(tmp_dir, 'bar')
    write_file(dst, 'bar')

    # no xattr == no problem
    shutil._copyxattr(src, dst)
    # common case
    for key, value in (('user.foo', b'42'), ('user.bar', b'43')):
        os.setxattr(src, key, value)
    shutil._copyxattr(src, dst)
    self.assertEqual(sorted(os.listxattr(src)), sorted(os.listxattr(dst)))
    self.assertEqual(os.getxattr(src, 'user.foo'),
                     os.getxattr(dst, 'user.foo'))

    # check errors don't affect other attrs
    os.remove(dst)
    write_file(dst, 'bar')
    os_error = OSError(errno.EPERM, 'EPERM')
    orig_setxattr = os.setxattr

    def _raise_on_user_foo(fname, attr, val, **kwargs):
        # Fail for 'user.foo' only; delegate everything else.
        if attr == 'user.foo':
            raise os_error
        orig_setxattr(fname, attr, val, **kwargs)

    with unittest.mock.patch.object(os, 'setxattr', _raise_on_user_foo):
        shutil._copyxattr(src, dst)
        self.assertIn('user.bar', os.listxattr(dst))

    # the source filesystem not supporting xattrs should be ok, too.
    orig_listxattr = os.listxattr

    def _raise_on_src(fname, *, follow_symlinks=True):
        if fname == src:
            raise OSError(errno.ENOTSUP, 'Operation not supported')
        return orig_listxattr(fname, follow_symlinks=follow_symlinks)

    with unittest.mock.patch.object(os, 'listxattr', _raise_on_src):
        shutil._copyxattr(src, dst)

    # test that shutil.copystat copies xattrs
    src = os.path.join(tmp_dir, 'the_original')
    srcro = os.path.join(tmp_dir, 'the_original_ro')
    write_file(src, src)
    write_file(srcro, srcro)
    for path in (src, srcro):
        os.setxattr(path, 'user.the_value', b'fiddly')
    os.chmod(srcro, 0o444)
    dst = os.path.join(tmp_dir, 'the_copy')
    dstro = os.path.join(tmp_dir, 'the_copy_ro')
    write_file(dst, dst)
    write_file(dstro, dstro)
    shutil.copystat(src, dst)
    shutil.copystat(srcro, dstro)
    for path in (dst, dstro):
        self.assertEqual(os.getxattr(path, 'user.the_value'), b'fiddly')
1008
@os_helper.skip_unless_symlink
@os_helper.skip_unless_xattr
@unittest.skipUnless(hasattr(os, 'geteuid') and os.geteuid() == 0,
                     'root privileges required')
def test_copyxattr_symlinks(self):
    # On Linux, it's only possible to access non-user xattr for symlinks;
    # which in turn require root privileges. This test should be expanded
    # as soon as other platforms gain support for extended attributes.
    tmp_dir = self.mkdtemp()
    src = os.path.join(tmp_dir, 'foo')
    src_link = os.path.join(tmp_dir, 'baz')
    write_file(src, 'foo')
    os.symlink(src, src_link)
    # The file and the link carry different values for the same key.
    os.setxattr(src, 'trusted.foo', b'42')
    os.setxattr(src_link, 'trusted.foo', b'43', follow_symlinks=False)
    dst = os.path.join(tmp_dir, 'bar')
    dst_link = os.path.join(tmp_dir, 'qux')
    write_file(dst, 'bar')
    os.symlink(dst, dst_link)

    # Link to link: the attribute lands on the link, not on its target.
    shutil._copyxattr(src_link, dst_link, follow_symlinks=False)
    on_link = os.getxattr(dst_link, 'trusted.foo', follow_symlinks=False)
    self.assertEqual(on_link, b'43')
    with self.assertRaises(OSError):
        os.getxattr(dst, 'trusted.foo')

    # Link to regular file: the link's own value is what gets copied.
    shutil._copyxattr(src_link, dst, follow_symlinks=False)
    self.assertEqual(os.getxattr(dst, 'trusted.foo'), b'43')
1033
1034 ### shutil.copy
1035
def _copy_file(self, method):
    # Write 'test.txt' into a fresh directory, let *method* copy it into
    # a second fresh directory, and return (source path, expected copy
    # path).
    fname = 'test.txt'
    source_dir = self.mkdtemp()
    write_file((source_dir, fname), 'xxx')
    source_path = os.path.join(source_dir, fname)
    target_dir = self.mkdtemp()
    method(source_path, target_dir)
    return (source_path, os.path.join(target_dir, fname))
1045
def test_copy(self):
    # Ensure that the copied file exists and has the same mode bits.
    original, duplicate = self._copy_file(shutil.copy)
    self.assertTrue(os.path.exists(duplicate))
    original_mode = os.stat(original).st_mode
    self.assertEqual(original_mode, os.stat(duplicate).st_mode)
1051
@os_helper.skip_unless_symlink
def test_copy_symlinks(self):
    tmp_dir = self.mkdtemp()
    src = os.path.join(tmp_dir, 'foo')
    dst = os.path.join(tmp_dir, 'bar')
    src_link = os.path.join(tmp_dir, 'baz')
    write_file(src, 'foo')
    os.symlink(src, src_link)
    if hasattr(os, 'lchmod'):
        os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
    # follow: the link is resolved, so dst is a regular file holding the
    # content of src
    shutil.copy(src_link, dst, follow_symlinks=True)
    self.assertFalse(os.path.islink(dst))
    self.assertEqual(read_file(src), read_file(dst))
    os.remove(dst)
    # don't follow: dst is itself a symlink with the same target as
    # src_link (and the same mode where lchmod is available)
    shutil.copy(src_link, dst, follow_symlinks=False)
    self.assertTrue(os.path.islink(dst))
    self.assertEqual(os.readlink(dst), os.readlink(src_link))
    if hasattr(os, 'lchmod'):
        self.assertEqual(os.lstat(src_link).st_mode,
                         os.lstat(dst).st_mode)
1074
1075 ### shutil.copy2
1076
@unittest.skipUnless(hasattr(os, 'utime'), 'requires os.utime')
def test_copy2(self):
    # Ensure that the copied file exists and has the same mode and
    # modification time bits.
    file1, file2 = self._copy_file(shutil.copy2)
    self.assertTrue(os.path.exists(file2))
    before, after = os.stat(file1), os.stat(file2)
    self.assertEqual(before.st_mode, after.st_mode)
    for attr in 'st_atime', 'st_mtime':
        # The modification times may be truncated in the new file.
        self.assertLessEqual(getattr(before, attr),
                             getattr(after, attr) + 1)
    # File flags are only compared where the platform exposes them.
    if hasattr(os, 'chflags') and hasattr(before, 'st_flags'):
        self.assertEqual(before.st_flags, after.st_flags)
1093
@os_helper.skip_unless_symlink
def test_copy2_symlinks(self):
    tmp_dir = self.mkdtemp()
    src = os.path.join(tmp_dir, 'foo')
    dst = os.path.join(tmp_dir, 'bar')
    src_link = os.path.join(tmp_dir, 'baz')
    write_file(src, 'foo')
    os.symlink(src, src_link)
    has_lchmod = hasattr(os, 'lchmod')
    if has_lchmod:
        os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
    if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
        os.lchflags(src_link, stat.UF_NODUMP)
    src_stat = os.stat(src)
    src_link_stat = os.lstat(src_link)

    # follow
    shutil.copy2(src_link, dst, follow_symlinks=True)
    self.assertFalse(os.path.islink(dst))
    self.assertEqual(read_file(src), read_file(dst))
    os.remove(dst)

    # don't follow
    shutil.copy2(src_link, dst, follow_symlinks=False)
    self.assertTrue(os.path.islink(dst))
    self.assertEqual(os.readlink(dst), os.readlink(src_link))
    dst_stat = os.lstat(dst)
    if os.utime in os.supports_follow_symlinks:
        for attr in 'st_atime', 'st_mtime':
            # The modification times may be truncated in the new file.
            self.assertLessEqual(getattr(src_link_stat, attr),
                                 getattr(dst_stat, attr) + 1)
    if has_lchmod:
        # The new link mirrors the source link, not the file behind it.
        self.assertEqual(src_link_stat.st_mode, dst_stat.st_mode)
        self.assertNotEqual(src_stat.st_mode, dst_stat.st_mode)
    if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
        self.assertEqual(src_link_stat.st_flags, dst_stat.st_flags)
1128
@os_helper.skip_unless_xattr
def test_copy2_xattr(self):
    # copy2() must carry a user extended attribute over to the copy.
    tmp_dir = self.mkdtemp()
    src = os.path.join(tmp_dir, 'foo')
    dst = os.path.join(tmp_dir, 'bar')
    write_file(src, 'foo')
    os.setxattr(src, 'user.foo', b'42')
    shutil.copy2(src, dst)
    expected = os.getxattr(src, 'user.foo')
    self.assertEqual(expected, os.getxattr(dst, 'user.foo'))
    os.remove(dst)
1141
def test_copy_return_value(self):
    # copy and copy2 both return their destination path.
    for fn in (shutil.copy, shutil.copy2):
        src_dir = self.mkdtemp()
        dst_dir = self.mkdtemp()
        src = os.path.join(src_dir, 'foo')
        write_file(src, 'foo')
        # Directory destination: the result names the file created in it.
        self.assertEqual(fn(src, dst_dir), os.path.join(dst_dir, 'foo'))
        # Explicit file destination: the result is that very path.
        explicit = os.path.join(dst_dir, 'bar')
        self.assertEqual(fn(src, explicit), explicit)
1153
1154 ### shutil.copyfile
1155
@os_helper.skip_unless_symlink
def test_copyfile_symlinks(self):
    tmp_dir = self.mkdtemp()
    src = os.path.join(tmp_dir, 'src')
    dst = os.path.join(tmp_dir, 'dst')
    dst_link = os.path.join(tmp_dir, 'dst_link')
    link = os.path.join(tmp_dir, 'link')
    write_file(src, 'foo')
    os.symlink(src, link)

    # don't follow: the result is a symlink with the same target
    shutil.copyfile(link, dst_link, follow_symlinks=False)
    self.assertTrue(os.path.islink(dst_link))
    self.assertEqual(os.readlink(link), os.readlink(dst_link))

    # follow (the default): the result is not a symlink
    shutil.copyfile(link, dst)
    self.assertFalse(os.path.islink(dst))
1172
@unittest.skipUnless(hasattr(os, 'link'), 'requires os.link')
def test_dont_copy_file_onto_link_to_itself(self):
    # bug 851123.
    # Copying a file onto a hard link to itself must raise
    # SameFileError and leave the content untouched.
    os.mkdir(TESTFN)
    src = os.path.join(TESTFN, 'cheese')
    dst = os.path.join(TESTFN, 'shop')
    try:
        with open(src, 'w', encoding='utf-8') as out:
            out.write('cheddar')
        try:
            os.link(src, dst)
        except PermissionError as e:
            self.skipTest('os.link(): %s' % e)
        with self.assertRaises(shutil.SameFileError):
            shutil.copyfile(src, dst)
        with open(src, 'r', encoding='utf-8') as stream:
            content = stream.read()
            self.assertEqual(content, 'cheddar')
        os.remove(dst)
    finally:
        shutil.rmtree(TESTFN, ignore_errors=True)
Tarek Ziadé51a6f722010-04-23 13:03:09 +00001192
@os_helper.skip_unless_symlink
def test_dont_copy_file_onto_symlink_to_itself(self):
    # bug 851123.
    # Copying a file onto a symlink that points back at it must raise
    # SameFileError and leave the content untouched.
    os.mkdir(TESTFN)
    src = os.path.join(TESTFN, 'cheese')
    dst = os.path.join(TESTFN, 'shop')
    try:
        with open(src, 'w', encoding='utf-8') as out:
            out.write('cheddar')
        # Using `src` here would mean we end up with a symlink pointing
        # to TESTFN/TESTFN/cheese, while it should point at
        # TESTFN/cheese.
        os.symlink('cheese', dst)
        with self.assertRaises(shutil.SameFileError):
            shutil.copyfile(src, dst)
        with open(src, 'r', encoding='utf-8') as stream:
            content = stream.read()
            self.assertEqual(content, 'cheddar')
        os.remove(dst)
    finally:
        shutil.rmtree(TESTFN, ignore_errors=True)
Johannes Gijsbers68128712004-08-14 13:57:08 +00001212
# Issue #3002: copyfile and copytree block indefinitely on named pipes
@unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
@unittest.skipIf(sys.platform == "vxworks",
                 "fifo requires special path on VxWorks")
def test_copyfile_named_pipe(self):
    try:
        os.mkfifo(TESTFN)
    except PermissionError as e:
        self.skipTest('os.mkfifo(): %s' % e)
    try:
        # The FIFO is rejected as a source first, then as a destination.
        for source, target in ((TESTFN, TESTFN2), (__file__, TESTFN)):
            self.assertRaises(shutil.SpecialFileError,
                              shutil.copyfile, source, target)
    finally:
        os.remove(TESTFN)
Antoine Pitrou7fff0962009-05-01 21:09:44 +00001229
def test_copyfile_return_value(self):
    # copyfile returns its destination path.
    src_dir = self.mkdtemp()
    dst_dir = self.mkdtemp()
    dst_file = os.path.join(dst_dir, 'bar')
    src_file = os.path.join(src_dir, 'foo')
    write_file(src_file, 'foo')
    rv = shutil.copyfile(src_file, dst_file)
    # Check the returned value itself, not merely that some path exists.
    self.assertEqual(rv, dst_file)
    self.assertTrue(os.path.exists(rv))
    self.assertEqual(read_file(src_file), read_file(dst_file))
Tarek Ziadé5340db32010-04-19 22:30:51 +00001240
def test_copyfile_same_file(self):
    # copyfile() should raise SameFileError if the source and destination
    # are the same.
    src_dir = self.mkdtemp()
    src_file = os.path.join(src_dir, 'foo')
    write_file(src_file, 'foo')
    with self.assertRaises(SameFileError):
        shutil.copyfile(src_file, src_file)
    # But Error should work too, to stay backward compatible.
    with self.assertRaises(Error):
        shutil.copyfile(src_file, src_file)
    # Make sure file is not corrupted.
    self.assertEqual(read_file(src_file), 'foo')
Tarek Ziadéfb437512010-04-20 08:57:33 +00001252
@unittest.skipIf(MACOS or SOLARIS or _winapi, 'On MACOS, Solaris and Windows the errors are not confusing (though different)')
def test_copyfile_nonexistent_dir(self):
    # Issue 43219
    # A destination with a trailing slash that names a missing directory
    # must be reported as FileNotFoundError.
    src_dir = self.mkdtemp()
    src_file = os.path.join(src_dir, 'foo')
    missing_dir = os.path.join(src_dir, 'does_not_exist/')
    write_file(src_file, 'foo')
    with self.assertRaises(FileNotFoundError):
        shutil.copyfile(src_file, missing_dir)
1261
Tarek Ziadéfb437512010-04-20 08:57:33 +00001262
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001263class TestArchives(BaseTest, unittest.TestCase):
Tarek Ziadéfb437512010-04-20 08:57:33 +00001264
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001265 ### shutil.make_archive
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001266
@support.requires_zlib()
def test_make_tarball(self):
    # creating something to tar
    root_dir, base_dir = self._create_files('')

    tmpdir2 = self.mkdtemp()
    # force shutil to create the directory
    os.rmdir(tmpdir2)
    # working with relative paths
    work_dir = os.path.dirname(tmpdir2)
    rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')

    with os_helper.change_cwd(work_dir):
        base_name = os.path.abspath(rel_base_name)

    expected_members = ['.', './sub', './sub2',
                        './file1', './file2', './sub/file3']
    # First the gzip-compressed tarball, then an uncompressed one.
    flavours = (
        ('gztar', '.tar.gz', 'r:gz'),
        ('tar', '.tar', 'r'),
    )
    for archive_format, suffix, open_mode in flavours:
        with os_helper.change_cwd(work_dir):
            tarball = make_archive(rel_base_name, archive_format,
                                   root_dir, '.')

        # check if the tarball was created
        self.assertEqual(tarball, base_name + suffix)
        self.assertTrue(os.path.isfile(tarball))
        self.assertTrue(tarfile.is_tarfile(tarball))
        with tarfile.open(tarball, open_mode) as tf:
            self.assertCountEqual(tf.getnames(), expected_members)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001302
def _tarinfo(self, path):
    # Return the member names of the tar archive at *path* as a sorted
    # tuple.
    with tarfile.open(path) as archive:
        return tuple(sorted(archive.getnames()))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001308
def _create_files(self, base_dir='dist'):
    # creating something to tar
    # Layout under root_dir/base_dir: file1, file2, sub/file3 and an
    # empty sub2.  When base_dir is non-empty an extra file 'outer' is
    # written next to it.  Returns (root_dir, base_dir).
    root_dir = self.mkdtemp()
    dist = os.path.join(root_dir, base_dir)
    os.makedirs(dist, exist_ok=True)
    for name in ('file1', 'file2'):
        write_file((dist, name), 'xxx')
    os.mkdir(os.path.join(dist, 'sub'))
    write_file((dist, 'sub', 'file3'), 'xxx')
    os.mkdir(os.path.join(dist, 'sub2'))
    if base_dir:
        write_file((root_dir, 'outer'), 'xxx')
    return root_dir, base_dir
Tarek Ziadé396fad72010-02-23 05:30:31 +00001322
@support.requires_zlib()
@unittest.skipUnless(shutil.which('tar'),
                     'Need the tar command to run')
def test_tarfile_vs_tar(self):
    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')
    plain_name = base_name + '.tar'

    # check if the compressed tarball was created
    tarball = make_archive(base_name, 'gztar', root_dir, base_dir)
    self.assertEqual(tarball, base_name + '.tar.gz')
    self.assertTrue(os.path.isfile(tarball))

    # now create another tarball using `tar`
    tarball2 = os.path.join(root_dir, 'archive2.tar')
    subprocess.check_call(['tar', '-cf', 'archive2.tar', base_dir],
                          cwd=root_dir,
                          stdout=subprocess.DEVNULL)
    self.assertTrue(os.path.isfile(tarball2))

    # let's compare both tarballs
    self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2))

    # trying an uncompressed one
    tarball = make_archive(base_name, 'tar', root_dir, base_dir)
    self.assertEqual(tarball, plain_name)
    self.assertTrue(os.path.isfile(tarball))

    # now for a dry_run
    tarball = make_archive(base_name, 'tar', root_dir, base_dir,
                           dry_run=True)
    self.assertEqual(tarball, plain_name)
    self.assertTrue(os.path.isfile(tarball))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001355
@support.requires_zlib()
def test_make_zipfile(self):
    # Build zip archives through a relative base name whose parent
    # directory does not exist yet, with and without a base_dir.
    root_dir, base_dir = self._create_files()

    # Remove the freshly made directory so that shutil has to create it.
    tmpdir2 = self.mkdtemp()
    os.rmdir(tmpdir2)
    # working with relative paths
    work_dir, tmp_tail = os.path.split(tmpdir2)
    rel_base_name = os.path.join(tmp_tail, 'archive')

    dist_members = ['dist/', 'dist/sub/', 'dist/sub2/',
                    'dist/file1', 'dist/file2', 'dist/sub/file3']
    scenarios = [
        # (positional dirs passed to make_archive, expected member names)
        ((root_dir,), dist_members + ['outer']),
        ((root_dir, base_dir), dist_members),
    ]

    for dirs, expected_names in scenarios:
        with os_helper.change_cwd(work_dir):
            base_name = os.path.abspath(rel_base_name)
            res = make_archive(rel_base_name, 'zip', *dirs)

        self.assertEqual(res, base_name + '.zip')
        self.assertTrue(os.path.isfile(res))
        self.assertTrue(zipfile.is_zipfile(res))
        with zipfile.ZipFile(res) as zf:
            self.assertCountEqual(zf.namelist(), expected_names)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001392
@support.requires_zlib()
@unittest.skipUnless(shutil.which('zip'),
                     'Need the zip command to run')
def test_zipfile_vs_zip(self):
    # The member names of a make_archive() ZIP must match those of an
    # archive written by the external `zip` command.
    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')

    # ZIP file produced by shutil
    archive = make_archive(base_name, 'zip', root_dir, base_dir)
    self.assertEqual(archive, base_name + '.zip')
    self.assertTrue(os.path.isfile(archive))

    # ZIP file produced by `zip`, run from inside root_dir
    archive2 = os.path.join(root_dir, 'archive2.zip')
    subprocess.check_call(['zip', '-q', '-r', 'archive2.zip', base_dir],
                          cwd=root_dir, stdout=subprocess.DEVNULL)
    self.assertTrue(os.path.isfile(archive2))

    # compare the sorted member names of both files
    listings = []
    for path in (archive, archive2):
        with zipfile.ZipFile(path) as zf:
            listings.append(sorted(zf.namelist()))
    self.assertEqual(listings[0], listings[1])
Tarek Ziadé396fad72010-02-23 05:30:31 +00001418
@support.requires_zlib()
@unittest.skipUnless(shutil.which('unzip'),
                     'Need the unzip command to run')
def test_unzip_zipfile(self):
    # A ZIP written by make_archive() must pass `unzip -t`.
    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')
    archive = make_archive(base_name, 'zip', root_dir, base_dir)

    # the archive must exist under the expected name
    self.assertEqual(archive, base_name + '.zip')
    self.assertTrue(os.path.isfile(archive))

    with os_helper.change_cwd(root_dir):
        try:
            subprocess.check_output(['unzip', '-t', archive],
                                    stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as exc:
            details = exc.output.decode(errors="replace")
            # Some unzip builds lack -t; that is not a shutil failure.
            if 'unrecognized option: t' in details:
                self.skipTest("unzip doesn't support -t")
            self.fail("{}\n\n**Unzip Output**\n{}".format(exc, details))
1442
def test_make_archive(self):
    # An unknown archive format name must be rejected with ValueError.
    base_name = os.path.join(self.mkdtemp(), 'archive')
    with self.assertRaises(ValueError):
        make_archive(base_name, 'xxx')
1447
@support.requires_zlib()
def test_make_archive_owner_group(self):
    # testing make_archive with owner and group, with various combinations
    # this works even if there's not gid/uid support
    if not UID_GID_SUPPORT:
        owner = group = 'root'
    else:
        group = grp.getgrgid(0)[0]
        owner = pwd.getpwuid(0)[0]

    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')

    combinations = (
        ('zip', {'owner': owner, 'group': group}),
        ('zip', {}),
        ('tar', {'owner': owner, 'group': group}),
        # names that are not expected to exist on the system
        ('tar', {'owner': 'kjhkjhkjg', 'group': 'oihohoh'}),
    )
    for archive_format, ownership in combinations:
        res = make_archive(base_name, archive_format, root_dir, base_dir,
                           **ownership)
        self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001474
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001475
@support.requires_zlib()
@unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
def test_tarfile_root_owner(self):
    # Every member of an archive created with the names of uid 0 / gid 0
    # as owner and group must record uid 0 and gid 0.
    root_dir, _ = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')
    group = grp.getgrgid(0)[0]
    owner = pwd.getpwuid(0)[0]
    with os_helper.change_cwd(root_dir):
        archive_name = make_archive(base_name, 'gztar', root_dir, 'dist',
                                    owner=owner, group=group)

    # the compressed tarball must have been created
    self.assertTrue(os.path.isfile(archive_name))

    # now check the recorded ownership of each member
    with contextlib.closing(tarfile.open(archive_name)) as archive:
        for member in archive.getmembers():
            self.assertEqual(member.uid, 0)
            self.assertEqual(member.gid, 0)
1498
def test_make_archive_cwd(self):
    # The working directory must be unchanged after make_archive() has
    # been called with an archiver that raises.
    original_cwd = os.getcwd()

    def _breaks(*args, **kw):
        raise RuntimeError()

    register_archive_format('xxx', _breaks, [], 'xxx file')
    try:
        # The failure itself is expected and deliberately ignored.
        with contextlib.suppress(Exception):
            make_archive('xxx', 'xxx', root_dir=self.mkdtemp())
        self.assertEqual(os.getcwd(), original_cwd)
    finally:
        unregister_archive_format('xxx')
1513
def test_make_tarfile_in_curdir(self):
    # Issue #21280: a bare base name creates the archive in the current
    # directory and is returned as a relative name.
    with os_helper.change_cwd(self.mkdtemp()):
        created = make_archive('test', 'tar')
        self.assertEqual(created, 'test.tar')
        self.assertTrue(os.path.isfile('test.tar'))
1520
@support.requires_zlib()
def test_make_zipfile_in_curdir(self):
    # Issue #21280: a bare base name creates the archive in the current
    # directory and is returned as a relative name.
    with os_helper.change_cwd(self.mkdtemp()):
        created = make_archive('test', 'zip')
        self.assertEqual(created, 'test.zip')
        self.assertTrue(os.path.isfile('test.zip'))
1528
def test_register_archive_format(self):
    # register_archive_format() must validate its arguments, and a
    # registered format must show up in, and later disappear from,
    # get_archive_formats().

    def _archiver():
        # Placeholder callable; it is only registered, never invoked.
        return None

    # function is not callable
    self.assertRaises(TypeError, register_archive_format, 'xxx', 1)
    # extra_args is not a sequence
    self.assertRaises(TypeError, register_archive_format, 'xxx', _archiver,
                      1)
    # extra_args contains something that is not a 2-tuple
    self.assertRaises(TypeError, register_archive_format, 'xxx', _archiver,
                      [(1, 2), (1, 2, 3)])

    register_archive_format('xxx', _archiver, [(1, 2)], 'xxx file')
    try:
        formats = [name for name, params in get_archive_formats()]
        self.assertIn('xxx', formats)
    finally:
        # Always unregister so a failed assertion cannot leave 'xxx'
        # behind in the global registry for later tests.
        unregister_archive_format('xxx')

    formats = [name for name, params in get_archive_formats()]
    self.assertNotIn('xxx', formats)
1544
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001545 ### shutil.unpack_archive
1546
def check_unpack_archive(self, format):
    # Run the unpack check with plain paths, pathlib.Path objects and
    # FakePath objects in turn.
    converters = (lambda path: path, pathlib.Path, FakePath)
    for converter in converters:
        self.check_unpack_archive_with_converter(format, converter)
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001551
def check_unpack_archive_with_converter(self, format, converter):
    # Archive the fixture tree in the given format, then unpack it twice
    # (format auto-detected, then given explicitly), passing every path
    # through `converter` first.
    root_dir, base_dir = self._create_files()
    expected = rlistdir(root_dir)
    # 'outer' is dropped from the expected listing; only base_dir is
    # passed to make_archive() below.
    expected.remove('outer')

    base_name = os.path.join(self.mkdtemp(), 'archive')
    filename = make_archive(base_name, format, root_dir, base_dir)

    # first without, then with, the format specified
    for options in ({}, {'format': format}):
        target = self.mkdtemp()
        unpack_archive(converter(filename), converter(target), **options)
        self.assertEqual(rlistdir(target), expected)

    # TESTFN: unknown file type, then an unknown format name
    bogus = converter(TESTFN)
    with self.assertRaises(shutil.ReadError):
        unpack_archive(bogus)
    bogus = converter(TESTFN)
    with self.assertRaises(ValueError):
        unpack_archive(bogus, format='xxx')
Nick Coghlanabf202d2011-03-16 13:52:20 -04001572
def test_unpack_archive_tar(self):
    # Round-trip check for the uncompressed 'tar' format.
    archive_format = 'tar'
    self.check_unpack_archive(archive_format)
1575
@support.requires_zlib()
def test_unpack_archive_gztar(self):
    # Round-trip check for the 'gztar' format.
    archive_format = 'gztar'
    self.check_unpack_archive(archive_format)
1579
@support.requires_bz2()
def test_unpack_archive_bztar(self):
    # Round-trip check for the 'bztar' format.
    archive_format = 'bztar'
    self.check_unpack_archive(archive_format)
1583
@support.requires_lzma()
@unittest.skipIf(AIX and not _maxdataOK(), "AIX MAXDATA must be 0x20000000 or larger")
def test_unpack_archive_xztar(self):
    # Round-trip check for the 'xztar' format.
    archive_format = 'xztar'
    self.check_unpack_archive(archive_format)
1588
@support.requires_zlib()
def test_unpack_archive_zip(self):
    # Round-trip check for the 'zip' format.
    archive_format = 'zip'
    self.check_unpack_archive(archive_format)
1592
def test_unpack_registry(self):
    # Register, use, replace and remove a custom unpack format, then
    # verify that the registry is back to its initial content.

    formats = get_unpack_formats()

    def _drop_test_formats():
        # Safety net: if an assertion below fails before the explicit
        # unregister calls run, make sure 'Boo'/'Boo2' do not stay in
        # the global registry and disturb later tests.
        leftovers = {entry[0] for entry in get_unpack_formats()}
        for name in ('Boo', 'Boo2'):
            if name in leftovers:
                unregister_unpack_format(name)
    self.addCleanup(_drop_test_formats)

    def _boo(filename, extract_dir, extra):
        # Called by unpack_archive(); checks the arguments it receives.
        self.assertEqual(extra, 1)
        self.assertEqual(filename, 'stuff.boo')
        self.assertEqual(extract_dir, 'xx')

    register_unpack_format('Boo', ['.boo', '.b2'], _boo, [('extra', 1)])
    unpack_archive('stuff.boo', 'xx')

    # trying to register a .boo unpacker again
    self.assertRaises(RegistryError, register_unpack_format, 'Boo2',
                      ['.boo'], _boo)

    # should work now
    unregister_unpack_format('Boo')
    register_unpack_format('Boo2', ['.boo'], _boo)
    self.assertIn(('Boo2', ['.boo'], ''), get_unpack_formats())
    self.assertNotIn(('Boo', ['.boo'], ''), get_unpack_formats())

    # let's leave a clean state
    unregister_unpack_format('Boo2')
    self.assertEqual(get_unpack_formats(), formats)
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001618
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001619
class TestMisc(BaseTest, unittest.TestCase):
    # Tests for shutil.disk_usage() and shutil.chown().

    @unittest.skipUnless(hasattr(shutil, 'disk_usage'),
                         "disk_usage not available on this platform")
    def test_disk_usage(self):
        # Sanity-check the fields reported for the directory that holds
        # this test file.
        usage = shutil.disk_usage(os.path.dirname(__file__))
        for attr in ('total', 'used', 'free'):
            self.assertIsInstance(getattr(usage, attr), int)
        self.assertGreater(usage.total, 0)
        self.assertGreater(usage.used, 0)
        self.assertGreaterEqual(usage.free, 0)
        self.assertGreaterEqual(usage.total, usage.used)
        self.assertGreater(usage.total, usage.free)

        # bpo-32557: Check that disk_usage() also accepts a filename
        shutil.disk_usage(__file__)

    @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
    @unittest.skipUnless(hasattr(os, 'chown'), 'requires os.chown')
    def test_chown(self):
        # Exercise argument validation, then chown a file and a directory
        # to the current uid/gid using ids and (when resolvable) names.
        dirname = self.mkdtemp()
        filename = tempfile.mktemp(dir=dirname)
        write_file(filename, 'testing chown function')

        # neither user nor group given
        with self.assertRaises(ValueError):
            shutil.chown(filename)

        with self.assertRaises(LookupError):
            shutil.chown(filename, user='non-existing username')

        with self.assertRaises(LookupError):
            shutil.chown(filename, group='non-existing groupname')

        with self.assertRaises(TypeError):
            shutil.chown(filename, b'spam')

        with self.assertRaises(TypeError):
            shutil.chown(filename, 3.14)

        uid = os.getuid()
        gid = os.getgid()

        def check_chown(path, uid=None, gid=None):
            # Stat the path that was actually passed in; the previous
            # version always inspected `filename`, so the directory
            # checks below never looked at the directory.
            s = os.stat(path)
            if uid is not None:
                self.assertEqual(uid, s.st_uid)
            if gid is not None:
                self.assertEqual(gid, s.st_gid)

        # Same sequence of calls for the file first, then the directory.
        for target in (filename, dirname):
            shutil.chown(target, uid, gid)
            check_chown(target, uid, gid)
            shutil.chown(target, uid)
            check_chown(target, uid)
            shutil.chown(target, user=uid)
            check_chown(target, uid)
            shutil.chown(target, group=gid)
            check_chown(target, gid=gid)

        try:
            user = pwd.getpwuid(uid)[0]
            group = grp.getgrgid(gid)[0]
        except KeyError:
            # On some systems uid/gid cannot be resolved.
            pass
        else:
            shutil.chown(filename, user, group)
            check_chown(filename, uid, gid)
            shutil.chown(dirname, user, group)
            check_chown(dirname, uid, gid)
Sandro Tosid902a142011-08-22 23:28:27 +02001698
Brian Curtin0d0a1de2012-06-18 18:41:07 -05001699
class TestWhich(BaseTest, unittest.TestCase):
    # Tests for shutil.which().  setUp() creates one executable temporary
    # file and stores its directory and name as attributes;
    # TestWhichBytes re-encodes several of those attributes as bytes.

    def setUp(self):
        self.temp_dir = self.mkdtemp(prefix="Tmp")
        # Give the temp_file an ".exe" suffix for all.
        # It's needed on Windows and not harmful on other platforms.
        self.temp_file = tempfile.NamedTemporaryFile(dir=self.temp_dir,
                                                     prefix="Tmp",
                                                     suffix=".Exe")
        # Register the cleanup before anything else can fail, so the
        # temporary file is closed even if chmod raises.
        self.addCleanup(self.temp_file.close)
        os.chmod(self.temp_file.name, stat.S_IXUSR)
        self.dir, self.file = os.path.split(self.temp_file.name)
        self.env_path = self.dir
        self.curdir = os.curdir
        self.ext = ".EXE"

    def test_basic(self):
        # Given an EXE in a directory, it should be returned.
        rv = shutil.which(self.file, path=self.dir)
        self.assertEqual(rv, self.temp_file.name)

    def test_absolute_cmd(self):
        # When given the fully qualified path to an executable that exists,
        # it should be returned.
        rv = shutil.which(self.temp_file.name, path=self.temp_dir)
        self.assertEqual(rv, self.temp_file.name)

    def test_relative_cmd(self):
        # When given the relative path with a directory part to an executable
        # that exists, it should be returned.
        base_dir, tail_dir = os.path.split(self.dir)
        relpath = os.path.join(tail_dir, self.file)
        with os_helper.change_cwd(path=base_dir):
            rv = shutil.which(relpath, path=self.temp_dir)
            self.assertEqual(rv, relpath)
        # But it shouldn't be searched in PATH directories (issue #16957).
        with os_helper.change_cwd(path=self.dir):
            rv = shutil.which(relpath, path=base_dir)
            self.assertIsNone(rv)

    def test_cwd(self):
        # Issue #16957
        base_dir = os.path.dirname(self.dir)
        with os_helper.change_cwd(path=self.dir):
            rv = shutil.which(self.file, path=base_dir)
            if sys.platform == "win32":
                # Windows: current directory implicitly on PATH
                self.assertEqual(rv, os.path.join(self.curdir, self.file))
            else:
                # Other platforms: shouldn't match in the current directory.
                self.assertIsNone(rv)

    @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
                     'non-root user required')
    def test_non_matching_mode(self):
        # Set the file read-only and ask for writeable files.
        os.chmod(self.temp_file.name, stat.S_IREAD)
        if os.access(self.temp_file.name, os.W_OK):
            self.skipTest("can't set the file read-only")
        rv = shutil.which(self.file, path=self.dir, mode=os.W_OK)
        self.assertIsNone(rv)

    def test_relative_path(self):
        # A relative search path yields a result relative to the cwd.
        base_dir, tail_dir = os.path.split(self.dir)
        with os_helper.change_cwd(path=base_dir):
            rv = shutil.which(self.file, path=tail_dir)
            self.assertEqual(rv, os.path.join(tail_dir, self.file))

    def test_nonexistent_file(self):
        # Return None when no matching executable file is found on the path.
        rv = shutil.which("foo.exe", path=self.dir)
        self.assertIsNone(rv)

    @unittest.skipUnless(sys.platform == "win32",
                         "pathext check is Windows-only")
    def test_pathext_checking(self):
        # Ask for the file without the ".exe" extension, then ensure that
        # it gets found properly with the extension.
        rv = shutil.which(self.file[:-4], path=self.dir)
        self.assertEqual(rv, self.temp_file.name[:-4] + self.ext)

    def test_environ_path(self):
        # With no explicit path argument, PATH from the environment is used.
        with os_helper.EnvironmentVarGuard() as env:
            env['PATH'] = self.env_path
            rv = shutil.which(self.file)
            self.assertEqual(rv, self.temp_file.name)

    def test_environ_path_empty(self):
        # PATH='': no match
        with os_helper.EnvironmentVarGuard() as env:
            env['PATH'] = ''
            with unittest.mock.patch('os.confstr', return_value=self.dir,
                                     create=True), \
                 support.swap_attr(os, 'defpath', self.dir), \
                 os_helper.change_cwd(self.dir):
                rv = shutil.which(self.file)
                self.assertIsNone(rv)

    def test_environ_path_cwd(self):
        expected_cwd = os.path.basename(self.temp_file.name)
        if sys.platform == "win32":
            curdir = os.curdir
            # keep str/bytes consistent with the name being joined
            if isinstance(expected_cwd, bytes):
                curdir = os.fsencode(curdir)
            expected_cwd = os.path.join(curdir, expected_cwd)

        # PATH=':': explicitly looks in the current directory
        with os_helper.EnvironmentVarGuard() as env:
            env['PATH'] = os.pathsep
            with unittest.mock.patch('os.confstr', return_value=self.dir,
                                     create=True), \
                 support.swap_attr(os, 'defpath', self.dir):
                rv = shutil.which(self.file)
                self.assertIsNone(rv)

                # look in current directory
                with os_helper.change_cwd(self.dir):
                    rv = shutil.which(self.file)
                    self.assertEqual(rv, expected_cwd)

    def test_environ_path_missing(self):
        with os_helper.EnvironmentVarGuard() as env:
            env.pop('PATH', None)

            # without confstr
            with unittest.mock.patch('os.confstr', side_effect=ValueError,
                                     create=True), \
                 support.swap_attr(os, 'defpath', self.dir):
                rv = shutil.which(self.file)
            self.assertEqual(rv, self.temp_file.name)

            # with confstr
            with unittest.mock.patch('os.confstr', return_value=self.dir,
                                     create=True), \
                 support.swap_attr(os, 'defpath', ''):
                rv = shutil.which(self.file)
            self.assertEqual(rv, self.temp_file.name)

    def test_empty_path(self):
        # An explicit empty path argument finds nothing, even though the
        # file sits in the cwd and its directory is on PATH.
        with os_helper.change_cwd(path=self.dir), \
             os_helper.EnvironmentVarGuard() as env:
            env['PATH'] = self.env_path
            rv = shutil.which(self.file, path='')
            self.assertIsNone(rv)

    def test_empty_path_no_PATH(self):
        with os_helper.EnvironmentVarGuard() as env:
            env.pop('PATH', None)
            rv = shutil.which(self.file)
            self.assertIsNone(rv)

    @unittest.skipUnless(sys.platform == "win32", 'test specific to Windows')
    def test_pathext(self):
        ext = ".xyz"
        temp_filexyz = tempfile.NamedTemporaryFile(dir=self.temp_dir,
                                                   prefix="Tmp2", suffix=ext)
        # Register the cleanup first so the file is closed even if chmod
        # raises.
        self.addCleanup(temp_filexyz.close)
        os.chmod(temp_filexyz.name, stat.S_IXUSR)

        # strip path and extension
        program = os.path.basename(temp_filexyz.name)
        program = os.path.splitext(program)[0]

        with os_helper.EnvironmentVarGuard() as env:
            env['PATHEXT'] = ext
            rv = shutil.which(program, path=self.temp_dir)
            self.assertEqual(rv, temp_filexyz.name)

    # Issue 40592: See https://bugs.python.org/issue40592
    @unittest.skipUnless(sys.platform == "win32", 'test specific to Windows')
    def test_pathext_with_empty_str(self):
        ext = ".xyz"
        temp_filexyz = tempfile.NamedTemporaryFile(dir=self.temp_dir,
                                                   prefix="Tmp2", suffix=ext)
        self.addCleanup(temp_filexyz.close)

        # strip path and extension
        program = os.path.basename(temp_filexyz.name)
        program = os.path.splitext(program)[0]

        with os_helper.EnvironmentVarGuard() as env:
            env['PATHEXT'] = f"{ext};"  # note the ;
            rv = shutil.which(program, path=self.temp_dir)
            self.assertEqual(rv, temp_filexyz.name)
1885
Brian Curtinc57a3452012-06-22 16:00:30 -05001886
class TestWhichBytes(TestWhich):
    # Re-runs the TestWhich tests with the directory, file name, temp file
    # name, curdir and extension converted to bytes.

    def setUp(self):
        super().setUp()
        for attr in ('dir', 'file', 'curdir', 'ext'):
            setattr(self, attr, os.fsencode(getattr(self, attr)))
        # The name lives on the temporary file object, not on the test.
        self.temp_file.name = os.fsencode(self.temp_file.name)
1895
1896
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001897class TestMove(BaseTest, unittest.TestCase):
Christian Heimesada8c3b2008-03-18 18:26:33 +00001898
def setUp(self):
    # Two temporary directories; the source one holds a small file "foo",
    # and dst_file names where it would land in the destination one.
    self.src_dir = self.mkdtemp()
    self.dst_dir = self.mkdtemp()
    self.src_file, self.dst_file = (
        os.path.join(directory, "foo")
        for directory in (self.src_dir, self.dst_dir)
    )
    with open(self.src_file, "wb") as stream:
        stream.write(b"spam")
1907
def _check_move_file(self, src, dst, real_dst):
    # Move src to dst, then check that real_dst holds the original bytes
    # and that src no longer exists.
    with open(src, "rb") as stream:
        before = stream.read()
    shutil.move(src, dst)
    with open(real_dst, "rb") as stream:
        after = stream.read()
        self.assertEqual(before, after)
    self.assertFalse(os.path.exists(src))
1915
def _check_move_dir(self, src, dst, real_dst):
    # Move directory src to dst, then check that real_dst lists the same
    # entries and that src no longer exists.
    before = os.listdir(src)
    before.sort()
    shutil.move(src, dst)
    after = os.listdir(real_dst)
    after.sort()
    self.assertEqual(before, after)
    self.assertFalse(os.path.exists(src))
1921
def test_move_file(self):
    # Destination is given as a full file path, which is also where the
    # content is expected afterwards.
    target = self.dst_file
    self._check_move_file(self.src_file, target, target)
1925
def test_move_file_to_dir(self):
    # Destination is an existing directory; the file is expected inside
    # it under its original name.
    self._check_move_file(src=self.src_file, dst=self.dst_dir,
                          real_dst=self.dst_file)
1929
def test_move_file_to_dir_pathlike_src(self):
    # Same as test_move_file_to_dir, with the source passed as a
    # pathlib.Path instead of a string.
    self._check_move_file(pathlib.Path(self.src_file), self.dst_dir,
                          self.dst_file)
1934
def test_move_file_to_dir_pathlike_dst(self):
    # Same as test_move_file_to_dir, with the destination directory
    # passed as a pathlib.Path instead of a string.
    self._check_move_file(self.src_file, pathlib.Path(self.dst_dir),
                          self.dst_file)
1939
@mock_rename
def test_move_file_other_fs(self):
    # Re-run test_move_file (file moved to another file path) under the
    # mock_rename decorator.
    # NOTE(review): judging by the test name, mock_rename is meant to
    # simulate a move across filesystems - confirm against its definition.
    self.test_move_file()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001944
@mock_rename
def test_move_file_to_dir_other_fs(self):
    # Re-run test_move_file_to_dir (file moved into an existing directory)
    # under the mock_rename decorator.
    # NOTE(review): judging by the test name, mock_rename is meant to
    # simulate a move across filesystems - confirm against its definition.
    self.test_move_file_to_dir()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001949
def test_move_dir(self):
    # Move the source directory to a path that does not exist yet.
    parent = self.mkdtemp()
    new_location = tempfile.mktemp(dir=parent)
    try:
        self._check_move_dir(self.src_dir, new_location, new_location)
    finally:
        os_helper.rmtree(new_location)
Christian Heimesada8c3b2008-03-18 18:26:33 +00001957
@mock_rename
def test_move_dir_other_fs(self):
    # Re-run test_move_dir under the mock_rename decorator.
    # NOTE(review): judging by the test name, mock_rename is meant to
    # simulate a move across filesystems - confirm against its definition.
    self.test_move_dir()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001962
def test_move_dir_to_dir(self):
    # Move a directory into an existing directory; it is expected there
    # under its original base name.
    moved_name = os.path.basename(self.src_dir)
    expected_location = os.path.join(self.dst_dir, moved_name)
    self._check_move_dir(self.src_dir, self.dst_dir, expected_location)
1967
@mock_rename
def test_move_dir_to_dir_other_fs(self):
    # Re-run test_move_dir_to_dir under the mock_rename decorator.
    # NOTE(review): judging by the test name, mock_rename is meant to
    # simulate a move across filesystems - confirm against its definition.
    self.test_move_dir_to_dir()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001972
def test_move_dir_sep_to_dir(self):
    # A trailing os.path.sep on the source must not change where the
    # directory ends up.
    expected_location = os.path.join(self.dst_dir,
                                     os.path.basename(self.src_dir))
    source_with_sep = self.src_dir + os.path.sep
    self._check_move_dir(source_with_sep, self.dst_dir, expected_location)
1976
@unittest.skipUnless(os.path.altsep, 'requires os.path.altsep')
def test_move_dir_altsep_to_dir(self):
    # A trailing os.path.altsep on the source must not change where the
    # directory ends up.
    expected_location = os.path.join(self.dst_dir,
                                     os.path.basename(self.src_dir))
    source_with_altsep = self.src_dir + os.path.altsep
    self._check_move_dir(source_with_altsep, self.dst_dir,
                         expected_location)
1981
Christian Heimesada8c3b2008-03-18 18:26:33 +00001982 def test_existing_file_inside_dest_dir(self):
1983 # A file with the same name inside the destination dir already exists.
1984 with open(self.dst_file, "wb"):
1985 pass
1986 self.assertRaises(shutil.Error, shutil.move, self.src_file, self.dst_dir)
1987
1988 def test_dont_move_dir_in_itself(self):
1989 # Moving a dir inside itself raises an Error.
1990 dst = os.path.join(self.src_dir, "bar")
1991 self.assertRaises(shutil.Error, shutil.move, self.src_dir, dst)
1992
Antoine Pitrou0dcc3cd2009-01-29 20:26:59 +00001993 def test_destinsrc_false_negative(self):
1994 os.mkdir(TESTFN)
1995 try:
1996 for src, dst in [('srcdir', 'srcdir/dest')]:
1997 src = os.path.join(TESTFN, src)
1998 dst = os.path.join(TESTFN, dst)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001999 self.assertTrue(shutil._destinsrc(src, dst),
Benjamin Peterson247a9b82009-02-20 04:09:19 +00002000 msg='_destinsrc() wrongly concluded that '
Antoine Pitrou0dcc3cd2009-01-29 20:26:59 +00002001 'dst (%s) is not in src (%s)' % (dst, src))
2002 finally:
Hai Shi0c4f0f32020-06-30 21:46:31 +08002003 os_helper.rmtree(TESTFN)
Christian Heimesada8c3b2008-03-18 18:26:33 +00002004
Antoine Pitrou0dcc3cd2009-01-29 20:26:59 +00002005 def test_destinsrc_false_positive(self):
2006 os.mkdir(TESTFN)
2007 try:
2008 for src, dst in [('srcdir', 'src/dest'), ('srcdir', 'srcdir.new')]:
2009 src = os.path.join(TESTFN, src)
2010 dst = os.path.join(TESTFN, dst)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00002011 self.assertFalse(shutil._destinsrc(src, dst),
Benjamin Peterson247a9b82009-02-20 04:09:19 +00002012 msg='_destinsrc() wrongly concluded that '
Antoine Pitrou0dcc3cd2009-01-29 20:26:59 +00002013 'dst (%s) is in src (%s)' % (dst, src))
2014 finally:
Hai Shi0c4f0f32020-06-30 21:46:31 +08002015 os_helper.rmtree(TESTFN)
Christian Heimes9bd667a2008-01-20 15:14:11 +00002016
Hai Shi0c4f0f32020-06-30 21:46:31 +08002017 @os_helper.skip_unless_symlink
Antoine Pitrou0a08d7a2012-01-06 20:16:19 +01002018 @mock_rename
2019 def test_move_file_symlink(self):
2020 dst = os.path.join(self.src_dir, 'bar')
2021 os.symlink(self.src_file, dst)
2022 shutil.move(dst, self.dst_file)
2023 self.assertTrue(os.path.islink(self.dst_file))
2024 self.assertTrue(os.path.samefile(self.src_file, self.dst_file))
2025
Hai Shi0c4f0f32020-06-30 21:46:31 +08002026 @os_helper.skip_unless_symlink
Antoine Pitrou0a08d7a2012-01-06 20:16:19 +01002027 @mock_rename
2028 def test_move_file_symlink_to_dir(self):
2029 filename = "bar"
2030 dst = os.path.join(self.src_dir, filename)
2031 os.symlink(self.src_file, dst)
2032 shutil.move(dst, self.dst_dir)
2033 final_link = os.path.join(self.dst_dir, filename)
2034 self.assertTrue(os.path.islink(final_link))
2035 self.assertTrue(os.path.samefile(self.src_file, final_link))
2036
Hai Shi0c4f0f32020-06-30 21:46:31 +08002037 @os_helper.skip_unless_symlink
Antoine Pitrou0a08d7a2012-01-06 20:16:19 +01002038 @mock_rename
2039 def test_move_dangling_symlink(self):
2040 src = os.path.join(self.src_dir, 'baz')
2041 dst = os.path.join(self.src_dir, 'bar')
2042 os.symlink(src, dst)
2043 dst_link = os.path.join(self.dst_dir, 'quux')
2044 shutil.move(dst, dst_link)
2045 self.assertTrue(os.path.islink(dst_link))
Steve Dower75e06492019-08-21 13:43:06 -07002046 self.assertEqual(os.path.realpath(src), os.path.realpath(dst_link))
Antoine Pitrou0a08d7a2012-01-06 20:16:19 +01002047
Hai Shi0c4f0f32020-06-30 21:46:31 +08002048 @os_helper.skip_unless_symlink
Antoine Pitrou0a08d7a2012-01-06 20:16:19 +01002049 @mock_rename
2050 def test_move_dir_symlink(self):
2051 src = os.path.join(self.src_dir, 'baz')
2052 dst = os.path.join(self.src_dir, 'bar')
2053 os.mkdir(src)
2054 os.symlink(src, dst)
2055 dst_link = os.path.join(self.dst_dir, 'quux')
2056 shutil.move(dst, dst_link)
2057 self.assertTrue(os.path.islink(dst_link))
2058 self.assertTrue(os.path.samefile(src, dst_link))
2059
Brian Curtin0d0a1de2012-06-18 18:41:07 -05002060 def test_move_return_value(self):
2061 rv = shutil.move(self.src_file, self.dst_dir)
2062 self.assertEqual(rv,
2063 os.path.join(self.dst_dir, os.path.basename(self.src_file)))
2064
2065 def test_move_as_rename_return_value(self):
2066 rv = shutil.move(self.src_file, os.path.join(self.dst_dir, 'bar'))
2067 self.assertEqual(rv, os.path.join(self.dst_dir, 'bar'))
2068
R David Murray6ffface2014-06-11 14:40:13 -04002069 @mock_rename
2070 def test_move_file_special_function(self):
2071 moved = []
2072 def _copy(src, dst):
2073 moved.append((src, dst))
2074 shutil.move(self.src_file, self.dst_dir, copy_function=_copy)
2075 self.assertEqual(len(moved), 1)
2076
2077 @mock_rename
2078 def test_move_dir_special_function(self):
2079 moved = []
2080 def _copy(src, dst):
2081 moved.append((src, dst))
Hai Shi0c4f0f32020-06-30 21:46:31 +08002082 os_helper.create_empty_file(os.path.join(self.src_dir, 'child'))
2083 os_helper.create_empty_file(os.path.join(self.src_dir, 'child1'))
R David Murray6ffface2014-06-11 14:40:13 -04002084 shutil.move(self.src_dir, self.dst_dir, copy_function=_copy)
2085 self.assertEqual(len(moved), 3)
2086
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03002087 def test_move_dir_caseinsensitive(self):
2088 # Renames a folder to the same name
2089 # but a different case.
2090
2091 self.src_dir = self.mkdtemp()
2092 dst_dir = os.path.join(
2093 os.path.dirname(self.src_dir),
2094 os.path.basename(self.src_dir).upper())
2095 self.assertNotEqual(self.src_dir, dst_dir)
2096
2097 try:
2098 shutil.move(self.src_dir, dst_dir)
2099 self.assertTrue(os.path.isdir(dst_dir))
2100 finally:
2101 os.rmdir(dst_dir)
2102
Tarek Ziadé5340db32010-04-19 22:30:51 +00002103
Winson Luk132131b2021-03-02 15:53:15 -05002104 @unittest.skipUnless(hasattr(os, 'geteuid') and os.geteuid() == 0
2105 and hasattr(os, 'lchflags')
2106 and hasattr(stat, 'SF_IMMUTABLE')
2107 and hasattr(stat, 'UF_OPAQUE'),
2108 'root privileges required')
2109 def test_move_dir_permission_denied(self):
2110 # bpo-42782: shutil.move should not create destination directories
2111 # if the source directory cannot be removed.
2112 try:
2113 os.mkdir(TESTFN_SRC)
2114 os.lchflags(TESTFN_SRC, stat.SF_IMMUTABLE)
2115
2116 # Testing on an empty immutable directory
2117 # TESTFN_DST should not exist if shutil.move failed
2118 self.assertRaises(PermissionError, shutil.move, TESTFN_SRC, TESTFN_DST)
2119 self.assertFalse(TESTFN_DST in os.listdir())
2120
2121 # Create a file and keep the directory immutable
2122 os.lchflags(TESTFN_SRC, stat.UF_OPAQUE)
2123 os_helper.create_empty_file(os.path.join(TESTFN_SRC, 'child'))
2124 os.lchflags(TESTFN_SRC, stat.SF_IMMUTABLE)
2125
2126 # Testing on a non-empty immutable directory
2127 # TESTFN_DST should not exist if shutil.move failed
2128 self.assertRaises(PermissionError, shutil.move, TESTFN_SRC, TESTFN_DST)
2129 self.assertFalse(TESTFN_DST in os.listdir())
2130 finally:
2131 if os.path.exists(TESTFN_SRC):
2132 os.lchflags(TESTFN_SRC, stat.UF_OPAQUE)
2133 os_helper.rmtree(TESTFN_SRC)
2134 if os.path.exists(TESTFN_DST):
2135 os.lchflags(TESTFN_DST, stat.UF_OPAQUE)
2136 os_helper.rmtree(TESTFN_DST)
2137
2138
class TestCopyFile(unittest.TestCase):
    # Checks how shutil.copyfile() behaves when opening or closing the source
    # or destination fails.  Each test swaps a stub in as shutil's ``open``
    # and hands out Faux objects in place of real files.

    class Faux(object):
        # Minimal stand-in for a file object used as a context manager.  It
        # records whether it was entered and what __exit__ received, and can
        # be told to raise from __exit__ or to suppress exceptions.
        _entered = False
        _exited_with = None
        _raised = False

        def __init__(self, raise_in_exit=False, suppress_at_exit=True):
            self._raise_in_exit = raise_in_exit
            self._suppress_at_exit = suppress_at_exit

        def read(self, *args):
            return ''

        def __enter__(self):
            self._entered = True

        def __exit__(self, exc_type, exc_val, exc_tb):
            self._exited_with = exc_type, exc_val, exc_tb
            if self._raise_in_exit:
                self._raised = True
                raise OSError("Cannot close")
            return self._suppress_at_exit

    def test_w_source_open_fails(self):
        # Failure to open the source propagates as OSError.
        def _open(filename, mode='r'):
            if filename == 'srcfile':
                raise OSError('Cannot open "srcfile"')
            # Explicit raise: a bare ``assert 0`` would vanish under -O.
            raise AssertionError("shouldn't reach here.")

        with support.swap_attr(shutil, 'open', _open):
            with self.assertRaises(OSError):
                shutil.copyfile('srcfile', 'destfile')

    @unittest.skipIf(MACOS, "skipped on macOS")
    def test_w_dest_open_fails(self):
        # The source's __exit__ must see the OSError raised while opening
        # the destination (and, being a default Faux, suppresses it).
        srcfile = self.Faux()

        def _open(filename, mode='r'):
            if filename == 'srcfile':
                return srcfile
            if filename == 'destfile':
                raise OSError('Cannot open "destfile"')
            raise AssertionError("shouldn't reach here.")

        with support.swap_attr(shutil, 'open', _open):
            shutil.copyfile('srcfile', 'destfile')
        self.assertTrue(srcfile._entered)
        self.assertIs(srcfile._exited_with[0], OSError)
        self.assertEqual(srcfile._exited_with[1].args,
                         ('Cannot open "destfile"',))

    @unittest.skipIf(MACOS, "skipped on macOS")
    def test_w_dest_close_fails(self):
        # The source's __exit__ must see the OSError raised by the
        # destination's __exit__.
        srcfile = self.Faux()
        destfile = self.Faux(True)

        def _open(filename, mode='r'):
            if filename == 'srcfile':
                return srcfile
            if filename == 'destfile':
                return destfile
            raise AssertionError("shouldn't reach here.")

        with support.swap_attr(shutil, 'open', _open):
            shutil.copyfile('srcfile', 'destfile')
        self.assertTrue(srcfile._entered)
        self.assertTrue(destfile._entered)
        self.assertTrue(destfile._raised)
        self.assertIs(srcfile._exited_with[0], OSError)
        self.assertEqual(srcfile._exited_with[1].args,
                         ('Cannot close',))

    @unittest.skipIf(MACOS, "skipped on macOS")
    def test_w_source_close_fails(self):
        # An OSError raised by the source's own __exit__ propagates to the
        # caller; that __exit__ itself was called without an exception.
        srcfile = self.Faux(True)
        destfile = self.Faux()

        def _open(filename, mode='r'):
            if filename == 'srcfile':
                return srcfile
            if filename == 'destfile':
                return destfile
            raise AssertionError("shouldn't reach here.")

        with support.swap_attr(shutil, 'open', _open):
            with self.assertRaises(OSError):
                shutil.copyfile('srcfile', 'destfile')
        self.assertTrue(srcfile._entered)
        self.assertTrue(destfile._entered)
        self.assertFalse(destfile._raised)
        self.assertIsNone(srcfile._exited_with[0])
        self.assertTrue(srcfile._raised)
2229
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002230
class TestCopyFileObj(unittest.TestCase):
    # Tests for shutil.copyfileobj() using a 2 MiB file at TESTFN as the
    # source and TESTFN2 as the destination.
    FILESIZE = 2 * 1024 * 1024

    @classmethod
    def setUpClass(cls):
        write_test_file(TESTFN, cls.FILESIZE)

    @classmethod
    def tearDownClass(cls):
        os_helper.unlink(TESTFN)
        os_helper.unlink(TESTFN2)

    def tearDown(self):
        os_helper.unlink(TESTFN2)

    @contextlib.contextmanager
    def get_files(self):
        # Yield (source opened for reading, destination opened for writing).
        with open(TESTFN, "rb") as src, open(TESTFN2, "wb") as dst:
            yield (src, dst)

    def assert_files_eq(self, src, dst):
        # Assert that the two paths hold identical bytes.
        with open(src, 'rb') as fsrc, open(dst, 'rb') as fdst:
            self.assertEqual(fsrc.read(), fdst.read())

    def test_content(self):
        with self.get_files() as (src, dst):
            shutil.copyfileobj(src, dst)
        self.assert_files_eq(TESTFN, TESTFN2)

    def test_file_not_closed(self):
        # unittest assertions are used instead of bare ``assert`` so the
        # checks survive ``python -O`` and report through the test runner.
        with self.get_files() as (src, dst):
            shutil.copyfileobj(src, dst)
            self.assertFalse(src.closed)
            self.assertFalse(dst.closed)

    def test_file_offset(self):
        with self.get_files() as (src, dst):
            shutil.copyfileobj(src, dst)
            self.assertEqual(src.tell(), self.FILESIZE)
            self.assertEqual(dst.tell(), self.FILESIZE)

    @unittest.skipIf(os.name != 'nt', "Windows only")
    def test_win_impl(self):
        # Make sure alternate Windows implementation is called.
        with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
            shutil.copyfile(TESTFN, TESTFN2)
        self.assertTrue(m.called)

        # File size is 2 MiB but max buf size should be 1 MiB.
        self.assertEqual(m.call_args[0][2], 1 * 1024 * 1024)

        # If file size < 1 MiB memoryview() length must be equal to
        # the actual file size.
        with tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False) as f:
            f.write(b'foo')
        fname = f.name
        self.addCleanup(os_helper.unlink, fname)
        with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
            shutil.copyfile(fname, TESTFN2)
        self.assertEqual(m.call_args[0][2], 3)

        # Empty files should not rely on readinto() variant.
        with tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False) as f:
            pass
        fname = f.name
        self.addCleanup(os_helper.unlink, fname)
        with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
            shutil.copyfile(fname, TESTFN2)
        self.assertFalse(m.called)
        self.assert_files_eq(fname, TESTFN2)
2303
2304
class _ZeroCopyFileTest(object):
    """Tests common to all zero-copy APIs."""
    # Subclasses set PATCHPOINT to the dotted name to mock and implement
    # zerocopy_fun().
    FILESIZE = (10 * 1024 * 1024)  # 10 MiB
    FILEDATA = b""
    PATCHPOINT = ""

    @classmethod
    def setUpClass(cls):
        write_test_file(TESTFN, cls.FILESIZE)
        with open(TESTFN, 'rb') as f:
            cls.FILEDATA = f.read()
        # Explicit raise instead of a bare ``assert`` so the sanity check
        # is not stripped under ``python -O``.
        if len(cls.FILEDATA) != cls.FILESIZE:
            raise AssertionError(
                'test file has %d bytes, expected %d'
                % (len(cls.FILEDATA), cls.FILESIZE))

    @classmethod
    def tearDownClass(cls):
        os_helper.unlink(TESTFN)

    def tearDown(self):
        os_helper.unlink(TESTFN2)

    @contextlib.contextmanager
    def get_files(self):
        # Yield (source opened for reading, destination opened for writing).
        with open(TESTFN, "rb") as src:
            with open(TESTFN2, "wb") as dst:
                yield (src, dst)

    def zerocopy_fun(self, *args, **kwargs):
        raise NotImplementedError("must be implemented in subclass")

    def reset(self):
        # Tear the fixtures down and build them again.
        self.tearDown()
        self.tearDownClass()
        self.setUpClass()
        self.setUp()

    # ---

    def test_regular_copy(self):
        with self.get_files() as (src, dst):
            self.zerocopy_fun(src, dst)
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
        # Make sure the fallback function is not called.
        with self.get_files() as (src, dst):
            with unittest.mock.patch('shutil.copyfileobj') as m:
                shutil.copyfile(TESTFN, TESTFN2)
            self.assertFalse(m.called)

    def test_same_file(self):
        self.addCleanup(self.reset)
        with self.get_files() as (src, dst):
            with self.assertRaises(Exception):
                self.zerocopy_fun(src, src)
        # Make sure src file is not corrupted.
        self.assertEqual(read_file(TESTFN, binary=True), self.FILEDATA)

    def test_non_existent_src(self):
        # mktemp() returns a name without creating the file.
        name = tempfile.mktemp(dir=os.getcwd())
        with self.assertRaises(FileNotFoundError) as cm:
            shutil.copyfile(name, "new")
        self.assertEqual(cm.exception.filename, name)

    def test_empty_file(self):
        srcname = TESTFN + 'src'
        dstname = TESTFN + 'dst'
        self.addCleanup(os_helper.unlink, srcname)
        self.addCleanup(os_helper.unlink, dstname)
        with open(srcname, "wb"):
            pass

        with open(srcname, "rb") as src:
            with open(dstname, "wb") as dst:
                self.zerocopy_fun(src, dst)

        self.assertEqual(read_file(dstname, binary=True), b"")

    def test_unhandled_exception(self):
        with unittest.mock.patch(self.PATCHPOINT,
                                 side_effect=ZeroDivisionError):
            self.assertRaises(ZeroDivisionError,
                              shutil.copyfile, TESTFN, TESTFN2)

    def test_exception_on_first_call(self):
        # Emulate a case where the first call to the zero-copy
        # function raises an exception in which case the function is
        # supposed to give up immediately.
        with unittest.mock.patch(self.PATCHPOINT,
                                 side_effect=OSError(errno.EINVAL, "yo")):
            with self.get_files() as (src, dst):
                with self.assertRaises(_GiveupOnFastCopy):
                    self.zerocopy_fun(src, dst)

    def test_filesystem_full(self):
        # Emulate a case where filesystem is full and sendfile() fails
        # on first call.
        with unittest.mock.patch(self.PATCHPOINT,
                                 side_effect=OSError(errno.ENOSPC, "yo")):
            with self.get_files() as (src, dst):
                self.assertRaises(OSError, self.zerocopy_fun, src, dst)
2403
2404
@unittest.skipIf(not SUPPORTS_SENDFILE, 'os.sendfile() not supported')
class TestZeroCopySendfile(_ZeroCopyFileTest, unittest.TestCase):
    # Runs the shared zero-copy tests against shutil._fastcopy_sendfile()
    # and adds sendfile-specific cases.  Checks use unittest assertions
    # rather than bare ``assert`` so they are not stripped under ``-O``.
    PATCHPOINT = "os.sendfile"

    def zerocopy_fun(self, fsrc, fdst):
        return shutil._fastcopy_sendfile(fsrc, fdst)

    def test_non_regular_file_src(self):
        with io.BytesIO(self.FILEDATA) as src:
            with open(TESTFN2, "wb") as dst:
                with self.assertRaises(_GiveupOnFastCopy):
                    self.zerocopy_fun(src, dst)
                shutil.copyfileobj(src, dst)

        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_non_regular_file_dst(self):
        with open(TESTFN, "rb") as src:
            with io.BytesIO() as dst:
                with self.assertRaises(_GiveupOnFastCopy):
                    self.zerocopy_fun(src, dst)
                shutil.copyfileobj(src, dst)
                dst.seek(0)
                self.assertEqual(dst.read(), self.FILEDATA)

    def test_exception_on_second_call(self):
        # The first call goes to the real os.sendfile(); every later call
        # raises EBADF.
        def sendfile(*args, **kwargs):
            if not flag:
                flag.append(None)
                return orig_sendfile(*args, **kwargs)
            else:
                raise OSError(errno.EBADF, "yo")

        flag = []
        orig_sendfile = os.sendfile
        with unittest.mock.patch('os.sendfile', create=True,
                                 side_effect=sendfile):
            with self.get_files() as (src, dst):
                with self.assertRaises(OSError) as cm:
                    shutil._fastcopy_sendfile(src, dst)
        self.assertTrue(flag)
        self.assertEqual(cm.exception.errno, errno.EBADF)

    def test_cant_get_size(self):
        # Emulate a case where src file size cannot be determined.
        # Internally bufsize will be set to a small value and
        # sendfile() will be called repeatedly.
        with unittest.mock.patch('os.fstat', side_effect=OSError) as m:
            with self.get_files() as (src, dst):
                shutil._fastcopy_sendfile(src, dst)
                self.assertTrue(m.called)
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_small_chunks(self):
        # Force internal file size detection to be smaller than the
        # actual file size. We want to force sendfile() to be called
        # multiple times, also in order to emulate a src fd which gets
        # bigger while it is being copied.
        fake_stat = unittest.mock.Mock()
        fake_stat.st_size = 65536 + 1
        with unittest.mock.patch('os.fstat', return_value=fake_stat) as m:
            with self.get_files() as (src, dst):
                shutil._fastcopy_sendfile(src, dst)
                self.assertTrue(m.called)
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_big_chunk(self):
        # Force internal file size detection to be +100MB bigger than
        # the actual file size. Make sure sendfile() does not rely on
        # file size value except for (maybe) a better throughput /
        # performance.
        fake_stat = unittest.mock.Mock()
        fake_stat.st_size = self.FILESIZE + (100 * 1024 * 1024)
        with unittest.mock.patch('os.fstat', return_value=fake_stat) as m:
            with self.get_files() as (src, dst):
                shutil._fastcopy_sendfile(src, dst)
                self.assertTrue(m.called)
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_blocksize_arg(self):
        with unittest.mock.patch('os.sendfile',
                                 side_effect=ZeroDivisionError) as m:
            self.assertRaises(ZeroDivisionError,
                              shutil.copyfile, TESTFN, TESTFN2)
            blocksize = m.call_args[0][3]
            # Make sure file size and the block size arg passed to
            # sendfile() are the same.
            self.assertEqual(blocksize, os.path.getsize(TESTFN))
            # ...unless we're dealing with a small file.
            os_helper.unlink(TESTFN2)
            write_file(TESTFN2, b"hello", binary=True)
            self.addCleanup(os_helper.unlink, TESTFN2 + '3')
            self.assertRaises(ZeroDivisionError,
                              shutil.copyfile, TESTFN2, TESTFN2 + '3')
            blocksize = m.call_args[0][3]
            self.assertEqual(blocksize, 2 ** 23)

    def test_file2file_not_supported(self):
        # Emulate a case where sendfile() only support file->socket
        # fds. In such a case copyfile() is supposed to skip the
        # fast-copy attempt from then on.
        self.assertTrue(shutil._USE_CP_SENDFILE)
        try:
            with unittest.mock.patch(
                    self.PATCHPOINT,
                    side_effect=OSError(errno.ENOTSOCK, "yo")) as m:
                with self.get_files() as (src, dst):
                    with self.assertRaises(_GiveupOnFastCopy):
                        shutil._fastcopy_sendfile(src, dst)
                self.assertTrue(m.called)
            self.assertFalse(shutil._USE_CP_SENDFILE)

            with unittest.mock.patch(self.PATCHPOINT) as m:
                shutil.copyfile(TESTFN, TESTFN2)
                self.assertFalse(m.called)
        finally:
            # Restore the module-level switch for the tests that follow.
            shutil._USE_CP_SENDFILE = True
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002522
2523
@unittest.skipIf(not MACOS, 'macOS only')
class TestZeroCopyMACOS(_ZeroCopyFileTest, unittest.TestCase):
    # Runs the shared zero-copy tests against shutil._fastcopy_fcopyfile();
    # the shared tests patch posix._fcopyfile through PATCHPOINT.
    PATCHPOINT = "posix._fcopyfile"

    def zerocopy_fun(self, src, dst):
        # Copy src to dst with the posix._COPYFILE_DATA flag.
        return shutil._fastcopy_fcopyfile(src, dst, posix._COPYFILE_DATA)
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002530
2531
class TestGetTerminalSize(unittest.TestCase):
    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        term = shutil.get_terminal_size()
        self.assertGreaterEqual(term.columns, 0)
        self.assertGreaterEqual(term.lines, 0)

    def test_os_environ_first(self):
        "Check if environment variables have precedence"

        # COLUMNS set, LINES removed.
        with os_helper.EnvironmentVarGuard() as guard:
            guard['COLUMNS'] = '777'
            del guard['LINES']
            term = shutil.get_terminal_size()
        self.assertEqual(term.columns, 777)

        # LINES set, COLUMNS removed.
        with os_helper.EnvironmentVarGuard() as guard:
            del guard['COLUMNS']
            guard['LINES'] = '888'
            term = shutil.get_terminal_size()
        self.assertEqual(term.lines, 888)

    def test_bad_environ(self):
        # Non-numeric COLUMNS/LINES must not make the call fail; the result
        # still has to be non-negative.
        with os_helper.EnvironmentVarGuard() as guard:
            guard['COLUMNS'] = 'xxx'
            guard['LINES'] = 'yyy'
            term = shutil.get_terminal_size()
        self.assertGreaterEqual(term.columns, 0)
        self.assertGreaterEqual(term.lines, 0)

    @unittest.skipUnless(os.isatty(sys.__stdout__.fileno()), "not on tty")
    @unittest.skipUnless(hasattr(os, 'get_terminal_size'),
                         'need os.get_terminal_size()')
    def test_stty_match(self):
        """Check if stty returns the same results ignoring env

        This test will fail if stdin and stdout are connected to
        different terminals with different sizes.  Nevertheless, such
        situations should be pretty rare.
        """
        try:
            raw = subprocess.check_output(['stty', 'size'])
            fields = raw.decode().split()
        except (FileNotFoundError, PermissionError,
                subprocess.CalledProcessError):
            self.skipTest("stty invocation failed")
        # The two fields are compared in reversed order against the
        # (columns, lines) result.
        expected = (int(fields[1]), int(fields[0]))

        with os_helper.EnvironmentVarGuard() as guard:
            del guard['LINES']
            del guard['COLUMNS']
            actual = shutil.get_terminal_size()

        self.assertEqual(expected, actual)

    def test_fallback(self):
        # With no usable environment variables or terminal, the fallback
        # tuple is returned as (columns, lines).
        with os_helper.EnvironmentVarGuard() as guard:
            del guard['LINES']
            del guard['COLUMNS']

            # sys.__stdout__ has no fileno()
            with support.swap_attr(sys, '__stdout__', None):
                term = shutil.get_terminal_size(fallback=(10, 20))
            self.assertEqual(term.columns, 10)
            self.assertEqual(term.lines, 20)

            # sys.__stdout__ is not a terminal on Unix
            # or fileno() not in (0, 1, 2) on Windows
            with open(os.devnull, 'w', encoding='utf-8') as devnull, \
                 support.swap_attr(sys, '__stdout__', devnull):
                term = shutil.get_terminal_size(fallback=(30, 40))
            self.assertEqual(term.columns, 30)
            self.assertEqual(term.lines, 40)
2608
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002609
class PublicAPITests(unittest.TestCase):
    """Ensures that the correct values are exposed in the public API."""

    def test_module_all_attribute(self):
        self.assertTrue(hasattr(shutil, '__all__'))
        expected = {
            'copyfileobj', 'copyfile', 'copymode', 'copystat',
            'copy', 'copy2', 'copytree', 'move', 'rmtree', 'Error',
            'SpecialFileError', 'ExecError', 'make_archive',
            'get_archive_formats', 'register_archive_format',
            'unregister_archive_format', 'get_unpack_formats',
            'register_unpack_format', 'unregister_unpack_format',
            'unpack_archive', 'ignore_patterns', 'chown', 'which',
            'get_terminal_size', 'SameFileError',
        }
        # disk_usage is expected only where os.statvfs exists or on Windows.
        if hasattr(os, 'statvfs') or os.name == 'nt':
            expected.add('disk_usage')
        self.assertEqual(set(shutil.__all__), expected)
2626
2627
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()