blob: b3b3e66009e40eaff96f1db55e1e137ca634093e [file] [log] [blame]
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00001# Copyright (C) 2003 Python Software Foundation
2
3import unittest
Berker Peksag884afd92014-12-10 02:50:32 +02004import unittest.mock
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00005import shutil
6import tempfile
Johannes Gijsbers8e6f2de2004-11-23 09:27:27 +00007import sys
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +00008import stat
Brett Cannon1c3fa182004-06-19 21:11:35 +00009import os
10import os.path
Antoine Pitrouc041ab62012-01-02 19:18:02 +010011import errno
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040012import functools
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -070013import pathlib
Antoine Pitroubcf2b592012-02-08 23:28:36 +010014import subprocess
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020015import random
16import string
17import contextlib
18import io
Serhiy Storchaka527ef072015-09-06 18:33:19 +030019from shutil import (make_archive,
Tarek Ziadé396fad72010-02-23 05:30:31 +000020 register_archive_format, unregister_archive_format,
Tarek Ziadé6ac91722010-04-28 17:51:36 +000021 get_archive_formats, Error, unpack_archive,
22 register_unpack_format, RegistryError,
Hynek Schlawack48653762012-10-07 12:49:58 +020023 unregister_unpack_format, get_unpack_formats,
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020024 SameFileError, _GiveupOnFastCopy)
Tarek Ziadé396fad72010-02-23 05:30:31 +000025import tarfile
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +020026import zipfile
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020027try:
28 import posix
29except ImportError:
30 posix = None
Tarek Ziadé396fad72010-02-23 05:30:31 +000031
32from test import support
Serhiy Storchakab21d1552018-03-02 11:53:51 +020033from test.support import TESTFN, FakePath
Serhiy Storchaka11213772014-08-06 18:50:19 +030034
Antoine Pitrou7fff0962009-05-01 21:09:44 +000035TESTFN2 = TESTFN + "2"
Tarek Ziadé396fad72010-02-23 05:30:31 +000036try:
37 import grp
38 import pwd
39 UID_GID_SUPPORT = True
40except ImportError:
41 UID_GID_SUPPORT = False
42
def _fake_rename(*args, **kwargs):
    """Stand-in for os.rename that always fails with EXDEV.

    Accepts and ignores any arguments; the raised OSError makes it look as
    if the destination path lives on a different filesystem.
    """
    cross_device = getattr(errno, 'EXDEV', 18)
    raise OSError(cross_device, "Invalid cross-device link")
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040046
def mock_rename(func):
    """Decorator that runs *func* with os.rename swapped for _fake_rename.

    The function that was bound to os.rename on entry is put back when
    *func* returns or raises.
    """
    @functools.wraps(func)
    def wrap(*args, **kwargs):
        saved_rename = os.rename
        os.rename = _fake_rename
        try:
            return func(*args, **kwargs)
        finally:
            os.rename = saved_rename
    return wrap
57
def write_file(path, content, binary=False):
    """Store *content* in the file named by *path*.

    *path* may be a tuple of path components, in which case they are
    combined with os.path.join.  The file is opened in binary mode when
    *binary* is true and in text mode otherwise.
    """
    target = os.path.join(*path) if isinstance(path, tuple) else path
    mode = 'wb' if binary else 'w'
    with open(target, mode) as stream:
        stream.write(content)
69
def write_test_file(path, size):
    """Create a test file of exactly *size* bytes of random ASCII letters.

    A buffer of at most 8192 random letters is generated once and written
    repeatedly; the last write is truncated so that the file ends up with
    exactly *size* bytes, whether or not *size* is a multiple of the
    buffer length.  A *size* of 0 produces an empty file.
    """
    def chunks(total, step):
        # Yield the sizes of the successive writes: *step* bytes each,
        # followed by whatever remainder is left.
        assert total >= step
        while total > step:
            yield step
            total -= step
        if total:
            yield total

    bufsize = min(size, 8192)
    chunk = b"".join([random.choice(string.ascii_letters).encode()
                      for i in range(bufsize)])
    with open(path, 'wb') as f:
        for csize in chunks(size, bufsize):
            # Only write csize bytes: the final piece may be shorter than
            # the buffer.
            f.write(chunk[:csize])
    assert os.path.getsize(path) == size
87
def read_file(path, binary=False):
    """Return everything stored in the file named by *path*.

    *path* may be a tuple of path components, in which case they are
    combined with os.path.join.  The file is opened in binary mode when
    *binary* is true and in text mode otherwise.
    """
    target = os.path.join(*path) if isinstance(path, tuple) else path
    mode = 'rb' if binary else 'r'
    with open(target, mode) as stream:
        return stream.read()
99
def rlistdir(path):
    """Return a sorted, recursive listing of *path* as relative names.

    Real directories are listed with a trailing '/' and are followed by
    their own contents, each prefixed with the directory name and '/'.
    Symlinks are listed by name only and never descended into.
    """
    listing = []
    for name in sorted(os.listdir(path)):
        full = os.path.join(path, name)
        if os.path.islink(full) or not os.path.isdir(full):
            listing.append(name)
            continue
        listing.append(name + '/')
        listing.extend(name + '/' + sub for sub in rlistdir(full))
    return listing
111
def supports_file2file_sendfile():
    """Return True if os.sendfile() can copy between two regular files.

    Two temporary files are created; a short os.sendfile() call from the
    first into the second is attempted.  An OSError from that call means
    "not supported".  Both temporary files are removed before returning.
    """
    # ...apparently Linux and Solaris are the only ones
    if not hasattr(os, "sendfile"):
        return False
    srcname = None
    dstname = None
    try:
        with tempfile.NamedTemporaryFile("wb", delete=False) as f:
            srcname = f.name
            f.write(b"0123456789")

        with open(srcname, "rb") as src:
            with tempfile.NamedTemporaryFile("wb", delete=False) as dst:
                # Record the destination's own name so that the cleanup
                # below removes it (delete=False leaves it on disk).
                dstname = dst.name
                infd = src.fileno()
                outfd = dst.fileno()
                try:
                    os.sendfile(outfd, infd, 0, 2)
                except OSError:
                    return False
                else:
                    return True
    finally:
        if srcname is not None:
            support.unlink(srcname)
        if dstname is not None:
            support.unlink(dstname)
139
140
141SUPPORTS_SENDFILE = supports_file2file_sendfile()
142
Éric Araujoa7e33a12011-08-12 19:51:35 +0200143
Barry Warsaw7fc2cca2003-01-24 17:34:13 +0000144class TestShutil(unittest.TestCase):
Tarek Ziadé396fad72010-02-23 05:30:31 +0000145
def setUp(self):
    """Begin each test with no temporary directories registered."""
    super().setUp()
    self.tempdirs = []
149
def tearDown(self):
    """Remove every directory registered in self.tempdirs, newest first."""
    super().tearDown()
    # Second positional argument of rmtree() is ignore_errors; it is only
    # true on 'nt' and 'cygwin'.
    ignore_errors = os.name in ('nt', 'cygwin')
    while self.tempdirs:
        shutil.rmtree(self.tempdirs.pop(), ignore_errors)
155
Tarek Ziadé396fad72010-02-23 05:30:31 +0000156
def mkdtemp(self):
    """Make a new temporary directory and register it for removal.

    The path is appended to self.tempdirs (emptied again by tearDown)
    and returned.
    """
    path = tempfile.mkdtemp()
    self.tempdirs.append(path)
    return path
Tarek Ziadé5340db32010-04-19 22:30:51 +0000165
def test_rmtree_works_on_bytes(self):
    """rmtree() accepts a bytes path to a directory that contains a file."""
    base = self.mkdtemp()
    target = os.path.join(base, 'killme')
    os.mkdir(target)
    write_file(os.path.join(target, 'somefile'), 'foo')
    encoded = os.fsencode(target)
    self.assertIsInstance(encoded, bytes)
    shutil.rmtree(encoded)
Hynek Schlawack3b527782012-06-25 13:27:31 +0200174
@support.skip_unless_symlink
def test_rmtree_fails_on_symlink(self):
    """rmtree() on a symlink to a directory raises, or reports to onerror."""
    base = self.mkdtemp()
    real_dir = os.path.join(base, 'dir')
    os.mkdir(real_dir)
    link = os.path.join(base, 'link')
    os.symlink(real_dir, link)
    with self.assertRaises(OSError):
        shutil.rmtree(link)
    # Both the target directory and the link itself must survive.
    self.assertTrue(os.path.exists(real_dir))
    self.assertTrue(os.path.lexists(link))
    reported = []
    def onerror(*args):
        reported.append(args)
    shutil.rmtree(link, onerror=onerror)
    # Exactly one failure is expected: (os.path.islink, link, exc_info).
    self.assertEqual(len(reported), 1)
    only_call = reported[0]
    self.assertIs(only_call[0], os.path.islink)
    self.assertEqual(only_call[1], link)
    self.assertIsInstance(only_call[2][1], OSError)
Hynek Schlawacka75cd1c2012-06-28 12:07:29 +0200193
@support.skip_unless_symlink
def test_rmtree_works_on_symlinks(self):
    """rmtree() removes symlinks inside a tree without touching targets."""
    base = self.mkdtemp()
    dir1 = os.path.join(base, 'dir1')
    dir2 = os.path.join(dir1, 'dir2')
    dir3 = os.path.join(base, 'dir3')
    for directory in (dir1, dir2, dir3):
        os.mkdir(directory)
    file1 = os.path.join(base, 'file1')
    write_file(file1, 'foo')
    # dir1 gets links to a directory inside the tree, a directory outside
    # it, and a file outside it.
    link_targets = (('link1', dir2), ('link2', dir3), ('link3', file1))
    for link_name, target in link_targets:
        os.symlink(target, os.path.join(dir1, link_name))
    # make sure symlinks are removed but not followed
    shutil.rmtree(dir1)
    self.assertFalse(os.path.exists(dir1))
    self.assertTrue(os.path.exists(dir3))
    self.assertTrue(os.path.exists(file1))
215
def test_rmtree_errors(self):
    """rmtree() error reporting for a missing path and for a regular file."""
    # filename is guaranteed not to exist
    missing = tempfile.mktemp()
    with self.assertRaises(FileNotFoundError):
        shutil.rmtree(missing)
    # test that ignore_errors option is honored
    shutil.rmtree(missing, ignore_errors=True)

    # existing file
    tmpdir = self.mkdtemp()
    write_file((tmpdir, "tstfile"), "")
    filename = os.path.join(tmpdir, "tstfile")
    with self.assertRaises(NotADirectoryError) as cm:
        shutil.rmtree(filename)
    # The reason for this rather odd construct is that Windows sprinkles
    # a \*.* at the end of file names. But only sometimes on some buildbots
    possible_args = [filename, os.path.join(filename, '*.*')]
    self.assertIn(cm.exception.filename, possible_args)
    self.assertTrue(os.path.exists(filename))
    # test that ignore_errors option is honored
    shutil.rmtree(filename, ignore_errors=True)
    self.assertTrue(os.path.exists(filename))

    # With onerror, two failures are expected, in this order: listing the
    # "directory", then removing it.
    errors = []
    def onerror(*args):
        errors.append(args)
    shutil.rmtree(filename, onerror=onerror)
    self.assertEqual(len(errors), 2)
    for call, expected_func in zip(errors, (os.scandir, os.rmdir)):
        self.assertIs(call[0], expected_func)
        self.assertEqual(call[1], filename)
        self.assertIsInstance(call[2][1], NotADirectoryError)
        self.assertIn(call[2][1].filename, possible_args)
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000250
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000251
@unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod()')
@unittest.skipIf(sys.platform[:6] == 'cygwin',
                 "This test can't be run on Cygwin (issue #1071513).")
@unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
                 "This test can't be run reliably as root (issue #1076467).")
