blob: 49e5a37cd209d79b64e586ab773f1d5b36f57780 [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
11import fractions
12import getpass
13import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Victor Stinner47aacc82015-06-12 17:26:23 +020018import shutil
19import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010021import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020022import subprocess
23import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010024import sysconfig
Victor Stinner47aacc82015-06-12 17:26:23 +020025import time
26import unittest
27import uuid
28import warnings
29from test import support
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000030try:
31 import threading
32except ImportError:
33 threading = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020034try:
35 import resource
36except ImportError:
37 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020038try:
39 import fcntl
40except ImportError:
41 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010042try:
43 import _winapi
44except ImportError:
45 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020046try:
R David Murrayf2ad1732014-12-25 18:36:56 -050047 import grp
48 groups = [g.gr_gid for g in grp.getgrall() if getpass.getuser() in g.gr_mem]
49 if hasattr(os, 'getgid'):
50 process_gid = os.getgid()
51 if process_gid not in groups:
52 groups.append(process_gid)
53except ImportError:
54 groups = []
55try:
56 import pwd
57 all_users = [u.pw_uid for u in pwd.getpwall()]
Xavier de Gaye21060102016-11-16 08:05:27 +010058except (ImportError, AttributeError):
R David Murrayf2ad1732014-12-25 18:36:56 -050059 all_users = []
60try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020061 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020062except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020063 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020064
Berker Peksagce643912015-05-06 06:33:17 +030065from test.support.script_helper import assert_python_ok
Xavier de Gayed1415312016-07-22 12:15:29 +020066from test.support import unix_shell
Fred Drake38c2ef02001-07-17 20:52:51 +000067
Victor Stinner923590e2016-03-24 09:11:48 +010068
R David Murrayf2ad1732014-12-25 18:36:56 -050069root_in_posix = False
70if hasattr(os, 'geteuid'):
71 root_in_posix = (os.geteuid() == 0)
72
Mark Dickinson7cf03892010-04-16 13:45:35 +000073# Detect whether we're on a Linux system that uses the (now outdated
74# and unmaintained) linuxthreads threading library. There's an issue
75# when combining linuxthreads with a failed execv call: see
76# http://bugs.python.org/issue4970.
Victor Stinnerd5c355c2011-04-30 14:53:09 +020077if hasattr(sys, 'thread_info') and sys.thread_info.version:
78 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
79else:
80 USING_LINUXTHREADS = False
Brian Curtineb24d742010-04-12 17:16:38 +000081
Stefan Krahebee49a2013-01-17 15:31:00 +010082# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
83HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
84
Victor Stinner923590e2016-03-24 09:11:48 +010085
86@contextlib.contextmanager
87def ignore_deprecation_warnings(msg_regex, quiet=False):
88 with support.check_warnings((msg_regex, DeprecationWarning), quiet=quiet):
89 yield
90
91
Berker Peksag4af23d72016-09-15 20:32:44 +030092def requires_os_func(name):
93 return unittest.skipUnless(hasattr(os, name), 'requires os.%s' % name)
94
95
Brett Cannonec6ce872016-09-06 15:50:29 -070096class _PathLike(os.PathLike):
97
98 def __init__(self, path=""):
99 self.path = path
100
101 def __str__(self):
102 return str(self.path)
103
104 def __fspath__(self):
105 if isinstance(self.path, BaseException):
106 raise self.path
107 else:
108 return self.path
109
110
Victor Stinnerae39d232016-03-24 17:12:55 +0100111def create_file(filename, content=b'content'):
112 with open(filename, "xb", 0) as fp:
113 fp.write(content)
114
115
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000116# Tests creating TESTFN
117class FileTests(unittest.TestCase):
118 def setUp(self):
Martin Panterbf19d162015-09-09 01:01:13 +0000119 if os.path.lexists(support.TESTFN):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000120 os.unlink(support.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000121 tearDown = setUp
122
123 def test_access(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000124 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000125 os.close(f)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000126 self.assertTrue(os.access(support.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000127
Christian Heimesfdab48e2008-01-20 09:06:41 +0000128 def test_closerange(self):
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000129 first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
130 # We must allocate two consecutive file descriptors, otherwise
131 # it will mess up other file descriptors (perhaps even the three
132 # standard ones).
133 second = os.dup(first)
134 try:
135 retries = 0
136 while second != first + 1:
137 os.close(first)
138 retries += 1
139 if retries > 10:
140 # XXX test skipped
Benjamin Petersonfa0d7032009-06-01 22:42:33 +0000141 self.skipTest("couldn't allocate two consecutive fds")
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000142 first, second = second, os.dup(second)
143 finally:
144 os.close(second)
Christian Heimesfdab48e2008-01-20 09:06:41 +0000145 # close a fd that is open, and one that isn't
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000146 os.closerange(first, first + 2)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000147 self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000148
Benjamin Peterson1cc6df92010-06-30 17:39:45 +0000149 @support.cpython_only
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +0000150 def test_rename(self):
151 path = support.TESTFN
152 old = sys.getrefcount(path)
153 self.assertRaises(TypeError, os.rename, path, 0)
154 new = sys.getrefcount(path)
155 self.assertEqual(old, new)
156
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000157 def test_read(self):
158 with open(support.TESTFN, "w+b") as fobj:
159 fobj.write(b"spam")
160 fobj.flush()
161 fd = fobj.fileno()
162 os.lseek(fd, 0, 0)
163 s = os.read(fd, 4)
164 self.assertEqual(type(s), bytes)
165 self.assertEqual(s, b"spam")
166
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200167 @support.cpython_only
Victor Stinner5c6e6fc2014-07-12 11:03:53 +0200168 # Skip the test on 32-bit platforms: the number of bytes must fit in a
169 # Py_ssize_t type
170 @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
171 "needs INT_MAX < PY_SSIZE_T_MAX")
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200172 @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
173 def test_large_read(self, size):
Victor Stinnerb28ed922014-07-11 17:04:41 +0200174 self.addCleanup(support.unlink, support.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +0100175 create_file(support.TESTFN, b'test')
Victor Stinnerb28ed922014-07-11 17:04:41 +0200176
177 # Issue #21932: Make sure that os.read() does not raise an
178 # OverflowError for size larger than INT_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +0200179 with open(support.TESTFN, "rb") as fp:
180 data = os.read(fp.fileno(), size)
181
182 # The test does not try to read more than 2 GB at once because the
183 # operating system is free to return less bytes than requested.
184 self.assertEqual(data, b'test')
185
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000186 def test_write(self):
187 # os.write() accepts bytes- and buffer-like objects but not strings
188 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
189 self.assertRaises(TypeError, os.write, fd, "beans")
190 os.write(fd, b"bacon\n")
191 os.write(fd, bytearray(b"eggs\n"))
192 os.write(fd, memoryview(b"spam\n"))
193 os.close(fd)
194 with open(support.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +0000195 self.assertEqual(fobj.read().splitlines(),
196 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000197
Victor Stinnere0daff12011-03-20 23:36:35 +0100198 def write_windows_console(self, *args):
199 retcode = subprocess.call(args,
200 # use a new console to not flood the test output
201 creationflags=subprocess.CREATE_NEW_CONSOLE,
202 # use a shell to hide the console window (SW_HIDE)
203 shell=True)
204 self.assertEqual(retcode, 0)
205
206 @unittest.skipUnless(sys.platform == 'win32',
207 'test specific to the Windows console')
208 def test_write_windows_console(self):
209 # Issue #11395: the Windows console returns an error (12: not enough
210 # space error) on writing into stdout if stdout mode is binary and the
211 # length is greater than 66,000 bytes (or less, depending on heap
212 # usage).
213 code = "print('x' * 100000)"
214 self.write_windows_console(sys.executable, "-c", code)
215 self.write_windows_console(sys.executable, "-u", "-c", code)
216
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000217 def fdopen_helper(self, *args):
218 fd = os.open(support.TESTFN, os.O_RDONLY)
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200219 f = os.fdopen(fd, *args)
220 f.close()
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000221
222 def test_fdopen(self):
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200223 fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
224 os.close(fd)
225
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000226 self.fdopen_helper()
227 self.fdopen_helper('r')
228 self.fdopen_helper('r', 100)
229
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100230 def test_replace(self):
231 TESTFN2 = support.TESTFN + ".2"
Victor Stinnerae39d232016-03-24 17:12:55 +0100232 self.addCleanup(support.unlink, support.TESTFN)
233 self.addCleanup(support.unlink, TESTFN2)
234
235 create_file(support.TESTFN, b"1")
236 create_file(TESTFN2, b"2")
237
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100238 os.replace(support.TESTFN, TESTFN2)
239 self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
240 with open(TESTFN2, 'r') as f:
241 self.assertEqual(f.read(), "1")
242
Martin Panterbf19d162015-09-09 01:01:13 +0000243 def test_open_keywords(self):
244 f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
245 dir_fd=None)
246 os.close(f)
247
248 def test_symlink_keywords(self):
249 symlink = support.get_attribute(os, "symlink")
250 try:
251 symlink(src='target', dst=support.TESTFN,
252 target_is_directory=False, dir_fd=None)
253 except (NotImplementedError, OSError):
254 pass # No OS support or unprivileged user
255
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200256
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000257# Test attributes on return values from os.*stat* family.
258class StatAttributeTests(unittest.TestCase):
259 def setUp(self):
Victor Stinner47aacc82015-06-12 17:26:23 +0200260 self.fname = support.TESTFN
261 self.addCleanup(support.unlink, self.fname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100262 create_file(self.fname, b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000263
Serhiy Storchaka43767632013-11-03 21:31:38 +0200264 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
Antoine Pitrou38425292010-09-21 18:19:07 +0000265 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000266 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000267
268 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000269 self.assertEqual(result[stat.ST_SIZE], 3)
270 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000271
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000272 # Make sure all the attributes are there
273 members = dir(result)
274 for name in dir(stat):
275 if name[:3] == 'ST_':
276 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000277 if name.endswith("TIME"):
278 def trunc(x): return int(x)
279 else:
280 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000281 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000282 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000283 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000284
Larry Hastings6fe20b32012-04-19 15:07:49 -0700285 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700286 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700287 for name in 'st_atime st_mtime st_ctime'.split():
288 floaty = int(getattr(result, name) * 100000)
289 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700290 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700291
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000292 try:
293 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200294 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000295 except IndexError:
296 pass
297
298 # Make sure that assignment fails
299 try:
300 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200301 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000302 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000303 pass
304
305 try:
306 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200307 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000308 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000309 pass
310
311 try:
312 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200313 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000314 except AttributeError:
315 pass
316
317 # Use the stat_result constructor with a too-short tuple.
318 try:
319 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200320 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000321 except TypeError:
322 pass
323
Ezio Melotti42da6632011-03-15 05:18:48 +0200324 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000325 try:
326 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
327 except TypeError:
328 pass
329
Antoine Pitrou38425292010-09-21 18:19:07 +0000330 def test_stat_attributes(self):
331 self.check_stat_attributes(self.fname)
332
333 def test_stat_attributes_bytes(self):
334 try:
335 fname = self.fname.encode(sys.getfilesystemencoding())
336 except UnicodeEncodeError:
337 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Steve Dowercc16be82016-09-08 10:35:16 -0700338 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000339
Christian Heimes25827622013-10-12 01:27:08 +0200340 def test_stat_result_pickle(self):
341 result = os.stat(self.fname)
Serhiy Storchakabad12572014-12-15 14:03:42 +0200342 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
343 p = pickle.dumps(result, proto)
344 self.assertIn(b'stat_result', p)
345 if proto < 4:
346 self.assertIn(b'cos\nstat_result\n', p)
347 unpickled = pickle.loads(p)
348 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200349
Serhiy Storchaka43767632013-11-03 21:31:38 +0200350 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000351 def test_statvfs_attributes(self):
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000352 try:
353 result = os.statvfs(self.fname)
Guido van Rossumb940e112007-01-10 16:19:56 +0000354 except OSError as e:
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000355 # On AtheOS, glibc always returns ENOSYS
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000356 if e.errno == errno.ENOSYS:
Victor Stinner370cb252013-10-12 01:33:54 +0200357 self.skipTest('os.statvfs() failed with ENOSYS')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000358
359 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000360 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000361
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000362 # Make sure all the attributes are there.
363 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
364 'ffree', 'favail', 'flag', 'namemax')
365 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000366 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000367
368 # Make sure that assignment really fails
369 try:
370 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200371 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000372 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000373 pass
374
375 try:
376 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200377 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000378 except AttributeError:
379 pass
380
381 # Use the constructor with a too-short tuple.
382 try:
383 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200384 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000385 except TypeError:
386 pass
387
Ezio Melotti42da6632011-03-15 05:18:48 +0200388 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000389 try:
390 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
391 except TypeError:
392 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000393
Christian Heimes25827622013-10-12 01:27:08 +0200394 @unittest.skipUnless(hasattr(os, 'statvfs'),
395 "need os.statvfs()")
396 def test_statvfs_result_pickle(self):
397 try:
398 result = os.statvfs(self.fname)
399 except OSError as e:
400 # On AtheOS, glibc always returns ENOSYS
401 if e.errno == errno.ENOSYS:
Victor Stinner370cb252013-10-12 01:33:54 +0200402 self.skipTest('os.statvfs() failed with ENOSYS')
403
Serhiy Storchakabad12572014-12-15 14:03:42 +0200404 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
405 p = pickle.dumps(result, proto)
406 self.assertIn(b'statvfs_result', p)
407 if proto < 4:
408 self.assertIn(b'cos\nstatvfs_result\n', p)
409 unpickled = pickle.loads(p)
410 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200411
Serhiy Storchaka43767632013-11-03 21:31:38 +0200412 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
413 def test_1686475(self):
414 # Verify that an open file can be stat'ed
415 try:
416 os.stat(r"c:\pagefile.sys")
417 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600418 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200419 except OSError as e:
420 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000421
Serhiy Storchaka43767632013-11-03 21:31:38 +0200422 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
423 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
424 def test_15261(self):
425 # Verify that stat'ing a closed fd does not cause crash
426 r, w = os.pipe()
427 try:
428 os.stat(r) # should not raise error
429 finally:
430 os.close(r)
431 os.close(w)
432 with self.assertRaises(OSError) as ctx:
433 os.stat(r)
434 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100435
Zachary Ware63f277b2014-06-19 09:46:37 -0500436 def check_file_attributes(self, result):
437 self.assertTrue(hasattr(result, 'st_file_attributes'))
438 self.assertTrue(isinstance(result.st_file_attributes, int))
439 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
440
441 @unittest.skipUnless(sys.platform == "win32",
442 "st_file_attributes is Win32 specific")
443 def test_file_attributes(self):
444 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
445 result = os.stat(self.fname)
446 self.check_file_attributes(result)
447 self.assertEqual(
448 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
449 0)
450
451 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
Victor Stinner47aacc82015-06-12 17:26:23 +0200452 dirname = support.TESTFN + "dir"
453 os.mkdir(dirname)
454 self.addCleanup(os.rmdir, dirname)
455
456 result = os.stat(dirname)
Zachary Ware63f277b2014-06-19 09:46:37 -0500457 self.check_file_attributes(result)
458 self.assertEqual(
459 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
460 stat.FILE_ATTRIBUTE_DIRECTORY)
461
Berker Peksag0b4dc482016-09-17 15:49:59 +0300462 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
463 def test_access_denied(self):
464 # Default to FindFirstFile WIN32_FIND_DATA when access is
465 # denied. See issue 28075.
466 # os.environ['TEMP'] should be located on a volume that
467 # supports file ACLs.
468 fname = os.path.join(os.environ['TEMP'], self.fname)
469 self.addCleanup(support.unlink, fname)
470 create_file(fname, b'ABC')
471 # Deny the right to [S]YNCHRONIZE on the file to
472 # force CreateFile to fail with ERROR_ACCESS_DENIED.
473 DETACHED_PROCESS = 8
474 subprocess.check_call(
Denis Osipov897bba72017-06-07 22:15:26 +0500475 # bpo-30584: Use security identifier *S-1-5-32-545 instead
476 # of localized "Users" to not depend on the locale.
477 ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
Berker Peksag0b4dc482016-09-17 15:49:59 +0300478 creationflags=DETACHED_PROCESS
479 )
480 result = os.stat(fname)
481 self.assertNotEqual(result.st_size, 0)
482
Victor Stinner47aacc82015-06-12 17:26:23 +0200483
484class UtimeTests(unittest.TestCase):
485 def setUp(self):
486 self.dirname = support.TESTFN
487 self.fname = os.path.join(self.dirname, "f1")
488
489 self.addCleanup(support.rmtree, self.dirname)
490 os.mkdir(self.dirname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100491 create_file(self.fname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200492
Victor Stinnerc0b1e0f2015-07-20 17:12:57 +0200493 def restore_float_times(state):
Victor Stinner923590e2016-03-24 09:11:48 +0100494 with ignore_deprecation_warnings('stat_float_times'):
Victor Stinnerc0b1e0f2015-07-20 17:12:57 +0200495 os.stat_float_times(state)
496
Victor Stinner47aacc82015-06-12 17:26:23 +0200497 # ensure that st_atime and st_mtime are float
Victor Stinner923590e2016-03-24 09:11:48 +0100498 with ignore_deprecation_warnings('stat_float_times'):
Victor Stinnerc0b1e0f2015-07-20 17:12:57 +0200499 old_float_times = os.stat_float_times(-1)
500 self.addCleanup(restore_float_times, old_float_times)
Victor Stinner47aacc82015-06-12 17:26:23 +0200501
502 os.stat_float_times(True)
503
504 def support_subsecond(self, filename):
505 # Heuristic to check if the filesystem supports timestamp with
506 # subsecond resolution: check if float and int timestamps are different
507 st = os.stat(filename)
508 return ((st.st_atime != st[7])
509 or (st.st_mtime != st[8])
510 or (st.st_ctime != st[9]))
511
512 def _test_utime(self, set_time, filename=None):
513 if not filename:
514 filename = self.fname
515
516 support_subsecond = self.support_subsecond(filename)
517 if support_subsecond:
518 # Timestamp with a resolution of 1 microsecond (10^-6).
519 #
520 # The resolution of the C internal function used by os.utime()
521 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
522 # test with a resolution of 1 ns requires more work:
523 # see the issue #15745.
524 atime_ns = 1002003000 # 1.002003 seconds
525 mtime_ns = 4005006000 # 4.005006 seconds
526 else:
527 # use a resolution of 1 second
528 atime_ns = 5 * 10**9
529 mtime_ns = 8 * 10**9
530
531 set_time(filename, (atime_ns, mtime_ns))
532 st = os.stat(filename)
533
534 if support_subsecond:
535 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
536 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
537 else:
538 self.assertEqual(st.st_atime, atime_ns * 1e-9)
539 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
540 self.assertEqual(st.st_atime_ns, atime_ns)
541 self.assertEqual(st.st_mtime_ns, mtime_ns)
542
543 def test_utime(self):
544 def set_time(filename, ns):
545 # test the ns keyword parameter
546 os.utime(filename, ns=ns)
547 self._test_utime(set_time)
548
549 @staticmethod
550 def ns_to_sec(ns):
551 # Convert a number of nanosecond (int) to a number of seconds (float).
552 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
553 # issue, os.utime() rounds towards minus infinity.
554 return (ns * 1e-9) + 0.5e-9
555
556 def test_utime_by_indexed(self):
557 # pass times as floating point seconds as the second indexed parameter
558 def set_time(filename, ns):
559 atime_ns, mtime_ns = ns
560 atime = self.ns_to_sec(atime_ns)
561 mtime = self.ns_to_sec(mtime_ns)
562 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
563 # or utime(time_t)
564 os.utime(filename, (atime, mtime))
565 self._test_utime(set_time)
566
567 def test_utime_by_times(self):
568 def set_time(filename, ns):
569 atime_ns, mtime_ns = ns
570 atime = self.ns_to_sec(atime_ns)
571 mtime = self.ns_to_sec(mtime_ns)
572 # test the times keyword parameter
573 os.utime(filename, times=(atime, mtime))
574 self._test_utime(set_time)
575
576 @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
577 "follow_symlinks support for utime required "
578 "for this test.")
579 def test_utime_nofollow_symlinks(self):
580 def set_time(filename, ns):
581 # use follow_symlinks=False to test utimensat(timespec)
582 # or lutimes(timeval)
583 os.utime(filename, ns=ns, follow_symlinks=False)
584 self._test_utime(set_time)
585
586 @unittest.skipUnless(os.utime in os.supports_fd,
587 "fd support for utime required for this test.")
588 def test_utime_fd(self):
589 def set_time(filename, ns):
Victor Stinnerae39d232016-03-24 17:12:55 +0100590 with open(filename, 'wb', 0) as fp:
Victor Stinner47aacc82015-06-12 17:26:23 +0200591 # use a file descriptor to test futimens(timespec)
592 # or futimes(timeval)
593 os.utime(fp.fileno(), ns=ns)
594 self._test_utime(set_time)
595
596 @unittest.skipUnless(os.utime in os.supports_dir_fd,
597 "dir_fd support for utime required for this test.")
598 def test_utime_dir_fd(self):
599 def set_time(filename, ns):
600 dirname, name = os.path.split(filename)
601 dirfd = os.open(dirname, os.O_RDONLY)
602 try:
603 # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
604 os.utime(name, dir_fd=dirfd, ns=ns)
605 finally:
606 os.close(dirfd)
607 self._test_utime(set_time)
608
609 def test_utime_directory(self):
610 def set_time(filename, ns):
611 # test calling os.utime() on a directory
612 os.utime(filename, ns=ns)
613 self._test_utime(set_time, filename=self.dirname)
614
615 def _test_utime_current(self, set_time):
616 # Get the system clock
617 current = time.time()
618
619 # Call os.utime() to set the timestamp to the current system clock
620 set_time(self.fname)
621
622 if not self.support_subsecond(self.fname):
623 delta = 1.0
Victor Stinnerc94caca2017-06-13 23:48:27 +0200624 elif os.name == 'nt':
625 # On Windows, the usual resolution of time.time() is 15.6 ms.
626 # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
627 delta = 0.050
Victor Stinner47aacc82015-06-12 17:26:23 +0200628 else:
Victor Stinner3402f722017-06-14 11:55:17 +0200629 # bpo-30649: PPC64 Fedora 3.x buildbot requires
630 # at least a delta of 14 ms
631 delta = 0.020
Victor Stinner47aacc82015-06-12 17:26:23 +0200632 st = os.stat(self.fname)
633 msg = ("st_time=%r, current=%r, dt=%r"
634 % (st.st_mtime, current, st.st_mtime - current))
635 self.assertAlmostEqual(st.st_mtime, current,
636 delta=delta, msg=msg)
637
638 def test_utime_current(self):
639 def set_time(filename):
640 # Set to the current time in the new way
641 os.utime(self.fname)
642 self._test_utime_current(set_time)
643
644 def test_utime_current_old(self):
645 def set_time(filename):
646 # Set to the current time in the old explicit way.
647 os.utime(self.fname, None)
648 self._test_utime_current(set_time)
649
650 def get_file_system(self, path):
651 if sys.platform == 'win32':
652 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
653 import ctypes
654 kernel32 = ctypes.windll.kernel32
655 buf = ctypes.create_unicode_buffer("", 100)
656 ok = kernel32.GetVolumeInformationW(root, None, 0,
657 None, None, None,
658 buf, len(buf))
659 if ok:
660 return buf.value
661 # return None if the filesystem is unknown
662
663 def test_large_time(self):
664 # Many filesystems are limited to the year 2038. At least, the test
665 # pass with NTFS filesystem.
666 if self.get_file_system(self.dirname) != "NTFS":
667 self.skipTest("requires NTFS")
668
669 large = 5000000000 # some day in 2128
670 os.utime(self.fname, (large, large))
671 self.assertEqual(os.stat(self.fname).st_mtime, large)
672
673 def test_utime_invalid_arguments(self):
674 # seconds and nanoseconds parameters are mutually exclusive
675 with self.assertRaises(ValueError):
676 os.utime(self.fname, (5, 5), ns=(5, 5))
677
678
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000679from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000680
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000681class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000682 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000683 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000684
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000685 def setUp(self):
686 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000687 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000688 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000689 for key, value in self._reference().items():
690 os.environ[key] = value
691
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000692 def tearDown(self):
693 os.environ.clear()
694 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000695 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000696 os.environb.clear()
697 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000698
Christian Heimes90333392007-11-01 19:08:42 +0000699 def _reference(self):
700 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
701
702 def _empty_mapping(self):
703 os.environ.clear()
704 return os.environ
705
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000706 # Bug 1110478
Xavier de Gayed1415312016-07-22 12:15:29 +0200707 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
708 'requires a shell')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000709 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000710 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300711 os.environ.update(HELLO="World")
Xavier de Gayed1415312016-07-22 12:15:29 +0200712 with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300713 value = popen.read().strip()
714 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000715
Xavier de Gayed1415312016-07-22 12:15:29 +0200716 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
717 'requires a shell')
Christian Heimes1a13d592007-11-08 14:16:55 +0000718 def test_os_popen_iter(self):
Xavier de Gayed1415312016-07-22 12:15:29 +0200719 with os.popen("%s -c 'echo \"line1\nline2\nline3\"'"
720 % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300721 it = iter(popen)
722 self.assertEqual(next(it), "line1\n")
723 self.assertEqual(next(it), "line2\n")
724 self.assertEqual(next(it), "line3\n")
725 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000726
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000727 # Verify environ keys and values from the OS are of the
728 # correct str type.
729 def test_keyvalue_types(self):
730 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000731 self.assertEqual(type(key), str)
732 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000733
Christian Heimes90333392007-11-01 19:08:42 +0000734 def test_items(self):
735 for key, value in self._reference().items():
736 self.assertEqual(os.environ.get(key), value)
737
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000738 # Issue 7310
739 def test___repr__(self):
740 """Check that the repr() of os.environ looks like environ({...})."""
741 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000742 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
743 '{!r}: {!r}'.format(key, value)
744 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000745
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000746 def test_get_exec_path(self):
747 defpath_list = os.defpath.split(os.pathsep)
748 test_path = ['/monty', '/python', '', '/flying/circus']
749 test_env = {'PATH': os.pathsep.join(test_path)}
750
751 saved_environ = os.environ
752 try:
753 os.environ = dict(test_env)
754 # Test that defaulting to os.environ works.
755 self.assertSequenceEqual(test_path, os.get_exec_path())
756 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
757 finally:
758 os.environ = saved_environ
759
760 # No PATH environment variable
761 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
762 # Empty PATH environment variable
763 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
764 # Supplied PATH environment variable
765 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
766
Victor Stinnerb745a742010-05-18 17:17:23 +0000767 if os.supports_bytes_environ:
768 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000769 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000770 # ignore BytesWarning warning
771 with warnings.catch_warnings(record=True):
772 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000773 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000774 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000775 pass
776 else:
777 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000778
779 # bytes key and/or value
780 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
781 ['abc'])
782 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
783 ['abc'])
784 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
785 ['abc'])
786
787 @unittest.skipUnless(os.supports_bytes_environ,
788 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000789 def test_environb(self):
790 # os.environ -> os.environb
791 value = 'euro\u20ac'
792 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000793 value_bytes = value.encode(sys.getfilesystemencoding(),
794 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000795 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000796 msg = "U+20AC character is not encodable to %s" % (
797 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000798 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000799 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000800 self.assertEqual(os.environ['unicode'], value)
801 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000802
803 # os.environb -> os.environ
804 value = b'\xff'
805 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000806 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000807 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000808 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000809
Charles-François Natali2966f102011-11-26 11:32:46 +0100810 # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
811 # #13415).
812 @support.requires_freebsd_version(7)
813 @support.requires_mac_ver(10, 6)
Victor Stinner60b385e2011-11-22 22:01:28 +0100814 def test_unset_error(self):
815 if sys.platform == "win32":
816 # an environment variable is limited to 32,767 characters
817 key = 'x' * 50000
Victor Stinnerb3f82682011-11-22 22:30:19 +0100818 self.assertRaises(ValueError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100819 else:
820 # "=" is not allowed in a variable name
821 key = 'key='
Victor Stinnerb3f82682011-11-22 22:30:19 +0100822 self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100823
Victor Stinner6d101392013-04-14 16:35:04 +0200824 def test_key_type(self):
825 missing = 'missingkey'
826 self.assertNotIn(missing, os.environ)
827
Victor Stinner839e5ea2013-04-14 16:43:03 +0200828 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200829 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200830 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200831 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200832
Victor Stinner839e5ea2013-04-14 16:43:03 +0200833 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200834 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200835 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200836 self.assertTrue(cm.exception.__suppress_context__)
837
Osvaldo Santana Neto8a8d2852017-07-01 14:34:45 -0300838 def _test_environ_iteration(self, collection):
839 iterator = iter(collection)
840 new_key = "__new_key__"
841
842 next(iterator) # start iteration over os.environ.items
843
844 # add a new key in os.environ mapping
845 os.environ[new_key] = "test_environ_iteration"
846
847 try:
848 next(iterator) # force iteration over modified mapping
849 self.assertEqual(os.environ[new_key], "test_environ_iteration")
850 finally:
851 del os.environ[new_key]
852
853 def test_iter_error_when_changing_os_environ(self):
854 self._test_environ_iteration(os.environ)
855
856 def test_iter_error_when_changing_os_environ_items(self):
857 self._test_environ_iteration(os.environ.items())
858
859 def test_iter_error_when_changing_os_environ_values(self):
860 self._test_environ_iteration(os.environ.values())
861
Victor Stinner6d101392013-04-14 16:35:04 +0200862
Tim Petersc4e09402003-04-25 07:11:48 +0000863class WalkTests(unittest.TestCase):
864 """Tests for os.walk()."""
865
Victor Stinner0561c532015-03-12 10:28:24 +0100866 # Wrapper to hide minor differences between os.walk and os.fwalk
867 # to tests both functions with the same code base
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +0200868 def walk(self, top, **kwargs):
Serhiy Storchakaa17ca192015-12-23 00:37:34 +0200869 if 'follow_symlinks' in kwargs:
870 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +0200871 return os.walk(top, **kwargs)
Victor Stinner0561c532015-03-12 10:28:24 +0100872
Charles-François Natali7372b062012-02-05 15:15:38 +0100873 def setUp(self):
Victor Stinner0561c532015-03-12 10:28:24 +0100874 join = os.path.join
Victor Stinner3899b542016-03-24 17:21:17 +0100875 self.addCleanup(support.rmtree, support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000876
877 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000878 # TESTFN/
879 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +0000880 # tmp1
881 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +0000882 # tmp2
883 # SUB11/ no kids
884 # SUB2/ a file kid and a dirsymlink kid
885 # tmp3
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300886 # SUB21/ not readable
887 # tmp5
Guido van Rossumd8faa362007-04-27 19:54:29 +0000888 # link/ a symlink to TESTFN.2
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200889 # broken_link
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300890 # broken_link2
891 # broken_link3
Guido van Rossumd8faa362007-04-27 19:54:29 +0000892 # TEST2/
893 # tmp4 a lone file
Victor Stinner0561c532015-03-12 10:28:24 +0100894 self.walk_path = join(support.TESTFN, "TEST1")
895 self.sub1_path = join(self.walk_path, "SUB1")
896 self.sub11_path = join(self.sub1_path, "SUB11")
897 sub2_path = join(self.walk_path, "SUB2")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300898 sub21_path = join(sub2_path, "SUB21")
Victor Stinner0561c532015-03-12 10:28:24 +0100899 tmp1_path = join(self.walk_path, "tmp1")
900 tmp2_path = join(self.sub1_path, "tmp2")
Tim Petersc4e09402003-04-25 07:11:48 +0000901 tmp3_path = join(sub2_path, "tmp3")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300902 tmp5_path = join(sub21_path, "tmp3")
Victor Stinner0561c532015-03-12 10:28:24 +0100903 self.link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000904 t2_path = join(support.TESTFN, "TEST2")
905 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200906 broken_link_path = join(sub2_path, "broken_link")
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300907 broken_link2_path = join(sub2_path, "broken_link2")
908 broken_link3_path = join(sub2_path, "broken_link3")
Tim Petersc4e09402003-04-25 07:11:48 +0000909
910 # Create stuff.
Victor Stinner0561c532015-03-12 10:28:24 +0100911 os.makedirs(self.sub11_path)
Tim Petersc4e09402003-04-25 07:11:48 +0000912 os.makedirs(sub2_path)
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300913 os.makedirs(sub21_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000914 os.makedirs(t2_path)
Victor Stinner0561c532015-03-12 10:28:24 +0100915
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300916 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
Victor Stinnere77c9742016-03-25 10:28:23 +0100917 with open(path, "x") as f:
918 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
Tim Petersc4e09402003-04-25 07:11:48 +0000919
Victor Stinner0561c532015-03-12 10:28:24 +0100920 if support.can_symlink():
921 os.symlink(os.path.abspath(t2_path), self.link_path)
922 os.symlink('broken', broken_link_path, True)
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300923 os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
924 os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300925 self.sub2_tree = (sub2_path, ["SUB21", "link"],
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300926 ["broken_link", "broken_link2", "broken_link3",
927 "tmp3"])
Victor Stinner0561c532015-03-12 10:28:24 +0100928 else:
929 self.sub2_tree = (sub2_path, [], ["tmp3"])
930
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300931 os.chmod(sub21_path, 0)
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300932 try:
933 os.listdir(sub21_path)
934 except PermissionError:
935 self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
936 else:
937 os.chmod(sub21_path, stat.S_IRWXU)
938 os.unlink(tmp5_path)
939 os.rmdir(sub21_path)
940 del self.sub2_tree[1][:1]
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300941
Victor Stinner0561c532015-03-12 10:28:24 +0100942 def test_walk_topdown(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000943 # Walk top-down.
Serhiy Storchakaa07ab292016-04-16 17:51:00 +0300944 all = list(self.walk(self.walk_path))
Victor Stinner0561c532015-03-12 10:28:24 +0100945
Tim Petersc4e09402003-04-25 07:11:48 +0000946 self.assertEqual(len(all), 4)
947 # We can't know which order SUB1 and SUB2 will appear in.
948 # Not flipped: TESTFN, SUB1, SUB11, SUB2
949 # flipped: TESTFN, SUB2, SUB1, SUB11
950 flipped = all[0][1][0] != "SUB1"
951 all[0][1].sort()
Hynek Schlawackc96f5a02012-05-15 17:55:38 +0200952 all[3 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300953 all[3 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100954 self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
955 self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
956 self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
957 self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000958
Brett Cannon3f9183b2016-08-26 14:44:48 -0700959 def test_walk_prune(self, walk_path=None):
960 if walk_path is None:
961 walk_path = self.walk_path
Tim Petersc4e09402003-04-25 07:11:48 +0000962 # Prune the search.
963 all = []
Brett Cannon3f9183b2016-08-26 14:44:48 -0700964 for root, dirs, files in self.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +0000965 all.append((root, dirs, files))
966 # Don't descend into SUB1.
967 if 'SUB1' in dirs:
968 # Note that this also mutates the dirs we appended to all!
969 dirs.remove('SUB1')
Tim Petersc4e09402003-04-25 07:11:48 +0000970
Victor Stinner0561c532015-03-12 10:28:24 +0100971 self.assertEqual(len(all), 2)
972 self.assertEqual(all[0],
Brett Cannon3f9183b2016-08-26 14:44:48 -0700973 (str(walk_path), ["SUB2"], ["tmp1"]))
Victor Stinner0561c532015-03-12 10:28:24 +0100974
975 all[1][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300976 all[1][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100977 self.assertEqual(all[1], self.sub2_tree)
978
Brett Cannon3f9183b2016-08-26 14:44:48 -0700979 def test_file_like_path(self):
Brett Cannonec6ce872016-09-06 15:50:29 -0700980 self.test_walk_prune(_PathLike(self.walk_path))
Brett Cannon3f9183b2016-08-26 14:44:48 -0700981
Victor Stinner0561c532015-03-12 10:28:24 +0100982 def test_walk_bottom_up(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000983 # Walk bottom-up.
Victor Stinner0561c532015-03-12 10:28:24 +0100984 all = list(self.walk(self.walk_path, topdown=False))
985
Victor Stinner53b0a412016-03-26 01:12:36 +0100986 self.assertEqual(len(all), 4, all)
Tim Petersc4e09402003-04-25 07:11:48 +0000987 # We can't know which order SUB1 and SUB2 will appear in.
988 # Not flipped: SUB11, SUB1, SUB2, TESTFN
989 # flipped: SUB2, SUB11, SUB1, TESTFN
990 flipped = all[3][1][0] != "SUB1"
991 all[3][1].sort()
Hynek Schlawack39bf90d2012-05-15 18:40:17 +0200992 all[2 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300993 all[2 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100994 self.assertEqual(all[3],
995 (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
996 self.assertEqual(all[flipped],
997 (self.sub11_path, [], []))
998 self.assertEqual(all[flipped + 1],
999 (self.sub1_path, ["SUB11"], ["tmp2"]))
1000 self.assertEqual(all[2 - 2 * flipped],
1001 self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +00001002
Victor Stinner0561c532015-03-12 10:28:24 +01001003 def test_walk_symlink(self):
1004 if not support.can_symlink():
1005 self.skipTest("need symlink support")
1006
1007 # Walk, following symlinks.
1008 walk_it = self.walk(self.walk_path, follow_symlinks=True)
1009 for root, dirs, files in walk_it:
1010 if root == self.link_path:
1011 self.assertEqual(dirs, [])
1012 self.assertEqual(files, ["tmp4"])
1013 break
1014 else:
1015 self.fail("Didn't follow symlink with followlinks=True")
Guido van Rossumd8faa362007-04-27 19:54:29 +00001016
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001017 def test_walk_bad_dir(self):
1018 # Walk top-down.
1019 errors = []
1020 walk_it = self.walk(self.walk_path, onerror=errors.append)
1021 root, dirs, files = next(walk_it)
Serhiy Storchaka7865dff2016-10-28 09:17:38 +03001022 self.assertEqual(errors, [])
1023 dir1 = 'SUB1'
1024 path1 = os.path.join(root, dir1)
1025 path1new = os.path.join(root, dir1 + '.new')
1026 os.rename(path1, path1new)
1027 try:
1028 roots = [r for r, d, f in walk_it]
1029 self.assertTrue(errors)
1030 self.assertNotIn(path1, roots)
1031 self.assertNotIn(path1new, roots)
1032 for dir2 in dirs:
1033 if dir2 != dir1:
1034 self.assertIn(os.path.join(root, dir2), roots)
1035 finally:
1036 os.rename(path1new, path1)
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001037
Charles-François Natali7372b062012-02-05 15:15:38 +01001038
1039@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1040class FwalkTests(WalkTests):
1041 """Tests for os.fwalk()."""
1042
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001043 def walk(self, top, **kwargs):
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001044 for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
Victor Stinner0561c532015-03-12 10:28:24 +01001045 yield (root, dirs, files)
1046
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001047 def fwalk(self, *args, **kwargs):
1048 return os.fwalk(*args, **kwargs)
1049
Larry Hastingsc48fe982012-06-25 04:49:05 -07001050 def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
1051 """
1052 compare with walk() results.
1053 """
Larry Hastingsb4038062012-07-15 10:57:38 -07001054 walk_kwargs = walk_kwargs.copy()
1055 fwalk_kwargs = fwalk_kwargs.copy()
1056 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1057 walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
1058 fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)
Larry Hastingsc48fe982012-06-25 04:49:05 -07001059
Charles-François Natali7372b062012-02-05 15:15:38 +01001060 expected = {}
Larry Hastingsc48fe982012-06-25 04:49:05 -07001061 for root, dirs, files in os.walk(**walk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001062 expected[root] = (set(dirs), set(files))
1063
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001064 for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001065 self.assertIn(root, expected)
1066 self.assertEqual(expected[root], (set(dirs), set(files)))
1067
Larry Hastingsc48fe982012-06-25 04:49:05 -07001068 def test_compare_to_walk(self):
1069 kwargs = {'top': support.TESTFN}
1070 self._compare_to_walk(kwargs, kwargs)
1071
Charles-François Natali7372b062012-02-05 15:15:38 +01001072 def test_dir_fd(self):
Larry Hastingsc48fe982012-06-25 04:49:05 -07001073 try:
1074 fd = os.open(".", os.O_RDONLY)
1075 walk_kwargs = {'top': support.TESTFN}
1076 fwalk_kwargs = walk_kwargs.copy()
1077 fwalk_kwargs['dir_fd'] = fd
1078 self._compare_to_walk(walk_kwargs, fwalk_kwargs)
1079 finally:
1080 os.close(fd)
1081
1082 def test_yields_correct_dir_fd(self):
Charles-François Natali7372b062012-02-05 15:15:38 +01001083 # check returned file descriptors
Larry Hastingsb4038062012-07-15 10:57:38 -07001084 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1085 args = support.TESTFN, topdown, None
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001086 for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
Charles-François Natali7372b062012-02-05 15:15:38 +01001087 # check that the FD is valid
1088 os.fstat(rootfd)
Larry Hastings9cf065c2012-06-22 16:30:09 -07001089 # redundant check
1090 os.stat(rootfd)
1091 # check that listdir() returns consistent information
1092 self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))
Charles-François Natali7372b062012-02-05 15:15:38 +01001093
1094 def test_fd_leak(self):
1095 # Since we're opening a lot of FDs, we must be careful to avoid leaks:
1096 # we both check that calling fwalk() a large number of times doesn't
1097 # yield EMFILE, and that the minimum allocated FD hasn't changed.
1098 minfd = os.dup(1)
1099 os.close(minfd)
1100 for i in range(256):
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001101 for x in self.fwalk(support.TESTFN):
Charles-François Natali7372b062012-02-05 15:15:38 +01001102 pass
1103 newfd = os.dup(1)
1104 self.addCleanup(os.close, newfd)
1105 self.assertEqual(newfd, minfd)
1106
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001107class BytesWalkTests(WalkTests):
1108 """Tests for os.walk() with bytes."""
1109 def walk(self, top, **kwargs):
1110 if 'follow_symlinks' in kwargs:
1111 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
1112 for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs):
1113 root = os.fsdecode(broot)
1114 dirs = list(map(os.fsdecode, bdirs))
1115 files = list(map(os.fsdecode, bfiles))
1116 yield (root, dirs, files)
1117 bdirs[:] = list(map(os.fsencode, dirs))
1118 bfiles[:] = list(map(os.fsencode, files))
1119
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001120@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1121class BytesFwalkTests(FwalkTests):
1122 """Tests for os.walk() with bytes."""
1123 def fwalk(self, top='.', *args, **kwargs):
1124 for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs):
1125 root = os.fsdecode(broot)
1126 dirs = list(map(os.fsdecode, bdirs))
1127 files = list(map(os.fsdecode, bfiles))
1128 yield (root, dirs, files, topfd)
1129 bdirs[:] = list(map(os.fsencode, dirs))
1130 bfiles[:] = list(map(os.fsencode, files))
1131
Charles-François Natali7372b062012-02-05 15:15:38 +01001132
Guido van Rossume7ba4952007-06-06 23:52:48 +00001133class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001134 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001135 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001136
1137 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001138 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001139 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
1140 os.makedirs(path) # Should work
1141 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
1142 os.makedirs(path)
1143
1144 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001145 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001146 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
1147 os.makedirs(path)
1148 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
1149 'dir5', 'dir6')
1150 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001151
Serhiy Storchakae304e332017-03-24 13:27:42 +02001152 def test_mode(self):
1153 with support.temp_umask(0o002):
1154 base = support.TESTFN
1155 parent = os.path.join(base, 'dir1')
1156 path = os.path.join(parent, 'dir2')
1157 os.makedirs(path, 0o555)
1158 self.assertTrue(os.path.exists(path))
1159 self.assertTrue(os.path.isdir(path))
1160 if os.name != 'nt':
1161 self.assertEqual(stat.S_IMODE(os.stat(path).st_mode), 0o555)
1162 self.assertEqual(stat.S_IMODE(os.stat(parent).st_mode), 0o775)
1163
Terry Reedy5a22b652010-12-02 07:05:56 +00001164 def test_exist_ok_existing_directory(self):
1165 path = os.path.join(support.TESTFN, 'dir1')
1166 mode = 0o777
1167 old_mask = os.umask(0o022)
1168 os.makedirs(path, mode)
1169 self.assertRaises(OSError, os.makedirs, path, mode)
1170 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
Benjamin Peterson4717e212014-04-01 19:17:57 -04001171 os.makedirs(path, 0o776, exist_ok=True)
Terry Reedy5a22b652010-12-02 07:05:56 +00001172 os.makedirs(path, mode=mode, exist_ok=True)
1173 os.umask(old_mask)
1174
Martin Pantera82642f2015-11-19 04:48:44 +00001175 # Issue #25583: A drive root could raise PermissionError on Windows
1176 os.makedirs(os.path.abspath('/'), exist_ok=True)
1177
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001178 def test_exist_ok_s_isgid_directory(self):
1179 path = os.path.join(support.TESTFN, 'dir1')
1180 S_ISGID = stat.S_ISGID
1181 mode = 0o777
1182 old_mask = os.umask(0o022)
1183 try:
1184 existing_testfn_mode = stat.S_IMODE(
1185 os.lstat(support.TESTFN).st_mode)
Ned Deilyc622f422012-08-08 20:57:24 -07001186 try:
1187 os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
Ned Deily3a2b97e2012-08-08 21:03:02 -07001188 except PermissionError:
Ned Deilyc622f422012-08-08 20:57:24 -07001189 raise unittest.SkipTest('Cannot set S_ISGID for dir.')
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001190 if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
1191 raise unittest.SkipTest('No support for S_ISGID dir mode.')
1192 # The os should apply S_ISGID from the parent dir for us, but
1193 # this test need not depend on that behavior. Be explicit.
1194 os.makedirs(path, mode | S_ISGID)
1195 # http://bugs.python.org/issue14992
1196 # Should not fail when the bit is already set.
1197 os.makedirs(path, mode, exist_ok=True)
1198 # remove the bit.
1199 os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
Benjamin Petersonee5f1c12014-04-01 19:13:18 -04001200 # May work even when the bit is not already set when demanded.
1201 os.makedirs(path, mode | S_ISGID, exist_ok=True)
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001202 finally:
1203 os.umask(old_mask)
Terry Reedy5a22b652010-12-02 07:05:56 +00001204
1205 def test_exist_ok_existing_regular_file(self):
1206 base = support.TESTFN
1207 path = os.path.join(support.TESTFN, 'dir1')
1208 f = open(path, 'w')
1209 f.write('abc')
1210 f.close()
1211 self.assertRaises(OSError, os.makedirs, path)
1212 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
1213 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
1214 os.remove(path)
1215
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001216 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001217 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001218 'dir4', 'dir5', 'dir6')
1219 # If the tests failed, the bottom-most directory ('../dir6')
1220 # may not have been created, so we look for the outermost directory
1221 # that exists.
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001222 while not os.path.exists(path) and path != support.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001223 path = os.path.dirname(path)
1224
1225 os.removedirs(path)
1226
Andrew Svetlov405faed2012-12-25 12:18:09 +02001227
R David Murrayf2ad1732014-12-25 18:36:56 -05001228@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
1229class ChownFileTests(unittest.TestCase):
1230
Berker Peksag036a71b2015-07-21 09:29:48 +03001231 @classmethod
1232 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001233 os.mkdir(support.TESTFN)
1234
1235 def test_chown_uid_gid_arguments_must_be_index(self):
1236 stat = os.stat(support.TESTFN)
1237 uid = stat.st_uid
1238 gid = stat.st_gid
1239 for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
1240 self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
1241 self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
1242 self.assertIsNone(os.chown(support.TESTFN, uid, gid))
1243 self.assertIsNone(os.chown(support.TESTFN, -1, -1))
1244
1245 @unittest.skipUnless(len(groups) > 1, "test needs more than one group")
1246 def test_chown(self):
1247 gid_1, gid_2 = groups[:2]
1248 uid = os.stat(support.TESTFN).st_uid
1249 os.chown(support.TESTFN, uid, gid_1)
1250 gid = os.stat(support.TESTFN).st_gid
1251 self.assertEqual(gid, gid_1)
1252 os.chown(support.TESTFN, uid, gid_2)
1253 gid = os.stat(support.TESTFN).st_gid
1254 self.assertEqual(gid, gid_2)
1255
1256 @unittest.skipUnless(root_in_posix and len(all_users) > 1,
1257 "test needs root privilege and more than one user")
1258 def test_chown_with_root(self):
1259 uid_1, uid_2 = all_users[:2]
1260 gid = os.stat(support.TESTFN).st_gid
1261 os.chown(support.TESTFN, uid_1, gid)
1262 uid = os.stat(support.TESTFN).st_uid
1263 self.assertEqual(uid, uid_1)
1264 os.chown(support.TESTFN, uid_2, gid)
1265 uid = os.stat(support.TESTFN).st_uid
1266 self.assertEqual(uid, uid_2)
1267
1268 @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
1269 "test needs non-root account and more than one user")
1270 def test_chown_without_permission(self):
1271 uid_1, uid_2 = all_users[:2]
1272 gid = os.stat(support.TESTFN).st_gid
Serhiy Storchakaa9e00d12015-02-16 08:35:18 +02001273 with self.assertRaises(PermissionError):
R David Murrayf2ad1732014-12-25 18:36:56 -05001274 os.chown(support.TESTFN, uid_1, gid)
1275 os.chown(support.TESTFN, uid_2, gid)
1276
Berker Peksag036a71b2015-07-21 09:29:48 +03001277 @classmethod
1278 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001279 os.rmdir(support.TESTFN)
1280
1281
Andrew Svetlov405faed2012-12-25 12:18:09 +02001282class RemoveDirsTests(unittest.TestCase):
1283 def setUp(self):
1284 os.makedirs(support.TESTFN)
1285
1286 def tearDown(self):
1287 support.rmtree(support.TESTFN)
1288
1289 def test_remove_all(self):
1290 dira = os.path.join(support.TESTFN, 'dira')
1291 os.mkdir(dira)
1292 dirb = os.path.join(dira, 'dirb')
1293 os.mkdir(dirb)
1294 os.removedirs(dirb)
1295 self.assertFalse(os.path.exists(dirb))
1296 self.assertFalse(os.path.exists(dira))
1297 self.assertFalse(os.path.exists(support.TESTFN))
1298
1299 def test_remove_partial(self):
1300 dira = os.path.join(support.TESTFN, 'dira')
1301 os.mkdir(dira)
1302 dirb = os.path.join(dira, 'dirb')
1303 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001304 create_file(os.path.join(dira, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001305 os.removedirs(dirb)
1306 self.assertFalse(os.path.exists(dirb))
1307 self.assertTrue(os.path.exists(dira))
1308 self.assertTrue(os.path.exists(support.TESTFN))
1309
1310 def test_remove_nothing(self):
1311 dira = os.path.join(support.TESTFN, 'dira')
1312 os.mkdir(dira)
1313 dirb = os.path.join(dira, 'dirb')
1314 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001315 create_file(os.path.join(dirb, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001316 with self.assertRaises(OSError):
1317 os.removedirs(dirb)
1318 self.assertTrue(os.path.exists(dirb))
1319 self.assertTrue(os.path.exists(dira))
1320 self.assertTrue(os.path.exists(support.TESTFN))
1321
1322
Guido van Rossume7ba4952007-06-06 23:52:48 +00001323class DevNullTests(unittest.TestCase):
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00001324 def test_devnull(self):
Victor Stinnerae39d232016-03-24 17:12:55 +01001325 with open(os.devnull, 'wb', 0) as f:
Victor Stinnera6d2c762011-06-30 18:20:11 +02001326 f.write(b'hello')
1327 f.close()
1328 with open(os.devnull, 'rb') as f:
1329 self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001330
Andrew Svetlov405faed2012-12-25 12:18:09 +02001331
Guido van Rossume7ba4952007-06-06 23:52:48 +00001332class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001333 def test_urandom_length(self):
1334 self.assertEqual(len(os.urandom(0)), 0)
1335 self.assertEqual(len(os.urandom(1)), 1)
1336 self.assertEqual(len(os.urandom(10)), 10)
1337 self.assertEqual(len(os.urandom(100)), 100)
1338 self.assertEqual(len(os.urandom(1000)), 1000)
1339
1340 def test_urandom_value(self):
1341 data1 = os.urandom(16)
Victor Stinner9b1f4742016-09-06 16:18:52 -07001342 self.assertIsInstance(data1, bytes)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001343 data2 = os.urandom(16)
1344 self.assertNotEqual(data1, data2)
1345
1346 def get_urandom_subprocess(self, count):
1347 code = '\n'.join((
1348 'import os, sys',
1349 'data = os.urandom(%s)' % count,
1350 'sys.stdout.buffer.write(data)',
1351 'sys.stdout.buffer.flush()'))
1352 out = assert_python_ok('-c', code)
1353 stdout = out[1]
1354 self.assertEqual(len(stdout), 16)
1355 return stdout
1356
1357 def test_urandom_subprocess(self):
1358 data1 = self.get_urandom_subprocess(16)
1359 data2 = self.get_urandom_subprocess(16)
1360 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001361
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001362
Victor Stinner9b1f4742016-09-06 16:18:52 -07001363@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
1364class GetRandomTests(unittest.TestCase):
Victor Stinner173a1f32016-09-06 19:57:40 -07001365 @classmethod
1366 def setUpClass(cls):
1367 try:
1368 os.getrandom(1)
1369 except OSError as exc:
1370 if exc.errno == errno.ENOSYS:
1371 # Python compiled on a more recent Linux version
1372 # than the current Linux kernel
1373 raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")
1374 else:
1375 raise
1376
Victor Stinner9b1f4742016-09-06 16:18:52 -07001377 def test_getrandom_type(self):
1378 data = os.getrandom(16)
1379 self.assertIsInstance(data, bytes)
1380 self.assertEqual(len(data), 16)
1381
1382 def test_getrandom0(self):
1383 empty = os.getrandom(0)
1384 self.assertEqual(empty, b'')
1385
1386 def test_getrandom_random(self):
1387 self.assertTrue(hasattr(os, 'GRND_RANDOM'))
1388
1389 # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
1390 # resource /dev/random
1391
1392 def test_getrandom_nonblock(self):
1393 # The call must not fail. Check also that the flag exists
1394 try:
1395 os.getrandom(1, os.GRND_NONBLOCK)
1396 except BlockingIOError:
1397 # System urandom is not initialized yet
1398 pass
1399
1400 def test_getrandom_value(self):
1401 data1 = os.getrandom(16)
1402 data2 = os.getrandom(16)
1403 self.assertNotEqual(data1, data2)
1404
1405
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001406# os.urandom() doesn't use a file descriptor when it is implemented with the
1407# getentropy() function, the getrandom() function or the getrandom() syscall
1408OS_URANDOM_DONT_USE_FD = (
1409 sysconfig.get_config_var('HAVE_GETENTROPY') == 1
1410 or sysconfig.get_config_var('HAVE_GETRANDOM') == 1
1411 or sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1)
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001412
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001413@unittest.skipIf(OS_URANDOM_DONT_USE_FD ,
1414 "os.random() does not use a file descriptor")
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001415class URandomFDTests(unittest.TestCase):
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001416 @unittest.skipUnless(resource, "test requires the resource module")
1417 def test_urandom_failure(self):
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001418 # Check urandom() failing when it is not able to open /dev/random.
1419 # We spawn a new process to make the test more robust (if getrlimit()
1420 # failed to restore the file descriptor limit after this, the whole
1421 # test suite would crash; this actually happened on the OS X Tiger
1422 # buildbot).
1423 code = """if 1:
1424 import errno
1425 import os
1426 import resource
1427
1428 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
1429 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
1430 try:
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001431 os.urandom(16)
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001432 except OSError as e:
1433 assert e.errno == errno.EMFILE, e.errno
1434 else:
1435 raise AssertionError("OSError not raised")
1436 """
1437 assert_python_ok('-c', code)
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001438
Antoine Pitroue472aea2014-04-26 14:33:03 +02001439 def test_urandom_fd_closed(self):
1440 # Issue #21207: urandom() should reopen its fd to /dev/urandom if
1441 # closed.
1442 code = """if 1:
1443 import os
1444 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001445 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001446 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001447 with test.support.SuppressCrashReport():
1448 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001449 sys.stdout.buffer.write(os.urandom(4))
1450 """
1451 rc, out, err = assert_python_ok('-Sc', code)
1452
1453 def test_urandom_fd_reopened(self):
1454 # Issue #21207: urandom() should detect its fd to /dev/urandom
1455 # changed to something else, and reopen it.
Victor Stinnerae39d232016-03-24 17:12:55 +01001456 self.addCleanup(support.unlink, support.TESTFN)
1457 create_file(support.TESTFN, b"x" * 256)
1458
Antoine Pitroue472aea2014-04-26 14:33:03 +02001459 code = """if 1:
1460 import os
1461 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001462 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001463 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001464 with test.support.SuppressCrashReport():
1465 for fd in range(3, 256):
1466 try:
1467 os.close(fd)
1468 except OSError:
1469 pass
1470 else:
1471 # Found the urandom fd (XXX hopefully)
1472 break
1473 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001474 with open({TESTFN!r}, 'rb') as f:
Xavier de Gaye21060102016-11-16 08:05:27 +01001475 new_fd = f.fileno()
1476 # Issue #26935: posix allows new_fd and fd to be equal but
1477 # some libc implementations have dup2 return an error in this
1478 # case.
1479 if new_fd != fd:
1480 os.dup2(new_fd, fd)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001481 sys.stdout.buffer.write(os.urandom(4))
1482 sys.stdout.buffer.write(os.urandom(4))
1483 """.format(TESTFN=support.TESTFN)
1484 rc, out, err = assert_python_ok('-Sc', code)
1485 self.assertEqual(len(out), 8)
1486 self.assertNotEqual(out[0:4], out[4:8])
1487 rc, out2, err2 = assert_python_ok('-Sc', code)
1488 self.assertEqual(len(out2), 8)
1489 self.assertNotEqual(out2, out)
1490
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001491
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001492@contextlib.contextmanager
1493def _execvpe_mockup(defpath=None):
1494 """
1495 Stubs out execv and execve functions when used as context manager.
1496 Records exec calls. The mock execv and execve functions always raise an
1497 exception as they would normally never return.
1498 """
1499 # A list of tuples containing (function name, first arg, args)
1500 # of calls to execv or execve that have been made.
1501 calls = []
1502
1503 def mock_execv(name, *args):
1504 calls.append(('execv', name, args))
1505 raise RuntimeError("execv called")
1506
1507 def mock_execve(name, *args):
1508 calls.append(('execve', name, args))
1509 raise OSError(errno.ENOTDIR, "execve called")
1510
1511 try:
1512 orig_execv = os.execv
1513 orig_execve = os.execve
1514 orig_defpath = os.defpath
1515 os.execv = mock_execv
1516 os.execve = mock_execve
1517 if defpath is not None:
1518 os.defpath = defpath
1519 yield calls
1520 finally:
1521 os.execv = orig_execv
1522 os.execve = orig_execve
1523 os.defpath = orig_defpath
1524
Victor Stinner4659ccf2016-09-14 10:57:00 +02001525
Guido van Rossume7ba4952007-06-06 23:52:48 +00001526class ExecTests(unittest.TestCase):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001527 @unittest.skipIf(USING_LINUXTHREADS,
1528 "avoid triggering a linuxthreads bug: see issue #4970")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001529 def test_execvpe_with_bad_program(self):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001530 self.assertRaises(OSError, os.execvpe, 'no such app-',
1531 ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +00001532
Steve Dowerbce26262016-11-19 19:17:26 -08001533 def test_execv_with_bad_arglist(self):
1534 self.assertRaises(ValueError, os.execv, 'notepad', ())
1535 self.assertRaises(ValueError, os.execv, 'notepad', [])
1536 self.assertRaises(ValueError, os.execv, 'notepad', ('',))
1537 self.assertRaises(ValueError, os.execv, 'notepad', [''])
1538
Thomas Heller6790d602007-08-30 17:15:14 +00001539 def test_execvpe_with_bad_arglist(self):
1540 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
Steve Dowerbce26262016-11-19 19:17:26 -08001541 self.assertRaises(ValueError, os.execvpe, 'notepad', [], {})
1542 self.assertRaises(ValueError, os.execvpe, 'notepad', [''], {})
Thomas Heller6790d602007-08-30 17:15:14 +00001543
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001544 @unittest.skipUnless(hasattr(os, '_execvpe'),
1545 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +00001546 def _test_internal_execvpe(self, test_type):
1547 program_path = os.sep + 'absolutepath'
1548 if test_type is bytes:
1549 program = b'executable'
1550 fullpath = os.path.join(os.fsencode(program_path), program)
1551 native_fullpath = fullpath
1552 arguments = [b'progname', 'arg1', 'arg2']
1553 else:
1554 program = 'executable'
1555 arguments = ['progname', 'arg1', 'arg2']
1556 fullpath = os.path.join(program_path, program)
1557 if os.name != "nt":
1558 native_fullpath = os.fsencode(fullpath)
1559 else:
1560 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001561 env = {'spam': 'beans'}
1562
Victor Stinnerb745a742010-05-18 17:17:23 +00001563 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001564 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001565 self.assertRaises(RuntimeError,
1566 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001567 self.assertEqual(len(calls), 1)
1568 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
1569
Victor Stinnerb745a742010-05-18 17:17:23 +00001570 # test os._execvpe() with a relative path:
1571 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001572 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001573 self.assertRaises(OSError,
1574 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001575 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +00001576 self.assertSequenceEqual(calls[0],
1577 ('execve', native_fullpath, (arguments, env)))
1578
1579 # test os._execvpe() with a relative path:
1580 # os.get_exec_path() reads the 'PATH' variable
1581 with _execvpe_mockup() as calls:
1582 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +00001583 if test_type is bytes:
1584 env_path[b'PATH'] = program_path
1585 else:
1586 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +00001587 self.assertRaises(OSError,
1588 os._execvpe, program, arguments, env=env_path)
1589 self.assertEqual(len(calls), 1)
1590 self.assertSequenceEqual(calls[0],
1591 ('execve', native_fullpath, (arguments, env_path)))
1592
1593 def test_internal_execvpe_str(self):
1594 self._test_internal_execvpe(str)
1595 if os.name != "nt":
1596 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001597
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001598 def test_execve_invalid_env(self):
1599 args = [sys.executable, '-c', 'pass']
1600
1601 # null character in the enviroment variable name
1602 newenv = os.environ.copy()
1603 newenv["FRUIT\0VEGETABLE"] = "cabbage"
1604 with self.assertRaises(ValueError):
1605 os.execve(args[0], args, newenv)
1606
1607 # null character in the enviroment variable value
1608 newenv = os.environ.copy()
1609 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
1610 with self.assertRaises(ValueError):
1611 os.execve(args[0], args, newenv)
1612
1613 # equal character in the enviroment variable name
1614 newenv = os.environ.copy()
1615 newenv["FRUIT=ORANGE"] = "lemon"
1616 with self.assertRaises(ValueError):
1617 os.execve(args[0], args, newenv)
1618
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001619
Serhiy Storchaka43767632013-11-03 21:31:38 +02001620@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001621class Win32ErrorTests(unittest.TestCase):
Victor Stinnere77c9742016-03-25 10:28:23 +01001622 def setUp(self):
Victor Stinner32830142016-03-25 15:12:08 +01001623 try:
1624 os.stat(support.TESTFN)
1625 except FileNotFoundError:
1626 exists = False
1627 except OSError as exc:
1628 exists = True
1629 self.fail("file %s must not exist; os.stat failed with %s"
1630 % (support.TESTFN, exc))
1631 else:
1632 self.fail("file %s must not exist" % support.TESTFN)
Victor Stinnere77c9742016-03-25 10:28:23 +01001633
Thomas Wouters477c8d52006-05-27 19:21:47 +00001634 def test_rename(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001635 self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001636
1637 def test_remove(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001638 self.assertRaises(OSError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001639
1640 def test_chdir(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001641 self.assertRaises(OSError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001642
1643 def test_mkdir(self):
Victor Stinnerae39d232016-03-24 17:12:55 +01001644 self.addCleanup(support.unlink, support.TESTFN)
1645
Victor Stinnere77c9742016-03-25 10:28:23 +01001646 with open(support.TESTFN, "x") as f:
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001647 self.assertRaises(OSError, os.mkdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001648
1649 def test_utime(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001650 self.assertRaises(OSError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001651
Thomas Wouters477c8d52006-05-27 19:21:47 +00001652 def test_chmod(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001653 self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001654
Victor Stinnere77c9742016-03-25 10:28:23 +01001655
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001656class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001657 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001658 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1659 #singles.append("close")
Steve Dower39294992016-08-30 21:22:36 -07001660 #We omit close because it doesn't raise an exception on some platforms
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001661 def get_single(f):
1662 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001663 if hasattr(os, f):
1664 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001665 return helper
1666 for f in singles:
1667 locals()["test_"+f] = get_single(f)
1668
Benjamin Peterson7522c742009-01-19 21:00:09 +00001669 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001670 try:
1671 f(support.make_bad_fd(), *args)
1672 except OSError as e:
1673 self.assertEqual(e.errno, errno.EBADF)
1674 else:
Martin Panter7462b6492015-11-02 03:37:02 +00001675 self.fail("%r didn't raise an OSError with a bad file descriptor"
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001676 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001677
Serhiy Storchaka43767632013-11-03 21:31:38 +02001678 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001679 def test_isatty(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001680 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001681
Serhiy Storchaka43767632013-11-03 21:31:38 +02001682 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001683 def test_closerange(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001684 fd = support.make_bad_fd()
1685 # Make sure none of the descriptors we are about to close are
1686 # currently valid (issue 6542).
1687 for i in range(10):
1688 try: os.fstat(fd+i)
1689 except OSError:
1690 pass
1691 else:
1692 break
1693 if i < 2:
1694 raise unittest.SkipTest(
1695 "Unable to acquire a range of invalid file descriptors")
1696 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001697
Serhiy Storchaka43767632013-11-03 21:31:38 +02001698 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001699 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001700 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001701
Serhiy Storchaka43767632013-11-03 21:31:38 +02001702 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001703 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001704 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001705
Serhiy Storchaka43767632013-11-03 21:31:38 +02001706 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001707 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001708 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001709
Serhiy Storchaka43767632013-11-03 21:31:38 +02001710 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001711 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001712 self.check(os.pathconf, "PC_NAME_MAX")
1713 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001714
Serhiy Storchaka43767632013-11-03 21:31:38 +02001715 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001716 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001717 self.check(os.truncate, 0)
1718 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001719
Serhiy Storchaka43767632013-11-03 21:31:38 +02001720 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001721 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001722 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001723
Serhiy Storchaka43767632013-11-03 21:31:38 +02001724 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001725 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001726 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001727
Victor Stinner57ddf782014-01-08 15:21:28 +01001728 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1729 def test_readv(self):
1730 buf = bytearray(10)
1731 self.check(os.readv, [buf])
1732
Serhiy Storchaka43767632013-11-03 21:31:38 +02001733 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001734 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001735 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001736
Serhiy Storchaka43767632013-11-03 21:31:38 +02001737 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001738 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001739 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001740
Victor Stinner57ddf782014-01-08 15:21:28 +01001741 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1742 def test_writev(self):
1743 self.check(os.writev, [b'abc'])
1744
Victor Stinner1db9e7b2014-07-29 22:32:47 +02001745 def test_inheritable(self):
1746 self.check(os.get_inheritable)
1747 self.check(os.set_inheritable, True)
1748
1749 @unittest.skipUnless(hasattr(os, 'get_blocking'),
1750 'needs os.get_blocking() and os.set_blocking()')
1751 def test_blocking(self):
1752 self.check(os.get_blocking)
1753 self.check(os.set_blocking, True)
1754
Brian Curtin1b9df392010-11-24 20:24:31 +00001755
1756class LinkTests(unittest.TestCase):
1757 def setUp(self):
1758 self.file1 = support.TESTFN
1759 self.file2 = os.path.join(support.TESTFN + "2")
1760
Brian Curtinc0abc4e2010-11-30 23:46:54 +00001761 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00001762 for file in (self.file1, self.file2):
1763 if os.path.exists(file):
1764 os.unlink(file)
1765
Brian Curtin1b9df392010-11-24 20:24:31 +00001766 def _test_link(self, file1, file2):
Victor Stinnere77c9742016-03-25 10:28:23 +01001767 create_file(file1)
Brian Curtin1b9df392010-11-24 20:24:31 +00001768
Steve Dowercc16be82016-09-08 10:35:16 -07001769 os.link(file1, file2)
Brian Curtin1b9df392010-11-24 20:24:31 +00001770 with open(file1, "r") as f1, open(file2, "r") as f2:
1771 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1772
1773 def test_link(self):
1774 self._test_link(self.file1, self.file2)
1775
1776 def test_link_bytes(self):
1777 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1778 bytes(self.file2, sys.getfilesystemencoding()))
1779
Brian Curtinf498b752010-11-30 15:54:04 +00001780 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00001781 try:
Brian Curtinf498b752010-11-30 15:54:04 +00001782 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00001783 except UnicodeError:
1784 raise unittest.SkipTest("Unable to encode for this platform.")
1785
Brian Curtinf498b752010-11-30 15:54:04 +00001786 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00001787 self.file2 = self.file1 + "2"
1788 self._test_link(self.file1, self.file2)
1789
Serhiy Storchaka43767632013-11-03 21:31:38 +02001790@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1791class PosixUidGidTests(unittest.TestCase):
1792 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
1793 def test_setuid(self):
1794 if os.getuid() != 0:
1795 self.assertRaises(OSError, os.setuid, 0)
1796 self.assertRaises(OverflowError, os.setuid, 1<<32)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001797
Serhiy Storchaka43767632013-11-03 21:31:38 +02001798 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
1799 def test_setgid(self):
1800 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1801 self.assertRaises(OSError, os.setgid, 0)
1802 self.assertRaises(OverflowError, os.setgid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001803
Serhiy Storchaka43767632013-11-03 21:31:38 +02001804 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
1805 def test_seteuid(self):
1806 if os.getuid() != 0:
1807 self.assertRaises(OSError, os.seteuid, 0)
1808 self.assertRaises(OverflowError, os.seteuid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001809
Serhiy Storchaka43767632013-11-03 21:31:38 +02001810 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
1811 def test_setegid(self):
1812 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1813 self.assertRaises(OSError, os.setegid, 0)
1814 self.assertRaises(OverflowError, os.setegid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001815
Serhiy Storchaka43767632013-11-03 21:31:38 +02001816 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1817 def test_setreuid(self):
1818 if os.getuid() != 0:
1819 self.assertRaises(OSError, os.setreuid, 0, 0)
1820 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
1821 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001822
Serhiy Storchaka43767632013-11-03 21:31:38 +02001823 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1824 def test_setreuid_neg1(self):
1825 # Needs to accept -1. We run this in a subprocess to avoid
1826 # altering the test runner's process state (issue8045).
1827 subprocess.check_call([
1828 sys.executable, '-c',
1829 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001830
Serhiy Storchaka43767632013-11-03 21:31:38 +02001831 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1832 def test_setregid(self):
1833 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1834 self.assertRaises(OSError, os.setregid, 0, 0)
1835 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
1836 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001837
Serhiy Storchaka43767632013-11-03 21:31:38 +02001838 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1839 def test_setregid_neg1(self):
1840 # Needs to accept -1. We run this in a subprocess to avoid
1841 # altering the test runner's process state (issue8045).
1842 subprocess.check_call([
1843 sys.executable, '-c',
1844 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001845
Serhiy Storchaka43767632013-11-03 21:31:38 +02001846@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1847class Pep383Tests(unittest.TestCase):
1848 def setUp(self):
1849 if support.TESTFN_UNENCODABLE:
1850 self.dir = support.TESTFN_UNENCODABLE
1851 elif support.TESTFN_NONASCII:
1852 self.dir = support.TESTFN_NONASCII
1853 else:
1854 self.dir = support.TESTFN
1855 self.bdir = os.fsencode(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001856
Serhiy Storchaka43767632013-11-03 21:31:38 +02001857 bytesfn = []
1858 def add_filename(fn):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001859 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +02001860 fn = os.fsencode(fn)
1861 except UnicodeEncodeError:
1862 return
1863 bytesfn.append(fn)
1864 add_filename(support.TESTFN_UNICODE)
1865 if support.TESTFN_UNENCODABLE:
1866 add_filename(support.TESTFN_UNENCODABLE)
1867 if support.TESTFN_NONASCII:
1868 add_filename(support.TESTFN_NONASCII)
1869 if not bytesfn:
1870 self.skipTest("couldn't create any non-ascii filename")
Martin v. Löwis011e8422009-05-05 04:43:17 +00001871
Serhiy Storchaka43767632013-11-03 21:31:38 +02001872 self.unicodefn = set()
1873 os.mkdir(self.dir)
1874 try:
1875 for fn in bytesfn:
1876 support.create_empty_file(os.path.join(self.bdir, fn))
1877 fn = os.fsdecode(fn)
1878 if fn in self.unicodefn:
1879 raise ValueError("duplicate filename")
1880 self.unicodefn.add(fn)
1881 except:
Martin v. Löwis011e8422009-05-05 04:43:17 +00001882 shutil.rmtree(self.dir)
Serhiy Storchaka43767632013-11-03 21:31:38 +02001883 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00001884
Serhiy Storchaka43767632013-11-03 21:31:38 +02001885 def tearDown(self):
1886 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001887
Serhiy Storchaka43767632013-11-03 21:31:38 +02001888 def test_listdir(self):
1889 expected = self.unicodefn
1890 found = set(os.listdir(self.dir))
1891 self.assertEqual(found, expected)
1892 # test listdir without arguments
1893 current_directory = os.getcwd()
1894 try:
1895 os.chdir(os.sep)
1896 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
1897 finally:
1898 os.chdir(current_directory)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001899
Serhiy Storchaka43767632013-11-03 21:31:38 +02001900 def test_open(self):
1901 for fn in self.unicodefn:
1902 f = open(os.path.join(self.dir, fn), 'rb')
1903 f.close()
Victor Stinnere4110dc2013-01-01 23:05:55 +01001904
Serhiy Storchaka43767632013-11-03 21:31:38 +02001905 @unittest.skipUnless(hasattr(os, 'statvfs'),
1906 "need os.statvfs()")
1907 def test_statvfs(self):
1908 # issue #9645
1909 for fn in self.unicodefn:
1910 # should not fail with file not found error
1911 fullname = os.path.join(self.dir, fn)
1912 os.statvfs(fullname)
1913
1914 def test_stat(self):
1915 for fn in self.unicodefn:
1916 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001917
Brian Curtineb24d742010-04-12 17:16:38 +00001918@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1919class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +00001920 def _kill(self, sig):
1921 # Start sys.executable as a subprocess and communicate from the
1922 # subprocess to the parent that the interpreter is ready. When it
1923 # becomes ready, send *sig* via os.kill to the subprocess and check
1924 # that the return code is equal to *sig*.
1925 import ctypes
1926 from ctypes import wintypes
1927 import msvcrt
1928
1929 # Since we can't access the contents of the process' stdout until the
1930 # process has exited, use PeekNamedPipe to see what's inside stdout
1931 # without waiting. This is done so we can tell that the interpreter
1932 # is started and running at a point where it could handle a signal.
1933 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
1934 PeekNamedPipe.restype = wintypes.BOOL
1935 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
1936 ctypes.POINTER(ctypes.c_char), # stdout buf
1937 wintypes.DWORD, # Buffer size
1938 ctypes.POINTER(wintypes.DWORD), # bytes read
1939 ctypes.POINTER(wintypes.DWORD), # bytes avail
1940 ctypes.POINTER(wintypes.DWORD)) # bytes left
1941 msg = "running"
1942 proc = subprocess.Popen([sys.executable, "-c",
1943 "import sys;"
1944 "sys.stdout.write('{}');"
1945 "sys.stdout.flush();"
1946 "input()".format(msg)],
1947 stdout=subprocess.PIPE,
1948 stderr=subprocess.PIPE,
1949 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00001950 self.addCleanup(proc.stdout.close)
1951 self.addCleanup(proc.stderr.close)
1952 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00001953
1954 count, max = 0, 100
1955 while count < max and proc.poll() is None:
1956 # Create a string buffer to store the result of stdout from the pipe
1957 buf = ctypes.create_string_buffer(len(msg))
1958 # Obtain the text currently in proc.stdout
1959 # Bytes read/avail/left are left as NULL and unused
1960 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
1961 buf, ctypes.sizeof(buf), None, None, None)
1962 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
1963 if buf.value:
1964 self.assertEqual(msg, buf.value.decode())
1965 break
1966 time.sleep(0.1)
1967 count += 1
1968 else:
1969 self.fail("Did not receive communication from the subprocess")
1970
Brian Curtineb24d742010-04-12 17:16:38 +00001971 os.kill(proc.pid, sig)
1972 self.assertEqual(proc.wait(), sig)
1973
1974 def test_kill_sigterm(self):
1975 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00001976 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00001977
1978 def test_kill_int(self):
1979 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00001980 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00001981
1982 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001983 tagname = "test_os_%s" % uuid.uuid1()
1984 m = mmap.mmap(-1, 1, tagname)
1985 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00001986 # Run a script which has console control handling enabled.
1987 proc = subprocess.Popen([sys.executable,
1988 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001989 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00001990 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
1991 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001992 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001993 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00001994 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001995 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001996 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001997 count += 1
1998 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001999 # Forcefully kill the process if we weren't able to signal it.
2000 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00002001 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00002002 os.kill(proc.pid, event)
2003 # proc.send_signal(event) could also be done here.
2004 # Allow time for the signal to be passed and the process to exit.
2005 time.sleep(0.5)
2006 if not proc.poll():
2007 # Forcefully kill the process if we weren't able to signal it.
2008 os.kill(proc.pid, signal.SIGINT)
2009 self.fail("subprocess did not stop on {}".format(name))
2010
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03002011 @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
Brian Curtineb24d742010-04-12 17:16:38 +00002012 def test_CTRL_C_EVENT(self):
2013 from ctypes import wintypes
2014 import ctypes
2015
2016 # Make a NULL value by creating a pointer with no argument.
2017 NULL = ctypes.POINTER(ctypes.c_int)()
2018 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
2019 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
2020 wintypes.BOOL)
2021 SetConsoleCtrlHandler.restype = wintypes.BOOL
2022
2023 # Calling this with NULL and FALSE causes the calling process to
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03002024 # handle Ctrl+C, rather than ignore it. This property is inherited
Brian Curtineb24d742010-04-12 17:16:38 +00002025 # by subprocesses.
2026 SetConsoleCtrlHandler(NULL, 0)
2027
2028 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
2029
2030 def test_CTRL_BREAK_EVENT(self):
2031 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
2032
2033
Brian Curtind40e6f72010-07-08 21:39:08 +00002034@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Tim Golden781bbeb2013-10-25 20:24:06 +01002035class Win32ListdirTests(unittest.TestCase):
2036 """Test listdir on Windows."""
2037
2038 def setUp(self):
2039 self.created_paths = []
2040 for i in range(2):
2041 dir_name = 'SUB%d' % i
2042 dir_path = os.path.join(support.TESTFN, dir_name)
2043 file_name = 'FILE%d' % i
2044 file_path = os.path.join(support.TESTFN, file_name)
2045 os.makedirs(dir_path)
2046 with open(file_path, 'w') as f:
2047 f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
2048 self.created_paths.extend([dir_name, file_name])
2049 self.created_paths.sort()
2050
2051 def tearDown(self):
2052 shutil.rmtree(support.TESTFN)
2053
2054 def test_listdir_no_extended_path(self):
2055 """Test when the path is not an "extended" path."""
2056 # unicode
2057 self.assertEqual(
2058 sorted(os.listdir(support.TESTFN)),
2059 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01002060
Tim Golden781bbeb2013-10-25 20:24:06 +01002061 # bytes
Steve Dowercc16be82016-09-08 10:35:16 -07002062 self.assertEqual(
2063 sorted(os.listdir(os.fsencode(support.TESTFN))),
2064 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002065
2066 def test_listdir_extended_path(self):
2067 """Test when the path starts with '\\\\?\\'."""
Tim Golden1cc35402013-10-25 21:26:06 +01002068 # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
Tim Golden781bbeb2013-10-25 20:24:06 +01002069 # unicode
2070 path = '\\\\?\\' + os.path.abspath(support.TESTFN)
2071 self.assertEqual(
2072 sorted(os.listdir(path)),
2073 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01002074
Tim Golden781bbeb2013-10-25 20:24:06 +01002075 # bytes
Steve Dowercc16be82016-09-08 10:35:16 -07002076 path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
2077 self.assertEqual(
2078 sorted(os.listdir(path)),
2079 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002080
2081
2082@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Brian Curtin3b4499c2010-12-28 14:31:47 +00002083@support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00002084class Win32SymlinkTests(unittest.TestCase):
2085 filelink = 'filelinktest'
2086 filelink_target = os.path.abspath(__file__)
2087 dirlink = 'dirlinktest'
2088 dirlink_target = os.path.dirname(filelink_target)
2089 missing_link = 'missing link'
2090
2091 def setUp(self):
2092 assert os.path.exists(self.dirlink_target)
2093 assert os.path.exists(self.filelink_target)
2094 assert not os.path.exists(self.dirlink)
2095 assert not os.path.exists(self.filelink)
2096 assert not os.path.exists(self.missing_link)
2097
2098 def tearDown(self):
2099 if os.path.exists(self.filelink):
2100 os.remove(self.filelink)
2101 if os.path.exists(self.dirlink):
2102 os.rmdir(self.dirlink)
2103 if os.path.lexists(self.missing_link):
2104 os.remove(self.missing_link)
2105
2106 def test_directory_link(self):
Jason R. Coombs3a092862013-05-27 23:21:28 -04002107 os.symlink(self.dirlink_target, self.dirlink)
Brian Curtind40e6f72010-07-08 21:39:08 +00002108 self.assertTrue(os.path.exists(self.dirlink))
2109 self.assertTrue(os.path.isdir(self.dirlink))
2110 self.assertTrue(os.path.islink(self.dirlink))
2111 self.check_stat(self.dirlink, self.dirlink_target)
2112
2113 def test_file_link(self):
2114 os.symlink(self.filelink_target, self.filelink)
2115 self.assertTrue(os.path.exists(self.filelink))
2116 self.assertTrue(os.path.isfile(self.filelink))
2117 self.assertTrue(os.path.islink(self.filelink))
2118 self.check_stat(self.filelink, self.filelink_target)
2119
2120 def _create_missing_dir_link(self):
2121 'Create a "directory" link to a non-existent target'
2122 linkname = self.missing_link
2123 if os.path.lexists(linkname):
2124 os.remove(linkname)
2125 target = r'c:\\target does not exist.29r3c740'
2126 assert not os.path.exists(target)
2127 target_is_dir = True
2128 os.symlink(target, linkname, target_is_dir)
2129
2130 def test_remove_directory_link_to_missing_target(self):
2131 self._create_missing_dir_link()
2132 # For compatibility with Unix, os.remove will check the
2133 # directory status and call RemoveDirectory if the symlink
2134 # was created with target_is_dir==True.
2135 os.remove(self.missing_link)
2136
2137 @unittest.skip("currently fails; consider for improvement")
2138 def test_isdir_on_directory_link_to_missing_target(self):
2139 self._create_missing_dir_link()
2140 # consider having isdir return true for directory links
2141 self.assertTrue(os.path.isdir(self.missing_link))
2142
2143 @unittest.skip("currently fails; consider for improvement")
2144 def test_rmdir_on_directory_link_to_missing_target(self):
2145 self._create_missing_dir_link()
2146 # consider allowing rmdir to remove directory links
2147 os.rmdir(self.missing_link)
2148
2149 def check_stat(self, link, target):
2150 self.assertEqual(os.stat(link), os.stat(target))
2151 self.assertNotEqual(os.lstat(link), os.stat(link))
2152
Brian Curtind25aef52011-06-13 15:16:04 -05002153 bytes_link = os.fsencode(link)
Steve Dowercc16be82016-09-08 10:35:16 -07002154 self.assertEqual(os.stat(bytes_link), os.stat(target))
2155 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
Brian Curtind25aef52011-06-13 15:16:04 -05002156
2157 def test_12084(self):
2158 level1 = os.path.abspath(support.TESTFN)
2159 level2 = os.path.join(level1, "level2")
2160 level3 = os.path.join(level2, "level3")
Victor Stinnerae39d232016-03-24 17:12:55 +01002161 self.addCleanup(support.rmtree, level1)
2162
2163 os.mkdir(level1)
2164 os.mkdir(level2)
2165 os.mkdir(level3)
2166
2167 file1 = os.path.abspath(os.path.join(level1, "file1"))
2168 create_file(file1)
2169
2170 orig_dir = os.getcwd()
Brian Curtind25aef52011-06-13 15:16:04 -05002171 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01002172 os.chdir(level2)
2173 link = os.path.join(level2, "link")
2174 os.symlink(os.path.relpath(file1), "link")
2175 self.assertIn("link", os.listdir(os.getcwd()))
Brian Curtind25aef52011-06-13 15:16:04 -05002176
Victor Stinnerae39d232016-03-24 17:12:55 +01002177 # Check os.stat calls from the same dir as the link
2178 self.assertEqual(os.stat(file1), os.stat("link"))
Brian Curtind25aef52011-06-13 15:16:04 -05002179
Victor Stinnerae39d232016-03-24 17:12:55 +01002180 # Check os.stat calls from a dir below the link
2181 os.chdir(level1)
2182 self.assertEqual(os.stat(file1),
2183 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002184
Victor Stinnerae39d232016-03-24 17:12:55 +01002185 # Check os.stat calls from a dir above the link
2186 os.chdir(level3)
2187 self.assertEqual(os.stat(file1),
2188 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002189 finally:
Victor Stinnerae39d232016-03-24 17:12:55 +01002190 os.chdir(orig_dir)
Brian Curtind25aef52011-06-13 15:16:04 -05002191
Brian Curtind40e6f72010-07-08 21:39:08 +00002192
Tim Golden0321cf22014-05-05 19:46:17 +01002193@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2194class Win32JunctionTests(unittest.TestCase):
2195 junction = 'junctiontest'
2196 junction_target = os.path.dirname(os.path.abspath(__file__))
2197
2198 def setUp(self):
2199 assert os.path.exists(self.junction_target)
2200 assert not os.path.exists(self.junction)
2201
2202 def tearDown(self):
2203 if os.path.exists(self.junction):
2204 # os.rmdir delegates to Windows' RemoveDirectoryW,
2205 # which removes junction points safely.
2206 os.rmdir(self.junction)
2207
2208 def test_create_junction(self):
2209 _winapi.CreateJunction(self.junction_target, self.junction)
2210 self.assertTrue(os.path.exists(self.junction))
2211 self.assertTrue(os.path.isdir(self.junction))
2212
2213 # Junctions are not recognized as links.
2214 self.assertFalse(os.path.islink(self.junction))
2215
2216 def test_unlink_removes_junction(self):
2217 _winapi.CreateJunction(self.junction_target, self.junction)
2218 self.assertTrue(os.path.exists(self.junction))
2219
2220 os.unlink(self.junction)
2221 self.assertFalse(os.path.exists(self.junction))
2222
2223
Jason R. Coombs3a092862013-05-27 23:21:28 -04002224@support.skip_unless_symlink
2225class NonLocalSymlinkTests(unittest.TestCase):
2226
2227 def setUp(self):
R David Murray44b548d2016-09-08 13:59:53 -04002228 r"""
Jason R. Coombs3a092862013-05-27 23:21:28 -04002229 Create this structure:
2230
2231 base
2232 \___ some_dir
2233 """
2234 os.makedirs('base/some_dir')
2235
2236 def tearDown(self):
2237 shutil.rmtree('base')
2238
2239 def test_directory_link_nonlocal(self):
2240 """
2241 The symlink target should resolve relative to the link, not relative
2242 to the current directory.
2243
2244 Then, link base/some_link -> base/some_dir and ensure that some_link
2245 is resolved as a directory.
2246
2247 In issue13772, it was discovered that directory detection failed if
2248 the symlink target was not specified relative to the current
2249 directory, which was a defect in the implementation.
2250 """
2251 src = os.path.join('base', 'some_link')
2252 os.symlink('some_dir', src)
2253 assert os.path.isdir(src)
2254
2255
Victor Stinnere8d51452010-08-19 01:05:19 +00002256class FSEncodingTests(unittest.TestCase):
2257 def test_nop(self):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002258 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
2259 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
Benjamin Peterson31191a92010-05-09 03:22:58 +00002260
Victor Stinnere8d51452010-08-19 01:05:19 +00002261 def test_identity(self):
2262 # assert fsdecode(fsencode(x)) == x
2263 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
2264 try:
2265 bytesfn = os.fsencode(fn)
2266 except UnicodeEncodeError:
2267 continue
Ezio Melottib3aedd42010-11-20 19:04:17 +00002268 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00002269
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002270
Brett Cannonefb00c02012-02-29 18:31:31 -05002271
2272class DeviceEncodingTests(unittest.TestCase):
2273
2274 def test_bad_fd(self):
2275 # Return None when an fd doesn't actually exist.
2276 self.assertIsNone(os.device_encoding(123456))
2277
Philip Jenveye308b7c2012-02-29 16:16:15 -08002278 @unittest.skipUnless(os.isatty(0) and (sys.platform.startswith('win') or
2279 (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
Philip Jenveyd7aff2d2012-02-29 16:21:25 -08002280 'test requires a tty and either Windows or nl_langinfo(CODESET)')
Brett Cannonefb00c02012-02-29 18:31:31 -05002281 def test_device_encoding(self):
2282 encoding = os.device_encoding(0)
2283 self.assertIsNotNone(encoding)
2284 self.assertTrue(codecs.lookup(encoding))
2285
2286
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002287class PidTests(unittest.TestCase):
2288 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
2289 def test_getppid(self):
2290 p = subprocess.Popen([sys.executable, '-c',
2291 'import os; print(os.getppid())'],
2292 stdout=subprocess.PIPE)
2293 stdout, _ = p.communicate()
2294 # We are the parent of our subprocess
2295 self.assertEqual(int(stdout), os.getpid())
2296
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002297 def test_waitpid(self):
2298 args = [sys.executable, '-c', 'pass']
Brett Cannonec6ce872016-09-06 15:50:29 -07002299 # Add an implicit test for PyUnicode_FSConverter().
2300 pid = os.spawnv(os.P_NOWAIT, _PathLike(args[0]), args)
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002301 status = os.waitpid(pid, 0)
2302 self.assertEqual(status, (pid, 0))
2303
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002304
Victor Stinner4659ccf2016-09-14 10:57:00 +02002305class SpawnTests(unittest.TestCase):
Berker Peksag47e70622016-09-15 20:23:55 +03002306 def create_args(self, *, with_env=False, use_bytes=False):
Victor Stinner4659ccf2016-09-14 10:57:00 +02002307 self.exitcode = 17
2308
2309 filename = support.TESTFN
2310 self.addCleanup(support.unlink, filename)
2311
2312 if not with_env:
2313 code = 'import sys; sys.exit(%s)' % self.exitcode
2314 else:
2315 self.env = dict(os.environ)
2316 # create an unique key
2317 self.key = str(uuid.uuid4())
2318 self.env[self.key] = self.key
2319 # read the variable from os.environ to check that it exists
2320 code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
2321 % (self.key, self.exitcode))
2322
2323 with open(filename, "w") as fp:
2324 fp.write(code)
2325
Berker Peksag81816462016-09-15 20:19:47 +03002326 args = [sys.executable, filename]
2327 if use_bytes:
2328 args = [os.fsencode(a) for a in args]
2329 self.env = {os.fsencode(k): os.fsencode(v)
2330 for k, v in self.env.items()}
2331
2332 return args
Victor Stinner4659ccf2016-09-14 10:57:00 +02002333
Berker Peksag4af23d72016-09-15 20:32:44 +03002334 @requires_os_func('spawnl')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002335 def test_spawnl(self):
2336 args = self.create_args()
2337 exitcode = os.spawnl(os.P_WAIT, args[0], *args)
2338 self.assertEqual(exitcode, self.exitcode)
2339
Berker Peksag4af23d72016-09-15 20:32:44 +03002340 @requires_os_func('spawnle')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002341 def test_spawnle(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002342 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002343 exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
2344 self.assertEqual(exitcode, self.exitcode)
2345
Berker Peksag4af23d72016-09-15 20:32:44 +03002346 @requires_os_func('spawnlp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002347 def test_spawnlp(self):
2348 args = self.create_args()
2349 exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
2350 self.assertEqual(exitcode, self.exitcode)
2351
Berker Peksag4af23d72016-09-15 20:32:44 +03002352 @requires_os_func('spawnlpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002353 def test_spawnlpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002354 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002355 exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
2356 self.assertEqual(exitcode, self.exitcode)
2357
Berker Peksag4af23d72016-09-15 20:32:44 +03002358 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002359 def test_spawnv(self):
2360 args = self.create_args()
2361 exitcode = os.spawnv(os.P_WAIT, args[0], args)
2362 self.assertEqual(exitcode, self.exitcode)
2363
Berker Peksag4af23d72016-09-15 20:32:44 +03002364 @requires_os_func('spawnve')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002365 def test_spawnve(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002366 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002367 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2368 self.assertEqual(exitcode, self.exitcode)
2369
Berker Peksag4af23d72016-09-15 20:32:44 +03002370 @requires_os_func('spawnvp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002371 def test_spawnvp(self):
2372 args = self.create_args()
2373 exitcode = os.spawnvp(os.P_WAIT, args[0], args)
2374 self.assertEqual(exitcode, self.exitcode)
2375
Berker Peksag4af23d72016-09-15 20:32:44 +03002376 @requires_os_func('spawnvpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002377 def test_spawnvpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002378 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002379 exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
2380 self.assertEqual(exitcode, self.exitcode)
2381
Berker Peksag4af23d72016-09-15 20:32:44 +03002382 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002383 def test_nowait(self):
2384 args = self.create_args()
2385 pid = os.spawnv(os.P_NOWAIT, args[0], args)
2386 result = os.waitpid(pid, 0)
2387 self.assertEqual(result[0], pid)
2388 status = result[1]
2389 if hasattr(os, 'WIFEXITED'):
2390 self.assertTrue(os.WIFEXITED(status))
2391 self.assertEqual(os.WEXITSTATUS(status), self.exitcode)
2392 else:
2393 self.assertEqual(status, self.exitcode << 8)
2394
Berker Peksag4af23d72016-09-15 20:32:44 +03002395 @requires_os_func('spawnve')
Berker Peksag81816462016-09-15 20:19:47 +03002396 def test_spawnve_bytes(self):
2397 # Test bytes handling in parse_arglist and parse_envlist (#28114)
2398 args = self.create_args(with_env=True, use_bytes=True)
2399 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2400 self.assertEqual(exitcode, self.exitcode)
2401
Steve Dower859fd7b2016-11-19 18:53:19 -08002402 @requires_os_func('spawnl')
2403 def test_spawnl_noargs(self):
2404 args = self.create_args()
2405 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
Steve Dowerbce26262016-11-19 19:17:26 -08002406 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')
Steve Dower859fd7b2016-11-19 18:53:19 -08002407
2408 @requires_os_func('spawnle')
Steve Dowerbce26262016-11-19 19:17:26 -08002409 def test_spawnle_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08002410 args = self.create_args()
2411 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
Steve Dowerbce26262016-11-19 19:17:26 -08002412 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})
Steve Dower859fd7b2016-11-19 18:53:19 -08002413
2414 @requires_os_func('spawnv')
2415 def test_spawnv_noargs(self):
2416 args = self.create_args()
2417 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
2418 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
Steve Dowerbce26262016-11-19 19:17:26 -08002419 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
2420 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])
Steve Dower859fd7b2016-11-19 18:53:19 -08002421
2422 @requires_os_func('spawnve')
Steve Dowerbce26262016-11-19 19:17:26 -08002423 def test_spawnve_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08002424 args = self.create_args()
2425 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
2426 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
Steve Dowerbce26262016-11-19 19:17:26 -08002427 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
2428 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})
Victor Stinner4659ccf2016-09-14 10:57:00 +02002429
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002430 def _test_invalid_env(self, spawn):
Serhiy Storchaka77703942017-06-25 07:33:01 +03002431 args = [sys.executable, '-c', 'pass']
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002432
2433 # null character in the enviroment variable name
Serhiy Storchaka77703942017-06-25 07:33:01 +03002434 newenv = os.environ.copy()
2435 newenv["FRUIT\0VEGETABLE"] = "cabbage"
2436 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002437 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002438 except ValueError:
2439 pass
2440 else:
2441 self.assertEqual(exitcode, 127)
2442
2443 # null character in the enviroment variable value
Serhiy Storchaka77703942017-06-25 07:33:01 +03002444 newenv = os.environ.copy()
2445 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
2446 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002447 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002448 except ValueError:
2449 pass
2450 else:
2451 self.assertEqual(exitcode, 127)
2452
2453 # equal character in the enviroment variable name
Serhiy Storchaka77703942017-06-25 07:33:01 +03002454 newenv = os.environ.copy()
2455 newenv["FRUIT=ORANGE"] = "lemon"
2456 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002457 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002458 except ValueError:
2459 pass
2460 else:
2461 self.assertEqual(exitcode, 127)
2462
2463 # equal character in the enviroment variable value
2464 filename = support.TESTFN
2465 self.addCleanup(support.unlink, filename)
2466 with open(filename, "w") as fp:
2467 fp.write('import sys, os\n'
2468 'if os.getenv("FRUIT") != "orange=lemon":\n'
2469 ' raise AssertionError')
2470 args = [sys.executable, filename]
2471 newenv = os.environ.copy()
2472 newenv["FRUIT"] = "orange=lemon"
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002473 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002474 self.assertEqual(exitcode, 0)
2475
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002476 @requires_os_func('spawnve')
2477 def test_spawnve_invalid_env(self):
2478 self._test_invalid_env(os.spawnve)
2479
2480 @requires_os_func('spawnvpe')
2481 def test_spawnvpe_invalid_env(self):
2482 self._test_invalid_env(os.spawnvpe)
2483
Serhiy Storchaka77703942017-06-25 07:33:01 +03002484
Brian Curtin0151b8e2010-09-24 13:43:43 +00002485# The introduction of this TestCase caused at least two different errors on
2486# *nix buildbots. Temporarily skip this to let the buildbots move along.
2487@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
Brian Curtine8e4b3b2010-09-23 20:04:14 +00002488@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
2489class LoginTests(unittest.TestCase):
2490 def test_getlogin(self):
2491 user_name = os.getlogin()
2492 self.assertNotEqual(len(user_name), 0)
2493
2494
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002495@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
2496 "needs os.getpriority and os.setpriority")
2497class ProgramPriorityTests(unittest.TestCase):
2498 """Tests for os.getpriority() and os.setpriority()."""
2499
2500 def test_set_get_priority(self):
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002501
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002502 base = os.getpriority(os.PRIO_PROCESS, os.getpid())
2503 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
2504 try:
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002505 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
2506 if base >= 19 and new_prio <= 19:
Victor Stinnerae39d232016-03-24 17:12:55 +01002507 raise unittest.SkipTest("unable to reliably test setpriority "
2508 "at current nice level of %s" % base)
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002509 else:
2510 self.assertEqual(new_prio, base + 1)
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002511 finally:
2512 try:
2513 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
2514 except OSError as err:
Antoine Pitrou692f0382011-02-26 00:22:25 +00002515 if err.errno != errno.EACCES:
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002516 raise
2517
2518
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002519if threading is not None:
2520 class SendfileTestServer(asyncore.dispatcher, threading.Thread):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002521
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002522 class Handler(asynchat.async_chat):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002523
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002524 def __init__(self, conn):
2525 asynchat.async_chat.__init__(self, conn)
2526 self.in_buffer = []
2527 self.closed = False
2528 self.push(b"220 ready\r\n")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002529
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002530 def handle_read(self):
2531 data = self.recv(4096)
2532 self.in_buffer.append(data)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002533
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002534 def get_data(self):
2535 return b''.join(self.in_buffer)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002536
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002537 def handle_close(self):
2538 self.close()
2539 self.closed = True
2540
2541 def handle_error(self):
2542 raise
2543
2544 def __init__(self, address):
2545 threading.Thread.__init__(self)
2546 asyncore.dispatcher.__init__(self)
2547 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
2548 self.bind(address)
2549 self.listen(5)
2550 self.host, self.port = self.socket.getsockname()[:2]
2551 self.handler_instance = None
2552 self._active = False
2553 self._active_lock = threading.Lock()
2554
2555 # --- public API
2556
2557 @property
2558 def running(self):
2559 return self._active
2560
2561 def start(self):
2562 assert not self.running
2563 self.__flag = threading.Event()
2564 threading.Thread.start(self)
2565 self.__flag.wait()
2566
2567 def stop(self):
2568 assert self.running
2569 self._active = False
2570 self.join()
2571
2572 def wait(self):
2573 # wait for handler connection to be closed, then stop the server
2574 while not getattr(self.handler_instance, "closed", False):
2575 time.sleep(0.001)
2576 self.stop()
2577
2578 # --- internals
2579
2580 def run(self):
2581 self._active = True
2582 self.__flag.set()
2583 while self._active and asyncore.socket_map:
2584 self._active_lock.acquire()
2585 asyncore.loop(timeout=0.001, count=1)
2586 self._active_lock.release()
2587 asyncore.close_all()
2588
2589 def handle_accept(self):
2590 conn, addr = self.accept()
2591 self.handler_instance = self.Handler(conn)
2592
2593 def handle_connect(self):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002594 self.close()
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002595 handle_read = handle_connect
2596
2597 def writable(self):
2598 return 0
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002599
2600 def handle_error(self):
2601 raise
2602
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002603
Giampaolo Rodolà46134642011-02-25 20:01:05 +00002604@unittest.skipUnless(threading is not None, "test needs threading module")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002605@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
2606class TestSendfile(unittest.TestCase):
2607
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002608 DATA = b"12345abcde" * 16 * 1024 # 160 KB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002609 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00002610 not sys.platform.startswith("solaris") and \
2611 not sys.platform.startswith("sunos")
Serhiy Storchaka43767632013-11-03 21:31:38 +02002612 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
2613 'requires headers and trailers support')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002614
2615 @classmethod
2616 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002617 cls.key = support.threading_setup()
Victor Stinnerae39d232016-03-24 17:12:55 +01002618 create_file(support.TESTFN, cls.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002619
2620 @classmethod
2621 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002622 support.threading_cleanup(*cls.key)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002623 support.unlink(support.TESTFN)
2624
2625 def setUp(self):
2626 self.server = SendfileTestServer((support.HOST, 0))
2627 self.server.start()
2628 self.client = socket.socket()
2629 self.client.connect((self.server.host, self.server.port))
2630 self.client.settimeout(1)
2631 # synchronize by waiting for "220 ready" response
2632 self.client.recv(1024)
2633 self.sockno = self.client.fileno()
2634 self.file = open(support.TESTFN, 'rb')
2635 self.fileno = self.file.fileno()
2636
2637 def tearDown(self):
2638 self.file.close()
2639 self.client.close()
2640 if self.server.running:
2641 self.server.stop()
2642
2643 def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
2644 """A higher level wrapper representing how an application is
2645 supposed to use sendfile().
2646 """
2647 while 1:
2648 try:
2649 if self.SUPPORT_HEADERS_TRAILERS:
2650 return os.sendfile(sock, file, offset, nbytes, headers,
2651 trailers)
2652 else:
2653 return os.sendfile(sock, file, offset, nbytes)
2654 except OSError as err:
2655 if err.errno == errno.ECONNRESET:
2656 # disconnected
2657 raise
2658 elif err.errno in (errno.EAGAIN, errno.EBUSY):
2659 # we have to retry send data
2660 continue
2661 else:
2662 raise
2663
2664 def test_send_whole_file(self):
2665 # normal send
2666 total_sent = 0
2667 offset = 0
2668 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002669 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002670 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2671 if sent == 0:
2672 break
2673 offset += sent
2674 total_sent += sent
2675 self.assertTrue(sent <= nbytes)
2676 self.assertEqual(offset, total_sent)
2677
2678 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002679 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002680 self.client.close()
2681 self.server.wait()
2682 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002683 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002684 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002685
2686 def test_send_at_certain_offset(self):
2687 # start sending a file at a certain offset
2688 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002689 offset = len(self.DATA) // 2
2690 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002691 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002692 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002693 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2694 if sent == 0:
2695 break
2696 offset += sent
2697 total_sent += sent
2698 self.assertTrue(sent <= nbytes)
2699
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002700 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002701 self.client.close()
2702 self.server.wait()
2703 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002704 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002705 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002706 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002707 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002708
2709 def test_offset_overflow(self):
2710 # specify an offset > file size
2711 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002712 try:
2713 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
2714 except OSError as e:
2715 # Solaris can raise EINVAL if offset >= file length, ignore.
2716 if e.errno != errno.EINVAL:
2717 raise
2718 else:
2719 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002720 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002721 self.client.close()
2722 self.server.wait()
2723 data = self.server.handler_instance.get_data()
2724 self.assertEqual(data, b'')
2725
2726 def test_invalid_offset(self):
2727 with self.assertRaises(OSError) as cm:
2728 os.sendfile(self.sockno, self.fileno, -1, 4096)
2729 self.assertEqual(cm.exception.errno, errno.EINVAL)
2730
Martin Panterbf19d162015-09-09 01:01:13 +00002731 def test_keywords(self):
2732 # Keyword arguments should be supported
2733 os.sendfile(out=self.sockno, offset=0, count=4096,
2734 **{'in': self.fileno})
2735 if self.SUPPORT_HEADERS_TRAILERS:
2736 os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
Martin Panter94994132015-09-09 05:29:24 +00002737 headers=(), trailers=(), flags=0)
Martin Panterbf19d162015-09-09 01:01:13 +00002738
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002739 # --- headers / trailers tests
2740
Serhiy Storchaka43767632013-11-03 21:31:38 +02002741 @requires_headers_trailers
2742 def test_headers(self):
2743 total_sent = 0
2744 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
2745 headers=[b"x" * 512])
2746 total_sent += sent
2747 offset = 4096
2748 nbytes = 4096
2749 while 1:
2750 sent = self.sendfile_wrapper(self.sockno, self.fileno,
2751 offset, nbytes)
2752 if sent == 0:
2753 break
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002754 total_sent += sent
Serhiy Storchaka43767632013-11-03 21:31:38 +02002755 offset += sent
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002756
Serhiy Storchaka43767632013-11-03 21:31:38 +02002757 expected_data = b"x" * 512 + self.DATA
2758 self.assertEqual(total_sent, len(expected_data))
2759 self.client.close()
2760 self.server.wait()
2761 data = self.server.handler_instance.get_data()
2762 self.assertEqual(hash(data), hash(expected_data))
2763
2764 @requires_headers_trailers
2765 def test_trailers(self):
2766 TESTFN2 = support.TESTFN + "2"
2767 file_data = b"abcdef"
Victor Stinnerae39d232016-03-24 17:12:55 +01002768
2769 self.addCleanup(support.unlink, TESTFN2)
2770 create_file(TESTFN2, file_data)
2771
2772 with open(TESTFN2, 'rb') as f:
Serhiy Storchaka43767632013-11-03 21:31:38 +02002773 os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
2774 trailers=[b"1234"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002775 self.client.close()
2776 self.server.wait()
2777 data = self.server.handler_instance.get_data()
Serhiy Storchaka43767632013-11-03 21:31:38 +02002778 self.assertEqual(data, b"abcdef1234")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002779
Serhiy Storchaka43767632013-11-03 21:31:38 +02002780 @requires_headers_trailers
2781 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
2782 'test needs os.SF_NODISKIO')
2783 def test_flags(self):
2784 try:
2785 os.sendfile(self.sockno, self.fileno, 0, 4096,
2786 flags=os.SF_NODISKIO)
2787 except OSError as err:
2788 if err.errno not in (errno.EBUSY, errno.EAGAIN):
2789 raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002790
2791
Larry Hastings9cf065c2012-06-22 16:30:09 -07002792def supports_extended_attributes():
2793 if not hasattr(os, "setxattr"):
2794 return False
Victor Stinnerae39d232016-03-24 17:12:55 +01002795
Larry Hastings9cf065c2012-06-22 16:30:09 -07002796 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01002797 with open(support.TESTFN, "xb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002798 try:
2799 os.setxattr(fp.fileno(), b"user.test", b"")
2800 except OSError:
2801 return False
2802 finally:
2803 support.unlink(support.TESTFN)
Victor Stinnerf95a19b2016-03-24 16:50:41 +01002804
2805 return True
Larry Hastings9cf065c2012-06-22 16:30:09 -07002806
2807
2808@unittest.skipUnless(supports_extended_attributes(),
2809 "no non-broken extended attribute support")
Victor Stinnerf95a19b2016-03-24 16:50:41 +01002810# Kernels < 2.6.39 don't respect setxattr flags.
2811@support.requires_linux_version(2, 6, 39)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002812class ExtendedAttributeTests(unittest.TestCase):
2813
Larry Hastings9cf065c2012-06-22 16:30:09 -07002814 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
Benjamin Peterson799bd802011-08-31 22:15:17 -04002815 fn = support.TESTFN
Victor Stinnerae39d232016-03-24 17:12:55 +01002816 self.addCleanup(support.unlink, fn)
2817 create_file(fn)
2818
Benjamin Peterson799bd802011-08-31 22:15:17 -04002819 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002820 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002821 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01002822
Victor Stinnerf12e5062011-10-16 22:12:03 +02002823 init_xattr = listxattr(fn)
2824 self.assertIsInstance(init_xattr, list)
Victor Stinnerae39d232016-03-24 17:12:55 +01002825
Larry Hastings9cf065c2012-06-22 16:30:09 -07002826 setxattr(fn, s("user.test"), b"", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002827 xattr = set(init_xattr)
2828 xattr.add("user.test")
2829 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002830 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
2831 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
2832 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
Victor Stinnerae39d232016-03-24 17:12:55 +01002833
Benjamin Peterson799bd802011-08-31 22:15:17 -04002834 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002835 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002836 self.assertEqual(cm.exception.errno, errno.EEXIST)
Victor Stinnerae39d232016-03-24 17:12:55 +01002837
Benjamin Peterson799bd802011-08-31 22:15:17 -04002838 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002839 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002840 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01002841
Larry Hastings9cf065c2012-06-22 16:30:09 -07002842 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002843 xattr.add("user.test2")
2844 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002845 removexattr(fn, s("user.test"), **kwargs)
Victor Stinnerae39d232016-03-24 17:12:55 +01002846
Benjamin Peterson799bd802011-08-31 22:15:17 -04002847 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002848 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002849 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01002850
Victor Stinnerf12e5062011-10-16 22:12:03 +02002851 xattr.remove("user.test")
2852 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002853 self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
2854 setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
2855 self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
2856 removexattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002857 many = sorted("user.test{}".format(i) for i in range(100))
2858 for thing in many:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002859 setxattr(fn, thing, b"x", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002860 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
Benjamin Peterson799bd802011-08-31 22:15:17 -04002861
Larry Hastings9cf065c2012-06-22 16:30:09 -07002862 def _check_xattrs(self, *args, **kwargs):
Larry Hastings9cf065c2012-06-22 16:30:09 -07002863 self._check_xattrs_str(str, *args, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002864 support.unlink(support.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +01002865
2866 self._check_xattrs_str(os.fsencode, *args, **kwargs)
2867 support.unlink(support.TESTFN)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002868
2869 def test_simple(self):
2870 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2871 os.listxattr)
2872
2873 def test_lpath(self):
Larry Hastings9cf065c2012-06-22 16:30:09 -07002874 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2875 os.listxattr, follow_symlinks=False)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002876
2877 def test_fds(self):
2878 def getxattr(path, *args):
2879 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002880 return os.getxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002881 def setxattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01002882 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002883 os.setxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002884 def removexattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01002885 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002886 os.removexattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002887 def listxattr(path, *args):
2888 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002889 return os.listxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002890 self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
2891
2892
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002893@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
2894class TermsizeTests(unittest.TestCase):
2895 def test_does_not_crash(self):
2896 """Check if get_terminal_size() returns a meaningful value.
2897
2898 There's no easy portable way to actually check the size of the
2899 terminal, so let's check if it returns something sensible instead.
2900 """
2901 try:
2902 size = os.get_terminal_size()
2903 except OSError as e:
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01002904 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002905 # Under win32 a generic OSError can be thrown if the
2906 # handle cannot be retrieved
2907 self.skipTest("failed to query terminal size")
2908 raise
2909
Antoine Pitroucfade362012-02-08 23:48:59 +01002910 self.assertGreaterEqual(size.columns, 0)
2911 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002912
2913 def test_stty_match(self):
2914 """Check if stty returns the same results
2915
2916 stty actually tests stdin, so get_terminal_size is invoked on
2917 stdin explicitly. If stty succeeded, then get_terminal_size()
2918 should work too.
2919 """
2920 try:
2921 size = subprocess.check_output(['stty', 'size']).decode().split()
2922 except (FileNotFoundError, subprocess.CalledProcessError):
2923 self.skipTest("stty invocation failed")
2924 expected = (int(size[1]), int(size[0])) # reversed order
2925
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01002926 try:
2927 actual = os.get_terminal_size(sys.__stdin__.fileno())
2928 except OSError as e:
2929 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
2930 # Under win32 a generic OSError can be thrown if the
2931 # handle cannot be retrieved
2932 self.skipTest("failed to query terminal size")
2933 raise
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002934 self.assertEqual(expected, actual)
2935
2936
Victor Stinner292c8352012-10-30 02:17:38 +01002937class OSErrorTests(unittest.TestCase):
2938 def setUp(self):
2939 class Str(str):
2940 pass
2941
Victor Stinnerafe17062012-10-31 22:47:43 +01002942 self.bytes_filenames = []
2943 self.unicode_filenames = []
Victor Stinner292c8352012-10-30 02:17:38 +01002944 if support.TESTFN_UNENCODABLE is not None:
2945 decoded = support.TESTFN_UNENCODABLE
2946 else:
2947 decoded = support.TESTFN
Victor Stinnerafe17062012-10-31 22:47:43 +01002948 self.unicode_filenames.append(decoded)
2949 self.unicode_filenames.append(Str(decoded))
Victor Stinner292c8352012-10-30 02:17:38 +01002950 if support.TESTFN_UNDECODABLE is not None:
2951 encoded = support.TESTFN_UNDECODABLE
2952 else:
2953 encoded = os.fsencode(support.TESTFN)
Victor Stinnerafe17062012-10-31 22:47:43 +01002954 self.bytes_filenames.append(encoded)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03002955 self.bytes_filenames.append(bytearray(encoded))
Victor Stinnerafe17062012-10-31 22:47:43 +01002956 self.bytes_filenames.append(memoryview(encoded))
2957
2958 self.filenames = self.bytes_filenames + self.unicode_filenames
Victor Stinner292c8352012-10-30 02:17:38 +01002959
2960 def test_oserror_filename(self):
2961 funcs = [
Victor Stinnerafe17062012-10-31 22:47:43 +01002962 (self.filenames, os.chdir,),
2963 (self.filenames, os.chmod, 0o777),
Victor Stinnerafe17062012-10-31 22:47:43 +01002964 (self.filenames, os.lstat,),
2965 (self.filenames, os.open, os.O_RDONLY),
2966 (self.filenames, os.rmdir,),
2967 (self.filenames, os.stat,),
2968 (self.filenames, os.unlink,),
Victor Stinner292c8352012-10-30 02:17:38 +01002969 ]
2970 if sys.platform == "win32":
2971 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01002972 (self.bytes_filenames, os.rename, b"dst"),
2973 (self.bytes_filenames, os.replace, b"dst"),
2974 (self.unicode_filenames, os.rename, "dst"),
2975 (self.unicode_filenames, os.replace, "dst"),
Steve Dowercc16be82016-09-08 10:35:16 -07002976 (self.unicode_filenames, os.listdir, ),
Victor Stinner292c8352012-10-30 02:17:38 +01002977 ))
Victor Stinnerafe17062012-10-31 22:47:43 +01002978 else:
2979 funcs.extend((
Victor Stinner64e039a2012-11-07 00:10:14 +01002980 (self.filenames, os.listdir,),
Victor Stinnerafe17062012-10-31 22:47:43 +01002981 (self.filenames, os.rename, "dst"),
2982 (self.filenames, os.replace, "dst"),
2983 ))
2984 if hasattr(os, "chown"):
2985 funcs.append((self.filenames, os.chown, 0, 0))
2986 if hasattr(os, "lchown"):
2987 funcs.append((self.filenames, os.lchown, 0, 0))
2988 if hasattr(os, "truncate"):
2989 funcs.append((self.filenames, os.truncate, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01002990 if hasattr(os, "chflags"):
Victor Stinneree36c242012-11-13 09:31:51 +01002991 funcs.append((self.filenames, os.chflags, 0))
2992 if hasattr(os, "lchflags"):
2993 funcs.append((self.filenames, os.lchflags, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01002994 if hasattr(os, "chroot"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002995 funcs.append((self.filenames, os.chroot,))
Victor Stinner292c8352012-10-30 02:17:38 +01002996 if hasattr(os, "link"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002997 if sys.platform == "win32":
2998 funcs.append((self.bytes_filenames, os.link, b"dst"))
2999 funcs.append((self.unicode_filenames, os.link, "dst"))
3000 else:
3001 funcs.append((self.filenames, os.link, "dst"))
Victor Stinner292c8352012-10-30 02:17:38 +01003002 if hasattr(os, "listxattr"):
3003 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01003004 (self.filenames, os.listxattr,),
3005 (self.filenames, os.getxattr, "user.test"),
3006 (self.filenames, os.setxattr, "user.test", b'user'),
3007 (self.filenames, os.removexattr, "user.test"),
Victor Stinner292c8352012-10-30 02:17:38 +01003008 ))
3009 if hasattr(os, "lchmod"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003010 funcs.append((self.filenames, os.lchmod, 0o777))
Victor Stinner292c8352012-10-30 02:17:38 +01003011 if hasattr(os, "readlink"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003012 if sys.platform == "win32":
3013 funcs.append((self.unicode_filenames, os.readlink,))
3014 else:
3015 funcs.append((self.filenames, os.readlink,))
Victor Stinner292c8352012-10-30 02:17:38 +01003016
Steve Dowercc16be82016-09-08 10:35:16 -07003017
Victor Stinnerafe17062012-10-31 22:47:43 +01003018 for filenames, func, *func_args in funcs:
3019 for name in filenames:
Victor Stinner292c8352012-10-30 02:17:38 +01003020 try:
Steve Dowercc16be82016-09-08 10:35:16 -07003021 if isinstance(name, (str, bytes)):
Victor Stinner923590e2016-03-24 09:11:48 +01003022 func(name, *func_args)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03003023 else:
3024 with self.assertWarnsRegex(DeprecationWarning, 'should be'):
3025 func(name, *func_args)
Victor Stinnerbd54f0e2012-10-31 01:12:55 +01003026 except OSError as err:
Steve Dowercc16be82016-09-08 10:35:16 -07003027 self.assertIs(err.filename, name, str(func))
Steve Dower78057b42016-11-06 19:35:08 -08003028 except UnicodeDecodeError:
3029 pass
Victor Stinner292c8352012-10-30 02:17:38 +01003030 else:
3031 self.fail("No exception thrown by {}".format(func))
3032
Charles-Francois Natali44feda32013-05-20 14:40:46 +02003033class CPUCountTests(unittest.TestCase):
3034 def test_cpu_count(self):
3035 cpus = os.cpu_count()
3036 if cpus is not None:
3037 self.assertIsInstance(cpus, int)
3038 self.assertGreater(cpus, 0)
3039 else:
3040 self.skipTest("Could not determine the number of CPUs")
3041
Victor Stinnerdaf45552013-08-28 00:53:59 +02003042
3043class FDInheritanceTests(unittest.TestCase):
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003044 def test_get_set_inheritable(self):
Victor Stinnerdaf45552013-08-28 00:53:59 +02003045 fd = os.open(__file__, os.O_RDONLY)
3046 self.addCleanup(os.close, fd)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003047 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinnerdaf45552013-08-28 00:53:59 +02003048
Victor Stinnerdaf45552013-08-28 00:53:59 +02003049 os.set_inheritable(fd, True)
3050 self.assertEqual(os.get_inheritable(fd), True)
3051
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003052 @unittest.skipIf(fcntl is None, "need fcntl")
3053 def test_get_inheritable_cloexec(self):
3054 fd = os.open(__file__, os.O_RDONLY)
3055 self.addCleanup(os.close, fd)
3056 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003057
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003058 # clear FD_CLOEXEC flag
3059 flags = fcntl.fcntl(fd, fcntl.F_GETFD)
3060 flags &= ~fcntl.FD_CLOEXEC
3061 fcntl.fcntl(fd, fcntl.F_SETFD, flags)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003062
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003063 self.assertEqual(os.get_inheritable(fd), True)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003064
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003065 @unittest.skipIf(fcntl is None, "need fcntl")
3066 def test_set_inheritable_cloexec(self):
3067 fd = os.open(__file__, os.O_RDONLY)
3068 self.addCleanup(os.close, fd)
3069 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3070 fcntl.FD_CLOEXEC)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003071
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003072 os.set_inheritable(fd, True)
3073 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3074 0)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003075
Victor Stinnerdaf45552013-08-28 00:53:59 +02003076 def test_open(self):
3077 fd = os.open(__file__, os.O_RDONLY)
3078 self.addCleanup(os.close, fd)
3079 self.assertEqual(os.get_inheritable(fd), False)
3080
3081 @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
3082 def test_pipe(self):
3083 rfd, wfd = os.pipe()
3084 self.addCleanup(os.close, rfd)
3085 self.addCleanup(os.close, wfd)
3086 self.assertEqual(os.get_inheritable(rfd), False)
3087 self.assertEqual(os.get_inheritable(wfd), False)
3088
3089 def test_dup(self):
3090 fd1 = os.open(__file__, os.O_RDONLY)
3091 self.addCleanup(os.close, fd1)
3092
3093 fd2 = os.dup(fd1)
3094 self.addCleanup(os.close, fd2)
3095 self.assertEqual(os.get_inheritable(fd2), False)
3096
3097 @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
3098 def test_dup2(self):
3099 fd = os.open(__file__, os.O_RDONLY)
3100 self.addCleanup(os.close, fd)
3101
3102 # inheritable by default
3103 fd2 = os.open(__file__, os.O_RDONLY)
3104 try:
3105 os.dup2(fd, fd2)
3106 self.assertEqual(os.get_inheritable(fd2), True)
3107 finally:
3108 os.close(fd2)
3109
3110 # force non-inheritable
3111 fd3 = os.open(__file__, os.O_RDONLY)
3112 try:
3113 os.dup2(fd, fd3, inheritable=False)
3114 self.assertEqual(os.get_inheritable(fd3), False)
3115 finally:
3116 os.close(fd3)
3117
3118 @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
3119 def test_openpty(self):
3120 master_fd, slave_fd = os.openpty()
3121 self.addCleanup(os.close, master_fd)
3122 self.addCleanup(os.close, slave_fd)
3123 self.assertEqual(os.get_inheritable(master_fd), False)
3124 self.assertEqual(os.get_inheritable(slave_fd), False)
3125
3126
Brett Cannon3f9183b2016-08-26 14:44:48 -07003127class PathTConverterTests(unittest.TestCase):
3128 # tuples of (function name, allows fd arguments, additional arguments to
3129 # function, cleanup function)
3130 functions = [
3131 ('stat', True, (), None),
3132 ('lstat', False, (), None),
Benjamin Petersona9ab1652016-09-05 15:40:59 -07003133 ('access', False, (os.F_OK,), None),
Brett Cannon3f9183b2016-08-26 14:44:48 -07003134 ('chflags', False, (0,), None),
3135 ('lchflags', False, (0,), None),
3136 ('open', False, (0,), getattr(os, 'close', None)),
3137 ]
3138
3139 def test_path_t_converter(self):
Brett Cannon3f9183b2016-08-26 14:44:48 -07003140 str_filename = support.TESTFN
Brett Cannon3ce2fd42016-08-27 09:42:40 -07003141 if os.name == 'nt':
3142 bytes_fspath = bytes_filename = None
3143 else:
3144 bytes_filename = support.TESTFN.encode('ascii')
Brett Cannonec6ce872016-09-06 15:50:29 -07003145 bytes_fspath = _PathLike(bytes_filename)
3146 fd = os.open(_PathLike(str_filename), os.O_WRONLY|os.O_CREAT)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003147 self.addCleanup(support.unlink, support.TESTFN)
Berker Peksagd0f5bab2016-08-27 21:26:35 +03003148 self.addCleanup(os.close, fd)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003149
Brett Cannonec6ce872016-09-06 15:50:29 -07003150 int_fspath = _PathLike(fd)
3151 str_fspath = _PathLike(str_filename)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003152
3153 for name, allow_fd, extra_args, cleanup_fn in self.functions:
3154 with self.subTest(name=name):
3155 try:
3156 fn = getattr(os, name)
3157 except AttributeError:
3158 continue
3159
Brett Cannon8f96a302016-08-26 19:30:11 -07003160 for path in (str_filename, bytes_filename, str_fspath,
3161 bytes_fspath):
Brett Cannon3ce2fd42016-08-27 09:42:40 -07003162 if path is None:
3163 continue
Brett Cannon3f9183b2016-08-26 14:44:48 -07003164 with self.subTest(name=name, path=path):
3165 result = fn(path, *extra_args)
3166 if cleanup_fn is not None:
3167 cleanup_fn(result)
3168
3169 with self.assertRaisesRegex(
3170 TypeError, 'should be string, bytes'):
3171 fn(int_fspath, *extra_args)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003172
3173 if allow_fd:
3174 result = fn(fd, *extra_args) # should not fail
3175 if cleanup_fn is not None:
3176 cleanup_fn(result)
3177 else:
3178 with self.assertRaisesRegex(
3179 TypeError,
3180 'os.PathLike'):
3181 fn(fd, *extra_args)
3182
3183
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003184@unittest.skipUnless(hasattr(os, 'get_blocking'),
3185 'needs os.get_blocking() and os.set_blocking()')
3186class BlockingTests(unittest.TestCase):
3187 def test_blocking(self):
3188 fd = os.open(__file__, os.O_RDONLY)
3189 self.addCleanup(os.close, fd)
3190 self.assertEqual(os.get_blocking(fd), True)
3191
3192 os.set_blocking(fd, False)
3193 self.assertEqual(os.get_blocking(fd), False)
3194
3195 os.set_blocking(fd, True)
3196 self.assertEqual(os.get_blocking(fd), True)
3197
3198
Yury Selivanov97e2e062014-09-26 12:33:06 -04003199
3200class ExportsTests(unittest.TestCase):
3201 def test_os_all(self):
3202 self.assertIn('open', os.__all__)
3203 self.assertIn('walk', os.__all__)
3204
3205
Victor Stinner6036e442015-03-08 01:58:04 +01003206class TestScandir(unittest.TestCase):
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003207 check_no_resource_warning = support.check_no_resource_warning
3208
Victor Stinner6036e442015-03-08 01:58:04 +01003209 def setUp(self):
3210 self.path = os.path.realpath(support.TESTFN)
Brett Cannon96881cd2016-06-10 14:37:21 -07003211 self.bytes_path = os.fsencode(self.path)
Victor Stinner6036e442015-03-08 01:58:04 +01003212 self.addCleanup(support.rmtree, self.path)
3213 os.mkdir(self.path)
3214
3215 def create_file(self, name="file.txt"):
Brett Cannon96881cd2016-06-10 14:37:21 -07003216 path = self.bytes_path if isinstance(name, bytes) else self.path
3217 filename = os.path.join(path, name)
Victor Stinnerae39d232016-03-24 17:12:55 +01003218 create_file(filename, b'python')
Victor Stinner6036e442015-03-08 01:58:04 +01003219 return filename
3220
3221 def get_entries(self, names):
3222 entries = dict((entry.name, entry)
3223 for entry in os.scandir(self.path))
3224 self.assertEqual(sorted(entries.keys()), names)
3225 return entries
3226
3227 def assert_stat_equal(self, stat1, stat2, skip_fields):
3228 if skip_fields:
3229 for attr in dir(stat1):
3230 if not attr.startswith("st_"):
3231 continue
3232 if attr in ("st_dev", "st_ino", "st_nlink"):
3233 continue
3234 self.assertEqual(getattr(stat1, attr),
3235 getattr(stat2, attr),
3236 (stat1, stat2, attr))
3237 else:
3238 self.assertEqual(stat1, stat2)
3239
3240 def check_entry(self, entry, name, is_dir, is_file, is_symlink):
Brett Cannona32c4d02016-06-24 14:14:44 -07003241 self.assertIsInstance(entry, os.DirEntry)
Victor Stinner6036e442015-03-08 01:58:04 +01003242 self.assertEqual(entry.name, name)
3243 self.assertEqual(entry.path, os.path.join(self.path, name))
3244 self.assertEqual(entry.inode(),
3245 os.stat(entry.path, follow_symlinks=False).st_ino)
3246
3247 entry_stat = os.stat(entry.path)
3248 self.assertEqual(entry.is_dir(),
3249 stat.S_ISDIR(entry_stat.st_mode))
3250 self.assertEqual(entry.is_file(),
3251 stat.S_ISREG(entry_stat.st_mode))
3252 self.assertEqual(entry.is_symlink(),
3253 os.path.islink(entry.path))
3254
3255 entry_lstat = os.stat(entry.path, follow_symlinks=False)
3256 self.assertEqual(entry.is_dir(follow_symlinks=False),
3257 stat.S_ISDIR(entry_lstat.st_mode))
3258 self.assertEqual(entry.is_file(follow_symlinks=False),
3259 stat.S_ISREG(entry_lstat.st_mode))
3260
3261 self.assert_stat_equal(entry.stat(),
3262 entry_stat,
3263 os.name == 'nt' and not is_symlink)
3264 self.assert_stat_equal(entry.stat(follow_symlinks=False),
3265 entry_lstat,
3266 os.name == 'nt')
3267
3268 def test_attributes(self):
3269 link = hasattr(os, 'link')
3270 symlink = support.can_symlink()
3271
3272 dirname = os.path.join(self.path, "dir")
3273 os.mkdir(dirname)
3274 filename = self.create_file("file.txt")
3275 if link:
3276 os.link(filename, os.path.join(self.path, "link_file.txt"))
3277 if symlink:
3278 os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
3279 target_is_directory=True)
3280 os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
3281
3282 names = ['dir', 'file.txt']
3283 if link:
3284 names.append('link_file.txt')
3285 if symlink:
3286 names.extend(('symlink_dir', 'symlink_file.txt'))
3287 entries = self.get_entries(names)
3288
3289 entry = entries['dir']
3290 self.check_entry(entry, 'dir', True, False, False)
3291
3292 entry = entries['file.txt']
3293 self.check_entry(entry, 'file.txt', False, True, False)
3294
3295 if link:
3296 entry = entries['link_file.txt']
3297 self.check_entry(entry, 'link_file.txt', False, True, False)
3298
3299 if symlink:
3300 entry = entries['symlink_dir']
3301 self.check_entry(entry, 'symlink_dir', True, False, True)
3302
3303 entry = entries['symlink_file.txt']
3304 self.check_entry(entry, 'symlink_file.txt', False, True, True)
3305
3306 def get_entry(self, name):
Brett Cannon96881cd2016-06-10 14:37:21 -07003307 path = self.bytes_path if isinstance(name, bytes) else self.path
3308 entries = list(os.scandir(path))
Victor Stinner6036e442015-03-08 01:58:04 +01003309 self.assertEqual(len(entries), 1)
3310
3311 entry = entries[0]
3312 self.assertEqual(entry.name, name)
3313 return entry
3314
Brett Cannon96881cd2016-06-10 14:37:21 -07003315 def create_file_entry(self, name='file.txt'):
3316 filename = self.create_file(name=name)
Victor Stinner6036e442015-03-08 01:58:04 +01003317 return self.get_entry(os.path.basename(filename))
3318
3319 def test_current_directory(self):
3320 filename = self.create_file()
3321 old_dir = os.getcwd()
3322 try:
3323 os.chdir(self.path)
3324
3325 # call scandir() without parameter: it must list the content
3326 # of the current directory
3327 entries = dict((entry.name, entry) for entry in os.scandir())
3328 self.assertEqual(sorted(entries.keys()),
3329 [os.path.basename(filename)])
3330 finally:
3331 os.chdir(old_dir)
3332
3333 def test_repr(self):
3334 entry = self.create_file_entry()
3335 self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")
3336
Brett Cannon96881cd2016-06-10 14:37:21 -07003337 def test_fspath_protocol(self):
3338 entry = self.create_file_entry()
3339 self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))
3340
3341 def test_fspath_protocol_bytes(self):
3342 bytes_filename = os.fsencode('bytesfile.txt')
3343 bytes_entry = self.create_file_entry(name=bytes_filename)
3344 fspath = os.fspath(bytes_entry)
3345 self.assertIsInstance(fspath, bytes)
3346 self.assertEqual(fspath,
3347 os.path.join(os.fsencode(self.path),bytes_filename))
3348
Victor Stinner6036e442015-03-08 01:58:04 +01003349 def test_removed_dir(self):
3350 path = os.path.join(self.path, 'dir')
3351
3352 os.mkdir(path)
3353 entry = self.get_entry('dir')
3354 os.rmdir(path)
3355
3356 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3357 if os.name == 'nt':
3358 self.assertTrue(entry.is_dir())
3359 self.assertFalse(entry.is_file())
3360 self.assertFalse(entry.is_symlink())
3361 if os.name == 'nt':
3362 self.assertRaises(FileNotFoundError, entry.inode)
3363 # don't fail
3364 entry.stat()
3365 entry.stat(follow_symlinks=False)
3366 else:
3367 self.assertGreater(entry.inode(), 0)
3368 self.assertRaises(FileNotFoundError, entry.stat)
3369 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3370
3371 def test_removed_file(self):
3372 entry = self.create_file_entry()
3373 os.unlink(entry.path)
3374
3375 self.assertFalse(entry.is_dir())
3376 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3377 if os.name == 'nt':
3378 self.assertTrue(entry.is_file())
3379 self.assertFalse(entry.is_symlink())
3380 if os.name == 'nt':
3381 self.assertRaises(FileNotFoundError, entry.inode)
3382 # don't fail
3383 entry.stat()
3384 entry.stat(follow_symlinks=False)
3385 else:
3386 self.assertGreater(entry.inode(), 0)
3387 self.assertRaises(FileNotFoundError, entry.stat)
3388 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3389
3390 def test_broken_symlink(self):
3391 if not support.can_symlink():
3392 return self.skipTest('cannot create symbolic link')
3393
3394 filename = self.create_file("file.txt")
3395 os.symlink(filename,
3396 os.path.join(self.path, "symlink.txt"))
3397 entries = self.get_entries(['file.txt', 'symlink.txt'])
3398 entry = entries['symlink.txt']
3399 os.unlink(filename)
3400
3401 self.assertGreater(entry.inode(), 0)
3402 self.assertFalse(entry.is_dir())
3403 self.assertFalse(entry.is_file()) # broken symlink returns False
3404 self.assertFalse(entry.is_dir(follow_symlinks=False))
3405 self.assertFalse(entry.is_file(follow_symlinks=False))
3406 self.assertTrue(entry.is_symlink())
3407 self.assertRaises(FileNotFoundError, entry.stat)
3408 # don't fail
3409 entry.stat(follow_symlinks=False)
3410
3411 def test_bytes(self):
Victor Stinner6036e442015-03-08 01:58:04 +01003412 self.create_file("file.txt")
3413
3414 path_bytes = os.fsencode(self.path)
3415 entries = list(os.scandir(path_bytes))
3416 self.assertEqual(len(entries), 1, entries)
3417 entry = entries[0]
3418
3419 self.assertEqual(entry.name, b'file.txt')
3420 self.assertEqual(entry.path,
3421 os.fsencode(os.path.join(self.path, 'file.txt')))
3422
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03003423 @unittest.skipUnless(os.listdir in os.supports_fd,
3424 'fd support for listdir required for this test.')
3425 def test_fd(self):
3426 self.assertIn(os.scandir, os.supports_fd)
3427 self.create_file('file.txt')
3428 expected_names = ['file.txt']
3429 if support.can_symlink():
3430 os.symlink('file.txt', os.path.join(self.path, 'link'))
3431 expected_names.append('link')
3432
3433 fd = os.open(self.path, os.O_RDONLY)
3434 try:
3435 with os.scandir(fd) as it:
3436 entries = list(it)
3437 names = [entry.name for entry in entries]
3438 self.assertEqual(sorted(names), expected_names)
3439 self.assertEqual(names, os.listdir(fd))
3440 for entry in entries:
3441 self.assertEqual(entry.path, entry.name)
3442 self.assertEqual(os.fspath(entry), entry.name)
3443 self.assertEqual(entry.is_symlink(), entry.name == 'link')
3444 if os.stat in os.supports_dir_fd:
3445 st = os.stat(entry.name, dir_fd=fd)
3446 self.assertEqual(entry.stat(), st)
3447 st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
3448 self.assertEqual(entry.stat(follow_symlinks=False), st)
3449 finally:
3450 os.close(fd)
3451
Victor Stinner6036e442015-03-08 01:58:04 +01003452 def test_empty_path(self):
3453 self.assertRaises(FileNotFoundError, os.scandir, '')
3454
3455 def test_consume_iterator_twice(self):
3456 self.create_file("file.txt")
3457 iterator = os.scandir(self.path)
3458
3459 entries = list(iterator)
3460 self.assertEqual(len(entries), 1, entries)
3461
3462 # check than consuming the iterator twice doesn't raise exception
3463 entries2 = list(iterator)
3464 self.assertEqual(len(entries2), 0, entries2)
3465
3466 def test_bad_path_type(self):
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03003467 for obj in [1.234, {}, []]:
Victor Stinner6036e442015-03-08 01:58:04 +01003468 self.assertRaises(TypeError, os.scandir, obj)
3469
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003470 def test_close(self):
3471 self.create_file("file.txt")
3472 self.create_file("file2.txt")
3473 iterator = os.scandir(self.path)
3474 next(iterator)
3475 iterator.close()
3476 # multiple closes
3477 iterator.close()
3478 with self.check_no_resource_warning():
3479 del iterator
3480
3481 def test_context_manager(self):
3482 self.create_file("file.txt")
3483 self.create_file("file2.txt")
3484 with os.scandir(self.path) as iterator:
3485 next(iterator)
3486 with self.check_no_resource_warning():
3487 del iterator
3488
3489 def test_context_manager_close(self):
3490 self.create_file("file.txt")
3491 self.create_file("file2.txt")
3492 with os.scandir(self.path) as iterator:
3493 next(iterator)
3494 iterator.close()
3495
3496 def test_context_manager_exception(self):
3497 self.create_file("file.txt")
3498 self.create_file("file2.txt")
3499 with self.assertRaises(ZeroDivisionError):
3500 with os.scandir(self.path) as iterator:
3501 next(iterator)
3502 1/0
3503 with self.check_no_resource_warning():
3504 del iterator
3505
3506 def test_resource_warning(self):
3507 self.create_file("file.txt")
3508 self.create_file("file2.txt")
3509 iterator = os.scandir(self.path)
3510 next(iterator)
3511 with self.assertWarns(ResourceWarning):
3512 del iterator
3513 support.gc_collect()
3514 # exhausted iterator
3515 iterator = os.scandir(self.path)
3516 list(iterator)
3517 with self.check_no_resource_warning():
3518 del iterator
3519
Victor Stinner6036e442015-03-08 01:58:04 +01003520
Ethan Furmancdc08792016-06-02 15:06:09 -07003521class TestPEP519(unittest.TestCase):
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003522
3523 # Abstracted so it can be overridden to test pure Python implementation
3524 # if a C version is provided.
3525 fspath = staticmethod(os.fspath)
3526
Ethan Furmancdc08792016-06-02 15:06:09 -07003527 def test_return_bytes(self):
3528 for b in b'hello', b'goodbye', b'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003529 self.assertEqual(b, self.fspath(b))
Ethan Furmancdc08792016-06-02 15:06:09 -07003530
3531 def test_return_string(self):
3532 for s in 'hello', 'goodbye', 'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003533 self.assertEqual(s, self.fspath(s))
Ethan Furmancdc08792016-06-02 15:06:09 -07003534
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003535 def test_fsencode_fsdecode(self):
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003536 for p in "path/like/object", b"path/like/object":
Brett Cannonec6ce872016-09-06 15:50:29 -07003537 pathlike = _PathLike(p)
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003538
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003539 self.assertEqual(p, self.fspath(pathlike))
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003540 self.assertEqual(b"path/like/object", os.fsencode(pathlike))
3541 self.assertEqual("path/like/object", os.fsdecode(pathlike))
3542
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003543 def test_pathlike(self):
Brett Cannonec6ce872016-09-06 15:50:29 -07003544 self.assertEqual('#feelthegil', self.fspath(_PathLike('#feelthegil')))
3545 self.assertTrue(issubclass(_PathLike, os.PathLike))
3546 self.assertTrue(isinstance(_PathLike(), os.PathLike))
Ethan Furman410ef8e2016-06-04 12:06:26 -07003547
Ethan Furmancdc08792016-06-02 15:06:09 -07003548 def test_garbage_in_exception_out(self):
3549 vapor = type('blah', (), {})
3550 for o in int, type, os, vapor():
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003551 self.assertRaises(TypeError, self.fspath, o)
Ethan Furmancdc08792016-06-02 15:06:09 -07003552
3553 def test_argument_required(self):
Brett Cannon044283a2016-07-15 10:41:49 -07003554 self.assertRaises(TypeError, self.fspath)
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003555
Brett Cannon044283a2016-07-15 10:41:49 -07003556 def test_bad_pathlike(self):
3557 # __fspath__ returns a value other than str or bytes.
Brett Cannonec6ce872016-09-06 15:50:29 -07003558 self.assertRaises(TypeError, self.fspath, _PathLike(42))
Brett Cannon044283a2016-07-15 10:41:49 -07003559 # __fspath__ attribute that is not callable.
3560 c = type('foo', (), {})
3561 c.__fspath__ = 1
3562 self.assertRaises(TypeError, self.fspath, c())
3563 # __fspath__ raises an exception.
Brett Cannon044283a2016-07-15 10:41:49 -07003564 self.assertRaises(ZeroDivisionError, self.fspath,
Brett Cannonec6ce872016-09-06 15:50:29 -07003565 _PathLike(ZeroDivisionError()))
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003566
3567# Only test if the C version is provided, otherwise TestPEP519 already tested
3568# the pure Python implementation.
3569if hasattr(os, "_fspath"):
3570 class TestPEP519PurePython(TestPEP519):
3571
3572 """Explicitly test the pure Python implementation of os.fspath()."""
3573
3574 fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07003575
3576
Fred Drake2e2be372001-09-20 21:33:42 +00003577if __name__ == "__main__":
R David Murrayf2ad1732014-12-25 18:36:56 -05003578 unittest.main()