blob: 4bcad51509d9c4e3750745499856bc6117ac87aa [file] [log] [blame]
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00001# Copyright (C) 2003 Python Software Foundation
2
3import unittest
Berker Peksag884afd92014-12-10 02:50:32 +02004import unittest.mock
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00005import shutil
6import tempfile
Johannes Gijsbers8e6f2de2004-11-23 09:27:27 +00007import sys
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +00008import stat
Brett Cannon1c3fa182004-06-19 21:11:35 +00009import os
10import os.path
Antoine Pitrouc041ab62012-01-02 19:18:02 +010011import errno
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040012import functools
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -070013import pathlib
Antoine Pitroubcf2b592012-02-08 23:28:36 +010014import subprocess
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020015import random
16import string
17import contextlib
18import io
Serhiy Storchaka527ef072015-09-06 18:33:19 +030019from shutil import (make_archive,
Tarek Ziadé396fad72010-02-23 05:30:31 +000020 register_archive_format, unregister_archive_format,
Tarek Ziadé6ac91722010-04-28 17:51:36 +000021 get_archive_formats, Error, unpack_archive,
22 register_unpack_format, RegistryError,
Hynek Schlawack48653762012-10-07 12:49:58 +020023 unregister_unpack_format, get_unpack_formats,
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020024 SameFileError, _GiveupOnFastCopy)
Tarek Ziadé396fad72010-02-23 05:30:31 +000025import tarfile
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +020026import zipfile
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020027try:
28 import posix
29except ImportError:
30 posix = None
Tarek Ziadé396fad72010-02-23 05:30:31 +000031
32from test import support
Hai Shi0c4f0f32020-06-30 21:46:31 +080033from test.support import os_helper
34from test.support.os_helper import TESTFN, FakePath
Serhiy Storchaka11213772014-08-06 18:50:19 +030035
# Extra scratch names derived from the shared TESTFN base name.
TESTFN2 = TESTFN + "2"
TESTFN_SRC = TESTFN + "_SRC"
TESTFN_DST = TESTFN + "_DST"

# Platform flags consulted by individual tests.
MACOS = sys.platform.startswith("darwin")
AIX = sys.platform.startswith('aix')

# grp/pwd are only importable on some platforms; record whether both are.
try:
    import grp
    import pwd
except ImportError:
    UID_GID_SUPPORT = False
else:
    UID_GID_SUPPORT = True

# _winapi is optional; tests that need it check for None.
try:
    import _winapi
except ImportError:
    _winapi = None
52
def _fake_rename(*args, **kwargs):
    """Replacement for os.rename() that always raises a cross-device OSError.

    All arguments are accepted and ignored.  The effect is to pretend the
    destination path is on a different filesystem.
    """
    # errno.EXDEV may be absent; 18 is the fallback error number.
    cross_device = getattr(errno, 'EXDEV', 18)
    raise OSError(cross_device, "Invalid cross-device link")
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040056
def mock_rename(func):
    """Decorate *func* so it runs with os.rename swapped for _fake_rename.

    The genuine os.rename is reinstated once *func* returns or raises.
    """
    @functools.wraps(func)
    def wrap(*args, **kwargs):
        # Keep a handle on the real implementation so it can be restored.
        saved_rename = os.rename
        os.rename = _fake_rename
        try:
            return func(*args, **kwargs)
        finally:
            os.rename = saved_rename
    return wrap
67
def write_file(path, content, binary=False):
    """Store *content* in the file named by *path*.

    *path* may be a string or a tuple of path components; a tuple is
    combined with os.path.join.  When *binary* is true the file is opened
    in binary mode, otherwise in text mode.
    """
    target = os.path.join(*path) if isinstance(path, tuple) else path
    mode = 'wb' if binary else 'w'
    with open(target, mode) as stream:
        stream.write(content)
79
def write_test_file(path, size):
    """Create a test file with an arbitrary size and random text content.

    *path* is the file to create and *size* the exact number of bytes it
    must hold afterwards.  One buffer of at most 8192 random ASCII letters
    is generated and written repeatedly; the last write is truncated so the
    total matches *size* even when it is not a multiple of the buffer size.
    """
    def chunks(total, step):
        # Yield the length of each successive write: *step* repeatedly,
        # then whatever remainder is left.
        assert total >= step
        while total > step:
            yield step
            total -= step
        if total:
            yield total

    bufsize = min(size, 8192)
    chunk = b"".join([random.choice(string.ascii_letters).encode()
                      for i in range(bufsize)])
    with open(path, 'wb') as f:
        for csize in chunks(size, bufsize):
            # Honour the requested length; the final piece may be shorter
            # than the buffer.
            f.write(chunk[:csize])
    assert os.path.getsize(path) == size
97
def read_file(path, binary=False):
    """Return everything stored in the file named by *path*.

    *path* may be a string or a tuple of path components; a tuple is
    combined with os.path.join.  When *binary* is true the file is opened
    in binary mode, otherwise in text mode.
    """
    source = os.path.join(*path) if isinstance(path, tuple) else path
    mode = 'rb' if binary else 'r'
    with open(source, mode) as stream:
        data = stream.read()
    return data
109
def rlistdir(path):
    """Return a recursive listing of *path* as a flat list of names.

    Entries of each directory are visited in sorted order.  A real
    directory (not a symlink to one) contributes 'name/' followed by its own
    listing with 'name/' prefixed; anything else contributes just 'name'.
    """
    listing = []
    for entry in sorted(os.listdir(path)):
        full = os.path.join(path, entry)
        if os.path.islink(full) or not os.path.isdir(full):
            listing.append(entry)
            continue
        listing.append(entry + '/')
        listing.extend(entry + '/' + child for child in rlistdir(full))
    return listing
121
def supports_file2file_sendfile():
    """Report whether os.sendfile() works between two regular files.

    Two scratch files are created in the current directory, a two-byte
    transfer is attempted, and both files are removed again.  Returns False
    when os.sendfile is missing or the transfer raises OSError.
    """
    # ...apparently Linux and Solaris are the only ones
    if not hasattr(os, "sendfile"):
        return False
    srcname = dstname = None
    try:
        with tempfile.NamedTemporaryFile("wb", dir=os.getcwd(),
                                         delete=False) as f:
            srcname = f.name
            f.write(b"0123456789")

        with open(srcname, "rb") as src, \
                tempfile.NamedTemporaryFile("wb", dir=os.getcwd(),
                                            delete=False) as dst:
            dstname = dst.name
            infd = src.fileno()
            outfd = dst.fileno()
            try:
                os.sendfile(outfd, infd, 0, 2)
            except OSError:
                return False
            return True
    finally:
        # Only remove the files that were actually created.
        for leftover in (srcname, dstname):
            if leftover is not None:
                os_helper.unlink(leftover)


SUPPORTS_SENDFILE = supports_file2file_sendfile()
152
# AIX 32-bit mode, by default, lacks enough memory for the xz/lzma compiler
# test.  The AIX command 'dump -o program' gives XCOFF header information;
# the second word of its last line is the maxdata value.  In 32-bit mode
# this function accepts the interpreter only when maxdata is at least
# 0x20000000.
def _maxdataOK():
    # Everything other than 32-bit AIX is accepted without inspection.
    if not (AIX and sys.maxsize == 2147483647):
        return True
    hdrs = subprocess.getoutput("/usr/bin/dump -o %s" % sys.executable)
    last_line = hdrs.split("\n")[-1]
    maxdata = last_line.split()[1]
    return int(maxdata, 16) >= 0x20000000
Éric Araujoa7e33a12011-08-12 19:51:35 +0200164
Tarek Ziadé396fad72010-02-23 05:30:31 +0000165
class BaseTest:

    def mkdtemp(self, prefix=None):
        """Make a temporary directory under the current directory.

        The directory is registered for removal through addCleanup, and its
        path is returned.
        """
        tmpdir = tempfile.mkdtemp(prefix=prefix, dir=os.getcwd())
        self.addCleanup(os_helper.rmtree, tmpdir)
        return tmpdir
Tarek Ziadé5340db32010-04-19 22:30:51 +0000176
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300177
class TestRmTree(BaseTest, unittest.TestCase):

    def test_rmtree_works_on_bytes(self):
        # rmtree() is handed the directory name as bytes rather than str.
        root = self.mkdtemp()
        victim = os.path.join(root, 'killme')
        os.mkdir(victim)
        write_file(os.path.join(victim, 'somefile'), 'foo')
        encoded = os.fsencode(victim)
        self.assertIsInstance(encoded, bytes)
        shutil.rmtree(encoded)

    @os_helper.skip_unless_symlink
    def test_rmtree_fails_on_symlink(self):
        root = self.mkdtemp()
        target = os.path.join(root, 'dir')
        os.mkdir(target)
        link = os.path.join(root, 'link')
        os.symlink(target, link)
        # Pointing rmtree() at the link itself raises and removes nothing.
        with self.assertRaises(OSError):
            shutil.rmtree(link)
        self.assertTrue(os.path.exists(target))
        self.assertTrue(os.path.lexists(link))
        # With an onerror handler the failure is reported exactly once.
        reported = []
        def onerror(*args):
            reported.append(args)
        shutil.rmtree(link, onerror=onerror)
        self.assertEqual(len(reported), 1)
        func, path, exc_info = reported[0]
        self.assertIs(func, os.path.islink)
        self.assertEqual(path, link)
        self.assertIsInstance(exc_info[1], OSError)

    @os_helper.skip_unless_symlink
    def test_rmtree_works_on_symlinks(self):
        root = self.mkdtemp()
        dir1 = os.path.join(root, 'dir1')
        dir2 = os.path.join(dir1, 'dir2')
        dir3 = os.path.join(root, 'dir3')
        for directory in (dir1, dir2, dir3):
            os.mkdir(directory)
        file1 = os.path.join(root, 'file1')
        write_file(file1, 'foo')
        # dir1 receives links to a directory inside it, a directory outside
        # it and a file outside it.
        for target, name in ((dir2, 'link1'), (dir3, 'link2'),
                             (file1, 'link3')):
            os.symlink(target, os.path.join(dir1, name))
        # make sure symlinks are removed but not followed
        shutil.rmtree(dir1)
        self.assertFalse(os.path.exists(dir1))
        self.assertTrue(os.path.exists(dir3))
        self.assertTrue(os.path.exists(file1))

    @unittest.skipUnless(_winapi, 'only relevant on Windows')
    def test_rmtree_fails_on_junctions(self):
        root = self.mkdtemp()
        target = os.path.join(root, 'dir')
        os.mkdir(target)
        link = os.path.join(root, 'link')
        _winapi.CreateJunction(target, link)
        self.addCleanup(os_helper.unlink, link)
        # Pointing rmtree() at the junction itself raises and removes nothing.
        with self.assertRaises(OSError):
            shutil.rmtree(link)
        self.assertTrue(os.path.exists(target))
        self.assertTrue(os.path.lexists(link))
        # With an onerror handler the failure is reported exactly once.
        reported = []
        def onerror(*args):
            reported.append(args)
        shutil.rmtree(link, onerror=onerror)
        self.assertEqual(len(reported), 1)
        func, path, exc_info = reported[0]
        self.assertIs(func, os.path.islink)
        self.assertEqual(path, link)
        self.assertIsInstance(exc_info[1], OSError)

    @unittest.skipUnless(_winapi, 'only relevant on Windows')
    def test_rmtree_works_on_junctions(self):
        root = self.mkdtemp()
        dir1 = os.path.join(root, 'dir1')
        dir2 = os.path.join(dir1, 'dir2')
        dir3 = os.path.join(root, 'dir3')
        for directory in (dir1, dir2, dir3):
            os.mkdir(directory)
        file1 = os.path.join(root, 'file1')
        write_file(file1, 'foo')
        for target, name in ((dir2, 'link1'), (dir3, 'link2'),
                             (file1, 'link3')):
            _winapi.CreateJunction(target, os.path.join(dir1, name))
        # make sure junctions are removed but not followed
        shutil.rmtree(dir1)
        self.assertFalse(os.path.exists(dir1))
        self.assertTrue(os.path.exists(dir3))
        self.assertTrue(os.path.exists(file1))

    def test_rmtree_errors(self):
        # filename is guaranteed not to exist
        filename = tempfile.mktemp(dir=self.mkdtemp())
        with self.assertRaises(FileNotFoundError):
            shutil.rmtree(filename)
        # test that ignore_errors option is honored
        shutil.rmtree(filename, ignore_errors=True)

        # existing file
        tmpdir = self.mkdtemp()
        write_file((tmpdir, "tstfile"), "")
        filename = os.path.join(tmpdir, "tstfile")
        with self.assertRaises(NotADirectoryError) as cm:
            shutil.rmtree(filename)
        self.assertEqual(cm.exception.filename, filename)
        self.assertTrue(os.path.exists(filename))
        # test that ignore_errors option is honored
        shutil.rmtree(filename, ignore_errors=True)
        self.assertTrue(os.path.exists(filename))

        # With onerror, two failures are expected: first from os.scandir,
        # then from os.rmdir, both naming the file.
        reported = []
        def onerror(*args):
            reported.append(args)
        shutil.rmtree(filename, onerror=onerror)
        self.assertEqual(len(reported), 2)
        for (func, path, exc_info), expected_func in zip(
                reported, (os.scandir, os.rmdir)):
            self.assertIs(func, expected_func)
            self.assertEqual(path, filename)
            self.assertIsInstance(exc_info[1], NotADirectoryError)
            self.assertEqual(exc_info[1].filename, filename)

    @unittest.skipIf(sys.platform[:6] == 'cygwin',
                     "This test can't be run on Cygwin (issue #1071513).")
    @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
                     "This test can't be run reliably as root (issue #1076467).")
    def test_on_error(self):
        self.errorState = 0
        os.mkdir(TESTFN)
        self.addCleanup(shutil.rmtree, TESTFN)

        self.child_file_path = os.path.join(TESTFN, 'a')
        self.child_dir_path = os.path.join(TESTFN, 'b')
        os_helper.create_empty_file(self.child_file_path)
        os.mkdir(self.child_dir_path)
        # Record the current modes so the cleanups can put them back.
        saved_modes = (
            (TESTFN, os.stat(TESTFN).st_mode),
            (self.child_file_path, os.stat(self.child_file_path).st_mode),
            (self.child_dir_path, os.stat(self.child_dir_path).st_mode),
        )
        # Make unwritable.
        new_mode = stat.S_IREAD | stat.S_IEXEC
        for path in (self.child_file_path, self.child_dir_path, TESTFN):
            os.chmod(path, new_mode)

        for path, mode in saved_modes:
            self.addCleanup(os.chmod, path, mode)

        shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
        # Test whether onerror has actually been called.
        self.assertEqual(self.errorState, 3,
                         "Expected call to onerror function did not happen.")

    def check_args_to_onerror(self, func, arg, exc):
        # onerror handler used by test_on_error, which deliberately runs
        # rmtree on a directory that is chmod 500, which will fail.
        # The first two calls are expected to concern the children: a
        # failed os.unlink of the child file or a failed os.rmdir of the
        # child directory.  A failure reported for os.listdir on either
        # directory is accepted as well.  The call after that must be the
        # failed os.rmdir of TESTFN itself.
        if self.errorState >= 2:
            self.assertEqual(func, os.rmdir)
            self.assertEqual(arg, TESTFN)
            self.assertTrue(issubclass(exc[0], OSError))
            self.errorState = 3
            return

        if func is os.unlink:
            self.assertEqual(arg, self.child_file_path)
        elif func is os.rmdir:
            self.assertEqual(arg, self.child_dir_path)
        else:
            self.assertIs(func, os.listdir)
            self.assertIn(arg, [TESTFN, self.child_dir_path])
        self.assertTrue(issubclass(exc[0], OSError))
        self.errorState += 1

    def test_rmtree_does_not_choke_on_failing_lstat(self):
        real_lstat = os.lstat

        def raiser(fn, *args, **kwargs):
            # Only TESTFN itself can be lstat'ed; every other path fails.
            if fn == TESTFN:
                return real_lstat(fn)
            raise OSError()

        try:
            os.lstat = raiser

            os.mkdir(TESTFN)
            write_file((TESTFN, 'foo'), 'foo')
            shutil.rmtree(TESTFN)
        finally:
            os.lstat = real_lstat

    def test_rmtree_uses_safe_fd_version_if_available(self):
        # Work out independently whether the fd-based implementation is
        # expected to be in use, then compare with what shutil reports.
        fd_capable = ({os.open, os.stat, os.unlink, os.rmdir} <=
                      os.supports_dir_fd and
                      os.listdir in os.supports_fd and
                      os.stat in os.supports_follow_symlinks)
        if not fd_capable:
            self.assertFalse(shutil._use_fd_functions)
            self.assertFalse(shutil.rmtree.avoids_symlink_attacks)
            return

        self.assertTrue(shutil._use_fd_functions)
        self.assertTrue(shutil.rmtree.avoids_symlink_attacks)
        tmp_dir = self.mkdtemp()
        d = os.path.join(tmp_dir, 'a')
        os.mkdir(d)

        class Called(Exception):
            pass

        def _raiser(*args, **kwargs):
            raise Called

        # Swap in a sentinel to prove rmtree() dispatches to
        # _rmtree_safe_fd.
        real_rmtree = shutil._rmtree_safe_fd
        try:
            shutil._rmtree_safe_fd = _raiser
            with self.assertRaises(Called):
                shutil.rmtree(d)
        finally:
            shutil._rmtree_safe_fd = real_rmtree

    def test_rmtree_dont_delete_file(self):
        # When called on a file instead of a directory, don't delete it.
        handle, path = tempfile.mkstemp(dir=self.mkdtemp())
        os.close(handle)
        with self.assertRaises(NotADirectoryError):
            shutil.rmtree(path)
        # Would raise if rmtree() had removed the file after all.
        os.remove(path)

    @os_helper.skip_unless_symlink
    def test_rmtree_on_symlink(self):
        # bug 1669.
        os.mkdir(TESTFN)
        try:
            src = os.path.join(TESTFN, 'cheese')
            dst = os.path.join(TESTFN, 'shop')
            os.mkdir(src)
            os.symlink(src, dst)
            with self.assertRaises(OSError):
                shutil.rmtree(dst)
            shutil.rmtree(dst, ignore_errors=True)
        finally:
            shutil.rmtree(TESTFN, ignore_errors=True)

    @unittest.skipUnless(_winapi, 'only relevant on Windows')
    def test_rmtree_on_junction(self):
        os.mkdir(TESTFN)
        try:
            src = os.path.join(TESTFN, 'cheese')
            dst = os.path.join(TESTFN, 'shop')
            os.mkdir(src)
            # Give the junction target some content.
            with open(os.path.join(src, 'spam'), 'wb'):
                pass
            _winapi.CreateJunction(src, dst)
            with self.assertRaises(OSError):
                shutil.rmtree(dst)
            shutil.rmtree(dst, ignore_errors=True)
        finally:
            shutil.rmtree(TESTFN, ignore_errors=True)