def test_on_error(self):
    """rmtree() on an unwritable tree calls onerror for every failure."""
    self.errorState = 0
    os.mkdir(TESTFN)
    self.addCleanup(shutil.rmtree, TESTFN)

    self.child_file_path = os.path.join(TESTFN, 'a')
    self.child_dir_path = os.path.join(TESTFN, 'b')
    support.create_empty_file(self.child_file_path)
    os.mkdir(self.child_dir_path)

    # Remember the current modes (parent first) so they can be restored.
    saved_modes = [
        (path, os.stat(path).st_mode)
        for path in (TESTFN, self.child_file_path, self.child_dir_path)
    ]
    # Make unwritable; the children are changed before their parent.
    unwritable = stat.S_IREAD | stat.S_IEXEC
    for path in (self.child_file_path, self.child_dir_path, TESTFN):
        os.chmod(path, unwritable)

    # Cleanups run in reverse registration order, after which the rmtree
    # cleanup registered above runs.
    for path, mode in saved_modes:
        self.addCleanup(os.chmod, path, mode)

    shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
    # Test whether onerror has actually been called.
    self.assertEqual(self.errorState, 3,
                     "Expected call to onerror function did not happen.")
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000283
def check_args_to_onerror(self, func, arg, exc):
    """onerror callback used by test_on_error; checks each reported failure.

    *func* is the function that failed, *arg* the path it was called with
    and *exc* the exc_info tuple.  self.errorState counts the calls: the
    first two are expected to concern the children of TESTFN, any later
    one the os.rmdir of TESTFN itself, which sets the state to 3.
    """
    # test_on_error deliberately runs rmtree
    # on a directory that is chmod 500, which will fail.
    # This function is run when shutil.rmtree fails.
    # 99.9% of the time it initially fails to remove
    # a file in the directory, so the first time through
    # func is os.unlink.
    # However, some Linux machines running ZFS on
    # FUSE experienced a failure earlier in the process
    # at os.listdir.  The first failure may legally
    # be either.
    # NOTE(review): test_rmtree_errors in this file expects os.scandir as
    # the listing function; confirm os.listdir is still what rmtree
    # reports on this code path.
    if self.errorState < 2:
        if func is os.unlink:
            self.assertEqual(arg, self.child_file_path)
        elif func is os.rmdir:
            self.assertEqual(arg, self.child_dir_path)
        else:
            self.assertIs(func, os.listdir)
            self.assertIn(arg, [TESTFN, self.child_dir_path])
        self.assertTrue(issubclass(exc[0], OSError))
        self.errorState += 1
    else:
        # Identity check, consistent with the branches above.
        self.assertIs(func, os.rmdir)
        self.assertEqual(arg, TESTFN)
        self.assertTrue(issubclass(exc[0], OSError))
        self.errorState = 3
310
def test_rmtree_does_not_choke_on_failing_lstat(self):
    """rmtree() still succeeds when os.lstat fails for everything but TESTFN."""
    orig_lstat = os.lstat

    def raiser(fn, *args, **kwargs):
        # Only the top-level directory may be lstat'ed successfully.
        if fn == TESTFN:
            return orig_lstat(fn)
        raise OSError()

    try:
        os.lstat = raiser

        os.mkdir(TESTFN)
        write_file((TESTFN, 'foo'), 'foo')
        shutil.rmtree(TESTFN)
    finally:
        os.lstat = orig_lstat
Barry Warsaw7fc2cca2003-01-24 17:34:13 +0000326
@unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod')
@support.skip_unless_symlink
def test_copymode_follow_symlinks(self):
    """copymode() copies the target's mode when src and/or dst are symlinks."""
    tmp_dir = self.mkdtemp()
    src = os.path.join(tmp_dir, 'foo')
    dst = os.path.join(tmp_dir, 'bar')
    src_link = os.path.join(tmp_dir, 'baz')
    dst_link = os.path.join(tmp_dir, 'quux')
    write_file(src, 'foo')
    write_file(dst, 'foo')
    os.symlink(src, src_link)
    os.symlink(dst, dst_link)
    os.chmod(src, stat.S_IRWXU | stat.S_IRWXG)
    # file to file
    os.chmod(dst, stat.S_IRWXO)
    self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
    shutil.copymode(src, dst)
    self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
    # On Windows, os.chmod does not follow symlinks (issue #15411)
    if os.name != 'nt':
        # In turn: follow the src link, follow the dst link, follow both.
        link_cases = (
            (src_link, dst),
            (src, dst_link),
            (src_link, dst_link),
        )
        for copy_from, copy_to in link_cases:
            os.chmod(dst, stat.S_IRWXO)
            shutil.copymode(copy_from, copy_to)
            self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100359
@unittest.skipUnless(hasattr(os, 'lchmod'), 'requires os.lchmod')
@support.skip_unless_symlink
def test_copymode_symlink_to_symlink(self):
    """copymode(follow_symlinks=False) for link/link, link/file, file/link."""
    tmp_dir = self.mkdtemp()
    src = os.path.join(tmp_dir, 'foo')
    dst = os.path.join(tmp_dir, 'bar')
    src_link = os.path.join(tmp_dir, 'baz')
    dst_link = os.path.join(tmp_dir, 'quux')
    write_file(src, 'foo')
    write_file(dst, 'foo')
    os.symlink(src, src_link)
    os.symlink(dst, dst_link)
    os.chmod(src, stat.S_IRWXU | stat.S_IRWXG)
    os.chmod(dst, stat.S_IRWXU)
    os.lchmod(src_link, stat.S_IRWXO | stat.S_IRWXG)

    # link to link: the links' own modes match afterwards, the targets'
    # modes still differ
    os.lchmod(dst_link, stat.S_IRWXO)
    shutil.copymode(src_link, dst_link, follow_symlinks=False)
    src_link_mode = os.lstat(src_link).st_mode
    dst_link_mode = os.lstat(dst_link).st_mode
    self.assertEqual(src_link_mode, dst_link_mode)
    self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)

    # src link - use chmod
    os.lchmod(dst_link, stat.S_IRWXO)
    shutil.copymode(src_link, dst, follow_symlinks=False)
    self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)

    # dst link - use chmod
    os.lchmod(dst_link, stat.S_IRWXO)
    shutil.copymode(src, dst_link, follow_symlinks=False)
    self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
389
@unittest.skipIf(hasattr(os, 'lchmod'), 'requires os.lchmod to be missing')
@support.skip_unless_symlink
def test_copymode_symlink_to_symlink_wo_lchmod(self):
    """Without os.lchmod, link-to-link copymode() must not raise."""
    tmp_dir = self.mkdtemp()
    src, dst, src_link, dst_link = (
        os.path.join(tmp_dir, name)
        for name in ('foo', 'bar', 'baz', 'quux')
    )
    write_file(src, 'foo')
    write_file(dst, 'foo')
    os.symlink(src, src_link)
    os.symlink(dst, dst_link)
    shutil.copymode(src_link, dst_link, follow_symlinks=False)  # silent fail
Antoine Pitrou78091e62011-12-29 18:54:15 +0100403
@support.skip_unless_symlink
def test_copystat_symlinks(self):
    """copystat() between symlinks, with and without following them."""
    tmp_dir = self.mkdtemp()
    src = os.path.join(tmp_dir, 'foo')
    dst = os.path.join(tmp_dir, 'bar')
    src_link = os.path.join(tmp_dir, 'baz')
    dst_link = os.path.join(tmp_dir, 'qux')
    has_lchmod = hasattr(os, 'lchmod')

    write_file(src, 'foo')
    src_stat = os.stat(src)
    # ensure different mtimes
    os.utime(src, (src_stat.st_atime, src_stat.st_mtime - 42.0))
    write_file(dst, 'bar')
    self.assertNotEqual(os.stat(src).st_mtime, os.stat(dst).st_mtime)
    os.symlink(src, src_link)
    os.symlink(dst, dst_link)
    if has_lchmod:
        os.lchmod(src_link, stat.S_IRWXO)
    if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
        os.lchflags(src_link, stat.UF_NODUMP)
    src_link_stat = os.lstat(src_link)

    # follow
    if has_lchmod:
        shutil.copystat(src_link, dst_link, follow_symlinks=True)
        self.assertNotEqual(src_link_stat.st_mode, os.stat(dst).st_mode)

    # don't follow
    shutil.copystat(src_link, dst_link, follow_symlinks=False)
    dst_link_stat = os.lstat(dst_link)
    if os.utime in os.supports_follow_symlinks:
        for attr in ('st_atime', 'st_mtime'):
            # The modification times may be truncated in the new file.
            self.assertLessEqual(getattr(src_link_stat, attr),
                                 getattr(dst_link_stat, attr) + 1)
    if has_lchmod:
        self.assertEqual(src_link_stat.st_mode, dst_link_stat.st_mode)
    if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
        self.assertEqual(src_link_stat.st_flags, dst_link_stat.st_flags)

    # follow_symlinks=False is requested, but dst is a regular file; the
    # mtime of src (the link's target) is expected on dst afterwards
    shutil.copystat(src_link, dst, follow_symlinks=False)
    mtime_delta = abs(os.stat(src).st_mtime - os.stat(dst).st_mtime)
    self.assertTrue(mtime_delta < 0.1)
444
@unittest.skipUnless(hasattr(os, 'chflags') and
                     hasattr(errno, 'EOPNOTSUPP') and
                     hasattr(errno, 'ENOTSUP'),
                     "requires os.chflags, EOPNOTSUPP & ENOTSUP")
def test_copystat_handles_harmless_chflags_errors(self):
    """copystat() ignores EOPNOTSUPP/ENOTSUP from chflags, not other errnos."""
    tmpdir = self.mkdtemp()
    file1 = os.path.join(tmpdir, 'file1')
    file2 = os.path.join(tmpdir, 'file2')
    write_file(file1, 'xxx')
    write_file(file2, 'xxx')

    def make_chflags_raiser(err):
        # Build an os.chflags replacement that always raises an OSError
        # whose errno is *err*.
        failure = OSError()

        def _chflags_raiser(path, flags, *, follow_symlinks=True):
            failure.errno = err
            raise failure
        return _chflags_raiser

    old_chflags = os.chflags
    try:
        for harmless in (errno.EOPNOTSUPP, errno.ENOTSUP):
            os.chflags = make_chflags_raiser(harmless)
            shutil.copystat(file1, file2)
        # assert others errors break it
        other_errno = errno.EOPNOTSUPP + errno.ENOTSUP
        os.chflags = make_chflags_raiser(other_errno)
        with self.assertRaises(OSError):
            shutil.copystat(file1, file2)
    finally:
        os.chflags = old_chflags
473
@support.skip_unless_xattr
def test_copyxattr(self):
    """_copyxattr() and copystat() copy extended attributes between files."""
    tmp_dir = self.mkdtemp()
    src = os.path.join(tmp_dir, 'foo')
    write_file(src, 'foo')
    dst = os.path.join(tmp_dir, 'bar')
    write_file(dst, 'bar')

    # no xattr == no problem
    shutil._copyxattr(src, dst)
    # common case
    os.setxattr(src, 'user.foo', b'42')
    os.setxattr(src, 'user.bar', b'43')
    shutil._copyxattr(src, dst)
    self.assertEqual(sorted(os.listxattr(src)), sorted(os.listxattr(dst)))
    self.assertEqual(os.getxattr(src, 'user.foo'),
                     os.getxattr(dst, 'user.foo'))

    # check errors don't affect other attrs
    os.remove(dst)
    write_file(dst, 'bar')
    os_error = OSError(errno.EPERM, 'EPERM')
    orig_setxattr = os.setxattr

    def _raise_on_user_foo(fname, attr, val, **kwargs):
        if attr == 'user.foo':
            raise os_error
        orig_setxattr(fname, attr, val, **kwargs)

    try:
        os.setxattr = _raise_on_user_foo
        shutil._copyxattr(src, dst)
        self.assertIn('user.bar', os.listxattr(dst))
    finally:
        os.setxattr = orig_setxattr

    # the source filesystem not supporting xattrs should be ok, too.
    orig_listxattr = os.listxattr

    def _raise_on_src(fname, *, follow_symlinks=True):
        if fname == src:
            raise OSError(errno.ENOTSUP, 'Operation not supported')
        return orig_listxattr(fname, follow_symlinks=follow_symlinks)

    try:
        os.listxattr = _raise_on_src
        shutil._copyxattr(src, dst)
    finally:
        os.listxattr = orig_listxattr

    # test that shutil.copystat copies xattrs
    src = os.path.join(tmp_dir, 'the_original')
    write_file(src, src)
    os.setxattr(src, 'user.the_value', b'fiddly')
    dst = os.path.join(tmp_dir, 'the_copy')
    write_file(dst, dst)
    shutil.copystat(src, dst)
    self.assertEqual(os.getxattr(dst, 'user.the_value'), b'fiddly')
Larry Hastingsad5ae042012-07-14 17:55:11 -0700529
@support.skip_unless_symlink
@support.skip_unless_xattr
@unittest.skipUnless(hasattr(os, 'geteuid') and os.geteuid() == 0,
                     'root privileges required')
def test_copyxattr_symlinks(self):
    """_copyxattr(follow_symlinks=False) copies a symlink's own xattrs."""
    # On Linux, it's only possible to access non-user xattr for symlinks;
    # which in turn require root privileges. This test should be expanded
    # as soon as other platforms gain support for extended attributes.
    tmp_dir = self.mkdtemp()
    src = os.path.join(tmp_dir, 'foo')
    src_link = os.path.join(tmp_dir, 'baz')
    write_file(src, 'foo')
    os.symlink(src, src_link)
    # The file and the link pointing at it carry different values.
    os.setxattr(src, 'trusted.foo', b'42')
    os.setxattr(src_link, 'trusted.foo', b'43', follow_symlinks=False)
    dst = os.path.join(tmp_dir, 'bar')
    dst_link = os.path.join(tmp_dir, 'qux')
    write_file(dst, 'bar')
    os.symlink(dst, dst_link)

    # link -> link: only the destination link receives the attribute
    shutil._copyxattr(src_link, dst_link, follow_symlinks=False)
    copied = os.getxattr(dst_link, 'trusted.foo', follow_symlinks=False)
    self.assertEqual(copied, b'43')
    with self.assertRaises(OSError):
        os.getxattr(dst, 'trusted.foo')

    # link -> regular file: the file receives the link's value
    shutil._copyxattr(src_link, dst, follow_symlinks=False)
    self.assertEqual(os.getxattr(dst, 'trusted.foo'), b'43')
