blob: 890f2c7d406c24619c05d1107f343b17735b4fce [file] [log] [blame]
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00001# Copyright (C) 2003 Python Software Foundation
2
3import unittest
Berker Peksag884afd92014-12-10 02:50:32 +02004import unittest.mock
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00005import shutil
6import tempfile
Johannes Gijsbers8e6f2de2004-11-23 09:27:27 +00007import sys
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +00008import stat
Brett Cannon1c3fa182004-06-19 21:11:35 +00009import os
10import os.path
Antoine Pitrouc041ab62012-01-02 19:18:02 +010011import errno
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040012import functools
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -070013import pathlib
Antoine Pitroubcf2b592012-02-08 23:28:36 +010014import subprocess
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020015import random
16import string
17import contextlib
18import io
Serhiy Storchaka527ef072015-09-06 18:33:19 +030019from shutil import (make_archive,
Tarek Ziadé396fad72010-02-23 05:30:31 +000020 register_archive_format, unregister_archive_format,
Tarek Ziadé6ac91722010-04-28 17:51:36 +000021 get_archive_formats, Error, unpack_archive,
22 register_unpack_format, RegistryError,
Hynek Schlawack48653762012-10-07 12:49:58 +020023 unregister_unpack_format, get_unpack_formats,
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020024 SameFileError, _GiveupOnFastCopy)
Tarek Ziadé396fad72010-02-23 05:30:31 +000025import tarfile
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +020026import zipfile
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020027try:
28 import posix
29except ImportError:
30 posix = None
Tarek Ziadé396fad72010-02-23 05:30:31 +000031
32from test import support
Hai Shi0c4f0f32020-06-30 21:46:31 +080033from test.support import os_helper
34from test.support.os_helper import TESTFN, FakePath
Serhiy Storchaka11213772014-08-06 18:50:19 +030035
Antoine Pitrou7fff0962009-05-01 21:09:44 +000036TESTFN2 = TESTFN + "2"
Victor Stinner937ee9e2018-06-26 02:11:06 +020037MACOS = sys.platform.startswith("darwin")
Michael Feltef110b12019-02-18 12:02:44 +010038AIX = sys.platform[:3] == 'aix'
Tarek Ziadé396fad72010-02-23 05:30:31 +000039try:
40 import grp
41 import pwd
42 UID_GID_SUPPORT = True
43except ImportError:
44 UID_GID_SUPPORT = False
45
Steve Dowerdf2d4a62019-08-21 15:27:33 -070046try:
47 import _winapi
48except ImportError:
49 _winapi = None
50
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040051def _fake_rename(*args, **kwargs):
52 # Pretend the destination path is on a different filesystem.
Antoine Pitrouc041ab62012-01-02 19:18:02 +010053 raise OSError(getattr(errno, 'EXDEV', 18), "Invalid cross-device link")
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040054
def mock_rename(func):
    """Decorator that runs *func* with os.rename swapped for _fake_rename.

    The real os.rename is put back when *func* returns or raises.
    """
    @functools.wraps(func)
    def wrap(*args, **kwargs):
        saved_rename = os.rename
        os.rename = _fake_rename
        try:
            return func(*args, **kwargs)
        finally:
            # Always restore, even if the wrapped callable raised.
            os.rename = saved_rename
    return wrap
65
def write_file(path, content, binary=False):
    """Store *content* in the file named by *path*.

    *path* may be a string or a tuple of path components; a tuple is
    combined with os.path.join.  The file is opened in binary mode when
    *binary* is true and in text mode otherwise.
    """
    target = os.path.join(*path) if isinstance(path, tuple) else path
    mode = 'wb' if binary else 'w'
    with open(target, mode) as stream:
        stream.write(content)
77
def write_test_file(path, size):
    """Create a file at *path* holding exactly *size* bytes of random letters.

    A buffer of at most 8192 random ASCII letters is generated once and
    written repeatedly.  The final write is truncated so that sizes which
    are not a multiple of the buffer length still produce exactly *size*
    bytes (the previous version always wrote the whole buffer and then
    failed its own size check for such inputs).
    """
    bufsize = min(size, 8192)
    chunk = "".join(random.choices(string.ascii_letters, k=bufsize)).encode()
    with open(path, 'wb') as f:
        remaining = size
        while remaining > 0:
            step = min(remaining, bufsize)
            # Slice so the trailing partial chunk has the right length.
            f.write(chunk[:step])
            remaining -= step
    assert os.path.getsize(path) == size
95
def read_file(path, binary=False):
    """Return the full contents of the file named by *path*.

    *path* may be a string or a tuple of path components; a tuple is
    combined with os.path.join.  The file is opened in binary mode when
    *binary* is true and in text mode otherwise.
    """
    target = os.path.join(*path) if isinstance(path, tuple) else path
    mode = 'rb' if binary else 'r'
    with open(target, mode) as stream:
        data = stream.read()
    return data
107
def rlistdir(path):
    """Return a recursive listing of *path* as a list of relative names.

    Names are visited in sorted order.  A real directory appears as
    'name/' followed by its own entries prefixed with 'name/'.  Anything
    else, including a symlink that points at a directory, appears as a
    plain name and is not descended into.
    """
    listing = []
    for name in sorted(os.listdir(path)):
        full = os.path.join(path, name)
        if os.path.islink(full) or not os.path.isdir(full):
            listing.append(name)
            continue
        prefix = name + '/'
        listing.append(prefix)
        listing.extend(prefix + child for child in rlistdir(full))
    return listing
119
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +0200120def supports_file2file_sendfile():
121 # ...apparently Linux and Solaris are the only ones
122 if not hasattr(os, "sendfile"):
123 return False
124 srcname = None
125 dstname = None
126 try:
Steve Dowerabde52c2019-11-15 09:49:21 -0800127 with tempfile.NamedTemporaryFile("wb", dir=os.getcwd(), delete=False) as f:
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +0200128 srcname = f.name
129 f.write(b"0123456789")
130
131 with open(srcname, "rb") as src:
Steve Dowerabde52c2019-11-15 09:49:21 -0800132 with tempfile.NamedTemporaryFile("wb", dir=os.getcwd(), delete=False) as dst:
Victor Stinner4c26abd2019-06-27 01:39:53 +0200133 dstname = dst.name
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +0200134 infd = src.fileno()
135 outfd = dst.fileno()
136 try:
137 os.sendfile(outfd, infd, 0, 2)
138 except OSError:
139 return False
140 else:
141 return True
142 finally:
143 if srcname is not None:
Hai Shi0c4f0f32020-06-30 21:46:31 +0800144 os_helper.unlink(srcname)
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +0200145 if dstname is not None:
Hai Shi0c4f0f32020-06-30 21:46:31 +0800146 os_helper.unlink(dstname)
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +0200147
148
149SUPPORTS_SENDFILE = supports_file2file_sendfile()
150
# AIX 32-bit mode, by default, lacks enough memory for the xz/lzma compiler
# test.  The AIX command 'dump -o program' gives XCOFF header information;
# the second word of the last line of its output is the maxdata value.
# NOTE(review): the earlier comment cited 0x1000000 as the minimum maxdata,
# while the check below requires at least 0x20000000 -- confirm which is
# intended.
def _maxdataOK():
    """Return True unless this is 32-bit AIX with a maxdata below 0x20000000."""
    if not (AIX and sys.maxsize == 2147483647):
        return True
    header_dump = subprocess.getoutput("/usr/bin/dump -o %s" % sys.executable)
    last_line = header_dump.split("\n")[-1]
    maxdata = last_line.split()[1]
    return int(maxdata, 16) >= 0x20000000
Éric Araujoa7e33a12011-08-12 19:51:35 +0200162
Tarek Ziadé396fad72010-02-23 05:30:31 +0000163
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300164class BaseTest:
Tarek Ziadé396fad72010-02-23 05:30:31 +0000165
Steve Dowerabde52c2019-11-15 09:49:21 -0800166 def mkdtemp(self, prefix=None):
Tarek Ziadé396fad72010-02-23 05:30:31 +0000167 """Create a temporary directory that will be cleaned up.
168
169 Returns the path of the directory.
170 """
Steve Dowerabde52c2019-11-15 09:49:21 -0800171 d = tempfile.mkdtemp(prefix=prefix, dir=os.getcwd())
Hai Shi0c4f0f32020-06-30 21:46:31 +0800172 self.addCleanup(os_helper.rmtree, d)
Tarek Ziadé396fad72010-02-23 05:30:31 +0000173 return d
Tarek Ziadé5340db32010-04-19 22:30:51 +0000174
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300175
class TestRmTree(BaseTest, unittest.TestCase):
    """Tests for shutil.rmtree(): links, error reporting and fd-based removal."""

    # -- shared helpers -----------------------------------------------------

    def _check_rmtree_refuses_link(self, link, target):
        """Assert rmtree() refuses *link* and leaves both it and *target* alone."""
        self.assertRaises(OSError, shutil.rmtree, link)
        self.assertTrue(os.path.exists(target))
        self.assertTrue(os.path.lexists(link))
        errors = []
        def onerror(*args):
            errors.append(args)
        shutil.rmtree(link, onerror=onerror)
        # Exactly one error is reported, attributed to os.path.islink.
        self.assertEqual(len(errors), 1)
        self.assertIs(errors[0][0], os.path.islink)
        self.assertEqual(errors[0][1], link)
        self.assertIsInstance(errors[0][2][1], OSError)

    def _check_rmtree_removes_links(self, make_link):
        """Assert rmtree() deletes links made by *make_link* without following them."""
        tmp = self.mkdtemp()
        dir1 = os.path.join(tmp, 'dir1')
        dir2 = os.path.join(dir1, 'dir2')
        dir3 = os.path.join(tmp, 'dir3')
        for d in dir1, dir2, dir3:
            os.mkdir(d)
        file1 = os.path.join(tmp, 'file1')
        write_file(file1, 'foo')
        # One link to a directory inside the tree, one to a directory
        # outside it, and one to a file outside it.
        for target, link_name in ((dir2, 'link1'),
                                  (dir3, 'link2'),
                                  (file1, 'link3')):
            make_link(target, os.path.join(dir1, link_name))
        shutil.rmtree(dir1)
        self.assertFalse(os.path.exists(dir1))
        # The link targets outside the removed tree must survive.
        self.assertTrue(os.path.exists(dir3))
        self.assertTrue(os.path.exists(file1))

    # -- tests --------------------------------------------------------------

    def test_rmtree_works_on_bytes(self):
        """rmtree() accepts a bytes path."""
        tmp = self.mkdtemp()
        victim = os.path.join(tmp, 'killme')
        os.mkdir(victim)
        write_file(os.path.join(victim, 'somefile'), 'foo')
        victim = os.fsencode(victim)
        self.assertIsInstance(victim, bytes)
        shutil.rmtree(victim)

    @os_helper.skip_unless_symlink
    def test_rmtree_fails_on_symlink(self):
        """rmtree() raises OSError when handed a symlink to a directory."""
        tmp = self.mkdtemp()
        dir_ = os.path.join(tmp, 'dir')
        os.mkdir(dir_)
        link = os.path.join(tmp, 'link')
        os.symlink(dir_, link)
        self._check_rmtree_refuses_link(link, dir_)

    @os_helper.skip_unless_symlink
    def test_rmtree_works_on_symlinks(self):
        """Symlinks inside the tree are removed but not followed."""
        self._check_rmtree_removes_links(os.symlink)

    @unittest.skipUnless(_winapi, 'only relevant on Windows')
    def test_rmtree_fails_on_junctions(self):
        """rmtree() raises OSError when handed a junction."""
        tmp = self.mkdtemp()
        dir_ = os.path.join(tmp, 'dir')
        os.mkdir(dir_)
        link = os.path.join(tmp, 'link')
        _winapi.CreateJunction(dir_, link)
        self.addCleanup(os_helper.unlink, link)
        self._check_rmtree_refuses_link(link, dir_)

    @unittest.skipUnless(_winapi, 'only relevant on Windows')
    def test_rmtree_works_on_junctions(self):
        """Junctions inside the tree are removed but not followed."""
        self._check_rmtree_removes_links(_winapi.CreateJunction)

    def test_rmtree_errors(self):
        """Check exceptions, ignore_errors and onerror for bad rmtree() targets."""
        # filename is guaranteed not to exist
        filename = tempfile.mktemp(dir=self.mkdtemp())
        self.assertRaises(FileNotFoundError, shutil.rmtree, filename)
        # test that ignore_errors option is honored
        shutil.rmtree(filename, ignore_errors=True)

        # existing file
        tmpdir = self.mkdtemp()
        write_file((tmpdir, "tstfile"), "")
        filename = os.path.join(tmpdir, "tstfile")
        with self.assertRaises(NotADirectoryError) as cm:
            shutil.rmtree(filename)
        # The reason for this rather odd construct is that Windows sprinkles
        # a \*.* at the end of file names. But only sometimes on some buildbots
        possible_args = [filename, os.path.join(filename, '*.*')]
        self.assertIn(cm.exception.filename, possible_args)
        self.assertTrue(os.path.exists(filename))
        # test that ignore_errors option is honored
        shutil.rmtree(filename, ignore_errors=True)
        self.assertTrue(os.path.exists(filename))
        errors = []
        def onerror(*args):
            errors.append(args)
        shutil.rmtree(filename, onerror=onerror)
        self.assertEqual(len(errors), 2)
        # First report: scanning the "directory"; second: removing it.
        self.assertIs(errors[0][0], os.scandir)
        self.assertEqual(errors[0][1], filename)
        self.assertIsInstance(errors[0][2][1], NotADirectoryError)
        self.assertIn(errors[0][2][1].filename, possible_args)
        self.assertIs(errors[1][0], os.rmdir)
        self.assertEqual(errors[1][1], filename)
        self.assertIsInstance(errors[1][2][1], NotADirectoryError)
        self.assertIn(errors[1][2][1].filename, possible_args)

    @unittest.skipIf(sys.platform[:6] == 'cygwin',
                     "This test can't be run on Cygwin (issue #1071513).")
    @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
                     "This test can't be run reliably as root (issue #1076467).")
    def test_on_error(self):
        """onerror is invoked for each failure when the tree is unwritable."""
        self.errorState = 0
        os.mkdir(TESTFN)
        self.addCleanup(shutil.rmtree, TESTFN)

        self.child_file_path = os.path.join(TESTFN, 'a')
        self.child_dir_path = os.path.join(TESTFN, 'b')
        os_helper.create_empty_file(self.child_file_path)
        os.mkdir(self.child_dir_path)
        old_dir_mode = os.stat(TESTFN).st_mode
        old_child_file_mode = os.stat(self.child_file_path).st_mode
        old_child_dir_mode = os.stat(self.child_dir_path).st_mode
        # Make unwritable.
        new_mode = stat.S_IREAD|stat.S_IEXEC
        os.chmod(self.child_file_path, new_mode)
        os.chmod(self.child_dir_path, new_mode)
        os.chmod(TESTFN, new_mode)

        # Cleanups run last-in-first-out, so the modes are restored before
        # the rmtree cleanup registered above removes the tree.
        self.addCleanup(os.chmod, TESTFN, old_dir_mode)
        self.addCleanup(os.chmod, self.child_file_path, old_child_file_mode)
        self.addCleanup(os.chmod, self.child_dir_path, old_child_dir_mode)

        shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
        # Test whether onerror has actually been called.
        self.assertEqual(self.errorState, 3,
                         "Expected call to onerror function did not happen.")

    def check_args_to_onerror(self, func, arg, exc):
        """onerror callback used by test_on_error; validates each report.

        test_on_error deliberately runs rmtree on a directory that is
        chmod 500, which will fail.  The first two reports may be, in
        either order, a failed os.unlink of the child file or a failed
        os.rmdir of the child directory; otherwise the failing function
        must be os.listdir on TESTFN or the child directory.  Any later
        report must be the os.rmdir of TESTFN itself.
        """
        if self.errorState < 2:
            if func is os.unlink:
                self.assertEqual(arg, self.child_file_path)
            elif func is os.rmdir:
                self.assertEqual(arg, self.child_dir_path)
            else:
                self.assertIs(func, os.listdir)
                self.assertIn(arg, [TESTFN, self.child_dir_path])
            self.assertTrue(issubclass(exc[0], OSError))
            self.errorState += 1
        else:
            self.assertEqual(func, os.rmdir)
            self.assertEqual(arg, TESTFN)
            self.assertTrue(issubclass(exc[0], OSError))
            self.errorState = 3

    def test_rmtree_does_not_choke_on_failing_lstat(self):
        """rmtree() still works when os.lstat fails for everything but TESTFN."""
        orig_lstat = os.lstat
        def raiser(fn, *args, **kwargs):
            if fn != TESTFN:
                raise OSError()
            else:
                return orig_lstat(fn)
        try:
            os.lstat = raiser

            os.mkdir(TESTFN)
            # Do not leave TESTFN behind for later tests if rmtree fails.
            self.addCleanup(os_helper.rmtree, TESTFN)
            write_file((TESTFN, 'foo'), 'foo')
            shutil.rmtree(TESTFN)
        finally:
            os.lstat = orig_lstat

    def test_rmtree_uses_safe_fd_version_if_available(self):
        """rmtree() dispatches to _rmtree_safe_fd when the platform allows it."""
        _use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <=
                             os.supports_dir_fd and
                             os.listdir in os.supports_fd and
                             os.stat in os.supports_follow_symlinks)
        if _use_fd_functions:
            self.assertTrue(shutil._use_fd_functions)
            self.assertTrue(shutil.rmtree.avoids_symlink_attacks)
            tmp_dir = self.mkdtemp()
            d = os.path.join(tmp_dir, 'a')
            os.mkdir(d)
            class Called(Exception): pass
            def _raiser(*args, **kwargs):
                raise Called
            real_rmtree = shutil._rmtree_safe_fd
            try:
                # If rmtree() reaches the patched function, Called escapes.
                shutil._rmtree_safe_fd = _raiser
                self.assertRaises(Called, shutil.rmtree, d)
            finally:
                shutil._rmtree_safe_fd = real_rmtree
        else:
            self.assertFalse(shutil._use_fd_functions)
            self.assertFalse(shutil.rmtree.avoids_symlink_attacks)

    def test_rmtree_dont_delete_file(self):
        """rmtree() on a regular file raises and leaves the file in place."""
        # When called on a file instead of a directory, don't delete it.
        handle, path = tempfile.mkstemp(dir=self.mkdtemp())
        os.close(handle)
        self.assertRaises(NotADirectoryError, shutil.rmtree, path)
        # State the expectation explicitly; the enclosing temporary
        # directory is removed by the cleanup mkdtemp() registered.
        self.assertTrue(os.path.exists(path))

    @os_helper.skip_unless_symlink
    def test_rmtree_on_symlink(self):
        """rmtree() on a symlink raises unless ignore_errors is set (bug 1669)."""
        # bug 1669.
        os.mkdir(TESTFN)
        try:
            src = os.path.join(TESTFN, 'cheese')
            dst = os.path.join(TESTFN, 'shop')
            os.mkdir(src)
            os.symlink(src, dst)
            self.assertRaises(OSError, shutil.rmtree, dst)
            shutil.rmtree(dst, ignore_errors=True)
        finally:
            shutil.rmtree(TESTFN, ignore_errors=True)

    @unittest.skipUnless(_winapi, 'only relevant on Windows')
    def test_rmtree_on_junction(self):
        """rmtree() on a junction raises unless ignore_errors is set."""
        os.mkdir(TESTFN)
        try:
            src = os.path.join(TESTFN, 'cheese')
            dst = os.path.join(TESTFN, 'shop')
            os.mkdir(src)
            open(os.path.join(src, 'spam'), 'wb').close()
            _winapi.CreateJunction(src, dst)
            self.assertRaises(OSError, shutil.rmtree, dst)
            shutil.rmtree(dst, ignore_errors=True)
        finally:
            shutil.rmtree(TESTFN, ignore_errors=True)