437
438
439class TestCopyTree(BaseTest, unittest.TestCase):
440
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000441 def test_copytree_simple(self):
Steve Dowerabde52c2019-11-15 09:49:21 -0800442 src_dir = self.mkdtemp()
443 dst_dir = os.path.join(self.mkdtemp(), 'destination')
Éric Araujoa7e33a12011-08-12 19:51:35 +0200444 self.addCleanup(shutil.rmtree, src_dir)
445 self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
446 write_file((src_dir, 'test.txt'), '123')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000447 os.mkdir(os.path.join(src_dir, 'test_dir'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200448 write_file((src_dir, 'test_dir', 'test.txt'), '456')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000449
Éric Araujoa7e33a12011-08-12 19:51:35 +0200450 shutil.copytree(src_dir, dst_dir)
451 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test.txt')))
452 self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'test_dir')))
453 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test_dir',
454 'test.txt')))
455 actual = read_file((dst_dir, 'test.txt'))
456 self.assertEqual(actual, '123')
457 actual = read_file((dst_dir, 'test_dir', 'test.txt'))
458 self.assertEqual(actual, '456')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000459
jab9e00d9e2018-12-28 13:03:40 -0500460 def test_copytree_dirs_exist_ok(self):
Steve Dowerabde52c2019-11-15 09:49:21 -0800461 src_dir = self.mkdtemp()
462 dst_dir = self.mkdtemp()
jab9e00d9e2018-12-28 13:03:40 -0500463 self.addCleanup(shutil.rmtree, src_dir)
464 self.addCleanup(shutil.rmtree, dst_dir)
465
466 write_file((src_dir, 'nonexisting.txt'), '123')
467 os.mkdir(os.path.join(src_dir, 'existing_dir'))
468 os.mkdir(os.path.join(dst_dir, 'existing_dir'))
469 write_file((dst_dir, 'existing_dir', 'existing.txt'), 'will be replaced')
470 write_file((src_dir, 'existing_dir', 'existing.txt'), 'has been replaced')
471
472 shutil.copytree(src_dir, dst_dir, dirs_exist_ok=True)
473 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'nonexisting.txt')))
474 self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'existing_dir')))
475 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'existing_dir',
476 'existing.txt')))
477 actual = read_file((dst_dir, 'nonexisting.txt'))
478 self.assertEqual(actual, '123')
479 actual = read_file((dst_dir, 'existing_dir', 'existing.txt'))
480 self.assertEqual(actual, 'has been replaced')
481
482 with self.assertRaises(FileExistsError):
483 shutil.copytree(src_dir, dst_dir, dirs_exist_ok=False)
484
Hai Shi0c4f0f32020-06-30 21:46:31 +0800485 @os_helper.skip_unless_symlink
Antoine Pitrou78091e62011-12-29 18:54:15 +0100486 def test_copytree_symlinks(self):
487 tmp_dir = self.mkdtemp()
488 src_dir = os.path.join(tmp_dir, 'src')
489 dst_dir = os.path.join(tmp_dir, 'dst')
490 sub_dir = os.path.join(src_dir, 'sub')
491 os.mkdir(src_dir)
492 os.mkdir(sub_dir)
493 write_file((src_dir, 'file.txt'), 'foo')
494 src_link = os.path.join(sub_dir, 'link')
495 dst_link = os.path.join(dst_dir, 'sub/link')
496 os.symlink(os.path.join(src_dir, 'file.txt'),
497 src_link)
498 if hasattr(os, 'lchmod'):
499 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
500 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
501 os.lchflags(src_link, stat.UF_NODUMP)
502 src_stat = os.lstat(src_link)
503 shutil.copytree(src_dir, dst_dir, symlinks=True)
504 self.assertTrue(os.path.islink(os.path.join(dst_dir, 'sub', 'link')))
Steve Dowerdf2d4a62019-08-21 15:27:33 -0700505 actual = os.readlink(os.path.join(dst_dir, 'sub', 'link'))
506 # Bad practice to blindly strip the prefix as it may be required to
507 # correctly refer to the file, but we're only comparing paths here.
508 if os.name == 'nt' and actual.startswith('\\\\?\\'):
509 actual = actual[4:]
510 self.assertEqual(actual, os.path.join(src_dir, 'file.txt'))
Antoine Pitrou78091e62011-12-29 18:54:15 +0100511 dst_stat = os.lstat(dst_link)
512 if hasattr(os, 'lchmod'):
513 self.assertEqual(dst_stat.st_mode, src_stat.st_mode)
514 if hasattr(os, 'lchflags'):
515 self.assertEqual(dst_stat.st_flags, src_stat.st_flags)
516
Georg Brandl2ee470f2008-07-16 12:55:28 +0000517 def test_copytree_with_exclude(self):
Georg Brandl2ee470f2008-07-16 12:55:28 +0000518 # creating data
519 join = os.path.join
520 exists = os.path.exists
Steve Dowerabde52c2019-11-15 09:49:21 -0800521 src_dir = self.mkdtemp()
Georg Brandl2ee470f2008-07-16 12:55:28 +0000522 try:
Steve Dowerabde52c2019-11-15 09:49:21 -0800523 dst_dir = join(self.mkdtemp(), 'destination')
Éric Araujoa7e33a12011-08-12 19:51:35 +0200524 write_file((src_dir, 'test.txt'), '123')
525 write_file((src_dir, 'test.tmp'), '123')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000526 os.mkdir(join(src_dir, 'test_dir'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200527 write_file((src_dir, 'test_dir', 'test.txt'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000528 os.mkdir(join(src_dir, 'test_dir2'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200529 write_file((src_dir, 'test_dir2', 'test.txt'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000530 os.mkdir(join(src_dir, 'test_dir2', 'subdir'))
531 os.mkdir(join(src_dir, 'test_dir2', 'subdir2'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200532 write_file((src_dir, 'test_dir2', 'subdir', 'test.txt'), '456')
533 write_file((src_dir, 'test_dir2', 'subdir2', 'test.py'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000534
535 # testing glob-like patterns
536 try:
537 patterns = shutil.ignore_patterns('*.tmp', 'test_dir2')
538 shutil.copytree(src_dir, dst_dir, ignore=patterns)
539 # checking the result: some elements should not be copied
540 self.assertTrue(exists(join(dst_dir, 'test.txt')))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200541 self.assertFalse(exists(join(dst_dir, 'test.tmp')))
542 self.assertFalse(exists(join(dst_dir, 'test_dir2')))
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000543 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200544 shutil.rmtree(dst_dir)
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000545 try:
546 patterns = shutil.ignore_patterns('*.tmp', 'subdir*')
547 shutil.copytree(src_dir, dst_dir, ignore=patterns)
548 # checking the result: some elements should not be copied
Éric Araujoa7e33a12011-08-12 19:51:35 +0200549 self.assertFalse(exists(join(dst_dir, 'test.tmp')))
550 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2')))
551 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000552 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200553 shutil.rmtree(dst_dir)
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000554
555 # testing callable-style
556 try:
557 def _filter(src, names):
558 res = []
559 for name in names:
560 path = os.path.join(src, name)
561
562 if (os.path.isdir(path) and
563 path.split()[-1] == 'subdir'):
564 res.append(name)
565 elif os.path.splitext(path)[-1] in ('.py'):
566 res.append(name)
567 return res
568
569 shutil.copytree(src_dir, dst_dir, ignore=_filter)
570
571 # checking the result: some elements should not be copied
Éric Araujoa7e33a12011-08-12 19:51:35 +0200572 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2',
573 'test.py')))
574 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000575
576 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200577 shutil.rmtree(dst_dir)
Georg Brandl2ee470f2008-07-16 12:55:28 +0000578 finally:
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000579 shutil.rmtree(src_dir)
580 shutil.rmtree(os.path.dirname(dst_dir))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000581
mbarkhau88704332020-01-24 14:51:16 +0000582 def test_copytree_arg_types_of_ignore(self):
583 join = os.path.join
584 exists = os.path.exists
585
586 tmp_dir = self.mkdtemp()
587 src_dir = join(tmp_dir, "source")
588
589 os.mkdir(join(src_dir))
590 os.mkdir(join(src_dir, 'test_dir'))
591 os.mkdir(os.path.join(src_dir, 'test_dir', 'subdir'))
592 write_file((src_dir, 'test_dir', 'subdir', 'test.txt'), '456')
593
594 invokations = []
595
596 def _ignore(src, names):
597 invokations.append(src)
598 self.assertIsInstance(src, str)
599 self.assertIsInstance(names, list)
600 self.assertEqual(len(names), len(set(names)))
601 for name in names:
602 self.assertIsInstance(name, str)
603 return []
604
605 dst_dir = join(self.mkdtemp(), 'destination')
606 shutil.copytree(src_dir, dst_dir, ignore=_ignore)
607 self.assertTrue(exists(join(dst_dir, 'test_dir', 'subdir',
608 'test.txt')))
609
610 dst_dir = join(self.mkdtemp(), 'destination')
611 shutil.copytree(pathlib.Path(src_dir), dst_dir, ignore=_ignore)
612 self.assertTrue(exists(join(dst_dir, 'test_dir', 'subdir',
613 'test.txt')))
614
615 dst_dir = join(self.mkdtemp(), 'destination')
616 src_dir_entry = list(os.scandir(tmp_dir))[0]
617 self.assertIsInstance(src_dir_entry, os.DirEntry)
618 shutil.copytree(src_dir_entry, dst_dir, ignore=_ignore)
619 self.assertTrue(exists(join(dst_dir, 'test_dir', 'subdir',
620 'test.txt')))
621
622 self.assertEqual(len(invokations), 9)
623
Antoine Pitrouac601602013-08-16 19:35:02 +0200624 def test_copytree_retains_permissions(self):
Steve Dowerabde52c2019-11-15 09:49:21 -0800625 tmp_dir = self.mkdtemp()
Antoine Pitrouac601602013-08-16 19:35:02 +0200626 src_dir = os.path.join(tmp_dir, 'source')
627 os.mkdir(src_dir)
628 dst_dir = os.path.join(tmp_dir, 'destination')
629 self.addCleanup(shutil.rmtree, tmp_dir)
630
631 os.chmod(src_dir, 0o777)
632 write_file((src_dir, 'permissive.txt'), '123')
633 os.chmod(os.path.join(src_dir, 'permissive.txt'), 0o777)
634 write_file((src_dir, 'restrictive.txt'), '456')
635 os.chmod(os.path.join(src_dir, 'restrictive.txt'), 0o600)
636 restrictive_subdir = tempfile.mkdtemp(dir=src_dir)
Hai Shi0c4f0f32020-06-30 21:46:31 +0800637 self.addCleanup(os_helper.rmtree, restrictive_subdir)
Antoine Pitrouac601602013-08-16 19:35:02 +0200638 os.chmod(restrictive_subdir, 0o600)
639
640 shutil.copytree(src_dir, dst_dir)
Brett Cannon9c7eb552013-08-23 14:38:11 -0400641 self.assertEqual(os.stat(src_dir).st_mode, os.stat(dst_dir).st_mode)
642 self.assertEqual(os.stat(os.path.join(src_dir, 'permissive.txt')).st_mode,
Antoine Pitrouac601602013-08-16 19:35:02 +0200643 os.stat(os.path.join(dst_dir, 'permissive.txt')).st_mode)
Brett Cannon9c7eb552013-08-23 14:38:11 -0400644 self.assertEqual(os.stat(os.path.join(src_dir, 'restrictive.txt')).st_mode,
Antoine Pitrouac601602013-08-16 19:35:02 +0200645 os.stat(os.path.join(dst_dir, 'restrictive.txt')).st_mode)
646 restrictive_subdir_dst = os.path.join(dst_dir,
647 os.path.split(restrictive_subdir)[1])
Brett Cannon9c7eb552013-08-23 14:38:11 -0400648 self.assertEqual(os.stat(restrictive_subdir).st_mode,
Antoine Pitrouac601602013-08-16 19:35:02 +0200649 os.stat(restrictive_subdir_dst).st_mode)
650
Berker Peksag884afd92014-12-10 02:50:32 +0200651 @unittest.mock.patch('os.chmod')
652 def test_copytree_winerror(self, mock_patch):
653 # When copying to VFAT, copystat() raises OSError. On Windows, the
654 # exception object has a meaningful 'winerror' attribute, but not
655 # on other operating systems. Do not assume 'winerror' is set.
Steve Dowerabde52c2019-11-15 09:49:21 -0800656 src_dir = self.mkdtemp()
657 dst_dir = os.path.join(self.mkdtemp(), 'destination')
Berker Peksag884afd92014-12-10 02:50:32 +0200658 self.addCleanup(shutil.rmtree, src_dir)
659 self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
660
661 mock_patch.side_effect = PermissionError('ka-boom')
662 with self.assertRaises(shutil.Error):
663 shutil.copytree(src_dir, dst_dir)
664
def test_copytree_custom_copy_function(self):
    # See: https://bugs.python.org/issue35648
    # A user-supplied copy_function must be called with plain str paths
    # for both the source and the destination file.
    def custom_cpfun(a, b):
        flag.append(None)
        self.assertIsInstance(a, str)
        self.assertIsInstance(b, str)
        self.assertEqual(a, os.path.join(src, 'foo'))
        self.assertEqual(b, os.path.join(dst, 'foo'))

    flag = []  # one entry is appended per custom_cpfun() call
    src = self.mkdtemp()
    # copytree() creates dst itself, so only the parent may exist.  A
    # fixed name inside a fresh private directory is enough; the
    # deprecated tempfile.mktemp() is not needed for that.
    dst = os.path.join(self.mkdtemp(), 'destination')
    # Create an empty file; the with block closes it.
    with open(os.path.join(src, 'foo'), 'w'):
        pass
    shutil.copytree(src, dst, copy_function=custom_cpfun)
    self.assertEqual(len(flag), 1)
681
# Issue #3002: copyfile and copytree block indefinitely on named pipes
@unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
@os_helper.skip_unless_symlink
@unittest.skipIf(sys.platform == "vxworks",
                 "fifo requires special path on VxWorks")
def test_copytree_named_pipe(self):
    os.mkdir(TESTFN)
    try:
        fifo_dir = os.path.join(TESTFN, "subdir")
        os.mkdir(fifo_dir)
        fifo = os.path.join(fifo_dir, "mypipe")
        try:
            os.mkfifo(fifo)
        except PermissionError as e:
            self.skipTest('os.mkfifo(): %s' % e)

        # The copy has to fail with exactly one collected error that
        # names the FIFO.
        raised = None
        try:
            shutil.copytree(TESTFN, TESTFN2)
        except shutil.Error as exc:
            raised = exc
        if raised is None:
            self.fail("shutil.Error should have been raised")
        collected = raised.args[0]
        self.assertEqual(len(collected), 1)
        _src, _dst, error_msg = collected[0]
        self.assertEqual("`%s` is a named pipe" % fifo, error_msg)
    finally:
        # Remove both trees whatever happened above (including a skip).
        for tree in (TESTFN, TESTFN2):
            shutil.rmtree(tree, ignore_errors=True)
709
def test_copytree_special_func(self):
    # The tree holds two files (one of them in a subdirectory); each
    # must be handed to the custom copy function.
    source = self.mkdtemp()
    target = os.path.join(self.mkdtemp(), 'destination')
    write_file((source, 'test.txt'), '123')
    os.mkdir(os.path.join(source, 'test_dir'))
    write_file((source, 'test_dir', 'test.txt'), '456')

    calls = []
    shutil.copytree(source, target,
                    copy_function=lambda src, dst: calls.append((src, dst)))
    self.assertEqual(len(calls), 2)
723
@os_helper.skip_unless_symlink
def test_copytree_dangling_symlinks(self):
    # Source tree: one dangling symlink plus a directory with a file.
    source = self.mkdtemp()
    target = os.path.join(self.mkdtemp(), 'destination')
    os.symlink('IDONTEXIST', os.path.join(source, 'test.txt'))
    os.mkdir(os.path.join(source, 'test_dir'))
    write_file((source, 'test_dir', 'test.txt'), '456')

    # a dangling symlink raises an error at the end
    with self.assertRaises(Error):
        shutil.copytree(source, target)

    # a dangling symlink is ignored with the proper flag
    target = os.path.join(self.mkdtemp(), 'destination2')
    shutil.copytree(source, target, ignore_dangling_symlinks=True)
    copied_names = os.listdir(target)
    self.assertNotIn('test.txt', copied_names)

    # a dangling symlink is copied if symlinks=True
    target = os.path.join(self.mkdtemp(), 'destination3')
    shutil.copytree(source, target, symlinks=True)
    copied_names = os.listdir(target)
    self.assertIn('test.txt', copied_names)
743
@os_helper.skip_unless_symlink
def test_copytree_symlink_dir(self):
    # Source tree: a real directory holding one file, and a symlink
    # pointing at that directory.
    source = self.mkdtemp()
    target = os.path.join(self.mkdtemp(), 'destination')
    real_dir = os.path.join(source, 'real_dir')
    os.mkdir(real_dir)
    with open(os.path.join(real_dir, 'test.txt'), 'w'):
        pass
    os.symlink(real_dir, os.path.join(source, 'link_to_dir'),
               target_is_directory=True)

    # symlinks=False: the copy holds a real directory with the file.
    shutil.copytree(source, target, symlinks=False)
    copied_link = os.path.join(target, 'link_to_dir')
    self.assertFalse(os.path.islink(copied_link))
    self.assertIn('test.txt', os.listdir(copied_link))

    # symlinks=True: the copy holds a link, which still leads to the file.
    target = os.path.join(self.mkdtemp(), 'destination2')
    shutil.copytree(source, target, symlinks=True)
    copied_link = os.path.join(target, 'link_to_dir')
    self.assertTrue(os.path.islink(copied_link))
    self.assertIn('test.txt', os.listdir(copied_link))
763
def test_copytree_return_value(self):
    # copytree returns its destination path.
    source = self.mkdtemp()
    target = source + "dest"
    # The target is a sibling of the temp dir, so remove it explicitly
    # (errors ignored).
    self.addCleanup(shutil.rmtree, target, True)
    write_file(os.path.join(source, 'foo'), 'foo')
    returned = shutil.copytree(source, target)
    self.assertEqual(['foo'], os.listdir(returned))
773
def test_copytree_subdirectory(self):
    # copytree where dst is a subdirectory of src, see Issue 38688
    root = self.mkdtemp()
    self.addCleanup(shutil.rmtree, root, ignore_errors=True)
    source = os.path.join(root, "t", "pg")
    nested_target = os.path.join(source, "somevendor", "1.0")
    os.makedirs(source)
    write_file(os.path.join(source, 'pol'), 'pol')
    returned = shutil.copytree(source, nested_target)
    # Only the original file may show up in the nested copy.
    self.assertEqual(['pol'], os.listdir(returned))
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300785
class TestCopy(BaseTest, unittest.TestCase):
    # Tests for the single-file helpers of shutil: copymode(), copystat(),
    # _copyxattr(), copy(), copy2() and copyfile().

    ### shutil.copymode

    @os_helper.skip_unless_symlink
    def test_copymode_follow_symlinks(self):
        # By default copymode() works on the files the links point to.
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'foo')
        dst = os.path.join(tmp_dir, 'bar')
        src_link = os.path.join(tmp_dir, 'baz')
        dst_link = os.path.join(tmp_dir, 'quux')
        write_file(src, 'foo')
        write_file(dst, 'foo')
        os.symlink(src, src_link)
        os.symlink(dst, dst_link)
        os.chmod(src, stat.S_IRWXU | stat.S_IRWXG)
        # file to file
        os.chmod(dst, stat.S_IRWXO)
        self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
        shutil.copymode(src, dst)
        self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
        # On Windows, os.chmod does not follow symlinks (issue #15411)
        if os.name != 'nt':
            # follow src link
            os.chmod(dst, stat.S_IRWXO)
            shutil.copymode(src_link, dst)
            self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
            # follow dst link
            os.chmod(dst, stat.S_IRWXO)
            shutil.copymode(src, dst_link)
            self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
            # follow both links
            os.chmod(dst, stat.S_IRWXO)
            shutil.copymode(src_link, dst_link)
            self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)

    @unittest.skipUnless(hasattr(os, 'lchmod'), 'requires os.lchmod')
    @os_helper.skip_unless_symlink
    def test_copymode_symlink_to_symlink(self):
        # With follow_symlinks=False and both paths being links, the mode
        # of the link itself is copied and the targets stay untouched.
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'foo')
        dst = os.path.join(tmp_dir, 'bar')
        src_link = os.path.join(tmp_dir, 'baz')
        dst_link = os.path.join(tmp_dir, 'quux')
        write_file(src, 'foo')
        write_file(dst, 'foo')
        os.symlink(src, src_link)
        os.symlink(dst, dst_link)
        os.chmod(src, stat.S_IRWXU | stat.S_IRWXG)
        os.chmod(dst, stat.S_IRWXU)
        os.lchmod(src_link, stat.S_IRWXO | stat.S_IRWXG)
        # link to link
        os.lchmod(dst_link, stat.S_IRWXO)
        shutil.copymode(src_link, dst_link, follow_symlinks=False)
        self.assertEqual(os.lstat(src_link).st_mode,
                         os.lstat(dst_link).st_mode)
        self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
        # src link - use chmod
        os.lchmod(dst_link, stat.S_IRWXO)
        shutil.copymode(src_link, dst, follow_symlinks=False)
        self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
        # dst link - use chmod
        os.lchmod(dst_link, stat.S_IRWXO)
        shutil.copymode(src, dst_link, follow_symlinks=False)
        self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)

    @unittest.skipIf(hasattr(os, 'lchmod'), 'requires os.lchmod to be missing')
    @os_helper.skip_unless_symlink
    def test_copymode_symlink_to_symlink_wo_lchmod(self):
        # Without os.lchmod the link-to-link call must simply not raise.
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'foo')
        dst = os.path.join(tmp_dir, 'bar')
        src_link = os.path.join(tmp_dir, 'baz')
        dst_link = os.path.join(tmp_dir, 'quux')
        write_file(src, 'foo')
        write_file(dst, 'foo')
        os.symlink(src, src_link)
        os.symlink(dst, dst_link)
        shutil.copymode(src_link, dst_link, follow_symlinks=False)  # silent fail

    ### shutil.copystat

    @os_helper.skip_unless_symlink
    def test_copystat_symlinks(self):
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'foo')
        dst = os.path.join(tmp_dir, 'bar')
        src_link = os.path.join(tmp_dir, 'baz')
        dst_link = os.path.join(tmp_dir, 'qux')
        write_file(src, 'foo')
        src_stat = os.stat(src)
        os.utime(src, (src_stat.st_atime,
                       src_stat.st_mtime - 42.0))  # ensure different mtimes
        write_file(dst, 'bar')
        self.assertNotEqual(os.stat(src).st_mtime, os.stat(dst).st_mtime)
        os.symlink(src, src_link)
        os.symlink(dst, dst_link)
        if hasattr(os, 'lchmod'):
            os.lchmod(src_link, stat.S_IRWXO)
        if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
            os.lchflags(src_link, stat.UF_NODUMP)
        src_link_stat = os.lstat(src_link)
        # follow
        if hasattr(os, 'lchmod'):
            shutil.copystat(src_link, dst_link, follow_symlinks=True)
            self.assertNotEqual(src_link_stat.st_mode, os.stat(dst).st_mode)
        # don't follow
        shutil.copystat(src_link, dst_link, follow_symlinks=False)
        dst_link_stat = os.lstat(dst_link)
        if os.utime in os.supports_follow_symlinks:
            for attr in 'st_atime', 'st_mtime':
                # The modification times may be truncated in the new file.
                self.assertLessEqual(getattr(src_link_stat, attr),
                                     getattr(dst_link_stat, attr) + 1)
        if hasattr(os, 'lchmod'):
            self.assertEqual(src_link_stat.st_mode, dst_link_stat.st_mode)
        if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
            self.assertEqual(src_link_stat.st_flags, dst_link_stat.st_flags)
        # told not to follow, but dst is not a link: the times of the
        # file src_link points to must end up on dst
        shutil.copystat(src_link, dst, follow_symlinks=False)
        self.assertLess(abs(os.stat(src).st_mtime - os.stat(dst).st_mtime),
                        0.1)

    @unittest.skipUnless(hasattr(os, 'chflags') and
                         hasattr(errno, 'EOPNOTSUPP') and
                         hasattr(errno, 'ENOTSUP'),
                         "requires os.chflags, EOPNOTSUPP & ENOTSUP")
    def test_copystat_handles_harmless_chflags_errors(self):
        tmpdir = self.mkdtemp()
        file1 = os.path.join(tmpdir, 'file1')
        file2 = os.path.join(tmpdir, 'file2')
        write_file(file1, 'xxx')
        write_file(file2, 'xxx')

        def make_chflags_raiser(err):
            # Build an os.chflags() stand-in that always fails with *err*.
            ex = OSError()

            def _chflags_raiser(path, flags, *, follow_symlinks=True):
                ex.errno = err
                raise ex
            return _chflags_raiser
        old_chflags = os.chflags
        try:
            # These two errnos must be swallowed by copystat().
            for err in errno.EOPNOTSUPP, errno.ENOTSUP:
                os.chflags = make_chflags_raiser(err)
                shutil.copystat(file1, file2)
            # assert other errors break it; the sum is merely an errno
            # value different from the two tolerated ones
            os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP)
            self.assertRaises(OSError, shutil.copystat, file1, file2)
        finally:
            os.chflags = old_chflags

    ### shutil.copyxattr

    @os_helper.skip_unless_xattr
    def test_copyxattr(self):
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'foo')
        write_file(src, 'foo')
        dst = os.path.join(tmp_dir, 'bar')
        write_file(dst, 'bar')

        # no xattr == no problem
        shutil._copyxattr(src, dst)
        # common case
        os.setxattr(src, 'user.foo', b'42')
        os.setxattr(src, 'user.bar', b'43')
        shutil._copyxattr(src, dst)
        self.assertEqual(sorted(os.listxattr(src)), sorted(os.listxattr(dst)))
        self.assertEqual(
                os.getxattr(src, 'user.foo'),
                os.getxattr(dst, 'user.foo'))
        # check errors don't affect other attrs
        os.remove(dst)
        write_file(dst, 'bar')
        os_error = OSError(errno.EPERM, 'EPERM')

        def _raise_on_user_foo(fname, attr, val, **kwargs):
            if attr == 'user.foo':
                raise os_error
            else:
                orig_setxattr(fname, attr, val, **kwargs)
        # Save the original before entering the try block so the finally
        # clause always has a value to restore.
        orig_setxattr = os.setxattr
        try:
            os.setxattr = _raise_on_user_foo
            shutil._copyxattr(src, dst)
            self.assertIn('user.bar', os.listxattr(dst))
        finally:
            os.setxattr = orig_setxattr

        # the source filesystem not supporting xattrs should be ok, too.
        def _raise_on_src(fname, *, follow_symlinks=True):
            if fname == src:
                raise OSError(errno.ENOTSUP, 'Operation not supported')
            return orig_listxattr(fname, follow_symlinks=follow_symlinks)
        orig_listxattr = os.listxattr
        try:
            os.listxattr = _raise_on_src
            shutil._copyxattr(src, dst)
        finally:
            os.listxattr = orig_listxattr

        # test that shutil.copystat copies xattrs
        src = os.path.join(tmp_dir, 'the_original')
        srcro = os.path.join(tmp_dir, 'the_original_ro')
        write_file(src, src)
        write_file(srcro, srcro)
        os.setxattr(src, 'user.the_value', b'fiddly')
        os.setxattr(srcro, 'user.the_value', b'fiddly')
        os.chmod(srcro, 0o444)
        dst = os.path.join(tmp_dir, 'the_copy')
        dstro = os.path.join(tmp_dir, 'the_copy_ro')
        write_file(dst, dst)
        write_file(dstro, dstro)
        shutil.copystat(src, dst)
        shutil.copystat(srcro, dstro)
        self.assertEqual(os.getxattr(dst, 'user.the_value'), b'fiddly')
        self.assertEqual(os.getxattr(dstro, 'user.the_value'), b'fiddly')

    @os_helper.skip_unless_symlink
    @os_helper.skip_unless_xattr
    @unittest.skipUnless(hasattr(os, 'geteuid') and os.geteuid() == 0,
                         'root privileges required')
    def test_copyxattr_symlinks(self):
        # On Linux, it's only possible to access non-user xattr for symlinks;
        # which in turn require root privileges. This test should be expanded
        # as soon as other platforms gain support for extended attributes.
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'foo')
        src_link = os.path.join(tmp_dir, 'baz')
        write_file(src, 'foo')
        os.symlink(src, src_link)
        os.setxattr(src, 'trusted.foo', b'42')
        os.setxattr(src_link, 'trusted.foo', b'43', follow_symlinks=False)
        dst = os.path.join(tmp_dir, 'bar')
        dst_link = os.path.join(tmp_dir, 'qux')
        write_file(dst, 'bar')
        os.symlink(dst, dst_link)
        # link to link: only the link receives the attribute
        shutil._copyxattr(src_link, dst_link, follow_symlinks=False)
        self.assertEqual(os.getxattr(dst_link, 'trusted.foo', follow_symlinks=False), b'43')
        self.assertRaises(OSError, os.getxattr, dst, 'trusted.foo')
        # link to regular file: the link's own value (b'43') is copied
        shutil._copyxattr(src_link, dst, follow_symlinks=False)
        self.assertEqual(os.getxattr(dst, 'trusted.foo'), b'43')

    ### shutil.copy

    def _copy_file(self, method):
        # Create 'test.txt' in a fresh temp dir, copy it into a second
        # fresh temp dir with *method* and return (source, destination).
        fname = 'test.txt'
        tmpdir = self.mkdtemp()
        write_file((tmpdir, fname), 'xxx')
        file1 = os.path.join(tmpdir, fname)
        tmpdir2 = self.mkdtemp()
        method(file1, tmpdir2)
        file2 = os.path.join(tmpdir2, fname)
        return (file1, file2)

    def test_copy(self):
        # Ensure that the copied file exists and has the same mode bits.
        file1, file2 = self._copy_file(shutil.copy)
        self.assertTrue(os.path.exists(file2))
        self.assertEqual(os.stat(file1).st_mode, os.stat(file2).st_mode)

    @os_helper.skip_unless_symlink
    def test_copy_symlinks(self):
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'foo')
        dst = os.path.join(tmp_dir, 'bar')
        src_link = os.path.join(tmp_dir, 'baz')
        write_file(src, 'foo')
        os.symlink(src, src_link)
        if hasattr(os, 'lchmod'):
            os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
        # follow
        shutil.copy(src_link, dst, follow_symlinks=True)
        self.assertFalse(os.path.islink(dst))
        self.assertEqual(read_file(src), read_file(dst))
        os.remove(dst)
        # don't follow
        shutil.copy(src_link, dst, follow_symlinks=False)
        self.assertTrue(os.path.islink(dst))
        self.assertEqual(os.readlink(dst), os.readlink(src_link))
        if hasattr(os, 'lchmod'):
            self.assertEqual(os.lstat(src_link).st_mode,
                             os.lstat(dst).st_mode)

    ### shutil.copy2

    @unittest.skipUnless(hasattr(os, 'utime'), 'requires os.utime')
    def test_copy2(self):
        # Ensure that the copied file exists and has the same mode and
        # modification time bits.
        file1, file2 = self._copy_file(shutil.copy2)
        self.assertTrue(os.path.exists(file2))
        file1_stat = os.stat(file1)
        file2_stat = os.stat(file2)
        self.assertEqual(file1_stat.st_mode, file2_stat.st_mode)
        for attr in 'st_atime', 'st_mtime':
            # The modification times may be truncated in the new file.
            self.assertLessEqual(getattr(file1_stat, attr),
                                 getattr(file2_stat, attr) + 1)
        if hasattr(os, 'chflags') and hasattr(file1_stat, 'st_flags'):
            self.assertEqual(getattr(file1_stat, 'st_flags'),
                             getattr(file2_stat, 'st_flags'))

    @os_helper.skip_unless_symlink
    def test_copy2_symlinks(self):
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'foo')
        dst = os.path.join(tmp_dir, 'bar')
        src_link = os.path.join(tmp_dir, 'baz')
        write_file(src, 'foo')
        os.symlink(src, src_link)
        if hasattr(os, 'lchmod'):
            os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
        if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
            os.lchflags(src_link, stat.UF_NODUMP)
        src_stat = os.stat(src)
        src_link_stat = os.lstat(src_link)
        # follow
        shutil.copy2(src_link, dst, follow_symlinks=True)
        self.assertFalse(os.path.islink(dst))
        self.assertEqual(read_file(src), read_file(dst))
        os.remove(dst)
        # don't follow
        shutil.copy2(src_link, dst, follow_symlinks=False)
        self.assertTrue(os.path.islink(dst))
        self.assertEqual(os.readlink(dst), os.readlink(src_link))
        dst_stat = os.lstat(dst)
        if os.utime in os.supports_follow_symlinks:
            for attr in 'st_atime', 'st_mtime':
                # The modification times may be truncated in the new file.
                self.assertLessEqual(getattr(src_link_stat, attr),
                                     getattr(dst_stat, attr) + 1)
        if hasattr(os, 'lchmod'):
            self.assertEqual(src_link_stat.st_mode, dst_stat.st_mode)
            self.assertNotEqual(src_stat.st_mode, dst_stat.st_mode)
        if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
            self.assertEqual(src_link_stat.st_flags, dst_stat.st_flags)

    @os_helper.skip_unless_xattr
    def test_copy2_xattr(self):
        # copy2() must carry the extended attribute over to the copy.
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'foo')
        dst = os.path.join(tmp_dir, 'bar')
        write_file(src, 'foo')
        os.setxattr(src, 'user.foo', b'42')
        shutil.copy2(src, dst)
        self.assertEqual(
                os.getxattr(src, 'user.foo'),
                os.getxattr(dst, 'user.foo'))
        os.remove(dst)

    def test_copy_return_value(self):
        # copy and copy2 both return their destination path.
        for fn in (shutil.copy, shutil.copy2):
            src_dir = self.mkdtemp()
            dst_dir = self.mkdtemp()
            src = os.path.join(src_dir, 'foo')
            write_file(src, 'foo')
            # destination given as a directory
            rv = fn(src, dst_dir)
            self.assertEqual(rv, os.path.join(dst_dir, 'foo'))
            # destination given as a file name
            rv = fn(src, os.path.join(dst_dir, 'bar'))
            self.assertEqual(rv, os.path.join(dst_dir, 'bar'))

    ### shutil.copyfile

    @os_helper.skip_unless_symlink
    def test_copyfile_symlinks(self):
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'src')
        dst = os.path.join(tmp_dir, 'dst')
        dst_link = os.path.join(tmp_dir, 'dst_link')
        link = os.path.join(tmp_dir, 'link')
        write_file(src, 'foo')
        os.symlink(src, link)
        # don't follow
        shutil.copyfile(link, dst_link, follow_symlinks=False)
        self.assertTrue(os.path.islink(dst_link))
        self.assertEqual(os.readlink(link), os.readlink(dst_link))
        # follow
        shutil.copyfile(link, dst)
        self.assertFalse(os.path.islink(dst))

    @unittest.skipUnless(hasattr(os, 'link'), 'requires os.link')
    def test_dont_copy_file_onto_link_to_itself(self):
        # bug 851123.
        os.mkdir(TESTFN)
        src = os.path.join(TESTFN, 'cheese')
        dst = os.path.join(TESTFN, 'shop')
        try:
            with open(src, 'w') as f:
                f.write('cheddar')
            try:
                os.link(src, dst)
            except PermissionError as e:
                self.skipTest('os.link(): %s' % e)
            self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
            # The refused copy must leave the content intact.
            with open(src, 'r') as f:
                self.assertEqual(f.read(), 'cheddar')
            os.remove(dst)
        finally:
            shutil.rmtree(TESTFN, ignore_errors=True)

    @os_helper.skip_unless_symlink
    def test_dont_copy_file_onto_symlink_to_itself(self):
        # bug 851123.
        os.mkdir(TESTFN)
        src = os.path.join(TESTFN, 'cheese')
        dst = os.path.join(TESTFN, 'shop')
        try:
            with open(src, 'w') as f:
                f.write('cheddar')
            # Using `src` here would mean we end up with a symlink pointing
            # to TESTFN/TESTFN/cheese, while it should point at
            # TESTFN/cheese.
            os.symlink('cheese', dst)
            self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
            # The refused copy must leave the content intact.
            with open(src, 'r') as f:
                self.assertEqual(f.read(), 'cheddar')
            os.remove(dst)
        finally:
            shutil.rmtree(TESTFN, ignore_errors=True)

    # Issue #3002: copyfile and copytree block indefinitely on named pipes
    @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
    @unittest.skipIf(sys.platform == "vxworks",
                    "fifo requires special path on VxWorks")
    def test_copyfile_named_pipe(self):
        try:
            os.mkfifo(TESTFN)
        except PermissionError as e:
            self.skipTest('os.mkfifo(): %s' % e)
        try:
            # The FIFO is rejected both as source and as destination.
            self.assertRaises(shutil.SpecialFileError,
                                shutil.copyfile, TESTFN, TESTFN2)
            self.assertRaises(shutil.SpecialFileError,
                                shutil.copyfile, __file__, TESTFN)
        finally:
            os.remove(TESTFN)

    def test_copyfile_return_value(self):
        # copyfile returns its destination path.
        src_dir = self.mkdtemp()
        dst_dir = self.mkdtemp()
        dst_file = os.path.join(dst_dir, 'bar')
        src_file = os.path.join(src_dir, 'foo')
        write_file(src_file, 'foo')
        rv = shutil.copyfile(src_file, dst_file)
        self.assertEqual(rv, dst_file)
        self.assertTrue(os.path.exists(rv))
        self.assertEqual(read_file(src_file), read_file(dst_file))

    def test_copyfile_same_file(self):
        # copyfile() should raise SameFileError if the source and destination
        # are the same.
        src_dir = self.mkdtemp()
        src_file = os.path.join(src_dir, 'foo')
        write_file(src_file, 'foo')
        self.assertRaises(SameFileError, shutil.copyfile, src_file, src_file)
        # But Error should work too, to stay backward compatible.
        self.assertRaises(Error, shutil.copyfile, src_file, src_file)
        # Make sure file is not corrupted.
        self.assertEqual(read_file(src_file), 'foo')