554
@support.skip_unless_symlink
def test_copy_symlinks(self):
    """copy() of a symlink source, following it and then not following it.

    With follow_symlinks=True the destination must be a regular file with
    the source's contents; with follow_symlinks=False it must be a symlink
    with the same target (and, where os.lchmod exists, the same mode).
    """
    tmp_dir = self.mkdtemp()
    src = os.path.join(tmp_dir, 'foo')
    dst = os.path.join(tmp_dir, 'bar')
    src_link = os.path.join(tmp_dir, 'baz')
    write_file(src, 'foo')
    os.symlink(src, src_link)
    if hasattr(os, 'lchmod'):
        os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
    # follow
    shutil.copy(src_link, dst, follow_symlinks=True)
    self.assertFalse(os.path.islink(dst))
    self.assertEqual(read_file(src), read_file(dst))
    os.remove(dst)
    # don't follow
    shutil.copy(src_link, dst, follow_symlinks=False)
    self.assertTrue(os.path.islink(dst))
    self.assertEqual(os.readlink(dst), os.readlink(src_link))
    if hasattr(os, 'lchmod'):
        self.assertEqual(os.lstat(src_link).st_mode,
                         os.lstat(dst).st_mode)
577
@support.skip_unless_symlink
def test_copy2_symlinks(self):
    """copy2() of a symlink source, following it and then not following it."""
    tmp_dir = self.mkdtemp()
    src = os.path.join(tmp_dir, 'foo')
    dst = os.path.join(tmp_dir, 'bar')
    src_link = os.path.join(tmp_dir, 'baz')
    has_lchmod = hasattr(os, 'lchmod')

    write_file(src, 'foo')
    os.symlink(src, src_link)
    if has_lchmod:
        os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
    if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
        os.lchflags(src_link, stat.UF_NODUMP)
    src_stat = os.stat(src)
    src_link_stat = os.lstat(src_link)

    # follow
    shutil.copy2(src_link, dst, follow_symlinks=True)
    self.assertFalse(os.path.islink(dst))
    self.assertEqual(read_file(src), read_file(dst))
    os.remove(dst)

    # don't follow
    shutil.copy2(src_link, dst, follow_symlinks=False)
    self.assertTrue(os.path.islink(dst))
    self.assertEqual(os.readlink(dst), os.readlink(src_link))
    dst_stat = os.lstat(dst)
    if os.utime in os.supports_follow_symlinks:
        for attr in ('st_atime', 'st_mtime'):
            # The modification times may be truncated in the new file.
            self.assertLessEqual(getattr(src_link_stat, attr),
                                 getattr(dst_stat, attr) + 1)
    if has_lchmod:
        # The copy carries the link's mode, not the target file's.
        self.assertEqual(src_link_stat.st_mode, dst_stat.st_mode)
        self.assertNotEqual(src_stat.st_mode, dst_stat.st_mode)
    if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
        self.assertEqual(src_link_stat.st_flags, dst_stat.st_flags)
612
@support.skip_unless_xattr
def test_copy2_xattr(self):
    """copy2() carries a user extended attribute over to the copy."""
    tmp_dir = self.mkdtemp()
    src = os.path.join(tmp_dir, 'foo')
    dst = os.path.join(tmp_dir, 'bar')
    write_file(src, 'foo')
    os.setxattr(src, 'user.foo', b'42')
    shutil.copy2(src, dst)
    source_value = os.getxattr(src, 'user.foo')
    copied_value = os.getxattr(dst, 'user.foo')
    self.assertEqual(source_value, copied_value)
    os.remove(dst)
625
@support.skip_unless_symlink
def test_copyfile_symlinks(self):
    """copyfile() duplicates a symlink, or its target when following."""
    tmp_dir = self.mkdtemp()
    src = os.path.join(tmp_dir, 'src')
    dst = os.path.join(tmp_dir, 'dst')
    dst_link = os.path.join(tmp_dir, 'dst_link')
    link = os.path.join(tmp_dir, 'link')
    write_file(src, 'foo')
    os.symlink(src, link)

    # don't follow: the result is itself a link with the same target
    shutil.copyfile(link, dst_link, follow_symlinks=False)
    self.assertTrue(os.path.islink(dst_link))
    self.assertEqual(os.readlink(link), os.readlink(dst_link))

    # follow (the default): the result is not a link
    shutil.copyfile(link, dst)
    self.assertFalse(os.path.islink(dst))
642
def test_rmtree_uses_safe_fd_version_if_available(self):
    """rmtree() dispatches to _rmtree_safe_fd exactly when the os allows it."""
    # Capability check computed here and compared with shutil's own flag.
    _use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <=
                         os.supports_dir_fd and
                         os.listdir in os.supports_fd and
                         os.stat in os.supports_follow_symlinks)
    if not _use_fd_functions:
        self.assertFalse(shutil._use_fd_functions)
        self.assertFalse(shutil.rmtree.avoids_symlink_attacks)
        return

    self.assertTrue(shutil._use_fd_functions)
    self.assertTrue(shutil.rmtree.avoids_symlink_attacks)
    tmp_dir = self.mkdtemp()
    d = os.path.join(tmp_dir, 'a')
    os.mkdir(d)

    class Called(Exception):
        pass

    def _raiser(*args, **kwargs):
        raise Called

    # Swap in a sentinel that raises, to prove rmtree() calls it.
    real_rmtree = shutil._rmtree_safe_fd
    try:
        shutil._rmtree_safe_fd = _raiser
        with self.assertRaises(Called):
            shutil.rmtree(d)
    finally:
        shutil._rmtree_safe_fd = real_rmtree
Hynek Schlawack2100b422012-06-23 20:28:32 +0200666
def test_rmtree_dont_delete_file(self):
    """rmtree() on a regular file raises NotADirectoryError and keeps it."""
    # When called on a file instead of a directory, don't delete it.
    fd, path = tempfile.mkstemp()
    os.close(fd)
    with self.assertRaises(NotADirectoryError):
        shutil.rmtree(path)
    # os.remove() would itself raise if rmtree() had deleted the file.
    os.remove(path)
673
def test_copytree_simple(self):
    """copytree() reproduces a small tree: one file and one subdirectory."""
    src_dir = tempfile.mkdtemp()
    dst_parent = tempfile.mkdtemp()
    dst_dir = os.path.join(dst_parent, 'destination')
    self.addCleanup(shutil.rmtree, src_dir)
    self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
    write_file((src_dir, 'test.txt'), '123')
    os.mkdir(os.path.join(src_dir, 'test_dir'))
    write_file((src_dir, 'test_dir', 'test.txt'), '456')

    shutil.copytree(src_dir, dst_dir)

    copied_file = os.path.join(dst_dir, 'test.txt')
    copied_subdir = os.path.join(dst_dir, 'test_dir')
    copied_nested = os.path.join(dst_dir, 'test_dir', 'test.txt')
    self.assertTrue(os.path.isfile(copied_file))
    self.assertTrue(os.path.isdir(copied_subdir))
    self.assertTrue(os.path.isfile(copied_nested))
    self.assertEqual(read_file((dst_dir, 'test.txt')), '123')
    self.assertEqual(read_file((dst_dir, 'test_dir', 'test.txt')), '456')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000692
@support.skip_unless_symlink
def test_copytree_symlinks(self):
    """copytree(symlinks=True) recreates a symlink with its target and stat."""
    tmp_dir = self.mkdtemp()
    src_dir = os.path.join(tmp_dir, 'src')
    dst_dir = os.path.join(tmp_dir, 'dst')
    sub_dir = os.path.join(src_dir, 'sub')
    os.mkdir(src_dir)
    os.mkdir(sub_dir)
    write_file((src_dir, 'file.txt'), 'foo')
    link_target = os.path.join(src_dir, 'file.txt')
    src_link = os.path.join(sub_dir, 'link')
    dst_link = os.path.join(dst_dir, 'sub/link')
    os.symlink(link_target, src_link)
    if hasattr(os, 'lchmod'):
        os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
    if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
        os.lchflags(src_link, stat.UF_NODUMP)
    src_stat = os.lstat(src_link)

    shutil.copytree(src_dir, dst_dir, symlinks=True)

    # The copy is a link pointing at the same (source-tree) file.
    copied_link = os.path.join(dst_dir, 'sub', 'link')
    self.assertTrue(os.path.islink(copied_link))
    self.assertEqual(os.readlink(copied_link), link_target)
    dst_stat = os.lstat(dst_link)
    if hasattr(os, 'lchmod'):
        self.assertEqual(dst_stat.st_mode, src_stat.st_mode)
    if hasattr(os, 'lchflags'):
        self.assertEqual(dst_stat.st_flags, src_stat.st_flags)
720
def test_copytree_with_exclude(self):
    """copytree() skips whatever *ignore* selects, for patterns and callables.

    Three sub-cases copy the same source tree to the same destination,
    which is removed again after each one: two use
    shutil.ignore_patterns() and the last one a hand-written callable.
    """
    join = os.path.join
    exists = os.path.exists

    # creating data
    src_dir = tempfile.mkdtemp()
    self.addCleanup(shutil.rmtree, src_dir)
    dst_root = tempfile.mkdtemp()
    self.addCleanup(shutil.rmtree, dst_root)
    dst_dir = join(dst_root, 'destination')

    write_file((src_dir, 'test.txt'), '123')
    write_file((src_dir, 'test.tmp'), '123')
    os.mkdir(join(src_dir, 'test_dir'))
    write_file((src_dir, 'test_dir', 'test.txt'), '456')
    os.mkdir(join(src_dir, 'test_dir2'))
    write_file((src_dir, 'test_dir2', 'test.txt'), '456')
    os.mkdir(join(src_dir, 'test_dir2', 'subdir'))
    os.mkdir(join(src_dir, 'test_dir2', 'subdir2'))
    write_file((src_dir, 'test_dir2', 'subdir', 'test.txt'), '456')
    write_file((src_dir, 'test_dir2', 'subdir2', 'test.py'), '456')

    # testing glob-like patterns
    try:
        patterns = shutil.ignore_patterns('*.tmp', 'test_dir2')
        shutil.copytree(src_dir, dst_dir, ignore=patterns)
        # checking the result: some elements should not be copied
        self.assertTrue(exists(join(dst_dir, 'test.txt')))
        self.assertFalse(exists(join(dst_dir, 'test.tmp')))
        self.assertFalse(exists(join(dst_dir, 'test_dir2')))
    finally:
        # ignore_errors: if copytree() failed before creating dst_dir,
        # do not hide that failure behind a FileNotFoundError.
        shutil.rmtree(dst_dir, ignore_errors=True)
    try:
        patterns = shutil.ignore_patterns('*.tmp', 'subdir*')
        shutil.copytree(src_dir, dst_dir, ignore=patterns)
        # checking the result: some elements should not be copied
        self.assertFalse(exists(join(dst_dir, 'test.tmp')))
        self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2')))
        self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
    finally:
        shutil.rmtree(dst_dir, ignore_errors=True)

    # testing callable-style
    def _filter(src, names):
        """Ignore directories named 'subdir' and files ending in '.py'."""
        res = []
        for name in names:
            path = join(src, name)
            # Compare the entry's own name.  The previous
            # ``path.split()[-1]`` split on whitespace (str.split), not
            # on path separators, so it never matched 'subdir'.
            if os.path.isdir(path) and name == 'subdir':
                res.append(name)
            # Compare the extension for equality.  The previous
            # ``in ('.py')`` was a substring test against the string
            # '.py', which also matched the empty extension and thereby
            # ignored every extension-less entry (e.g. 'test_dir2').
            elif os.path.splitext(path)[-1] == '.py':
                res.append(name)
        return res

    try:
        shutil.copytree(src_dir, dst_dir, ignore=_filter)

        # checking the result: some elements should not be copied
        self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2',
                                     'test.py')))
        self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
        # ... while everything the filter does not select must be there,
        # otherwise the two checks above would pass vacuously.
        self.assertTrue(exists(join(dst_dir, 'test.txt')))
        self.assertTrue(exists(join(dst_dir, 'test_dir', 'test.txt')))
        self.assertTrue(exists(join(dst_dir, 'test_dir2', 'subdir2')))
    finally:
        shutil.rmtree(dst_dir, ignore_errors=True)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000785
def test_copytree_retains_permissions(self):
    """copytree() gives every copied entry the st_mode of its source."""
    tmp_dir = tempfile.mkdtemp()
    src_dir = os.path.join(tmp_dir, 'source')
    os.mkdir(src_dir)
    dst_dir = os.path.join(tmp_dir, 'destination')
    self.addCleanup(shutil.rmtree, tmp_dir)

    # (file name, contents, permission bits) of the files to create.
    files = (
        ('permissive.txt', '123', 0o777),
        ('restrictive.txt', '456', 0o600),
    )

    os.chmod(src_dir, 0o777)
    for filename, text, mode in files:
        write_file((src_dir, filename), text)
        os.chmod(os.path.join(src_dir, filename), mode)
    restrictive_subdir = tempfile.mkdtemp(dir=src_dir)
    os.chmod(restrictive_subdir, 0o600)

    shutil.copytree(src_dir, dst_dir)

    def mode_of(path):
        return os.stat(path).st_mode

    self.assertEqual(mode_of(src_dir), mode_of(dst_dir))
    for filename, _text, _mode in files:
        self.assertEqual(mode_of(os.path.join(src_dir, filename)),
                         mode_of(os.path.join(dst_dir, filename)))
    subdir_name = os.path.split(restrictive_subdir)[1]
    restrictive_subdir_dst = os.path.join(dst_dir, subdir_name)
    self.assertEqual(mode_of(restrictive_subdir),
                     mode_of(restrictive_subdir_dst))
