blob: 6f22e5378ff22cb16e5566f98d3c02af534fabc4 [file] [log] [blame]
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00001# Copyright (C) 2003 Python Software Foundation
2
3import unittest
Berker Peksag884afd92014-12-10 02:50:32 +02004import unittest.mock
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00005import shutil
6import tempfile
Johannes Gijsbers8e6f2de2004-11-23 09:27:27 +00007import sys
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +00008import stat
Brett Cannon1c3fa182004-06-19 21:11:35 +00009import os
10import os.path
Antoine Pitrouc041ab62012-01-02 19:18:02 +010011import errno
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040012import functools
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -070013import pathlib
Antoine Pitroubcf2b592012-02-08 23:28:36 +010014import subprocess
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020015import random
16import string
17import contextlib
18import io
Serhiy Storchaka527ef072015-09-06 18:33:19 +030019from shutil import (make_archive,
Tarek Ziadé396fad72010-02-23 05:30:31 +000020 register_archive_format, unregister_archive_format,
Tarek Ziadé6ac91722010-04-28 17:51:36 +000021 get_archive_formats, Error, unpack_archive,
22 register_unpack_format, RegistryError,
Hynek Schlawack48653762012-10-07 12:49:58 +020023 unregister_unpack_format, get_unpack_formats,
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020024 SameFileError, _GiveupOnFastCopy)
Tarek Ziadé396fad72010-02-23 05:30:31 +000025import tarfile
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +020026import zipfile
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020027try:
28 import posix
29except ImportError:
30 posix = None
Tarek Ziadé396fad72010-02-23 05:30:31 +000031
32from test import support
Serhiy Storchakab21d1552018-03-02 11:53:51 +020033from test.support import TESTFN, FakePath
Serhiy Storchaka11213772014-08-06 18:50:19 +030034
Antoine Pitrou7fff0962009-05-01 21:09:44 +000035TESTFN2 = TESTFN + "2"
Victor Stinner937ee9e2018-06-26 02:11:06 +020036MACOS = sys.platform.startswith("darwin")
Tarek Ziadé396fad72010-02-23 05:30:31 +000037try:
38 import grp
39 import pwd
40 UID_GID_SUPPORT = True
41except ImportError:
42 UID_GID_SUPPORT = False
43
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040044def _fake_rename(*args, **kwargs):
45 # Pretend the destination path is on a different filesystem.
Antoine Pitrouc041ab62012-01-02 19:18:02 +010046 raise OSError(getattr(errno, 'EXDEV', 18), "Invalid cross-device link")
Nick Coghlan8ed3cf32011-03-16 14:05:35 -040047
48def mock_rename(func):
49 @functools.wraps(func)
50 def wrap(*args, **kwargs):
51 try:
52 builtin_rename = os.rename
53 os.rename = _fake_rename
54 return func(*args, **kwargs)
55 finally:
56 os.rename = builtin_rename
57 return wrap
58
Éric Araujoa7e33a12011-08-12 19:51:35 +020059def write_file(path, content, binary=False):
60 """Write *content* to a file located at *path*.
61
62 If *path* is a tuple instead of a string, os.path.join will be used to
63 make a path. If *binary* is true, the file will be opened in binary
64 mode.
65 """
66 if isinstance(path, tuple):
67 path = os.path.join(*path)
68 with open(path, 'wb' if binary else 'w') as fp:
69 fp.write(content)
70
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +020071def write_test_file(path, size):
72 """Create a test file with an arbitrary size and random text content."""
73 def chunks(total, step):
74 assert total >= step
75 while total > step:
76 yield step
77 total -= step
78 if total:
79 yield total
80
81 bufsize = min(size, 8192)
82 chunk = b"".join([random.choice(string.ascii_letters).encode()
83 for i in range(bufsize)])
84 with open(path, 'wb') as f:
85 for csize in chunks(size, bufsize):
86 f.write(chunk)
87 assert os.path.getsize(path) == size
88
Éric Araujoa7e33a12011-08-12 19:51:35 +020089def read_file(path, binary=False):
90 """Return contents from a file located at *path*.
91
92 If *path* is a tuple instead of a string, os.path.join will be used to
93 make a path. If *binary* is true, the file will be opened in binary
94 mode.
95 """
96 if isinstance(path, tuple):
97 path = os.path.join(*path)
98 with open(path, 'rb' if binary else 'r') as fp:
99 return fp.read()
100
Serhiy Storchaka527ef072015-09-06 18:33:19 +0300101def rlistdir(path):
102 res = []
103 for name in sorted(os.listdir(path)):
104 p = os.path.join(path, name)
105 if os.path.isdir(p) and not os.path.islink(p):
106 res.append(name + '/')
107 for n in rlistdir(p):
108 res.append(name + '/' + n)
109 else:
110 res.append(name)
111 return res
112
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +0200113def supports_file2file_sendfile():
114 # ...apparently Linux and Solaris are the only ones
115 if not hasattr(os, "sendfile"):
116 return False
117 srcname = None
118 dstname = None
119 try:
120 with tempfile.NamedTemporaryFile("wb", delete=False) as f:
121 srcname = f.name
122 f.write(b"0123456789")
123
124 with open(srcname, "rb") as src:
125 with tempfile.NamedTemporaryFile("wb", delete=False) as dst:
126 dstname = f.name
127 infd = src.fileno()
128 outfd = dst.fileno()
129 try:
130 os.sendfile(outfd, infd, 0, 2)
131 except OSError:
132 return False
133 else:
134 return True
135 finally:
136 if srcname is not None:
137 support.unlink(srcname)
138 if dstname is not None:
139 support.unlink(dstname)
140
141
142SUPPORTS_SENDFILE = supports_file2file_sendfile()
143
Éric Araujoa7e33a12011-08-12 19:51:35 +0200144
Barry Warsaw7fc2cca2003-01-24 17:34:13 +0000145class TestShutil(unittest.TestCase):
Tarek Ziadé396fad72010-02-23 05:30:31 +0000146
147 def setUp(self):
148 super(TestShutil, self).setUp()
149 self.tempdirs = []
150
151 def tearDown(self):
152 super(TestShutil, self).tearDown()
153 while self.tempdirs:
154 d = self.tempdirs.pop()
155 shutil.rmtree(d, os.name in ('nt', 'cygwin'))
156
Tarek Ziadé396fad72010-02-23 05:30:31 +0000157
158 def mkdtemp(self):
159 """Create a temporary directory that will be cleaned up.
160
161 Returns the path of the directory.
162 """
163 d = tempfile.mkdtemp()
164 self.tempdirs.append(d)
165 return d
Tarek Ziadé5340db32010-04-19 22:30:51 +0000166
Hynek Schlawack3b527782012-06-25 13:27:31 +0200167 def test_rmtree_works_on_bytes(self):
168 tmp = self.mkdtemp()
169 victim = os.path.join(tmp, 'killme')
170 os.mkdir(victim)
171 write_file(os.path.join(victim, 'somefile'), 'foo')
172 victim = os.fsencode(victim)
173 self.assertIsInstance(victim, bytes)
Steve Dowere58571b2016-09-08 11:11:13 -0700174 shutil.rmtree(victim)
Hynek Schlawack3b527782012-06-25 13:27:31 +0200175
Hynek Schlawacka75cd1c2012-06-28 12:07:29 +0200176 @support.skip_unless_symlink
177 def test_rmtree_fails_on_symlink(self):
178 tmp = self.mkdtemp()
179 dir_ = os.path.join(tmp, 'dir')
180 os.mkdir(dir_)
181 link = os.path.join(tmp, 'link')
182 os.symlink(dir_, link)
183 self.assertRaises(OSError, shutil.rmtree, link)
184 self.assertTrue(os.path.exists(dir_))
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100185 self.assertTrue(os.path.lexists(link))
186 errors = []
187 def onerror(*args):
188 errors.append(args)
189 shutil.rmtree(link, onerror=onerror)
190 self.assertEqual(len(errors), 1)
191 self.assertIs(errors[0][0], os.path.islink)
192 self.assertEqual(errors[0][1], link)
193 self.assertIsInstance(errors[0][2][1], OSError)
Hynek Schlawacka75cd1c2012-06-28 12:07:29 +0200194
195 @support.skip_unless_symlink
196 def test_rmtree_works_on_symlinks(self):
197 tmp = self.mkdtemp()
198 dir1 = os.path.join(tmp, 'dir1')
199 dir2 = os.path.join(dir1, 'dir2')
200 dir3 = os.path.join(tmp, 'dir3')
201 for d in dir1, dir2, dir3:
202 os.mkdir(d)
203 file1 = os.path.join(tmp, 'file1')
204 write_file(file1, 'foo')
205 link1 = os.path.join(dir1, 'link1')
206 os.symlink(dir2, link1)
207 link2 = os.path.join(dir1, 'link2')
208 os.symlink(dir3, link2)
209 link3 = os.path.join(dir1, 'link3')
210 os.symlink(file1, link3)
211 # make sure symlinks are removed but not followed
212 shutil.rmtree(dir1)
213 self.assertFalse(os.path.exists(dir1))
214 self.assertTrue(os.path.exists(dir3))
215 self.assertTrue(os.path.exists(file1))
216
Barry Warsaw7fc2cca2003-01-24 17:34:13 +0000217 def test_rmtree_errors(self):
218 # filename is guaranteed not to exist
219 filename = tempfile.mktemp()
Hynek Schlawackb5501102012-12-10 09:11:25 +0100220 self.assertRaises(FileNotFoundError, shutil.rmtree, filename)
221 # test that ignore_errors option is honored
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100222 shutil.rmtree(filename, ignore_errors=True)
223
224 # existing file
225 tmpdir = self.mkdtemp()
Hynek Schlawackb5501102012-12-10 09:11:25 +0100226 write_file((tmpdir, "tstfile"), "")
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100227 filename = os.path.join(tmpdir, "tstfile")
Hynek Schlawackb5501102012-12-10 09:11:25 +0100228 with self.assertRaises(NotADirectoryError) as cm:
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100229 shutil.rmtree(filename)
Hynek Schlawack87f9b462012-12-10 16:29:57 +0100230 # The reason for this rather odd construct is that Windows sprinkles
231 # a \*.* at the end of file names. But only sometimes on some buildbots
232 possible_args = [filename, os.path.join(filename, '*.*')]
233 self.assertIn(cm.exception.filename, possible_args)
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100234 self.assertTrue(os.path.exists(filename))
Hynek Schlawackb5501102012-12-10 09:11:25 +0100235 # test that ignore_errors option is honored
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100236 shutil.rmtree(filename, ignore_errors=True)
237 self.assertTrue(os.path.exists(filename))
238 errors = []
239 def onerror(*args):
240 errors.append(args)
241 shutil.rmtree(filename, onerror=onerror)
242 self.assertEqual(len(errors), 2)
Serhiy Storchakad4d79bc2017-11-04 14:16:35 +0200243 self.assertIs(errors[0][0], os.scandir)
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100244 self.assertEqual(errors[0][1], filename)
Hynek Schlawackb5501102012-12-10 09:11:25 +0100245 self.assertIsInstance(errors[0][2][1], NotADirectoryError)
Hynek Schlawack87f9b462012-12-10 16:29:57 +0100246 self.assertIn(errors[0][2][1].filename, possible_args)
Hynek Schlawackd16eacb2012-12-10 09:00:09 +0100247 self.assertIs(errors[1][0], os.rmdir)
248 self.assertEqual(errors[1][1], filename)
Hynek Schlawackb5501102012-12-10 09:11:25 +0100249 self.assertIsInstance(errors[1][2][1], NotADirectoryError)
Hynek Schlawack87f9b462012-12-10 16:29:57 +0100250 self.assertIn(errors[1][2][1].filename, possible_args)
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000251
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000252
Serhiy Storchaka43767632013-11-03 21:31:38 +0200253 @unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod()')
254 @unittest.skipIf(sys.platform[:6] == 'cygwin',
255 "This test can't be run on Cygwin (issue #1071513).")
256 @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
257 "This test can't be run reliably as root (issue #1076467).")
258 def test_on_error(self):
259 self.errorState = 0
260 os.mkdir(TESTFN)
261 self.addCleanup(shutil.rmtree, TESTFN)
Antoine Pitrou2f8a75c2012-06-23 21:28:15 +0200262
Serhiy Storchaka43767632013-11-03 21:31:38 +0200263 self.child_file_path = os.path.join(TESTFN, 'a')
264 self.child_dir_path = os.path.join(TESTFN, 'b')
265 support.create_empty_file(self.child_file_path)
266 os.mkdir(self.child_dir_path)
267 old_dir_mode = os.stat(TESTFN).st_mode
268 old_child_file_mode = os.stat(self.child_file_path).st_mode
269 old_child_dir_mode = os.stat(self.child_dir_path).st_mode
270 # Make unwritable.
271 new_mode = stat.S_IREAD|stat.S_IEXEC
272 os.chmod(self.child_file_path, new_mode)
273 os.chmod(self.child_dir_path, new_mode)
274 os.chmod(TESTFN, new_mode)
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000275
Serhiy Storchaka43767632013-11-03 21:31:38 +0200276 self.addCleanup(os.chmod, TESTFN, old_dir_mode)
277 self.addCleanup(os.chmod, self.child_file_path, old_child_file_mode)
278 self.addCleanup(os.chmod, self.child_dir_path, old_child_dir_mode)
Antoine Pitrou2f8a75c2012-06-23 21:28:15 +0200279
Serhiy Storchaka43767632013-11-03 21:31:38 +0200280 shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
281 # Test whether onerror has actually been called.
282 self.assertEqual(self.errorState, 3,
283 "Expected call to onerror function did not happen.")
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000284
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000285 def check_args_to_onerror(self, func, arg, exc):
Benjamin Peterson25c95f12009-05-08 20:42:26 +0000286 # test_rmtree_errors deliberately runs rmtree
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200287 # on a directory that is chmod 500, which will fail.
Benjamin Peterson25c95f12009-05-08 20:42:26 +0000288 # This function is run when shutil.rmtree fails.
289 # 99.9% of the time it initially fails to remove
290 # a file in the directory, so the first time through
291 # func is os.remove.
292 # However, some Linux machines running ZFS on
293 # FUSE experienced a failure earlier in the process
294 # at os.listdir. The first failure may legally
295 # be either.
Antoine Pitrou4f6e3f72012-06-23 22:05:11 +0200296 if self.errorState < 2:
Hynek Schlawack2100b422012-06-23 20:28:32 +0200297 if func is os.unlink:
Antoine Pitrou4f6e3f72012-06-23 22:05:11 +0200298 self.assertEqual(arg, self.child_file_path)
299 elif func is os.rmdir:
300 self.assertEqual(arg, self.child_dir_path)
Benjamin Peterson25c95f12009-05-08 20:42:26 +0000301 else:
Antoine Pitrou4f6e3f72012-06-23 22:05:11 +0200302 self.assertIs(func, os.listdir)
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200303 self.assertIn(arg, [TESTFN, self.child_dir_path])
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000304 self.assertTrue(issubclass(exc[0], OSError))
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200305 self.errorState += 1
Johannes Gijsbersef5ffc42004-10-31 12:05:31 +0000306 else:
307 self.assertEqual(func, os.rmdir)
308 self.assertEqual(arg, TESTFN)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000309 self.assertTrue(issubclass(exc[0], OSError))
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200310 self.errorState = 3
311
312 def test_rmtree_does_not_choke_on_failing_lstat(self):
313 try:
314 orig_lstat = os.lstat
Hynek Schlawacka75cd1c2012-06-28 12:07:29 +0200315 def raiser(fn, *args, **kwargs):
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200316 if fn != TESTFN:
317 raise OSError()
318 else:
319 return orig_lstat(fn)
320 os.lstat = raiser
321
322 os.mkdir(TESTFN)
323 write_file((TESTFN, 'foo'), 'foo')
324 shutil.rmtree(TESTFN)
325 finally:
326 os.lstat = orig_lstat
Barry Warsaw7fc2cca2003-01-24 17:34:13 +0000327
Antoine Pitrou78091e62011-12-29 18:54:15 +0100328 @unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod')
329 @support.skip_unless_symlink
330 def test_copymode_follow_symlinks(self):
331 tmp_dir = self.mkdtemp()
332 src = os.path.join(tmp_dir, 'foo')
333 dst = os.path.join(tmp_dir, 'bar')
334 src_link = os.path.join(tmp_dir, 'baz')
335 dst_link = os.path.join(tmp_dir, 'quux')
336 write_file(src, 'foo')
337 write_file(dst, 'foo')
338 os.symlink(src, src_link)
339 os.symlink(dst, dst_link)
340 os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
341 # file to file
342 os.chmod(dst, stat.S_IRWXO)
343 self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
344 shutil.copymode(src, dst)
345 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
Antoine Pitrou3f48ac92014-01-01 02:50:45 +0100346 # On Windows, os.chmod does not follow symlinks (issue #15411)
347 if os.name != 'nt':
348 # follow src link
349 os.chmod(dst, stat.S_IRWXO)
350 shutil.copymode(src_link, dst)
351 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
352 # follow dst link
353 os.chmod(dst, stat.S_IRWXO)
354 shutil.copymode(src, dst_link)
355 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
356 # follow both links
357 os.chmod(dst, stat.S_IRWXO)
358 shutil.copymode(src_link, dst_link)
359 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100360
361 @unittest.skipUnless(hasattr(os, 'lchmod'), 'requires os.lchmod')
362 @support.skip_unless_symlink
363 def test_copymode_symlink_to_symlink(self):
364 tmp_dir = self.mkdtemp()
365 src = os.path.join(tmp_dir, 'foo')
366 dst = os.path.join(tmp_dir, 'bar')
367 src_link = os.path.join(tmp_dir, 'baz')
368 dst_link = os.path.join(tmp_dir, 'quux')
369 write_file(src, 'foo')
370 write_file(dst, 'foo')
371 os.symlink(src, src_link)
372 os.symlink(dst, dst_link)
373 os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
374 os.chmod(dst, stat.S_IRWXU)
375 os.lchmod(src_link, stat.S_IRWXO|stat.S_IRWXG)
376 # link to link
377 os.lchmod(dst_link, stat.S_IRWXO)
Larry Hastingsb4038062012-07-15 10:57:38 -0700378 shutil.copymode(src_link, dst_link, follow_symlinks=False)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100379 self.assertEqual(os.lstat(src_link).st_mode,
380 os.lstat(dst_link).st_mode)
381 self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
382 # src link - use chmod
383 os.lchmod(dst_link, stat.S_IRWXO)
Larry Hastingsb4038062012-07-15 10:57:38 -0700384 shutil.copymode(src_link, dst, follow_symlinks=False)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100385 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
386 # dst link - use chmod
387 os.lchmod(dst_link, stat.S_IRWXO)
Larry Hastingsb4038062012-07-15 10:57:38 -0700388 shutil.copymode(src, dst_link, follow_symlinks=False)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100389 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
390
391 @unittest.skipIf(hasattr(os, 'lchmod'), 'requires os.lchmod to be missing')
392 @support.skip_unless_symlink
393 def test_copymode_symlink_to_symlink_wo_lchmod(self):
394 tmp_dir = self.mkdtemp()
395 src = os.path.join(tmp_dir, 'foo')
396 dst = os.path.join(tmp_dir, 'bar')
397 src_link = os.path.join(tmp_dir, 'baz')
398 dst_link = os.path.join(tmp_dir, 'quux')
399 write_file(src, 'foo')
400 write_file(dst, 'foo')
401 os.symlink(src, src_link)
402 os.symlink(dst, dst_link)
Larry Hastingsb4038062012-07-15 10:57:38 -0700403 shutil.copymode(src_link, dst_link, follow_symlinks=False) # silent fail
Antoine Pitrou78091e62011-12-29 18:54:15 +0100404
405 @support.skip_unless_symlink
406 def test_copystat_symlinks(self):
407 tmp_dir = self.mkdtemp()
408 src = os.path.join(tmp_dir, 'foo')
409 dst = os.path.join(tmp_dir, 'bar')
410 src_link = os.path.join(tmp_dir, 'baz')
411 dst_link = os.path.join(tmp_dir, 'qux')
412 write_file(src, 'foo')
413 src_stat = os.stat(src)
414 os.utime(src, (src_stat.st_atime,
415 src_stat.st_mtime - 42.0)) # ensure different mtimes
416 write_file(dst, 'bar')
417 self.assertNotEqual(os.stat(src).st_mtime, os.stat(dst).st_mtime)
418 os.symlink(src, src_link)
419 os.symlink(dst, dst_link)
420 if hasattr(os, 'lchmod'):
421 os.lchmod(src_link, stat.S_IRWXO)
422 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
423 os.lchflags(src_link, stat.UF_NODUMP)
424 src_link_stat = os.lstat(src_link)
425 # follow
426 if hasattr(os, 'lchmod'):
Larry Hastingsb4038062012-07-15 10:57:38 -0700427 shutil.copystat(src_link, dst_link, follow_symlinks=True)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100428 self.assertNotEqual(src_link_stat.st_mode, os.stat(dst).st_mode)
429 # don't follow
Larry Hastingsb4038062012-07-15 10:57:38 -0700430 shutil.copystat(src_link, dst_link, follow_symlinks=False)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100431 dst_link_stat = os.lstat(dst_link)
Larry Hastings9cf065c2012-06-22 16:30:09 -0700432 if os.utime in os.supports_follow_symlinks:
Antoine Pitrou78091e62011-12-29 18:54:15 +0100433 for attr in 'st_atime', 'st_mtime':
434 # The modification times may be truncated in the new file.
435 self.assertLessEqual(getattr(src_link_stat, attr),
436 getattr(dst_link_stat, attr) + 1)
437 if hasattr(os, 'lchmod'):
438 self.assertEqual(src_link_stat.st_mode, dst_link_stat.st_mode)
439 if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
440 self.assertEqual(src_link_stat.st_flags, dst_link_stat.st_flags)
441 # tell to follow but dst is not a link
Larry Hastingsb4038062012-07-15 10:57:38 -0700442 shutil.copystat(src_link, dst, follow_symlinks=False)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100443 self.assertTrue(abs(os.stat(src).st_mtime - os.stat(dst).st_mtime) <
444 00000.1)
445
Ned Deilybaf75712012-05-10 17:05:19 -0700446 @unittest.skipUnless(hasattr(os, 'chflags') and
447 hasattr(errno, 'EOPNOTSUPP') and
448 hasattr(errno, 'ENOTSUP'),
449 "requires os.chflags, EOPNOTSUPP & ENOTSUP")
450 def test_copystat_handles_harmless_chflags_errors(self):
451 tmpdir = self.mkdtemp()
452 file1 = os.path.join(tmpdir, 'file1')
453 file2 = os.path.join(tmpdir, 'file2')
454 write_file(file1, 'xxx')
455 write_file(file2, 'xxx')
456
457 def make_chflags_raiser(err):
458 ex = OSError()
459
Larry Hastings90867a52012-06-22 17:01:41 -0700460 def _chflags_raiser(path, flags, *, follow_symlinks=True):
Ned Deilybaf75712012-05-10 17:05:19 -0700461 ex.errno = err
462 raise ex
463 return _chflags_raiser
464 old_chflags = os.chflags
465 try:
466 for err in errno.EOPNOTSUPP, errno.ENOTSUP:
467 os.chflags = make_chflags_raiser(err)
468 shutil.copystat(file1, file2)
469 # assert others errors break it
470 os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP)
471 self.assertRaises(OSError, shutil.copystat, file1, file2)
472 finally:
473 os.chflags = old_chflags
474
Antoine Pitrou424246f2012-05-12 19:02:01 +0200475 @support.skip_unless_xattr
476 def test_copyxattr(self):
477 tmp_dir = self.mkdtemp()
478 src = os.path.join(tmp_dir, 'foo')
479 write_file(src, 'foo')
480 dst = os.path.join(tmp_dir, 'bar')
481 write_file(dst, 'bar')
482
483 # no xattr == no problem
484 shutil._copyxattr(src, dst)
485 # common case
486 os.setxattr(src, 'user.foo', b'42')
487 os.setxattr(src, 'user.bar', b'43')
488 shutil._copyxattr(src, dst)
Gregory P. Smith1093bf22014-01-17 12:01:22 -0800489 self.assertEqual(sorted(os.listxattr(src)), sorted(os.listxattr(dst)))
Antoine Pitrou424246f2012-05-12 19:02:01 +0200490 self.assertEqual(
491 os.getxattr(src, 'user.foo'),
492 os.getxattr(dst, 'user.foo'))
493 # check errors don't affect other attrs
494 os.remove(dst)
495 write_file(dst, 'bar')
496 os_error = OSError(errno.EPERM, 'EPERM')
497
Larry Hastings9cf065c2012-06-22 16:30:09 -0700498 def _raise_on_user_foo(fname, attr, val, **kwargs):
Antoine Pitrou424246f2012-05-12 19:02:01 +0200499 if attr == 'user.foo':
500 raise os_error
501 else:
Larry Hastings9cf065c2012-06-22 16:30:09 -0700502 orig_setxattr(fname, attr, val, **kwargs)
Antoine Pitrou424246f2012-05-12 19:02:01 +0200503 try:
504 orig_setxattr = os.setxattr
505 os.setxattr = _raise_on_user_foo
506 shutil._copyxattr(src, dst)
Antoine Pitrou61597d32012-05-12 23:37:35 +0200507 self.assertIn('user.bar', os.listxattr(dst))
Antoine Pitrou424246f2012-05-12 19:02:01 +0200508 finally:
509 os.setxattr = orig_setxattr
Hynek Schlawack0beab052013-02-05 08:22:44 +0100510 # the source filesystem not supporting xattrs should be ok, too.
511 def _raise_on_src(fname, *, follow_symlinks=True):
512 if fname == src:
513 raise OSError(errno.ENOTSUP, 'Operation not supported')
514 return orig_listxattr(fname, follow_symlinks=follow_symlinks)
515 try:
516 orig_listxattr = os.listxattr
517 os.listxattr = _raise_on_src
518 shutil._copyxattr(src, dst)
519 finally:
520 os.listxattr = orig_listxattr
Antoine Pitrou424246f2012-05-12 19:02:01 +0200521
Larry Hastingsad5ae042012-07-14 17:55:11 -0700522 # test that shutil.copystat copies xattrs
523 src = os.path.join(tmp_dir, 'the_original')
524 write_file(src, src)
525 os.setxattr(src, 'user.the_value', b'fiddly')
526 dst = os.path.join(tmp_dir, 'the_copy')
527 write_file(dst, dst)
528 shutil.copystat(src, dst)
Hynek Schlawackc2d481f2012-07-16 17:11:10 +0200529 self.assertEqual(os.getxattr(dst, 'user.the_value'), b'fiddly')
Larry Hastingsad5ae042012-07-14 17:55:11 -0700530
Antoine Pitrou424246f2012-05-12 19:02:01 +0200531 @support.skip_unless_symlink
532 @support.skip_unless_xattr
533 @unittest.skipUnless(hasattr(os, 'geteuid') and os.geteuid() == 0,
534 'root privileges required')
535 def test_copyxattr_symlinks(self):
536 # On Linux, it's only possible to access non-user xattr for symlinks;
537 # which in turn require root privileges. This test should be expanded
538 # as soon as other platforms gain support for extended attributes.
539 tmp_dir = self.mkdtemp()
540 src = os.path.join(tmp_dir, 'foo')
541 src_link = os.path.join(tmp_dir, 'baz')
542 write_file(src, 'foo')
543 os.symlink(src, src_link)
544 os.setxattr(src, 'trusted.foo', b'42')
Larry Hastings9cf065c2012-06-22 16:30:09 -0700545 os.setxattr(src_link, 'trusted.foo', b'43', follow_symlinks=False)
Antoine Pitrou424246f2012-05-12 19:02:01 +0200546 dst = os.path.join(tmp_dir, 'bar')
547 dst_link = os.path.join(tmp_dir, 'qux')
548 write_file(dst, 'bar')
549 os.symlink(dst, dst_link)
Larry Hastingsb4038062012-07-15 10:57:38 -0700550 shutil._copyxattr(src_link, dst_link, follow_symlinks=False)
Larry Hastings9cf065c2012-06-22 16:30:09 -0700551 self.assertEqual(os.getxattr(dst_link, 'trusted.foo', follow_symlinks=False), b'43')
Antoine Pitrou424246f2012-05-12 19:02:01 +0200552 self.assertRaises(OSError, os.getxattr, dst, 'trusted.foo')
Larry Hastingsb4038062012-07-15 10:57:38 -0700553 shutil._copyxattr(src_link, dst, follow_symlinks=False)
Antoine Pitrou424246f2012-05-12 19:02:01 +0200554 self.assertEqual(os.getxattr(dst, 'trusted.foo'), b'43')
555
Antoine Pitrou78091e62011-12-29 18:54:15 +0100556 @support.skip_unless_symlink
557 def test_copy_symlinks(self):
558 tmp_dir = self.mkdtemp()
559 src = os.path.join(tmp_dir, 'foo')
560 dst = os.path.join(tmp_dir, 'bar')
561 src_link = os.path.join(tmp_dir, 'baz')
562 write_file(src, 'foo')
563 os.symlink(src, src_link)
564 if hasattr(os, 'lchmod'):
565 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
566 # don't follow
Larry Hastingsb4038062012-07-15 10:57:38 -0700567 shutil.copy(src_link, dst, follow_symlinks=True)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100568 self.assertFalse(os.path.islink(dst))
569 self.assertEqual(read_file(src), read_file(dst))
570 os.remove(dst)
571 # follow
Larry Hastingsb4038062012-07-15 10:57:38 -0700572 shutil.copy(src_link, dst, follow_symlinks=False)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100573 self.assertTrue(os.path.islink(dst))
574 self.assertEqual(os.readlink(dst), os.readlink(src_link))
575 if hasattr(os, 'lchmod'):
576 self.assertEqual(os.lstat(src_link).st_mode,
577 os.lstat(dst).st_mode)
578
579 @support.skip_unless_symlink
580 def test_copy2_symlinks(self):
581 tmp_dir = self.mkdtemp()
582 src = os.path.join(tmp_dir, 'foo')
583 dst = os.path.join(tmp_dir, 'bar')
584 src_link = os.path.join(tmp_dir, 'baz')
585 write_file(src, 'foo')
586 os.symlink(src, src_link)
587 if hasattr(os, 'lchmod'):
588 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
589 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
590 os.lchflags(src_link, stat.UF_NODUMP)
591 src_stat = os.stat(src)
592 src_link_stat = os.lstat(src_link)
593 # follow
Larry Hastingsb4038062012-07-15 10:57:38 -0700594 shutil.copy2(src_link, dst, follow_symlinks=True)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100595 self.assertFalse(os.path.islink(dst))
596 self.assertEqual(read_file(src), read_file(dst))
597 os.remove(dst)
598 # don't follow
Larry Hastingsb4038062012-07-15 10:57:38 -0700599 shutil.copy2(src_link, dst, follow_symlinks=False)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100600 self.assertTrue(os.path.islink(dst))
601 self.assertEqual(os.readlink(dst), os.readlink(src_link))
602 dst_stat = os.lstat(dst)
Larry Hastings9cf065c2012-06-22 16:30:09 -0700603 if os.utime in os.supports_follow_symlinks:
Antoine Pitrou78091e62011-12-29 18:54:15 +0100604 for attr in 'st_atime', 'st_mtime':
605 # The modification times may be truncated in the new file.
606 self.assertLessEqual(getattr(src_link_stat, attr),
607 getattr(dst_stat, attr) + 1)
608 if hasattr(os, 'lchmod'):
609 self.assertEqual(src_link_stat.st_mode, dst_stat.st_mode)
610 self.assertNotEqual(src_stat.st_mode, dst_stat.st_mode)
611 if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
612 self.assertEqual(src_link_stat.st_flags, dst_stat.st_flags)
613
Antoine Pitrou424246f2012-05-12 19:02:01 +0200614 @support.skip_unless_xattr
615 def test_copy2_xattr(self):
616 tmp_dir = self.mkdtemp()
617 src = os.path.join(tmp_dir, 'foo')
618 dst = os.path.join(tmp_dir, 'bar')
619 write_file(src, 'foo')
620 os.setxattr(src, 'user.foo', b'42')
621 shutil.copy2(src, dst)
622 self.assertEqual(
623 os.getxattr(src, 'user.foo'),
624 os.getxattr(dst, 'user.foo'))
625 os.remove(dst)
626
Antoine Pitrou78091e62011-12-29 18:54:15 +0100627 @support.skip_unless_symlink
628 def test_copyfile_symlinks(self):
629 tmp_dir = self.mkdtemp()
630 src = os.path.join(tmp_dir, 'src')
631 dst = os.path.join(tmp_dir, 'dst')
632 dst_link = os.path.join(tmp_dir, 'dst_link')
633 link = os.path.join(tmp_dir, 'link')
634 write_file(src, 'foo')
635 os.symlink(src, link)
636 # don't follow
Larry Hastingsb4038062012-07-15 10:57:38 -0700637 shutil.copyfile(link, dst_link, follow_symlinks=False)
Antoine Pitrou78091e62011-12-29 18:54:15 +0100638 self.assertTrue(os.path.islink(dst_link))
639 self.assertEqual(os.readlink(link), os.readlink(dst_link))
640 # follow
641 shutil.copyfile(link, dst)
642 self.assertFalse(os.path.islink(dst))
643
Hynek Schlawack2100b422012-06-23 20:28:32 +0200644 def test_rmtree_uses_safe_fd_version_if_available(self):
Hynek Schlawackd0f6e0a2012-06-29 08:28:20 +0200645 _use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <=
646 os.supports_dir_fd and
647 os.listdir in os.supports_fd and
648 os.stat in os.supports_follow_symlinks)
649 if _use_fd_functions:
Hynek Schlawack2100b422012-06-23 20:28:32 +0200650 self.assertTrue(shutil._use_fd_functions)
Nick Coghlan5b0eca12012-06-24 16:43:06 +1000651 self.assertTrue(shutil.rmtree.avoids_symlink_attacks)
Hynek Schlawack2100b422012-06-23 20:28:32 +0200652 tmp_dir = self.mkdtemp()
653 d = os.path.join(tmp_dir, 'a')
654 os.mkdir(d)
655 try:
656 real_rmtree = shutil._rmtree_safe_fd
657 class Called(Exception): pass
658 def _raiser(*args, **kwargs):
659 raise Called
660 shutil._rmtree_safe_fd = _raiser
661 self.assertRaises(Called, shutil.rmtree, d)
662 finally:
663 shutil._rmtree_safe_fd = real_rmtree
664 else:
665 self.assertFalse(shutil._use_fd_functions)
Nick Coghlan5b0eca12012-06-24 16:43:06 +1000666 self.assertFalse(shutil.rmtree.avoids_symlink_attacks)
Hynek Schlawack2100b422012-06-23 20:28:32 +0200667
Johannes Gijsbersd60e92a2004-09-11 21:26:21 +0000668 def test_rmtree_dont_delete_file(self):
669 # When called on a file instead of a directory, don't delete it.
670 handle, path = tempfile.mkstemp()
Victor Stinnerbf816222011-06-30 23:25:47 +0200671 os.close(handle)
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200672 self.assertRaises(NotADirectoryError, shutil.rmtree, path)
Johannes Gijsbersd60e92a2004-09-11 21:26:21 +0000673 os.remove(path)
674
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000675 def test_copytree_simple(self):
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000676 src_dir = tempfile.mkdtemp()
677 dst_dir = os.path.join(tempfile.mkdtemp(), 'destination')
Éric Araujoa7e33a12011-08-12 19:51:35 +0200678 self.addCleanup(shutil.rmtree, src_dir)
679 self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
680 write_file((src_dir, 'test.txt'), '123')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000681 os.mkdir(os.path.join(src_dir, 'test_dir'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200682 write_file((src_dir, 'test_dir', 'test.txt'), '456')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000683
Éric Araujoa7e33a12011-08-12 19:51:35 +0200684 shutil.copytree(src_dir, dst_dir)
685 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test.txt')))
686 self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'test_dir')))
687 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test_dir',
688 'test.txt')))
689 actual = read_file((dst_dir, 'test.txt'))
690 self.assertEqual(actual, '123')
691 actual = read_file((dst_dir, 'test_dir', 'test.txt'))
692 self.assertEqual(actual, '456')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000693
jab9e00d9e2018-12-28 13:03:40 -0500694 def test_copytree_dirs_exist_ok(self):
695 src_dir = tempfile.mkdtemp()
696 dst_dir = tempfile.mkdtemp()
697 self.addCleanup(shutil.rmtree, src_dir)
698 self.addCleanup(shutil.rmtree, dst_dir)
699
700 write_file((src_dir, 'nonexisting.txt'), '123')
701 os.mkdir(os.path.join(src_dir, 'existing_dir'))
702 os.mkdir(os.path.join(dst_dir, 'existing_dir'))
703 write_file((dst_dir, 'existing_dir', 'existing.txt'), 'will be replaced')
704 write_file((src_dir, 'existing_dir', 'existing.txt'), 'has been replaced')
705
706 shutil.copytree(src_dir, dst_dir, dirs_exist_ok=True)
707 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'nonexisting.txt')))
708 self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'existing_dir')))
709 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'existing_dir',
710 'existing.txt')))
711 actual = read_file((dst_dir, 'nonexisting.txt'))
712 self.assertEqual(actual, '123')
713 actual = read_file((dst_dir, 'existing_dir', 'existing.txt'))
714 self.assertEqual(actual, 'has been replaced')
715
716 with self.assertRaises(FileExistsError):
717 shutil.copytree(src_dir, dst_dir, dirs_exist_ok=False)
718
Antoine Pitrou78091e62011-12-29 18:54:15 +0100719 @support.skip_unless_symlink
720 def test_copytree_symlinks(self):
721 tmp_dir = self.mkdtemp()
722 src_dir = os.path.join(tmp_dir, 'src')
723 dst_dir = os.path.join(tmp_dir, 'dst')
724 sub_dir = os.path.join(src_dir, 'sub')
725 os.mkdir(src_dir)
726 os.mkdir(sub_dir)
727 write_file((src_dir, 'file.txt'), 'foo')
728 src_link = os.path.join(sub_dir, 'link')
729 dst_link = os.path.join(dst_dir, 'sub/link')
730 os.symlink(os.path.join(src_dir, 'file.txt'),
731 src_link)
732 if hasattr(os, 'lchmod'):
733 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
734 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
735 os.lchflags(src_link, stat.UF_NODUMP)
736 src_stat = os.lstat(src_link)
737 shutil.copytree(src_dir, dst_dir, symlinks=True)
738 self.assertTrue(os.path.islink(os.path.join(dst_dir, 'sub', 'link')))
739 self.assertEqual(os.readlink(os.path.join(dst_dir, 'sub', 'link')),
740 os.path.join(src_dir, 'file.txt'))
741 dst_stat = os.lstat(dst_link)
742 if hasattr(os, 'lchmod'):
743 self.assertEqual(dst_stat.st_mode, src_stat.st_mode)
744 if hasattr(os, 'lchflags'):
745 self.assertEqual(dst_stat.st_flags, src_stat.st_flags)
746
Georg Brandl2ee470f2008-07-16 12:55:28 +0000747 def test_copytree_with_exclude(self):
Georg Brandl2ee470f2008-07-16 12:55:28 +0000748 # creating data
749 join = os.path.join
750 exists = os.path.exists
751 src_dir = tempfile.mkdtemp()
Georg Brandl2ee470f2008-07-16 12:55:28 +0000752 try:
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000753 dst_dir = join(tempfile.mkdtemp(), 'destination')
Éric Araujoa7e33a12011-08-12 19:51:35 +0200754 write_file((src_dir, 'test.txt'), '123')
755 write_file((src_dir, 'test.tmp'), '123')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000756 os.mkdir(join(src_dir, 'test_dir'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200757 write_file((src_dir, 'test_dir', 'test.txt'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000758 os.mkdir(join(src_dir, 'test_dir2'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200759 write_file((src_dir, 'test_dir2', 'test.txt'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000760 os.mkdir(join(src_dir, 'test_dir2', 'subdir'))
761 os.mkdir(join(src_dir, 'test_dir2', 'subdir2'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200762 write_file((src_dir, 'test_dir2', 'subdir', 'test.txt'), '456')
763 write_file((src_dir, 'test_dir2', 'subdir2', 'test.py'), '456')
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000764
765 # testing glob-like patterns
766 try:
767 patterns = shutil.ignore_patterns('*.tmp', 'test_dir2')
768 shutil.copytree(src_dir, dst_dir, ignore=patterns)
769 # checking the result: some elements should not be copied
770 self.assertTrue(exists(join(dst_dir, 'test.txt')))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200771 self.assertFalse(exists(join(dst_dir, 'test.tmp')))
772 self.assertFalse(exists(join(dst_dir, 'test_dir2')))
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000773 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200774 shutil.rmtree(dst_dir)
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000775 try:
776 patterns = shutil.ignore_patterns('*.tmp', 'subdir*')
777 shutil.copytree(src_dir, dst_dir, ignore=patterns)
778 # checking the result: some elements should not be copied
Éric Araujoa7e33a12011-08-12 19:51:35 +0200779 self.assertFalse(exists(join(dst_dir, 'test.tmp')))
780 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2')))
781 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000782 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200783 shutil.rmtree(dst_dir)
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000784
785 # testing callable-style
786 try:
787 def _filter(src, names):
788 res = []
789 for name in names:
790 path = os.path.join(src, name)
791
792 if (os.path.isdir(path) and
793 path.split()[-1] == 'subdir'):
794 res.append(name)
795 elif os.path.splitext(path)[-1] in ('.py'):
796 res.append(name)
797 return res
798
799 shutil.copytree(src_dir, dst_dir, ignore=_filter)
800
801 # checking the result: some elements should not be copied
Éric Araujoa7e33a12011-08-12 19:51:35 +0200802 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2',
803 'test.py')))
804 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000805
806 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +0200807 shutil.rmtree(dst_dir)
Georg Brandl2ee470f2008-07-16 12:55:28 +0000808 finally:
Antoine Pitrou97c81ef2009-11-04 00:57:15 +0000809 shutil.rmtree(src_dir)
810 shutil.rmtree(os.path.dirname(dst_dir))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000811
Antoine Pitrouac601602013-08-16 19:35:02 +0200812 def test_copytree_retains_permissions(self):
813 tmp_dir = tempfile.mkdtemp()
814 src_dir = os.path.join(tmp_dir, 'source')
815 os.mkdir(src_dir)
816 dst_dir = os.path.join(tmp_dir, 'destination')
817 self.addCleanup(shutil.rmtree, tmp_dir)
818
819 os.chmod(src_dir, 0o777)
820 write_file((src_dir, 'permissive.txt'), '123')
821 os.chmod(os.path.join(src_dir, 'permissive.txt'), 0o777)
822 write_file((src_dir, 'restrictive.txt'), '456')
823 os.chmod(os.path.join(src_dir, 'restrictive.txt'), 0o600)
824 restrictive_subdir = tempfile.mkdtemp(dir=src_dir)
825 os.chmod(restrictive_subdir, 0o600)
826
827 shutil.copytree(src_dir, dst_dir)
Brett Cannon9c7eb552013-08-23 14:38:11 -0400828 self.assertEqual(os.stat(src_dir).st_mode, os.stat(dst_dir).st_mode)
829 self.assertEqual(os.stat(os.path.join(src_dir, 'permissive.txt')).st_mode,
Antoine Pitrouac601602013-08-16 19:35:02 +0200830 os.stat(os.path.join(dst_dir, 'permissive.txt')).st_mode)
Brett Cannon9c7eb552013-08-23 14:38:11 -0400831 self.assertEqual(os.stat(os.path.join(src_dir, 'restrictive.txt')).st_mode,
Antoine Pitrouac601602013-08-16 19:35:02 +0200832 os.stat(os.path.join(dst_dir, 'restrictive.txt')).st_mode)
833 restrictive_subdir_dst = os.path.join(dst_dir,
834 os.path.split(restrictive_subdir)[1])
Brett Cannon9c7eb552013-08-23 14:38:11 -0400835 self.assertEqual(os.stat(restrictive_subdir).st_mode,
Antoine Pitrouac601602013-08-16 19:35:02 +0200836 os.stat(restrictive_subdir_dst).st_mode)
837
Berker Peksag884afd92014-12-10 02:50:32 +0200838 @unittest.mock.patch('os.chmod')
839 def test_copytree_winerror(self, mock_patch):
840 # When copying to VFAT, copystat() raises OSError. On Windows, the
841 # exception object has a meaningful 'winerror' attribute, but not
842 # on other operating systems. Do not assume 'winerror' is set.
843 src_dir = tempfile.mkdtemp()
844 dst_dir = os.path.join(tempfile.mkdtemp(), 'destination')
845 self.addCleanup(shutil.rmtree, src_dir)
846 self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
847
848 mock_patch.side_effect = PermissionError('ka-boom')
849 with self.assertRaises(shutil.Error):
850 shutil.copytree(src_dir, dst_dir)
851
Zachary Ware9fe6d862013-12-08 00:20:35 -0600852 @unittest.skipIf(os.name == 'nt', 'temporarily disabled on Windows')
Hirokazu Yamamoto26681452010-12-05 02:04:16 +0000853 @unittest.skipUnless(hasattr(os, 'link'), 'requires os.link')
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000854 def test_dont_copy_file_onto_link_to_itself(self):
855 # bug 851123.
856 os.mkdir(TESTFN)
857 src = os.path.join(TESTFN, 'cheese')
858 dst = os.path.join(TESTFN, 'shop')
859 try:
Hirokazu Yamamoto26681452010-12-05 02:04:16 +0000860 with open(src, 'w') as f:
861 f.write('cheddar')
xdegaye92c2ca72017-11-12 17:31:07 +0100862 try:
863 os.link(src, dst)
864 except PermissionError as e:
865 self.skipTest('os.link(): %s' % e)
Hynek Schlawack48653762012-10-07 12:49:58 +0200866 self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
Hirokazu Yamamoto26681452010-12-05 02:04:16 +0000867 with open(src, 'r') as f:
868 self.assertEqual(f.read(), 'cheddar')
869 os.remove(dst)
870 finally:
871 shutil.rmtree(TESTFN, ignore_errors=True)
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000872
Brian Curtin3b4499c2010-12-28 14:31:47 +0000873 @support.skip_unless_symlink
Hirokazu Yamamoto26681452010-12-05 02:04:16 +0000874 def test_dont_copy_file_onto_symlink_to_itself(self):
875 # bug 851123.
876 os.mkdir(TESTFN)
877 src = os.path.join(TESTFN, 'cheese')
878 dst = os.path.join(TESTFN, 'shop')
879 try:
880 with open(src, 'w') as f:
881 f.write('cheddar')
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000882 # Using `src` here would mean we end up with a symlink pointing
883 # to TESTFN/TESTFN/cheese, while it should point at
884 # TESTFN/cheese.
885 os.symlink('cheese', dst)
Hynek Schlawack48653762012-10-07 12:49:58 +0200886 self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
Antoine Pitrou92f60ed2010-10-14 22:11:44 +0000887 with open(src, 'r') as f:
888 self.assertEqual(f.read(), 'cheddar')
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000889 os.remove(dst)
890 finally:
Hirokazu Yamamoto26681452010-12-05 02:04:16 +0000891 shutil.rmtree(TESTFN, ignore_errors=True)
Johannes Gijsbers68128712004-08-14 13:57:08 +0000892
Brian Curtin3b4499c2010-12-28 14:31:47 +0000893 @support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +0000894 def test_rmtree_on_symlink(self):
895 # bug 1669.
896 os.mkdir(TESTFN)
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000897 try:
Brian Curtind40e6f72010-07-08 21:39:08 +0000898 src = os.path.join(TESTFN, 'cheese')
899 dst = os.path.join(TESTFN, 'shop')
900 os.mkdir(src)
901 os.symlink(src, dst)
902 self.assertRaises(OSError, shutil.rmtree, dst)
Hynek Schlawack67be92b2012-06-23 17:58:42 +0200903 shutil.rmtree(dst, ignore_errors=True)
Tarek Ziadé51a6f722010-04-23 13:03:09 +0000904 finally:
Brian Curtind40e6f72010-07-08 21:39:08 +0000905 shutil.rmtree(TESTFN, ignore_errors=True)
906
Serhiy Storchaka43767632013-11-03 21:31:38 +0200907 # Issue #3002: copyfile and copytree block indefinitely on named pipes
908 @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
909 def test_copyfile_named_pipe(self):
xdegaye92c2ca72017-11-12 17:31:07 +0100910 try:
911 os.mkfifo(TESTFN)
912 except PermissionError as e:
913 self.skipTest('os.mkfifo(): %s' % e)
Serhiy Storchaka43767632013-11-03 21:31:38 +0200914 try:
915 self.assertRaises(shutil.SpecialFileError,
916 shutil.copyfile, TESTFN, TESTFN2)
917 self.assertRaises(shutil.SpecialFileError,
918 shutil.copyfile, __file__, TESTFN)
919 finally:
920 os.remove(TESTFN)
Antoine Pitrou7fff0962009-05-01 21:09:44 +0000921
Serhiy Storchaka43767632013-11-03 21:31:38 +0200922 @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
923 @support.skip_unless_symlink
924 def test_copytree_named_pipe(self):
925 os.mkdir(TESTFN)
926 try:
927 subdir = os.path.join(TESTFN, "subdir")
928 os.mkdir(subdir)
929 pipe = os.path.join(subdir, "mypipe")
xdegaye92c2ca72017-11-12 17:31:07 +0100930 try:
931 os.mkfifo(pipe)
932 except PermissionError as e:
933 self.skipTest('os.mkfifo(): %s' % e)
Antoine Pitrou7fff0962009-05-01 21:09:44 +0000934 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +0200935 shutil.copytree(TESTFN, TESTFN2)
936 except shutil.Error as e:
937 errors = e.args[0]
938 self.assertEqual(len(errors), 1)
939 src, dst, error_msg = errors[0]
940 self.assertEqual("`%s` is a named pipe" % pipe, error_msg)
941 else:
942 self.fail("shutil.Error should have been raised")
943 finally:
944 shutil.rmtree(TESTFN, ignore_errors=True)
945 shutil.rmtree(TESTFN2, ignore_errors=True)
Antoine Pitrou7fff0962009-05-01 21:09:44 +0000946
Tarek Ziadé5340db32010-04-19 22:30:51 +0000947 def test_copytree_special_func(self):
948
949 src_dir = self.mkdtemp()
950 dst_dir = os.path.join(self.mkdtemp(), 'destination')
Éric Araujoa7e33a12011-08-12 19:51:35 +0200951 write_file((src_dir, 'test.txt'), '123')
Tarek Ziadé5340db32010-04-19 22:30:51 +0000952 os.mkdir(os.path.join(src_dir, 'test_dir'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200953 write_file((src_dir, 'test_dir', 'test.txt'), '456')
Tarek Ziadé5340db32010-04-19 22:30:51 +0000954
955 copied = []
956 def _copy(src, dst):
957 copied.append((src, dst))
958
959 shutil.copytree(src_dir, dst_dir, copy_function=_copy)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000960 self.assertEqual(len(copied), 2)
Tarek Ziadé5340db32010-04-19 22:30:51 +0000961
Brian Curtin3b4499c2010-12-28 14:31:47 +0000962 @support.skip_unless_symlink
Tarek Ziadéfb437512010-04-20 08:57:33 +0000963 def test_copytree_dangling_symlinks(self):
964
965 # a dangling symlink raises an error at the end
966 src_dir = self.mkdtemp()
967 dst_dir = os.path.join(self.mkdtemp(), 'destination')
968 os.symlink('IDONTEXIST', os.path.join(src_dir, 'test.txt'))
969 os.mkdir(os.path.join(src_dir, 'test_dir'))
Éric Araujoa7e33a12011-08-12 19:51:35 +0200970 write_file((src_dir, 'test_dir', 'test.txt'), '456')
Tarek Ziadéfb437512010-04-20 08:57:33 +0000971 self.assertRaises(Error, shutil.copytree, src_dir, dst_dir)
972
973 # a dangling symlink is ignored with the proper flag
974 dst_dir = os.path.join(self.mkdtemp(), 'destination2')
975 shutil.copytree(src_dir, dst_dir, ignore_dangling_symlinks=True)
976 self.assertNotIn('test.txt', os.listdir(dst_dir))
977
978 # a dangling symlink is copied if symlinks=True
979 dst_dir = os.path.join(self.mkdtemp(), 'destination3')
980 shutil.copytree(src_dir, dst_dir, symlinks=True)
981 self.assertIn('test.txt', os.listdir(dst_dir))
982
Berker Peksag5a294d82015-07-25 14:53:48 +0300983 @support.skip_unless_symlink
984 def test_copytree_symlink_dir(self):
985 src_dir = self.mkdtemp()
986 dst_dir = os.path.join(self.mkdtemp(), 'destination')
987 os.mkdir(os.path.join(src_dir, 'real_dir'))
988 with open(os.path.join(src_dir, 'real_dir', 'test.txt'), 'w'):
989 pass
990 os.symlink(os.path.join(src_dir, 'real_dir'),
991 os.path.join(src_dir, 'link_to_dir'),
992 target_is_directory=True)
993
994 shutil.copytree(src_dir, dst_dir, symlinks=False)
995 self.assertFalse(os.path.islink(os.path.join(dst_dir, 'link_to_dir')))
996 self.assertIn('test.txt', os.listdir(os.path.join(dst_dir, 'link_to_dir')))
997
998 dst_dir = os.path.join(self.mkdtemp(), 'destination2')
999 shutil.copytree(src_dir, dst_dir, symlinks=True)
1000 self.assertTrue(os.path.islink(os.path.join(dst_dir, 'link_to_dir')))
1001 self.assertIn('test.txt', os.listdir(os.path.join(dst_dir, 'link_to_dir')))
1002
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001003 def _copy_file(self, method):
1004 fname = 'test.txt'
1005 tmpdir = self.mkdtemp()
Éric Araujoa7e33a12011-08-12 19:51:35 +02001006 write_file((tmpdir, fname), 'xxx')
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001007 file1 = os.path.join(tmpdir, fname)
1008 tmpdir2 = self.mkdtemp()
1009 method(file1, tmpdir2)
1010 file2 = os.path.join(tmpdir2, fname)
1011 return (file1, file2)
1012
1013 @unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod')
1014 def test_copy(self):
1015 # Ensure that the copied file exists and has the same mode bits.
1016 file1, file2 = self._copy_file(shutil.copy)
1017 self.assertTrue(os.path.exists(file2))
1018 self.assertEqual(os.stat(file1).st_mode, os.stat(file2).st_mode)
1019
1020 @unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod')
Senthil Kumaran0c2dba52011-07-03 18:21:38 -07001021 @unittest.skipUnless(hasattr(os, 'utime'), 'requires os.utime')
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001022 def test_copy2(self):
1023 # Ensure that the copied file exists and has the same mode and
1024 # modification time bits.
1025 file1, file2 = self._copy_file(shutil.copy2)
1026 self.assertTrue(os.path.exists(file2))
1027 file1_stat = os.stat(file1)
1028 file2_stat = os.stat(file2)
1029 self.assertEqual(file1_stat.st_mode, file2_stat.st_mode)
1030 for attr in 'st_atime', 'st_mtime':
1031 # The modification times may be truncated in the new file.
1032 self.assertLessEqual(getattr(file1_stat, attr),
1033 getattr(file2_stat, attr) + 1)
1034 if hasattr(os, 'chflags') and hasattr(file1_stat, 'st_flags'):
1035 self.assertEqual(getattr(file1_stat, 'st_flags'),
1036 getattr(file2_stat, 'st_flags'))
1037
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001038 @support.requires_zlib
Tarek Ziadé396fad72010-02-23 05:30:31 +00001039 def test_make_tarball(self):
1040 # creating something to tar
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001041 root_dir, base_dir = self._create_files('')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001042
1043 tmpdir2 = self.mkdtemp()
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001044 # force shutil to create the directory
1045 os.rmdir(tmpdir2)
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001046 # working with relative paths
1047 work_dir = os.path.dirname(tmpdir2)
1048 rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001049
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001050 with support.change_cwd(work_dir):
Serhiy Storchaka5558d4f2015-09-08 09:59:02 +03001051 base_name = os.path.abspath(rel_base_name)
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001052 tarball = make_archive(rel_base_name, 'gztar', root_dir, '.')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001053
1054 # check if the compressed tarball was created
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001055 self.assertEqual(tarball, base_name + '.tar.gz')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001056 self.assertTrue(os.path.isfile(tarball))
1057 self.assertTrue(tarfile.is_tarfile(tarball))
1058 with tarfile.open(tarball, 'r:gz') as tf:
1059 self.assertCountEqual(tf.getnames(),
1060 ['.', './sub', './sub2',
1061 './file1', './file2', './sub/file3'])
Tarek Ziadé396fad72010-02-23 05:30:31 +00001062
1063 # trying an uncompressed one
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001064 with support.change_cwd(work_dir):
1065 tarball = make_archive(rel_base_name, 'tar', root_dir, '.')
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001066 self.assertEqual(tarball, base_name + '.tar')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001067 self.assertTrue(os.path.isfile(tarball))
1068 self.assertTrue(tarfile.is_tarfile(tarball))
1069 with tarfile.open(tarball, 'r') as tf:
1070 self.assertCountEqual(tf.getnames(),
1071 ['.', './sub', './sub2',
1072 './file1', './file2', './sub/file3'])
Tarek Ziadé396fad72010-02-23 05:30:31 +00001073
1074 def _tarinfo(self, path):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001075 with tarfile.open(path) as tar:
Tarek Ziadé396fad72010-02-23 05:30:31 +00001076 names = tar.getnames()
1077 names.sort()
1078 return tuple(names)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001079
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001080 def _create_files(self, base_dir='dist'):
Tarek Ziadé396fad72010-02-23 05:30:31 +00001081 # creating something to tar
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001082 root_dir = self.mkdtemp()
1083 dist = os.path.join(root_dir, base_dir)
1084 os.makedirs(dist, exist_ok=True)
Éric Araujoa7e33a12011-08-12 19:51:35 +02001085 write_file((dist, 'file1'), 'xxx')
1086 write_file((dist, 'file2'), 'xxx')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001087 os.mkdir(os.path.join(dist, 'sub'))
Éric Araujoa7e33a12011-08-12 19:51:35 +02001088 write_file((dist, 'sub', 'file3'), 'xxx')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001089 os.mkdir(os.path.join(dist, 'sub2'))
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001090 if base_dir:
1091 write_file((root_dir, 'outer'), 'xxx')
1092 return root_dir, base_dir
Tarek Ziadé396fad72010-02-23 05:30:31 +00001093
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001094 @support.requires_zlib
Serhiy Storchakab42de2f2015-11-21 14:09:26 +02001095 @unittest.skipUnless(shutil.which('tar'),
Tarek Ziadé396fad72010-02-23 05:30:31 +00001096 'Need the tar command to run')
1097 def test_tarfile_vs_tar(self):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001098 root_dir, base_dir = self._create_files()
1099 base_name = os.path.join(self.mkdtemp(), 'archive')
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001100 tarball = make_archive(base_name, 'gztar', root_dir, base_dir)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001101
1102 # check if the compressed tarball was created
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001103 self.assertEqual(tarball, base_name + '.tar.gz')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001104 self.assertTrue(os.path.isfile(tarball))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001105
1106 # now create another tarball using `tar`
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001107 tarball2 = os.path.join(root_dir, 'archive2.tar')
1108 tar_cmd = ['tar', '-cf', 'archive2.tar', base_dir]
Serhiy Storchakab42de2f2015-11-21 14:09:26 +02001109 subprocess.check_call(tar_cmd, cwd=root_dir,
1110 stdout=subprocess.DEVNULL)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001111
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001112 self.assertTrue(os.path.isfile(tarball2))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001113 # let's compare both tarballs
Ezio Melottib3aedd42010-11-20 19:04:17 +00001114 self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001115
1116 # trying an uncompressed one
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001117 tarball = make_archive(base_name, 'tar', root_dir, base_dir)
1118 self.assertEqual(tarball, base_name + '.tar')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001119 self.assertTrue(os.path.isfile(tarball))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001120
1121 # now for a dry_run
Serhiy Storchakaa091a822015-09-07 13:55:25 +03001122 tarball = make_archive(base_name, 'tar', root_dir, base_dir,
1123 dry_run=True)
1124 self.assertEqual(tarball, base_name + '.tar')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001125 self.assertTrue(os.path.isfile(tarball))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001126
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001127 @support.requires_zlib
Tarek Ziadé396fad72010-02-23 05:30:31 +00001128 def test_make_zipfile(self):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001129 # creating something to zip
1130 root_dir, base_dir = self._create_files()
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001131
1132 tmpdir2 = self.mkdtemp()
1133 # force shutil to create the directory
1134 os.rmdir(tmpdir2)
1135 # working with relative paths
1136 work_dir = os.path.dirname(tmpdir2)
1137 rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')
Serhiy Storchakaeba8fee2015-09-07 19:58:23 +03001138
1139 with support.change_cwd(work_dir):
Serhiy Storchaka5558d4f2015-09-08 09:59:02 +03001140 base_name = os.path.abspath(rel_base_name)
Serhiy Storchaka666de772016-10-23 15:55:09 +03001141 res = make_archive(rel_base_name, 'zip', root_dir)
1142
1143 self.assertEqual(res, base_name + '.zip')
1144 self.assertTrue(os.path.isfile(res))
1145 self.assertTrue(zipfile.is_zipfile(res))
1146 with zipfile.ZipFile(res) as zf:
1147 self.assertCountEqual(zf.namelist(),
1148 ['dist/', 'dist/sub/', 'dist/sub2/',
1149 'dist/file1', 'dist/file2', 'dist/sub/file3',
1150 'outer'])
1151
1152 with support.change_cwd(work_dir):
1153 base_name = os.path.abspath(rel_base_name)
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001154 res = make_archive(rel_base_name, 'zip', root_dir, base_dir)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001155
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001156 self.assertEqual(res, base_name + '.zip')
1157 self.assertTrue(os.path.isfile(res))
1158 self.assertTrue(zipfile.is_zipfile(res))
1159 with zipfile.ZipFile(res) as zf:
1160 self.assertCountEqual(zf.namelist(),
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001161 ['dist/', 'dist/sub/', 'dist/sub2/',
1162 'dist/file1', 'dist/file2', 'dist/sub/file3'])
Tarek Ziadé396fad72010-02-23 05:30:31 +00001163
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001164 @support.requires_zlib
Serhiy Storchakab42de2f2015-11-21 14:09:26 +02001165 @unittest.skipUnless(shutil.which('zip'),
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001166 'Need the zip command to run')
1167 def test_zipfile_vs_zip(self):
1168 root_dir, base_dir = self._create_files()
1169 base_name = os.path.join(self.mkdtemp(), 'archive')
1170 archive = make_archive(base_name, 'zip', root_dir, base_dir)
1171
1172 # check if ZIP file was created
1173 self.assertEqual(archive, base_name + '.zip')
1174 self.assertTrue(os.path.isfile(archive))
1175
1176 # now create another ZIP file using `zip`
1177 archive2 = os.path.join(root_dir, 'archive2.zip')
1178 zip_cmd = ['zip', '-q', '-r', 'archive2.zip', base_dir]
Serhiy Storchakab42de2f2015-11-21 14:09:26 +02001179 subprocess.check_call(zip_cmd, cwd=root_dir,
1180 stdout=subprocess.DEVNULL)
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001181
1182 self.assertTrue(os.path.isfile(archive2))
1183 # let's compare both ZIP files
1184 with zipfile.ZipFile(archive) as zf:
1185 names = zf.namelist()
1186 with zipfile.ZipFile(archive2) as zf:
1187 names2 = zf.namelist()
1188 self.assertEqual(sorted(names), sorted(names2))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001189
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001190 @support.requires_zlib
Serhiy Storchaka8bc792a2015-11-22 14:49:58 +02001191 @unittest.skipUnless(shutil.which('unzip'),
1192 'Need the unzip command to run')
1193 def test_unzip_zipfile(self):
1194 root_dir, base_dir = self._create_files()
1195 base_name = os.path.join(self.mkdtemp(), 'archive')
1196 archive = make_archive(base_name, 'zip', root_dir, base_dir)
1197
1198 # check if ZIP file was created
1199 self.assertEqual(archive, base_name + '.zip')
1200 self.assertTrue(os.path.isfile(archive))
1201
1202 # now check the ZIP file using `unzip -t`
1203 zip_cmd = ['unzip', '-t', archive]
1204 with support.change_cwd(root_dir):
1205 try:
1206 subprocess.check_output(zip_cmd, stderr=subprocess.STDOUT)
1207 except subprocess.CalledProcessError as exc:
1208 details = exc.output.decode(errors="replace")
Benjamin Petersona710ebd2018-09-13 10:08:46 -07001209 if 'unrecognized option: t' in details:
Benjamin Petersone78734d2018-09-13 10:57:23 -07001210 self.skipTest("unzip doesn't support -t")
Serhiy Storchaka8bc792a2015-11-22 14:49:58 +02001211 msg = "{}\n\n**Unzip Output**\n{}"
1212 self.fail(msg.format(exc, details))
1213
Tarek Ziadé396fad72010-02-23 05:30:31 +00001214 def test_make_archive(self):
1215 tmpdir = self.mkdtemp()
1216 base_name = os.path.join(tmpdir, 'archive')
1217 self.assertRaises(ValueError, make_archive, base_name, 'xxx')
1218
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001219 @support.requires_zlib
Tarek Ziadé396fad72010-02-23 05:30:31 +00001220 def test_make_archive_owner_group(self):
1221 # testing make_archive with owner and group, with various combinations
1222 # this works even if there's not gid/uid support
1223 if UID_GID_SUPPORT:
1224 group = grp.getgrgid(0)[0]
1225 owner = pwd.getpwuid(0)[0]
1226 else:
1227 group = owner = 'root'
1228
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001229 root_dir, base_dir = self._create_files()
1230 base_name = os.path.join(self.mkdtemp(), 'archive')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001231 res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner,
1232 group=group)
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001233 self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001234
1235 res = make_archive(base_name, 'zip', root_dir, base_dir)
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001236 self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001237
1238 res = make_archive(base_name, 'tar', root_dir, base_dir,
1239 owner=owner, group=group)
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001240 self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001241
1242 res = make_archive(base_name, 'tar', root_dir, base_dir,
1243 owner='kjhkjhkjg', group='oihohoh')
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001244 self.assertTrue(os.path.isfile(res))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001245
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001246
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001247 @support.requires_zlib
Tarek Ziadé396fad72010-02-23 05:30:31 +00001248 @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
1249 def test_tarfile_root_owner(self):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001250 root_dir, base_dir = self._create_files()
1251 base_name = os.path.join(self.mkdtemp(), 'archive')
Tarek Ziadé396fad72010-02-23 05:30:31 +00001252 group = grp.getgrgid(0)[0]
1253 owner = pwd.getpwuid(0)[0]
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001254 with support.change_cwd(root_dir):
1255 archive_name = make_archive(base_name, 'gztar', root_dir, 'dist',
1256 owner=owner, group=group)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001257
1258 # check if the compressed tarball was created
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001259 self.assertTrue(os.path.isfile(archive_name))
Tarek Ziadé396fad72010-02-23 05:30:31 +00001260
1261 # now checks the rights
1262 archive = tarfile.open(archive_name)
1263 try:
1264 for member in archive.getmembers():
Ezio Melottib3aedd42010-11-20 19:04:17 +00001265 self.assertEqual(member.uid, 0)
1266 self.assertEqual(member.gid, 0)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001267 finally:
1268 archive.close()
1269
1270 def test_make_archive_cwd(self):
1271 current_dir = os.getcwd()
1272 def _breaks(*args, **kw):
1273 raise RuntimeError()
1274
1275 register_archive_format('xxx', _breaks, [], 'xxx file')
1276 try:
1277 try:
1278 make_archive('xxx', 'xxx', root_dir=self.mkdtemp())
1279 except Exception:
1280 pass
Ezio Melottib3aedd42010-11-20 19:04:17 +00001281 self.assertEqual(os.getcwd(), current_dir)
Tarek Ziadé396fad72010-02-23 05:30:31 +00001282 finally:
1283 unregister_archive_format('xxx')
1284
Serhiy Storchaka9a4fc192014-11-28 00:48:46 +02001285 def test_make_tarfile_in_curdir(self):
1286 # Issue #21280
1287 root_dir = self.mkdtemp()
1288 with support.change_cwd(root_dir):
1289 self.assertEqual(make_archive('test', 'tar'), 'test.tar')
1290 self.assertTrue(os.path.isfile('test.tar'))
1291
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001292 @support.requires_zlib
Serhiy Storchaka9a4fc192014-11-28 00:48:46 +02001293 def test_make_zipfile_in_curdir(self):
1294 # Issue #21280
1295 root_dir = self.mkdtemp()
1296 with support.change_cwd(root_dir):
1297 self.assertEqual(make_archive('test', 'zip'), 'test.zip')
1298 self.assertTrue(os.path.isfile('test.zip'))
1299
Tarek Ziadé396fad72010-02-23 05:30:31 +00001300 def test_register_archive_format(self):
1301
1302 self.assertRaises(TypeError, register_archive_format, 'xxx', 1)
1303 self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
1304 1)
1305 self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
1306 [(1, 2), (1, 2, 3)])
1307
1308 register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file')
1309 formats = [name for name, params in get_archive_formats()]
1310 self.assertIn('xxx', formats)
1311
1312 unregister_archive_format('xxx')
1313 formats = [name for name, params in get_archive_formats()]
1314 self.assertNotIn('xxx', formats)
1315
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001316 def check_unpack_archive(self, format):
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001317 self.check_unpack_archive_with_converter(format, lambda path: path)
1318 self.check_unpack_archive_with_converter(format, pathlib.Path)
Serhiy Storchakab21d1552018-03-02 11:53:51 +02001319 self.check_unpack_archive_with_converter(format, FakePath)
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001320
1321 def check_unpack_archive_with_converter(self, format, converter):
Serhiy Storchaka527ef072015-09-06 18:33:19 +03001322 root_dir, base_dir = self._create_files()
Serhiy Storchaka2504cec2015-09-08 05:47:23 +03001323 expected = rlistdir(root_dir)
1324 expected.remove('outer')
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001325
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001326 base_name = os.path.join(self.mkdtemp(), 'archive')
1327 filename = make_archive(base_name, format, root_dir, base_dir)
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001328
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001329 # let's try to unpack it now
1330 tmpdir2 = self.mkdtemp()
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001331 unpack_archive(converter(filename), converter(tmpdir2))
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001332 self.assertEqual(rlistdir(tmpdir2), expected)
1333
1334 # and again, this time with the format specified
1335 tmpdir3 = self.mkdtemp()
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001336 unpack_archive(converter(filename), converter(tmpdir3), format=format)
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001337 self.assertEqual(rlistdir(tmpdir3), expected)
1338
Jelle Zijlstraa12df7b2017-05-05 14:27:12 -07001339 self.assertRaises(shutil.ReadError, unpack_archive, converter(TESTFN))
1340 self.assertRaises(ValueError, unpack_archive, converter(TESTFN), format='xxx')
Nick Coghlanabf202d2011-03-16 13:52:20 -04001341
Serhiy Storchaka20cdffd2016-12-16 18:58:33 +02001342 def test_unpack_archive_tar(self):
1343 self.check_unpack_archive('tar')
1344
1345 @support.requires_zlib
1346 def test_unpack_archive_gztar(self):
1347 self.check_unpack_archive('gztar')
1348
1349 @support.requires_bz2
1350 def test_unpack_archive_bztar(self):
1351 self.check_unpack_archive('bztar')
1352
1353 @support.requires_lzma
1354 def test_unpack_archive_xztar(self):
1355 self.check_unpack_archive('xztar')
1356
1357 @support.requires_zlib
1358 def test_unpack_archive_zip(self):
1359 self.check_unpack_archive('zip')
1360
Martin Pantereb995702016-07-28 01:11:04 +00001361 def test_unpack_registry(self):
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001362
1363 formats = get_unpack_formats()
1364
1365 def _boo(filename, extract_dir, extra):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001366 self.assertEqual(extra, 1)
1367 self.assertEqual(filename, 'stuff.boo')
1368 self.assertEqual(extract_dir, 'xx')
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001369
1370 register_unpack_format('Boo', ['.boo', '.b2'], _boo, [('extra', 1)])
1371 unpack_archive('stuff.boo', 'xx')
1372
1373 # trying to register a .boo unpacker again
1374 self.assertRaises(RegistryError, register_unpack_format, 'Boo2',
1375 ['.boo'], _boo)
1376
1377 # should work now
1378 unregister_unpack_format('Boo')
1379 register_unpack_format('Boo2', ['.boo'], _boo)
1380 self.assertIn(('Boo2', ['.boo'], ''), get_unpack_formats())
1381 self.assertNotIn(('Boo', ['.boo'], ''), get_unpack_formats())
1382
1383 # let's leave a clean state
1384 unregister_unpack_format('Boo2')
Ezio Melottib3aedd42010-11-20 19:04:17 +00001385 self.assertEqual(get_unpack_formats(), formats)
Tarek Ziadé6ac91722010-04-28 17:51:36 +00001386
Giampaolo Rodola'210e7ca2011-07-01 13:55:36 +02001387 @unittest.skipUnless(hasattr(shutil, 'disk_usage'),
1388 "disk_usage not available on this platform")
1389 def test_disk_usage(self):
Gregory P. Smith529746c2017-07-06 17:11:27 -07001390 usage = shutil.disk_usage(os.path.dirname(__file__))
Victor Stinnerdc525f42018-12-11 12:05:21 +01001391 for attr in ('total', 'used', 'free'):
1392 self.assertIsInstance(getattr(usage, attr), int)
Éric Araujo2ee61882011-07-02 16:45:45 +02001393 self.assertGreater(usage.total, 0)
1394 self.assertGreater(usage.used, 0)
1395 self.assertGreaterEqual(usage.free, 0)
1396 self.assertGreaterEqual(usage.total, usage.used)
1397 self.assertGreater(usage.total, usage.free)
Giampaolo Rodola'210e7ca2011-07-01 13:55:36 +02001398
Victor Stinnerdc525f42018-12-11 12:05:21 +01001399 # bpo-32557: Check that disk_usage() also accepts a filename
1400 shutil.disk_usage(__file__)
1401
Sandro Tosid902a142011-08-22 23:28:27 +02001402 @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
1403 @unittest.skipUnless(hasattr(os, 'chown'), 'requires os.chown')
1404 def test_chown(self):
1405
1406 # cleaned-up automatically by TestShutil.tearDown method
1407 dirname = self.mkdtemp()
1408 filename = tempfile.mktemp(dir=dirname)
1409 write_file(filename, 'testing chown function')
1410
1411 with self.assertRaises(ValueError):
1412 shutil.chown(filename)
1413
1414 with self.assertRaises(LookupError):
Raymond Hettinger15f44ab2016-08-30 10:47:49 -07001415 shutil.chown(filename, user='non-existing username')
Sandro Tosid902a142011-08-22 23:28:27 +02001416
1417 with self.assertRaises(LookupError):
Raymond Hettinger15f44ab2016-08-30 10:47:49 -07001418 shutil.chown(filename, group='non-existing groupname')
Sandro Tosid902a142011-08-22 23:28:27 +02001419
1420 with self.assertRaises(TypeError):
1421 shutil.chown(filename, b'spam')
1422
1423 with self.assertRaises(TypeError):
1424 shutil.chown(filename, 3.14)
1425
1426 uid = os.getuid()
1427 gid = os.getgid()
1428
1429 def check_chown(path, uid=None, gid=None):
1430 s = os.stat(filename)
1431 if uid is not None:
1432 self.assertEqual(uid, s.st_uid)
1433 if gid is not None:
1434 self.assertEqual(gid, s.st_gid)
1435
1436 shutil.chown(filename, uid, gid)
1437 check_chown(filename, uid, gid)
1438 shutil.chown(filename, uid)
1439 check_chown(filename, uid)
1440 shutil.chown(filename, user=uid)
1441 check_chown(filename, uid)
1442 shutil.chown(filename, group=gid)
Sandro Tosi91f948a2011-08-22 23:55:39 +02001443 check_chown(filename, gid=gid)
Sandro Tosid902a142011-08-22 23:28:27 +02001444
1445 shutil.chown(dirname, uid, gid)
1446 check_chown(dirname, uid, gid)
1447 shutil.chown(dirname, uid)
1448 check_chown(dirname, uid)
1449 shutil.chown(dirname, user=uid)
1450 check_chown(dirname, uid)
1451 shutil.chown(dirname, group=gid)
Sandro Tosi91f948a2011-08-22 23:55:39 +02001452 check_chown(dirname, gid=gid)
Sandro Tosid902a142011-08-22 23:28:27 +02001453
1454 user = pwd.getpwuid(uid)[0]
1455 group = grp.getgrgid(gid)[0]
1456 shutil.chown(filename, user, group)
1457 check_chown(filename, uid, gid)
1458 shutil.chown(dirname, user, group)
1459 check_chown(dirname, uid, gid)
1460
Brian Curtin0d0a1de2012-06-18 18:41:07 -05001461 def test_copy_return_value(self):
1462 # copy and copy2 both return their destination path.
1463 for fn in (shutil.copy, shutil.copy2):
1464 src_dir = self.mkdtemp()
1465 dst_dir = self.mkdtemp()
1466 src = os.path.join(src_dir, 'foo')
1467 write_file(src, 'foo')
1468 rv = fn(src, dst_dir)
1469 self.assertEqual(rv, os.path.join(dst_dir, 'foo'))
1470 rv = fn(src, os.path.join(dst_dir, 'bar'))
1471 self.assertEqual(rv, os.path.join(dst_dir, 'bar'))
1472
1473 def test_copyfile_return_value(self):
1474 # copytree returns its destination path.
1475 src_dir = self.mkdtemp()
1476 dst_dir = self.mkdtemp()
1477 dst_file = os.path.join(dst_dir, 'bar')
1478 src_file = os.path.join(src_dir, 'foo')
1479 write_file(src_file, 'foo')
1480 rv = shutil.copyfile(src_file, dst_file)
1481 self.assertTrue(os.path.exists(rv))
1482 self.assertEqual(read_file(src_file), read_file(dst_file))
1483
Hynek Schlawack48653762012-10-07 12:49:58 +02001484 def test_copyfile_same_file(self):
1485 # copyfile() should raise SameFileError if the source and destination
1486 # are the same.
1487 src_dir = self.mkdtemp()
1488 src_file = os.path.join(src_dir, 'foo')
1489 write_file(src_file, 'foo')
1490 self.assertRaises(SameFileError, shutil.copyfile, src_file, src_file)
Hynek Schlawack27ddb572012-10-28 13:59:27 +01001491 # But Error should work too, to stay backward compatible.
1492 self.assertRaises(Error, shutil.copyfile, src_file, src_file)
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02001493 # Make sure file is not corrupted.
1494 self.assertEqual(read_file(src_file), 'foo')
Hynek Schlawack48653762012-10-07 12:49:58 +02001495
Brian Curtin0d0a1de2012-06-18 18:41:07 -05001496 def test_copytree_return_value(self):
1497 # copytree returns its destination path.
1498 src_dir = self.mkdtemp()
1499 dst_dir = src_dir + "dest"
Larry Hastings5b2f9c02012-06-25 23:50:01 -07001500 self.addCleanup(shutil.rmtree, dst_dir, True)
Brian Curtin0d0a1de2012-06-18 18:41:07 -05001501 src = os.path.join(src_dir, 'foo')
1502 write_file(src, 'foo')
1503 rv = shutil.copytree(src_dir, dst_dir)
1504 self.assertEqual(['foo'], os.listdir(rv))
1505
Christian Heimes9bd667a2008-01-20 15:14:11 +00001506
Brian Curtinc57a3452012-06-22 16:00:30 -05001507class TestWhich(unittest.TestCase):
1508
1509 def setUp(self):
Serhiy Storchaka014791f2013-01-21 15:00:27 +02001510 self.temp_dir = tempfile.mkdtemp(prefix="Tmp")
Larry Hastings5b2f9c02012-06-25 23:50:01 -07001511 self.addCleanup(shutil.rmtree, self.temp_dir, True)
Brian Curtinc57a3452012-06-22 16:00:30 -05001512 # Give the temp_file an ".exe" suffix for all.
1513 # It's needed on Windows and not harmful on other platforms.
1514 self.temp_file = tempfile.NamedTemporaryFile(dir=self.temp_dir,
Serhiy Storchaka014791f2013-01-21 15:00:27 +02001515 prefix="Tmp",
1516 suffix=".Exe")
Brian Curtinc57a3452012-06-22 16:00:30 -05001517 os.chmod(self.temp_file.name, stat.S_IXUSR)
1518 self.addCleanup(self.temp_file.close)
1519 self.dir, self.file = os.path.split(self.temp_file.name)
1520
1521 def test_basic(self):
1522 # Given an EXE in a directory, it should be returned.
1523 rv = shutil.which(self.file, path=self.dir)
1524 self.assertEqual(rv, self.temp_file.name)
1525
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001526 def test_absolute_cmd(self):
Brian Curtinc57a3452012-06-22 16:00:30 -05001527 # When given the fully qualified path to an executable that exists,
1528 # it should be returned.
1529 rv = shutil.which(self.temp_file.name, path=self.temp_dir)
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001530 self.assertEqual(rv, self.temp_file.name)
1531
1532 def test_relative_cmd(self):
1533 # When given the relative path with a directory part to an executable
1534 # that exists, it should be returned.
1535 base_dir, tail_dir = os.path.split(self.dir)
1536 relpath = os.path.join(tail_dir, self.file)
Nick Coghlan55175962013-07-28 22:11:50 +10001537 with support.change_cwd(path=base_dir):
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001538 rv = shutil.which(relpath, path=self.temp_dir)
1539 self.assertEqual(rv, relpath)
1540 # But it shouldn't be searched in PATH directories (issue #16957).
Nick Coghlan55175962013-07-28 22:11:50 +10001541 with support.change_cwd(path=self.dir):
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001542 rv = shutil.which(relpath, path=base_dir)
1543 self.assertIsNone(rv)
1544
1545 def test_cwd(self):
1546 # Issue #16957
1547 base_dir = os.path.dirname(self.dir)
Nick Coghlan55175962013-07-28 22:11:50 +10001548 with support.change_cwd(path=self.dir):
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001549 rv = shutil.which(self.file, path=base_dir)
1550 if sys.platform == "win32":
1551 # Windows: current directory implicitly on PATH
1552 self.assertEqual(rv, os.path.join(os.curdir, self.file))
1553 else:
1554 # Other platforms: shouldn't match in the current directory.
1555 self.assertIsNone(rv)
Brian Curtinc57a3452012-06-22 16:00:30 -05001556
Serhiy Storchaka12516e22013-05-28 15:50:15 +03001557 @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
1558 'non-root user required')
Brian Curtinc57a3452012-06-22 16:00:30 -05001559 def test_non_matching_mode(self):
1560 # Set the file read-only and ask for writeable files.
1561 os.chmod(self.temp_file.name, stat.S_IREAD)
Serhiy Storchaka12516e22013-05-28 15:50:15 +03001562 if os.access(self.temp_file.name, os.W_OK):
1563 self.skipTest("can't set the file read-only")
Brian Curtinc57a3452012-06-22 16:00:30 -05001564 rv = shutil.which(self.file, path=self.dir, mode=os.W_OK)
1565 self.assertIsNone(rv)
1566
Serhiy Storchaka8bea2002013-01-23 10:44:21 +02001567 def test_relative_path(self):
Antoine Pitrou07c24d12012-06-22 23:33:05 +02001568 base_dir, tail_dir = os.path.split(self.dir)
Nick Coghlan55175962013-07-28 22:11:50 +10001569 with support.change_cwd(path=base_dir):
Antoine Pitrou07c24d12012-06-22 23:33:05 +02001570 rv = shutil.which(self.file, path=tail_dir)
1571 self.assertEqual(rv, os.path.join(tail_dir, self.file))
Antoine Pitrou07c24d12012-06-22 23:33:05 +02001572
Brian Curtinc57a3452012-06-22 16:00:30 -05001573 def test_nonexistent_file(self):
1574 # Return None when no matching executable file is found on the path.
1575 rv = shutil.which("foo.exe", path=self.dir)
1576 self.assertIsNone(rv)
1577
1578 @unittest.skipUnless(sys.platform == "win32",
1579 "pathext check is Windows-only")
1580 def test_pathext_checking(self):
1581 # Ask for the file without the ".exe" extension, then ensure that
1582 # it gets found properly with the extension.
Serhiy Storchakad70127a2013-01-24 20:03:49 +02001583 rv = shutil.which(self.file[:-4], path=self.dir)
Serhiy Storchaka80c88f42013-01-22 10:31:36 +02001584 self.assertEqual(rv, self.temp_file.name[:-4] + ".EXE")
Brian Curtinc57a3452012-06-22 16:00:30 -05001585
Barry Warsaw618738b2013-04-16 11:05:03 -04001586 def test_environ_path(self):
1587 with support.EnvironmentVarGuard() as env:
Victor Stinner1d006a22013-12-16 23:39:40 +01001588 env['PATH'] = self.dir
Barry Warsaw618738b2013-04-16 11:05:03 -04001589 rv = shutil.which(self.file)
1590 self.assertEqual(rv, self.temp_file.name)
1591
1592 def test_empty_path(self):
1593 base_dir = os.path.dirname(self.dir)
Nick Coghlan55175962013-07-28 22:11:50 +10001594 with support.change_cwd(path=self.dir), \
Barry Warsaw618738b2013-04-16 11:05:03 -04001595 support.EnvironmentVarGuard() as env:
Victor Stinner1d006a22013-12-16 23:39:40 +01001596 env['PATH'] = self.dir
Barry Warsaw618738b2013-04-16 11:05:03 -04001597 rv = shutil.which(self.file, path='')
1598 self.assertIsNone(rv)
1599
1600 def test_empty_path_no_PATH(self):
1601 with support.EnvironmentVarGuard() as env:
1602 env.pop('PATH', None)
1603 rv = shutil.which(self.file)
1604 self.assertIsNone(rv)
1605
Brian Curtinc57a3452012-06-22 16:00:30 -05001606
Christian Heimesada8c3b2008-03-18 18:26:33 +00001607class TestMove(unittest.TestCase):
1608
1609 def setUp(self):
1610 filename = "foo"
1611 self.src_dir = tempfile.mkdtemp()
1612 self.dst_dir = tempfile.mkdtemp()
1613 self.src_file = os.path.join(self.src_dir, filename)
1614 self.dst_file = os.path.join(self.dst_dir, filename)
Christian Heimesada8c3b2008-03-18 18:26:33 +00001615 with open(self.src_file, "wb") as f:
1616 f.write(b"spam")
1617
1618 def tearDown(self):
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001619 for d in (self.src_dir, self.dst_dir):
Christian Heimesada8c3b2008-03-18 18:26:33 +00001620 try:
1621 if d:
1622 shutil.rmtree(d)
1623 except:
1624 pass
1625
1626 def _check_move_file(self, src, dst, real_dst):
Antoine Pitrou92f60ed2010-10-14 22:11:44 +00001627 with open(src, "rb") as f:
1628 contents = f.read()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001629 shutil.move(src, dst)
Antoine Pitrou92f60ed2010-10-14 22:11:44 +00001630 with open(real_dst, "rb") as f:
1631 self.assertEqual(contents, f.read())
Christian Heimesada8c3b2008-03-18 18:26:33 +00001632 self.assertFalse(os.path.exists(src))
1633
1634 def _check_move_dir(self, src, dst, real_dst):
1635 contents = sorted(os.listdir(src))
1636 shutil.move(src, dst)
1637 self.assertEqual(contents, sorted(os.listdir(real_dst)))
1638 self.assertFalse(os.path.exists(src))
1639
1640 def test_move_file(self):
1641 # Move a file to another location on the same filesystem.
1642 self._check_move_file(self.src_file, self.dst_file, self.dst_file)
1643
1644 def test_move_file_to_dir(self):
1645 # Move a file inside an existing dir on the same filesystem.
1646 self._check_move_file(self.src_file, self.dst_dir, self.dst_file)
1647
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001648 @mock_rename
Christian Heimesada8c3b2008-03-18 18:26:33 +00001649 def test_move_file_other_fs(self):
1650 # Move a file to an existing dir on another filesystem.
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001651 self.test_move_file()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001652
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001653 @mock_rename
Christian Heimesada8c3b2008-03-18 18:26:33 +00001654 def test_move_file_to_dir_other_fs(self):
1655 # Move a file to another location on another filesystem.
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001656 self.test_move_file_to_dir()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001657
1658 def test_move_dir(self):
1659 # Move a dir to another location on the same filesystem.
1660 dst_dir = tempfile.mktemp()
1661 try:
1662 self._check_move_dir(self.src_dir, dst_dir, dst_dir)
1663 finally:
1664 try:
1665 shutil.rmtree(dst_dir)
1666 except:
1667 pass
1668
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001669 @mock_rename
Christian Heimesada8c3b2008-03-18 18:26:33 +00001670 def test_move_dir_other_fs(self):
1671 # Move a dir to another location on another filesystem.
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001672 self.test_move_dir()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001673
1674 def test_move_dir_to_dir(self):
1675 # Move a dir inside an existing dir on the same filesystem.
1676 self._check_move_dir(self.src_dir, self.dst_dir,
1677 os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
1678
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001679 @mock_rename
Christian Heimesada8c3b2008-03-18 18:26:33 +00001680 def test_move_dir_to_dir_other_fs(self):
1681 # Move a dir inside an existing dir on another filesystem.
Nick Coghlan8ed3cf32011-03-16 14:05:35 -04001682 self.test_move_dir_to_dir()
Christian Heimesada8c3b2008-03-18 18:26:33 +00001683
Serhiy Storchaka3a308b92014-02-11 10:30:59 +02001684 def test_move_dir_sep_to_dir(self):
1685 self._check_move_dir(self.src_dir + os.path.sep, self.dst_dir,
1686 os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
1687
1688 @unittest.skipUnless(os.path.altsep, 'requires os.path.altsep')
1689 def test_move_dir_altsep_to_dir(self):
1690 self._check_move_dir(self.src_dir + os.path.altsep, self.dst_dir,
1691 os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
1692
Christian Heimesada8c3b2008-03-18 18:26:33 +00001693 def test_existing_file_inside_dest_dir(self):
1694 # A file with the same name inside the destination dir already exists.
1695 with open(self.dst_file, "wb"):
1696 pass
1697 self.assertRaises(shutil.Error, shutil.move, self.src_file, self.dst_dir)
1698
1699 def test_dont_move_dir_in_itself(self):
1700 # Moving a dir inside itself raises an Error.
1701 dst = os.path.join(self.src_dir, "bar")
1702 self.assertRaises(shutil.Error, shutil.move, self.src_dir, dst)
1703
Antoine Pitrou0dcc3cd2009-01-29 20:26:59 +00001704 def test_destinsrc_false_negative(self):
1705 os.mkdir(TESTFN)
1706 try:
1707 for src, dst in [('srcdir', 'srcdir/dest')]:
1708 src = os.path.join(TESTFN, src)
1709 dst = os.path.join(TESTFN, dst)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001710 self.assertTrue(shutil._destinsrc(src, dst),
Benjamin Peterson247a9b82009-02-20 04:09:19 +00001711 msg='_destinsrc() wrongly concluded that '
Antoine Pitrou0dcc3cd2009-01-29 20:26:59 +00001712 'dst (%s) is not in src (%s)' % (dst, src))
1713 finally:
1714 shutil.rmtree(TESTFN, ignore_errors=True)
Christian Heimesada8c3b2008-03-18 18:26:33 +00001715
Antoine Pitrou0dcc3cd2009-01-29 20:26:59 +00001716 def test_destinsrc_false_positive(self):
1717 os.mkdir(TESTFN)
1718 try:
1719 for src, dst in [('srcdir', 'src/dest'), ('srcdir', 'srcdir.new')]:
1720 src = os.path.join(TESTFN, src)
1721 dst = os.path.join(TESTFN, dst)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001722 self.assertFalse(shutil._destinsrc(src, dst),
Benjamin Peterson247a9b82009-02-20 04:09:19 +00001723 msg='_destinsrc() wrongly concluded that '
Antoine Pitrou0dcc3cd2009-01-29 20:26:59 +00001724 'dst (%s) is in src (%s)' % (dst, src))
1725 finally:
1726 shutil.rmtree(TESTFN, ignore_errors=True)
Christian Heimes9bd667a2008-01-20 15:14:11 +00001727
Antoine Pitrou0a08d7a2012-01-06 20:16:19 +01001728 @support.skip_unless_symlink
1729 @mock_rename
1730 def test_move_file_symlink(self):
1731 dst = os.path.join(self.src_dir, 'bar')
1732 os.symlink(self.src_file, dst)
1733 shutil.move(dst, self.dst_file)
1734 self.assertTrue(os.path.islink(self.dst_file))
1735 self.assertTrue(os.path.samefile(self.src_file, self.dst_file))
1736
1737 @support.skip_unless_symlink
1738 @mock_rename
1739 def test_move_file_symlink_to_dir(self):
1740 filename = "bar"
1741 dst = os.path.join(self.src_dir, filename)
1742 os.symlink(self.src_file, dst)
1743 shutil.move(dst, self.dst_dir)
1744 final_link = os.path.join(self.dst_dir, filename)
1745 self.assertTrue(os.path.islink(final_link))
1746 self.assertTrue(os.path.samefile(self.src_file, final_link))
1747
1748 @support.skip_unless_symlink
1749 @mock_rename
1750 def test_move_dangling_symlink(self):
1751 src = os.path.join(self.src_dir, 'baz')
1752 dst = os.path.join(self.src_dir, 'bar')
1753 os.symlink(src, dst)
1754 dst_link = os.path.join(self.dst_dir, 'quux')
1755 shutil.move(dst, dst_link)
1756 self.assertTrue(os.path.islink(dst_link))
Antoine Pitrou3f48ac92014-01-01 02:50:45 +01001757 # On Windows, os.path.realpath does not follow symlinks (issue #9949)
1758 if os.name == 'nt':
1759 self.assertEqual(os.path.realpath(src), os.readlink(dst_link))
1760 else:
1761 self.assertEqual(os.path.realpath(src), os.path.realpath(dst_link))
Antoine Pitrou0a08d7a2012-01-06 20:16:19 +01001762
1763 @support.skip_unless_symlink
1764 @mock_rename
1765 def test_move_dir_symlink(self):
1766 src = os.path.join(self.src_dir, 'baz')
1767 dst = os.path.join(self.src_dir, 'bar')
1768 os.mkdir(src)
1769 os.symlink(src, dst)
1770 dst_link = os.path.join(self.dst_dir, 'quux')
1771 shutil.move(dst, dst_link)
1772 self.assertTrue(os.path.islink(dst_link))
1773 self.assertTrue(os.path.samefile(src, dst_link))
1774
Brian Curtin0d0a1de2012-06-18 18:41:07 -05001775 def test_move_return_value(self):
1776 rv = shutil.move(self.src_file, self.dst_dir)
1777 self.assertEqual(rv,
1778 os.path.join(self.dst_dir, os.path.basename(self.src_file)))
1779
1780 def test_move_as_rename_return_value(self):
1781 rv = shutil.move(self.src_file, os.path.join(self.dst_dir, 'bar'))
1782 self.assertEqual(rv, os.path.join(self.dst_dir, 'bar'))
1783
R David Murray6ffface2014-06-11 14:40:13 -04001784 @mock_rename
1785 def test_move_file_special_function(self):
1786 moved = []
1787 def _copy(src, dst):
1788 moved.append((src, dst))
1789 shutil.move(self.src_file, self.dst_dir, copy_function=_copy)
1790 self.assertEqual(len(moved), 1)
1791
1792 @mock_rename
1793 def test_move_dir_special_function(self):
1794 moved = []
1795 def _copy(src, dst):
1796 moved.append((src, dst))
1797 support.create_empty_file(os.path.join(self.src_dir, 'child'))
1798 support.create_empty_file(os.path.join(self.src_dir, 'child1'))
1799 shutil.move(self.src_dir, self.dst_dir, copy_function=_copy)
1800 self.assertEqual(len(moved), 3)
1801
Tarek Ziadé5340db32010-04-19 22:30:51 +00001802
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001803class TestCopyFile(unittest.TestCase):
1804
1805 _delete = False
1806
1807 class Faux(object):
1808 _entered = False
1809 _exited_with = None
1810 _raised = False
1811 def __init__(self, raise_in_exit=False, suppress_at_exit=True):
1812 self._raise_in_exit = raise_in_exit
1813 self._suppress_at_exit = suppress_at_exit
1814 def read(self, *args):
1815 return ''
1816 def __enter__(self):
1817 self._entered = True
1818 def __exit__(self, exc_type, exc_val, exc_tb):
1819 self._exited_with = exc_type, exc_val, exc_tb
1820 if self._raise_in_exit:
1821 self._raised = True
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001822 raise OSError("Cannot close")
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001823 return self._suppress_at_exit
1824
1825 def tearDown(self):
1826 if self._delete:
1827 del shutil.open
1828
1829 def _set_shutil_open(self, func):
1830 shutil.open = func
1831 self._delete = True
1832
1833 def test_w_source_open_fails(self):
1834 def _open(filename, mode='r'):
1835 if filename == 'srcfile':
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001836 raise OSError('Cannot open "srcfile"')
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001837 assert 0 # shouldn't reach here.
1838
1839 self._set_shutil_open(_open)
1840
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001841 self.assertRaises(OSError, shutil.copyfile, 'srcfile', 'destfile')
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001842
Victor Stinner937ee9e2018-06-26 02:11:06 +02001843 @unittest.skipIf(MACOS, "skipped on macOS")
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001844 def test_w_dest_open_fails(self):
1845
1846 srcfile = self.Faux()
1847
1848 def _open(filename, mode='r'):
1849 if filename == 'srcfile':
1850 return srcfile
1851 if filename == 'destfile':
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001852 raise OSError('Cannot open "destfile"')
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001853 assert 0 # shouldn't reach here.
1854
1855 self._set_shutil_open(_open)
1856
1857 shutil.copyfile('srcfile', 'destfile')
1858 self.assertTrue(srcfile._entered)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001859 self.assertTrue(srcfile._exited_with[0] is OSError)
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001860 self.assertEqual(srcfile._exited_with[1].args,
1861 ('Cannot open "destfile"',))
1862
Victor Stinner937ee9e2018-06-26 02:11:06 +02001863 @unittest.skipIf(MACOS, "skipped on macOS")
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001864 def test_w_dest_close_fails(self):
1865
1866 srcfile = self.Faux()
1867 destfile = self.Faux(True)
1868
1869 def _open(filename, mode='r'):
1870 if filename == 'srcfile':
1871 return srcfile
1872 if filename == 'destfile':
1873 return destfile
1874 assert 0 # shouldn't reach here.
1875
1876 self._set_shutil_open(_open)
1877
1878 shutil.copyfile('srcfile', 'destfile')
1879 self.assertTrue(srcfile._entered)
1880 self.assertTrue(destfile._entered)
1881 self.assertTrue(destfile._raised)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001882 self.assertTrue(srcfile._exited_with[0] is OSError)
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001883 self.assertEqual(srcfile._exited_with[1].args,
1884 ('Cannot close',))
1885
Victor Stinner937ee9e2018-06-26 02:11:06 +02001886 @unittest.skipIf(MACOS, "skipped on macOS")
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001887 def test_w_source_close_fails(self):
1888
1889 srcfile = self.Faux(True)
1890 destfile = self.Faux()
1891
1892 def _open(filename, mode='r'):
1893 if filename == 'srcfile':
1894 return srcfile
1895 if filename == 'destfile':
1896 return destfile
1897 assert 0 # shouldn't reach here.
1898
1899 self._set_shutil_open(_open)
1900
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001901 self.assertRaises(OSError,
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00001902 shutil.copyfile, 'srcfile', 'destfile')
1903 self.assertTrue(srcfile._entered)
1904 self.assertTrue(destfile._entered)
1905 self.assertFalse(destfile._raised)
1906 self.assertTrue(srcfile._exited_with[0] is None)
1907 self.assertTrue(srcfile._raised)
1908
Ronald Oussorenf51738b2011-05-06 10:23:04 +02001909 def test_move_dir_caseinsensitive(self):
1910 # Renames a folder to the same name
1911 # but a different case.
1912
1913 self.src_dir = tempfile.mkdtemp()
Larry Hastings5b2f9c02012-06-25 23:50:01 -07001914 self.addCleanup(shutil.rmtree, self.src_dir, True)
Ronald Oussorenf51738b2011-05-06 10:23:04 +02001915 dst_dir = os.path.join(
1916 os.path.dirname(self.src_dir),
1917 os.path.basename(self.src_dir).upper())
1918 self.assertNotEqual(self.src_dir, dst_dir)
1919
1920 try:
1921 shutil.move(self.src_dir, dst_dir)
1922 self.assertTrue(os.path.isdir(dst_dir))
1923 finally:
Éric Araujoa7e33a12011-08-12 19:51:35 +02001924 os.rmdir(dst_dir)
Ronald Oussorenf51738b2011-05-06 10:23:04 +02001925
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02001926
Giampaolo Rodolac7f02a92018-06-19 08:27:29 -07001927class TestCopyFileObj(unittest.TestCase):
1928 FILESIZE = 2 * 1024 * 1024
1929
1930 @classmethod
1931 def setUpClass(cls):
1932 write_test_file(TESTFN, cls.FILESIZE)
1933
1934 @classmethod
1935 def tearDownClass(cls):
1936 support.unlink(TESTFN)
1937 support.unlink(TESTFN2)
1938
1939 def tearDown(self):
1940 support.unlink(TESTFN2)
1941
1942 @contextlib.contextmanager
1943 def get_files(self):
1944 with open(TESTFN, "rb") as src:
1945 with open(TESTFN2, "wb") as dst:
1946 yield (src, dst)
1947
1948 def assert_files_eq(self, src, dst):
1949 with open(src, 'rb') as fsrc:
1950 with open(dst, 'rb') as fdst:
1951 self.assertEqual(fsrc.read(), fdst.read())
1952
1953 def test_content(self):
1954 with self.get_files() as (src, dst):
1955 shutil.copyfileobj(src, dst)
1956 self.assert_files_eq(TESTFN, TESTFN2)
1957
1958 def test_file_not_closed(self):
1959 with self.get_files() as (src, dst):
1960 shutil.copyfileobj(src, dst)
1961 assert not src.closed
1962 assert not dst.closed
1963
1964 def test_file_offset(self):
1965 with self.get_files() as (src, dst):
1966 shutil.copyfileobj(src, dst)
1967 self.assertEqual(src.tell(), self.FILESIZE)
1968 self.assertEqual(dst.tell(), self.FILESIZE)
1969
1970 @unittest.skipIf(os.name != 'nt', "Windows only")
1971 def test_win_impl(self):
1972 # Make sure alternate Windows implementation is called.
1973 with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
1974 shutil.copyfile(TESTFN, TESTFN2)
1975 assert m.called
1976
1977 # File size is 2 MiB but max buf size should be 1 MiB.
1978 self.assertEqual(m.call_args[0][2], 1 * 1024 * 1024)
1979
1980 # If file size < 1 MiB memoryview() length must be equal to
1981 # the actual file size.
1982 with tempfile.NamedTemporaryFile(delete=False) as f:
1983 f.write(b'foo')
1984 fname = f.name
1985 self.addCleanup(support.unlink, fname)
1986 with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
1987 shutil.copyfile(fname, TESTFN2)
1988 self.assertEqual(m.call_args[0][2], 3)
1989
1990 # Empty files should not rely on readinto() variant.
1991 with tempfile.NamedTemporaryFile(delete=False) as f:
1992 pass
1993 fname = f.name
1994 self.addCleanup(support.unlink, fname)
1995 with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
1996 shutil.copyfile(fname, TESTFN2)
1997 assert not m.called
1998 self.assert_files_eq(fname, TESTFN2)
1999
2000
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002001class _ZeroCopyFileTest(object):
2002 """Tests common to all zero-copy APIs."""
2003 FILESIZE = (10 * 1024 * 1024) # 10 MiB
2004 FILEDATA = b""
2005 PATCHPOINT = ""
2006
2007 @classmethod
2008 def setUpClass(cls):
2009 write_test_file(TESTFN, cls.FILESIZE)
2010 with open(TESTFN, 'rb') as f:
2011 cls.FILEDATA = f.read()
2012 assert len(cls.FILEDATA) == cls.FILESIZE
2013
2014 @classmethod
2015 def tearDownClass(cls):
2016 support.unlink(TESTFN)
2017
2018 def tearDown(self):
2019 support.unlink(TESTFN2)
2020
2021 @contextlib.contextmanager
2022 def get_files(self):
2023 with open(TESTFN, "rb") as src:
2024 with open(TESTFN2, "wb") as dst:
2025 yield (src, dst)
2026
2027 def zerocopy_fun(self, *args, **kwargs):
2028 raise NotImplementedError("must be implemented in subclass")
2029
2030 def reset(self):
2031 self.tearDown()
2032 self.tearDownClass()
2033 self.setUpClass()
2034 self.setUp()
2035
2036 # ---
2037
2038 def test_regular_copy(self):
2039 with self.get_files() as (src, dst):
2040 self.zerocopy_fun(src, dst)
2041 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2042 # Make sure the fallback function is not called.
2043 with self.get_files() as (src, dst):
2044 with unittest.mock.patch('shutil.copyfileobj') as m:
2045 shutil.copyfile(TESTFN, TESTFN2)
2046 assert not m.called
2047
2048 def test_same_file(self):
2049 self.addCleanup(self.reset)
2050 with self.get_files() as (src, dst):
2051 with self.assertRaises(Exception):
2052 self.zerocopy_fun(src, src)
2053 # Make sure src file is not corrupted.
2054 self.assertEqual(read_file(TESTFN, binary=True), self.FILEDATA)
2055
2056 def test_non_existent_src(self):
2057 name = tempfile.mktemp()
2058 with self.assertRaises(FileNotFoundError) as cm:
2059 shutil.copyfile(name, "new")
2060 self.assertEqual(cm.exception.filename, name)
2061
2062 def test_empty_file(self):
2063 srcname = TESTFN + 'src'
2064 dstname = TESTFN + 'dst'
2065 self.addCleanup(lambda: support.unlink(srcname))
2066 self.addCleanup(lambda: support.unlink(dstname))
2067 with open(srcname, "wb"):
2068 pass
2069
2070 with open(srcname, "rb") as src:
2071 with open(dstname, "wb") as dst:
2072 self.zerocopy_fun(src, dst)
2073
2074 self.assertEqual(read_file(dstname, binary=True), b"")
2075
2076 def test_unhandled_exception(self):
2077 with unittest.mock.patch(self.PATCHPOINT,
2078 side_effect=ZeroDivisionError):
2079 self.assertRaises(ZeroDivisionError,
2080 shutil.copyfile, TESTFN, TESTFN2)
2081
2082 def test_exception_on_first_call(self):
2083 # Emulate a case where the first call to the zero-copy
2084 # function raises an exception in which case the function is
2085 # supposed to give up immediately.
2086 with unittest.mock.patch(self.PATCHPOINT,
2087 side_effect=OSError(errno.EINVAL, "yo")):
2088 with self.get_files() as (src, dst):
2089 with self.assertRaises(_GiveupOnFastCopy):
2090 self.zerocopy_fun(src, dst)
2091
2092 def test_filesystem_full(self):
2093 # Emulate a case where filesystem is full and sendfile() fails
2094 # on first call.
2095 with unittest.mock.patch(self.PATCHPOINT,
2096 side_effect=OSError(errno.ENOSPC, "yo")):
2097 with self.get_files() as (src, dst):
2098 self.assertRaises(OSError, self.zerocopy_fun, src, dst)
2099
2100
2101@unittest.skipIf(not SUPPORTS_SENDFILE, 'os.sendfile() not supported')
2102class TestZeroCopySendfile(_ZeroCopyFileTest, unittest.TestCase):
2103 PATCHPOINT = "os.sendfile"
2104
2105 def zerocopy_fun(self, fsrc, fdst):
2106 return shutil._fastcopy_sendfile(fsrc, fdst)
2107
2108 def test_non_regular_file_src(self):
2109 with io.BytesIO(self.FILEDATA) as src:
2110 with open(TESTFN2, "wb") as dst:
2111 with self.assertRaises(_GiveupOnFastCopy):
2112 self.zerocopy_fun(src, dst)
2113 shutil.copyfileobj(src, dst)
2114
2115 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2116
2117 def test_non_regular_file_dst(self):
2118 with open(TESTFN, "rb") as src:
2119 with io.BytesIO() as dst:
2120 with self.assertRaises(_GiveupOnFastCopy):
2121 self.zerocopy_fun(src, dst)
2122 shutil.copyfileobj(src, dst)
2123 dst.seek(0)
2124 self.assertEqual(dst.read(), self.FILEDATA)
2125
2126 def test_exception_on_second_call(self):
2127 def sendfile(*args, **kwargs):
2128 if not flag:
2129 flag.append(None)
2130 return orig_sendfile(*args, **kwargs)
2131 else:
2132 raise OSError(errno.EBADF, "yo")
2133
2134 flag = []
2135 orig_sendfile = os.sendfile
2136 with unittest.mock.patch('os.sendfile', create=True,
2137 side_effect=sendfile):
2138 with self.get_files() as (src, dst):
2139 with self.assertRaises(OSError) as cm:
2140 shutil._fastcopy_sendfile(src, dst)
2141 assert flag
2142 self.assertEqual(cm.exception.errno, errno.EBADF)
2143
2144 def test_cant_get_size(self):
2145 # Emulate a case where src file size cannot be determined.
2146 # Internally bufsize will be set to a small value and
2147 # sendfile() will be called repeatedly.
2148 with unittest.mock.patch('os.fstat', side_effect=OSError) as m:
2149 with self.get_files() as (src, dst):
2150 shutil._fastcopy_sendfile(src, dst)
2151 assert m.called
2152 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2153
2154 def test_small_chunks(self):
2155 # Force internal file size detection to be smaller than the
2156 # actual file size. We want to force sendfile() to be called
2157 # multiple times, also in order to emulate a src fd which gets
2158 # bigger while it is being copied.
2159 mock = unittest.mock.Mock()
2160 mock.st_size = 65536 + 1
2161 with unittest.mock.patch('os.fstat', return_value=mock) as m:
2162 with self.get_files() as (src, dst):
2163 shutil._fastcopy_sendfile(src, dst)
2164 assert m.called
2165 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2166
2167 def test_big_chunk(self):
2168 # Force internal file size detection to be +100MB bigger than
2169 # the actual file size. Make sure sendfile() does not rely on
2170 # file size value except for (maybe) a better throughput /
2171 # performance.
2172 mock = unittest.mock.Mock()
2173 mock.st_size = self.FILESIZE + (100 * 1024 * 1024)
2174 with unittest.mock.patch('os.fstat', return_value=mock) as m:
2175 with self.get_files() as (src, dst):
2176 shutil._fastcopy_sendfile(src, dst)
2177 assert m.called
2178 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2179
2180 def test_blocksize_arg(self):
2181 with unittest.mock.patch('os.sendfile',
2182 side_effect=ZeroDivisionError) as m:
2183 self.assertRaises(ZeroDivisionError,
2184 shutil.copyfile, TESTFN, TESTFN2)
2185 blocksize = m.call_args[0][3]
2186 # Make sure file size and the block size arg passed to
2187 # sendfile() are the same.
2188 self.assertEqual(blocksize, os.path.getsize(TESTFN))
2189 # ...unless we're dealing with a small file.
2190 support.unlink(TESTFN2)
2191 write_file(TESTFN2, b"hello", binary=True)
2192 self.addCleanup(support.unlink, TESTFN2 + '3')
2193 self.assertRaises(ZeroDivisionError,
2194 shutil.copyfile, TESTFN2, TESTFN2 + '3')
2195 blocksize = m.call_args[0][3]
2196 self.assertEqual(blocksize, 2 ** 23)
2197
2198 def test_file2file_not_supported(self):
2199 # Emulate a case where sendfile() only support file->socket
2200 # fds. In such a case copyfile() is supposed to skip the
2201 # fast-copy attempt from then on.
2202 assert shutil._HAS_SENDFILE
2203 try:
2204 with unittest.mock.patch(
2205 self.PATCHPOINT,
2206 side_effect=OSError(errno.ENOTSOCK, "yo")) as m:
2207 with self.get_files() as (src, dst):
2208 with self.assertRaises(_GiveupOnFastCopy):
2209 shutil._fastcopy_sendfile(src, dst)
2210 assert m.called
2211 assert not shutil._HAS_SENDFILE
2212
2213 with unittest.mock.patch(self.PATCHPOINT) as m:
2214 shutil.copyfile(TESTFN, TESTFN2)
2215 assert not m.called
2216 finally:
2217 shutil._HAS_SENDFILE = True
2218
2219
Victor Stinner937ee9e2018-06-26 02:11:06 +02002220@unittest.skipIf(not MACOS, 'macOS only')
Giampaolo Rodolac7f02a92018-06-19 08:27:29 -07002221class TestZeroCopyMACOS(_ZeroCopyFileTest, unittest.TestCase):
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002222 PATCHPOINT = "posix._fcopyfile"
2223
2224 def zerocopy_fun(self, src, dst):
Giampaolo Rodolac7f02a92018-06-19 08:27:29 -07002225 return shutil._fastcopy_fcopyfile(src, dst, posix._COPYFILE_DATA)
Giampaolo Rodola4a172cc2018-06-12 23:04:50 +02002226
2227
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002228class TermsizeTests(unittest.TestCase):
2229 def test_does_not_crash(self):
2230 """Check if get_terminal_size() returns a meaningful value.
2231
2232 There's no easy portable way to actually check the size of the
2233 terminal, so let's check if it returns something sensible instead.
2234 """
2235 size = shutil.get_terminal_size()
Antoine Pitroucfade362012-02-08 23:48:59 +01002236 self.assertGreaterEqual(size.columns, 0)
2237 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002238
2239 def test_os_environ_first(self):
2240 "Check if environment variables have precedence"
2241
2242 with support.EnvironmentVarGuard() as env:
2243 env['COLUMNS'] = '777'
Serhiy Storchakad30829d2016-04-24 09:58:43 +03002244 del env['LINES']
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002245 size = shutil.get_terminal_size()
2246 self.assertEqual(size.columns, 777)
2247
2248 with support.EnvironmentVarGuard() as env:
Serhiy Storchakad30829d2016-04-24 09:58:43 +03002249 del env['COLUMNS']
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002250 env['LINES'] = '888'
2251 size = shutil.get_terminal_size()
2252 self.assertEqual(size.lines, 888)
2253
Serhiy Storchakad30829d2016-04-24 09:58:43 +03002254 def test_bad_environ(self):
2255 with support.EnvironmentVarGuard() as env:
2256 env['COLUMNS'] = 'xxx'
2257 env['LINES'] = 'yyy'
2258 size = shutil.get_terminal_size()
2259 self.assertGreaterEqual(size.columns, 0)
2260 self.assertGreaterEqual(size.lines, 0)
2261
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002262 @unittest.skipUnless(os.isatty(sys.__stdout__.fileno()), "not on tty")
Victor Stinner119ebb72016-04-19 22:24:56 +02002263 @unittest.skipUnless(hasattr(os, 'get_terminal_size'),
2264 'need os.get_terminal_size()')
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002265 def test_stty_match(self):
2266 """Check if stty returns the same results ignoring env
2267
2268 This test will fail if stdin and stdout are connected to
2269 different terminals with different sizes. Nevertheless, such
2270 situations should be pretty rare.
2271 """
2272 try:
2273 size = subprocess.check_output(['stty', 'size']).decode().split()
Xavier de Gaye38c8b7d2016-11-14 17:14:42 +01002274 except (FileNotFoundError, PermissionError,
2275 subprocess.CalledProcessError):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002276 self.skipTest("stty invocation failed")
2277 expected = (int(size[1]), int(size[0])) # reversed order
2278
2279 with support.EnvironmentVarGuard() as env:
2280 del env['LINES']
2281 del env['COLUMNS']
2282 actual = shutil.get_terminal_size()
2283
2284 self.assertEqual(expected, actual)
Ronald Oussorenf51738b2011-05-06 10:23:04 +02002285
Serhiy Storchakad30829d2016-04-24 09:58:43 +03002286 def test_fallback(self):
2287 with support.EnvironmentVarGuard() as env:
2288 del env['LINES']
2289 del env['COLUMNS']
2290
2291 # sys.__stdout__ has no fileno()
2292 with support.swap_attr(sys, '__stdout__', None):
2293 size = shutil.get_terminal_size(fallback=(10, 20))
2294 self.assertEqual(size.columns, 10)
2295 self.assertEqual(size.lines, 20)
2296
2297 # sys.__stdout__ is not a terminal on Unix
2298 # or fileno() not in (0, 1, 2) on Windows
2299 with open(os.devnull, 'w') as f, \
2300 support.swap_attr(sys, '__stdout__', f):
2301 size = shutil.get_terminal_size(fallback=(30, 40))
2302 self.assertEqual(size.columns, 30)
2303 self.assertEqual(size.lines, 40)
2304
Tarek Ziadéae4d5c62010-05-05 22:27:31 +00002305
Berker Peksag8083cd62014-11-01 11:04:06 +02002306class PublicAPITests(unittest.TestCase):
2307 """Ensures that the correct values are exposed in the public API."""
2308
2309 def test_module_all_attribute(self):
2310 self.assertTrue(hasattr(shutil, '__all__'))
2311 target_api = ['copyfileobj', 'copyfile', 'copymode', 'copystat',
2312 'copy', 'copy2', 'copytree', 'move', 'rmtree', 'Error',
2313 'SpecialFileError', 'ExecError', 'make_archive',
2314 'get_archive_formats', 'register_archive_format',
2315 'unregister_archive_format', 'get_unpack_formats',
2316 'register_unpack_format', 'unregister_unpack_format',
2317 'unpack_archive', 'ignore_patterns', 'chown', 'which',
2318 'get_terminal_size', 'SameFileError']
2319 if hasattr(os, 'statvfs') or os.name == 'nt':
2320 target_api.append('disk_usage')
2321 self.assertEqual(set(shutil.__all__), set(target_api))
2322
2323
Barry Warsaw7fc2cca2003-01-24 17:34:13 +00002324if __name__ == '__main__':
Brett Cannon3e9a9ae2013-06-12 21:25:59 -04002325 unittest.main()