blob: 678a190bcf5ee03ab2a42165f9a8258e3a90bddc [file] [log] [blame]
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00001# Copyright (C) 2003 Python Software Foundation
2
3import unittest
Berker Peksag884afd92014-12-10 02:50:32 +02004import unittest.mock
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00005import shutil
6import tempfile
Johannes Gijsbers8e6f2de2004-11-23 09:27:27 +00007import sys
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +00008import stat
Brett Cannon1c3fa182004-06-19 21:11:35 +00009import os
10import os.path
Antoine Pitrouc041ab62012-01-02 19:18:02 +010011import errno
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040012import functools
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -070013import pathlib
Antoine Pitroubcf2b592012-02-08 23:28:36 +010014import subprocess
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020015import random
16import string
17import contextlib
18import io
Serhiy Storchaka527ef072015-09-06 18:33:19 +030019from shutil import (make_archive,
Tarek Ziadé396fad72010-02-23 05:30:31 +000020 register_archive_format, unregister_archive_format,
Tarek Ziadé6ac91722010-04-28 17:51:36 +000021 get_archive_formats, Error, unpack_archive,
22 register_unpack_format, RegistryError,
Hynek Schlawack48653762012-10-07 12:49:58 +020023 unregister_unpack_format, get_unpack_formats,
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020024 SameFileError, _GiveupOnFastCopy)
Tarek Ziadé396fad72010-02-23 05:30:31 +000025import tarfile
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +020026import zipfile
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020027try:
28 import posix
29except ImportError:
30 posix = None
Tarek Ziadé396fad72010-02-23 05:30:31 +000031
32from test import support
Serhiy Storchakab21d1552018-03-02 11:53:51 +020033from test.support import TESTFN, FakePath
Serhiy Storchaka11213772014-08-06 18:50:19 +030034
Antoine Pitrou7fff0962009-05-01 21:09:44 +000035TESTFN2 = TESTFN + "2"
Victor Stinner937ee9e2018-06-26 02:11:06 +020036MACOS = sys.platform.startswith("darwin")
Michael Feltef110b12019-02-18 12:02:44 +010037AIX = sys.platform[:3] == 'aix'
Tarek Ziadé396fad72010-02-23 05:30:31 +000038try:
39 import grp
40 import pwd
41 UID_GID_SUPPORT = True
42except ImportError:
43 UID_GID_SUPPORT = False
44
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040045def _fake_rename(*args, **kwargs):
46 # Pretend the destination path is on a different filesystem.
Antoine Pitrouc041ab62012-01-02 19:18:02 +010047 raise OSError(getattr(errno, 'EXDEV', 18), "Invalid cross-device link")
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040048
49def mock_rename(func):
50 @functools.wraps(func)
51 def wrap(*args, **kwargs):
52 try:
53 builtin_rename = os.rename
54 os.rename = _fake_rename
55 return func(*args, **kwargs)
56 finally:
57 os.rename = builtin_rename
58 return wrap
59
Éric Araujoa7e33a12011-08-12 19:51:35 +020060def write_file(path, content, binary=False):
61 """Write *content* to a file located at *path*.
62
63 If *path* is a tuple instead of a string, os.path.join will be used to
64 make a path. If *binary* is true, the file will be opened in binary
65 mode.
66 """
67 if isinstance(path, tuple):
68 path = os.path.join(*path)
69 with open(path, 'wb' if binary else 'w') as fp:
70 fp.write(content)
71
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020072def write_test_file(path, size):
73 """Create a test file with an arbitrary size and random text content."""
74 def chunks(total, step):
75 assert total >= step
76 while total > step:
77 yield step
78 total -= step
79 if total:
80 yield total
81
82 bufsize = min(size, 8192)
83 chunk = b"".join([random.choice(string.ascii_letters).encode()
84 for i in range(bufsize)])
85 with open(path, 'wb') as f:
86 for csize in chunks(size, bufsize):
87 f.write(chunk)
88 assert os.path.getsize(path) == size
89
Éric Araujoa7e33a12011-08-12 19:51:35 +020090def read_file(path, binary=False):
91 """Return contents from a file located at *path*.
92
93 If *path* is a tuple instead of a string, os.path.join will be used to
94 make a path. If *binary* is true, the file will be opened in binary
95 mode.
96 """
97 if isinstance(path, tuple):
98 path = os.path.join(*path)
99 with open(path, 'rb' if binary else 'r') as fp:
100 return fp.read()
101
Serhiy Storchaka527ef072015-09-06 18:33:19 +0300102def rlistdir(path):
103 res = []
104 for name in sorted(os.listdir(path)):
105 p = os.path.join(path, name)
106 if os.path.isdir(p) and not os.path.islink(p):
107 res.append(name + '/')
108 for n in rlistdir(p):
109 res.append(name + '/' + n)
110 else:
111 res.append(name)
112 return res
113
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +0200114def supports_file2file_sendfile():
115 # ...apparently Linux and Solaris are the only ones
116 if not hasattr(os, "sendfile"):
117 return False
118 srcname = None
119 dstname = None
120 try:
121 with tempfile.NamedTemporaryFile("wb", delete=False) as f:
122 srcname = f.name
123 f.write(b"0123456789")
124
125 with open(srcname, "rb") as src:
126 with tempfile.NamedTemporaryFile("wb", delete=False) as dst:
127 dstname = f.name
128 infd = src.fileno()
129 outfd = dst.fileno()
130 try:
131 os.sendfile(outfd, infd, 0, 2)
132 except OSError:
133 return False
134 else:
135 return True
136 finally:
137 if srcname is not None:
138 support.unlink(srcname)
139 if dstname is not None:
140 support.unlink(dstname)
141
142
143SUPPORTS_SENDFILE = supports_file2file_sendfile()
144
Michael Feltef110b12019-02-18 12:02:44 +0100145# AIX 32-bit mode, by default, lacks enough memory for the xz/lzma compiler test
146# The AIX command 'dump -o program' gives XCOFF header information
147# The second word of the last line in the maxdata value
148# when 32-bit maxdata must be greater than 0x1000000 for the xz test to succeed
149def _maxdataOK():
150 if AIX and sys.maxsize == 2147483647:
151 hdrs=subprocess.getoutput("/usr/bin/dump -o %s" % sys.executable)
152 maxdata=hdrs.split("\n")[-1].split()[1]
153 return int(maxdata,16) >= 0x20000000
154 else:
155 return True
Éric Araujoa7e33a12011-08-12 19:51:35 +0200156
Barry Warsaw7fc2cca2003-01-24 17:34:13 +0000157class TestShutil(unittest.TestCase):
Tarek Ziadé396fad72010-02-23 05:30:31 +0000158
159 def setUp(self):
160 super(TestShutil, self).setUp()
161 self.tempdirs = []
162
163 def tearDown(self):
164 super(TestShutil, self).tearDown()
165 while self.tempdirs:
166 d = self.tempdirs.pop()
167 shutil.rmtree(d, os.name in ('nt', 'cygwin'))
168
Tarek Ziadé396fad72010-02-23 05:30:31 +0000169
170 def mkdtemp(self):
171 """Create a temporary directory that will be cleaned up.
172
173 Returns the path of the directory.
174 """
175 d = tempfile.mkdtemp()
176 self.tempdirs.append(d)
177 return d
Tarek Ziadé5340db32010-04-19 22:30:51 +0000178
Hynek Schlawack3b527782012-06-25 13:27:31 +0200179 def test_rmtree_works_on_bytes(self):
180 tmp = self.mkdtemp()
181 victim = os.path.join(tmp, 'killme')
182 os.mkdir(victim)
183 write_file(os.path.join(victim, 'somefile'), 'foo')
184 victim = os.fsencode(victim)
185 self.assertIsInstance(victim, bytes)
Steve Dowere58571b2016-09-08 11:11:13 -0700186 shutil.rmtree(victim)
Hynek Schlawack3b527782012-06-25 13:27:31 +0200187
Hynek Schlawacka75cd1c2012-06-28 12:07:29 +0200188 @support.skip_unless_symlink
189 def test_rmtree_fails_on_symlink(self):
190 tmp = self.mkdtemp()
191 dir_ = os.path.join(tmp, 'dir')
192 os.mkdir(dir_)
193 link = os.path.join(tmp, 'link')
194 os.symlink(dir_, link)
195 self.assertRaises(OSError, shutil.rmtree, link)
196 self.assertTrue(os.path.exists(dir_))
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100197 self.assertTrue(os.path.lexists(link))
198 errors = []
199 def onerror(*args):
200 errors.append(args)
201 shutil.rmtree(link, onerror=onerror)
202 self.assertEqual(len(errors), 1)
203 self.assertIs(errors[0][0], os.path.islink)
204 self.assertEqual(errors[0][1], link)
205 self.assertIsInstance(errors[0][2][1], OSError)
Hynek Schlawacka75cd1c2012-06-28 12:07:29 +0200206
207 @support.skip_unless_symlink
208 def test_rmtree_works_on_symlinks(self):
209 tmp = self.mkdtemp()
210 dir1 = os.path.join(tmp, 'dir1')
211 dir2 = os.path.join(dir1, 'dir2')
212 dir3 = os.path.join(tmp, 'dir3')
213 for d in dir1, dir2, dir3:
214 os.mkdir(d)
215 file1 = os.path.join(tmp, 'file1')
216 write_file(file1, 'foo')
217 link1 = os.path.join(dir1, 'link1')
218 os.symlink(dir2, link1)
219 link2 = os.path.join(dir1, 'link2')
220 os.symlink(dir3, link2)
221 link3 = os.path.join(dir1, 'link3')
222 os.symlink(file1, link3)
223 # make sure symlinks are removed but not followed
224 shutil.rmtree(dir1)
225 self.assertFalse(os.path.exists(dir1))
226 self.assertTrue(os.path.exists(dir3))
227 self.assertTrue(os.path.exists(file1))
228
Barry Warsaw7fc2cca2003-01-24 17:34:13 +0000229 def test_rmtree_errors(self):
230 # filename is guaranteed not to exist
231 filename = tempfile.mktemp()
Hynek Schlawackb5501102012-12-10 09:11:25 +0100232 self.assertRaises(FileNotFoundError, shutil.rmtree, filename)
233 # test that ignore_errors option is honored
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100234 shutil.rmtree(filename, ignore_errors=True)
235
236 # existing file
237 tmpdir = self.mkdtemp()
Hynek Schlawackb5501102012-12-10 09:11:25 +0100238 write_file((tmpdir, "tstfile"), "")
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100239 filename = os.path.join(tmpdir, "tstfile")
Hynek Schlawackb5501102012-12-10 09:11:25 +0100240 with self.assertRaises(NotADirectoryError) as cm:
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100241 shutil.rmtree(filename)
Hynek Schlawack87f9b462012-12-10 16:29:57 +0100242 # The reason for this rather odd construct is that Windows sprinkles
243 # a \*.* at the end of file names. But only sometimes on some buildbots
244 possible_args = [filename, os.path.join(filename, '*.*')]
245 self.assertIn(cm.exception.filename, possible_args)
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100246 self.assertTrue(os.path.exists(filename))
Hynek Schlawackb5501102012-12-10 09:11:25 +0100247 # test that ignore_errors option is honored
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100248 shutil.rmtree(filename, ignore_errors=True)
249 self.assertTrue(os.path.exists(filename))
250 errors = []
251 def onerror(*args):
252 errors.append(args)
253 shutil.rmtree(filename, onerror=onerror)
254 self.assertEqual(len(errors), 2)
Serhiy Storchakad4d79bc2017-11-04 14:16:35 +0200255 self.assertIs(errors[0][0], os.scandir)
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100256 self.assertEqual(errors[0][1], filename)
Hynek Schlawackb5501102012-12-10 09:11:25 +0100257 self.assertIsInstance(errors[0][2][1], NotADirectoryError)
Hynek Schlawack87f9b462012-12-10 16:29:57 +0100258 self.assertIn(errors[0][2][1].filename, possible_args)
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100259 self.assertIs(errors[1][0], os.rmdir)
260 self.assertEqual(errors[1][1], filename)
Hynek Schlawackb5501102012-12-10 09:11:25 +0100261 self.assertIsInstance(errors[1][2][1], NotADirectoryError)
Hynek Schlawack87f9b462012-12-10 16:29:57 +0100262 self.assertIn(errors[1][2][1].filename, possible_args)
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000263
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000264
Serhiy Storchaka43767632013-11-03 21:31:38 +0200265 @unittest.skipIf(sys.platform[:6] == 'cygwin',
266 "This test can't be run on Cygwin (issue #1071513).")
267 @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
268 "This test can't be run reliably as root (issue #1076467).")
269 def test_on_error(self):
270 self.errorState = 0
271 os.mkdir(TESTFN)
272 self.addCleanup(shutil.rmtree, TESTFN)
Antoine Pitrou2f8a75c2012-06-23 21:28:15 +0200273
Serhiy Storchaka43767632013-11-03 21:31:38 +0200274 self.child_file_path = os.path.join(TESTFN, 'a')
275 self.child_dir_path = os.path.join(TESTFN, 'b')
276 support.create_empty_file(self.child_file_path)
277 os.mkdir(self.child_dir_path)
278 old_dir_mode = os.stat(TESTFN).st_mode
279 old_child_file_mode = os.stat(self.child_file_path).st_mode
280 old_child_dir_mode = os.stat(self.child_dir_path).st_mode
281 # Make unwritable.
282 new_mode = stat.S_IREAD|stat.S_IEXEC
283 os.chmod(self.child_file_path, new_mode)
284 os.chmod(self.child_dir_path, new_mode)
285 os.chmod(TESTFN, new_mode)
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000286
Serhiy Storchaka43767632013-11-03 21:31:38 +0200287 self.addCleanup(os.chmod, TESTFN, old_dir_mode)
288 self.addCleanup(os.chmod, self.child_file_path, old_child_file_mode)
289 self.addCleanup(os.chmod, self.child_dir_path, old_child_dir_mode)
Antoine Pitrou2f8a75c2012-06-23 21:28:15 +0200290
Serhiy Storchaka43767632013-11-03 21:31:38 +0200291 shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
292 # Test whether onerror has actually been called.
293 self.assertEqual(self.errorState, 3,
294 "Expected call to onerror function did not happen.")
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000295
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000296 def check_args_to_onerror(self, func, arg, exc):
Benjamin Peterson25c95f12009-05-08 20:42:26 +0000297 # test_rmtree_errors deliberately runs rmtree
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200298 # on a directory that is chmod 500, which will fail.
Benjamin Peterson25c95f12009-05-08 20:42:26 +0000299 # This function is run when shutil.rmtree fails.
300 # 99.9% of the time it initially fails to remove
301 # a file in the directory, so the first time through
302 # func is os.remove.
303 # However, some Linux machines running ZFS on
304 # FUSE experienced a failure earlier in the process
305 # at os.listdir. The first failure may legally
306 # be either.
Antoine Pitrou4f6e3f72012-06-23 22:05:11 +0200307 if self.errorState < 2:
Hynek Schlawack2100b422012-06-23 20:28:32 +0200308 if func is os.unlink:
Antoine Pitrou4f6e3f72012-06-23 22:05:11 +0200309 self.assertEqual(arg, self.child_file_path)
310 elif func is os.rmdir:
311 self.assertEqual(arg, self.child_dir_path)
Benjamin Peterson25c95f12009-05-08 20:42:26 +0000312 else:
Antoine Pitrou4f6e3f72012-06-23 22:05:11 +0200313 self.assertIs(func, os.listdir)
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200314 self.assertIn(arg, [TESTFN, self.child_dir_path])
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000315 self.assertTrue(issubclass(exc[0], OSError))
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200316 self.errorState += 1
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000317 else:
318 self.assertEqual(func, os.rmdir)
319 self.assertEqual(arg, TESTFN)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000320 self.assertTrue(issubclass(exc[0], OSError))
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200321 self.errorState = 3
322
323 def test_rmtree_does_not_choke_on_failing_lstat(self):
324 try:
325 orig_lstat = os.lstat
Hynek Schlawacka75cd1c2012-06-28 12:07:29 +0200326 def raiser(fn, *args, **kwargs):
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200327 if fn != TESTFN:
328 raise OSError()
329 else:
330 return orig_lstat(fn)
331 os.lstat = raiser
332
333 os.mkdir(TESTFN)
334 write_file((TESTFN, 'foo'), 'foo')
335 shutil.rmtree(TESTFN)
336 finally:
337 os.lstat = orig_lstat
Barry Warsaw7fc2cca2003-01-24 17:34:13 +0000338
Antoine Pitrou78091e62011-12-29 18:54:15 +0100339 @support.skip_unless_symlink
340 def test_copymode_follow_symlinks(self):
341 tmp_dir = self.mkdtemp()
342 src = os.path.join(tmp_dir, 'foo')
343 dst = os.path.join(tmp_dir, 'bar')
344 src_link = os.path.join(tmp_dir, 'baz')
345 dst_link = os.path.join(tmp_dir, 'quux')
346 write_file(src, 'foo')
347 write_file(dst, 'foo')
348 os.symlink(src, src_link)
349 os.symlink(dst, dst_link)
350 os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
351 # file to file
352 os.chmod(dst, stat.S_IRWXO)
353 self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
354 shutil.copymode(src, dst)
355 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
Antoine Pitrou3f48ac92014-01-01 02:50:45 +0100356 # On Windows, os.chmod does not follow symlinks (issue #15411)
357 if os.name != 'nt':
358 # follow src link
359 os.chmod(dst, stat.S_IRWXO)
360 shutil.copymode(src_link, dst)
361 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
362 # follow dst link
363 os.chmod(dst, stat.S_IRWXO)
364 shutil.copymode(src, dst_link)
365 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
366 # follow both links
367 os.chmod(dst, stat.S_IRWXO)
368 shutil.copymode(src_link, dst_link)
369 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100370
371 @unittest.skipUnless(hasattr(os, 'lchmod'), 'requires os.lchmod')
372 @support.skip_unless_symlink
373 def test_copymode_symlink_to_symlink(self):
374 tmp_dir = self.mkdtemp()
375 src = os.path.join(tmp_dir, 'foo')
376 dst = os.path.join(tmp_dir, 'bar')
377 src_link = os.path.join(tmp_dir, 'baz')
378 dst_link = os.path.join(tmp_dir, 'quux')
379 write_file(src, 'foo')
380 write_file(dst, 'foo')
381 os.symlink(src, src_link)
382 os.symlink(dst, dst_link)
383 os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
384 os.chmod(dst, stat.S_IRWXU)
385 os.lchmod(src_link, stat.S_IRWXO|stat.S_IRWXG)
386 # link to link
387 os.lchmod(dst_link, stat.S_IRWXO)
Larry Hastingsb4038062012-07-15 10:57:38 -0700388 shutil.copymode(src_link, dst_link, follow_symlinks=False)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100389 self.assertEqual(os.lstat(src_link).st_mode,
390 os.lstat(dst_link).st_mode)
391 self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
392 # src link - use chmod
393 os.lchmod(dst_link, stat.S_IRWXO)
Larry Hastingsb4038062012-07-15 10:57:38 -0700394 shutil.copymode(src_link, dst, follow_symlinks=False)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100395 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
396 # dst link - use chmod
397 os.lchmod(dst_link, stat.S_IRWXO)
Larry Hastingsb4038062012-07-15 10:57:38 -0700398 shutil.copymode(src, dst_link, follow_symlinks=False)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100399 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
400
401 @unittest.skipIf(hasattr(os, 'lchmod'), 'requires os.lchmod to be missing')
402 @support.skip_unless_symlink
403 def test_copymode_symlink_to_symlink_wo_lchmod(self):
404 tmp_dir = self.mkdtemp()
405 src = os.path.join(tmp_dir, 'foo')
406 dst = os.path.join(tmp_dir, 'bar')
407 src_link = os.path.join(tmp_dir, 'baz')
408 dst_link = os.path.join(tmp_dir, 'quux')
409 write_file(src, 'foo')
410 write_file(dst, 'foo')
411 os.symlink(src, src_link)
412 os.symlink(dst, dst_link)
Larry Hastingsb4038062012-07-15 10:57:38 -0700413 shutil.copymode(src_link, dst_link, follow_symlinks=False) # silent fail
Antoine Pitrou78091e62011-12-29 18:54:15 +0100414
415 @support.skip_unless_symlink
416 def test_copystat_symlinks(self):
417 tmp_dir = self.mkdtemp()
418 src = os.path.join(tmp_dir, 'foo')
419 dst = os.path.join(tmp_dir, 'bar')
420 src_link = os.path.join(tmp_dir, 'baz')
421 dst_link = os.path.join(tmp_dir, 'qux')
422 write_file(src, 'foo')
423 src_stat = os.stat(src)
424 os.utime(src, (src_stat.st_atime,
425 src_stat.st_mtime - 42.0)) # ensure different mtimes
426 write_file(dst, 'bar')
427 self.assertNotEqual(os.stat(src).st_mtime, os.stat(dst).st_mtime)
428 os.symlink(src, src_link)
429 os.symlink(dst, dst_link)
430 if hasattr(os, 'lchmod'):
431 os.lchmod(src_link, stat.S_IRWXO)
432 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
433 os.lchflags(src_link, stat.UF_NODUMP)
434 src_link_stat = os.lstat(src_link)
435 # follow
436 if hasattr(os, 'lchmod'):
Larry Hastingsb4038062012-07-15 10:57:38 -0700437 shutil.copystat(src_link, dst_link, follow_symlinks=True)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100438 self.assertNotEqual(src_link_stat.st_mode, os.stat(dst).st_mode)
439 # don't follow
Larry Hastingsb4038062012-07-15 10:57:38 -0700440 shutil.copystat(src_link, dst_link, follow_symlinks=False)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100441 dst_link_stat = os.lstat(dst_link)
Larry Hastings9cf065c2012-06-22 16:30:09 -0700442 if os.utime in os.supports_follow_symlinks:
Antoine Pitrou78091e62011-12-29 18:54:15 +0100443 for attr in 'st_atime', 'st_mtime':
444 # The modification times may be truncated in the new file.
445 self.assertLessEqual(getattr(src_link_stat, attr),
446 getattr(dst_link_stat, attr) + 1)
447 if hasattr(os, 'lchmod'):
448 self.assertEqual(src_link_stat.st_mode, dst_link_stat.st_mode)
449 if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
450 self.assertEqual(src_link_stat.st_flags, dst_link_stat.st_flags)
451 # tell to follow but dst is not a link
Larry Hastingsb4038062012-07-15 10:57:38 -0700452 shutil.copystat(src_link, dst, follow_symlinks=False)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100453 self.assertTrue(abs(os.stat(src).st_mtime - os.stat(dst).st_mtime) <
454 00000.1)
455
Ned Deilybaf75712012-05-10 17:05:19 -0700456 @unittest.skipUnless(hasattr(os, 'chflags') and
457 hasattr(errno, 'EOPNOTSUPP') and
458 hasattr(errno, 'ENOTSUP'),
459 "requires os.chflags, EOPNOTSUPP & ENOTSUP")
460 def test_copystat_handles_harmless_chflags_errors(self):
461 tmpdir = self.mkdtemp()
462 file1 = os.path.join(tmpdir, 'file1')
463 file2 = os.path.join(tmpdir, 'file2')
464 write_file(file1, 'xxx')
465 write_file(file2, 'xxx')
466
467 def make_chflags_raiser(err):
468 ex = OSError()
469
Larry Hastings90867a52012-06-22 17:01:41 -0700470 def _chflags_raiser(path, flags, *, follow_symlinks=True):
Ned Deilybaf75712012-05-10 17:05:19 -0700471 ex.errno = err
472 raise ex
473 return _chflags_raiser
474 old_chflags = os.chflags
475 try:
476 for err in errno.EOPNOTSUPP, errno.ENOTSUP:
477 os.chflags = make_chflags_raiser(err)
478 shutil.copystat(file1, file2)
479 # assert others errors break it
480 os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP)
481 self.assertRaises(OSError, shutil.copystat, file1, file2)
482 finally:
483 os.chflags = old_chflags
484
Antoine Pitrou424246f2012-05-12 19:02:01 +0200485 @support.skip_unless_xattr
486 def test_copyxattr(self):
487 tmp_dir = self.mkdtemp()
488 src = os.path.join(tmp_dir, 'foo')
489 write_file(src, 'foo')
490 dst = os.path.join(tmp_dir, 'bar')
491 write_file(dst, 'bar')
492
493 # no xattr == no problem
494 shutil._copyxattr(src, dst)
495 # common case
496 os.setxattr(src, 'user.foo', b'42')
497 os.setxattr(src, 'user.bar', b'43')
498 shutil._copyxattr(src, dst)
Gregory P. Smith1093bf22014-01-17 12:01:22 -0800499 self.assertEqual(sorted(os.listxattr(src)), sorted(os.listxattr(dst)))
Antoine Pitrou424246f2012-05-12 19:02:01 +0200500 self.assertEqual(
501 os.getxattr(src, 'user.foo'),
502 os.getxattr(dst, 'user.foo'))
503 # check errors don't affect other attrs
504 os.remove(dst)
505 write_file(dst, 'bar')
506 os_error = OSError(errno.EPERM, 'EPERM')
507
Larry Hastings9cf065c2012-06-22 16:30:09 -0700508 def _raise_on_user_foo(fname, attr, val, **kwargs):
Antoine Pitrou424246f2012-05-12 19:02:01 +0200509 if attr == 'user.foo':
510 raise os_error
511 else:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700512 orig_setxattr(fname, attr, val, **kwargs)
Antoine Pitrou424246f2012-05-12 19:02:01 +0200513 try:
514 orig_setxattr = os.setxattr
515 os.setxattr = _raise_on_user_foo
516 shutil._copyxattr(src, dst)
Antoine Pitrou61597d32012-05-12 23:37:35 +0200517 self.assertIn('user.bar', os.listxattr(dst))
Antoine Pitrou424246f2012-05-12 19:02:01 +0200518 finally:
519 os.setxattr = orig_setxattr
Hynek Schlawack0beab052013-02-05 08:22:44 +0100520 # the source filesystem not supporting xattrs should be ok, too.
521 def _raise_on_src(fname, *, follow_symlinks=True):
522 if fname == src:
523 raise OSError(errno.ENOTSUP, 'Operation not supported')
524 return orig_listxattr(fname, follow_symlinks=follow_symlinks)
525 try:
526 orig_listxattr = os.listxattr
527 os.listxattr = _raise_on_src
528 shutil._copyxattr(src, dst)
529 finally:
530 os.listxattr = orig_listxattr
Antoine Pitrou424246f2012-05-12 19:02:01 +0200531
Larry Hastingsad5ae042012-07-14 17:55:11 -0700532 # test that shutil.copystat copies xattrs
533 src = os.path.join(tmp_dir, 'the_original')
534 write_file(src, src)
535 os.setxattr(src, 'user.the_value', b'fiddly')
536 dst = os.path.join(tmp_dir, 'the_copy')
537 write_file(dst, dst)
538 shutil.copystat(src, dst)
Hynek Schlawackc2d481f2012-07-16 17:11:10 +0200539 self.assertEqual(os.getxattr(dst, 'user.the_value'), b'fiddly')
Larry Hastingsad5ae042012-07-14 17:55:11 -0700540
Antoine Pitrou424246f2012-05-12 19:02:01 +0200541 @support.skip_unless_symlink
542 @support.skip_unless_xattr
543 @unittest.skipUnless(hasattr(os, 'geteuid') and os.geteuid() == 0,
544 'root privileges required')
545 def test_copyxattr_symlinks(self):
546 # On Linux, it's only possible to access non-user xattr for symlinks;
547 # which in turn require root privileges. This test should be expanded
548 # as soon as other platforms gain support for extended attributes.
549 tmp_dir = self.mkdtemp()
550 src = os.path.join(tmp_dir, 'foo')
551 src_link = os.path.join(tmp_dir, 'baz')
552 write_file(src, 'foo')
553 os.symlink(src, src_link)
554 os.setxattr(src, 'trusted.foo', b'42')
Larry Hastings9cf065c2012-06-22 16:30:09 -0700555 os.setxattr(src_link, 'trusted.foo', b'43', follow_symlinks=False)
Antoine Pitrou424246f2012-05-12 19:02:01 +0200556 dst = os.path.join(tmp_dir, 'bar')
557 dst_link = os.path.join(tmp_dir, 'qux')
558 write_file(dst, 'bar')
559 os.symlink(dst, dst_link)
Larry Hastingsb4038062012-07-15 10:57:38 -0700560 shutil._copyxattr(src_link, dst_link, follow_symlinks=False)
Larry Hastings9cf065c2012-06-22 16:30:09 -0700561 self.assertEqual(os.getxattr(dst_link, 'trusted.foo', follow_symlinks=False), b'43')
Antoine Pitrou424246f2012-05-12 19:02:01 +0200562 self.assertRaises(OSError, os.getxattr, dst, 'trusted.foo')
Larry Hastingsb4038062012-07-15 10:57:38 -0700563 shutil._copyxattr(src_link, dst, follow_symlinks=False)
Antoine Pitrou424246f2012-05-12 19:02:01 +0200564 self.assertEqual(os.getxattr(dst, 'trusted.foo'), b'43')
565
Antoine Pitrou78091e62011-12-29 18:54:15 +0100566 @support.skip_unless_symlink
567 def test_copy_symlinks(self):
568 tmp_dir = self.mkdtemp()
569 src = os.path.join(tmp_dir, 'foo')
570 dst = os.path.join(tmp_dir, 'bar')
571 src_link = os.path.join(tmp_dir, 'baz')
572 write_file(src, 'foo')
573 os.symlink(src, src_link)
574 if hasattr(os, 'lchmod'):
575 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
576 # don't follow
Larry Hastingsb4038062012-07-15 10:57:38 -0700577 shutil.copy(src_link, dst, follow_symlinks=True)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100578 self.assertFalse(os.path.islink(dst))
579 self.assertEqual(read_file(src), read_file(dst))
580 os.remove(dst)
581 # follow
Larry Hastingsb4038062012-07-15 10:57:38 -0700582 shutil.copy(src_link, dst, follow_symlinks=False)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100583 self.assertTrue(os.path.islink(dst))
584 self.assertEqual(os.readlink(dst), os.readlink(src_link))
585 if hasattr(os, 'lchmod'):
586 self.assertEqual(os.lstat(src_link).st_mode,
587 os.lstat(dst).st_mode)
588
589 @support.skip_unless_symlink
590 def test_copy2_symlinks(self):
591 tmp_dir = self.mkdtemp()
592 src = os.path.join(tmp_dir, 'foo')
593 dst = os.path.join(tmp_dir, 'bar')
594 src_link = os.path.join(tmp_dir, 'baz')
595 write_file(src, 'foo')
596 os.symlink(src, src_link)
597 if hasattr(os, 'lchmod'):
598 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
599 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
600 os.lchflags(src_link, stat.UF_NODUMP)
601 src_stat = os.stat(src)
602 src_link_stat = os.lstat(src_link)
603 # follow
Larry Hastingsb4038062012-07-15 10:57:38 -0700604 shutil.copy2(src_link, dst, follow_symlinks=True)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100605 self.assertFalse(os.path.islink(dst))
606 self.assertEqual(read_file(src), read_file(dst))
607 os.remove(dst)
608 # don't follow
Larry Hastingsb4038062012-07-15 10:57:38 -0700609 shutil.copy2(src_link, dst, follow_symlinks=False)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100610 self.assertTrue(os.path.islink(dst))
611 self.assertEqual(os.readlink(dst), os.readlink(src_link))
612 dst_stat = os.lstat(dst)
Larry Hastings9cf065c2012-06-22 16:30:09 -0700613 if os.utime in os.supports_follow_symlinks:
Antoine Pitrou78091e62011-12-29 18:54:15 +0100614 for attr in 'st_atime', 'st_mtime':
615 # The modification times may be truncated in the new file.
616 self.assertLessEqual(getattr(src_link_stat, attr),
617 getattr(dst_stat, attr) + 1)
618 if hasattr(os, 'lchmod'):
619 self.assertEqual(src_link_stat.st_mode, dst_stat.st_mode)
620 self.assertNotEqual(src_stat.st_mode, dst_stat.st_mode)
621 if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
622 self.assertEqual(src_link_stat.st_flags, dst_stat.st_flags)
623
Antoine Pitrou424246f2012-05-12 19:02:01 +0200624 @support.skip_unless_xattr
625 def test_copy2_xattr(self):
626 tmp_dir = self.mkdtemp()
627 src = os.path.join(tmp_dir, 'foo')
628 dst = os.path.join(tmp_dir, 'bar')
629 write_file(src, 'foo')
630 os.setxattr(src, 'user.foo', b'42')
631 shutil.copy2(src, dst)
632 self.assertEqual(
633 os.getxattr(src, 'user.foo'),
634 os.getxattr(dst, 'user.foo'))
635 os.remove(dst)
636
Antoine Pitrou78091e62011-12-29 18:54:15 +0100637 @support.skip_unless_symlink
638 def test_copyfile_symlinks(self):
639 tmp_dir = self.mkdtemp()
640 src = os.path.join(tmp_dir, 'src')
641 dst = os.path.join(tmp_dir, 'dst')
642 dst_link = os.path.join(tmp_dir, 'dst_link')
643 link = os.path.join(tmp_dir, 'link')
644 write_file(src, 'foo')
645 os.symlink(src, link)
646 # don't follow
Larry Hastingsb4038062012-07-15 10:57:38 -0700647 shutil.copyfile(link, dst_link, follow_symlinks=False)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100648 self.assertTrue(os.path.islink(dst_link))
649 self.assertEqual(os.readlink(link), os.readlink(dst_link))
650 # follow
651 shutil.copyfile(link, dst)
652 self.assertFalse(os.path.islink(dst))
653
Hynek Schlawack2100b422012-06-23 20:28:32 +0200654 def test_rmtree_uses_safe_fd_version_if_available(self):
Hynek Schlawackd0f6e0a2012-06-29 08:28:20 +0200655 _use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <=
656 os.supports_dir_fd and
657 os.listdir in os.supports_fd and
658 os.stat in os.supports_follow_symlinks)
659 if _use_fd_functions:
Hynek Schlawack2100b422012-06-23 20:28:32 +0200660 self.assertTrue(shutil._use_fd_functions)
Nick Coghlan5b0eca12012-06-24 16:43:06 +1000661 self.assertTrue(shutil.rmtree.avoids_symlink_attacks)
Hynek Schlawack2100b422012-06-23 20:28:32 +0200662 tmp_dir = self.mkdtemp()
663 d = os.path.join(tmp_dir, 'a')
664 os.mkdir(d)
665 try:
666 real_rmtree = shutil._rmtree_safe_fd
667 class Called(Exception): pass
668 def _raiser(*args, **kwargs):
669 raise Called
670 shutil._rmtree_safe_fd = _raiser
671 self.assertRaises(Called, shutil.rmtree, d)
672 finally:
673 shutil._rmtree_safe_fd = real_rmtree
674 else:
675 self.assertFalse(shutil._use_fd_functions)
Nick Coghlan5b0eca12012-06-24 16:43:06 +1000676 self.assertFalse(shutil.rmtree.avoids_symlink_attacks)
Hynek Schlawack2100b422012-06-23 20:28:32 +0200677
Johannes Gijsbersd60e92a2004-09-11 21:26:21 +0000678 def test_rmtree_dont_delete_file(self):
679 # When called on a file instead of a directory, don't delete it.
680 handle, path = tempfile.mkstemp()
Victor Stinnerbf816222011-06-30 23:25:47 +0200681 os.close(handle)
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200682 self.assertRaises(NotADirectoryError, shutil.rmtree, path)
Johannes Gijsbersd60e92a2004-09-11 21:26:21 +0000683 os.remove(path)
684
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000685 def test_copytree_simple(self):
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000686 src_dir = tempfile.mkdtemp()
687 dst_dir = os.path.join(tempfile.mkdtemp(), 'destination')
Éric Araujoa7e33a12011-08-12 19:51:35 +0200688 self.addCleanup(shutil.rmtree, src_dir)
689 self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
690 write_file((src_dir, 'test.txt'), '123')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000691 os.mkdir(os.path.join(src_dir, 'test_dir'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200692 write_file((src_dir, 'test_dir', 'test.txt'), '456')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000693
Éric Araujoa7e33a12011-08-12 19:51:35 +0200694 shutil.copytree(src_dir, dst_dir)
695 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test.txt')))
696 self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'test_dir')))
697 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test_dir',
698 'test.txt')))
699 actual = read_file((dst_dir, 'test.txt'))
700 self.assertEqual(actual, '123')
701 actual = read_file((dst_dir, 'test_dir', 'test.txt'))
702 self.assertEqual(actual, '456')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000703
jab9e00d9e2018-12-28 13:03:40 -0500704 def test_copytree_dirs_exist_ok(self):
705 src_dir = tempfile.mkdtemp()
706 dst_dir = tempfile.mkdtemp()
707 self.addCleanup(shutil.rmtree, src_dir)
708 self.addCleanup(shutil.rmtree, dst_dir)
709
710 write_file((src_dir, 'nonexisting.txt'), '123')
711 os.mkdir(os.path.join(src_dir, 'existing_dir'))
712 os.mkdir(os.path.join(dst_dir, 'existing_dir'))
713 write_file((dst_dir, 'existing_dir', 'existing.txt'), 'will be replaced')
714 write_file((src_dir, 'existing_dir', 'existing.txt'), 'has been replaced')
715
716 shutil.copytree(src_dir, dst_dir, dirs_exist_ok=True)
717 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'nonexisting.txt')))
718 self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'existing_dir')))
719 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'existing_dir',
720 'existing.txt')))
721 actual = read_file((dst_dir, 'nonexisting.txt'))
722 self.assertEqual(actual, '123')
723 actual = read_file((dst_dir, 'existing_dir', 'existing.txt'))
724 self.assertEqual(actual, 'has been replaced')
725
726 with self.assertRaises(FileExistsError):
727 shutil.copytree(src_dir, dst_dir, dirs_exist_ok=False)
728
Antoine Pitrou78091e62011-12-29 18:54:15 +0100729 @support.skip_unless_symlink
730 def test_copytree_symlinks(self):
731 tmp_dir = self.mkdtemp()
732 src_dir = os.path.join(tmp_dir, 'src')
733 dst_dir = os.path.join(tmp_dir, 'dst')
734 sub_dir = os.path.join(src_dir, 'sub')
735 os.mkdir(src_dir)
736 os.mkdir(sub_dir)
737 write_file((src_dir, 'file.txt'), 'foo')
738 src_link = os.path.join(sub_dir, 'link')
739 dst_link = os.path.join(dst_dir, 'sub/link')
740 os.symlink(os.path.join(src_dir, 'file.txt'),
741 src_link)
742 if hasattr(os, 'lchmod'):
743 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
744 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
745 os.lchflags(src_link, stat.UF_NODUMP)
746 src_stat = os.lstat(src_link)
747 shutil.copytree(src_dir, dst_dir, symlinks=True)
748 self.assertTrue(os.path.islink(os.path.join(dst_dir, 'sub', 'link')))
749 self.assertEqual(os.readlink(os.path.join(dst_dir, 'sub', 'link')),
750 os.path.join(src_dir, 'file.txt'))
751 dst_stat = os.lstat(dst_link)
752 if hasattr(os, 'lchmod'):
753 self.assertEqual(dst_stat.st_mode, src_stat.st_mode)
754 if hasattr(os, 'lchflags'):
755 self.assertEqual(dst_stat.st_flags, src_stat.st_flags)
756
Georg Brandl2ee470f2008-07-16 12:55:28 +0000757 def test_copytree_with_exclude(self):
Georg Brandl2ee470f2008-07-16 12:55:28 +0000758 # creating data
759 join = os.path.join
760 exists = os.path.exists
761 src_dir = tempfile.mkdtemp()
Georg Brandl2ee470f2008-07-16 12:55:28 +0000762 try:
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000763 dst_dir = join(tempfile.mkdtemp(), 'destination')
Éric Araujoa7e33a12011-08-12 19:51:35 +0200764 write_file((src_dir, 'test.txt'), '123')
765 write_file((src_dir, 'test.tmp'), '123')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000766 os.mkdir(join(src_dir, 'test_dir'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200767 write_file((src_dir, 'test_dir', 'test.txt'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000768 os.mkdir(join(src_dir, 'test_dir2'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200769 write_file((src_dir, 'test_dir2', 'test.txt'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000770 os.mkdir(join(src_dir, 'test_dir2', 'subdir'))
771 os.mkdir(join(src_dir, 'test_dir2', 'subdir2'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200772 write_file((src_dir, 'test_dir2', 'subdir', 'test.txt'), '456')
773 write_file((src_dir, 'test_dir2', 'subdir2', 'test.py'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000774
775 # testing glob-like patterns
776 try:
777 patterns = shutil.ignore_patterns('*.tmp', 'test_dir2')
778 shutil.copytree(src_dir, dst_dir, ignore=patterns)
779 # checking the result: some elements should not be copied
780 self.assertTrue(exists(join(dst_dir, 'test.txt')))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200781 self.assertFalse(exists(join(dst_dir, 'test.tmp')))
782 self.assertFalse(exists(join(dst_dir, 'test_dir2')))
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000783 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200784 shutil.rmtree(dst_dir)
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000785 try:
786 patterns = shutil.ignore_patterns('*.tmp', 'subdir*')
787 shutil.copytree(src_dir, dst_dir, ignore=patterns)
788 # checking the result: some elements should not be copied
Éric Araujoa7e33a12011-08-12 19:51:35 +0200789 self.assertFalse(exists(join(dst_dir, 'test.tmp')))
790 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2')))
791 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000792 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200793 shutil.rmtree(dst_dir)
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000794
795 # testing callable-style
796 try:
797 def _filter(src, names):
798 res = []
799 for name in names:
800 path = os.path.join(src, name)
801
802 if (os.path.isdir(path) and
803 path.split()[-1] == 'subdir'):
804 res.append(name)
805 elif os.path.splitext(path)[-1] in ('.py'):
806 res.append(name)
807 return res
808
809 shutil.copytree(src_dir, dst_dir, ignore=_filter)
810
811 # checking the result: some elements should not be copied
Éric Araujoa7e33a12011-08-12 19:51:35 +0200812 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2',
813 'test.py')))
814 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000815
816 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200817 shutil.rmtree(dst_dir)
Georg Brandl2ee470f2008-07-16 12:55:28 +0000818 finally:
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000819 shutil.rmtree(src_dir)
820 shutil.rmtree(os.path.dirname(dst_dir))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000821
Antoine Pitrouac601602013-08-16 19:35:02 +0200822 def test_copytree_retains_permissions(self):
823 tmp_dir = tempfile.mkdtemp()
824 src_dir = os.path.join(tmp_dir, 'source')
825 os.mkdir(src_dir)
826 dst_dir = os.path.join(tmp_dir, 'destination')
827 self.addCleanup(shutil.rmtree, tmp_dir)
828
829 os.chmod(src_dir, 0o777)
830 write_file((src_dir, 'permissive.txt'), '123')
831 os.chmod(os.path.join(src_dir, 'permissive.txt'), 0o777)
832 write_file((src_dir, 'restrictive.txt'), '456')
833 os.chmod(os.path.join(src_dir, 'restrictive.txt'), 0o600)
834 restrictive_subdir = tempfile.mkdtemp(dir=src_dir)
835 os.chmod(restrictive_subdir, 0o600)
836
837 shutil.copytree(src_dir, dst_dir)
Brett Cannon9c7eb552013-08-23 14:38:11 -0400838 self.assertEqual(os.stat(src_dir).st_mode, os.stat(dst_dir).st_mode)
839 self.assertEqual(os.stat(os.path.join(src_dir, 'permissive.txt')).st_mode,
Antoine Pitrouac601602013-08-16 19:35:02 +0200840 os.stat(os.path.join(dst_dir, 'permissive.txt')).st_mode)
Brett Cannon9c7eb552013-08-23 14:38:11 -0400841 self.assertEqual(os.stat(os.path.join(src_dir, 'restrictive.txt')).st_mode,
Antoine Pitrouac601602013-08-16 19:35:02 +0200842 os.stat(os.path.join(dst_dir, 'restrictive.txt')).st_mode)
843 restrictive_subdir_dst = os.path.join(dst_dir,
844 os.path.split(restrictive_subdir)[1])
Brett Cannon9c7eb552013-08-23 14:38:11 -0400845 self.assertEqual(os.stat(restrictive_subdir).st_mode,
Antoine Pitrouac601602013-08-16 19:35:02 +0200846 os.stat(restrictive_subdir_dst).st_mode)
847
Berker Peksag884afd92014-12-10 02:50:32 +0200848 @unittest.mock.patch('os.chmod')
849 def test_copytree_winerror(self, mock_patch):
850 # When copying to VFAT, copystat() raises OSError. On Windows, the
851 # exception object has a meaningful 'winerror' attribute, but not
852 # on other operating systems. Do not assume 'winerror' is set.
853 src_dir = tempfile.mkdtemp()
854 dst_dir = os.path.join(tempfile.mkdtemp(), 'destination')
855 self.addCleanup(shutil.rmtree, src_dir)
856 self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
857
858 mock_patch.side_effect = PermissionError('ka-boom')
859 with self.assertRaises(shutil.Error):
860 shutil.copytree(src_dir, dst_dir)
861
Giampaolo Rodolac606a9c2019-02-26 12:04:41 +0100862 def test_copytree_custom_copy_function(self):
863 # See: https://bugs.python.org/issue35648
864 def custom_cpfun(a, b):
865 flag.append(None)
866 self.assertIsInstance(a, str)
867 self.assertIsInstance(b, str)
868 self.assertEqual(a, os.path.join(src, 'foo'))
869 self.assertEqual(b, os.path.join(dst, 'foo'))
870
871 flag = []
872 src = tempfile.mkdtemp()
873 dst = tempfile.mktemp()
874 self.addCleanup(shutil.rmtree, src)
875 with open(os.path.join(src, 'foo'), 'w') as f:
876 f.close()
877 shutil.copytree(src, dst, copy_function=custom_cpfun)
878 self.assertEqual(len(flag), 1)
879
Zachary Ware9fe6d862013-12-08 00:20:35 -0600880 @unittest.skipIf(os.name == 'nt', 'temporarily disabled on Windows')
Hirokazu Yamamoto26681452010-12-05 02:04:16 +0000881 @unittest.skipUnless(hasattr(os, 'link'), 'requires os.link')
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000882 def test_dont_copy_file_onto_link_to_itself(self):
883 # bug 851123.
884 os.mkdir(TESTFN)
885 src = os.path.join(TESTFN, 'cheese')
886 dst = os.path.join(TESTFN, 'shop')
887 try:
Hirokazu Yamamoto26681452010-12-05 02:04:16 +0000888 with open(src, 'w') as f:
889 f.write('cheddar')
xdegaye92c2ca72017-11-12 17:31:07 +0100890 try:
891 os.link(src, dst)
892 except PermissionError as e:
893 self.skipTest('os.link(): %s' % e)
Hynek Schlawack48653762012-10-07 12:49:58 +0200894 self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
Hirokazu Yamamoto26681452010-12-05 02:04:16 +0000895 with open(src, 'r') as f:
896 self.assertEqual(f.read(), 'cheddar')
897 os.remove(dst)
898 finally:
899 shutil.rmtree(TESTFN, ignore_errors=True)
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000900
Brian Curtin3b4499c2010-12-28 14:31:47 +0000901 @support.skip_unless_symlink
Hirokazu Yamamoto26681452010-12-05 02:04:16 +0000902 def test_dont_copy_file_onto_symlink_to_itself(self):
903 # bug 851123.
904 os.mkdir(TESTFN)
905 src = os.path.join(TESTFN, 'cheese')
906 dst = os.path.join(TESTFN, 'shop')
907 try:
908 with open(src, 'w') as f:
909 f.write('cheddar')
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000910 # Using `src` here would mean we end up with a symlink pointing
911 # to TESTFN/TESTFN/cheese, while it should point at
912 # TESTFN/cheese.
913 os.symlink('cheese', dst)
Hynek Schlawack48653762012-10-07 12:49:58 +0200914 self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
Antoine Pitrou92f60ed2010-10-14 22:11:44 +0000915 with open(src, 'r') as f:
916 self.assertEqual(f.read(), 'cheddar')
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000917 os.remove(dst)
918 finally:
Hirokazu Yamamoto26681452010-12-05 02:04:16 +0000919 shutil.rmtree(TESTFN, ignore_errors=True)
Johannes Gijsbers68128712004-08-14 13:57:08 +0000920
Brian Curtin3b4499c2010-12-28 14:31:47 +0000921 @support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +0000922 def test_rmtree_on_symlink(self):
923 # bug 1669.
924 os.mkdir(TESTFN)
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000925 try:
Brian Curtind40e6f72010-07-08 21:39:08 +0000926 src = os.path.join(TESTFN, 'cheese')
927 dst = os.path.join(TESTFN, 'shop')
928 os.mkdir(src)
929 os.symlink(src, dst)
930 self.assertRaises(OSError, shutil.rmtree, dst)
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200931 shutil.rmtree(dst, ignore_errors=True)
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000932 finally:
Brian Curtind40e6f72010-07-08 21:39:08 +0000933 shutil.rmtree(TESTFN, ignore_errors=True)
934
Serhiy Storchaka43767632013-11-03 21:31:38 +0200935 # Issue #3002: copyfile and copytree block indefinitely on named pipes
936 @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
937 def test_copyfile_named_pipe(self):
xdegaye92c2ca72017-11-12 17:31:07 +0100938 try:
939 os.mkfifo(TESTFN)
940 except PermissionError as e:
941 self.skipTest('os.mkfifo(): %s' % e)
Serhiy Storchaka43767632013-11-03 21:31:38 +0200942 try:
943 self.assertRaises(shutil.SpecialFileError,
944 shutil.copyfile, TESTFN, TESTFN2)
945 self.assertRaises(shutil.SpecialFileError,
946 shutil.copyfile, __file__, TESTFN)
947 finally:
948 os.remove(TESTFN)
Antoine Pitrou7fff0962009-05-01 21:09:44 +0000949
Serhiy Storchaka43767632013-11-03 21:31:38 +0200950 @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
951 @support.skip_unless_symlink
952 def test_copytree_named_pipe(self):
953 os.mkdir(TESTFN)
954 try:
955 subdir = os.path.join(TESTFN, "subdir")
956 os.mkdir(subdir)
957 pipe = os.path.join(subdir, "mypipe")
xdegaye92c2ca72017-11-12 17:31:07 +0100958 try:
959 os.mkfifo(pipe)
960 except PermissionError as e:
961 self.skipTest('os.mkfifo(): %s' % e)
Antoine Pitrou7fff0962009-05-01 21:09:44 +0000962 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +0200963 shutil.copytree(TESTFN, TESTFN2)
964 except shutil.Error as e:
965 errors = e.args[0]
966 self.assertEqual(len(errors), 1)
967 src, dst, error_msg = errors[0]
968 self.assertEqual("`%s` is a named pipe" % pipe, error_msg)
969 else:
970 self.fail("shutil.Error should have been raised")
971 finally:
972 shutil.rmtree(TESTFN, ignore_errors=True)
973 shutil.rmtree(TESTFN2, ignore_errors=True)
Antoine Pitrou7fff0962009-05-01 21:09:44 +0000974
Tarek Ziadé5340db32010-04-19 22:30:51 +0000975 def test_copytree_special_func(self):
976
977 src_dir = self.mkdtemp()
978 dst_dir = os.path.join(self.mkdtemp(), 'destination')
Éric Araujoa7e33a12011-08-12 19:51:35 +0200979 write_file((src_dir, 'test.txt'), '123')
Tarek Ziadé5340db32010-04-19 22:30:51 +0000980 os.mkdir(os.path.join(src_dir, 'test_dir'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200981 write_file((src_dir, 'test_dir', 'test.txt'), '456')
Tarek Ziadé5340db32010-04-19 22:30:51 +0000982
983 copied = []
984 def _copy(src, dst):
985 copied.append((src, dst))
986
987 shutil.copytree(src_dir, dst_dir, copy_function=_copy)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000988 self.assertEqual(len(copied), 2)
Tarek Ziadé5340db32010-04-19 22:30:51 +0000989
Brian Curtin3b4499c2010-12-28 14:31:47 +0000990 @support.skip_unless_symlink
Tarek Ziadéfb437512010-04-20 08:57:33 +0000991 def test_copytree_dangling_symlinks(self):
992
993 # a dangling symlink raises an error at the end
994 src_dir = self.mkdtemp()
995 dst_dir = os.path.join(self.mkdtemp(), 'destination')
996 os.symlink('IDONTEXIST', os.path.join(src_dir, 'test.txt'))
997 os.mkdir(os.path.join(src_dir, 'test_dir'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200998 write_file((src_dir, 'test_dir', 'test.txt'), '456')
Tarek Ziadéfb437512010-04-20 08:57:33 +0000999 self.assertRaises(Error, shutil.copytree, src_dir, dst_dir)
1000
1001 # a dangling symlink is ignored with the proper flag
1002 dst_dir = os.path.join(self.mkdtemp(), 'destination2')
1003 shutil.copytree(src_dir, dst_dir, ignore_dangling_symlinks=True)
1004 self.assertNotIn('test.txt', os.listdir(dst_dir))
1005
1006 # a dangling symlink is copied if symlinks=True
1007 dst_dir = os.path.join(self.mkdtemp(), 'destination3')
1008 shutil.copytree(src_dir, dst_dir, symlinks=True)
1009 self.assertIn('test.txt', os.listdir(dst_dir))
1010
Berker Peksag5a294d82015-07-25 14:53:48 +03001011 @support.skip_unless_symlink
1012 def test_copytree_symlink_dir(self):
1013 src_dir = self.mkdtemp()
1014 dst_dir = os.path.join(self.mkdtemp(), 'destination')
1015 os.mkdir(os.path.join(src_dir, 'real_dir'))
1016 with open(os.path.join(src_dir, 'real_dir', 'test.txt'), 'w'):
1017 pass
1018 os.symlink(os.path.join(src_dir, 'real_dir'),
1019 os.path.join(src_dir, 'link_to_dir'),
1020 target_is_directory=True)
1021
1022 shutil.copytree(src_dir, dst_dir, symlinks=False)
1023 self.assertFalse(os.path.islink(os.path.join(dst_dir, 'link_to_dir')))
1024 self.assertIn('test.txt', os.listdir(os.path.join(dst_dir, 'link_to_dir')))
1025
1026 dst_dir = os.path.join(self.mkdtemp(), 'destination2')
1027 shutil.copytree(src_dir, dst_dir, symlinks=True)
1028 self.assertTrue(os.path.islink(os.path.join(dst_dir, 'link_to_dir')))
1029 self.assertIn('test.txt', os.listdir(os.path.join(dst_dir, 'link_to_dir')))
1030
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001031 def _copy_file(self, method):
1032 fname = 'test.txt'
1033 tmpdir = self.mkdtemp()
Éric Araujoa7e33a12011-08-12 19:51:35 +02001034 write_file((tmpdir, fname), 'xxx')
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001035 file1 = os.path.join(tmpdir, fname)
1036 tmpdir2 = self.mkdtemp()
1037 method(file1, tmpdir2)
1038 file2 = os.path.join(tmpdir2, fname)
1039 return (file1, file2)
1040
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001041 def test_copy(self):
1042 # Ensure that the copied file exists and has the same mode bits.
1043 file1, file2 = self._copy_file(shutil.copy)
1044 self.assertTrue(os.path.exists(file2))
1045 self.assertEqual(os.stat(file1).st_mode, os.stat(file2).st_mode)
1046
Senthil Kumaran0c2dba52011-07-03 18:21:38 -07001047 @unittest.skipUnless(hasattr(os, 'utime'), 'requires os.utime')
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001048 def test_copy2(self):
1049 # Ensure that the copied file exists and has the same mode and
1050 # modification time bits.
1051 file1, file2 = self._copy_file(shutil.copy2)
1052 self.assertTrue(os.path.exists(file2))
1053 file1_stat = os.stat(file1)
1054 file2_stat = os.stat(file2)
1055 self.assertEqual(file1_stat.st_mode, file2_stat.st_mode)
1056 for attr in 'st_atime', 'st_mtime':
1057 # The modification times may be truncated in the new file.
1058 self.assertLessEqual(getattr(file1_stat, attr),
1059 getattr(file2_stat, attr) + 1)
1060 if hasattr(os, 'chflags') and hasattr(file1_stat, 'st_flags'):
1061 self.assertEqual(getattr(file1_stat, 'st_flags'),
1062 getattr(file2_stat, 'st_flags'))
1063
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001064 @support.requires_zlib
Tarek Ziadé396fad72010-02-23 05:30:31 +00001065 def test_make_tarball(self):
1066 # creating something to tar
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001067 root_dir, base_dir = self._create_files('')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001068
1069 tmpdir2 = self.mkdtemp()
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001070 # force shutil to create the directory
1071 os.rmdir(tmpdir2)
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001072 # working with relative paths
1073 work_dir = os.path.dirname(tmpdir2)
1074 rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001075
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001076 with support.change_cwd(work_dir):
Serhiy Storchaka5558d4f2015-09-08 09:59:02 +03001077 base_name = os.path.abspath(rel_base_name)
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001078 tarball = make_archive(rel_base_name, 'gztar', root_dir, '.')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001079
1080 # check if the compressed tarball was created
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001081 self.assertEqual(tarball, base_name + '.tar.gz')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001082 self.assertTrue(os.path.isfile(tarball))
1083 self.assertTrue(tarfile.is_tarfile(tarball))
1084 with tarfile.open(tarball, 'r:gz') as tf:
1085 self.assertCountEqual(tf.getnames(),
1086 ['.', './sub', './sub2',
1087 './file1', './file2', './sub/file3'])
Tarek Ziadé396fad72010-02-23 05:30:31 +00001088
1089 # trying an uncompressed one
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001090 with support.change_cwd(work_dir):
1091 tarball = make_archive(rel_base_name, 'tar', root_dir, '.')
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001092 self.assertEqual(tarball, base_name + '.tar')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001093 self.assertTrue(os.path.isfile(tarball))
1094 self.assertTrue(tarfile.is_tarfile(tarball))
1095 with tarfile.open(tarball, 'r') as tf:
1096 self.assertCountEqual(tf.getnames(),
1097 ['.', './sub', './sub2',
1098 './file1', './file2', './sub/file3'])
Tarek Ziadé396fad72010-02-23 05:30:31 +00001099
1100 def _tarinfo(self, path):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001101 with tarfile.open(path) as tar:
Tarek Ziadé396fad72010-02-23 05:30:31 +00001102 names = tar.getnames()
1103 names.sort()
1104 return tuple(names)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001105
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001106 def _create_files(self, base_dir='dist'):
Tarek Ziadé396fad72010-02-23 05:30:31 +00001107 # creating something to tar
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001108 root_dir = self.mkdtemp()
1109 dist = os.path.join(root_dir, base_dir)
1110 os.makedirs(dist, exist_ok=True)
Éric Araujoa7e33a12011-08-12 19:51:35 +02001111 write_file((dist, 'file1'), 'xxx')
1112 write_file((dist, 'file2'), 'xxx')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001113 os.mkdir(os.path.join(dist, 'sub'))
Éric Araujoa7e33a12011-08-12 19:51:35 +02001114 write_file((dist, 'sub', 'file3'), 'xxx')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001115 os.mkdir(os.path.join(dist, 'sub2'))
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001116 if base_dir:
1117 write_file((root_dir, 'outer'), 'xxx')
1118 return root_dir, base_dir
Tarek Ziadé396fad72010-02-23 05:30:31 +00001119
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001120 @support.requires_zlib
Serhiy Storchakab42de2f2015-11-21 14:09:26 +02001121 @unittest.skipUnless(shutil.which('tar'),
Tarek Ziadé396fad72010-02-23 05:30:31 +00001122 'Need the tar command to run')
1123 def test_tarfile_vs_tar(self):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001124 root_dir, base_dir = self._create_files()
1125 base_name = os.path.join(self.mkdtemp(), 'archive')
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001126 tarball = make_archive(base_name, 'gztar', root_dir, base_dir)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001127
1128 # check if the compressed tarball was created
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001129 self.assertEqual(tarball, base_name + '.tar.gz')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001130 self.assertTrue(os.path.isfile(tarball))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001131
1132 # now create another tarball using `tar`
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001133 tarball2 = os.path.join(root_dir, 'archive2.tar')
1134 tar_cmd = ['tar', '-cf', 'archive2.tar', base_dir]
Serhiy Storchakab42de2f2015-11-21 14:09:26 +02001135 subprocess.check_call(tar_cmd, cwd=root_dir,
1136 stdout=subprocess.DEVNULL)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001137
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001138 self.assertTrue(os.path.isfile(tarball2))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001139 # let's compare both tarballs
Ezio Melottib3aedd42010-11-20 19:04:17 +00001140 self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001141
1142 # trying an uncompressed one
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001143 tarball = make_archive(base_name, 'tar', root_dir, base_dir)
1144 self.assertEqual(tarball, base_name + '.tar')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001145 self.assertTrue(os.path.isfile(tarball))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001146
1147 # now for a dry_run
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001148 tarball = make_archive(base_name, 'tar', root_dir, base_dir,
1149 dry_run=True)
1150 self.assertEqual(tarball, base_name + '.tar')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001151 self.assertTrue(os.path.isfile(tarball))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001152
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001153 @support.requires_zlib
Tarek Ziadé396fad72010-02-23 05:30:31 +00001154 def test_make_zipfile(self):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001155 # creating something to zip
1156 root_dir, base_dir = self._create_files()
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001157
1158 tmpdir2 = self.mkdtemp()
1159 # force shutil to create the directory
1160 os.rmdir(tmpdir2)
1161 # working with relative paths
1162 work_dir = os.path.dirname(tmpdir2)
1163 rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001164
1165 with support.change_cwd(work_dir):
Serhiy Storchaka5558d4f2015-09-08 09:59:02 +03001166 base_name = os.path.abspath(rel_base_name)
Serhiy Storchaka666de772016-10-23 15:55:09 +03001167 res = make_archive(rel_base_name, 'zip', root_dir)
1168
1169 self.assertEqual(res, base_name + '.zip')
1170 self.assertTrue(os.path.isfile(res))
1171 self.assertTrue(zipfile.is_zipfile(res))
1172 with zipfile.ZipFile(res) as zf:
1173 self.assertCountEqual(zf.namelist(),
1174 ['dist/', 'dist/sub/', 'dist/sub2/',
1175 'dist/file1', 'dist/file2', 'dist/sub/file3',
1176 'outer'])
1177
1178 with support.change_cwd(work_dir):
1179 base_name = os.path.abspath(rel_base_name)
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001180 res = make_archive(rel_base_name, 'zip', root_dir, base_dir)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001181
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001182 self.assertEqual(res, base_name + '.zip')
1183 self.assertTrue(os.path.isfile(res))
1184 self.assertTrue(zipfile.is_zipfile(res))
1185 with zipfile.ZipFile(res) as zf:
1186 self.assertCountEqual(zf.namelist(),
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001187 ['dist/', 'dist/sub/', 'dist/sub2/',
1188 'dist/file1', 'dist/file2', 'dist/sub/file3'])
Tarek Ziadé396fad72010-02-23 05:30:31 +00001189
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001190 @support.requires_zlib
Serhiy Storchakab42de2f2015-11-21 14:09:26 +02001191 @unittest.skipUnless(shutil.which('zip'),
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001192 'Need the zip command to run')
1193 def test_zipfile_vs_zip(self):
1194 root_dir, base_dir = self._create_files()
1195 base_name = os.path.join(self.mkdtemp(), 'archive')
1196 archive = make_archive(base_name, 'zip', root_dir, base_dir)
1197
1198 # check if ZIP file was created
1199 self.assertEqual(archive, base_name + '.zip')
1200 self.assertTrue(os.path.isfile(archive))
1201
1202 # now create another ZIP file using `zip`
1203 archive2 = os.path.join(root_dir, 'archive2.zip')
1204 zip_cmd = ['zip', '-q', '-r', 'archive2.zip', base_dir]
Serhiy Storchakab42de2f2015-11-21 14:09:26 +02001205 subprocess.check_call(zip_cmd, cwd=root_dir,
1206 stdout=subprocess.DEVNULL)
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001207
1208 self.assertTrue(os.path.isfile(archive2))
1209 # let's compare both ZIP files
1210 with zipfile.ZipFile(archive) as zf:
1211 names = zf.namelist()
1212 with zipfile.ZipFile(archive2) as zf:
1213 names2 = zf.namelist()
1214 self.assertEqual(sorted(names), sorted(names2))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001215
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001216 @support.requires_zlib
Serhiy Storchaka8bc792a2015-11-22 14:49:58 +02001217 @unittest.skipUnless(shutil.which('unzip'),
1218 'Need the unzip command to run')
1219 def test_unzip_zipfile(self):
1220 root_dir, base_dir = self._create_files()
1221 base_name = os.path.join(self.mkdtemp(), 'archive')
1222 archive = make_archive(base_name, 'zip', root_dir, base_dir)
1223
1224 # check if ZIP file was created
1225 self.assertEqual(archive, base_name + '.zip')
1226 self.assertTrue(os.path.isfile(archive))
1227
1228 # now check the ZIP file using `unzip -t`
1229 zip_cmd = ['unzip', '-t', archive]
1230 with support.change_cwd(root_dir):
1231 try:
1232 subprocess.check_output(zip_cmd, stderr=subprocess.STDOUT)
1233 except subprocess.CalledProcessError as exc:
1234 details = exc.output.decode(errors="replace")
Benjamin Petersona710ebd2018-09-13 10:08:46 -07001235 if 'unrecognized option: t' in details:
Benjamin Petersone78734d2018-09-13 10:57:23 -07001236 self.skipTest("unzip doesn't support -t")
Serhiy Storchaka8bc792a2015-11-22 14:49:58 +02001237 msg = "{}\n\n**Unzip Output**\n{}"
1238 self.fail(msg.format(exc, details))
1239
Tarek Ziadé396fad72010-02-23 05:30:31 +00001240 def test_make_archive(self):
1241 tmpdir = self.mkdtemp()
1242 base_name = os.path.join(tmpdir, 'archive')
1243 self.assertRaises(ValueError, make_archive, base_name, 'xxx')
1244
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001245 @support.requires_zlib
Tarek Ziadé396fad72010-02-23 05:30:31 +00001246 def test_make_archive_owner_group(self):
1247 # testing make_archive with owner and group, with various combinations
1248 # this works even if there's not gid/uid support
1249 if UID_GID_SUPPORT:
1250 group = grp.getgrgid(0)[0]
1251 owner = pwd.getpwuid(0)[0]
1252 else:
1253 group = owner = 'root'
1254
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001255 root_dir, base_dir = self._create_files()
1256 base_name = os.path.join(self.mkdtemp(), 'archive')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001257 res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner,
1258 group=group)
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001259 self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001260
1261 res = make_archive(base_name, 'zip', root_dir, base_dir)
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001262 self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001263
1264 res = make_archive(base_name, 'tar', root_dir, base_dir,
1265 owner=owner, group=group)
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001266 self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001267
1268 res = make_archive(base_name, 'tar', root_dir, base_dir,
1269 owner='kjhkjhkjg', group='oihohoh')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001270 self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001271
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001272
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001273 @support.requires_zlib
Tarek Ziadé396fad72010-02-23 05:30:31 +00001274 @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
1275 def test_tarfile_root_owner(self):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001276 root_dir, base_dir = self._create_files()
1277 base_name = os.path.join(self.mkdtemp(), 'archive')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001278 group = grp.getgrgid(0)[0]
1279 owner = pwd.getpwuid(0)[0]
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001280 with support.change_cwd(root_dir):
1281 archive_name = make_archive(base_name, 'gztar', root_dir, 'dist',
1282 owner=owner, group=group)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001283
1284 # check if the compressed tarball was created
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001285 self.assertTrue(os.path.isfile(archive_name))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001286
1287 # now checks the rights
1288 archive = tarfile.open(archive_name)
1289 try:
1290 for member in archive.getmembers():
Ezio Melottib3aedd42010-11-20 19:04:17 +00001291 self.assertEqual(member.uid, 0)
1292 self.assertEqual(member.gid, 0)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001293 finally:
1294 archive.close()
1295
1296 def test_make_archive_cwd(self):
1297 current_dir = os.getcwd()
1298 def _breaks(*args, **kw):
1299 raise RuntimeError()
1300
1301 register_archive_format('xxx', _breaks, [], 'xxx file')
1302 try:
1303 try:
1304 make_archive('xxx', 'xxx', root_dir=self.mkdtemp())
1305 except Exception:
1306 pass
Ezio Melottib3aedd42010-11-20 19:04:17 +00001307 self.assertEqual(os.getcwd(), current_dir)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001308 finally:
1309 unregister_archive_format('xxx')
1310
Serhiy Storchaka9a4fc192014-11-28 00:48:46 +02001311 def test_make_tarfile_in_curdir(self):
1312 # Issue #21280
1313 root_dir = self.mkdtemp()
1314 with support.change_cwd(root_dir):
1315 self.assertEqual(make_archive('test', 'tar'), 'test.tar')
1316 self.assertTrue(os.path.isfile('test.tar'))
1317
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001318 @support.requires_zlib
Serhiy Storchaka9a4fc192014-11-28 00:48:46 +02001319 def test_make_zipfile_in_curdir(self):
1320 # Issue #21280
1321 root_dir = self.mkdtemp()
1322 with support.change_cwd(root_dir):
1323 self.assertEqual(make_archive('test', 'zip'), 'test.zip')
1324 self.assertTrue(os.path.isfile('test.zip'))
1325
Tarek Ziadé396fad72010-02-23 05:30:31 +00001326 def test_register_archive_format(self):
1327
1328 self.assertRaises(TypeError, register_archive_format, 'xxx', 1)
1329 self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
1330 1)
1331 self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
1332 [(1, 2), (1, 2, 3)])
1333
1334 register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file')
1335 formats = [name for name, params in get_archive_formats()]
1336 self.assertIn('xxx', formats)
1337
1338 unregister_archive_format('xxx')
1339 formats = [name for name, params in get_archive_formats()]
1340 self.assertNotIn('xxx', formats)
1341
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001342 def check_unpack_archive(self, format):
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001343 self.check_unpack_archive_with_converter(format, lambda path: path)
1344 self.check_unpack_archive_with_converter(format, pathlib.Path)
Serhiy Storchakab21d1552018-03-02 11:53:51 +02001345 self.check_unpack_archive_with_converter(format, FakePath)
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001346
1347 def check_unpack_archive_with_converter(self, format, converter):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001348 root_dir, base_dir = self._create_files()
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001349 expected = rlistdir(root_dir)
1350 expected.remove('outer')
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001351
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001352 base_name = os.path.join(self.mkdtemp(), 'archive')
1353 filename = make_archive(base_name, format, root_dir, base_dir)
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001354
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001355 # let's try to unpack it now
1356 tmpdir2 = self.mkdtemp()
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001357 unpack_archive(converter(filename), converter(tmpdir2))
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001358 self.assertEqual(rlistdir(tmpdir2), expected)
1359
1360 # and again, this time with the format specified
1361 tmpdir3 = self.mkdtemp()
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001362 unpack_archive(converter(filename), converter(tmpdir3), format=format)
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001363 self.assertEqual(rlistdir(tmpdir3), expected)
1364
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001365 self.assertRaises(shutil.ReadError, unpack_archive, converter(TESTFN))
1366 self.assertRaises(ValueError, unpack_archive, converter(TESTFN), format='xxx')
Nick Coghlanabf202d2011-03-16 13:52:20 -04001367
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001368 def test_unpack_archive_tar(self):
1369 self.check_unpack_archive('tar')
1370
1371 @support.requires_zlib
1372 def test_unpack_archive_gztar(self):
1373 self.check_unpack_archive('gztar')
1374
1375 @support.requires_bz2
1376 def test_unpack_archive_bztar(self):
1377 self.check_unpack_archive('bztar')
1378
1379 @support.requires_lzma
Michael Feltef110b12019-02-18 12:02:44 +01001380 @unittest.skipIf(AIX and not _maxdataOK(), "AIX MAXDATA must be 0x20000000 or larger")
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001381 def test_unpack_archive_xztar(self):
1382 self.check_unpack_archive('xztar')
1383
1384 @support.requires_zlib
1385 def test_unpack_archive_zip(self):
1386 self.check_unpack_archive('zip')
1387
Martin Pantereb995702016-07-28 01:11:04 +00001388 def test_unpack_registry(self):
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001389
1390 formats = get_unpack_formats()
1391
1392 def _boo(filename, extract_dir, extra):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001393 self.assertEqual(extra, 1)
1394 self.assertEqual(filename, 'stuff.boo')
1395 self.assertEqual(extract_dir, 'xx')
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001396
1397 register_unpack_format('Boo', ['.boo', '.b2'], _boo, [('extra', 1)])
1398 unpack_archive('stuff.boo', 'xx')
1399
1400 # trying to register a .boo unpacker again
1401 self.assertRaises(RegistryError, register_unpack_format, 'Boo2',
1402 ['.boo'], _boo)
1403
1404 # should work now
1405 unregister_unpack_format('Boo')
1406 register_unpack_format('Boo2', ['.boo'], _boo)
1407 self.assertIn(('Boo2', ['.boo'], ''), get_unpack_formats())
1408 self.assertNotIn(('Boo', ['.boo'], ''), get_unpack_formats())
1409
1410 # let's leave a clean state
1411 unregister_unpack_format('Boo2')
Ezio Melottib3aedd42010-11-20 19:04:17 +00001412 self.assertEqual(get_unpack_formats(), formats)
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001413
Giampaolo Rodola'210e7ca2011-07-01 13:55:36 +02001414 @unittest.skipUnless(hasattr(shutil, 'disk_usage'),
1415 "disk_usage not available on this platform")
1416 def test_disk_usage(self):
Gregory P. Smith529746c2017-07-06 17:11:27 -07001417 usage = shutil.disk_usage(os.path.dirname(__file__))
Victor Stinnerdc525f42018-12-11 12:05:21 +01001418 for attr in ('total', 'used', 'free'):
1419 self.assertIsInstance(getattr(usage, attr), int)
Éric Araujo2ee61882011-07-02 16:45:45 +02001420 self.assertGreater(usage.total, 0)
1421 self.assertGreater(usage.used, 0)
1422 self.assertGreaterEqual(usage.free, 0)
1423 self.assertGreaterEqual(usage.total, usage.used)
1424 self.assertGreater(usage.total, usage.free)
Giampaolo Rodola'210e7ca2011-07-01 13:55:36 +02001425
Victor Stinnerdc525f42018-12-11 12:05:21 +01001426 # bpo-32557: Check that disk_usage() also accepts a filename
1427 shutil.disk_usage(__file__)
1428
Sandro Tosid902a142011-08-22 23:28:27 +02001429 @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
1430 @unittest.skipUnless(hasattr(os, 'chown'), 'requires os.chown')
1431 def test_chown(self):
1432
1433 # cleaned-up automatically by TestShutil.tearDown method
1434 dirname = self.mkdtemp()
1435 filename = tempfile.mktemp(dir=dirname)
1436 write_file(filename, 'testing chown function')
1437
1438 with self.assertRaises(ValueError):
1439 shutil.chown(filename)
1440
1441 with self.assertRaises(LookupError):
Raymond Hettinger15f44ab2016-08-30 10:47:49 -07001442 shutil.chown(filename, user='non-existing username')
Sandro Tosid902a142011-08-22 23:28:27 +02001443
1444 with self.assertRaises(LookupError):
Raymond Hettinger15f44ab2016-08-30 10:47:49 -07001445 shutil.chown(filename, group='non-existing groupname')
Sandro Tosid902a142011-08-22 23:28:27 +02001446
1447 with self.assertRaises(TypeError):
1448 shutil.chown(filename, b'spam')
1449
1450 with self.assertRaises(TypeError):
1451 shutil.chown(filename, 3.14)
1452
1453 uid = os.getuid()
1454 gid = os.getgid()
1455
1456 def check_chown(path, uid=None, gid=None):
1457 s = os.stat(filename)
1458 if uid is not None:
1459 self.assertEqual(uid, s.st_uid)
1460 if gid is not None:
1461 self.assertEqual(gid, s.st_gid)
1462
1463 shutil.chown(filename, uid, gid)
1464 check_chown(filename, uid, gid)
1465 shutil.chown(filename, uid)
1466 check_chown(filename, uid)
1467 shutil.chown(filename, user=uid)
1468 check_chown(filename, uid)
1469 shutil.chown(filename, group=gid)
Sandro Tosi91f948a2011-08-22 23:55:39 +02001470 check_chown(filename, gid=gid)
Sandro Tosid902a142011-08-22 23:28:27 +02001471
1472 shutil.chown(dirname, uid, gid)
1473 check_chown(dirname, uid, gid)
1474 shutil.chown(dirname, uid)
1475 check_chown(dirname, uid)
1476 shutil.chown(dirname, user=uid)
1477 check_chown(dirname, uid)
1478 shutil.chown(dirname, group=gid)
Sandro Tosi91f948a2011-08-22 23:55:39 +02001479 check_chown(dirname, gid=gid)
Sandro Tosid902a142011-08-22 23:28:27 +02001480
1481 user = pwd.getpwuid(uid)[0]
1482 group = grp.getgrgid(gid)[0]
1483 shutil.chown(filename, user, group)
1484 check_chown(filename, uid, gid)
1485 shutil.chown(dirname, user, group)
1486 check_chown(dirname, uid, gid)
1487
Brian Curtin0d0a1de2012-06-18 18:41:07 -05001488 def test_copy_return_value(self):
1489 # copy and copy2 both return their destination path.
1490 for fn in (shutil.copy, shutil.copy2):
1491 src_dir = self.mkdtemp()
1492 dst_dir = self.mkdtemp()
1493 src = os.path.join(src_dir, 'foo')
1494 write_file(src, 'foo')
1495 rv = fn(src, dst_dir)
1496 self.assertEqual(rv, os.path.join(dst_dir, 'foo'))
1497 rv = fn(src, os.path.join(dst_dir, 'bar'))
1498 self.assertEqual(rv, os.path.join(dst_dir, 'bar'))
1499
1500 def test_copyfile_return_value(self):
1501 # copytree returns its destination path.
1502 src_dir = self.mkdtemp()
1503 dst_dir = self.mkdtemp()
1504 dst_file = os.path.join(dst_dir, 'bar')
1505 src_file = os.path.join(src_dir, 'foo')
1506 write_file(src_file, 'foo')
1507 rv = shutil.copyfile(src_file, dst_file)
1508 self.assertTrue(os.path.exists(rv))
1509 self.assertEqual(read_file(src_file), read_file(dst_file))
1510
Hynek Schlawack48653762012-10-07 12:49:58 +02001511 def test_copyfile_same_file(self):
1512 # copyfile() should raise SameFileError if the source and destination
1513 # are the same.
1514 src_dir = self.mkdtemp()
1515 src_file = os.path.join(src_dir, 'foo')
1516 write_file(src_file, 'foo')
1517 self.assertRaises(SameFileError, shutil.copyfile, src_file, src_file)
Hynek Schlawack27ddb572012-10-28 13:59:27 +01001518 # But Error should work too, to stay backward compatible.
1519 self.assertRaises(Error, shutil.copyfile, src_file, src_file)
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02001520 # Make sure file is not corrupted.
1521 self.assertEqual(read_file(src_file), 'foo')
Hynek Schlawack48653762012-10-07 12:49:58 +02001522
Brian Curtin0d0a1de2012-06-18 18:41:07 -05001523 def test_copytree_return_value(self):
1524 # copytree returns its destination path.
1525 src_dir = self.mkdtemp()
1526 dst_dir = src_dir + "dest"
Larry Hastings5b2f9c02012-06-25 23:50:01 -07001527 self.addCleanup(shutil.rmtree, dst_dir, True)
Brian Curtin0d0a1de2012-06-18 18:41:07 -05001528 src = os.path.join(src_dir, 'foo')
1529 write_file(src, 'foo')
1530 rv = shutil.copytree(src_dir, dst_dir)
1531 self.assertEqual(['foo'], os.listdir(rv))
1532
Christian Heimes9bd667a2008-01-20 15:14:11 +00001533
Brian Curtinc57a3452012-06-22 16:00:30 -05001534class TestWhich(unittest.TestCase):
1535
1536 def setUp(self):
Serhiy Storchaka014791f2013-01-21 15:00:27 +02001537 self.temp_dir = tempfile.mkdtemp(prefix="Tmp")
Larry Hastings5b2f9c02012-06-25 23:50:01 -07001538 self.addCleanup(shutil.rmtree, self.temp_dir, True)
Brian Curtinc57a3452012-06-22 16:00:30 -05001539 # Give the temp_file an ".exe" suffix for all.
1540 # It's needed on Windows and not harmful on other platforms.
1541 self.temp_file = tempfile.NamedTemporaryFile(dir=self.temp_dir,
Serhiy Storchaka014791f2013-01-21 15:00:27 +02001542 prefix="Tmp",
1543 suffix=".Exe")
Brian Curtinc57a3452012-06-22 16:00:30 -05001544 os.chmod(self.temp_file.name, stat.S_IXUSR)
1545 self.addCleanup(self.temp_file.close)
1546 self.dir, self.file = os.path.split(self.temp_file.name)
Cheryl Sabella5680f652019-02-13 06:25:10 -05001547 self.env_path = self.dir
1548 self.curdir = os.curdir
1549 self.ext = ".EXE"
Brian Curtinc57a3452012-06-22 16:00:30 -05001550
1551 def test_basic(self):
1552 # Given an EXE in a directory, it should be returned.
1553 rv = shutil.which(self.file, path=self.dir)
1554 self.assertEqual(rv, self.temp_file.name)
1555
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001556 def test_absolute_cmd(self):
Brian Curtinc57a3452012-06-22 16:00:30 -05001557 # When given the fully qualified path to an executable that exists,
1558 # it should be returned.
1559 rv = shutil.which(self.temp_file.name, path=self.temp_dir)
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001560 self.assertEqual(rv, self.temp_file.name)
1561
1562 def test_relative_cmd(self):
1563 # When given the relative path with a directory part to an executable
1564 # that exists, it should be returned.
1565 base_dir, tail_dir = os.path.split(self.dir)
1566 relpath = os.path.join(tail_dir, self.file)
Nick Coghlan55175962013-07-28 22:11:50 +10001567 with support.change_cwd(path=base_dir):
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001568 rv = shutil.which(relpath, path=self.temp_dir)
1569 self.assertEqual(rv, relpath)
1570 # But it shouldn't be searched in PATH directories (issue #16957).
Nick Coghlan55175962013-07-28 22:11:50 +10001571 with support.change_cwd(path=self.dir):
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001572 rv = shutil.which(relpath, path=base_dir)
1573 self.assertIsNone(rv)
1574
1575 def test_cwd(self):
1576 # Issue #16957
1577 base_dir = os.path.dirname(self.dir)
Nick Coghlan55175962013-07-28 22:11:50 +10001578 with support.change_cwd(path=self.dir):
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001579 rv = shutil.which(self.file, path=base_dir)
1580 if sys.platform == "win32":
1581 # Windows: current directory implicitly on PATH
Cheryl Sabella5680f652019-02-13 06:25:10 -05001582 self.assertEqual(rv, os.path.join(self.curdir, self.file))
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001583 else:
1584 # Other platforms: shouldn't match in the current directory.
1585 self.assertIsNone(rv)
Brian Curtinc57a3452012-06-22 16:00:30 -05001586
Serhiy Storchaka12516e22013-05-28 15:50:15 +03001587 @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
1588 'non-root user required')
Brian Curtinc57a3452012-06-22 16:00:30 -05001589 def test_non_matching_mode(self):
1590 # Set the file read-only and ask for writeable files.
1591 os.chmod(self.temp_file.name, stat.S_IREAD)
Serhiy Storchaka12516e22013-05-28 15:50:15 +03001592 if os.access(self.temp_file.name, os.W_OK):
1593 self.skipTest("can't set the file read-only")
Brian Curtinc57a3452012-06-22 16:00:30 -05001594 rv = shutil.which(self.file, path=self.dir, mode=os.W_OK)
1595 self.assertIsNone(rv)
1596
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001597 def test_relative_path(self):
Antoine Pitrou07c24d12012-06-22 23:33:05 +02001598 base_dir, tail_dir = os.path.split(self.dir)
Nick Coghlan55175962013-07-28 22:11:50 +10001599 with support.change_cwd(path=base_dir):
Antoine Pitrou07c24d12012-06-22 23:33:05 +02001600 rv = shutil.which(self.file, path=tail_dir)
1601 self.assertEqual(rv, os.path.join(tail_dir, self.file))
Antoine Pitrou07c24d12012-06-22 23:33:05 +02001602
Brian Curtinc57a3452012-06-22 16:00:30 -05001603 def test_nonexistent_file(self):
1604 # Return None when no matching executable file is found on the path.
1605 rv = shutil.which("foo.exe", path=self.dir)
1606 self.assertIsNone(rv)
1607
1608 @unittest.skipUnless(sys.platform == "win32",
1609 "pathext check is Windows-only")
1610 def test_pathext_checking(self):
1611 # Ask for the file without the ".exe" extension, then ensure that
1612 # it gets found properly with the extension.
Serhiy Storchakad70127a2013-01-24 20:03:49 +02001613 rv = shutil.which(self.file[:-4], path=self.dir)
Cheryl Sabella5680f652019-02-13 06:25:10 -05001614 self.assertEqual(rv, self.temp_file.name[:-4] + self.ext)
Brian Curtinc57a3452012-06-22 16:00:30 -05001615
Barry Warsaw618738b2013-04-16 11:05:03 -04001616 def test_environ_path(self):
1617 with support.EnvironmentVarGuard() as env:
Cheryl Sabella5680f652019-02-13 06:25:10 -05001618 env['PATH'] = self.env_path
Barry Warsaw618738b2013-04-16 11:05:03 -04001619 rv = shutil.which(self.file)
1620 self.assertEqual(rv, self.temp_file.name)
1621
1622 def test_empty_path(self):
1623 base_dir = os.path.dirname(self.dir)
Nick Coghlan55175962013-07-28 22:11:50 +10001624 with support.change_cwd(path=self.dir), \
Barry Warsaw618738b2013-04-16 11:05:03 -04001625 support.EnvironmentVarGuard() as env:
Cheryl Sabella5680f652019-02-13 06:25:10 -05001626 env['PATH'] = self.env_path
Barry Warsaw618738b2013-04-16 11:05:03 -04001627 rv = shutil.which(self.file, path='')
1628 self.assertIsNone(rv)
1629
1630 def test_empty_path_no_PATH(self):
1631 with support.EnvironmentVarGuard() as env:
1632 env.pop('PATH', None)
1633 rv = shutil.which(self.file)
1634 self.assertIsNone(rv)
1635
Brian Curtinc57a3452012-06-22 16:00:30 -05001636
Cheryl Sabella5680f652019-02-13 06:25:10 -05001637class TestWhichBytes(TestWhich):
1638 def setUp(self):
1639 TestWhich.setUp(self)
1640 self.dir = os.fsencode(self.dir)
1641 self.file = os.fsencode(self.file)
1642 self.temp_file.name = os.fsencode(self.temp_file.name)
1643 self.curdir = os.fsencode(self.curdir)
1644 self.ext = os.fsencode(self.ext)
1645
1646
Christian Heimesada8c3b2008-03-18 18:26:33 +00001647class TestMove(unittest.TestCase):
1648
1649 def setUp(self):
1650 filename = "foo"
1651 self.src_dir = tempfile.mkdtemp()
1652 self.dst_dir = tempfile.mkdtemp()
1653 self.src_file = os.path.join(self.src_dir, filename)
1654 self.dst_file = os.path.join(self.dst_dir, filename)
Christian Heimesada8c3b2008-03-18 18:26:33 +00001655 with open(self.src_file, "wb") as f:
1656 f.write(b"spam")
1657
1658 def tearDown(self):
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001659 for d in (self.src_dir, self.dst_dir):
Christian Heimesada8c3b2008-03-18 18:26:33 +00001660 try:
1661 if d:
1662 shutil.rmtree(d)
1663 except:
1664 pass
1665
1666 def _check_move_file(self, src, dst, real_dst):
Antoine Pitrou92f60ed2010-10-14 22:11:44 +00001667 with open(src, "rb") as f:
1668 contents = f.read()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001669 shutil.move(src, dst)
Antoine Pitrou92f60ed2010-10-14 22:11:44 +00001670 with open(real_dst, "rb") as f:
1671 self.assertEqual(contents, f.read())
Christian Heimesada8c3b2008-03-18 18:26:33 +00001672 self.assertFalse(os.path.exists(src))
1673
1674 def _check_move_dir(self, src, dst, real_dst):
1675 contents = sorted(os.listdir(src))
1676 shutil.move(src, dst)
1677 self.assertEqual(contents, sorted(os.listdir(real_dst)))
1678 self.assertFalse(os.path.exists(src))
1679
1680 def test_move_file(self):
1681 # Move a file to another location on the same filesystem.
1682 self._check_move_file(self.src_file, self.dst_file, self.dst_file)
1683
1684 def test_move_file_to_dir(self):
1685 # Move a file inside an existing dir on the same filesystem.
1686 self._check_move_file(self.src_file, self.dst_dir, self.dst_file)
1687
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001688 @mock_rename
Christian Heimesada8c3b2008-03-18 18:26:33 +00001689 def test_move_file_other_fs(self):
1690 # Move a file to an existing dir on another filesystem.
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001691 self.test_move_file()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001692
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001693 @mock_rename
Christian Heimesada8c3b2008-03-18 18:26:33 +00001694 def test_move_file_to_dir_other_fs(self):
1695 # Move a file to another location on another filesystem.
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001696 self.test_move_file_to_dir()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001697
1698 def test_move_dir(self):
1699 # Move a dir to another location on the same filesystem.
1700 dst_dir = tempfile.mktemp()
1701 try:
1702 self._check_move_dir(self.src_dir, dst_dir, dst_dir)
1703 finally:
1704 try:
1705 shutil.rmtree(dst_dir)
1706 except:
1707 pass
1708
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001709 @mock_rename
Christian Heimesada8c3b2008-03-18 18:26:33 +00001710 def test_move_dir_other_fs(self):
1711 # Move a dir to another location on another filesystem.
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001712 self.test_move_dir()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001713
1714 def test_move_dir_to_dir(self):
1715 # Move a dir inside an existing dir on the same filesystem.
1716 self._check_move_dir(self.src_dir, self.dst_dir,
1717 os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
1718
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001719 @mock_rename
Christian Heimesada8c3b2008-03-18 18:26:33 +00001720 def test_move_dir_to_dir_other_fs(self):
1721 # Move a dir inside an existing dir on another filesystem.
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001722 self.test_move_dir_to_dir()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001723
Serhiy Storchaka3a308b92014-02-11 10:30:59 +02001724 def test_move_dir_sep_to_dir(self):
1725 self._check_move_dir(self.src_dir + os.path.sep, self.dst_dir,
1726 os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
1727
1728 @unittest.skipUnless(os.path.altsep, 'requires os.path.altsep')
1729 def test_move_dir_altsep_to_dir(self):
1730 self._check_move_dir(self.src_dir + os.path.altsep, self.dst_dir,
1731 os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
1732
Christian Heimesada8c3b2008-03-18 18:26:33 +00001733 def test_existing_file_inside_dest_dir(self):
1734 # A file with the same name inside the destination dir already exists.
1735 with open(self.dst_file, "wb"):
1736 pass
1737 self.assertRaises(shutil.Error, shutil.move, self.src_file, self.dst_dir)
1738
1739 def test_dont_move_dir_in_itself(self):
1740 # Moving a dir inside itself raises an Error.
1741 dst = os.path.join(self.src_dir, "bar")
1742 self.assertRaises(shutil.Error, shutil.move, self.src_dir, dst)
1743
Antoine Pitrou0dcc3cd2009-01-29 20:26:59 +00001744 def test_destinsrc_false_negative(self):
1745 os.mkdir(TESTFN)
1746 try:
1747 for src, dst in [('srcdir', 'srcdir/dest')]:
1748 src = os.path.join(TESTFN, src)
1749 dst = os.path.join(TESTFN, dst)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001750 self.assertTrue(shutil._destinsrc(src, dst),
Benjamin Peterson247a9b82009-02-20 04:09:19 +00001751 msg='_destinsrc() wrongly concluded that '
Antoine Pitrou0dcc3cd2009-01-29 20:26:59 +00001752 'dst (%s) is not in src (%s)' % (dst, src))
1753 finally:
1754 shutil.rmtree(TESTFN, ignore_errors=True)
Christian Heimesada8c3b2008-03-18 18:26:33 +00001755
Antoine Pitrou0dcc3cd2009-01-29 20:26:59 +00001756 def test_destinsrc_false_positive(self):
1757 os.mkdir(TESTFN)
1758 try:
1759 for src, dst in [('srcdir', 'src/dest'), ('srcdir', 'srcdir.new')]:
1760 src = os.path.join(TESTFN, src)
1761 dst = os.path.join(TESTFN, dst)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001762 self.assertFalse(shutil._destinsrc(src, dst),
Benjamin Peterson247a9b82009-02-20 04:09:19 +00001763 msg='_destinsrc() wrongly concluded that '
Antoine Pitrou0dcc3cd2009-01-29 20:26:59 +00001764 'dst (%s) is in src (%s)' % (dst, src))
1765 finally:
1766 shutil.rmtree(TESTFN, ignore_errors=True)
Christian Heimes9bd667a2008-01-20 15:14:11 +00001767
Antoine Pitrou0a08d7a2012-01-06 20:16:19 +01001768 @support.skip_unless_symlink
1769 @mock_rename
1770 def test_move_file_symlink(self):
1771 dst = os.path.join(self.src_dir, 'bar')
1772 os.symlink(self.src_file, dst)
1773 shutil.move(dst, self.dst_file)
1774 self.assertTrue(os.path.islink(self.dst_file))
1775 self.assertTrue(os.path.samefile(self.src_file, self.dst_file))
1776
1777 @support.skip_unless_symlink
1778 @mock_rename
1779 def test_move_file_symlink_to_dir(self):
1780 filename = "bar"
1781 dst = os.path.join(self.src_dir, filename)
1782 os.symlink(self.src_file, dst)
1783 shutil.move(dst, self.dst_dir)
1784 final_link = os.path.join(self.dst_dir, filename)
1785 self.assertTrue(os.path.islink(final_link))
1786 self.assertTrue(os.path.samefile(self.src_file, final_link))
1787
1788 @support.skip_unless_symlink
1789 @mock_rename
1790 def test_move_dangling_symlink(self):
1791 src = os.path.join(self.src_dir, 'baz')
1792 dst = os.path.join(self.src_dir, 'bar')
1793 os.symlink(src, dst)
1794 dst_link = os.path.join(self.dst_dir, 'quux')
1795 shutil.move(dst, dst_link)
1796 self.assertTrue(os.path.islink(dst_link))
Antoine Pitrou3f48ac92014-01-01 02:50:45 +01001797 # On Windows, os.path.realpath does not follow symlinks (issue #9949)
1798 if os.name == 'nt':
1799 self.assertEqual(os.path.realpath(src), os.readlink(dst_link))
1800 else:
1801 self.assertEqual(os.path.realpath(src), os.path.realpath(dst_link))
Antoine Pitrou0a08d7a2012-01-06 20:16:19 +01001802
1803 @support.skip_unless_symlink
1804 @mock_rename
1805 def test_move_dir_symlink(self):
1806 src = os.path.join(self.src_dir, 'baz')
1807 dst = os.path.join(self.src_dir, 'bar')
1808 os.mkdir(src)
1809 os.symlink(src, dst)
1810 dst_link = os.path.join(self.dst_dir, 'quux')
1811 shutil.move(dst, dst_link)
1812 self.assertTrue(os.path.islink(dst_link))
1813 self.assertTrue(os.path.samefile(src, dst_link))
1814
Brian Curtin0d0a1de2012-06-18 18:41:07 -05001815 def test_move_return_value(self):
1816 rv = shutil.move(self.src_file, self.dst_dir)
1817 self.assertEqual(rv,
1818 os.path.join(self.dst_dir, os.path.basename(self.src_file)))
1819
1820 def test_move_as_rename_return_value(self):
1821 rv = shutil.move(self.src_file, os.path.join(self.dst_dir, 'bar'))
1822 self.assertEqual(rv, os.path.join(self.dst_dir, 'bar'))
1823
R David Murray6ffface2014-06-11 14:40:13 -04001824 @mock_rename
1825 def test_move_file_special_function(self):
1826 moved = []
1827 def _copy(src, dst):
1828 moved.append((src, dst))
1829 shutil.move(self.src_file, self.dst_dir, copy_function=_copy)
1830 self.assertEqual(len(moved), 1)
1831
1832 @mock_rename
1833 def test_move_dir_special_function(self):
1834 moved = []
1835 def _copy(src, dst):
1836 moved.append((src, dst))
1837 support.create_empty_file(os.path.join(self.src_dir, 'child'))
1838 support.create_empty_file(os.path.join(self.src_dir, 'child1'))
1839 shutil.move(self.src_dir, self.dst_dir, copy_function=_copy)
1840 self.assertEqual(len(moved), 3)
1841
Tarek Ziadé5340db32010-04-19 22:30:51 +00001842
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001843class TestCopyFile(unittest.TestCase):
1844
1845 _delete = False
1846
1847 class Faux(object):
1848 _entered = False
1849 _exited_with = None
1850 _raised = False
1851 def __init__(self, raise_in_exit=False, suppress_at_exit=True):
1852 self._raise_in_exit = raise_in_exit
1853 self._suppress_at_exit = suppress_at_exit
1854 def read(self, *args):
1855 return ''
1856 def __enter__(self):
1857 self._entered = True
1858 def __exit__(self, exc_type, exc_val, exc_tb):
1859 self._exited_with = exc_type, exc_val, exc_tb
1860 if self._raise_in_exit:
1861 self._raised = True
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001862 raise OSError("Cannot close")
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001863 return self._suppress_at_exit
1864
1865 def tearDown(self):
1866 if self._delete:
1867 del shutil.open
1868
1869 def _set_shutil_open(self, func):
1870 shutil.open = func
1871 self._delete = True
1872
1873 def test_w_source_open_fails(self):
1874 def _open(filename, mode='r'):
1875 if filename == 'srcfile':
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001876 raise OSError('Cannot open "srcfile"')
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001877 assert 0 # shouldn't reach here.
1878
1879 self._set_shutil_open(_open)
1880
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001881 self.assertRaises(OSError, shutil.copyfile, 'srcfile', 'destfile')
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001882
Victor Stinner937ee9e2018-06-26 02:11:06 +02001883 @unittest.skipIf(MACOS, "skipped on macOS")
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001884 def test_w_dest_open_fails(self):
1885
1886 srcfile = self.Faux()
1887
1888 def _open(filename, mode='r'):
1889 if filename == 'srcfile':
1890 return srcfile
1891 if filename == 'destfile':
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001892 raise OSError('Cannot open "destfile"')
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001893 assert 0 # shouldn't reach here.
1894
1895 self._set_shutil_open(_open)
1896
1897 shutil.copyfile('srcfile', 'destfile')
1898 self.assertTrue(srcfile._entered)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001899 self.assertTrue(srcfile._exited_with[0] is OSError)
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001900 self.assertEqual(srcfile._exited_with[1].args,
1901 ('Cannot open "destfile"',))
1902
Victor Stinner937ee9e2018-06-26 02:11:06 +02001903 @unittest.skipIf(MACOS, "skipped on macOS")
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001904 def test_w_dest_close_fails(self):
1905
1906 srcfile = self.Faux()
1907 destfile = self.Faux(True)
1908
1909 def _open(filename, mode='r'):
1910 if filename == 'srcfile':
1911 return srcfile
1912 if filename == 'destfile':
1913 return destfile
1914 assert 0 # shouldn't reach here.
1915
1916 self._set_shutil_open(_open)
1917
1918 shutil.copyfile('srcfile', 'destfile')
1919 self.assertTrue(srcfile._entered)
1920 self.assertTrue(destfile._entered)
1921 self.assertTrue(destfile._raised)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001922 self.assertTrue(srcfile._exited_with[0] is OSError)
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001923 self.assertEqual(srcfile._exited_with[1].args,
1924 ('Cannot close',))
1925
Victor Stinner937ee9e2018-06-26 02:11:06 +02001926 @unittest.skipIf(MACOS, "skipped on macOS")
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001927 def test_w_source_close_fails(self):
1928
1929 srcfile = self.Faux(True)
1930 destfile = self.Faux()
1931
1932 def _open(filename, mode='r'):
1933 if filename == 'srcfile':
1934 return srcfile
1935 if filename == 'destfile':
1936 return destfile
1937 assert 0 # shouldn't reach here.
1938
1939 self._set_shutil_open(_open)
1940
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001941 self.assertRaises(OSError,
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001942 shutil.copyfile, 'srcfile', 'destfile')
1943 self.assertTrue(srcfile._entered)
1944 self.assertTrue(destfile._entered)
1945 self.assertFalse(destfile._raised)
1946 self.assertTrue(srcfile._exited_with[0] is None)
1947 self.assertTrue(srcfile._raised)
1948
Ronald Oussorenf51738b2011-05-06 10:23:04 +02001949 def test_move_dir_caseinsensitive(self):
1950 # Renames a folder to the same name
1951 # but a different case.
1952
1953 self.src_dir = tempfile.mkdtemp()
Larry Hastings5b2f9c02012-06-25 23:50:01 -07001954 self.addCleanup(shutil.rmtree, self.src_dir, True)
Ronald Oussorenf51738b2011-05-06 10:23:04 +02001955 dst_dir = os.path.join(
1956 os.path.dirname(self.src_dir),
1957 os.path.basename(self.src_dir).upper())
1958 self.assertNotEqual(self.src_dir, dst_dir)
1959
1960 try:
1961 shutil.move(self.src_dir, dst_dir)
1962 self.assertTrue(os.path.isdir(dst_dir))
1963 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +02001964 os.rmdir(dst_dir)
Ronald Oussorenf51738b2011-05-06 10:23:04 +02001965
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02001966
Giampaolo Rodolac7f02a92018-06-19 08:27:29 -07001967class TestCopyFileObj(unittest.TestCase):
1968 FILESIZE = 2 * 1024 * 1024
1969
1970 @classmethod
1971 def setUpClass(cls):
1972 write_test_file(TESTFN, cls.FILESIZE)
1973
1974 @classmethod
1975 def tearDownClass(cls):
1976 support.unlink(TESTFN)
1977 support.unlink(TESTFN2)
1978
1979 def tearDown(self):
1980 support.unlink(TESTFN2)
1981
1982 @contextlib.contextmanager
1983 def get_files(self):
1984 with open(TESTFN, "rb") as src:
1985 with open(TESTFN2, "wb") as dst:
1986 yield (src, dst)
1987
1988 def assert_files_eq(self, src, dst):
1989 with open(src, 'rb') as fsrc:
1990 with open(dst, 'rb') as fdst:
1991 self.assertEqual(fsrc.read(), fdst.read())
1992
1993 def test_content(self):
1994 with self.get_files() as (src, dst):
1995 shutil.copyfileobj(src, dst)
1996 self.assert_files_eq(TESTFN, TESTFN2)
1997
1998 def test_file_not_closed(self):
1999 with self.get_files() as (src, dst):
2000 shutil.copyfileobj(src, dst)
2001 assert not src.closed
2002 assert not dst.closed
2003
2004 def test_file_offset(self):
2005 with self.get_files() as (src, dst):
2006 shutil.copyfileobj(src, dst)
2007 self.assertEqual(src.tell(), self.FILESIZE)
2008 self.assertEqual(dst.tell(), self.FILESIZE)
2009
2010 @unittest.skipIf(os.name != 'nt', "Windows only")
2011 def test_win_impl(self):
2012 # Make sure alternate Windows implementation is called.
2013 with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
2014 shutil.copyfile(TESTFN, TESTFN2)
2015 assert m.called
2016
2017 # File size is 2 MiB but max buf size should be 1 MiB.
2018 self.assertEqual(m.call_args[0][2], 1 * 1024 * 1024)
2019
2020 # If file size < 1 MiB memoryview() length must be equal to
2021 # the actual file size.
2022 with tempfile.NamedTemporaryFile(delete=False) as f:
2023 f.write(b'foo')
2024 fname = f.name
2025 self.addCleanup(support.unlink, fname)
2026 with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
2027 shutil.copyfile(fname, TESTFN2)
2028 self.assertEqual(m.call_args[0][2], 3)
2029
2030 # Empty files should not rely on readinto() variant.
2031 with tempfile.NamedTemporaryFile(delete=False) as f:
2032 pass
2033 fname = f.name
2034 self.addCleanup(support.unlink, fname)
2035 with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
2036 shutil.copyfile(fname, TESTFN2)
2037 assert not m.called
2038 self.assert_files_eq(fname, TESTFN2)
2039
2040
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002041class _ZeroCopyFileTest(object):
2042 """Tests common to all zero-copy APIs."""
2043 FILESIZE = (10 * 1024 * 1024) # 10 MiB
2044 FILEDATA = b""
2045 PATCHPOINT = ""
2046
2047 @classmethod
2048 def setUpClass(cls):
2049 write_test_file(TESTFN, cls.FILESIZE)
2050 with open(TESTFN, 'rb') as f:
2051 cls.FILEDATA = f.read()
2052 assert len(cls.FILEDATA) == cls.FILESIZE
2053
2054 @classmethod
2055 def tearDownClass(cls):
2056 support.unlink(TESTFN)
2057
2058 def tearDown(self):
2059 support.unlink(TESTFN2)
2060
2061 @contextlib.contextmanager
2062 def get_files(self):
2063 with open(TESTFN, "rb") as src:
2064 with open(TESTFN2, "wb") as dst:
2065 yield (src, dst)
2066
2067 def zerocopy_fun(self, *args, **kwargs):
2068 raise NotImplementedError("must be implemented in subclass")
2069
2070 def reset(self):
2071 self.tearDown()
2072 self.tearDownClass()
2073 self.setUpClass()
2074 self.setUp()
2075
2076 # ---
2077
2078 def test_regular_copy(self):
2079 with self.get_files() as (src, dst):
2080 self.zerocopy_fun(src, dst)
2081 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2082 # Make sure the fallback function is not called.
2083 with self.get_files() as (src, dst):
2084 with unittest.mock.patch('shutil.copyfileobj') as m:
2085 shutil.copyfile(TESTFN, TESTFN2)
2086 assert not m.called
2087
2088 def test_same_file(self):
2089 self.addCleanup(self.reset)
2090 with self.get_files() as (src, dst):
2091 with self.assertRaises(Exception):
2092 self.zerocopy_fun(src, src)
2093 # Make sure src file is not corrupted.
2094 self.assertEqual(read_file(TESTFN, binary=True), self.FILEDATA)
2095
2096 def test_non_existent_src(self):
2097 name = tempfile.mktemp()
2098 with self.assertRaises(FileNotFoundError) as cm:
2099 shutil.copyfile(name, "new")
2100 self.assertEqual(cm.exception.filename, name)
2101
2102 def test_empty_file(self):
2103 srcname = TESTFN + 'src'
2104 dstname = TESTFN + 'dst'
2105 self.addCleanup(lambda: support.unlink(srcname))
2106 self.addCleanup(lambda: support.unlink(dstname))
2107 with open(srcname, "wb"):
2108 pass
2109
2110 with open(srcname, "rb") as src:
2111 with open(dstname, "wb") as dst:
2112 self.zerocopy_fun(src, dst)
2113
2114 self.assertEqual(read_file(dstname, binary=True), b"")
2115
2116 def test_unhandled_exception(self):
2117 with unittest.mock.patch(self.PATCHPOINT,
2118 side_effect=ZeroDivisionError):
2119 self.assertRaises(ZeroDivisionError,
2120 shutil.copyfile, TESTFN, TESTFN2)
2121
2122 def test_exception_on_first_call(self):
2123 # Emulate a case where the first call to the zero-copy
2124 # function raises an exception in which case the function is
2125 # supposed to give up immediately.
2126 with unittest.mock.patch(self.PATCHPOINT,
2127 side_effect=OSError(errno.EINVAL, "yo")):
2128 with self.get_files() as (src, dst):
2129 with self.assertRaises(_GiveupOnFastCopy):
2130 self.zerocopy_fun(src, dst)
2131
2132 def test_filesystem_full(self):
2133 # Emulate a case where filesystem is full and sendfile() fails
2134 # on first call.
2135 with unittest.mock.patch(self.PATCHPOINT,
2136 side_effect=OSError(errno.ENOSPC, "yo")):
2137 with self.get_files() as (src, dst):
2138 self.assertRaises(OSError, self.zerocopy_fun, src, dst)
2139
2140
2141@unittest.skipIf(not SUPPORTS_SENDFILE, 'os.sendfile() not supported')
2142class TestZeroCopySendfile(_ZeroCopyFileTest, unittest.TestCase):
2143 PATCHPOINT = "os.sendfile"
2144
2145 def zerocopy_fun(self, fsrc, fdst):
2146 return shutil._fastcopy_sendfile(fsrc, fdst)
2147
2148 def test_non_regular_file_src(self):
2149 with io.BytesIO(self.FILEDATA) as src:
2150 with open(TESTFN2, "wb") as dst:
2151 with self.assertRaises(_GiveupOnFastCopy):
2152 self.zerocopy_fun(src, dst)
2153 shutil.copyfileobj(src, dst)
2154
2155 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2156
2157 def test_non_regular_file_dst(self):
2158 with open(TESTFN, "rb") as src:
2159 with io.BytesIO() as dst:
2160 with self.assertRaises(_GiveupOnFastCopy):
2161 self.zerocopy_fun(src, dst)
2162 shutil.copyfileobj(src, dst)
2163 dst.seek(0)
2164 self.assertEqual(dst.read(), self.FILEDATA)
2165
2166 def test_exception_on_second_call(self):
2167 def sendfile(*args, **kwargs):
2168 if not flag:
2169 flag.append(None)
2170 return orig_sendfile(*args, **kwargs)
2171 else:
2172 raise OSError(errno.EBADF, "yo")
2173
2174 flag = []
2175 orig_sendfile = os.sendfile
2176 with unittest.mock.patch('os.sendfile', create=True,
2177 side_effect=sendfile):
2178 with self.get_files() as (src, dst):
2179 with self.assertRaises(OSError) as cm:
2180 shutil._fastcopy_sendfile(src, dst)
2181 assert flag
2182 self.assertEqual(cm.exception.errno, errno.EBADF)
2183
2184 def test_cant_get_size(self):
2185 # Emulate a case where src file size cannot be determined.
2186 # Internally bufsize will be set to a small value and
2187 # sendfile() will be called repeatedly.
2188 with unittest.mock.patch('os.fstat', side_effect=OSError) as m:
2189 with self.get_files() as (src, dst):
2190 shutil._fastcopy_sendfile(src, dst)
2191 assert m.called
2192 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2193
2194 def test_small_chunks(self):
2195 # Force internal file size detection to be smaller than the
2196 # actual file size. We want to force sendfile() to be called
2197 # multiple times, also in order to emulate a src fd which gets
2198 # bigger while it is being copied.
2199 mock = unittest.mock.Mock()
2200 mock.st_size = 65536 + 1
2201 with unittest.mock.patch('os.fstat', return_value=mock) as m:
2202 with self.get_files() as (src, dst):
2203 shutil._fastcopy_sendfile(src, dst)
2204 assert m.called
2205 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2206
2207 def test_big_chunk(self):
2208 # Force internal file size detection to be +100MB bigger than
2209 # the actual file size. Make sure sendfile() does not rely on
2210 # file size value except for (maybe) a better throughput /
2211 # performance.
2212 mock = unittest.mock.Mock()
2213 mock.st_size = self.FILESIZE + (100 * 1024 * 1024)
2214 with unittest.mock.patch('os.fstat', return_value=mock) as m:
2215 with self.get_files() as (src, dst):
2216 shutil._fastcopy_sendfile(src, dst)
2217 assert m.called
2218 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2219
2220 def test_blocksize_arg(self):
2221 with unittest.mock.patch('os.sendfile',
2222 side_effect=ZeroDivisionError) as m:
2223 self.assertRaises(ZeroDivisionError,
2224 shutil.copyfile, TESTFN, TESTFN2)
2225 blocksize = m.call_args[0][3]
2226 # Make sure file size and the block size arg passed to
2227 # sendfile() are the same.
2228 self.assertEqual(blocksize, os.path.getsize(TESTFN))
2229 # ...unless we're dealing with a small file.
2230 support.unlink(TESTFN2)
2231 write_file(TESTFN2, b"hello", binary=True)
2232 self.addCleanup(support.unlink, TESTFN2 + '3')
2233 self.assertRaises(ZeroDivisionError,
2234 shutil.copyfile, TESTFN2, TESTFN2 + '3')
2235 blocksize = m.call_args[0][3]
2236 self.assertEqual(blocksize, 2 ** 23)
2237
2238 def test_file2file_not_supported(self):
2239 # Emulate a case where sendfile() only support file->socket
2240 # fds. In such a case copyfile() is supposed to skip the
2241 # fast-copy attempt from then on.
2242 assert shutil._HAS_SENDFILE
2243 try:
2244 with unittest.mock.patch(
2245 self.PATCHPOINT,
2246 side_effect=OSError(errno.ENOTSOCK, "yo")) as m:
2247 with self.get_files() as (src, dst):
2248 with self.assertRaises(_GiveupOnFastCopy):
2249 shutil._fastcopy_sendfile(src, dst)
2250 assert m.called
2251 assert not shutil._HAS_SENDFILE
2252
2253 with unittest.mock.patch(self.PATCHPOINT) as m:
2254 shutil.copyfile(TESTFN, TESTFN2)
2255 assert not m.called
2256 finally:
2257 shutil._HAS_SENDFILE = True
2258
2259
Victor Stinner937ee9e2018-06-26 02:11:06 +02002260@unittest.skipIf(not MACOS, 'macOS only')
Giampaolo Rodolac7f02a92018-06-19 08:27:29 -07002261class TestZeroCopyMACOS(_ZeroCopyFileTest, unittest.TestCase):
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002262 PATCHPOINT = "posix._fcopyfile"
2263
2264 def zerocopy_fun(self, src, dst):
Giampaolo Rodolac7f02a92018-06-19 08:27:29 -07002265 return shutil._fastcopy_fcopyfile(src, dst, posix._COPYFILE_DATA)
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002266
2267
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002268class TermsizeTests(unittest.TestCase):
2269 def test_does_not_crash(self):
2270 """Check if get_terminal_size() returns a meaningful value.
2271
2272 There's no easy portable way to actually check the size of the
2273 terminal, so let's check if it returns something sensible instead.
2274 """
2275 size = shutil.get_terminal_size()
Antoine Pitroucfade362012-02-08 23:48:59 +01002276 self.assertGreaterEqual(size.columns, 0)
2277 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002278
2279 def test_os_environ_first(self):
2280 "Check if environment variables have precedence"
2281
2282 with support.EnvironmentVarGuard() as env:
2283 env['COLUMNS'] = '777'
Serhiy Storchakad30829d2016-04-24 09:58:43 +03002284 del env['LINES']
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002285 size = shutil.get_terminal_size()
2286 self.assertEqual(size.columns, 777)
2287
2288 with support.EnvironmentVarGuard() as env:
Serhiy Storchakad30829d2016-04-24 09:58:43 +03002289 del env['COLUMNS']
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002290 env['LINES'] = '888'
2291 size = shutil.get_terminal_size()
2292 self.assertEqual(size.lines, 888)
2293
Serhiy Storchakad30829d2016-04-24 09:58:43 +03002294 def test_bad_environ(self):
2295 with support.EnvironmentVarGuard() as env:
2296 env['COLUMNS'] = 'xxx'
2297 env['LINES'] = 'yyy'
2298 size = shutil.get_terminal_size()
2299 self.assertGreaterEqual(size.columns, 0)
2300 self.assertGreaterEqual(size.lines, 0)
2301
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002302 @unittest.skipUnless(os.isatty(sys.__stdout__.fileno()), "not on tty")
Victor Stinner119ebb72016-04-19 22:24:56 +02002303 @unittest.skipUnless(hasattr(os, 'get_terminal_size'),
2304 'need os.get_terminal_size()')
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002305 def test_stty_match(self):
2306 """Check if stty returns the same results ignoring env
2307
2308 This test will fail if stdin and stdout are connected to
2309 different terminals with different sizes. Nevertheless, such
2310 situations should be pretty rare.
2311 """
2312 try:
2313 size = subprocess.check_output(['stty', 'size']).decode().split()
Xavier de Gaye38c8b7d2016-11-14 17:14:42 +01002314 except (FileNotFoundError, PermissionError,
2315 subprocess.CalledProcessError):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002316 self.skipTest("stty invocation failed")
2317 expected = (int(size[1]), int(size[0])) # reversed order
2318
2319 with support.EnvironmentVarGuard() as env:
2320 del env['LINES']
2321 del env['COLUMNS']
2322 actual = shutil.get_terminal_size()
2323
2324 self.assertEqual(expected, actual)
Ronald Oussorenf51738b2011-05-06 10:23:04 +02002325
Serhiy Storchakad30829d2016-04-24 09:58:43 +03002326 def test_fallback(self):
2327 with support.EnvironmentVarGuard() as env:
2328 del env['LINES']
2329 del env['COLUMNS']
2330
2331 # sys.__stdout__ has no fileno()
2332 with support.swap_attr(sys, '__stdout__', None):
2333 size = shutil.get_terminal_size(fallback=(10, 20))
2334 self.assertEqual(size.columns, 10)
2335 self.assertEqual(size.lines, 20)
2336
2337 # sys.__stdout__ is not a terminal on Unix
2338 # or fileno() not in (0, 1, 2) on Windows
2339 with open(os.devnull, 'w') as f, \
2340 support.swap_attr(sys, '__stdout__', f):
2341 size = shutil.get_terminal_size(fallback=(30, 40))
2342 self.assertEqual(size.columns, 30)
2343 self.assertEqual(size.lines, 40)
2344
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002345
Berker Peksag8083cd62014-11-01 11:04:06 +02002346class PublicAPITests(unittest.TestCase):
2347 """Ensures that the correct values are exposed in the public API."""
2348
2349 def test_module_all_attribute(self):
2350 self.assertTrue(hasattr(shutil, '__all__'))
2351 target_api = ['copyfileobj', 'copyfile', 'copymode', 'copystat',
2352 'copy', 'copy2', 'copytree', 'move', 'rmtree', 'Error',
2353 'SpecialFileError', 'ExecError', 'make_archive',
2354 'get_archive_formats', 'register_archive_format',
2355 'unregister_archive_format', 'get_unpack_formats',
2356 'register_unpack_format', 'unregister_unpack_format',
2357 'unpack_archive', 'ignore_patterns', 'chown', 'which',
2358 'get_terminal_size', 'SameFileError']
2359 if hasattr(os, 'statvfs') or os.name == 'nt':
2360 target_api.append('disk_usage')
2361 self.assertEqual(set(shutil.__all__), set(target_api))
2362
2363
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00002364if __name__ == '__main__':
Brett Cannon3e9a9ae2013-06-12 21:25:59 -04002365 unittest.main()