blob: 7e0a3292e0f8a4697e59516d835c7b0ad4f96e33 [file] [log] [blame]
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00001# Copyright (C) 2003 Python Software Foundation
2
3import unittest
Berker Peksag884afd92014-12-10 02:50:32 +02004import unittest.mock
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00005import shutil
6import tempfile
Johannes Gijsbers8e6f2de2004-11-23 09:27:27 +00007import sys
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +00008import stat
Brett Cannon1c3fa182004-06-19 21:11:35 +00009import os
10import os.path
Antoine Pitrouc041ab62012-01-02 19:18:02 +010011import errno
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040012import functools
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -070013import pathlib
Antoine Pitroubcf2b592012-02-08 23:28:36 +010014import subprocess
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020015import random
16import string
17import contextlib
18import io
Serhiy Storchaka527ef072015-09-06 18:33:19 +030019from shutil import (make_archive,
Tarek Ziadé396fad72010-02-23 05:30:31 +000020 register_archive_format, unregister_archive_format,
Tarek Ziadé6ac91722010-04-28 17:51:36 +000021 get_archive_formats, Error, unpack_archive,
22 register_unpack_format, RegistryError,
Hynek Schlawack48653762012-10-07 12:49:58 +020023 unregister_unpack_format, get_unpack_formats,
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020024 SameFileError, _GiveupOnFastCopy)
Tarek Ziadé396fad72010-02-23 05:30:31 +000025import tarfile
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +020026import zipfile
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020027try:
28 import posix
29except ImportError:
30 posix = None
Tarek Ziadé396fad72010-02-23 05:30:31 +000031
32from test import support
Serhiy Storchakab21d1552018-03-02 11:53:51 +020033from test.support import TESTFN, FakePath
Serhiy Storchaka11213772014-08-06 18:50:19 +030034
# Second scratch path, derived from the shared test file name.
TESTFN2 = TESTFN + "2"
# True when sys.platform starts with "darwin".
MACOS = sys.platform.startswith("darwin")
# UID_GID_SUPPORT is True only when both grp and pwd can be imported.
try:
    import grp
    import pwd
except ImportError:
    UID_GID_SUPPORT = False
else:
    UID_GID_SUPPORT = True