Tarek Ziadéfb437512010-04-20 08:57:33 +00001247
Tarek Ziadéfb437512010-04-20 08:57:33 +00001248
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001249class TestArchives(BaseTest, unittest.TestCase):
Tarek Ziadéfb437512010-04-20 08:57:33 +00001250
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001251 ### shutil.make_archive
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001252
@support.requires_zlib()
def test_make_tarball(self):
    # creating something to tar
    root_dir, base_dir = self._create_files('')

    scratch = self.mkdtemp()
    # force shutil to create the directory
    os.rmdir(scratch)
    # working with relative paths
    work_dir, scratch_name = os.path.split(scratch)
    rel_base_name = os.path.join(scratch_name, 'archive')
    # member names expected in both archives
    expected_names = ['.', './sub', './sub2',
                      './file1', './file2', './sub/file3']

    with os_helper.change_cwd(work_dir):
        base_name = os.path.abspath(rel_base_name)
        archive = make_archive(rel_base_name, 'gztar', root_dir, '.')

    # check if the compressed tarball was created
    self.assertEqual(archive, base_name + '.tar.gz')
    self.assertTrue(os.path.isfile(archive))
    self.assertTrue(tarfile.is_tarfile(archive))
    with tarfile.open(archive, 'r:gz') as tf:
        self.assertCountEqual(tf.getnames(), expected_names)

    # trying an uncompressed one
    with os_helper.change_cwd(work_dir):
        archive = make_archive(rel_base_name, 'tar', root_dir, '.')
    self.assertEqual(archive, base_name + '.tar')
    self.assertTrue(os.path.isfile(archive))
    self.assertTrue(tarfile.is_tarfile(archive))
    with tarfile.open(archive, 'r') as tf:
        self.assertCountEqual(tf.getnames(), expected_names)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001288