811
Berker Peksag884afd92014-12-10 02:50:32 +0200812 @unittest.mock.patch('os.chmod')
813 def test_copytree_winerror(self, mock_patch):
814 # When copying to VFAT, copystat() raises OSError. On Windows, the
815 # exception object has a meaningful 'winerror' attribute, but not
816 # on other operating systems. Do not assume 'winerror' is set.
817 src_dir = tempfile.mkdtemp()
818 dst_dir = os.path.join(tempfile.mkdtemp(), 'destination')
819 self.addCleanup(shutil.rmtree, src_dir)
820 self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
821
822 mock_patch.side_effect = PermissionError('ka-boom')
823 with self.assertRaises(shutil.Error):
824 shutil.copytree(src_dir, dst_dir)
825
@unittest.skipIf(os.name == 'nt', 'temporarily disabled on Windows')
@unittest.skipUnless(hasattr(os, 'link'), 'requires os.link')
def test_dont_copy_file_onto_link_to_itself(self):
    """copyfile() onto a hard link of its source raises SameFileError."""
    # bug 851123.
    os.mkdir(TESTFN)
    source = os.path.join(TESTFN, 'cheese')
    link = os.path.join(TESTFN, 'shop')
    try:
        with open(source, 'w') as stream:
            stream.write('cheddar')
        try:
            os.link(source, link)
        except PermissionError as e:
            self.skipTest('os.link(): %s' % e)
        with self.assertRaises(shutil.SameFileError):
            shutil.copyfile(source, link)
        # The source must still hold its original contents.
        with open(source, 'r') as stream:
            contents = stream.read()
        self.assertEqual(contents, 'cheddar')
        os.remove(link)
    finally:
        shutil.rmtree(TESTFN, ignore_errors=True)
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000846
@support.skip_unless_symlink
def test_dont_copy_file_onto_symlink_to_itself(self):
    """copyfile() onto a symlink pointing at its source raises SameFileError."""
    # bug 851123.
    os.mkdir(TESTFN)
    source = os.path.join(TESTFN, 'cheese')
    link = os.path.join(TESTFN, 'shop')
    try:
        with open(source, 'w') as stream:
            stream.write('cheddar')
        # The link target is the bare name 'cheese': using `source` here
        # would give a symlink pointing to TESTFN/TESTFN/cheese, while it
        # should point at TESTFN/cheese.
        os.symlink('cheese', link)
        with self.assertRaises(shutil.SameFileError):
            shutil.copyfile(source, link)
        # The source must still hold its original contents.
        with open(source, 'r') as stream:
            contents = stream.read()
        self.assertEqual(contents, 'cheddar')
        os.remove(link)
    finally:
        shutil.rmtree(TESTFN, ignore_errors=True)
Johannes Gijsbers68128712004-08-14 13:57:08 +0000866
@support.skip_unless_symlink
def test_rmtree_on_symlink(self):
    """rmtree() called on a symlink raises OSError unless errors are ignored."""
    # bug 1669.
    os.mkdir(TESTFN)
    try:
        real_dir = os.path.join(TESTFN, 'cheese')
        link = os.path.join(TESTFN, 'shop')
        os.mkdir(real_dir)
        os.symlink(real_dir, link)
        with self.assertRaises(OSError):
            shutil.rmtree(link)
        # The same call with ignore_errors=True must not raise.
        shutil.rmtree(link, ignore_errors=True)
    finally:
        shutil.rmtree(TESTFN, ignore_errors=True)
880
# Issue #3002: copyfile and copytree block indefinitely on named pipes
@unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
def test_copyfile_named_pipe(self):
    """copyfile() raises SpecialFileError when source or target is a FIFO."""
    try:
        os.mkfifo(TESTFN)
    except PermissionError as e:
        self.skipTest('os.mkfifo(): %s' % e)
    try:
        # First the FIFO is the source, then it is the destination.
        for source, target in ((TESTFN, TESTFN2), (__file__, TESTFN)):
            with self.assertRaises(shutil.SpecialFileError):
                shutil.copyfile(source, target)
    finally:
        os.remove(TESTFN)
Antoine Pitrou7fff0962009-05-01 21:09:44 +0000895
@unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
@support.skip_unless_symlink
def test_copytree_named_pipe(self):
    """copytree() reports a FIFO inside the tree as a single shutil.Error entry."""
    os.mkdir(TESTFN)
    try:
        subdir = os.path.join(TESTFN, "subdir")
        os.mkdir(subdir)
        pipe = os.path.join(subdir, "mypipe")
        try:
            os.mkfifo(pipe)
        except PermissionError as e:
            self.skipTest('os.mkfifo(): %s' % e)

        raised = None
        try:
            shutil.copytree(TESTFN, TESTFN2)
        except shutil.Error as exc:
            raised = exc
        if raised is None:
            self.fail("shutil.Error should have been raised")

        # The first exception argument is the list of collected errors.
        errors = raised.args[0]
        self.assertEqual(len(errors), 1)
        src, dst, error_msg = errors[0]
        self.assertEqual("`%s` is a named pipe" % pipe, error_msg)
    finally:
        shutil.rmtree(TESTFN, ignore_errors=True)
        shutil.rmtree(TESTFN2, ignore_errors=True)
Antoine Pitrou7fff0962009-05-01 21:09:44 +0000920
def test_copytree_special_func(self):
    """copytree() calls a custom copy_function once for each of the two files."""
    src_dir = self.mkdtemp()
    dst_dir = os.path.join(self.mkdtemp(), 'destination')
    write_file((src_dir, 'test.txt'), '123')
    os.mkdir(os.path.join(src_dir, 'test_dir'))
    write_file((src_dir, 'test_dir', 'test.txt'), '456')

    # Record every (source, destination) pair instead of copying.
    calls = []

    def record(src, dst):
        calls.append((src, dst))

    shutil.copytree(src_dir, dst_dir, copy_function=record)
    self.assertEqual(len(calls), 2)
Tarek Ziadé5340db32010-04-19 22:30:51 +0000935
@support.skip_unless_symlink
def test_copytree_dangling_symlinks(self):
    """copytree() fails on, skips, or copies a dangling symlink per its flags."""
    source = self.mkdtemp()
    os.symlink('IDONTEXIST', os.path.join(source, 'test.txt'))
    os.mkdir(os.path.join(source, 'test_dir'))
    write_file((source, 'test_dir', 'test.txt'), '456')

    # a dangling symlink raises an error at the end
    target = os.path.join(self.mkdtemp(), 'destination')
    with self.assertRaises(Error):
        shutil.copytree(source, target)

    # a dangling symlink is ignored with the proper flag
    target = os.path.join(self.mkdtemp(), 'destination2')
    shutil.copytree(source, target, ignore_dangling_symlinks=True)
    self.assertNotIn('test.txt', os.listdir(target))

    # a dangling symlink is copied if symlinks=True
    target = os.path.join(self.mkdtemp(), 'destination3')
    shutil.copytree(source, target, symlinks=True)
    self.assertIn('test.txt', os.listdir(target))
956
@support.skip_unless_symlink
def test_copytree_symlink_dir(self):
    """copytree() follows or preserves a symlink to a directory per *symlinks*."""
    source = self.mkdtemp()
    target = os.path.join(self.mkdtemp(), 'destination')
    real_dir = os.path.join(source, 'real_dir')
    os.mkdir(real_dir)
    # Create an empty file inside the real directory.
    with open(os.path.join(source, 'real_dir', 'test.txt'), 'w'):
        pass
    os.symlink(real_dir,
               os.path.join(source, 'link_to_dir'),
               target_is_directory=True)

    # symlinks=False: the link is replaced by a real directory.
    shutil.copytree(source, target, symlinks=False)
    copied = os.path.join(target, 'link_to_dir')
    self.assertFalse(os.path.islink(copied))
    self.assertIn('test.txt', os.listdir(copied))

    # symlinks=True: the link is kept as a link.
    target = os.path.join(self.mkdtemp(), 'destination2')
    shutil.copytree(source, target, symlinks=True)
    copied = os.path.join(target, 'link_to_dir')
    self.assertTrue(os.path.islink(copied))
    self.assertIn('test.txt', os.listdir(copied))
976
def _copy_file(self, method):
    """Copy a fresh 'test.txt' into a new directory using *method*.

    *method* is called as ``method(source_file, destination_dir)``.
    Returns the tuple ``(source_file, expected_destination_file)``.
    """
    fname = 'test.txt'
    src_dir = self.mkdtemp()
    write_file((src_dir, fname), 'xxx')
    dst_dir = self.mkdtemp()
    source = os.path.join(src_dir, fname)
    method(source, dst_dir)
    return (source, os.path.join(dst_dir, fname))
986
@unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod')
def test_copy(self):
    """shutil.copy() creates the destination file with the source's st_mode."""
    # Ensure that the copied file exists and has the same mode bits.
    source, copied = self._copy_file(shutil.copy)
    self.assertTrue(os.path.exists(copied))
    source_mode = os.stat(source).st_mode
    copied_mode = os.stat(copied).st_mode
    self.assertEqual(source_mode, copied_mode)
993
@unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod')
@unittest.skipUnless(hasattr(os, 'utime'), 'requires os.utime')
def test_copy2(self):
    """shutil.copy2() keeps the mode, the access/modification times and flags."""
    # Ensure that the copied file exists and has the same mode and
    # modification time bits.
    source, copied = self._copy_file(shutil.copy2)
    self.assertTrue(os.path.exists(copied))
    source_stat = os.stat(source)
    copied_stat = os.stat(copied)
    self.assertEqual(source_stat.st_mode, copied_stat.st_mode)
    for attr in ('st_atime', 'st_mtime'):
        # The modification times may be truncated in the new file.
        original_time = getattr(source_stat, attr)
        copied_time = getattr(copied_stat, attr)
        self.assertLessEqual(original_time, copied_time + 1)
    if hasattr(os, 'chflags') and hasattr(source_stat, 'st_flags'):
        self.assertEqual(source_stat.st_flags, copied_stat.st_flags)
1011
@support.requires_zlib
def test_make_tarball(self):
    """make_archive() builds 'gztar' and 'tar' archives from a relative base name."""
    # creating something to tar
    root_dir, base_dir = self._create_files('')

    tmpdir2 = self.mkdtemp()
    # force shutil to create the directory
    os.rmdir(tmpdir2)
    # working with relative paths
    work_dir = os.path.dirname(tmpdir2)
    rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')

    expected_names = ['.', './sub', './sub2',
                      './file1', './file2', './sub/file3']

    def verify(tarball, suffix, mode):
        # The archive has the expected absolute name, exists, is a tar
        # file and lists exactly the created entries.
        self.assertEqual(tarball, base_name + suffix)
        self.assertTrue(os.path.isfile(tarball))
        self.assertTrue(tarfile.is_tarfile(tarball))
        with tarfile.open(tarball, mode) as tf:
            self.assertCountEqual(tf.getnames(), expected_names)

    with support.change_cwd(work_dir):
        base_name = os.path.abspath(rel_base_name)
        tarball = make_archive(rel_base_name, 'gztar', root_dir, '.')

    # check if the compressed tarball was created
    verify(tarball, '.tar.gz', 'r:gz')

    # trying an uncompressed one
    with support.change_cwd(work_dir):
        tarball = make_archive(rel_base_name, 'tar', root_dir, '.')
    verify(tarball, '.tar', 'r')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001047
1048 def _tarinfo(self, path):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001049 with tarfile.open(path) as tar:
Tarek Ziadé396fad72010-02-23 05:30:31 +00001050 names = tar.getnames()
1051 names.sort()
1052 return tuple(names)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001053
def _create_files(self, base_dir='dist'):
    """Create a small tree to archive and return ``(root_dir, base_dir)``.

    The tree is created in *base_dir* below a new temporary root: the
    files 'file1', 'file2' and 'sub/file3' plus the empty directory
    'sub2'.  When *base_dir* is non-empty, a file 'outer' is also
    written directly into the root.
    """
    # creating something to tar
    root_dir = self.mkdtemp()
    dist = os.path.join(root_dir, base_dir)
    os.makedirs(dist, exist_ok=True)
    for name in ('file1', 'file2'):
        write_file((dist, name), 'xxx')
    os.mkdir(os.path.join(dist, 'sub'))
    write_file((dist, 'sub', 'file3'), 'xxx')
    os.mkdir(os.path.join(dist, 'sub2'))
    if base_dir:
        write_file((root_dir, 'outer'), 'xxx')
    return root_dir, base_dir