43
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040044def _fake_rename(*args, **kwargs):
45 # Pretend the destination path is on a different filesystem.
Antoine Pitrouc041ab62012-01-02 19:18:02 +010046 raise OSError(getattr(errno, 'EXDEV', 18), "Invalid cross-device link")
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040047
def mock_rename(func):
    """Decorate *func* so it runs with os.rename swapped for _fake_rename.

    The previous os.rename is put back once *func* returns or raises, and
    the wrapper returns whatever *func* returns.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        saved_rename = os.rename
        os.rename = _fake_rename
        try:
            return func(*args, **kwargs)
        finally:
            os.rename = saved_rename
    return wrapper
58
def write_file(path, content, binary=False):
    """Store *content* in the file at *path*, replacing any previous data.

    *path* may be a string or a tuple of path components; a tuple is
    combined with os.path.join.  The file is opened in binary mode when
    *binary* is true and in text mode otherwise.
    """
    target = os.path.join(*path) if isinstance(path, tuple) else path
    mode = 'wb' if binary else 'w'
    with open(target, mode) as stream:
        stream.write(content)
70
def write_test_file(path, size):
    """Create a test file with an arbitrary size and random text content.

    *path* is the file to create (or overwrite) and *size* is the exact
    number of bytes it must contain afterwards.  The content is a buffer of
    at most 8192 random ASCII letters, repeated as often as needed.

    Raises AssertionError if the resulting file is not *size* bytes long.
    """
    def chunks(total, step):
        # Yield write sizes that add up to *total*, none larger than *step*.
        assert total >= step
        while total > step:
            yield step
            total -= step
        if total:
            yield total

    bufsize = min(size, 8192)
    chunk = b"".join([random.choice(string.ascii_letters).encode()
                      for i in range(bufsize)])
    with open(path, 'wb') as f:
        for csize in chunks(size, bufsize):
            # The last piece can be shorter than the buffer; writing the
            # whole buffer every time would overshoot *size* whenever it is
            # not a multiple of the buffer length.
            f.write(chunk[:csize])
    assert os.path.getsize(path) == size
88
def read_file(path, binary=False):
    """Return everything stored in the file at *path*.

    *path* may be a string or a tuple of path components; a tuple is
    combined with os.path.join.  The result is bytes when *binary* is true
    and str otherwise.
    """
    target = os.path.join(*path) if isinstance(path, tuple) else path
    mode = 'rb' if binary else 'r'
    with open(target, mode) as stream:
        data = stream.read()
    return data
100
def rlistdir(path):
    """Return a recursive, sorted listing of the directory *path*.

    Names are relative to *path* and use '/' as separator.  A real
    directory appears as 'name/' followed by its own entries; symlinks
    (including links to directories) and other files appear by bare name and
    are not descended into.
    """
    listing = []
    for entry in sorted(os.listdir(path)):
        full = os.path.join(path, entry)
        if os.path.islink(full) or not os.path.isdir(full):
            listing.append(entry)
            continue
        prefix = entry + '/'
        listing.append(prefix)
        listing.extend(prefix + child for child in rlistdir(full))
    return listing
112
def supports_file2file_sendfile():
    """Report whether os.sendfile() can copy between two regular files.

    Returns False when os has no sendfile attribute or when a two-byte
    trial copy between two temporary files raises OSError; returns True
    when the trial copy succeeds.  Both temporary files are removed before
    returning.
    """
    # ...apparently Linux and Solaris are the only ones
    if not hasattr(os, "sendfile"):
        return False
    srcname = None
    dstname = None
    try:
        with tempfile.NamedTemporaryFile("wb", delete=False) as f:
            srcname = f.name
            f.write(b"0123456789")

        with open(srcname, "rb") as src:
            with tempfile.NamedTemporaryFile("wb", delete=False) as dst:
                # Remember the destination's own name.  Using the source
                # handle's name here would leave the destination file behind
                # and unlink the source twice.
                dstname = dst.name
                infd = src.fileno()
                outfd = dst.fileno()
                try:
                    os.sendfile(outfd, infd, 0, 2)
                except OSError:
                    return False
                else:
                    return True
    finally:
        # delete=False above means nothing is removed automatically.
        if srcname is not None:
            support.unlink(srcname)
        if dstname is not None:
            support.unlink(dstname)
140
141
# Probe once at import time; the result is stored as a module-level flag.
SUPPORTS_SENDFILE = supports_file2file_sendfile()
143
Éric Araujoa7e33a12011-08-12 19:51:35 +0200144
Barry Warsaw7fc2cca2003-01-24 17:34:13 +0000145class TestShutil(unittest.TestCase):
Tarek Ziadé396fad72010-02-23 05:30:31 +0000146
def setUp(self):
    # Run the base-class fixture, then start with an empty record of the
    # temporary directories handed out by mkdtemp().
    super(TestShutil, self).setUp()
    self.tempdirs = list()
150
def tearDown(self):
    # Run the base-class teardown, then delete every directory recorded in
    # self.tempdirs, most recently created first.
    super(TestShutil, self).tearDown()
    while self.tempdirs:
        doomed = self.tempdirs.pop()
        # Second positional argument of rmtree() is ignore_errors; it is
        # switched on only when os.name is 'nt' or 'cygwin'.
        ignore_errors = os.name in ('nt', 'cygwin')
        shutil.rmtree(doomed, ignore_errors)
156
Tarek Ziadé396fad72010-02-23 05:30:31 +0000157
def mkdtemp(self):
    """Make a fresh temporary directory and register it for cleanup.

    The path is appended to self.tempdirs and then returned.
    """
    path = tempfile.mkdtemp()
    self.tempdirs.append(path)
    return path
Tarek Ziadé5340db32010-04-19 22:30:51 +0000166
def test_rmtree_works_on_bytes(self):
    # rmtree() must accept a bytes path that names a non-empty directory.
    base = self.mkdtemp()
    target = os.path.join(base, 'killme')
    os.mkdir(target)
    write_file(os.path.join(target, 'somefile'), 'foo')
    target_bytes = os.fsencode(target)
    self.assertIsInstance(target_bytes, bytes)
    shutil.rmtree(target_bytes)
Hynek Schlawack3b527782012-06-25 13:27:31 +0200175
@support.skip_unless_symlink
def test_rmtree_fails_on_symlink(self):
    # rmtree() aimed at a symlink to a directory must raise and must leave
    # both the link and its target in place.
    tmp = self.mkdtemp()
    real_dir = os.path.join(tmp, 'dir')
    os.mkdir(real_dir)
    link = os.path.join(tmp, 'link')
    os.symlink(real_dir, link)
    with self.assertRaises(OSError):
        shutil.rmtree(link)
    self.assertTrue(os.path.exists(real_dir))
    self.assertTrue(os.path.lexists(link))

    # With an onerror callback the failure is reported exactly once, as
    # (os.path.islink, link, exc_info).
    reported = []
    shutil.rmtree(link, onerror=lambda *args: reported.append(args))
    self.assertEqual(len(reported), 1)
    only_call = reported[0]
    self.assertIs(only_call[0], os.path.islink)
    self.assertEqual(only_call[1], link)
    self.assertIsInstance(only_call[2][1], OSError)
Hynek Schlawacka75cd1c2012-06-28 12:07:29 +0200194
@support.skip_unless_symlink
def test_rmtree_works_on_symlinks(self):
    # Layout: tmp/dir1/dir2, tmp/dir3 and tmp/file1, with three links
    # inside dir1 pointing at dir2, dir3 and file1.
    tmp = self.mkdtemp()
    dir1 = os.path.join(tmp, 'dir1')
    dir2 = os.path.join(dir1, 'dir2')
    dir3 = os.path.join(tmp, 'dir3')
    for directory in (dir1, dir2, dir3):
        os.mkdir(directory)
    file1 = os.path.join(tmp, 'file1')
    write_file(file1, 'foo')
    link_targets = (('link1', dir2), ('link2', dir3), ('link3', file1))
    for link_name, target in link_targets:
        os.symlink(target, os.path.join(dir1, link_name))
    # make sure symlinks are removed but not followed
    shutil.rmtree(dir1)
    self.assertFalse(os.path.exists(dir1))
    for survivor in (dir3, file1):
        self.assertTrue(os.path.exists(survivor))
216
def test_rmtree_errors(self):
    # Case 1: the path comes from tempfile.mktemp() and was never created.
    missing = tempfile.mktemp()
    with self.assertRaises(FileNotFoundError):
        shutil.rmtree(missing)
    # test that ignore_errors option is honored
    shutil.rmtree(missing, ignore_errors=True)

    # Case 2: the path names an existing regular file.
    tmpdir = self.mkdtemp()
    write_file((tmpdir, "tstfile"), "")
    filename = os.path.join(tmpdir, "tstfile")
    with self.assertRaises(NotADirectoryError) as caught:
        shutil.rmtree(filename)
    # The reason for this rather odd construct is that Windows sprinkles
    # a \*.* at the end of file names. But only sometimes on some buildbots
    accepted_names = [filename, os.path.join(filename, '*.*')]
    self.assertIn(caught.exception.filename, accepted_names)
    self.assertTrue(os.path.exists(filename))
    # test that ignore_errors option is honored
    shutil.rmtree(filename, ignore_errors=True)
    self.assertTrue(os.path.exists(filename))

    # With onerror, two failures are expected: os.scandir first, then
    # os.rmdir, both against the same file name.
    calls = []
    shutil.rmtree(filename, onerror=lambda *args: calls.append(args))
    self.assertEqual(len(calls), 2)
    for call, expected_func in zip(calls, (os.scandir, os.rmdir)):
        self.assertIs(call[0], expected_func)
        self.assertEqual(call[1], filename)
        self.assertIsInstance(call[2][1], NotADirectoryError)
        self.assertIn(call[2][1].filename, accepted_names)
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000251
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000252
@unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod()')
@unittest.skipIf(sys.platform[:6] == 'cygwin',
                 "This test can't be run on Cygwin (issue #1071513).")
@unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
                 "This test can't be run reliably as root (issue #1076467).")
def test_on_error(self):
    # Build TESTFN/a (file) and TESTFN/b (directory), make all three
    # read+execute only, then check the onerror callbacks rmtree() issues.
    self.errorState = 0
    os.mkdir(TESTFN)
    self.addCleanup(shutil.rmtree, TESTFN)

    self.child_file_path = os.path.join(TESTFN, 'a')
    self.child_dir_path = os.path.join(TESTFN, 'b')
    support.create_empty_file(self.child_file_path)
    os.mkdir(self.child_dir_path)

    # Record the original modes so the cleanups can restore them.
    restore_order = (TESTFN, self.child_file_path, self.child_dir_path)
    saved_modes = [(path, os.stat(path).st_mode) for path in restore_order]

    # Make unwritable.
    read_exec_only = stat.S_IREAD | stat.S_IEXEC
    for path in (self.child_file_path, self.child_dir_path, TESTFN):
        os.chmod(path, read_exec_only)

    # Cleanups run last-registered-first, so these chmod calls happen
    # before the rmtree cleanup registered above.
    for path, mode in saved_modes:
        self.addCleanup(os.chmod, path, mode)

    shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
    # Test whether onerror has actually been called.
    self.assertEqual(self.errorState, 3,
                     "Expected call to onerror function did not happen.")
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000284
def check_args_to_onerror(self, func, arg, exc):
    # onerror callback handed to shutil.rmtree() by test_on_error, which
    # runs rmtree on a tree it has made read+execute only.
    #
    # Once self.errorState has reached 2, the only accepted report is
    # os.rmdir failing on TESTFN itself; that sets the state to 3.
    if self.errorState >= 2:
        self.assertEqual(func, os.rmdir)
        self.assertEqual(arg, TESTFN)
        self.assertTrue(issubclass(exc[0], OSError))
        self.errorState = 3
        return

    # The first two reports may come from os.unlink on the child file,
    # os.rmdir on the child directory, or os.listdir on either directory.
    if func is os.unlink:
        self.assertEqual(arg, self.child_file_path)
    elif func is os.rmdir:
        self.assertEqual(arg, self.child_dir_path)
    else:
        self.assertIs(func, os.listdir)
        self.assertIn(arg, [TESTFN, self.child_dir_path])
    self.assertTrue(issubclass(exc[0], OSError))
    self.errorState += 1
311
def test_rmtree_does_not_choke_on_failing_lstat(self):
    # Swap in an os.lstat that works only for TESTFN itself and raises
    # OSError for every other path, then check rmtree() still completes.
    real_lstat = os.lstat

    def selective_lstat(fn, *args, **kwargs):
        if fn == TESTFN:
            return real_lstat(fn)
        raise OSError()

    try:
        os.lstat = selective_lstat

        os.mkdir(TESTFN)
        write_file((TESTFN, 'foo'), 'foo')
        shutil.rmtree(TESTFN)
    finally:
        os.lstat = real_lstat
Barry Warsaw7fc2cca2003-01-24 17:34:13 +0000327
@unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod')
@support.skip_unless_symlink
def test_copymode_follow_symlinks(self):
    # copymode() without follow_symlinks=False: the mode of the file behind
    # src must end up on the file behind dst.
    tmp_dir = self.mkdtemp()
    src, dst, src_link, dst_link = (
        os.path.join(tmp_dir, name) for name in ('foo', 'bar', 'baz', 'quux'))
    write_file(src, 'foo')
    write_file(dst, 'foo')
    os.symlink(src, src_link)
    os.symlink(dst, dst_link)
    os.chmod(src, stat.S_IRWXU | stat.S_IRWXG)
    # file to file
    os.chmod(dst, stat.S_IRWXO)
    self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
    shutil.copymode(src, dst)
    self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
    # On Windows, os.chmod does not follow symlinks (issue #15411)
    if os.name == 'nt':
        return
    # follow src link, follow dst link, follow both links
    link_cases = ((src_link, dst), (src, dst_link), (src_link, dst_link))
    for source, target in link_cases:
        os.chmod(dst, stat.S_IRWXO)
        shutil.copymode(source, target)
        self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100360
@unittest.skipUnless(hasattr(os, 'lchmod'), 'requires os.lchmod')
@support.skip_unless_symlink
def test_copymode_symlink_to_symlink(self):
    # copymode() with follow_symlinks=False on platforms that provide
    # os.lchmod.
    tmp_dir = self.mkdtemp()
    src, dst, src_link, dst_link = (
        os.path.join(tmp_dir, name) for name in ('foo', 'bar', 'baz', 'quux'))
    write_file(src, 'foo')
    write_file(dst, 'foo')
    os.symlink(src, src_link)
    os.symlink(dst, dst_link)
    os.chmod(src, stat.S_IRWXU | stat.S_IRWXG)
    os.chmod(dst, stat.S_IRWXU)
    os.lchmod(src_link, stat.S_IRWXO | stat.S_IRWXG)
    # link to link: the links' own modes match afterwards, while the
    # modes of the files behind them still differ.
    os.lchmod(dst_link, stat.S_IRWXO)
    shutil.copymode(src_link, dst_link, follow_symlinks=False)
    self.assertEqual(os.lstat(src_link).st_mode,
                     os.lstat(dst_link).st_mode)
    self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
    # src link - use chmod, then dst link - use chmod
    for source, target in ((src_link, dst), (src, dst_link)):
        os.lchmod(dst_link, stat.S_IRWXO)
        shutil.copymode(source, target, follow_symlinks=False)
        self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
390
@unittest.skipIf(hasattr(os, 'lchmod'), 'requires os.lchmod to be missing')
@support.skip_unless_symlink
def test_copymode_symlink_to_symlink_wo_lchmod(self):
    # Where os.lchmod is missing, a link-to-link copymode() with
    # follow_symlinks=False must simply not raise.
    tmp_dir = self.mkdtemp()
    src, dst, src_link, dst_link = (
        os.path.join(tmp_dir, name) for name in ('foo', 'bar', 'baz', 'quux'))
    write_file(src, 'foo')
    write_file(dst, 'foo')
    os.symlink(src, src_link)
    os.symlink(dst, dst_link)
    # silent fail
    shutil.copymode(src_link, dst_link, follow_symlinks=False)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100404
@support.skip_unless_symlink
def test_copystat_symlinks(self):
    # copystat() between symlinks, with and without following them.
    tmp_dir = self.mkdtemp()
    src, dst, src_link, dst_link = (
        os.path.join(tmp_dir, name) for name in ('foo', 'bar', 'baz', 'qux'))
    has_lchmod = hasattr(os, 'lchmod')

    write_file(src, 'foo')
    original = os.stat(src)
    # ensure different mtimes
    os.utime(src, (original.st_atime, original.st_mtime - 42.0))
    write_file(dst, 'bar')
    self.assertNotEqual(os.stat(src).st_mtime, os.stat(dst).st_mtime)
    os.symlink(src, src_link)
    os.symlink(dst, dst_link)
    if has_lchmod:
        os.lchmod(src_link, stat.S_IRWXO)
    if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
        os.lchflags(src_link, stat.UF_NODUMP)
    src_link_stat = os.lstat(src_link)
    # follow
    if has_lchmod:
        shutil.copystat(src_link, dst_link, follow_symlinks=True)
        self.assertNotEqual(src_link_stat.st_mode, os.stat(dst).st_mode)
    # don't follow
    shutil.copystat(src_link, dst_link, follow_symlinks=False)
    dst_link_stat = os.lstat(dst_link)
    if os.utime in os.supports_follow_symlinks:
        for attr in ('st_atime', 'st_mtime'):
            # The modification times may be truncated in the new file.
            self.assertLessEqual(getattr(src_link_stat, attr),
                                 getattr(dst_link_stat, attr) + 1)
    if has_lchmod:
        self.assertEqual(src_link_stat.st_mode, dst_link_stat.st_mode)
    if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
        self.assertEqual(src_link_stat.st_flags, dst_link_stat.st_flags)
    # follow_symlinks=False again, but this time dst is a regular file
    # rather than a link; the files' mtimes must end up within 0.1s.
    shutil.copystat(src_link, dst, follow_symlinks=False)
    mtime_gap = abs(os.stat(src).st_mtime - os.stat(dst).st_mtime)
    self.assertTrue(mtime_gap < 0.1)
445
@unittest.skipUnless(hasattr(os, 'chflags') and
                     hasattr(errno, 'EOPNOTSUPP') and
                     hasattr(errno, 'ENOTSUP'),
                     "requires os.chflags, EOPNOTSUPP & ENOTSUP")
def test_copystat_handles_harmless_chflags_errors(self):
    # copystat() must tolerate os.chflags failing with EOPNOTSUPP or
    # ENOTSUP, and must propagate an OSError carrying another errno.
    tmpdir = self.mkdtemp()
    file1 = os.path.join(tmpdir, 'file1')
    file2 = os.path.join(tmpdir, 'file2')
    for path in (file1, file2):
        write_file(path, 'xxx')

    def make_chflags_raiser(err):
        # Build an os.chflags stand-in that always raises OSError(errno=err).
        ex = OSError()

        def _chflags_raiser(path, flags, *, follow_symlinks=True):
            ex.errno = err
            raise ex
        return _chflags_raiser

    real_chflags = os.chflags
    try:
        for harmless in (errno.EOPNOTSUPP, errno.ENOTSUP):
            os.chflags = make_chflags_raiser(harmless)
            shutil.copystat(file1, file2)
        # assert others errors break it
        other_errno = errno.EOPNOTSUPP + errno.ENOTSUP
        os.chflags = make_chflags_raiser(other_errno)
        with self.assertRaises(OSError):
            shutil.copystat(file1, file2)
    finally:
        os.chflags = real_chflags
474
@support.skip_unless_xattr
def test_copyxattr(self):
    # Exercises shutil._copyxattr() and, at the end, shutil.copystat().
    tmp_dir = self.mkdtemp()
    src = os.path.join(tmp_dir, 'foo')
    write_file(src, 'foo')
    dst = os.path.join(tmp_dir, 'bar')
    write_file(dst, 'bar')

    # no xattr == no problem
    shutil._copyxattr(src, dst)
    # common case
    for attr_name, attr_value in (('user.foo', b'42'), ('user.bar', b'43')):
        os.setxattr(src, attr_name, attr_value)
    shutil._copyxattr(src, dst)
    self.assertEqual(sorted(os.listxattr(src)), sorted(os.listxattr(dst)))
    self.assertEqual(os.getxattr(src, 'user.foo'),
                     os.getxattr(dst, 'user.foo'))

    # check errors don't affect other attrs
    os.remove(dst)
    write_file(dst, 'bar')
    os_error = OSError(errno.EPERM, 'EPERM')
    real_setxattr = os.setxattr

    def _raise_on_user_foo(fname, attr, val, **kwargs):
        if attr == 'user.foo':
            raise os_error
        real_setxattr(fname, attr, val, **kwargs)

    try:
        os.setxattr = _raise_on_user_foo
        shutil._copyxattr(src, dst)
        self.assertIn('user.bar', os.listxattr(dst))
    finally:
        os.setxattr = real_setxattr

    # the source filesystem not supporting xattrs should be ok, too.
    real_listxattr = os.listxattr

    def _raise_on_src(fname, *, follow_symlinks=True):
        if fname == src:
            raise OSError(errno.ENOTSUP, 'Operation not supported')
        return real_listxattr(fname, follow_symlinks=follow_symlinks)

    try:
        os.listxattr = _raise_on_src
        shutil._copyxattr(src, dst)
    finally:
        os.listxattr = real_listxattr

    # test that shutil.copystat copies xattrs
    original = os.path.join(tmp_dir, 'the_original')
    write_file(original, original)
    os.setxattr(original, 'user.the_value', b'fiddly')
    duplicate = os.path.join(tmp_dir, 'the_copy')
    write_file(duplicate, duplicate)
    shutil.copystat(original, duplicate)
    self.assertEqual(os.getxattr(duplicate, 'user.the_value'), b'fiddly')
Larry Hastingsad5ae042012-07-14 17:55:11 -0700530
@support.skip_unless_symlink
@support.skip_unless_xattr
@unittest.skipUnless(hasattr(os, 'geteuid') and os.geteuid() == 0,
                     'root privileges required')
def test_copyxattr_symlinks(self):
    # On Linux, it's only possible to access non-user xattr for symlinks;
    # which in turn require root privileges. This test should be expanded
    # as soon as other platforms gain support for extended attributes.
    tmp_dir = self.mkdtemp()
    src = os.path.join(tmp_dir, 'foo')
    src_link = os.path.join(tmp_dir, 'baz')
    write_file(src, 'foo')
    os.symlink(src, src_link)
    # The file carries b'42' and the link itself carries b'43'.
    os.setxattr(src, 'trusted.foo', b'42')
    os.setxattr(src_link, 'trusted.foo', b'43', follow_symlinks=False)
    dst = os.path.join(tmp_dir, 'bar')
    dst_link = os.path.join(tmp_dir, 'qux')
    write_file(dst, 'bar')
    os.symlink(dst, dst_link)

    # Link to link: only the destination link receives the attribute.
    shutil._copyxattr(src_link, dst_link, follow_symlinks=False)
    copied = os.getxattr(dst_link, 'trusted.foo', follow_symlinks=False)
    self.assertEqual(copied, b'43')
    with self.assertRaises(OSError):
        os.getxattr(dst, 'trusted.foo')

    # Link to regular file: the link's own value lands on the file.
    shutil._copyxattr(src_link, dst, follow_symlinks=False)
    self.assertEqual(os.getxattr(dst, 'trusted.foo'), b'43')
555
@support.skip_unless_symlink
def test_copy_symlinks(self):
    # shutil.copy() from a symlink, with and without following it.
    tmp_dir = self.mkdtemp()
    src = os.path.join(tmp_dir, 'foo')
    dst = os.path.join(tmp_dir, 'bar')
    src_link = os.path.join(tmp_dir, 'baz')
    has_lchmod = hasattr(os, 'lchmod')
    write_file(src, 'foo')
    os.symlink(src, src_link)
    if has_lchmod:
        os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
    # follow_symlinks=True: dst is a regular file with src's content
    shutil.copy(src_link, dst, follow_symlinks=True)
    self.assertFalse(os.path.islink(dst))
    self.assertEqual(read_file(src), read_file(dst))
    os.remove(dst)
    # follow_symlinks=False: dst is itself a link with the same target
    shutil.copy(src_link, dst, follow_symlinks=False)
    self.assertTrue(os.path.islink(dst))
    self.assertEqual(os.readlink(dst), os.readlink(src_link))
    if has_lchmod:
        self.assertEqual(os.lstat(src_link).st_mode,
                         os.lstat(dst).st_mode)
578
@support.skip_unless_symlink
def test_copy2_symlinks(self):
    # shutil.copy2() from a symlink, with and without following it.
    tmp_dir = self.mkdtemp()
    src = os.path.join(tmp_dir, 'foo')
    dst = os.path.join(tmp_dir, 'bar')
    src_link = os.path.join(tmp_dir, 'baz')
    has_lchmod = hasattr(os, 'lchmod')
    write_file(src, 'foo')
    os.symlink(src, src_link)
    if has_lchmod:
        os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
    if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
        os.lchflags(src_link, stat.UF_NODUMP)
    src_stat = os.stat(src)
    src_link_stat = os.lstat(src_link)
    # follow
    shutil.copy2(src_link, dst, follow_symlinks=True)
    self.assertFalse(os.path.islink(dst))
    self.assertEqual(read_file(src), read_file(dst))
    os.remove(dst)
    # don't follow
    shutil.copy2(src_link, dst, follow_symlinks=False)
    self.assertTrue(os.path.islink(dst))
    self.assertEqual(os.readlink(dst), os.readlink(src_link))
    dst_stat = os.lstat(dst)
    if os.utime in os.supports_follow_symlinks:
        for attr in ('st_atime', 'st_mtime'):
            # The modification times may be truncated in the new file.
            self.assertLessEqual(getattr(src_link_stat, attr),
                                 getattr(dst_stat, attr) + 1)
    if has_lchmod:
        # The new link carries the source link's mode, not the file's.
        self.assertEqual(src_link_stat.st_mode, dst_stat.st_mode)
        self.assertNotEqual(src_stat.st_mode, dst_stat.st_mode)
    if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
        self.assertEqual(src_link_stat.st_flags, dst_stat.st_flags)
613
@support.skip_unless_xattr
def test_copy2_xattr(self):
    # copy2() must carry the 'user.foo' extended attribute over to dst.
    tmp_dir = self.mkdtemp()
    src = os.path.join(tmp_dir, 'foo')
    dst = os.path.join(tmp_dir, 'bar')
    write_file(src, 'foo')
    os.setxattr(src, 'user.foo', b'42')
    shutil.copy2(src, dst)
    expected = os.getxattr(src, 'user.foo')
    actual = os.getxattr(dst, 'user.foo')
    self.assertEqual(expected, actual)
    os.remove(dst)
626
@support.skip_unless_symlink
def test_copyfile_symlinks(self):
    # copyfile() from a symlink, with and without following it.
    tmp_dir = self.mkdtemp()
    src, dst, dst_link, link = (
        os.path.join(tmp_dir, name)
        for name in ('src', 'dst', 'dst_link', 'link'))
    write_file(src, 'foo')
    os.symlink(src, link)
    # don't follow
    shutil.copyfile(link, dst_link, follow_symlinks=False)
    self.assertTrue(os.path.islink(dst_link))
    self.assertEqual(os.readlink(link), os.readlink(dst_link))
    # follow
    shutil.copyfile(link, dst)
    self.assertFalse(os.path.islink(dst))
643
def test_rmtree_uses_safe_fd_version_if_available(self):
    # Recompute, from the os module's capability sets, whether the
    # fd-based rmtree is expected, then compare with what shutil reports.
    expect_fd_functions = (
        {os.open, os.stat, os.unlink, os.rmdir} <= os.supports_dir_fd and
        os.listdir in os.supports_fd and
        os.stat in os.supports_follow_symlinks)
    if not expect_fd_functions:
        self.assertFalse(shutil._use_fd_functions)
        self.assertFalse(shutil.rmtree.avoids_symlink_attacks)
        return

    self.assertTrue(shutil._use_fd_functions)
    self.assertTrue(shutil.rmtree.avoids_symlink_attacks)
    tmp_dir = self.mkdtemp()
    victim = os.path.join(tmp_dir, 'a')
    os.mkdir(victim)

    class Called(Exception):
        pass

    def _raiser(*args, **kwargs):
        raise Called

    # Swap in a marker implementation; rmtree() raising Called shows that
    # it dispatched to shutil._rmtree_safe_fd.
    real_rmtree = shutil._rmtree_safe_fd
    try:
        shutil._rmtree_safe_fd = _raiser
        with self.assertRaises(Called):
            shutil.rmtree(victim)
    finally:
        shutil._rmtree_safe_fd = real_rmtree
Hynek Schlawack2100b422012-06-23 20:28:32 +0200667
Johannes Gijsbersd60e92a2004-09-11 21:26:21 +0000668 def test_rmtree_dont_delete_file(self):
669 # When called on a file instead of a directory, don't delete it.
670 handle, path = tempfile.mkstemp()
Victor Stinnerbf816222011-06-30 23:25:47 +0200671 os.close(handle)
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200672 self.assertRaises(NotADirectoryError, shutil.rmtree, path)
Johannes Gijsbersd60e92a2004-09-11 21:26:21 +0000673 os.remove(path)
674
def test_copytree_simple(self):
    # Copy a tree holding one file and one sub-directory with a file, then
    # check the structure and the file contents of the copy.
    src_dir = tempfile.mkdtemp()
    dst_dir = os.path.join(tempfile.mkdtemp(), 'destination')
    self.addCleanup(shutil.rmtree, src_dir)
    self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
    write_file((src_dir, 'test.txt'), '123')
    os.mkdir(os.path.join(src_dir, 'test_dir'))
    write_file((src_dir, 'test_dir', 'test.txt'), '456')

    shutil.copytree(src_dir, dst_dir)

    top_file = (dst_dir, 'test.txt')
    nested_file = (dst_dir, 'test_dir', 'test.txt')
    self.assertTrue(os.path.isfile(os.path.join(*top_file)))
    self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'test_dir')))
    self.assertTrue(os.path.isfile(os.path.join(*nested_file)))
    for location, expected in ((top_file, '123'), (nested_file, '456')):
        actual = read_file(location)
        self.assertEqual(actual, expected)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000693
Antoine Pitrou78091e62011-12-29 18:54:15 +0100694 @support.skip_unless_symlink
695 def test_copytree_symlinks(self):
696 tmp_dir = self.mkdtemp()
697 src_dir = os.path.join(tmp_dir, 'src')
698 dst_dir = os.path.join(tmp_dir, 'dst')
699 sub_dir = os.path.join(src_dir, 'sub')
700 os.mkdir(src_dir)
701 os.mkdir(sub_dir)
702 write_file((src_dir, 'file.txt'), 'foo')
703 src_link = os.path.join(sub_dir, 'link')
704 dst_link = os.path.join(dst_dir, 'sub/link')
705 os.symlink(os.path.join(src_dir, 'file.txt'),
706 src_link)
707 if hasattr(os, 'lchmod'):
708 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
709 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
710 os.lchflags(src_link, stat.UF_NODUMP)
711 src_stat = os.lstat(src_link)
712 shutil.copytree(src_dir, dst_dir, symlinks=True)
713 self.assertTrue(os.path.islink(os.path.join(dst_dir, 'sub', 'link')))
714 self.assertEqual(os.readlink(os.path.join(dst_dir, 'sub', 'link')),
715 os.path.join(src_dir, 'file.txt'))
716 dst_stat = os.lstat(dst_link)
717 if hasattr(os, 'lchmod'):
718 self.assertEqual(dst_stat.st_mode, src_stat.st_mode)
719 if hasattr(os, 'lchflags'):
720 self.assertEqual(dst_stat.st_flags, src_stat.st_flags)
721
Georg Brandl2ee470f2008-07-16 12:55:28 +0000722 def test_copytree_with_exclude(self):
Georg Brandl2ee470f2008-07-16 12:55:28 +0000723 # creating data
724 join = os.path.join
725 exists = os.path.exists
726 src_dir = tempfile.mkdtemp()
Georg Brandl2ee470f2008-07-16 12:55:28 +0000727 try:
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000728 dst_dir = join(tempfile.mkdtemp(), 'destination')
Éric Araujoa7e33a12011-08-12 19:51:35 +0200729 write_file((src_dir, 'test.txt'), '123')
730 write_file((src_dir, 'test.tmp'), '123')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000731 os.mkdir(join(src_dir, 'test_dir'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200732 write_file((src_dir, 'test_dir', 'test.txt'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000733 os.mkdir(join(src_dir, 'test_dir2'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200734 write_file((src_dir, 'test_dir2', 'test.txt'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000735 os.mkdir(join(src_dir, 'test_dir2', 'subdir'))
736 os.mkdir(join(src_dir, 'test_dir2', 'subdir2'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200737 write_file((src_dir, 'test_dir2', 'subdir', 'test.txt'), '456')
738 write_file((src_dir, 'test_dir2', 'subdir2', 'test.py'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000739
740 # testing glob-like patterns
741 try:
742 patterns = shutil.ignore_patterns('*.tmp', 'test_dir2')
743 shutil.copytree(src_dir, dst_dir, ignore=patterns)
744 # checking the result: some elements should not be copied
745 self.assertTrue(exists(join(dst_dir, 'test.txt')))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200746 self.assertFalse(exists(join(dst_dir, 'test.tmp')))
747 self.assertFalse(exists(join(dst_dir, 'test_dir2')))
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000748 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200749 shutil.rmtree(dst_dir)
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000750 try:
751 patterns = shutil.ignore_patterns('*.tmp', 'subdir*')
752 shutil.copytree(src_dir, dst_dir, ignore=patterns)
753 # checking the result: some elements should not be copied
Éric Araujoa7e33a12011-08-12 19:51:35 +0200754 self.assertFalse(exists(join(dst_dir, 'test.tmp')))
755 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2')))
756 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000757 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200758 shutil.rmtree(dst_dir)
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000759
760 # testing callable-style
761 try:
762 def _filter(src, names):
763 res = []
764 for name in names:
765 path = os.path.join(src, name)
766
767 if (os.path.isdir(path) and
768 path.split()[-1] == 'subdir'):
769 res.append(name)
770 elif os.path.splitext(path)[-1] in ('.py'):
771 res.append(name)
772 return res
773
774 shutil.copytree(src_dir, dst_dir, ignore=_filter)
775
776 # checking the result: some elements should not be copied
Éric Araujoa7e33a12011-08-12 19:51:35 +0200777 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2',
778 'test.py')))
779 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000780
781 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200782 shutil.rmtree(dst_dir)
Georg Brandl2ee470f2008-07-16 12:55:28 +0000783 finally:
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000784 shutil.rmtree(src_dir)
785 shutil.rmtree(os.path.dirname(dst_dir))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000786
Antoine Pitrouac601602013-08-16 19:35:02 +0200787 def test_copytree_retains_permissions(self):
788 tmp_dir = tempfile.mkdtemp()
789 src_dir = os.path.join(tmp_dir, 'source')
790 os.mkdir(src_dir)
791 dst_dir = os.path.join(tmp_dir, 'destination')
792 self.addCleanup(shutil.rmtree, tmp_dir)
793
794 os.chmod(src_dir, 0o777)
795 write_file((src_dir, 'permissive.txt'), '123')
796 os.chmod(os.path.join(src_dir, 'permissive.txt'), 0o777)
797 write_file((src_dir, 'restrictive.txt'), '456')
798 os.chmod(os.path.join(src_dir, 'restrictive.txt'), 0o600)
799 restrictive_subdir = tempfile.mkdtemp(dir=src_dir)
800 os.chmod(restrictive_subdir, 0o600)
801
802 shutil.copytree(src_dir, dst_dir)
Brett Cannon9c7eb552013-08-23 14:38:11 -0400803 self.assertEqual(os.stat(src_dir).st_mode, os.stat(dst_dir).st_mode)
804 self.assertEqual(os.stat(os.path.join(src_dir, 'permissive.txt')).st_mode,
Antoine Pitrouac601602013-08-16 19:35:02 +0200805 os.stat(os.path.join(dst_dir, 'permissive.txt')).st_mode)
Brett Cannon9c7eb552013-08-23 14:38:11 -0400806 self.assertEqual(os.stat(os.path.join(src_dir, 'restrictive.txt')).st_mode,
Antoine Pitrouac601602013-08-16 19:35:02 +0200807 os.stat(os.path.join(dst_dir, 'restrictive.txt')).st_mode)
808 restrictive_subdir_dst = os.path.join(dst_dir,
809 os.path.split(restrictive_subdir)[1])
Brett Cannon9c7eb552013-08-23 14:38:11 -0400810 self.assertEqual(os.stat(restrictive_subdir).st_mode,
Antoine Pitrouac601602013-08-16 19:35:02 +0200811 os.stat(restrictive_subdir_dst).st_mode)
812
Berker Peksag884afd92014-12-10 02:50:32 +0200813 @unittest.mock.patch('os.chmod')
814 def test_copytree_winerror(self, mock_patch):
815 # When copying to VFAT, copystat() raises OSError. On Windows, the
816 # exception object has a meaningful 'winerror' attribute, but not
817 # on other operating systems. Do not assume 'winerror' is set.
818 src_dir = tempfile.mkdtemp()
819 dst_dir = os.path.join(tempfile.mkdtemp(), 'destination')
820 self.addCleanup(shutil.rmtree, src_dir)
821 self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
822
823 mock_patch.side_effect = PermissionError('ka-boom')
824 with self.assertRaises(shutil.Error):
825 shutil.copytree(src_dir, dst_dir)
826
Zachary Ware9fe6d862013-12-08 00:20:35 -0600827 @unittest.skipIf(os.name == 'nt', 'temporarily disabled on Windows')
Hirokazu Yamamoto26681452010-12-05 02:04:16 +0000828 @unittest.skipUnless(hasattr(os, 'link'), 'requires os.link')
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000829 def test_dont_copy_file_onto_link_to_itself(self):
830 # bug 851123.
831 os.mkdir(TESTFN)
832 src = os.path.join(TESTFN, 'cheese')
833 dst = os.path.join(TESTFN, 'shop')
834 try:
Hirokazu Yamamoto26681452010-12-05 02:04:16 +0000835 with open(src, 'w') as f:
836 f.write('cheddar')
xdegaye92c2ca72017-11-12 17:31:07 +0100837 try:
838 os.link(src, dst)
839 except PermissionError as e:
840 self.skipTest('os.link(): %s' % e)
Hynek Schlawack48653762012-10-07 12:49:58 +0200841 self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
Hirokazu Yamamoto26681452010-12-05 02:04:16 +0000842 with open(src, 'r') as f:
843 self.assertEqual(f.read(), 'cheddar')
844 os.remove(dst)
845 finally:
846 shutil.rmtree(TESTFN, ignore_errors=True)
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000847
Brian Curtin3b4499c2010-12-28 14:31:47 +0000848 @support.skip_unless_symlink
Hirokazu Yamamoto26681452010-12-05 02:04:16 +0000849 def test_dont_copy_file_onto_symlink_to_itself(self):
850 # bug 851123.
851 os.mkdir(TESTFN)
852 src = os.path.join(TESTFN, 'cheese')
853 dst = os.path.join(TESTFN, 'shop')
854 try:
855 with open(src, 'w') as f:
856 f.write('cheddar')
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000857 # Using `src` here would mean we end up with a symlink pointing
858 # to TESTFN/TESTFN/cheese, while it should point at
859 # TESTFN/cheese.
860 os.symlink('cheese', dst)
Hynek Schlawack48653762012-10-07 12:49:58 +0200861 self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
Antoine Pitrou92f60ed2010-10-14 22:11:44 +0000862 with open(src, 'r') as f:
863 self.assertEqual(f.read(), 'cheddar')
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000864 os.remove(dst)
865 finally:
Hirokazu Yamamoto26681452010-12-05 02:04:16 +0000866 shutil.rmtree(TESTFN, ignore_errors=True)
Johannes Gijsbers68128712004-08-14 13:57:08 +0000867
Brian Curtin3b4499c2010-12-28 14:31:47 +0000868 @support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +0000869 def test_rmtree_on_symlink(self):
870 # bug 1669.
871 os.mkdir(TESTFN)
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000872 try:
Brian Curtind40e6f72010-07-08 21:39:08 +0000873 src = os.path.join(TESTFN, 'cheese')
874 dst = os.path.join(TESTFN, 'shop')
875 os.mkdir(src)
876 os.symlink(src, dst)
877 self.assertRaises(OSError, shutil.rmtree, dst)
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200878 shutil.rmtree(dst, ignore_errors=True)
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000879 finally:
Brian Curtind40e6f72010-07-08 21:39:08 +0000880 shutil.rmtree(TESTFN, ignore_errors=True)
881
Serhiy Storchaka43767632013-11-03 21:31:38 +0200882 # Issue #3002: copyfile and copytree block indefinitely on named pipes
883 @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
884 def test_copyfile_named_pipe(self):
xdegaye92c2ca72017-11-12 17:31:07 +0100885 try:
886 os.mkfifo(TESTFN)
887 except PermissionError as e:
888 self.skipTest('os.mkfifo(): %s' % e)
Serhiy Storchaka43767632013-11-03 21:31:38 +0200889 try:
890 self.assertRaises(shutil.SpecialFileError,
891 shutil.copyfile, TESTFN, TESTFN2)
892 self.assertRaises(shutil.SpecialFileError,
893 shutil.copyfile, __file__, TESTFN)
894 finally:
895 os.remove(TESTFN)
Antoine Pitrou7fff0962009-05-01 21:09:44 +0000896
Serhiy Storchaka43767632013-11-03 21:31:38 +0200897 @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
898 @support.skip_unless_symlink
899 def test_copytree_named_pipe(self):
900 os.mkdir(TESTFN)
901 try:
902 subdir = os.path.join(TESTFN, "subdir")
903 os.mkdir(subdir)
904 pipe = os.path.join(subdir, "mypipe")
xdegaye92c2ca72017-11-12 17:31:07 +0100905 try:
906 os.mkfifo(pipe)
907 except PermissionError as e:
908 self.skipTest('os.mkfifo(): %s' % e)
Antoine Pitrou7fff0962009-05-01 21:09:44 +0000909 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +0200910 shutil.copytree(TESTFN, TESTFN2)
911 except shutil.Error as e:
912 errors = e.args[0]
913 self.assertEqual(len(errors), 1)
914 src, dst, error_msg = errors[0]
915 self.assertEqual("`%s` is a named pipe" % pipe, error_msg)
916 else:
917 self.fail("shutil.Error should have been raised")
918 finally:
919 shutil.rmtree(TESTFN, ignore_errors=True)
920 shutil.rmtree(TESTFN2, ignore_errors=True)
Antoine Pitrou7fff0962009-05-01 21:09:44 +0000921
Tarek Ziadé5340db32010-04-19 22:30:51 +0000922 def test_copytree_special_func(self):
923
924 src_dir = self.mkdtemp()
925 dst_dir = os.path.join(self.mkdtemp(), 'destination')
Éric Araujoa7e33a12011-08-12 19:51:35 +0200926 write_file((src_dir, 'test.txt'), '123')
Tarek Ziadé5340db32010-04-19 22:30:51 +0000927 os.mkdir(os.path.join(src_dir, 'test_dir'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200928 write_file((src_dir, 'test_dir', 'test.txt'), '456')
Tarek Ziadé5340db32010-04-19 22:30:51 +0000929
930 copied = []
931 def _copy(src, dst):
932 copied.append((src, dst))
933
934 shutil.copytree(src_dir, dst_dir, copy_function=_copy)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000935 self.assertEqual(len(copied), 2)
Tarek Ziadé5340db32010-04-19 22:30:51 +0000936
Brian Curtin3b4499c2010-12-28 14:31:47 +0000937 @support.skip_unless_symlink
Tarek Ziadéfb437512010-04-20 08:57:33 +0000938 def test_copytree_dangling_symlinks(self):
939
940 # a dangling symlink raises an error at the end
941 src_dir = self.mkdtemp()
942 dst_dir = os.path.join(self.mkdtemp(), 'destination')
943 os.symlink('IDONTEXIST', os.path.join(src_dir, 'test.txt'))
944 os.mkdir(os.path.join(src_dir, 'test_dir'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200945 write_file((src_dir, 'test_dir', 'test.txt'), '456')
Tarek Ziadéfb437512010-04-20 08:57:33 +0000946 self.assertRaises(Error, shutil.copytree, src_dir, dst_dir)
947
948 # a dangling symlink is ignored with the proper flag
949 dst_dir = os.path.join(self.mkdtemp(), 'destination2')
950 shutil.copytree(src_dir, dst_dir, ignore_dangling_symlinks=True)
951 self.assertNotIn('test.txt', os.listdir(dst_dir))
952
953 # a dangling symlink is copied if symlinks=True
954 dst_dir = os.path.join(self.mkdtemp(), 'destination3')
955 shutil.copytree(src_dir, dst_dir, symlinks=True)
956 self.assertIn('test.txt', os.listdir(dst_dir))
957
Berker Peksag5a294d82015-07-25 14:53:48 +0300958 @support.skip_unless_symlink
959 def test_copytree_symlink_dir(self):
960 src_dir = self.mkdtemp()
961 dst_dir = os.path.join(self.mkdtemp(), 'destination')
962 os.mkdir(os.path.join(src_dir, 'real_dir'))
963 with open(os.path.join(src_dir, 'real_dir', 'test.txt'), 'w'):
964 pass
965 os.symlink(os.path.join(src_dir, 'real_dir'),
966 os.path.join(src_dir, 'link_to_dir'),
967 target_is_directory=True)
968
969 shutil.copytree(src_dir, dst_dir, symlinks=False)
970 self.assertFalse(os.path.islink(os.path.join(dst_dir, 'link_to_dir')))
971 self.assertIn('test.txt', os.listdir(os.path.join(dst_dir, 'link_to_dir')))
972
973 dst_dir = os.path.join(self.mkdtemp(), 'destination2')
974 shutil.copytree(src_dir, dst_dir, symlinks=True)
975 self.assertTrue(os.path.islink(os.path.join(dst_dir, 'link_to_dir')))
976 self.assertIn('test.txt', os.listdir(os.path.join(dst_dir, 'link_to_dir')))
977
Nick Coghlan8ed3cf32011-03-16 14:05:35 -0400978 def _copy_file(self, method):
979 fname = 'test.txt'
980 tmpdir = self.mkdtemp()
Éric Araujoa7e33a12011-08-12 19:51:35 +0200981 write_file((tmpdir, fname), 'xxx')
Nick Coghlan8ed3cf32011-03-16 14:05:35 -0400982 file1 = os.path.join(tmpdir, fname)
983 tmpdir2 = self.mkdtemp()
984 method(file1, tmpdir2)
985 file2 = os.path.join(tmpdir2, fname)
986 return (file1, file2)
987
988 @unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod')
989 def test_copy(self):
990 # Ensure that the copied file exists and has the same mode bits.
991 file1, file2 = self._copy_file(shutil.copy)
992 self.assertTrue(os.path.exists(file2))
993 self.assertEqual(os.stat(file1).st_mode, os.stat(file2).st_mode)
994
995 @unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod')
Senthil Kumaran0c2dba52011-07-03 18:21:38 -0700996 @unittest.skipUnless(hasattr(os, 'utime'), 'requires os.utime')
Nick Coghlan8ed3cf32011-03-16 14:05:35 -0400997 def test_copy2(self):
998 # Ensure that the copied file exists and has the same mode and
999 # modification time bits.
1000 file1, file2 = self._copy_file(shutil.copy2)
1001 self.assertTrue(os.path.exists(file2))
1002 file1_stat = os.stat(file1)
1003 file2_stat = os.stat(file2)
1004 self.assertEqual(file1_stat.st_mode, file2_stat.st_mode)
1005 for attr in 'st_atime', 'st_mtime':
1006 # The modification times may be truncated in the new file.
1007 self.assertLessEqual(getattr(file1_stat, attr),
1008 getattr(file2_stat, attr) + 1)
1009 if hasattr(os, 'chflags') and hasattr(file1_stat, 'st_flags'):
1010 self.assertEqual(getattr(file1_stat, 'st_flags'),
1011 getattr(file2_stat, 'st_flags'))
1012
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001013 @support.requires_zlib
Tarek Ziadé396fad72010-02-23 05:30:31 +00001014 def test_make_tarball(self):
1015 # creating something to tar
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001016 root_dir, base_dir = self._create_files('')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001017
1018 tmpdir2 = self.mkdtemp()
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001019 # force shutil to create the directory
1020 os.rmdir(tmpdir2)
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001021 # working with relative paths
1022 work_dir = os.path.dirname(tmpdir2)
1023 rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001024
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001025 with support.change_cwd(work_dir):
Serhiy Storchaka5558d4f2015-09-08 09:59:02 +03001026 base_name = os.path.abspath(rel_base_name)
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001027 tarball = make_archive(rel_base_name, 'gztar', root_dir, '.')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001028
1029 # check if the compressed tarball was created
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001030 self.assertEqual(tarball, base_name + '.tar.gz')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001031 self.assertTrue(os.path.isfile(tarball))
1032 self.assertTrue(tarfile.is_tarfile(tarball))
1033 with tarfile.open(tarball, 'r:gz') as tf:
1034 self.assertCountEqual(tf.getnames(),
1035 ['.', './sub', './sub2',
1036 './file1', './file2', './sub/file3'])
Tarek Ziadé396fad72010-02-23 05:30:31 +00001037
1038 # trying an uncompressed one
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001039 with support.change_cwd(work_dir):
1040 tarball = make_archive(rel_base_name, 'tar', root_dir, '.')
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001041 self.assertEqual(tarball, base_name + '.tar')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001042 self.assertTrue(os.path.isfile(tarball))
1043 self.assertTrue(tarfile.is_tarfile(tarball))
1044 with tarfile.open(tarball, 'r') as tf:
1045 self.assertCountEqual(tf.getnames(),
1046 ['.', './sub', './sub2',
1047 './file1', './file2', './sub/file3'])
Tarek Ziadé396fad72010-02-23 05:30:31 +00001048
1049 def _tarinfo(self, path):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001050 with tarfile.open(path) as tar:
Tarek Ziadé396fad72010-02-23 05:30:31 +00001051 names = tar.getnames()
1052 names.sort()
1053 return tuple(names)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001054
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001055 def _create_files(self, base_dir='dist'):
Tarek Ziadé396fad72010-02-23 05:30:31 +00001056 # creating something to tar
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001057 root_dir = self.mkdtemp()
1058 dist = os.path.join(root_dir, base_dir)
1059 os.makedirs(dist, exist_ok=True)
Éric Araujoa7e33a12011-08-12 19:51:35 +02001060 write_file((dist, 'file1'), 'xxx')
1061 write_file((dist, 'file2'), 'xxx')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001062 os.mkdir(os.path.join(dist, 'sub'))
Éric Araujoa7e33a12011-08-12 19:51:35 +02001063 write_file((dist, 'sub', 'file3'), 'xxx')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001064 os.mkdir(os.path.join(dist, 'sub2'))
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001065 if base_dir:
1066 write_file((root_dir, 'outer'), 'xxx')
1067 return root_dir, base_dir
Tarek Ziadé396fad72010-02-23 05:30:31 +00001068
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001069 @support.requires_zlib
Serhiy Storchakab42de2f2015-11-21 14:09:26 +02001070 @unittest.skipUnless(shutil.which('tar'),
Tarek Ziadé396fad72010-02-23 05:30:31 +00001071 'Need the tar command to run')
1072 def test_tarfile_vs_tar(self):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001073 root_dir, base_dir = self._create_files()
1074 base_name = os.path.join(self.mkdtemp(), 'archive')
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001075 tarball = make_archive(base_name, 'gztar', root_dir, base_dir)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001076
1077 # check if the compressed tarball was created
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001078 self.assertEqual(tarball, base_name + '.tar.gz')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001079 self.assertTrue(os.path.isfile(tarball))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001080
1081 # now create another tarball using `tar`
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001082 tarball2 = os.path.join(root_dir, 'archive2.tar')
1083 tar_cmd = ['tar', '-cf', 'archive2.tar', base_dir]
Serhiy Storchakab42de2f2015-11-21 14:09:26 +02001084 subprocess.check_call(tar_cmd, cwd=root_dir,
1085 stdout=subprocess.DEVNULL)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001086
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001087 self.assertTrue(os.path.isfile(tarball2))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001088 # let's compare both tarballs
Ezio Melottib3aedd42010-11-20 19:04:17 +00001089 self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001090
1091 # trying an uncompressed one
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001092 tarball = make_archive(base_name, 'tar', root_dir, base_dir)
1093 self.assertEqual(tarball, base_name + '.tar')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001094 self.assertTrue(os.path.isfile(tarball))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001095
1096 # now for a dry_run
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001097 tarball = make_archive(base_name, 'tar', root_dir, base_dir,
1098 dry_run=True)
1099 self.assertEqual(tarball, base_name + '.tar')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001100 self.assertTrue(os.path.isfile(tarball))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001101
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001102 @support.requires_zlib
Tarek Ziadé396fad72010-02-23 05:30:31 +00001103 def test_make_zipfile(self):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001104 # creating something to zip
1105 root_dir, base_dir = self._create_files()
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001106
1107 tmpdir2 = self.mkdtemp()
1108 # force shutil to create the directory
1109 os.rmdir(tmpdir2)
1110 # working with relative paths
1111 work_dir = os.path.dirname(tmpdir2)
1112 rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001113
1114 with support.change_cwd(work_dir):
Serhiy Storchaka5558d4f2015-09-08 09:59:02 +03001115 base_name = os.path.abspath(rel_base_name)
Serhiy Storchaka666de772016-10-23 15:55:09 +03001116 res = make_archive(rel_base_name, 'zip', root_dir)
1117
1118 self.assertEqual(res, base_name + '.zip')
1119 self.assertTrue(os.path.isfile(res))
1120 self.assertTrue(zipfile.is_zipfile(res))
1121 with zipfile.ZipFile(res) as zf:
1122 self.assertCountEqual(zf.namelist(),
1123 ['dist/', 'dist/sub/', 'dist/sub2/',
1124 'dist/file1', 'dist/file2', 'dist/sub/file3',
1125 'outer'])
1126
1127 with support.change_cwd(work_dir):
1128 base_name = os.path.abspath(rel_base_name)
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001129 res = make_archive(rel_base_name, 'zip', root_dir, base_dir)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001130
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001131 self.assertEqual(res, base_name + '.zip')
1132 self.assertTrue(os.path.isfile(res))
1133 self.assertTrue(zipfile.is_zipfile(res))
1134 with zipfile.ZipFile(res) as zf:
1135 self.assertCountEqual(zf.namelist(),
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001136 ['dist/', 'dist/sub/', 'dist/sub2/',
1137 'dist/file1', 'dist/file2', 'dist/sub/file3'])
Tarek Ziadé396fad72010-02-23 05:30:31 +00001138
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001139 @support.requires_zlib
Serhiy Storchakab42de2f2015-11-21 14:09:26 +02001140 @unittest.skipUnless(shutil.which('zip'),
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001141 'Need the zip command to run')
1142 def test_zipfile_vs_zip(self):
1143 root_dir, base_dir = self._create_files()
1144 base_name = os.path.join(self.mkdtemp(), 'archive')
1145 archive = make_archive(base_name, 'zip', root_dir, base_dir)
1146
1147 # check if ZIP file was created
1148 self.assertEqual(archive, base_name + '.zip')
1149 self.assertTrue(os.path.isfile(archive))
1150
1151 # now create another ZIP file using `zip`
1152 archive2 = os.path.join(root_dir, 'archive2.zip')
1153 zip_cmd = ['zip', '-q', '-r', 'archive2.zip', base_dir]
Serhiy Storchakab42de2f2015-11-21 14:09:26 +02001154 subprocess.check_call(zip_cmd, cwd=root_dir,
1155 stdout=subprocess.DEVNULL)
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001156
1157 self.assertTrue(os.path.isfile(archive2))
1158 # let's compare both ZIP files
1159 with zipfile.ZipFile(archive) as zf:
1160 names = zf.namelist()
1161 with zipfile.ZipFile(archive2) as zf:
1162 names2 = zf.namelist()
1163 self.assertEqual(sorted(names), sorted(names2))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001164
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001165 @support.requires_zlib
Serhiy Storchaka8bc792a2015-11-22 14:49:58 +02001166 @unittest.skipUnless(shutil.which('unzip'),
1167 'Need the unzip command to run')
1168 def test_unzip_zipfile(self):
1169 root_dir, base_dir = self._create_files()
1170 base_name = os.path.join(self.mkdtemp(), 'archive')
1171 archive = make_archive(base_name, 'zip', root_dir, base_dir)
1172
1173 # check if ZIP file was created
1174 self.assertEqual(archive, base_name + '.zip')
1175 self.assertTrue(os.path.isfile(archive))
1176
1177 # now check the ZIP file using `unzip -t`
1178 zip_cmd = ['unzip', '-t', archive]
1179 with support.change_cwd(root_dir):
1180 try:
1181 subprocess.check_output(zip_cmd, stderr=subprocess.STDOUT)
1182 except subprocess.CalledProcessError as exc:
1183 details = exc.output.decode(errors="replace")
1184 msg = "{}\n\n**Unzip Output**\n{}"
1185 self.fail(msg.format(exc, details))
1186
Tarek Ziadé396fad72010-02-23 05:30:31 +00001187 def test_make_archive(self):
1188 tmpdir = self.mkdtemp()
1189 base_name = os.path.join(tmpdir, 'archive')
1190 self.assertRaises(ValueError, make_archive, base_name, 'xxx')
1191
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001192 @support.requires_zlib
Tarek Ziadé396fad72010-02-23 05:30:31 +00001193 def test_make_archive_owner_group(self):
1194 # testing make_archive with owner and group, with various combinations
1195 # this works even if there's not gid/uid support
1196 if UID_GID_SUPPORT:
1197 group = grp.getgrgid(0)[0]
1198 owner = pwd.getpwuid(0)[0]
1199 else:
1200 group = owner = 'root'
1201
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001202 root_dir, base_dir = self._create_files()
1203 base_name = os.path.join(self.mkdtemp(), 'archive')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001204 res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner,
1205 group=group)
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001206 self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001207
1208 res = make_archive(base_name, 'zip', root_dir, base_dir)
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001209 self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001210
1211 res = make_archive(base_name, 'tar', root_dir, base_dir,
1212 owner=owner, group=group)
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001213 self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001214
1215 res = make_archive(base_name, 'tar', root_dir, base_dir,
1216 owner='kjhkjhkjg', group='oihohoh')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001217 self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001218
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001219
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001220 @support.requires_zlib
Tarek Ziadé396fad72010-02-23 05:30:31 +00001221 @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
1222 def test_tarfile_root_owner(self):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001223 root_dir, base_dir = self._create_files()
1224 base_name = os.path.join(self.mkdtemp(), 'archive')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001225 group = grp.getgrgid(0)[0]
1226 owner = pwd.getpwuid(0)[0]
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001227 with support.change_cwd(root_dir):
1228 archive_name = make_archive(base_name, 'gztar', root_dir, 'dist',
1229 owner=owner, group=group)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001230
1231 # check if the compressed tarball was created
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001232 self.assertTrue(os.path.isfile(archive_name))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001233
1234 # now checks the rights
1235 archive = tarfile.open(archive_name)
1236 try:
1237 for member in archive.getmembers():
Ezio Melottib3aedd42010-11-20 19:04:17 +00001238 self.assertEqual(member.uid, 0)
1239 self.assertEqual(member.gid, 0)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001240 finally:
1241 archive.close()
1242
1243 def test_make_archive_cwd(self):
1244 current_dir = os.getcwd()
1245 def _breaks(*args, **kw):
1246 raise RuntimeError()
1247
1248 register_archive_format('xxx', _breaks, [], 'xxx file')
1249 try:
1250 try:
1251 make_archive('xxx', 'xxx', root_dir=self.mkdtemp())
1252 except Exception:
1253 pass
Ezio Melottib3aedd42010-11-20 19:04:17 +00001254 self.assertEqual(os.getcwd(), current_dir)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001255 finally:
1256 unregister_archive_format('xxx')
1257
Serhiy Storchaka9a4fc192014-11-28 00:48:46 +02001258 def test_make_tarfile_in_curdir(self):
1259 # Issue #21280
1260 root_dir = self.mkdtemp()
1261 with support.change_cwd(root_dir):
1262 self.assertEqual(make_archive('test', 'tar'), 'test.tar')
1263 self.assertTrue(os.path.isfile('test.tar'))
1264
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001265 @support.requires_zlib
Serhiy Storchaka9a4fc192014-11-28 00:48:46 +02001266 def test_make_zipfile_in_curdir(self):
1267 # Issue #21280
1268 root_dir = self.mkdtemp()
1269 with support.change_cwd(root_dir):
1270 self.assertEqual(make_archive('test', 'zip'), 'test.zip')
1271 self.assertTrue(os.path.isfile('test.zip'))
1272
Tarek Ziadé396fad72010-02-23 05:30:31 +00001273 def test_register_archive_format(self):
1274
1275 self.assertRaises(TypeError, register_archive_format, 'xxx', 1)
1276 self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
1277 1)
1278 self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
1279 [(1, 2), (1, 2, 3)])
1280
1281 register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file')
1282 formats = [name for name, params in get_archive_formats()]
1283 self.assertIn('xxx', formats)
1284
1285 unregister_archive_format('xxx')
1286 formats = [name for name, params in get_archive_formats()]
1287 self.assertNotIn('xxx', formats)
1288
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001289 def check_unpack_archive(self, format):
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001290 self.check_unpack_archive_with_converter(format, lambda path: path)
1291 self.check_unpack_archive_with_converter(format, pathlib.Path)
Serhiy Storchakab21d1552018-03-02 11:53:51 +02001292 self.check_unpack_archive_with_converter(format, FakePath)
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001293
def check_unpack_archive_with_converter(self, format, converter):
    # Archive the sample tree, then unpack it twice -- once without a
    # format argument and once with the format given explicitly --
    # passing every path handed to unpack_archive() through *converter*.
    root_dir, base_dir = self._create_files()
    expected = rlistdir(root_dir)
    expected.remove('outer')

    archive_stem = os.path.join(self.mkdtemp(), 'archive')
    archive_path = make_archive(archive_stem, format, root_dir, base_dir)

    for extra_kwargs in ({}, {'format': format}):
        target_dir = self.mkdtemp()
        unpack_archive(converter(archive_path), converter(target_dir),
                       **extra_kwargs)
        self.assertEqual(rlistdir(target_dir), expected)

    # TESTFN must be refused, both without a format and with a format
    # name that is not registered.
    with self.assertRaises(shutil.ReadError):
        unpack_archive(converter(TESTFN))
    with self.assertRaises(ValueError):
        unpack_archive(converter(TESTFN), format='xxx')
Nick Coghlanabf202d2011-03-16 13:52:20 -04001314
def test_unpack_archive_tar(self):
    # Run the shared unpack checks for the 'tar' format.
    self.check_unpack_archive(format='tar')
1317
@support.requires_zlib
def test_unpack_archive_gztar(self):
    # Run the shared unpack checks for the 'gztar' format.
    self.check_unpack_archive(format='gztar')
1321
@support.requires_bz2
def test_unpack_archive_bztar(self):
    # Run the shared unpack checks for the 'bztar' format.
    self.check_unpack_archive(format='bztar')
1325
@support.requires_lzma
def test_unpack_archive_xztar(self):
    # Run the shared unpack checks for the 'xztar' format.
    self.check_unpack_archive(format='xztar')
1329
@support.requires_zlib
def test_unpack_archive_zip(self):
    # Run the shared unpack checks for the 'zip' format.
    self.check_unpack_archive(format='zip')
1333
def test_unpack_registry(self):
    # Snapshot of the registry, compared against at the very end.
    initial_formats = get_unpack_formats()

    def fake_unpacker(filename, extract_dir, extra):
        # Verifies the arguments unpack_archive() forwards to it.
        self.assertEqual(extra, 1)
        self.assertEqual(filename, 'stuff.boo')
        self.assertEqual(extract_dir, 'xx')

    register_unpack_format('Boo', ['.boo', '.b2'], fake_unpacker,
                           [('extra', 1)])
    unpack_archive('stuff.boo', 'xx')

    # trying to register a .boo unpacker again
    with self.assertRaises(RegistryError):
        register_unpack_format('Boo2', ['.boo'], fake_unpacker)

    # should work now
    unregister_unpack_format('Boo')
    register_unpack_format('Boo2', ['.boo'], fake_unpacker)
    current_formats = get_unpack_formats()
    self.assertIn(('Boo2', ['.boo'], ''), current_formats)
    self.assertNotIn(('Boo', ['.boo'], ''), get_unpack_formats())

    # let's leave a clean state
    unregister_unpack_format('Boo2')
    self.assertEqual(get_unpack_formats(), initial_formats)
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001359
@unittest.skipUnless(hasattr(shutil, 'disk_usage'),
                     "disk_usage not available on this platform")
def test_disk_usage(self):
    # Query the directory containing this test module and sanity-check
    # the relationships between the reported figures.
    here = os.path.dirname(__file__)
    usage = shutil.disk_usage(here)
    total, used, free = usage.total, usage.used, usage.free
    self.assertGreater(total, 0)
    self.assertGreater(used, 0)
    self.assertGreaterEqual(free, 0)
    self.assertGreaterEqual(total, used)
    self.assertGreater(total, free)
Giampaolo Rodola'210e7ca2011-07-01 13:55:36 +02001369
@unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
@unittest.skipUnless(hasattr(os, 'chown'), 'requires os.chown')
def test_chown(self):
    # Exercise shutil.chown() argument validation, then change ownership
    # of a file and of a directory to the current uid/gid using numeric
    # ids and user/group names.

    # cleaned-up automatically by TestShutil.tearDown method
    dirname = self.mkdtemp()
    filename = tempfile.mktemp(dir=dirname)
    write_file(filename, 'testing chown function')

    # Neither user nor group supplied.
    with self.assertRaises(ValueError):
        shutil.chown(filename)

    # Names that cannot be looked up.
    with self.assertRaises(LookupError):
        shutil.chown(filename, user='non-existing username')

    with self.assertRaises(LookupError):
        shutil.chown(filename, group='non-existing groupname')

    # Arguments of unsupported types.
    with self.assertRaises(TypeError):
        shutil.chown(filename, b'spam')

    with self.assertRaises(TypeError):
        shutil.chown(filename, 3.14)

    uid = os.getuid()
    gid = os.getgid()

    def check_chown(path, uid=None, gid=None):
        # Stat *path* itself.  The previous version always stat()ed
        # `filename`, so the directory checks below verified nothing.
        s = os.stat(path)
        if uid is not None:
            self.assertEqual(uid, s.st_uid)
        if gid is not None:
            self.assertEqual(gid, s.st_gid)

    # Numeric ids, first on the file and then on the directory.
    for path in (filename, dirname):
        shutil.chown(path, uid, gid)
        check_chown(path, uid, gid)
        shutil.chown(path, uid)
        check_chown(path, uid)
        shutil.chown(path, user=uid)
        check_chown(path, uid)
        shutil.chown(path, group=gid)
        check_chown(path, gid=gid)

    # User and group given by name.
    user = pwd.getpwuid(uid)[0]
    group = grp.getgrgid(gid)[0]
    for path in (filename, dirname):
        shutil.chown(path, user, group)
        check_chown(path, uid, gid)
1428
def test_copy_return_value(self):
    # copy and copy2 both return their destination path.
    for copier in (shutil.copy, shutil.copy2):
        source_dir = self.mkdtemp()
        target_dir = self.mkdtemp()
        source = os.path.join(source_dir, 'foo')
        write_file(source, 'foo')

        # Destination given as a directory.
        result = copier(source, target_dir)
        self.assertEqual(result, os.path.join(target_dir, 'foo'))

        # Destination given as a full file path.
        result = copier(source, os.path.join(target_dir, 'bar'))
        self.assertEqual(result, os.path.join(target_dir, 'bar'))
1440
def test_copyfile_return_value(self):
    # copyfile returns its destination path.
    src_dir = self.mkdtemp()
    dst_dir = self.mkdtemp()
    dst_file = os.path.join(dst_dir, 'bar')
    src_file = os.path.join(src_dir, 'foo')
    write_file(src_file, 'foo')
    rv = shutil.copyfile(src_file, dst_file)
    # Check the returned value itself, not merely that it names some
    # existing file.
    self.assertEqual(rv, dst_file)
    self.assertTrue(os.path.exists(rv))
    self.assertEqual(read_file(src_file), read_file(dst_file))
1451
def test_copyfile_same_file(self):
    # copyfile() should raise SameFileError if the source and destination
    # are the same.
    path = os.path.join(self.mkdtemp(), 'foo')
    write_file(path, 'foo')
    with self.assertRaises(SameFileError):
        shutil.copyfile(path, path)
    # But Error should work too, to stay backward compatible.
    with self.assertRaises(Error):
        shutil.copyfile(path, path)
    # Make sure file is not corrupted.
    self.assertEqual(read_file(path), 'foo')
Hynek Schlawack48653762012-10-07 12:49:58 +02001463
def test_copytree_return_value(self):
    # copytree returns its destination path.
    source_dir = self.mkdtemp()
    target_dir = source_dir + "dest"
    self.addCleanup(shutil.rmtree, target_dir, True)
    write_file(os.path.join(source_dir, 'foo'), 'foo')
    result = shutil.copytree(source_dir, target_dir)
    # The returned directory must contain exactly the copied file.
    self.assertEqual(['foo'], os.listdir(result))
1473
Christian Heimes9bd667a2008-01-20 15:14:11 +00001474
class TestWhich(unittest.TestCase):
    # Tests for shutil.which(), run against one executable temporary file
    # created in its own temporary directory.

    def setUp(self):
        self.temp_dir = tempfile.mkdtemp(prefix="Tmp")
        self.addCleanup(shutil.rmtree, self.temp_dir, True)
        # Give the temp_file an ".exe" suffix for all.
        # It's needed on Windows and not harmful on other platforms.
        self.temp_file = tempfile.NamedTemporaryFile(
            dir=self.temp_dir, prefix="Tmp", suffix=".Exe")
        executable = self.temp_file.name
        os.chmod(executable, stat.S_IXUSR)
        self.addCleanup(self.temp_file.close)
        # Directory and bare file name of the executable.
        self.dir = os.path.dirname(executable)
        self.file = os.path.basename(executable)

    def test_basic(self):
        # Given an EXE in a directory, it should be returned.
        found = shutil.which(self.file, path=self.dir)
        self.assertEqual(found, self.temp_file.name)

    def test_absolute_cmd(self):
        # When given the fully qualified path to an executable that exists,
        # it should be returned.
        executable = self.temp_file.name
        found = shutil.which(executable, path=self.temp_dir)
        self.assertEqual(found, executable)

    def test_relative_cmd(self):
        # When given the relative path with a directory part to an executable
        # that exists, it should be returned.
        parent = os.path.dirname(self.dir)
        relative = os.path.join(os.path.basename(self.dir), self.file)
        with support.change_cwd(parent):
            found = shutil.which(relative, path=self.temp_dir)
            self.assertEqual(found, relative)
        # But it shouldn't be searched in PATH directories (issue #16957).
        with support.change_cwd(self.dir):
            found = shutil.which(relative, path=parent)
            self.assertIsNone(found)

    def test_cwd(self):
        # Issue #16957
        parent = os.path.dirname(self.dir)
        with support.change_cwd(self.dir):
            found = shutil.which(self.file, path=parent)
            if sys.platform != "win32":
                # Other platforms: shouldn't match in the current directory.
                self.assertIsNone(found)
            else:
                # Windows: current directory implicitly on PATH
                self.assertEqual(found,
                                 os.path.join(os.curdir, self.file))

    @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
                     'non-root user required')
    def test_non_matching_mode(self):
        # Set the file read-only and ask for writeable files.
        executable = self.temp_file.name
        os.chmod(executable, stat.S_IREAD)
        if os.access(executable, os.W_OK):
            self.skipTest("can't set the file read-only")
        found = shutil.which(self.file, path=self.dir, mode=os.W_OK)
        self.assertIsNone(found)

    def test_relative_path(self):
        # A relative search path yields a result relative to the cwd.
        parent, leaf = os.path.split(self.dir)
        with support.change_cwd(parent):
            found = shutil.which(self.file, path=leaf)
            self.assertEqual(found, os.path.join(leaf, self.file))

    def test_nonexistent_file(self):
        # Return None when no matching executable file is found on the path.
        self.assertIsNone(shutil.which("foo.exe", path=self.dir))

    @unittest.skipUnless(sys.platform == "win32",
                         "pathext check is Windows-only")
    def test_pathext_checking(self):
        # Ask for the file without the ".exe" extension, then ensure that
        # it gets found properly with the extension.
        suffix_len = len(".Exe")
        found = shutil.which(self.file[:-suffix_len], path=self.dir)
        expected = self.temp_file.name[:-suffix_len] + ".EXE"
        self.assertEqual(found, expected)

    def test_environ_path(self):
        # Without an explicit path argument, PATH is consulted.
        with support.EnvironmentVarGuard() as env:
            env['PATH'] = self.dir
            found = shutil.which(self.file)
            self.assertEqual(found, self.temp_file.name)

    def test_empty_path(self):
        # An explicitly empty path argument finds nothing, even when the
        # file is in both the cwd and PATH.
        with support.change_cwd(self.dir), \
             support.EnvironmentVarGuard() as env:
            env['PATH'] = self.dir
            found = shutil.which(self.file, path='')
            self.assertIsNone(found)

    def test_empty_path_no_PATH(self):
        # With PATH removed from the environment nothing is found.
        with support.EnvironmentVarGuard() as env:
            env.pop('PATH', None)
            found = shutil.which(self.file)
            self.assertIsNone(found)
1573
Brian Curtinc57a3452012-06-22 16:00:30 -05001574
class TestMove(unittest.TestCase):
    """Tests for shutil.move().

    Each test starts with two fresh temporary directories; the source
    directory contains a single file named "foo" holding b"spam".
    """

    def setUp(self):
        filename = "foo"
        self.src_dir = tempfile.mkdtemp()
        self.dst_dir = tempfile.mkdtemp()
        self.src_file = os.path.join(self.src_dir, filename)
        self.dst_file = os.path.join(self.dst_dir, filename)
        with open(self.src_file, "wb") as f:
            f.write(b"spam")

    def tearDown(self):
        # Best-effort removal: a directory may already have been moved away
        # by the test.  ignore_errors=True tolerates that without the bare
        # "except:" that would also swallow KeyboardInterrupt/SystemExit.
        for d in (self.src_dir, self.dst_dir):
            if d:
                shutil.rmtree(d, ignore_errors=True)

    def _check_move_file(self, src, dst, real_dst):
        # Move *src* to *dst*; the content must end up at *real_dst* and
        # *src* must no longer exist.
        with open(src, "rb") as f:
            contents = f.read()
        shutil.move(src, dst)
        with open(real_dst, "rb") as f:
            self.assertEqual(contents, f.read())
        self.assertFalse(os.path.exists(src))

    def _check_move_dir(self, src, dst, real_dst):
        # Move directory *src* to *dst*; the listing of *real_dst* must
        # equal the original listing and *src* must no longer exist.
        contents = sorted(os.listdir(src))
        shutil.move(src, dst)
        self.assertEqual(contents, sorted(os.listdir(real_dst)))
        self.assertFalse(os.path.exists(src))

    def test_move_file(self):
        # Move a file to another location on the same filesystem.
        self._check_move_file(self.src_file, self.dst_file, self.dst_file)

    def test_move_file_to_dir(self):
        # Move a file inside an existing dir on the same filesystem.
        self._check_move_file(self.src_file, self.dst_dir, self.dst_file)

    @mock_rename
    def test_move_file_other_fs(self):
        # Move a file to an existing dir on another filesystem.
        self.test_move_file()

    @mock_rename
    def test_move_file_to_dir_other_fs(self):
        # Move a file to another location on another filesystem.
        self.test_move_file_to_dir()

    def test_move_dir(self):
        # Move a dir to another location on the same filesystem.
        # mktemp() is used only to obtain a path that does not exist yet.
        dst_dir = tempfile.mktemp()
        try:
            self._check_move_dir(self.src_dir, dst_dir, dst_dir)
        finally:
            # Best-effort cleanup that does not mask a failure above and
            # does not swallow non-filesystem exceptions.
            shutil.rmtree(dst_dir, ignore_errors=True)

    @mock_rename
    def test_move_dir_other_fs(self):
        # Move a dir to another location on another filesystem.
        self.test_move_dir()

    def test_move_dir_to_dir(self):
        # Move a dir inside an existing dir on the same filesystem.
        self._check_move_dir(self.src_dir, self.dst_dir,
            os.path.join(self.dst_dir, os.path.basename(self.src_dir)))

    @mock_rename
    def test_move_dir_to_dir_other_fs(self):
        # Move a dir inside an existing dir on another filesystem.
        self.test_move_dir_to_dir()

    def test_move_dir_sep_to_dir(self):
        # Source directory given with a trailing os.path.sep.
        self._check_move_dir(self.src_dir + os.path.sep, self.dst_dir,
            os.path.join(self.dst_dir, os.path.basename(self.src_dir)))

    @unittest.skipUnless(os.path.altsep, 'requires os.path.altsep')
    def test_move_dir_altsep_to_dir(self):
        # Source directory given with a trailing os.path.altsep.
        self._check_move_dir(self.src_dir + os.path.altsep, self.dst_dir,
            os.path.join(self.dst_dir, os.path.basename(self.src_dir)))

    def test_existing_file_inside_dest_dir(self):
        # A file with the same name inside the destination dir already exists.
        with open(self.dst_file, "wb"):
            pass
        self.assertRaises(shutil.Error, shutil.move, self.src_file, self.dst_dir)

    def test_dont_move_dir_in_itself(self):
        # Moving a dir inside itself raises an Error.
        dst = os.path.join(self.src_dir, "bar")
        self.assertRaises(shutil.Error, shutil.move, self.src_dir, dst)

    def test_destinsrc_false_negative(self):
        # _destinsrc() must report a destination below the source.
        os.mkdir(TESTFN)
        try:
            for src, dst in [('srcdir', 'srcdir/dest')]:
                src = os.path.join(TESTFN, src)
                dst = os.path.join(TESTFN, dst)
                self.assertTrue(shutil._destinsrc(src, dst),
                             msg='_destinsrc() wrongly concluded that '
                             'dst (%s) is not in src (%s)' % (dst, src))
        finally:
            shutil.rmtree(TESTFN, ignore_errors=True)

    def test_destinsrc_false_positive(self):
        # _destinsrc() must not be fooled by a shared name prefix.
        os.mkdir(TESTFN)
        try:
            for src, dst in [('srcdir', 'src/dest'), ('srcdir', 'srcdir.new')]:
                src = os.path.join(TESTFN, src)
                dst = os.path.join(TESTFN, dst)
                self.assertFalse(shutil._destinsrc(src, dst),
                            msg='_destinsrc() wrongly concluded that '
                            'dst (%s) is in src (%s)' % (dst, src))
        finally:
            shutil.rmtree(TESTFN, ignore_errors=True)

    @support.skip_unless_symlink
    @mock_rename
    def test_move_file_symlink(self):
        # Moving a symlink to a file moves the link, not its target.
        dst = os.path.join(self.src_dir, 'bar')
        os.symlink(self.src_file, dst)
        shutil.move(dst, self.dst_file)
        self.assertTrue(os.path.islink(self.dst_file))
        self.assertTrue(os.path.samefile(self.src_file, self.dst_file))

    @support.skip_unless_symlink
    @mock_rename
    def test_move_file_symlink_to_dir(self):
        # Same as above, with an existing directory as the destination.
        filename = "bar"
        dst = os.path.join(self.src_dir, filename)
        os.symlink(self.src_file, dst)
        shutil.move(dst, self.dst_dir)
        final_link = os.path.join(self.dst_dir, filename)
        self.assertTrue(os.path.islink(final_link))
        self.assertTrue(os.path.samefile(self.src_file, final_link))

    @support.skip_unless_symlink
    @mock_rename
    def test_move_dangling_symlink(self):
        # 'baz' is never created, so the link 'bar' is dangling.
        src = os.path.join(self.src_dir, 'baz')
        dst = os.path.join(self.src_dir, 'bar')
        os.symlink(src, dst)
        dst_link = os.path.join(self.dst_dir, 'quux')
        shutil.move(dst, dst_link)
        self.assertTrue(os.path.islink(dst_link))
        # On Windows, os.path.realpath does not follow symlinks (issue #9949)
        if os.name == 'nt':
            self.assertEqual(os.path.realpath(src), os.readlink(dst_link))
        else:
            self.assertEqual(os.path.realpath(src), os.path.realpath(dst_link))

    @support.skip_unless_symlink
    @mock_rename
    def test_move_dir_symlink(self):
        # Moving a symlink to a directory moves the link itself.
        src = os.path.join(self.src_dir, 'baz')
        dst = os.path.join(self.src_dir, 'bar')
        os.mkdir(src)
        os.symlink(src, dst)
        dst_link = os.path.join(self.dst_dir, 'quux')
        shutil.move(dst, dst_link)
        self.assertTrue(os.path.islink(dst_link))
        self.assertTrue(os.path.samefile(src, dst_link))

    def test_move_return_value(self):
        # Moving into a directory returns the path of the moved file.
        rv = shutil.move(self.src_file, self.dst_dir)
        self.assertEqual(rv,
                os.path.join(self.dst_dir, os.path.basename(self.src_file)))

    def test_move_as_rename_return_value(self):
        # Moving to a new file name returns that name.
        rv = shutil.move(self.src_file, os.path.join(self.dst_dir, 'bar'))
        self.assertEqual(rv, os.path.join(self.dst_dir, 'bar'))

    @mock_rename
    def test_move_file_special_function(self):
        # The custom copy_function is called once for a single file.
        moved = []
        def _copy(src, dst):
            moved.append((src, dst))
        shutil.move(self.src_file, self.dst_dir, copy_function=_copy)
        self.assertEqual(len(moved), 1)

    @mock_rename
    def test_move_dir_special_function(self):
        # The custom copy_function is called once per file: "foo" from
        # setUp plus the two files created here.
        moved = []
        def _copy(src, dst):
            moved.append((src, dst))
        support.create_empty_file(os.path.join(self.src_dir, 'child'))
        support.create_empty_file(os.path.join(self.src_dir, 'child1'))
        shutil.move(self.src_dir, self.dst_dir, copy_function=_copy)
        self.assertEqual(len(moved), 3)
1769
Tarek Ziadé5340db32010-04-19 22:30:51 +00001770
class TestCopyFile(unittest.TestCase):
    """Tests for shutil.copyfile() error handling.

    The tests install a replacement ``open`` on the shutil module that
    returns Faux objects (or raises), then inspect how those objects were
    entered and exited.
    """

    # True once shutil.open has been installed and must be removed again.
    _delete = False

    class Faux(object):
        # Minimal stand-in for a file object used as a context manager.
        # It records whether it was entered and what __exit__ received.
        _entered = False
        _exited_with = None
        _raised = False
        def __init__(self, raise_in_exit=False, suppress_at_exit=True):
            self._raise_in_exit = raise_in_exit
            self._suppress_at_exit = suppress_at_exit
        def read(self, *args):
            return ''
        def __enter__(self):
            self._entered = True
        def __exit__(self, exc_type, exc_val, exc_tb):
            self._exited_with = exc_type, exc_val, exc_tb
            if self._raise_in_exit:
                self._raised = True
                raise OSError("Cannot close")
            return self._suppress_at_exit

    def tearDown(self):
        if self._delete:
            del shutil.open

    def _set_shutil_open(self, func):
        # Install *func* as shutil.open and remember to remove it.
        shutil.open = func
        self._delete = True

    def test_w_source_open_fails(self):
        def _open(filename, mode='r'):
            if filename == 'srcfile':
                raise OSError('Cannot open "srcfile"')
            # Explicit raise instead of "assert 0": still fires under -O.
            raise AssertionError("shouldn't reach here: %r" % (filename,))

        self._set_shutil_open(_open)

        self.assertRaises(OSError, shutil.copyfile, 'srcfile', 'destfile')

    @unittest.skipIf(MACOS, "skipped on macOS")
    def test_w_dest_open_fails(self):

        srcfile = self.Faux()

        def _open(filename, mode='r'):
            if filename == 'srcfile':
                return srcfile
            if filename == 'destfile':
                raise OSError('Cannot open "destfile"')
            # Explicit raise instead of "assert 0": still fires under -O.
            raise AssertionError("shouldn't reach here: %r" % (filename,))

        self._set_shutil_open(_open)

        # srcfile suppresses the exception in __exit__, so copyfile()
        # returns normally; the error is visible in what __exit__ received.
        shutil.copyfile('srcfile', 'destfile')
        self.assertTrue(srcfile._entered)
        self.assertIs(srcfile._exited_with[0], OSError)
        self.assertEqual(srcfile._exited_with[1].args,
                         ('Cannot open "destfile"',))

    @unittest.skipIf(MACOS, "skipped on macOS")
    def test_w_dest_close_fails(self):

        srcfile = self.Faux()
        destfile = self.Faux(True)

        def _open(filename, mode='r'):
            if filename == 'srcfile':
                return srcfile
            if filename == 'destfile':
                return destfile
            # Explicit raise instead of "assert 0": still fires under -O.
            raise AssertionError("shouldn't reach here: %r" % (filename,))

        self._set_shutil_open(_open)

        # destfile raises while exiting; srcfile receives that error in
        # its own __exit__ and suppresses it.
        shutil.copyfile('srcfile', 'destfile')
        self.assertTrue(srcfile._entered)
        self.assertTrue(destfile._entered)
        self.assertTrue(destfile._raised)
        self.assertIs(srcfile._exited_with[0], OSError)
        self.assertEqual(srcfile._exited_with[1].args,
                         ('Cannot close',))

    @unittest.skipIf(MACOS, "skipped on macOS")
    def test_w_source_close_fails(self):

        srcfile = self.Faux(True)
        destfile = self.Faux()

        def _open(filename, mode='r'):
            if filename == 'srcfile':
                return srcfile
            if filename == 'destfile':
                return destfile
            # Explicit raise instead of "assert 0": still fires under -O.
            raise AssertionError("shouldn't reach here: %r" % (filename,))

        self._set_shutil_open(_open)

        # Nothing is left to suppress the error raised by srcfile's
        # __exit__, so it propagates out of copyfile().
        self.assertRaises(OSError,
                          shutil.copyfile, 'srcfile', 'destfile')
        self.assertTrue(srcfile._entered)
        self.assertTrue(destfile._entered)
        self.assertFalse(destfile._raised)
        self.assertIsNone(srcfile._exited_with[0])
        self.assertTrue(srcfile._raised)

    def test_move_dir_caseinsensitive(self):
        # Renames a folder to the same name
        # but a different case.

        self.src_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.src_dir, True)
        dst_dir = os.path.join(
                os.path.dirname(self.src_dir),
                os.path.basename(self.src_dir).upper())
        self.assertNotEqual(self.src_dir, dst_dir)

        try:
            shutil.move(self.src_dir, dst_dir)
            self.assertTrue(os.path.isdir(dst_dir))
        finally:
            os.rmdir(dst_dir)
Ronald Oussorenf51738b2011-05-06 10:23:04 +02001893
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02001894
class TestCopyFileObj(unittest.TestCase):
    """Tests for shutil.copyfileobj(), copying TESTFN into TESTFN2."""

    # Size passed to write_test_file() for the source file: 2 MiB.
    FILESIZE = 2 * 1024 * 1024

    @classmethod
    def setUpClass(cls):
        write_test_file(TESTFN, cls.FILESIZE)

    @classmethod
    def tearDownClass(cls):
        support.unlink(TESTFN)
        support.unlink(TESTFN2)

    def tearDown(self):
        support.unlink(TESTFN2)

    @contextlib.contextmanager
    def get_files(self):
        # Yield (src, dst): TESTFN opened for binary reading and TESTFN2
        # opened for binary writing; both are closed on exit.
        with open(TESTFN, "rb") as src:
            with open(TESTFN2, "wb") as dst:
                yield (src, dst)

    def assert_files_eq(self, src, dst):
        # Fail unless the two named files have identical content.
        with open(src, 'rb') as fsrc:
            with open(dst, 'rb') as fdst:
                self.assertEqual(fsrc.read(), fdst.read())

    def test_content(self):
        with self.get_files() as (src, dst):
            shutil.copyfileobj(src, dst)
        # Compare only after both files have been closed.
        self.assert_files_eq(TESTFN, TESTFN2)

    def test_file_not_closed(self):
        # copyfileobj() must leave both file objects open.  unittest
        # assertions are used so the checks survive "python -O".
        with self.get_files() as (src, dst):
            shutil.copyfileobj(src, dst)
            self.assertFalse(src.closed)
            self.assertFalse(dst.closed)

    def test_file_offset(self):
        # Both file positions must be at FILESIZE after the copy.
        with self.get_files() as (src, dst):
            shutil.copyfileobj(src, dst)
            self.assertEqual(src.tell(), self.FILESIZE)
            self.assertEqual(dst.tell(), self.FILESIZE)

    @unittest.skipIf(os.name != 'nt', "Windows only")
    def test_win_impl(self):
        # Make sure alternate Windows implementation is called.
        with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
            shutil.copyfile(TESTFN, TESTFN2)
        self.assertTrue(m.called)

        # File size is 2 MiB but max buf size should be 1 MiB.
        self.assertEqual(m.call_args[0][2], 1 * 1024 * 1024)

        # If file size < 1 MiB memoryview() length must be equal to
        # the actual file size.
        with tempfile.NamedTemporaryFile(delete=False) as f:
            f.write(b'foo')
        fname = f.name
        self.addCleanup(support.unlink, fname)
        with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
            shutil.copyfile(fname, TESTFN2)
        self.assertEqual(m.call_args[0][2], 3)

        # Empty files should not rely on readinto() variant.
        with tempfile.NamedTemporaryFile(delete=False) as f:
            pass
        fname = f.name
        self.addCleanup(support.unlink, fname)
        with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
            shutil.copyfile(fname, TESTFN2)
        self.assertFalse(m.called)
        self.assert_files_eq(fname, TESTFN2)
1967
1968
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02001969class _ZeroCopyFileTest(object):
1970 """Tests common to all zero-copy APIs."""
1971 FILESIZE = (10 * 1024 * 1024) # 10 MiB
1972 FILEDATA = b""
1973 PATCHPOINT = ""
1974
1975 @classmethod
1976 def setUpClass(cls):
1977 write_test_file(TESTFN, cls.FILESIZE)
1978 with open(TESTFN, 'rb') as f:
1979 cls.FILEDATA = f.read()
1980 assert len(cls.FILEDATA) == cls.FILESIZE
1981
1982 @classmethod
1983 def tearDownClass(cls):
1984 support.unlink(TESTFN)
1985
1986 def tearDown(self):
1987 support.unlink(TESTFN2)
1988
1989 @contextlib.contextmanager
1990 def get_files(self):
1991 with open(TESTFN, "rb") as src:
1992 with open(TESTFN2, "wb") as dst:
1993 yield (src, dst)
1994
1995 def zerocopy_fun(self, *args, **kwargs):
1996 raise NotImplementedError("must be implemented in subclass")
1997
1998 def reset(self):
1999 self.tearDown()
2000 self.tearDownClass()
2001 self.setUpClass()
2002 self.setUp()
2003
2004 # ---
2005
2006 def test_regular_copy(self):
2007 with self.get_files() as (src, dst):
2008 self.zerocopy_fun(src, dst)
2009 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2010 # Make sure the fallback function is not called.
2011 with self.get_files() as (src, dst):
2012 with unittest.mock.patch('shutil.copyfileobj') as m:
2013 shutil.copyfile(TESTFN, TESTFN2)
2014 assert not m.called
2015
2016 def test_same_file(self):
2017 self.addCleanup(self.reset)
2018 with self.get_files() as (src, dst):
2019 with self.assertRaises(Exception):
2020 self.zerocopy_fun(src, src)
2021 # Make sure src file is not corrupted.
2022 self.assertEqual(read_file(TESTFN, binary=True), self.FILEDATA)
2023
2024 def test_non_existent_src(self):
2025 name = tempfile.mktemp()
2026 with self.assertRaises(FileNotFoundError) as cm:
2027 shutil.copyfile(name, "new")
2028 self.assertEqual(cm.exception.filename, name)
2029
2030 def test_empty_file(self):
2031 srcname = TESTFN + 'src'
2032 dstname = TESTFN + 'dst'
2033 self.addCleanup(lambda: support.unlink(srcname))
2034 self.addCleanup(lambda: support.unlink(dstname))
2035 with open(srcname, "wb"):
2036 pass
2037
2038 with open(srcname, "rb") as src:
2039 with open(dstname, "wb") as dst:
2040 self.zerocopy_fun(src, dst)
2041
2042 self.assertEqual(read_file(dstname, binary=True), b"")
2043
2044 def test_unhandled_exception(self):
2045 with unittest.mock.patch(self.PATCHPOINT,
2046 side_effect=ZeroDivisionError):
2047 self.assertRaises(ZeroDivisionError,
2048 shutil.copyfile, TESTFN, TESTFN2)
2049
def test_exception_on_first_call(self):
    """An EINVAL failure at PATCHPOINT makes the zero-copy function
    raise _GiveupOnFastCopy."""
    # Emulate the very first call to the underlying zero-copy
    # primitive failing; the function is expected to give up
    # immediately in that case.
    first_call_error = OSError(errno.EINVAL, "yo")
    with unittest.mock.patch(self.PATCHPOINT, side_effect=first_call_error):
        with self.get_files() as (infile, outfile):
            self.assertRaises(_GiveupOnFastCopy,
                              self.zerocopy_fun, infile, outfile)
2059
def test_filesystem_full(self):
    """An ENOSPC failure at PATCHPOINT surfaces as OSError from the
    zero-copy function."""
    # Emulate a full filesystem: the underlying primitive fails with
    # ENOSPC on its first call.
    no_space = OSError(errno.ENOSPC, "yo")
    with unittest.mock.patch(self.PATCHPOINT, side_effect=no_space):
        with self.get_files() as (infile, outfile):
            with self.assertRaises(OSError):
                self.zerocopy_fun(infile, outfile)
2067
2068
@unittest.skipIf(not SUPPORTS_SENDFILE, 'os.sendfile() not supported')
class TestZeroCopySendfile(_ZeroCopyFileTest, unittest.TestCase):
    """Run the _ZeroCopyFileTest cases against shutil._fastcopy_sendfile().

    Also adds sendfile()-specific cases.  All checks use unittest
    assertion methods instead of bare ``assert`` statements so that they
    are not silently removed when the suite runs under ``python -O``.
    """
    PATCHPOINT = "os.sendfile"

    def zerocopy_fun(self, fsrc, fdst):
        """Copy *fsrc* to *fdst* via shutil._fastcopy_sendfile()."""
        return shutil._fastcopy_sendfile(fsrc, fdst)

    def test_non_regular_file_src(self):
        """An in-memory (BytesIO) source makes the fast path give up."""
        with io.BytesIO(self.FILEDATA) as src:
            with open(TESTFN2, "wb") as dst:
                with self.assertRaises(_GiveupOnFastCopy):
                    self.zerocopy_fun(src, dst)
                # The regular fallback must still complete the copy.
                shutil.copyfileobj(src, dst)

        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_non_regular_file_dst(self):
        """An in-memory (BytesIO) destination makes the fast path give up."""
        with open(TESTFN, "rb") as src:
            with io.BytesIO() as dst:
                with self.assertRaises(_GiveupOnFastCopy):
                    self.zerocopy_fun(src, dst)
                # The regular fallback must still complete the copy.
                shutil.copyfileobj(src, dst)
                dst.seek(0)
                self.assertEqual(dst.read(), self.FILEDATA)

    def test_exception_on_second_call(self):
        """An EBADF error on the second sendfile() call is propagated."""
        calls = []
        orig_sendfile = os.sendfile

        def sendfile(*args, **kwargs):
            # The first call goes through to the real os.sendfile();
            # every later call fails with EBADF.
            if not calls:
                calls.append(None)
                return orig_sendfile(*args, **kwargs)
            raise OSError(errno.EBADF, "yo")

        with unittest.mock.patch('os.sendfile', create=True,
                                 side_effect=sendfile):
            with self.get_files() as (src, dst):
                with self.assertRaises(OSError) as cm:
                    shutil._fastcopy_sendfile(src, dst)
        # The replacement must have been invoked at least once.
        self.assertTrue(calls)
        self.assertEqual(cm.exception.errno, errno.EBADF)

    def test_cant_get_size(self):
        """The copy still succeeds when os.fstat() fails."""
        # Emulate a case where src file size cannot be determined.
        # Internally bufsize will be set to a small value and
        # sendfile() will be called repeatedly.
        with unittest.mock.patch('os.fstat', side_effect=OSError) as m:
            with self.get_files() as (src, dst):
                shutil._fastcopy_sendfile(src, dst)
                self.assertTrue(m.called)
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_small_chunks(self):
        """The copy is complete when the reported size is 65537 bytes."""
        # Force internal file size detection to be smaller than the
        # actual file size. We want to force sendfile() to be called
        # multiple times, also in order to emulate a src fd which gets
        # bigger while it is being copied.
        fake_stat = unittest.mock.Mock()
        fake_stat.st_size = 65536 + 1
        with unittest.mock.patch('os.fstat', return_value=fake_stat) as m:
            with self.get_files() as (src, dst):
                shutil._fastcopy_sendfile(src, dst)
                self.assertTrue(m.called)
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_big_chunk(self):
        """The copy is complete when the reported size is far too large."""
        # Force internal file size detection to be +100MB bigger than
        # the actual file size. Make sure sendfile() does not rely on
        # file size value except for (maybe) a better throughput /
        # performance.
        fake_stat = unittest.mock.Mock()
        fake_stat.st_size = self.FILESIZE + (100 * 1024 * 1024)
        with unittest.mock.patch('os.fstat', return_value=fake_stat) as m:
            with self.get_files() as (src, dst):
                shutil._fastcopy_sendfile(src, dst)
                self.assertTrue(m.called)
        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)

    def test_blocksize_arg(self):
        """Check the block size argument that copyfile() hands to sendfile()."""
        with unittest.mock.patch('os.sendfile',
                                 side_effect=ZeroDivisionError) as m:
            self.assertRaises(ZeroDivisionError,
                              shutil.copyfile, TESTFN, TESTFN2)
            # The block size is the fourth positional argument.
            blocksize = m.call_args[0][3]
            # Make sure file size and the block size arg passed to
            # sendfile() are the same.
            self.assertEqual(blocksize, os.path.getsize(TESTFN))
            # ...unless we're dealing with a small file.
            support.unlink(TESTFN2)
            write_file(TESTFN2, b"hello", binary=True)
            self.addCleanup(support.unlink, TESTFN2 + '3')
            self.assertRaises(ZeroDivisionError,
                              shutil.copyfile, TESTFN2, TESTFN2 + '3')
            blocksize = m.call_args[0][3]
            self.assertEqual(blocksize, 2 ** 23)

    def test_file2file_not_supported(self):
        """ENOTSOCK from sendfile() disables the fast path for later copies."""
        # Emulate a case where sendfile() only support file->socket
        # fds. In such a case copyfile() is supposed to skip the
        # fast-copy attempt from then on.
        self.assertTrue(shutil._HAS_SENDFILE)
        try:
            with unittest.mock.patch(
                    self.PATCHPOINT,
                    side_effect=OSError(errno.ENOTSOCK, "yo")) as m:
                with self.get_files() as (src, dst):
                    with self.assertRaises(_GiveupOnFastCopy):
                        shutil._fastcopy_sendfile(src, dst)
                self.assertTrue(m.called)
            self.assertFalse(shutil._HAS_SENDFILE)

            with unittest.mock.patch(self.PATCHPOINT) as m:
                shutil.copyfile(TESTFN, TESTFN2)
                self.assertFalse(m.called)
        finally:
            # Restore the module-level flag for the tests that follow.
            shutil._HAS_SENDFILE = True
2186
2187
@unittest.skipUnless(MACOS, 'macOS only')
class TestZeroCopyMACOS(_ZeroCopyFileTest, unittest.TestCase):
    """Run the _ZeroCopyFileTest cases against shutil._fastcopy_fcopyfile()."""
    PATCHPOINT = "posix._fcopyfile"

    def zerocopy_fun(self, src, dst):
        """Copy *src* to *dst* via shutil._fastcopy_fcopyfile(), passing
        posix._COPYFILE_DATA as the flags argument."""
        copy_flags = posix._COPYFILE_DATA
        return shutil._fastcopy_fcopyfile(src, dst, copy_flags)
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002194
2195
class TermsizeTests(unittest.TestCase):
    """Tests for shutil.get_terminal_size()."""

    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        size = shutil.get_terminal_size()
        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_os_environ_first(self):
        "Check if environment variables have precedence"

        with support.EnvironmentVarGuard() as env:
            env['COLUMNS'] = '777'
            del env['LINES']
            size = shutil.get_terminal_size()
        self.assertEqual(size.columns, 777)

        with support.EnvironmentVarGuard() as env:
            del env['COLUMNS']
            env['LINES'] = '888'
            size = shutil.get_terminal_size()
        self.assertEqual(size.lines, 888)

    def test_bad_environ(self):
        """Non-numeric COLUMNS/LINES values must not make the call fail."""
        with support.EnvironmentVarGuard() as env:
            env['COLUMNS'] = 'xxx'
            env['LINES'] = 'yyy'
            size = shutil.get_terminal_size()
        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    @unittest.skipUnless(hasattr(os, 'get_terminal_size'),
                         'need os.get_terminal_size()')
    def test_stty_match(self):
        """Check if stty returns the same results ignoring env

        This test will fail if stdin and stdout are connected to
        different terminals with different sizes.  Nevertheless, such
        situations should be pretty rare.
        """
        # sys.__stdout__ can be None or lack a usable fileno().  Doing the
        # tty check here, at run time, turns those cases into a skip; a
        # decorator would evaluate it at import time and break the module.
        try:
            on_tty = os.isatty(sys.__stdout__.fileno())
        except (AttributeError, ValueError, OSError):
            on_tty = False
        if not on_tty:
            self.skipTest("not on tty")

        try:
            size = subprocess.check_output(['stty', 'size']).decode().split()
        except (FileNotFoundError, PermissionError,
                subprocess.CalledProcessError):
            self.skipTest("stty invocation failed")
        expected = (int(size[1]), int(size[0]))  # reversed order

        with support.EnvironmentVarGuard() as env:
            del env['LINES']
            del env['COLUMNS']
            actual = shutil.get_terminal_size()

        self.assertEqual(expected, actual)

    def test_fallback(self):
        """The *fallback* argument is used when no size can be determined."""
        with support.EnvironmentVarGuard() as env:
            del env['LINES']
            del env['COLUMNS']

            # sys.__stdout__ has no fileno()
            with support.swap_attr(sys, '__stdout__', None):
                size = shutil.get_terminal_size(fallback=(10, 20))
            self.assertEqual(size.columns, 10)
            self.assertEqual(size.lines, 20)

            # sys.__stdout__ is not a terminal on Unix
            # or fileno() not in (0, 1, 2) on Windows
            with open(os.devnull, 'w') as f, \
                 support.swap_attr(sys, '__stdout__', f):
                size = shutil.get_terminal_size(fallback=(30, 40))
            self.assertEqual(size.columns, 30)
            self.assertEqual(size.lines, 40)
2272
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002273
class PublicAPITests(unittest.TestCase):
    """Verify that shutil's ``__all__`` lists exactly the expected names."""

    def test_module_all_attribute(self):
        """Compare shutil.__all__ (as a set) with the expected public API."""
        self.assertTrue(hasattr(shutil, '__all__'))
        expected = {
            'copyfileobj', 'copyfile', 'copymode', 'copystat',
            'copy', 'copy2', 'copytree', 'move', 'rmtree', 'Error',
            'SpecialFileError', 'ExecError', 'make_archive',
            'get_archive_formats', 'register_archive_format',
            'unregister_archive_format', 'get_unpack_formats',
            'register_unpack_format', 'unregister_unpack_format',
            'unpack_archive', 'ignore_patterns', 'chown', 'which',
            'get_terminal_size', 'SameFileError',
        }
        # disk_usage is only expected where os.statvfs exists or on Windows.
        if os.name == 'nt' or hasattr(os, 'statvfs'):
            expected.add('disk_usage')
        self.assertEqual(set(shutil.__all__), expected)
2290
2291
# Run every test in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()