def _tarinfo(self, path):
    # Return the member names of the tar archive at *path* as a sorted
    # tuple.
    with tarfile.open(path) as archive:
        return tuple(sorted(archive.getnames()))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001294
def _create_files(self, base_dir='dist'):
    # Build a small tree to archive under a fresh temp dir and return
    # (root_dir, base_dir):
    #   <root>/<base_dir>/file1, file2, sub/file3, sub2/
    # plus <root>/outer when base_dir is not empty.
    root_dir = self.mkdtemp()
    tree = os.path.join(root_dir, base_dir)
    # With base_dir == '' the tree is root_dir itself, which exists.
    os.makedirs(tree, exist_ok=True)
    for name in ('file1', 'file2'):
        write_file((tree, name), 'xxx')
    os.mkdir(os.path.join(tree, 'sub'))
    write_file((tree, 'sub', 'file3'), 'xxx')
    os.mkdir(os.path.join(tree, 'sub2'))
    if base_dir:
        write_file((root_dir, 'outer'), 'xxx')
    return root_dir, base_dir
Tarek Ziadé396fad72010-02-23 05:30:31 +00001308
@support.requires_zlib()
@unittest.skipUnless(shutil.which('tar'),
                     'Need the tar command to run')
def test_tarfile_vs_tar(self):
    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')
    ours = make_archive(base_name, 'gztar', root_dir, base_dir)

    # check if the compressed tarball was created
    self.assertEqual(ours, base_name + '.tar.gz')
    self.assertTrue(os.path.isfile(ours))

    # now create another tarball using `tar`
    theirs = os.path.join(root_dir, 'archive2.tar')
    subprocess.check_call(['tar', '-cf', 'archive2.tar', base_dir],
                          cwd=root_dir,
                          stdout=subprocess.DEVNULL)

    self.assertTrue(os.path.isfile(theirs))
    # let's compare both tarballs
    self.assertEqual(self._tarinfo(ours), self._tarinfo(theirs))

    # trying an uncompressed one
    ours = make_archive(base_name, 'tar', root_dir, base_dir)
    self.assertEqual(ours, base_name + '.tar')
    self.assertTrue(os.path.isfile(ours))

    # now for a dry_run; the file checked below was already written by
    # the previous call
    ours = make_archive(base_name, 'tar', root_dir, base_dir,
                        dry_run=True)
    self.assertEqual(ours, base_name + '.tar')
    self.assertTrue(os.path.isfile(ours))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001341