Tarek Ziadé396fad72010-02-23 05:30:31 +00001067
@support.requires_zlib
@unittest.skipUnless(shutil.which('tar'),
                     'Need the tar command to run')
def test_tarfile_vs_tar(self):
    """make_archive() and the external `tar` command list the same members."""
    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')
    gz_name = base_name + '.tar.gz'
    tar_name = base_name + '.tar'

    # check if the compressed tarball was created
    tarball = make_archive(base_name, 'gztar', root_dir, base_dir)
    self.assertEqual(tarball, gz_name)
    self.assertTrue(os.path.isfile(tarball))

    # now create another tarball using `tar`
    tarball2 = os.path.join(root_dir, 'archive2.tar')
    subprocess.check_call(['tar', '-cf', 'archive2.tar', base_dir],
                          cwd=root_dir,
                          stdout=subprocess.DEVNULL)
    self.assertTrue(os.path.isfile(tarball2))

    # let's compare both tarballs
    self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2))

    # trying an uncompressed one
    tarball = make_archive(base_name, 'tar', root_dir, base_dir)
    self.assertEqual(tarball, tar_name)
    self.assertTrue(os.path.isfile(tarball))

    # now for a dry_run
    tarball = make_archive(base_name, 'tar', root_dir, base_dir,
                           dry_run=True)
    self.assertEqual(tarball, tar_name)
    self.assertTrue(os.path.isfile(tarball))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001100
@support.requires_zlib
def test_make_zipfile(self):
    """make_archive() builds ZIP files without and with an explicit base_dir."""
    # creating something to zip
    root_dir, base_dir = self._create_files()

    tmpdir2 = self.mkdtemp()
    # force shutil to create the directory
    os.rmdir(tmpdir2)
    # working with relative paths
    work_dir = os.path.dirname(tmpdir2)
    rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')

    dist_names = ['dist/', 'dist/sub/', 'dist/sub2/',
                  'dist/file1', 'dist/file2', 'dist/sub/file3']

    def verify(archive, expected_names):
        # The archive has the expected absolute name, exists, is a ZIP
        # file and lists exactly *expected_names*.
        self.assertEqual(archive, base_name + '.zip')
        self.assertTrue(os.path.isfile(archive))
        self.assertTrue(zipfile.is_zipfile(archive))
        with zipfile.ZipFile(archive) as zf:
            self.assertCountEqual(zf.namelist(), expected_names)

    # Without base_dir everything below root_dir is archived.
    with support.change_cwd(work_dir):
        base_name = os.path.abspath(rel_base_name)
        res = make_archive(rel_base_name, 'zip', root_dir)
    verify(res, dist_names + ['outer'])

    # With base_dir only that subtree is archived.
    with support.change_cwd(work_dir):
        base_name = os.path.abspath(rel_base_name)
        res = make_archive(rel_base_name, 'zip', root_dir, base_dir)
    verify(res, dist_names)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001137
@support.requires_zlib
@unittest.skipUnless(shutil.which('zip'),
                     'Need the zip command to run')
def test_zipfile_vs_zip(self):
    """make_archive() and the external `zip` command list the same members."""
    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')
    ours = make_archive(base_name, 'zip', root_dir, base_dir)

    # check if ZIP file was created
    self.assertEqual(ours, base_name + '.zip')
    self.assertTrue(os.path.isfile(ours))

    # now create another ZIP file using `zip`
    theirs = os.path.join(root_dir, 'archive2.zip')
    subprocess.check_call(['zip', '-q', '-r', 'archive2.zip', base_dir],
                          cwd=root_dir,
                          stdout=subprocess.DEVNULL)
    self.assertTrue(os.path.isfile(theirs))

    # let's compare both ZIP files
    listings = []
    for archive in (ours, theirs):
        with zipfile.ZipFile(archive) as zf:
            listings.append(sorted(zf.namelist()))
    self.assertEqual(listings[0], listings[1])
Tarek Ziadé396fad72010-02-23 05:30:31 +00001163
@support.requires_zlib
@unittest.skipUnless(shutil.which('unzip'),
                     'Need the unzip command to run')
def test_unzip_zipfile(self):
    """A ZIP file written by make_archive() passes `unzip -t`."""
    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')
    archive = make_archive(base_name, 'zip', root_dir, base_dir)

    # check if ZIP file was created
    self.assertEqual(archive, base_name + '.zip')
    self.assertTrue(os.path.isfile(archive))

    # now check the ZIP file using `unzip -t`
    command = ['unzip', '-t', archive]
    with support.change_cwd(root_dir):
        try:
            subprocess.check_output(command, stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as exc:
            # Include the tool's own output in the failure message.
            details = exc.output.decode(errors="replace")
            self.fail("{}\n\n**Unzip Output**\n{}".format(exc, details))
1185
def test_make_archive(self):
    """make_archive() raises ValueError for the unregistered format 'xxx'."""
    base_name = os.path.join(self.mkdtemp(), 'archive')
    with self.assertRaises(ValueError):
        make_archive(base_name, 'xxx')
1190
@support.requires_zlib
def test_make_archive_owner_group(self):
    """make_archive() accepts owner/group for 'zip' and 'tar', even unknown ones."""
    # testing make_archive with owner and group, with various combinations
    # this works even if there's not gid/uid support
    if UID_GID_SUPPORT:
        group = grp.getgrgid(0)[0]
        owner = pwd.getpwuid(0)[0]
    else:
        group = owner = 'root'

    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')

    # (format, extra keyword arguments) for each archive to create.
    cases = (
        ('zip', {'owner': owner, 'group': group}),
        ('zip', {}),
        ('tar', {'owner': owner, 'group': group}),
        ('tar', {'owner': 'kjhkjhkjg', 'group': 'oihohoh'}),
    )
    for archive_format, extra in cases:
        res = make_archive(base_name, archive_format, root_dir, base_dir,
                           **extra)
        self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001217
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001218
@support.requires_zlib
@unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
def test_tarfile_root_owner(self):
    """Members archived with the names of uid/gid 0 are stored with uid/gid 0."""
    root_dir, base_dir = self._create_files()
    base_name = os.path.join(self.mkdtemp(), 'archive')
    # Names of the group and the user with id 0.
    group = grp.getgrgid(0)[0]
    owner = pwd.getpwuid(0)[0]
    with support.change_cwd(root_dir):
        archive_name = make_archive(base_name, 'gztar', root_dir, 'dist',
                                    owner=owner, group=group)

    # check if the compressed tarball was created
    self.assertTrue(os.path.isfile(archive_name))

    # now checks the rights
    with tarfile.open(archive_name) as archive:
        for member in archive.getmembers():
            self.assertEqual(member.uid, 0)
            self.assertEqual(member.gid, 0)
1241
def test_make_archive_cwd(self):
    """make_archive() leaves the working directory unchanged when the archiver fails."""
    original_cwd = os.getcwd()

    def _breaks(*args, **kw):
        raise RuntimeError()

    register_archive_format('xxx', _breaks, [], 'xxx file')
    try:
        # The archiver always raises; the error itself is of no interest
        # here, only the working directory afterwards.
        with contextlib.suppress(Exception):
            make_archive('xxx', 'xxx', root_dir=self.mkdtemp())
        self.assertEqual(os.getcwd(), original_cwd)
    finally:
        unregister_archive_format('xxx')
1256
def test_make_tarfile_in_curdir(self):
    """make_archive('test', 'tar') returns 'test.tar' and creates it in the cwd."""
    # Issue #21280
    work_dir = self.mkdtemp()
    with support.change_cwd(work_dir):
        archive = make_archive('test', 'tar')
        self.assertEqual(archive, 'test.tar')
        self.assertTrue(os.path.isfile('test.tar'))
1263
@support.requires_zlib
def test_make_zipfile_in_curdir(self):
    """make_archive('test', 'zip') returns 'test.zip' and creates it in the cwd."""
    # Issue #21280
    work_dir = self.mkdtemp()
    with support.change_cwd(work_dir):
        archive = make_archive('test', 'zip')
        self.assertEqual(archive, 'test.zip')
        self.assertTrue(os.path.isfile('test.zip'))
1271
Tarek Ziadé396fad72010-02-23 05:30:31 +00001272 def test_register_archive_format(self):
1273
1274 self.assertRaises(TypeError, register_archive_format, 'xxx', 1)
1275 self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
1276 1)
1277 self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
1278 [(1, 2), (1, 2, 3)])
1279
1280 register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file')
1281 formats = [name for name, params in get_archive_formats()]
1282 self.assertIn('xxx', formats)
1283
1284 unregister_archive_format('xxx')
1285 formats = [name for name, params in get_archive_formats()]
1286 self.assertNotIn('xxx', formats)
1287
def check_unpack_archive(self, format):
    """Run the unpack check for *format* with str, pathlib.Path and FakePath paths."""
    converters = (lambda path: path, pathlib.Path, FakePath)
    for converter in converters:
        self.check_unpack_archive_with_converter(format, converter)
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001292
def check_unpack_archive_with_converter(self, format, converter):
    """Archive a tree as *format*, unpack it and compare the listings.

    Every path handed to unpack_archive() is first passed through
    *converter*.
    """
    root_dir, base_dir = self._create_files()
    # 'outer' lies outside base_dir, so it is not part of the archive.
    expected = rlistdir(root_dir)
    expected.remove('outer')

    base_name = os.path.join(self.mkdtemp(), 'archive')
    filename = make_archive(base_name, format, root_dir, base_dir)

    # let's try to unpack it now
    guessed_dir = self.mkdtemp()
    unpack_archive(converter(filename), converter(guessed_dir))
    self.assertEqual(rlistdir(guessed_dir), expected)

    # and again, this time with the format specified
    explicit_dir = self.mkdtemp()
    unpack_archive(converter(filename), converter(explicit_dir),
                   format=format)
    self.assertEqual(rlistdir(explicit_dir), expected)

    with self.assertRaises(shutil.ReadError):
        unpack_archive(converter(TESTFN))
    with self.assertRaises(ValueError):
        unpack_archive(converter(TESTFN), format='xxx')
Nick Coghlanabf202d2011-03-16 13:52:20 -04001313
def test_unpack_archive_tar(self):
    # Plain tar archive; delegates to the shared unpack checks.
    self.check_unpack_archive(format='tar')
1316
@support.requires_zlib
def test_unpack_archive_gztar(self):
    # gzip-compressed tar archive; delegates to the shared unpack checks.
    self.check_unpack_archive(format='gztar')
1320
@support.requires_bz2
def test_unpack_archive_bztar(self):
    # bzip2-compressed tar archive; delegates to the shared unpack checks.
    self.check_unpack_archive(format='bztar')
1324
@support.requires_lzma
def test_unpack_archive_xztar(self):
    # xz-compressed tar archive; delegates to the shared unpack checks.
    self.check_unpack_archive(format='xztar')
1328
@support.requires_zlib
def test_unpack_archive_zip(self):
    # zip archive; delegates to the shared unpack checks.
    self.check_unpack_archive(format='zip')
1332
Martin Pantereb995702016-07-28 01:11:04 +00001333 def test_unpack_registry(self):
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001334
1335 formats = get_unpack_formats()
1336
1337 def _boo(filename, extract_dir, extra):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001338 self.assertEqual(extra, 1)
1339 self.assertEqual(filename, 'stuff.boo')
1340 self.assertEqual(extract_dir, 'xx')
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001341
1342 register_unpack_format('Boo', ['.boo', '.b2'], _boo, [('extra', 1)])
1343 unpack_archive('stuff.boo', 'xx')
1344
1345 # trying to register a .boo unpacker again
1346 self.assertRaises(RegistryError, register_unpack_format, 'Boo2',
1347 ['.boo'], _boo)
1348
1349 # should work now
1350 unregister_unpack_format('Boo')
1351 register_unpack_format('Boo2', ['.boo'], _boo)
1352 self.assertIn(('Boo2', ['.boo'], ''), get_unpack_formats())
1353 self.assertNotIn(('Boo', ['.boo'], ''), get_unpack_formats())
1354
1355 # let's leave a clean state
1356 unregister_unpack_format('Boo2')
Ezio Melottib3aedd42010-11-20 19:04:17 +00001357 self.assertEqual(get_unpack_formats(), formats)
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001358
Giampaolo Rodola'210e7ca2011-07-01 13:55:36 +02001359 @unittest.skipUnless(hasattr(shutil, 'disk_usage'),
1360 "disk_usage not available on this platform")
1361 def test_disk_usage(self):
Gregory P. Smith529746c2017-07-06 17:11:27 -07001362 usage = shutil.disk_usage(os.path.dirname(__file__))
Éric Araujo2ee61882011-07-02 16:45:45 +02001363 self.assertGreater(usage.total, 0)
1364 self.assertGreater(usage.used, 0)
1365 self.assertGreaterEqual(usage.free, 0)
1366 self.assertGreaterEqual(usage.total, usage.used)
1367 self.assertGreater(usage.total, usage.free)
Giampaolo Rodola'210e7ca2011-07-01 13:55:36 +02001368
@unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
@unittest.skipUnless(hasattr(os, 'chown'), 'requires os.chown')
def test_chown(self):
    # Check shutil.chown() argument validation, then change ownership of
    # a file and of a directory using numeric ids and symbolic names.

    # cleaned-up automatically by TestShutil.tearDown method
    dirname = self.mkdtemp()
    filename = tempfile.mktemp(dir=dirname)
    write_file(filename, 'testing chown function')

    # Neither user nor group given.
    with self.assertRaises(ValueError):
        shutil.chown(filename)

    with self.assertRaises(LookupError):
        shutil.chown(filename, user='non-existing username')

    with self.assertRaises(LookupError):
        shutil.chown(filename, group='non-existing groupname')

    # bytes and float are not acceptable user specifications.
    with self.assertRaises(TypeError):
        shutil.chown(filename, b'spam')

    with self.assertRaises(TypeError):
        shutil.chown(filename, 3.14)

    uid = os.getuid()
    gid = os.getgid()

    def check_chown(path, uid=None, gid=None):
        # Stat the path that was actually chown'ed; using the file for
        # every check would leave the directory cases unverified.
        s = os.stat(path)
        if uid is not None:
            self.assertEqual(uid, s.st_uid)
        if gid is not None:
            self.assertEqual(gid, s.st_gid)

    # Numeric ids, first on the file and then on the directory.
    for path in (filename, dirname):
        shutil.chown(path, uid, gid)
        check_chown(path, uid, gid)
        shutil.chown(path, uid)
        check_chown(path, uid)
        shutil.chown(path, user=uid)
        check_chown(path, uid)
        shutil.chown(path, group=gid)
        check_chown(path, gid=gid)

    # Symbolic user and group names.
    user = pwd.getpwuid(uid)[0]
    group = grp.getgrgid(gid)[0]
    for path in (filename, dirname):
        shutil.chown(path, user, group)
        check_chown(path, uid, gid)