438
439
440class TestCopyTree(BaseTest, unittest.TestCase):
441
def test_copytree_simple(self):
    """copytree() reproduces files and a nested directory at a new path."""
    source = self.mkdtemp()
    dest_parent = self.mkdtemp()
    destination = os.path.join(dest_parent, 'destination')
    self.addCleanup(shutil.rmtree, source)
    self.addCleanup(shutil.rmtree, os.path.dirname(destination))

    # Source layout: one top-level file and one file in a subdirectory.
    write_file((source, 'test.txt'), '123')
    os.mkdir(os.path.join(source, 'test_dir'))
    write_file((source, 'test_dir', 'test.txt'), '456')

    shutil.copytree(source, destination)

    top_file = os.path.join(destination, 'test.txt')
    sub_dir = os.path.join(destination, 'test_dir')
    sub_file = os.path.join(sub_dir, 'test.txt')
    self.assertTrue(os.path.isfile(top_file))
    self.assertTrue(os.path.isdir(sub_dir))
    self.assertTrue(os.path.isfile(sub_file))
    self.assertEqual(read_file((destination, 'test.txt')), '123')
    self.assertEqual(read_file((destination, 'test_dir', 'test.txt')), '456')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000460
def test_copytree_dirs_exist_ok(self):
    """dirs_exist_ok=True merges into an existing tree; False raises."""
    source = self.mkdtemp()
    target = self.mkdtemp()
    for tree in (source, target):
        self.addCleanup(shutil.rmtree, tree)

    # 'existing_dir/existing.txt' is present on both sides with different
    # contents; 'nonexisting.txt' is present only in the source.
    write_file((source, 'nonexisting.txt'), '123')
    for tree in (source, target):
        os.mkdir(os.path.join(tree, 'existing_dir'))
    write_file((target, 'existing_dir', 'existing.txt'), 'will be replaced')
    write_file((source, 'existing_dir', 'existing.txt'), 'has been replaced')

    shutil.copytree(source, target, dirs_exist_ok=True)

    new_file = os.path.join(target, 'nonexisting.txt')
    merged_dir = os.path.join(target, 'existing_dir')
    merged_file = os.path.join(merged_dir, 'existing.txt')
    self.assertTrue(os.path.isfile(new_file))
    self.assertTrue(os.path.isdir(merged_dir))
    self.assertTrue(os.path.isfile(merged_file))
    self.assertEqual(read_file((target, 'nonexisting.txt')), '123')
    self.assertEqual(read_file((target, 'existing_dir', 'existing.txt')),
                     'has been replaced')

    # The same copy without dirs_exist_ok must be refused.
    with self.assertRaises(FileExistsError):
        shutil.copytree(source, target, dirs_exist_ok=False)
485
@os_helper.skip_unless_symlink
def test_copytree_symlinks(self):
    """copytree(symlinks=True) copies a symlink as a symlink, with its stat."""
    root = self.mkdtemp()
    source = os.path.join(root, 'src')
    target = os.path.join(root, 'dst')
    source_sub = os.path.join(source, 'sub')
    os.mkdir(source)
    os.mkdir(source_sub)
    write_file((source, 'file.txt'), 'foo')
    link_target = os.path.join(source, 'file.txt')
    src_link = os.path.join(source_sub, 'link')
    dst_link = os.path.join(target, 'sub/link')
    os.symlink(link_target, src_link)
    # Give the link itself distinctive mode/flags where the platform can.
    if hasattr(os, 'lchmod'):
        os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
    if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
        os.lchflags(src_link, stat.UF_NODUMP)
    src_stat = os.lstat(src_link)

    shutil.copytree(source, target, symlinks=True)

    copied_link = os.path.join(target, 'sub', 'link')
    self.assertTrue(os.path.islink(copied_link))
    actual = os.readlink(copied_link)
    # Bad practice to blindly strip the prefix as it may be required to
    # correctly refer to the file, but we're only comparing paths here.
    if os.name == 'nt' and actual.startswith('\\\\?\\'):
        actual = actual[4:]
    self.assertEqual(actual, link_target)
    dst_stat = os.lstat(dst_link)
    if hasattr(os, 'lchmod'):
        self.assertEqual(dst_stat.st_mode, src_stat.st_mode)
    if hasattr(os, 'lchflags'):
        self.assertEqual(dst_stat.st_flags, src_stat.st_flags)
517
def test_copytree_with_exclude(self):
    """copytree() honours ignore= for glob patterns and for a callable.

    The source tree is copied three times with different filters.  Both
    temporary roots come from self.mkdtemp(), which registers their
    removal, so only the destination has to be removed between copies.
    """
    join = os.path.join
    exists = os.path.exists

    # creating data
    src_dir = self.mkdtemp()
    dst_dir = join(self.mkdtemp(), 'destination')
    write_file((src_dir, 'test.txt'), '123')
    write_file((src_dir, 'test.tmp'), '123')
    os.mkdir(join(src_dir, 'test_dir'))
    write_file((src_dir, 'test_dir', 'test.txt'), '456')
    os.mkdir(join(src_dir, 'test_dir2'))
    write_file((src_dir, 'test_dir2', 'test.txt'), '456')
    os.mkdir(join(src_dir, 'test_dir2', 'subdir'))
    os.mkdir(join(src_dir, 'test_dir2', 'subdir2'))
    write_file((src_dir, 'test_dir2', 'subdir', 'test.txt'), '456')
    write_file((src_dir, 'test_dir2', 'subdir2', 'test.py'), '456')

    # testing glob-like patterns
    try:
        patterns = shutil.ignore_patterns('*.tmp', 'test_dir2')
        shutil.copytree(src_dir, dst_dir, ignore=patterns)
        # checking the result: some elements should not be copied
        self.assertTrue(exists(join(dst_dir, 'test.txt')))
        self.assertFalse(exists(join(dst_dir, 'test.tmp')))
        self.assertFalse(exists(join(dst_dir, 'test_dir2')))
    finally:
        # ignore_errors: a failed copy must not be masked by a missing
        # destination during cleanup.
        shutil.rmtree(dst_dir, ignore_errors=True)
    try:
        patterns = shutil.ignore_patterns('*.tmp', 'subdir*')
        shutil.copytree(src_dir, dst_dir, ignore=patterns)
        # checking the result: some elements should not be copied
        self.assertFalse(exists(join(dst_dir, 'test.tmp')))
        self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2')))
        self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
    finally:
        shutil.rmtree(dst_dir, ignore_errors=True)

    # testing callable-style
    try:
        def _filter(src, names):
            """Ignore directories named 'subdir' and files ending in '.py'."""
            res = []
            for name in names:
                path = os.path.join(src, name)

                # os.path.basename, not str.split(): the latter splits on
                # whitespace and never yields the last path component.
                if (os.path.isdir(path) and
                        os.path.basename(path) == 'subdir'):
                    res.append(name)
                # A one-element tuple: ('.py') is just a string, and
                # "'' in '.py'" is true for every extension-less name.
                elif os.path.splitext(path)[-1] in ('.py',):
                    res.append(name)
            return res

        shutil.copytree(src_dir, dst_dir, ignore=_filter)

        # checking the result: some elements should not be copied
        self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2',
                                     'test.py')))
        self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
        # ...while everything the filter did not name must be copied,
        # which proves the filter is not ignoring whole subtrees.
        self.assertTrue(exists(join(dst_dir, 'test_dir', 'test.txt')))
        self.assertTrue(exists(join(dst_dir, 'test_dir2', 'subdir2')))
    finally:
        shutil.rmtree(dst_dir, ignore_errors=True)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000582
def test_copytree_arg_types_of_ignore(self):
    """ignore= receives a str and a list of str for every kind of source."""
    join = os.path.join
    exists = os.path.exists

    tmp_dir = self.mkdtemp()
    src_dir = join(tmp_dir, "source")

    # source/test_dir/subdir/test.txt: three directories in total.
    os.mkdir(join(src_dir))
    os.mkdir(join(src_dir, 'test_dir'))
    os.mkdir(os.path.join(src_dir, 'test_dir', 'subdir'))
    write_file((src_dir, 'test_dir', 'subdir', 'test.txt'), '456')

    seen_sources = []

    def _ignore(src, names):
        seen_sources.append(src)
        self.assertIsInstance(src, str)
        self.assertIsInstance(names, list)
        # No name may be reported twice.
        self.assertEqual(len(names), len(set(names)))
        for entry_name in names:
            self.assertIsInstance(entry_name, str)
        return []

    def copied_file(dest):
        return join(dest, 'test_dir', 'subdir', 'test.txt')

    # Source given as a plain string.
    dst_dir = join(self.mkdtemp(), 'destination')
    shutil.copytree(src_dir, dst_dir, ignore=_ignore)
    self.assertTrue(exists(copied_file(dst_dir)))

    # Source given as a pathlib.Path.
    dst_dir = join(self.mkdtemp(), 'destination')
    shutil.copytree(pathlib.Path(src_dir), dst_dir, ignore=_ignore)
    self.assertTrue(exists(copied_file(dst_dir)))

    # Source given as an os.DirEntry.
    dst_dir = join(self.mkdtemp(), 'destination')
    src_dir_entry = list(os.scandir(tmp_dir))[0]
    self.assertIsInstance(src_dir_entry, os.DirEntry)
    shutil.copytree(src_dir_entry, dst_dir, ignore=_ignore)
    self.assertTrue(exists(copied_file(dst_dir)))

    # Three copies, one callback per directory visited.
    self.assertEqual(len(seen_sources), 9)