@support.requires_zlib()
def test_make_zipfile(self):
    # creating something to zip
    root_dir, base_dir = self._create_files()

    scratch = self.mkdtemp()
    # force shutil to create the directory
    os.rmdir(scratch)
    # working with relative paths
    work_dir, scratch_name = os.path.split(scratch)
    rel_base_name = os.path.join(scratch_name, 'archive')
    # entries below 'dist', expected in both archives
    dist_entries = ['dist/', 'dist/sub/', 'dist/sub2/',
                    'dist/file1', 'dist/file2', 'dist/sub/file3']

    # no base_dir given: the archive also holds 'outer'
    with os_helper.change_cwd(work_dir):
        base_name = os.path.abspath(rel_base_name)
        archive = make_archive(rel_base_name, 'zip', root_dir)

    self.assertEqual(archive, base_name + '.zip')
    self.assertTrue(os.path.isfile(archive))
    self.assertTrue(zipfile.is_zipfile(archive))
    with zipfile.ZipFile(archive) as zf:
        self.assertCountEqual(zf.namelist(), dist_entries + ['outer'])

    # base_dir given: only the entries below it are expected
    with os_helper.change_cwd(work_dir):
        base_name = os.path.abspath(rel_base_name)
        archive = make_archive(rel_base_name, 'zip', root_dir, base_dir)

    self.assertEqual(archive, base_name + '.zip')
    self.assertTrue(os.path.isfile(archive))
    self.assertTrue(zipfile.is_zipfile(archive))
    with zipfile.ZipFile(archive) as zf:
        self.assertCountEqual(zf.namelist(), dist_entries)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001378
@support.requires_zlib()
@unittest.skipUnless(shutil.which('zip'),
                     'Need the zip command to run')
def test_zipfile_vs_zip(self):
    # The member list produced by make_archive() must match the one
    # produced by the external `zip` command for the same tree.
    def member_names(path):
        with zipfile.ZipFile(path) as zf:
            return zf.namelist()

    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')
    ours = make_archive(base_name, 'zip', root_dir, base_dir)

    # the ZIP file must exist under the expected name
    self.assertEqual(ours, base_name + '.zip')
    self.assertTrue(os.path.isfile(ours))

    # build the reference archive with `zip`, running inside root_dir
    theirs = os.path.join(root_dir, 'archive2.zip')
    subprocess.check_call(['zip', '-q', '-r', 'archive2.zip', base_dir],
                          cwd=root_dir,
                          stdout=subprocess.DEVNULL)
    self.assertTrue(os.path.isfile(theirs))

    # compare the two member lists regardless of ordering
    our_names = member_names(ours)
    their_names = member_names(theirs)
    self.assertEqual(sorted(our_names), sorted(their_names))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001404
@support.requires_zlib()
@unittest.skipUnless(shutil.which('unzip'),
                     'Need the unzip command to run')
