blob: df7fbedf24a7c2adfa884fadb845708ee595dbb0 [file] [log] [blame]
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00001# Copyright (C) 2003 Python Software Foundation
2
3import unittest
Berker Peksag884afd92014-12-10 02:50:32 +02004import unittest.mock
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00005import shutil
6import tempfile
Johannes Gijsbers8e6f2de2004-11-23 09:27:27 +00007import sys
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +00008import stat
Brett Cannon1c3fa182004-06-19 21:11:35 +00009import os
10import os.path
Antoine Pitrouc041ab62012-01-02 19:18:02 +010011import errno
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040012import functools
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -070013import pathlib
Antoine Pitroubcf2b592012-02-08 23:28:36 +010014import subprocess
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020015import random
16import string
17import contextlib
18import io
Serhiy Storchaka527ef072015-09-06 18:33:19 +030019from shutil import (make_archive,
Tarek Ziadé396fad72010-02-23 05:30:31 +000020 register_archive_format, unregister_archive_format,
Tarek Ziadé6ac91722010-04-28 17:51:36 +000021 get_archive_formats, Error, unpack_archive,
22 register_unpack_format, RegistryError,
Hynek Schlawack48653762012-10-07 12:49:58 +020023 unregister_unpack_format, get_unpack_formats,
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020024 SameFileError, _GiveupOnFastCopy)
Tarek Ziadé396fad72010-02-23 05:30:31 +000025import tarfile
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +020026import zipfile
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020027try:
28 import posix
29except ImportError:
30 posix = None
Tarek Ziadé396fad72010-02-23 05:30:31 +000031
32from test import support
Hai Shi0c4f0f32020-06-30 21:46:31 +080033from test.support import os_helper
34from test.support.os_helper import TESTFN, FakePath
Serhiy Storchaka11213772014-08-06 18:50:19 +030035
Antoine Pitrou7fff0962009-05-01 21:09:44 +000036TESTFN2 = TESTFN + "2"
Victor Stinner937ee9e2018-06-26 02:11:06 +020037MACOS = sys.platform.startswith("darwin")
Michael Feltef110b12019-02-18 12:02:44 +010038AIX = sys.platform[:3] == 'aix'
Tarek Ziadé396fad72010-02-23 05:30:31 +000039try:
40 import grp
41 import pwd
42 UID_GID_SUPPORT = True
43except ImportError:
44 UID_GID_SUPPORT = False
45
Steve Dowerdf2d4a62019-08-21 15:27:33 -070046try:
47 import _winapi
48except ImportError:
49 _winapi = None
50
def _fake_rename(*args, **kwargs):
    """Stand-in for os.rename() that always fails with EXDEV.

    Accepts and ignores any arguments so it can replace os.rename()
    regardless of how it is called.
    """
    # Pretend the destination path is on a different filesystem.
    raise OSError(getattr(errno, 'EXDEV', 18), "Invalid cross-device link")


def mock_rename(func):
    """Decorator that runs *func* with os.rename replaced by _fake_rename.

    The real os.rename is restored when *func* returns or raises.
    """
    @functools.wraps(func)
    def wrap(*args, **kwargs):
        # Capture the real function before entering the try block so the
        # finally clause can never refer to an unbound name.
        builtin_rename = os.rename
        os.rename = _fake_rename
        try:
            return func(*args, **kwargs)
        finally:
            os.rename = builtin_rename
    return wrap
65
def write_file(path, content, binary=False):
    """Write *content* to a file located at *path*.

    If *path* is a tuple instead of a string, os.path.join will be used to
    make a path.  If *binary* is true, the file will be opened in binary
    mode (and *content* must then be bytes-like rather than str).
    """
    if isinstance(path, tuple):
        path = os.path.join(*path)
    with open(path, 'wb' if binary else 'w') as fp:
        fp.write(content)
77
def write_test_file(path, size):
    """Create a test file with an arbitrary size and random text content.

    The file at *path* is (re)created in binary mode and filled with exactly
    *size* bytes made of random ASCII letters.  The content is one random
    buffer of at most 8192 bytes, repeated as often as needed.
    """
    bufsize = min(size, 8192)
    chunk = bytes(random.choices(string.ascii_letters.encode(), k=bufsize))
    with open(path, 'wb') as f:
        remaining = size
        while remaining > 0:
            csize = min(remaining, bufsize)
            # Only write the part of the buffer that is still needed;
            # writing the whole buffer every time overshoots *size* whenever
            # it is not a multiple of the buffer size.
            f.write(chunk[:csize])
            remaining -= csize
    # Sanity check that the helper produced what the caller asked for.
    assert os.path.getsize(path) == size
95
def read_file(path, binary=False):
    """Return contents from a file located at *path*.

    If *path* is a tuple instead of a string, os.path.join will be used to
    make a path.  If *binary* is true, the file will be opened in binary
    mode and bytes are returned; otherwise a str is returned.
    """
    if isinstance(path, tuple):
        path = os.path.join(*path)
    with open(path, 'rb' if binary else 'r') as fp:
        return fp.read()
107
def rlistdir(path):
    """Return a sorted, recursive listing of *path*.

    Entries are relative to *path* and always use '/' as the separator.
    Real directories are listed with a trailing '/' followed by their own
    content; symlinks to directories are listed as plain names and are not
    descended into.
    """
    res = []
    for name in sorted(os.listdir(path)):
        p = os.path.join(path, name)
        if os.path.isdir(p) and not os.path.islink(p):
            res.append(name + '/')
            res.extend(name + '/' + n for n in rlistdir(p))
        else:
            res.append(name)
    return res
119
def supports_file2file_sendfile():
    """Return True if os.sendfile() can copy between two regular files.

    The check creates two temporary files in the current working directory,
    tries to send two bytes from one to the other, and removes both files
    again whatever the outcome.
    """
    # ...apparently Linux and Solaris are the only ones
    if not hasattr(os, "sendfile"):
        return False
    srcname = None
    dstname = None
    try:
        with tempfile.NamedTemporaryFile("wb", dir=os.getcwd(),
                                         delete=False) as f:
            srcname = f.name
            f.write(b"0123456789")

        with open(srcname, "rb") as src:
            with tempfile.NamedTemporaryFile("wb", dir=os.getcwd(),
                                             delete=False) as dst:
                dstname = dst.name
                infd = src.fileno()
                outfd = dst.fileno()
                try:
                    os.sendfile(outfd, infd, 0, 2)
                except OSError:
                    return False
                else:
                    return True
    finally:
        # The files were created with delete=False, so remove them by hand.
        if srcname is not None:
            os_helper.unlink(srcname)
        if dstname is not None:
            os_helper.unlink(dstname)
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +0200147
148
149SUPPORTS_SENDFILE = supports_file2file_sendfile()
150
# AIX 32-bit mode, by default, lacks enough memory for the xz/lzma compiler
# test.  The AIX command 'dump -o program' gives XCOFF header information;
# the second word of the last line is the maxdata value.  In 32-bit mode,
# maxdata must be at least 0x20000000 for the xz test to succeed.
def _maxdataOK():
    """Return False only on 32-bit AIX builds whose maxdata is too small.

    On every other platform (and on 64-bit AIX) this returns True without
    running any external command.
    """
    if AIX and sys.maxsize == 2147483647:
        hdrs = subprocess.getoutput("/usr/bin/dump -o %s" % sys.executable)
        # Second whitespace-separated word of the last output line, in hex.
        maxdata = hdrs.split("\n")[-1].split()[1]
        return int(maxdata, 16) >= 0x20000000
    else:
        return True
Éric Araujoa7e33a12011-08-12 19:51:35 +0200162
Tarek Ziadé396fad72010-02-23 05:30:31 +0000163
class BaseTest:
    """Mixin providing self-cleaning temporary directories.

    Must be combined with unittest.TestCase, whose addCleanup() is used.
    """

    def mkdtemp(self, prefix=None):
        """Create a temporary directory that will be cleaned up.

        The directory is created inside the current working directory.
        Returns the path of the directory.
        """
        d = tempfile.mkdtemp(prefix=prefix, dir=os.getcwd())
        self.addCleanup(os_helper.rmtree, d)
        return d
Tarek Ziadé5340db32010-04-19 22:30:51 +0000174
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300175
class TestRmTree(BaseTest, unittest.TestCase):
    """Tests for shutil.rmtree()."""

    def _check_rmtree_refuses_link(self, target, link):
        """Assert that rmtree() refuses to work on *link* pointing at *target*.

        Checks both the raising form and the onerror form: neither the link
        nor its target may be removed, and onerror must be called exactly
        once with os.path.islink, the link path and an OSError.
        """
        self.assertRaises(OSError, shutil.rmtree, link)
        self.assertTrue(os.path.exists(target))
        self.assertTrue(os.path.lexists(link))
        errors = []
        def onerror(*args):
            errors.append(args)
        shutil.rmtree(link, onerror=onerror)
        self.assertEqual(len(errors), 1)
        self.assertIs(errors[0][0], os.path.islink)
        self.assertEqual(errors[0][1], link)
        self.assertIsInstance(errors[0][2][1], OSError)

    def test_rmtree_works_on_bytes(self):
        tmp = self.mkdtemp()
        victim = os.path.join(tmp, 'killme')
        os.mkdir(victim)
        write_file(os.path.join(victim, 'somefile'), 'foo')
        victim = os.fsencode(victim)
        self.assertIsInstance(victim, bytes)
        shutil.rmtree(victim)

    @os_helper.skip_unless_symlink
    def test_rmtree_fails_on_symlink(self):
        tmp = self.mkdtemp()
        dir_ = os.path.join(tmp, 'dir')
        os.mkdir(dir_)
        link = os.path.join(tmp, 'link')
        os.symlink(dir_, link)
        self._check_rmtree_refuses_link(dir_, link)

    @os_helper.skip_unless_symlink
    def test_rmtree_works_on_symlinks(self):
        tmp = self.mkdtemp()
        dir1 = os.path.join(tmp, 'dir1')
        dir2 = os.path.join(dir1, 'dir2')
        dir3 = os.path.join(tmp, 'dir3')
        for d in dir1, dir2, dir3:
            os.mkdir(d)
        file1 = os.path.join(tmp, 'file1')
        write_file(file1, 'foo')
        link1 = os.path.join(dir1, 'link1')
        os.symlink(dir2, link1)
        link2 = os.path.join(dir1, 'link2')
        os.symlink(dir3, link2)
        link3 = os.path.join(dir1, 'link3')
        os.symlink(file1, link3)
        # make sure symlinks are removed but not followed
        shutil.rmtree(dir1)
        self.assertFalse(os.path.exists(dir1))
        self.assertTrue(os.path.exists(dir3))
        self.assertTrue(os.path.exists(file1))

    @unittest.skipUnless(_winapi, 'only relevant on Windows')
    def test_rmtree_fails_on_junctions(self):
        tmp = self.mkdtemp()
        dir_ = os.path.join(tmp, 'dir')
        os.mkdir(dir_)
        link = os.path.join(tmp, 'link')
        _winapi.CreateJunction(dir_, link)
        self.addCleanup(os_helper.unlink, link)
        self._check_rmtree_refuses_link(dir_, link)

    @unittest.skipUnless(_winapi, 'only relevant on Windows')
    def test_rmtree_works_on_junctions(self):
        tmp = self.mkdtemp()
        dir1 = os.path.join(tmp, 'dir1')
        dir2 = os.path.join(dir1, 'dir2')
        dir3 = os.path.join(tmp, 'dir3')
        for d in dir1, dir2, dir3:
            os.mkdir(d)
        file1 = os.path.join(tmp, 'file1')
        write_file(file1, 'foo')
        link1 = os.path.join(dir1, 'link1')
        _winapi.CreateJunction(dir2, link1)
        link2 = os.path.join(dir1, 'link2')
        _winapi.CreateJunction(dir3, link2)
        link3 = os.path.join(dir1, 'link3')
        _winapi.CreateJunction(file1, link3)
        # make sure junctions are removed but not followed
        shutil.rmtree(dir1)
        self.assertFalse(os.path.exists(dir1))
        self.assertTrue(os.path.exists(dir3))
        self.assertTrue(os.path.exists(file1))

    def test_rmtree_errors(self):
        # filename is guaranteed not to exist
        filename = tempfile.mktemp(dir=self.mkdtemp())
        self.assertRaises(FileNotFoundError, shutil.rmtree, filename)
        # test that ignore_errors option is honored
        shutil.rmtree(filename, ignore_errors=True)

        # existing file
        tmpdir = self.mkdtemp()
        write_file((tmpdir, "tstfile"), "")
        filename = os.path.join(tmpdir, "tstfile")
        with self.assertRaises(NotADirectoryError) as cm:
            shutil.rmtree(filename)
        # The reason for this rather odd construct is that Windows sprinkles
        # a \*.* at the end of file names. But only sometimes on some buildbots
        possible_args = [filename, os.path.join(filename, '*.*')]
        self.assertIn(cm.exception.filename, possible_args)
        self.assertTrue(os.path.exists(filename))
        # test that ignore_errors option is honored
        shutil.rmtree(filename, ignore_errors=True)
        self.assertTrue(os.path.exists(filename))
        errors = []
        def onerror(*args):
            errors.append(args)
        shutil.rmtree(filename, onerror=onerror)
        self.assertEqual(len(errors), 2)
        self.assertIs(errors[0][0], os.scandir)
        self.assertEqual(errors[0][1], filename)
        self.assertIsInstance(errors[0][2][1], NotADirectoryError)
        self.assertIn(errors[0][2][1].filename, possible_args)
        self.assertIs(errors[1][0], os.rmdir)
        self.assertEqual(errors[1][1], filename)
        self.assertIsInstance(errors[1][2][1], NotADirectoryError)
        self.assertIn(errors[1][2][1].filename, possible_args)

    @unittest.skipIf(sys.platform[:6] == 'cygwin',
                     "This test can't be run on Cygwin (issue #1071513).")
    @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
                     "This test can't be run reliably as root (issue #1076467).")
    def test_on_error(self):
        self.errorState = 0
        os.mkdir(TESTFN)
        # Registered first so that it runs last, after the chmod cleanups
        # below have made the tree removable again.
        self.addCleanup(shutil.rmtree, TESTFN)

        self.child_file_path = os.path.join(TESTFN, 'a')
        self.child_dir_path = os.path.join(TESTFN, 'b')
        os_helper.create_empty_file(self.child_file_path)
        os.mkdir(self.child_dir_path)
        old_dir_mode = os.stat(TESTFN).st_mode
        old_child_file_mode = os.stat(self.child_file_path).st_mode
        old_child_dir_mode = os.stat(self.child_dir_path).st_mode
        # Make unwritable.
        new_mode = stat.S_IREAD|stat.S_IEXEC
        os.chmod(self.child_file_path, new_mode)
        os.chmod(self.child_dir_path, new_mode)
        os.chmod(TESTFN, new_mode)

        self.addCleanup(os.chmod, TESTFN, old_dir_mode)
        self.addCleanup(os.chmod, self.child_file_path, old_child_file_mode)
        self.addCleanup(os.chmod, self.child_dir_path, old_child_dir_mode)

        shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
        # Test whether onerror has actually been called.
        self.assertEqual(self.errorState, 3,
                         "Expected call to onerror function did not happen.")

    def check_args_to_onerror(self, func, arg, exc):
        # test_on_error deliberately runs rmtree
        # on a directory that is chmod 500, which will fail.
        # This function is run when shutil.rmtree fails.
        # 99.9% of the time it initially fails to remove
        # a file in the directory, so the first time through
        # func is os.remove.
        # However, some Linux machines running ZFS on
        # FUSE experienced a failure earlier in the process
        # at os.listdir.  The first failure may legally
        # be either.
        if self.errorState < 2:
            if func is os.unlink:
                self.assertEqual(arg, self.child_file_path)
            elif func is os.rmdir:
                self.assertEqual(arg, self.child_dir_path)
            else:
                self.assertIs(func, os.listdir)
                self.assertIn(arg, [TESTFN, self.child_dir_path])
            self.assertTrue(issubclass(exc[0], OSError))
            self.errorState += 1
        else:
            # The third and final failure is removing the top directory.
            self.assertEqual(func, os.rmdir)
            self.assertEqual(arg, TESTFN)
            self.assertTrue(issubclass(exc[0], OSError))
            self.errorState = 3

    def test_rmtree_does_not_choke_on_failing_lstat(self):
        orig_lstat = os.lstat
        def raiser(fn, *args, **kwargs):
            # Fail for everything except the top-level directory.
            if fn != TESTFN:
                raise OSError()
            else:
                return orig_lstat(fn)

        with unittest.mock.patch.object(os, 'lstat', raiser):
            os.mkdir(TESTFN)
            # Do not leave TESTFN behind if rmtree() below fails.
            self.addCleanup(os_helper.rmtree, TESTFN)
            write_file((TESTFN, 'foo'), 'foo')
            shutil.rmtree(TESTFN)

    def test_rmtree_uses_safe_fd_version_if_available(self):
        _use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <=
                             os.supports_dir_fd and
                             os.listdir in os.supports_fd and
                             os.stat in os.supports_follow_symlinks)
        if _use_fd_functions:
            self.assertTrue(shutil._use_fd_functions)
            self.assertTrue(shutil.rmtree.avoids_symlink_attacks)
            tmp_dir = self.mkdtemp()
            d = os.path.join(tmp_dir, 'a')
            os.mkdir(d)

            class Called(Exception):
                pass

            def _raiser(*args, **kwargs):
                raise Called

            # rmtree() must dispatch to the fd-based implementation.
            with unittest.mock.patch.object(shutil, '_rmtree_safe_fd',
                                            _raiser):
                self.assertRaises(Called, shutil.rmtree, d)
        else:
            self.assertFalse(shutil._use_fd_functions)
            self.assertFalse(shutil.rmtree.avoids_symlink_attacks)

    def test_rmtree_dont_delete_file(self):
        # When called on a file instead of a directory, don't delete it.
        handle, path = tempfile.mkstemp(dir=self.mkdtemp())
        os.close(handle)
        self.assertRaises(NotADirectoryError, shutil.rmtree, path)
        self.assertTrue(os.path.exists(path))
        os.remove(path)

    @os_helper.skip_unless_symlink
    def test_rmtree_on_symlink(self):
        # bug 1669.
        os.mkdir(TESTFN)
        try:
            src = os.path.join(TESTFN, 'cheese')
            dst = os.path.join(TESTFN, 'shop')
            os.mkdir(src)
            os.symlink(src, dst)
            self.assertRaises(OSError, shutil.rmtree, dst)
            shutil.rmtree(dst, ignore_errors=True)
        finally:
            shutil.rmtree(TESTFN, ignore_errors=True)

    @unittest.skipUnless(_winapi, 'only relevant on Windows')
    def test_rmtree_on_junction(self):
        os.mkdir(TESTFN)
        try:
            src = os.path.join(TESTFN, 'cheese')
            dst = os.path.join(TESTFN, 'shop')
            os.mkdir(src)
            open(os.path.join(src, 'spam'), 'wb').close()
            _winapi.CreateJunction(src, dst)
            self.assertRaises(OSError, shutil.rmtree, dst)
            shutil.rmtree(dst, ignore_errors=True)
        finally:
            shutil.rmtree(TESTFN, ignore_errors=True)