624
def test_copytree_retains_permissions(self):
    """copytree() gives every copied entry the mode of its source."""
    root = self.mkdtemp()
    source = os.path.join(root, 'source')
    os.mkdir(source)
    target = os.path.join(root, 'destination')
    self.addCleanup(shutil.rmtree, root)

    os.chmod(source, 0o777)
    file_specs = (('permissive.txt', '123', 0o777),
                  ('restrictive.txt', '456', 0o600))
    for filename, text, mode in file_specs:
        write_file((source, filename), text)
        os.chmod(os.path.join(source, filename), mode)
    restrictive_subdir = tempfile.mkdtemp(dir=source)
    self.addCleanup(os_helper.rmtree, restrictive_subdir)
    os.chmod(restrictive_subdir, 0o600)

    shutil.copytree(source, target)

    def mode_of(path):
        return os.stat(path).st_mode

    self.assertEqual(mode_of(source), mode_of(target))
    for filename, _text, _mode in file_specs:
        self.assertEqual(mode_of(os.path.join(source, filename)),
                         mode_of(os.path.join(target, filename)))
    restrictive_subdir_dst = os.path.join(
        target, os.path.split(restrictive_subdir)[1])
    self.assertEqual(mode_of(restrictive_subdir),
                     mode_of(restrictive_subdir_dst))
651
@unittest.mock.patch('os.chmod')
def test_copytree_winerror(self, mock_patch):
    """copytree() raises shutil.Error when os.chmod raises PermissionError."""
    # When copying to VFAT, copystat() raises OSError. On Windows, the
    # exception object has a meaningful 'winerror' attribute, but not
    # on other operating systems. Do not assume 'winerror' is set.
    source = self.mkdtemp()
    dest_parent = self.mkdtemp()
    destination = os.path.join(dest_parent, 'destination')
    self.addCleanup(shutil.rmtree, source)
    self.addCleanup(shutil.rmtree, os.path.dirname(destination))

    # Every call to the patched os.chmod now fails.
    mock_patch.side_effect = PermissionError('ka-boom')
    self.assertRaises(shutil.Error, shutil.copytree, source, destination)
665
def test_copytree_custom_copy_function(self):
    """copy_function is called once, with str source and destination paths."""
    # See: https://bugs.python.org/issue35648
    calls = []
    src = self.mkdtemp()
    dst = tempfile.mktemp(dir=self.mkdtemp())

    def custom_cpfun(a, b):
        calls.append(None)
        self.assertIsInstance(a, str)
        self.assertIsInstance(b, str)
        self.assertEqual(a, os.path.join(src, 'foo'))
        self.assertEqual(b, os.path.join(dst, 'foo'))

    # A single empty file is all the source tree contains.
    with open(os.path.join(src, 'foo'), 'w'):
        pass
    shutil.copytree(src, dst, copy_function=custom_cpfun)
    self.assertEqual(len(calls), 1)
682
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300683 # Issue #3002: copyfile and copytree block indefinitely on named pipes
@unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
@os_helper.skip_unless_symlink
def test_copytree_named_pipe(self):
    # A FIFO inside the tree must be reported through shutil.Error as a
    # single (src, dst, message) entry.
    os.mkdir(TESTFN)
    try:
        nested = os.path.join(TESTFN, "subdir")
        os.mkdir(nested)
        fifo = os.path.join(nested, "mypipe")
        try:
            os.mkfifo(fifo)
        except PermissionError as exc:
            self.skipTest('os.mkfifo(): %s' % exc)
        try:
            shutil.copytree(TESTFN, TESTFN2)
        except shutil.Error as exc:
            failures = exc.args[0]
            self.assertEqual(len(failures), 1)
            _src, _dst, message = failures[0]
            self.assertEqual("`%s` is a named pipe" % fifo, message)
        else:
            self.fail("shutil.Error should have been raised")
    finally:
        for tree in (TESTFN, TESTFN2):
            shutil.rmtree(tree, ignore_errors=True)
708
def test_copytree_special_func(self):
    # copy_function is called once per regular file in the tree: one at
    # the top level and one inside 'test_dir'.
    source = self.mkdtemp()
    target = os.path.join(self.mkdtemp(), 'destination')
    write_file((source, 'test.txt'), '123')
    os.mkdir(os.path.join(source, 'test_dir'))
    write_file((source, 'test_dir', 'test.txt'), '456')

    seen = []
    shutil.copytree(source, target,
                    copy_function=lambda src, dst: seen.append((src, dst)))
    self.assertEqual(len(seen), 2)
722
@os_helper.skip_unless_symlink
def test_copytree_dangling_symlinks(self):
    source = self.mkdtemp()
    target = os.path.join(self.mkdtemp(), 'destination')
    os.symlink('IDONTEXIST', os.path.join(source, 'test.txt'))
    os.mkdir(os.path.join(source, 'test_dir'))
    write_file((source, 'test_dir', 'test.txt'), '456')

    # a dangling symlink raises an error at the end
    with self.assertRaises(Error):
        shutil.copytree(source, target)

    # a dangling symlink is ignored with the proper flag
    target = os.path.join(self.mkdtemp(), 'destination2')
    shutil.copytree(source, target, ignore_dangling_symlinks=True)
    self.assertNotIn('test.txt', os.listdir(target))

    # a dangling symlink is copied if symlinks=True
    target = os.path.join(self.mkdtemp(), 'destination3')
    shutil.copytree(source, target, symlinks=True)
    self.assertIn('test.txt', os.listdir(target))
742
@os_helper.skip_unless_symlink
def test_copytree_symlink_dir(self):
    source = self.mkdtemp()
    real_dir = os.path.join(source, 'real_dir')
    target = os.path.join(self.mkdtemp(), 'destination')
    os.mkdir(real_dir)
    # An empty file lets us check that the directory content is reachable
    # through the copied entry.
    with open(os.path.join(source, 'real_dir', 'test.txt'), 'w'):
        pass
    os.symlink(real_dir, os.path.join(source, 'link_to_dir'),
               target_is_directory=True)

    # symlinks=False: the link is replaced by a real directory copy.
    shutil.copytree(source, target, symlinks=False)
    copied_link = os.path.join(target, 'link_to_dir')
    self.assertFalse(os.path.islink(copied_link))
    self.assertIn('test.txt', os.listdir(copied_link))

    # symlinks=True: the link itself is recreated in the destination.
    target = os.path.join(self.mkdtemp(), 'destination2')
    shutil.copytree(source, target, symlinks=True)
    copied_link = os.path.join(target, 'link_to_dir')
    self.assertTrue(os.path.islink(copied_link))
    self.assertIn('test.txt', os.listdir(copied_link))
762
def test_copytree_return_value(self):
    # copytree returns its destination path.
    source = self.mkdtemp()
    target = source + "dest"
    # The destination sits next to the temp dir, so it needs its own cleanup.
    self.addCleanup(shutil.rmtree, target, True)
    write_file(os.path.join(source, 'foo'), 'foo')
    returned = shutil.copytree(source, target)
    self.assertEqual(['foo'], os.listdir(returned))
772
def test_copytree_subdirectory(self):
    # copytree where dst is a subdirectory of src, see Issue 38688
    root = self.mkdtemp()
    self.addCleanup(shutil.rmtree, root, ignore_errors=True)
    source = os.path.join(root, "t", "pg")
    target = os.path.join(source, "somevendor", "1.0")
    os.makedirs(source)
    write_file(os.path.join(source, 'pol'), 'pol')
    returned = shutil.copytree(source, target)
    # Only the original file is expected in the copy.
    self.assertEqual(['pol'], os.listdir(returned))
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +0300784
785class TestCopy(BaseTest, unittest.TestCase):
786
787 ### shutil.copymode
788
@os_helper.skip_unless_symlink
def test_copymode_follow_symlinks(self):
    workdir = self.mkdtemp()
    src, dst, src_link, dst_link = (
        os.path.join(workdir, name)
        for name in ('foo', 'bar', 'baz', 'quux'))
    write_file(src, 'foo')
    write_file(dst, 'foo')
    os.symlink(src, src_link)
    os.symlink(dst, dst_link)
    os.chmod(src, stat.S_IRWXU | stat.S_IRWXG)
    # file to file
    os.chmod(dst, stat.S_IRWXO)
    self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
    shutil.copymode(src, dst)
    self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
    # On Windows, os.chmod does not follow symlinks (issue #15411)
    if os.name == 'nt':
        return
    # In turn: follow src link, follow dst link, follow both links.
    link_cases = ((src_link, dst), (src, dst_link), (src_link, dst_link))
    for source, target in link_cases:
        # Reset dst so a no-op copymode() would be detected.
        os.chmod(dst, stat.S_IRWXO)
        shutil.copymode(source, target)
        self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
820
@unittest.skipUnless(hasattr(os, 'lchmod'), 'requires os.lchmod')
@os_helper.skip_unless_symlink
def test_copymode_symlink_to_symlink(self):
    workdir = self.mkdtemp()
    src, dst, src_link, dst_link = (
        os.path.join(workdir, name)
        for name in ('foo', 'bar', 'baz', 'quux'))
    write_file(src, 'foo')
    write_file(dst, 'foo')
    os.symlink(src, src_link)
    os.symlink(dst, dst_link)
    os.chmod(src, stat.S_IRWXU | stat.S_IRWXG)
    os.chmod(dst, stat.S_IRWXU)
    os.lchmod(src_link, stat.S_IRWXO | stat.S_IRWXG)

    # link to link: the links' own modes are copied, the targets stay apart
    os.lchmod(dst_link, stat.S_IRWXO)
    shutil.copymode(src_link, dst_link, follow_symlinks=False)
    self.assertEqual(os.lstat(src_link).st_mode,
                     os.lstat(dst_link).st_mode)
    self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)

    # src link - use chmod
    os.lchmod(dst_link, stat.S_IRWXO)
    shutil.copymode(src_link, dst, follow_symlinks=False)
    self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)

    # dst link - use chmod
    os.lchmod(dst_link, stat.S_IRWXO)
    shutil.copymode(src, dst_link, follow_symlinks=False)
    self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
850
@unittest.skipIf(hasattr(os, 'lchmod'), 'requires os.lchmod to be missing')
@os_helper.skip_unless_symlink
def test_copymode_symlink_to_symlink_wo_lchmod(self):
    workdir = self.mkdtemp()
    src, dst, src_link, dst_link = (
        os.path.join(workdir, name)
        for name in ('foo', 'bar', 'baz', 'quux'))
    for regular_file in (src, dst):
        write_file(regular_file, 'foo')
    os.symlink(src, src_link)
    os.symlink(dst, dst_link)
    # The test passes as long as this call does not raise.
    shutil.copymode(src_link, dst_link, follow_symlinks=False)  # silent fail
864
865 ### shutil.copystat
866
@os_helper.skip_unless_symlink
def test_copystat_symlinks(self):
    # copystat() between symlinks: with follow_symlinks=False the links'
    # own metadata is copied (where the platform allows it).
    tmp_dir = self.mkdtemp()
    src = os.path.join(tmp_dir, 'foo')
    dst = os.path.join(tmp_dir, 'bar')
    src_link = os.path.join(tmp_dir, 'baz')
    dst_link = os.path.join(tmp_dir, 'qux')
    write_file(src, 'foo')
    src_stat = os.stat(src)
    os.utime(src, (src_stat.st_atime,
                   src_stat.st_mtime - 42.0))  # ensure different mtimes
    write_file(dst, 'bar')
    self.assertNotEqual(os.stat(src).st_mtime, os.stat(dst).st_mtime)
    os.symlink(src, src_link)
    os.symlink(dst, dst_link)
    if hasattr(os, 'lchmod'):
        os.lchmod(src_link, stat.S_IRWXO)
    if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
        os.lchflags(src_link, stat.UF_NODUMP)
    src_link_stat = os.lstat(src_link)
    # follow
    if hasattr(os, 'lchmod'):
        shutil.copystat(src_link, dst_link, follow_symlinks=True)
        self.assertNotEqual(src_link_stat.st_mode, os.stat(dst).st_mode)
    # don't follow
    shutil.copystat(src_link, dst_link, follow_symlinks=False)
    dst_link_stat = os.lstat(dst_link)
    if os.utime in os.supports_follow_symlinks:
        for attr in 'st_atime', 'st_mtime':
            # The modification times may be truncated in the new file.
            self.assertLessEqual(getattr(src_link_stat, attr),
                                 getattr(dst_link_stat, attr) + 1)
    if hasattr(os, 'lchmod'):
        self.assertEqual(src_link_stat.st_mode, dst_link_stat.st_mode)
    if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
        self.assertEqual(src_link_stat.st_flags, dst_link_stat.st_flags)
    # told not to follow, but dst is not a link: the assertion below
    # expects dst to end up with the mtime of the file src_link points to
    shutil.copystat(src_link, dst, follow_symlinks=False)
    # assertLess reports both values on failure, unlike assertTrue(a < b).
    self.assertLess(abs(os.stat(src).st_mtime - os.stat(dst).st_mtime),
                    0.1)
907
@unittest.skipUnless(hasattr(os, 'chflags') and
                     hasattr(errno, 'EOPNOTSUPP') and
                     hasattr(errno, 'ENOTSUP'),
                     "requires os.chflags, EOPNOTSUPP & ENOTSUP")
def test_copystat_handles_harmless_chflags_errors(self):
    workdir = self.mkdtemp()
    first = os.path.join(workdir, 'file1')
    second = os.path.join(workdir, 'file2')
    for path in (first, second):
        write_file(path, 'xxx')

    def make_chflags_raiser(err):
        # Build an os.chflags() stand-in that always fails with errno *err*.
        failure = OSError()

        def _chflags_raiser(path, flags, *, follow_symlinks=True):
            failure.errno = err
            raise failure
        return _chflags_raiser

    saved_chflags = os.chflags
    try:
        # These two errno values must be swallowed by copystat().
        for harmless in (errno.EOPNOTSUPP, errno.ENOTSUP):
            os.chflags = make_chflags_raiser(harmless)
            shutil.copystat(first, second)
        # assert others errors break it
        os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP)
        with self.assertRaises(OSError):
            shutil.copystat(first, second)
    finally:
        os.chflags = saved_chflags