def test_unzip_zipfile(self):
    # An archive written by make_archive() must pass `unzip -t`.
    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')
    archive = make_archive(base_name, 'zip', root_dir, base_dir)

    # the ZIP file must exist under the expected name
    self.assertEqual(archive, base_name + '.zip')
    self.assertTrue(os.path.isfile(archive))

    with os_helper.change_cwd(root_dir):
        try:
            subprocess.check_output(['unzip', '-t', archive],
                                    stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as exc:
            details = exc.output.decode(errors="replace")
            # Some unzip implementations have no -t option at all;
            # that is not a failure of the archive itself.
            if 'unrecognized option: t' in details:
                self.skipTest("unzip doesn't support -t")
            self.fail("{}\n\n**Unzip Output**\n{}".format(exc, details))
1428
def test_make_archive(self):
    # An unknown archive format name is rejected with ValueError.
    base_name = os.path.join(self.mkdtemp(), 'archive')
    with self.assertRaises(ValueError):
        make_archive(base_name, 'xxx')
1433
@support.requires_zlib()
def test_make_archive_owner_group(self):
    # make_archive() with several owner/group combinations; this is
    # expected to work even when there is no gid/uid support.
    if not UID_GID_SUPPORT:
        group = owner = 'root'
    else:
        group = grp.getgrgid(0)[0]
        owner = pwd.getpwuid(0)[0]

    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')

    cases = [
        # (archive format, ownership keyword arguments)
        ('zip', {'owner': owner, 'group': group}),
        ('zip', {}),
        ('tar', {'owner': owner, 'group': group}),
        ('tar', {'owner': 'kjhkjhkjg', 'group': 'oihohoh'}),
    ]
    for fmt, ownership in cases:
        res = make_archive(base_name, fmt, root_dir, base_dir, **ownership)
        self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001460
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001461
@support.requires_zlib()
@unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
def test_tarfile_root_owner(self):
    # Archive with the names that uid 0 / gid 0 resolve to, then check
    # that every member is recorded with uid 0 and gid 0.
    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')
    group = grp.getgrgid(0)[0]
    owner = pwd.getpwuid(0)[0]
    with os_helper.change_cwd(root_dir):
        archive_name = make_archive(base_name, 'gztar', root_dir, 'dist',
                                    owner=owner, group=group)

    # the compressed tarball must have been created
    self.assertTrue(os.path.isfile(archive_name))

    # inspect the ownership stored for each member
    with tarfile.open(archive_name) as archive:
        for member in archive.getmembers():
            self.assertEqual(member.uid, 0)
            self.assertEqual(member.gid, 0)
1484
def test_make_archive_cwd(self):
    # The working directory must be unchanged after make_archive()
    # runs with an archiver callable that raises.
    def _breaks(*args, **kw):
        raise RuntimeError()

    cwd_before = os.getcwd()
    register_archive_format('xxx', _breaks, [], 'xxx file')
    try:
        try:
            make_archive('xxx', 'xxx', root_dir=self.mkdtemp())
        except Exception:
            # The failure itself is expected; only the cwd matters here.
            pass
        self.assertEqual(os.getcwd(), cwd_before)
    finally:
        unregister_archive_format('xxx')
1499
def test_make_tarfile_in_curdir(self):
    # Issue #21280: a bare base name creates the tar archive in the
    # current directory and the bare file name is returned.
    with os_helper.change_cwd(self.mkdtemp()):
        created = make_archive('test', 'tar')
        self.assertEqual(created, 'test.tar')
        self.assertTrue(os.path.isfile('test.tar'))
1506
@support.requires_zlib()
def test_make_zipfile_in_curdir(self):
    # Issue #21280: a bare base name creates the zip archive in the
    # current directory and the bare file name is returned.
    with os_helper.change_cwd(self.mkdtemp()):
        created = make_archive('test', 'zip')
        self.assertEqual(created, 'test.zip')
        self.assertTrue(os.path.isfile('test.zip'))
1514
def test_register_archive_format(self):
    # Invalid registrations raise TypeError: a non-callable function,
    # a non-sequence extra_args, and a malformed extra_args entry.
    # The lambdas are never called, so their body is irrelevant.
    with self.assertRaises(TypeError):
        register_archive_format('xxx', 1)
    with self.assertRaises(TypeError):
        register_archive_format('xxx', lambda: x, 1)
    with self.assertRaises(TypeError):
        register_archive_format('xxx', lambda: x, [(1, 2), (1, 2, 3)])

    def registered_names():
        return [name for name, params in get_archive_formats()]

    # A valid registration shows up in the list of formats ...
    register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file')
    self.assertIn('xxx', registered_names())

    # ... and disappears again once unregistered.
    unregister_archive_format('xxx')
    self.assertNotIn('xxx', registered_names())
1530
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001531 ### shutil.unpack_archive
1532
def check_unpack_archive(self, format):
    """Run the unpack check for *format* with str, pathlib.Path and
    FakePath arguments, in that order."""
    converters = (lambda path: path, pathlib.Path, FakePath)
    for converter in converters:
        self.check_unpack_archive_with_converter(format, converter)
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001537
def check_unpack_archive_with_converter(self, format, converter):
    """Archive a sample tree as *format* and unpack it again.

    Every path handed to unpack_archive() is first passed through
    *converter*.
    """
    root_dir, base_dir = self._create_files()
    # 'outer' lies outside base_dir, so it is not part of the archive.
    expected = rlistdir(root_dir)
    expected.remove('outer')

    base_name = os.path.join(self.mkdtemp(), 'archive')
    filename = make_archive(base_name, format, root_dir, base_dir)

    # Unpack first with the format auto-detected, then with the
    # format given explicitly.
    for unpack_options in ({}, {'format': format}):
        extract_dir = self.mkdtemp()
        unpack_archive(converter(filename), converter(extract_dir),
                       **unpack_options)
        self.assertEqual(rlistdir(extract_dir), expected)

    with self.assertRaises(shutil.ReadError):
        unpack_archive(converter(TESTFN))
    with self.assertRaises(ValueError):
        unpack_archive(converter(TESTFN), format='xxx')
Nick Coghlanabf202d2011-03-16 13:52:20 -04001558
def test_unpack_archive_tar(self):
    # Round-trip check for the 'tar' format.
    self.check_unpack_archive(format='tar')
1561
@support.requires_zlib()
def test_unpack_archive_gztar(self):
    # Round-trip check for the 'gztar' format.
    self.check_unpack_archive(format='gztar')
1565
@support.requires_bz2()
def test_unpack_archive_bztar(self):
    # Round-trip check for the 'bztar' format.
    self.check_unpack_archive(format='bztar')
1569
@support.requires_lzma()
@unittest.skipIf(AIX and not _maxdataOK(),
                 "AIX MAXDATA must be 0x20000000 or larger")
def test_unpack_archive_xztar(self):
    # Round-trip check for the 'xztar' format.
    self.check_unpack_archive(format='xztar')
1574
@support.requires_zlib()
def test_unpack_archive_zip(self):
    # Round-trip check for the 'zip' format.
    self.check_unpack_archive(format='zip')
1578
def test_unpack_registry(self):
    # Register, use, replace and remove a custom unpack format, then
    # check that the registry is back to its initial content.
    initial_formats = get_unpack_formats()

    def _boo(filename, extract_dir, extra):
        self.assertEqual(extra, 1)
        self.assertEqual(filename, 'stuff.boo')
        self.assertEqual(extract_dir, 'xx')

    register_unpack_format('Boo', ['.boo', '.b2'], _boo, [('extra', 1)])
    unpack_archive('stuff.boo', 'xx')

    # '.boo' is already taken, so a second unpacker for it is refused
    with self.assertRaises(RegistryError):
        register_unpack_format('Boo2', ['.boo'], _boo)

    # once 'Boo' is gone the extension is free again
    unregister_unpack_format('Boo')
    register_unpack_format('Boo2', ['.boo'], _boo)
    self.assertIn(('Boo2', ['.boo'], ''), get_unpack_formats())
    self.assertNotIn(('Boo', ['.boo'], ''), get_unpack_formats())

    # let's leave a clean state
    unregister_unpack_format('Boo2')
    self.assertEqual(get_unpack_formats(), initial_formats)
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001604
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001605
class TestMisc(BaseTest, unittest.TestCase):
    # Tests for shutil.disk_usage() and shutil.chown().

    @unittest.skipUnless(hasattr(shutil, 'disk_usage'),
                         "disk_usage not available on this platform")
    def test_disk_usage(self):
        # disk_usage() returns integer total/used/free fields whose
        # values are consistent with each other.
        usage = shutil.disk_usage(os.path.dirname(__file__))
        for attr in ('total', 'used', 'free'):
            self.assertIsInstance(getattr(usage, attr), int)
        self.assertGreater(usage.total, 0)
        self.assertGreater(usage.used, 0)
        self.assertGreaterEqual(usage.free, 0)
        self.assertGreaterEqual(usage.total, usage.used)
        self.assertGreater(usage.total, usage.free)

        # bpo-32557: Check that disk_usage() also accepts a filename
        shutil.disk_usage(__file__)

    @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
    @unittest.skipUnless(hasattr(os, 'chown'), 'requires os.chown')
    def test_chown(self):
        # chown() argument validation, then chown() of a file and of a
        # directory using numeric ids and (when resolvable) names.
        dirname = self.mkdtemp()
        filename = tempfile.mktemp(dir=dirname)
        write_file(filename, 'testing chown function')

        # neither user nor group given
        with self.assertRaises(ValueError):
            shutil.chown(filename)

        with self.assertRaises(LookupError):
            shutil.chown(filename, user='non-existing username')

        with self.assertRaises(LookupError):
            shutil.chown(filename, group='non-existing groupname')

        with self.assertRaises(TypeError):
            shutil.chown(filename, b'spam')

        with self.assertRaises(TypeError):
            shutil.chown(filename, 3.14)

        uid = os.getuid()
        gid = os.getgid()

        def check_chown(path, uid=None, gid=None):
            # Stat the path that was passed in.  The previous version
            # always stat'ed `filename`, so the directory checks below
            # never looked at the directory at all.
            s = os.stat(path)
            if uid is not None:
                self.assertEqual(uid, s.st_uid)
            if gid is not None:
                self.assertEqual(gid, s.st_gid)

        # Same sequence of calls for the file, then for the directory.
        for path in (filename, dirname):
            shutil.chown(path, uid, gid)
            check_chown(path, uid, gid)
            shutil.chown(path, uid)
            check_chown(path, uid)
            shutil.chown(path, user=uid)
            check_chown(path, uid)
            shutil.chown(path, group=gid)
            check_chown(path, gid=gid)

        try:
            user = pwd.getpwuid(uid)[0]
            group = grp.getgrgid(gid)[0]
        except KeyError:
            # On some systems uid/gid cannot be resolved.
            pass
        else:
            shutil.chown(filename, user, group)
            check_chown(filename, uid, gid)
            shutil.chown(dirname, user, group)
            check_chown(dirname, uid, gid)
Sandro Tosid902a142011-08-22 23:28:27 +02001684
Brian Curtin0d0a1de2012-06-18 18:41:07 -05001685
class TestWhich(BaseTest, unittest.TestCase):
    # Tests for shutil.which(), run against one executable temporary
    # file created in setUp().

    def setUp(self):
        self.curdir = os.curdir
        self.ext = ".EXE"
        self.temp_dir = self.mkdtemp(prefix="Tmp")
        # Give the temp_file an ".exe" suffix for all.
        # It's needed on Windows and not harmful on other platforms.
        self.temp_file = tempfile.NamedTemporaryFile(dir=self.temp_dir,
                                                     prefix="Tmp",
                                                     suffix=".Exe")
        os.chmod(self.temp_file.name, stat.S_IXUSR)
        self.addCleanup(self.temp_file.close)
        self.dir, self.file = os.path.split(self.temp_file.name)
        self.env_path = self.dir

    def test_basic(self):
        # Given an EXE in a directory, it should be returned.
        found = shutil.which(self.file, path=self.dir)
        self.assertEqual(found, self.temp_file.name)

    def test_absolute_cmd(self):
        # When given the fully qualified path to an executable that exists,
        # it should be returned.
        found = shutil.which(self.temp_file.name, path=self.temp_dir)
        self.assertEqual(found, self.temp_file.name)

    def test_relative_cmd(self):
        # When given the relative path with a directory part to an executable
        # that exists, it should be returned.
        parent_dir, leaf_dir = os.path.split(self.dir)
        relpath = os.path.join(leaf_dir, self.file)
        with os_helper.change_cwd(path=parent_dir):
            found = shutil.which(relpath, path=self.temp_dir)
            self.assertEqual(found, relpath)
        # But it shouldn't be searched in PATH directories (issue #16957).
        with os_helper.change_cwd(path=self.dir):
            found = shutil.which(relpath, path=parent_dir)
            self.assertIsNone(found)

    def test_cwd(self):
        # Issue #16957
        parent_dir = os.path.dirname(self.dir)
        with os_helper.change_cwd(path=self.dir):
            found = shutil.which(self.file, path=parent_dir)
            if sys.platform != "win32":
                # Other platforms: shouldn't match in the current directory.
                self.assertIsNone(found)
            else:
                # Windows: current directory implicitly on PATH
                expected = os.path.join(self.curdir, self.file)
                self.assertEqual(found, expected)

    @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
                     'non-root user required')
    def test_non_matching_mode(self):
        # Set the file read-only and ask for writeable files.
        target = self.temp_file.name
        os.chmod(target, stat.S_IREAD)
        if os.access(target, os.W_OK):
            self.skipTest("can't set the file read-only")
        found = shutil.which(self.file, path=self.dir, mode=os.W_OK)
        self.assertIsNone(found)

    def test_relative_path(self):
        # A relative search path yields a result relative to the cwd.
        parent_dir, leaf_dir = os.path.split(self.dir)
        with os_helper.change_cwd(path=parent_dir):
            found = shutil.which(self.file, path=leaf_dir)
            self.assertEqual(found, os.path.join(leaf_dir, self.file))

    def test_nonexistent_file(self):
        # Return None when no matching executable file is found on the path.
        self.assertIsNone(shutil.which("foo.exe", path=self.dir))

    @unittest.skipUnless(sys.platform == "win32",
                         "pathext check is Windows-only")
    def test_pathext_checking(self):
        # Ask for the file without the ".exe" extension, then ensure that
        # it gets found properly with the extension.
        found = shutil.which(self.file[:-4], path=self.dir)
        self.assertEqual(found, self.temp_file.name[:-4] + self.ext)

    def test_environ_path(self):
        # Without an explicit path argument, PATH from the environment
        # is searched.
        with os_helper.EnvironmentVarGuard() as env:
            env['PATH'] = self.env_path
            found = shutil.which(self.file)
            self.assertEqual(found, self.temp_file.name)

    def test_environ_path_empty(self):
        # PATH='': no match
        with os_helper.EnvironmentVarGuard() as env:
            env['PATH'] = ''
            confstr_patch = unittest.mock.patch('os.confstr',
                                                return_value=self.dir,
                                                create=True)
            defpath_swap = support.swap_attr(os, 'defpath', self.dir)
            with confstr_patch, defpath_swap, \
                 os_helper.change_cwd(self.dir):
                found = shutil.which(self.file)
                self.assertIsNone(found)

    def test_environ_path_cwd(self):
        expected_cwd = os.path.basename(self.temp_file.name)
        if sys.platform == "win32":
            curdir = os.curdir
            if isinstance(expected_cwd, bytes):
                curdir = os.fsencode(curdir)
            expected_cwd = os.path.join(curdir, expected_cwd)

        # PATH=':': explicitly looks in the current directory
        with os_helper.EnvironmentVarGuard() as env:
            env['PATH'] = os.pathsep
            confstr_patch = unittest.mock.patch('os.confstr',
                                                return_value=self.dir,
                                                create=True)
            defpath_swap = support.swap_attr(os, 'defpath', self.dir)
            with confstr_patch, defpath_swap:
                found = shutil.which(self.file)
                self.assertIsNone(found)

                # look in current directory
                with os_helper.change_cwd(self.dir):
                    found = shutil.which(self.file)
                    self.assertEqual(found, expected_cwd)

    def test_environ_path_missing(self):
        with os_helper.EnvironmentVarGuard() as env:
            env.pop('PATH', None)

            # without confstr
            confstr_patch = unittest.mock.patch('os.confstr',
                                                side_effect=ValueError,
                                                create=True)
            with confstr_patch, support.swap_attr(os, 'defpath', self.dir):
                found = shutil.which(self.file)
            self.assertEqual(found, self.temp_file.name)

            # with confstr
            confstr_patch = unittest.mock.patch('os.confstr',
                                                return_value=self.dir,
                                                create=True)
            with confstr_patch, support.swap_attr(os, 'defpath', ''):
                found = shutil.which(self.file)
            self.assertEqual(found, self.temp_file.name)

    def test_empty_path(self):
        # An explicit empty path argument finds nothing, even though
        # PATH is set and the cwd contains the file.
        base_dir = os.path.dirname(self.dir)
        with os_helper.change_cwd(path=self.dir), \
             os_helper.EnvironmentVarGuard() as env:
            env['PATH'] = self.env_path
            found = shutil.which(self.file, path='')
            self.assertIsNone(found)

    def test_empty_path_no_PATH(self):
        # With PATH removed from the environment the file is not found.
        with os_helper.EnvironmentVarGuard() as env:
            env.pop('PATH', None)
            found = shutil.which(self.file)
            self.assertIsNone(found)

    @unittest.skipUnless(sys.platform == "win32", 'test specific to Windows')
    def test_pathext(self):
        ext = ".xyz"
        temp_filexyz = tempfile.NamedTemporaryFile(dir=self.temp_dir,
                                                   prefix="Tmp2", suffix=ext)
        os.chmod(temp_filexyz.name, stat.S_IXUSR)
        self.addCleanup(temp_filexyz.close)

        # strip path and extension
        program, _ = os.path.splitext(os.path.basename(temp_filexyz.name))

        with os_helper.EnvironmentVarGuard() as env:
            env['PATHEXT'] = ext
            found = shutil.which(program, path=self.temp_dir)
            self.assertEqual(found, temp_filexyz.name)

    # Issue 40592: See https://bugs.python.org/issue40592
    @unittest.skipUnless(sys.platform == "win32", 'test specific to Windows')
    def test_pathext_with_empty_str(self):
        ext = ".xyz"
        temp_filexyz = tempfile.NamedTemporaryFile(dir=self.temp_dir,
                                                   prefix="Tmp2", suffix=ext)
        self.addCleanup(temp_filexyz.close)

        # strip path and extension
        program, _ = os.path.splitext(os.path.basename(temp_filexyz.name))

        with os_helper.EnvironmentVarGuard() as env:
            env['PATHEXT'] = f"{ext};"  # note the ;
            found = shutil.which(program, path=self.temp_dir)
            self.assertEqual(found, temp_filexyz.name)
1871
Brian Curtinc57a3452012-06-22 16:00:30 -05001872
class TestWhichBytes(TestWhich):
    # Re-runs the TestWhich tests with the fixture paths as bytes.

    def setUp(self):
        super().setUp()
        # Encode the path-like fixture attributes; env_path is
        # deliberately left untouched.
        for attr in ('dir', 'file', 'curdir', 'ext'):
            setattr(self, attr, os.fsencode(getattr(self, attr)))
        self.temp_file.name = os.fsencode(self.temp_file.name)
1881
1882
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001883class TestMove(BaseTest, unittest.TestCase):
Christian Heimesada8c3b2008-03-18 18:26:33 +00001884
def setUp(self):
    # Create a source and a destination directory; the source holds a
    # small file and dst_file is where it would land after a move.
    self.src_dir = self.mkdtemp()
    self.dst_dir = self.mkdtemp()
    basename = "foo"
    self.src_file = os.path.join(self.src_dir, basename)
    self.dst_file = os.path.join(self.dst_dir, basename)
    with open(self.src_file, "wb") as stream:
        stream.write(b"spam")
1893
def _check_move_file(self, src, dst, real_dst):
    """Move file *src* to *dst*; check that *real_dst* then holds the
    original bytes and that *src* no longer exists."""
    with open(src, "rb") as stream:
        before = stream.read()
    shutil.move(src, dst)
    with open(real_dst, "rb") as stream:
        after = stream.read()
    self.assertEqual(before, after)
    self.assertFalse(os.path.exists(src))
1901
def _check_move_dir(self, src, dst, real_dst):
    """Move directory *src* to *dst*; check that *real_dst* then lists
    the same entries and that *src* no longer exists."""
    before = sorted(os.listdir(src))
    shutil.move(src, dst)
    after = sorted(os.listdir(real_dst))
    self.assertEqual(before, after)
    self.assertFalse(os.path.exists(src))
1907
def test_move_file(self):
    # Move a file to another location on the same filesystem.
    self._check_move_file(src=self.src_file,
                          dst=self.dst_file,
                          real_dst=self.dst_file)
1911
def test_move_file_to_dir(self):
    # Move a file inside an existing dir on the same filesystem.
    self._check_move_file(src=self.src_file,
                          dst=self.dst_dir,
                          real_dst=self.dst_file)
1915
def test_move_file_to_dir_pathlike_src(self):
    # Same as test_move_file_to_dir, with the source given as a
    # pathlib.Path instead of a str.
    self._check_move_file(src=pathlib.Path(self.src_file),
                          dst=self.dst_dir,
                          real_dst=self.dst_file)
1920
def test_move_file_to_dir_pathlike_dst(self):
    # Same as test_move_file_to_dir, with the destination directory
    # given as a pathlib.Path instead of a str.
    self._check_move_file(src=self.src_file,
                          dst=pathlib.Path(self.dst_dir),
                          real_dst=self.dst_file)
1925
@mock_rename
def test_move_file_other_fs(self):
    # Re-run test_move_file (file to another location) under the
    # mock_rename decorator.
    return self.test_move_file()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001930
@mock_rename
def test_move_file_to_dir_other_fs(self):
    # Re-run test_move_file_to_dir (file into an existing dir) under
    # the mock_rename decorator.
    return self.test_move_file_to_dir()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001935
def test_move_dir(self):
    # Move a dir to another location on the same filesystem.
    # The destination is a not-yet-existing name inside a fresh
    # temporary directory.
    parent = self.mkdtemp()
    target = tempfile.mktemp(dir=parent)
    try:
        self._check_move_dir(src=self.src_dir, dst=target, real_dst=target)
    finally:
        os_helper.rmtree(target)
Christian Heimesada8c3b2008-03-18 18:26:33 +00001943
@mock_rename
def test_move_dir_other_fs(self):
    # Re-run test_move_dir under the mock_rename decorator.
    return self.test_move_dir()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001948
def test_move_dir_to_dir(self):
    # Move a dir inside an existing dir on the same filesystem.
    landed_at = os.path.join(self.dst_dir, os.path.basename(self.src_dir))
    self._check_move_dir(src=self.src_dir,
                         dst=self.dst_dir,
                         real_dst=landed_at)
1953
@mock_rename
def test_move_dir_to_dir_other_fs(self):
    # Re-run test_move_dir_to_dir under the mock_rename decorator.
    return self.test_move_dir_to_dir()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001958
def test_move_dir_sep_to_dir(self):
    # Like test_move_dir_to_dir, with a trailing os.path.sep on the
    # source path.
    landed_at = os.path.join(self.dst_dir, os.path.basename(self.src_dir))
    self._check_move_dir(src=self.src_dir + os.path.sep,
                         dst=self.dst_dir,
                         real_dst=landed_at)
