blob: ec8fcc3eef01e2516e236b847e8e766b0282e741 [file] [log] [blame]
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00001# Copyright (C) 2003 Python Software Foundation
2
3import unittest
Berker Peksag884afd92014-12-10 02:50:32 +02004import unittest.mock
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00005import shutil
6import tempfile
Johannes Gijsbers8e6f2de2004-11-23 09:27:27 +00007import sys
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +00008import stat
Brett Cannon1c3fa182004-06-19 21:11:35 +00009import os
10import os.path
Antoine Pitrouc041ab62012-01-02 19:18:02 +010011import errno
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040012import functools
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -070013import pathlib
Antoine Pitroubcf2b592012-02-08 23:28:36 +010014import subprocess
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020015import random
16import string
17import contextlib
18import io
Serhiy Storchaka527ef072015-09-06 18:33:19 +030019from shutil import (make_archive,
Tarek Ziadé396fad72010-02-23 05:30:31 +000020 register_archive_format, unregister_archive_format,
Tarek Ziadé6ac91722010-04-28 17:51:36 +000021 get_archive_formats, Error, unpack_archive,
22 register_unpack_format, RegistryError,
Hynek Schlawack48653762012-10-07 12:49:58 +020023 unregister_unpack_format, get_unpack_formats,
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020024 SameFileError, _GiveupOnFastCopy)
Tarek Ziadé396fad72010-02-23 05:30:31 +000025import tarfile
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +020026import zipfile
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020027try:
28 import posix
29except ImportError:
30 posix = None
Tarek Ziadé396fad72010-02-23 05:30:31 +000031
32from test import support
Serhiy Storchakab21d1552018-03-02 11:53:51 +020033from test.support import TESTFN, FakePath
Serhiy Storchaka11213772014-08-06 18:50:19 +030034
Antoine Pitrou7fff0962009-05-01 21:09:44 +000035TESTFN2 = TESTFN + "2"
Victor Stinner937ee9e2018-06-26 02:11:06 +020036MACOS = sys.platform.startswith("darwin")
Tarek Ziadé396fad72010-02-23 05:30:31 +000037try:
38 import grp
39 import pwd
40 UID_GID_SUPPORT = True
41except ImportError:
42 UID_GID_SUPPORT = False
43
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040044def _fake_rename(*args, **kwargs):
45 # Pretend the destination path is on a different filesystem.
Antoine Pitrouc041ab62012-01-02 19:18:02 +010046 raise OSError(getattr(errno, 'EXDEV', 18), "Invalid cross-device link")
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040047
48def mock_rename(func):
49 @functools.wraps(func)
50 def wrap(*args, **kwargs):
51 try:
52 builtin_rename = os.rename
53 os.rename = _fake_rename
54 return func(*args, **kwargs)
55 finally:
56 os.rename = builtin_rename
57 return wrap
58
Éric Araujoa7e33a12011-08-12 19:51:35 +020059def write_file(path, content, binary=False):
60 """Write *content* to a file located at *path*.
61
62 If *path* is a tuple instead of a string, os.path.join will be used to
63 make a path. If *binary* is true, the file will be opened in binary
64 mode.
65 """
66 if isinstance(path, tuple):
67 path = os.path.join(*path)
68 with open(path, 'wb' if binary else 'w') as fp:
69 fp.write(content)
70
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020071def write_test_file(path, size):
72 """Create a test file with an arbitrary size and random text content."""
73 def chunks(total, step):
74 assert total >= step
75 while total > step:
76 yield step
77 total -= step
78 if total:
79 yield total
80
81 bufsize = min(size, 8192)
82 chunk = b"".join([random.choice(string.ascii_letters).encode()
83 for i in range(bufsize)])
84 with open(path, 'wb') as f:
85 for csize in chunks(size, bufsize):
86 f.write(chunk)
87 assert os.path.getsize(path) == size
88
Éric Araujoa7e33a12011-08-12 19:51:35 +020089def read_file(path, binary=False):
90 """Return contents from a file located at *path*.
91
92 If *path* is a tuple instead of a string, os.path.join will be used to
93 make a path. If *binary* is true, the file will be opened in binary
94 mode.
95 """
96 if isinstance(path, tuple):
97 path = os.path.join(*path)
98 with open(path, 'rb' if binary else 'r') as fp:
99 return fp.read()
100
Serhiy Storchaka527ef072015-09-06 18:33:19 +0300101def rlistdir(path):
102 res = []
103 for name in sorted(os.listdir(path)):
104 p = os.path.join(path, name)
105 if os.path.isdir(p) and not os.path.islink(p):
106 res.append(name + '/')
107 for n in rlistdir(p):
108 res.append(name + '/' + n)
109 else:
110 res.append(name)
111 return res
112
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +0200113def supports_file2file_sendfile():
114 # ...apparently Linux and Solaris are the only ones
115 if not hasattr(os, "sendfile"):
116 return False
117 srcname = None
118 dstname = None
119 try:
120 with tempfile.NamedTemporaryFile("wb", delete=False) as f:
121 srcname = f.name
122 f.write(b"0123456789")
123
124 with open(srcname, "rb") as src:
125 with tempfile.NamedTemporaryFile("wb", delete=False) as dst:
126 dstname = f.name
127 infd = src.fileno()
128 outfd = dst.fileno()
129 try:
130 os.sendfile(outfd, infd, 0, 2)
131 except OSError:
132 return False
133 else:
134 return True
135 finally:
136 if srcname is not None:
137 support.unlink(srcname)
138 if dstname is not None:
139 support.unlink(dstname)
140
141
142SUPPORTS_SENDFILE = supports_file2file_sendfile()
143
Éric Araujoa7e33a12011-08-12 19:51:35 +0200144
Barry Warsaw7fc2cca2003-01-24 17:34:13 +0000145class TestShutil(unittest.TestCase):
Tarek Ziadé396fad72010-02-23 05:30:31 +0000146
147 def setUp(self):
148 super(TestShutil, self).setUp()
149 self.tempdirs = []
150
151 def tearDown(self):
152 super(TestShutil, self).tearDown()
153 while self.tempdirs:
154 d = self.tempdirs.pop()
155 shutil.rmtree(d, os.name in ('nt', 'cygwin'))
156
Tarek Ziadé396fad72010-02-23 05:30:31 +0000157
158 def mkdtemp(self):
159 """Create a temporary directory that will be cleaned up.
160
161 Returns the path of the directory.
162 """
163 d = tempfile.mkdtemp()
164 self.tempdirs.append(d)
165 return d
Tarek Ziadé5340db32010-04-19 22:30:51 +0000166
Hynek Schlawack3b527782012-06-25 13:27:31 +0200167 def test_rmtree_works_on_bytes(self):
168 tmp = self.mkdtemp()
169 victim = os.path.join(tmp, 'killme')
170 os.mkdir(victim)
171 write_file(os.path.join(victim, 'somefile'), 'foo')
172 victim = os.fsencode(victim)
173 self.assertIsInstance(victim, bytes)
Steve Dowere58571b2016-09-08 11:11:13 -0700174 shutil.rmtree(victim)
Hynek Schlawack3b527782012-06-25 13:27:31 +0200175
Hynek Schlawacka75cd1c2012-06-28 12:07:29 +0200176 @support.skip_unless_symlink
177 def test_rmtree_fails_on_symlink(self):
178 tmp = self.mkdtemp()
179 dir_ = os.path.join(tmp, 'dir')
180 os.mkdir(dir_)
181 link = os.path.join(tmp, 'link')
182 os.symlink(dir_, link)
183 self.assertRaises(OSError, shutil.rmtree, link)
184 self.assertTrue(os.path.exists(dir_))
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100185 self.assertTrue(os.path.lexists(link))
186 errors = []
187 def onerror(*args):
188 errors.append(args)
189 shutil.rmtree(link, onerror=onerror)
190 self.assertEqual(len(errors), 1)
191 self.assertIs(errors[0][0], os.path.islink)
192 self.assertEqual(errors[0][1], link)
193 self.assertIsInstance(errors[0][2][1], OSError)
Hynek Schlawacka75cd1c2012-06-28 12:07:29 +0200194
195 @support.skip_unless_symlink
196 def test_rmtree_works_on_symlinks(self):
197 tmp = self.mkdtemp()
198 dir1 = os.path.join(tmp, 'dir1')
199 dir2 = os.path.join(dir1, 'dir2')
200 dir3 = os.path.join(tmp, 'dir3')
201 for d in dir1, dir2, dir3:
202 os.mkdir(d)
203 file1 = os.path.join(tmp, 'file1')
204 write_file(file1, 'foo')
205 link1 = os.path.join(dir1, 'link1')
206 os.symlink(dir2, link1)
207 link2 = os.path.join(dir1, 'link2')
208 os.symlink(dir3, link2)
209 link3 = os.path.join(dir1, 'link3')
210 os.symlink(file1, link3)
211 # make sure symlinks are removed but not followed
212 shutil.rmtree(dir1)
213 self.assertFalse(os.path.exists(dir1))
214 self.assertTrue(os.path.exists(dir3))
215 self.assertTrue(os.path.exists(file1))
216
Barry Warsaw7fc2cca2003-01-24 17:34:13 +0000217 def test_rmtree_errors(self):
218 # filename is guaranteed not to exist
219 filename = tempfile.mktemp()
Hynek Schlawackb5501102012-12-10 09:11:25 +0100220 self.assertRaises(FileNotFoundError, shutil.rmtree, filename)
221 # test that ignore_errors option is honored
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100222 shutil.rmtree(filename, ignore_errors=True)
223
224 # existing file
225 tmpdir = self.mkdtemp()
Hynek Schlawackb5501102012-12-10 09:11:25 +0100226 write_file((tmpdir, "tstfile"), "")
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100227 filename = os.path.join(tmpdir, "tstfile")
Hynek Schlawackb5501102012-12-10 09:11:25 +0100228 with self.assertRaises(NotADirectoryError) as cm:
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100229 shutil.rmtree(filename)
Hynek Schlawack87f9b462012-12-10 16:29:57 +0100230 # The reason for this rather odd construct is that Windows sprinkles
231 # a \*.* at the end of file names. But only sometimes on some buildbots
232 possible_args = [filename, os.path.join(filename, '*.*')]
233 self.assertIn(cm.exception.filename, possible_args)
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100234 self.assertTrue(os.path.exists(filename))
Hynek Schlawackb5501102012-12-10 09:11:25 +0100235 # test that ignore_errors option is honored
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100236 shutil.rmtree(filename, ignore_errors=True)
237 self.assertTrue(os.path.exists(filename))
238 errors = []
239 def onerror(*args):
240 errors.append(args)
241 shutil.rmtree(filename, onerror=onerror)
242 self.assertEqual(len(errors), 2)
Serhiy Storchakad4d79bc2017-11-04 14:16:35 +0200243 self.assertIs(errors[0][0], os.scandir)
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100244 self.assertEqual(errors[0][1], filename)
Hynek Schlawackb5501102012-12-10 09:11:25 +0100245 self.assertIsInstance(errors[0][2][1], NotADirectoryError)
Hynek Schlawack87f9b462012-12-10 16:29:57 +0100246 self.assertIn(errors[0][2][1].filename, possible_args)
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100247 self.assertIs(errors[1][0], os.rmdir)
248 self.assertEqual(errors[1][1], filename)
Hynek Schlawackb5501102012-12-10 09:11:25 +0100249 self.assertIsInstance(errors[1][2][1], NotADirectoryError)
Hynek Schlawack87f9b462012-12-10 16:29:57 +0100250 self.assertIn(errors[1][2][1].filename, possible_args)
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000251
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000252
Serhiy Storchaka43767632013-11-03 21:31:38 +0200253 @unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod()')
254 @unittest.skipIf(sys.platform[:6] == 'cygwin',
255 "This test can't be run on Cygwin (issue #1071513).")
256 @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
257 "This test can't be run reliably as root (issue #1076467).")
258 def test_on_error(self):
259 self.errorState = 0
260 os.mkdir(TESTFN)
261 self.addCleanup(shutil.rmtree, TESTFN)
Antoine Pitrou2f8a75c2012-06-23 21:28:15 +0200262
Serhiy Storchaka43767632013-11-03 21:31:38 +0200263 self.child_file_path = os.path.join(TESTFN, 'a')
264 self.child_dir_path = os.path.join(TESTFN, 'b')
265 support.create_empty_file(self.child_file_path)
266 os.mkdir(self.child_dir_path)
267 old_dir_mode = os.stat(TESTFN).st_mode
268 old_child_file_mode = os.stat(self.child_file_path).st_mode
269 old_child_dir_mode = os.stat(self.child_dir_path).st_mode
270 # Make unwritable.
271 new_mode = stat.S_IREAD|stat.S_IEXEC
272 os.chmod(self.child_file_path, new_mode)
273 os.chmod(self.child_dir_path, new_mode)
274 os.chmod(TESTFN, new_mode)
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000275
Serhiy Storchaka43767632013-11-03 21:31:38 +0200276 self.addCleanup(os.chmod, TESTFN, old_dir_mode)
277 self.addCleanup(os.chmod, self.child_file_path, old_child_file_mode)
278 self.addCleanup(os.chmod, self.child_dir_path, old_child_dir_mode)
Antoine Pitrou2f8a75c2012-06-23 21:28:15 +0200279
Serhiy Storchaka43767632013-11-03 21:31:38 +0200280 shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
281 # Test whether onerror has actually been called.
282 self.assertEqual(self.errorState, 3,
283 "Expected call to onerror function did not happen.")
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000284
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000285 def check_args_to_onerror(self, func, arg, exc):
Benjamin Peterson25c95f12009-05-08 20:42:26 +0000286 # test_rmtree_errors deliberately runs rmtree
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200287 # on a directory that is chmod 500, which will fail.
Benjamin Peterson25c95f12009-05-08 20:42:26 +0000288 # This function is run when shutil.rmtree fails.
289 # 99.9% of the time it initially fails to remove
290 # a file in the directory, so the first time through
291 # func is os.remove.
292 # However, some Linux machines running ZFS on
293 # FUSE experienced a failure earlier in the process
294 # at os.listdir. The first failure may legally
295 # be either.
Antoine Pitrou4f6e3f72012-06-23 22:05:11 +0200296 if self.errorState < 2:
Hynek Schlawack2100b422012-06-23 20:28:32 +0200297 if func is os.unlink:
Antoine Pitrou4f6e3f72012-06-23 22:05:11 +0200298 self.assertEqual(arg, self.child_file_path)
299 elif func is os.rmdir:
300 self.assertEqual(arg, self.child_dir_path)
Benjamin Peterson25c95f12009-05-08 20:42:26 +0000301 else:
Antoine Pitrou4f6e3f72012-06-23 22:05:11 +0200302 self.assertIs(func, os.listdir)
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200303 self.assertIn(arg, [TESTFN, self.child_dir_path])
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000304 self.assertTrue(issubclass(exc[0], OSError))
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200305 self.errorState += 1
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000306 else:
307 self.assertEqual(func, os.rmdir)
308 self.assertEqual(arg, TESTFN)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000309 self.assertTrue(issubclass(exc[0], OSError))
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200310 self.errorState = 3
311
312 def test_rmtree_does_not_choke_on_failing_lstat(self):
313 try:
314 orig_lstat = os.lstat
Hynek Schlawacka75cd1c2012-06-28 12:07:29 +0200315 def raiser(fn, *args, **kwargs):
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200316 if fn != TESTFN:
317 raise OSError()
318 else:
319 return orig_lstat(fn)
320 os.lstat = raiser
321
322 os.mkdir(TESTFN)
323 write_file((TESTFN, 'foo'), 'foo')
324 shutil.rmtree(TESTFN)
325 finally:
326 os.lstat = orig_lstat
Barry Warsaw7fc2cca2003-01-24 17:34:13 +0000327
Antoine Pitrou78091e62011-12-29 18:54:15 +0100328 @unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod')
329 @support.skip_unless_symlink
330 def test_copymode_follow_symlinks(self):
331 tmp_dir = self.mkdtemp()
332 src = os.path.join(tmp_dir, 'foo')
333 dst = os.path.join(tmp_dir, 'bar')
334 src_link = os.path.join(tmp_dir, 'baz')
335 dst_link = os.path.join(tmp_dir, 'quux')
336 write_file(src, 'foo')
337 write_file(dst, 'foo')
338 os.symlink(src, src_link)
339 os.symlink(dst, dst_link)
340 os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
341 # file to file
342 os.chmod(dst, stat.S_IRWXO)
343 self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
344 shutil.copymode(src, dst)
345 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
Antoine Pitrou3f48ac92014-01-01 02:50:45 +0100346 # On Windows, os.chmod does not follow symlinks (issue #15411)
347 if os.name != 'nt':
348 # follow src link
349 os.chmod(dst, stat.S_IRWXO)
350 shutil.copymode(src_link, dst)
351 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
352 # follow dst link
353 os.chmod(dst, stat.S_IRWXO)
354 shutil.copymode(src, dst_link)
355 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
356 # follow both links
357 os.chmod(dst, stat.S_IRWXO)
358 shutil.copymode(src_link, dst_link)
359 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100360
361 @unittest.skipUnless(hasattr(os, 'lchmod'), 'requires os.lchmod')
362 @support.skip_unless_symlink
363 def test_copymode_symlink_to_symlink(self):
364 tmp_dir = self.mkdtemp()
365 src = os.path.join(tmp_dir, 'foo')
366 dst = os.path.join(tmp_dir, 'bar')
367 src_link = os.path.join(tmp_dir, 'baz')
368 dst_link = os.path.join(tmp_dir, 'quux')
369 write_file(src, 'foo')
370 write_file(dst, 'foo')
371 os.symlink(src, src_link)
372 os.symlink(dst, dst_link)
373 os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
374 os.chmod(dst, stat.S_IRWXU)
375 os.lchmod(src_link, stat.S_IRWXO|stat.S_IRWXG)
376 # link to link
377 os.lchmod(dst_link, stat.S_IRWXO)
Larry Hastingsb4038062012-07-15 10:57:38 -0700378 shutil.copymode(src_link, dst_link, follow_symlinks=False)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100379 self.assertEqual(os.lstat(src_link).st_mode,
380 os.lstat(dst_link).st_mode)
381 self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
382 # src link - use chmod
383 os.lchmod(dst_link, stat.S_IRWXO)
Larry Hastingsb4038062012-07-15 10:57:38 -0700384 shutil.copymode(src_link, dst, follow_symlinks=False)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100385 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
386 # dst link - use chmod
387 os.lchmod(dst_link, stat.S_IRWXO)
Larry Hastingsb4038062012-07-15 10:57:38 -0700388 shutil.copymode(src, dst_link, follow_symlinks=False)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100389 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
390
391 @unittest.skipIf(hasattr(os, 'lchmod'), 'requires os.lchmod to be missing')
392 @support.skip_unless_symlink
393 def test_copymode_symlink_to_symlink_wo_lchmod(self):
394 tmp_dir = self.mkdtemp()
395 src = os.path.join(tmp_dir, 'foo')
396 dst = os.path.join(tmp_dir, 'bar')
397 src_link = os.path.join(tmp_dir, 'baz')
398 dst_link = os.path.join(tmp_dir, 'quux')
399 write_file(src, 'foo')
400 write_file(dst, 'foo')
401 os.symlink(src, src_link)
402 os.symlink(dst, dst_link)
Larry Hastingsb4038062012-07-15 10:57:38 -0700403 shutil.copymode(src_link, dst_link, follow_symlinks=False) # silent fail
Antoine Pitrou78091e62011-12-29 18:54:15 +0100404
405 @support.skip_unless_symlink
406 def test_copystat_symlinks(self):
407 tmp_dir = self.mkdtemp()
408 src = os.path.join(tmp_dir, 'foo')
409 dst = os.path.join(tmp_dir, 'bar')
410 src_link = os.path.join(tmp_dir, 'baz')
411 dst_link = os.path.join(tmp_dir, 'qux')
412 write_file(src, 'foo')
413 src_stat = os.stat(src)
414 os.utime(src, (src_stat.st_atime,
415 src_stat.st_mtime - 42.0)) # ensure different mtimes
416 write_file(dst, 'bar')
417 self.assertNotEqual(os.stat(src).st_mtime, os.stat(dst).st_mtime)
418 os.symlink(src, src_link)
419 os.symlink(dst, dst_link)
420 if hasattr(os, 'lchmod'):
421 os.lchmod(src_link, stat.S_IRWXO)
422 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
423 os.lchflags(src_link, stat.UF_NODUMP)
424 src_link_stat = os.lstat(src_link)
425 # follow
426 if hasattr(os, 'lchmod'):
Larry Hastingsb4038062012-07-15 10:57:38 -0700427 shutil.copystat(src_link, dst_link, follow_symlinks=True)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100428 self.assertNotEqual(src_link_stat.st_mode, os.stat(dst).st_mode)
429 # don't follow
Larry Hastingsb4038062012-07-15 10:57:38 -0700430 shutil.copystat(src_link, dst_link, follow_symlinks=False)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100431 dst_link_stat = os.lstat(dst_link)
Larry Hastings9cf065c2012-06-22 16:30:09 -0700432 if os.utime in os.supports_follow_symlinks:
Antoine Pitrou78091e62011-12-29 18:54:15 +0100433 for attr in 'st_atime', 'st_mtime':
434 # The modification times may be truncated in the new file.
435 self.assertLessEqual(getattr(src_link_stat, attr),
436 getattr(dst_link_stat, attr) + 1)
437 if hasattr(os, 'lchmod'):
438 self.assertEqual(src_link_stat.st_mode, dst_link_stat.st_mode)
439 if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
440 self.assertEqual(src_link_stat.st_flags, dst_link_stat.st_flags)
441 # tell to follow but dst is not a link
Larry Hastingsb4038062012-07-15 10:57:38 -0700442 shutil.copystat(src_link, dst, follow_symlinks=False)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100443 self.assertTrue(abs(os.stat(src).st_mtime - os.stat(dst).st_mtime) <
444 00000.1)
445
Ned Deilybaf75712012-05-10 17:05:19 -0700446 @unittest.skipUnless(hasattr(os, 'chflags') and
447 hasattr(errno, 'EOPNOTSUPP') and
448 hasattr(errno, 'ENOTSUP'),
449 "requires os.chflags, EOPNOTSUPP & ENOTSUP")
450 def test_copystat_handles_harmless_chflags_errors(self):
451 tmpdir = self.mkdtemp()
452 file1 = os.path.join(tmpdir, 'file1')
453 file2 = os.path.join(tmpdir, 'file2')
454 write_file(file1, 'xxx')
455 write_file(file2, 'xxx')
456
457 def make_chflags_raiser(err):
458 ex = OSError()
459
Larry Hastings90867a52012-06-22 17:01:41 -0700460 def _chflags_raiser(path, flags, *, follow_symlinks=True):
Ned Deilybaf75712012-05-10 17:05:19 -0700461 ex.errno = err
462 raise ex
463 return _chflags_raiser
464 old_chflags = os.chflags
465 try:
466 for err in errno.EOPNOTSUPP, errno.ENOTSUP:
467 os.chflags = make_chflags_raiser(err)
468 shutil.copystat(file1, file2)
469 # assert others errors break it
470 os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP)
471 self.assertRaises(OSError, shutil.copystat, file1, file2)
472 finally:
473 os.chflags = old_chflags
474
Antoine Pitrou424246f2012-05-12 19:02:01 +0200475 @support.skip_unless_xattr
476 def test_copyxattr(self):
477 tmp_dir = self.mkdtemp()
478 src = os.path.join(tmp_dir, 'foo')
479 write_file(src, 'foo')
480 dst = os.path.join(tmp_dir, 'bar')
481 write_file(dst, 'bar')
482
483 # no xattr == no problem
484 shutil._copyxattr(src, dst)
485 # common case
486 os.setxattr(src, 'user.foo', b'42')
487 os.setxattr(src, 'user.bar', b'43')
488 shutil._copyxattr(src, dst)
Gregory P. Smith1093bf22014-01-17 12:01:22 -0800489 self.assertEqual(sorted(os.listxattr(src)), sorted(os.listxattr(dst)))
Antoine Pitrou424246f2012-05-12 19:02:01 +0200490 self.assertEqual(
491 os.getxattr(src, 'user.foo'),
492 os.getxattr(dst, 'user.foo'))
493 # check errors don't affect other attrs
494 os.remove(dst)
495 write_file(dst, 'bar')
496 os_error = OSError(errno.EPERM, 'EPERM')
497
Larry Hastings9cf065c2012-06-22 16:30:09 -0700498 def _raise_on_user_foo(fname, attr, val, **kwargs):
Antoine Pitrou424246f2012-05-12 19:02:01 +0200499 if attr == 'user.foo':
500 raise os_error
501 else:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700502 orig_setxattr(fname, attr, val, **kwargs)
Antoine Pitrou424246f2012-05-12 19:02:01 +0200503 try:
504 orig_setxattr = os.setxattr
505 os.setxattr = _raise_on_user_foo
506 shutil._copyxattr(src, dst)
Antoine Pitrou61597d32012-05-12 23:37:35 +0200507 self.assertIn('user.bar', os.listxattr(dst))
Antoine Pitrou424246f2012-05-12 19:02:01 +0200508 finally:
509 os.setxattr = orig_setxattr
Hynek Schlawack0beab052013-02-05 08:22:44 +0100510 # the source filesystem not supporting xattrs should be ok, too.
511 def _raise_on_src(fname, *, follow_symlinks=True):
512 if fname == src:
513 raise OSError(errno.ENOTSUP, 'Operation not supported')
514 return orig_listxattr(fname, follow_symlinks=follow_symlinks)
515 try:
516 orig_listxattr = os.listxattr
517 os.listxattr = _raise_on_src
518 shutil._copyxattr(src, dst)
519 finally:
520 os.listxattr = orig_listxattr
Antoine Pitrou424246f2012-05-12 19:02:01 +0200521
Larry Hastingsad5ae042012-07-14 17:55:11 -0700522 # test that shutil.copystat copies xattrs
523 src = os.path.join(tmp_dir, 'the_original')
524 write_file(src, src)
525 os.setxattr(src, 'user.the_value', b'fiddly')
526 dst = os.path.join(tmp_dir, 'the_copy')
527 write_file(dst, dst)
528 shutil.copystat(src, dst)
Hynek Schlawackc2d481f2012-07-16 17:11:10 +0200529 self.assertEqual(os.getxattr(dst, 'user.the_value'), b'fiddly')
Larry Hastingsad5ae042012-07-14 17:55:11 -0700530
Antoine Pitrou424246f2012-05-12 19:02:01 +0200531 @support.skip_unless_symlink
532 @support.skip_unless_xattr
533 @unittest.skipUnless(hasattr(os, 'geteuid') and os.geteuid() == 0,
534 'root privileges required')
535 def test_copyxattr_symlinks(self):
536 # On Linux, it's only possible to access non-user xattr for symlinks;
537 # which in turn require root privileges. This test should be expanded
538 # as soon as other platforms gain support for extended attributes.
539 tmp_dir = self.mkdtemp()
540 src = os.path.join(tmp_dir, 'foo')
541 src_link = os.path.join(tmp_dir, 'baz')
542 write_file(src, 'foo')
543 os.symlink(src, src_link)
544 os.setxattr(src, 'trusted.foo', b'42')
Larry Hastings9cf065c2012-06-22 16:30:09 -0700545 os.setxattr(src_link, 'trusted.foo', b'43', follow_symlinks=False)
Antoine Pitrou424246f2012-05-12 19:02:01 +0200546 dst = os.path.join(tmp_dir, 'bar')
547 dst_link = os.path.join(tmp_dir, 'qux')
548 write_file(dst, 'bar')
549 os.symlink(dst, dst_link)
Larry Hastingsb4038062012-07-15 10:57:38 -0700550 shutil._copyxattr(src_link, dst_link, follow_symlinks=False)
Larry Hastings9cf065c2012-06-22 16:30:09 -0700551 self.assertEqual(os.getxattr(dst_link, 'trusted.foo', follow_symlinks=False), b'43')
Antoine Pitrou424246f2012-05-12 19:02:01 +0200552 self.assertRaises(OSError, os.getxattr, dst, 'trusted.foo')
Larry Hastingsb4038062012-07-15 10:57:38 -0700553 shutil._copyxattr(src_link, dst, follow_symlinks=False)
Antoine Pitrou424246f2012-05-12 19:02:01 +0200554 self.assertEqual(os.getxattr(dst, 'trusted.foo'), b'43')
555
Antoine Pitrou78091e62011-12-29 18:54:15 +0100556 @support.skip_unless_symlink
557 def test_copy_symlinks(self):
558 tmp_dir = self.mkdtemp()
559 src = os.path.join(tmp_dir, 'foo')
560 dst = os.path.join(tmp_dir, 'bar')
561 src_link = os.path.join(tmp_dir, 'baz')
562 write_file(src, 'foo')
563 os.symlink(src, src_link)
564 if hasattr(os, 'lchmod'):
565 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
566 # don't follow
Larry Hastingsb4038062012-07-15 10:57:38 -0700567 shutil.copy(src_link, dst, follow_symlinks=True)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100568 self.assertFalse(os.path.islink(dst))
569 self.assertEqual(read_file(src), read_file(dst))
570 os.remove(dst)
571 # follow
Larry Hastingsb4038062012-07-15 10:57:38 -0700572 shutil.copy(src_link, dst, follow_symlinks=False)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100573 self.assertTrue(os.path.islink(dst))
574 self.assertEqual(os.readlink(dst), os.readlink(src_link))
575 if hasattr(os, 'lchmod'):
576 self.assertEqual(os.lstat(src_link).st_mode,
577 os.lstat(dst).st_mode)
578
579 @support.skip_unless_symlink
580 def test_copy2_symlinks(self):
581 tmp_dir = self.mkdtemp()
582 src = os.path.join(tmp_dir, 'foo')
583 dst = os.path.join(tmp_dir, 'bar')
584 src_link = os.path.join(tmp_dir, 'baz')
585 write_file(src, 'foo')
586 os.symlink(src, src_link)
587 if hasattr(os, 'lchmod'):
588 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
589 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
590 os.lchflags(src_link, stat.UF_NODUMP)
591 src_stat = os.stat(src)
592 src_link_stat = os.lstat(src_link)
593 # follow
Larry Hastingsb4038062012-07-15 10:57:38 -0700594 shutil.copy2(src_link, dst, follow_symlinks=True)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100595 self.assertFalse(os.path.islink(dst))
596 self.assertEqual(read_file(src), read_file(dst))
597 os.remove(dst)
598 # don't follow
Larry Hastingsb4038062012-07-15 10:57:38 -0700599 shutil.copy2(src_link, dst, follow_symlinks=False)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100600 self.assertTrue(os.path.islink(dst))
601 self.assertEqual(os.readlink(dst), os.readlink(src_link))
602 dst_stat = os.lstat(dst)
Larry Hastings9cf065c2012-06-22 16:30:09 -0700603 if os.utime in os.supports_follow_symlinks:
Antoine Pitrou78091e62011-12-29 18:54:15 +0100604 for attr in 'st_atime', 'st_mtime':
605 # The modification times may be truncated in the new file.
606 self.assertLessEqual(getattr(src_link_stat, attr),
607 getattr(dst_stat, attr) + 1)
608 if hasattr(os, 'lchmod'):
609 self.assertEqual(src_link_stat.st_mode, dst_stat.st_mode)
610 self.assertNotEqual(src_stat.st_mode, dst_stat.st_mode)
611 if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
612 self.assertEqual(src_link_stat.st_flags, dst_stat.st_flags)
613
Antoine Pitrou424246f2012-05-12 19:02:01 +0200614 @support.skip_unless_xattr
615 def test_copy2_xattr(self):
616 tmp_dir = self.mkdtemp()
617 src = os.path.join(tmp_dir, 'foo')
618 dst = os.path.join(tmp_dir, 'bar')
619 write_file(src, 'foo')
620 os.setxattr(src, 'user.foo', b'42')
621 shutil.copy2(src, dst)
622 self.assertEqual(
623 os.getxattr(src, 'user.foo'),
624 os.getxattr(dst, 'user.foo'))
625 os.remove(dst)
626
Antoine Pitrou78091e62011-12-29 18:54:15 +0100627 @support.skip_unless_symlink
628 def test_copyfile_symlinks(self):
629 tmp_dir = self.mkdtemp()
630 src = os.path.join(tmp_dir, 'src')
631 dst = os.path.join(tmp_dir, 'dst')
632 dst_link = os.path.join(tmp_dir, 'dst_link')
633 link = os.path.join(tmp_dir, 'link')
634 write_file(src, 'foo')
635 os.symlink(src, link)
636 # don't follow
Larry Hastingsb4038062012-07-15 10:57:38 -0700637 shutil.copyfile(link, dst_link, follow_symlinks=False)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100638 self.assertTrue(os.path.islink(dst_link))
639 self.assertEqual(os.readlink(link), os.readlink(dst_link))
640 # follow
641 shutil.copyfile(link, dst)
642 self.assertFalse(os.path.islink(dst))
643
Hynek Schlawack2100b422012-06-23 20:28:32 +0200644 def test_rmtree_uses_safe_fd_version_if_available(self):
Hynek Schlawackd0f6e0a2012-06-29 08:28:20 +0200645 _use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <=
646 os.supports_dir_fd and
647 os.listdir in os.supports_fd and
648 os.stat in os.supports_follow_symlinks)
649 if _use_fd_functions:
Hynek Schlawack2100b422012-06-23 20:28:32 +0200650 self.assertTrue(shutil._use_fd_functions)
Nick Coghlan5b0eca12012-06-24 16:43:06 +1000651 self.assertTrue(shutil.rmtree.avoids_symlink_attacks)
Hynek Schlawack2100b422012-06-23 20:28:32 +0200652 tmp_dir = self.mkdtemp()
653 d = os.path.join(tmp_dir, 'a')
654 os.mkdir(d)
655 try:
656 real_rmtree = shutil._rmtree_safe_fd
657 class Called(Exception): pass
658 def _raiser(*args, **kwargs):
659 raise Called
660 shutil._rmtree_safe_fd = _raiser
661 self.assertRaises(Called, shutil.rmtree, d)
662 finally:
663 shutil._rmtree_safe_fd = real_rmtree
664 else:
665 self.assertFalse(shutil._use_fd_functions)
Nick Coghlan5b0eca12012-06-24 16:43:06 +1000666 self.assertFalse(shutil.rmtree.avoids_symlink_attacks)
Hynek Schlawack2100b422012-06-23 20:28:32 +0200667
Johannes Gijsbersd60e92a2004-09-11 21:26:21 +0000668 def test_rmtree_dont_delete_file(self):
669 # When called on a file instead of a directory, don't delete it.
670 handle, path = tempfile.mkstemp()
Victor Stinnerbf816222011-06-30 23:25:47 +0200671 os.close(handle)
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200672 self.assertRaises(NotADirectoryError, shutil.rmtree, path)
Johannes Gijsbersd60e92a2004-09-11 21:26:21 +0000673 os.remove(path)
674
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000675 def test_copytree_simple(self):
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000676 src_dir = tempfile.mkdtemp()
677 dst_dir = os.path.join(tempfile.mkdtemp(), 'destination')
Éric Araujoa7e33a12011-08-12 19:51:35 +0200678 self.addCleanup(shutil.rmtree, src_dir)
679 self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
680 write_file((src_dir, 'test.txt'), '123')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000681 os.mkdir(os.path.join(src_dir, 'test_dir'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200682 write_file((src_dir, 'test_dir', 'test.txt'), '456')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000683
Éric Araujoa7e33a12011-08-12 19:51:35 +0200684 shutil.copytree(src_dir, dst_dir)
685 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test.txt')))
686 self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'test_dir')))
687 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test_dir',
688 'test.txt')))
689 actual = read_file((dst_dir, 'test.txt'))
690 self.assertEqual(actual, '123')
691 actual = read_file((dst_dir, 'test_dir', 'test.txt'))
692 self.assertEqual(actual, '456')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000693
Antoine Pitrou78091e62011-12-29 18:54:15 +0100694 @support.skip_unless_symlink
695 def test_copytree_symlinks(self):
696 tmp_dir = self.mkdtemp()
697 src_dir = os.path.join(tmp_dir, 'src')
698 dst_dir = os.path.join(tmp_dir, 'dst')
699 sub_dir = os.path.join(src_dir, 'sub')
700 os.mkdir(src_dir)
701 os.mkdir(sub_dir)
702 write_file((src_dir, 'file.txt'), 'foo')
703 src_link = os.path.join(sub_dir, 'link')
704 dst_link = os.path.join(dst_dir, 'sub/link')
705 os.symlink(os.path.join(src_dir, 'file.txt'),
706 src_link)
707 if hasattr(os, 'lchmod'):
708 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
709 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
710 os.lchflags(src_link, stat.UF_NODUMP)
711 src_stat = os.lstat(src_link)
712 shutil.copytree(src_dir, dst_dir, symlinks=True)
713 self.assertTrue(os.path.islink(os.path.join(dst_dir, 'sub', 'link')))
714 self.assertEqual(os.readlink(os.path.join(dst_dir, 'sub', 'link')),
715 os.path.join(src_dir, 'file.txt'))
716 dst_stat = os.lstat(dst_link)
717 if hasattr(os, 'lchmod'):
718 self.assertEqual(dst_stat.st_mode, src_stat.st_mode)
719 if hasattr(os, 'lchflags'):
720 self.assertEqual(dst_stat.st_flags, src_stat.st_flags)
721
Georg Brandl2ee470f2008-07-16 12:55:28 +0000722 def test_copytree_with_exclude(self):
Georg Brandl2ee470f2008-07-16 12:55:28 +0000723 # creating data
724 join = os.path.join
725 exists = os.path.exists
726 src_dir = tempfile.mkdtemp()
Georg Brandl2ee470f2008-07-16 12:55:28 +0000727 try:
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000728 dst_dir = join(tempfile.mkdtemp(), 'destination')
Éric Araujoa7e33a12011-08-12 19:51:35 +0200729 write_file((src_dir, 'test.txt'), '123')
730 write_file((src_dir, 'test.tmp'), '123')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000731 os.mkdir(join(src_dir, 'test_dir'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200732 write_file((src_dir, 'test_dir', 'test.txt'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000733 os.mkdir(join(src_dir, 'test_dir2'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200734 write_file((src_dir, 'test_dir2', 'test.txt'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000735 os.mkdir(join(src_dir, 'test_dir2', 'subdir'))
736 os.mkdir(join(src_dir, 'test_dir2', 'subdir2'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200737 write_file((src_dir, 'test_dir2', 'subdir', 'test.txt'), '456')
738 write_file((src_dir, 'test_dir2', 'subdir2', 'test.py'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000739
740 # testing glob-like patterns
741 try:
742 patterns = shutil.ignore_patterns('*.tmp', 'test_dir2')
743 shutil.copytree(src_dir, dst_dir, ignore=patterns)
744 # checking the result: some elements should not be copied
745 self.assertTrue(exists(join(dst_dir, 'test.txt')))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200746 self.assertFalse(exists(join(dst_dir, 'test.tmp')))
747 self.assertFalse(exists(join(dst_dir, 'test_dir2')))
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000748 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200749 shutil.rmtree(dst_dir)
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000750 try:
751 patterns = shutil.ignore_patterns('*.tmp', 'subdir*')
752 shutil.copytree(src_dir, dst_dir, ignore=patterns)
753 # checking the result: some elements should not be copied
Éric Araujoa7e33a12011-08-12 19:51:35 +0200754 self.assertFalse(exists(join(dst_dir, 'test.tmp')))
755 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2')))
756 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000757 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200758 shutil.rmtree(dst_dir)
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000759
760 # testing callable-style
761 try:
762 def _filter(src, names):
763 res = []
764 for name in names:
765 path = os.path.join(src, name)
766
767 if (os.path.isdir(path) and
768 path.split()[-1] == 'subdir'):
769 res.append(name)
770 elif os.path.splitext(path)[-1] in ('.py'):
771 res.append(name)
772 return res
773
774 shutil.copytree(src_dir, dst_dir, ignore=_filter)
775
776 # checking the result: some elements should not be copied
Éric Araujoa7e33a12011-08-12 19:51:35 +0200777 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2',
778 'test.py')))
779 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000780
781 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200782 shutil.rmtree(dst_dir)
Georg Brandl2ee470f2008-07-16 12:55:28 +0000783 finally:
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000784 shutil.rmtree(src_dir)
785 shutil.rmtree(os.path.dirname(dst_dir))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000786
Antoine Pitrouac601602013-08-16 19:35:02 +0200787 def test_copytree_retains_permissions(self):
788 tmp_dir = tempfile.mkdtemp()
789 src_dir = os.path.join(tmp_dir, 'source')
790 os.mkdir(src_dir)
791 dst_dir = os.path.join(tmp_dir, 'destination')
792 self.addCleanup(shutil.rmtree, tmp_dir)
793
794 os.chmod(src_dir, 0o777)
795 write_file((src_dir, 'permissive.txt'), '123')
796 os.chmod(os.path.join(src_dir, 'permissive.txt'), 0o777)
797 write_file((src_dir, 'restrictive.txt'), '456')
798 os.chmod(os.path.join(src_dir, 'restrictive.txt'), 0o600)
799 restrictive_subdir = tempfile.mkdtemp(dir=src_dir)
800 os.chmod(restrictive_subdir, 0o600)
801
802 shutil.copytree(src_dir, dst_dir)
Brett Cannon9c7eb552013-08-23 14:38:11 -0400803 self.assertEqual(os.stat(src_dir).st_mode, os.stat(dst_dir).st_mode)
804 self.assertEqual(os.stat(os.path.join(src_dir, 'permissive.txt')).st_mode,
Antoine Pitrouac601602013-08-16 19:35:02 +0200805 os.stat(os.path.join(dst_dir, 'permissive.txt')).st_mode)
Brett Cannon9c7eb552013-08-23 14:38:11 -0400806 self.assertEqual(os.stat(os.path.join(src_dir, 'restrictive.txt')).st_mode,
Antoine Pitrouac601602013-08-16 19:35:02 +0200807 os.stat(os.path.join(dst_dir, 'restrictive.txt')).st_mode)
808 restrictive_subdir_dst = os.path.join(dst_dir,
809 os.path.split(restrictive_subdir)[1])
Brett Cannon9c7eb552013-08-23 14:38:11 -0400810 self.assertEqual(os.stat(restrictive_subdir).st_mode,
Antoine Pitrouac601602013-08-16 19:35:02 +0200811 os.stat(restrictive_subdir_dst).st_mode)
812
Berker Peksag884afd92014-12-10 02:50:32 +0200813 @unittest.mock.patch('os.chmod')
814 def test_copytree_winerror(self, mock_patch):
815 # When copying to VFAT, copystat() raises OSError. On Windows, the
816 # exception object has a meaningful 'winerror' attribute, but not
817 # on other operating systems. Do not assume 'winerror' is set.
818 src_dir = tempfile.mkdtemp()
819 dst_dir = os.path.join(tempfile.mkdtemp(), 'destination')
820 self.addCleanup(shutil.rmtree, src_dir)
821 self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
822
823 mock_patch.side_effect = PermissionError('ka-boom')
824 with self.assertRaises(shutil.Error):
825 shutil.copytree(src_dir, dst_dir)
826
Zachary Ware9fe6d862013-12-08 00:20:35 -0600827 @unittest.skipIf(os.name == 'nt', 'temporarily disabled on Windows')
Hirokazu Yamamoto26681452010-12-05 02:04:16 +0000828 @unittest.skipUnless(hasattr(os, 'link'), 'requires os.link')
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000829 def test_dont_copy_file_onto_link_to_itself(self):
830 # bug 851123.
831 os.mkdir(TESTFN)
832 src = os.path.join(TESTFN, 'cheese')
833 dst = os.path.join(TESTFN, 'shop')
834 try:
Hirokazu Yamamoto26681452010-12-05 02:04:16 +0000835 with open(src, 'w') as f:
836 f.write('cheddar')
xdegaye92c2ca72017-11-12 17:31:07 +0100837 try:
838 os.link(src, dst)
839 except PermissionError as e:
840 self.skipTest('os.link(): %s' % e)
Hynek Schlawack48653762012-10-07 12:49:58 +0200841 self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
Hirokazu Yamamoto26681452010-12-05 02:04:16 +0000842 with open(src, 'r') as f:
843 self.assertEqual(f.read(), 'cheddar')
844 os.remove(dst)
845 finally:
846 shutil.rmtree(TESTFN, ignore_errors=True)
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000847
Brian Curtin3b4499c2010-12-28 14:31:47 +0000848 @support.skip_unless_symlink
Hirokazu Yamamoto26681452010-12-05 02:04:16 +0000849 def test_dont_copy_file_onto_symlink_to_itself(self):
850 # bug 851123.
851 os.mkdir(TESTFN)
852 src = os.path.join(TESTFN, 'cheese')
853 dst = os.path.join(TESTFN, 'shop')
854 try:
855 with open(src, 'w') as f:
856 f.write('cheddar')
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000857 # Using `src` here would mean we end up with a symlink pointing
858 # to TESTFN/TESTFN/cheese, while it should point at
859 # TESTFN/cheese.
860 os.symlink('cheese', dst)
Hynek Schlawack48653762012-10-07 12:49:58 +0200861 self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
Antoine Pitrou92f60ed2010-10-14 22:11:44 +0000862 with open(src, 'r') as f:
863 self.assertEqual(f.read(), 'cheddar')
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000864 os.remove(dst)
865 finally:
Hirokazu Yamamoto26681452010-12-05 02:04:16 +0000866 shutil.rmtree(TESTFN, ignore_errors=True)
Johannes Gijsbers68128712004-08-14 13:57:08 +0000867
Brian Curtin3b4499c2010-12-28 14:31:47 +0000868 @support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +0000869 def test_rmtree_on_symlink(self):
870 # bug 1669.
871 os.mkdir(TESTFN)
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000872 try:
Brian Curtind40e6f72010-07-08 21:39:08 +0000873 src = os.path.join(TESTFN, 'cheese')
874 dst = os.path.join(TESTFN, 'shop')
875 os.mkdir(src)
876 os.symlink(src, dst)
877 self.assertRaises(OSError, shutil.rmtree, dst)
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200878 shutil.rmtree(dst, ignore_errors=True)
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000879 finally:
Brian Curtind40e6f72010-07-08 21:39:08 +0000880 shutil.rmtree(TESTFN, ignore_errors=True)
881
Serhiy Storchaka43767632013-11-03 21:31:38 +0200882 # Issue #3002: copyfile and copytree block indefinitely on named pipes
883 @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
884 def test_copyfile_named_pipe(self):
xdegaye92c2ca72017-11-12 17:31:07 +0100885 try:
886 os.mkfifo(TESTFN)
887 except PermissionError as e:
888 self.skipTest('os.mkfifo(): %s' % e)
Serhiy Storchaka43767632013-11-03 21:31:38 +0200889 try:
890 self.assertRaises(shutil.SpecialFileError,
891 shutil.copyfile, TESTFN, TESTFN2)
892 self.assertRaises(shutil.SpecialFileError,
893 shutil.copyfile, __file__, TESTFN)
894 finally:
895 os.remove(TESTFN)
Antoine Pitrou7fff0962009-05-01 21:09:44 +0000896
Serhiy Storchaka43767632013-11-03 21:31:38 +0200897 @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
898 @support.skip_unless_symlink
899 def test_copytree_named_pipe(self):
900 os.mkdir(TESTFN)
901 try:
902 subdir = os.path.join(TESTFN, "subdir")
903 os.mkdir(subdir)
904 pipe = os.path.join(subdir, "mypipe")
xdegaye92c2ca72017-11-12 17:31:07 +0100905 try:
906 os.mkfifo(pipe)
907 except PermissionError as e:
908 self.skipTest('os.mkfifo(): %s' % e)
Antoine Pitrou7fff0962009-05-01 21:09:44 +0000909 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +0200910 shutil.copytree(TESTFN, TESTFN2)
911 except shutil.Error as e:
912 errors = e.args[0]
913 self.assertEqual(len(errors), 1)
914 src, dst, error_msg = errors[0]
915 self.assertEqual("`%s` is a named pipe" % pipe, error_msg)
916 else:
917 self.fail("shutil.Error should have been raised")
918 finally:
919 shutil.rmtree(TESTFN, ignore_errors=True)
920 shutil.rmtree(TESTFN2, ignore_errors=True)
Antoine Pitrou7fff0962009-05-01 21:09:44 +0000921
Tarek Ziadé5340db32010-04-19 22:30:51 +0000922 def test_copytree_special_func(self):
923
924 src_dir = self.mkdtemp()
925 dst_dir = os.path.join(self.mkdtemp(), 'destination')
Éric Araujoa7e33a12011-08-12 19:51:35 +0200926 write_file((src_dir, 'test.txt'), '123')
Tarek Ziadé5340db32010-04-19 22:30:51 +0000927 os.mkdir(os.path.join(src_dir, 'test_dir'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200928 write_file((src_dir, 'test_dir', 'test.txt'), '456')
Tarek Ziadé5340db32010-04-19 22:30:51 +0000929
930 copied = []
931 def _copy(src, dst):
932 copied.append((src, dst))
933
934 shutil.copytree(src_dir, dst_dir, copy_function=_copy)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000935 self.assertEqual(len(copied), 2)
Tarek Ziadé5340db32010-04-19 22:30:51 +0000936
Brian Curtin3b4499c2010-12-28 14:31:47 +0000937 @support.skip_unless_symlink
Tarek Ziadéfb437512010-04-20 08:57:33 +0000938 def test_copytree_dangling_symlinks(self):
939
940 # a dangling symlink raises an error at the end
941 src_dir = self.mkdtemp()
942 dst_dir = os.path.join(self.mkdtemp(), 'destination')
943 os.symlink('IDONTEXIST', os.path.join(src_dir, 'test.txt'))
944 os.mkdir(os.path.join(src_dir, 'test_dir'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200945 write_file((src_dir, 'test_dir', 'test.txt'), '456')
Tarek Ziadéfb437512010-04-20 08:57:33 +0000946 self.assertRaises(Error, shutil.copytree, src_dir, dst_dir)
947
948 # a dangling symlink is ignored with the proper flag
949 dst_dir = os.path.join(self.mkdtemp(), 'destination2')
950 shutil.copytree(src_dir, dst_dir, ignore_dangling_symlinks=True)
951 self.assertNotIn('test.txt', os.listdir(dst_dir))
952
953 # a dangling symlink is copied if symlinks=True
954 dst_dir = os.path.join(self.mkdtemp(), 'destination3')
955 shutil.copytree(src_dir, dst_dir, symlinks=True)
956 self.assertIn('test.txt', os.listdir(dst_dir))
957
Berker Peksag5a294d82015-07-25 14:53:48 +0300958 @support.skip_unless_symlink
959 def test_copytree_symlink_dir(self):
960 src_dir = self.mkdtemp()
961 dst_dir = os.path.join(self.mkdtemp(), 'destination')
962 os.mkdir(os.path.join(src_dir, 'real_dir'))
963 with open(os.path.join(src_dir, 'real_dir', 'test.txt'), 'w'):
964 pass
965 os.symlink(os.path.join(src_dir, 'real_dir'),
966 os.path.join(src_dir, 'link_to_dir'),
967 target_is_directory=True)
968
969 shutil.copytree(src_dir, dst_dir, symlinks=False)
970 self.assertFalse(os.path.islink(os.path.join(dst_dir, 'link_to_dir')))
971 self.assertIn('test.txt', os.listdir(os.path.join(dst_dir, 'link_to_dir')))
972
973 dst_dir = os.path.join(self.mkdtemp(), 'destination2')
974 shutil.copytree(src_dir, dst_dir, symlinks=True)
975 self.assertTrue(os.path.islink(os.path.join(dst_dir, 'link_to_dir')))
976 self.assertIn('test.txt', os.listdir(os.path.join(dst_dir, 'link_to_dir')))
977
Nick Coghlan8ed3cf32011-03-16 14:05:35 -0400978 def _copy_file(self, method):
979 fname = 'test.txt'
980 tmpdir = self.mkdtemp()
Éric Araujoa7e33a12011-08-12 19:51:35 +0200981 write_file((tmpdir, fname), 'xxx')
Nick Coghlan8ed3cf32011-03-16 14:05:35 -0400982 file1 = os.path.join(tmpdir, fname)
983 tmpdir2 = self.mkdtemp()
984 method(file1, tmpdir2)
985 file2 = os.path.join(tmpdir2, fname)
986 return (file1, file2)
987
988 @unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod')
989 def test_copy(self):
990 # Ensure that the copied file exists and has the same mode bits.
991 file1, file2 = self._copy_file(shutil.copy)
992 self.assertTrue(os.path.exists(file2))
993 self.assertEqual(os.stat(file1).st_mode, os.stat(file2).st_mode)
994
995 @unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod')
Senthil Kumaran0c2dba52011-07-03 18:21:38 -0700996 @unittest.skipUnless(hasattr(os, 'utime'), 'requires os.utime')
Nick Coghlan8ed3cf32011-03-16 14:05:35 -0400997 def test_copy2(self):
998 # Ensure that the copied file exists and has the same mode and
999 # modification time bits.
1000 file1, file2 = self._copy_file(shutil.copy2)
1001 self.assertTrue(os.path.exists(file2))
1002 file1_stat = os.stat(file1)
1003 file2_stat = os.stat(file2)
1004 self.assertEqual(file1_stat.st_mode, file2_stat.st_mode)
1005 for attr in 'st_atime', 'st_mtime':
1006 # The modification times may be truncated in the new file.
1007 self.assertLessEqual(getattr(file1_stat, attr),
1008 getattr(file2_stat, attr) + 1)
1009 if hasattr(os, 'chflags') and hasattr(file1_stat, 'st_flags'):
1010 self.assertEqual(getattr(file1_stat, 'st_flags'),
1011 getattr(file2_stat, 'st_flags'))
1012
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001013 @support.requires_zlib
Tarek Ziadé396fad72010-02-23 05:30:31 +00001014 def test_make_tarball(self):
1015 # creating something to tar
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001016 root_dir, base_dir = self._create_files('')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001017
1018 tmpdir2 = self.mkdtemp()
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001019 # force shutil to create the directory
1020 os.rmdir(tmpdir2)
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001021 # working with relative paths
1022 work_dir = os.path.dirname(tmpdir2)
1023 rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001024
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001025 with support.change_cwd(work_dir):
Serhiy Storchaka5558d4f2015-09-08 09:59:02 +03001026 base_name = os.path.abspath(rel_base_name)
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001027 tarball = make_archive(rel_base_name, 'gztar', root_dir, '.')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001028
1029 # check if the compressed tarball was created
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001030 self.assertEqual(tarball, base_name + '.tar.gz')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001031 self.assertTrue(os.path.isfile(tarball))
1032 self.assertTrue(tarfile.is_tarfile(tarball))
1033 with tarfile.open(tarball, 'r:gz') as tf:
1034 self.assertCountEqual(tf.getnames(),
1035 ['.', './sub', './sub2',
1036 './file1', './file2', './sub/file3'])
Tarek Ziadé396fad72010-02-23 05:30:31 +00001037
1038 # trying an uncompressed one
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001039 with support.change_cwd(work_dir):
1040 tarball = make_archive(rel_base_name, 'tar', root_dir, '.')
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001041 self.assertEqual(tarball, base_name + '.tar')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001042 self.assertTrue(os.path.isfile(tarball))
1043 self.assertTrue(tarfile.is_tarfile(tarball))
1044 with tarfile.open(tarball, 'r') as tf:
1045 self.assertCountEqual(tf.getnames(),
1046 ['.', './sub', './sub2',
1047 './file1', './file2', './sub/file3'])
Tarek Ziadé396fad72010-02-23 05:30:31 +00001048
1049 def _tarinfo(self, path):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001050 with tarfile.open(path) as tar:
Tarek Ziadé396fad72010-02-23 05:30:31 +00001051 names = tar.getnames()
1052 names.sort()
1053 return tuple(names)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001054
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001055 def _create_files(self, base_dir='dist'):
Tarek Ziadé396fad72010-02-23 05:30:31 +00001056 # creating something to tar
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001057 root_dir = self.mkdtemp()
1058 dist = os.path.join(root_dir, base_dir)
1059 os.makedirs(dist, exist_ok=True)
Éric Araujoa7e33a12011-08-12 19:51:35 +02001060 write_file((dist, 'file1'), 'xxx')
1061 write_file((dist, 'file2'), 'xxx')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001062 os.mkdir(os.path.join(dist, 'sub'))
Éric Araujoa7e33a12011-08-12 19:51:35 +02001063 write_file((dist, 'sub', 'file3'), 'xxx')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001064 os.mkdir(os.path.join(dist, 'sub2'))
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001065 if base_dir:
1066 write_file((root_dir, 'outer'), 'xxx')
1067 return root_dir, base_dir
Tarek Ziadé396fad72010-02-23 05:30:31 +00001068
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001069 @support.requires_zlib
Serhiy Storchakab42de2f2015-11-21 14:09:26 +02001070 @unittest.skipUnless(shutil.which('tar'),
Tarek Ziadé396fad72010-02-23 05:30:31 +00001071 'Need the tar command to run')
1072 def test_tarfile_vs_tar(self):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001073 root_dir, base_dir = self._create_files()
1074 base_name = os.path.join(self.mkdtemp(), 'archive')
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001075 tarball = make_archive(base_name, 'gztar', root_dir, base_dir)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001076
1077 # check if the compressed tarball was created
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001078 self.assertEqual(tarball, base_name + '.tar.gz')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001079 self.assertTrue(os.path.isfile(tarball))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001080
1081 # now create another tarball using `tar`
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001082 tarball2 = os.path.join(root_dir, 'archive2.tar')
1083 tar_cmd = ['tar', '-cf', 'archive2.tar', base_dir]
Serhiy Storchakab42de2f2015-11-21 14:09:26 +02001084 subprocess.check_call(tar_cmd, cwd=root_dir,
1085 stdout=subprocess.DEVNULL)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001086
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001087 self.assertTrue(os.path.isfile(tarball2))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001088 # let's compare both tarballs
Ezio Melottib3aedd42010-11-20 19:04:17 +00001089 self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001090
1091 # trying an uncompressed one
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001092 tarball = make_archive(base_name, 'tar', root_dir, base_dir)
1093 self.assertEqual(tarball, base_name + '.tar')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001094 self.assertTrue(os.path.isfile(tarball))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001095
1096 # now for a dry_run
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001097 tarball = make_archive(base_name, 'tar', root_dir, base_dir,
1098 dry_run=True)
1099 self.assertEqual(tarball, base_name + '.tar')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001100 self.assertTrue(os.path.isfile(tarball))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001101
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001102 @support.requires_zlib
Tarek Ziadé396fad72010-02-23 05:30:31 +00001103 def test_make_zipfile(self):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001104 # creating something to zip
1105 root_dir, base_dir = self._create_files()
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001106
1107 tmpdir2 = self.mkdtemp()
1108 # force shutil to create the directory
1109 os.rmdir(tmpdir2)
1110 # working with relative paths
1111 work_dir = os.path.dirname(tmpdir2)
1112 rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001113
1114 with support.change_cwd(work_dir):
Serhiy Storchaka5558d4f2015-09-08 09:59:02 +03001115 base_name = os.path.abspath(rel_base_name)
Serhiy Storchaka666de772016-10-23 15:55:09 +03001116 res = make_archive(rel_base_name, 'zip', root_dir)
1117
1118 self.assertEqual(res, base_name + '.zip')
1119 self.assertTrue(os.path.isfile(res))
1120 self.assertTrue(zipfile.is_zipfile(res))
1121 with zipfile.ZipFile(res) as zf:
1122 self.assertCountEqual(zf.namelist(),
1123 ['dist/', 'dist/sub/', 'dist/sub2/',
1124 'dist/file1', 'dist/file2', 'dist/sub/file3',
1125 'outer'])
1126
1127 with support.change_cwd(work_dir):
1128 base_name = os.path.abspath(rel_base_name)
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001129 res = make_archive(rel_base_name, 'zip', root_dir, base_dir)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001130
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001131 self.assertEqual(res, base_name + '.zip')
1132 self.assertTrue(os.path.isfile(res))
1133 self.assertTrue(zipfile.is_zipfile(res))
1134 with zipfile.ZipFile(res) as zf:
1135 self.assertCountEqual(zf.namelist(),
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001136 ['dist/', 'dist/sub/', 'dist/sub2/',
1137 'dist/file1', 'dist/file2', 'dist/sub/file3'])
Tarek Ziadé396fad72010-02-23 05:30:31 +00001138
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001139 @support.requires_zlib
Serhiy Storchakab42de2f2015-11-21 14:09:26 +02001140 @unittest.skipUnless(shutil.which('zip'),
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001141 'Need the zip command to run')
1142 def test_zipfile_vs_zip(self):
1143 root_dir, base_dir = self._create_files()
1144 base_name = os.path.join(self.mkdtemp(), 'archive')
1145 archive = make_archive(base_name, 'zip', root_dir, base_dir)
1146
1147 # check if ZIP file was created
1148 self.assertEqual(archive, base_name + '.zip')
1149 self.assertTrue(os.path.isfile(archive))
1150
1151 # now create another ZIP file using `zip`
1152 archive2 = os.path.join(root_dir, 'archive2.zip')
1153 zip_cmd = ['zip', '-q', '-r', 'archive2.zip', base_dir]
Serhiy Storchakab42de2f2015-11-21 14:09:26 +02001154 subprocess.check_call(zip_cmd, cwd=root_dir,
1155 stdout=subprocess.DEVNULL)
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001156
1157 self.assertTrue(os.path.isfile(archive2))
1158 # let's compare both ZIP files
1159 with zipfile.ZipFile(archive) as zf:
1160 names = zf.namelist()
1161 with zipfile.ZipFile(archive2) as zf:
1162 names2 = zf.namelist()
1163 self.assertEqual(sorted(names), sorted(names2))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001164
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001165 @support.requires_zlib
Serhiy Storchaka8bc792a2015-11-22 14:49:58 +02001166 @unittest.skipUnless(shutil.which('unzip'),
1167 'Need the unzip command to run')
1168 def test_unzip_zipfile(self):
1169 root_dir, base_dir = self._create_files()
1170 base_name = os.path.join(self.mkdtemp(), 'archive')
1171 archive = make_archive(base_name, 'zip', root_dir, base_dir)
1172
1173 # check if ZIP file was created
1174 self.assertEqual(archive, base_name + '.zip')
1175 self.assertTrue(os.path.isfile(archive))
1176
1177 # now check the ZIP file using `unzip -t`
1178 zip_cmd = ['unzip', '-t', archive]
1179 with support.change_cwd(root_dir):
1180 try:
1181 subprocess.check_output(zip_cmd, stderr=subprocess.STDOUT)
1182 except subprocess.CalledProcessError as exc:
1183 details = exc.output.decode(errors="replace")
Benjamin Petersona710ebd2018-09-13 10:08:46 -07001184 if 'unrecognized option: t' in details:
Benjamin Petersone78734d2018-09-13 10:57:23 -07001185 self.skipTest("unzip doesn't support -t")
Serhiy Storchaka8bc792a2015-11-22 14:49:58 +02001186 msg = "{}\n\n**Unzip Output**\n{}"
1187 self.fail(msg.format(exc, details))
1188
Tarek Ziadé396fad72010-02-23 05:30:31 +00001189 def test_make_archive(self):
1190 tmpdir = self.mkdtemp()
1191 base_name = os.path.join(tmpdir, 'archive')
1192 self.assertRaises(ValueError, make_archive, base_name, 'xxx')
1193
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001194 @support.requires_zlib
Tarek Ziadé396fad72010-02-23 05:30:31 +00001195 def test_make_archive_owner_group(self):
1196 # testing make_archive with owner and group, with various combinations
1197 # this works even if there's not gid/uid support
1198 if UID_GID_SUPPORT:
1199 group = grp.getgrgid(0)[0]
1200 owner = pwd.getpwuid(0)[0]
1201 else:
1202 group = owner = 'root'
1203
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001204 root_dir, base_dir = self._create_files()
1205 base_name = os.path.join(self.mkdtemp(), 'archive')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001206 res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner,
1207 group=group)
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001208 self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001209
1210 res = make_archive(base_name, 'zip', root_dir, base_dir)
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001211 self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001212
1213 res = make_archive(base_name, 'tar', root_dir, base_dir,
1214 owner=owner, group=group)
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001215 self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001216
1217 res = make_archive(base_name, 'tar', root_dir, base_dir,
1218 owner='kjhkjhkjg', group='oihohoh')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001219 self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001220
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001221
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001222 @support.requires_zlib
Tarek Ziadé396fad72010-02-23 05:30:31 +00001223 @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
1224 def test_tarfile_root_owner(self):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001225 root_dir, base_dir = self._create_files()
1226 base_name = os.path.join(self.mkdtemp(), 'archive')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001227 group = grp.getgrgid(0)[0]
1228 owner = pwd.getpwuid(0)[0]
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001229 with support.change_cwd(root_dir):
1230 archive_name = make_archive(base_name, 'gztar', root_dir, 'dist',
1231 owner=owner, group=group)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001232
1233 # check if the compressed tarball was created
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001234 self.assertTrue(os.path.isfile(archive_name))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001235
1236 # now checks the rights
1237 archive = tarfile.open(archive_name)
1238 try:
1239 for member in archive.getmembers():
Ezio Melottib3aedd42010-11-20 19:04:17 +00001240 self.assertEqual(member.uid, 0)
1241 self.assertEqual(member.gid, 0)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001242 finally:
1243 archive.close()
1244
1245 def test_make_archive_cwd(self):
1246 current_dir = os.getcwd()
1247 def _breaks(*args, **kw):
1248 raise RuntimeError()
1249
1250 register_archive_format('xxx', _breaks, [], 'xxx file')
1251 try:
1252 try:
1253 make_archive('xxx', 'xxx', root_dir=self.mkdtemp())
1254 except Exception:
1255 pass
Ezio Melottib3aedd42010-11-20 19:04:17 +00001256 self.assertEqual(os.getcwd(), current_dir)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001257 finally:
1258 unregister_archive_format('xxx')
1259
Serhiy Storchaka9a4fc192014-11-28 00:48:46 +02001260 def test_make_tarfile_in_curdir(self):
1261 # Issue #21280
1262 root_dir = self.mkdtemp()
1263 with support.change_cwd(root_dir):
1264 self.assertEqual(make_archive('test', 'tar'), 'test.tar')
1265 self.assertTrue(os.path.isfile('test.tar'))
1266
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001267 @support.requires_zlib
Serhiy Storchaka9a4fc192014-11-28 00:48:46 +02001268 def test_make_zipfile_in_curdir(self):
1269 # Issue #21280
1270 root_dir = self.mkdtemp()
1271 with support.change_cwd(root_dir):
1272 self.assertEqual(make_archive('test', 'zip'), 'test.zip')
1273 self.assertTrue(os.path.isfile('test.zip'))
1274
Tarek Ziadé396fad72010-02-23 05:30:31 +00001275 def test_register_archive_format(self):
1276
1277 self.assertRaises(TypeError, register_archive_format, 'xxx', 1)
1278 self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
1279 1)
1280 self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
1281 [(1, 2), (1, 2, 3)])
1282
1283 register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file')
1284 formats = [name for name, params in get_archive_formats()]
1285 self.assertIn('xxx', formats)
1286
1287 unregister_archive_format('xxx')
1288 formats = [name for name, params in get_archive_formats()]
1289 self.assertNotIn('xxx', formats)
1290
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001291 def check_unpack_archive(self, format):
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001292 self.check_unpack_archive_with_converter(format, lambda path: path)
1293 self.check_unpack_archive_with_converter(format, pathlib.Path)
Serhiy Storchakab21d1552018-03-02 11:53:51 +02001294 self.check_unpack_archive_with_converter(format, FakePath)
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001295
1296 def check_unpack_archive_with_converter(self, format, converter):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001297 root_dir, base_dir = self._create_files()
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001298 expected = rlistdir(root_dir)
1299 expected.remove('outer')
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001300
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001301 base_name = os.path.join(self.mkdtemp(), 'archive')
1302 filename = make_archive(base_name, format, root_dir, base_dir)
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001303
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001304 # let's try to unpack it now
1305 tmpdir2 = self.mkdtemp()
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001306 unpack_archive(converter(filename), converter(tmpdir2))
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001307 self.assertEqual(rlistdir(tmpdir2), expected)
1308
1309 # and again, this time with the format specified
1310 tmpdir3 = self.mkdtemp()
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001311 unpack_archive(converter(filename), converter(tmpdir3), format=format)
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001312 self.assertEqual(rlistdir(tmpdir3), expected)
1313
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001314 self.assertRaises(shutil.ReadError, unpack_archive, converter(TESTFN))
1315 self.assertRaises(ValueError, unpack_archive, converter(TESTFN), format='xxx')
Nick Coghlanabf202d2011-03-16 13:52:20 -04001316
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001317 def test_unpack_archive_tar(self):
1318 self.check_unpack_archive('tar')
1319
1320 @support.requires_zlib
1321 def test_unpack_archive_gztar(self):
1322 self.check_unpack_archive('gztar')
1323
1324 @support.requires_bz2
1325 def test_unpack_archive_bztar(self):
1326 self.check_unpack_archive('bztar')
1327
1328 @support.requires_lzma
1329 def test_unpack_archive_xztar(self):
1330 self.check_unpack_archive('xztar')
1331
1332 @support.requires_zlib
1333 def test_unpack_archive_zip(self):
1334 self.check_unpack_archive('zip')
1335
Martin Pantereb995702016-07-28 01:11:04 +00001336 def test_unpack_registry(self):
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001337
1338 formats = get_unpack_formats()
1339
1340 def _boo(filename, extract_dir, extra):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001341 self.assertEqual(extra, 1)
1342 self.assertEqual(filename, 'stuff.boo')
1343 self.assertEqual(extract_dir, 'xx')
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001344
1345 register_unpack_format('Boo', ['.boo', '.b2'], _boo, [('extra', 1)])
1346 unpack_archive('stuff.boo', 'xx')
1347
1348 # trying to register a .boo unpacker again
1349 self.assertRaises(RegistryError, register_unpack_format, 'Boo2',
1350 ['.boo'], _boo)
1351
1352 # should work now
1353 unregister_unpack_format('Boo')
1354 register_unpack_format('Boo2', ['.boo'], _boo)
1355 self.assertIn(('Boo2', ['.boo'], ''), get_unpack_formats())
1356 self.assertNotIn(('Boo', ['.boo'], ''), get_unpack_formats())
1357
1358 # let's leave a clean state
1359 unregister_unpack_format('Boo2')
Ezio Melottib3aedd42010-11-20 19:04:17 +00001360 self.assertEqual(get_unpack_formats(), formats)
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001361
Giampaolo Rodola'210e7ca2011-07-01 13:55:36 +02001362 @unittest.skipUnless(hasattr(shutil, 'disk_usage'),
1363 "disk_usage not available on this platform")
1364 def test_disk_usage(self):
Gregory P. Smith529746c2017-07-06 17:11:27 -07001365 usage = shutil.disk_usage(os.path.dirname(__file__))
Victor Stinnerdc525f42018-12-11 12:05:21 +01001366 for attr in ('total', 'used', 'free'):
1367 self.assertIsInstance(getattr(usage, attr), int)
Éric Araujo2ee61882011-07-02 16:45:45 +02001368 self.assertGreater(usage.total, 0)
1369 self.assertGreater(usage.used, 0)
1370 self.assertGreaterEqual(usage.free, 0)
1371 self.assertGreaterEqual(usage.total, usage.used)
1372 self.assertGreater(usage.total, usage.free)
Giampaolo Rodola'210e7ca2011-07-01 13:55:36 +02001373
Victor Stinnerdc525f42018-12-11 12:05:21 +01001374 # bpo-32557: Check that disk_usage() also accepts a filename
1375 shutil.disk_usage(__file__)
1376
Sandro Tosid902a142011-08-22 23:28:27 +02001377 @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
1378 @unittest.skipUnless(hasattr(os, 'chown'), 'requires os.chown')
1379 def test_chown(self):
1380
1381 # cleaned-up automatically by TestShutil.tearDown method
1382 dirname = self.mkdtemp()
1383 filename = tempfile.mktemp(dir=dirname)
1384 write_file(filename, 'testing chown function')
1385
1386 with self.assertRaises(ValueError):
1387 shutil.chown(filename)
1388
1389 with self.assertRaises(LookupError):
Raymond Hettinger15f44ab2016-08-30 10:47:49 -07001390 shutil.chown(filename, user='non-existing username')
Sandro Tosid902a142011-08-22 23:28:27 +02001391
1392 with self.assertRaises(LookupError):
Raymond Hettinger15f44ab2016-08-30 10:47:49 -07001393 shutil.chown(filename, group='non-existing groupname')
Sandro Tosid902a142011-08-22 23:28:27 +02001394
1395 with self.assertRaises(TypeError):
1396 shutil.chown(filename, b'spam')
1397
1398 with self.assertRaises(TypeError):
1399 shutil.chown(filename, 3.14)
1400
1401 uid = os.getuid()
1402 gid = os.getgid()
1403
1404 def check_chown(path, uid=None, gid=None):
1405 s = os.stat(filename)
1406 if uid is not None:
1407 self.assertEqual(uid, s.st_uid)
1408 if gid is not None:
1409 self.assertEqual(gid, s.st_gid)
1410
1411 shutil.chown(filename, uid, gid)
1412 check_chown(filename, uid, gid)
1413 shutil.chown(filename, uid)
1414 check_chown(filename, uid)
1415 shutil.chown(filename, user=uid)
1416 check_chown(filename, uid)
1417 shutil.chown(filename, group=gid)
Sandro Tosi91f948a2011-08-22 23:55:39 +02001418 check_chown(filename, gid=gid)
Sandro Tosid902a142011-08-22 23:28:27 +02001419
1420 shutil.chown(dirname, uid, gid)
1421 check_chown(dirname, uid, gid)
1422 shutil.chown(dirname, uid)
1423 check_chown(dirname, uid)
1424 shutil.chown(dirname, user=uid)
1425 check_chown(dirname, uid)
1426 shutil.chown(dirname, group=gid)
Sandro Tosi91f948a2011-08-22 23:55:39 +02001427 check_chown(dirname, gid=gid)
Sandro Tosid902a142011-08-22 23:28:27 +02001428
1429 user = pwd.getpwuid(uid)[0]
1430 group = grp.getgrgid(gid)[0]
1431 shutil.chown(filename, user, group)
1432 check_chown(filename, uid, gid)
1433 shutil.chown(dirname, user, group)
1434 check_chown(dirname, uid, gid)
1435
Brian Curtin0d0a1de2012-06-18 18:41:07 -05001436 def test_copy_return_value(self):
1437 # copy and copy2 both return their destination path.
1438 for fn in (shutil.copy, shutil.copy2):
1439 src_dir = self.mkdtemp()
1440 dst_dir = self.mkdtemp()
1441 src = os.path.join(src_dir, 'foo')
1442 write_file(src, 'foo')
1443 rv = fn(src, dst_dir)
1444 self.assertEqual(rv, os.path.join(dst_dir, 'foo'))
1445 rv = fn(src, os.path.join(dst_dir, 'bar'))
1446 self.assertEqual(rv, os.path.join(dst_dir, 'bar'))
1447
1448 def test_copyfile_return_value(self):
1449 # copytree returns its destination path.
1450 src_dir = self.mkdtemp()
1451 dst_dir = self.mkdtemp()
1452 dst_file = os.path.join(dst_dir, 'bar')
1453 src_file = os.path.join(src_dir, 'foo')
1454 write_file(src_file, 'foo')
1455 rv = shutil.copyfile(src_file, dst_file)
1456 self.assertTrue(os.path.exists(rv))
1457 self.assertEqual(read_file(src_file), read_file(dst_file))
1458
Hynek Schlawack48653762012-10-07 12:49:58 +02001459 def test_copyfile_same_file(self):
1460 # copyfile() should raise SameFileError if the source and destination
1461 # are the same.
1462 src_dir = self.mkdtemp()
1463 src_file = os.path.join(src_dir, 'foo')
1464 write_file(src_file, 'foo')
1465 self.assertRaises(SameFileError, shutil.copyfile, src_file, src_file)
Hynek Schlawack27ddb572012-10-28 13:59:27 +01001466 # But Error should work too, to stay backward compatible.
1467 self.assertRaises(Error, shutil.copyfile, src_file, src_file)
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02001468 # Make sure file is not corrupted.
1469 self.assertEqual(read_file(src_file), 'foo')
Hynek Schlawack48653762012-10-07 12:49:58 +02001470
Brian Curtin0d0a1de2012-06-18 18:41:07 -05001471 def test_copytree_return_value(self):
1472 # copytree returns its destination path.
1473 src_dir = self.mkdtemp()
1474 dst_dir = src_dir + "dest"
Larry Hastings5b2f9c02012-06-25 23:50:01 -07001475 self.addCleanup(shutil.rmtree, dst_dir, True)
Brian Curtin0d0a1de2012-06-18 18:41:07 -05001476 src = os.path.join(src_dir, 'foo')
1477 write_file(src, 'foo')
1478 rv = shutil.copytree(src_dir, dst_dir)
1479 self.assertEqual(['foo'], os.listdir(rv))
1480
Christian Heimes9bd667a2008-01-20 15:14:11 +00001481
Brian Curtinc57a3452012-06-22 16:00:30 -05001482class TestWhich(unittest.TestCase):
1483
1484 def setUp(self):
Serhiy Storchaka014791f2013-01-21 15:00:27 +02001485 self.temp_dir = tempfile.mkdtemp(prefix="Tmp")
Larry Hastings5b2f9c02012-06-25 23:50:01 -07001486 self.addCleanup(shutil.rmtree, self.temp_dir, True)
Brian Curtinc57a3452012-06-22 16:00:30 -05001487 # Give the temp_file an ".exe" suffix for all.
1488 # It's needed on Windows and not harmful on other platforms.
1489 self.temp_file = tempfile.NamedTemporaryFile(dir=self.temp_dir,
Serhiy Storchaka014791f2013-01-21 15:00:27 +02001490 prefix="Tmp",
1491 suffix=".Exe")
Brian Curtinc57a3452012-06-22 16:00:30 -05001492 os.chmod(self.temp_file.name, stat.S_IXUSR)
1493 self.addCleanup(self.temp_file.close)
1494 self.dir, self.file = os.path.split(self.temp_file.name)
1495
1496 def test_basic(self):
1497 # Given an EXE in a directory, it should be returned.
1498 rv = shutil.which(self.file, path=self.dir)
1499 self.assertEqual(rv, self.temp_file.name)
1500
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001501 def test_absolute_cmd(self):
Brian Curtinc57a3452012-06-22 16:00:30 -05001502 # When given the fully qualified path to an executable that exists,
1503 # it should be returned.
1504 rv = shutil.which(self.temp_file.name, path=self.temp_dir)
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001505 self.assertEqual(rv, self.temp_file.name)
1506
1507 def test_relative_cmd(self):
1508 # When given the relative path with a directory part to an executable
1509 # that exists, it should be returned.
1510 base_dir, tail_dir = os.path.split(self.dir)
1511 relpath = os.path.join(tail_dir, self.file)
Nick Coghlan55175962013-07-28 22:11:50 +10001512 with support.change_cwd(path=base_dir):
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001513 rv = shutil.which(relpath, path=self.temp_dir)
1514 self.assertEqual(rv, relpath)
1515 # But it shouldn't be searched in PATH directories (issue #16957).
Nick Coghlan55175962013-07-28 22:11:50 +10001516 with support.change_cwd(path=self.dir):
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001517 rv = shutil.which(relpath, path=base_dir)
1518 self.assertIsNone(rv)
1519
1520 def test_cwd(self):
1521 # Issue #16957
1522 base_dir = os.path.dirname(self.dir)
Nick Coghlan55175962013-07-28 22:11:50 +10001523 with support.change_cwd(path=self.dir):
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001524 rv = shutil.which(self.file, path=base_dir)
1525 if sys.platform == "win32":
1526 # Windows: current directory implicitly on PATH
1527 self.assertEqual(rv, os.path.join(os.curdir, self.file))
1528 else:
1529 # Other platforms: shouldn't match in the current directory.
1530 self.assertIsNone(rv)
Brian Curtinc57a3452012-06-22 16:00:30 -05001531
Serhiy Storchaka12516e22013-05-28 15:50:15 +03001532 @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
1533 'non-root user required')
Brian Curtinc57a3452012-06-22 16:00:30 -05001534 def test_non_matching_mode(self):
1535 # Set the file read-only and ask for writeable files.
1536 os.chmod(self.temp_file.name, stat.S_IREAD)
Serhiy Storchaka12516e22013-05-28 15:50:15 +03001537 if os.access(self.temp_file.name, os.W_OK):
1538 self.skipTest("can't set the file read-only")
Brian Curtinc57a3452012-06-22 16:00:30 -05001539 rv = shutil.which(self.file, path=self.dir, mode=os.W_OK)
1540 self.assertIsNone(rv)
1541
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001542 def test_relative_path(self):
Antoine Pitrou07c24d12012-06-22 23:33:05 +02001543 base_dir, tail_dir = os.path.split(self.dir)
Nick Coghlan55175962013-07-28 22:11:50 +10001544 with support.change_cwd(path=base_dir):
Antoine Pitrou07c24d12012-06-22 23:33:05 +02001545 rv = shutil.which(self.file, path=tail_dir)
1546 self.assertEqual(rv, os.path.join(tail_dir, self.file))
Antoine Pitrou07c24d12012-06-22 23:33:05 +02001547
Brian Curtinc57a3452012-06-22 16:00:30 -05001548 def test_nonexistent_file(self):
1549 # Return None when no matching executable file is found on the path.
1550 rv = shutil.which("foo.exe", path=self.dir)
1551 self.assertIsNone(rv)
1552
1553 @unittest.skipUnless(sys.platform == "win32",
1554 "pathext check is Windows-only")
1555 def test_pathext_checking(self):
1556 # Ask for the file without the ".exe" extension, then ensure that
1557 # it gets found properly with the extension.
Serhiy Storchakad70127a2013-01-24 20:03:49 +02001558 rv = shutil.which(self.file[:-4], path=self.dir)
Serhiy Storchaka80c88f42013-01-22 10:31:36 +02001559 self.assertEqual(rv, self.temp_file.name[:-4] + ".EXE")
Brian Curtinc57a3452012-06-22 16:00:30 -05001560
Barry Warsaw618738b2013-04-16 11:05:03 -04001561 def test_environ_path(self):
1562 with support.EnvironmentVarGuard() as env:
Victor Stinner1d006a22013-12-16 23:39:40 +01001563 env['PATH'] = self.dir
Barry Warsaw618738b2013-04-16 11:05:03 -04001564 rv = shutil.which(self.file)
1565 self.assertEqual(rv, self.temp_file.name)
1566
1567 def test_empty_path(self):
1568 base_dir = os.path.dirname(self.dir)
Nick Coghlan55175962013-07-28 22:11:50 +10001569 with support.change_cwd(path=self.dir), \
Barry Warsaw618738b2013-04-16 11:05:03 -04001570 support.EnvironmentVarGuard() as env:
Victor Stinner1d006a22013-12-16 23:39:40 +01001571 env['PATH'] = self.dir
Barry Warsaw618738b2013-04-16 11:05:03 -04001572 rv = shutil.which(self.file, path='')
1573 self.assertIsNone(rv)
1574
1575 def test_empty_path_no_PATH(self):
1576 with support.EnvironmentVarGuard() as env:
1577 env.pop('PATH', None)
1578 rv = shutil.which(self.file)
1579 self.assertIsNone(rv)
1580
Brian Curtinc57a3452012-06-22 16:00:30 -05001581
Christian Heimesada8c3b2008-03-18 18:26:33 +00001582class TestMove(unittest.TestCase):
1583
1584 def setUp(self):
1585 filename = "foo"
1586 self.src_dir = tempfile.mkdtemp()
1587 self.dst_dir = tempfile.mkdtemp()
1588 self.src_file = os.path.join(self.src_dir, filename)
1589 self.dst_file = os.path.join(self.dst_dir, filename)
Christian Heimesada8c3b2008-03-18 18:26:33 +00001590 with open(self.src_file, "wb") as f:
1591 f.write(b"spam")
1592
1593 def tearDown(self):
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001594 for d in (self.src_dir, self.dst_dir):
Christian Heimesada8c3b2008-03-18 18:26:33 +00001595 try:
1596 if d:
1597 shutil.rmtree(d)
1598 except:
1599 pass
1600
1601 def _check_move_file(self, src, dst, real_dst):
Antoine Pitrou92f60ed2010-10-14 22:11:44 +00001602 with open(src, "rb") as f:
1603 contents = f.read()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001604 shutil.move(src, dst)
Antoine Pitrou92f60ed2010-10-14 22:11:44 +00001605 with open(real_dst, "rb") as f:
1606 self.assertEqual(contents, f.read())
Christian Heimesada8c3b2008-03-18 18:26:33 +00001607 self.assertFalse(os.path.exists(src))
1608
1609 def _check_move_dir(self, src, dst, real_dst):
1610 contents = sorted(os.listdir(src))
1611 shutil.move(src, dst)
1612 self.assertEqual(contents, sorted(os.listdir(real_dst)))
1613 self.assertFalse(os.path.exists(src))
1614
1615 def test_move_file(self):
1616 # Move a file to another location on the same filesystem.
1617 self._check_move_file(self.src_file, self.dst_file, self.dst_file)
1618
1619 def test_move_file_to_dir(self):
1620 # Move a file inside an existing dir on the same filesystem.
1621 self._check_move_file(self.src_file, self.dst_dir, self.dst_file)
1622
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001623 @mock_rename
Christian Heimesada8c3b2008-03-18 18:26:33 +00001624 def test_move_file_other_fs(self):
1625 # Move a file to an existing dir on another filesystem.
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001626 self.test_move_file()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001627
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001628 @mock_rename
Christian Heimesada8c3b2008-03-18 18:26:33 +00001629 def test_move_file_to_dir_other_fs(self):
1630 # Move a file to another location on another filesystem.
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001631 self.test_move_file_to_dir()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001632
1633 def test_move_dir(self):
1634 # Move a dir to another location on the same filesystem.
1635 dst_dir = tempfile.mktemp()
1636 try:
1637 self._check_move_dir(self.src_dir, dst_dir, dst_dir)
1638 finally:
1639 try:
1640 shutil.rmtree(dst_dir)
1641 except:
1642 pass
1643
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001644 @mock_rename
Christian Heimesada8c3b2008-03-18 18:26:33 +00001645 def test_move_dir_other_fs(self):
1646 # Move a dir to another location on another filesystem.
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001647 self.test_move_dir()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001648
1649 def test_move_dir_to_dir(self):
1650 # Move a dir inside an existing dir on the same filesystem.
1651 self._check_move_dir(self.src_dir, self.dst_dir,
1652 os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
1653
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001654 @mock_rename
Christian Heimesada8c3b2008-03-18 18:26:33 +00001655 def test_move_dir_to_dir_other_fs(self):
1656 # Move a dir inside an existing dir on another filesystem.
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001657 self.test_move_dir_to_dir()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001658
Serhiy Storchaka3a308b92014-02-11 10:30:59 +02001659 def test_move_dir_sep_to_dir(self):
1660 self._check_move_dir(self.src_dir + os.path.sep, self.dst_dir,
1661 os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
1662
1663 @unittest.skipUnless(os.path.altsep, 'requires os.path.altsep')
1664 def test_move_dir_altsep_to_dir(self):
1665 self._check_move_dir(self.src_dir + os.path.altsep, self.dst_dir,
1666 os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
1667
Christian Heimesada8c3b2008-03-18 18:26:33 +00001668 def test_existing_file_inside_dest_dir(self):
1669 # A file with the same name inside the destination dir already exists.
1670 with open(self.dst_file, "wb"):
1671 pass
1672 self.assertRaises(shutil.Error, shutil.move, self.src_file, self.dst_dir)
1673
1674 def test_dont_move_dir_in_itself(self):
1675 # Moving a dir inside itself raises an Error.
1676 dst = os.path.join(self.src_dir, "bar")
1677 self.assertRaises(shutil.Error, shutil.move, self.src_dir, dst)
1678
Antoine Pitrou0dcc3cd2009-01-29 20:26:59 +00001679 def test_destinsrc_false_negative(self):
1680 os.mkdir(TESTFN)
1681 try:
1682 for src, dst in [('srcdir', 'srcdir/dest')]:
1683 src = os.path.join(TESTFN, src)
1684 dst = os.path.join(TESTFN, dst)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001685 self.assertTrue(shutil._destinsrc(src, dst),
Benjamin Peterson247a9b82009-02-20 04:09:19 +00001686 msg='_destinsrc() wrongly concluded that '
Antoine Pitrou0dcc3cd2009-01-29 20:26:59 +00001687 'dst (%s) is not in src (%s)' % (dst, src))
1688 finally:
1689 shutil.rmtree(TESTFN, ignore_errors=True)
Christian Heimesada8c3b2008-03-18 18:26:33 +00001690
Antoine Pitrou0dcc3cd2009-01-29 20:26:59 +00001691 def test_destinsrc_false_positive(self):
1692 os.mkdir(TESTFN)
1693 try:
1694 for src, dst in [('srcdir', 'src/dest'), ('srcdir', 'srcdir.new')]:
1695 src = os.path.join(TESTFN, src)
1696 dst = os.path.join(TESTFN, dst)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001697 self.assertFalse(shutil._destinsrc(src, dst),
Benjamin Peterson247a9b82009-02-20 04:09:19 +00001698 msg='_destinsrc() wrongly concluded that '
Antoine Pitrou0dcc3cd2009-01-29 20:26:59 +00001699 'dst (%s) is in src (%s)' % (dst, src))
1700 finally:
1701 shutil.rmtree(TESTFN, ignore_errors=True)
Christian Heimes9bd667a2008-01-20 15:14:11 +00001702
Antoine Pitrou0a08d7a2012-01-06 20:16:19 +01001703 @support.skip_unless_symlink
1704 @mock_rename
1705 def test_move_file_symlink(self):
1706 dst = os.path.join(self.src_dir, 'bar')
1707 os.symlink(self.src_file, dst)
1708 shutil.move(dst, self.dst_file)
1709 self.assertTrue(os.path.islink(self.dst_file))
1710 self.assertTrue(os.path.samefile(self.src_file, self.dst_file))
1711
1712 @support.skip_unless_symlink
1713 @mock_rename
1714 def test_move_file_symlink_to_dir(self):
1715 filename = "bar"
1716 dst = os.path.join(self.src_dir, filename)
1717 os.symlink(self.src_file, dst)
1718 shutil.move(dst, self.dst_dir)
1719 final_link = os.path.join(self.dst_dir, filename)
1720 self.assertTrue(os.path.islink(final_link))
1721 self.assertTrue(os.path.samefile(self.src_file, final_link))
1722
1723 @support.skip_unless_symlink
1724 @mock_rename
1725 def test_move_dangling_symlink(self):
1726 src = os.path.join(self.src_dir, 'baz')
1727 dst = os.path.join(self.src_dir, 'bar')
1728 os.symlink(src, dst)
1729 dst_link = os.path.join(self.dst_dir, 'quux')
1730 shutil.move(dst, dst_link)
1731 self.assertTrue(os.path.islink(dst_link))
Antoine Pitrou3f48ac92014-01-01 02:50:45 +01001732 # On Windows, os.path.realpath does not follow symlinks (issue #9949)
1733 if os.name == 'nt':
1734 self.assertEqual(os.path.realpath(src), os.readlink(dst_link))
1735 else:
1736 self.assertEqual(os.path.realpath(src), os.path.realpath(dst_link))
Antoine Pitrou0a08d7a2012-01-06 20:16:19 +01001737
1738 @support.skip_unless_symlink
1739 @mock_rename
1740 def test_move_dir_symlink(self):
1741 src = os.path.join(self.src_dir, 'baz')
1742 dst = os.path.join(self.src_dir, 'bar')
1743 os.mkdir(src)
1744 os.symlink(src, dst)
1745 dst_link = os.path.join(self.dst_dir, 'quux')
1746 shutil.move(dst, dst_link)
1747 self.assertTrue(os.path.islink(dst_link))
1748 self.assertTrue(os.path.samefile(src, dst_link))
1749
Brian Curtin0d0a1de2012-06-18 18:41:07 -05001750 def test_move_return_value(self):
1751 rv = shutil.move(self.src_file, self.dst_dir)
1752 self.assertEqual(rv,
1753 os.path.join(self.dst_dir, os.path.basename(self.src_file)))
1754
1755 def test_move_as_rename_return_value(self):
1756 rv = shutil.move(self.src_file, os.path.join(self.dst_dir, 'bar'))
1757 self.assertEqual(rv, os.path.join(self.dst_dir, 'bar'))
1758
R David Murray6ffface2014-06-11 14:40:13 -04001759 @mock_rename
1760 def test_move_file_special_function(self):
1761 moved = []
1762 def _copy(src, dst):
1763 moved.append((src, dst))
1764 shutil.move(self.src_file, self.dst_dir, copy_function=_copy)
1765 self.assertEqual(len(moved), 1)
1766
1767 @mock_rename
1768 def test_move_dir_special_function(self):
1769 moved = []
1770 def _copy(src, dst):
1771 moved.append((src, dst))
1772 support.create_empty_file(os.path.join(self.src_dir, 'child'))
1773 support.create_empty_file(os.path.join(self.src_dir, 'child1'))
1774 shutil.move(self.src_dir, self.dst_dir, copy_function=_copy)
1775 self.assertEqual(len(moved), 3)
1776
Tarek Ziadé5340db32010-04-19 22:30:51 +00001777
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001778class TestCopyFile(unittest.TestCase):
1779
1780 _delete = False
1781
1782 class Faux(object):
1783 _entered = False
1784 _exited_with = None
1785 _raised = False
1786 def __init__(self, raise_in_exit=False, suppress_at_exit=True):
1787 self._raise_in_exit = raise_in_exit
1788 self._suppress_at_exit = suppress_at_exit
1789 def read(self, *args):
1790 return ''
1791 def __enter__(self):
1792 self._entered = True
1793 def __exit__(self, exc_type, exc_val, exc_tb):
1794 self._exited_with = exc_type, exc_val, exc_tb
1795 if self._raise_in_exit:
1796 self._raised = True
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001797 raise OSError("Cannot close")
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001798 return self._suppress_at_exit
1799
1800 def tearDown(self):
1801 if self._delete:
1802 del shutil.open
1803
1804 def _set_shutil_open(self, func):
1805 shutil.open = func
1806 self._delete = True
1807
1808 def test_w_source_open_fails(self):
1809 def _open(filename, mode='r'):
1810 if filename == 'srcfile':
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001811 raise OSError('Cannot open "srcfile"')
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001812 assert 0 # shouldn't reach here.
1813
1814 self._set_shutil_open(_open)
1815
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001816 self.assertRaises(OSError, shutil.copyfile, 'srcfile', 'destfile')
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001817
Victor Stinner937ee9e2018-06-26 02:11:06 +02001818 @unittest.skipIf(MACOS, "skipped on macOS")
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001819 def test_w_dest_open_fails(self):
1820
1821 srcfile = self.Faux()
1822
1823 def _open(filename, mode='r'):
1824 if filename == 'srcfile':
1825 return srcfile
1826 if filename == 'destfile':
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001827 raise OSError('Cannot open "destfile"')
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001828 assert 0 # shouldn't reach here.
1829
1830 self._set_shutil_open(_open)
1831
1832 shutil.copyfile('srcfile', 'destfile')
1833 self.assertTrue(srcfile._entered)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001834 self.assertTrue(srcfile._exited_with[0] is OSError)
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001835 self.assertEqual(srcfile._exited_with[1].args,
1836 ('Cannot open "destfile"',))
1837
Victor Stinner937ee9e2018-06-26 02:11:06 +02001838 @unittest.skipIf(MACOS, "skipped on macOS")
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001839 def test_w_dest_close_fails(self):
1840
1841 srcfile = self.Faux()
1842 destfile = self.Faux(True)
1843
1844 def _open(filename, mode='r'):
1845 if filename == 'srcfile':
1846 return srcfile
1847 if filename == 'destfile':
1848 return destfile
1849 assert 0 # shouldn't reach here.
1850
1851 self._set_shutil_open(_open)
1852
1853 shutil.copyfile('srcfile', 'destfile')
1854 self.assertTrue(srcfile._entered)
1855 self.assertTrue(destfile._entered)
1856 self.assertTrue(destfile._raised)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001857 self.assertTrue(srcfile._exited_with[0] is OSError)
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001858 self.assertEqual(srcfile._exited_with[1].args,
1859 ('Cannot close',))
1860
Victor Stinner937ee9e2018-06-26 02:11:06 +02001861 @unittest.skipIf(MACOS, "skipped on macOS")
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001862 def test_w_source_close_fails(self):
1863
1864 srcfile = self.Faux(True)
1865 destfile = self.Faux()
1866
1867 def _open(filename, mode='r'):
1868 if filename == 'srcfile':
1869 return srcfile
1870 if filename == 'destfile':
1871 return destfile
1872 assert 0 # shouldn't reach here.
1873
1874 self._set_shutil_open(_open)
1875
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001876 self.assertRaises(OSError,
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001877 shutil.copyfile, 'srcfile', 'destfile')
1878 self.assertTrue(srcfile._entered)
1879 self.assertTrue(destfile._entered)
1880 self.assertFalse(destfile._raised)
1881 self.assertTrue(srcfile._exited_with[0] is None)
1882 self.assertTrue(srcfile._raised)
1883
Ronald Oussorenf51738b2011-05-06 10:23:04 +02001884 def test_move_dir_caseinsensitive(self):
1885 # Renames a folder to the same name
1886 # but a different case.
1887
1888 self.src_dir = tempfile.mkdtemp()
Larry Hastings5b2f9c02012-06-25 23:50:01 -07001889 self.addCleanup(shutil.rmtree, self.src_dir, True)
Ronald Oussorenf51738b2011-05-06 10:23:04 +02001890 dst_dir = os.path.join(
1891 os.path.dirname(self.src_dir),
1892 os.path.basename(self.src_dir).upper())
1893 self.assertNotEqual(self.src_dir, dst_dir)
1894
1895 try:
1896 shutil.move(self.src_dir, dst_dir)
1897 self.assertTrue(os.path.isdir(dst_dir))
1898 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +02001899 os.rmdir(dst_dir)
Ronald Oussorenf51738b2011-05-06 10:23:04 +02001900
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02001901
Giampaolo Rodolac7f02a92018-06-19 08:27:29 -07001902class TestCopyFileObj(unittest.TestCase):
1903 FILESIZE = 2 * 1024 * 1024
1904
1905 @classmethod
1906 def setUpClass(cls):
1907 write_test_file(TESTFN, cls.FILESIZE)
1908
1909 @classmethod
1910 def tearDownClass(cls):
1911 support.unlink(TESTFN)
1912 support.unlink(TESTFN2)
1913
1914 def tearDown(self):
1915 support.unlink(TESTFN2)
1916
1917 @contextlib.contextmanager
1918 def get_files(self):
1919 with open(TESTFN, "rb") as src:
1920 with open(TESTFN2, "wb") as dst:
1921 yield (src, dst)
1922
1923 def assert_files_eq(self, src, dst):
1924 with open(src, 'rb') as fsrc:
1925 with open(dst, 'rb') as fdst:
1926 self.assertEqual(fsrc.read(), fdst.read())
1927
1928 def test_content(self):
1929 with self.get_files() as (src, dst):
1930 shutil.copyfileobj(src, dst)
1931 self.assert_files_eq(TESTFN, TESTFN2)
1932
1933 def test_file_not_closed(self):
1934 with self.get_files() as (src, dst):
1935 shutil.copyfileobj(src, dst)
1936 assert not src.closed
1937 assert not dst.closed
1938
1939 def test_file_offset(self):
1940 with self.get_files() as (src, dst):
1941 shutil.copyfileobj(src, dst)
1942 self.assertEqual(src.tell(), self.FILESIZE)
1943 self.assertEqual(dst.tell(), self.FILESIZE)
1944
1945 @unittest.skipIf(os.name != 'nt', "Windows only")
1946 def test_win_impl(self):
1947 # Make sure alternate Windows implementation is called.
1948 with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
1949 shutil.copyfile(TESTFN, TESTFN2)
1950 assert m.called
1951
1952 # File size is 2 MiB but max buf size should be 1 MiB.
1953 self.assertEqual(m.call_args[0][2], 1 * 1024 * 1024)
1954
1955 # If file size < 1 MiB memoryview() length must be equal to
1956 # the actual file size.
1957 with tempfile.NamedTemporaryFile(delete=False) as f:
1958 f.write(b'foo')
1959 fname = f.name
1960 self.addCleanup(support.unlink, fname)
1961 with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
1962 shutil.copyfile(fname, TESTFN2)
1963 self.assertEqual(m.call_args[0][2], 3)
1964
1965 # Empty files should not rely on readinto() variant.
1966 with tempfile.NamedTemporaryFile(delete=False) as f:
1967 pass
1968 fname = f.name
1969 self.addCleanup(support.unlink, fname)
1970 with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
1971 shutil.copyfile(fname, TESTFN2)
1972 assert not m.called
1973 self.assert_files_eq(fname, TESTFN2)
1974
1975
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02001976class _ZeroCopyFileTest(object):
1977 """Tests common to all zero-copy APIs."""
1978 FILESIZE = (10 * 1024 * 1024) # 10 MiB
1979 FILEDATA = b""
1980 PATCHPOINT = ""
1981
1982 @classmethod
1983 def setUpClass(cls):
1984 write_test_file(TESTFN, cls.FILESIZE)
1985 with open(TESTFN, 'rb') as f:
1986 cls.FILEDATA = f.read()
1987 assert len(cls.FILEDATA) == cls.FILESIZE
1988
1989 @classmethod
1990 def tearDownClass(cls):
1991 support.unlink(TESTFN)
1992
1993 def tearDown(self):
1994 support.unlink(TESTFN2)
1995
1996 @contextlib.contextmanager
1997 def get_files(self):
1998 with open(TESTFN, "rb") as src:
1999 with open(TESTFN2, "wb") as dst:
2000 yield (src, dst)
2001
2002 def zerocopy_fun(self, *args, **kwargs):
2003 raise NotImplementedError("must be implemented in subclass")
2004
2005 def reset(self):
2006 self.tearDown()
2007 self.tearDownClass()
2008 self.setUpClass()
2009 self.setUp()
2010
2011 # ---
2012
2013 def test_regular_copy(self):
2014 with self.get_files() as (src, dst):
2015 self.zerocopy_fun(src, dst)
2016 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2017 # Make sure the fallback function is not called.
2018 with self.get_files() as (src, dst):
2019 with unittest.mock.patch('shutil.copyfileobj') as m:
2020 shutil.copyfile(TESTFN, TESTFN2)
2021 assert not m.called
2022
2023 def test_same_file(self):
2024 self.addCleanup(self.reset)
2025 with self.get_files() as (src, dst):
2026 with self.assertRaises(Exception):
2027 self.zerocopy_fun(src, src)
2028 # Make sure src file is not corrupted.
2029 self.assertEqual(read_file(TESTFN, binary=True), self.FILEDATA)
2030
2031 def test_non_existent_src(self):
2032 name = tempfile.mktemp()
2033 with self.assertRaises(FileNotFoundError) as cm:
2034 shutil.copyfile(name, "new")
2035 self.assertEqual(cm.exception.filename, name)
2036
2037 def test_empty_file(self):
2038 srcname = TESTFN + 'src'
2039 dstname = TESTFN + 'dst'
2040 self.addCleanup(lambda: support.unlink(srcname))
2041 self.addCleanup(lambda: support.unlink(dstname))
2042 with open(srcname, "wb"):
2043 pass
2044
2045 with open(srcname, "rb") as src:
2046 with open(dstname, "wb") as dst:
2047 self.zerocopy_fun(src, dst)
2048
2049 self.assertEqual(read_file(dstname, binary=True), b"")
2050
2051 def test_unhandled_exception(self):
2052 with unittest.mock.patch(self.PATCHPOINT,
2053 side_effect=ZeroDivisionError):
2054 self.assertRaises(ZeroDivisionError,
2055 shutil.copyfile, TESTFN, TESTFN2)
2056
2057 def test_exception_on_first_call(self):
2058 # Emulate a case where the first call to the zero-copy
2059 # function raises an exception in which case the function is
2060 # supposed to give up immediately.
2061 with unittest.mock.patch(self.PATCHPOINT,
2062 side_effect=OSError(errno.EINVAL, "yo")):
2063 with self.get_files() as (src, dst):
2064 with self.assertRaises(_GiveupOnFastCopy):
2065 self.zerocopy_fun(src, dst)
2066
2067 def test_filesystem_full(self):
2068 # Emulate a case where filesystem is full and sendfile() fails
2069 # on first call.
2070 with unittest.mock.patch(self.PATCHPOINT,
2071 side_effect=OSError(errno.ENOSPC, "yo")):
2072 with self.get_files() as (src, dst):
2073 self.assertRaises(OSError, self.zerocopy_fun, src, dst)
2074
2075
2076@unittest.skipIf(not SUPPORTS_SENDFILE, 'os.sendfile() not supported')
2077class TestZeroCopySendfile(_ZeroCopyFileTest, unittest.TestCase):
2078 PATCHPOINT = "os.sendfile"
2079
2080 def zerocopy_fun(self, fsrc, fdst):
2081 return shutil._fastcopy_sendfile(fsrc, fdst)
2082
2083 def test_non_regular_file_src(self):
2084 with io.BytesIO(self.FILEDATA) as src:
2085 with open(TESTFN2, "wb") as dst:
2086 with self.assertRaises(_GiveupOnFastCopy):
2087 self.zerocopy_fun(src, dst)
2088 shutil.copyfileobj(src, dst)
2089
2090 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2091
2092 def test_non_regular_file_dst(self):
2093 with open(TESTFN, "rb") as src:
2094 with io.BytesIO() as dst:
2095 with self.assertRaises(_GiveupOnFastCopy):
2096 self.zerocopy_fun(src, dst)
2097 shutil.copyfileobj(src, dst)
2098 dst.seek(0)
2099 self.assertEqual(dst.read(), self.FILEDATA)
2100
2101 def test_exception_on_second_call(self):
2102 def sendfile(*args, **kwargs):
2103 if not flag:
2104 flag.append(None)
2105 return orig_sendfile(*args, **kwargs)
2106 else:
2107 raise OSError(errno.EBADF, "yo")
2108
2109 flag = []
2110 orig_sendfile = os.sendfile
2111 with unittest.mock.patch('os.sendfile', create=True,
2112 side_effect=sendfile):
2113 with self.get_files() as (src, dst):
2114 with self.assertRaises(OSError) as cm:
2115 shutil._fastcopy_sendfile(src, dst)
2116 assert flag
2117 self.assertEqual(cm.exception.errno, errno.EBADF)
2118
2119 def test_cant_get_size(self):
2120 # Emulate a case where src file size cannot be determined.
2121 # Internally bufsize will be set to a small value and
2122 # sendfile() will be called repeatedly.
2123 with unittest.mock.patch('os.fstat', side_effect=OSError) as m:
2124 with self.get_files() as (src, dst):
2125 shutil._fastcopy_sendfile(src, dst)
2126 assert m.called
2127 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2128
2129 def test_small_chunks(self):
2130 # Force internal file size detection to be smaller than the
2131 # actual file size. We want to force sendfile() to be called
2132 # multiple times, also in order to emulate a src fd which gets
2133 # bigger while it is being copied.
2134 mock = unittest.mock.Mock()
2135 mock.st_size = 65536 + 1
2136 with unittest.mock.patch('os.fstat', return_value=mock) as m:
2137 with self.get_files() as (src, dst):
2138 shutil._fastcopy_sendfile(src, dst)
2139 assert m.called
2140 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2141
2142 def test_big_chunk(self):
2143 # Force internal file size detection to be +100MB bigger than
2144 # the actual file size. Make sure sendfile() does not rely on
2145 # file size value except for (maybe) a better throughput /
2146 # performance.
2147 mock = unittest.mock.Mock()
2148 mock.st_size = self.FILESIZE + (100 * 1024 * 1024)
2149 with unittest.mock.patch('os.fstat', return_value=mock) as m:
2150 with self.get_files() as (src, dst):
2151 shutil._fastcopy_sendfile(src, dst)
2152 assert m.called
2153 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2154
2155 def test_blocksize_arg(self):
2156 with unittest.mock.patch('os.sendfile',
2157 side_effect=ZeroDivisionError) as m:
2158 self.assertRaises(ZeroDivisionError,
2159 shutil.copyfile, TESTFN, TESTFN2)
2160 blocksize = m.call_args[0][3]
2161 # Make sure file size and the block size arg passed to
2162 # sendfile() are the same.
2163 self.assertEqual(blocksize, os.path.getsize(TESTFN))
2164 # ...unless we're dealing with a small file.
2165 support.unlink(TESTFN2)
2166 write_file(TESTFN2, b"hello", binary=True)
2167 self.addCleanup(support.unlink, TESTFN2 + '3')
2168 self.assertRaises(ZeroDivisionError,
2169 shutil.copyfile, TESTFN2, TESTFN2 + '3')
2170 blocksize = m.call_args[0][3]
2171 self.assertEqual(blocksize, 2 ** 23)
2172
2173 def test_file2file_not_supported(self):
2174 # Emulate a case where sendfile() only support file->socket
2175 # fds. In such a case copyfile() is supposed to skip the
2176 # fast-copy attempt from then on.
2177 assert shutil._HAS_SENDFILE
2178 try:
2179 with unittest.mock.patch(
2180 self.PATCHPOINT,
2181 side_effect=OSError(errno.ENOTSOCK, "yo")) as m:
2182 with self.get_files() as (src, dst):
2183 with self.assertRaises(_GiveupOnFastCopy):
2184 shutil._fastcopy_sendfile(src, dst)
2185 assert m.called
2186 assert not shutil._HAS_SENDFILE
2187
2188 with unittest.mock.patch(self.PATCHPOINT) as m:
2189 shutil.copyfile(TESTFN, TESTFN2)
2190 assert not m.called
2191 finally:
2192 shutil._HAS_SENDFILE = True
2193
2194
Victor Stinner937ee9e2018-06-26 02:11:06 +02002195@unittest.skipIf(not MACOS, 'macOS only')
Giampaolo Rodolac7f02a92018-06-19 08:27:29 -07002196class TestZeroCopyMACOS(_ZeroCopyFileTest, unittest.TestCase):
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002197 PATCHPOINT = "posix._fcopyfile"
2198
2199 def zerocopy_fun(self, src, dst):
Giampaolo Rodolac7f02a92018-06-19 08:27:29 -07002200 return shutil._fastcopy_fcopyfile(src, dst, posix._COPYFILE_DATA)
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002201
2202
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002203class TermsizeTests(unittest.TestCase):
2204 def test_does_not_crash(self):
2205 """Check if get_terminal_size() returns a meaningful value.
2206
2207 There's no easy portable way to actually check the size of the
2208 terminal, so let's check if it returns something sensible instead.
2209 """
2210 size = shutil.get_terminal_size()
Antoine Pitroucfade362012-02-08 23:48:59 +01002211 self.assertGreaterEqual(size.columns, 0)
2212 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002213
2214 def test_os_environ_first(self):
2215 "Check if environment variables have precedence"
2216
2217 with support.EnvironmentVarGuard() as env:
2218 env['COLUMNS'] = '777'
Serhiy Storchakad30829d2016-04-24 09:58:43 +03002219 del env['LINES']
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002220 size = shutil.get_terminal_size()
2221 self.assertEqual(size.columns, 777)
2222
2223 with support.EnvironmentVarGuard() as env:
Serhiy Storchakad30829d2016-04-24 09:58:43 +03002224 del env['COLUMNS']
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002225 env['LINES'] = '888'
2226 size = shutil.get_terminal_size()
2227 self.assertEqual(size.lines, 888)
2228
Serhiy Storchakad30829d2016-04-24 09:58:43 +03002229 def test_bad_environ(self):
2230 with support.EnvironmentVarGuard() as env:
2231 env['COLUMNS'] = 'xxx'
2232 env['LINES'] = 'yyy'
2233 size = shutil.get_terminal_size()
2234 self.assertGreaterEqual(size.columns, 0)
2235 self.assertGreaterEqual(size.lines, 0)
2236
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002237 @unittest.skipUnless(os.isatty(sys.__stdout__.fileno()), "not on tty")
Victor Stinner119ebb72016-04-19 22:24:56 +02002238 @unittest.skipUnless(hasattr(os, 'get_terminal_size'),
2239 'need os.get_terminal_size()')
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002240 def test_stty_match(self):
2241 """Check if stty returns the same results ignoring env
2242
2243 This test will fail if stdin and stdout are connected to
2244 different terminals with different sizes. Nevertheless, such
2245 situations should be pretty rare.
2246 """
2247 try:
2248 size = subprocess.check_output(['stty', 'size']).decode().split()
Xavier de Gaye38c8b7d2016-11-14 17:14:42 +01002249 except (FileNotFoundError, PermissionError,
2250 subprocess.CalledProcessError):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002251 self.skipTest("stty invocation failed")
2252 expected = (int(size[1]), int(size[0])) # reversed order
2253
2254 with support.EnvironmentVarGuard() as env:
2255 del env['LINES']
2256 del env['COLUMNS']
2257 actual = shutil.get_terminal_size()
2258
2259 self.assertEqual(expected, actual)
Ronald Oussorenf51738b2011-05-06 10:23:04 +02002260
Serhiy Storchakad30829d2016-04-24 09:58:43 +03002261 def test_fallback(self):
2262 with support.EnvironmentVarGuard() as env:
2263 del env['LINES']
2264 del env['COLUMNS']
2265
2266 # sys.__stdout__ has no fileno()
2267 with support.swap_attr(sys, '__stdout__', None):
2268 size = shutil.get_terminal_size(fallback=(10, 20))
2269 self.assertEqual(size.columns, 10)
2270 self.assertEqual(size.lines, 20)
2271
2272 # sys.__stdout__ is not a terminal on Unix
2273 # or fileno() not in (0, 1, 2) on Windows
2274 with open(os.devnull, 'w') as f, \
2275 support.swap_attr(sys, '__stdout__', f):
2276 size = shutil.get_terminal_size(fallback=(30, 40))
2277 self.assertEqual(size.columns, 30)
2278 self.assertEqual(size.lines, 40)
2279
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002280
Berker Peksag8083cd62014-11-01 11:04:06 +02002281class PublicAPITests(unittest.TestCase):
2282 """Ensures that the correct values are exposed in the public API."""
2283
2284 def test_module_all_attribute(self):
2285 self.assertTrue(hasattr(shutil, '__all__'))
2286 target_api = ['copyfileobj', 'copyfile', 'copymode', 'copystat',
2287 'copy', 'copy2', 'copytree', 'move', 'rmtree', 'Error',
2288 'SpecialFileError', 'ExecError', 'make_archive',
2289 'get_archive_formats', 'register_archive_format',
2290 'unregister_archive_format', 'get_unpack_formats',
2291 'register_unpack_format', 'unregister_unpack_format',
2292 'unpack_archive', 'ignore_patterns', 'chown', 'which',
2293 'get_terminal_size', 'SameFileError']
2294 if hasattr(os, 'statvfs') or os.name == 'nt':
2295 target_api.append('disk_usage')
2296 self.assertEqual(set(shutil.__all__), set(target_api))
2297
2298
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00002299if __name__ == '__main__':
Brett Cannon3e9a9ae2013-06-12 21:25:59 -04002300 unittest.main()