blob: df8dcdcce609191b3104a769591b9c8302f50e6d [file] [log] [blame]
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00001# Copyright (C) 2003 Python Software Foundation
2
3import unittest
Berker Peksag884afd92014-12-10 02:50:32 +02004import unittest.mock
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00005import shutil
6import tempfile
Johannes Gijsbers8e6f2de2004-11-23 09:27:27 +00007import sys
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +00008import stat
Brett Cannon1c3fa182004-06-19 21:11:35 +00009import os
10import os.path
Antoine Pitrouc041ab62012-01-02 19:18:02 +010011import errno
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040012import functools
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -070013import pathlib
Antoine Pitroubcf2b592012-02-08 23:28:36 +010014import subprocess
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020015import random
16import string
17import contextlib
18import io
Serhiy Storchaka527ef072015-09-06 18:33:19 +030019from shutil import (make_archive,
Tarek Ziadé396fad72010-02-23 05:30:31 +000020 register_archive_format, unregister_archive_format,
Tarek Ziadé6ac91722010-04-28 17:51:36 +000021 get_archive_formats, Error, unpack_archive,
22 register_unpack_format, RegistryError,
Hynek Schlawack48653762012-10-07 12:49:58 +020023 unregister_unpack_format, get_unpack_formats,
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020024 SameFileError, _GiveupOnFastCopy)
Tarek Ziadé396fad72010-02-23 05:30:31 +000025import tarfile
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +020026import zipfile
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020027try:
28 import posix
29except ImportError:
30 posix = None
Tarek Ziadé396fad72010-02-23 05:30:31 +000031
32from test import support
Hai Shi0c4f0f32020-06-30 21:46:31 +080033from test.support import os_helper
34from test.support.os_helper import TESTFN, FakePath
Serhiy Storchaka11213772014-08-06 18:50:19 +030035
Antoine Pitrou7fff0962009-05-01 21:09:44 +000036TESTFN2 = TESTFN + "2"
Victor Stinner937ee9e2018-06-26 02:11:06 +020037MACOS = sys.platform.startswith("darwin")
Michael Feltef110b12019-02-18 12:02:44 +010038AIX = sys.platform[:3] == 'aix'
Tarek Ziadé396fad72010-02-23 05:30:31 +000039try:
40 import grp
41 import pwd
42 UID_GID_SUPPORT = True
43except ImportError:
44 UID_GID_SUPPORT = False
45
Steve Dowerdf2d4a62019-08-21 15:27:33 -070046try:
47 import _winapi
48except ImportError:
49 _winapi = None
50
def _fake_rename(*args, **kwargs):
    """Stand-in for os.rename() that always fails with a cross-device error.

    All arguments are accepted and ignored.
    """
    # Pretend the destination path is on a different filesystem.
    # 18 is used when the errno module has no EXDEV attribute.
    cross_device = getattr(errno, 'EXDEV', 18)
    raise OSError(cross_device, "Invalid cross-device link")
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040054
def mock_rename(func):
    """Decorator that runs *func* with os.rename replaced by _fake_rename.

    While the wrapped callable executes, os.rename is _fake_rename, which
    raises OSError.  The previous os.rename is put back afterwards whether
    *func* returns normally or raises.
    """
    @functools.wraps(func)
    def wrap(*args, **kwargs):
        # Save the real function *before* entering the try block, so the
        # finally clause never depends on a name bound inside it.
        builtin_rename = os.rename
        os.rename = _fake_rename
        try:
            return func(*args, **kwargs)
        finally:
            os.rename = builtin_rename
    return wrap
65
def write_file(path, content, binary=False):
    """Store *content* in the file named by *path*.

    *path* may be a tuple of path components, which are combined with
    os.path.join.  The file is opened in binary mode when *binary* is
    true and in text mode otherwise.
    """
    target = os.path.join(*path) if isinstance(path, tuple) else path
    mode = 'wb' if binary else 'w'
    with open(target, mode) as stream:
        stream.write(content)
77
def write_test_file(path, size):
    """Create a file of exactly *size* bytes filled with random ASCII letters.

    A single random buffer of at most 8192 bytes is generated and written
    repeatedly; the last write is truncated so that the total is exactly
    *size*, including when *size* is not a multiple of the buffer length.
    """
    def chunks(total, step):
        # Yield the length of each write: *step* repeatedly, then the rest.
        assert total >= step
        while total > step:
            yield step
            total -= step
        if total:
            yield total

    bufsize = min(size, 8192)
    chunk = "".join(random.choices(string.ascii_letters, k=bufsize)).encode()
    with open(path, 'wb') as f:
        for csize in chunks(size, bufsize):
            # Honour the requested chunk length; writing the whole buffer
            # every time would overshoot on the final, shorter chunk.
            f.write(chunk[:csize])
    assert os.path.getsize(path) == size
95
def read_file(path, binary=False):
    """Return everything stored in the file named by *path*.

    *path* may be a tuple of path components, which are combined with
    os.path.join.  The file is opened in binary mode when *binary* is
    true and in text mode otherwise.
    """
    target = os.path.join(*path) if isinstance(path, tuple) else path
    mode = 'rb' if binary else 'r'
    with open(target, mode) as stream:
        data = stream.read()
    return data
107
def rlistdir(path):
    """Return a recursive listing of *path* as '/'-separated relative names.

    Entries of each directory are visited in sorted order.  A real
    directory is listed with a trailing '/' and is followed by its own
    entries, each prefixed with the directory name.  Symbolic links are
    listed as plain names and are not descended into.
    """
    listing = []
    for entry in sorted(os.listdir(path)):
        full = os.path.join(path, entry)
        if os.path.islink(full) or not os.path.isdir(full):
            listing.append(entry)
            continue
        prefix = entry + '/'
        listing.append(prefix)
        listing.extend(prefix + child for child in rlistdir(full))
    return listing
119
def supports_file2file_sendfile():
    """Report whether os.sendfile() can copy between two regular files.

    Two scratch files are created in the current working directory and a
    two-byte sendfile() from one to the other is attempted.  Returns False
    when os.sendfile is missing or the call raises OSError, True otherwise.
    Any scratch file that was created is unlinked before returning.
    """
    # ...apparently Linux and Solaris are the only ones
    if not hasattr(os, "sendfile"):
        return False
    src_path = dst_path = None
    try:
        with tempfile.NamedTemporaryFile("wb", dir=os.getcwd(), delete=False) as scratch:
            src_path = scratch.name
            scratch.write(b"0123456789")

        with open(src_path, "rb") as reader, \
                tempfile.NamedTemporaryFile("wb", dir=os.getcwd(), delete=False) as writer:
            dst_path = writer.name
            try:
                os.sendfile(writer.fileno(), reader.fileno(), 0, 2)
            except OSError:
                return False
            return True
    finally:
        # Source first, then destination; skip whichever was never created.
        for leftover in (src_path, dst_path):
            if leftover is not None:
                os_helper.unlink(leftover)
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +0200147
148
149SUPPORTS_SENDFILE = supports_file2file_sendfile()
150
# AIX 32-bit mode, by default, lacks enough memory for the xz/lzma compiler test.
# The AIX command 'dump -o program' gives XCOFF header information.
# The second word of the last line is the maxdata value.
# In 32-bit mode, maxdata must be greater than 0x1000000 for the xz test to succeed.
def _maxdataOK():
    """Return False only for a 32-bit AIX build whose maxdata is below 0x20000000.

    On every other configuration True is returned without running anything.
    """
    if not (AIX and sys.maxsize == 2147483647):
        return True
    # maxdata is read as the second word on the last line of the dump output.
    hdrs = subprocess.getoutput("/usr/bin/dump -o %s" % sys.executable)
    last_line = hdrs.split("\n")[-1]
    maxdata = last_line.split()[1]
    return int(maxdata, 16) >= 0x20000000
Éric Araujoa7e33a12011-08-12 19:51:35 +0200162
Tarek Ziadé396fad72010-02-23 05:30:31 +0000163
class BaseTest:
    """Mixin that gives test cases a self-cleaning temporary directory helper."""

    def mkdtemp(self, prefix=None):
        """Make a temporary directory inside the current working directory.

        Removal of the directory is scheduled through addCleanup().
        Returns the path of the new directory.
        """
        tmp_path = tempfile.mkdtemp(prefix=prefix, dir=os.getcwd())
        self.addCleanup(os_helper.rmtree, tmp_path)
        return tmp_path
Tarek Ziadé5340db32010-04-19 22:30:51 +0000174
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300175
class TestRmTree(BaseTest, unittest.TestCase):
    """Tests for shutil.rmtree()."""

    def test_rmtree_works_on_bytes(self):
        # rmtree() must accept a path given as bytes.
        tmp = self.mkdtemp()
        victim = os.path.join(tmp, 'killme')
        os.mkdir(victim)
        write_file(os.path.join(victim, 'somefile'), 'foo')
        victim = os.fsencode(victim)
        self.assertIsInstance(victim, bytes)
        shutil.rmtree(victim)

    @os_helper.skip_unless_symlink
    def test_rmtree_fails_on_symlink(self):
        # rmtree() on a symlink to a directory must fail and remove nothing.
        tmp = self.mkdtemp()
        dir_ = os.path.join(tmp, 'dir')
        os.mkdir(dir_)
        link = os.path.join(tmp, 'link')
        os.symlink(dir_, link)
        self.assertRaises(OSError, shutil.rmtree, link)
        self.assertTrue(os.path.exists(dir_))
        self.assertTrue(os.path.lexists(link))
        errors = []
        def onerror(*args):
            errors.append(args)
        shutil.rmtree(link, onerror=onerror)
        # onerror receives (function, path, exc_info).
        self.assertEqual(len(errors), 1)
        self.assertIs(errors[0][0], os.path.islink)
        self.assertEqual(errors[0][1], link)
        self.assertIsInstance(errors[0][2][1], OSError)

    @os_helper.skip_unless_symlink
    def test_rmtree_works_on_symlinks(self):
        tmp = self.mkdtemp()
        dir1 = os.path.join(tmp, 'dir1')
        dir2 = os.path.join(dir1, 'dir2')
        dir3 = os.path.join(tmp, 'dir3')
        for d in dir1, dir2, dir3:
            os.mkdir(d)
        file1 = os.path.join(tmp, 'file1')
        write_file(file1, 'foo')
        link1 = os.path.join(dir1, 'link1')
        os.symlink(dir2, link1)
        link2 = os.path.join(dir1, 'link2')
        os.symlink(dir3, link2)
        link3 = os.path.join(dir1, 'link3')
        os.symlink(file1, link3)
        # make sure symlinks are removed but not followed
        shutil.rmtree(dir1)
        self.assertFalse(os.path.exists(dir1))
        self.assertTrue(os.path.exists(dir3))
        self.assertTrue(os.path.exists(file1))

    @unittest.skipUnless(_winapi, 'only relevant on Windows')
    def test_rmtree_fails_on_junctions(self):
        # Same expectations as the symlink variant, using a junction.
        tmp = self.mkdtemp()
        dir_ = os.path.join(tmp, 'dir')
        os.mkdir(dir_)
        link = os.path.join(tmp, 'link')
        _winapi.CreateJunction(dir_, link)
        self.addCleanup(os_helper.unlink, link)
        self.assertRaises(OSError, shutil.rmtree, link)
        self.assertTrue(os.path.exists(dir_))
        self.assertTrue(os.path.lexists(link))
        errors = []
        def onerror(*args):
            errors.append(args)
        shutil.rmtree(link, onerror=onerror)
        self.assertEqual(len(errors), 1)
        self.assertIs(errors[0][0], os.path.islink)
        self.assertEqual(errors[0][1], link)
        self.assertIsInstance(errors[0][2][1], OSError)

    @unittest.skipUnless(_winapi, 'only relevant on Windows')
    def test_rmtree_works_on_junctions(self):
        tmp = self.mkdtemp()
        dir1 = os.path.join(tmp, 'dir1')
        dir2 = os.path.join(dir1, 'dir2')
        dir3 = os.path.join(tmp, 'dir3')
        for d in dir1, dir2, dir3:
            os.mkdir(d)
        file1 = os.path.join(tmp, 'file1')
        write_file(file1, 'foo')
        link1 = os.path.join(dir1, 'link1')
        _winapi.CreateJunction(dir2, link1)
        link2 = os.path.join(dir1, 'link2')
        _winapi.CreateJunction(dir3, link2)
        link3 = os.path.join(dir1, 'link3')
        _winapi.CreateJunction(file1, link3)
        # make sure junctions are removed but not followed
        shutil.rmtree(dir1)
        self.assertFalse(os.path.exists(dir1))
        self.assertTrue(os.path.exists(dir3))
        self.assertTrue(os.path.exists(file1))

    def test_rmtree_errors(self):
        # filename is guaranteed not to exist
        filename = tempfile.mktemp(dir=self.mkdtemp())
        self.assertRaises(FileNotFoundError, shutil.rmtree, filename)
        # test that ignore_errors option is honored
        shutil.rmtree(filename, ignore_errors=True)

        # existing file
        tmpdir = self.mkdtemp()
        write_file((tmpdir, "tstfile"), "")
        filename = os.path.join(tmpdir, "tstfile")
        with self.assertRaises(NotADirectoryError) as cm:
            shutil.rmtree(filename)
        self.assertEqual(cm.exception.filename, filename)
        self.assertTrue(os.path.exists(filename))
        # test that ignore_errors option is honored
        shutil.rmtree(filename, ignore_errors=True)
        self.assertTrue(os.path.exists(filename))
        errors = []
        def onerror(*args):
            errors.append(args)
        shutil.rmtree(filename, onerror=onerror)
        # Two failures are expected: listing the "directory", then rmdir.
        self.assertEqual(len(errors), 2)
        self.assertIs(errors[0][0], os.scandir)
        self.assertEqual(errors[0][1], filename)
        self.assertIsInstance(errors[0][2][1], NotADirectoryError)
        self.assertEqual(errors[0][2][1].filename, filename)
        self.assertIs(errors[1][0], os.rmdir)
        self.assertEqual(errors[1][1], filename)
        self.assertIsInstance(errors[1][2][1], NotADirectoryError)
        self.assertEqual(errors[1][2][1].filename, filename)

    @unittest.skipIf(sys.platform[:6] == 'cygwin',
                     "This test can't be run on Cygwin (issue #1071513).")
    @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
                     "This test can't be run reliably as root (issue #1076467).")
    def test_on_error(self):
        self.errorState = 0
        os.mkdir(TESTFN)
        self.addCleanup(shutil.rmtree, TESTFN)

        self.child_file_path = os.path.join(TESTFN, 'a')
        self.child_dir_path = os.path.join(TESTFN, 'b')
        os_helper.create_empty_file(self.child_file_path)
        os.mkdir(self.child_dir_path)
        old_dir_mode = os.stat(TESTFN).st_mode
        old_child_file_mode = os.stat(self.child_file_path).st_mode
        old_child_dir_mode = os.stat(self.child_dir_path).st_mode
        # Make unwritable.
        new_mode = stat.S_IREAD|stat.S_IEXEC
        os.chmod(self.child_file_path, new_mode)
        os.chmod(self.child_dir_path, new_mode)
        os.chmod(TESTFN, new_mode)

        # Cleanups run in reverse order, so the modes are restored before
        # the rmtree() cleanup registered above removes TESTFN.
        self.addCleanup(os.chmod, TESTFN, old_dir_mode)
        self.addCleanup(os.chmod, self.child_file_path, old_child_file_mode)
        self.addCleanup(os.chmod, self.child_dir_path, old_child_dir_mode)

        shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
        # Test whether onerror has actually been called.
        self.assertEqual(self.errorState, 3,
                         "Expected call to onerror function did not happen.")

    def check_args_to_onerror(self, func, arg, exc):
        # test_on_error deliberately runs rmtree
        # on a directory that is chmod 500, which will fail.
        # This function is run when shutil.rmtree fails.
        # 99.9% of the time it initially fails to remove
        # a file in the directory, so the first time through
        # func is os.remove.
        # However, some Linux machines running ZFS on
        # FUSE experienced a failure earlier in the process
        # at os.listdir.  The first failure may legally
        # be either.
        # NOTE(review): test_rmtree_errors in this class sees os.scandir
        # reported for a listing failure; confirm os.listdir is still the
        # right expectation in the fallback branch below.
        if self.errorState < 2:
            if func is os.unlink:
                self.assertEqual(arg, self.child_file_path)
            elif func is os.rmdir:
                self.assertEqual(arg, self.child_dir_path)
            else:
                self.assertIs(func, os.listdir)
                self.assertIn(arg, [TESTFN, self.child_dir_path])
            self.assertTrue(issubclass(exc[0], OSError))
            self.errorState += 1
        else:
            self.assertEqual(func, os.rmdir)
            self.assertEqual(arg, TESTFN)
            self.assertTrue(issubclass(exc[0], OSError))
            self.errorState = 3

    def test_rmtree_does_not_choke_on_failing_lstat(self):
        orig_lstat = os.lstat
        def raiser(fn, *args, **kwargs):
            # lstat() works for TESTFN itself and fails for everything else.
            if fn != TESTFN:
                raise OSError()
            else:
                return orig_lstat(fn)

        # patch.object restores os.lstat on exit, even on failure.
        with unittest.mock.patch.object(os, 'lstat', raiser):
            os.mkdir(TESTFN)
            # Do not leave TESTFN behind for later tests if rmtree() fails.
            self.addCleanup(shutil.rmtree, TESTFN, ignore_errors=True)
            write_file((TESTFN, 'foo'), 'foo')
            shutil.rmtree(TESTFN)

    def test_rmtree_uses_safe_fd_version_if_available(self):
        # Recompute the capability check locally and compare it with what
        # shutil reports.
        _use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <=
                             os.supports_dir_fd and
                             os.listdir in os.supports_fd and
                             os.stat in os.supports_follow_symlinks)
        if _use_fd_functions:
            self.assertTrue(shutil._use_fd_functions)
            self.assertTrue(shutil.rmtree.avoids_symlink_attacks)
            tmp_dir = self.mkdtemp()
            d = os.path.join(tmp_dir, 'a')
            os.mkdir(d)

            class Called(Exception):
                pass

            def _raiser(*args, **kwargs):
                raise Called

            # rmtree() must dispatch to the fd-based implementation.
            with unittest.mock.patch.object(shutil, '_rmtree_safe_fd', _raiser):
                self.assertRaises(Called, shutil.rmtree, d)
        else:
            self.assertFalse(shutil._use_fd_functions)
            self.assertFalse(shutil.rmtree.avoids_symlink_attacks)

    def test_rmtree_dont_delete_file(self):
        # When called on a file instead of a directory, don't delete it.
        handle, path = tempfile.mkstemp(dir=self.mkdtemp())
        os.close(handle)
        self.assertRaises(NotADirectoryError, shutil.rmtree, path)
        os.remove(path)

    @os_helper.skip_unless_symlink
    def test_rmtree_on_symlink(self):
        # bug 1669.
        os.mkdir(TESTFN)
        try:
            src = os.path.join(TESTFN, 'cheese')
            dst = os.path.join(TESTFN, 'shop')
            os.mkdir(src)
            os.symlink(src, dst)
            self.assertRaises(OSError, shutil.rmtree, dst)
            shutil.rmtree(dst, ignore_errors=True)
        finally:
            shutil.rmtree(TESTFN, ignore_errors=True)

    @unittest.skipUnless(_winapi, 'only relevant on Windows')
    def test_rmtree_on_junction(self):
        os.mkdir(TESTFN)
        try:
            src = os.path.join(TESTFN, 'cheese')
            dst = os.path.join(TESTFN, 'shop')
            os.mkdir(src)
            open(os.path.join(src, 'spam'), 'wb').close()
            _winapi.CreateJunction(src, dst)
            self.assertRaises(OSError, shutil.rmtree, dst)
            shutil.rmtree(dst, ignore_errors=True)
        finally:
            shutil.rmtree(TESTFN, ignore_errors=True)