936
937 ### shutil.copyxattr
938
@os_helper.skip_unless_xattr
def test_copyxattr(self):
    # Exercise shutil._copyxattr() directly, then check that
    # shutil.copystat() copies extended attributes as well.
    tmp_dir = self.mkdtemp()
    src = os.path.join(tmp_dir, 'foo')
    write_file(src, 'foo')
    dst = os.path.join(tmp_dir, 'bar')
    write_file(dst, 'bar')

    # Save the real functions up front so the ``finally`` clauses below
    # never depend on an assignment made inside the guarded block.
    orig_setxattr = os.setxattr
    orig_listxattr = os.listxattr

    # no xattr == no problem
    shutil._copyxattr(src, dst)
    # common case
    os.setxattr(src, 'user.foo', b'42')
    os.setxattr(src, 'user.bar', b'43')
    shutil._copyxattr(src, dst)
    self.assertEqual(sorted(os.listxattr(src)), sorted(os.listxattr(dst)))
    self.assertEqual(
        os.getxattr(src, 'user.foo'),
        os.getxattr(dst, 'user.foo'))
    # check errors don't affect other attrs
    os.remove(dst)
    write_file(dst, 'bar')
    os_error = OSError(errno.EPERM, 'EPERM')

    def _raise_on_user_foo(fname, attr, val, **kwargs):
        # Fail for 'user.foo' only; every other attribute is really set.
        if attr == 'user.foo':
            raise os_error
        else:
            orig_setxattr(fname, attr, val, **kwargs)
    try:
        os.setxattr = _raise_on_user_foo
        shutil._copyxattr(src, dst)
        self.assertIn('user.bar', os.listxattr(dst))
    finally:
        os.setxattr = orig_setxattr

    # the source filesystem not supporting xattrs should be ok, too.
    def _raise_on_src(fname, *, follow_symlinks=True):
        if fname == src:
            raise OSError(errno.ENOTSUP, 'Operation not supported')
        return orig_listxattr(fname, follow_symlinks=follow_symlinks)
    try:
        os.listxattr = _raise_on_src
        shutil._copyxattr(src, dst)
    finally:
        os.listxattr = orig_listxattr

    # test that shutil.copystat copies xattrs
    src = os.path.join(tmp_dir, 'the_original')
    srcro = os.path.join(tmp_dir, 'the_original_ro')
    write_file(src, src)
    write_file(srcro, srcro)
    os.setxattr(src, 'user.the_value', b'fiddly')
    os.setxattr(srcro, 'user.the_value', b'fiddly')
    # One source is made read-only (0o444) before its stat is copied.
    os.chmod(srcro, 0o444)
    dst = os.path.join(tmp_dir, 'the_copy')
    dstro = os.path.join(tmp_dir, 'the_copy_ro')
    write_file(dst, dst)
    write_file(dstro, dstro)
    shutil.copystat(src, dst)
    shutil.copystat(srcro, dstro)
    self.assertEqual(os.getxattr(dst, 'user.the_value'), b'fiddly')
    self.assertEqual(os.getxattr(dstro, 'user.the_value'), b'fiddly')
1002
@os_helper.skip_unless_symlink
@os_helper.skip_unless_xattr
@unittest.skipUnless(hasattr(os, 'geteuid') and os.geteuid() == 0,
                     'root privileges required')
def test_copyxattr_symlinks(self):
    # On Linux, it's only possible to access non-user xattr for symlinks;
    # which in turn require root privileges. This test should be expanded
    # as soon as other platforms gain support for extended attributes.
    workdir = self.mkdtemp()
    src = os.path.join(workdir, 'foo')
    src_link = os.path.join(workdir, 'baz')
    dst = os.path.join(workdir, 'bar')
    dst_link = os.path.join(workdir, 'qux')

    write_file(src, 'foo')
    os.symlink(src, src_link)
    # The file and the link carry different values so they can be told apart.
    os.setxattr(src, 'trusted.foo', b'42')
    os.setxattr(src_link, 'trusted.foo', b'43', follow_symlinks=False)
    write_file(dst, 'bar')
    os.symlink(dst, dst_link)

    # link -> link: only the destination link receives the attribute
    shutil._copyxattr(src_link, dst_link, follow_symlinks=False)
    link_value = os.getxattr(dst_link, 'trusted.foo', follow_symlinks=False)
    self.assertEqual(link_value, b'43')
    with self.assertRaises(OSError):
        os.getxattr(dst, 'trusted.foo')

    # link -> regular file: the link's value lands on the file
    shutil._copyxattr(src_link, dst, follow_symlinks=False)
    self.assertEqual(os.getxattr(dst, 'trusted.foo'), b'43')
1027
1028 ### shutil.copy
1029
def _copy_file(self, method):
    # Copy a freshly written 'test.txt' into a second temporary directory
    # with *method* and return (source path, expected destination path).
    name = 'test.txt'
    source_dir = self.mkdtemp()
    write_file((source_dir, name), 'xxx')
    source = os.path.join(source_dir, name)
    target_dir = self.mkdtemp()
    method(source, target_dir)
    return (source, os.path.join(target_dir, name))
1039
def test_copy(self):
    # Ensure that the copied file exists and has the same mode bits.
    original, duplicate = self._copy_file(shutil.copy)
    self.assertTrue(os.path.exists(duplicate))
    original_mode = os.stat(original).st_mode
    duplicate_mode = os.stat(duplicate).st_mode
    self.assertEqual(original_mode, duplicate_mode)
1045
@os_helper.skip_unless_symlink
def test_copy_symlinks(self):
    # shutil.copy() on a symlink: follow_symlinks=True copies the file
    # content, follow_symlinks=False recreates the link itself.
    tmp_dir = self.mkdtemp()
    src = os.path.join(tmp_dir, 'foo')
    dst = os.path.join(tmp_dir, 'bar')
    src_link = os.path.join(tmp_dir, 'baz')
    write_file(src, 'foo')
    os.symlink(src, src_link)
    if hasattr(os, 'lchmod'):
        os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
    # follow
    shutil.copy(src_link, dst, follow_symlinks=True)
    self.assertFalse(os.path.islink(dst))
    self.assertEqual(read_file(src), read_file(dst))
    os.remove(dst)
    # don't follow
    shutil.copy(src_link, dst, follow_symlinks=False)
    self.assertTrue(os.path.islink(dst))
    self.assertEqual(os.readlink(dst), os.readlink(src_link))
    if hasattr(os, 'lchmod'):
        # The link's own mode (set with lchmod above) is compared as well.
        self.assertEqual(os.lstat(src_link).st_mode,
                         os.lstat(dst).st_mode)
1068
1069 ### shutil.copy2
1070
@unittest.skipUnless(hasattr(os, 'utime'), 'requires os.utime')
def test_copy2(self):
    # Ensure that the copied file exists and has the same mode and
    # modification time bits.
    original, duplicate = self._copy_file(shutil.copy2)
    self.assertTrue(os.path.exists(duplicate))
    original_stat = os.stat(original)
    duplicate_stat = os.stat(duplicate)
    self.assertEqual(original_stat.st_mode, duplicate_stat.st_mode)
    for attr in ('st_atime', 'st_mtime'):
        # The modification times may be truncated in the new file.
        self.assertLessEqual(getattr(original_stat, attr),
                             getattr(duplicate_stat, attr) + 1)
    # File flags are compared only where the platform exposes them.
    if hasattr(os, 'chflags') and hasattr(original_stat, 'st_flags'):
        self.assertEqual(original_stat.st_flags, duplicate_stat.st_flags)
1087
@os_helper.skip_unless_symlink
def test_copy2_symlinks(self):
    workdir = self.mkdtemp()
    src = os.path.join(workdir, 'foo')
    dst = os.path.join(workdir, 'bar')
    src_link = os.path.join(workdir, 'baz')
    write_file(src, 'foo')
    os.symlink(src, src_link)
    # Give the link its own mode/flags where the platform supports it.
    if hasattr(os, 'lchmod'):
        os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
    if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
        os.lchflags(src_link, stat.UF_NODUMP)
    target_stat = os.stat(src)
    link_stat = os.lstat(src_link)

    # follow
    shutil.copy2(src_link, dst, follow_symlinks=True)
    self.assertFalse(os.path.islink(dst))
    self.assertEqual(read_file(src), read_file(dst))
    os.remove(dst)

    # don't follow
    shutil.copy2(src_link, dst, follow_symlinks=False)
    self.assertTrue(os.path.islink(dst))
    self.assertEqual(os.readlink(dst), os.readlink(src_link))
    copied_stat = os.lstat(dst)
    if os.utime in os.supports_follow_symlinks:
        for attr in ('st_atime', 'st_mtime'):
            # The modification times may be truncated in the new file.
            self.assertLessEqual(getattr(link_stat, attr),
                                 getattr(copied_stat, attr) + 1)
    if hasattr(os, 'lchmod'):
        self.assertEqual(link_stat.st_mode, copied_stat.st_mode)
        self.assertNotEqual(target_stat.st_mode, copied_stat.st_mode)
    if hasattr(os, 'lchflags') and hasattr(link_stat, 'st_flags'):
        self.assertEqual(link_stat.st_flags, copied_stat.st_flags)
1122
@os_helper.skip_unless_xattr
def test_copy2_xattr(self):
    # copy2() must carry the 'user.foo' extended attribute over.
    workdir = self.mkdtemp()
    source = os.path.join(workdir, 'foo')
    target = os.path.join(workdir, 'bar')
    write_file(source, 'foo')
    os.setxattr(source, 'user.foo', b'42')
    shutil.copy2(source, target)
    source_value = os.getxattr(source, 'user.foo')
    target_value = os.getxattr(target, 'user.foo')
    self.assertEqual(source_value, target_value)
    os.remove(target)
1135
def test_copy_return_value(self):
    # copy and copy2 both return their destination path.
    for copier in (shutil.copy, shutil.copy2):
        source_dir = self.mkdtemp()
        target_dir = self.mkdtemp()
        source = os.path.join(source_dir, 'foo')
        write_file(source, 'foo')
        # Destination given as a directory: the file name is appended.
        self.assertEqual(copier(source, target_dir),
                         os.path.join(target_dir, 'foo'))
        # Destination given as a full file path: returned unchanged.
        explicit = os.path.join(target_dir, 'bar')
        self.assertEqual(copier(source, explicit),
                         os.path.join(target_dir, 'bar'))
1147
1148 ### shutil.copyfile
1149
@os_helper.skip_unless_symlink
def test_copyfile_symlinks(self):
    workdir = self.mkdtemp()
    src, dst, dst_link, link = (
        os.path.join(workdir, name)
        for name in ('src', 'dst', 'dst_link', 'link'))
    write_file(src, 'foo')
    os.symlink(src, link)
    # don't follow
    shutil.copyfile(link, dst_link, follow_symlinks=False)
    self.assertTrue(os.path.islink(dst_link))
    self.assertEqual(os.readlink(link), os.readlink(dst_link))
    # follow
    shutil.copyfile(link, dst)
    self.assertFalse(os.path.islink(dst))
1166
@unittest.skipUnless(hasattr(os, 'link'), 'requires os.link')
def test_dont_copy_file_onto_link_to_itself(self):
    # bug 851123.
    os.mkdir(TESTFN)
    original = os.path.join(TESTFN, 'cheese')
    hardlink = os.path.join(TESTFN, 'shop')
    try:
        with open(original, 'w') as stream:
            stream.write('cheddar')
        try:
            os.link(original, hardlink)
        except PermissionError as exc:
            self.skipTest('os.link(): %s' % exc)
        with self.assertRaises(shutil.SameFileError):
            shutil.copyfile(original, hardlink)
        # The refused copy must leave the content untouched.
        with open(original, 'r') as stream:
            self.assertEqual(stream.read(), 'cheddar')
        os.remove(hardlink)
    finally:
        shutil.rmtree(TESTFN, ignore_errors=True)
Tarek Ziadé51a6f722010-04-23 13:03:09 +00001186
@os_helper.skip_unless_symlink
def test_dont_copy_file_onto_symlink_to_itself(self):
    # bug 851123.
    os.mkdir(TESTFN)
    original = os.path.join(TESTFN, 'cheese')
    symlink = os.path.join(TESTFN, 'shop')
    try:
        with open(original, 'w') as stream:
            stream.write('cheddar')
        # Using `src` here would mean we end up with a symlink pointing
        # to TESTFN/TESTFN/cheese, while it should point at
        # TESTFN/cheese.
        os.symlink('cheese', symlink)
        with self.assertRaises(shutil.SameFileError):
            shutil.copyfile(original, symlink)
        # The refused copy must leave the content untouched.
        with open(original, 'r') as stream:
            self.assertEqual(stream.read(), 'cheddar')
        os.remove(symlink)
    finally:
        shutil.rmtree(TESTFN, ignore_errors=True)
Johannes Gijsbers68128712004-08-14 13:57:08 +00001206
Serhiy Storchaka43767632013-11-03 21:31:38 +02001207 # Issue #3002: copyfile and copytree block indefinitely on named pipes
@unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
def test_copyfile_named_pipe(self):
    try:
        os.mkfifo(TESTFN)
    except PermissionError as exc:
        self.skipTest('os.mkfifo(): %s' % exc)
    try:
        # The FIFO is rejected both as the source and as the destination.
        for source, target in ((TESTFN, TESTFN2), (__file__, TESTFN)):
            with self.assertRaises(shutil.SpecialFileError):
                shutil.copyfile(source, target)
    finally:
        os.remove(TESTFN)
Antoine Pitrou7fff0962009-05-01 21:09:44 +00001221
def test_copyfile_return_value(self):
    # copyfile returns its destination path.
    src_dir = self.mkdtemp()
    dst_dir = self.mkdtemp()
    dst_file = os.path.join(dst_dir, 'bar')
    src_file = os.path.join(src_dir, 'foo')
    write_file(src_file, 'foo')
    rv = shutil.copyfile(src_file, dst_file)
    # The returned path must exist, and the copy must match the source.
    self.assertTrue(os.path.exists(rv))
    self.assertEqual(read_file(src_file), read_file(dst_file))
Tarek Ziadé5340db32010-04-19 22:30:51 +00001232
def test_copyfile_same_file(self):
    # copyfile() should raise SameFileError if the source and destination
    # are the same.
    path = os.path.join(self.mkdtemp(), 'foo')
    write_file(path, 'foo')
    with self.assertRaises(SameFileError):
        shutil.copyfile(path, path)
    # But Error should work too, to stay backward compatible.
    with self.assertRaises(Error):
        shutil.copyfile(path, path)
    # Make sure file is not corrupted.
    self.assertEqual(read_file(path), 'foo')