438
439
440class TestCopyTree(BaseTest, unittest.TestCase):
441
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000442 def test_copytree_simple(self):
Steve Dowerabde52c2019-11-15 09:49:21 -0800443 src_dir = self.mkdtemp()
444 dst_dir = os.path.join(self.mkdtemp(), 'destination')
Éric Araujoa7e33a12011-08-12 19:51:35 +0200445 self.addCleanup(shutil.rmtree, src_dir)
446 self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
447 write_file((src_dir, 'test.txt'), '123')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000448 os.mkdir(os.path.join(src_dir, 'test_dir'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200449 write_file((src_dir, 'test_dir', 'test.txt'), '456')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000450
Éric Araujoa7e33a12011-08-12 19:51:35 +0200451 shutil.copytree(src_dir, dst_dir)
452 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test.txt')))
453 self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'test_dir')))
454 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test_dir',
455 'test.txt')))
456 actual = read_file((dst_dir, 'test.txt'))
457 self.assertEqual(actual, '123')
458 actual = read_file((dst_dir, 'test_dir', 'test.txt'))
459 self.assertEqual(actual, '456')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000460
jab9e00d9e2018-12-28 13:03:40 -0500461 def test_copytree_dirs_exist_ok(self):
Steve Dowerabde52c2019-11-15 09:49:21 -0800462 src_dir = self.mkdtemp()
463 dst_dir = self.mkdtemp()
jab9e00d9e2018-12-28 13:03:40 -0500464 self.addCleanup(shutil.rmtree, src_dir)
465 self.addCleanup(shutil.rmtree, dst_dir)
466
467 write_file((src_dir, 'nonexisting.txt'), '123')
468 os.mkdir(os.path.join(src_dir, 'existing_dir'))
469 os.mkdir(os.path.join(dst_dir, 'existing_dir'))
470 write_file((dst_dir, 'existing_dir', 'existing.txt'), 'will be replaced')
471 write_file((src_dir, 'existing_dir', 'existing.txt'), 'has been replaced')
472
473 shutil.copytree(src_dir, dst_dir, dirs_exist_ok=True)
474 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'nonexisting.txt')))
475 self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'existing_dir')))
476 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'existing_dir',
477 'existing.txt')))
478 actual = read_file((dst_dir, 'nonexisting.txt'))
479 self.assertEqual(actual, '123')
480 actual = read_file((dst_dir, 'existing_dir', 'existing.txt'))
481 self.assertEqual(actual, 'has been replaced')
482
483 with self.assertRaises(FileExistsError):
484 shutil.copytree(src_dir, dst_dir, dirs_exist_ok=False)
485
Hai Shi0c4f0f32020-06-30 21:46:31 +0800486 @os_helper.skip_unless_symlink
Antoine Pitrou78091e62011-12-29 18:54:15 +0100487 def test_copytree_symlinks(self):
488 tmp_dir = self.mkdtemp()
489 src_dir = os.path.join(tmp_dir, 'src')
490 dst_dir = os.path.join(tmp_dir, 'dst')
491 sub_dir = os.path.join(src_dir, 'sub')
492 os.mkdir(src_dir)
493 os.mkdir(sub_dir)
494 write_file((src_dir, 'file.txt'), 'foo')
495 src_link = os.path.join(sub_dir, 'link')
496 dst_link = os.path.join(dst_dir, 'sub/link')
497 os.symlink(os.path.join(src_dir, 'file.txt'),
498 src_link)
499 if hasattr(os, 'lchmod'):
500 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
501 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
502 os.lchflags(src_link, stat.UF_NODUMP)
503 src_stat = os.lstat(src_link)
504 shutil.copytree(src_dir, dst_dir, symlinks=True)
505 self.assertTrue(os.path.islink(os.path.join(dst_dir, 'sub', 'link')))
Steve Dowerdf2d4a62019-08-21 15:27:33 -0700506 actual = os.readlink(os.path.join(dst_dir, 'sub', 'link'))
507 # Bad practice to blindly strip the prefix as it may be required to
508 # correctly refer to the file, but we're only comparing paths here.
509 if os.name == 'nt' and actual.startswith('\\\\?\\'):
510 actual = actual[4:]
511 self.assertEqual(actual, os.path.join(src_dir, 'file.txt'))
Antoine Pitrou78091e62011-12-29 18:54:15 +0100512 dst_stat = os.lstat(dst_link)
513 if hasattr(os, 'lchmod'):
514 self.assertEqual(dst_stat.st_mode, src_stat.st_mode)
515 if hasattr(os, 'lchflags'):
516 self.assertEqual(dst_stat.st_flags, src_stat.st_flags)
517
Georg Brandl2ee470f2008-07-16 12:55:28 +0000518 def test_copytree_with_exclude(self):
Georg Brandl2ee470f2008-07-16 12:55:28 +0000519 # creating data
520 join = os.path.join
521 exists = os.path.exists
Steve Dowerabde52c2019-11-15 09:49:21 -0800522 src_dir = self.mkdtemp()
Georg Brandl2ee470f2008-07-16 12:55:28 +0000523 try:
Steve Dowerabde52c2019-11-15 09:49:21 -0800524 dst_dir = join(self.mkdtemp(), 'destination')
Éric Araujoa7e33a12011-08-12 19:51:35 +0200525 write_file((src_dir, 'test.txt'), '123')
526 write_file((src_dir, 'test.tmp'), '123')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000527 os.mkdir(join(src_dir, 'test_dir'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200528 write_file((src_dir, 'test_dir', 'test.txt'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000529 os.mkdir(join(src_dir, 'test_dir2'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200530 write_file((src_dir, 'test_dir2', 'test.txt'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000531 os.mkdir(join(src_dir, 'test_dir2', 'subdir'))
532 os.mkdir(join(src_dir, 'test_dir2', 'subdir2'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200533 write_file((src_dir, 'test_dir2', 'subdir', 'test.txt'), '456')
534 write_file((src_dir, 'test_dir2', 'subdir2', 'test.py'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000535
536 # testing glob-like patterns
537 try:
538 patterns = shutil.ignore_patterns('*.tmp', 'test_dir2')
539 shutil.copytree(src_dir, dst_dir, ignore=patterns)
540 # checking the result: some elements should not be copied
541 self.assertTrue(exists(join(dst_dir, 'test.txt')))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200542 self.assertFalse(exists(join(dst_dir, 'test.tmp')))
543 self.assertFalse(exists(join(dst_dir, 'test_dir2')))
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000544 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200545 shutil.rmtree(dst_dir)
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000546 try:
547 patterns = shutil.ignore_patterns('*.tmp', 'subdir*')
548 shutil.copytree(src_dir, dst_dir, ignore=patterns)
549 # checking the result: some elements should not be copied
Éric Araujoa7e33a12011-08-12 19:51:35 +0200550 self.assertFalse(exists(join(dst_dir, 'test.tmp')))
551 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2')))
552 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000553 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200554 shutil.rmtree(dst_dir)
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000555
556 # testing callable-style
557 try:
558 def _filter(src, names):
559 res = []
560 for name in names:
561 path = os.path.join(src, name)
562
563 if (os.path.isdir(path) and
564 path.split()[-1] == 'subdir'):
565 res.append(name)
566 elif os.path.splitext(path)[-1] in ('.py'):
567 res.append(name)
568 return res
569
570 shutil.copytree(src_dir, dst_dir, ignore=_filter)
571
572 # checking the result: some elements should not be copied
Éric Araujoa7e33a12011-08-12 19:51:35 +0200573 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2',
574 'test.py')))
575 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000576
577 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200578 shutil.rmtree(dst_dir)
Georg Brandl2ee470f2008-07-16 12:55:28 +0000579 finally:
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000580 shutil.rmtree(src_dir)
581 shutil.rmtree(os.path.dirname(dst_dir))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000582
mbarkhau88704332020-01-24 14:51:16 +0000583 def test_copytree_arg_types_of_ignore(self):
584 join = os.path.join
585 exists = os.path.exists
586
587 tmp_dir = self.mkdtemp()
588 src_dir = join(tmp_dir, "source")
589
590 os.mkdir(join(src_dir))
591 os.mkdir(join(src_dir, 'test_dir'))
592 os.mkdir(os.path.join(src_dir, 'test_dir', 'subdir'))
593 write_file((src_dir, 'test_dir', 'subdir', 'test.txt'), '456')
594
595 invokations = []
596
597 def _ignore(src, names):
598 invokations.append(src)
599 self.assertIsInstance(src, str)
600 self.assertIsInstance(names, list)
601 self.assertEqual(len(names), len(set(names)))
602 for name in names:
603 self.assertIsInstance(name, str)
604 return []
605
606 dst_dir = join(self.mkdtemp(), 'destination')
607 shutil.copytree(src_dir, dst_dir, ignore=_ignore)
608 self.assertTrue(exists(join(dst_dir, 'test_dir', 'subdir',
609 'test.txt')))
610
611 dst_dir = join(self.mkdtemp(), 'destination')
612 shutil.copytree(pathlib.Path(src_dir), dst_dir, ignore=_ignore)
613 self.assertTrue(exists(join(dst_dir, 'test_dir', 'subdir',
614 'test.txt')))
615
616 dst_dir = join(self.mkdtemp(), 'destination')
617 src_dir_entry = list(os.scandir(tmp_dir))[0]
618 self.assertIsInstance(src_dir_entry, os.DirEntry)
619 shutil.copytree(src_dir_entry, dst_dir, ignore=_ignore)
620 self.assertTrue(exists(join(dst_dir, 'test_dir', 'subdir',
621 'test.txt')))
622
623 self.assertEqual(len(invokations), 9)
624
Antoine Pitrouac601602013-08-16 19:35:02 +0200625 def test_copytree_retains_permissions(self):
Steve Dowerabde52c2019-11-15 09:49:21 -0800626 tmp_dir = self.mkdtemp()
Antoine Pitrouac601602013-08-16 19:35:02 +0200627 src_dir = os.path.join(tmp_dir, 'source')
628 os.mkdir(src_dir)
629 dst_dir = os.path.join(tmp_dir, 'destination')
630 self.addCleanup(shutil.rmtree, tmp_dir)
631
632 os.chmod(src_dir, 0o777)
633 write_file((src_dir, 'permissive.txt'), '123')
634 os.chmod(os.path.join(src_dir, 'permissive.txt'), 0o777)
635 write_file((src_dir, 'restrictive.txt'), '456')
636 os.chmod(os.path.join(src_dir, 'restrictive.txt'), 0o600)
637 restrictive_subdir = tempfile.mkdtemp(dir=src_dir)
Hai Shi0c4f0f32020-06-30 21:46:31 +0800638 self.addCleanup(os_helper.rmtree, restrictive_subdir)
Antoine Pitrouac601602013-08-16 19:35:02 +0200639 os.chmod(restrictive_subdir, 0o600)
640
641 shutil.copytree(src_dir, dst_dir)
Brett Cannon9c7eb552013-08-23 14:38:11 -0400642 self.assertEqual(os.stat(src_dir).st_mode, os.stat(dst_dir).st_mode)
643 self.assertEqual(os.stat(os.path.join(src_dir, 'permissive.txt')).st_mode,
Antoine Pitrouac601602013-08-16 19:35:02 +0200644 os.stat(os.path.join(dst_dir, 'permissive.txt')).st_mode)
Brett Cannon9c7eb552013-08-23 14:38:11 -0400645 self.assertEqual(os.stat(os.path.join(src_dir, 'restrictive.txt')).st_mode,
Antoine Pitrouac601602013-08-16 19:35:02 +0200646 os.stat(os.path.join(dst_dir, 'restrictive.txt')).st_mode)
647 restrictive_subdir_dst = os.path.join(dst_dir,
648 os.path.split(restrictive_subdir)[1])
Brett Cannon9c7eb552013-08-23 14:38:11 -0400649 self.assertEqual(os.stat(restrictive_subdir).st_mode,
Antoine Pitrouac601602013-08-16 19:35:02 +0200650 os.stat(restrictive_subdir_dst).st_mode)
651
Berker Peksag884afd92014-12-10 02:50:32 +0200652 @unittest.mock.patch('os.chmod')
653 def test_copytree_winerror(self, mock_patch):
654 # When copying to VFAT, copystat() raises OSError. On Windows, the
655 # exception object has a meaningful 'winerror' attribute, but not
656 # on other operating systems. Do not assume 'winerror' is set.
Steve Dowerabde52c2019-11-15 09:49:21 -0800657 src_dir = self.mkdtemp()
658 dst_dir = os.path.join(self.mkdtemp(), 'destination')
Berker Peksag884afd92014-12-10 02:50:32 +0200659 self.addCleanup(shutil.rmtree, src_dir)
660 self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
661
662 mock_patch.side_effect = PermissionError('ka-boom')
663 with self.assertRaises(shutil.Error):
664 shutil.copytree(src_dir, dst_dir)
665
def test_copytree_custom_copy_function(self):
    # See: https://bugs.python.org/issue35648
    # copytree() must pass plain str paths to a user supplied
    # copy_function, and call it once for the single file in the tree.
    calls = []

    def custom_cpfun(a, b):
        calls.append((a, b))
        self.assertIsInstance(a, str)
        self.assertIsInstance(b, str)
        self.assertEqual(a, os.path.join(src, 'foo'))
        self.assertEqual(b, os.path.join(dst, 'foo'))

    src = self.mkdtemp()
    # dst must not exist yet (copytree creates it).  A fixed name inside a
    # freshly created directory gives that guarantee without relying on
    # the deprecated tempfile.mktemp().
    dst = os.path.join(self.mkdtemp(), 'dst')
    # An empty file is all that is needed; the with block closes it.
    with open(os.path.join(src, 'foo'), 'w'):
        pass
    shutil.copytree(src, dst, copy_function=custom_cpfun)
    self.assertEqual(len(calls), 1)
682
# Issue #3002: copyfile and copytree block indefinitely on named pipes
@unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
@os_helper.skip_unless_symlink
@unittest.skipIf(sys.platform == "vxworks",
                 "fifo requires special path on VxWorks")
def test_copytree_named_pipe(self):
    # A FIFO inside the source tree must make copytree() raise
    # shutil.Error with exactly one entry describing the pipe.
    os.mkdir(TESTFN)
    try:
        nested = os.path.join(TESTFN, "subdir")
        os.mkdir(nested)
        fifo = os.path.join(nested, "mypipe")
        try:
            os.mkfifo(fifo)
        except PermissionError as exc:
            self.skipTest('os.mkfifo(): %s' % exc)

        try:
            shutil.copytree(TESTFN, TESTFN2)
        except shutil.Error as exc:
            # args[0] holds one (src, dst, message) triple per failure.
            failures = exc.args[0]
            self.assertEqual(len(failures), 1)
            _, _, message = failures[0]
            self.assertEqual("`%s` is a named pipe" % fifo, message)
        else:
            self.fail("shutil.Error should have been raised")
    finally:
        # Both trees are removed whether or not the test passed.
        shutil.rmtree(TESTFN, ignore_errors=True)
        shutil.rmtree(TESTFN2, ignore_errors=True)
710
def test_copytree_special_func(self):
    # A custom copy_function is called once for each of the two files in
    # the tree: one at the top level and one in a subdirectory.
    tree = self.mkdtemp()
    target = os.path.join(self.mkdtemp(), 'destination')
    write_file((tree, 'test.txt'), '123')
    os.mkdir(os.path.join(tree, 'test_dir'))
    write_file((tree, 'test_dir', 'test.txt'), '456')

    recorded = []
    shutil.copytree(
        tree, target,
        copy_function=lambda src, dst: recorded.append((src, dst)))
    self.assertEqual(len(recorded), 2)
724
@os_helper.skip_unless_symlink
def test_copytree_dangling_symlinks(self):
    # Build a tree holding one dangling symlink and one regular file.
    tree = self.mkdtemp()
    target = os.path.join(self.mkdtemp(), 'destination')
    os.symlink('IDONTEXIST', os.path.join(tree, 'test.txt'))
    os.mkdir(os.path.join(tree, 'test_dir'))
    write_file((tree, 'test_dir', 'test.txt'), '456')

    # a dangling symlink raises an error at the end
    with self.assertRaises(Error):
        shutil.copytree(tree, target)

    # a dangling symlink is ignored with the proper flag
    target = os.path.join(self.mkdtemp(), 'destination2')
    shutil.copytree(tree, target, ignore_dangling_symlinks=True)
    self.assertNotIn('test.txt', os.listdir(target))

    # a dangling symlink is copied if symlinks=True
    target = os.path.join(self.mkdtemp(), 'destination3')
    shutil.copytree(tree, target, symlinks=True)
    self.assertIn('test.txt', os.listdir(target))
744
@os_helper.skip_unless_symlink
def test_copytree_symlink_dir(self):
    # A symlink to a directory becomes a real directory in the copy when
    # symlinks=False and stays a symlink when symlinks=True.
    tree = self.mkdtemp()
    target = os.path.join(self.mkdtemp(), 'destination')
    real_dir = os.path.join(tree, 'real_dir')
    os.mkdir(real_dir)
    with open(os.path.join(real_dir, 'test.txt'), 'w'):
        pass
    os.symlink(real_dir, os.path.join(tree, 'link_to_dir'),
               target_is_directory=True)

    shutil.copytree(tree, target, symlinks=False)
    copied = os.path.join(target, 'link_to_dir')
    self.assertFalse(os.path.islink(copied))
    self.assertIn('test.txt', os.listdir(copied))

    target = os.path.join(self.mkdtemp(), 'destination2')
    shutil.copytree(tree, target, symlinks=True)
    copied = os.path.join(target, 'link_to_dir')
    self.assertTrue(os.path.islink(copied))
    self.assertIn('test.txt', os.listdir(copied))
764
def test_copytree_return_value(self):
    # copytree returns its destination path.
    source = self.mkdtemp()
    target = source + "dest"
    # The target sits next to the temporary source directory, so it
    # needs its own cleanup; errors while removing it are ignored.
    self.addCleanup(shutil.rmtree, target, True)
    write_file(os.path.join(source, 'foo'), 'foo')
    returned = shutil.copytree(source, target)
    self.assertEqual(['foo'], os.listdir(returned))
774
def test_copytree_subdirectory(self):
    # copytree where dst is a subdirectory of src, see Issue 38688
    base = self.mkdtemp()
    self.addCleanup(shutil.rmtree, base, ignore_errors=True)
    source = os.path.join(base, "t", "pg")
    # The destination lives two levels below the source directory.
    target = os.path.join(source, "somevendor", "1.0")
    os.makedirs(source)
    write_file(os.path.join(source, 'pol'), 'pol')
    returned = shutil.copytree(source, target)
    self.assertEqual(['pol'], os.listdir(returned))
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300786
class TestCopy(BaseTest, unittest.TestCase):
    # Tests for copymode(), copystat(), _copyxattr(), copy(), copy2() and
    # copyfile().

    def _make_linked_pair(self):
        # Create two regular files 'foo' and 'bar' (both containing 'foo')
        # and one symlink to each ('baz' -> foo, 'quux' -> bar) in a fresh
        # temporary directory.  Returns (src, dst, src_link, dst_link).
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'foo')
        dst = os.path.join(tmp_dir, 'bar')
        src_link = os.path.join(tmp_dir, 'baz')
        dst_link = os.path.join(tmp_dir, 'quux')
        write_file(src, 'foo')
        write_file(dst, 'foo')
        os.symlink(src, src_link)
        os.symlink(dst, dst_link)
        return src, dst, src_link, dst_link

    ### shutil.copymode

    @os_helper.skip_unless_symlink
    def test_copymode_follow_symlinks(self):
        # With the default follow_symlinks, copymode() acts on the files
        # the links point to, whichever side is given as a link.
        src, dst, src_link, dst_link = self._make_linked_pair()
        os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
        # file to file
        os.chmod(dst, stat.S_IRWXO)
        self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
        shutil.copymode(src, dst)
        self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
        # On Windows, os.chmod does not follow symlinks (issue #15411)
        if os.name != 'nt':
            # follow src link
            os.chmod(dst, stat.S_IRWXO)
            shutil.copymode(src_link, dst)
            self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
            # follow dst link
            os.chmod(dst, stat.S_IRWXO)
            shutil.copymode(src, dst_link)
            self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
            # follow both links
            os.chmod(dst, stat.S_IRWXO)
            shutil.copymode(src_link, dst_link)
            self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)

    @unittest.skipUnless(hasattr(os, 'lchmod'), 'requires os.lchmod')
    @os_helper.skip_unless_symlink
    def test_copymode_symlink_to_symlink(self):
        # With follow_symlinks=False and a link on both sides, the mode of
        # the link itself is copied and the link targets keep their modes.
        src, dst, src_link, dst_link = self._make_linked_pair()
        os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
        os.chmod(dst, stat.S_IRWXU)
        os.lchmod(src_link, stat.S_IRWXO|stat.S_IRWXG)
        # link to link
        os.lchmod(dst_link, stat.S_IRWXO)
        shutil.copymode(src_link, dst_link, follow_symlinks=False)
        self.assertEqual(os.lstat(src_link).st_mode,
                         os.lstat(dst_link).st_mode)
        self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
        # src link - use chmod
        os.lchmod(dst_link, stat.S_IRWXO)
        shutil.copymode(src_link, dst, follow_symlinks=False)
        self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
        # dst link - use chmod
        os.lchmod(dst_link, stat.S_IRWXO)
        shutil.copymode(src, dst_link, follow_symlinks=False)
        self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)

    @unittest.skipIf(hasattr(os, 'lchmod'), 'requires os.lchmod to be missing')
    @os_helper.skip_unless_symlink
    def test_copymode_symlink_to_symlink_wo_lchmod(self):
        # Without os.lchmod the link-to-link copy cannot be performed; the
        # call is only required not to raise.
        src, dst, src_link, dst_link = self._make_linked_pair()
        shutil.copymode(src_link, dst_link, follow_symlinks=False)  # silent fail

    ### shutil.copystat

    @os_helper.skip_unless_symlink
    def test_copystat_symlinks(self):
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'foo')
        dst = os.path.join(tmp_dir, 'bar')
        src_link = os.path.join(tmp_dir, 'baz')
        dst_link = os.path.join(tmp_dir, 'qux')
        write_file(src, 'foo')
        src_stat = os.stat(src)
        os.utime(src, (src_stat.st_atime,
                       src_stat.st_mtime - 42.0))  # ensure different mtimes
        write_file(dst, 'bar')
        self.assertNotEqual(os.stat(src).st_mtime, os.stat(dst).st_mtime)
        os.symlink(src, src_link)
        os.symlink(dst, dst_link)
        if hasattr(os, 'lchmod'):
            os.lchmod(src_link, stat.S_IRWXO)
        if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
            os.lchflags(src_link, stat.UF_NODUMP)
        src_link_stat = os.lstat(src_link)
        # follow
        if hasattr(os, 'lchmod'):
            shutil.copystat(src_link, dst_link, follow_symlinks=True)
            self.assertNotEqual(src_link_stat.st_mode, os.stat(dst).st_mode)
        # don't follow
        shutil.copystat(src_link, dst_link, follow_symlinks=False)
        dst_link_stat = os.lstat(dst_link)
        if os.utime in os.supports_follow_symlinks:
            for attr in 'st_atime', 'st_mtime':
                # The modification times may be truncated in the new file.
                self.assertLessEqual(getattr(src_link_stat, attr),
                                     getattr(dst_link_stat, attr) + 1)
        if hasattr(os, 'lchmod'):
            self.assertEqual(src_link_stat.st_mode, dst_link_stat.st_mode)
        if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
            self.assertEqual(src_link_stat.st_flags, dst_link_stat.st_flags)
        # told not to follow, but dst is not a link: the times of the file
        # src_link points to must end up on dst
        shutil.copystat(src_link, dst, follow_symlinks=False)
        self.assertLess(abs(os.stat(src).st_mtime - os.stat(dst).st_mtime),
                        0.1)

    @unittest.skipUnless(hasattr(os, 'chflags') and
                         hasattr(errno, 'EOPNOTSUPP') and
                         hasattr(errno, 'ENOTSUP'),
                         "requires os.chflags, EOPNOTSUPP & ENOTSUP")
    def test_copystat_handles_harmless_chflags_errors(self):
        # copystat() must ignore EOPNOTSUPP/ENOTSUP from os.chflags but
        # still propagate an OSError carrying any other errno.
        tmpdir = self.mkdtemp()
        file1 = os.path.join(tmpdir, 'file1')
        file2 = os.path.join(tmpdir, 'file2')
        write_file(file1, 'xxx')
        write_file(file2, 'xxx')

        def make_chflags_raiser(err):
            # Return an os.chflags replacement that always raises an
            # OSError whose errno is *err*.
            ex = OSError()

            def _chflags_raiser(path, flags, *, follow_symlinks=True):
                ex.errno = err
                raise ex
            return _chflags_raiser
        old_chflags = os.chflags
        try:
            for err in errno.EOPNOTSUPP, errno.ENOTSUP:
                os.chflags = make_chflags_raiser(err)
                shutil.copystat(file1, file2)
            # assert others errors break it
            os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP)
            self.assertRaises(OSError, shutil.copystat, file1, file2)
        finally:
            os.chflags = old_chflags

    ### shutil.copyxattr

    @os_helper.skip_unless_xattr
    def test_copyxattr(self):
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'foo')
        write_file(src, 'foo')
        dst = os.path.join(tmp_dir, 'bar')
        write_file(dst, 'bar')

        # no xattr == no problem
        shutil._copyxattr(src, dst)
        # common case
        os.setxattr(src, 'user.foo', b'42')
        os.setxattr(src, 'user.bar', b'43')
        shutil._copyxattr(src, dst)
        self.assertEqual(sorted(os.listxattr(src)), sorted(os.listxattr(dst)))
        self.assertEqual(
                os.getxattr(src, 'user.foo'),
                os.getxattr(dst, 'user.foo'))
        # check errors don't affect other attrs
        os.remove(dst)
        write_file(dst, 'bar')
        os_error = OSError(errno.EPERM, 'EPERM')

        # The real functions are saved before entering the try blocks so
        # that the finally clauses can always restore them.
        orig_setxattr = os.setxattr
        orig_listxattr = os.listxattr

        def _raise_on_user_foo(fname, attr, val, **kwargs):
            if attr == 'user.foo':
                raise os_error
            else:
                orig_setxattr(fname, attr, val, **kwargs)
        try:
            os.setxattr = _raise_on_user_foo
            shutil._copyxattr(src, dst)
            self.assertIn('user.bar', os.listxattr(dst))
        finally:
            os.setxattr = orig_setxattr

        # the source filesystem not supporting xattrs should be ok, too.
        def _raise_on_src(fname, *, follow_symlinks=True):
            if fname == src:
                raise OSError(errno.ENOTSUP, 'Operation not supported')
            return orig_listxattr(fname, follow_symlinks=follow_symlinks)
        try:
            os.listxattr = _raise_on_src
            shutil._copyxattr(src, dst)
        finally:
            os.listxattr = orig_listxattr

        # test that shutil.copystat copies xattrs
        src = os.path.join(tmp_dir, 'the_original')
        srcro = os.path.join(tmp_dir, 'the_original_ro')
        write_file(src, src)
        write_file(srcro, srcro)
        os.setxattr(src, 'user.the_value', b'fiddly')
        os.setxattr(srcro, 'user.the_value', b'fiddly')
        os.chmod(srcro, 0o444)
        dst = os.path.join(tmp_dir, 'the_copy')
        dstro = os.path.join(tmp_dir, 'the_copy_ro')
        write_file(dst, dst)
        write_file(dstro, dstro)
        shutil.copystat(src, dst)
        shutil.copystat(srcro, dstro)
        self.assertEqual(os.getxattr(dst, 'user.the_value'), b'fiddly')
        self.assertEqual(os.getxattr(dstro, 'user.the_value'), b'fiddly')

    @os_helper.skip_unless_symlink
    @os_helper.skip_unless_xattr
    @unittest.skipUnless(hasattr(os, 'geteuid') and os.geteuid() == 0,
                         'root privileges required')
    def test_copyxattr_symlinks(self):
        # On Linux, it's only possible to access non-user xattr for symlinks;
        # which in turn require root privileges. This test should be expanded
        # as soon as other platforms gain support for extended attributes.
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'foo')
        src_link = os.path.join(tmp_dir, 'baz')
        write_file(src, 'foo')
        os.symlink(src, src_link)
        os.setxattr(src, 'trusted.foo', b'42')
        os.setxattr(src_link, 'trusted.foo', b'43', follow_symlinks=False)
        dst = os.path.join(tmp_dir, 'bar')
        dst_link = os.path.join(tmp_dir, 'qux')
        write_file(dst, 'bar')
        os.symlink(dst, dst_link)
        # link to link: only the destination link gets the attribute
        shutil._copyxattr(src_link, dst_link, follow_symlinks=False)
        self.assertEqual(os.getxattr(dst_link, 'trusted.foo', follow_symlinks=False), b'43')
        self.assertRaises(OSError, os.getxattr, dst, 'trusted.foo')
        # link to regular file: the link's own attribute value is copied
        shutil._copyxattr(src_link, dst, follow_symlinks=False)
        self.assertEqual(os.getxattr(dst, 'trusted.foo'), b'43')

    ### shutil.copy

    def _copy_file(self, method):
        # Copy a small file into a second temporary directory with
        # *method* and return the (source, expected destination) paths.
        fname = 'test.txt'
        tmpdir = self.mkdtemp()
        write_file((tmpdir, fname), 'xxx')
        file1 = os.path.join(tmpdir, fname)
        tmpdir2 = self.mkdtemp()
        method(file1, tmpdir2)
        file2 = os.path.join(tmpdir2, fname)
        return (file1, file2)

    def test_copy(self):
        # Ensure that the copied file exists and has the same mode bits.
        file1, file2 = self._copy_file(shutil.copy)
        self.assertTrue(os.path.exists(file2))
        self.assertEqual(os.stat(file1).st_mode, os.stat(file2).st_mode)

    @os_helper.skip_unless_symlink
    def test_copy_symlinks(self):
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'foo')
        dst = os.path.join(tmp_dir, 'bar')
        src_link = os.path.join(tmp_dir, 'baz')
        write_file(src, 'foo')
        os.symlink(src, src_link)
        if hasattr(os, 'lchmod'):
            os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
        # follow: dst becomes a regular file with the target's content
        shutil.copy(src_link, dst, follow_symlinks=True)
        self.assertFalse(os.path.islink(dst))
        self.assertEqual(read_file(src), read_file(dst))
        os.remove(dst)
        # don't follow: dst becomes a symlink to the same target
        shutil.copy(src_link, dst, follow_symlinks=False)
        self.assertTrue(os.path.islink(dst))
        self.assertEqual(os.readlink(dst), os.readlink(src_link))
        if hasattr(os, 'lchmod'):
            self.assertEqual(os.lstat(src_link).st_mode,
                             os.lstat(dst).st_mode)

    ### shutil.copy2

    @unittest.skipUnless(hasattr(os, 'utime'), 'requires os.utime')
    def test_copy2(self):
        # Ensure that the copied file exists and has the same mode and
        # modification time bits.
        file1, file2 = self._copy_file(shutil.copy2)
        self.assertTrue(os.path.exists(file2))
        file1_stat = os.stat(file1)
        file2_stat = os.stat(file2)
        self.assertEqual(file1_stat.st_mode, file2_stat.st_mode)
        for attr in 'st_atime', 'st_mtime':
            # The modification times may be truncated in the new file.
            self.assertLessEqual(getattr(file1_stat, attr),
                                 getattr(file2_stat, attr) + 1)
        if hasattr(os, 'chflags') and hasattr(file1_stat, 'st_flags'):
            self.assertEqual(file1_stat.st_flags, file2_stat.st_flags)

    @os_helper.skip_unless_symlink
    def test_copy2_symlinks(self):
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'foo')
        dst = os.path.join(tmp_dir, 'bar')
        src_link = os.path.join(tmp_dir, 'baz')
        write_file(src, 'foo')
        os.symlink(src, src_link)
        if hasattr(os, 'lchmod'):
            os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
        if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
            os.lchflags(src_link, stat.UF_NODUMP)
        src_stat = os.stat(src)
        src_link_stat = os.lstat(src_link)
        # follow
        shutil.copy2(src_link, dst, follow_symlinks=True)
        self.assertFalse(os.path.islink(dst))
        self.assertEqual(read_file(src), read_file(dst))
        os.remove(dst)
        # don't follow
        shutil.copy2(src_link, dst, follow_symlinks=False)
        self.assertTrue(os.path.islink(dst))
        self.assertEqual(os.readlink(dst), os.readlink(src_link))
        dst_stat = os.lstat(dst)
        if os.utime in os.supports_follow_symlinks:
            for attr in 'st_atime', 'st_mtime':
                # The modification times may be truncated in the new file.
                self.assertLessEqual(getattr(src_link_stat, attr),
                                     getattr(dst_stat, attr) + 1)
        if hasattr(os, 'lchmod'):
            self.assertEqual(src_link_stat.st_mode, dst_stat.st_mode)
            self.assertNotEqual(src_stat.st_mode, dst_stat.st_mode)
        if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
            self.assertEqual(src_link_stat.st_flags, dst_stat.st_flags)

    @os_helper.skip_unless_xattr
    def test_copy2_xattr(self):
        # copy2() must carry the extended attributes over to the copy.
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'foo')
        dst = os.path.join(tmp_dir, 'bar')
        write_file(src, 'foo')
        os.setxattr(src, 'user.foo', b'42')
        shutil.copy2(src, dst)
        self.assertEqual(
                os.getxattr(src, 'user.foo'),
                os.getxattr(dst, 'user.foo'))
        os.remove(dst)

    def test_copy_return_value(self):
        # copy and copy2 both return their destination path.
        for fn in (shutil.copy, shutil.copy2):
            src_dir = self.mkdtemp()
            dst_dir = self.mkdtemp()
            src = os.path.join(src_dir, 'foo')
            write_file(src, 'foo')
            # destination given as a directory
            rv = fn(src, dst_dir)
            self.assertEqual(rv, os.path.join(dst_dir, 'foo'))
            # destination given as a file name
            rv = fn(src, os.path.join(dst_dir, 'bar'))
            self.assertEqual(rv, os.path.join(dst_dir, 'bar'))

    ### shutil.copyfile

    @os_helper.skip_unless_symlink
    def test_copyfile_symlinks(self):
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'src')
        dst = os.path.join(tmp_dir, 'dst')
        dst_link = os.path.join(tmp_dir, 'dst_link')
        link = os.path.join(tmp_dir, 'link')
        write_file(src, 'foo')
        os.symlink(src, link)
        # don't follow
        shutil.copyfile(link, dst_link, follow_symlinks=False)
        self.assertTrue(os.path.islink(dst_link))
        self.assertEqual(os.readlink(link), os.readlink(dst_link))
        # follow
        shutil.copyfile(link, dst)
        self.assertFalse(os.path.islink(dst))

    @unittest.skipUnless(hasattr(os, 'link'), 'requires os.link')
    def test_dont_copy_file_onto_link_to_itself(self):
        # bug 851123.
        os.mkdir(TESTFN)
        src = os.path.join(TESTFN, 'cheese')
        dst = os.path.join(TESTFN, 'shop')
        try:
            with open(src, 'w') as f:
                f.write('cheddar')
            try:
                os.link(src, dst)
            except PermissionError as e:
                self.skipTest('os.link(): %s' % e)
            self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
            # The refused copy must have left the content untouched.
            with open(src, 'r') as f:
                self.assertEqual(f.read(), 'cheddar')
            os.remove(dst)
        finally:
            shutil.rmtree(TESTFN, ignore_errors=True)

    @os_helper.skip_unless_symlink
    def test_dont_copy_file_onto_symlink_to_itself(self):
        # bug 851123.
        os.mkdir(TESTFN)
        src = os.path.join(TESTFN, 'cheese')
        dst = os.path.join(TESTFN, 'shop')
        try:
            with open(src, 'w') as f:
                f.write('cheddar')
            # Using `src` here would mean we end up with a symlink pointing
            # to TESTFN/TESTFN/cheese, while it should point at
            # TESTFN/cheese.
            os.symlink('cheese', dst)
            self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
            # The refused copy must have left the content untouched.
            with open(src, 'r') as f:
                self.assertEqual(f.read(), 'cheddar')
            os.remove(dst)
        finally:
            shutil.rmtree(TESTFN, ignore_errors=True)

    # Issue #3002: copyfile and copytree block indefinitely on named pipes
    @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
    @unittest.skipIf(sys.platform == "vxworks",
                    "fifo requires special path on VxWorks")
    def test_copyfile_named_pipe(self):
        try:
            os.mkfifo(TESTFN)
        except PermissionError as e:
            self.skipTest('os.mkfifo(): %s' % e)
        try:
            # the named pipe as source ...
            self.assertRaises(shutil.SpecialFileError,
                                shutil.copyfile, TESTFN, TESTFN2)
            # ... and as destination
            self.assertRaises(shutil.SpecialFileError,
                                shutil.copyfile, __file__, TESTFN)
        finally:
            os.remove(TESTFN)

    def test_copyfile_return_value(self):
        # copyfile returns its destination path.
        src_dir = self.mkdtemp()
        dst_dir = self.mkdtemp()
        dst_file = os.path.join(dst_dir, 'bar')
        src_file = os.path.join(src_dir, 'foo')
        write_file(src_file, 'foo')
        rv = shutil.copyfile(src_file, dst_file)
        self.assertEqual(rv, dst_file)
        self.assertTrue(os.path.exists(rv))
        self.assertEqual(read_file(src_file), read_file(dst_file))

    def test_copyfile_same_file(self):
        # copyfile() should raise SameFileError if the source and destination
        # are the same.
        src_dir = self.mkdtemp()
        src_file = os.path.join(src_dir, 'foo')
        write_file(src_file, 'foo')
        self.assertRaises(SameFileError, shutil.copyfile, src_file, src_file)
        # But Error should work too, to stay backward compatible.
        self.assertRaises(Error, shutil.copyfile, src_file, src_file)
        # Make sure file is not corrupted.
        self.assertEqual(read_file(src_file), 'foo')