435
436
437class TestCopyTree(BaseTest, unittest.TestCase):
438
def test_copytree_simple(self):
    # A plain copytree() must reproduce files and nested directories.
    source = self.mkdtemp()
    target = os.path.join(self.mkdtemp(), 'destination')
    self.addCleanup(shutil.rmtree, source)
    self.addCleanup(shutil.rmtree, os.path.dirname(target))
    write_file((source, 'test.txt'), '123')
    os.mkdir(os.path.join(source, 'test_dir'))
    write_file((source, 'test_dir', 'test.txt'), '456')

    shutil.copytree(source, target)
    copied_file = os.path.join(target, 'test.txt')
    copied_dir = os.path.join(target, 'test_dir')
    copied_nested = os.path.join(target, 'test_dir', 'test.txt')
    self.assertTrue(os.path.isfile(copied_file))
    self.assertTrue(os.path.isdir(copied_dir))
    self.assertTrue(os.path.isfile(copied_nested))
    self.assertEqual(read_file((target, 'test.txt')), '123')
    self.assertEqual(read_file((target, 'test_dir', 'test.txt')), '456')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000457
def test_copytree_dirs_exist_ok(self):
    # dirs_exist_ok=True merges into an existing tree and overwrites files;
    # dirs_exist_ok=False refuses an existing destination.
    source = self.mkdtemp()
    target = self.mkdtemp()
    self.addCleanup(shutil.rmtree, source)
    self.addCleanup(shutil.rmtree, target)

    write_file((source, 'nonexisting.txt'), '123')
    os.mkdir(os.path.join(source, 'existing_dir'))
    os.mkdir(os.path.join(target, 'existing_dir'))
    write_file((target, 'existing_dir', 'existing.txt'), 'will be replaced')
    write_file((source, 'existing_dir', 'existing.txt'), 'has been replaced')

    shutil.copytree(source, target, dirs_exist_ok=True)
    new_file = os.path.join(target, 'nonexisting.txt')
    merged_dir = os.path.join(target, 'existing_dir')
    replaced_file = os.path.join(target, 'existing_dir', 'existing.txt')
    self.assertTrue(os.path.isfile(new_file))
    self.assertTrue(os.path.isdir(merged_dir))
    self.assertTrue(os.path.isfile(replaced_file))
    self.assertEqual(read_file((target, 'nonexisting.txt')), '123')
    self.assertEqual(read_file((target, 'existing_dir', 'existing.txt')),
                     'has been replaced')

    self.assertRaises(FileExistsError, shutil.copytree,
                      source, target, dirs_exist_ok=False)
482
@os_helper.skip_unless_symlink
def test_copytree_symlinks(self):
    # With symlinks=True the link itself is copied, together with its
    # target and (where the platform allows) its mode and flags.
    root = self.mkdtemp()
    src_dir = os.path.join(root, 'src')
    dst_dir = os.path.join(root, 'dst')
    sub_dir = os.path.join(src_dir, 'sub')
    os.mkdir(src_dir)
    os.mkdir(sub_dir)
    write_file((src_dir, 'file.txt'), 'foo')
    link_target = os.path.join(src_dir, 'file.txt')
    src_link = os.path.join(sub_dir, 'link')
    dst_link = os.path.join(dst_dir, 'sub/link')
    os.symlink(link_target, src_link)
    can_lchmod = hasattr(os, 'lchmod')
    can_lchflags = hasattr(os, 'lchflags')
    if can_lchmod:
        os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
    if can_lchflags and hasattr(stat, 'UF_NODUMP'):
        os.lchflags(src_link, stat.UF_NODUMP)
    src_stat = os.lstat(src_link)
    shutil.copytree(src_dir, dst_dir, symlinks=True)
    copied_link = os.path.join(dst_dir, 'sub', 'link')
    self.assertTrue(os.path.islink(copied_link))
    actual = os.readlink(copied_link)
    # Bad practice to blindly strip the prefix as it may be required to
    # correctly refer to the file, but we're only comparing paths here.
    if os.name == 'nt' and actual.startswith('\\\\?\\'):
        actual = actual[4:]
    self.assertEqual(actual, link_target)
    dst_stat = os.lstat(dst_link)
    if can_lchmod:
        self.assertEqual(dst_stat.st_mode, src_stat.st_mode)
    if can_lchflags:
        self.assertEqual(dst_stat.st_flags, src_stat.st_flags)
514
def test_copytree_with_exclude(self):
    # copytree() must honour both ignore_patterns() and a custom callable
    # passed as *ignore*.
    join = os.path.join
    exists = os.path.exists
    src_dir = self.mkdtemp()
    # Bound before the try block so the finally clause can always use it.
    dst_dir = join(self.mkdtemp(), 'destination')
    try:
        # creating data
        write_file((src_dir, 'test.txt'), '123')
        write_file((src_dir, 'test.tmp'), '123')
        os.mkdir(join(src_dir, 'test_dir'))
        write_file((src_dir, 'test_dir', 'test.txt'), '456')
        os.mkdir(join(src_dir, 'test_dir2'))
        write_file((src_dir, 'test_dir2', 'test.txt'), '456')
        os.mkdir(join(src_dir, 'test_dir2', 'subdir'))
        os.mkdir(join(src_dir, 'test_dir2', 'subdir2'))
        write_file((src_dir, 'test_dir2', 'subdir', 'test.txt'), '456')
        write_file((src_dir, 'test_dir2', 'subdir2', 'test.py'), '456')

        # testing glob-like patterns
        try:
            patterns = shutil.ignore_patterns('*.tmp', 'test_dir2')
            shutil.copytree(src_dir, dst_dir, ignore=patterns)
            # checking the result: some elements should not be copied
            self.assertTrue(exists(join(dst_dir, 'test.txt')))
            self.assertFalse(exists(join(dst_dir, 'test.tmp')))
            self.assertFalse(exists(join(dst_dir, 'test_dir2')))
        finally:
            shutil.rmtree(dst_dir)
        try:
            patterns = shutil.ignore_patterns('*.tmp', 'subdir*')
            shutil.copytree(src_dir, dst_dir, ignore=patterns)
            # checking the result: some elements should not be copied
            self.assertFalse(exists(join(dst_dir, 'test.tmp')))
            self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2')))
            self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
        finally:
            shutil.rmtree(dst_dir)

        # testing callable-style
        try:
            def _filter(src, names):
                # Ignore directories named 'subdir' and every '.py' file.
                res = []
                for name in names:
                    path = os.path.join(src, name)

                    # Compare the last path component: str.split() would
                    # split on whitespace, not on the path separator.
                    if (os.path.isdir(path) and
                            os.path.basename(path) == 'subdir'):
                        res.append(name)
                    # Exact comparison: "in ('.py')" is a substring test
                    # that also matches the empty extension of directories.
                    elif os.path.splitext(path)[-1] == '.py':
                        res.append(name)
                return res

            shutil.copytree(src_dir, dst_dir, ignore=_filter)

            # checking the result: some elements should not be copied
            self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2',
                                         'test.py')))
            self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
            # ...while everything the filter does not name is copied.
            self.assertTrue(exists(join(dst_dir, 'test_dir', 'test.txt')))
            self.assertTrue(exists(join(dst_dir, 'test_dir2', 'subdir2')))

        finally:
            shutil.rmtree(dst_dir)
    finally:
        shutil.rmtree(src_dir)
        shutil.rmtree(os.path.dirname(dst_dir))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000579
def test_copytree_arg_types_of_ignore(self):
    # The ignore callable must receive a str directory and a list of
    # unique str names, whichever path type is handed to copytree().
    join = os.path.join
    exists = os.path.exists

    tmp_dir = self.mkdtemp()
    src_dir = join(tmp_dir, "source")

    os.mkdir(join(src_dir))
    os.mkdir(join(src_dir, 'test_dir'))
    os.mkdir(os.path.join(src_dir, 'test_dir', 'subdir'))
    write_file((src_dir, 'test_dir', 'subdir', 'test.txt'), '456')

    visited = []

    def _ignore(src, names):
        visited.append(src)
        self.assertIsInstance(src, str)
        self.assertIsInstance(names, list)
        self.assertEqual(len(names), len(set(names)))
        for entry_name in names:
            self.assertIsInstance(entry_name, str)
        return []

    def copy_and_check(source):
        # Copy *source* into a fresh destination and verify the leaf file.
        dst_dir = join(self.mkdtemp(), 'destination')
        shutil.copytree(source, dst_dir, ignore=_ignore)
        self.assertTrue(exists(join(dst_dir, 'test_dir', 'subdir',
                                    'test.txt')))

    copy_and_check(src_dir)
    copy_and_check(pathlib.Path(src_dir))

    src_dir_entry = list(os.scandir(tmp_dir))[0]
    self.assertIsInstance(src_dir_entry, os.DirEntry)
    copy_and_check(src_dir_entry)

    # Three copies, each calling the callback for three directories.
    self.assertEqual(len(visited), 9)