Tarek Ziadéfb437512010-04-20 08:57:33 +00001244
Tarek Ziadéfb437512010-04-20 08:57:33 +00001245
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001246class TestArchives(BaseTest, unittest.TestCase):
Tarek Ziadéfb437512010-04-20 08:57:33 +00001247
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001248 ### shutil.make_archive
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001249
@support.requires_zlib()
def test_make_tarball(self):
    # creating something to tar
    root_dir, base_dir = self._create_files('')

    out_dir = self.mkdtemp()
    # force shutil to create the directory
    os.rmdir(out_dir)
    # working with relative paths
    work_dir = os.path.dirname(out_dir)
    rel_base_name = os.path.join(os.path.basename(out_dir), 'archive')
    # Both archive flavours must contain exactly these members.
    expected_members = ['.', './sub', './sub2',
                        './file1', './file2', './sub/file3']

    with os_helper.change_cwd(work_dir):
        base_name = os.path.abspath(rel_base_name)
        archive = make_archive(rel_base_name, 'gztar', root_dir, '.')

    # check if the compressed tarball was created
    self.assertEqual(archive, base_name + '.tar.gz')
    self.assertTrue(os.path.isfile(archive))
    self.assertTrue(tarfile.is_tarfile(archive))
    with tarfile.open(archive, 'r:gz') as tf:
        self.assertCountEqual(tf.getnames(), expected_members)

    # trying an uncompressed one
    with os_helper.change_cwd(work_dir):
        archive = make_archive(rel_base_name, 'tar', root_dir, '.')
    self.assertEqual(archive, base_name + '.tar')
    self.assertTrue(os.path.isfile(archive))
    self.assertTrue(tarfile.is_tarfile(archive))
    with tarfile.open(archive, 'r') as tf:
        self.assertCountEqual(tf.getnames(), expected_members)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001285
1286 def _tarinfo(self, path):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001287 with tarfile.open(path) as tar:
Tarek Ziadé396fad72010-02-23 05:30:31 +00001288 names = tar.getnames()
1289 names.sort()
1290 return tuple(names)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001291
def _create_files(self, base_dir='dist'):
    # creating something to tar
    # Layout under root_dir/base_dir: file1, file2, sub/file3 and an empty
    # sub2/. Returns (root_dir, base_dir).
    root_dir = self.mkdtemp()
    tree = os.path.join(root_dir, base_dir)
    os.makedirs(tree, exist_ok=True)
    for name in ('file1', 'file2'):
        write_file((tree, name), 'xxx')
    os.mkdir(os.path.join(tree, 'sub'))
    write_file((tree, 'sub', 'file3'), 'xxx')
    os.mkdir(os.path.join(tree, 'sub2'))
    # With a non-empty base_dir, add a file that lives outside of it.
    if base_dir:
        write_file((root_dir, 'outer'), 'xxx')
    return root_dir, base_dir
Tarek Ziadé396fad72010-02-23 05:30:31 +00001305
@support.requires_zlib()
@unittest.skipUnless(shutil.which('tar'),
                     'Need the tar command to run')
def test_tarfile_vs_tar(self):
    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')
    ours = make_archive(base_name, 'gztar', root_dir, base_dir)

    # check if the compressed tarball was created
    self.assertEqual(ours, base_name + '.tar.gz')
    self.assertTrue(os.path.isfile(ours))

    # now create another tarball using `tar`
    theirs = os.path.join(root_dir, 'archive2.tar')
    subprocess.check_call(['tar', '-cf', 'archive2.tar', base_dir],
                          cwd=root_dir, stdout=subprocess.DEVNULL)
    self.assertTrue(os.path.isfile(theirs))

    # let's compare both tarballs
    self.assertEqual(self._tarinfo(ours), self._tarinfo(theirs))

    # trying an uncompressed one
    ours = make_archive(base_name, 'tar', root_dir, base_dir)
    self.assertEqual(ours, base_name + '.tar')
    self.assertTrue(os.path.isfile(ours))

    # now for a dry_run
    # (the .tar written just above is still on disk at this point)
    ours = make_archive(base_name, 'tar', root_dir, base_dir,
                        dry_run=True)
    self.assertEqual(ours, base_name + '.tar')
    self.assertTrue(os.path.isfile(ours))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001338
@support.requires_zlib()
def test_make_zipfile(self):
    # creating something to zip
    root_dir, base_dir = self._create_files()

    out_dir = self.mkdtemp()
    # force shutil to create the directory
    os.rmdir(out_dir)
    # working with relative paths
    work_dir = os.path.dirname(out_dir)
    rel_base_name = os.path.join(os.path.basename(out_dir), 'archive')
    dist_members = ['dist/', 'dist/sub/', 'dist/sub2/',
                    'dist/file1', 'dist/file2', 'dist/sub/file3']

    # Without base_dir the whole root_dir is archived, 'outer' included.
    with os_helper.change_cwd(work_dir):
        base_name = os.path.abspath(rel_base_name)
        archive = make_archive(rel_base_name, 'zip', root_dir)

    self.assertEqual(archive, base_name + '.zip')
    self.assertTrue(os.path.isfile(archive))
    self.assertTrue(zipfile.is_zipfile(archive))
    with zipfile.ZipFile(archive) as zf:
        self.assertCountEqual(zf.namelist(), dist_members + ['outer'])

    # With base_dir only the 'dist' subtree is archived.
    with os_helper.change_cwd(work_dir):
        base_name = os.path.abspath(rel_base_name)
        archive = make_archive(rel_base_name, 'zip', root_dir, base_dir)

    self.assertEqual(archive, base_name + '.zip')
    self.assertTrue(os.path.isfile(archive))
    self.assertTrue(zipfile.is_zipfile(archive))
    with zipfile.ZipFile(archive) as zf:
        self.assertCountEqual(zf.namelist(), dist_members)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001375
@support.requires_zlib()
@unittest.skipUnless(shutil.which('zip'),
                     'Need the zip command to run')
def test_zipfile_vs_zip(self):
    """Compare make_archive()'s zip with one made by the `zip` command.

    Both archives are built from the same tree and must list the same
    member names.
    """
    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')
    shutil_zip = make_archive(base_name, 'zip', root_dir, base_dir)

    # check if ZIP file was created
    self.assertEqual(shutil_zip, base_name + '.zip')
    self.assertTrue(os.path.isfile(shutil_zip))

    # now create another ZIP file using `zip`
    tool_zip = os.path.join(root_dir, 'archive2.zip')
    subprocess.check_call(['zip', '-q', '-r', 'archive2.zip', base_dir],
                          cwd=root_dir,
                          stdout=subprocess.DEVNULL)
    self.assertTrue(os.path.isfile(tool_zip))

    def member_names(path):
        # Sorted member list, so that ordering differences are ignored.
        with zipfile.ZipFile(path) as zf:
            return sorted(zf.namelist())

    # let's compare both ZIP files
    self.assertEqual(member_names(shutil_zip), member_names(tool_zip))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001401
@support.requires_zlib()
@unittest.skipUnless(shutil.which('unzip'),
                     'Need the unzip command to run')