1427
def test_copy_return_value(self):
    # copy and copy2 both return their destination path.
    for copier in (shutil.copy, shutil.copy2):
        src_dir = self.mkdtemp()
        dst_dir = self.mkdtemp()
        src = os.path.join(src_dir, 'foo')
        write_file(src, 'foo')

        # Destination given as a directory: the file name is appended.
        into_dir = copier(src, dst_dir)
        self.assertEqual(into_dir, os.path.join(dst_dir, 'foo'))

        # Destination given as a full file path: returned as is.
        explicit = os.path.join(dst_dir, 'bar')
        as_file = copier(src, explicit)
        self.assertEqual(as_file, os.path.join(dst_dir, 'bar'))
1439
def test_copyfile_return_value(self):
    # copyfile returns its destination path.
    src_dir = self.mkdtemp()
    dst_dir = self.mkdtemp()
    dst_file = os.path.join(dst_dir, 'bar')
    src_file = os.path.join(src_dir, 'foo')
    write_file(src_file, 'foo')
    rv = shutil.copyfile(src_file, dst_file)
    # Check the returned value itself, not merely that it names some
    # existing file.
    self.assertEqual(rv, dst_file)
    self.assertTrue(os.path.exists(rv))
    self.assertEqual(read_file(src_file), read_file(dst_file))
1450
def test_copyfile_same_file(self):
    # Copying a file onto itself must be refused and must leave the
    # file contents untouched.
    src_file = os.path.join(self.mkdtemp(), 'foo')
    write_file(src_file, 'foo')
    # SameFileError is the specific exception ...
    with self.assertRaises(SameFileError):
        shutil.copyfile(src_file, src_file)
    # ... but Error should work too, to stay backward compatible.
    with self.assertRaises(Error):
        shutil.copyfile(src_file, src_file)
    # Make sure file is not corrupted.
    self.assertEqual(read_file(src_file), 'foo')
Hynek Schlawack48653762012-10-07 12:49:58 +02001462
def test_copytree_return_value(self):
    # copytree returns its destination path.
    src_dir = self.mkdtemp()
    dst_dir = src_dir + "dest"
    # The destination is created by copytree itself, so it needs its own
    # cleanup.
    self.addCleanup(shutil.rmtree, dst_dir, True)
    write_file(os.path.join(src_dir, 'foo'), 'foo')
    copied_to = shutil.copytree(src_dir, dst_dir)
    self.assertEqual(['foo'], os.listdir(copied_to))
1472
Christian Heimes9bd667a2008-01-20 15:14:11 +00001473
class TestWhich(unittest.TestCase):
    # Tests for shutil.which(), run against one executable temporary file.

    def setUp(self):
        self.temp_dir = tempfile.mkdtemp(prefix="Tmp")
        self.addCleanup(shutil.rmtree, self.temp_dir, True)
        # Give the temp_file an ".exe" suffix for all.
        # It's needed on Windows and not harmful on other platforms.
        self.temp_file = tempfile.NamedTemporaryFile(
            dir=self.temp_dir, prefix="Tmp", suffix=".Exe")
        os.chmod(self.temp_file.name, stat.S_IXUSR)
        self.addCleanup(self.temp_file.close)
        self.dir, self.file = os.path.split(self.temp_file.name)

    def test_basic(self):
        # Given an EXE in a directory, it should be returned.
        found = shutil.which(self.file, path=self.dir)
        self.assertEqual(found, self.temp_file.name)

    def test_absolute_cmd(self):
        # When given the fully qualified path to an executable that exists,
        # it should be returned.
        full_path = self.temp_file.name
        self.assertEqual(shutil.which(full_path, path=self.temp_dir),
                         full_path)

    def test_relative_cmd(self):
        # When given the relative path with a directory part to an executable
        # that exists, it should be returned.
        parent_dir, leaf_dir = os.path.split(self.dir)
        relpath = os.path.join(leaf_dir, self.file)
        with support.change_cwd(path=parent_dir):
            found = shutil.which(relpath, path=self.temp_dir)
            self.assertEqual(found, relpath)
        # But it shouldn't be searched in PATH directories (issue #16957).
        with support.change_cwd(path=self.dir):
            found = shutil.which(relpath, path=parent_dir)
            self.assertIsNone(found)

    def test_cwd(self):
        # Issue #16957
        parent_dir = os.path.dirname(self.dir)
        with support.change_cwd(path=self.dir):
            found = shutil.which(self.file, path=parent_dir)
            if sys.platform != "win32":
                # Other platforms: shouldn't match in the current directory.
                self.assertIsNone(found)
            else:
                # Windows: current directory implicitly on PATH
                self.assertEqual(found, os.path.join(os.curdir, self.file))

    @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
                     'non-root user required')
    def test_non_matching_mode(self):
        # Set the file read-only and ask for writeable files.
        target = self.temp_file.name
        os.chmod(target, stat.S_IREAD)
        if os.access(target, os.W_OK):
            self.skipTest("can't set the file read-only")
        found = shutil.which(self.file, path=self.dir, mode=os.W_OK)
        self.assertIsNone(found)

    def test_relative_path(self):
        # A relative search path yields a result relative to the cwd.
        parent_dir, leaf_dir = os.path.split(self.dir)
        with support.change_cwd(path=parent_dir):
            found = shutil.which(self.file, path=leaf_dir)
            self.assertEqual(found, os.path.join(leaf_dir, self.file))

    def test_nonexistent_file(self):
        # Return None when no matching executable file is found on the path.
        self.assertIsNone(shutil.which("foo.exe", path=self.dir))

    @unittest.skipUnless(sys.platform == "win32",
                         "pathext check is Windows-only")
    def test_pathext_checking(self):
        # Ask for the file without the ".exe" extension, then ensure that
        # it gets found properly with the extension.
        stem = self.file[:-4]
        found = shutil.which(stem, path=self.dir)
        self.assertEqual(found, self.temp_file.name[:-4] + ".EXE")

    def test_environ_path(self):
        # Without an explicit path the PATH environment variable is used.
        with support.EnvironmentVarGuard() as env:
            env['PATH'] = self.dir
            found = shutil.which(self.file)
            self.assertEqual(found, self.temp_file.name)

    def test_empty_path(self):
        # An explicitly empty path must not fall back to PATH.
        base_dir = os.path.dirname(self.dir)
        with support.change_cwd(path=self.dir), \
             support.EnvironmentVarGuard() as env:
            env['PATH'] = self.dir
            found = shutil.which(self.file, path='')
            self.assertIsNone(found)

    def test_empty_path_no_PATH(self):
        # Nothing can be found when PATH is not set at all.
        with support.EnvironmentVarGuard() as env:
            env.pop('PATH', None)
            found = shutil.which(self.file)
            self.assertIsNone(found)
1572
Brian Curtinc57a3452012-06-22 16:00:30 -05001573
class TestMove(unittest.TestCase):
    # Tests for shutil.move().  Methods decorated with mock_rename re-run
    # a scenario with the rename step mocked, which the test names and
    # comments describe as moving to "another filesystem".

    def setUp(self):
        filename = "foo"
        self.src_dir = tempfile.mkdtemp()
        self.dst_dir = tempfile.mkdtemp()
        self.src_file = os.path.join(self.src_dir, filename)
        self.dst_file = os.path.join(self.dst_dir, filename)
        with open(self.src_file, "wb") as f:
            f.write(b"spam")

    def tearDown(self):
        # Several tests move src_dir away, so removal is best-effort.
        # ignore_errors is used instead of a bare "except:" so that
        # KeyboardInterrupt and SystemExit still propagate.
        for d in (self.src_dir, self.dst_dir):
            if d:
                shutil.rmtree(d, ignore_errors=True)

    def _check_move_file(self, src, dst, real_dst):
        # Move the file src to dst and verify that its contents ended up
        # at real_dst and that src is gone.
        with open(src, "rb") as f:
            contents = f.read()
        shutil.move(src, dst)
        with open(real_dst, "rb") as f:
            self.assertEqual(contents, f.read())
        self.assertFalse(os.path.exists(src))

    def _check_move_dir(self, src, dst, real_dst):
        # Move the directory src to dst and verify that its entries ended
        # up in real_dst and that src is gone.
        contents = sorted(os.listdir(src))
        shutil.move(src, dst)
        self.assertEqual(contents, sorted(os.listdir(real_dst)))
        self.assertFalse(os.path.exists(src))

    def test_move_file(self):
        # Move a file to another location on the same filesystem.
        self._check_move_file(self.src_file, self.dst_file, self.dst_file)

    def test_move_file_to_dir(self):
        # Move a file inside an existing dir on the same filesystem.
        self._check_move_file(self.src_file, self.dst_dir, self.dst_file)

    @mock_rename
    def test_move_file_other_fs(self):
        # Move a file to an existing dir on another filesystem.
        self.test_move_file()

    @mock_rename
    def test_move_file_to_dir_other_fs(self):
        # Move a file to another location on another filesystem.
        self.test_move_file_to_dir()

    def test_move_dir(self):
        # Move a dir to another location on the same filesystem.
        dst_dir = tempfile.mktemp()
        try:
            self._check_move_dir(self.src_dir, dst_dir, dst_dir)
        finally:
            # dst_dir only exists if the move got that far.
            shutil.rmtree(dst_dir, ignore_errors=True)

    @mock_rename
    def test_move_dir_other_fs(self):
        # Move a dir to another location on another filesystem.
        self.test_move_dir()

    def test_move_dir_to_dir(self):
        # Move a dir inside an existing dir on the same filesystem.
        self._check_move_dir(self.src_dir, self.dst_dir,
            os.path.join(self.dst_dir, os.path.basename(self.src_dir)))

    @mock_rename
    def test_move_dir_to_dir_other_fs(self):
        # Move a dir inside an existing dir on another filesystem.
        self.test_move_dir_to_dir()

    def test_move_dir_sep_to_dir(self):
        # A trailing path separator on the source must not matter.
        self._check_move_dir(self.src_dir + os.path.sep, self.dst_dir,
            os.path.join(self.dst_dir, os.path.basename(self.src_dir)))

    @unittest.skipUnless(os.path.altsep, 'requires os.path.altsep')
    def test_move_dir_altsep_to_dir(self):
        # Same as above, with the alternative path separator.
        self._check_move_dir(self.src_dir + os.path.altsep, self.dst_dir,
            os.path.join(self.dst_dir, os.path.basename(self.src_dir)))

    def test_existing_file_inside_dest_dir(self):
        # A file with the same name inside the destination dir already exists.
        with open(self.dst_file, "wb"):
            pass
        self.assertRaises(shutil.Error, shutil.move, self.src_file, self.dst_dir)

    def test_dont_move_dir_in_itself(self):
        # Moving a dir inside itself raises an Error.
        dst = os.path.join(self.src_dir, "bar")
        self.assertRaises(shutil.Error, shutil.move, self.src_dir, dst)

    def test_destinsrc_false_negative(self):
        # _destinsrc() must recognise a destination nested in the source.
        os.mkdir(TESTFN)
        try:
            for src, dst in [('srcdir', 'srcdir/dest')]:
                src = os.path.join(TESTFN, src)
                dst = os.path.join(TESTFN, dst)
                self.assertTrue(shutil._destinsrc(src, dst),
                             msg='_destinsrc() wrongly concluded that '
                             'dst (%s) is not in src (%s)' % (dst, src))
        finally:
            shutil.rmtree(TESTFN, ignore_errors=True)

    def test_destinsrc_false_positive(self):
        # _destinsrc() must not be fooled by paths that merely share a
        # name prefix with the source.
        os.mkdir(TESTFN)
        try:
            for src, dst in [('srcdir', 'src/dest'), ('srcdir', 'srcdir.new')]:
                src = os.path.join(TESTFN, src)
                dst = os.path.join(TESTFN, dst)
                self.assertFalse(shutil._destinsrc(src, dst),
                            msg='_destinsrc() wrongly concluded that '
                            'dst (%s) is in src (%s)' % (dst, src))
        finally:
            shutil.rmtree(TESTFN, ignore_errors=True)

    @support.skip_unless_symlink
    @mock_rename
    def test_move_file_symlink(self):
        # Moving a symlink to a file keeps it a symlink to that file.
        dst = os.path.join(self.src_dir, 'bar')
        os.symlink(self.src_file, dst)
        shutil.move(dst, self.dst_file)
        self.assertTrue(os.path.islink(self.dst_file))
        self.assertTrue(os.path.samefile(self.src_file, self.dst_file))

    @support.skip_unless_symlink
    @mock_rename
    def test_move_file_symlink_to_dir(self):
        # Same as above, with a directory as the destination.
        filename = "bar"
        dst = os.path.join(self.src_dir, filename)
        os.symlink(self.src_file, dst)
        shutil.move(dst, self.dst_dir)
        final_link = os.path.join(self.dst_dir, filename)
        self.assertTrue(os.path.islink(final_link))
        self.assertTrue(os.path.samefile(self.src_file, final_link))

    @support.skip_unless_symlink
    @mock_rename
    def test_move_dangling_symlink(self):
        # A symlink whose target does not exist can still be moved.
        src = os.path.join(self.src_dir, 'baz')
        dst = os.path.join(self.src_dir, 'bar')
        os.symlink(src, dst)
        dst_link = os.path.join(self.dst_dir, 'quux')
        shutil.move(dst, dst_link)
        self.assertTrue(os.path.islink(dst_link))
        # On Windows, os.path.realpath does not follow symlinks (issue #9949)
        if os.name == 'nt':
            self.assertEqual(os.path.realpath(src), os.readlink(dst_link))
        else:
            self.assertEqual(os.path.realpath(src), os.path.realpath(dst_link))

    @support.skip_unless_symlink
    @mock_rename
    def test_move_dir_symlink(self):
        # Moving a symlink to a directory keeps it a symlink to that dir.
        src = os.path.join(self.src_dir, 'baz')
        dst = os.path.join(self.src_dir, 'bar')
        os.mkdir(src)
        os.symlink(src, dst)
        dst_link = os.path.join(self.dst_dir, 'quux')
        shutil.move(dst, dst_link)
        self.assertTrue(os.path.islink(dst_link))
        self.assertTrue(os.path.samefile(src, dst_link))

    def test_move_return_value(self):
        # Moving into a directory returns the path of the moved file.
        rv = shutil.move(self.src_file, self.dst_dir)
        self.assertEqual(rv,
                os.path.join(self.dst_dir, os.path.basename(self.src_file)))

    def test_move_as_rename_return_value(self):
        # Moving to a new file name returns that name.
        rv = shutil.move(self.src_file, os.path.join(self.dst_dir, 'bar'))
        self.assertEqual(rv, os.path.join(self.dst_dir, 'bar'))

    @mock_rename
    def test_move_file_special_function(self):
        # A custom copy_function is called once for a single file.
        moved = []
        def _copy(src, dst):
            moved.append((src, dst))
        shutil.move(self.src_file, self.dst_dir, copy_function=_copy)
        self.assertEqual(len(moved), 1)

    @mock_rename
    def test_move_dir_special_function(self):
        # A custom copy_function is called once per file in the tree: the
        # file made by setUp plus the two created here.
        moved = []
        def _copy(src, dst):
            moved.append((src, dst))
        support.create_empty_file(os.path.join(self.src_dir, 'child'))
        support.create_empty_file(os.path.join(self.src_dir, 'child1'))
        shutil.move(self.src_dir, self.dst_dir, copy_function=_copy)
        self.assertEqual(len(moved), 3)