Tarek Ziadéfb437512010-04-20 08:57:33 +00001248
Tarek Ziadéfb437512010-04-20 08:57:33 +00001249
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001250class TestArchives(BaseTest, unittest.TestCase):
Tarek Ziadéfb437512010-04-20 08:57:33 +00001251
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001252 ### shutil.make_archive
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001253
@support.requires_zlib()
def test_make_tarball(self):
    # creating something to tar
    root_dir, _ = self._create_files('')

    tmpdir2 = self.mkdtemp()
    # force shutil to create the directory
    os.rmdir(tmpdir2)
    # working with relative paths
    work_dir, leaf = os.path.split(tmpdir2)
    rel_base_name = os.path.join(leaf, 'archive')
    members = ['.', './sub', './sub2',
               './file1', './file2', './sub/file3']

    # first the compressed tarball, then an uncompressed one
    for fmt, suffix, mode in (('gztar', '.tar.gz', 'r:gz'),
                              ('tar', '.tar', 'r')):
        with os_helper.change_cwd(work_dir):
            base_name = os.path.abspath(rel_base_name)
            tarball = make_archive(rel_base_name, fmt, root_dir, '.')

        # check if the tarball was created
        self.assertEqual(tarball, base_name + suffix)
        self.assertTrue(os.path.isfile(tarball))
        self.assertTrue(tarfile.is_tarfile(tarball))
        with tarfile.open(tarball, mode) as tf:
            self.assertCountEqual(tf.getnames(), members)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001289