621
def test_copytree_retains_permissions(self):
    # Modes of the root, of files and of subdirectories must survive the copy.
    tmp_dir = self.mkdtemp()
    src_dir = os.path.join(tmp_dir, 'source')
    os.mkdir(src_dir)
    dst_dir = os.path.join(tmp_dir, 'destination')
    self.addCleanup(shutil.rmtree, tmp_dir)

    os.chmod(src_dir, 0o777)
    for filename, content, mode in (('permissive.txt', '123', 0o777),
                                    ('restrictive.txt', '456', 0o600)):
        write_file((src_dir, filename), content)
        os.chmod(os.path.join(src_dir, filename), mode)
    restrictive_subdir = tempfile.mkdtemp(dir=src_dir)
    self.addCleanup(os_helper.rmtree, restrictive_subdir)
    os.chmod(restrictive_subdir, 0o600)

    shutil.copytree(src_dir, dst_dir)

    def mode_of(path):
        return os.stat(path).st_mode

    self.assertEqual(mode_of(src_dir), mode_of(dst_dir))
    for filename in ('permissive.txt', 'restrictive.txt'):
        self.assertEqual(mode_of(os.path.join(src_dir, filename)),
                         mode_of(os.path.join(dst_dir, filename)))
    restrictive_subdir_dst = os.path.join(
        dst_dir, os.path.split(restrictive_subdir)[1])
    self.assertEqual(mode_of(restrictive_subdir),
                     mode_of(restrictive_subdir_dst))
648
@unittest.mock.patch('os.chmod')
def test_copytree_winerror(self, mock_patch):
    # When copying to VFAT, copystat() raises OSError. On Windows, the
    # exception object has a meaningful 'winerror' attribute, but not
    # on other operating systems. Do not assume 'winerror' is set.
    source = self.mkdtemp()
    target = os.path.join(self.mkdtemp(), 'destination')
    self.addCleanup(shutil.rmtree, source)
    self.addCleanup(shutil.rmtree, os.path.dirname(target))

    # Every os.chmod() call made during the copy now fails.
    mock_patch.side_effect = PermissionError('ka-boom')
    self.assertRaises(shutil.Error, shutil.copytree, source, target)
662
def test_copytree_custom_copy_function(self):
    # See: https://bugs.python.org/issue35648
    calls = []
    src = self.mkdtemp()
    dst = tempfile.mktemp(dir=self.mkdtemp())

    def custom_cpfun(a, b):
        # Must be handed plain str paths for the single file in *src*.
        calls.append(None)
        self.assertIsInstance(a, str)
        self.assertIsInstance(b, str)
        self.assertEqual(a, os.path.join(src, 'foo'))
        self.assertEqual(b, os.path.join(dst, 'foo'))

    # Create an empty file to be copied.
    with open(os.path.join(src, 'foo'), 'w'):
        pass
    shutil.copytree(src, dst, copy_function=custom_cpfun)
    self.assertEqual(len(calls), 1)
679
# Issue #3002: copyfile and copytree block indefinitely on named pipes
@unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
@os_helper.skip_unless_symlink
@unittest.skipIf(sys.platform == "vxworks",
                 "fifo requires special path on VxWorks")
def test_copytree_named_pipe(self):
    os.mkdir(TESTFN)
    try:
        subdir = os.path.join(TESTFN, "subdir")
        pipe = os.path.join(subdir, "mypipe")
        os.mkdir(subdir)
        try:
            os.mkfifo(pipe)
        except PermissionError as e:
            self.skipTest('os.mkfifo(): %s' % e)

        # copytree() must report the FIFO as an error instead of trying
        # to read from it.
        try:
            shutil.copytree(TESTFN, TESTFN2)
        except shutil.Error as e:
            errors = e.args[0]
            self.assertEqual(len(errors), 1)
            _src, _dst, error_msg = errors[0]
            self.assertEqual("`%s` is a named pipe" % pipe, error_msg)
        else:
            self.fail("shutil.Error should have been raised")
    finally:
        for tree in (TESTFN, TESTFN2):
            shutil.rmtree(tree, ignore_errors=True)
707
def test_copytree_special_func(self):
    # copytree() must call a custom copy_function once per regular file.
    src_dir = self.mkdtemp()
    dst_dir = os.path.join(self.mkdtemp(), 'destination')
    write_file((src_dir, 'test.txt'), '123')
    nested = os.path.join(src_dir, 'test_dir')
    os.mkdir(nested)
    write_file((src_dir, 'test_dir', 'test.txt'), '456')

    calls = []

    def _copy(src, dst):
        # Record the request; nothing is actually copied.
        calls.append((src, dst))

    shutil.copytree(src_dir, dst_dir, copy_function=_copy)
    self.assertEqual(len(calls), 2)
721
@os_helper.skip_unless_symlink
def test_copytree_dangling_symlinks(self):
    src_dir = self.mkdtemp()
    os.symlink('IDONTEXIST', os.path.join(src_dir, 'test.txt'))
    os.mkdir(os.path.join(src_dir, 'test_dir'))
    write_file((src_dir, 'test_dir', 'test.txt'), '456')

    # a dangling symlink raises an error at the end
    dst_dir = os.path.join(self.mkdtemp(), 'destination')
    with self.assertRaises(Error):
        shutil.copytree(src_dir, dst_dir)

    # a dangling symlink is ignored with the proper flag
    dst_dir = os.path.join(self.mkdtemp(), 'destination2')
    shutil.copytree(src_dir, dst_dir, ignore_dangling_symlinks=True)
    copied_names = os.listdir(dst_dir)
    self.assertNotIn('test.txt', copied_names)

    # a dangling symlink is copied if symlinks=True
    dst_dir = os.path.join(self.mkdtemp(), 'destination3')
    shutil.copytree(src_dir, dst_dir, symlinks=True)
    copied_names = os.listdir(dst_dir)
    self.assertIn('test.txt', copied_names)
741
@os_helper.skip_unless_symlink
def test_copytree_symlink_dir(self):
    src_dir = self.mkdtemp()
    real_dir = os.path.join(src_dir, 'real_dir')
    os.mkdir(real_dir)
    with open(os.path.join(src_dir, 'real_dir', 'test.txt'), 'w'):
        pass
    os.symlink(real_dir,
               os.path.join(src_dir, 'link_to_dir'),
               target_is_directory=True)

    # symlinks=False: the linked directory is copied as a real directory.
    dst_dir = os.path.join(self.mkdtemp(), 'destination')
    shutil.copytree(src_dir, dst_dir, symlinks=False)
    copied_link = os.path.join(dst_dir, 'link_to_dir')
    self.assertFalse(os.path.islink(copied_link))
    self.assertIn('test.txt', os.listdir(copied_link))

    # symlinks=True: the link itself is recreated in the destination.
    dst_dir = os.path.join(self.mkdtemp(), 'destination2')
    shutil.copytree(src_dir, dst_dir, symlinks=True)
    copied_link = os.path.join(dst_dir, 'link_to_dir')
    self.assertTrue(os.path.islink(copied_link))
    self.assertIn('test.txt', os.listdir(copied_link))
761
def test_copytree_return_value(self):
    # copytree returns its destination path.
    src_dir = self.mkdtemp()
    dst_dir = src_dir + "dest"
    self.addCleanup(shutil.rmtree, dst_dir, True)
    write_file(os.path.join(src_dir, 'foo'), 'foo')

    returned = shutil.copytree(src_dir, dst_dir)
    # Listing the returned path shows the copied file.
    self.assertEqual(['foo'], os.listdir(returned))