def test_unzip_zipfile(self):
    """Check a make_archive() zip with the external `unzip -t` command.

    The test is skipped when the installed unzip reports that it does not
    recognize the -t option; any other failure of the command fails the
    test and includes the command's output.
    """
    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')
    archive = make_archive(base_name, 'zip', root_dir, base_dir)

    # check if ZIP file was created
    self.assertEqual(archive, base_name + '.zip')
    self.assertTrue(os.path.isfile(archive))

    # now check the ZIP file using `unzip -t`
    with os_helper.change_cwd(root_dir):
        try:
            subprocess.check_output(['unzip', '-t', archive],
                                    stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as exc:
            unzip_output = exc.output.decode(errors="replace")
            if 'unrecognized option: t' in unzip_output:
                self.skipTest("unzip doesn't support -t")
            report = "{}\n\n**Unzip Output**\n{}"
            self.fail(report.format(exc, unzip_output))
1425
def test_make_archive(self):
    """make_archive() must raise ValueError when asked for format 'xxx'."""
    base_name = os.path.join(self.mkdtemp(), 'archive')
    with self.assertRaises(ValueError):
        make_archive(base_name, 'xxx')
1430
@support.requires_zlib()
def test_make_archive_owner_group(self):
    """Call make_archive() with several owner/group combinations.

    Each call only has to produce an archive file. The names for id 0 are
    looked up when UID_GID_SUPPORT is true; otherwise 'root' is used, so
    the test also runs without gid/uid support.
    """
    if not UID_GID_SUPPORT:
        owner = group = 'root'
    else:
        group = grp.getgrgid(0)[0]
        owner = pwd.getpwuid(0)[0]

    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')

    combinations = (
        ('zip', {'owner': owner, 'group': group}),
        ('zip', {}),
        ('tar', {'owner': owner, 'group': group}),
        # Arbitrary owner/group names must not prevent archive creation.
        ('tar', {'owner': 'kjhkjhkjg', 'group': 'oihohoh'}),
    )
    for archive_format, ownership in combinations:
        res = make_archive(base_name, archive_format, root_dir, base_dir,
                           **ownership)
        self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001457
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001458
@support.requires_zlib()
@unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
def test_tarfile_root_owner(self):
    """A gztar built with the names of uid/gid 0 stores uid 0 and gid 0
    for every member."""
    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')
    # Names registered for gid 0 and uid 0 on this system.
    group = grp.getgrgid(0)[0]
    owner = pwd.getpwuid(0)[0]
    with os_helper.change_cwd(root_dir):
        archive_name = make_archive(base_name, 'gztar', root_dir, 'dist',
                                    owner=owner, group=group)

    # check if the compressed tarball was created
    self.assertTrue(os.path.isfile(archive_name))

    # now check the ownership recorded for each member
    with tarfile.open(archive_name) as archive:
        for member in archive.getmembers():
            self.assertEqual(member.uid, 0)
            self.assertEqual(member.gid, 0)
1481
def test_make_archive_cwd(self):
    """The working directory is unchanged after make_archive() is called
    with a root_dir and an archiver function that raises."""
    cwd_before = os.getcwd()

    def _breaks(*args, **kw):
        raise RuntimeError()

    register_archive_format('xxx', _breaks, [], 'xxx file')
    try:
        # Any exception from the call is ignored; only the working
        # directory afterwards is checked.
        with contextlib.suppress(Exception):
            make_archive('xxx', 'xxx', root_dir=self.mkdtemp())
        self.assertEqual(os.getcwd(), cwd_before)
    finally:
        unregister_archive_format('xxx')
1496
def test_make_tarfile_in_curdir(self):
    """Issue #21280: a bare base name yields 'test.tar' in the current
    directory."""
    with os_helper.change_cwd(self.mkdtemp()):
        archive = make_archive('test', 'tar')
        self.assertEqual(archive, 'test.tar')
        self.assertTrue(os.path.isfile('test.tar'))
1503
@support.requires_zlib()
def test_make_zipfile_in_curdir(self):
    """Issue #21280: a bare base name yields 'test.zip' in the current
    directory."""
    with os_helper.change_cwd(self.mkdtemp()):
        archive = make_archive('test', 'zip')
        self.assertEqual(archive, 'test.zip')
        self.assertTrue(os.path.isfile('test.zip'))
1511
def test_register_archive_format(self):
    """Argument validation and register/unregister round trip for archive
    formats."""
    # The archiver function is given as the integer 1.
    with self.assertRaises(TypeError):
        register_archive_format('xxx', 1)
    # extra_args is given as the integer 1.
    with self.assertRaises(TypeError):
        register_archive_format('xxx', lambda: x, 1)
    # extra_args contains a 3-tuple next to a 2-tuple.
    with self.assertRaises(TypeError):
        register_archive_format('xxx', lambda: x, [(1, 2), (1, 2, 3)])

    def registered_names():
        # Names of all currently registered archive formats.
        return [name for name, params in get_archive_formats()]

    register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file')
    self.assertIn('xxx', registered_names())

    unregister_archive_format('xxx')
    self.assertNotIn('xxx', registered_names())
1527
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001528 ### shutil.unpack_archive
1529
def check_unpack_archive(self, format):
    """Run check_unpack_archive_with_converter() for `format` three times:
    with paths passed through unchanged, wrapped in pathlib.Path, and
    wrapped in FakePath."""
    for converter in (lambda path: path, pathlib.Path, FakePath):
        self.check_unpack_archive_with_converter(format, converter)
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001534
def check_unpack_archive_with_converter(self, format, converter):
    """Archive a tree in `format`, unpack it and compare the listings.

    `converter` is applied to every path handed to unpack_archive().
    """
    root_dir, base_dir = self._create_files()
    expected = rlistdir(root_dir)
    # 'outer' is in the root_dir listing but not expected after unpacking.
    expected.remove('outer')

    base_name = os.path.join(self.mkdtemp(), 'archive')
    filename = make_archive(base_name, format, root_dir, base_dir)

    # Unpack into two fresh directories: first without naming the format,
    # then with the format specified.
    for options in ({}, {'format': format}):
        target = self.mkdtemp()
        unpack_archive(converter(filename), converter(target), **options)
        self.assertEqual(rlistdir(target), expected)

    self.assertRaises(shutil.ReadError, unpack_archive, converter(TESTFN))
    self.assertRaises(ValueError, unpack_archive, converter(TESTFN),
                      format='xxx')
Nick Coghlanabf202d2011-03-16 13:52:20 -04001555
def test_unpack_archive_tar(self):
    """Run the unpack_archive() checks for the 'tar' format."""
    self.check_unpack_archive(format='tar')
1558
@support.requires_zlib()
def test_unpack_archive_gztar(self):
    """Run the unpack_archive() checks for the 'gztar' format."""
    self.check_unpack_archive(format='gztar')
1562
@support.requires_bz2()
def test_unpack_archive_bztar(self):
    """Run the unpack_archive() checks for the 'bztar' format."""
    self.check_unpack_archive(format='bztar')
1566
@support.requires_lzma()
@unittest.skipIf(AIX and not _maxdataOK(),
                 "AIX MAXDATA must be 0x20000000 or larger")
def test_unpack_archive_xztar(self):
    """Run the unpack_archive() checks for the 'xztar' format."""
    self.check_unpack_archive(format='xztar')
1571
@support.requires_zlib()
def test_unpack_archive_zip(self):
    """Run the unpack_archive() checks for the 'zip' format."""
    self.check_unpack_archive(format='zip')
1575
def test_unpack_registry(self):
    """Register, use, re-register and unregister custom unpack formats,
    leaving the registry as it was found."""
    formats_before = get_unpack_formats()

    def _boo(filename, extract_dir, extra):
        # Unpacker stub: verifies the arguments unpack_archive() passes on.
        self.assertEqual(extra, 1)
        self.assertEqual(filename, 'stuff.boo')
        self.assertEqual(extract_dir, 'xx')

    register_unpack_format('Boo', ['.boo', '.b2'], _boo, [('extra', 1)])
    unpack_archive('stuff.boo', 'xx')

    # trying to register a .boo unpacker again
    with self.assertRaises(RegistryError):
        register_unpack_format('Boo2', ['.boo'], _boo)

    # should work now
    unregister_unpack_format('Boo')
    register_unpack_format('Boo2', ['.boo'], _boo)
    registered = get_unpack_formats()
    self.assertIn(('Boo2', ['.boo'], ''), registered)
    self.assertNotIn(('Boo', ['.boo'], ''), registered)

    # let's leave a clean state
    unregister_unpack_format('Boo2')
    self.assertEqual(get_unpack_formats(), formats_before)
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001601
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001602
class TestMisc(BaseTest, unittest.TestCase):
    """Tests for shutil.disk_usage() and shutil.chown()."""

    @unittest.skipUnless(hasattr(shutil, 'disk_usage'),
                         "disk_usage not available on this platform")
    def test_disk_usage(self):
        """disk_usage() returns integer total/used/free values that are
        consistent with each other, for a directory and for a file."""
        usage = shutil.disk_usage(os.path.dirname(__file__))
        for attr in ('total', 'used', 'free'):
            self.assertIsInstance(getattr(usage, attr), int)
        self.assertGreater(usage.total, 0)
        self.assertGreater(usage.used, 0)
        self.assertGreaterEqual(usage.free, 0)
        self.assertGreaterEqual(usage.total, usage.used)
        self.assertGreater(usage.total, usage.free)

        # bpo-32557: Check that disk_usage() also accepts a filename
        shutil.disk_usage(__file__)

    @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
    @unittest.skipUnless(hasattr(os, 'chown'), 'requires os.chown')
    def test_chown(self):
        """chown() validates its arguments and accepts numeric ids as well
        as user/group names, for both a file and a directory."""
        dirname = self.mkdtemp()
        filename = tempfile.mktemp(dir=dirname)
        write_file(filename, 'testing chown function')

        # Neither user nor group given.
        with self.assertRaises(ValueError):
            shutil.chown(filename)

        with self.assertRaises(LookupError):
            shutil.chown(filename, user='non-existing username')

        with self.assertRaises(LookupError):
            shutil.chown(filename, group='non-existing groupname')

        with self.assertRaises(TypeError):
            shutil.chown(filename, b'spam')

        with self.assertRaises(TypeError):
            shutil.chown(filename, 3.14)

        uid = os.getuid()
        gid = os.getgid()

        def check_chown(path, uid=None, gid=None):
            # Stat the path that was passed in. The previous version always
            # stat'ed `filename`, so the directory checks below never looked
            # at the directory itself.
            s = os.stat(path)
            if uid is not None:
                self.assertEqual(uid, s.st_uid)
            if gid is not None:
                self.assertEqual(gid, s.st_gid)

        # Numeric ids, positionally and by keyword, first on the file and
        # then on the directory.
        for target in (filename, dirname):
            shutil.chown(target, uid, gid)
            check_chown(target, uid, gid)
            shutil.chown(target, uid)
            check_chown(target, uid)
            shutil.chown(target, user=uid)
            check_chown(target, uid)
            shutil.chown(target, group=gid)
            check_chown(target, gid=gid)

        try:
            user = pwd.getpwuid(uid)[0]
            group = grp.getgrgid(gid)[0]
        except KeyError:
            # On some systems uid/gid cannot be resolved.
            pass
        else:
            # Same checks again, this time by user and group name.
            shutil.chown(filename, user, group)
            check_chown(filename, uid, gid)
            shutil.chown(dirname, user, group)
            check_chown(dirname, uid, gid)
Sandro Tosid902a142011-08-22 23:28:27 +02001681
Brian Curtin0d0a1de2012-06-18 18:41:07 -05001682
class TestWhich(BaseTest, unittest.TestCase):
    """Tests for shutil.which() using a temporary executable file.

    setUp() stores the file's location in the ``temp_dir``, ``temp_file``,
    ``dir``, ``file``, ``env_path``, ``curdir`` and ``ext`` attributes;
    TestWhichBytes re-encodes several of them to run the same tests with
    bytes arguments.
    """

    def setUp(self):
        self.temp_dir = self.mkdtemp(prefix="Tmp")
        # Give the temp_file an ".exe" suffix for all.
        # It's needed on Windows and not harmful on other platforms.
        self.temp_file = tempfile.NamedTemporaryFile(dir=self.temp_dir,
                                                     prefix="Tmp",
                                                     suffix=".Exe")
        os.chmod(self.temp_file.name, stat.S_IXUSR)
        self.addCleanup(self.temp_file.close)
        self.dir, self.file = os.path.split(self.temp_file.name)
        self.env_path = self.dir
        self.curdir = os.curdir
        self.ext = ".EXE"

    def test_basic(self):
        """Given an EXE in a directory, it should be returned."""
        found = shutil.which(self.file, path=self.dir)
        self.assertEqual(found, self.temp_file.name)

    def test_absolute_cmd(self):
        """The fully qualified path to an existing executable is returned
        as given."""
        full_name = self.temp_file.name
        self.assertEqual(shutil.which(full_name, path=self.temp_dir),
                         full_name)

    def test_relative_cmd(self):
        """A relative path with a directory part is resolved against the
        current directory only."""
        base_dir, tail_dir = os.path.split(self.dir)
        relpath = os.path.join(tail_dir, self.file)
        with os_helper.change_cwd(path=base_dir):
            self.assertEqual(shutil.which(relpath, path=self.temp_dir),
                             relpath)
        # But it shouldn't be searched in PATH directories (issue #16957).
        with os_helper.change_cwd(path=self.dir):
            self.assertIsNone(shutil.which(relpath, path=base_dir))

    def test_cwd(self):
        """Issue #16957: the current directory is searched implicitly only
        on Windows."""
        base_dir = os.path.dirname(self.dir)
        with os_helper.change_cwd(path=self.dir):
            found = shutil.which(self.file, path=base_dir)
            if sys.platform != "win32":
                # Other platforms: shouldn't match in the current directory.
                self.assertIsNone(found)
            else:
                # Windows: current directory implicitly on PATH
                self.assertEqual(found,
                                 os.path.join(self.curdir, self.file))

    @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
                     'non-root user required')
    def test_non_matching_mode(self):
        """A read-only file is not returned when writable files are
        requested."""
        target = self.temp_file.name
        os.chmod(target, stat.S_IREAD)
        if os.access(target, os.W_OK):
            self.skipTest("can't set the file read-only")
        found = shutil.which(self.file, path=self.dir, mode=os.W_OK)
        self.assertIsNone(found)

    def test_relative_path(self):
        """A relative search path yields a result relative to the current
        directory."""
        base_dir, tail_dir = os.path.split(self.dir)
        with os_helper.change_cwd(path=base_dir):
            found = shutil.which(self.file, path=tail_dir)
            self.assertEqual(found, os.path.join(tail_dir, self.file))

    def test_nonexistent_file(self):
        """Return None when no matching executable file is found on the
        path."""
        self.assertIsNone(shutil.which("foo.exe", path=self.dir))

    @unittest.skipUnless(sys.platform == "win32",
                         "pathext check is Windows-only")
    def test_pathext_checking(self):
        """Asking for the file without its ".exe" extension still finds it,
        with the extension added."""
        stem_length = -4  # drops the 4-character ".Exe" suffix
        found = shutil.which(self.file[:stem_length], path=self.dir)
        self.assertEqual(found,
                         self.temp_file.name[:stem_length] + self.ext)

    def test_environ_path(self):
        """Without a path argument, the PATH environment variable is
        searched."""
        with os_helper.EnvironmentVarGuard() as env:
            env['PATH'] = self.env_path
            found = shutil.which(self.file)
            self.assertEqual(found, self.temp_file.name)

    def test_environ_path_empty(self):
        # PATH='': no match
        with os_helper.EnvironmentVarGuard() as env:
            env['PATH'] = ''
            confstr_patch = unittest.mock.patch('os.confstr',
                                                return_value=self.dir,
                                                create=True)
            with confstr_patch, \
                 support.swap_attr(os, 'defpath', self.dir), \
                 os_helper.change_cwd(self.dir):
                found = shutil.which(self.file)
                self.assertIsNone(found)

    def test_environ_path_cwd(self):
        """With PATH set to a lone separator, the file is found only when
        the current directory is the one containing it."""
        expected_cwd = os.path.basename(self.temp_file.name)
        if sys.platform == "win32":
            # On Windows the expected result is prefixed with os.curdir,
            # encoded to bytes when the file name is bytes.
            curdir = os.curdir
            if isinstance(expected_cwd, bytes):
                curdir = os.fsencode(curdir)
            expected_cwd = os.path.join(curdir, expected_cwd)

        # PATH=':': explicitly looks in the current directory
        with os_helper.EnvironmentVarGuard() as env:
            env['PATH'] = os.pathsep
            confstr_patch = unittest.mock.patch('os.confstr',
                                                return_value=self.dir,
                                                create=True)
            with confstr_patch, \
                 support.swap_attr(os, 'defpath', self.dir):
                found = shutil.which(self.file)
                self.assertIsNone(found)

                # look in current directory
                with os_helper.change_cwd(self.dir):
                    found = shutil.which(self.file)
                    self.assertEqual(found, expected_cwd)

    def test_environ_path_missing(self):
        """With PATH unset, the file is found through os.defpath or through
        os.confstr()."""
        with os_helper.EnvironmentVarGuard() as env:
            env.pop('PATH', None)

            # without confstr
            failing_confstr = unittest.mock.patch('os.confstr',
                                                  side_effect=ValueError,
                                                  create=True)
            with failing_confstr, \
                 support.swap_attr(os, 'defpath', self.dir):
                found = shutil.which(self.file)
            self.assertEqual(found, self.temp_file.name)

            # with confstr
            working_confstr = unittest.mock.patch('os.confstr',
                                                  return_value=self.dir,
                                                  create=True)
            with working_confstr, \
                 support.swap_attr(os, 'defpath', ''):
                found = shutil.which(self.file)
            self.assertEqual(found, self.temp_file.name)

    def test_empty_path(self):
        """An explicit empty path argument finds nothing, even with PATH
        set and the current directory containing the file."""
        with os_helper.change_cwd(path=self.dir), \
             os_helper.EnvironmentVarGuard() as env:
            env['PATH'] = self.env_path
            found = shutil.which(self.file, path='')
            self.assertIsNone(found)

    def test_empty_path_no_PATH(self):
        """With PATH removed from the environment the file is not found."""
        with os_helper.EnvironmentVarGuard() as env:
            env.pop('PATH', None)
            found = shutil.which(self.file)
            self.assertIsNone(found)

    @unittest.skipUnless(sys.platform == "win32", 'test specific to Windows')
    def test_pathext(self):
        """An extension listed in PATHEXT is appended when searching."""
        ext = ".xyz"
        xyz_file = tempfile.NamedTemporaryFile(dir=self.temp_dir,
                                               prefix="Tmp2", suffix=ext)
        os.chmod(xyz_file.name, stat.S_IXUSR)
        self.addCleanup(xyz_file.close)

        # strip path and extension
        program, _ = os.path.splitext(os.path.basename(xyz_file.name))

        with os_helper.EnvironmentVarGuard() as env:
            env['PATHEXT'] = ext
            found = shutil.which(program, path=self.temp_dir)
            self.assertEqual(found, xyz_file.name)

    # Issue 40592: See https://bugs.python.org/issue40592
    @unittest.skipUnless(sys.platform == "win32", 'test specific to Windows')
    def test_pathext_with_empty_str(self):
        """A PATHEXT value ending in ';' still resolves the program."""
        ext = ".xyz"
        xyz_file = tempfile.NamedTemporaryFile(dir=self.temp_dir,
                                               prefix="Tmp2", suffix=ext)
        self.addCleanup(xyz_file.close)

        # strip path and extension
        program, _ = os.path.splitext(os.path.basename(xyz_file.name))

        with os_helper.EnvironmentVarGuard() as env:
            env['PATHEXT'] = f"{ext};"  # note the ;
            found = shutil.which(program, path=self.temp_dir)
            self.assertEqual(found, xyz_file.name)
1868
Brian Curtinc57a3452012-06-22 16:00:30 -05001869
class TestWhichBytes(TestWhich):
    """Repeat the TestWhich tests with the fixture paths encoded to bytes."""

    def setUp(self):
        super().setUp()
        # Re-encode the string attributes prepared by TestWhich.setUp().
        for attr in ('dir', 'file', 'curdir', 'ext'):
            setattr(self, attr, os.fsencode(getattr(self, attr)))
        self.temp_file.name = os.fsencode(self.temp_file.name)
1878
1879
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03001880class TestMove(BaseTest, unittest.TestCase):
Christian Heimesada8c3b2008-03-18 18:26:33 +00001881
def setUp(self):
    """Create source and destination directories and write a small file
    named "foo" into the source directory."""
    filename = "foo"
    self.src_dir, self.dst_dir = self.mkdtemp(), self.mkdtemp()
    self.src_file = os.path.join(self.src_dir, filename)
    # Path the file is expected at after a move into dst_dir; not created.
    self.dst_file = os.path.join(self.dst_dir, filename)
    with open(self.src_file, "wb") as f:
        f.write(b"spam")
1890
def _check_move_file(self, src, dst, real_dst):
    """Move file `src` to `dst`, then check that `real_dst` holds the
    original bytes and that `src` no longer exists."""
    with open(src, "rb") as f:
        before = f.read()
    shutil.move(src, dst)
    with open(real_dst, "rb") as f:
        after = f.read()
    self.assertEqual(before, after)
    self.assertFalse(os.path.exists(src))
1898
def _check_move_dir(self, src, dst, real_dst):
    """Move directory `src` to `dst`, then check that `real_dst` lists the
    same entries and that `src` no longer exists."""
    entries_before = sorted(os.listdir(src))
    shutil.move(src, dst)
    entries_after = sorted(os.listdir(real_dst))
    self.assertEqual(entries_before, entries_after)
    self.assertFalse(os.path.exists(src))
1904
def test_move_file(self):
    """Move a file to another location on the same filesystem."""
    self._check_move_file(src=self.src_file, dst=self.dst_file,
                          real_dst=self.dst_file)
1908
def test_move_file_to_dir(self):
    """Move a file inside an existing dir on the same filesystem."""
    self._check_move_file(src=self.src_file, dst=self.dst_dir,
                          real_dst=self.dst_file)
1912
def test_move_file_to_dir_pathlike_src(self):
    """Move a file given as a pathlib.Path into an existing directory."""
    self._check_move_file(pathlib.Path(self.src_file), self.dst_dir,
                          self.dst_file)
1917
def test_move_file_to_dir_pathlike_dst(self):
    """Move a file into an existing directory given as a pathlib.Path."""
    self._check_move_file(self.src_file, pathlib.Path(self.dst_dir),
                          self.dst_file)
