blob: 86f4dc97c3a142d69120f5c0aefee13501f1df3b [file] [log] [blame]
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00001# Copyright (C) 2003 Python Software Foundation
2
3import unittest
Berker Peksag884afd92014-12-10 02:50:32 +02004import unittest.mock
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00005import shutil
6import tempfile
Johannes Gijsbers8e6f2de2004-11-23 09:27:27 +00007import sys
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +00008import stat
Brett Cannon1c3fa182004-06-19 21:11:35 +00009import os
10import os.path
Antoine Pitrouc041ab62012-01-02 19:18:02 +010011import errno
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040012import functools
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -070013import pathlib
Antoine Pitroubcf2b592012-02-08 23:28:36 +010014import subprocess
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020015import random
16import string
17import contextlib
18import io
Serhiy Storchaka527ef072015-09-06 18:33:19 +030019from shutil import (make_archive,
Tarek Ziadé396fad72010-02-23 05:30:31 +000020 register_archive_format, unregister_archive_format,
Tarek Ziadé6ac91722010-04-28 17:51:36 +000021 get_archive_formats, Error, unpack_archive,
22 register_unpack_format, RegistryError,
Hynek Schlawack48653762012-10-07 12:49:58 +020023 unregister_unpack_format, get_unpack_formats,
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020024 SameFileError, _GiveupOnFastCopy)
Tarek Ziadé396fad72010-02-23 05:30:31 +000025import tarfile
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +020026import zipfile
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020027try:
28 import posix
29except ImportError:
30 posix = None
Tarek Ziadé396fad72010-02-23 05:30:31 +000031
32from test import support
Serhiy Storchakab21d1552018-03-02 11:53:51 +020033from test.support import TESTFN, FakePath
Serhiy Storchaka11213772014-08-06 18:50:19 +030034
# Second scratch filename derived from the shared test filename.
TESTFN2 = TESTFN + "2"

# Platform flags consulted by the tests below.
MACOS = sys.platform[:6] == "darwin"
AIX = sys.platform.startswith('aix')

# grp/pwd are only importable on platforms with user/group databases.
try:
    import grp
    import pwd
except ImportError:
    UID_GID_SUPPORT = False
else:
    UID_GID_SUPPORT = True