1962
@unittest.skipUnless(os.path.altsep, 'requires os.path.altsep')
def test_move_dir_altsep_to_dir(self):
    # Like test_move_dir_to_dir, with a trailing os.path.altsep on the
    # source path.
    landed_at = os.path.join(self.dst_dir, os.path.basename(self.src_dir))
    self._check_move_dir(src=self.src_dir + os.path.altsep,
                         dst=self.dst_dir,
                         real_dst=landed_at)
1967
def test_existing_file_inside_dest_dir(self):
    # A file with the same name inside the destination dir already exists.
    open(self.dst_file, "wb").close()
    with self.assertRaises(shutil.Error):
        shutil.move(self.src_file, self.dst_dir)
1973
def test_dont_move_dir_in_itself(self):
    # Moving a dir inside itself raises an Error.
    inside_src = os.path.join(self.src_dir, "bar")
    with self.assertRaises(shutil.Error):
        shutil.move(self.src_dir, inside_src)
1978
def test_destinsrc_false_negative(self):
    # _destinsrc() must report True when dst really lies below src.
    pairs = [('srcdir', 'srcdir/dest')]
    os.mkdir(TESTFN)
    try:
        for src_name, dst_name in pairs:
            src = os.path.join(TESTFN, src_name)
            dst = os.path.join(TESTFN, dst_name)
            failure = ('_destinsrc() wrongly concluded that '
                       'dst (%s) is not in src (%s)' % (dst, src))
            self.assertTrue(shutil._destinsrc(src, dst), msg=failure)
    finally:
        os_helper.rmtree(TESTFN)
Christian Heimesada8c3b2008-03-18 18:26:33 +00001990
def test_destinsrc_false_positive(self):
    # _destinsrc() must report False for paths that merely share a
    # name prefix with src.
    pairs = [('srcdir', 'src/dest'), ('srcdir', 'srcdir.new')]
    os.mkdir(TESTFN)
    try:
        for src_name, dst_name in pairs:
            src = os.path.join(TESTFN, src_name)
            dst = os.path.join(TESTFN, dst_name)
            failure = ('_destinsrc() wrongly concluded that '
                       'dst (%s) is in src (%s)' % (dst, src))
            self.assertFalse(shutil._destinsrc(src, dst), msg=failure)
    finally:
        os_helper.rmtree(TESTFN)
Christian Heimes9bd667a2008-01-20 15:14:11 +00002002
@os_helper.skip_unless_symlink
@mock_rename
def test_move_file_symlink(self):
    # Create a symlink to self.src_file, move the link itself to
    # self.dst_file, and check that the result is still a symlink that
    # refers to the same file.
    link = os.path.join(self.src_dir, 'bar')
    os.symlink(self.src_file, link)
    shutil.move(link, self.dst_file)
    moved = self.dst_file
    self.assertTrue(os.path.islink(moved))
    self.assertTrue(os.path.samefile(self.src_file, moved))
2011
@os_helper.skip_unless_symlink
@mock_rename
def test_move_file_symlink_to_dir(self):
    # Move a symlink (pointing at self.src_file) into self.dst_dir and
    # check that a symlink with the same name, still referring to the
    # same file, now exists there.
    link_name = "bar"
    link = os.path.join(self.src_dir, link_name)
    os.symlink(self.src_file, link)
    shutil.move(link, self.dst_dir)
    moved = os.path.join(self.dst_dir, link_name)
    self.assertTrue(os.path.islink(moved))
    self.assertTrue(os.path.samefile(self.src_file, moved))
2022
@os_helper.skip_unless_symlink
@mock_rename
def test_move_dangling_symlink(self):
    # The link target ('baz') is never created in this test, so the
    # symlink being moved is dangling.  After the move the destination
    # must be a symlink resolving to the same path as the target.
    target = os.path.join(self.src_dir, 'baz')
    link = os.path.join(self.src_dir, 'bar')
    os.symlink(target, link)
    moved = os.path.join(self.dst_dir, 'quux')
    shutil.move(link, moved)
    self.assertTrue(os.path.islink(moved))
    self.assertEqual(os.path.realpath(target), os.path.realpath(moved))
Antoine Pitrou0a08d7a2012-01-06 20:16:19 +01002033
@os_helper.skip_unless_symlink
@mock_rename
def test_move_dir_symlink(self):
    # Move a symlink that points at a real directory and check that the
    # destination is a symlink referring to that same directory.
    target_dir = os.path.join(self.src_dir, 'baz')
    link = os.path.join(self.src_dir, 'bar')
    os.mkdir(target_dir)
    os.symlink(target_dir, link)
    moved = os.path.join(self.dst_dir, 'quux')
    shutil.move(link, moved)
    self.assertTrue(os.path.islink(moved))
    self.assertTrue(os.path.samefile(target_dir, moved))
2045
Brian Curtin0d0a1de2012-06-18 18:41:07 -05002046 def test_move_return_value(self):
2047 rv = shutil.move(self.src_file, self.dst_dir)
2048 self.assertEqual(rv,
2049 os.path.join(self.dst_dir, os.path.basename(self.src_file)))
2050
2051 def test_move_as_rename_return_value(self):
2052 rv = shutil.move(self.src_file, os.path.join(self.dst_dir, 'bar'))
2053 self.assertEqual(rv, os.path.join(self.dst_dir, 'bar'))
2054
@mock_rename
def test_move_file_special_function(self):
    # Pass a recording stand-in as copy_function and check that moving
    # a single file invokes it exactly once.
    calls = []

    def _copy(src, dst):
        calls.append((src, dst))

    shutil.move(self.src_file, self.dst_dir, copy_function=_copy)
    self.assertEqual(len(calls), 1)
2062
@mock_rename
def test_move_dir_special_function(self):
    # Add two extra empty files to the source directory, move the whole
    # directory with a recording copy_function, and check that the
    # function was invoked three times.
    calls = []

    def _copy(src, dst):
        calls.append((src, dst))

    for child in ('child', 'child1'):
        os_helper.create_empty_file(os.path.join(self.src_dir, child))
    shutil.move(self.src_dir, self.dst_dir, copy_function=_copy)
    self.assertEqual(len(calls), 3)
2072
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03002073 def test_move_dir_caseinsensitive(self):
2074 # Renames a folder to the same name
2075 # but a different case.
2076
2077 self.src_dir = self.mkdtemp()
2078 dst_dir = os.path.join(
2079 os.path.dirname(self.src_dir),
2080 os.path.basename(self.src_dir).upper())
2081 self.assertNotEqual(self.src_dir, dst_dir)
2082
2083 try:
2084 shutil.move(self.src_dir, dst_dir)
2085 self.assertTrue(os.path.isdir(dst_dir))
2086 finally:
2087 os.rmdir(dst_dir)
2088
Tarek Ziadé5340db32010-04-19 22:30:51 +00002089
@unittest.skipUnless(hasattr(os, 'geteuid') and os.geteuid() == 0
                     and hasattr(os, 'lchflags')
                     and hasattr(stat, 'SF_IMMUTABLE')
                     and hasattr(stat, 'UF_OPAQUE'),
                     'root privileges required')
def test_move_dir_permission_denied(self):
    # bpo-42782: shutil.move should not create destination directories
    # if the source directory cannot be removed.
    #
    # Only runs when the effective uid is 0 and the platform provides
    # os.lchflags together with the SF_IMMUTABLE and UF_OPAQUE flags.
    try:
        os.mkdir(TESTFN_SRC)
        os.lchflags(TESTFN_SRC, stat.SF_IMMUTABLE)

        # Testing on an empty immutable directory
        # TESTFN_DST should not exist if shutil.move failed
        self.assertRaises(PermissionError, shutil.move, TESTFN_SRC, TESTFN_DST)
        # assertNotIn reports the offending listing when it fails.
        self.assertNotIn(TESTFN_DST, os.listdir())

        # Create a file and keep the directory immutable
        os.lchflags(TESTFN_SRC, stat.UF_OPAQUE)
        os_helper.create_empty_file(os.path.join(TESTFN_SRC, 'child'))
        os.lchflags(TESTFN_SRC, stat.SF_IMMUTABLE)

        # Testing on a non-empty immutable directory
        # TESTFN_DST should not exist if shutil.move failed
        self.assertRaises(PermissionError, shutil.move, TESTFN_SRC, TESTFN_DST)
        self.assertNotIn(TESTFN_DST, os.listdir())
    finally:
        # Replace the flags with UF_OPAQUE before deleting, so that
        # SF_IMMUTABLE no longer blocks removal.
        if os.path.exists(TESTFN_SRC):
            os.lchflags(TESTFN_SRC, stat.UF_OPAQUE)
            os_helper.rmtree(TESTFN_SRC)
        if os.path.exists(TESTFN_DST):
            os.lchflags(TESTFN_DST, stat.UF_OPAQUE)
            os_helper.rmtree(TESTFN_DST)
2123
2124
class TestCopyFile(unittest.TestCase):
    """Check shutil.copyfile() when opening or closing a file fails.

    Each test swaps the ``open`` attribute of the shutil module (through
    support.swap_attr) for a local ``_open`` that either raises OSError
    or returns a Faux context manager.
    """

    class Faux(object):
        # Stand-in for a file object that records how it was used as a
        # context manager.
        _entered = False      # set to True by __enter__
        _exited_with = None   # (exc_type, exc_val, exc_tb) seen by __exit__
        _raised = False       # True once __exit__ has raised OSError

        def __init__(self, raise_in_exit=False, suppress_at_exit=True):
            self._raise_in_exit = raise_in_exit
            self._suppress_at_exit = suppress_at_exit

        def read(self, *args):
            return ''

        def __enter__(self):
            self._entered = True

        def __exit__(self, exc_type, exc_val, exc_tb):
            self._exited_with = exc_type, exc_val, exc_tb
            if self._raise_in_exit:
                self._raised = True
                raise OSError("Cannot close")
            # A true value suppresses the exception raised in the body.
            return self._suppress_at_exit

    def test_w_source_open_fails(self):
        def _open(filename, mode='r'):
            if filename == 'srcfile':
                raise OSError('Cannot open "srcfile"')
            # Shouldn't reach here.  Raised explicitly so the guard is
            # not stripped when running under -O.
            raise AssertionError('unexpected filename: %r' % (filename,))

        with support.swap_attr(shutil, 'open', _open):
            with self.assertRaises(OSError):
                shutil.copyfile('srcfile', 'destfile')

    @unittest.skipIf(MACOS, "skipped on macOS")
    def test_w_dest_open_fails(self):
        srcfile = self.Faux()

        def _open(filename, mode='r'):
            if filename == 'srcfile':
                return srcfile
            if filename == 'destfile':
                raise OSError('Cannot open "destfile"')
            # Shouldn't reach here.
            raise AssertionError('unexpected filename: %r' % (filename,))

        with support.swap_attr(shutil, 'open', _open):
            # The OSError is suppressed by srcfile.__exit__ (Faux's
            # default is suppress_at_exit=True), so no assertRaises here.
            shutil.copyfile('srcfile', 'destfile')
        self.assertTrue(srcfile._entered)
        self.assertIs(srcfile._exited_with[0], OSError)
        self.assertEqual(srcfile._exited_with[1].args,
                         ('Cannot open "destfile"',))

    @unittest.skipIf(MACOS, "skipped on macOS")
    def test_w_dest_close_fails(self):
        srcfile = self.Faux()
        destfile = self.Faux(True)

        def _open(filename, mode='r'):
            if filename == 'srcfile':
                return srcfile
            if filename == 'destfile':
                return destfile
            # Shouldn't reach here.
            raise AssertionError('unexpected filename: %r' % (filename,))

        with support.swap_attr(shutil, 'open', _open):
            shutil.copyfile('srcfile', 'destfile')
        self.assertTrue(srcfile._entered)
        self.assertTrue(destfile._entered)
        self.assertTrue(destfile._raised)
        # The error raised while closing the destination must be the one
        # the source's __exit__ observed.
        self.assertIs(srcfile._exited_with[0], OSError)
        self.assertEqual(srcfile._exited_with[1].args,
                         ('Cannot close',))

    @unittest.skipIf(MACOS, "skipped on macOS")
    def test_w_source_close_fails(self):

        srcfile = self.Faux(True)
        destfile = self.Faux()

        def _open(filename, mode='r'):
            if filename == 'srcfile':
                return srcfile
            if filename == 'destfile':
                return destfile
            # Shouldn't reach here.
            raise AssertionError('unexpected filename: %r' % (filename,))

        with support.swap_attr(shutil, 'open', _open):
            with self.assertRaises(OSError):
                shutil.copyfile('srcfile', 'destfile')
        self.assertTrue(srcfile._entered)
        self.assertTrue(destfile._entered)
        self.assertFalse(destfile._raised)
        # The body finished cleanly; only closing the source failed.
        self.assertIsNone(srcfile._exited_with[0])
        self.assertTrue(srcfile._raised)
2215
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002216
class TestCopyFileObj(unittest.TestCase):
    """Tests for shutil.copyfileobj() using a FILESIZE-byte source file.

    The source file is written once per class at TESTFN; each test
    copies into TESTFN2, which is removed again after every test.
    """
    FILESIZE = 2 * 1024 * 1024

    @classmethod
    def setUpClass(cls):
        write_test_file(TESTFN, cls.FILESIZE)

    @classmethod
    def tearDownClass(cls):
        os_helper.unlink(TESTFN)
        os_helper.unlink(TESTFN2)

    def tearDown(self):
        os_helper.unlink(TESTFN2)

    @contextlib.contextmanager
    def get_files(self):
        # Yield (src, dst): TESTFN opened for binary reading and TESTFN2
        # opened for binary writing; both are closed on exit.
        with open(TESTFN, "rb") as src:
            with open(TESTFN2, "wb") as dst:
                yield (src, dst)

    def assert_files_eq(self, src, dst):
        # Fail unless the two named files have identical contents.
        with open(src, 'rb') as fsrc:
            with open(dst, 'rb') as fdst:
                self.assertEqual(fsrc.read(), fdst.read())

    def test_content(self):
        with self.get_files() as (src, dst):
            shutil.copyfileobj(src, dst)
        self.assert_files_eq(TESTFN, TESTFN2)

    def test_file_not_closed(self):
        with self.get_files() as (src, dst):
            shutil.copyfileobj(src, dst)
            # unittest assertions are used instead of bare ``assert`` so
            # the checks still run under -O.
            self.assertFalse(src.closed)
            self.assertFalse(dst.closed)

    def test_file_offset(self):
        with self.get_files() as (src, dst):
            shutil.copyfileobj(src, dst)
            self.assertEqual(src.tell(), self.FILESIZE)
            self.assertEqual(dst.tell(), self.FILESIZE)

    @unittest.skipIf(os.name != 'nt', "Windows only")
    def test_win_impl(self):
        # Make sure alternate Windows implementation is called.
        with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
            shutil.copyfile(TESTFN, TESTFN2)
        self.assertTrue(m.called)

        # File size is 2 MiB but max buf size should be 1 MiB.
        self.assertEqual(m.call_args[0][2], 1 * 1024 * 1024)

        # If file size < 1 MiB memoryview() length must be equal to
        # the actual file size.
        with tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False) as f:
            f.write(b'foo')
        fname = f.name
        self.addCleanup(os_helper.unlink, fname)
        with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
            shutil.copyfile(fname, TESTFN2)
        self.assertEqual(m.call_args[0][2], 3)

        # Empty files should not rely on readinto() variant.
        with tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False) as f:
            pass
        fname = f.name
        self.addCleanup(os_helper.unlink, fname)
        with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
            shutil.copyfile(fname, TESTFN2)
        self.assertFalse(m.called)
        self.assert_files_eq(fname, TESTFN2)