1768
Tarek Ziadé5340db32010-04-19 22:30:51 +00001769
class TestCopyFile(unittest.TestCase):
    # Tests for the way shutil.copyfile() handles failures when opening
    # and closing files, using a fake open() installed into the shutil
    # module namespace.

    # True once a fake open() has been installed and must be removed.
    _delete = False

    class Faux(object):
        # Minimal stand-in for a file object that records how it was used
        # as a context manager.
        _entered = False
        _exited_with = None
        _raised = False
        def __init__(self, raise_in_exit=False, suppress_at_exit=True):
            self._raise_in_exit = raise_in_exit
            self._suppress_at_exit = suppress_at_exit
        def read(self, *args):
            return ''
        def __enter__(self):
            self._entered = True
        def __exit__(self, exc_type, exc_val, exc_tb):
            self._exited_with = exc_type, exc_val, exc_tb
            if self._raise_in_exit:
                self._raised = True
                raise OSError("Cannot close")
            return self._suppress_at_exit

    def tearDown(self):
        if self._delete:
            del shutil.open

    def _set_shutil_open(self, func):
        # Shadow the builtin open() inside the shutil module only.
        shutil.open = func
        self._delete = True

    def test_w_source_open_fails(self):
        def _open(filename, mode='r'):
            if filename == 'srcfile':
                raise OSError('Cannot open "srcfile"')
            # Explicit raise rather than "assert 0" so that the guard
            # survives python -O.
            raise AssertionError("shouldn't reach here")

        self._set_shutil_open(_open)

        self.assertRaises(OSError, shutil.copyfile, 'srcfile', 'destfile')

    @unittest.skipIf(support.MACOS, "skipped on macOS")
    def test_w_dest_open_fails(self):

        srcfile = self.Faux()

        def _open(filename, mode='r'):
            if filename == 'srcfile':
                return srcfile
            if filename == 'destfile':
                raise OSError('Cannot open "destfile"')
            raise AssertionError("shouldn't reach here")

        self._set_shutil_open(_open)

        # The error is suppressed by srcfile.__exit__, but it must have
        # been passed to it.
        shutil.copyfile('srcfile', 'destfile')
        self.assertTrue(srcfile._entered)
        self.assertIs(srcfile._exited_with[0], OSError)
        self.assertEqual(srcfile._exited_with[1].args,
                         ('Cannot open "destfile"',))

    @unittest.skipIf(support.MACOS, "skipped on macOS")
    def test_w_dest_close_fails(self):

        srcfile = self.Faux()
        destfile = self.Faux(True)

        def _open(filename, mode='r'):
            if filename == 'srcfile':
                return srcfile
            if filename == 'destfile':
                return destfile
            raise AssertionError("shouldn't reach here")

        self._set_shutil_open(_open)

        shutil.copyfile('srcfile', 'destfile')
        self.assertTrue(srcfile._entered)
        self.assertTrue(destfile._entered)
        self.assertTrue(destfile._raised)
        self.assertIs(srcfile._exited_with[0], OSError)
        self.assertEqual(srcfile._exited_with[1].args,
                         ('Cannot close',))

    @unittest.skipIf(support.MACOS, "skipped on macOS")
    def test_w_source_close_fails(self):

        srcfile = self.Faux(True)
        destfile = self.Faux()

        def _open(filename, mode='r'):
            if filename == 'srcfile':
                return srcfile
            if filename == 'destfile':
                return destfile
            raise AssertionError("shouldn't reach here")

        self._set_shutil_open(_open)

        self.assertRaises(OSError,
                          shutil.copyfile, 'srcfile', 'destfile')
        self.assertTrue(srcfile._entered)
        self.assertTrue(destfile._entered)
        self.assertFalse(destfile._raised)
        self.assertIsNone(srcfile._exited_with[0])
        self.assertTrue(srcfile._raised)

    def test_move_dir_caseinsensitive(self):
        # Renames a folder to the same name
        # but a different case.

        self.src_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.src_dir, True)
        dst_dir = os.path.join(
                os.path.dirname(self.src_dir),
                os.path.basename(self.src_dir).upper())
        self.assertNotEqual(self.src_dir, dst_dir)

        try:
            shutil.move(self.src_dir, dst_dir)
            self.assertTrue(os.path.isdir(dst_dir))
        finally:
            # Only remove what exists: if the move itself failed, an
            # unconditional rmdir would raise and hide the real error.
            if os.path.isdir(dst_dir):
                os.rmdir(dst_dir)
Ronald Oussorenf51738b2011-05-06 10:23:04 +02001892
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02001893
class TestCopyFileObj(unittest.TestCase):
    # Tests for shutil.copyfileobj(), run against a file of FILESIZE
    # bytes created once for the whole class.
    FILESIZE = 2 * 1024 * 1024

    @classmethod
    def setUpClass(cls):
        write_test_file(TESTFN, cls.FILESIZE)

    @classmethod
    def tearDownClass(cls):
        support.unlink(TESTFN)
        support.unlink(TESTFN2)

    def tearDown(self):
        support.unlink(TESTFN2)

    @contextlib.contextmanager
    def get_files(self):
        # Yield (src, dst): TESTFN opened for reading and TESTFN2 opened
        # for writing, both in binary mode.
        with open(TESTFN, "rb") as src:
            with open(TESTFN2, "wb") as dst:
                yield (src, dst)

    def assert_files_eq(self, src, dst):
        # Fail unless the two named files have identical contents.
        with open(src, 'rb') as fsrc:
            with open(dst, 'rb') as fdst:
                self.assertEqual(fsrc.read(), fdst.read())

    def test_content(self):
        with self.get_files() as (src, dst):
            shutil.copyfileobj(src, dst)
        # Compare only after both files have been closed.
        self.assert_files_eq(TESTFN, TESTFN2)

    def test_file_not_closed(self):
        # copyfileobj() must leave both file objects open.  unittest
        # assertions are used because bare asserts vanish under -O.
        with self.get_files() as (src, dst):
            shutil.copyfileobj(src, dst)
            self.assertFalse(src.closed)
            self.assertFalse(dst.closed)

    def test_file_offset(self):
        # Both file positions must be at the end after the copy.
        with self.get_files() as (src, dst):
            shutil.copyfileobj(src, dst)
            self.assertEqual(src.tell(), self.FILESIZE)
            self.assertEqual(dst.tell(), self.FILESIZE)

    @unittest.skipIf(os.name != 'nt', "Windows only")
    def test_win_impl(self):
        # Make sure alternate Windows implementation is called.
        with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
            shutil.copyfile(TESTFN, TESTFN2)
        self.assertTrue(m.called)

        # File size is 2 MiB but max buf size should be 1 MiB.
        self.assertEqual(m.call_args[0][2], 1 * 1024 * 1024)

        # If file size < 1 MiB memoryview() length must be equal to
        # the actual file size.
        with tempfile.NamedTemporaryFile(delete=False) as f:
            f.write(b'foo')
        fname = f.name
        self.addCleanup(support.unlink, fname)
        with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
            shutil.copyfile(fname, TESTFN2)
        self.assertEqual(m.call_args[0][2], 3)

        # Empty files should not rely on readinto() variant.
        with tempfile.NamedTemporaryFile(delete=False) as f:
            pass
        fname = f.name
        self.addCleanup(support.unlink, fname)
        with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
            shutil.copyfile(fname, TESTFN2)
        self.assertFalse(m.called)
        self.assert_files_eq(fname, TESTFN2)