44
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040045def _fake_rename(*args, **kwargs):
46 # Pretend the destination path is on a different filesystem.
Antoine Pitrouc041ab62012-01-02 19:18:02 +010047 raise OSError(getattr(errno, 'EXDEV', 18), "Invalid cross-device link")
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040048
def mock_rename(func):
    """Decorator: run *func* while os.rename is replaced by _fake_rename.

    The real os.rename is put back when *func* returns or raises.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        saved_rename = os.rename
        try:
            os.rename = _fake_rename
            return func(*args, **kwargs)
        finally:
            # Always restore the genuine implementation.
            os.rename = saved_rename
    return wrapper
59
def write_file(path, content, binary=False):
    """Store *content* in the file named by *path*.

    A tuple *path* is combined into a single path with os.path.join.
    When *binary* is true the file is opened in binary mode, otherwise
    in text mode.
    """
    target = os.path.join(*path) if isinstance(path, tuple) else path
    if binary:
        mode = 'wb'
    else:
        mode = 'w'
    with open(target, mode) as stream:
        stream.write(content)
71
def write_test_file(path, size):
    """Create a file at *path* holding exactly *size* bytes of random letters.

    The content is one randomly generated buffer of at most 8192 ASCII
    letters, repeated as often as needed; the final repetition is
    truncated so the file never exceeds *size*.  An AssertionError is
    raised if the resulting file does not have the requested size.
    """
    def chunks(total, step):
        # Yield the sizes of successive pieces: *step* for each full piece,
        # then whatever remainder is left (if any).
        assert total >= step
        while total > step:
            yield step
            total -= step
        if total:
            yield total

    bufsize = min(size, 8192)
    chunk = "".join(random.choices(string.ascii_letters, k=bufsize)).encode()
    with open(path, 'wb') as f:
        for csize in chunks(size, bufsize):
            # Slice so that a trailing partial piece writes only csize bytes;
            # writing the whole buffer would overshoot the requested size.
            f.write(chunk[:csize])
    assert os.path.getsize(path) == size
89
def read_file(path, binary=False):
    """Return everything stored in the file named by *path*.

    A tuple *path* is combined into a single path with os.path.join.
    When *binary* is true the file is opened in binary mode, otherwise
    in text mode.
    """
    source = os.path.join(*path) if isinstance(path, tuple) else path
    if binary:
        mode = 'rb'
    else:
        mode = 'r'
    with open(source, mode) as stream:
        data = stream.read()
    return data
101
def rlistdir(path):
    """Return a sorted, recursive listing of *path* as relative names.

    Real directories appear with a trailing '/' and are followed by their
    own contents, each prefixed with the directory name.  Symlinks are
    listed by name and are never descended into.
    """
    listing = []
    entries = os.listdir(path)
    entries.sort()
    for entry in entries:
        full = os.path.join(path, entry)
        if os.path.islink(full) or not os.path.isdir(full):
            listing.append(entry)
            continue
        prefix = entry + '/'
        listing.append(prefix)
        listing.extend(prefix + child for child in rlistdir(full))
    return listing
113
def supports_file2file_sendfile():
    """Return True if os.sendfile() can copy between two regular files.

    Two temporary files are created and a two-byte os.sendfile() call is
    attempted from the first to the second.  Returns False when
    os.sendfile does not exist or the call raises OSError.  Both
    temporary files are removed before returning.
    """
    # ...apparently Linux and Solaris are the only ones
    if not hasattr(os, "sendfile"):
        return False
    srcname = None
    dstname = None
    try:
        with tempfile.NamedTemporaryFile("wb", delete=False) as f:
            srcname = f.name
            f.write(b"0123456789")

        with open(srcname, "rb") as src, \
                tempfile.NamedTemporaryFile("wb", delete=False) as dst:
            # Remember the destination's own name (not the source's) so the
            # cleanup below really removes the second temporary file.
            dstname = dst.name
            try:
                os.sendfile(dst.fileno(), src.fileno(), 0, 2)
            except OSError:
                return False
            else:
                return True
    finally:
        # Both files were created with delete=False, so remove them here.
        if srcname is not None:
            support.unlink(srcname)
        if dstname is not None:
            support.unlink(dstname)
141
142
143SUPPORTS_SENDFILE = supports_file2file_sendfile()
144
Michael Feltef110b12019-02-18 12:02:44 +0100145# AIX 32-bit mode, by default, lacks enough memory for the xz/lzma compiler test
146# The AIX command 'dump -o program' gives XCOFF header information
147# The second word of the last line is the maxdata value
148# when 32-bit maxdata must be greater than 0x1000000 for the xz test to succeed
def _maxdataOK():
    """Report whether the interpreter's maxdata setting is large enough.

    Always True unless running on AIX with sys.maxsize == 2147483647.
    In that case the output of "/usr/bin/dump -o <executable>" is parsed:
    the second word of its last line is read as a hexadecimal number and
    must be at least 0x20000000.
    """
    if not (AIX and sys.maxsize == 2147483647):
        return True
    dump_output = subprocess.getoutput("/usr/bin/dump -o %s" % sys.executable)
    last_line = dump_output.split("\n")[-1]
    maxdata_hex = last_line.split()[1]
    return int(maxdata_hex, 16) >= 0x20000000
Éric Araujoa7e33a12011-08-12 19:51:35 +0200156
Barry Warsaw7fc2cca2003-01-24 17:34:13 +0000157class TestShutil(unittest.TestCase):
Tarek Ziadé396fad72010-02-23 05:30:31 +0000158
def setUp(self):
    # Run the base-class set-up, then start with an empty record of the
    # temporary directories created through self.mkdtemp().
    super().setUp()
    self.tempdirs = list()
162
def tearDown(self):
    # Run the base-class tear-down, then remove every recorded temporary
    # directory, most recently created first.
    super().tearDown()
    # rmtree's second positional argument (ignore_errors) is true only
    # when os.name is 'nt' or 'cygwin'.
    ignore_errors = os.name in ('nt', 'cygwin')
    while self.tempdirs:
        shutil.rmtree(self.tempdirs.pop(), ignore_errors)
168
Tarek Ziadé396fad72010-02-23 05:30:31 +0000169
def mkdtemp(self):
    """Make a new temporary directory and remember it for later clean-up.

    The path is appended to self.tempdirs and then returned.
    """
    new_dir = tempfile.mkdtemp()
    self.tempdirs.append(new_dir)
    return new_dir
Tarek Ziadé5340db32010-04-19 22:30:51 +0000178
def test_rmtree_works_on_bytes(self):
    # rmtree() must accept a bytes path to a non-empty directory.
    doomed = os.path.join(self.mkdtemp(), 'killme')
    os.mkdir(doomed)
    write_file((doomed, 'somefile'), 'foo')
    doomed_bytes = os.fsencode(doomed)
    self.assertIsInstance(doomed_bytes, bytes)
    shutil.rmtree(doomed_bytes)
Hynek Schlawack3b527782012-06-25 13:27:31 +0200187
@support.skip_unless_symlink
def test_rmtree_fails_on_symlink(self):
    # Layout: <tmp>/dir is a real directory, <tmp>/link points at it.
    base = self.mkdtemp()
    dir_ = os.path.join(base, 'dir')
    link = os.path.join(base, 'link')
    os.mkdir(dir_)
    os.symlink(dir_, link)

    # Without an error handler, rmtree on the link raises and removes
    # neither the link nor its target.
    with self.assertRaises(OSError):
        shutil.rmtree(link)
    self.assertTrue(os.path.exists(dir_))
    self.assertTrue(os.path.lexists(link))

    # With a handler, exactly one error is reported for the link itself.
    reported = []
    shutil.rmtree(link, onerror=lambda *args: reported.append(args))
    self.assertEqual(len(reported), 1)
    func, path, exc_info = reported[0]
    self.assertIs(func, os.path.islink)
    self.assertEqual(path, link)
    self.assertIsInstance(exc_info[1], OSError)
Hynek Schlawacka75cd1c2012-06-28 12:07:29 +0200206
@support.skip_unless_symlink
def test_rmtree_works_on_symlinks(self):
    base = self.mkdtemp()
    dir1 = os.path.join(base, 'dir1')
    dir2 = os.path.join(dir1, 'dir2')
    dir3 = os.path.join(base, 'dir3')
    os.mkdir(dir1)
    os.mkdir(dir2)
    os.mkdir(dir3)
    file1 = os.path.join(base, 'file1')
    write_file(file1, 'foo')
    # Inside dir1: a link to a directory inside the tree, a link to a
    # directory outside it, and a link to a file outside it.
    for link_name, target in (('link1', dir2),
                              ('link2', dir3),
                              ('link3', file1)):
        os.symlink(target, os.path.join(dir1, link_name))
    # Removing dir1 must delete the links without following them.
    shutil.rmtree(dir1)
    self.assertFalse(os.path.exists(dir1))
    self.assertTrue(os.path.exists(dir3))
    self.assertTrue(os.path.exists(file1))
228
def test_rmtree_errors(self):
    # Case 1: a path that does not exist.
    missing = tempfile.mktemp()
    with self.assertRaises(FileNotFoundError):
        shutil.rmtree(missing)
    # ignore_errors=True must suppress the failure.
    shutil.rmtree(missing, ignore_errors=True)

    # Case 2: a path that names a regular file.
    tmpdir = self.mkdtemp()
    write_file((tmpdir, "tstfile"), "")
    filename = os.path.join(tmpdir, "tstfile")
    with self.assertRaises(NotADirectoryError) as cm:
        shutil.rmtree(filename)
    # Windows sometimes (only on some buildbots) appends \*.* to the
    # reported file name, so accept both spellings.
    possible_args = [filename, os.path.join(filename, '*.*')]
    self.assertIn(cm.exception.filename, possible_args)
    self.assertTrue(os.path.exists(filename))
    # ignore_errors=True must suppress the failure and keep the file.
    shutil.rmtree(filename, ignore_errors=True)
    self.assertTrue(os.path.exists(filename))

    # With an onerror handler two failures are reported, in this order:
    # os.scandir and then os.rmdir, both for the file itself.
    reported = []
    shutil.rmtree(filename, onerror=lambda *args: reported.append(args))
    self.assertEqual(len(reported), 2)
    expected_funcs = (os.scandir, os.rmdir)
    for expected, (func, path, exc_info) in zip(expected_funcs, reported):
        self.assertIs(func, expected)
        self.assertEqual(path, filename)
        self.assertIsInstance(exc_info[1], NotADirectoryError)
        self.assertIn(exc_info[1].filename, possible_args)
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000263
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000264
@unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod()')
@unittest.skipIf(sys.platform[:6] == 'cygwin',
                 "This test can't be run on Cygwin (issue #1071513).")
@unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
                 "This test can't be run reliably as root (issue #1076467).")
def test_on_error(self):
    # Counter advanced by check_args_to_onerror on every reported error.
    self.errorState = 0
    os.mkdir(TESTFN)
    self.addCleanup(shutil.rmtree, TESTFN)

    # TESTFN gets one child file 'a' and one child directory 'b'.
    self.child_file_path = os.path.join(TESTFN, 'a')
    self.child_dir_path = os.path.join(TESTFN, 'b')
    support.create_empty_file(self.child_file_path)
    os.mkdir(self.child_dir_path)

    # Remember the current modes so the cleanups can put them back.
    original_modes = {
        path: os.stat(path).st_mode
        for path in (TESTFN, self.child_file_path, self.child_dir_path)
    }

    # Make unwritable: children first, the top directory last.
    unwritable = stat.S_IREAD | stat.S_IEXEC
    for path in (self.child_file_path, self.child_dir_path, TESTFN):
        os.chmod(path, unwritable)

    # Cleanups run in reverse registration order, so the modes are
    # restored before the rmtree cleanup registered above runs.
    for path in (TESTFN, self.child_file_path, self.child_dir_path):
        self.addCleanup(os.chmod, path, original_modes[path])

    shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
    # Test whether onerror has actually been called.
    self.assertEqual(self.errorState, 3,
                     "Expected call to onerror function did not happen.")
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000296
def check_args_to_onerror(self, func, arg, exc):
    # onerror handler passed to shutil.rmtree() by test_on_error, which
    # runs rmtree on a tree it has made unwritable so that removal fails.
    #
    # self.errorState counts the calls seen so far.  Once two errors have
    # been recorded, the next one must be os.rmdir failing on TESTFN and
    # moves the state to 3.
    if self.errorState >= 2:
        self.assertEqual(func, os.rmdir)
        self.assertEqual(arg, TESTFN)
        self.assertTrue(issubclass(exc[0], OSError))
        self.errorState = 3
        return

    # The first two errors may legally be: os.unlink on the child file,
    # os.rmdir on the child directory, or os.listdir on either directory.
    if func is os.unlink:
        self.assertEqual(arg, self.child_file_path)
    elif func is os.rmdir:
        self.assertEqual(arg, self.child_dir_path)
    else:
        self.assertIs(func, os.listdir)
        self.assertIn(arg, [TESTFN, self.child_dir_path])
    self.assertTrue(issubclass(exc[0], OSError))
    self.errorState += 1
323
def test_rmtree_does_not_choke_on_failing_lstat(self):
    real_lstat = os.lstat

    def picky_lstat(fn, *args, **kwargs):
        # Only TESTFN itself may be lstat'ed; anything else fails.
        if fn != TESTFN:
            raise OSError()
        return real_lstat(fn)

    try:
        os.lstat = picky_lstat

        os.mkdir(TESTFN)
        write_file((TESTFN, 'foo'), 'foo')
        shutil.rmtree(TESTFN)
    finally:
        os.lstat = real_lstat
Barry Warsaw7fc2cca2003-01-24 17:34:13 +0000339
@unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod')
@support.skip_unless_symlink
def test_copymode_follow_symlinks(self):
    tmp_dir = self.mkdtemp()
    src, dst, src_link, dst_link = [
        os.path.join(tmp_dir, name)
        for name in ('foo', 'bar', 'baz', 'quux')
    ]
    write_file(src, 'foo')
    write_file(dst, 'foo')
    os.symlink(src, src_link)
    os.symlink(dst, dst_link)
    os.chmod(src, stat.S_IRWXU | stat.S_IRWXG)

    # file to file
    os.chmod(dst, stat.S_IRWXO)
    self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
    shutil.copymode(src, dst)
    self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)

    # On Windows, os.chmod does not follow symlinks (issue #15411)
    if os.name != 'nt':
        # In turn: follow the src link, follow the dst link, follow both.
        for source, target in ((src_link, dst),
                               (src, dst_link),
                               (src_link, dst_link)):
            os.chmod(dst, stat.S_IRWXO)
            shutil.copymode(source, target)
            self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100372
@unittest.skipUnless(hasattr(os, 'lchmod'), 'requires os.lchmod')
@support.skip_unless_symlink
def test_copymode_symlink_to_symlink(self):
    tmp_dir = self.mkdtemp()
    src, dst, src_link, dst_link = [
        os.path.join(tmp_dir, name)
        for name in ('foo', 'bar', 'baz', 'quux')
    ]
    write_file(src, 'foo')
    write_file(dst, 'foo')
    os.symlink(src, src_link)
    os.symlink(dst, dst_link)
    os.chmod(src, stat.S_IRWXU | stat.S_IRWXG)
    os.chmod(dst, stat.S_IRWXU)
    os.lchmod(src_link, stat.S_IRWXO | stat.S_IRWXG)

    # link to link: the mode of the link itself is copied, while the
    # files the links point at keep their differing modes.
    os.lchmod(dst_link, stat.S_IRWXO)
    shutil.copymode(src_link, dst_link, follow_symlinks=False)
    self.assertEqual(os.lstat(src_link).st_mode,
                     os.lstat(dst_link).st_mode)
    self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)

    # Only one side is a link (first src, then dst) - use chmod.
    for source, target in ((src_link, dst), (src, dst_link)):
        os.lchmod(dst_link, stat.S_IRWXO)
        shutil.copymode(source, target, follow_symlinks=False)
        self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
402
@unittest.skipIf(hasattr(os, 'lchmod'), 'requires os.lchmod to be missing')
@support.skip_unless_symlink
def test_copymode_symlink_to_symlink_wo_lchmod(self):
    tmp_dir = self.mkdtemp()
    src, dst, src_link, dst_link = [
        os.path.join(tmp_dir, name)
        for name in ('foo', 'bar', 'baz', 'quux')
    ]
    for regular_file in (src, dst):
        write_file(regular_file, 'foo')
    os.symlink(src, src_link)
    os.symlink(dst, dst_link)
    # Without os.lchmod this call is expected to do nothing, silently.
    shutil.copymode(src_link, dst_link, follow_symlinks=False)  # silent fail
Antoine Pitrou78091e62011-12-29 18:54:15 +0100416
@support.skip_unless_symlink
def test_copystat_symlinks(self):
    tmp_dir = self.mkdtemp()
    src, dst, src_link, dst_link = [
        os.path.join(tmp_dir, name)
        for name in ('foo', 'bar', 'baz', 'qux')
    ]
    has_lchmod = hasattr(os, 'lchmod')

    write_file(src, 'foo')
    src_stat = os.stat(src)
    # Push src's mtime 42 seconds back to ensure different mtimes.
    os.utime(src, (src_stat.st_atime, src_stat.st_mtime - 42.0))
    write_file(dst, 'bar')
    self.assertNotEqual(os.stat(src).st_mtime, os.stat(dst).st_mtime)
    os.symlink(src, src_link)
    os.symlink(dst, dst_link)
    if has_lchmod:
        os.lchmod(src_link, stat.S_IRWXO)
    if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
        os.lchflags(src_link, stat.UF_NODUMP)
    src_link_stat = os.lstat(src_link)

    # follow
    if has_lchmod:
        shutil.copystat(src_link, dst_link, follow_symlinks=True)
        self.assertNotEqual(src_link_stat.st_mode, os.stat(dst).st_mode)

    # don't follow
    shutil.copystat(src_link, dst_link, follow_symlinks=False)
    dst_link_stat = os.lstat(dst_link)
    if os.utime in os.supports_follow_symlinks:
        for attr in ('st_atime', 'st_mtime'):
            # The modification times may be truncated in the new file.
            self.assertLessEqual(getattr(src_link_stat, attr),
                                 getattr(dst_link_stat, attr) + 1)
    if has_lchmod:
        self.assertEqual(src_link_stat.st_mode, dst_link_stat.st_mode)
    if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
        self.assertEqual(src_link_stat.st_flags, dst_link_stat.st_flags)

    # follow_symlinks=False, but dst is not a link
    shutil.copystat(src_link, dst, follow_symlinks=False)
    mtime_gap = abs(os.stat(src).st_mtime - os.stat(dst).st_mtime)
    self.assertTrue(mtime_gap < 0.1)
457
@unittest.skipUnless(hasattr(os, 'chflags') and
                     hasattr(errno, 'EOPNOTSUPP') and
                     hasattr(errno, 'ENOTSUP'),
                     "requires os.chflags, EOPNOTSUPP & ENOTSUP")
def test_copystat_handles_harmless_chflags_errors(self):
    tmpdir = self.mkdtemp()
    file1 = os.path.join(tmpdir, 'file1')
    file2 = os.path.join(tmpdir, 'file2')
    for path in (file1, file2):
        write_file(path, 'xxx')

    def make_chflags_raiser(err):
        # Build an os.chflags replacement that always raises an OSError
        # whose errno is *err*.
        failure = OSError()
        failure.errno = err

        def _chflags_raiser(path, flags, *, follow_symlinks=True):
            raise failure
        return _chflags_raiser

    real_chflags = os.chflags
    try:
        # These two error numbers must be tolerated by copystat.
        for harmless in (errno.EOPNOTSUPP, errno.ENOTSUP):
            os.chflags = make_chflags_raiser(harmless)
            shutil.copystat(file1, file2)
        # assert others errors break it
        os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP)
        with self.assertRaises(OSError):
            shutil.copystat(file1, file2)
    finally:
        os.chflags = real_chflags
486
@support.skip_unless_xattr
def test_copyxattr(self):
    tmp_dir = self.mkdtemp()
    src = os.path.join(tmp_dir, 'foo')
    write_file(src, 'foo')
    dst = os.path.join(tmp_dir, 'bar')
    write_file(dst, 'bar')

    # no xattr == no problem
    shutil._copyxattr(src, dst)

    # common case
    for attr_name, attr_value in (('user.foo', b'42'), ('user.bar', b'43')):
        os.setxattr(src, attr_name, attr_value)
    shutil._copyxattr(src, dst)
    self.assertEqual(sorted(os.listxattr(src)), sorted(os.listxattr(dst)))
    self.assertEqual(os.getxattr(src, 'user.foo'),
                     os.getxattr(dst, 'user.foo'))

    # check errors don't affect other attrs
    os.remove(dst)
    write_file(dst, 'bar')
    os_error = OSError(errno.EPERM, 'EPERM')
    real_setxattr = os.setxattr

    def _raise_on_user_foo(fname, attr, val, **kwargs):
        # Fail only for 'user.foo'; delegate every other attribute.
        if attr == 'user.foo':
            raise os_error
        real_setxattr(fname, attr, val, **kwargs)

    try:
        os.setxattr = _raise_on_user_foo
        shutil._copyxattr(src, dst)
        self.assertIn('user.bar', os.listxattr(dst))
    finally:
        os.setxattr = real_setxattr

    # the source filesystem not supporting xattrs should be ok, too.
    real_listxattr = os.listxattr

    def _raise_on_src(fname, *, follow_symlinks=True):
        # Listing the source's attributes fails with ENOTSUP.
        if fname == src:
            raise OSError(errno.ENOTSUP, 'Operation not supported')
        return real_listxattr(fname, follow_symlinks=follow_symlinks)

    try:
        os.listxattr = _raise_on_src
        shutil._copyxattr(src, dst)
    finally:
        os.listxattr = real_listxattr

    # test that shutil.copystat copies xattrs
    src = os.path.join(tmp_dir, 'the_original')
    write_file(src, src)
    os.setxattr(src, 'user.the_value', b'fiddly')
    dst = os.path.join(tmp_dir, 'the_copy')
    write_file(dst, dst)
    shutil.copystat(src, dst)
    self.assertEqual(os.getxattr(dst, 'user.the_value'), b'fiddly')
Larry Hastingsad5ae042012-07-14 17:55:11 -0700542
@support.skip_unless_symlink
@support.skip_unless_xattr
@unittest.skipUnless(hasattr(os, 'geteuid') and os.geteuid() == 0,
                     'root privileges required')
def test_copyxattr_symlinks(self):
    # On Linux, it's only possible to access non-user xattr for symlinks;
    # which in turn require root privileges. This test should be expanded
    # as soon as other platforms gain support for extended attributes.
    tmp_dir = self.mkdtemp()
    src = os.path.join(tmp_dir, 'foo')
    src_link = os.path.join(tmp_dir, 'baz')
    write_file(src, 'foo')
    os.symlink(src, src_link)
    # The file and the link carry different values for the same attribute.
    os.setxattr(src, 'trusted.foo', b'42')
    os.setxattr(src_link, 'trusted.foo', b'43', follow_symlinks=False)

    dst = os.path.join(tmp_dir, 'bar')
    dst_link = os.path.join(tmp_dir, 'qux')
    write_file(dst, 'bar')
    os.symlink(dst, dst_link)

    # link -> link: the link's own value arrives on the destination link
    # and the destination file stays without the attribute.
    shutil._copyxattr(src_link, dst_link, follow_symlinks=False)
    copied = os.getxattr(dst_link, 'trusted.foo', follow_symlinks=False)
    self.assertEqual(copied, b'43')
    with self.assertRaises(OSError):
        os.getxattr(dst, 'trusted.foo')

    # link -> file: the link's own value arrives on the destination file.
    shutil._copyxattr(src_link, dst, follow_symlinks=False)
    self.assertEqual(os.getxattr(dst, 'trusted.foo'), b'43')
567
@support.skip_unless_symlink
def test_copy_symlinks(self):
    # shutil.copy() on a symlink: with follow_symlinks=True the target's
    # content is copied into a regular file; with follow_symlinks=False
    # the link itself is recreated at the destination.
    tmp_dir = self.mkdtemp()
    src = os.path.join(tmp_dir, 'foo')
    dst = os.path.join(tmp_dir, 'bar')
    src_link = os.path.join(tmp_dir, 'baz')
    write_file(src, 'foo')
    os.symlink(src, src_link)
    if hasattr(os, 'lchmod'):
        os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
    # follow
    shutil.copy(src_link, dst, follow_symlinks=True)
    self.assertFalse(os.path.islink(dst))
    self.assertEqual(read_file(src), read_file(dst))
    os.remove(dst)
    # don't follow
    shutil.copy(src_link, dst, follow_symlinks=False)
    self.assertTrue(os.path.islink(dst))
    self.assertEqual(os.readlink(dst), os.readlink(src_link))
    if hasattr(os, 'lchmod'):
        # The mode of the link itself must have been carried over too.
        self.assertEqual(os.lstat(src_link).st_mode,
                         os.lstat(dst).st_mode)
590
@support.skip_unless_symlink
def test_copy2_symlinks(self):
    tmp_dir = self.mkdtemp()
    src, dst, src_link = [
        os.path.join(tmp_dir, name) for name in ('foo', 'bar', 'baz')
    ]
    has_lchmod = hasattr(os, 'lchmod')

    write_file(src, 'foo')
    os.symlink(src, src_link)
    if has_lchmod:
        os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
    if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
        os.lchflags(src_link, stat.UF_NODUMP)
    src_stat = os.stat(src)
    src_link_stat = os.lstat(src_link)

    # follow
    shutil.copy2(src_link, dst, follow_symlinks=True)
    self.assertFalse(os.path.islink(dst))
    self.assertEqual(read_file(src), read_file(dst))
    os.remove(dst)

    # don't follow
    shutil.copy2(src_link, dst, follow_symlinks=False)
    self.assertTrue(os.path.islink(dst))
    self.assertEqual(os.readlink(dst), os.readlink(src_link))
    dst_stat = os.lstat(dst)
    if os.utime in os.supports_follow_symlinks:
        for attr in ('st_atime', 'st_mtime'):
            # The modification times may be truncated in the new file.
            self.assertLessEqual(getattr(src_link_stat, attr),
                                 getattr(dst_stat, attr) + 1)
    if has_lchmod:
        # The copy carries the link's mode, not the target file's.
        self.assertEqual(src_link_stat.st_mode, dst_stat.st_mode)
        self.assertNotEqual(src_stat.st_mode, dst_stat.st_mode)
    if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
        self.assertEqual(src_link_stat.st_flags, dst_stat.st_flags)
625
@support.skip_unless_xattr
def test_copy2_xattr(self):
    # copy2() must carry an extended attribute over to the new file.
    tmp_dir = self.mkdtemp()
    src, dst = [os.path.join(tmp_dir, name) for name in ('foo', 'bar')]
    write_file(src, 'foo')
    os.setxattr(src, 'user.foo', b'42')
    shutil.copy2(src, dst)
    expected = os.getxattr(src, 'user.foo')
    self.assertEqual(expected, os.getxattr(dst, 'user.foo'))
    os.remove(dst)
638
@support.skip_unless_symlink
def test_copyfile_symlinks(self):
    tmp_dir = self.mkdtemp()
    src, dst, dst_link, link = [
        os.path.join(tmp_dir, name)
        for name in ('src', 'dst', 'dst_link', 'link')
    ]
    write_file(src, 'foo')
    os.symlink(src, link)

    # don't follow: the destination is itself a link to the same target
    shutil.copyfile(link, dst_link, follow_symlinks=False)
    self.assertTrue(os.path.islink(dst_link))
    self.assertEqual(os.readlink(link), os.readlink(dst_link))

    # follow (the default): the destination is not a link
    shutil.copyfile(link, dst)
    self.assertFalse(os.path.islink(dst))
655
def test_rmtree_uses_safe_fd_version_if_available(self):
    # Recompute here the platform condition that shutil's
    # _use_fd_functions flag is expected to reflect.
    needs_dir_fd = {os.open, os.stat, os.unlink, os.rmdir}
    fd_capable = (needs_dir_fd.issubset(os.supports_dir_fd) and
                  os.listdir in os.supports_fd and
                  os.stat in os.supports_follow_symlinks)

    if not fd_capable:
        self.assertFalse(shutil._use_fd_functions)
        self.assertFalse(shutil.rmtree.avoids_symlink_attacks)
        return

    self.assertTrue(shutil._use_fd_functions)
    self.assertTrue(shutil.rmtree.avoids_symlink_attacks)
    victim = os.path.join(self.mkdtemp(), 'a')
    os.mkdir(victim)

    class Called(Exception):
        pass

    def _raiser(*args, **kwargs):
        raise Called

    # Swap in a sentinel implementation: if rmtree() delegates to
    # _rmtree_safe_fd, the sentinel exception propagates out of it.
    real_rmtree = shutil._rmtree_safe_fd
    try:
        shutil._rmtree_safe_fd = _raiser
        with self.assertRaises(Called):
            shutil.rmtree(victim)
    finally:
        shutil._rmtree_safe_fd = real_rmtree
Hynek Schlawack2100b422012-06-23 20:28:32 +0200679
def test_rmtree_dont_delete_file(self):
    # When called on a file instead of a directory, don't delete it.
    fd, file_path = tempfile.mkstemp()
    os.close(fd)
    with self.assertRaises(NotADirectoryError):
        shutil.rmtree(file_path)
    # os.remove() would fail here if rmtree had deleted the file.
    os.remove(file_path)
686
def test_copytree_simple(self):
    # Source tree: test.txt plus test_dir/test.txt.  The destination is
    # a not-yet-existing 'destination' directory inside a second
    # temporary directory.
    src_dir = tempfile.mkdtemp()
    dst_parent = tempfile.mkdtemp()
    dst_dir = os.path.join(dst_parent, 'destination')
    self.addCleanup(shutil.rmtree, src_dir)
    self.addCleanup(shutil.rmtree, dst_parent)
    write_file((src_dir, 'test.txt'), '123')
    os.mkdir(os.path.join(src_dir, 'test_dir'))
    write_file((src_dir, 'test_dir', 'test.txt'), '456')

    shutil.copytree(src_dir, dst_dir)

    # The copy must have the same structure ...
    top_file = os.path.join(dst_dir, 'test.txt')
    sub_dir = os.path.join(dst_dir, 'test_dir')
    nested_file = os.path.join(sub_dir, 'test.txt')
    self.assertTrue(os.path.isfile(top_file))
    self.assertTrue(os.path.isdir(sub_dir))
    self.assertTrue(os.path.isfile(nested_file))
    # ... and the same file contents.
    self.assertEqual(read_file((dst_dir, 'test.txt')), '123')
    self.assertEqual(read_file((dst_dir, 'test_dir', 'test.txt')), '456')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000705
def test_copytree_dirs_exist_ok(self):
    # Copy onto a destination that already exists and already contains a
    # same-named directory and file.  With dirs_exist_ok=True the copy
    # succeeds and overwrites the file; with dirs_exist_ok=False it
    # raises FileExistsError.
    src_dir = tempfile.mkdtemp()
    dst_dir = tempfile.mkdtemp()
    for tree in (src_dir, dst_dir):
        self.addCleanup(shutil.rmtree, tree)

    write_file((src_dir, 'nonexisting.txt'), '123')
    for tree in (src_dir, dst_dir):
        os.mkdir(os.path.join(tree, 'existing_dir'))
    write_file((dst_dir, 'existing_dir', 'existing.txt'), 'will be replaced')
    write_file((src_dir, 'existing_dir', 'existing.txt'), 'has been replaced')

    shutil.copytree(src_dir, dst_dir, dirs_exist_ok=True)

    added_file = os.path.join(dst_dir, 'nonexisting.txt')
    merged_dir = os.path.join(dst_dir, 'existing_dir')
    replaced_file = os.path.join(dst_dir, 'existing_dir', 'existing.txt')
    self.assertTrue(os.path.isfile(added_file))
    self.assertTrue(os.path.isdir(merged_dir))
    self.assertTrue(os.path.isfile(replaced_file))
    self.assertEqual(read_file((dst_dir, 'nonexisting.txt')), '123')
    self.assertEqual(read_file((dst_dir, 'existing_dir', 'existing.txt')),
                     'has been replaced')

    self.assertRaises(FileExistsError, shutil.copytree,
                      src_dir, dst_dir, dirs_exist_ok=False)
730
@support.skip_unless_symlink
def test_copytree_symlinks(self):
    # copytree(symlinks=True) must recreate a symlink as a symlink with
    # the same target and, where the platform can set them on links
    # (os.lchmod / os.lchflags), the same mode and flags.
    tmp_dir = self.mkdtemp()
    src_dir = os.path.join(tmp_dir, 'src')
    dst_dir = os.path.join(tmp_dir, 'dst')
    sub_dir = os.path.join(src_dir, 'sub')
    os.mkdir(src_dir)
    os.mkdir(sub_dir)
    write_file((src_dir, 'file.txt'), 'foo')
    link_target = os.path.join(src_dir, 'file.txt')
    src_link = os.path.join(sub_dir, 'link')
    # Build the destination path component-wise (no hard-coded '/') and
    # reuse it for every check below.
    dst_link = os.path.join(dst_dir, 'sub', 'link')
    os.symlink(link_target, src_link)
    if hasattr(os, 'lchmod'):
        os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
    if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
        os.lchflags(src_link, stat.UF_NODUMP)
    src_stat = os.lstat(src_link)
    shutil.copytree(src_dir, dst_dir, symlinks=True)
    self.assertTrue(os.path.islink(dst_link))
    self.assertEqual(os.readlink(dst_link), link_target)
    dst_stat = os.lstat(dst_link)
    if hasattr(os, 'lchmod'):
        self.assertEqual(dst_stat.st_mode, src_stat.st_mode)
    if hasattr(os, 'lchflags'):
        self.assertEqual(dst_stat.st_flags, src_stat.st_flags)
758
def test_copytree_with_exclude(self):
    # Exercise the ``ignore`` argument of copytree() with both
    # shutil.ignore_patterns() and a hand-written callable.
    join = os.path.join
    exists = os.path.exists

    # creating data
    src_dir = tempfile.mkdtemp()
    self.addCleanup(shutil.rmtree, src_dir)
    dst_parent = tempfile.mkdtemp()
    self.addCleanup(shutil.rmtree, dst_parent)
    dst_dir = join(dst_parent, 'destination')

    write_file((src_dir, 'test.txt'), '123')
    write_file((src_dir, 'test.tmp'), '123')
    os.mkdir(join(src_dir, 'test_dir'))
    write_file((src_dir, 'test_dir', 'test.txt'), '456')
    os.mkdir(join(src_dir, 'test_dir2'))
    write_file((src_dir, 'test_dir2', 'test.txt'), '456')
    os.mkdir(join(src_dir, 'test_dir2', 'subdir'))
    os.mkdir(join(src_dir, 'test_dir2', 'subdir2'))
    write_file((src_dir, 'test_dir2', 'subdir', 'test.txt'), '456')
    write_file((src_dir, 'test_dir2', 'subdir2', 'test.py'), '456')

    # testing glob-like patterns
    try:
        patterns = shutil.ignore_patterns('*.tmp', 'test_dir2')
        shutil.copytree(src_dir, dst_dir, ignore=patterns)
        # checking the result: some elements should not be copied
        self.assertTrue(exists(join(dst_dir, 'test.txt')))
        self.assertFalse(exists(join(dst_dir, 'test.tmp')))
        self.assertFalse(exists(join(dst_dir, 'test_dir2')))
    finally:
        # dst_dir must be gone before the next copytree() call
        shutil.rmtree(dst_dir)
    try:
        patterns = shutil.ignore_patterns('*.tmp', 'subdir*')
        shutil.copytree(src_dir, dst_dir, ignore=patterns)
        # checking the result: some elements should not be copied
        self.assertFalse(exists(join(dst_dir, 'test.tmp')))
        self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2')))
        self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
    finally:
        shutil.rmtree(dst_dir)

    # testing callable-style
    def _filter(src, names):
        # Ignore directories named exactly 'subdir' and files whose
        # extension is '.py'.  Compare the entry name itself (str.split()
        # would split on whitespace, not on path separators) and use
        # equality for the extension (``x in ('.py')`` is a substring
        # test that also matches the empty extension).
        ignored = []
        for name in names:
            path = join(src, name)
            if os.path.isdir(path) and name == 'subdir':
                ignored.append(name)
            elif os.path.splitext(name)[-1] == '.py':
                ignored.append(name)
        return ignored

    try:
        shutil.copytree(src_dir, dst_dir, ignore=_filter)

        # checking the result: some elements should not be copied
        self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2',
                                     'test.py')))
        self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
        # ...while entries the filter does not name must still be copied
        self.assertTrue(exists(join(dst_dir, 'test.txt')))
        self.assertTrue(exists(join(dst_dir, 'test_dir2', 'subdir2')))
    finally:
        shutil.rmtree(dst_dir)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000823
def test_copytree_retains_permissions(self):
    # After copytree(), st_mode of the root, of each file and of a
    # sub-directory must equal st_mode of the corresponding source entry.
    tmp_dir = tempfile.mkdtemp()
    src_dir = os.path.join(tmp_dir, 'source')
    os.mkdir(src_dir)
    dst_dir = os.path.join(tmp_dir, 'destination')
    self.addCleanup(shutil.rmtree, tmp_dir)

    os.chmod(src_dir, 0o777)
    for name, content, mode in (('permissive.txt', '123', 0o777),
                                ('restrictive.txt', '456', 0o600)):
        write_file((src_dir, name), content)
        os.chmod(os.path.join(src_dir, name), mode)
    restrictive_subdir = tempfile.mkdtemp(dir=src_dir)
    os.chmod(restrictive_subdir, 0o600)

    shutil.copytree(src_dir, dst_dir)

    self.assertEqual(os.stat(src_dir).st_mode, os.stat(dst_dir).st_mode)
    for name in ('permissive.txt', 'restrictive.txt'):
        src_mode = os.stat(os.path.join(src_dir, name)).st_mode
        dst_mode = os.stat(os.path.join(dst_dir, name)).st_mode
        self.assertEqual(src_mode, dst_mode)
    subdir_name = os.path.split(restrictive_subdir)[1]
    restrictive_subdir_dst = os.path.join(dst_dir, subdir_name)
    self.assertEqual(os.stat(restrictive_subdir).st_mode,
                     os.stat(restrictive_subdir_dst).st_mode)
849
@unittest.mock.patch('os.chmod')
def test_copytree_winerror(self, mock_patch):
    # When copying to VFAT, copystat() raises OSError. On Windows, the
    # exception object has a meaningful 'winerror' attribute, but not
    # on other operating systems. Do not assume 'winerror' is set.
    src_dir = tempfile.mkdtemp()
    dst_parent = tempfile.mkdtemp()
    dst_dir = os.path.join(dst_parent, 'destination')
    self.addCleanup(shutil.rmtree, src_dir)
    self.addCleanup(shutil.rmtree, dst_parent)

    # Every os.chmod() call from here on fails with PermissionError.
    mock_patch.side_effect = PermissionError('ka-boom')
    self.assertRaises(shutil.Error, shutil.copytree, src_dir, dst_dir)
863
@unittest.skipIf(os.name == 'nt', 'temporarily disabled on Windows')
@unittest.skipUnless(hasattr(os, 'link'), 'requires os.link')
def test_dont_copy_file_onto_link_to_itself(self):
    # bug 851123.
    # copyfile() onto a hard link of the source must raise SameFileError
    # and leave the source contents untouched.
    os.mkdir(TESTFN)
    src = os.path.join(TESTFN, 'cheese')
    dst = os.path.join(TESTFN, 'shop')
    try:
        with open(src, 'w') as src_file:
            src_file.write('cheddar')
        try:
            os.link(src, dst)
        except PermissionError as e:
            self.skipTest('os.link(): %s' % e)
        with self.assertRaises(shutil.SameFileError):
            shutil.copyfile(src, dst)
        with open(src, 'r') as src_file:
            content = src_file.read()
            self.assertEqual(content, 'cheddar')
        os.remove(dst)
    finally:
        shutil.rmtree(TESTFN, ignore_errors=True)
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000884
@support.skip_unless_symlink
def test_dont_copy_file_onto_symlink_to_itself(self):
    # bug 851123.
    # copyfile() onto a symlink that resolves to the source must raise
    # SameFileError and leave the source contents untouched.
    os.mkdir(TESTFN)
    src = os.path.join(TESTFN, 'cheese')
    dst = os.path.join(TESTFN, 'shop')
    try:
        with open(src, 'w') as src_file:
            src_file.write('cheddar')
        # Using `src` here would mean we end up with a symlink pointing
        # to TESTFN/TESTFN/cheese, while it should point at
        # TESTFN/cheese.
        os.symlink('cheese', dst)
        with self.assertRaises(shutil.SameFileError):
            shutil.copyfile(src, dst)
        with open(src, 'r') as src_file:
            content = src_file.read()
            self.assertEqual(content, 'cheddar')
        os.remove(dst)
    finally:
        shutil.rmtree(TESTFN, ignore_errors=True)
Johannes Gijsbers68128712004-08-14 13:57:08 +0000904
@support.skip_unless_symlink
def test_rmtree_on_symlink(self):
    # bug 1669.
    # rmtree() called on a symlink raises OSError; with
    # ignore_errors=True the same call returns silently.
    os.mkdir(TESTFN)
    try:
        real_dir = os.path.join(TESTFN, 'cheese')
        link = os.path.join(TESTFN, 'shop')
        os.mkdir(real_dir)
        os.symlink(real_dir, link)
        with self.assertRaises(OSError):
            shutil.rmtree(link)
        shutil.rmtree(link, ignore_errors=True)
    finally:
        shutil.rmtree(TESTFN, ignore_errors=True)
918
# Issue #3002: copyfile and copytree block indefinitely on named pipes
@unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
def test_copyfile_named_pipe(self):
    try:
        os.mkfifo(TESTFN)
    except PermissionError as e:
        self.skipTest('os.mkfifo(): %s' % e)
    try:
        # The FIFO is rejected both as the source and as the destination.
        for src, dst in ((TESTFN, TESTFN2), (__file__, TESTFN)):
            with self.assertRaises(shutil.SpecialFileError):
                shutil.copyfile(src, dst)
    finally:
        os.remove(TESTFN)
Antoine Pitrou7fff0962009-05-01 21:09:44 +0000933
@unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
@support.skip_unless_symlink
def test_copytree_named_pipe(self):
    # copytree() over a tree containing a FIFO must raise shutil.Error
    # carrying exactly one (src, dst, message) entry for the pipe.
    os.mkdir(TESTFN)
    try:
        subdir = os.path.join(TESTFN, "subdir")
        os.mkdir(subdir)
        pipe = os.path.join(subdir, "mypipe")
        try:
            os.mkfifo(pipe)
        except PermissionError as e:
            self.skipTest('os.mkfifo(): %s' % e)
        try:
            shutil.copytree(TESTFN, TESTFN2)
        except shutil.Error as e:
            errors = e.args[0]
            self.assertEqual(len(errors), 1)
            src, dst, error_msg = errors[0]
            expected_msg = "`%s` is a named pipe" % pipe
            self.assertEqual(expected_msg, error_msg)
        else:
            self.fail("shutil.Error should have been raised")
    finally:
        for tree in (TESTFN, TESTFN2):
            shutil.rmtree(tree, ignore_errors=True)
Antoine Pitrou7fff0962009-05-01 21:09:44 +0000958
def test_copytree_special_func(self):
    # copytree() must invoke the supplied copy_function once per file:
    # the source tree holds two files, so two calls are expected.
    src_dir = self.mkdtemp()
    dst_dir = os.path.join(self.mkdtemp(), 'destination')
    write_file((src_dir, 'test.txt'), '123')
    os.mkdir(os.path.join(src_dir, 'test_dir'))
    write_file((src_dir, 'test_dir', 'test.txt'), '456')

    calls = []

    def _record(src, dst):
        # Stand-in copy function: remember the arguments, copy nothing.
        calls.append((src, dst))

    shutil.copytree(src_dir, dst_dir, copy_function=_record)
    self.assertEqual(len(calls), 2)
Tarek Ziadé5340db32010-04-19 22:30:51 +0000973
@support.skip_unless_symlink
def test_copytree_dangling_symlinks(self):
    src_dir = self.mkdtemp()
    os.symlink('IDONTEXIST', os.path.join(src_dir, 'test.txt'))
    os.mkdir(os.path.join(src_dir, 'test_dir'))
    write_file((src_dir, 'test_dir', 'test.txt'), '456')

    # a dangling symlink raises an error at the end
    dst_dir = os.path.join(self.mkdtemp(), 'destination')
    with self.assertRaises(Error):
        shutil.copytree(src_dir, dst_dir)

    # a dangling symlink is ignored with the proper flag
    dst_dir = os.path.join(self.mkdtemp(), 'destination2')
    shutil.copytree(src_dir, dst_dir, ignore_dangling_symlinks=True)
    copied_names = os.listdir(dst_dir)
    self.assertNotIn('test.txt', copied_names)

    # a dangling symlink is copied if symlinks=True
    dst_dir = os.path.join(self.mkdtemp(), 'destination3')
    shutil.copytree(src_dir, dst_dir, symlinks=True)
    copied_names = os.listdir(dst_dir)
    self.assertIn('test.txt', copied_names)
994
@support.skip_unless_symlink
def test_copytree_symlink_dir(self):
    # A symlink to a directory is copied as a real directory when
    # symlinks=False and as a symlink when symlinks=True; in both cases
    # the directory contents are reachable through the copied entry.
    src_dir = self.mkdtemp()
    dst_dir = os.path.join(self.mkdtemp(), 'destination')
    real_dir = os.path.join(src_dir, 'real_dir')
    os.mkdir(real_dir)
    # create an empty file inside the real directory
    with open(os.path.join(src_dir, 'real_dir', 'test.txt'), 'w'):
        pass
    os.symlink(real_dir,
               os.path.join(src_dir, 'link_to_dir'),
               target_is_directory=True)

    shutil.copytree(src_dir, dst_dir, symlinks=False)
    copied_link = os.path.join(dst_dir, 'link_to_dir')
    self.assertFalse(os.path.islink(copied_link))
    self.assertIn('test.txt', os.listdir(copied_link))

    dst_dir = os.path.join(self.mkdtemp(), 'destination2')
    shutil.copytree(src_dir, dst_dir, symlinks=True)
    copied_link = os.path.join(dst_dir, 'link_to_dir')
    self.assertTrue(os.path.islink(copied_link))
    self.assertIn('test.txt', os.listdir(copied_link))
1014
def _copy_file(self, method):
    # Create 'test.txt' in a fresh temporary directory, call
    # method(source_file, other_temp_dir) and return the pair
    # (source path, path the copy is expected to have).
    fname = 'test.txt'
    src_dir = self.mkdtemp()
    write_file((src_dir, fname), 'xxx')
    src_path = os.path.join(src_dir, fname)
    dst_dir = self.mkdtemp()
    method(src_path, dst_dir)
    dst_path = os.path.join(dst_dir, fname)
    return (src_path, dst_path)
1024
@unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod')
def test_copy(self):
    # Ensure that the copied file exists and has the same mode bits.
    original, duplicate = self._copy_file(shutil.copy)
    self.assertTrue(os.path.exists(duplicate))
    original_mode = os.stat(original).st_mode
    duplicate_mode = os.stat(duplicate).st_mode
    self.assertEqual(original_mode, duplicate_mode)
1031
@unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod')
@unittest.skipUnless(hasattr(os, 'utime'), 'requires os.utime')
def test_copy2(self):
    # Ensure that the copied file exists and has the same mode and
    # modification time bits.
    original, duplicate = self._copy_file(shutil.copy2)
    self.assertTrue(os.path.exists(duplicate))
    original_stat = os.stat(original)
    duplicate_stat = os.stat(duplicate)
    self.assertEqual(original_stat.st_mode, duplicate_stat.st_mode)
    for attr in ('st_atime', 'st_mtime'):
        # The modification times may be truncated in the new file.
        source_time = getattr(original_stat, attr)
        copied_time = getattr(duplicate_stat, attr)
        self.assertLessEqual(source_time, copied_time + 1)
    if hasattr(os, 'chflags') and hasattr(original_stat, 'st_flags'):
        self.assertEqual(original_stat.st_flags, duplicate_stat.st_flags)
1049
@support.requires_zlib
def test_make_tarball(self):
    # creating something to tar
    root_dir, base_dir = self._create_files('')

    tmpdir2 = self.mkdtemp()
    # force shutil to create the directory
    os.rmdir(tmpdir2)
    # working with relative paths
    work_dir, archive_dir = os.path.split(tmpdir2)
    rel_base_name = os.path.join(archive_dir, 'archive')
    expected_names = ['.', './sub', './sub2',
                      './file1', './file2', './sub/file3']

    with support.change_cwd(work_dir):
        base_name = os.path.abspath(rel_base_name)
        tarball = make_archive(rel_base_name, 'gztar', root_dir, '.')

    # check if the compressed tarball was created
    self.assertEqual(tarball, base_name + '.tar.gz')
    self.assertTrue(os.path.isfile(tarball))
    self.assertTrue(tarfile.is_tarfile(tarball))
    with tarfile.open(tarball, 'r:gz') as tf:
        self.assertCountEqual(tf.getnames(), expected_names)

    # trying an uncompressed one
    with support.change_cwd(work_dir):
        tarball = make_archive(rel_base_name, 'tar', root_dir, '.')
    self.assertEqual(tarball, base_name + '.tar')
    self.assertTrue(os.path.isfile(tarball))
    self.assertTrue(tarfile.is_tarfile(tarball))
    with tarfile.open(tarball, 'r') as tf:
        self.assertCountEqual(tf.getnames(), expected_names)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001085
def _tarinfo(self, path):
    # Return the member names of the tar archive at *path* as a sorted
    # tuple.
    with tarfile.open(path) as archive:
        member_names = sorted(archive.getnames())
    return tuple(member_names)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001091
def _create_files(self, base_dir='dist'):
    # creating something to tar
    # Layout under root_dir/base_dir: file1, file2, sub/file3 and an
    # empty sub2/.  When base_dir is non-empty an extra file 'outer' is
    # written next to it.  Returns (root_dir, base_dir).
    root_dir = self.mkdtemp()
    dist = os.path.join(root_dir, base_dir)
    os.makedirs(dist, exist_ok=True)
    for fname in ('file1', 'file2'):
        write_file((dist, fname), 'xxx')
    os.mkdir(os.path.join(dist, 'sub'))
    write_file((dist, 'sub', 'file3'), 'xxx')
    os.mkdir(os.path.join(dist, 'sub2'))
    if base_dir:
        write_file((root_dir, 'outer'), 'xxx')
    return root_dir, base_dir
Tarek Ziadé396fad72010-02-23 05:30:31 +00001105
@support.requires_zlib
@unittest.skipUnless(shutil.which('tar'),
                     'Need the tar command to run')
def test_tarfile_vs_tar(self):
    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')
    gz_name = base_name + '.tar.gz'
    tar_name = base_name + '.tar'

    tarball = make_archive(base_name, 'gztar', root_dir, base_dir)

    # check if the compressed tarball was created
    self.assertEqual(tarball, gz_name)
    self.assertTrue(os.path.isfile(tarball))

    # now create another tarball using `tar`
    tarball2 = os.path.join(root_dir, 'archive2.tar')
    tar_cmd = ['tar', '-cf', 'archive2.tar', base_dir]
    subprocess.check_call(tar_cmd, cwd=root_dir,
                          stdout=subprocess.DEVNULL)

    self.assertTrue(os.path.isfile(tarball2))
    # let's compare both tarballs
    self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2))

    # trying an uncompressed one
    tarball = make_archive(base_name, 'tar', root_dir, base_dir)
    self.assertEqual(tarball, tar_name)
    self.assertTrue(os.path.isfile(tarball))

    # now for a dry_run
    # NOTE(review): the file checked below was already written by the
    # non-dry-run call just above.
    tarball = make_archive(base_name, 'tar', root_dir, base_dir,
                           dry_run=True)
    self.assertEqual(tarball, tar_name)
    self.assertTrue(os.path.isfile(tarball))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001138
@support.requires_zlib
def test_make_zipfile(self):
    # creating something to zip
    root_dir, base_dir = self._create_files()

    tmpdir2 = self.mkdtemp()
    # force shutil to create the directory
    os.rmdir(tmpdir2)
    # working with relative paths
    work_dir, archive_dir = os.path.split(tmpdir2)
    rel_base_name = os.path.join(archive_dir, 'archive')
    dist_members = ['dist/', 'dist/sub/', 'dist/sub2/',
                    'dist/file1', 'dist/file2', 'dist/sub/file3']

    # Without base_dir the whole root_dir is archived, 'outer' included.
    with support.change_cwd(work_dir):
        base_name = os.path.abspath(rel_base_name)
        res = make_archive(rel_base_name, 'zip', root_dir)

    self.assertEqual(res, base_name + '.zip')
    self.assertTrue(os.path.isfile(res))
    self.assertTrue(zipfile.is_zipfile(res))
    with zipfile.ZipFile(res) as zf:
        self.assertCountEqual(zf.namelist(), dist_members + ['outer'])

    # With base_dir only that sub-tree is archived.
    with support.change_cwd(work_dir):
        base_name = os.path.abspath(rel_base_name)
        res = make_archive(rel_base_name, 'zip', root_dir, base_dir)

    self.assertEqual(res, base_name + '.zip')
    self.assertTrue(os.path.isfile(res))
    self.assertTrue(zipfile.is_zipfile(res))
    with zipfile.ZipFile(res) as zf:
        self.assertCountEqual(zf.namelist(), dist_members)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001175
@support.requires_zlib
@unittest.skipUnless(shutil.which('zip'),
                     'Need the zip command to run')
def test_zipfile_vs_zip(self):
    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')
    archive = make_archive(base_name, 'zip', root_dir, base_dir)

    # check if ZIP file was created
    self.assertEqual(archive, base_name + '.zip')
    self.assertTrue(os.path.isfile(archive))

    # now create another ZIP file using `zip`
    archive2 = os.path.join(root_dir, 'archive2.zip')
    zip_cmd = ['zip', '-q', '-r', 'archive2.zip', base_dir]
    subprocess.check_call(zip_cmd, cwd=root_dir,
                          stdout=subprocess.DEVNULL)

    self.assertTrue(os.path.isfile(archive2))
    # let's compare both ZIP files
    member_lists = []
    for zip_path in (archive, archive2):
        with zipfile.ZipFile(zip_path) as zf:
            member_lists.append(sorted(zf.namelist()))
    self.assertEqual(member_lists[0], member_lists[1])
Tarek Ziadé396fad72010-02-23 05:30:31 +00001201
@support.requires_zlib
@unittest.skipUnless(shutil.which('unzip'),
                     'Need the unzip command to run')
def test_unzip_zipfile(self):
    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')
    archive = make_archive(base_name, 'zip', root_dir, base_dir)

    # check if ZIP file was created
    self.assertEqual(archive, base_name + '.zip')
    self.assertTrue(os.path.isfile(archive))

    # now check the ZIP file using `unzip -t`
    zip_cmd = ['unzip', '-t', archive]
    with support.change_cwd(root_dir):
        try:
            subprocess.check_output(zip_cmd, stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as exc:
            details = exc.output.decode(errors="replace")
            # Skip rather than fail when this unzip has no -t option.
            if 'unrecognized option: t' in details:
                self.skipTest("unzip doesn't support -t")
            self.fail("{}\n\n**Unzip Output**\n{}".format(exc, details))
1225
def test_make_archive(self):
    # An unknown archive format name ('xxx') must raise ValueError.
    base_name = os.path.join(self.mkdtemp(), 'archive')
    with self.assertRaises(ValueError):
        make_archive(base_name, 'xxx')
1230
@support.requires_zlib
def test_make_archive_owner_group(self):
    # testing make_archive with owner and group, with various combinations
    # this works even if there's not gid/uid support
    if UID_GID_SUPPORT:
        group = grp.getgrgid(0)[0]
        owner = pwd.getpwuid(0)[0]
    else:
        group = owner = 'root'

    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')

    # (format, extra keyword arguments) for each make_archive() call
    cases = (
        ('zip', {'owner': owner, 'group': group}),
        ('zip', {}),
        ('tar', {'owner': owner, 'group': group}),
        ('tar', {'owner': 'kjhkjhkjg', 'group': 'oihohoh'}),
    )
    for archive_format, extra in cases:
        res = make_archive(base_name, archive_format, root_dir, base_dir,
                           **extra)
        self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001257
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001258
@support.requires_zlib
@unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
def test_tarfile_root_owner(self):
    # Archive with the names that uid 0 / gid 0 resolve to and check
    # that every member is then recorded with uid 0 and gid 0.
    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')
    group = grp.getgrgid(0)[0]
    owner = pwd.getpwuid(0)[0]
    with support.change_cwd(root_dir):
        archive_name = make_archive(base_name, 'gztar', root_dir, 'dist',
                                    owner=owner, group=group)

    # check if the compressed tarball was created
    self.assertTrue(os.path.isfile(archive_name))

    # now checks the rights
    with tarfile.open(archive_name) as archive:
        for member in archive.getmembers():
            self.assertEqual(member.uid, 0)
            self.assertEqual(member.gid, 0)
1281
def test_make_archive_cwd(self):
    # The working directory must be unchanged after make_archive() even
    # when the registered archiver function raises.
    original_cwd = os.getcwd()

    def _breaks(*args, **kw):
        raise RuntimeError()

    register_archive_format('xxx', _breaks, [], 'xxx file')
    try:
        try:
            make_archive('xxx', 'xxx', root_dir=self.mkdtemp())
        except Exception:
            # The failure itself is expected; only the cwd matters here.
            pass
        self.assertEqual(os.getcwd(), original_cwd)
    finally:
        unregister_archive_format('xxx')
1296
def test_make_tarfile_in_curdir(self):
    # Issue #21280
    root_dir = self.mkdtemp()
    with support.change_cwd(root_dir):
        archive = make_archive('test', 'tar')
        self.assertEqual(archive, 'test.tar')
        self.assertTrue(os.path.isfile('test.tar'))
1303
@support.requires_zlib
def test_make_zipfile_in_curdir(self):
    # Issue #21280
    root_dir = self.mkdtemp()
    with support.change_cwd(root_dir):
        archive = make_archive('test', 'zip')
        self.assertEqual(archive, 'test.zip')
        self.assertTrue(os.path.isfile('test.zip'))
1311
Tarek Ziadé396fad72010-02-23 05:30:31 +00001312 def test_register_archive_format(self):
1313
1314 self.assertRaises(TypeError, register_archive_format, 'xxx', 1)
1315 self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
1316 1)
1317 self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
1318 [(1, 2), (1, 2, 3)])
1319
1320 register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file')
1321 formats = [name for name, params in get_archive_formats()]
1322 self.assertIn('xxx', formats)
1323
1324 unregister_archive_format('xxx')
1325 formats = [name for name, params in get_archive_formats()]
1326 self.assertNotIn('xxx', formats)
1327
def check_unpack_archive(self, format):
    # Run the unpack round-trip three times: with paths passed through
    # unchanged, wrapped in pathlib.Path, and wrapped in FakePath.
    for converter in (lambda path: path, pathlib.Path, FakePath):
        self.check_unpack_archive_with_converter(format, converter)
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001332
def check_unpack_archive_with_converter(self, format, converter):
    # Archive base_dir, unpack the archive twice (format auto-detected, then
    # passed explicitly) and compare each extracted tree with the listing of
    # the source tree.  `converter` wraps every path given to
    # unpack_archive().
    root_dir, base_dir = self._create_files()
    expected = rlistdir(root_dir)
    # 'outer' is dropped from the expected listing -- presumably it lies
    # outside base_dir and so is not archived; see _create_files().
    expected.remove('outer')

    archive_base = os.path.join(self.mkdtemp(), 'archive')
    archive_path = make_archive(archive_base, format, root_dir, base_dir)

    # First pass: let unpack_archive() pick the format itself.
    # Second pass: same thing, this time with the format specified.
    for extra_kwargs in ({}, {'format': format}):
        target_dir = self.mkdtemp()
        unpack_archive(converter(archive_path), converter(target_dir),
                       **extra_kwargs)
        self.assertEqual(rlistdir(target_dir), expected)

    with self.assertRaises(shutil.ReadError):
        unpack_archive(converter(TESTFN))
    with self.assertRaises(ValueError):
        unpack_archive(converter(TESTFN), format='xxx')
Nick Coghlanabf202d2011-03-16 13:52:20 -04001353
def test_unpack_archive_tar(self):
    # Round-trip an uncompressed tar archive.
    self.check_unpack_archive(format='tar')
1356
@support.requires_zlib
def test_unpack_archive_gztar(self):
    # Round-trip a 'gztar' archive.
    self.check_unpack_archive(format='gztar')
1360
@support.requires_bz2
def test_unpack_archive_bztar(self):
    # Round-trip a 'bztar' archive.
    self.check_unpack_archive(format='bztar')
1364
@support.requires_lzma
@unittest.skipIf(AIX and not _maxdataOK(),
                 "AIX MAXDATA must be 0x20000000 or larger")
def test_unpack_archive_xztar(self):
    # Round-trip an 'xztar' archive.
    self.check_unpack_archive(format='xztar')
1369
@support.requires_zlib
def test_unpack_archive_zip(self):
    # Round-trip a zip archive.
    self.check_unpack_archive(format='zip')
1373
Martin Pantereb995702016-07-28 01:11:04 +00001374 def test_unpack_registry(self):
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001375
1376 formats = get_unpack_formats()
1377
1378 def _boo(filename, extract_dir, extra):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001379 self.assertEqual(extra, 1)
1380 self.assertEqual(filename, 'stuff.boo')
1381 self.assertEqual(extract_dir, 'xx')
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001382
1383 register_unpack_format('Boo', ['.boo', '.b2'], _boo, [('extra', 1)])
1384 unpack_archive('stuff.boo', 'xx')
1385
1386 # trying to register a .boo unpacker again
1387 self.assertRaises(RegistryError, register_unpack_format, 'Boo2',
1388 ['.boo'], _boo)
1389
1390 # should work now
1391 unregister_unpack_format('Boo')
1392 register_unpack_format('Boo2', ['.boo'], _boo)
1393 self.assertIn(('Boo2', ['.boo'], ''), get_unpack_formats())
1394 self.assertNotIn(('Boo', ['.boo'], ''), get_unpack_formats())
1395
1396 # let's leave a clean state
1397 unregister_unpack_format('Boo2')
Ezio Melottib3aedd42010-11-20 19:04:17 +00001398 self.assertEqual(get_unpack_formats(), formats)
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001399
Giampaolo Rodola'210e7ca2011-07-01 13:55:36 +02001400 @unittest.skipUnless(hasattr(shutil, 'disk_usage'),
1401 "disk_usage not available on this platform")
1402 def test_disk_usage(self):
Gregory P. Smith529746c2017-07-06 17:11:27 -07001403 usage = shutil.disk_usage(os.path.dirname(__file__))
Victor Stinnerdc525f42018-12-11 12:05:21 +01001404 for attr in ('total', 'used', 'free'):
1405 self.assertIsInstance(getattr(usage, attr), int)
Éric Araujo2ee61882011-07-02 16:45:45 +02001406 self.assertGreater(usage.total, 0)
1407 self.assertGreater(usage.used, 0)
1408 self.assertGreaterEqual(usage.free, 0)
1409 self.assertGreaterEqual(usage.total, usage.used)
1410 self.assertGreater(usage.total, usage.free)
Giampaolo Rodola'210e7ca2011-07-01 13:55:36 +02001411
Victor Stinnerdc525f42018-12-11 12:05:21 +01001412 # bpo-32557: Check that disk_usage() also accepts a filename
1413 shutil.disk_usage(__file__)
1414
@unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
@unittest.skipUnless(hasattr(os, 'chown'), 'requires os.chown')
def test_chown(self):
    # Exercise shutil.chown() argument validation, then change ownership of
    # a file and a directory to the current uid/gid using ids and names.

    # cleaned-up automatically by TestShutil.tearDown method
    dirname = self.mkdtemp()
    filename = tempfile.mktemp(dir=dirname)
    write_file(filename, 'testing chown function')

    # Neither user nor group given.
    with self.assertRaises(ValueError):
        shutil.chown(filename)

    with self.assertRaises(LookupError):
        shutil.chown(filename, user='non-existing username')

    with self.assertRaises(LookupError):
        shutil.chown(filename, group='non-existing groupname')

    # bytes and float are not accepted as a user.
    with self.assertRaises(TypeError):
        shutil.chown(filename, b'spam')

    with self.assertRaises(TypeError):
        shutil.chown(filename, 3.14)

    uid = os.getuid()
    gid = os.getgid()

    def check_chown(path, uid=None, gid=None):
        # Stat the path under test.  The previous version always stat'ed
        # `filename`, so the directory checks below never looked at the
        # directory.
        s = os.stat(path)
        if uid is not None:
            self.assertEqual(uid, s.st_uid)
        if gid is not None:
            self.assertEqual(gid, s.st_gid)

    # Numeric ids, positional and keyword, for the file and then the
    # directory.
    for path in (filename, dirname):
        shutil.chown(path, uid, gid)
        check_chown(path, uid, gid)
        shutil.chown(path, uid)
        check_chown(path, uid)
        shutil.chown(path, user=uid)
        check_chown(path, uid)
        shutil.chown(path, group=gid)
        check_chown(path, gid=gid)

    # User and group given by name.
    user = pwd.getpwuid(uid)[0]
    group = grp.getgrgid(gid)[0]
    for path in (filename, dirname):
        shutil.chown(path, user, group)
        check_chown(path, uid, gid)
1473
def test_copy_return_value(self):
    # copy and copy2 both return their destination path.
    for copier in (shutil.copy, shutil.copy2):
        source_dir = self.mkdtemp()
        target_dir = self.mkdtemp()
        source = os.path.join(source_dir, 'foo')
        write_file(source, 'foo')

        # Destination is a directory: the result keeps the source basename.
        result = copier(source, target_dir)
        self.assertEqual(result, os.path.join(target_dir, 'foo'))

        # Destination is an explicit file path.
        explicit_target = os.path.join(target_dir, 'bar')
        result = copier(source, explicit_target)
        self.assertEqual(result, os.path.join(target_dir, 'bar'))
1485
def test_copyfile_return_value(self):
    # copyfile returns its destination path.
    src_dir = self.mkdtemp()
    dst_dir = self.mkdtemp()
    dst_file = os.path.join(dst_dir, 'bar')
    src_file = os.path.join(src_dir, 'foo')
    write_file(src_file, 'foo')
    rv = shutil.copyfile(src_file, dst_file)
    # Pin the actual return value: merely checking that `rv` exists would
    # also pass if copyfile() returned the source path.
    self.assertEqual(rv, dst_file)
    self.assertTrue(os.path.exists(rv))
    self.assertEqual(read_file(src_file), read_file(dst_file))
1496
def test_copyfile_same_file(self):
    # copyfile() should raise SameFileError if the source and destination
    # are the same.
    victim = os.path.join(self.mkdtemp(), 'foo')
    write_file(victim, 'foo')
    with self.assertRaises(SameFileError):
        shutil.copyfile(victim, victim)
    # But Error should work too, to stay backward compatible.
    with self.assertRaises(Error):
        shutil.copyfile(victim, victim)
    # Make sure file is not corrupted.
    self.assertEqual(read_file(victim), 'foo')
Hynek Schlawack48653762012-10-07 12:49:58 +02001508
def test_copytree_return_value(self):
    # copytree returns its destination path.
    source_dir = self.mkdtemp()
    target_dir = source_dir + "dest"
    # The destination is created by copytree() itself, so it needs its own
    # cleanup (errors ignored).
    self.addCleanup(shutil.rmtree, target_dir, True)
    write_file(os.path.join(source_dir, 'foo'), 'foo')
    result = shutil.copytree(source_dir, target_dir)
    self.assertEqual(['foo'], os.listdir(result))
1518
Christian Heimes9bd667a2008-01-20 15:14:11 +00001519
class TestWhich(unittest.TestCase):
    """Tests for shutil.which() against an executable temporary file.

    setUp() stores the pieces the tests use as instance attributes
    (dir, file, env_path, curdir, ext) so that a subclass can replace them
    after calling the base setUp().
    """

    def setUp(self):
        self.temp_dir = tempfile.mkdtemp(prefix="Tmp")
        self.addCleanup(shutil.rmtree, self.temp_dir, True)
        # Give the temp_file an ".exe" suffix for all.
        # It's needed on Windows and not harmful on other platforms.
        self.temp_file = tempfile.NamedTemporaryFile(dir=self.temp_dir,
                                                     prefix="Tmp",
                                                     suffix=".Exe")
        os.chmod(self.temp_file.name, stat.S_IXUSR)
        self.addCleanup(self.temp_file.close)
        self.dir, self.file = os.path.split(self.temp_file.name)
        self.env_path = self.dir
        self.curdir = os.curdir
        self.ext = ".EXE"

    def test_basic(self):
        # Given an EXE in a directory, it should be returned.
        rv = shutil.which(self.file, path=self.dir)
        self.assertEqual(rv, self.temp_file.name)

    def test_absolute_cmd(self):
        # When given the fully qualified path to an executable that exists,
        # it should be returned.
        rv = shutil.which(self.temp_file.name, path=self.temp_dir)
        self.assertEqual(rv, self.temp_file.name)

    def test_relative_cmd(self):
        # When given the relative path with a directory part to an executable
        # that exists, it should be returned.
        base_dir, tail_dir = os.path.split(self.dir)
        relpath = os.path.join(tail_dir, self.file)
        with support.change_cwd(path=base_dir):
            rv = shutil.which(relpath, path=self.temp_dir)
            self.assertEqual(rv, relpath)
        # But it shouldn't be searched in PATH directories (issue #16957).
        with support.change_cwd(path=self.dir):
            rv = shutil.which(relpath, path=base_dir)
            self.assertIsNone(rv)

    def test_cwd(self):
        # Issue #16957
        base_dir = os.path.dirname(self.dir)
        with support.change_cwd(path=self.dir):
            rv = shutil.which(self.file, path=base_dir)
            if sys.platform == "win32":
                # Windows: current directory implicitly on PATH
                self.assertEqual(rv, os.path.join(self.curdir, self.file))
            else:
                # Other platforms: shouldn't match in the current directory.
                self.assertIsNone(rv)

    @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
                     'non-root user required')
    def test_non_matching_mode(self):
        # Set the file read-only and ask for writeable files.
        os.chmod(self.temp_file.name, stat.S_IREAD)
        if os.access(self.temp_file.name, os.W_OK):
            self.skipTest("can't set the file read-only")
        rv = shutil.which(self.file, path=self.dir, mode=os.W_OK)
        self.assertIsNone(rv)

    def test_relative_path(self):
        # A relative search path yields a result relative to the cwd.
        base_dir, tail_dir = os.path.split(self.dir)
        with support.change_cwd(path=base_dir):
            rv = shutil.which(self.file, path=tail_dir)
            self.assertEqual(rv, os.path.join(tail_dir, self.file))

    def test_nonexistent_file(self):
        # Return None when no matching executable file is found on the path.
        rv = shutil.which("foo.exe", path=self.dir)
        self.assertIsNone(rv)

    @unittest.skipUnless(sys.platform == "win32",
                         "pathext check is Windows-only")
    def test_pathext_checking(self):
        # Ask for the file without the ".exe" extension, then ensure that
        # it gets found properly with the extension.
        rv = shutil.which(self.file[:-4], path=self.dir)
        self.assertEqual(rv, self.temp_file.name[:-4] + self.ext)

    def test_environ_path(self):
        # With no explicit path argument, the PATH environment variable is
        # searched.
        with support.EnvironmentVarGuard() as env:
            env['PATH'] = self.env_path
            rv = shutil.which(self.file)
            self.assertEqual(rv, self.temp_file.name)

    def test_empty_path(self):
        # An explicit empty path finds nothing, even though PATH is set and
        # the cwd contains the file.
        with support.change_cwd(path=self.dir), \
             support.EnvironmentVarGuard() as env:
            env['PATH'] = self.env_path
            rv = shutil.which(self.file, path='')
            self.assertIsNone(rv)

    def test_empty_path_no_PATH(self):
        # PATH removed from the environment and no path argument given.
        with support.EnvironmentVarGuard() as env:
            env.pop('PATH', None)
            rv = shutil.which(self.file)
            self.assertIsNone(rv)
1621
Brian Curtinc57a3452012-06-22 16:00:30 -05001622
class TestWhichBytes(TestWhich):
    """Re-run the TestWhich tests with bytes paths instead of str."""

    def setUp(self):
        super().setUp()
        # Convert every path-like attribute prepared by the base class to its
        # filesystem-encoded bytes form.
        for attr in ('dir', 'file', 'curdir', 'ext'):
            setattr(self, attr, os.fsencode(getattr(self, attr)))
        self.temp_file.name = os.fsencode(self.temp_file.name)
1631
1632
class TestMove(unittest.TestCase):
    """Tests for shutil.move().

    Each test starts with a source directory containing one file ("foo")
    and an empty destination directory.  Tests decorated with @mock_rename
    re-run a scenario with os.rename replaced (see mock_rename).
    """

    def setUp(self):
        filename = "foo"
        self.src_dir = tempfile.mkdtemp()
        self.dst_dir = tempfile.mkdtemp()
        self.src_file = os.path.join(self.src_dir, filename)
        self.dst_file = os.path.join(self.dst_dir, filename)
        with open(self.src_file, "wb") as f:
            f.write(b"spam")

    def tearDown(self):
        # Best-effort cleanup: a test may already have moved or removed
        # either directory.  ignore_errors replaces a bare "except: pass",
        # which also swallowed KeyboardInterrupt/SystemExit.
        for d in (self.src_dir, self.dst_dir):
            if d:
                shutil.rmtree(d, ignore_errors=True)

    def _check_move_file(self, src, dst, real_dst):
        # Move src to dst and check the content arrived at real_dst and the
        # source is gone.
        with open(src, "rb") as f:
            contents = f.read()
        shutil.move(src, dst)
        with open(real_dst, "rb") as f:
            self.assertEqual(contents, f.read())
        self.assertFalse(os.path.exists(src))

    def _check_move_dir(self, src, dst, real_dst):
        # Move directory src to dst and check its entries arrived at
        # real_dst and the source is gone.
        contents = sorted(os.listdir(src))
        shutil.move(src, dst)
        self.assertEqual(contents, sorted(os.listdir(real_dst)))
        self.assertFalse(os.path.exists(src))

    def test_move_file(self):
        # Move a file to another location on the same filesystem.
        self._check_move_file(self.src_file, self.dst_file, self.dst_file)

    def test_move_file_to_dir(self):
        # Move a file inside an existing dir on the same filesystem.
        self._check_move_file(self.src_file, self.dst_dir, self.dst_file)

    @mock_rename
    def test_move_file_other_fs(self):
        # Move a file to another location on another filesystem.
        self.test_move_file()

    @mock_rename
    def test_move_file_to_dir_other_fs(self):
        # Move a file to an existing dir on another filesystem.
        self.test_move_file_to_dir()

    def test_move_dir(self):
        # Move a dir to another location on the same filesystem.
        dst_dir = tempfile.mktemp()
        try:
            self._check_move_dir(self.src_dir, dst_dir, dst_dir)
        finally:
            # The destination may not exist if the move failed.
            shutil.rmtree(dst_dir, ignore_errors=True)

    @mock_rename
    def test_move_dir_other_fs(self):
        # Move a dir to another location on another filesystem.
        self.test_move_dir()

    def test_move_dir_to_dir(self):
        # Move a dir inside an existing dir on the same filesystem.
        self._check_move_dir(self.src_dir, self.dst_dir,
            os.path.join(self.dst_dir, os.path.basename(self.src_dir)))

    @mock_rename
    def test_move_dir_to_dir_other_fs(self):
        # Move a dir inside an existing dir on another filesystem.
        self.test_move_dir_to_dir()

    def test_move_dir_sep_to_dir(self):
        # Source path given with a trailing os.path.sep.
        self._check_move_dir(self.src_dir + os.path.sep, self.dst_dir,
            os.path.join(self.dst_dir, os.path.basename(self.src_dir)))

    @unittest.skipUnless(os.path.altsep, 'requires os.path.altsep')
    def test_move_dir_altsep_to_dir(self):
        # Source path given with a trailing os.path.altsep.
        self._check_move_dir(self.src_dir + os.path.altsep, self.dst_dir,
            os.path.join(self.dst_dir, os.path.basename(self.src_dir)))

    def test_existing_file_inside_dest_dir(self):
        # A file with the same name inside the destination dir already exists.
        with open(self.dst_file, "wb"):
            pass
        self.assertRaises(shutil.Error, shutil.move, self.src_file, self.dst_dir)

    def test_dont_move_dir_in_itself(self):
        # Moving a dir inside itself raises an Error.
        dst = os.path.join(self.src_dir, "bar")
        self.assertRaises(shutil.Error, shutil.move, self.src_dir, dst)

    def test_destinsrc_false_negative(self):
        # A destination really inside the source must be detected.
        os.mkdir(TESTFN)
        try:
            for src, dst in [('srcdir', 'srcdir/dest')]:
                src = os.path.join(TESTFN, src)
                dst = os.path.join(TESTFN, dst)
                self.assertTrue(shutil._destinsrc(src, dst),
                             msg='_destinsrc() wrongly concluded that '
                             'dst (%s) is not in src (%s)' % (dst, src))
        finally:
            shutil.rmtree(TESTFN, ignore_errors=True)

    def test_destinsrc_false_positive(self):
        # Paths that merely share a name prefix with the source are not
        # inside it.
        os.mkdir(TESTFN)
        try:
            for src, dst in [('srcdir', 'src/dest'), ('srcdir', 'srcdir.new')]:
                src = os.path.join(TESTFN, src)
                dst = os.path.join(TESTFN, dst)
                self.assertFalse(shutil._destinsrc(src, dst),
                            msg='_destinsrc() wrongly concluded that '
                            'dst (%s) is in src (%s)' % (dst, src))
        finally:
            shutil.rmtree(TESTFN, ignore_errors=True)

    @support.skip_unless_symlink
    @mock_rename
    def test_move_file_symlink(self):
        # Moving a symlink to a file keeps it a symlink to the same file.
        dst = os.path.join(self.src_dir, 'bar')
        os.symlink(self.src_file, dst)
        shutil.move(dst, self.dst_file)
        self.assertTrue(os.path.islink(self.dst_file))
        self.assertTrue(os.path.samefile(self.src_file, self.dst_file))

    @support.skip_unless_symlink
    @mock_rename
    def test_move_file_symlink_to_dir(self):
        # Same as above, with an existing directory as the destination.
        filename = "bar"
        dst = os.path.join(self.src_dir, filename)
        os.symlink(self.src_file, dst)
        shutil.move(dst, self.dst_dir)
        final_link = os.path.join(self.dst_dir, filename)
        self.assertTrue(os.path.islink(final_link))
        self.assertTrue(os.path.samefile(self.src_file, final_link))

    @support.skip_unless_symlink
    @mock_rename
    def test_move_dangling_symlink(self):
        # The link target ('baz') is never created.
        src = os.path.join(self.src_dir, 'baz')
        dst = os.path.join(self.src_dir, 'bar')
        os.symlink(src, dst)
        dst_link = os.path.join(self.dst_dir, 'quux')
        shutil.move(dst, dst_link)
        self.assertTrue(os.path.islink(dst_link))
        # On Windows, os.path.realpath does not follow symlinks (issue #9949)
        if os.name == 'nt':
            self.assertEqual(os.path.realpath(src), os.readlink(dst_link))
        else:
            self.assertEqual(os.path.realpath(src), os.path.realpath(dst_link))

    @support.skip_unless_symlink
    @mock_rename
    def test_move_dir_symlink(self):
        # Moving a symlink to a directory keeps it a symlink to that dir.
        src = os.path.join(self.src_dir, 'baz')
        dst = os.path.join(self.src_dir, 'bar')
        os.mkdir(src)
        os.symlink(src, dst)
        dst_link = os.path.join(self.dst_dir, 'quux')
        shutil.move(dst, dst_link)
        self.assertTrue(os.path.islink(dst_link))
        self.assertTrue(os.path.samefile(src, dst_link))

    def test_move_return_value(self):
        # Moving into a directory returns the path of the moved file.
        rv = shutil.move(self.src_file, self.dst_dir)
        self.assertEqual(rv,
                os.path.join(self.dst_dir, os.path.basename(self.src_file)))

    def test_move_as_rename_return_value(self):
        # Moving to an explicit new name returns that name.
        rv = shutil.move(self.src_file, os.path.join(self.dst_dir, 'bar'))
        self.assertEqual(rv, os.path.join(self.dst_dir, 'bar'))

    @mock_rename
    def test_move_file_special_function(self):
        # A custom copy_function is called once for a single file.
        moved = []
        def _copy(src, dst):
            moved.append((src, dst))
        shutil.move(self.src_file, self.dst_dir, copy_function=_copy)
        self.assertEqual(len(moved), 1)

    @mock_rename
    def test_move_dir_special_function(self):
        # A custom copy_function is called once per file: "foo" from setUp
        # plus the two children created here.
        moved = []
        def _copy(src, dst):
            moved.append((src, dst))
        support.create_empty_file(os.path.join(self.src_dir, 'child'))
        support.create_empty_file(os.path.join(self.src_dir, 'child1'))
        shutil.move(self.src_dir, self.dst_dir, copy_function=_copy)
        self.assertEqual(len(moved), 3)
1827
Tarek Ziadé5340db32010-04-19 22:30:51 +00001828
class TestCopyFile(unittest.TestCase):
    """Tests for how shutil.copyfile() handles failing open()/close() calls.

    The tests install a replacement `open` in the shutil module namespace
    that hands out Faux objects; tearDown() removes it again.
    """

    # Set to True once a replacement shutil.open has been installed.
    _delete = False

    class Faux(object):
        # Minimal file stand-in that records how it was used as a context
        # manager.
        _entered = False
        _exited_with = None
        _raised = False
        def __init__(self, raise_in_exit=False, suppress_at_exit=True):
            self._raise_in_exit = raise_in_exit
            self._suppress_at_exit = suppress_at_exit
        def read(self, *args):
            return ''
        def __enter__(self):
            self._entered = True
        def __exit__(self, exc_type, exc_val, exc_tb):
            # Remember what the with-block exited with, then optionally
            # simulate a failing close().
            self._exited_with = exc_type, exc_val, exc_tb
            if self._raise_in_exit:
                self._raised = True
                raise OSError("Cannot close")
            return self._suppress_at_exit

    def tearDown(self):
        if self._delete:
            del shutil.open

    def _set_shutil_open(self, func):
        # Shadow the builtin open() inside the shutil module.
        shutil.open = func
        self._delete = True

    def test_w_source_open_fails(self):
        def _open(filename, mode='r'):
            if filename == 'srcfile':
                raise OSError('Cannot open "srcfile"')
            assert 0  # shouldn't reach here.

        self._set_shutil_open(_open)

        self.assertRaises(OSError, shutil.copyfile, 'srcfile', 'destfile')

    @unittest.skipIf(MACOS, "skipped on macOS")
    def test_w_dest_open_fails(self):

        srcfile = self.Faux()

        def _open(filename, mode='r'):
            if filename == 'srcfile':
                return srcfile
            if filename == 'destfile':
                raise OSError('Cannot open "destfile"')
            assert 0  # shouldn't reach here.

        self._set_shutil_open(_open)

        # The error is suppressed by srcfile.__exit__ (suppress_at_exit
        # defaults to True), so copyfile() itself does not raise.
        shutil.copyfile('srcfile', 'destfile')
        self.assertTrue(srcfile._entered)
        self.assertIs(srcfile._exited_with[0], OSError)
        self.assertEqual(srcfile._exited_with[1].args,
                         ('Cannot open "destfile"',))

    @unittest.skipIf(MACOS, "skipped on macOS")
    def test_w_dest_close_fails(self):

        srcfile = self.Faux()
        destfile = self.Faux(True)

        def _open(filename, mode='r'):
            if filename == 'srcfile':
                return srcfile
            if filename == 'destfile':
                return destfile
            assert 0  # shouldn't reach here.

        self._set_shutil_open(_open)

        shutil.copyfile('srcfile', 'destfile')
        self.assertTrue(srcfile._entered)
        self.assertTrue(destfile._entered)
        self.assertTrue(destfile._raised)
        self.assertIs(srcfile._exited_with[0], OSError)
        self.assertEqual(srcfile._exited_with[1].args,
                         ('Cannot close',))

    @unittest.skipIf(MACOS, "skipped on macOS")
    def test_w_source_close_fails(self):

        srcfile = self.Faux(True)
        destfile = self.Faux()

        def _open(filename, mode='r'):
            if filename == 'srcfile':
                return srcfile
            if filename == 'destfile':
                return destfile
            assert 0  # shouldn't reach here.

        self._set_shutil_open(_open)

        self.assertRaises(OSError,
                          shutil.copyfile, 'srcfile', 'destfile')
        self.assertTrue(srcfile._entered)
        self.assertTrue(destfile._entered)
        self.assertFalse(destfile._raised)
        self.assertIsNone(srcfile._exited_with[0])
        self.assertTrue(srcfile._raised)

    def test_move_dir_caseinsensitive(self):
        # Renames a folder to the same name
        # but a different case.

        self.src_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.src_dir, True)
        dst_dir = os.path.join(
                os.path.dirname(self.src_dir),
                os.path.basename(self.src_dir).upper())
        self.assertNotEqual(self.src_dir, dst_dir)

        try:
            shutil.move(self.src_dir, dst_dir)
            self.assertTrue(os.path.isdir(dst_dir))
        finally:
            os.rmdir(dst_dir)
Ronald Oussorenf51738b2011-05-06 10:23:04 +02001951
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02001952
class TestCopyFileObj(unittest.TestCase):
    """Tests for shutil.copyfileobj() using a FILESIZE-byte source file."""

    FILESIZE = 2 * 1024 * 1024

    @classmethod
    def setUpClass(cls):
        write_test_file(TESTFN, cls.FILESIZE)

    @classmethod
    def tearDownClass(cls):
        support.unlink(TESTFN)
        support.unlink(TESTFN2)

    def tearDown(self):
        support.unlink(TESTFN2)

    @contextlib.contextmanager
    def get_files(self):
        # Yield (source opened for reading, destination opened for writing);
        # both are closed when the context exits.
        with open(TESTFN, "rb") as src:
            with open(TESTFN2, "wb") as dst:
                yield (src, dst)

    def assert_files_eq(self, src, dst):
        # Compare the full binary content of two paths.
        with open(src, 'rb') as fsrc:
            with open(dst, 'rb') as fdst:
                self.assertEqual(fsrc.read(), fdst.read())

    def test_content(self):
        with self.get_files() as (src, dst):
            shutil.copyfileobj(src, dst)
        # Compare only after both files have been closed.
        self.assert_files_eq(TESTFN, TESTFN2)

    def test_file_not_closed(self):
        # copyfileobj() must leave both file objects open.  unittest
        # assertions are used because bare asserts vanish under -O.
        with self.get_files() as (src, dst):
            shutil.copyfileobj(src, dst)
            self.assertFalse(src.closed)
            self.assertFalse(dst.closed)

    def test_file_offset(self):
        # After the copy both files are positioned at the end of the data.
        with self.get_files() as (src, dst):
            shutil.copyfileobj(src, dst)
            self.assertEqual(src.tell(), self.FILESIZE)
            self.assertEqual(dst.tell(), self.FILESIZE)

    @unittest.skipIf(os.name != 'nt', "Windows only")
    def test_win_impl(self):
        # Make sure alternate Windows implementation is called.
        with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
            shutil.copyfile(TESTFN, TESTFN2)
        self.assertTrue(m.called)

        # File size is 2 MiB but max buf size should be 1 MiB.
        self.assertEqual(m.call_args[0][2], 1 * 1024 * 1024)

        # If file size < 1 MiB memoryview() length must be equal to
        # the actual file size.
        with tempfile.NamedTemporaryFile(delete=False) as f:
            f.write(b'foo')
        fname = f.name
        self.addCleanup(support.unlink, fname)
        with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
            shutil.copyfile(fname, TESTFN2)
        self.assertEqual(m.call_args[0][2], 3)

        # Empty files should not rely on readinto() variant.
        with tempfile.NamedTemporaryFile(delete=False) as f:
            pass
        fname = f.name
        self.addCleanup(support.unlink, fname)
        with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
            shutil.copyfile(fname, TESTFN2)
        self.assertFalse(m.called)
        self.assert_files_eq(fname, TESTFN2)
2025
2026
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002027class _ZeroCopyFileTest(object):
2028 """Tests common to all zero-copy APIs."""
2029 FILESIZE = (10 * 1024 * 1024) # 10 MiB
2030 FILEDATA = b""
2031 PATCHPOINT = ""
2032
2033 @classmethod
2034 def setUpClass(cls):
2035 write_test_file(TESTFN, cls.FILESIZE)
2036 with open(TESTFN, 'rb') as f:
2037 cls.FILEDATA = f.read()
2038 assert len(cls.FILEDATA) == cls.FILESIZE
2039
2040 @classmethod
2041 def tearDownClass(cls):
2042 support.unlink(TESTFN)
2043
2044 def tearDown(self):
2045 support.unlink(TESTFN2)
2046
2047 @contextlib.contextmanager
2048 def get_files(self):
2049 with open(TESTFN, "rb") as src:
2050 with open(TESTFN2, "wb") as dst:
2051 yield (src, dst)
2052
2053 def zerocopy_fun(self, *args, **kwargs):
2054 raise NotImplementedError("must be implemented in subclass")
2055
2056 def reset(self):
2057 self.tearDown()
2058 self.tearDownClass()
2059 self.setUpClass()
2060 self.setUp()
2061
2062 # ---
2063
2064 def test_regular_copy(self):
2065 with self.get_files() as (src, dst):
2066 self.zerocopy_fun(src, dst)
2067 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2068 # Make sure the fallback function is not called.
2069 with self.get_files() as (src, dst):
2070 with unittest.mock.patch('shutil.copyfileobj') as m:
2071 shutil.copyfile(TESTFN, TESTFN2)
2072 assert not m.called
2073
2074 def test_same_file(self):
2075 self.addCleanup(self.reset)
2076 with self.get_files() as (src, dst):
2077 with self.assertRaises(Exception):
2078 self.zerocopy_fun(src, src)
2079 # Make sure src file is not corrupted.
2080 self.assertEqual(read_file(TESTFN, binary=True), self.FILEDATA)
2081
2082 def test_non_existent_src(self):
2083 name = tempfile.mktemp()
2084 with self.assertRaises(FileNotFoundError) as cm:
2085 shutil.copyfile(name, "new")
2086 self.assertEqual(cm.exception.filename, name)
2087
2088 def test_empty_file(self):
2089 srcname = TESTFN + 'src'
2090 dstname = TESTFN + 'dst'
2091 self.addCleanup(lambda: support.unlink(srcname))
2092 self.addCleanup(lambda: support.unlink(dstname))
2093 with open(srcname, "wb"):
2094 pass
2095
2096 with open(srcname, "rb") as src:
2097 with open(dstname, "wb") as dst:
2098 self.zerocopy_fun(src, dst)
2099
2100 self.assertEqual(read_file(dstname, binary=True), b"")
2101
2102 def test_unhandled_exception(self):
2103 with unittest.mock.patch(self.PATCHPOINT,
2104 side_effect=ZeroDivisionError):
2105 self.assertRaises(ZeroDivisionError,
2106 shutil.copyfile, TESTFN, TESTFN2)
2107
2108 def test_exception_on_first_call(self):
2109 # Emulate a case where the first call to the zero-copy
2110 # function raises an exception in which case the function is
2111 # supposed to give up immediately.
2112 with unittest.mock.patch(self.PATCHPOINT,
2113 side_effect=OSError(errno.EINVAL, "yo")):
2114 with self.get_files() as (src, dst):
2115 with self.assertRaises(_GiveupOnFastCopy):
2116 self.zerocopy_fun(src, dst)
2117
2118 def test_filesystem_full(self):
2119 # Emulate a case where filesystem is full and sendfile() fails
2120 # on first call.
2121 with unittest.mock.patch(self.PATCHPOINT,
2122 side_effect=OSError(errno.ENOSPC, "yo")):
2123 with self.get_files() as (src, dst):
2124 self.assertRaises(OSError, self.zerocopy_fun, src, dst)
2125
2126
@unittest.skipIf(not SUPPORTS_SENDFILE, 'os.sendfile() not supported')
class TestZeroCopySendfile(_ZeroCopyFileTest, unittest.TestCase):
    """Zero-copy tests for the os.sendfile() based fast path.

    Checks use unittest assertions rather than bare ``assert``
    statements so they are not stripped under ``python -O``.
    """
    PATCHPOINT = "os.sendfile"

    def zerocopy_fun(self, fsrc, fdst):
        """Copy *fsrc* to *fdst* with shutil._fastcopy_sendfile()."""
        return shutil._fastcopy_sendfile(fsrc, fdst)

    def test_non_regular_file_src(self):
        # An in-memory source makes the fast path give up; the regular
        # copyfileobj() is then expected to copy the data.
        with io.BytesIO(self.FILEDATA) as src:
            with open(TESTFN2, "wb") as dst:
                with self.assertRaises(_GiveupOnFastCopy):
                    self.zerocopy_fun(src, dst)
                shutil.copyfileobj(src, dst)

        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_non_regular_file_dst(self):
        # Same as above, with an in-memory destination.
        with open(TESTFN, "rb") as src:
            with io.BytesIO() as dst:
                with self.assertRaises(_GiveupOnFastCopy):
                    self.zerocopy_fun(src, dst)
                shutil.copyfileobj(src, dst)
                dst.seek(0)
                self.assertEqual(dst.read(), self.FILEDATA)

    def test_exception_on_second_call(self):
        def sendfile(*args, **kwargs):
            # The first call goes to the real os.sendfile(); every
            # later call fails with EBADF.
            if not flag:
                flag.append(None)
                return orig_sendfile(*args, **kwargs)
            else:
                raise OSError(errno.EBADF, "yo")

        flag = []
        orig_sendfile = os.sendfile
        with unittest.mock.patch('os.sendfile', create=True,
                                 side_effect=sendfile):
            with self.get_files() as (src, dst):
                with self.assertRaises(OSError) as cm:
                    shutil._fastcopy_sendfile(src, dst)
        # A non-empty flag proves the first (successful) call happened.
        self.assertTrue(flag)
        self.assertEqual(cm.exception.errno, errno.EBADF)

    def test_cant_get_size(self):
        # Emulate a case where src file size cannot be determined.
        # Internally bufsize will be set to a small value and
        # sendfile() will be called repeatedly.
        with unittest.mock.patch('os.fstat', side_effect=OSError) as m:
            with self.get_files() as (src, dst):
                shutil._fastcopy_sendfile(src, dst)
                self.assertTrue(m.called)
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_small_chunks(self):
        # Force internal file size detection to be smaller than the
        # actual file size. We want to force sendfile() to be called
        # multiple times, also in order to emulate a src fd which gets
        # bigger while it is being copied.
        fake_stat = unittest.mock.Mock()
        fake_stat.st_size = 65536 + 1
        with unittest.mock.patch('os.fstat', return_value=fake_stat) as m:
            with self.get_files() as (src, dst):
                shutil._fastcopy_sendfile(src, dst)
                self.assertTrue(m.called)
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_big_chunk(self):
        # Force internal file size detection to be +100MB bigger than
        # the actual file size. Make sure sendfile() does not rely on
        # file size value except for (maybe) a better throughput /
        # performance.
        fake_stat = unittest.mock.Mock()
        fake_stat.st_size = self.FILESIZE + (100 * 1024 * 1024)
        with unittest.mock.patch('os.fstat', return_value=fake_stat) as m:
            with self.get_files() as (src, dst):
                shutil._fastcopy_sendfile(src, dst)
                self.assertTrue(m.called)
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_blocksize_arg(self):
        with unittest.mock.patch('os.sendfile',
                                 side_effect=ZeroDivisionError) as m:
            self.assertRaises(ZeroDivisionError,
                              shutil.copyfile, TESTFN, TESTFN2)
            blocksize = m.call_args[0][3]
            # Make sure file size and the block size arg passed to
            # sendfile() are the same.
            self.assertEqual(blocksize, os.path.getsize(TESTFN))
            # ...unless we're dealing with a small file.
            support.unlink(TESTFN2)
            write_file(TESTFN2, b"hello", binary=True)
            self.addCleanup(support.unlink, TESTFN2 + '3')
            self.assertRaises(ZeroDivisionError,
                              shutil.copyfile, TESTFN2, TESTFN2 + '3')
            blocksize = m.call_args[0][3]
            self.assertEqual(blocksize, 2 ** 23)

    def test_file2file_not_supported(self):
        # Emulate a case where sendfile() only support file->socket
        # fds. In such a case copyfile() is supposed to skip the
        # fast-copy attempt from then on.
        self.assertTrue(shutil._HAS_SENDFILE)
        try:
            with unittest.mock.patch(
                    self.PATCHPOINT,
                    side_effect=OSError(errno.ENOTSOCK, "yo")) as m:
                with self.get_files() as (src, dst):
                    with self.assertRaises(_GiveupOnFastCopy):
                        shutil._fastcopy_sendfile(src, dst)
                self.assertTrue(m.called)
            self.assertFalse(shutil._HAS_SENDFILE)

            with unittest.mock.patch(self.PATCHPOINT) as m:
                shutil.copyfile(TESTFN, TESTFN2)
                self.assertFalse(m.called)
        finally:
            # Re-enable the fast path for the tests that run afterwards.
            shutil._HAS_SENDFILE = True
2244
2245
@unittest.skipIf(not MACOS, 'macOS only')
class TestZeroCopyMACOS(_ZeroCopyFileTest, unittest.TestCase):
    """Zero-copy tests for the posix._fcopyfile() based fast path."""

    PATCHPOINT = "posix._fcopyfile"

    def zerocopy_fun(self, src, dst):
        """Copy *src* to *dst* with shutil._fastcopy_fcopyfile()."""
        copy_flags = posix._COPYFILE_DATA
        return shutil._fastcopy_fcopyfile(src, dst, copy_flags)
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002252
2253
class TermsizeTests(unittest.TestCase):
    """Tests for shutil.get_terminal_size()."""

    def test_does_not_crash(self):
        """get_terminal_size() must return non-negative dimensions.

        The real terminal size cannot be checked portably, so this only
        verifies that the reported values are sensible.
        """
        term = shutil.get_terminal_size()
        self.assertGreaterEqual(term.columns, 0)
        self.assertGreaterEqual(term.lines, 0)

    def test_os_environ_first(self):
        "COLUMNS and LINES from the environment must be honoured"

        # Only COLUMNS set.
        with support.EnvironmentVarGuard() as environ:
            environ['COLUMNS'] = '777'
            del environ['LINES']
            term = shutil.get_terminal_size()
        self.assertEqual(term.columns, 777)

        # Only LINES set.
        with support.EnvironmentVarGuard() as environ:
            del environ['COLUMNS']
            environ['LINES'] = '888'
            term = shutil.get_terminal_size()
        self.assertEqual(term.lines, 888)

    def test_bad_environ(self):
        # Non-numeric COLUMNS / LINES must still yield non-negative
        # dimensions.
        with support.EnvironmentVarGuard() as environ:
            environ['COLUMNS'] = 'xxx'
            environ['LINES'] = 'yyy'
            term = shutil.get_terminal_size()
        self.assertGreaterEqual(term.columns, 0)
        self.assertGreaterEqual(term.lines, 0)

    @unittest.skipUnless(os.isatty(sys.__stdout__.fileno()), "not on tty")
    @unittest.skipUnless(hasattr(os, 'get_terminal_size'),
                         'need os.get_terminal_size()')
    def test_stty_match(self):
        """Compare get_terminal_size() with ``stty size``, env unset.

        The comparison fails when stdin and stdout are attached to
        different terminals of different sizes; that setup should be
        rare.
        """
        try:
            raw = subprocess.check_output(['stty', 'size'])
        except (FileNotFoundError, PermissionError,
                subprocess.CalledProcessError):
            self.skipTest("stty invocation failed")
        fields = raw.decode().split()
        # The two values are swapped to line up with the
        # get_terminal_size() result.
        expected = (int(fields[1]), int(fields[0]))

        with support.EnvironmentVarGuard() as environ:
            del environ['LINES']
            del environ['COLUMNS']
            actual = shutil.get_terminal_size()

        self.assertEqual(expected, actual)

    def test_fallback(self):
        with support.EnvironmentVarGuard() as environ:
            del environ['LINES']
            del environ['COLUMNS']

            # sys.__stdout__ has no fileno()
            with support.swap_attr(sys, '__stdout__', None):
                term = shutil.get_terminal_size(fallback=(10, 20))
            self.assertEqual(term.columns, 10)
            self.assertEqual(term.lines, 20)

            # sys.__stdout__ is not a terminal on Unix
            # or fileno() not in (0, 1, 2) on Windows
            with open(os.devnull, 'w') as devnull:
                with support.swap_attr(sys, '__stdout__', devnull):
                    term = shutil.get_terminal_size(fallback=(30, 40))
            self.assertEqual(term.columns, 30)
            self.assertEqual(term.lines, 40)
2330
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002331
class PublicAPITests(unittest.TestCase):
    """Checks that shutil.__all__ lists exactly the expected names."""

    def test_module_all_attribute(self):
        self.assertTrue(hasattr(shutil, '__all__'))
        expected_names = {
            'copyfileobj', 'copyfile', 'copymode', 'copystat',
            'copy', 'copy2', 'copytree', 'move', 'rmtree', 'Error',
            'SpecialFileError', 'ExecError', 'make_archive',
            'get_archive_formats', 'register_archive_format',
            'unregister_archive_format', 'get_unpack_formats',
            'register_unpack_format', 'unregister_unpack_format',
            'unpack_archive', 'ignore_patterns', 'chown', 'which',
            'get_terminal_size', 'SameFileError',
        }
        # disk_usage is expected only on Windows or where os.statvfs
        # exists.
        if os.name == 'nt' or hasattr(os, 'statvfs'):
            expected_names.add('disk_usage')
        self.assertEqual(set(shutil.__all__), expected_names)
2348
2349
# Run the whole suite when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()