771
def test_copytree_subdirectory(self):
    # copytree where dst is a subdirectory of src, see Issue 38688
    base_dir = self.mkdtemp()
    self.addCleanup(shutil.rmtree, base_dir, ignore_errors=True)

    src_dir = os.path.join(base_dir, "t", "pg")
    os.makedirs(src_dir)
    write_file(os.path.join(src_dir, 'pol'), 'pol')

    # The destination lives inside the tree that is being copied.
    dst_dir = os.path.join(src_dir, "somevendor", "1.0")
    returned = shutil.copytree(src_dir, dst_dir)
    self.assertEqual(['pol'], os.listdir(returned))
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300783
class TestCopy(BaseTest, unittest.TestCase):
    # Tests for shutil.copymode(), copystat(), _copyxattr(), copy(),
    # copy2() and copyfile().

    ### shutil.copymode

    @os_helper.skip_unless_symlink
    def test_copymode_follow_symlinks(self):
        # By default copymode() follows symlinks on both sides, so the
        # mode always ends up on the real destination file.
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'foo')
        dst = os.path.join(tmp_dir, 'bar')
        src_link = os.path.join(tmp_dir, 'baz')
        dst_link = os.path.join(tmp_dir, 'quux')
        write_file(src, 'foo')
        write_file(dst, 'foo')
        os.symlink(src, src_link)
        os.symlink(dst, dst_link)
        os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
        # file to file
        os.chmod(dst, stat.S_IRWXO)
        self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
        shutil.copymode(src, dst)
        self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
        # On Windows, os.chmod does not follow symlinks (issue #15411)
        if os.name != 'nt':
            # follow src link
            os.chmod(dst, stat.S_IRWXO)
            shutil.copymode(src_link, dst)
            self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
            # follow dst link
            os.chmod(dst, stat.S_IRWXO)
            shutil.copymode(src, dst_link)
            self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
            # follow both links
            os.chmod(dst, stat.S_IRWXO)
            shutil.copymode(src_link, dst_link)
            self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)

    @unittest.skipUnless(hasattr(os, 'lchmod'), 'requires os.lchmod')
    @os_helper.skip_unless_symlink
    def test_copymode_symlink_to_symlink(self):
        # With follow_symlinks=False the mode of the link itself is
        # copied, but only when both src and dst are symlinks.
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'foo')
        dst = os.path.join(tmp_dir, 'bar')
        src_link = os.path.join(tmp_dir, 'baz')
        dst_link = os.path.join(tmp_dir, 'quux')
        write_file(src, 'foo')
        write_file(dst, 'foo')
        os.symlink(src, src_link)
        os.symlink(dst, dst_link)
        os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
        os.chmod(dst, stat.S_IRWXU)
        os.lchmod(src_link, stat.S_IRWXO|stat.S_IRWXG)
        # link to link
        os.lchmod(dst_link, stat.S_IRWXO)
        shutil.copymode(src_link, dst_link, follow_symlinks=False)
        self.assertEqual(os.lstat(src_link).st_mode,
                         os.lstat(dst_link).st_mode)
        self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
        # src link - use chmod
        os.lchmod(dst_link, stat.S_IRWXO)
        shutil.copymode(src_link, dst, follow_symlinks=False)
        self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
        # dst link - use chmod
        os.lchmod(dst_link, stat.S_IRWXO)
        shutil.copymode(src, dst_link, follow_symlinks=False)
        self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)

    @unittest.skipIf(hasattr(os, 'lchmod'), 'requires os.lchmod to be missing')
    @os_helper.skip_unless_symlink
    def test_copymode_symlink_to_symlink_wo_lchmod(self):
        # Without os.lchmod, a link-to-link copymode() must not raise.
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'foo')
        dst = os.path.join(tmp_dir, 'bar')
        src_link = os.path.join(tmp_dir, 'baz')
        dst_link = os.path.join(tmp_dir, 'quux')
        write_file(src, 'foo')
        write_file(dst, 'foo')
        os.symlink(src, src_link)
        os.symlink(dst, dst_link)
        shutil.copymode(src_link, dst_link, follow_symlinks=False)  # silent fail

    ### shutil.copystat

    @os_helper.skip_unless_symlink
    def test_copystat_symlinks(self):
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'foo')
        dst = os.path.join(tmp_dir, 'bar')
        src_link = os.path.join(tmp_dir, 'baz')
        dst_link = os.path.join(tmp_dir, 'qux')
        write_file(src, 'foo')
        src_stat = os.stat(src)
        os.utime(src, (src_stat.st_atime,
                       src_stat.st_mtime - 42.0))  # ensure different mtimes
        write_file(dst, 'bar')
        self.assertNotEqual(os.stat(src).st_mtime, os.stat(dst).st_mtime)
        os.symlink(src, src_link)
        os.symlink(dst, dst_link)
        if hasattr(os, 'lchmod'):
            os.lchmod(src_link, stat.S_IRWXO)
        if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
            os.lchflags(src_link, stat.UF_NODUMP)
        src_link_stat = os.lstat(src_link)
        # follow
        if hasattr(os, 'lchmod'):
            shutil.copystat(src_link, dst_link, follow_symlinks=True)
            self.assertNotEqual(src_link_stat.st_mode, os.stat(dst).st_mode)
        # don't follow
        shutil.copystat(src_link, dst_link, follow_symlinks=False)
        dst_link_stat = os.lstat(dst_link)
        if os.utime in os.supports_follow_symlinks:
            for attr in 'st_atime', 'st_mtime':
                # The modification times may be truncated in the new file.
                self.assertLessEqual(getattr(src_link_stat, attr),
                                     getattr(dst_link_stat, attr) + 1)
        if hasattr(os, 'lchmod'):
            self.assertEqual(src_link_stat.st_mode, dst_link_stat.st_mode)
        if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
            self.assertEqual(src_link_stat.st_flags, dst_link_stat.st_flags)
        # asked not to follow, but dst is not a link: the stat of the
        # file src_link points to must end up on dst
        shutil.copystat(src_link, dst, follow_symlinks=False)
        # assertLess reports the actual difference on failure.
        self.assertLess(abs(os.stat(src).st_mtime - os.stat(dst).st_mtime),
                        0.1)

    @unittest.skipUnless(hasattr(os, 'chflags') and
                         hasattr(errno, 'EOPNOTSUPP') and
                         hasattr(errno, 'ENOTSUP'),
                         "requires os.chflags, EOPNOTSUPP & ENOTSUP")
    def test_copystat_handles_harmless_chflags_errors(self):
        tmpdir = self.mkdtemp()
        file1 = os.path.join(tmpdir, 'file1')
        file2 = os.path.join(tmpdir, 'file2')
        write_file(file1, 'xxx')
        write_file(file2, 'xxx')

        def make_chflags_raiser(err):
            # Build an os.chflags() replacement that always fails with
            # the given errno.
            ex = OSError()

            def _chflags_raiser(path, flags, *, follow_symlinks=True):
                ex.errno = err
                raise ex
            return _chflags_raiser

        # "Not supported" errors from chflags() are swallowed by copystat().
        for err in errno.EOPNOTSUPP, errno.ENOTSUP:
            with support.swap_attr(os, 'chflags', make_chflags_raiser(err)):
                shutil.copystat(file1, file2)
        # assert other errors break it
        other_err = errno.EOPNOTSUPP + errno.ENOTSUP
        with support.swap_attr(os, 'chflags',
                               make_chflags_raiser(other_err)):
            self.assertRaises(OSError, shutil.copystat, file1, file2)

    ### shutil.copyxattr

    @os_helper.skip_unless_xattr
    def test_copyxattr(self):
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'foo')
        write_file(src, 'foo')
        dst = os.path.join(tmp_dir, 'bar')
        write_file(dst, 'bar')

        # no xattr == no problem
        shutil._copyxattr(src, dst)
        # common case
        os.setxattr(src, 'user.foo', b'42')
        os.setxattr(src, 'user.bar', b'43')
        shutil._copyxattr(src, dst)
        self.assertEqual(sorted(os.listxattr(src)), sorted(os.listxattr(dst)))
        self.assertEqual(
                os.getxattr(src, 'user.foo'),
                os.getxattr(dst, 'user.foo'))
        # check errors don't affect other attrs
        os.remove(dst)
        write_file(dst, 'bar')
        os_error = OSError(errno.EPERM, 'EPERM')
        orig_setxattr = os.setxattr

        def _raise_on_user_foo(fname, attr, val, **kwargs):
            if attr == 'user.foo':
                raise os_error
            else:
                orig_setxattr(fname, attr, val, **kwargs)
        with support.swap_attr(os, 'setxattr', _raise_on_user_foo):
            shutil._copyxattr(src, dst)
            self.assertIn('user.bar', os.listxattr(dst))
        # the source filesystem not supporting xattrs should be ok, too.
        orig_listxattr = os.listxattr

        def _raise_on_src(fname, *, follow_symlinks=True):
            if fname == src:
                raise OSError(errno.ENOTSUP, 'Operation not supported')
            return orig_listxattr(fname, follow_symlinks=follow_symlinks)
        with support.swap_attr(os, 'listxattr', _raise_on_src):
            shutil._copyxattr(src, dst)

        # test that shutil.copystat copies xattrs
        src = os.path.join(tmp_dir, 'the_original')
        srcro = os.path.join(tmp_dir, 'the_original_ro')
        write_file(src, src)
        write_file(srcro, srcro)
        os.setxattr(src, 'user.the_value', b'fiddly')
        os.setxattr(srcro, 'user.the_value', b'fiddly')
        os.chmod(srcro, 0o444)
        dst = os.path.join(tmp_dir, 'the_copy')
        dstro = os.path.join(tmp_dir, 'the_copy_ro')
        write_file(dst, dst)
        write_file(dstro, dstro)
        shutil.copystat(src, dst)
        shutil.copystat(srcro, dstro)
        self.assertEqual(os.getxattr(dst, 'user.the_value'), b'fiddly')
        self.assertEqual(os.getxattr(dstro, 'user.the_value'), b'fiddly')

    @os_helper.skip_unless_symlink
    @os_helper.skip_unless_xattr
    @unittest.skipUnless(hasattr(os, 'geteuid') and os.geteuid() == 0,
                         'root privileges required')
    def test_copyxattr_symlinks(self):
        # On Linux, it's only possible to access non-user xattr for symlinks;
        # which in turn require root privileges. This test should be expanded
        # as soon as other platforms gain support for extended attributes.
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'foo')
        src_link = os.path.join(tmp_dir, 'baz')
        write_file(src, 'foo')
        os.symlink(src, src_link)
        os.setxattr(src, 'trusted.foo', b'42')
        os.setxattr(src_link, 'trusted.foo', b'43', follow_symlinks=False)
        dst = os.path.join(tmp_dir, 'bar')
        dst_link = os.path.join(tmp_dir, 'qux')
        write_file(dst, 'bar')
        os.symlink(dst, dst_link)
        # link to link: only the destination link gets the attribute
        shutil._copyxattr(src_link, dst_link, follow_symlinks=False)
        self.assertEqual(os.getxattr(dst_link, 'trusted.foo', follow_symlinks=False), b'43')
        self.assertRaises(OSError, os.getxattr, dst, 'trusted.foo')
        # link to regular file: the link's attribute lands on the file
        shutil._copyxattr(src_link, dst, follow_symlinks=False)
        self.assertEqual(os.getxattr(dst, 'trusted.foo'), b'43')

    ### shutil.copy

    def _copy_file(self, method):
        # Copy a small file into a fresh directory using *method* and
        # return the (source, expected destination) paths.
        fname = 'test.txt'
        tmpdir = self.mkdtemp()
        write_file((tmpdir, fname), 'xxx')
        file1 = os.path.join(tmpdir, fname)
        tmpdir2 = self.mkdtemp()
        method(file1, tmpdir2)
        file2 = os.path.join(tmpdir2, fname)
        return (file1, file2)

    def test_copy(self):
        # Ensure that the copied file exists and has the same mode bits.
        file1, file2 = self._copy_file(shutil.copy)
        self.assertTrue(os.path.exists(file2))
        self.assertEqual(os.stat(file1).st_mode, os.stat(file2).st_mode)

    @os_helper.skip_unless_symlink
    def test_copy_symlinks(self):
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'foo')
        dst = os.path.join(tmp_dir, 'bar')
        src_link = os.path.join(tmp_dir, 'baz')
        write_file(src, 'foo')
        os.symlink(src, src_link)
        if hasattr(os, 'lchmod'):
            os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
        # follow
        shutil.copy(src_link, dst, follow_symlinks=True)
        self.assertFalse(os.path.islink(dst))
        self.assertEqual(read_file(src), read_file(dst))
        os.remove(dst)
        # don't follow
        shutil.copy(src_link, dst, follow_symlinks=False)
        self.assertTrue(os.path.islink(dst))
        self.assertEqual(os.readlink(dst), os.readlink(src_link))
        if hasattr(os, 'lchmod'):
            self.assertEqual(os.lstat(src_link).st_mode,
                             os.lstat(dst).st_mode)

    ### shutil.copy2

    @unittest.skipUnless(hasattr(os, 'utime'), 'requires os.utime')
    def test_copy2(self):
        # Ensure that the copied file exists and has the same mode and
        # modification time bits.
        file1, file2 = self._copy_file(shutil.copy2)
        self.assertTrue(os.path.exists(file2))
        file1_stat = os.stat(file1)
        file2_stat = os.stat(file2)
        self.assertEqual(file1_stat.st_mode, file2_stat.st_mode)
        for attr in 'st_atime', 'st_mtime':
            # The modification times may be truncated in the new file.
            self.assertLessEqual(getattr(file1_stat, attr),
                                 getattr(file2_stat, attr) + 1)
        if hasattr(os, 'chflags') and hasattr(file1_stat, 'st_flags'):
            self.assertEqual(getattr(file1_stat, 'st_flags'),
                             getattr(file2_stat, 'st_flags'))

    @os_helper.skip_unless_symlink
    def test_copy2_symlinks(self):
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'foo')
        dst = os.path.join(tmp_dir, 'bar')
        src_link = os.path.join(tmp_dir, 'baz')
        write_file(src, 'foo')
        os.symlink(src, src_link)
        if hasattr(os, 'lchmod'):
            os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
        if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
            os.lchflags(src_link, stat.UF_NODUMP)
        src_stat = os.stat(src)
        src_link_stat = os.lstat(src_link)
        # follow
        shutil.copy2(src_link, dst, follow_symlinks=True)
        self.assertFalse(os.path.islink(dst))
        self.assertEqual(read_file(src), read_file(dst))
        os.remove(dst)
        # don't follow
        shutil.copy2(src_link, dst, follow_symlinks=False)
        self.assertTrue(os.path.islink(dst))
        self.assertEqual(os.readlink(dst), os.readlink(src_link))
        dst_stat = os.lstat(dst)
        if os.utime in os.supports_follow_symlinks:
            for attr in 'st_atime', 'st_mtime':
                # The modification times may be truncated in the new file.
                self.assertLessEqual(getattr(src_link_stat, attr),
                                     getattr(dst_stat, attr) + 1)
        if hasattr(os, 'lchmod'):
            self.assertEqual(src_link_stat.st_mode, dst_stat.st_mode)
            self.assertNotEqual(src_stat.st_mode, dst_stat.st_mode)
        if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
            self.assertEqual(src_link_stat.st_flags, dst_stat.st_flags)

    @os_helper.skip_unless_xattr
    def test_copy2_xattr(self):
        # copy2() must carry extended attributes over to the copy.
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'foo')
        dst = os.path.join(tmp_dir, 'bar')
        write_file(src, 'foo')
        os.setxattr(src, 'user.foo', b'42')
        shutil.copy2(src, dst)
        self.assertEqual(
                os.getxattr(src, 'user.foo'),
                os.getxattr(dst, 'user.foo'))
        os.remove(dst)

    def test_copy_return_value(self):
        # copy and copy2 both return their destination path.
        for fn in (shutil.copy, shutil.copy2):
            src_dir = self.mkdtemp()
            dst_dir = self.mkdtemp()
            src = os.path.join(src_dir, 'foo')
            write_file(src, 'foo')
            rv = fn(src, dst_dir)
            self.assertEqual(rv, os.path.join(dst_dir, 'foo'))
            rv = fn(src, os.path.join(dst_dir, 'bar'))
            self.assertEqual(rv, os.path.join(dst_dir, 'bar'))

    ### shutil.copyfile

    @os_helper.skip_unless_symlink
    def test_copyfile_symlinks(self):
        tmp_dir = self.mkdtemp()
        src = os.path.join(tmp_dir, 'src')
        dst = os.path.join(tmp_dir, 'dst')
        dst_link = os.path.join(tmp_dir, 'dst_link')
        link = os.path.join(tmp_dir, 'link')
        write_file(src, 'foo')
        os.symlink(src, link)
        # don't follow
        shutil.copyfile(link, dst_link, follow_symlinks=False)
        self.assertTrue(os.path.islink(dst_link))
        self.assertEqual(os.readlink(link), os.readlink(dst_link))
        # follow
        shutil.copyfile(link, dst)
        self.assertFalse(os.path.islink(dst))

    @unittest.skipUnless(hasattr(os, 'link'), 'requires os.link')
    def test_dont_copy_file_onto_link_to_itself(self):
        # bug 851123.
        os.mkdir(TESTFN)
        src = os.path.join(TESTFN, 'cheese')
        dst = os.path.join(TESTFN, 'shop')
        try:
            with open(src, 'w') as f:
                f.write('cheddar')
            try:
                os.link(src, dst)
            except PermissionError as e:
                self.skipTest('os.link(): %s' % e)
            self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
            # The source must not have been truncated by the attempt.
            with open(src, 'r') as f:
                self.assertEqual(f.read(), 'cheddar')
            os.remove(dst)
        finally:
            shutil.rmtree(TESTFN, ignore_errors=True)

    @os_helper.skip_unless_symlink
    def test_dont_copy_file_onto_symlink_to_itself(self):
        # bug 851123.
        os.mkdir(TESTFN)
        src = os.path.join(TESTFN, 'cheese')
        dst = os.path.join(TESTFN, 'shop')
        try:
            with open(src, 'w') as f:
                f.write('cheddar')
            # Using `src` here would mean we end up with a symlink pointing
            # to TESTFN/TESTFN/cheese, while it should point at
            # TESTFN/cheese.
            os.symlink('cheese', dst)
            self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
            # The source must not have been truncated by the attempt.
            with open(src, 'r') as f:
                self.assertEqual(f.read(), 'cheddar')
            os.remove(dst)
        finally:
            shutil.rmtree(TESTFN, ignore_errors=True)

    # Issue #3002: copyfile and copytree block indefinitely on named pipes
    @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
    @unittest.skipIf(sys.platform == "vxworks",
                     "fifo requires special path on VxWorks")
    def test_copyfile_named_pipe(self):
        try:
            os.mkfifo(TESTFN)
        except PermissionError as e:
            self.skipTest('os.mkfifo(): %s' % e)
        try:
            # A FIFO is rejected both as source and as destination.
            self.assertRaises(shutil.SpecialFileError,
                              shutil.copyfile, TESTFN, TESTFN2)
            self.assertRaises(shutil.SpecialFileError,
                              shutil.copyfile, __file__, TESTFN)
        finally:
            os.remove(TESTFN)

    def test_copyfile_return_value(self):
        # copyfile returns its destination path.
        src_dir = self.mkdtemp()
        dst_dir = self.mkdtemp()
        dst_file = os.path.join(dst_dir, 'bar')
        src_file = os.path.join(src_dir, 'foo')
        write_file(src_file, 'foo')
        rv = shutil.copyfile(src_file, dst_file)
        self.assertTrue(os.path.exists(rv))
        self.assertEqual(read_file(src_file), read_file(dst_file))

    def test_copyfile_same_file(self):
        # copyfile() should raise SameFileError if the source and destination
        # are the same.
        src_dir = self.mkdtemp()
        src_file = os.path.join(src_dir, 'foo')
        write_file(src_file, 'foo')
        self.assertRaises(SameFileError, shutil.copyfile, src_file, src_file)
        # But Error should work too, to stay backward compatible.
        self.assertRaises(Error, shutil.copyfile, src_file, src_file)
        # Make sure file is not corrupted.
        self.assertEqual(read_file(src_file), 'foo')
Tarek Ziadéfb437512010-04-20 08:57:33 +00001245
Tarek Ziadéfb437512010-04-20 08:57:33 +00001246
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001247class TestArchives(BaseTest, unittest.TestCase):
Tarek Ziadéfb437512010-04-20 08:57:33 +00001248
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001249 ### shutil.make_archive
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001250
@support.requires_zlib()
def test_make_tarball(self):
    # creating something to tar
    root_dir, base_dir = self._create_files('')
    expected_names = ['.', './sub', './sub2',
                      './file1', './file2', './sub/file3']

    tmpdir2 = self.mkdtemp()
    # force shutil to create the directory
    os.rmdir(tmpdir2)
    # working with relative paths
    work_dir, archive_dirname = os.path.split(tmpdir2)
    rel_base_name = os.path.join(archive_dirname, 'archive')

    with os_helper.change_cwd(work_dir):
        base_name = os.path.abspath(rel_base_name)
        tarball = make_archive(rel_base_name, 'gztar', root_dir, '.')

    # check if the compressed tarball was created
    self.assertEqual(tarball, base_name + '.tar.gz')
    self.assertTrue(os.path.isfile(tarball))
    self.assertTrue(tarfile.is_tarfile(tarball))
    with tarfile.open(tarball, 'r:gz') as tf:
        self.assertCountEqual(tf.getnames(), expected_names)

    # trying an uncompressed one
    with os_helper.change_cwd(work_dir):
        tarball = make_archive(rel_base_name, 'tar', root_dir, '.')
    self.assertEqual(tarball, base_name + '.tar')
    self.assertTrue(os.path.isfile(tarball))
    self.assertTrue(tarfile.is_tarfile(tarball))
    with tarfile.open(tarball, 'r') as tf:
        self.assertCountEqual(tf.getnames(), expected_names)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001286