1922
@mock_rename
def test_move_file_other_fs(self):
    """Repeat test_move_file() under the mock_rename decorator.

    NOTE(review): mock_rename is defined outside this block; the original
    comment describes this case as a move onto another filesystem.
    """
    self.test_move_file()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001927
@mock_rename
def test_move_file_to_dir_other_fs(self):
    """Repeat test_move_file_to_dir() under the mock_rename decorator.

    NOTE(review): mock_rename is defined outside this block; the original
    comment describes this case as a move onto another filesystem.
    """
    self.test_move_file_to_dir()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001932
def test_move_dir(self):
    """Move a dir to another location on the same filesystem."""
    # A not-yet-existing name inside a fresh temporary directory.
    target = tempfile.mktemp(dir=self.mkdtemp())
    try:
        self._check_move_dir(self.src_dir, target, target)
    finally:
        os_helper.rmtree(target)
Christian Heimesada8c3b2008-03-18 18:26:33 +00001940
@mock_rename
def test_move_dir_other_fs(self):
    """Repeat test_move_dir() under the mock_rename decorator.

    NOTE(review): mock_rename is defined outside this block; the original
    comment describes this case as a move onto another filesystem.
    """
    self.test_move_dir()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001945
def test_move_dir_to_dir(self):
    """Move a dir inside an existing dir on the same filesystem."""
    landed_at = os.path.join(self.dst_dir, os.path.basename(self.src_dir))
    self._check_move_dir(self.src_dir, self.dst_dir, landed_at)
1950
@mock_rename
def test_move_dir_to_dir_other_fs(self):
    """Repeat test_move_dir_to_dir() under the mock_rename decorator.

    NOTE(review): mock_rename is defined outside this block; the original
    comment describes this case as a move onto another filesystem.
    """
    self.test_move_dir_to_dir()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001955
def test_move_dir_sep_to_dir(self):
    """A source directory written with a trailing os.path.sep is moved
    inside the destination directory under its own base name."""
    landed_at = os.path.join(self.dst_dir, os.path.basename(self.src_dir))
    self._check_move_dir(self.src_dir + os.path.sep, self.dst_dir,
                         landed_at)
1959
@unittest.skipUnless(os.path.altsep, 'requires os.path.altsep')
def test_move_dir_altsep_to_dir(self):
    """A source directory written with a trailing os.path.altsep is moved
    inside the destination directory under its own base name."""
    landed_at = os.path.join(self.dst_dir, os.path.basename(self.src_dir))
    self._check_move_dir(self.src_dir + os.path.altsep, self.dst_dir,
                         landed_at)
1964
def test_existing_file_inside_dest_dir(self):
    """Moving a file into a directory that already holds a file of the
    same name raises shutil.Error."""
    # Create an empty file with the same name inside the destination dir.
    with open(self.dst_file, "wb"):
        pass
    with self.assertRaises(shutil.Error):
        shutil.move(self.src_file, self.dst_dir)
1970
def test_dont_move_dir_in_itself(self):
    """Moving a dir inside itself raises an Error."""
    inside_src = os.path.join(self.src_dir, "bar")
    with self.assertRaises(shutil.Error):
        shutil.move(self.src_dir, inside_src)
1975
def test_destinsrc_false_negative(self):
    """shutil._destinsrc() must report a destination that lies inside the
    source directory."""
    nested_pairs = [('srcdir', 'srcdir/dest')]
    os.mkdir(TESTFN)
    try:
        for src_name, dst_name in nested_pairs:
            src = os.path.join(TESTFN, src_name)
            dst = os.path.join(TESTFN, dst_name)
            failure_text = ('_destinsrc() wrongly concluded that '
                            'dst (%s) is not in src (%s)' % (dst, src))
            self.assertTrue(shutil._destinsrc(src, dst), msg=failure_text)
    finally:
        os_helper.rmtree(TESTFN)
Christian Heimesada8c3b2008-03-18 18:26:33 +00001987
def test_destinsrc_false_positive(self):
    """shutil._destinsrc() must not report destinations that merely share
    a name prefix with the source directory."""
    unrelated_pairs = [('srcdir', 'src/dest'), ('srcdir', 'srcdir.new')]
    os.mkdir(TESTFN)
    try:
        for src_name, dst_name in unrelated_pairs:
            src = os.path.join(TESTFN, src_name)
            dst = os.path.join(TESTFN, dst_name)
            failure_text = ('_destinsrc() wrongly concluded that '
                            'dst (%s) is in src (%s)' % (dst, src))
            self.assertFalse(shutil._destinsrc(src, dst), msg=failure_text)
    finally:
        os_helper.rmtree(TESTFN)
Christian Heimes9bd667a2008-01-20 15:14:11 +00001999
@os_helper.skip_unless_symlink
@mock_rename
def test_move_file_symlink(self):
    # Move a symlink that points at src_file; the result must still be a
    # link and must still refer to the same file.
    link = os.path.join(self.src_dir, 'bar')
    os.symlink(self.src_file, link)
    shutil.move(link, self.dst_file)
    moved = self.dst_file
    self.assertTrue(os.path.islink(moved))
    self.assertTrue(os.path.samefile(self.src_file, moved))
2008
@os_helper.skip_unless_symlink
@mock_rename
def test_move_file_symlink_to_dir(self):
    # Move a symlink into a directory: it must show up there under its
    # own name, still as a link to src_file.
    link_name = "bar"
    link = os.path.join(self.src_dir, link_name)
    os.symlink(self.src_file, link)
    shutil.move(link, self.dst_dir)
    moved = os.path.join(self.dst_dir, link_name)
    self.assertTrue(os.path.islink(moved))
    self.assertTrue(os.path.samefile(self.src_file, moved))
2019
@os_helper.skip_unless_symlink
@mock_rename
def test_move_dangling_symlink(self):
    # The link target 'baz' is not created here, so the link dangles.
    target = os.path.join(self.src_dir, 'baz')
    link = os.path.join(self.src_dir, 'bar')
    os.symlink(target, link)
    moved = os.path.join(self.dst_dir, 'quux')
    shutil.move(link, moved)
    self.assertTrue(os.path.islink(moved))
    self.assertEqual(os.path.realpath(target), os.path.realpath(moved))
Antoine Pitrou0a08d7a2012-01-06 20:16:19 +01002030
@os_helper.skip_unless_symlink
@mock_rename
def test_move_dir_symlink(self):
    # Move a symlink that points at a directory; the moved entry must be
    # a link that still resolves to that directory.
    target = os.path.join(self.src_dir, 'baz')
    link = os.path.join(self.src_dir, 'bar')
    os.mkdir(target)
    os.symlink(target, link)
    moved = os.path.join(self.dst_dir, 'quux')
    shutil.move(link, moved)
    self.assertTrue(os.path.islink(moved))
    self.assertTrue(os.path.samefile(target, moved))
2042
Brian Curtin0d0a1de2012-06-18 18:41:07 -05002043 def test_move_return_value(self):
2044 rv = shutil.move(self.src_file, self.dst_dir)
2045 self.assertEqual(rv,
2046 os.path.join(self.dst_dir, os.path.basename(self.src_file)))
2047
2048 def test_move_as_rename_return_value(self):
2049 rv = shutil.move(self.src_file, os.path.join(self.dst_dir, 'bar'))
2050 self.assertEqual(rv, os.path.join(self.dst_dir, 'bar'))
2051
@mock_rename
def test_move_file_special_function(self):
    # A custom copy_function must be called exactly once for one file.
    calls = []

    def record(src, dst):
        calls.append((src, dst))

    shutil.move(self.src_file, self.dst_dir, copy_function=record)
    self.assertEqual(len(calls), 1)
2059
@mock_rename
def test_move_dir_special_function(self):
    # Two extra files are added to src_dir and copy_function is expected
    # to run three times (src_dir presumably already holds one file from
    # setUp, which is not visible here).
    calls = []

    def record(src, dst):
        calls.append((src, dst))

    for name in ('child', 'child1'):
        os_helper.create_empty_file(os.path.join(self.src_dir, name))
    shutil.move(self.src_dir, self.dst_dir, copy_function=record)
    self.assertEqual(len(calls), 3)
2069
Serhiy Storchaka4f2eac02019-09-26 13:15:08 +03002070 def test_move_dir_caseinsensitive(self):
2071 # Renames a folder to the same name
2072 # but a different case.
2073
2074 self.src_dir = self.mkdtemp()
2075 dst_dir = os.path.join(
2076 os.path.dirname(self.src_dir),
2077 os.path.basename(self.src_dir).upper())
2078 self.assertNotEqual(self.src_dir, dst_dir)
2079
2080 try:
2081 shutil.move(self.src_dir, dst_dir)
2082 self.assertTrue(os.path.isdir(dst_dir))
2083 finally:
2084 os.rmdir(dst_dir)
2085
Tarek Ziadé5340db32010-04-19 22:30:51 +00002086
class TestCopyFile(unittest.TestCase):
    """Check how shutil.copyfile() reacts to open()/close() failures.

    Each test swaps a fake ``open`` into the shutil module, so no real file
    is touched.  The fake hands out ``Faux`` objects which record how they
    were used as context managers.
    """

    class Faux:
        # Minimal stand-in for a file object used in a with-statement.

        # Class-level defaults; instances overwrite them when used.
        _entered = False      # set once __enter__() ran
        _exited_with = None   # (exc_type, exc_val, exc_tb) seen by __exit__()
        _raised = False       # set when __exit__() raised "Cannot close"

        def __init__(self, raise_in_exit=False, suppress_at_exit=True):
            self._raise_in_exit = raise_in_exit
            self._suppress_at_exit = suppress_at_exit

        def read(self, *args):
            return ''

        def __enter__(self):
            # Nothing is returned, so the ``as`` target of the
            # with-statement is None.
            self._entered = True

        def __exit__(self, exc_type, exc_val, exc_tb):
            self._exited_with = exc_type, exc_val, exc_tb
            if self._raise_in_exit:
                self._raised = True
                raise OSError("Cannot close")
            return self._suppress_at_exit

    def test_w_source_open_fails(self):
        # Opening the source fails: the OSError must reach the caller.
        def _open(filename, mode='r'):
            if filename == 'srcfile':
                raise OSError('Cannot open "srcfile"')
            # Explicit failure instead of ``assert 0`` so the guard still
            # works when Python runs with -O.
            self.fail('unexpected open() of %r' % (filename,))

        with support.swap_attr(shutil, 'open', _open):
            with self.assertRaises(OSError):
                shutil.copyfile('srcfile', 'destfile')

    @unittest.skipIf(MACOS, "skipped on macOS")
    def test_w_dest_open_fails(self):
        # Opening the destination fails: the source's __exit__() must see
        # that OSError (Faux suppresses it by default, so copyfile()
        # returns normally).
        srcfile = self.Faux()

        def _open(filename, mode='r'):
            if filename == 'srcfile':
                return srcfile
            if filename == 'destfile':
                raise OSError('Cannot open "destfile"')
            self.fail('unexpected open() of %r' % (filename,))

        with support.swap_attr(shutil, 'open', _open):
            shutil.copyfile('srcfile', 'destfile')
        self.assertTrue(srcfile._entered)
        self.assertIs(srcfile._exited_with[0], OSError)
        self.assertEqual(srcfile._exited_with[1].args,
                         ('Cannot open "destfile"',))

    @unittest.skipIf(MACOS, "skipped on macOS")
    def test_w_dest_close_fails(self):
        # Closing the destination fails: the source's __exit__() must see
        # the "Cannot close" OSError raised by the destination.
        srcfile = self.Faux()
        destfile = self.Faux(True)

        def _open(filename, mode='r'):
            if filename == 'srcfile':
                return srcfile
            if filename == 'destfile':
                return destfile
            self.fail('unexpected open() of %r' % (filename,))

        with support.swap_attr(shutil, 'open', _open):
            shutil.copyfile('srcfile', 'destfile')
        self.assertTrue(srcfile._entered)
        self.assertTrue(destfile._entered)
        self.assertTrue(destfile._raised)
        self.assertIs(srcfile._exited_with[0], OSError)
        self.assertEqual(srcfile._exited_with[1].args,
                         ('Cannot close',))

    @unittest.skipIf(MACOS, "skipped on macOS")
    def test_w_source_close_fails(self):
        # Closing the source fails: its __exit__() was entered without a
        # pending exception and the OSError it raises reaches the caller.
        srcfile = self.Faux(True)
        destfile = self.Faux()

        def _open(filename, mode='r'):
            if filename == 'srcfile':
                return srcfile
            if filename == 'destfile':
                return destfile
            self.fail('unexpected open() of %r' % (filename,))

        with support.swap_attr(shutil, 'open', _open):
            with self.assertRaises(OSError):
                shutil.copyfile('srcfile', 'destfile')
        self.assertTrue(srcfile._entered)
        self.assertTrue(destfile._entered)
        self.assertFalse(destfile._raised)
        self.assertIsNone(srcfile._exited_with[0])
        self.assertTrue(srcfile._raised)
2177
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002178
class TestCopyFileObj(unittest.TestCase):
    """Tests for shutil.copyfileobj() using a 2 MiB source file (TESTFN)."""

    FILESIZE = 2 * 1024 * 1024

    @classmethod
    def setUpClass(cls):
        write_test_file(TESTFN, cls.FILESIZE)

    @classmethod
    def tearDownClass(cls):
        os_helper.unlink(TESTFN)
        os_helper.unlink(TESTFN2)

    def tearDown(self):
        os_helper.unlink(TESTFN2)

    @contextlib.contextmanager
    def get_files(self):
        # Yield (src, dst): TESTFN opened for reading, TESTFN2 for writing.
        with open(TESTFN, "rb") as src:
            with open(TESTFN2, "wb") as dst:
                yield (src, dst)

    def assert_files_eq(self, src, dst):
        # Fail unless both paths hold the same bytes.
        with open(src, 'rb') as fsrc:
            with open(dst, 'rb') as fdst:
                self.assertEqual(fsrc.read(), fdst.read())

    def test_content(self):
        with self.get_files() as (src, dst):
            shutil.copyfileobj(src, dst)
        self.assert_files_eq(TESTFN, TESTFN2)

    def test_file_not_closed(self):
        # copyfileobj() must leave both file objects open.  Checked with
        # assert* methods so the test stays effective under ``python -O``.
        with self.get_files() as (src, dst):
            shutil.copyfileobj(src, dst)
            self.assertFalse(src.closed)
            self.assertFalse(dst.closed)

    def test_file_offset(self):
        with self.get_files() as (src, dst):
            shutil.copyfileobj(src, dst)
            self.assertEqual(src.tell(), self.FILESIZE)
            self.assertEqual(dst.tell(), self.FILESIZE)

    @unittest.skipIf(os.name != 'nt', "Windows only")
    def test_win_impl(self):
        # Make sure alternate Windows implementation is called.
        with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
            shutil.copyfile(TESTFN, TESTFN2)
        self.assertTrue(m.called)

        # File size is 2 MiB but max buf size should be 1 MiB.
        self.assertEqual(m.call_args[0][2], 1 * 1024 * 1024)

        # If the file is smaller than 1 MiB the memoryview() length must
        # be equal to the actual file size.
        with tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False) as f:
            f.write(b'foo')
        fname = f.name
        self.addCleanup(os_helper.unlink, fname)
        with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
            shutil.copyfile(fname, TESTFN2)
        self.assertEqual(m.call_args[0][2], 3)

        # Empty files should not rely on readinto() variant.
        with tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False) as f:
            pass
        fname = f.name
        self.addCleanup(os_helper.unlink, fname)
        with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
            shutil.copyfile(fname, TESTFN2)
        self.assertFalse(m.called)
        self.assert_files_eq(fname, TESTFN2)