1290 def _tarinfo(self, path):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001291 with tarfile.open(path) as tar:
Tarek Ziadé396fad72010-02-23 05:30:31 +00001292 names = tar.getnames()
1293 names.sort()
1294 return tuple(names)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001295
def _create_files(self, base_dir='dist'):
    # creating something to tar: file1, file2, sub/file3 and an empty
    # sub2 directory under root_dir/base_dir
    root_dir = self.mkdtemp()
    dist = os.path.join(root_dir, base_dir)
    os.makedirs(dist, exist_ok=True)
    for name in ('file1', 'file2'):
        write_file((dist, name), 'xxx')
    os.mkdir(os.path.join(dist, 'sub'))
    write_file((dist, 'sub', 'file3'), 'xxx')
    os.mkdir(os.path.join(dist, 'sub2'))
    # With a non-empty base_dir an extra file is placed next to it,
    # directly in root_dir.
    if base_dir:
        write_file((root_dir, 'outer'), 'xxx')
    return root_dir, base_dir
Tarek Ziadé396fad72010-02-23 05:30:31 +00001309
@support.requires_zlib()
@unittest.skipUnless(shutil.which('tar'),
                     'Need the tar command to run')
def test_tarfile_vs_tar(self):
    # make_archive() output is compared with an archive of the same tree
    # produced by the external `tar` command.
    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')

    # check if the compressed tarball was created
    gz_tarball = make_archive(base_name, 'gztar', root_dir, base_dir)
    self.assertEqual(gz_tarball, base_name + '.tar.gz')
    self.assertTrue(os.path.isfile(gz_tarball))

    # now create another tarball using `tar`
    reference = os.path.join(root_dir, 'archive2.tar')
    subprocess.check_call(['tar', '-cf', 'archive2.tar', base_dir],
                          cwd=root_dir, stdout=subprocess.DEVNULL)
    self.assertTrue(os.path.isfile(reference))

    # let's compare both tarballs
    self.assertEqual(self._tarinfo(gz_tarball), self._tarinfo(reference))

    # trying an uncompressed one
    plain_tarball = make_archive(base_name, 'tar', root_dir, base_dir)
    self.assertEqual(plain_tarball, base_name + '.tar')
    self.assertTrue(os.path.isfile(plain_tarball))

    # now for a dry_run
    plain_tarball = make_archive(base_name, 'tar', root_dir, base_dir,
                                 dry_run=True)
    self.assertEqual(plain_tarball, base_name + '.tar')
    self.assertTrue(os.path.isfile(plain_tarball))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001342