def _tarinfo(self, path):
    # Return the member names of the tar archive at *path* as a sorted tuple.
    with tarfile.open(path) as archive:
        return tuple(sorted(archive.getnames()))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001292
def _create_files(self, base_dir='dist'):
    # Build a small tree to archive and return (root_dir, base_dir).
    # Layout below root_dir/base_dir: file1, file2, sub/file3 and an
    # empty sub2 directory.
    root_dir = self.mkdtemp()
    dist = os.path.join(root_dir, base_dir)
    os.makedirs(dist, exist_ok=True)
    for name in ('file1', 'file2'):
        write_file((dist, name), 'xxx')
    os.mkdir(os.path.join(dist, 'sub'))
    write_file((dist, 'sub', 'file3'), 'xxx')
    os.mkdir(os.path.join(dist, 'sub2'))
    if base_dir:
        # A file outside base_dir, so tests can tell whether the whole
        # root or only base_dir was archived.
        write_file((root_dir, 'outer'), 'xxx')
    return root_dir, base_dir
Tarek Ziadé396fad72010-02-23 05:30:31 +00001306
@support.requires_zlib()
@unittest.skipUnless(shutil.which('tar'),
                     'Need the tar command to run')
def test_tarfile_vs_tar(self):
    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')
    gz_name = base_name + '.tar.gz'
    tar_name = base_name + '.tar'

    # check if the compressed tarball was created
    tarball = make_archive(base_name, 'gztar', root_dir, base_dir)
    self.assertEqual(tarball, gz_name)
    self.assertTrue(os.path.isfile(tarball))

    # now create another tarball using `tar`
    tarball2 = os.path.join(root_dir, 'archive2.tar')
    subprocess.check_call(['tar', '-cf', 'archive2.tar', base_dir],
                          cwd=root_dir,
                          stdout=subprocess.DEVNULL)
    self.assertTrue(os.path.isfile(tarball2))

    # let's compare both tarballs
    self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2))

    # trying an uncompressed one
    tarball = make_archive(base_name, 'tar', root_dir, base_dir)
    self.assertEqual(tarball, tar_name)
    self.assertTrue(os.path.isfile(tarball))

    # now for a dry_run; the .tar file from the previous step is still
    # on disk when the isfile() check runs
    tarball = make_archive(base_name, 'tar', root_dir, base_dir,
                           dry_run=True)
    self.assertEqual(tarball, tar_name)
    self.assertTrue(os.path.isfile(tarball))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001339
@support.requires_zlib()
def test_make_zipfile(self):
    # creating something to zip
    root_dir, base_dir = self._create_files()

    tmpdir2 = self.mkdtemp()
    # force shutil to create the directory
    os.rmdir(tmpdir2)
    # working with relative paths
    work_dir, archive_dirname = os.path.split(tmpdir2)
    rel_base_name = os.path.join(archive_dirname, 'archive')

    dist_names = ['dist/', 'dist/sub/', 'dist/sub2/',
                  'dist/file1', 'dist/file2', 'dist/sub/file3']
    # First archive the whole root (which also holds 'outer'), then
    # only base_dir.
    scenarios = (
        ((), dist_names + ['outer']),
        ((base_dir,), dist_names),
    )
    for extra_args, expected_names in scenarios:
        with os_helper.change_cwd(work_dir):
            base_name = os.path.abspath(rel_base_name)
            res = make_archive(rel_base_name, 'zip', root_dir, *extra_args)

        self.assertEqual(res, base_name + '.zip')
        self.assertTrue(os.path.isfile(res))
        self.assertTrue(zipfile.is_zipfile(res))
        with zipfile.ZipFile(res) as zf:
            self.assertCountEqual(zf.namelist(), expected_names)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001376
@support.requires_zlib()
@unittest.skipUnless(shutil.which('zip'),
                     'Need the zip command to run')
def test_zipfile_vs_zip(self):
    """Compare member names of a make_archive() zip with one made by `zip`."""
    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')
    archive = make_archive(base_name, 'zip', root_dir, base_dir)

    # The archive from shutil must exist under the expected name.
    self.assertEqual(archive, base_name + '.zip')
    self.assertTrue(os.path.isfile(archive))

    # Build the reference archive with the external `zip` command,
    # run from root_dir so member names are relative to it.
    archive2 = os.path.join(root_dir, 'archive2.zip')
    subprocess.check_call(['zip', '-q', '-r', 'archive2.zip', base_dir],
                          cwd=root_dir,
                          stdout=subprocess.DEVNULL)
    self.assertTrue(os.path.isfile(archive2))

    def member_names(path):
        # Return the raw name list stored in the zip file at *path*.
        with zipfile.ZipFile(path) as zf:
            return zf.namelist()

    names = member_names(archive)
    names2 = member_names(archive2)
    self.assertEqual(sorted(names), sorted(names2))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001402
@support.requires_zlib()
@unittest.skipUnless(shutil.which('unzip'),
                     'Need the unzip command to run')