2251
2252
class _ZeroCopyFileTest:
    """Tests common to all zero-copy APIs.

    Subclasses provide PATCHPOINT (the dotted name patched to inject
    failures) and zerocopy_fun() (the fast-copy function under test).
    """
    FILESIZE = (10 * 1024 * 1024)  # 10 MiB
    FILEDATA = b""
    PATCHPOINT = ""

    @classmethod
    def setUpClass(cls):
        write_test_file(TESTFN, cls.FILESIZE)
        with open(TESTFN, 'rb') as f:
            cls.FILEDATA = f.read()
            # Explicit check rather than a bare assert, which would be
            # stripped when running with ``python -O``.
            if len(cls.FILEDATA) != cls.FILESIZE:
                raise AssertionError(
                    "test file has %d bytes, expected %d"
                    % (len(cls.FILEDATA), cls.FILESIZE))

    @classmethod
    def tearDownClass(cls):
        os_helper.unlink(TESTFN)

    def tearDown(self):
        os_helper.unlink(TESTFN2)

    @contextlib.contextmanager
    def get_files(self):
        # Yield (src, dst): TESTFN opened for reading, TESTFN2 for writing.
        with open(TESTFN, "rb") as src:
            with open(TESTFN2, "wb") as dst:
                yield (src, dst)

    def zerocopy_fun(self, *args, **kwargs):
        raise NotImplementedError("must be implemented in subclass")

    def reset(self):
        # Tear everything down and recreate the test files from scratch.
        self.tearDown()
        self.tearDownClass()
        self.setUpClass()
        self.setUp()

    # ---

    def test_regular_copy(self):
        with self.get_files() as (src, dst):
            self.zerocopy_fun(src, dst)
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
        # Make sure the fallback function is not called.
        with self.get_files() as (src, dst):
            with unittest.mock.patch('shutil.copyfileobj') as m:
                shutil.copyfile(TESTFN, TESTFN2)
            self.assertFalse(m.called)

    def test_same_file(self):
        self.addCleanup(self.reset)
        with self.get_files() as (src, dst):
            with self.assertRaises(Exception):
                self.zerocopy_fun(src, src)
        # Make sure src file is not corrupted.
        self.assertEqual(read_file(TESTFN, binary=True), self.FILEDATA)

    def test_non_existent_src(self):
        # mktemp() is used only to obtain a name that does not exist.
        name = tempfile.mktemp(dir=os.getcwd())
        with self.assertRaises(FileNotFoundError) as cm:
            shutil.copyfile(name, "new")
        self.assertEqual(cm.exception.filename, name)

    def test_empty_file(self):
        srcname = TESTFN + 'src'
        dstname = TESTFN + 'dst'
        self.addCleanup(os_helper.unlink, srcname)
        self.addCleanup(os_helper.unlink, dstname)
        with open(srcname, "wb"):
            pass

        with open(srcname, "rb") as src:
            with open(dstname, "wb") as dst:
                self.zerocopy_fun(src, dst)

        self.assertEqual(read_file(dstname, binary=True), b"")

    def test_unhandled_exception(self):
        # An unexpected exception type must propagate out of copyfile().
        with unittest.mock.patch(self.PATCHPOINT,
                                 side_effect=ZeroDivisionError):
            self.assertRaises(ZeroDivisionError,
                              shutil.copyfile, TESTFN, TESTFN2)

    def test_exception_on_first_call(self):
        # Emulate a case where the first call to the zero-copy
        # function raises an exception in which case the function is
        # supposed to give up immediately.
        with unittest.mock.patch(self.PATCHPOINT,
                                 side_effect=OSError(errno.EINVAL, "yo")):
            with self.get_files() as (src, dst):
                with self.assertRaises(_GiveupOnFastCopy):
                    self.zerocopy_fun(src, dst)

    def test_filesystem_full(self):
        # Emulate a case where filesystem is full and sendfile() fails
        # on first call.
        with unittest.mock.patch(self.PATCHPOINT,
                                 side_effect=OSError(errno.ENOSPC, "yo")):
            with self.get_files() as (src, dst):
                self.assertRaises(OSError, self.zerocopy_fun, src, dst)
2351
2352
@unittest.skipIf(not SUPPORTS_SENDFILE, 'os.sendfile() not supported')
class TestZeroCopySendfile(_ZeroCopyFileTest, unittest.TestCase):
    """Run the shared zero-copy tests against shutil._fastcopy_sendfile()."""

    PATCHPOINT = "os.sendfile"

    def zerocopy_fun(self, fsrc, fdst):
        return shutil._fastcopy_sendfile(fsrc, fdst)

    def test_non_regular_file_src(self):
        # An in-memory source makes the fast path give up; the regular
        # copyfileobj() must then copy the data.
        with io.BytesIO(self.FILEDATA) as src:
            with open(TESTFN2, "wb") as dst:
                with self.assertRaises(_GiveupOnFastCopy):
                    self.zerocopy_fun(src, dst)
                shutil.copyfileobj(src, dst)

        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_non_regular_file_dst(self):
        # Same as above with an in-memory destination.
        with open(TESTFN, "rb") as src:
            with io.BytesIO() as dst:
                with self.assertRaises(_GiveupOnFastCopy):
                    self.zerocopy_fun(src, dst)
                shutil.copyfileobj(src, dst)
                dst.seek(0)
                self.assertEqual(dst.read(), self.FILEDATA)

    def test_exception_on_second_call(self):
        # The first sendfile() call goes through, every later one fails
        # with EBADF, which must reach the caller as OSError.
        def sendfile(*args, **kwargs):
            if not flag:
                flag.append(None)
                return orig_sendfile(*args, **kwargs)
            else:
                raise OSError(errno.EBADF, "yo")

        flag = []
        orig_sendfile = os.sendfile
        with unittest.mock.patch('os.sendfile', create=True,
                                 side_effect=sendfile):
            with self.get_files() as (src, dst):
                with self.assertRaises(OSError) as cm:
                    shutil._fastcopy_sendfile(src, dst)
        # assert* methods keep these checks alive under ``python -O``.
        self.assertTrue(flag)
        self.assertEqual(cm.exception.errno, errno.EBADF)

    def test_cant_get_size(self):
        # Emulate a case where src file size cannot be determined.
        # Internally bufsize will be set to a small value and
        # sendfile() will be called repeatedly.
        with unittest.mock.patch('os.fstat', side_effect=OSError) as m:
            with self.get_files() as (src, dst):
                shutil._fastcopy_sendfile(src, dst)
                self.assertTrue(m.called)
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_small_chunks(self):
        # Force internal file size detection to be smaller than the
        # actual file size. We want to force sendfile() to be called
        # multiple times, also in order to emulate a src fd which gets
        # bigger while it is being copied.
        mock = unittest.mock.Mock()
        mock.st_size = 65536 + 1
        with unittest.mock.patch('os.fstat', return_value=mock) as m:
            with self.get_files() as (src, dst):
                shutil._fastcopy_sendfile(src, dst)
                self.assertTrue(m.called)
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_big_chunk(self):
        # Force internal file size detection to be +100MB bigger than
        # the actual file size. Make sure sendfile() does not rely on
        # file size value except for (maybe) a better throughput /
        # performance.
        mock = unittest.mock.Mock()
        mock.st_size = self.FILESIZE + (100 * 1024 * 1024)
        with unittest.mock.patch('os.fstat', return_value=mock) as m:
            with self.get_files() as (src, dst):
                shutil._fastcopy_sendfile(src, dst)
                self.assertTrue(m.called)
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_blocksize_arg(self):
        with unittest.mock.patch('os.sendfile',
                                 side_effect=ZeroDivisionError) as m:
            self.assertRaises(ZeroDivisionError,
                              shutil.copyfile, TESTFN, TESTFN2)
            blocksize = m.call_args[0][3]
            # Make sure file size and the block size arg passed to
            # sendfile() are the same.
            self.assertEqual(blocksize, os.path.getsize(TESTFN))
            # ...unless we're dealing with a small file.
            os_helper.unlink(TESTFN2)
            write_file(TESTFN2, b"hello", binary=True)
            self.addCleanup(os_helper.unlink, TESTFN2 + '3')
            self.assertRaises(ZeroDivisionError,
                              shutil.copyfile, TESTFN2, TESTFN2 + '3')
            blocksize = m.call_args[0][3]
            self.assertEqual(blocksize, 2 ** 23)

    def test_file2file_not_supported(self):
        # Emulate a case where sendfile() only support file->socket
        # fds. In such a case copyfile() is supposed to skip the
        # fast-copy attempt from then on.
        self.assertTrue(shutil._USE_CP_SENDFILE)
        try:
            with unittest.mock.patch(
                    self.PATCHPOINT,
                    side_effect=OSError(errno.ENOTSOCK, "yo")) as m:
                with self.get_files() as (src, dst):
                    with self.assertRaises(_GiveupOnFastCopy):
                        shutil._fastcopy_sendfile(src, dst)
                self.assertTrue(m.called)
            self.assertFalse(shutil._USE_CP_SENDFILE)

            with unittest.mock.patch(self.PATCHPOINT) as m:
                shutil.copyfile(TESTFN, TESTFN2)
                self.assertFalse(m.called)
        finally:
            # Restore the module-level switch for the remaining tests.
            shutil._USE_CP_SENDFILE = True
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002470
2471
@unittest.skipIf(not MACOS, 'macOS only')
class TestZeroCopyMACOS(_ZeroCopyFileTest, unittest.TestCase):
    """Run the shared zero-copy tests against shutil._fastcopy_fcopyfile()."""

    PATCHPOINT = "posix._fcopyfile"

    def zerocopy_fun(self, src, dst):
        flags = posix._COPYFILE_DATA
        return shutil._fastcopy_fcopyfile(src, dst, flags)
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002478
2479
class TestGetTerminalSize(unittest.TestCase):
    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        term = shutil.get_terminal_size()
        self.assertGreaterEqual(term.columns, 0)
        self.assertGreaterEqual(term.lines, 0)

    def test_os_environ_first(self):
        "Check if environment variables have precedence"

        # Only COLUMNS is set.
        with os_helper.EnvironmentVarGuard() as environ:
            environ['COLUMNS'] = '777'
            del environ['LINES']
            term = shutil.get_terminal_size()
        self.assertEqual(term.columns, 777)

        # Only LINES is set.
        with os_helper.EnvironmentVarGuard() as environ:
            del environ['COLUMNS']
            environ['LINES'] = '888'
            term = shutil.get_terminal_size()
        self.assertEqual(term.lines, 888)

    def test_bad_environ(self):
        # Non-numeric COLUMNS/LINES must still yield non-negative sizes.
        with os_helper.EnvironmentVarGuard() as environ:
            environ['COLUMNS'] = 'xxx'
            environ['LINES'] = 'yyy'
            term = shutil.get_terminal_size()
        self.assertGreaterEqual(term.columns, 0)
        self.assertGreaterEqual(term.lines, 0)

    @unittest.skipUnless(os.isatty(sys.__stdout__.fileno()), "not on tty")
    @unittest.skipUnless(hasattr(os, 'get_terminal_size'),
                         'need os.get_terminal_size()')
    def test_stty_match(self):
        """Check if stty returns the same results ignoring env

        This test will fail if stdin and stdout are connected to
        different terminals with different sizes. Nevertheless, such
        situations should be pretty rare.
        """
        try:
            output = subprocess.check_output(['stty', 'size'])
        except (FileNotFoundError, PermissionError,
                subprocess.CalledProcessError):
            self.skipTest("stty invocation failed")
        fields = output.decode().split()
        # stty prints "rows columns"; get_terminal_size() is (columns, lines).
        expected = (int(fields[1]), int(fields[0]))

        with os_helper.EnvironmentVarGuard() as environ:
            del environ['LINES']
            del environ['COLUMNS']
            actual = shutil.get_terminal_size()

        self.assertEqual(expected, actual)

    def test_fallback(self):
        with os_helper.EnvironmentVarGuard() as environ:
            del environ['LINES']
            del environ['COLUMNS']

            # sys.__stdout__ has no fileno()
            with support.swap_attr(sys, '__stdout__', None):
                term = shutil.get_terminal_size(fallback=(10, 20))
            self.assertEqual(term.columns, 10)
            self.assertEqual(term.lines, 20)

            # sys.__stdout__ is not a terminal on Unix
            # or fileno() not in (0, 1, 2) on Windows
            with open(os.devnull, 'w') as devnull, \
                 support.swap_attr(sys, '__stdout__', devnull):
                term = shutil.get_terminal_size(fallback=(30, 40))
            self.assertEqual(term.columns, 30)
            self.assertEqual(term.lines, 40)
2556
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002557
class PublicAPITests(unittest.TestCase):
    """Ensures that the correct values are exposed in the public API."""

    def test_module_all_attribute(self):
        self.assertTrue(hasattr(shutil, '__all__'))
        expected = {
            'copyfileobj', 'copyfile', 'copymode', 'copystat',
            'copy', 'copy2', 'copytree', 'move', 'rmtree', 'Error',
            'SpecialFileError', 'ExecError', 'make_archive',
            'get_archive_formats', 'register_archive_format',
            'unregister_archive_format', 'get_unpack_formats',
            'register_unpack_format', 'unregister_unpack_format',
            'unpack_archive', 'ignore_patterns', 'chown', 'which',
            'get_terminal_size', 'SameFileError',
        }
        # disk_usage is only expected where os.statvfs exists or on Windows.
        if os.name == 'nt' or hasattr(os, 'statvfs'):
            expected.add('disk_usage')
        self.assertEqual(set(shutil.__all__), expected)
2574
2575
# Run every test in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()