@support.requires_zlib()
def test_make_zipfile(self):
    # creating something to zip
    root_dir, base_dir = self._create_files()

    tmpdir2 = self.mkdtemp()
    # force shutil to create the directory
    os.rmdir(tmpdir2)
    # working with relative paths
    work_dir, leaf = os.path.split(tmpdir2)
    rel_base_name = os.path.join(leaf, 'archive')

    dist_members = ['dist/', 'dist/sub/', 'dist/sub2/',
                    'dist/file1', 'dist/file2', 'dist/sub/file3']
    # Archiving the whole root also picks up 'outer'; restricting the
    # archive to base_dir leaves it out.
    variants = (
        ((), dist_members + ['outer']),
        ((base_dir,), dist_members),
    )
    for extra_args, expected in variants:
        with os_helper.change_cwd(work_dir):
            base_name = os.path.abspath(rel_base_name)
            res = make_archive(rel_base_name, 'zip', root_dir, *extra_args)

        self.assertEqual(res, base_name + '.zip')
        self.assertTrue(os.path.isfile(res))
        self.assertTrue(zipfile.is_zipfile(res))
        with zipfile.ZipFile(res) as zf:
            self.assertCountEqual(zf.namelist(), expected)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001379
@support.requires_zlib()
@unittest.skipUnless(shutil.which('zip'),
                     'Need the zip command to run')
def test_zipfile_vs_zip(self):
    # An archive written by make_archive() must list the same members as
    # one produced by the external `zip` command for the same tree.
    def sorted_members(path):
        with zipfile.ZipFile(path) as zf:
            return sorted(zf.namelist())

    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')
    archive = make_archive(base_name, 'zip', root_dir, base_dir)

    # check if ZIP file was created
    self.assertEqual(archive, base_name + '.zip')
    self.assertTrue(os.path.isfile(archive))

    # now create another ZIP file using `zip`, run from inside root_dir
    archive2 = os.path.join(root_dir, 'archive2.zip')
    subprocess.check_call(['zip', '-q', '-r', 'archive2.zip', base_dir],
                          cwd=root_dir,
                          stdout=subprocess.DEVNULL)
    self.assertTrue(os.path.isfile(archive2))

    # let's compare both ZIP files
    names = sorted_members(archive)
    names2 = sorted_members(archive2)
    self.assertEqual(names, names2)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001405
@support.requires_zlib()
@unittest.skipUnless(shutil.which('unzip'),
                     'Need the unzip command to run')