1966
1967
class _ZeroCopyFileTest(object):
    """Tests common to all zero-copy APIs."""
    FILESIZE = (10 * 1024 * 1024)  # 10 MiB
    # Contents of TESTFN, filled in by setUpClass().
    FILEDATA = b""
    # Dotted name patched by the error-injection tests; set by subclasses.
    PATCHPOINT = ""

    @classmethod
    def setUpClass(cls):
        write_test_file(TESTFN, cls.FILESIZE)
        with open(TESTFN, 'rb') as f:
            cls.FILEDATA = f.read()
            # Explicit check instead of a bare assert so that the sanity
            # test is not stripped under python -O.
            if len(cls.FILEDATA) != cls.FILESIZE:
                raise AssertionError(
                    "test file has %d bytes, expected %d"
                    % (len(cls.FILEDATA), cls.FILESIZE))

    @classmethod
    def tearDownClass(cls):
        support.unlink(TESTFN)

    def tearDown(self):
        support.unlink(TESTFN2)

    @contextlib.contextmanager
    def get_files(self):
        # Yield (src, dst): TESTFN opened for reading and TESTFN2 opened
        # for writing, both in binary mode.
        with open(TESTFN, "rb") as src:
            with open(TESTFN2, "wb") as dst:
                yield (src, dst)

    def zerocopy_fun(self, *args, **kwargs):
        raise NotImplementedError("must be implemented in subclass")

    def reset(self):
        # Tear everything down and recreate the fixtures from scratch.
        self.tearDown()
        self.tearDownClass()
        self.setUpClass()
        self.setUp()

    # ---

    def test_regular_copy(self):
        with self.get_files() as (src, dst):
            self.zerocopy_fun(src, dst)
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
        # Make sure the fallback function is not called.
        with self.get_files() as (src, dst):
            with unittest.mock.patch('shutil.copyfileobj') as m:
                shutil.copyfile(TESTFN, TESTFN2)
            self.assertFalse(m.called)

    def test_same_file(self):
        # The source file may get damaged here, hence the full reset.
        self.addCleanup(self.reset)
        with self.get_files() as (src, dst):
            with self.assertRaises(Exception):
                self.zerocopy_fun(src, src)
        # Make sure src file is not corrupted.
        self.assertEqual(read_file(TESTFN, binary=True), self.FILEDATA)

    def test_non_existent_src(self):
        # The error must name the missing source file.
        name = tempfile.mktemp()
        with self.assertRaises(FileNotFoundError) as cm:
            shutil.copyfile(name, "new")
        self.assertEqual(cm.exception.filename, name)

    def test_empty_file(self):
        srcname = TESTFN + 'src'
        dstname = TESTFN + 'dst'
        self.addCleanup(lambda: support.unlink(srcname))
        self.addCleanup(lambda: support.unlink(dstname))
        with open(srcname, "wb"):
            pass

        with open(srcname, "rb") as src:
            with open(dstname, "wb") as dst:
                self.zerocopy_fun(src, dst)

        self.assertEqual(read_file(dstname, binary=True), b"")

    def test_unhandled_exception(self):
        # An unexpected exception from the patched function propagates.
        with unittest.mock.patch(self.PATCHPOINT,
                                 side_effect=ZeroDivisionError):
            self.assertRaises(ZeroDivisionError,
                              shutil.copyfile, TESTFN, TESTFN2)

    def test_exception_on_first_call(self):
        # Emulate a case where the first call to the zero-copy
        # function raises an exception in which case the function is
        # supposed to give up immediately.
        with unittest.mock.patch(self.PATCHPOINT,
                                 side_effect=OSError(errno.EINVAL, "yo")):
            with self.get_files() as (src, dst):
                with self.assertRaises(_GiveupOnFastCopy):
                    self.zerocopy_fun(src, dst)

    def test_filesystem_full(self):
        # Emulate a case where filesystem is full and sendfile() fails
        # on first call.
        with unittest.mock.patch(self.PATCHPOINT,
                                 side_effect=OSError(errno.ENOSPC, "yo")):
            with self.get_files() as (src, dst):
                self.assertRaises(OSError, self.zerocopy_fun, src, dst)
2066
2067
@unittest.skipIf(not SUPPORTS_SENDFILE, 'os.sendfile() not supported')
class TestZeroCopySendfile(_ZeroCopyFileTest, unittest.TestCase):
    """Tests for shutil._fastcopy_sendfile(), the os.sendfile()-based fast copy.

    Checks use unittest assertion methods rather than bare ``assert``
    statements so that they are still executed when the suite runs
    under ``python -O``.
    """

    # Dotted name of the primitive that the shared tests patch out.
    PATCHPOINT = "os.sendfile"

    def zerocopy_fun(self, fsrc, fdst):
        """Copy *fsrc* to *fdst* through shutil._fastcopy_sendfile()."""
        return shutil._fastcopy_sendfile(fsrc, fdst)

    def test_non_regular_file_src(self):
        """An in-memory source makes the fast path give up; copyfileobj() works."""
        with io.BytesIO(self.FILEDATA) as src:
            with open(TESTFN2, "wb") as dst:
                with self.assertRaises(_GiveupOnFastCopy):
                    self.zerocopy_fun(src, dst)
                shutil.copyfileobj(src, dst)

        # Read back only after dst has been closed (and hence flushed).
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_non_regular_file_dst(self):
        """An in-memory destination makes the fast path give up; copyfileobj() works."""
        with open(TESTFN, "rb") as src:
            with io.BytesIO() as dst:
                with self.assertRaises(_GiveupOnFastCopy):
                    self.zerocopy_fun(src, dst)
                shutil.copyfileobj(src, dst)
                dst.seek(0)
                self.assertEqual(dst.read(), self.FILEDATA)

    def test_exception_on_second_call(self):
        """An error on the second sendfile() call is raised, not swallowed."""
        def sendfile(*args, **kwargs):
            # First call goes to the real os.sendfile(); later calls fail.
            if not flag:
                flag.append(None)
                return orig_sendfile(*args, **kwargs)
            raise OSError(errno.EBADF, "yo")

        flag = []
        orig_sendfile = os.sendfile
        with unittest.mock.patch('os.sendfile', create=True,
                                 side_effect=sendfile):
            with self.get_files() as (src, dst):
                with self.assertRaises(OSError) as cm:
                    shutil._fastcopy_sendfile(src, dst)
        # The real sendfile() must have been reached at least once.
        self.assertTrue(flag)
        self.assertEqual(cm.exception.errno, errno.EBADF)

    def test_cant_get_size(self):
        """The copy still succeeds when os.fstat() on the source fails."""
        # Emulate a case where src file size cannot be determined.
        # Internally bufsize will be set to a small value and
        # sendfile() will be called repeatedly.
        with unittest.mock.patch('os.fstat', side_effect=OSError) as m:
            with self.get_files() as (src, dst):
                shutil._fastcopy_sendfile(src, dst)
                self.assertTrue(m.called)
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_small_chunks(self):
        """The copy is complete when the reported size is a fixed 65537 bytes."""
        # Force internal file size detection to be smaller than the
        # actual file size. We want to force sendfile() to be called
        # multiple times, also in order to emulate a src fd which gets
        # bigger while it is being copied.
        fake_stat = unittest.mock.Mock()
        fake_stat.st_size = 65536 + 1
        with unittest.mock.patch('os.fstat', return_value=fake_stat) as m:
            with self.get_files() as (src, dst):
                shutil._fastcopy_sendfile(src, dst)
                self.assertTrue(m.called)
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_big_chunk(self):
        """The copy is complete when the reported size exceeds the real one."""
        # Force internal file size detection to be +100MB bigger than
        # the actual file size. Make sure sendfile() does not rely on
        # file size value except for (maybe) a better throughput /
        # performance.
        fake_stat = unittest.mock.Mock()
        fake_stat.st_size = self.FILESIZE + (100 * 1024 * 1024)
        with unittest.mock.patch('os.fstat', return_value=fake_stat) as m:
            with self.get_files() as (src, dst):
                shutil._fastcopy_sendfile(src, dst)
                self.assertTrue(m.called)
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_blocksize_arg(self):
        """Check the block size (4th positional argument) passed to os.sendfile()."""
        with unittest.mock.patch('os.sendfile',
                                 side_effect=ZeroDivisionError) as m:
            self.assertRaises(ZeroDivisionError,
                              shutil.copyfile, TESTFN, TESTFN2)
            blocksize = m.call_args[0][3]
            # Make sure file size and the block size arg passed to
            # sendfile() are the same.
            self.assertEqual(blocksize, os.path.getsize(TESTFN))
            # ...unless we're dealing with a small file.
            support.unlink(TESTFN2)
            write_file(TESTFN2, b"hello", binary=True)
            self.addCleanup(support.unlink, TESTFN2 + '3')
            self.assertRaises(ZeroDivisionError,
                              shutil.copyfile, TESTFN2, TESTFN2 + '3')
            blocksize = m.call_args[0][3]
            self.assertEqual(blocksize, 2 ** 23)

    def test_file2file_not_supported(self):
        """ENOTSOCK from sendfile() disables the fast path for later copies."""
        # Emulate a case where sendfile() only support file->socket
        # fds. In such a case copyfile() is supposed to skip the
        # fast-copy attempt from then on.
        self.assertTrue(shutil._HAS_SENDFILE)
        try:
            with unittest.mock.patch(
                    self.PATCHPOINT,
                    side_effect=OSError(errno.ENOTSOCK, "yo")) as m:
                with self.get_files() as (src, dst):
                    with self.assertRaises(_GiveupOnFastCopy):
                        shutil._fastcopy_sendfile(src, dst)
                self.assertTrue(m.called)
            self.assertFalse(shutil._HAS_SENDFILE)

            with unittest.mock.patch(self.PATCHPOINT) as m:
                shutil.copyfile(TESTFN, TESTFN2)
                self.assertFalse(m.called)
        finally:
            # Restore the module-level flag so other tests are unaffected.
            shutil._HAS_SENDFILE = True
2185
2186
@unittest.skipUnless(support.MACOS, 'macOS only')
class TestZeroCopyMACOS(_ZeroCopyFileTest, unittest.TestCase):
    """Run the shared zero-copy tests against shutil._fastcopy_fcopyfile()."""

    # Dotted name of the primitive that the shared tests patch out.
    PATCHPOINT = "posix._fcopyfile"

    def zerocopy_fun(self, src, dst):
        """Copy *src* to *dst* via _fastcopy_fcopyfile() with the _COPYFILE_DATA flag."""
        flags = posix._COPYFILE_DATA
        return shutil._fastcopy_fcopyfile(src, dst, flags)
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002193
2194
class TermsizeTests(unittest.TestCase):
    """Tests for shutil.get_terminal_size()."""

    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        size = shutil.get_terminal_size()
        for dimension in (size.columns, size.lines):
            self.assertGreaterEqual(dimension, 0)

    def test_os_environ_first(self):
        "Check if environment variables have precedence"

        # COLUMNS set, LINES absent.
        with support.EnvironmentVarGuard() as env:
            del env['LINES']
            env['COLUMNS'] = '777'
            reported = shutil.get_terminal_size()
        self.assertEqual(reported.columns, 777)

        # LINES set, COLUMNS absent.
        with support.EnvironmentVarGuard() as env:
            env['LINES'] = '888'
            del env['COLUMNS']
            reported = shutil.get_terminal_size()
        self.assertEqual(reported.lines, 888)

    def test_bad_environ(self):
        """Non-numeric COLUMNS/LINES values still yield non-negative sizes."""
        with support.EnvironmentVarGuard() as env:
            env['LINES'] = 'yyy'
            env['COLUMNS'] = 'xxx'
            reported = shutil.get_terminal_size()
        for dimension in (reported.columns, reported.lines):
            self.assertGreaterEqual(dimension, 0)

    @unittest.skipUnless(os.isatty(sys.__stdout__.fileno()), "not on tty")
    @unittest.skipUnless(hasattr(os, 'get_terminal_size'),
                         'need os.get_terminal_size()')
    def test_stty_match(self):
        """Check if stty returns the same results ignoring env

        This test will fail if stdin and stdout are connected to
        different terminals with different sizes.  Nevertheless, such
        situations should be pretty rare.
        """
        stty_errors = (FileNotFoundError, PermissionError,
                       subprocess.CalledProcessError)
        try:
            raw = subprocess.check_output(['stty', 'size'])
            size = raw.decode().split()
        except stty_errors:
            self.skipTest("stty invocation failed")
        # stty prints "rows columns"; get_terminal_size() is (columns, lines).
        expected = (int(size[1]), int(size[0]))

        with support.EnvironmentVarGuard() as env:
            for variable in ('LINES', 'COLUMNS'):
                del env[variable]
            actual = shutil.get_terminal_size()

        self.assertEqual(expected, actual)

    def test_fallback(self):
        """The fallback argument is used when no size can be queried."""
        with support.EnvironmentVarGuard() as env:
            for variable in ('LINES', 'COLUMNS'):
                del env[variable]

            # sys.__stdout__ has no fileno()
            with support.swap_attr(sys, '__stdout__', None):
                reported = shutil.get_terminal_size(fallback=(10, 20))
            self.assertEqual(reported.columns, 10)
            self.assertEqual(reported.lines, 20)

            # sys.__stdout__ is not a terminal on Unix
            # or fileno() not in (0, 1, 2) on Windows
            with open(os.devnull, 'w') as sink:
                with support.swap_attr(sys, '__stdout__', sink):
                    reported = shutil.get_terminal_size(fallback=(30, 40))
            self.assertEqual(reported.columns, 30)
            self.assertEqual(reported.lines, 40)
2271
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002272
class PublicAPITests(unittest.TestCase):
    """Ensures that the correct values are exposed in the public API."""

    def test_module_all_attribute(self):
        """shutil.__all__ holds exactly the expected set of public names."""
        self.assertTrue(hasattr(shutil, '__all__'))
        expected = {
            'copyfileobj', 'copyfile', 'copymode', 'copystat',
            'copy', 'copy2', 'copytree', 'move', 'rmtree', 'Error',
            'SpecialFileError', 'ExecError', 'make_archive',
            'get_archive_formats', 'register_archive_format',
            'unregister_archive_format', 'get_unpack_formats',
            'register_unpack_format', 'unregister_unpack_format',
            'unpack_archive', 'ignore_patterns', 'chown', 'which',
            'get_terminal_size', 'SameFileError',
        }
        # disk_usage is only expected where statvfs exists or on Windows.
        if os.name == 'nt' or hasattr(os, 'statvfs'):
            expected.add('disk_usage')
        self.assertEqual(set(shutil.__all__), expected)
2289
2290
# Run the whole test suite when this file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()