def test_unzip_zipfile(self):
    """Check that a make_archive() zip passes the external `unzip -t` test."""
    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')
    archive = make_archive(base_name, 'zip', root_dir, base_dir)

    # The archive must exist under the expected name.
    self.assertEqual(archive, base_name + '.zip')
    self.assertTrue(os.path.isfile(archive))

    unzip_test_cmd = ['unzip', '-t', archive]
    with os_helper.change_cwd(root_dir):
        try:
            subprocess.check_output(unzip_test_cmd,
                                    stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as exc:
            details = exc.output.decode(errors="replace")
            # Some unzip implementations lack -t; that is not a failure
            # of shutil, so skip instead.
            if 'unrecognized option: t' in details:
                self.skipTest("unzip doesn't support -t")
            self.fail("{}\n\n**Unzip Output**\n{}".format(exc, details))
1426
def test_make_archive(self):
    """make_archive() must reject an unregistered format name."""
    base_name = os.path.join(self.mkdtemp(), 'archive')
    with self.assertRaises(ValueError):
        make_archive(base_name, 'xxx')
1431
@support.requires_zlib()
def test_make_archive_owner_group(self):
    """Exercise make_archive() with several owner/group combinations.

    This is expected to work even without uid/gid support, in which
    case the literal name 'root' is used for both.
    """
    if UID_GID_SUPPORT:
        group = grp.getgrgid(0)[0]
        owner = pwd.getpwuid(0)[0]
    else:
        group = owner = 'root'

    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')

    scenarios = (
        ('zip', {'owner': owner, 'group': group}),
        ('zip', {}),
        ('tar', {'owner': owner, 'group': group}),
        # Names that are not expected to exist on the system.
        ('tar', {'owner': 'kjhkjhkjg', 'group': 'oihohoh'}),
    )
    for archive_format, ownership in scenarios:
        res = make_archive(base_name, archive_format, root_dir, base_dir,
                           **ownership)
        self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001458
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001459
@support.requires_zlib()
@unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
def test_tarfile_root_owner(self):
    """Every member of a gztar built with uid/gid 0 names must carry id 0."""
    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')
    # Look up the names that belong to gid 0 and uid 0.
    group = grp.getgrgid(0)[0]
    owner = pwd.getpwuid(0)[0]
    with os_helper.change_cwd(root_dir):
        archive_name = make_archive(base_name, 'gztar', root_dir, 'dist',
                                    owner=owner, group=group)

    # The compressed tarball must have been created.
    self.assertTrue(os.path.isfile(archive_name))

    # Now check the ownership recorded for each member.
    with tarfile.open(archive_name) as archive:
        for member in archive.getmembers():
            self.assertEqual(member.uid, 0)
            self.assertEqual(member.gid, 0)
1482
def test_make_archive_cwd(self):
    """make_archive() must restore the cwd when the archiver raises.

    A dummy format whose archiver always raises RuntimeError is
    registered. The error has to propagate to the caller, and the
    working directory must be unchanged afterwards.
    """
    current_dir = os.getcwd()
    root_dir = self.mkdtemp()

    def _breaks(*args, **kw):
        raise RuntimeError()

    register_archive_format('xxx', _breaks, [], 'xxx file')
    try:
        # Expect exactly the archiver's error; swallowing any Exception
        # would let the test pass even if the archiver was never invoked.
        with self.assertRaises(RuntimeError):
            make_archive('xxx', 'xxx', root_dir=root_dir)
        self.assertEqual(os.getcwd(), current_dir)
    finally:
        # Always drop the dummy format so other tests see a clean registry.
        unregister_archive_format('xxx')
1497
def test_make_tarfile_in_curdir(self):
    """Issue #21280: a bare base name creates the tar in the cwd."""
    with os_helper.change_cwd(self.mkdtemp()):
        archive_name = make_archive('test', 'tar')
        self.assertEqual(archive_name, 'test.tar')
        self.assertTrue(os.path.isfile('test.tar'))
1504
@support.requires_zlib()
def test_make_zipfile_in_curdir(self):
    """Issue #21280: a bare base name creates the zip in the cwd."""
    with os_helper.change_cwd(self.mkdtemp()):
        archive_name = make_archive('test', 'zip')
        self.assertEqual(archive_name, 'test.zip')
        self.assertTrue(os.path.isfile('test.zip'))
1512
def test_register_archive_format(self):
    """Check argument validation and the register/unregister round trip."""

    def registered_names():
        # First element of each entry returned by get_archive_formats().
        return [entry[0] for entry in get_archive_formats()]

    # A non-callable archiver is rejected.
    with self.assertRaises(TypeError):
        register_archive_format('xxx', 1)
    # extra_args that is not a sequence is rejected.
    with self.assertRaises(TypeError):
        register_archive_format('xxx', lambda: x, 1)
    # extra_args entries must be 2-tuples.
    with self.assertRaises(TypeError):
        register_archive_format('xxx', lambda: x, [(1, 2), (1, 2, 3)])

    register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file')
    self.assertIn('xxx', registered_names())

    unregister_archive_format('xxx')
    self.assertNotIn('xxx', registered_names())
1528
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001529 ### shutil.unpack_archive
1530
def check_unpack_archive(self, format):
    """Run the unpack check for *format* with str, Path and FakePath paths."""
    converters = (lambda path: path, pathlib.Path, FakePath)
    for converter in converters:
        self.check_unpack_archive_with_converter(format, converter)
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001535
def check_unpack_archive_with_converter(self, format, converter):
    """Archive the sample tree as *format* and unpack it again.

    *converter* is applied to every path handed to unpack_archive().
    """
    root_dir, base_dir = self._create_files()
    # Only base_dir is archived, so 'outer' must not come back.
    expected = rlistdir(root_dir)
    expected.remove('outer')

    base_name = os.path.join(self.mkdtemp(), 'archive')
    filename = make_archive(base_name, format, root_dir, base_dir)

    # Unpack once relying on the filename, once naming the format.
    for extra_kwargs in ({}, {'format': format}):
        extract_dir = self.mkdtemp()
        unpack_archive(converter(filename), converter(extract_dir),
                       **extra_kwargs)
        self.assertEqual(rlistdir(extract_dir), expected)

    # TESTFN has no recognised archive extension.
    unknown_archive = converter(TESTFN)
    with self.assertRaises(shutil.ReadError):
        unpack_archive(unknown_archive)
    # An unregistered format name is rejected.
    unknown_archive = converter(TESTFN)
    with self.assertRaises(ValueError):
        unpack_archive(unknown_archive, format='xxx')
Nick Coghlanabf202d2011-03-16 13:52:20 -04001556
def test_unpack_archive_tar(self):
    """Round-trip an uncompressed tar archive through unpack_archive()."""
    self.check_unpack_archive('tar')
1559
@support.requires_zlib()
def test_unpack_archive_gztar(self):
    """Round-trip a gzip-compressed tar archive through unpack_archive()."""
    self.check_unpack_archive('gztar')
1563
@support.requires_bz2()
def test_unpack_archive_bztar(self):
    """Round-trip a bzip2-compressed tar archive through unpack_archive()."""
    self.check_unpack_archive('bztar')
1567
@support.requires_lzma()
@unittest.skipIf(AIX and not _maxdataOK(),
                 "AIX MAXDATA must be 0x20000000 or larger")
def test_unpack_archive_xztar(self):
    """Round-trip an xz-compressed tar archive through unpack_archive()."""
    self.check_unpack_archive('xztar')
1572
@support.requires_zlib()
def test_unpack_archive_zip(self):
    """Round-trip a zip archive through unpack_archive()."""
    self.check_unpack_archive('zip')
1576
def test_unpack_registry(self):
    """Register, use, clash on and unregister a custom unpack format."""
    initial_formats = get_unpack_formats()

    def boo_unpacker(filename, extract_dir, extra):
        # Called by unpack_archive(); verifies the arguments passed on.
        self.assertEqual(extra, 1)
        self.assertEqual(filename, 'stuff.boo')
        self.assertEqual(extract_dir, 'xx')

    register_unpack_format('Boo', ['.boo', '.b2'], boo_unpacker,
                           [('extra', 1)])
    unpack_archive('stuff.boo', 'xx')

    # '.boo' is already claimed, so a second registration must fail.
    with self.assertRaises(RegistryError):
        register_unpack_format('Boo2', ['.boo'], boo_unpacker)

    # Once 'Boo' is gone the extension is free again.
    unregister_unpack_format('Boo')
    register_unpack_format('Boo2', ['.boo'], boo_unpacker)
    self.assertIn(('Boo2', ['.boo'], ''), get_unpack_formats())
    self.assertNotIn(('Boo', ['.boo'], ''), get_unpack_formats())

    # Leave the registry as it was found.
    unregister_unpack_format('Boo2')
    self.assertEqual(get_unpack_formats(), initial_formats)
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001602
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001603
class TestMisc(BaseTest, unittest.TestCase):
    """Tests for shutil.disk_usage() and shutil.chown()."""

    @unittest.skipUnless(hasattr(shutil, 'disk_usage'),
                         "disk_usage not available on this platform")
    def test_disk_usage(self):
        """disk_usage() returns sane integer totals for a dir and a file."""
        usage = shutil.disk_usage(os.path.dirname(__file__))
        for attr in ('total', 'used', 'free'):
            self.assertIsInstance(getattr(usage, attr), int)
        self.assertGreater(usage.total, 0)
        self.assertGreater(usage.used, 0)
        self.assertGreaterEqual(usage.free, 0)
        self.assertGreaterEqual(usage.total, usage.used)
        self.assertGreater(usage.total, usage.free)

        # bpo-32557: Check that disk_usage() also accepts a filename
        shutil.disk_usage(__file__)

    @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
    @unittest.skipUnless(hasattr(os, 'chown'), 'requires os.chown')
    def test_chown(self):
        """chown() validates its arguments and applies uid/gid to paths."""
        dirname = self.mkdtemp()
        filename = tempfile.mktemp(dir=dirname)
        write_file(filename, 'testing chown function')

        # Neither user nor group given.
        with self.assertRaises(ValueError):
            shutil.chown(filename)

        # Unknown names cannot be resolved.
        with self.assertRaises(LookupError):
            shutil.chown(filename, user='non-existing username')

        with self.assertRaises(LookupError):
            shutil.chown(filename, group='non-existing groupname')

        # Only str names and int ids are accepted.
        with self.assertRaises(TypeError):
            shutil.chown(filename, b'spam')

        with self.assertRaises(TypeError):
            shutil.chown(filename, 3.14)

        uid = os.getuid()
        gid = os.getgid()

        def check_chown(path, uid=None, gid=None):
            # Stat the path that was actually chown'ed (not always the
            # file), and compare only the ids the caller supplied.
            s = os.stat(path)
            if uid is not None:
                self.assertEqual(uid, s.st_uid)
            if gid is not None:
                self.assertEqual(gid, s.st_gid)

        # Same sequence of calls for the regular file, then the directory.
        for target in (filename, dirname):
            shutil.chown(target, uid, gid)
            check_chown(target, uid, gid)
            shutil.chown(target, uid)
            check_chown(target, uid)
            shutil.chown(target, user=uid)
            check_chown(target, uid)
            shutil.chown(target, group=gid)
            check_chown(target, gid=gid)

        try:
            user = pwd.getpwuid(uid)[0]
            group = grp.getgrgid(gid)[0]
        except KeyError:
            # On some systems uid/gid cannot be resolved.
            pass
        else:
            # Repeat using the symbolic names instead of numeric ids.
            for target in (filename, dirname):
                shutil.chown(target, user, group)
                check_chown(target, uid, gid)
Sandro Tosid902a142011-08-22 23:28:27 +02001682
Brian Curtin0d0a1de2012-06-18 18:41:07 -05001683
class TestWhich(BaseTest, unittest.TestCase):
    """Tests for shutil.which() using str paths.

    setUp() creates one executable temporary file; its directory and
    file name are exposed as attributes so a subclass can re-encode them.
    """

    def setUp(self):
        self.temp_dir = self.mkdtemp(prefix="Tmp")
        # Give the temp_file an ".exe" suffix for all.
        # It's needed on Windows and not harmful on other platforms.
        self.temp_file = tempfile.NamedTemporaryFile(dir=self.temp_dir,
                                                     prefix="Tmp",
                                                     suffix=".Exe")
        os.chmod(self.temp_file.name, stat.S_IXUSR)
        self.addCleanup(self.temp_file.close)
        self.dir, self.file = os.path.split(self.temp_file.name)
        self.env_path = self.dir
        self.curdir = os.curdir
        self.ext = ".EXE"

    def test_basic(self):
        # Given an EXE in a directory, it should be returned.
        rv = shutil.which(self.file, path=self.dir)
        self.assertEqual(rv, self.temp_file.name)

    def test_absolute_cmd(self):
        # When given the fully qualified path to an executable that exists,
        # it should be returned.
        rv = shutil.which(self.temp_file.name, path=self.temp_dir)
        self.assertEqual(rv, self.temp_file.name)

    def test_relative_cmd(self):
        # When given the relative path with a directory part to an executable
        # that exists, it should be returned.
        base_dir, tail_dir = os.path.split(self.dir)
        relpath = os.path.join(tail_dir, self.file)
        with os_helper.change_cwd(path=base_dir):
            rv = shutil.which(relpath, path=self.temp_dir)
            self.assertEqual(rv, relpath)
        # But it shouldn't be searched in PATH directories (issue #16957).
        with os_helper.change_cwd(path=self.dir):
            rv = shutil.which(relpath, path=base_dir)
            self.assertIsNone(rv)

    def test_cwd(self):
        # Issue #16957
        base_dir = os.path.dirname(self.dir)
        with os_helper.change_cwd(path=self.dir):
            rv = shutil.which(self.file, path=base_dir)
            if sys.platform == "win32":
                # Windows: current directory implicitly on PATH
                self.assertEqual(rv, os.path.join(self.curdir, self.file))
            else:
                # Other platforms: shouldn't match in the current directory.
                self.assertIsNone(rv)

    @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
                     'non-root user required')
    def test_non_matching_mode(self):
        # Set the file read-only and ask for writeable files.
        os.chmod(self.temp_file.name, stat.S_IREAD)
        if os.access(self.temp_file.name, os.W_OK):
            self.skipTest("can't set the file read-only")
        rv = shutil.which(self.file, path=self.dir, mode=os.W_OK)
        self.assertIsNone(rv)

    def test_relative_path(self):
        # A relative search path yields a result relative to the cwd.
        base_dir, tail_dir = os.path.split(self.dir)
        with os_helper.change_cwd(path=base_dir):
            rv = shutil.which(self.file, path=tail_dir)
            self.assertEqual(rv, os.path.join(tail_dir, self.file))

    def test_nonexistent_file(self):
        # Return None when no matching executable file is found on the path.
        rv = shutil.which("foo.exe", path=self.dir)
        self.assertIsNone(rv)

    @unittest.skipUnless(sys.platform == "win32",
                         "pathext check is Windows-only")
    def test_pathext_checking(self):
        # Ask for the file without the ".exe" extension, then ensure that
        # it gets found properly with the extension.
        rv = shutil.which(self.file[:-4], path=self.dir)
        self.assertEqual(rv, self.temp_file.name[:-4] + self.ext)

    def test_environ_path(self):
        # With no explicit path, the PATH environment variable is searched.
        with os_helper.EnvironmentVarGuard() as env:
            env['PATH'] = self.env_path
            rv = shutil.which(self.file)
            self.assertEqual(rv, self.temp_file.name)

    def test_environ_path_empty(self):
        # PATH='': no match
        with os_helper.EnvironmentVarGuard() as env:
            env['PATH'] = ''
            with unittest.mock.patch('os.confstr', return_value=self.dir,
                                     create=True), \
                 support.swap_attr(os, 'defpath', self.dir), \
                 os_helper.change_cwd(self.dir):
                rv = shutil.which(self.file)
                self.assertIsNone(rv)

    def test_environ_path_cwd(self):
        expected_cwd = os.path.basename(self.temp_file.name)
        if sys.platform == "win32":
            curdir = os.curdir
            # Keep the joined path the same type as the file name.
            if isinstance(expected_cwd, bytes):
                curdir = os.fsencode(curdir)
            expected_cwd = os.path.join(curdir, expected_cwd)

        # PATH=':': explicitly looks in the current directory
        with os_helper.EnvironmentVarGuard() as env:
            env['PATH'] = os.pathsep
            with unittest.mock.patch('os.confstr', return_value=self.dir,
                                     create=True), \
                 support.swap_attr(os, 'defpath', self.dir):
                rv = shutil.which(self.file)
                self.assertIsNone(rv)

                # look in current directory
                with os_helper.change_cwd(self.dir):
                    rv = shutil.which(self.file)
                    self.assertEqual(rv, expected_cwd)

    def test_environ_path_missing(self):
        with os_helper.EnvironmentVarGuard() as env:
            env.pop('PATH', None)

            # without confstr
            with unittest.mock.patch('os.confstr', side_effect=ValueError,
                                     create=True), \
                 support.swap_attr(os, 'defpath', self.dir):
                rv = shutil.which(self.file)
            self.assertEqual(rv, self.temp_file.name)

            # with confstr
            with unittest.mock.patch('os.confstr', return_value=self.dir,
                                     create=True), \
                 support.swap_attr(os, 'defpath', ''):
                rv = shutil.which(self.file)
            self.assertEqual(rv, self.temp_file.name)

    def test_empty_path(self):
        # An explicit empty path must find nothing, even though both the
        # cwd and PATH point at the directory holding the file.
        with os_helper.change_cwd(path=self.dir), \
             os_helper.EnvironmentVarGuard() as env:
            env['PATH'] = self.env_path
            rv = shutil.which(self.file, path='')
            self.assertIsNone(rv)

    def test_empty_path_no_PATH(self):
        # With PATH removed from the environment, the file is not found.
        with os_helper.EnvironmentVarGuard() as env:
            env.pop('PATH', None)
            rv = shutil.which(self.file)
            self.assertIsNone(rv)

    @unittest.skipUnless(sys.platform == "win32", 'test specific to Windows')
    def test_pathext(self):
        ext = ".xyz"
        temp_filexyz = tempfile.NamedTemporaryFile(dir=self.temp_dir,
                                                   prefix="Tmp2", suffix=ext)
        os.chmod(temp_filexyz.name, stat.S_IXUSR)
        self.addCleanup(temp_filexyz.close)

        # strip path and extension
        program = os.path.basename(temp_filexyz.name)
        program = os.path.splitext(program)[0]

        with os_helper.EnvironmentVarGuard() as env:
            env['PATHEXT'] = ext
            rv = shutil.which(program, path=self.temp_dir)
            self.assertEqual(rv, temp_filexyz.name)

    # Issue 40592: See https://bugs.python.org/issue40592
    @unittest.skipUnless(sys.platform == "win32", 'test specific to Windows')
    def test_pathext_with_empty_str(self):
        ext = ".xyz"
        temp_filexyz = tempfile.NamedTemporaryFile(dir=self.temp_dir,
                                                   prefix="Tmp2", suffix=ext)
        self.addCleanup(temp_filexyz.close)

        # strip path and extension
        program = os.path.basename(temp_filexyz.name)
        program = os.path.splitext(program)[0]

        with os_helper.EnvironmentVarGuard() as env:
            env['PATHEXT'] = f"{ext};"  # note the ;
            rv = shutil.which(program, path=self.temp_dir)
            self.assertEqual(rv, temp_filexyz.name)
1869
Brian Curtinc57a3452012-06-22 16:00:30 -05001870
class TestWhichBytes(TestWhich):
    """Re-run the TestWhich tests with bytes paths instead of str."""

    def setUp(self):
        super().setUp()
        # Re-encode every path-like fixture attribute to bytes.
        for attr in ('dir', 'file', 'curdir', 'ext'):
            setattr(self, attr, os.fsencode(getattr(self, attr)))
        self.temp_file.name = os.fsencode(self.temp_file.name)
1879
1880
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001881class TestMove(BaseTest, unittest.TestCase):
Christian Heimesada8c3b2008-03-18 18:26:33 +00001882
def setUp(self):
    """Create source and destination dirs plus a small source file."""
    self.src_dir = self.mkdtemp()
    self.dst_dir = self.mkdtemp()
    leaf_name = "foo"
    self.src_file = os.path.join(self.src_dir, leaf_name)
    self.dst_file = os.path.join(self.dst_dir, leaf_name)
    with open(self.src_file, "wb") as stream:
        stream.write(b"spam")
1891
def _check_move_file(self, src, dst, real_dst):
    """Move file *src* to *dst* and verify it ended up at *real_dst*."""
    with open(src, "rb") as stream:
        contents = stream.read()
    shutil.move(src, dst)
    with open(real_dst, "rb") as stream:
        moved_contents = stream.read()
    self.assertEqual(contents, moved_contents)
    # The source must be gone after the move.
    self.assertFalse(os.path.exists(src))
1899
def _check_move_dir(self, src, dst, real_dst):
    """Move directory *src* to *dst* and verify its entries at *real_dst*."""
    entries_before = sorted(os.listdir(src))
    shutil.move(src, dst)
    entries_after = sorted(os.listdir(real_dst))
    self.assertEqual(entries_before, entries_after)
    # The source must be gone after the move.
    self.assertFalse(os.path.exists(src))
1905
def test_move_file(self):
    """Move a file to another location on the same filesystem."""
    source, target = self.src_file, self.dst_file
    self._check_move_file(source, target, target)
1909
def test_move_file_to_dir(self):
    """Move a file inside an existing dir on the same filesystem."""
    expected_location = self.dst_file
    self._check_move_file(self.src_file, self.dst_dir, expected_location)
1913
def test_move_file_to_dir_pathlike_src(self):
    """Move a file given as a pathlib.Path into an existing directory."""
    self._check_move_file(pathlib.Path(self.src_file),
                          self.dst_dir,
                          self.dst_file)
1918
def test_move_file_to_dir_pathlike_dst(self):
    """Move a file into a directory given as a pathlib.Path."""
    self._check_move_file(self.src_file,
                          pathlib.Path(self.dst_dir),
                          self.dst_file)
1923
@mock_rename
def test_move_file_other_fs(self):
    """Move a file to another location on another filesystem.

    Re-runs test_move_file() under the mock_rename decorator.
    NOTE(review): mock_rename is defined elsewhere in this file and
    presumably simulates the cross-filesystem case; confirm there.
    """
    self.test_move_file()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001928
@mock_rename
def test_move_file_to_dir_other_fs(self):
    """Move a file to an existing dir on another filesystem.

    Re-runs test_move_file_to_dir() under the mock_rename decorator.
    NOTE(review): mock_rename is defined elsewhere in this file and
    presumably simulates the cross-filesystem case; confirm there.
    """
    self.test_move_file_to_dir()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001933
def test_move_dir(self):
    """Move a dir to another location on the same filesystem."""
    # A not-yet-existing name inside a fresh temporary directory.
    target_dir = tempfile.mktemp(dir=self.mkdtemp())
    try:
        self._check_move_dir(self.src_dir, target_dir, target_dir)
    finally:
        os_helper.rmtree(target_dir)
Christian Heimesada8c3b2008-03-18 18:26:33 +00001941
@mock_rename
def test_move_dir_other_fs(self):
    """Move a dir to another location on another filesystem."""
    # Same scenario as test_move_dir(), run under the mock_rename decorator.
    self.test_move_dir()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001946
def test_move_dir_to_dir(self):
    """Move a dir inside an existing dir on the same filesystem."""
    moved_name = os.path.basename(self.src_dir)
    expected_location = os.path.join(self.dst_dir, moved_name)
    self._check_move_dir(self.src_dir, self.dst_dir, expected_location)