def test_unzip_zipfile(self):
    # An archive written by make_archive() must pass `unzip -t`.
    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')
    archive = make_archive(base_name, 'zip', root_dir, base_dir)

    # check if ZIP file was created
    self.assertEqual(archive, base_name + '.zip')
    self.assertTrue(os.path.isfile(archive))

    # now check the ZIP file using `unzip -t`
    with os_helper.change_cwd(root_dir):
        try:
            subprocess.check_output(['unzip', '-t', archive],
                                    stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as exc:
            details = exc.output.decode(errors="replace")
            # Some unzip implementations have no -t option: skip rather
            # than fail in that case.
            if 'unrecognized option: t' in details:
                self.skipTest("unzip doesn't support -t")
            self.fail("{}\n\n**Unzip Output**\n{}".format(exc, details))
1429
def test_make_archive(self):
    # An unregistered archive format name must be rejected.
    base_name = os.path.join(self.mkdtemp(), 'archive')
    with self.assertRaises(ValueError):
        make_archive(base_name, 'xxx')
1434
@support.requires_zlib()
def test_make_archive_owner_group(self):
    # testing make_archive with owner and group, with various combinations
    # this works even if there's not gid/uid support
    if not UID_GID_SUPPORT:
        group = owner = 'root'
    else:
        group = grp.getgrgid(0)[0]
        owner = pwd.getpwuid(0)[0]

    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')

    cases = [
        # (archive format, ownership keyword arguments)
        ('zip', {'owner': owner, 'group': group}),
        ('zip', {}),
        ('tar', {'owner': owner, 'group': group}),
        ('tar', {'owner': 'kjhkjhkjg', 'group': 'oihohoh'}),
    ]
    for archive_format, ownership in cases:
        res = make_archive(base_name, archive_format, root_dir, base_dir,
                           **ownership)
        self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001461
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001462
@support.requires_zlib()
@unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
def test_tarfile_root_owner(self):
    # Archive members must carry uid/gid 0 when the names resolved for
    # uid 0 and gid 0 are passed as owner and group.
    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')
    root_group = grp.getgrgid(0)[0]
    root_owner = pwd.getpwuid(0)[0]
    with os_helper.change_cwd(root_dir):
        archive_name = make_archive(base_name, 'gztar', root_dir, 'dist',
                                    owner=root_owner, group=root_group)

    # check if the compressed tarball was created
    self.assertTrue(os.path.isfile(archive_name))

    # now check the ownership recorded for every member
    with tarfile.open(archive_name) as archive:
        for member in archive.getmembers():
            self.assertEqual(member.uid, 0)
            self.assertEqual(member.gid, 0)
1485
def test_make_archive_cwd(self):
    # The working directory must be unchanged after make_archive() is
    # called with an archiver that raises.
    def _breaks(*args, **kw):
        raise RuntimeError()

    cwd_before = os.getcwd()
    register_archive_format('xxx', _breaks, [], 'xxx file')
    try:
        # Whatever the failing archiver raises is deliberately ignored;
        # only the working directory matters here.
        with contextlib.suppress(Exception):
            make_archive('xxx', 'xxx', root_dir=self.mkdtemp())
        self.assertEqual(os.getcwd(), cwd_before)
    finally:
        unregister_archive_format('xxx')
1500
def test_make_tarfile_in_curdir(self):
    # Issue #21280: a bare base name creates the tar archive in the
    # current directory and the bare file name is returned.
    with os_helper.change_cwd(self.mkdtemp()):
        archive_name = make_archive('test', 'tar')
        self.assertEqual(archive_name, 'test.tar')
        self.assertTrue(os.path.isfile('test.tar'))
1507
@support.requires_zlib()
def test_make_zipfile_in_curdir(self):
    # Issue #21280: a bare base name creates the zip archive in the
    # current directory and the bare file name is returned.
    with os_helper.change_cwd(self.mkdtemp()):
        archive_name = make_archive('test', 'zip')
        self.assertEqual(archive_name, 'test.zip')
        self.assertTrue(os.path.isfile('test.zip'))
1515
def test_register_archive_format(self):
    # Registration validates its arguments, and a registered format shows
    # up in get_archive_formats() until it is unregistered again.
    def _archiver():
        # Placeholder callable; nothing in this test ever calls it.
        pass

    def registered_names():
        return [name for name, _ in get_archive_formats()]

    # The archiver must be callable.
    with self.assertRaises(TypeError):
        register_archive_format('xxx', 1)
    # extra_args must be a sequence ...
    with self.assertRaises(TypeError):
        register_archive_format('xxx', _archiver, 1)
    # ... whose items are all 2-tuples.
    with self.assertRaises(TypeError):
        register_archive_format('xxx', _archiver, [(1, 2), (1, 2, 3)])

    register_archive_format('xxx', _archiver, [(1, 2)], 'xxx file')
    self.assertIn('xxx', registered_names())

    unregister_archive_format('xxx')
    self.assertNotIn('xxx', registered_names())
1531
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001532 ### shutil.unpack_archive
1533
def check_unpack_archive(self, format):
    """Run the unpack round-trip for *format* with str, Path and FakePath paths."""
    for converter in (lambda path: path, pathlib.Path, FakePath):
        self.check_unpack_archive_with_converter(format, converter)
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001538
def check_unpack_archive_with_converter(self, format, converter):
    """Create an archive of *format* and unpack it again.

    Every path handed to unpack_archive() is first passed through
    *converter*, so the same checks can be run for different path types.
    """
    root_dir, base_dir = self._create_files()
    # The archive is built from base_dir, so the 'outer' entry listed for
    # root_dir is not expected after unpacking.
    expected = rlistdir(root_dir)
    expected.remove('outer')

    archive_dir = self.mkdtemp()
    filename = make_archive(os.path.join(archive_dir, 'archive'),
                            format, root_dir, base_dir)

    # Unpack twice: first without naming the format, then with the format
    # specified explicitly.
    for options in ({}, {'format': format}):
        extract_dir = self.mkdtemp()
        unpack_archive(converter(filename), converter(extract_dir),
                       **options)
        self.assertEqual(rlistdir(extract_dir), expected)

    # TESTFN must be rejected when no format is given, and an unknown
    # format name must be rejected outright.
    unreadable = converter(TESTFN)
    with self.assertRaises(shutil.ReadError):
        unpack_archive(unreadable)
    unreadable = converter(TESTFN)
    with self.assertRaises(ValueError):
        unpack_archive(unreadable, format='xxx')
Nick Coghlanabf202d2011-03-16 13:52:20 -04001559
def test_unpack_archive_tar(self):
    # Round-trip an uncompressed tar archive.
    self.check_unpack_archive(format='tar')
1562
@support.requires_zlib()
def test_unpack_archive_gztar(self):
    # Round-trip a 'gztar' archive.
    self.check_unpack_archive(format='gztar')
1566
@support.requires_bz2()
def test_unpack_archive_bztar(self):
    # Round-trip a 'bztar' archive.
    self.check_unpack_archive(format='bztar')
1570
@support.requires_lzma()
@unittest.skipIf(AIX and not _maxdataOK(),
                 "AIX MAXDATA must be 0x20000000 or larger")
def test_unpack_archive_xztar(self):
    # Round-trip an 'xztar' archive.
    self.check_unpack_archive(format='xztar')
1575
@support.requires_zlib()
def test_unpack_archive_zip(self):
    # Round-trip a 'zip' archive.
    self.check_unpack_archive(format='zip')
1579
def test_unpack_registry(self):
    # Register, use, re-register and unregister a custom unpack format,
    # then check that the registry is back to its initial content.
    initial_formats = get_unpack_formats()

    def _boo(filename, extract_dir, extra):
        # The unpacker receives the archive name, the target directory
        # and the registered extra argument.
        self.assertEqual(extra, 1)
        self.assertEqual(filename, 'stuff.boo')
        self.assertEqual(extract_dir, 'xx')

    register_unpack_format('Boo', ['.boo', '.b2'], _boo, [('extra', 1)])
    unpack_archive('stuff.boo', 'xx')

    # trying to register a .boo unpacker again
    with self.assertRaises(RegistryError):
        register_unpack_format('Boo2', ['.boo'], _boo)

    # should work now
    unregister_unpack_format('Boo')
    register_unpack_format('Boo2', ['.boo'], _boo)
    self.assertIn(('Boo2', ['.boo'], ''), get_unpack_formats())
    self.assertNotIn(('Boo', ['.boo'], ''), get_unpack_formats())

    # let's leave a clean state
    unregister_unpack_format('Boo2')
    self.assertEqual(get_unpack_formats(), initial_formats)
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001605
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001606
class TestMisc(BaseTest, unittest.TestCase):
    """Tests for shutil.disk_usage() and shutil.chown()."""

    @unittest.skipUnless(hasattr(shutil, 'disk_usage'),
                         "disk_usage not available on this platform")
    def test_disk_usage(self):
        # disk_usage() must report integer counters that are consistent
        # with each other for the directory containing this test file.
        usage = shutil.disk_usage(os.path.dirname(__file__))
        for attr in ('total', 'used', 'free'):
            self.assertIsInstance(getattr(usage, attr), int)
        self.assertGreater(usage.total, 0)
        self.assertGreater(usage.used, 0)
        self.assertGreaterEqual(usage.free, 0)
        self.assertGreaterEqual(usage.total, usage.used)
        self.assertGreater(usage.total, usage.free)

        # bpo-32557: Check that disk_usage() also accepts a filename
        shutil.disk_usage(__file__)

    @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
    @unittest.skipUnless(hasattr(os, 'chown'), 'requires os.chown')
    def test_chown(self):
        # Check chown() argument validation, then change the ownership of
        # a file and of a directory using numeric ids and, when they can
        # be resolved, user and group names.
        dirname = self.mkdtemp()
        filename = tempfile.mktemp(dir=dirname)
        write_file(filename, 'testing chown function')

        # Neither user nor group given.
        with self.assertRaises(ValueError):
            shutil.chown(filename)

        with self.assertRaises(LookupError):
            shutil.chown(filename, user='non-existing username')

        with self.assertRaises(LookupError):
            shutil.chown(filename, group='non-existing groupname')

        # Only str names and int ids are accepted.
        with self.assertRaises(TypeError):
            shutil.chown(filename, b'spam')

        with self.assertRaises(TypeError):
            shutil.chown(filename, 3.14)

        uid = os.getuid()
        gid = os.getgid()

        def check_chown(path, uid=None, gid=None):
            # Stat the path that was just chown'ed (file or directory),
            # not unconditionally the file, so the directory checks below
            # really verify the directory.
            s = os.stat(path)
            if uid is not None:
                self.assertEqual(uid, s.st_uid)
            if gid is not None:
                self.assertEqual(gid, s.st_gid)

        shutil.chown(filename, uid, gid)
        check_chown(filename, uid, gid)
        shutil.chown(filename, uid)
        check_chown(filename, uid)
        shutil.chown(filename, user=uid)
        check_chown(filename, uid)
        shutil.chown(filename, group=gid)
        check_chown(filename, gid=gid)

        shutil.chown(dirname, uid, gid)
        check_chown(dirname, uid, gid)
        shutil.chown(dirname, uid)
        check_chown(dirname, uid)
        shutil.chown(dirname, user=uid)
        check_chown(dirname, uid)
        shutil.chown(dirname, group=gid)
        check_chown(dirname, gid=gid)

        try:
            user = pwd.getpwuid(uid)[0]
            group = grp.getgrgid(gid)[0]
        except KeyError:
            # On some systems uid/gid cannot be resolved.
            pass
        else:
            shutil.chown(filename, user, group)
            check_chown(filename, uid, gid)
            shutil.chown(dirname, user, group)
            check_chown(dirname, uid, gid)
Sandro Tosid902a142011-08-22 23:28:27 +02001685
Brian Curtin0d0a1de2012-06-18 18:41:07 -05001686
class TestWhich(BaseTest, unittest.TestCase):
    """Tests for shutil.which() using str paths.

    setUp() stores the paths under test as instance attributes so that a
    subclass can replace them (TestWhichBytes does so with bytes values).
    """

    def setUp(self):
        self.temp_dir = self.mkdtemp(prefix="Tmp")
        # Give the temp_file an ".exe" suffix for all.
        # It's needed on Windows and not harmful on other platforms.
        self.temp_file = tempfile.NamedTemporaryFile(dir=self.temp_dir,
                                                     prefix="Tmp",
                                                     suffix=".Exe")
        os.chmod(self.temp_file.name, stat.S_IXUSR)
        self.addCleanup(self.temp_file.close)
        self.dir, self.file = os.path.split(self.temp_file.name)
        self.env_path = self.dir
        self.curdir = os.curdir
        self.ext = ".EXE"

    def test_basic(self):
        # Given an EXE in a directory, it should be returned.
        rv = shutil.which(self.file, path=self.dir)
        self.assertEqual(rv, self.temp_file.name)

    def test_absolute_cmd(self):
        # When given the fully qualified path to an executable that exists,
        # it should be returned.
        rv = shutil.which(self.temp_file.name, path=self.temp_dir)
        self.assertEqual(rv, self.temp_file.name)

    def test_relative_cmd(self):
        # When given the relative path with a directory part to an executable
        # that exists, it should be returned.
        base_dir, tail_dir = os.path.split(self.dir)
        relpath = os.path.join(tail_dir, self.file)
        with os_helper.change_cwd(path=base_dir):
            rv = shutil.which(relpath, path=self.temp_dir)
            self.assertEqual(rv, relpath)
        # But it shouldn't be searched in PATH directories (issue #16957).
        with os_helper.change_cwd(path=self.dir):
            rv = shutil.which(relpath, path=base_dir)
            self.assertIsNone(rv)

    def test_cwd(self):
        # Issue #16957
        base_dir = os.path.dirname(self.dir)
        with os_helper.change_cwd(path=self.dir):
            rv = shutil.which(self.file, path=base_dir)
            if sys.platform == "win32":
                # Windows: current directory implicitly on PATH
                self.assertEqual(rv, os.path.join(self.curdir, self.file))
            else:
                # Other platforms: shouldn't match in the current directory.
                self.assertIsNone(rv)

    @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
                     'non-root user required')
    def test_non_matching_mode(self):
        # Set the file read-only and ask for writeable files.
        os.chmod(self.temp_file.name, stat.S_IREAD)
        if os.access(self.temp_file.name, os.W_OK):
            self.skipTest("can't set the file read-only")
        rv = shutil.which(self.file, path=self.dir, mode=os.W_OK)
        self.assertIsNone(rv)

    def test_relative_path(self):
        # A relative search path yields a result relative to the cwd.
        base_dir, tail_dir = os.path.split(self.dir)
        with os_helper.change_cwd(path=base_dir):
            rv = shutil.which(self.file, path=tail_dir)
            self.assertEqual(rv, os.path.join(tail_dir, self.file))

    def test_nonexistent_file(self):
        # Return None when no matching executable file is found on the path.
        rv = shutil.which("foo.exe", path=self.dir)
        self.assertIsNone(rv)

    @unittest.skipUnless(sys.platform == "win32",
                         "pathext check is Windows-only")
    def test_pathext_checking(self):
        # Ask for the file without the ".exe" extension, then ensure that
        # it gets found properly with the extension.
        rv = shutil.which(self.file[:-4], path=self.dir)
        self.assertEqual(rv, self.temp_file.name[:-4] + self.ext)

    def test_environ_path(self):
        # Without an explicit path argument, PATH is searched.
        with os_helper.EnvironmentVarGuard() as env:
            env['PATH'] = self.env_path
            rv = shutil.which(self.file)
            self.assertEqual(rv, self.temp_file.name)

    def test_environ_path_empty(self):
        # PATH='': no match
        with os_helper.EnvironmentVarGuard() as env:
            env['PATH'] = ''
            with unittest.mock.patch('os.confstr', return_value=self.dir,
                                     create=True), \
                 support.swap_attr(os, 'defpath', self.dir), \
                 os_helper.change_cwd(self.dir):
                rv = shutil.which(self.file)
                self.assertIsNone(rv)

    def test_environ_path_cwd(self):
        expected_cwd = os.path.basename(self.temp_file.name)
        if sys.platform == "win32":
            curdir = os.curdir
            if isinstance(expected_cwd, bytes):
                curdir = os.fsencode(curdir)
            expected_cwd = os.path.join(curdir, expected_cwd)

        # PATH=':': explicitly looks in the current directory
        with os_helper.EnvironmentVarGuard() as env:
            env['PATH'] = os.pathsep
            with unittest.mock.patch('os.confstr', return_value=self.dir,
                                     create=True), \
                 support.swap_attr(os, 'defpath', self.dir):
                rv = shutil.which(self.file)
                self.assertIsNone(rv)

                # look in current directory
                with os_helper.change_cwd(self.dir):
                    rv = shutil.which(self.file)
                    self.assertEqual(rv, expected_cwd)

    def test_environ_path_missing(self):
        # With PATH unset, the fallback search path is used instead.
        with os_helper.EnvironmentVarGuard() as env:
            env.pop('PATH', None)

            # without confstr
            with unittest.mock.patch('os.confstr', side_effect=ValueError,
                                     create=True), \
                 support.swap_attr(os, 'defpath', self.dir):
                rv = shutil.which(self.file)
            self.assertEqual(rv, self.temp_file.name)

            # with confstr
            with unittest.mock.patch('os.confstr', return_value=self.dir,
                                     create=True), \
                 support.swap_attr(os, 'defpath', ''):
                rv = shutil.which(self.file)
            self.assertEqual(rv, self.temp_file.name)

    def test_empty_path(self):
        # An explicit empty path argument finds nothing, even though both
        # PATH and the current directory contain the file.
        with os_helper.change_cwd(path=self.dir), \
             os_helper.EnvironmentVarGuard() as env:
            env['PATH'] = self.env_path
            rv = shutil.which(self.file, path='')
            self.assertIsNone(rv)

    def test_empty_path_no_PATH(self):
        # With PATH unset and no path argument, the file is not found.
        with os_helper.EnvironmentVarGuard() as env:
            env.pop('PATH', None)
            rv = shutil.which(self.file)
            self.assertIsNone(rv)

    @unittest.skipUnless(sys.platform == "win32", 'test specific to Windows')
    def test_pathext(self):
        # An extension listed in PATHEXT is appended during the search.
        ext = ".xyz"
        temp_filexyz = tempfile.NamedTemporaryFile(dir=self.temp_dir,
                                                   prefix="Tmp2", suffix=ext)
        os.chmod(temp_filexyz.name, stat.S_IXUSR)
        self.addCleanup(temp_filexyz.close)

        # strip path and extension
        program = os.path.basename(temp_filexyz.name)
        program = os.path.splitext(program)[0]

        with os_helper.EnvironmentVarGuard() as env:
            env['PATHEXT'] = ext
            rv = shutil.which(program, path=self.temp_dir)
            self.assertEqual(rv, temp_filexyz.name)

    # Issue 40592: See https://bugs.python.org/issue40592
    @unittest.skipUnless(sys.platform == "win32", 'test specific to Windows')
    def test_pathext_with_empty_str(self):
        ext = ".xyz"
        temp_filexyz = tempfile.NamedTemporaryFile(dir=self.temp_dir,
                                                   prefix="Tmp2", suffix=ext)
        self.addCleanup(temp_filexyz.close)

        # strip path and extension
        program = os.path.basename(temp_filexyz.name)
        program = os.path.splitext(program)[0]

        with os_helper.EnvironmentVarGuard() as env:
            env['PATHEXT'] = f"{ext};"  # note the ;
            rv = shutil.which(program, path=self.temp_dir)
            self.assertEqual(rv, temp_filexyz.name)
1872
Brian Curtinc57a3452012-06-22 16:00:30 -05001873
class TestWhichBytes(TestWhich):
    """Re-run the TestWhich tests with bytes paths."""

    def setUp(self):
        super().setUp()
        # Swap each str value prepared by the parent for its encoded form.
        for attr in ('dir', 'file', 'curdir', 'ext'):
            setattr(self, attr, os.fsencode(getattr(self, attr)))
        self.temp_file.name = os.fsencode(self.temp_file.name)
1882
1883
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001884class TestMove(BaseTest, unittest.TestCase):
Christian Heimesada8c3b2008-03-18 18:26:33 +00001885
def setUp(self):
    # Create a source and a destination directory, plus a small file
    # "foo" in the source; dst_file is where it lands once moved.
    self.src_dir = self.mkdtemp()
    self.dst_dir = self.mkdtemp()
    name = "foo"
    self.src_file = os.path.join(self.src_dir, name)
    self.dst_file = os.path.join(self.dst_dir, name)
    with open(self.src_file, "wb") as stream:
        stream.write(b"spam")
1894
def _check_move_file(self, src, dst, real_dst):
    """Move file *src* to *dst* and verify the outcome.

    The content must be readable unchanged at *real_dst* and *src* must no
    longer exist.
    """
    with open(src, "rb") as stream:
        contents = stream.read()
    shutil.move(src, dst)
    with open(real_dst, "rb") as stream:
        moved = stream.read()
    self.assertEqual(contents, moved)
    self.assertFalse(os.path.exists(src))
1902
def _check_move_dir(self, src, dst, real_dst):
    """Move directory *src* to *dst* and verify the outcome.

    *real_dst* must list the same entries *src* had, and *src* must no
    longer exist.
    """
    entries_before = sorted(os.listdir(src))
    shutil.move(src, dst)
    entries_after = sorted(os.listdir(real_dst))
    self.assertEqual(entries_before, entries_after)
    self.assertFalse(os.path.exists(src))
1908
def test_move_file(self):
    # Move a file to another location on the same filesystem.
    self._check_move_file(src=self.src_file,
                          dst=self.dst_file,
                          real_dst=self.dst_file)
1912
def test_move_file_to_dir(self):
    # Move a file inside an existing dir on the same filesystem.
    self._check_move_file(src=self.src_file,
                          dst=self.dst_dir,
                          real_dst=self.dst_file)
1916
def test_move_file_to_dir_pathlike_src(self):
    # Move a file given as a pathlib.Path inside an existing dir.
    self._check_move_file(pathlib.Path(self.src_file),
                          self.dst_dir,
                          self.dst_file)
1921
def test_move_file_to_dir_pathlike_dst(self):
    # Move a file inside an existing dir given as a pathlib.Path.
    self._check_move_file(self.src_file,
                          pathlib.Path(self.dst_dir),
                          self.dst_file)
1926
@mock_rename
def test_move_file_other_fs(self):
    # Move a file to another location on another filesystem: re-run
    # test_move_file() under the mock_rename decorator.
    self.test_move_file()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001931
@mock_rename
def test_move_file_to_dir_other_fs(self):
    # Move a file inside an existing dir on another filesystem: re-run
    # test_move_file_to_dir() under the mock_rename decorator.
    self.test_move_file_to_dir()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001936
def test_move_dir(self):
    # Move a dir to another location on the same filesystem.
    # The destination is a not-yet-existing name inside a fresh directory.
    parent = self.mkdtemp()
    dst_dir = tempfile.mktemp(dir=parent)
    try:
        self._check_move_dir(src=self.src_dir, dst=dst_dir,
                             real_dst=dst_dir)
    finally:
        os_helper.rmtree(dst_dir)