2289
2290
class _ZeroCopyFileTest(object):
    """Tests common to all zero-copy APIs."""
    FILESIZE = (10 * 1024 * 1024)  # 10 MiB
    FILEDATA = b""    # contents of TESTFN, filled in by setUpClass()
    PATCHPOINT = ""   # dotted name patched by the tests; set by subclasses

    @classmethod
    def setUpClass(cls):
        write_test_file(TESTFN, cls.FILESIZE)
        with open(TESTFN, 'rb') as f:
            cls.FILEDATA = f.read()
        # Sanity check on the fixture.  Raised explicitly rather than
        # with a bare ``assert`` so it is not stripped under -O.
        if len(cls.FILEDATA) != cls.FILESIZE:
            raise AssertionError(
                'test file has %d bytes, expected %d'
                % (len(cls.FILEDATA), cls.FILESIZE))

    @classmethod
    def tearDownClass(cls):
        os_helper.unlink(TESTFN)

    def tearDown(self):
        os_helper.unlink(TESTFN2)

    @contextlib.contextmanager
    def get_files(self):
        # Yield (src, dst): TESTFN opened for binary reading and TESTFN2
        # opened for binary writing; both are closed on exit.
        with open(TESTFN, "rb") as src:
            with open(TESTFN2, "wb") as dst:
                yield (src, dst)

    def zerocopy_fun(self, *args, **kwargs):
        raise NotImplementedError("must be implemented in subclass")

    def reset(self):
        # Tear everything down and rebuild the fixtures from scratch.
        self.tearDown()
        self.tearDownClass()
        self.setUpClass()
        self.setUp()

    # ---

    def test_regular_copy(self):
        with self.get_files() as (src, dst):
            self.zerocopy_fun(src, dst)
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
        # Make sure the fallback function is not called.
        with self.get_files() as (src, dst):
            with unittest.mock.patch('shutil.copyfileobj') as m:
                shutil.copyfile(TESTFN, TESTFN2)
            self.assertFalse(m.called)

    def test_same_file(self):
        self.addCleanup(self.reset)
        with self.get_files() as (src, dst):
            with self.assertRaises(Exception):
                self.zerocopy_fun(src, src)
        # Make sure src file is not corrupted.
        self.assertEqual(read_file(TESTFN, binary=True), self.FILEDATA)

    def test_non_existent_src(self):
        name = tempfile.mktemp(dir=os.getcwd())
        with self.assertRaises(FileNotFoundError) as cm:
            shutil.copyfile(name, "new")
        self.assertEqual(cm.exception.filename, name)

    def test_empty_file(self):
        srcname = TESTFN + 'src'
        dstname = TESTFN + 'dst'
        self.addCleanup(lambda: os_helper.unlink(srcname))
        self.addCleanup(lambda: os_helper.unlink(dstname))
        # Create an empty source file.
        with open(srcname, "wb"):
            pass

        with open(srcname, "rb") as src:
            with open(dstname, "wb") as dst:
                self.zerocopy_fun(src, dst)

        self.assertEqual(read_file(dstname, binary=True), b"")

    def test_unhandled_exception(self):
        with unittest.mock.patch(self.PATCHPOINT,
                                 side_effect=ZeroDivisionError):
            self.assertRaises(ZeroDivisionError,
                              shutil.copyfile, TESTFN, TESTFN2)

    def test_exception_on_first_call(self):
        # Emulate a case where the first call to the zero-copy
        # function raises an exception in which case the function is
        # supposed to give up immediately.
        with unittest.mock.patch(self.PATCHPOINT,
                                 side_effect=OSError(errno.EINVAL, "yo")):
            with self.get_files() as (src, dst):
                with self.assertRaises(_GiveupOnFastCopy):
                    self.zerocopy_fun(src, dst)

    def test_filesystem_full(self):
        # Emulate a case where filesystem is full and sendfile() fails
        # on first call.
        with unittest.mock.patch(self.PATCHPOINT,
                                 side_effect=OSError(errno.ENOSPC, "yo")):
            with self.get_files() as (src, dst):
                self.assertRaises(OSError, self.zerocopy_fun, src, dst)
2389
2390
@unittest.skipIf(not SUPPORTS_SENDFILE, 'os.sendfile() not supported')
class TestZeroCopySendfile(_ZeroCopyFileTest, unittest.TestCase):
    """Run the shared zero-copy tests against shutil._fastcopy_sendfile()."""
    PATCHPOINT = "os.sendfile"

    def zerocopy_fun(self, fsrc, fdst):
        return shutil._fastcopy_sendfile(fsrc, fdst)

    def test_non_regular_file_src(self):
        # An in-memory source makes the fast path give up; the regular
        # copyfileobj() call afterwards must still copy the data.
        with io.BytesIO(self.FILEDATA) as src:
            with open(TESTFN2, "wb") as dst:
                with self.assertRaises(_GiveupOnFastCopy):
                    self.zerocopy_fun(src, dst)
                shutil.copyfileobj(src, dst)

        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_non_regular_file_dst(self):
        # Same as above, with an in-memory destination instead.
        with open(TESTFN, "rb") as src:
            with io.BytesIO() as dst:
                with self.assertRaises(_GiveupOnFastCopy):
                    self.zerocopy_fun(src, dst)
                shutil.copyfileobj(src, dst)
                dst.seek(0)
                self.assertEqual(dst.read(), self.FILEDATA)

    def test_exception_on_second_call(self):
        def sendfile(*args, **kwargs):
            # The first call is forwarded to the real os.sendfile; every
            # later call fails with EBADF.
            if not flag:
                flag.append(None)
                return orig_sendfile(*args, **kwargs)
            else:
                raise OSError(errno.EBADF, "yo")

        flag = []
        orig_sendfile = os.sendfile
        with unittest.mock.patch('os.sendfile', create=True,
                                 side_effect=sendfile):
            with self.get_files() as (src, dst):
                with self.assertRaises(OSError) as cm:
                    shutil._fastcopy_sendfile(src, dst)
        # unittest assertions are used instead of bare ``assert`` so the
        # checks still run under -O.
        self.assertTrue(flag)
        self.assertEqual(cm.exception.errno, errno.EBADF)

    def test_cant_get_size(self):
        # Emulate a case where src file size cannot be determined.
        # Internally bufsize will be set to a small value and
        # sendfile() will be called repeatedly.
        with unittest.mock.patch('os.fstat', side_effect=OSError) as m:
            with self.get_files() as (src, dst):
                shutil._fastcopy_sendfile(src, dst)
                self.assertTrue(m.called)
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_small_chunks(self):
        # Force internal file size detection to be smaller than the
        # actual file size. We want to force sendfile() to be called
        # multiple times, also in order to emulate a src fd which gets
        # bigger while it is being copied.
        mock = unittest.mock.Mock()
        mock.st_size = 65536 + 1
        with unittest.mock.patch('os.fstat', return_value=mock) as m:
            with self.get_files() as (src, dst):
                shutil._fastcopy_sendfile(src, dst)
                self.assertTrue(m.called)
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_big_chunk(self):
        # Force internal file size detection to be +100MB bigger than
        # the actual file size. Make sure sendfile() does not rely on
        # file size value except for (maybe) a better throughput /
        # performance.
        mock = unittest.mock.Mock()
        mock.st_size = self.FILESIZE + (100 * 1024 * 1024)
        with unittest.mock.patch('os.fstat', return_value=mock) as m:
            with self.get_files() as (src, dst):
                shutil._fastcopy_sendfile(src, dst)
                self.assertTrue(m.called)
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_blocksize_arg(self):
        with unittest.mock.patch('os.sendfile',
                                 side_effect=ZeroDivisionError) as m:
            self.assertRaises(ZeroDivisionError,
                              shutil.copyfile, TESTFN, TESTFN2)
            blocksize = m.call_args[0][3]
            # Make sure file size and the block size arg passed to
            # sendfile() are the same.
            self.assertEqual(blocksize, os.path.getsize(TESTFN))
            # ...unless we're dealing with a small file.
            os_helper.unlink(TESTFN2)
            write_file(TESTFN2, b"hello", binary=True)
            self.addCleanup(os_helper.unlink, TESTFN2 + '3')
            self.assertRaises(ZeroDivisionError,
                              shutil.copyfile, TESTFN2, TESTFN2 + '3')
            blocksize = m.call_args[0][3]
            self.assertEqual(blocksize, 2 ** 23)

    def test_file2file_not_supported(self):
        # Emulate a case where sendfile() only support file->socket
        # fds. In such a case copyfile() is supposed to skip the
        # fast-copy attempt from then on.
        self.assertTrue(shutil._USE_CP_SENDFILE)
        try:
            with unittest.mock.patch(
                    self.PATCHPOINT,
                    side_effect=OSError(errno.ENOTSOCK, "yo")) as m:
                with self.get_files() as (src, dst):
                    with self.assertRaises(_GiveupOnFastCopy):
                        shutil._fastcopy_sendfile(src, dst)
                self.assertTrue(m.called)
            self.assertFalse(shutil._USE_CP_SENDFILE)

            with unittest.mock.patch(self.PATCHPOINT) as m:
                shutil.copyfile(TESTFN, TESTFN2)
                self.assertFalse(m.called)
        finally:
            # Restore the module-level switch for the remaining tests.
            shutil._USE_CP_SENDFILE = True
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002508
2509
@unittest.skipUnless(MACOS, 'macOS only')
class TestZeroCopyMACOS(_ZeroCopyFileTest, unittest.TestCase):
    # Runs the shared zero-copy tests against shutil._fastcopy_fcopyfile(),
    # patching posix._fcopyfile where a test needs to inject failures.
    PATCHPOINT = "posix._fcopyfile"

    def zerocopy_fun(self, src, dst):
        fastcopy = shutil._fastcopy_fcopyfile
        return fastcopy(src, dst, posix._COPYFILE_DATA)
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002516
2517
class TestGetTerminalSize(unittest.TestCase):
    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        term = shutil.get_terminal_size()
        self.assertGreaterEqual(term.columns, 0)
        self.assertGreaterEqual(term.lines, 0)

    def test_os_environ_first(self):
        "Check if environment variables have precedence"

        # COLUMNS set, LINES removed: the reported column count must be
        # the value from the environment.
        with os_helper.EnvironmentVarGuard() as environ:
            environ['COLUMNS'] = '777'
            del environ['LINES']
            term = shutil.get_terminal_size()
        self.assertEqual(term.columns, 777)

        # LINES set, COLUMNS removed: the reported line count must be
        # the value from the environment.
        with os_helper.EnvironmentVarGuard() as environ:
            del environ['COLUMNS']
            environ['LINES'] = '888'
            term = shutil.get_terminal_size()
        self.assertEqual(term.lines, 888)

    def test_bad_environ(self):
        # Non-numeric COLUMNS/LINES must not break the call.
        with os_helper.EnvironmentVarGuard() as environ:
            environ['COLUMNS'] = 'xxx'
            environ['LINES'] = 'yyy'
            term = shutil.get_terminal_size()
        self.assertGreaterEqual(term.columns, 0)
        self.assertGreaterEqual(term.lines, 0)

    @unittest.skipUnless(os.isatty(sys.__stdout__.fileno()), "not on tty")
    @unittest.skipUnless(hasattr(os, 'get_terminal_size'),
                         'need os.get_terminal_size()')
    def test_stty_match(self):
        """Check if stty returns the same results ignoring env

        This test will fail if stdin and stdout are connected to
        different terminals with different sizes. Nevertheless, such
        situations should be pretty rare.
        """
        stty_failures = (FileNotFoundError, PermissionError,
                         subprocess.CalledProcessError)
        try:
            raw = subprocess.check_output(['stty', 'size'])
            size = raw.decode().split()
        except stty_failures:
            self.skipTest("stty invocation failed")
        expected = (int(size[1]), int(size[0]))  # reversed order

        # Query with both variables removed so only the terminal counts.
        with os_helper.EnvironmentVarGuard() as environ:
            del environ['LINES']
            del environ['COLUMNS']
            actual = shutil.get_terminal_size()

        self.assertEqual(expected, actual)

    def test_fallback(self):
        with os_helper.EnvironmentVarGuard() as environ:
            del environ['LINES']
            del environ['COLUMNS']

            # sys.__stdout__ has no fileno()
            with support.swap_attr(sys, '__stdout__', None):
                term = shutil.get_terminal_size(fallback=(10, 20))
            self.assertEqual(term.columns, 10)
            self.assertEqual(term.lines, 20)

            # sys.__stdout__ is not a terminal on Unix
            # or fileno() not in (0, 1, 2) on Windows
            with open(os.devnull, 'w') as devnull:
                with support.swap_attr(sys, '__stdout__', devnull):
                    term = shutil.get_terminal_size(fallback=(30, 40))
            self.assertEqual(term.columns, 30)
            self.assertEqual(term.lines, 40)
2594
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002595
class PublicAPITests(unittest.TestCase):
    """Ensures that the correct values are exposed in the public API."""

    def test_module_all_attribute(self):
        self.assertTrue(hasattr(shutil, '__all__'))
        # Names that must always be exported.
        expected = {
            'copyfileobj', 'copyfile', 'copymode', 'copystat',
            'copy', 'copy2', 'copytree', 'move', 'rmtree', 'Error',
            'SpecialFileError', 'ExecError', 'make_archive',
            'get_archive_formats', 'register_archive_format',
            'unregister_archive_format', 'get_unpack_formats',
            'register_unpack_format', 'unregister_unpack_format',
            'unpack_archive', 'ignore_patterns', 'chown', 'which',
            'get_terminal_size', 'SameFileError',
        }
        # disk_usage is only expected where os.statvfs exists or on
        # Windows.
        if os.name == 'nt' or hasattr(os, 'statvfs'):
            expected.add('disk_usage')
        self.assertEqual(set(shutil.__all__), expected)
2612
2613
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()