1951
@mock_rename
def test_move_dir_to_dir_other_fs(self):
    """Move a dir inside an existing dir on another filesystem."""
    # Same scenario as test_move_dir_to_dir(), run under mock_rename.
    self.test_move_dir_to_dir()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001956
def test_move_dir_sep_to_dir(self):
    """A trailing os.path.sep on the source dir must not change the result."""
    moved_name = os.path.basename(self.src_dir)
    expected_location = os.path.join(self.dst_dir, moved_name)
    self._check_move_dir(self.src_dir + os.path.sep, self.dst_dir,
                         expected_location)
1960
@unittest.skipUnless(os.path.altsep, 'requires os.path.altsep')
def test_move_dir_altsep_to_dir(self):
    """A trailing os.path.altsep on the source dir must not change the result."""
    moved_name = os.path.basename(self.src_dir)
    expected_location = os.path.join(self.dst_dir, moved_name)
    self._check_move_dir(self.src_dir + os.path.altsep, self.dst_dir,
                         expected_location)
1965
def test_existing_file_inside_dest_dir(self):
    """Moving into a dir that already holds a same-named file must fail."""
    # Create an empty file at the would-be destination.
    open(self.dst_file, "wb").close()
    with self.assertRaises(shutil.Error):
        shutil.move(self.src_file, self.dst_dir)
1971
def test_dont_move_dir_in_itself(self):
    """Moving a dir inside itself raises an Error."""
    nested_target = os.path.join(self.src_dir, "bar")
    with self.assertRaises(shutil.Error):
        shutil.move(self.src_dir, nested_target)
1976
def test_destinsrc_false_negative(self):
    """_destinsrc() must report a destination nested inside the source."""
    nested_cases = [('srcdir', 'srcdir/dest')]
    os.mkdir(TESTFN)
    try:
        for src_name, dst_name in nested_cases:
            src = os.path.join(TESTFN, src_name)
            dst = os.path.join(TESTFN, dst_name)
            failure_msg = ('_destinsrc() wrongly concluded that '
                           'dst (%s) is not in src (%s)' % (dst, src))
            self.assertTrue(shutil._destinsrc(src, dst), msg=failure_msg)
    finally:
        os_helper.rmtree(TESTFN)
Christian Heimesada8c3b2008-03-18 18:26:33 +00001988
def test_destinsrc_false_positive(self):
    """_destinsrc() must not be fooled by a shared name prefix."""
    unrelated_cases = [('srcdir', 'src/dest'), ('srcdir', 'srcdir.new')]
    os.mkdir(TESTFN)
    try:
        for src_name, dst_name in unrelated_cases:
            src = os.path.join(TESTFN, src_name)
            dst = os.path.join(TESTFN, dst_name)
            failure_msg = ('_destinsrc() wrongly concluded that '
                           'dst (%s) is in src (%s)' % (dst, src))
            self.assertFalse(shutil._destinsrc(src, dst), msg=failure_msg)
    finally:
        os_helper.rmtree(TESTFN)
Christian Heimes9bd667a2008-01-20 15:14:11 +00002000
@os_helper.skip_unless_symlink
@mock_rename
def test_move_file_symlink(self):
    # Move a symlink (pointing at src_file) onto dst_file: the result must
    # still be a symlink that resolves to src_file.
    # NOTE(review): mock_rename is defined elsewhere in this file;
    # presumably it keeps shutil.move() off the plain rename path -- confirm.
    link = os.path.join(self.src_dir, 'bar')
    os.symlink(self.src_file, link)
    shutil.move(link, self.dst_file)
    still_a_link = os.path.islink(self.dst_file)
    self.assertTrue(still_a_link)
    same_target = os.path.samefile(self.src_file, self.dst_file)
    self.assertTrue(same_target)
2009
@os_helper.skip_unless_symlink
@mock_rename
def test_move_file_symlink_to_dir(self):
    # Move a symlink (pointing at src_file) into dst_dir: it must arrive
    # under its own name, still as a symlink resolving to src_file.
    link_name = "bar"
    link = os.path.join(self.src_dir, link_name)
    os.symlink(self.src_file, link)
    shutil.move(link, self.dst_dir)
    moved_link = os.path.join(self.dst_dir, link_name)
    still_a_link = os.path.islink(moved_link)
    self.assertTrue(still_a_link)
    same_target = os.path.samefile(self.src_file, moved_link)
    self.assertTrue(same_target)
2020
@os_helper.skip_unless_symlink
@mock_rename
def test_move_dangling_symlink(self):
    # The link target ('baz') is never created, so the symlink dangles;
    # moving it must keep it a symlink with the same resolved target.
    missing_target = os.path.join(self.src_dir, 'baz')
    link = os.path.join(self.src_dir, 'bar')
    os.symlink(missing_target, link)
    moved_link = os.path.join(self.dst_dir, 'quux')
    shutil.move(link, moved_link)
    still_a_link = os.path.islink(moved_link)
    self.assertTrue(still_a_link)
    self.assertEqual(os.path.realpath(missing_target),
                     os.path.realpath(moved_link))
Antoine Pitrou0a08d7a2012-01-06 20:16:19 +01002031
@os_helper.skip_unless_symlink
@mock_rename
def test_move_dir_symlink(self):
    # Move a symlink that points at a directory: the result must still be
    # a symlink referring to that same directory.
    target_dir = os.path.join(self.src_dir, 'baz')
    link = os.path.join(self.src_dir, 'bar')
    os.mkdir(target_dir)
    os.symlink(target_dir, link)
    moved_link = os.path.join(self.dst_dir, 'quux')
    shutil.move(link, moved_link)
    still_a_link = os.path.islink(moved_link)
    self.assertTrue(still_a_link)
    same_target = os.path.samefile(target_dir, moved_link)
    self.assertTrue(same_target)
2043
Brian Curtin0d0a1de2012-06-18 18:41:07 -05002044 def test_move_return_value(self):
2045 rv = shutil.move(self.src_file, self.dst_dir)
2046 self.assertEqual(rv,
2047 os.path.join(self.dst_dir, os.path.basename(self.src_file)))
2048
2049 def test_move_as_rename_return_value(self):
2050 rv = shutil.move(self.src_file, os.path.join(self.dst_dir, 'bar'))
2051 self.assertEqual(rv, os.path.join(self.dst_dir, 'bar'))
2052
@mock_rename
def test_move_file_special_function(self):
    # A custom copy_function must be invoked exactly once for a single
    # file.  The recorder copies nothing; it only logs its arguments.
    calls = []

    def record(src, dst):
        calls.append((src, dst))

    shutil.move(self.src_file, self.dst_dir, copy_function=record)
    self.assertEqual(len(calls), 1)
2060
@mock_rename
def test_move_dir_special_function(self):
    # A custom copy_function must be invoked three times when moving the
    # directory (presumably the two files created here plus one that
    # setUp already placed in src_dir -- confirm against setUp).
    calls = []

    def record(src, dst):
        calls.append((src, dst))

    for child_name in ('child', 'child1'):
        os_helper.create_empty_file(os.path.join(self.src_dir, child_name))
    shutil.move(self.src_dir, self.dst_dir, copy_function=record)
    self.assertEqual(len(calls), 3)
2070
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03002071 def test_move_dir_caseinsensitive(self):
2072 # Renames a folder to the same name
2073 # but a different case.
2074
2075 self.src_dir = self.mkdtemp()
2076 dst_dir = os.path.join(
2077 os.path.dirname(self.src_dir),
2078 os.path.basename(self.src_dir).upper())
2079 self.assertNotEqual(self.src_dir, dst_dir)
2080
2081 try:
2082 shutil.move(self.src_dir, dst_dir)
2083 self.assertTrue(os.path.isdir(dst_dir))
2084 finally:
2085 os.rmdir(dst_dir)
2086
Tarek Ziadé5340db32010-04-19 22:30:51 +00002087
class TestCopyFile(unittest.TestCase):
    """Check how shutil.copyfile() reacts when opening or closing fails.

    The ``open`` name looked up by shutil is swapped for a stand-in that
    either raises or hands back a ``Faux`` context manager, so the tests
    never open a real file.
    """

    class Faux(object):
        # Minimal file-like context manager that records how it was used.
        _entered = False       # set once __enter__ has run
        _exited_with = None    # (exc_type, exc_val, exc_tb) seen by __exit__
        _raised = False        # set when __exit__ itself raised

        def __init__(self, raise_in_exit=False, suppress_at_exit=True):
            self._raise_in_exit = raise_in_exit
            self._suppress_at_exit = suppress_at_exit

        def read(self, *args):
            return ''

        def __enter__(self):
            # Returns None: the ``as`` target inside copyfile() is not self.
            self._entered = True

        def __exit__(self, exc_type, exc_val, exc_tb):
            self._exited_with = exc_type, exc_val, exc_tb
            if self._raise_in_exit:
                self._raised = True
                raise OSError("Cannot close")
            # A true value swallows the exception that reached __exit__.
            return self._suppress_at_exit

    def _fake_open(self, outcomes):
        # Build an open() replacement.  *outcomes* maps a filename either to
        # an exception instance (which is raised) or to the object returned.
        def _open(filename, mode='r'):
            if filename not in outcomes:
                # Shouldn't reach here.  An explicit failure is used instead
                # of ``assert 0`` so the guard survives ``python -O``.
                self.fail('unexpected open(%r)' % (filename,))
            outcome = outcomes[filename]
            if isinstance(outcome, BaseException):
                raise outcome
            return outcome
        return _open

    def test_w_source_open_fails(self):
        # Failing to open the source must propagate as OSError.
        opener = self._fake_open(
            {'srcfile': OSError('Cannot open "srcfile"')})

        with support.swap_attr(shutil, 'open', opener):
            with self.assertRaises(OSError):
                shutil.copyfile('srcfile', 'destfile')

    @unittest.skipIf(MACOS, "skipped on macOS")
    def test_w_dest_open_fails(self):
        # The source's __exit__ must see the error from opening the
        # destination; Faux() suppresses it, so copyfile() returns normally.
        srcfile = self.Faux()
        opener = self._fake_open(
            {'srcfile': srcfile,
             'destfile': OSError('Cannot open "destfile"')})

        with support.swap_attr(shutil, 'open', opener):
            shutil.copyfile('srcfile', 'destfile')
        self.assertTrue(srcfile._entered)
        self.assertIs(srcfile._exited_with[0], OSError)
        self.assertEqual(srcfile._exited_with[1].args,
                         ('Cannot open "destfile"',))

    @unittest.skipIf(MACOS, "skipped on macOS")
    def test_w_dest_close_fails(self):
        # An error raised while closing the destination must reach the
        # source's __exit__, which suppresses it.
        srcfile = self.Faux()
        destfile = self.Faux(True)
        opener = self._fake_open({'srcfile': srcfile, 'destfile': destfile})

        with support.swap_attr(shutil, 'open', opener):
            shutil.copyfile('srcfile', 'destfile')
        self.assertTrue(srcfile._entered)
        self.assertTrue(destfile._entered)
        self.assertTrue(destfile._raised)
        self.assertIs(srcfile._exited_with[0], OSError)
        self.assertEqual(srcfile._exited_with[1].args,
                         ('Cannot close',))

    @unittest.skipIf(MACOS, "skipped on macOS")
    def test_w_source_close_fails(self):
        # An error raised while closing the source has nothing left to
        # suppress it, so copyfile() must raise OSError.
        srcfile = self.Faux(True)
        destfile = self.Faux()
        opener = self._fake_open({'srcfile': srcfile, 'destfile': destfile})

        with support.swap_attr(shutil, 'open', opener):
            with self.assertRaises(OSError):
                shutil.copyfile('srcfile', 'destfile')
        self.assertTrue(srcfile._entered)
        self.assertTrue(destfile._entered)
        self.assertFalse(destfile._raised)
        self.assertIsNone(srcfile._exited_with[0])
        self.assertTrue(srcfile._raised)
2178
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002179
class TestCopyFileObj(unittest.TestCase):
    """Tests for shutil.copyfileobj() using a 2 MiB source file.

    Checks use unittest/mock assertion helpers rather than bare ``assert``
    statements, so they are not compiled away under ``python -O``.
    """
    FILESIZE = 2 * 1024 * 1024

    @classmethod
    def setUpClass(cls):
        write_test_file(TESTFN, cls.FILESIZE)

    @classmethod
    def tearDownClass(cls):
        os_helper.unlink(TESTFN)
        os_helper.unlink(TESTFN2)

    def tearDown(self):
        os_helper.unlink(TESTFN2)

    @contextlib.contextmanager
    def get_files(self):
        # Yield (src, dst): TESTFN opened for reading and TESTFN2 opened
        # for writing, both binary.  Both are closed on exit.
        with open(TESTFN, "rb") as src:
            with open(TESTFN2, "wb") as dst:
                yield (src, dst)

    def assert_files_eq(self, src, dst):
        # Fail unless the two named files have identical contents.
        with open(src, 'rb') as fsrc:
            with open(dst, 'rb') as fdst:
                self.assertEqual(fsrc.read(), fdst.read())

    def test_content(self):
        with self.get_files() as (src, dst):
            shutil.copyfileobj(src, dst)
        # Compare only after both files are closed, i.e. flushed.
        self.assert_files_eq(TESTFN, TESTFN2)

    def test_file_not_closed(self):
        # copyfileobj() must leave both file objects open.
        with self.get_files() as (src, dst):
            shutil.copyfileobj(src, dst)
            self.assertFalse(src.closed)
            self.assertFalse(dst.closed)

    def test_file_offset(self):
        # After the copy both file positions sit at the end of the data.
        with self.get_files() as (src, dst):
            shutil.copyfileobj(src, dst)
            self.assertEqual(src.tell(), self.FILESIZE)
            self.assertEqual(dst.tell(), self.FILESIZE)

    @unittest.skipIf(os.name != 'nt', "Windows only")
    def test_win_impl(self):
        # Make sure alternate Windows implementation is called.
        with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
            shutil.copyfile(TESTFN, TESTFN2)
        m.assert_called()

        # File size is 2 MiB but max buf size should be 1 MiB.
        self.assertEqual(m.call_args[0][2], 1 * 1024 * 1024)

        # If file size < 1 MiB memoryview() length must be equal to
        # the actual file size.
        with tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False) as f:
            f.write(b'foo')
        fname = f.name
        self.addCleanup(os_helper.unlink, fname)
        with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
            shutil.copyfile(fname, TESTFN2)
        self.assertEqual(m.call_args[0][2], 3)

        # Empty files should not rely on readinto() variant.
        with tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False) as f:
            pass
        fname = f.name
        self.addCleanup(os_helper.unlink, fname)
        with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
            shutil.copyfile(fname, TESTFN2)
        m.assert_not_called()
        self.assert_files_eq(fname, TESTFN2)