Christian Heimesada8c3b2008-03-18 18:26:33 +00001944
@mock_rename
def test_move_dir_other_fs(self):
    # Move a dir to another location on another filesystem: re-run
    # test_move_dir() under the mock_rename decorator.
    self.test_move_dir()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001949
def test_move_dir_to_dir(self):
    # Move a dir inside an existing dir on the same filesystem.
    moved_dir = os.path.join(self.dst_dir, os.path.basename(self.src_dir))
    self._check_move_dir(self.src_dir, self.dst_dir, moved_dir)
1954
@mock_rename
def test_move_dir_to_dir_other_fs(self):
    # Move a dir inside an existing dir on another filesystem: re-run
    # test_move_dir_to_dir() under the mock_rename decorator.
    self.test_move_dir_to_dir()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001959
def test_move_dir_sep_to_dir(self):
    # A trailing os.path.sep on the source dir must not change where the
    # directory ends up.
    moved_dir = os.path.join(self.dst_dir, os.path.basename(self.src_dir))
    self._check_move_dir(self.src_dir + os.path.sep, self.dst_dir,
                         moved_dir)
1963
@unittest.skipUnless(os.path.altsep, 'requires os.path.altsep')
def test_move_dir_altsep_to_dir(self):
    # A trailing os.path.altsep on the source dir must not change where
    # the directory ends up.
    moved_dir = os.path.join(self.dst_dir, os.path.basename(self.src_dir))
    self._check_move_dir(self.src_dir + os.path.altsep, self.dst_dir,
                         moved_dir)
1968
def test_existing_file_inside_dest_dir(self):
    # A file with the same name inside the destination dir already exists.
    with open(self.dst_file, "wb"):
        pass  # only create the (empty) clashing file
    with self.assertRaises(shutil.Error):
        shutil.move(self.src_file, self.dst_dir)
1974
def test_dont_move_dir_in_itself(self):
    # Moving a dir inside itself raises an Error.
    nested_dst = os.path.join(self.src_dir, "bar")
    with self.assertRaises(shutil.Error):
        shutil.move(self.src_dir, nested_dst)
1979
def test_destinsrc_false_negative(self):
    # _destinsrc() must report a destination nested in the source.
    cases = [('srcdir', 'srcdir/dest')]
    os.mkdir(TESTFN)
    try:
        for src_name, dst_name in cases:
            src = os.path.join(TESTFN, src_name)
            dst = os.path.join(TESTFN, dst_name)
            failure = ('_destinsrc() wrongly concluded that '
                       'dst (%s) is not in src (%s)' % (dst, src))
            self.assertTrue(shutil._destinsrc(src, dst), msg=failure)
    finally:
        os_helper.rmtree(TESTFN)
Christian Heimesada8c3b2008-03-18 18:26:33 +00001991
def test_destinsrc_false_positive(self):
    # _destinsrc() must not be fooled by paths that merely share a
    # textual prefix with the source.
    cases = [('srcdir', 'src/dest'), ('srcdir', 'srcdir.new')]
    os.mkdir(TESTFN)
    try:
        for src_name, dst_name in cases:
            src = os.path.join(TESTFN, src_name)
            dst = os.path.join(TESTFN, dst_name)
            failure = ('_destinsrc() wrongly concluded that '
                       'dst (%s) is in src (%s)' % (dst, src))
            self.assertFalse(shutil._destinsrc(src, dst), msg=failure)
    finally:
        os_helper.rmtree(TESTFN)
Christian Heimes9bd667a2008-01-20 15:14:11 +00002003
@os_helper.skip_unless_symlink
@mock_rename
def test_move_file_symlink(self):
    # Create a symlink to src_file, move the link to dst_file, and check
    # that what arrives is still a link resolving to src_file.
    link_path = os.path.join(self.src_dir, 'bar')
    os.symlink(self.src_file, link_path)
    shutil.move(link_path, self.dst_file)
    arrived_as_link = os.path.islink(self.dst_file)
    self.assertTrue(arrived_as_link)
    same_target = os.path.samefile(self.src_file, self.dst_file)
    self.assertTrue(same_target)
2012
@os_helper.skip_unless_symlink
@mock_rename
def test_move_file_symlink_to_dir(self):
    # Move a symlink (pointing at src_file) into dst_dir; the link must
    # show up there under its own name and still resolve to src_file.
    link_name = "bar"
    link_path = os.path.join(self.src_dir, link_name)
    os.symlink(self.src_file, link_path)
    shutil.move(link_path, self.dst_dir)
    moved_link = os.path.join(self.dst_dir, link_name)
    self.assertTrue(os.path.islink(moved_link))
    self.assertTrue(os.path.samefile(self.src_file, moved_link))
2023
@os_helper.skip_unless_symlink
@mock_rename
def test_move_dangling_symlink(self):
    # 'baz' is never created here, so the link made below dangles.
    missing_target = os.path.join(self.src_dir, 'baz')
    link_path = os.path.join(self.src_dir, 'bar')
    os.symlink(missing_target, link_path)
    moved_link = os.path.join(self.dst_dir, 'quux')
    shutil.move(link_path, moved_link)
    self.assertTrue(os.path.islink(moved_link))
    # Compare resolved paths: samefile() cannot be used on a target that
    # does not exist.
    resolved_target = os.path.realpath(missing_target)
    resolved_link = os.path.realpath(moved_link)
    self.assertEqual(resolved_target, resolved_link)
Antoine Pitrou0a08d7a2012-01-06 20:16:19 +01002034
@os_helper.skip_unless_symlink
@mock_rename
def test_move_dir_symlink(self):
    # Move a symlink that points at a real directory; the moved entry
    # must remain a link to that same directory.
    real_dir = os.path.join(self.src_dir, 'baz')
    link_path = os.path.join(self.src_dir, 'bar')
    os.mkdir(real_dir)
    os.symlink(real_dir, link_path)
    moved_link = os.path.join(self.dst_dir, 'quux')
    shutil.move(link_path, moved_link)
    self.assertTrue(os.path.islink(moved_link))
    self.assertTrue(os.path.samefile(real_dir, moved_link))
2046
def test_move_return_value(self):
    # Moving a file into a directory returns the path of the file inside
    # that directory.
    expected_path = os.path.join(self.dst_dir,
                                 os.path.basename(self.src_file))
    returned_path = shutil.move(self.src_file, self.dst_dir)
    self.assertEqual(returned_path, expected_path)
2051
def test_move_as_rename_return_value(self):
    # When the destination names a new path inside dst_dir, move()
    # returns exactly that destination path.
    new_path = os.path.join(self.dst_dir, 'bar')
    returned_path = shutil.move(self.src_file, new_path)
    self.assertEqual(returned_path, os.path.join(self.dst_dir, 'bar'))
2055
@mock_rename
def test_move_file_special_function(self):
    # A custom copy_function must be invoked exactly once when a single
    # file is moved.
    recorded_calls = []

    def recording_copy(src, dst):
        # Only records the request; nothing is actually copied.
        recorded_calls.append((src, dst))

    shutil.move(self.src_file, self.dst_dir, copy_function=recording_copy)
    self.assertEqual(len(recorded_calls), 1)
2063
@mock_rename
def test_move_dir_special_function(self):
    # Moving a directory with a custom copy_function: the recorder is
    # expected to be called three times once two extra files have been
    # added to src_dir.
    recorded_calls = []

    def recording_copy(src, dst):
        # Only records the request; nothing is actually copied.
        recorded_calls.append((src, dst))

    for child_name in ('child', 'child1'):
        os_helper.create_empty_file(os.path.join(self.src_dir, child_name))
    shutil.move(self.src_dir, self.dst_dir, copy_function=recording_copy)
    self.assertEqual(len(recorded_calls), 3)
2073
def test_move_dir_caseinsensitive(self):
    # Renames a folder to the same name
    # but a different case.

    self.src_dir = self.mkdtemp()
    parent_dir, dir_name = os.path.split(self.src_dir)
    dst_dir = os.path.join(parent_dir, dir_name.upper())
    # Guard against a temp name that is already all upper case.
    self.assertNotEqual(self.src_dir, dst_dir)

    try:
        shutil.move(self.src_dir, dst_dir)
        self.assertTrue(os.path.isdir(dst_dir))
    finally:
        # The renamed directory is removed here under its new name.
        os.rmdir(dst_dir)
2089
Tarek Ziadé5340db32010-04-19 22:30:51 +00002090
class TestCopyFile(unittest.TestCase):
    # Checks how shutil.copyfile() behaves when opening or closing the
    # source/destination fails.  In every test shutil's ``open`` is swapped
    # for a local fake, so no real file is touched.

    class Faux(object):
        # Minimal stand-in for a file object used as a context manager.
        # It records whether it was entered, what __exit__ received, and
        # whether __exit__ itself raised.
        _entered = False
        _exited_with = None
        _raised = False

        def __init__(self, raise_in_exit=False, suppress_at_exit=True):
            # raise_in_exit: make __exit__ raise OSError("Cannot close").
            # suppress_at_exit: value returned by __exit__ otherwise, i.e.
            # whether an in-flight exception gets swallowed.
            self._raise_in_exit = raise_in_exit
            self._suppress_at_exit = suppress_at_exit

        def read(self, *args):
            return ''

        def __enter__(self):
            self._entered = True

        def __exit__(self, exc_type, exc_val, exc_tb):
            self._exited_with = exc_type, exc_val, exc_tb
            if self._raise_in_exit:
                self._raised = True
                raise OSError("Cannot close")
            return self._suppress_at_exit

    def test_w_source_open_fails(self):
        def _open(filename, mode='r'):
            if filename == 'srcfile':
                raise OSError('Cannot open "srcfile"')
            # Explicit failure instead of ``assert 0`` so the guard is
            # not stripped when the tests run under ``python -O``.
            self.fail("unexpected open() of %r" % (filename,))

        with support.swap_attr(shutil, 'open', _open):
            with self.assertRaises(OSError):
                shutil.copyfile('srcfile', 'destfile')

    @unittest.skipIf(MACOS, "skipped on macOS")
    def test_w_dest_open_fails(self):
        srcfile = self.Faux()

        def _open(filename, mode='r'):
            if filename == 'srcfile':
                return srcfile
            if filename == 'destfile':
                raise OSError('Cannot open "destfile"')
            self.fail("unexpected open() of %r" % (filename,))

        with support.swap_attr(shutil, 'open', _open):
            shutil.copyfile('srcfile', 'destfile')
        self.assertTrue(srcfile._entered)
        # The source's __exit__ must have seen the destination's failure.
        self.assertIs(srcfile._exited_with[0], OSError)
        self.assertEqual(srcfile._exited_with[1].args,
                         ('Cannot open "destfile"',))

    @unittest.skipIf(MACOS, "skipped on macOS")
    def test_w_dest_close_fails(self):
        srcfile = self.Faux()
        destfile = self.Faux(True)

        def _open(filename, mode='r'):
            if filename == 'srcfile':
                return srcfile
            if filename == 'destfile':
                return destfile
            self.fail("unexpected open() of %r" % (filename,))

        with support.swap_attr(shutil, 'open', _open):
            shutil.copyfile('srcfile', 'destfile')
        self.assertTrue(srcfile._entered)
        self.assertTrue(destfile._entered)
        self.assertTrue(destfile._raised)
        # The error raised while closing the destination reaches the
        # source's __exit__.
        self.assertIs(srcfile._exited_with[0], OSError)
        self.assertEqual(srcfile._exited_with[1].args,
                         ('Cannot close',))

    @unittest.skipIf(MACOS, "skipped on macOS")
    def test_w_source_close_fails(self):

        srcfile = self.Faux(True)
        destfile = self.Faux()

        def _open(filename, mode='r'):
            if filename == 'srcfile':
                return srcfile
            if filename == 'destfile':
                return destfile
            self.fail("unexpected open() of %r" % (filename,))

        with support.swap_attr(shutil, 'open', _open):
            with self.assertRaises(OSError):
                shutil.copyfile('srcfile', 'destfile')
        self.assertTrue(srcfile._entered)
        self.assertTrue(destfile._entered)
        self.assertFalse(destfile._raised)
        # The source was exited without a pending exception and then
        # raised on its own.
        self.assertIsNone(srcfile._exited_with[0])
        self.assertTrue(srcfile._raised)
2181
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002182
class TestCopyFileObj(unittest.TestCase):
    # Tests for shutil.copyfileobj().  TESTFN is prepared once per class
    # via write_test_file(TESTFN, FILESIZE); TESTFN2 is the destination
    # and is removed after every test.
    FILESIZE = 2 * 1024 * 1024

    @classmethod
    def setUpClass(cls):
        write_test_file(TESTFN, cls.FILESIZE)

    @classmethod
    def tearDownClass(cls):
        os_helper.unlink(TESTFN)
        os_helper.unlink(TESTFN2)

    def tearDown(self):
        os_helper.unlink(TESTFN2)

    @contextlib.contextmanager
    def get_files(self):
        # Yield (src, dst): TESTFN opened for binary reading and TESTFN2
        # opened for binary writing.  Both are closed on exit.
        with open(TESTFN, "rb") as src:
            with open(TESTFN2, "wb") as dst:
                yield (src, dst)

    def assert_files_eq(self, src, dst):
        # Fail unless the files at paths *src* and *dst* have equal bytes.
        with open(src, 'rb') as fsrc:
            with open(dst, 'rb') as fdst:
                self.assertEqual(fsrc.read(), fdst.read())

    def test_content(self):
        with self.get_files() as (src, dst):
            shutil.copyfileobj(src, dst)
        self.assert_files_eq(TESTFN, TESTFN2)

    def test_file_not_closed(self):
        with self.get_files() as (src, dst):
            shutil.copyfileobj(src, dst)
            # unittest assertions (not bare ``assert``) so the checks
            # still run under ``python -O``.
            self.assertFalse(src.closed)
            self.assertFalse(dst.closed)

    def test_file_offset(self):
        with self.get_files() as (src, dst):
            shutil.copyfileobj(src, dst)
            self.assertEqual(src.tell(), self.FILESIZE)
            self.assertEqual(dst.tell(), self.FILESIZE)

    @unittest.skipIf(os.name != 'nt', "Windows only")
    def test_win_impl(self):
        # Make sure alternate Windows implementation is called.
        with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
            shutil.copyfile(TESTFN, TESTFN2)
        self.assertTrue(m.called)

        # File size is 2 MiB but max buf size should be 1 MiB.
        self.assertEqual(m.call_args[0][2], 1 * 1024 * 1024)

        # If file size < 1 MiB memoryview() length must be equal to
        # the actual file size.
        with tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False) as f:
            f.write(b'foo')
        fname = f.name
        self.addCleanup(os_helper.unlink, fname)
        with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
            shutil.copyfile(fname, TESTFN2)
        self.assertEqual(m.call_args[0][2], 3)

        # Empty files should not rely on readinto() variant.
        with tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False) as f:
            pass
        fname = f.name
        self.addCleanup(os_helper.unlink, fname)
        with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
            shutil.copyfile(fname, TESTFN2)
        self.assertFalse(m.called)
        self.assert_files_eq(fname, TESTFN2)