2252
2253
class _ZeroCopyFileTest(object):
    """Tests common to all zero-copy APIs."""
    FILESIZE = (10 * 1024 * 1024)  # 10 MiB
    FILEDATA = b""    # contents of TESTFN, filled in by setUpClass()
    PATCHPOINT = ""   # dotted name passed to mock.patch(); set by subclasses

    @classmethod
    def setUpClass(cls):
        write_test_file(TESTFN, cls.FILESIZE)
        with open(TESTFN, 'rb') as f:
            cls.FILEDATA = f.read()
        # Explicit check instead of a bare ``assert`` so that a short
        # fixture is still detected under ``python -O``.
        if len(cls.FILEDATA) != cls.FILESIZE:
            raise AssertionError(
                'test file holds %d bytes, expected %d'
                % (len(cls.FILEDATA), cls.FILESIZE))

    @classmethod
    def tearDownClass(cls):
        os_helper.unlink(TESTFN)

    def tearDown(self):
        os_helper.unlink(TESTFN2)

    @contextlib.contextmanager
    def get_files(self):
        # Yield (src, dst): TESTFN opened for reading and TESTFN2 opened
        # for writing, both binary.  Both are closed on exit.
        with open(TESTFN, "rb") as src:
            with open(TESTFN2, "wb") as dst:
                yield (src, dst)

    def zerocopy_fun(self, *args, **kwargs):
        raise NotImplementedError("must be implemented in subclass")

    def reset(self):
        # Tear everything down and rebuild the fixtures from scratch.
        self.tearDown()
        self.tearDownClass()
        self.setUpClass()
        self.setUp()

    # ---

    def test_regular_copy(self):
        with self.get_files() as (src, dst):
            self.zerocopy_fun(src, dst)
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
        # Make sure the fallback function is not called.
        with self.get_files() as (src, dst):
            with unittest.mock.patch('shutil.copyfileobj') as m:
                shutil.copyfile(TESTFN, TESTFN2)
            m.assert_not_called()

    def test_same_file(self):
        # Rebuild the fixtures afterwards in case the source got damaged.
        self.addCleanup(self.reset)
        with self.get_files() as (src, dst):
            with self.assertRaises(Exception):
                self.zerocopy_fun(src, src)
        # Make sure src file is not corrupted.
        self.assertEqual(read_file(TESTFN, binary=True), self.FILEDATA)

    def test_non_existent_src(self):
        # mktemp() only produces a name; nothing is created at that path.
        name = tempfile.mktemp(dir=os.getcwd())
        with self.assertRaises(FileNotFoundError) as cm:
            shutil.copyfile(name, "new")
        self.assertEqual(cm.exception.filename, name)

    def test_empty_file(self):
        srcname = TESTFN + 'src'
        dstname = TESTFN + 'dst'
        self.addCleanup(os_helper.unlink, srcname)
        self.addCleanup(os_helper.unlink, dstname)
        with open(srcname, "wb"):
            pass

        with open(srcname, "rb") as src:
            with open(dstname, "wb") as dst:
                self.zerocopy_fun(src, dst)

        self.assertEqual(read_file(dstname, binary=True), b"")

    def test_unhandled_exception(self):
        # An unrelated exception from the patched call must propagate
        # out of copyfile() unchanged.
        with unittest.mock.patch(self.PATCHPOINT,
                                 side_effect=ZeroDivisionError):
            self.assertRaises(ZeroDivisionError,
                              shutil.copyfile, TESTFN, TESTFN2)

    def test_exception_on_first_call(self):
        # Emulate a case where the first call to the zero-copy
        # function raises an exception in which case the function is
        # supposed to give up immediately.
        with unittest.mock.patch(self.PATCHPOINT,
                                 side_effect=OSError(errno.EINVAL, "yo")):
            with self.get_files() as (src, dst):
                with self.assertRaises(_GiveupOnFastCopy):
                    self.zerocopy_fun(src, dst)

    def test_filesystem_full(self):
        # Emulate a case where filesystem is full and sendfile() fails
        # on first call.
        with unittest.mock.patch(self.PATCHPOINT,
                                 side_effect=OSError(errno.ENOSPC, "yo")):
            with self.get_files() as (src, dst):
                self.assertRaises(OSError, self.zerocopy_fun, src, dst)
2352
2353
@unittest.skipIf(not SUPPORTS_SENDFILE, 'os.sendfile() not supported')
class TestZeroCopySendfile(_ZeroCopyFileTest, unittest.TestCase):
    """Run the shared zero-copy tests against shutil._fastcopy_sendfile().

    Checks use unittest/mock assertion helpers rather than bare ``assert``
    statements, so they are not compiled away under ``python -O``.
    """
    PATCHPOINT = "os.sendfile"

    def zerocopy_fun(self, fsrc, fdst):
        return shutil._fastcopy_sendfile(fsrc, fdst)

    def test_non_regular_file_src(self):
        # A BytesIO source makes the fast path give up; the regular
        # copyfileobj() fallback must still copy everything.
        with io.BytesIO(self.FILEDATA) as src:
            with open(TESTFN2, "wb") as dst:
                with self.assertRaises(_GiveupOnFastCopy):
                    self.zerocopy_fun(src, dst)
                shutil.copyfileobj(src, dst)

        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_non_regular_file_dst(self):
        # Same as above, with the BytesIO on the destination side.
        with open(TESTFN, "rb") as src:
            with io.BytesIO() as dst:
                with self.assertRaises(_GiveupOnFastCopy):
                    self.zerocopy_fun(src, dst)
                shutil.copyfileobj(src, dst)
                dst.seek(0)
                self.assertEqual(dst.read(), self.FILEDATA)

    def test_exception_on_second_call(self):
        # The first sendfile() call is forwarded to the real function;
        # every later call fails with EBADF, which must reach the caller.
        def sendfile(*args, **kwargs):
            if not first_call_seen:
                first_call_seen.append(None)
                return orig_sendfile(*args, **kwargs)
            else:
                raise OSError(errno.EBADF, "yo")

        first_call_seen = []
        orig_sendfile = os.sendfile
        with unittest.mock.patch('os.sendfile', create=True,
                                 side_effect=sendfile):
            with self.get_files() as (src, dst):
                with self.assertRaises(OSError) as cm:
                    shutil._fastcopy_sendfile(src, dst)
        self.assertTrue(first_call_seen)
        self.assertEqual(cm.exception.errno, errno.EBADF)

    def test_cant_get_size(self):
        # Emulate a case where src file size cannot be determined.
        # Internally bufsize will be set to a small value and
        # sendfile() will be called repeatedly.
        with unittest.mock.patch('os.fstat', side_effect=OSError) as m:
            with self.get_files() as (src, dst):
                shutil._fastcopy_sendfile(src, dst)
                m.assert_called()
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_small_chunks(self):
        # Force internal file size detection to be smaller than the
        # actual file size. We want to force sendfile() to be called
        # multiple times, also in order to emulate a src fd which gets
        # bigger while it is being copied.
        fake_stat = unittest.mock.Mock()
        fake_stat.st_size = 65536 + 1
        with unittest.mock.patch('os.fstat', return_value=fake_stat) as m:
            with self.get_files() as (src, dst):
                shutil._fastcopy_sendfile(src, dst)
                m.assert_called()
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_big_chunk(self):
        # Force internal file size detection to be +100MB bigger than
        # the actual file size. Make sure sendfile() does not rely on
        # file size value except for (maybe) a better throughput /
        # performance.
        fake_stat = unittest.mock.Mock()
        fake_stat.st_size = self.FILESIZE + (100 * 1024 * 1024)
        with unittest.mock.patch('os.fstat', return_value=fake_stat) as m:
            with self.get_files() as (src, dst):
                shutil._fastcopy_sendfile(src, dst)
                m.assert_called()
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_blocksize_arg(self):
        with unittest.mock.patch('os.sendfile',
                                 side_effect=ZeroDivisionError) as m:
            self.assertRaises(ZeroDivisionError,
                              shutil.copyfile, TESTFN, TESTFN2)
            blocksize = m.call_args[0][3]
            # Make sure file size and the block size arg passed to
            # sendfile() are the same.
            self.assertEqual(blocksize, os.path.getsize(TESTFN))
            # ...unless we're dealing with a small file.
            os_helper.unlink(TESTFN2)
            write_file(TESTFN2, b"hello", binary=True)
            self.addCleanup(os_helper.unlink, TESTFN2 + '3')
            self.assertRaises(ZeroDivisionError,
                              shutil.copyfile, TESTFN2, TESTFN2 + '3')
            blocksize = m.call_args[0][3]
            self.assertEqual(blocksize, 2 ** 23)

    def test_file2file_not_supported(self):
        # Emulate a case where sendfile() only support file->socket
        # fds. In such a case copyfile() is supposed to skip the
        # fast-copy attempt from then on.
        self.assertTrue(shutil._USE_CP_SENDFILE)
        try:
            with unittest.mock.patch(
                    self.PATCHPOINT,
                    side_effect=OSError(errno.ENOTSOCK, "yo")) as m:
                with self.get_files() as (src, dst):
                    with self.assertRaises(_GiveupOnFastCopy):
                        shutil._fastcopy_sendfile(src, dst)
                m.assert_called()
            self.assertFalse(shutil._USE_CP_SENDFILE)

            with unittest.mock.patch(self.PATCHPOINT) as m:
                shutil.copyfile(TESTFN, TESTFN2)
                m.assert_not_called()
        finally:
            # Restore the module-level switch for the tests that follow.
            shutil._USE_CP_SENDFILE = True
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002471
2472
@unittest.skipUnless(MACOS, 'macOS only')
class TestZeroCopyMACOS(_ZeroCopyFileTest, unittest.TestCase):
    """Run the shared zero-copy tests against shutil._fastcopy_fcopyfile()."""
    PATCHPOINT = "posix._fcopyfile"

    def zerocopy_fun(self, src, dst):
        # Request a data-only copy via the posix._COPYFILE_DATA flag.
        flags = posix._COPYFILE_DATA
        return shutil._fastcopy_fcopyfile(src, dst, flags)
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002479
2480
class TestGetTerminalSize(unittest.TestCase):
    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        dims = shutil.get_terminal_size()
        self.assertGreaterEqual(dims.columns, 0)
        self.assertGreaterEqual(dims.lines, 0)

    def test_os_environ_first(self):
        "Check if environment variables have precedence"

        # COLUMNS set, LINES absent.
        with os_helper.EnvironmentVarGuard() as env:
            env['COLUMNS'] = '777'
            del env['LINES']
            dims = shutil.get_terminal_size()
        self.assertEqual(dims.columns, 777)

        # LINES set, COLUMNS absent.
        with os_helper.EnvironmentVarGuard() as env:
            del env['COLUMNS']
            env['LINES'] = '888'
            dims = shutil.get_terminal_size()
        self.assertEqual(dims.lines, 888)

    def test_bad_environ(self):
        # Non-numeric COLUMNS/LINES must not break get_terminal_size().
        with os_helper.EnvironmentVarGuard() as env:
            env['COLUMNS'] = 'xxx'
            env['LINES'] = 'yyy'
            dims = shutil.get_terminal_size()
        self.assertGreaterEqual(dims.columns, 0)
        self.assertGreaterEqual(dims.lines, 0)

    @unittest.skipUnless(os.isatty(sys.__stdout__.fileno()), "not on tty")
    @unittest.skipUnless(hasattr(os, 'get_terminal_size'),
                         'need os.get_terminal_size()')
    def test_stty_match(self):
        """Check if stty returns the same results ignoring env

        This test will fail if stdin and stdout are connected to
        different terminals with different sizes. Nevertheless, such
        situations should be pretty rare.
        """
        try:
            raw = subprocess.check_output(['stty', 'size'])
        except (FileNotFoundError, PermissionError,
                subprocess.CalledProcessError):
            self.skipTest("stty invocation failed")
        fields = raw.decode().split()
        # The first two fields are swapped to get the (columns, lines)
        # order that get_terminal_size() returns.
        expected = (int(fields[1]), int(fields[0]))

        with os_helper.EnvironmentVarGuard() as env:
            del env['LINES']
            del env['COLUMNS']
            actual = shutil.get_terminal_size()

        self.assertEqual(expected, actual)

    def test_fallback(self):
        # With no usable terminal and no environment override, the
        # fallback argument must be returned verbatim.
        with os_helper.EnvironmentVarGuard() as env:
            del env['LINES']
            del env['COLUMNS']

            # sys.__stdout__ has no fileno()
            with support.swap_attr(sys, '__stdout__', None):
                dims = shutil.get_terminal_size(fallback=(10, 20))
            self.assertEqual(dims.columns, 10)
            self.assertEqual(dims.lines, 20)

            # sys.__stdout__ is not a terminal on Unix
            # or fileno() not in (0, 1, 2) on Windows
            with open(os.devnull, 'w') as devnull, \
                 support.swap_attr(sys, '__stdout__', devnull):
                dims = shutil.get_terminal_size(fallback=(30, 40))
            self.assertEqual(dims.columns, 30)
            self.assertEqual(dims.lines, 40)
2557
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002558
class PublicAPITests(unittest.TestCase):
    """Ensures that the correct values are exposed in the public API."""

    def test_module_all_attribute(self):
        self.assertTrue(hasattr(shutil, '__all__'))
        expected = {
            'copyfileobj', 'copyfile', 'copymode', 'copystat',
            'copy', 'copy2', 'copytree', 'move', 'rmtree', 'Error',
            'SpecialFileError', 'ExecError', 'make_archive',
            'get_archive_formats', 'register_archive_format',
            'unregister_archive_format', 'get_unpack_formats',
            'register_unpack_format', 'unregister_unpack_format',
            'unpack_archive', 'ignore_patterns', 'chown', 'which',
            'get_terminal_size', 'SameFileError',
        }
        # disk_usage is only expected on Windows or where os.statvfs exists.
        if os.name == 'nt' or hasattr(os, 'statvfs'):
            expected.add('disk_usage')
        self.assertEqual(set(shutil.__all__), expected)
2575
2576
# Run the whole test module when it is executed directly as a script.
if __name__ == "__main__":
    unittest.main()