2255
2256
class _ZeroCopyFileTest(object):
    """Tests common to all zero-copy APIs."""
    # Subclasses set PATCHPOINT to the dotted name that the tests patch
    # and implement zerocopy_fun().
    FILESIZE = (10 * 1024 * 1024)  # 10 MiB
    FILEDATA = b""
    PATCHPOINT = ""

    @classmethod
    def setUpClass(cls):
        write_test_file(TESTFN, cls.FILESIZE)
        with open(TESTFN, 'rb') as f:
            cls.FILEDATA = f.read()
        # Explicit check rather than a bare ``assert`` so the sanity
        # check is not stripped under ``python -O``.
        if len(cls.FILEDATA) != cls.FILESIZE:
            raise AssertionError(
                "test file has %d bytes, expected %d"
                % (len(cls.FILEDATA), cls.FILESIZE))

    @classmethod
    def tearDownClass(cls):
        os_helper.unlink(TESTFN)

    def tearDown(self):
        os_helper.unlink(TESTFN2)

    @contextlib.contextmanager
    def get_files(self):
        # Yield (src, dst): TESTFN opened for binary reading and TESTFN2
        # opened for binary writing.  Both are closed on exit.
        with open(TESTFN, "rb") as src:
            with open(TESTFN2, "wb") as dst:
                yield (src, dst)

    def zerocopy_fun(self, *args, **kwargs):
        raise NotImplementedError("must be implemented in subclass")

    def reset(self):
        # Tear everything down and set it up again, recreating TESTFN.
        self.tearDown()
        self.tearDownClass()
        self.setUpClass()
        self.setUp()

    # ---

    def test_regular_copy(self):
        with self.get_files() as (src, dst):
            self.zerocopy_fun(src, dst)
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
        # Make sure the fallback function is not called.
        with self.get_files() as (src, dst):
            with unittest.mock.patch('shutil.copyfileobj') as m:
                shutil.copyfile(TESTFN, TESTFN2)
            self.assertFalse(m.called)

    def test_same_file(self):
        # The source may get damaged by this test, so rebuild afterwards.
        self.addCleanup(self.reset)
        with self.get_files() as (src, dst):
            with self.assertRaises(Exception):
                self.zerocopy_fun(src, src)
        # Make sure src file is not corrupted.
        self.assertEqual(read_file(TESTFN, binary=True), self.FILEDATA)

    def test_non_existent_src(self):
        # mktemp() is used only to obtain a name that does not exist.
        name = tempfile.mktemp(dir=os.getcwd())
        with self.assertRaises(FileNotFoundError) as cm:
            shutil.copyfile(name, "new")
        self.assertEqual(cm.exception.filename, name)

    def test_empty_file(self):
        srcname = TESTFN + 'src'
        dstname = TESTFN + 'dst'
        self.addCleanup(os_helper.unlink, srcname)
        self.addCleanup(os_helper.unlink, dstname)
        with open(srcname, "wb"):
            pass

        with open(srcname, "rb") as src:
            with open(dstname, "wb") as dst:
                self.zerocopy_fun(src, dst)

        self.assertEqual(read_file(dstname, binary=True), b"")

    def test_unhandled_exception(self):
        with unittest.mock.patch(self.PATCHPOINT,
                                 side_effect=ZeroDivisionError):
            self.assertRaises(ZeroDivisionError,
                              shutil.copyfile, TESTFN, TESTFN2)

    def test_exception_on_first_call(self):
        # Emulate a case where the first call to the zero-copy
        # function raises an exception in which case the function is
        # supposed to give up immediately.
        with unittest.mock.patch(self.PATCHPOINT,
                                 side_effect=OSError(errno.EINVAL, "yo")):
            with self.get_files() as (src, dst):
                with self.assertRaises(_GiveupOnFastCopy):
                    self.zerocopy_fun(src, dst)

    def test_filesystem_full(self):
        # Emulate a case where filesystem is full and sendfile() fails
        # on first call.
        with unittest.mock.patch(self.PATCHPOINT,
                                 side_effect=OSError(errno.ENOSPC, "yo")):
            with self.get_files() as (src, dst):
                self.assertRaises(OSError, self.zerocopy_fun, src, dst)
2355
2356
@unittest.skipIf(not SUPPORTS_SENDFILE, 'os.sendfile() not supported')
class TestZeroCopySendfile(_ZeroCopyFileTest, unittest.TestCase):
    # Runs the shared zero-copy tests against shutil._fastcopy_sendfile()
    # and adds sendfile-specific scenarios.
    PATCHPOINT = "os.sendfile"

    def zerocopy_fun(self, fsrc, fdst):
        return shutil._fastcopy_sendfile(fsrc, fdst)

    def test_non_regular_file_src(self):
        with io.BytesIO(self.FILEDATA) as src:
            with open(TESTFN2, "wb") as dst:
                with self.assertRaises(_GiveupOnFastCopy):
                    self.zerocopy_fun(src, dst)
                # The plain copy must still work after the fast path
                # gave up.
                shutil.copyfileobj(src, dst)

        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_non_regular_file_dst(self):
        with open(TESTFN, "rb") as src:
            with io.BytesIO() as dst:
                with self.assertRaises(_GiveupOnFastCopy):
                    self.zerocopy_fun(src, dst)
                shutil.copyfileobj(src, dst)
                dst.seek(0)
                self.assertEqual(dst.read(), self.FILEDATA)

    def test_exception_on_second_call(self):
        # ``flag`` is empty until the first call has gone through; every
        # later call raises EBADF.
        flag = []
        orig_sendfile = os.sendfile

        def sendfile(*args, **kwargs):
            if not flag:
                flag.append(None)
                return orig_sendfile(*args, **kwargs)
            else:
                raise OSError(errno.EBADF, "yo")

        with unittest.mock.patch('os.sendfile', create=True,
                                 side_effect=sendfile):
            with self.get_files() as (src, dst):
                with self.assertRaises(OSError) as cm:
                    shutil._fastcopy_sendfile(src, dst)
        # unittest assertion (not bare ``assert``) so the check still
        # runs under ``python -O``.
        self.assertTrue(flag)
        self.assertEqual(cm.exception.errno, errno.EBADF)

    def test_cant_get_size(self):
        # Emulate a case where src file size cannot be determined.
        # Internally bufsize will be set to a small value and
        # sendfile() will be called repeatedly.
        with unittest.mock.patch('os.fstat', side_effect=OSError) as m:
            with self.get_files() as (src, dst):
                shutil._fastcopy_sendfile(src, dst)
                self.assertTrue(m.called)
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_small_chunks(self):
        # Force internal file size detection to be smaller than the
        # actual file size. We want to force sendfile() to be called
        # multiple times, also in order to emulate a src fd which gets
        # bigger while it is being copied.
        fake_stat = unittest.mock.Mock()
        fake_stat.st_size = 65536 + 1
        with unittest.mock.patch('os.fstat', return_value=fake_stat) as m:
            with self.get_files() as (src, dst):
                shutil._fastcopy_sendfile(src, dst)
                self.assertTrue(m.called)
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_big_chunk(self):
        # Force internal file size detection to be +100MB bigger than
        # the actual file size. Make sure sendfile() does not rely on
        # file size value except for (maybe) a better throughput /
        # performance.
        fake_stat = unittest.mock.Mock()
        fake_stat.st_size = self.FILESIZE + (100 * 1024 * 1024)
        with unittest.mock.patch('os.fstat', return_value=fake_stat) as m:
            with self.get_files() as (src, dst):
                shutil._fastcopy_sendfile(src, dst)
                self.assertTrue(m.called)
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_blocksize_arg(self):
        with unittest.mock.patch('os.sendfile',
                                 side_effect=ZeroDivisionError) as m:
            self.assertRaises(ZeroDivisionError,
                              shutil.copyfile, TESTFN, TESTFN2)
            blocksize = m.call_args[0][3]
            # Make sure file size and the block size arg passed to
            # sendfile() are the same.
            self.assertEqual(blocksize, os.path.getsize(TESTFN))
            # ...unless we're dealing with a small file.
            os_helper.unlink(TESTFN2)
            write_file(TESTFN2, b"hello", binary=True)
            self.addCleanup(os_helper.unlink, TESTFN2 + '3')
            self.assertRaises(ZeroDivisionError,
                              shutil.copyfile, TESTFN2, TESTFN2 + '3')
            blocksize = m.call_args[0][3]
            self.assertEqual(blocksize, 2 ** 23)

    def test_file2file_not_supported(self):
        # Emulate a case where sendfile() only support file->socket
        # fds. In such a case copyfile() is supposed to skip the
        # fast-copy attempt from then on.
        self.assertTrue(shutil._USE_CP_SENDFILE)
        try:
            with unittest.mock.patch(
                    self.PATCHPOINT,
                    side_effect=OSError(errno.ENOTSOCK, "yo")) as m:
                with self.get_files() as (src, dst):
                    with self.assertRaises(_GiveupOnFastCopy):
                        shutil._fastcopy_sendfile(src, dst)
                self.assertTrue(m.called)
            self.assertFalse(shutil._USE_CP_SENDFILE)

            with unittest.mock.patch(self.PATCHPOINT) as m:
                shutil.copyfile(TESTFN, TESTFN2)
                self.assertFalse(m.called)
        finally:
            # Restore the module-level switch flipped by the code under
            # test so later tests still exercise the sendfile path.
            shutil._USE_CP_SENDFILE = True
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002474
2475
@unittest.skipUnless(MACOS, 'macOS only')
class TestZeroCopyMACOS(_ZeroCopyFileTest, unittest.TestCase):
    # Runs the shared zero-copy tests against shutil._fastcopy_fcopyfile().
    PATCHPOINT = "posix._fcopyfile"

    def zerocopy_fun(self, src, dst):
        copy_flags = posix._COPYFILE_DATA
        return shutil._fastcopy_fcopyfile(src, dst, copy_flags)
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002482
2483
class TestGetTerminalSize(unittest.TestCase):
    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        term_size = shutil.get_terminal_size()
        for dimension in (term_size.columns, term_size.lines):
            self.assertGreaterEqual(dimension, 0)

    def test_os_environ_first(self):
        "Check if environment variables have precedence"

        # COLUMNS set, LINES removed.
        with os_helper.EnvironmentVarGuard() as guarded_env:
            guarded_env['COLUMNS'] = '777'
            del guarded_env['LINES']
            term_size = shutil.get_terminal_size()
        self.assertEqual(term_size.columns, 777)

        # LINES set, COLUMNS removed.
        with os_helper.EnvironmentVarGuard() as guarded_env:
            del guarded_env['COLUMNS']
            guarded_env['LINES'] = '888'
            term_size = shutil.get_terminal_size()
        self.assertEqual(term_size.lines, 888)

    def test_bad_environ(self):
        # Non-numeric COLUMNS/LINES must not break the call.
        with os_helper.EnvironmentVarGuard() as guarded_env:
            guarded_env['COLUMNS'] = 'xxx'
            guarded_env['LINES'] = 'yyy'
            term_size = shutil.get_terminal_size()
        self.assertGreaterEqual(term_size.columns, 0)
        self.assertGreaterEqual(term_size.lines, 0)

    @unittest.skipUnless(os.isatty(sys.__stdout__.fileno()), "not on tty")
    @unittest.skipUnless(hasattr(os, 'get_terminal_size'),
                         'need os.get_terminal_size()')
    def test_stty_match(self):
        """Check if stty returns the same results ignoring env

        This test will fail if stdin and stdout are connected to
        different terminals with different sizes. Nevertheless, such
        situations should be pretty rare.
        """
        stty_command = ['stty', 'size']
        try:
            fields = subprocess.check_output(stty_command).decode().split()
        except (FileNotFoundError, PermissionError,
                subprocess.CalledProcessError):
            self.skipTest("stty invocation failed")
        # The two fields are swapped to build the expected value.
        first_field, second_field = fields[0], fields[1]
        expected = (int(second_field), int(first_field))  # reversed order

        with os_helper.EnvironmentVarGuard() as guarded_env:
            del guarded_env['LINES']
            del guarded_env['COLUMNS']
            actual = shutil.get_terminal_size()

        self.assertEqual(expected, actual)

    def test_fallback(self):
        with os_helper.EnvironmentVarGuard() as guarded_env:
            del guarded_env['LINES']
            del guarded_env['COLUMNS']

            # sys.__stdout__ has no fileno()
            with support.swap_attr(sys, '__stdout__', None):
                term_size = shutil.get_terminal_size(fallback=(10, 20))
            self.assertEqual(term_size.columns, 10)
            self.assertEqual(term_size.lines, 20)

            # sys.__stdout__ is not a terminal on Unix
            # or fileno() not in (0, 1, 2) on Windows
            with open(os.devnull, 'w') as devnull, \
                 support.swap_attr(sys, '__stdout__', devnull):
                term_size = shutil.get_terminal_size(fallback=(30, 40))
            self.assertEqual(term_size.columns, 30)
            self.assertEqual(term_size.lines, 40)
2560
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002561
class PublicAPITests(unittest.TestCase):
    """Ensures that the correct values are exposed in the public API."""

    def test_module_all_attribute(self):
        self.assertTrue(hasattr(shutil, '__all__'))
        expected_names = {
            'copyfileobj', 'copyfile', 'copymode', 'copystat',
            'copy', 'copy2', 'copytree', 'move', 'rmtree', 'Error',
            'SpecialFileError', 'ExecError', 'make_archive',
            'get_archive_formats', 'register_archive_format',
            'unregister_archive_format', 'get_unpack_formats',
            'register_unpack_format', 'unregister_unpack_format',
            'unpack_archive', 'ignore_patterns', 'chown', 'which',
            'get_terminal_size', 'SameFileError',
        }
        # disk_usage is only expected on Windows or where os.statvfs
        # is available.
        if os.name == 'nt' or hasattr(os, 'statvfs'):
            expected_names.add('disk_usage')
        exported_names = set(shutil.__all__)
        self.assertEqual(exported_names, expected_names)
2578
2579
# Run the whole test module when executed as a script.
if __name__ == "__main__":
    unittest.main()