blob: c3c82381abf3044592fffa1bb0f05777985a375b [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
11import fractions
12import getpass
13import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Victor Stinner47aacc82015-06-12 17:26:23 +020018import shutil
19import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010021import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020022import subprocess
23import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010024import sysconfig
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020025import threading
Victor Stinner47aacc82015-06-12 17:26:23 +020026import time
27import unittest
28import uuid
29import warnings
30from test import support
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020031
Antoine Pitrouec34ab52013-08-16 20:44:38 +020032try:
33 import resource
34except ImportError:
35 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020036try:
37 import fcntl
38except ImportError:
39 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010040try:
41 import _winapi
42except ImportError:
43 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020044try:
R David Murrayf2ad1732014-12-25 18:36:56 -050045 import grp
46 groups = [g.gr_gid for g in grp.getgrall() if getpass.getuser() in g.gr_mem]
47 if hasattr(os, 'getgid'):
48 process_gid = os.getgid()
49 if process_gid not in groups:
50 groups.append(process_gid)
51except ImportError:
52 groups = []
53try:
54 import pwd
55 all_users = [u.pw_uid for u in pwd.getpwall()]
Xavier de Gaye21060102016-11-16 08:05:27 +010056except (ImportError, AttributeError):
R David Murrayf2ad1732014-12-25 18:36:56 -050057 all_users = []
58try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020059 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020060except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020061 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020062
Berker Peksagce643912015-05-06 06:33:17 +030063from test.support.script_helper import assert_python_ok
Xavier de Gayed1415312016-07-22 12:15:29 +020064from test.support import unix_shell
Fred Drake38c2ef02001-07-17 20:52:51 +000065
Victor Stinner923590e2016-03-24 09:11:48 +010066
R David Murrayf2ad1732014-12-25 18:36:56 -050067root_in_posix = False
68if hasattr(os, 'geteuid'):
69 root_in_posix = (os.geteuid() == 0)
70
Mark Dickinson7cf03892010-04-16 13:45:35 +000071# Detect whether we're on a Linux system that uses the (now outdated
72# and unmaintained) linuxthreads threading library. There's an issue
73# when combining linuxthreads with a failed execv call: see
74# http://bugs.python.org/issue4970.
Victor Stinnerd5c355c2011-04-30 14:53:09 +020075if hasattr(sys, 'thread_info') and sys.thread_info.version:
76 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
77else:
78 USING_LINUXTHREADS = False
Brian Curtineb24d742010-04-12 17:16:38 +000079
Stefan Krahebee49a2013-01-17 15:31:00 +010080# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
81HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
82
Victor Stinner923590e2016-03-24 09:11:48 +010083
84@contextlib.contextmanager
85def ignore_deprecation_warnings(msg_regex, quiet=False):
86 with support.check_warnings((msg_regex, DeprecationWarning), quiet=quiet):
87 yield
88
89
Berker Peksag4af23d72016-09-15 20:32:44 +030090def requires_os_func(name):
91 return unittest.skipUnless(hasattr(os, name), 'requires os.%s' % name)
92
93
Brett Cannonec6ce872016-09-06 15:50:29 -070094class _PathLike(os.PathLike):
95
96 def __init__(self, path=""):
97 self.path = path
98
99 def __str__(self):
100 return str(self.path)
101
102 def __fspath__(self):
103 if isinstance(self.path, BaseException):
104 raise self.path
105 else:
106 return self.path
107
108
Victor Stinnerae39d232016-03-24 17:12:55 +0100109def create_file(filename, content=b'content'):
110 with open(filename, "xb", 0) as fp:
111 fp.write(content)
112
113
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000114# Tests creating TESTFN
115class FileTests(unittest.TestCase):
116 def setUp(self):
Martin Panterbf19d162015-09-09 01:01:13 +0000117 if os.path.lexists(support.TESTFN):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000118 os.unlink(support.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000119 tearDown = setUp
120
121 def test_access(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000122 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000123 os.close(f)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000124 self.assertTrue(os.access(support.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000125
Christian Heimesfdab48e2008-01-20 09:06:41 +0000126 def test_closerange(self):
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000127 first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
128 # We must allocate two consecutive file descriptors, otherwise
129 # it will mess up other file descriptors (perhaps even the three
130 # standard ones).
131 second = os.dup(first)
132 try:
133 retries = 0
134 while second != first + 1:
135 os.close(first)
136 retries += 1
137 if retries > 10:
138 # XXX test skipped
Benjamin Petersonfa0d7032009-06-01 22:42:33 +0000139 self.skipTest("couldn't allocate two consecutive fds")
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000140 first, second = second, os.dup(second)
141 finally:
142 os.close(second)
Christian Heimesfdab48e2008-01-20 09:06:41 +0000143 # close a fd that is open, and one that isn't
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000144 os.closerange(first, first + 2)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000145 self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000146
Benjamin Peterson1cc6df92010-06-30 17:39:45 +0000147 @support.cpython_only
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +0000148 def test_rename(self):
149 path = support.TESTFN
150 old = sys.getrefcount(path)
151 self.assertRaises(TypeError, os.rename, path, 0)
152 new = sys.getrefcount(path)
153 self.assertEqual(old, new)
154
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000155 def test_read(self):
156 with open(support.TESTFN, "w+b") as fobj:
157 fobj.write(b"spam")
158 fobj.flush()
159 fd = fobj.fileno()
160 os.lseek(fd, 0, 0)
161 s = os.read(fd, 4)
162 self.assertEqual(type(s), bytes)
163 self.assertEqual(s, b"spam")
164
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200165 @support.cpython_only
Victor Stinner5c6e6fc2014-07-12 11:03:53 +0200166 # Skip the test on 32-bit platforms: the number of bytes must fit in a
167 # Py_ssize_t type
168 @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
169 "needs INT_MAX < PY_SSIZE_T_MAX")
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200170 @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
171 def test_large_read(self, size):
Victor Stinnerb28ed922014-07-11 17:04:41 +0200172 self.addCleanup(support.unlink, support.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +0100173 create_file(support.TESTFN, b'test')
Victor Stinnerb28ed922014-07-11 17:04:41 +0200174
175 # Issue #21932: Make sure that os.read() does not raise an
176 # OverflowError for size larger than INT_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +0200177 with open(support.TESTFN, "rb") as fp:
178 data = os.read(fp.fileno(), size)
179
180 # The test does not try to read more than 2 GB at once because the
181 # operating system is free to return less bytes than requested.
182 self.assertEqual(data, b'test')
183
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000184 def test_write(self):
185 # os.write() accepts bytes- and buffer-like objects but not strings
186 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
187 self.assertRaises(TypeError, os.write, fd, "beans")
188 os.write(fd, b"bacon\n")
189 os.write(fd, bytearray(b"eggs\n"))
190 os.write(fd, memoryview(b"spam\n"))
191 os.close(fd)
192 with open(support.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +0000193 self.assertEqual(fobj.read().splitlines(),
194 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000195
Victor Stinnere0daff12011-03-20 23:36:35 +0100196 def write_windows_console(self, *args):
197 retcode = subprocess.call(args,
198 # use a new console to not flood the test output
199 creationflags=subprocess.CREATE_NEW_CONSOLE,
200 # use a shell to hide the console window (SW_HIDE)
201 shell=True)
202 self.assertEqual(retcode, 0)
203
204 @unittest.skipUnless(sys.platform == 'win32',
205 'test specific to the Windows console')
206 def test_write_windows_console(self):
207 # Issue #11395: the Windows console returns an error (12: not enough
208 # space error) on writing into stdout if stdout mode is binary and the
209 # length is greater than 66,000 bytes (or less, depending on heap
210 # usage).
211 code = "print('x' * 100000)"
212 self.write_windows_console(sys.executable, "-c", code)
213 self.write_windows_console(sys.executable, "-u", "-c", code)
214
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000215 def fdopen_helper(self, *args):
216 fd = os.open(support.TESTFN, os.O_RDONLY)
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200217 f = os.fdopen(fd, *args)
218 f.close()
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000219
220 def test_fdopen(self):
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200221 fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
222 os.close(fd)
223
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000224 self.fdopen_helper()
225 self.fdopen_helper('r')
226 self.fdopen_helper('r', 100)
227
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100228 def test_replace(self):
229 TESTFN2 = support.TESTFN + ".2"
Victor Stinnerae39d232016-03-24 17:12:55 +0100230 self.addCleanup(support.unlink, support.TESTFN)
231 self.addCleanup(support.unlink, TESTFN2)
232
233 create_file(support.TESTFN, b"1")
234 create_file(TESTFN2, b"2")
235
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100236 os.replace(support.TESTFN, TESTFN2)
237 self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
238 with open(TESTFN2, 'r') as f:
239 self.assertEqual(f.read(), "1")
240
Martin Panterbf19d162015-09-09 01:01:13 +0000241 def test_open_keywords(self):
242 f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
243 dir_fd=None)
244 os.close(f)
245
246 def test_symlink_keywords(self):
247 symlink = support.get_attribute(os, "symlink")
248 try:
249 symlink(src='target', dst=support.TESTFN,
250 target_is_directory=False, dir_fd=None)
251 except (NotImplementedError, OSError):
252 pass # No OS support or unprivileged user
253
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200254
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000255# Test attributes on return values from os.*stat* family.
256class StatAttributeTests(unittest.TestCase):
257 def setUp(self):
Victor Stinner47aacc82015-06-12 17:26:23 +0200258 self.fname = support.TESTFN
259 self.addCleanup(support.unlink, self.fname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100260 create_file(self.fname, b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000261
Serhiy Storchaka43767632013-11-03 21:31:38 +0200262 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
Antoine Pitrou38425292010-09-21 18:19:07 +0000263 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000264 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000265
266 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000267 self.assertEqual(result[stat.ST_SIZE], 3)
268 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000269
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000270 # Make sure all the attributes are there
271 members = dir(result)
272 for name in dir(stat):
273 if name[:3] == 'ST_':
274 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000275 if name.endswith("TIME"):
276 def trunc(x): return int(x)
277 else:
278 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000279 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000280 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000281 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000282
Larry Hastings6fe20b32012-04-19 15:07:49 -0700283 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700284 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700285 for name in 'st_atime st_mtime st_ctime'.split():
286 floaty = int(getattr(result, name) * 100000)
287 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700288 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700289
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000290 try:
291 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200292 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000293 except IndexError:
294 pass
295
296 # Make sure that assignment fails
297 try:
298 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200299 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000300 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000301 pass
302
303 try:
304 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200305 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000306 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000307 pass
308
309 try:
310 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200311 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000312 except AttributeError:
313 pass
314
315 # Use the stat_result constructor with a too-short tuple.
316 try:
317 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200318 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000319 except TypeError:
320 pass
321
Ezio Melotti42da6632011-03-15 05:18:48 +0200322 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000323 try:
324 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
325 except TypeError:
326 pass
327
Antoine Pitrou38425292010-09-21 18:19:07 +0000328 def test_stat_attributes(self):
329 self.check_stat_attributes(self.fname)
330
331 def test_stat_attributes_bytes(self):
332 try:
333 fname = self.fname.encode(sys.getfilesystemencoding())
334 except UnicodeEncodeError:
335 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Steve Dowercc16be82016-09-08 10:35:16 -0700336 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000337
Christian Heimes25827622013-10-12 01:27:08 +0200338 def test_stat_result_pickle(self):
339 result = os.stat(self.fname)
Serhiy Storchakabad12572014-12-15 14:03:42 +0200340 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
341 p = pickle.dumps(result, proto)
342 self.assertIn(b'stat_result', p)
343 if proto < 4:
344 self.assertIn(b'cos\nstat_result\n', p)
345 unpickled = pickle.loads(p)
346 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200347
Serhiy Storchaka43767632013-11-03 21:31:38 +0200348 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000349 def test_statvfs_attributes(self):
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000350 try:
351 result = os.statvfs(self.fname)
Guido van Rossumb940e112007-01-10 16:19:56 +0000352 except OSError as e:
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000353 # On AtheOS, glibc always returns ENOSYS
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000354 if e.errno == errno.ENOSYS:
Victor Stinner370cb252013-10-12 01:33:54 +0200355 self.skipTest('os.statvfs() failed with ENOSYS')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000356
357 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000358 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000359
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000360 # Make sure all the attributes are there.
361 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
362 'ffree', 'favail', 'flag', 'namemax')
363 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000364 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000365
366 # Make sure that assignment really fails
367 try:
368 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200369 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000370 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000371 pass
372
373 try:
374 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200375 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000376 except AttributeError:
377 pass
378
379 # Use the constructor with a too-short tuple.
380 try:
381 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200382 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000383 except TypeError:
384 pass
385
Ezio Melotti42da6632011-03-15 05:18:48 +0200386 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000387 try:
388 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
389 except TypeError:
390 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000391
Christian Heimes25827622013-10-12 01:27:08 +0200392 @unittest.skipUnless(hasattr(os, 'statvfs'),
393 "need os.statvfs()")
394 def test_statvfs_result_pickle(self):
395 try:
396 result = os.statvfs(self.fname)
397 except OSError as e:
398 # On AtheOS, glibc always returns ENOSYS
399 if e.errno == errno.ENOSYS:
Victor Stinner370cb252013-10-12 01:33:54 +0200400 self.skipTest('os.statvfs() failed with ENOSYS')
401
Serhiy Storchakabad12572014-12-15 14:03:42 +0200402 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
403 p = pickle.dumps(result, proto)
404 self.assertIn(b'statvfs_result', p)
405 if proto < 4:
406 self.assertIn(b'cos\nstatvfs_result\n', p)
407 unpickled = pickle.loads(p)
408 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200409
Serhiy Storchaka43767632013-11-03 21:31:38 +0200410 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
411 def test_1686475(self):
412 # Verify that an open file can be stat'ed
413 try:
414 os.stat(r"c:\pagefile.sys")
415 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600416 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200417 except OSError as e:
418 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000419
Serhiy Storchaka43767632013-11-03 21:31:38 +0200420 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
421 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
422 def test_15261(self):
423 # Verify that stat'ing a closed fd does not cause crash
424 r, w = os.pipe()
425 try:
426 os.stat(r) # should not raise error
427 finally:
428 os.close(r)
429 os.close(w)
430 with self.assertRaises(OSError) as ctx:
431 os.stat(r)
432 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100433
Zachary Ware63f277b2014-06-19 09:46:37 -0500434 def check_file_attributes(self, result):
435 self.assertTrue(hasattr(result, 'st_file_attributes'))
436 self.assertTrue(isinstance(result.st_file_attributes, int))
437 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
438
439 @unittest.skipUnless(sys.platform == "win32",
440 "st_file_attributes is Win32 specific")
441 def test_file_attributes(self):
442 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
443 result = os.stat(self.fname)
444 self.check_file_attributes(result)
445 self.assertEqual(
446 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
447 0)
448
449 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
Victor Stinner47aacc82015-06-12 17:26:23 +0200450 dirname = support.TESTFN + "dir"
451 os.mkdir(dirname)
452 self.addCleanup(os.rmdir, dirname)
453
454 result = os.stat(dirname)
Zachary Ware63f277b2014-06-19 09:46:37 -0500455 self.check_file_attributes(result)
456 self.assertEqual(
457 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
458 stat.FILE_ATTRIBUTE_DIRECTORY)
459
Berker Peksag0b4dc482016-09-17 15:49:59 +0300460 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
461 def test_access_denied(self):
462 # Default to FindFirstFile WIN32_FIND_DATA when access is
463 # denied. See issue 28075.
464 # os.environ['TEMP'] should be located on a volume that
465 # supports file ACLs.
466 fname = os.path.join(os.environ['TEMP'], self.fname)
467 self.addCleanup(support.unlink, fname)
468 create_file(fname, b'ABC')
469 # Deny the right to [S]YNCHRONIZE on the file to
470 # force CreateFile to fail with ERROR_ACCESS_DENIED.
471 DETACHED_PROCESS = 8
472 subprocess.check_call(
Denis Osipov897bba72017-06-07 22:15:26 +0500473 # bpo-30584: Use security identifier *S-1-5-32-545 instead
474 # of localized "Users" to not depend on the locale.
475 ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
Berker Peksag0b4dc482016-09-17 15:49:59 +0300476 creationflags=DETACHED_PROCESS
477 )
478 result = os.stat(fname)
479 self.assertNotEqual(result.st_size, 0)
480
Victor Stinner47aacc82015-06-12 17:26:23 +0200481
482class UtimeTests(unittest.TestCase):
483 def setUp(self):
484 self.dirname = support.TESTFN
485 self.fname = os.path.join(self.dirname, "f1")
486
487 self.addCleanup(support.rmtree, self.dirname)
488 os.mkdir(self.dirname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100489 create_file(self.fname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200490
Victor Stinnerc0b1e0f2015-07-20 17:12:57 +0200491 def restore_float_times(state):
Victor Stinner923590e2016-03-24 09:11:48 +0100492 with ignore_deprecation_warnings('stat_float_times'):
Victor Stinnerc0b1e0f2015-07-20 17:12:57 +0200493 os.stat_float_times(state)
494
Victor Stinner47aacc82015-06-12 17:26:23 +0200495 # ensure that st_atime and st_mtime are float
Victor Stinner923590e2016-03-24 09:11:48 +0100496 with ignore_deprecation_warnings('stat_float_times'):
Victor Stinnerc0b1e0f2015-07-20 17:12:57 +0200497 old_float_times = os.stat_float_times(-1)
498 self.addCleanup(restore_float_times, old_float_times)
Victor Stinner47aacc82015-06-12 17:26:23 +0200499
500 os.stat_float_times(True)
501
502 def support_subsecond(self, filename):
503 # Heuristic to check if the filesystem supports timestamp with
504 # subsecond resolution: check if float and int timestamps are different
505 st = os.stat(filename)
506 return ((st.st_atime != st[7])
507 or (st.st_mtime != st[8])
508 or (st.st_ctime != st[9]))
509
510 def _test_utime(self, set_time, filename=None):
511 if not filename:
512 filename = self.fname
513
514 support_subsecond = self.support_subsecond(filename)
515 if support_subsecond:
516 # Timestamp with a resolution of 1 microsecond (10^-6).
517 #
518 # The resolution of the C internal function used by os.utime()
519 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
520 # test with a resolution of 1 ns requires more work:
521 # see the issue #15745.
522 atime_ns = 1002003000 # 1.002003 seconds
523 mtime_ns = 4005006000 # 4.005006 seconds
524 else:
525 # use a resolution of 1 second
526 atime_ns = 5 * 10**9
527 mtime_ns = 8 * 10**9
528
529 set_time(filename, (atime_ns, mtime_ns))
530 st = os.stat(filename)
531
532 if support_subsecond:
533 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
534 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
535 else:
536 self.assertEqual(st.st_atime, atime_ns * 1e-9)
537 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
538 self.assertEqual(st.st_atime_ns, atime_ns)
539 self.assertEqual(st.st_mtime_ns, mtime_ns)
540
541 def test_utime(self):
542 def set_time(filename, ns):
543 # test the ns keyword parameter
544 os.utime(filename, ns=ns)
545 self._test_utime(set_time)
546
547 @staticmethod
548 def ns_to_sec(ns):
549 # Convert a number of nanosecond (int) to a number of seconds (float).
550 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
551 # issue, os.utime() rounds towards minus infinity.
552 return (ns * 1e-9) + 0.5e-9
553
554 def test_utime_by_indexed(self):
555 # pass times as floating point seconds as the second indexed parameter
556 def set_time(filename, ns):
557 atime_ns, mtime_ns = ns
558 atime = self.ns_to_sec(atime_ns)
559 mtime = self.ns_to_sec(mtime_ns)
560 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
561 # or utime(time_t)
562 os.utime(filename, (atime, mtime))
563 self._test_utime(set_time)
564
565 def test_utime_by_times(self):
566 def set_time(filename, ns):
567 atime_ns, mtime_ns = ns
568 atime = self.ns_to_sec(atime_ns)
569 mtime = self.ns_to_sec(mtime_ns)
570 # test the times keyword parameter
571 os.utime(filename, times=(atime, mtime))
572 self._test_utime(set_time)
573
574 @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
575 "follow_symlinks support for utime required "
576 "for this test.")
577 def test_utime_nofollow_symlinks(self):
578 def set_time(filename, ns):
579 # use follow_symlinks=False to test utimensat(timespec)
580 # or lutimes(timeval)
581 os.utime(filename, ns=ns, follow_symlinks=False)
582 self._test_utime(set_time)
583
584 @unittest.skipUnless(os.utime in os.supports_fd,
585 "fd support for utime required for this test.")
586 def test_utime_fd(self):
587 def set_time(filename, ns):
Victor Stinnerae39d232016-03-24 17:12:55 +0100588 with open(filename, 'wb', 0) as fp:
Victor Stinner47aacc82015-06-12 17:26:23 +0200589 # use a file descriptor to test futimens(timespec)
590 # or futimes(timeval)
591 os.utime(fp.fileno(), ns=ns)
592 self._test_utime(set_time)
593
594 @unittest.skipUnless(os.utime in os.supports_dir_fd,
595 "dir_fd support for utime required for this test.")
596 def test_utime_dir_fd(self):
597 def set_time(filename, ns):
598 dirname, name = os.path.split(filename)
599 dirfd = os.open(dirname, os.O_RDONLY)
600 try:
601 # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
602 os.utime(name, dir_fd=dirfd, ns=ns)
603 finally:
604 os.close(dirfd)
605 self._test_utime(set_time)
606
607 def test_utime_directory(self):
608 def set_time(filename, ns):
609 # test calling os.utime() on a directory
610 os.utime(filename, ns=ns)
611 self._test_utime(set_time, filename=self.dirname)
612
613 def _test_utime_current(self, set_time):
614 # Get the system clock
615 current = time.time()
616
617 # Call os.utime() to set the timestamp to the current system clock
618 set_time(self.fname)
619
620 if not self.support_subsecond(self.fname):
621 delta = 1.0
Victor Stinnerc94caca2017-06-13 23:48:27 +0200622 elif os.name == 'nt':
623 # On Windows, the usual resolution of time.time() is 15.6 ms.
624 # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
625 delta = 0.050
Victor Stinner47aacc82015-06-12 17:26:23 +0200626 else:
Victor Stinner3402f722017-06-14 11:55:17 +0200627 # bpo-30649: PPC64 Fedora 3.x buildbot requires
628 # at least a delta of 14 ms
629 delta = 0.020
Victor Stinner47aacc82015-06-12 17:26:23 +0200630 st = os.stat(self.fname)
631 msg = ("st_time=%r, current=%r, dt=%r"
632 % (st.st_mtime, current, st.st_mtime - current))
633 self.assertAlmostEqual(st.st_mtime, current,
634 delta=delta, msg=msg)
635
636 def test_utime_current(self):
637 def set_time(filename):
638 # Set to the current time in the new way
639 os.utime(self.fname)
640 self._test_utime_current(set_time)
641
642 def test_utime_current_old(self):
643 def set_time(filename):
644 # Set to the current time in the old explicit way.
645 os.utime(self.fname, None)
646 self._test_utime_current(set_time)
647
648 def get_file_system(self, path):
649 if sys.platform == 'win32':
650 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
651 import ctypes
652 kernel32 = ctypes.windll.kernel32
653 buf = ctypes.create_unicode_buffer("", 100)
654 ok = kernel32.GetVolumeInformationW(root, None, 0,
655 None, None, None,
656 buf, len(buf))
657 if ok:
658 return buf.value
659 # return None if the filesystem is unknown
660
661 def test_large_time(self):
662 # Many filesystems are limited to the year 2038. At least, the test
663 # pass with NTFS filesystem.
664 if self.get_file_system(self.dirname) != "NTFS":
665 self.skipTest("requires NTFS")
666
667 large = 5000000000 # some day in 2128
668 os.utime(self.fname, (large, large))
669 self.assertEqual(os.stat(self.fname).st_mtime, large)
670
671 def test_utime_invalid_arguments(self):
672 # seconds and nanoseconds parameters are mutually exclusive
673 with self.assertRaises(ValueError):
674 os.utime(self.fname, (5, 5), ns=(5, 5))
675
676
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000677from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000678
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000679class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000680 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000681 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000682
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000683 def setUp(self):
684 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000685 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000686 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000687 for key, value in self._reference().items():
688 os.environ[key] = value
689
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000690 def tearDown(self):
691 os.environ.clear()
692 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000693 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000694 os.environb.clear()
695 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000696
Christian Heimes90333392007-11-01 19:08:42 +0000697 def _reference(self):
698 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
699
700 def _empty_mapping(self):
701 os.environ.clear()
702 return os.environ
703
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000704 # Bug 1110478
Xavier de Gayed1415312016-07-22 12:15:29 +0200705 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
706 'requires a shell')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000707 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000708 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300709 os.environ.update(HELLO="World")
Xavier de Gayed1415312016-07-22 12:15:29 +0200710 with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300711 value = popen.read().strip()
712 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000713
Xavier de Gayed1415312016-07-22 12:15:29 +0200714 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
715 'requires a shell')
Christian Heimes1a13d592007-11-08 14:16:55 +0000716 def test_os_popen_iter(self):
Xavier de Gayed1415312016-07-22 12:15:29 +0200717 with os.popen("%s -c 'echo \"line1\nline2\nline3\"'"
718 % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300719 it = iter(popen)
720 self.assertEqual(next(it), "line1\n")
721 self.assertEqual(next(it), "line2\n")
722 self.assertEqual(next(it), "line3\n")
723 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000724
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000725 # Verify environ keys and values from the OS are of the
726 # correct str type.
727 def test_keyvalue_types(self):
728 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000729 self.assertEqual(type(key), str)
730 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000731
Christian Heimes90333392007-11-01 19:08:42 +0000732 def test_items(self):
733 for key, value in self._reference().items():
734 self.assertEqual(os.environ.get(key), value)
735
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000736 # Issue 7310
737 def test___repr__(self):
738 """Check that the repr() of os.environ looks like environ({...})."""
739 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000740 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
741 '{!r}: {!r}'.format(key, value)
742 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000743
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000744 def test_get_exec_path(self):
745 defpath_list = os.defpath.split(os.pathsep)
746 test_path = ['/monty', '/python', '', '/flying/circus']
747 test_env = {'PATH': os.pathsep.join(test_path)}
748
749 saved_environ = os.environ
750 try:
751 os.environ = dict(test_env)
752 # Test that defaulting to os.environ works.
753 self.assertSequenceEqual(test_path, os.get_exec_path())
754 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
755 finally:
756 os.environ = saved_environ
757
758 # No PATH environment variable
759 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
760 # Empty PATH environment variable
761 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
762 # Supplied PATH environment variable
763 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
764
Victor Stinnerb745a742010-05-18 17:17:23 +0000765 if os.supports_bytes_environ:
766 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000767 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000768 # ignore BytesWarning warning
769 with warnings.catch_warnings(record=True):
770 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000771 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000772 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000773 pass
774 else:
775 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000776
777 # bytes key and/or value
778 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
779 ['abc'])
780 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
781 ['abc'])
782 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
783 ['abc'])
784
785 @unittest.skipUnless(os.supports_bytes_environ,
786 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000787 def test_environb(self):
788 # os.environ -> os.environb
789 value = 'euro\u20ac'
790 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000791 value_bytes = value.encode(sys.getfilesystemencoding(),
792 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000793 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000794 msg = "U+20AC character is not encodable to %s" % (
795 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000796 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000797 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000798 self.assertEqual(os.environ['unicode'], value)
799 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000800
801 # os.environb -> os.environ
802 value = b'\xff'
803 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000804 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000805 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000806 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000807
Charles-François Natali2966f102011-11-26 11:32:46 +0100808 # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
809 # #13415).
810 @support.requires_freebsd_version(7)
811 @support.requires_mac_ver(10, 6)
Victor Stinner60b385e2011-11-22 22:01:28 +0100812 def test_unset_error(self):
813 if sys.platform == "win32":
814 # an environment variable is limited to 32,767 characters
815 key = 'x' * 50000
Victor Stinnerb3f82682011-11-22 22:30:19 +0100816 self.assertRaises(ValueError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100817 else:
818 # "=" is not allowed in a variable name
819 key = 'key='
Victor Stinnerb3f82682011-11-22 22:30:19 +0100820 self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100821
Victor Stinner6d101392013-04-14 16:35:04 +0200822 def test_key_type(self):
823 missing = 'missingkey'
824 self.assertNotIn(missing, os.environ)
825
Victor Stinner839e5ea2013-04-14 16:43:03 +0200826 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200827 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200828 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200829 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200830
Victor Stinner839e5ea2013-04-14 16:43:03 +0200831 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200832 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200833 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200834 self.assertTrue(cm.exception.__suppress_context__)
835
Osvaldo Santana Neto8a8d2852017-07-01 14:34:45 -0300836 def _test_environ_iteration(self, collection):
837 iterator = iter(collection)
838 new_key = "__new_key__"
839
840 next(iterator) # start iteration over os.environ.items
841
842 # add a new key in os.environ mapping
843 os.environ[new_key] = "test_environ_iteration"
844
845 try:
846 next(iterator) # force iteration over modified mapping
847 self.assertEqual(os.environ[new_key], "test_environ_iteration")
848 finally:
849 del os.environ[new_key]
850
851 def test_iter_error_when_changing_os_environ(self):
852 self._test_environ_iteration(os.environ)
853
854 def test_iter_error_when_changing_os_environ_items(self):
855 self._test_environ_iteration(os.environ.items())
856
857 def test_iter_error_when_changing_os_environ_values(self):
858 self._test_environ_iteration(os.environ.values())
859
Victor Stinner6d101392013-04-14 16:35:04 +0200860
Tim Petersc4e09402003-04-25 07:11:48 +0000861class WalkTests(unittest.TestCase):
862 """Tests for os.walk()."""
863
Victor Stinner0561c532015-03-12 10:28:24 +0100864 # Wrapper to hide minor differences between os.walk and os.fwalk
865 # to tests both functions with the same code base
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +0200866 def walk(self, top, **kwargs):
Serhiy Storchakaa17ca192015-12-23 00:37:34 +0200867 if 'follow_symlinks' in kwargs:
868 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +0200869 return os.walk(top, **kwargs)
Victor Stinner0561c532015-03-12 10:28:24 +0100870
Charles-François Natali7372b062012-02-05 15:15:38 +0100871 def setUp(self):
Victor Stinner0561c532015-03-12 10:28:24 +0100872 join = os.path.join
Victor Stinner3899b542016-03-24 17:21:17 +0100873 self.addCleanup(support.rmtree, support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000874
875 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000876 # TESTFN/
877 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +0000878 # tmp1
879 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +0000880 # tmp2
881 # SUB11/ no kids
882 # SUB2/ a file kid and a dirsymlink kid
883 # tmp3
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300884 # SUB21/ not readable
885 # tmp5
Guido van Rossumd8faa362007-04-27 19:54:29 +0000886 # link/ a symlink to TESTFN.2
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200887 # broken_link
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300888 # broken_link2
889 # broken_link3
Guido van Rossumd8faa362007-04-27 19:54:29 +0000890 # TEST2/
891 # tmp4 a lone file
Victor Stinner0561c532015-03-12 10:28:24 +0100892 self.walk_path = join(support.TESTFN, "TEST1")
893 self.sub1_path = join(self.walk_path, "SUB1")
894 self.sub11_path = join(self.sub1_path, "SUB11")
895 sub2_path = join(self.walk_path, "SUB2")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300896 sub21_path = join(sub2_path, "SUB21")
Victor Stinner0561c532015-03-12 10:28:24 +0100897 tmp1_path = join(self.walk_path, "tmp1")
898 tmp2_path = join(self.sub1_path, "tmp2")
Tim Petersc4e09402003-04-25 07:11:48 +0000899 tmp3_path = join(sub2_path, "tmp3")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300900 tmp5_path = join(sub21_path, "tmp3")
Victor Stinner0561c532015-03-12 10:28:24 +0100901 self.link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000902 t2_path = join(support.TESTFN, "TEST2")
903 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200904 broken_link_path = join(sub2_path, "broken_link")
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300905 broken_link2_path = join(sub2_path, "broken_link2")
906 broken_link3_path = join(sub2_path, "broken_link3")
Tim Petersc4e09402003-04-25 07:11:48 +0000907
908 # Create stuff.
Victor Stinner0561c532015-03-12 10:28:24 +0100909 os.makedirs(self.sub11_path)
Tim Petersc4e09402003-04-25 07:11:48 +0000910 os.makedirs(sub2_path)
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300911 os.makedirs(sub21_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000912 os.makedirs(t2_path)
Victor Stinner0561c532015-03-12 10:28:24 +0100913
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300914 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
Victor Stinnere77c9742016-03-25 10:28:23 +0100915 with open(path, "x") as f:
916 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
Tim Petersc4e09402003-04-25 07:11:48 +0000917
Victor Stinner0561c532015-03-12 10:28:24 +0100918 if support.can_symlink():
919 os.symlink(os.path.abspath(t2_path), self.link_path)
920 os.symlink('broken', broken_link_path, True)
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300921 os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
922 os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300923 self.sub2_tree = (sub2_path, ["SUB21", "link"],
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300924 ["broken_link", "broken_link2", "broken_link3",
925 "tmp3"])
Victor Stinner0561c532015-03-12 10:28:24 +0100926 else:
927 self.sub2_tree = (sub2_path, [], ["tmp3"])
928
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300929 os.chmod(sub21_path, 0)
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300930 try:
931 os.listdir(sub21_path)
932 except PermissionError:
933 self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
934 else:
935 os.chmod(sub21_path, stat.S_IRWXU)
936 os.unlink(tmp5_path)
937 os.rmdir(sub21_path)
938 del self.sub2_tree[1][:1]
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300939
Victor Stinner0561c532015-03-12 10:28:24 +0100940 def test_walk_topdown(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000941 # Walk top-down.
Serhiy Storchakaa07ab292016-04-16 17:51:00 +0300942 all = list(self.walk(self.walk_path))
Victor Stinner0561c532015-03-12 10:28:24 +0100943
Tim Petersc4e09402003-04-25 07:11:48 +0000944 self.assertEqual(len(all), 4)
945 # We can't know which order SUB1 and SUB2 will appear in.
946 # Not flipped: TESTFN, SUB1, SUB11, SUB2
947 # flipped: TESTFN, SUB2, SUB1, SUB11
948 flipped = all[0][1][0] != "SUB1"
949 all[0][1].sort()
Hynek Schlawackc96f5a02012-05-15 17:55:38 +0200950 all[3 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300951 all[3 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100952 self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
953 self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
954 self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
955 self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000956
Brett Cannon3f9183b2016-08-26 14:44:48 -0700957 def test_walk_prune(self, walk_path=None):
958 if walk_path is None:
959 walk_path = self.walk_path
Tim Petersc4e09402003-04-25 07:11:48 +0000960 # Prune the search.
961 all = []
Brett Cannon3f9183b2016-08-26 14:44:48 -0700962 for root, dirs, files in self.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +0000963 all.append((root, dirs, files))
964 # Don't descend into SUB1.
965 if 'SUB1' in dirs:
966 # Note that this also mutates the dirs we appended to all!
967 dirs.remove('SUB1')
Tim Petersc4e09402003-04-25 07:11:48 +0000968
Victor Stinner0561c532015-03-12 10:28:24 +0100969 self.assertEqual(len(all), 2)
970 self.assertEqual(all[0],
Brett Cannon3f9183b2016-08-26 14:44:48 -0700971 (str(walk_path), ["SUB2"], ["tmp1"]))
Victor Stinner0561c532015-03-12 10:28:24 +0100972
973 all[1][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300974 all[1][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100975 self.assertEqual(all[1], self.sub2_tree)
976
Brett Cannon3f9183b2016-08-26 14:44:48 -0700977 def test_file_like_path(self):
Brett Cannonec6ce872016-09-06 15:50:29 -0700978 self.test_walk_prune(_PathLike(self.walk_path))
Brett Cannon3f9183b2016-08-26 14:44:48 -0700979
Victor Stinner0561c532015-03-12 10:28:24 +0100980 def test_walk_bottom_up(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000981 # Walk bottom-up.
Victor Stinner0561c532015-03-12 10:28:24 +0100982 all = list(self.walk(self.walk_path, topdown=False))
983
Victor Stinner53b0a412016-03-26 01:12:36 +0100984 self.assertEqual(len(all), 4, all)
Tim Petersc4e09402003-04-25 07:11:48 +0000985 # We can't know which order SUB1 and SUB2 will appear in.
986 # Not flipped: SUB11, SUB1, SUB2, TESTFN
987 # flipped: SUB2, SUB11, SUB1, TESTFN
988 flipped = all[3][1][0] != "SUB1"
989 all[3][1].sort()
Hynek Schlawack39bf90d2012-05-15 18:40:17 +0200990 all[2 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300991 all[2 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100992 self.assertEqual(all[3],
993 (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
994 self.assertEqual(all[flipped],
995 (self.sub11_path, [], []))
996 self.assertEqual(all[flipped + 1],
997 (self.sub1_path, ["SUB11"], ["tmp2"]))
998 self.assertEqual(all[2 - 2 * flipped],
999 self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +00001000
Victor Stinner0561c532015-03-12 10:28:24 +01001001 def test_walk_symlink(self):
1002 if not support.can_symlink():
1003 self.skipTest("need symlink support")
1004
1005 # Walk, following symlinks.
1006 walk_it = self.walk(self.walk_path, follow_symlinks=True)
1007 for root, dirs, files in walk_it:
1008 if root == self.link_path:
1009 self.assertEqual(dirs, [])
1010 self.assertEqual(files, ["tmp4"])
1011 break
1012 else:
1013 self.fail("Didn't follow symlink with followlinks=True")
Guido van Rossumd8faa362007-04-27 19:54:29 +00001014
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001015 def test_walk_bad_dir(self):
1016 # Walk top-down.
1017 errors = []
1018 walk_it = self.walk(self.walk_path, onerror=errors.append)
1019 root, dirs, files = next(walk_it)
Serhiy Storchaka7865dff2016-10-28 09:17:38 +03001020 self.assertEqual(errors, [])
1021 dir1 = 'SUB1'
1022 path1 = os.path.join(root, dir1)
1023 path1new = os.path.join(root, dir1 + '.new')
1024 os.rename(path1, path1new)
1025 try:
1026 roots = [r for r, d, f in walk_it]
1027 self.assertTrue(errors)
1028 self.assertNotIn(path1, roots)
1029 self.assertNotIn(path1new, roots)
1030 for dir2 in dirs:
1031 if dir2 != dir1:
1032 self.assertIn(os.path.join(root, dir2), roots)
1033 finally:
1034 os.rename(path1new, path1)
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001035
Charles-François Natali7372b062012-02-05 15:15:38 +01001036
1037@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1038class FwalkTests(WalkTests):
1039 """Tests for os.fwalk()."""
1040
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001041 def walk(self, top, **kwargs):
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001042 for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
Victor Stinner0561c532015-03-12 10:28:24 +01001043 yield (root, dirs, files)
1044
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001045 def fwalk(self, *args, **kwargs):
1046 return os.fwalk(*args, **kwargs)
1047
Larry Hastingsc48fe982012-06-25 04:49:05 -07001048 def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
1049 """
1050 compare with walk() results.
1051 """
Larry Hastingsb4038062012-07-15 10:57:38 -07001052 walk_kwargs = walk_kwargs.copy()
1053 fwalk_kwargs = fwalk_kwargs.copy()
1054 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1055 walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
1056 fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)
Larry Hastingsc48fe982012-06-25 04:49:05 -07001057
Charles-François Natali7372b062012-02-05 15:15:38 +01001058 expected = {}
Larry Hastingsc48fe982012-06-25 04:49:05 -07001059 for root, dirs, files in os.walk(**walk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001060 expected[root] = (set(dirs), set(files))
1061
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001062 for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001063 self.assertIn(root, expected)
1064 self.assertEqual(expected[root], (set(dirs), set(files)))
1065
Larry Hastingsc48fe982012-06-25 04:49:05 -07001066 def test_compare_to_walk(self):
1067 kwargs = {'top': support.TESTFN}
1068 self._compare_to_walk(kwargs, kwargs)
1069
Charles-François Natali7372b062012-02-05 15:15:38 +01001070 def test_dir_fd(self):
Larry Hastingsc48fe982012-06-25 04:49:05 -07001071 try:
1072 fd = os.open(".", os.O_RDONLY)
1073 walk_kwargs = {'top': support.TESTFN}
1074 fwalk_kwargs = walk_kwargs.copy()
1075 fwalk_kwargs['dir_fd'] = fd
1076 self._compare_to_walk(walk_kwargs, fwalk_kwargs)
1077 finally:
1078 os.close(fd)
1079
1080 def test_yields_correct_dir_fd(self):
Charles-François Natali7372b062012-02-05 15:15:38 +01001081 # check returned file descriptors
Larry Hastingsb4038062012-07-15 10:57:38 -07001082 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1083 args = support.TESTFN, topdown, None
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001084 for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
Charles-François Natali7372b062012-02-05 15:15:38 +01001085 # check that the FD is valid
1086 os.fstat(rootfd)
Larry Hastings9cf065c2012-06-22 16:30:09 -07001087 # redundant check
1088 os.stat(rootfd)
1089 # check that listdir() returns consistent information
1090 self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))
Charles-François Natali7372b062012-02-05 15:15:38 +01001091
1092 def test_fd_leak(self):
1093 # Since we're opening a lot of FDs, we must be careful to avoid leaks:
1094 # we both check that calling fwalk() a large number of times doesn't
1095 # yield EMFILE, and that the minimum allocated FD hasn't changed.
1096 minfd = os.dup(1)
1097 os.close(minfd)
1098 for i in range(256):
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001099 for x in self.fwalk(support.TESTFN):
Charles-François Natali7372b062012-02-05 15:15:38 +01001100 pass
1101 newfd = os.dup(1)
1102 self.addCleanup(os.close, newfd)
1103 self.assertEqual(newfd, minfd)
1104
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001105class BytesWalkTests(WalkTests):
1106 """Tests for os.walk() with bytes."""
1107 def walk(self, top, **kwargs):
1108 if 'follow_symlinks' in kwargs:
1109 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
1110 for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs):
1111 root = os.fsdecode(broot)
1112 dirs = list(map(os.fsdecode, bdirs))
1113 files = list(map(os.fsdecode, bfiles))
1114 yield (root, dirs, files)
1115 bdirs[:] = list(map(os.fsencode, dirs))
1116 bfiles[:] = list(map(os.fsencode, files))
1117
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001118@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1119class BytesFwalkTests(FwalkTests):
1120 """Tests for os.walk() with bytes."""
1121 def fwalk(self, top='.', *args, **kwargs):
1122 for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs):
1123 root = os.fsdecode(broot)
1124 dirs = list(map(os.fsdecode, bdirs))
1125 files = list(map(os.fsdecode, bfiles))
1126 yield (root, dirs, files, topfd)
1127 bdirs[:] = list(map(os.fsencode, dirs))
1128 bfiles[:] = list(map(os.fsencode, files))
1129
Charles-François Natali7372b062012-02-05 15:15:38 +01001130
Guido van Rossume7ba4952007-06-06 23:52:48 +00001131class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001132 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001133 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001134
1135 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001136 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001137 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
1138 os.makedirs(path) # Should work
1139 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
1140 os.makedirs(path)
1141
1142 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001143 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001144 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
1145 os.makedirs(path)
1146 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
1147 'dir5', 'dir6')
1148 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001149
Serhiy Storchakae304e332017-03-24 13:27:42 +02001150 def test_mode(self):
1151 with support.temp_umask(0o002):
1152 base = support.TESTFN
1153 parent = os.path.join(base, 'dir1')
1154 path = os.path.join(parent, 'dir2')
1155 os.makedirs(path, 0o555)
1156 self.assertTrue(os.path.exists(path))
1157 self.assertTrue(os.path.isdir(path))
1158 if os.name != 'nt':
1159 self.assertEqual(stat.S_IMODE(os.stat(path).st_mode), 0o555)
1160 self.assertEqual(stat.S_IMODE(os.stat(parent).st_mode), 0o775)
1161
Terry Reedy5a22b652010-12-02 07:05:56 +00001162 def test_exist_ok_existing_directory(self):
1163 path = os.path.join(support.TESTFN, 'dir1')
1164 mode = 0o777
1165 old_mask = os.umask(0o022)
1166 os.makedirs(path, mode)
1167 self.assertRaises(OSError, os.makedirs, path, mode)
1168 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
Benjamin Peterson4717e212014-04-01 19:17:57 -04001169 os.makedirs(path, 0o776, exist_ok=True)
Terry Reedy5a22b652010-12-02 07:05:56 +00001170 os.makedirs(path, mode=mode, exist_ok=True)
1171 os.umask(old_mask)
1172
Martin Pantera82642f2015-11-19 04:48:44 +00001173 # Issue #25583: A drive root could raise PermissionError on Windows
1174 os.makedirs(os.path.abspath('/'), exist_ok=True)
1175
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001176 def test_exist_ok_s_isgid_directory(self):
1177 path = os.path.join(support.TESTFN, 'dir1')
1178 S_ISGID = stat.S_ISGID
1179 mode = 0o777
1180 old_mask = os.umask(0o022)
1181 try:
1182 existing_testfn_mode = stat.S_IMODE(
1183 os.lstat(support.TESTFN).st_mode)
Ned Deilyc622f422012-08-08 20:57:24 -07001184 try:
1185 os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
Ned Deily3a2b97e2012-08-08 21:03:02 -07001186 except PermissionError:
Ned Deilyc622f422012-08-08 20:57:24 -07001187 raise unittest.SkipTest('Cannot set S_ISGID for dir.')
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001188 if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
1189 raise unittest.SkipTest('No support for S_ISGID dir mode.')
1190 # The os should apply S_ISGID from the parent dir for us, but
1191 # this test need not depend on that behavior. Be explicit.
1192 os.makedirs(path, mode | S_ISGID)
1193 # http://bugs.python.org/issue14992
1194 # Should not fail when the bit is already set.
1195 os.makedirs(path, mode, exist_ok=True)
1196 # remove the bit.
1197 os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
Benjamin Petersonee5f1c12014-04-01 19:13:18 -04001198 # May work even when the bit is not already set when demanded.
1199 os.makedirs(path, mode | S_ISGID, exist_ok=True)
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001200 finally:
1201 os.umask(old_mask)
Terry Reedy5a22b652010-12-02 07:05:56 +00001202
1203 def test_exist_ok_existing_regular_file(self):
1204 base = support.TESTFN
1205 path = os.path.join(support.TESTFN, 'dir1')
1206 f = open(path, 'w')
1207 f.write('abc')
1208 f.close()
1209 self.assertRaises(OSError, os.makedirs, path)
1210 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
1211 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
1212 os.remove(path)
1213
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001214 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001215 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001216 'dir4', 'dir5', 'dir6')
1217 # If the tests failed, the bottom-most directory ('../dir6')
1218 # may not have been created, so we look for the outermost directory
1219 # that exists.
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001220 while not os.path.exists(path) and path != support.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001221 path = os.path.dirname(path)
1222
1223 os.removedirs(path)
1224
Andrew Svetlov405faed2012-12-25 12:18:09 +02001225
R David Murrayf2ad1732014-12-25 18:36:56 -05001226@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
1227class ChownFileTests(unittest.TestCase):
1228
Berker Peksag036a71b2015-07-21 09:29:48 +03001229 @classmethod
1230 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001231 os.mkdir(support.TESTFN)
1232
1233 def test_chown_uid_gid_arguments_must_be_index(self):
1234 stat = os.stat(support.TESTFN)
1235 uid = stat.st_uid
1236 gid = stat.st_gid
1237 for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
1238 self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
1239 self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
1240 self.assertIsNone(os.chown(support.TESTFN, uid, gid))
1241 self.assertIsNone(os.chown(support.TESTFN, -1, -1))
1242
1243 @unittest.skipUnless(len(groups) > 1, "test needs more than one group")
1244 def test_chown(self):
1245 gid_1, gid_2 = groups[:2]
1246 uid = os.stat(support.TESTFN).st_uid
1247 os.chown(support.TESTFN, uid, gid_1)
1248 gid = os.stat(support.TESTFN).st_gid
1249 self.assertEqual(gid, gid_1)
1250 os.chown(support.TESTFN, uid, gid_2)
1251 gid = os.stat(support.TESTFN).st_gid
1252 self.assertEqual(gid, gid_2)
1253
1254 @unittest.skipUnless(root_in_posix and len(all_users) > 1,
1255 "test needs root privilege and more than one user")
1256 def test_chown_with_root(self):
1257 uid_1, uid_2 = all_users[:2]
1258 gid = os.stat(support.TESTFN).st_gid
1259 os.chown(support.TESTFN, uid_1, gid)
1260 uid = os.stat(support.TESTFN).st_uid
1261 self.assertEqual(uid, uid_1)
1262 os.chown(support.TESTFN, uid_2, gid)
1263 uid = os.stat(support.TESTFN).st_uid
1264 self.assertEqual(uid, uid_2)
1265
1266 @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
1267 "test needs non-root account and more than one user")
1268 def test_chown_without_permission(self):
1269 uid_1, uid_2 = all_users[:2]
1270 gid = os.stat(support.TESTFN).st_gid
Serhiy Storchakaa9e00d12015-02-16 08:35:18 +02001271 with self.assertRaises(PermissionError):
R David Murrayf2ad1732014-12-25 18:36:56 -05001272 os.chown(support.TESTFN, uid_1, gid)
1273 os.chown(support.TESTFN, uid_2, gid)
1274
Berker Peksag036a71b2015-07-21 09:29:48 +03001275 @classmethod
1276 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001277 os.rmdir(support.TESTFN)
1278
1279
Andrew Svetlov405faed2012-12-25 12:18:09 +02001280class RemoveDirsTests(unittest.TestCase):
1281 def setUp(self):
1282 os.makedirs(support.TESTFN)
1283
1284 def tearDown(self):
1285 support.rmtree(support.TESTFN)
1286
1287 def test_remove_all(self):
1288 dira = os.path.join(support.TESTFN, 'dira')
1289 os.mkdir(dira)
1290 dirb = os.path.join(dira, 'dirb')
1291 os.mkdir(dirb)
1292 os.removedirs(dirb)
1293 self.assertFalse(os.path.exists(dirb))
1294 self.assertFalse(os.path.exists(dira))
1295 self.assertFalse(os.path.exists(support.TESTFN))
1296
1297 def test_remove_partial(self):
1298 dira = os.path.join(support.TESTFN, 'dira')
1299 os.mkdir(dira)
1300 dirb = os.path.join(dira, 'dirb')
1301 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001302 create_file(os.path.join(dira, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001303 os.removedirs(dirb)
1304 self.assertFalse(os.path.exists(dirb))
1305 self.assertTrue(os.path.exists(dira))
1306 self.assertTrue(os.path.exists(support.TESTFN))
1307
1308 def test_remove_nothing(self):
1309 dira = os.path.join(support.TESTFN, 'dira')
1310 os.mkdir(dira)
1311 dirb = os.path.join(dira, 'dirb')
1312 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001313 create_file(os.path.join(dirb, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001314 with self.assertRaises(OSError):
1315 os.removedirs(dirb)
1316 self.assertTrue(os.path.exists(dirb))
1317 self.assertTrue(os.path.exists(dira))
1318 self.assertTrue(os.path.exists(support.TESTFN))
1319
1320
Guido van Rossume7ba4952007-06-06 23:52:48 +00001321class DevNullTests(unittest.TestCase):
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00001322 def test_devnull(self):
Victor Stinnerae39d232016-03-24 17:12:55 +01001323 with open(os.devnull, 'wb', 0) as f:
Victor Stinnera6d2c762011-06-30 18:20:11 +02001324 f.write(b'hello')
1325 f.close()
1326 with open(os.devnull, 'rb') as f:
1327 self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001328
Andrew Svetlov405faed2012-12-25 12:18:09 +02001329
Guido van Rossume7ba4952007-06-06 23:52:48 +00001330class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001331 def test_urandom_length(self):
1332 self.assertEqual(len(os.urandom(0)), 0)
1333 self.assertEqual(len(os.urandom(1)), 1)
1334 self.assertEqual(len(os.urandom(10)), 10)
1335 self.assertEqual(len(os.urandom(100)), 100)
1336 self.assertEqual(len(os.urandom(1000)), 1000)
1337
1338 def test_urandom_value(self):
1339 data1 = os.urandom(16)
Victor Stinner9b1f4742016-09-06 16:18:52 -07001340 self.assertIsInstance(data1, bytes)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001341 data2 = os.urandom(16)
1342 self.assertNotEqual(data1, data2)
1343
1344 def get_urandom_subprocess(self, count):
1345 code = '\n'.join((
1346 'import os, sys',
1347 'data = os.urandom(%s)' % count,
1348 'sys.stdout.buffer.write(data)',
1349 'sys.stdout.buffer.flush()'))
1350 out = assert_python_ok('-c', code)
1351 stdout = out[1]
1352 self.assertEqual(len(stdout), 16)
1353 return stdout
1354
1355 def test_urandom_subprocess(self):
1356 data1 = self.get_urandom_subprocess(16)
1357 data2 = self.get_urandom_subprocess(16)
1358 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001359
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001360
Victor Stinner9b1f4742016-09-06 16:18:52 -07001361@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
1362class GetRandomTests(unittest.TestCase):
Victor Stinner173a1f32016-09-06 19:57:40 -07001363 @classmethod
1364 def setUpClass(cls):
1365 try:
1366 os.getrandom(1)
1367 except OSError as exc:
1368 if exc.errno == errno.ENOSYS:
1369 # Python compiled on a more recent Linux version
1370 # than the current Linux kernel
1371 raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")
1372 else:
1373 raise
1374
Victor Stinner9b1f4742016-09-06 16:18:52 -07001375 def test_getrandom_type(self):
1376 data = os.getrandom(16)
1377 self.assertIsInstance(data, bytes)
1378 self.assertEqual(len(data), 16)
1379
1380 def test_getrandom0(self):
1381 empty = os.getrandom(0)
1382 self.assertEqual(empty, b'')
1383
1384 def test_getrandom_random(self):
1385 self.assertTrue(hasattr(os, 'GRND_RANDOM'))
1386
1387 # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
1388 # resource /dev/random
1389
1390 def test_getrandom_nonblock(self):
1391 # The call must not fail. Check also that the flag exists
1392 try:
1393 os.getrandom(1, os.GRND_NONBLOCK)
1394 except BlockingIOError:
1395 # System urandom is not initialized yet
1396 pass
1397
1398 def test_getrandom_value(self):
1399 data1 = os.getrandom(16)
1400 data2 = os.getrandom(16)
1401 self.assertNotEqual(data1, data2)
1402
1403
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001404# os.urandom() doesn't use a file descriptor when it is implemented with the
1405# getentropy() function, the getrandom() function or the getrandom() syscall
1406OS_URANDOM_DONT_USE_FD = (
1407 sysconfig.get_config_var('HAVE_GETENTROPY') == 1
1408 or sysconfig.get_config_var('HAVE_GETRANDOM') == 1
1409 or sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1)
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001410
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001411@unittest.skipIf(OS_URANDOM_DONT_USE_FD ,
1412 "os.random() does not use a file descriptor")
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001413class URandomFDTests(unittest.TestCase):
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001414 @unittest.skipUnless(resource, "test requires the resource module")
1415 def test_urandom_failure(self):
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001416 # Check urandom() failing when it is not able to open /dev/random.
1417 # We spawn a new process to make the test more robust (if getrlimit()
1418 # failed to restore the file descriptor limit after this, the whole
1419 # test suite would crash; this actually happened on the OS X Tiger
1420 # buildbot).
1421 code = """if 1:
1422 import errno
1423 import os
1424 import resource
1425
1426 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
1427 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
1428 try:
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001429 os.urandom(16)
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001430 except OSError as e:
1431 assert e.errno == errno.EMFILE, e.errno
1432 else:
1433 raise AssertionError("OSError not raised")
1434 """
1435 assert_python_ok('-c', code)
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001436
Antoine Pitroue472aea2014-04-26 14:33:03 +02001437 def test_urandom_fd_closed(self):
1438 # Issue #21207: urandom() should reopen its fd to /dev/urandom if
1439 # closed.
1440 code = """if 1:
1441 import os
1442 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001443 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001444 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001445 with test.support.SuppressCrashReport():
1446 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001447 sys.stdout.buffer.write(os.urandom(4))
1448 """
1449 rc, out, err = assert_python_ok('-Sc', code)
1450
1451 def test_urandom_fd_reopened(self):
1452 # Issue #21207: urandom() should detect its fd to /dev/urandom
1453 # changed to something else, and reopen it.
Victor Stinnerae39d232016-03-24 17:12:55 +01001454 self.addCleanup(support.unlink, support.TESTFN)
1455 create_file(support.TESTFN, b"x" * 256)
1456
Antoine Pitroue472aea2014-04-26 14:33:03 +02001457 code = """if 1:
1458 import os
1459 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001460 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001461 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001462 with test.support.SuppressCrashReport():
1463 for fd in range(3, 256):
1464 try:
1465 os.close(fd)
1466 except OSError:
1467 pass
1468 else:
1469 # Found the urandom fd (XXX hopefully)
1470 break
1471 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001472 with open({TESTFN!r}, 'rb') as f:
Xavier de Gaye21060102016-11-16 08:05:27 +01001473 new_fd = f.fileno()
1474 # Issue #26935: posix allows new_fd and fd to be equal but
1475 # some libc implementations have dup2 return an error in this
1476 # case.
1477 if new_fd != fd:
1478 os.dup2(new_fd, fd)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001479 sys.stdout.buffer.write(os.urandom(4))
1480 sys.stdout.buffer.write(os.urandom(4))
1481 """.format(TESTFN=support.TESTFN)
1482 rc, out, err = assert_python_ok('-Sc', code)
1483 self.assertEqual(len(out), 8)
1484 self.assertNotEqual(out[0:4], out[4:8])
1485 rc, out2, err2 = assert_python_ok('-Sc', code)
1486 self.assertEqual(len(out2), 8)
1487 self.assertNotEqual(out2, out)
1488
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001489
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001490@contextlib.contextmanager
1491def _execvpe_mockup(defpath=None):
1492 """
1493 Stubs out execv and execve functions when used as context manager.
1494 Records exec calls. The mock execv and execve functions always raise an
1495 exception as they would normally never return.
1496 """
1497 # A list of tuples containing (function name, first arg, args)
1498 # of calls to execv or execve that have been made.
1499 calls = []
1500
1501 def mock_execv(name, *args):
1502 calls.append(('execv', name, args))
1503 raise RuntimeError("execv called")
1504
1505 def mock_execve(name, *args):
1506 calls.append(('execve', name, args))
1507 raise OSError(errno.ENOTDIR, "execve called")
1508
1509 try:
1510 orig_execv = os.execv
1511 orig_execve = os.execve
1512 orig_defpath = os.defpath
1513 os.execv = mock_execv
1514 os.execve = mock_execve
1515 if defpath is not None:
1516 os.defpath = defpath
1517 yield calls
1518 finally:
1519 os.execv = orig_execv
1520 os.execve = orig_execve
1521 os.defpath = orig_defpath
1522
Victor Stinner4659ccf2016-09-14 10:57:00 +02001523
Guido van Rossume7ba4952007-06-06 23:52:48 +00001524class ExecTests(unittest.TestCase):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001525 @unittest.skipIf(USING_LINUXTHREADS,
1526 "avoid triggering a linuxthreads bug: see issue #4970")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001527 def test_execvpe_with_bad_program(self):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001528 self.assertRaises(OSError, os.execvpe, 'no such app-',
1529 ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +00001530
Steve Dowerbce26262016-11-19 19:17:26 -08001531 def test_execv_with_bad_arglist(self):
1532 self.assertRaises(ValueError, os.execv, 'notepad', ())
1533 self.assertRaises(ValueError, os.execv, 'notepad', [])
1534 self.assertRaises(ValueError, os.execv, 'notepad', ('',))
1535 self.assertRaises(ValueError, os.execv, 'notepad', [''])
1536
Thomas Heller6790d602007-08-30 17:15:14 +00001537 def test_execvpe_with_bad_arglist(self):
1538 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
Steve Dowerbce26262016-11-19 19:17:26 -08001539 self.assertRaises(ValueError, os.execvpe, 'notepad', [], {})
1540 self.assertRaises(ValueError, os.execvpe, 'notepad', [''], {})
Thomas Heller6790d602007-08-30 17:15:14 +00001541
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001542 @unittest.skipUnless(hasattr(os, '_execvpe'),
1543 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +00001544 def _test_internal_execvpe(self, test_type):
1545 program_path = os.sep + 'absolutepath'
1546 if test_type is bytes:
1547 program = b'executable'
1548 fullpath = os.path.join(os.fsencode(program_path), program)
1549 native_fullpath = fullpath
1550 arguments = [b'progname', 'arg1', 'arg2']
1551 else:
1552 program = 'executable'
1553 arguments = ['progname', 'arg1', 'arg2']
1554 fullpath = os.path.join(program_path, program)
1555 if os.name != "nt":
1556 native_fullpath = os.fsencode(fullpath)
1557 else:
1558 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001559 env = {'spam': 'beans'}
1560
Victor Stinnerb745a742010-05-18 17:17:23 +00001561 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001562 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001563 self.assertRaises(RuntimeError,
1564 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001565 self.assertEqual(len(calls), 1)
1566 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
1567
Victor Stinnerb745a742010-05-18 17:17:23 +00001568 # test os._execvpe() with a relative path:
1569 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001570 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001571 self.assertRaises(OSError,
1572 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001573 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +00001574 self.assertSequenceEqual(calls[0],
1575 ('execve', native_fullpath, (arguments, env)))
1576
1577 # test os._execvpe() with a relative path:
1578 # os.get_exec_path() reads the 'PATH' variable
1579 with _execvpe_mockup() as calls:
1580 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +00001581 if test_type is bytes:
1582 env_path[b'PATH'] = program_path
1583 else:
1584 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +00001585 self.assertRaises(OSError,
1586 os._execvpe, program, arguments, env=env_path)
1587 self.assertEqual(len(calls), 1)
1588 self.assertSequenceEqual(calls[0],
1589 ('execve', native_fullpath, (arguments, env_path)))
1590
1591 def test_internal_execvpe_str(self):
1592 self._test_internal_execvpe(str)
1593 if os.name != "nt":
1594 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001595
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001596 def test_execve_invalid_env(self):
1597 args = [sys.executable, '-c', 'pass']
1598
Ville Skyttä49b27342017-08-03 09:00:59 +03001599 # null character in the environment variable name
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001600 newenv = os.environ.copy()
1601 newenv["FRUIT\0VEGETABLE"] = "cabbage"
1602 with self.assertRaises(ValueError):
1603 os.execve(args[0], args, newenv)
1604
Ville Skyttä49b27342017-08-03 09:00:59 +03001605 # null character in the environment variable value
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001606 newenv = os.environ.copy()
1607 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
1608 with self.assertRaises(ValueError):
1609 os.execve(args[0], args, newenv)
1610
Ville Skyttä49b27342017-08-03 09:00:59 +03001611 # equal character in the environment variable name
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001612 newenv = os.environ.copy()
1613 newenv["FRUIT=ORANGE"] = "lemon"
1614 with self.assertRaises(ValueError):
1615 os.execve(args[0], args, newenv)
1616
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001617
Serhiy Storchaka43767632013-11-03 21:31:38 +02001618@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001619class Win32ErrorTests(unittest.TestCase):
Victor Stinnere77c9742016-03-25 10:28:23 +01001620 def setUp(self):
Victor Stinner32830142016-03-25 15:12:08 +01001621 try:
1622 os.stat(support.TESTFN)
1623 except FileNotFoundError:
1624 exists = False
1625 except OSError as exc:
1626 exists = True
1627 self.fail("file %s must not exist; os.stat failed with %s"
1628 % (support.TESTFN, exc))
1629 else:
1630 self.fail("file %s must not exist" % support.TESTFN)
Victor Stinnere77c9742016-03-25 10:28:23 +01001631
Thomas Wouters477c8d52006-05-27 19:21:47 +00001632 def test_rename(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001633 self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001634
1635 def test_remove(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001636 self.assertRaises(OSError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001637
1638 def test_chdir(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001639 self.assertRaises(OSError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001640
1641 def test_mkdir(self):
Victor Stinnerae39d232016-03-24 17:12:55 +01001642 self.addCleanup(support.unlink, support.TESTFN)
1643
Victor Stinnere77c9742016-03-25 10:28:23 +01001644 with open(support.TESTFN, "x") as f:
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001645 self.assertRaises(OSError, os.mkdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001646
1647 def test_utime(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001648 self.assertRaises(OSError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001649
Thomas Wouters477c8d52006-05-27 19:21:47 +00001650 def test_chmod(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001651 self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001652
Victor Stinnere77c9742016-03-25 10:28:23 +01001653
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001654class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001655 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001656 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1657 #singles.append("close")
Steve Dower39294992016-08-30 21:22:36 -07001658 #We omit close because it doesn't raise an exception on some platforms
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001659 def get_single(f):
1660 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001661 if hasattr(os, f):
1662 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001663 return helper
1664 for f in singles:
1665 locals()["test_"+f] = get_single(f)
1666
Benjamin Peterson7522c742009-01-19 21:00:09 +00001667 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001668 try:
1669 f(support.make_bad_fd(), *args)
1670 except OSError as e:
1671 self.assertEqual(e.errno, errno.EBADF)
1672 else:
Martin Panter7462b6492015-11-02 03:37:02 +00001673 self.fail("%r didn't raise an OSError with a bad file descriptor"
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001674 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001675
Serhiy Storchaka43767632013-11-03 21:31:38 +02001676 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001677 def test_isatty(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001678 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001679
Serhiy Storchaka43767632013-11-03 21:31:38 +02001680 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001681 def test_closerange(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001682 fd = support.make_bad_fd()
1683 # Make sure none of the descriptors we are about to close are
1684 # currently valid (issue 6542).
1685 for i in range(10):
1686 try: os.fstat(fd+i)
1687 except OSError:
1688 pass
1689 else:
1690 break
1691 if i < 2:
1692 raise unittest.SkipTest(
1693 "Unable to acquire a range of invalid file descriptors")
1694 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001695
Serhiy Storchaka43767632013-11-03 21:31:38 +02001696 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001697 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001698 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001699
Serhiy Storchaka43767632013-11-03 21:31:38 +02001700 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001701 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001702 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001703
Serhiy Storchaka43767632013-11-03 21:31:38 +02001704 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001705 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001706 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001707
Serhiy Storchaka43767632013-11-03 21:31:38 +02001708 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001709 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001710 self.check(os.pathconf, "PC_NAME_MAX")
1711 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001712
Serhiy Storchaka43767632013-11-03 21:31:38 +02001713 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001714 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001715 self.check(os.truncate, 0)
1716 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001717
Serhiy Storchaka43767632013-11-03 21:31:38 +02001718 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001719 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001720 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001721
Serhiy Storchaka43767632013-11-03 21:31:38 +02001722 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001723 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001724 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001725
Victor Stinner57ddf782014-01-08 15:21:28 +01001726 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1727 def test_readv(self):
1728 buf = bytearray(10)
1729 self.check(os.readv, [buf])
1730
Serhiy Storchaka43767632013-11-03 21:31:38 +02001731 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001732 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001733 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001734
Serhiy Storchaka43767632013-11-03 21:31:38 +02001735 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001736 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001737 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001738
Victor Stinner57ddf782014-01-08 15:21:28 +01001739 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1740 def test_writev(self):
1741 self.check(os.writev, [b'abc'])
1742
Victor Stinner1db9e7b2014-07-29 22:32:47 +02001743 def test_inheritable(self):
1744 self.check(os.get_inheritable)
1745 self.check(os.set_inheritable, True)
1746
1747 @unittest.skipUnless(hasattr(os, 'get_blocking'),
1748 'needs os.get_blocking() and os.set_blocking()')
1749 def test_blocking(self):
1750 self.check(os.get_blocking)
1751 self.check(os.set_blocking, True)
1752
Brian Curtin1b9df392010-11-24 20:24:31 +00001753
1754class LinkTests(unittest.TestCase):
1755 def setUp(self):
1756 self.file1 = support.TESTFN
1757 self.file2 = os.path.join(support.TESTFN + "2")
1758
Brian Curtinc0abc4e2010-11-30 23:46:54 +00001759 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00001760 for file in (self.file1, self.file2):
1761 if os.path.exists(file):
1762 os.unlink(file)
1763
Brian Curtin1b9df392010-11-24 20:24:31 +00001764 def _test_link(self, file1, file2):
Victor Stinnere77c9742016-03-25 10:28:23 +01001765 create_file(file1)
Brian Curtin1b9df392010-11-24 20:24:31 +00001766
Steve Dowercc16be82016-09-08 10:35:16 -07001767 os.link(file1, file2)
Brian Curtin1b9df392010-11-24 20:24:31 +00001768 with open(file1, "r") as f1, open(file2, "r") as f2:
1769 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1770
1771 def test_link(self):
1772 self._test_link(self.file1, self.file2)
1773
1774 def test_link_bytes(self):
1775 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1776 bytes(self.file2, sys.getfilesystemencoding()))
1777
Brian Curtinf498b752010-11-30 15:54:04 +00001778 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00001779 try:
Brian Curtinf498b752010-11-30 15:54:04 +00001780 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00001781 except UnicodeError:
1782 raise unittest.SkipTest("Unable to encode for this platform.")
1783
Brian Curtinf498b752010-11-30 15:54:04 +00001784 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00001785 self.file2 = self.file1 + "2"
1786 self._test_link(self.file1, self.file2)
1787
Serhiy Storchaka43767632013-11-03 21:31:38 +02001788@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1789class PosixUidGidTests(unittest.TestCase):
1790 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
1791 def test_setuid(self):
1792 if os.getuid() != 0:
1793 self.assertRaises(OSError, os.setuid, 0)
1794 self.assertRaises(OverflowError, os.setuid, 1<<32)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001795
Serhiy Storchaka43767632013-11-03 21:31:38 +02001796 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
1797 def test_setgid(self):
1798 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1799 self.assertRaises(OSError, os.setgid, 0)
1800 self.assertRaises(OverflowError, os.setgid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001801
Serhiy Storchaka43767632013-11-03 21:31:38 +02001802 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
1803 def test_seteuid(self):
1804 if os.getuid() != 0:
1805 self.assertRaises(OSError, os.seteuid, 0)
1806 self.assertRaises(OverflowError, os.seteuid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001807
Serhiy Storchaka43767632013-11-03 21:31:38 +02001808 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
1809 def test_setegid(self):
1810 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1811 self.assertRaises(OSError, os.setegid, 0)
1812 self.assertRaises(OverflowError, os.setegid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001813
Serhiy Storchaka43767632013-11-03 21:31:38 +02001814 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1815 def test_setreuid(self):
1816 if os.getuid() != 0:
1817 self.assertRaises(OSError, os.setreuid, 0, 0)
1818 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
1819 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001820
Serhiy Storchaka43767632013-11-03 21:31:38 +02001821 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1822 def test_setreuid_neg1(self):
1823 # Needs to accept -1. We run this in a subprocess to avoid
1824 # altering the test runner's process state (issue8045).
1825 subprocess.check_call([
1826 sys.executable, '-c',
1827 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001828
Serhiy Storchaka43767632013-11-03 21:31:38 +02001829 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1830 def test_setregid(self):
1831 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1832 self.assertRaises(OSError, os.setregid, 0, 0)
1833 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
1834 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001835
Serhiy Storchaka43767632013-11-03 21:31:38 +02001836 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1837 def test_setregid_neg1(self):
1838 # Needs to accept -1. We run this in a subprocess to avoid
1839 # altering the test runner's process state (issue8045).
1840 subprocess.check_call([
1841 sys.executable, '-c',
1842 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001843
Serhiy Storchaka43767632013-11-03 21:31:38 +02001844@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1845class Pep383Tests(unittest.TestCase):
1846 def setUp(self):
1847 if support.TESTFN_UNENCODABLE:
1848 self.dir = support.TESTFN_UNENCODABLE
1849 elif support.TESTFN_NONASCII:
1850 self.dir = support.TESTFN_NONASCII
1851 else:
1852 self.dir = support.TESTFN
1853 self.bdir = os.fsencode(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001854
Serhiy Storchaka43767632013-11-03 21:31:38 +02001855 bytesfn = []
1856 def add_filename(fn):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001857 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +02001858 fn = os.fsencode(fn)
1859 except UnicodeEncodeError:
1860 return
1861 bytesfn.append(fn)
1862 add_filename(support.TESTFN_UNICODE)
1863 if support.TESTFN_UNENCODABLE:
1864 add_filename(support.TESTFN_UNENCODABLE)
1865 if support.TESTFN_NONASCII:
1866 add_filename(support.TESTFN_NONASCII)
1867 if not bytesfn:
1868 self.skipTest("couldn't create any non-ascii filename")
Martin v. Löwis011e8422009-05-05 04:43:17 +00001869
Serhiy Storchaka43767632013-11-03 21:31:38 +02001870 self.unicodefn = set()
1871 os.mkdir(self.dir)
1872 try:
1873 for fn in bytesfn:
1874 support.create_empty_file(os.path.join(self.bdir, fn))
1875 fn = os.fsdecode(fn)
1876 if fn in self.unicodefn:
1877 raise ValueError("duplicate filename")
1878 self.unicodefn.add(fn)
1879 except:
Martin v. Löwis011e8422009-05-05 04:43:17 +00001880 shutil.rmtree(self.dir)
Serhiy Storchaka43767632013-11-03 21:31:38 +02001881 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00001882
Serhiy Storchaka43767632013-11-03 21:31:38 +02001883 def tearDown(self):
1884 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001885
Serhiy Storchaka43767632013-11-03 21:31:38 +02001886 def test_listdir(self):
1887 expected = self.unicodefn
1888 found = set(os.listdir(self.dir))
1889 self.assertEqual(found, expected)
1890 # test listdir without arguments
1891 current_directory = os.getcwd()
1892 try:
1893 os.chdir(os.sep)
1894 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
1895 finally:
1896 os.chdir(current_directory)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001897
Serhiy Storchaka43767632013-11-03 21:31:38 +02001898 def test_open(self):
1899 for fn in self.unicodefn:
1900 f = open(os.path.join(self.dir, fn), 'rb')
1901 f.close()
Victor Stinnere4110dc2013-01-01 23:05:55 +01001902
Serhiy Storchaka43767632013-11-03 21:31:38 +02001903 @unittest.skipUnless(hasattr(os, 'statvfs'),
1904 "need os.statvfs()")
1905 def test_statvfs(self):
1906 # issue #9645
1907 for fn in self.unicodefn:
1908 # should not fail with file not found error
1909 fullname = os.path.join(self.dir, fn)
1910 os.statvfs(fullname)
1911
1912 def test_stat(self):
1913 for fn in self.unicodefn:
1914 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001915
Brian Curtineb24d742010-04-12 17:16:38 +00001916@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1917class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +00001918 def _kill(self, sig):
1919 # Start sys.executable as a subprocess and communicate from the
1920 # subprocess to the parent that the interpreter is ready. When it
1921 # becomes ready, send *sig* via os.kill to the subprocess and check
1922 # that the return code is equal to *sig*.
1923 import ctypes
1924 from ctypes import wintypes
1925 import msvcrt
1926
1927 # Since we can't access the contents of the process' stdout until the
1928 # process has exited, use PeekNamedPipe to see what's inside stdout
1929 # without waiting. This is done so we can tell that the interpreter
1930 # is started and running at a point where it could handle a signal.
1931 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
1932 PeekNamedPipe.restype = wintypes.BOOL
1933 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
1934 ctypes.POINTER(ctypes.c_char), # stdout buf
1935 wintypes.DWORD, # Buffer size
1936 ctypes.POINTER(wintypes.DWORD), # bytes read
1937 ctypes.POINTER(wintypes.DWORD), # bytes avail
1938 ctypes.POINTER(wintypes.DWORD)) # bytes left
1939 msg = "running"
1940 proc = subprocess.Popen([sys.executable, "-c",
1941 "import sys;"
1942 "sys.stdout.write('{}');"
1943 "sys.stdout.flush();"
1944 "input()".format(msg)],
1945 stdout=subprocess.PIPE,
1946 stderr=subprocess.PIPE,
1947 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00001948 self.addCleanup(proc.stdout.close)
1949 self.addCleanup(proc.stderr.close)
1950 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00001951
1952 count, max = 0, 100
1953 while count < max and proc.poll() is None:
1954 # Create a string buffer to store the result of stdout from the pipe
1955 buf = ctypes.create_string_buffer(len(msg))
1956 # Obtain the text currently in proc.stdout
1957 # Bytes read/avail/left are left as NULL and unused
1958 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
1959 buf, ctypes.sizeof(buf), None, None, None)
1960 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
1961 if buf.value:
1962 self.assertEqual(msg, buf.value.decode())
1963 break
1964 time.sleep(0.1)
1965 count += 1
1966 else:
1967 self.fail("Did not receive communication from the subprocess")
1968
Brian Curtineb24d742010-04-12 17:16:38 +00001969 os.kill(proc.pid, sig)
1970 self.assertEqual(proc.wait(), sig)
1971
1972 def test_kill_sigterm(self):
1973 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00001974 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00001975
1976 def test_kill_int(self):
1977 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00001978 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00001979
1980 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001981 tagname = "test_os_%s" % uuid.uuid1()
1982 m = mmap.mmap(-1, 1, tagname)
1983 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00001984 # Run a script which has console control handling enabled.
1985 proc = subprocess.Popen([sys.executable,
1986 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001987 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00001988 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
1989 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001990 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001991 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00001992 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001993 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001994 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001995 count += 1
1996 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001997 # Forcefully kill the process if we weren't able to signal it.
1998 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001999 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00002000 os.kill(proc.pid, event)
2001 # proc.send_signal(event) could also be done here.
2002 # Allow time for the signal to be passed and the process to exit.
2003 time.sleep(0.5)
2004 if not proc.poll():
2005 # Forcefully kill the process if we weren't able to signal it.
2006 os.kill(proc.pid, signal.SIGINT)
2007 self.fail("subprocess did not stop on {}".format(name))
2008
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03002009 @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
Brian Curtineb24d742010-04-12 17:16:38 +00002010 def test_CTRL_C_EVENT(self):
2011 from ctypes import wintypes
2012 import ctypes
2013
2014 # Make a NULL value by creating a pointer with no argument.
2015 NULL = ctypes.POINTER(ctypes.c_int)()
2016 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
2017 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
2018 wintypes.BOOL)
2019 SetConsoleCtrlHandler.restype = wintypes.BOOL
2020
2021 # Calling this with NULL and FALSE causes the calling process to
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03002022 # handle Ctrl+C, rather than ignore it. This property is inherited
Brian Curtineb24d742010-04-12 17:16:38 +00002023 # by subprocesses.
2024 SetConsoleCtrlHandler(NULL, 0)
2025
2026 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
2027
2028 def test_CTRL_BREAK_EVENT(self):
2029 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
2030
2031
Brian Curtind40e6f72010-07-08 21:39:08 +00002032@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Tim Golden781bbeb2013-10-25 20:24:06 +01002033class Win32ListdirTests(unittest.TestCase):
2034 """Test listdir on Windows."""
2035
2036 def setUp(self):
2037 self.created_paths = []
2038 for i in range(2):
2039 dir_name = 'SUB%d' % i
2040 dir_path = os.path.join(support.TESTFN, dir_name)
2041 file_name = 'FILE%d' % i
2042 file_path = os.path.join(support.TESTFN, file_name)
2043 os.makedirs(dir_path)
2044 with open(file_path, 'w') as f:
2045 f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
2046 self.created_paths.extend([dir_name, file_name])
2047 self.created_paths.sort()
2048
2049 def tearDown(self):
2050 shutil.rmtree(support.TESTFN)
2051
2052 def test_listdir_no_extended_path(self):
2053 """Test when the path is not an "extended" path."""
2054 # unicode
2055 self.assertEqual(
2056 sorted(os.listdir(support.TESTFN)),
2057 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01002058
Tim Golden781bbeb2013-10-25 20:24:06 +01002059 # bytes
Steve Dowercc16be82016-09-08 10:35:16 -07002060 self.assertEqual(
2061 sorted(os.listdir(os.fsencode(support.TESTFN))),
2062 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002063
2064 def test_listdir_extended_path(self):
2065 """Test when the path starts with '\\\\?\\'."""
Tim Golden1cc35402013-10-25 21:26:06 +01002066 # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
Tim Golden781bbeb2013-10-25 20:24:06 +01002067 # unicode
2068 path = '\\\\?\\' + os.path.abspath(support.TESTFN)
2069 self.assertEqual(
2070 sorted(os.listdir(path)),
2071 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01002072
Tim Golden781bbeb2013-10-25 20:24:06 +01002073 # bytes
Steve Dowercc16be82016-09-08 10:35:16 -07002074 path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
2075 self.assertEqual(
2076 sorted(os.listdir(path)),
2077 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002078
2079
2080@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Brian Curtin3b4499c2010-12-28 14:31:47 +00002081@support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00002082class Win32SymlinkTests(unittest.TestCase):
2083 filelink = 'filelinktest'
2084 filelink_target = os.path.abspath(__file__)
2085 dirlink = 'dirlinktest'
2086 dirlink_target = os.path.dirname(filelink_target)
2087 missing_link = 'missing link'
2088
2089 def setUp(self):
2090 assert os.path.exists(self.dirlink_target)
2091 assert os.path.exists(self.filelink_target)
2092 assert not os.path.exists(self.dirlink)
2093 assert not os.path.exists(self.filelink)
2094 assert not os.path.exists(self.missing_link)
2095
2096 def tearDown(self):
2097 if os.path.exists(self.filelink):
2098 os.remove(self.filelink)
2099 if os.path.exists(self.dirlink):
2100 os.rmdir(self.dirlink)
2101 if os.path.lexists(self.missing_link):
2102 os.remove(self.missing_link)
2103
2104 def test_directory_link(self):
Jason R. Coombs3a092862013-05-27 23:21:28 -04002105 os.symlink(self.dirlink_target, self.dirlink)
Brian Curtind40e6f72010-07-08 21:39:08 +00002106 self.assertTrue(os.path.exists(self.dirlink))
2107 self.assertTrue(os.path.isdir(self.dirlink))
2108 self.assertTrue(os.path.islink(self.dirlink))
2109 self.check_stat(self.dirlink, self.dirlink_target)
2110
2111 def test_file_link(self):
2112 os.symlink(self.filelink_target, self.filelink)
2113 self.assertTrue(os.path.exists(self.filelink))
2114 self.assertTrue(os.path.isfile(self.filelink))
2115 self.assertTrue(os.path.islink(self.filelink))
2116 self.check_stat(self.filelink, self.filelink_target)
2117
2118 def _create_missing_dir_link(self):
2119 'Create a "directory" link to a non-existent target'
2120 linkname = self.missing_link
2121 if os.path.lexists(linkname):
2122 os.remove(linkname)
2123 target = r'c:\\target does not exist.29r3c740'
2124 assert not os.path.exists(target)
2125 target_is_dir = True
2126 os.symlink(target, linkname, target_is_dir)
2127
2128 def test_remove_directory_link_to_missing_target(self):
2129 self._create_missing_dir_link()
2130 # For compatibility with Unix, os.remove will check the
2131 # directory status and call RemoveDirectory if the symlink
2132 # was created with target_is_dir==True.
2133 os.remove(self.missing_link)
2134
2135 @unittest.skip("currently fails; consider for improvement")
2136 def test_isdir_on_directory_link_to_missing_target(self):
2137 self._create_missing_dir_link()
2138 # consider having isdir return true for directory links
2139 self.assertTrue(os.path.isdir(self.missing_link))
2140
2141 @unittest.skip("currently fails; consider for improvement")
2142 def test_rmdir_on_directory_link_to_missing_target(self):
2143 self._create_missing_dir_link()
2144 # consider allowing rmdir to remove directory links
2145 os.rmdir(self.missing_link)
2146
2147 def check_stat(self, link, target):
2148 self.assertEqual(os.stat(link), os.stat(target))
2149 self.assertNotEqual(os.lstat(link), os.stat(link))
2150
Brian Curtind25aef52011-06-13 15:16:04 -05002151 bytes_link = os.fsencode(link)
Steve Dowercc16be82016-09-08 10:35:16 -07002152 self.assertEqual(os.stat(bytes_link), os.stat(target))
2153 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
Brian Curtind25aef52011-06-13 15:16:04 -05002154
2155 def test_12084(self):
2156 level1 = os.path.abspath(support.TESTFN)
2157 level2 = os.path.join(level1, "level2")
2158 level3 = os.path.join(level2, "level3")
Victor Stinnerae39d232016-03-24 17:12:55 +01002159 self.addCleanup(support.rmtree, level1)
2160
2161 os.mkdir(level1)
2162 os.mkdir(level2)
2163 os.mkdir(level3)
2164
2165 file1 = os.path.abspath(os.path.join(level1, "file1"))
2166 create_file(file1)
2167
2168 orig_dir = os.getcwd()
Brian Curtind25aef52011-06-13 15:16:04 -05002169 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01002170 os.chdir(level2)
2171 link = os.path.join(level2, "link")
2172 os.symlink(os.path.relpath(file1), "link")
2173 self.assertIn("link", os.listdir(os.getcwd()))
Brian Curtind25aef52011-06-13 15:16:04 -05002174
Victor Stinnerae39d232016-03-24 17:12:55 +01002175 # Check os.stat calls from the same dir as the link
2176 self.assertEqual(os.stat(file1), os.stat("link"))
Brian Curtind25aef52011-06-13 15:16:04 -05002177
Victor Stinnerae39d232016-03-24 17:12:55 +01002178 # Check os.stat calls from a dir below the link
2179 os.chdir(level1)
2180 self.assertEqual(os.stat(file1),
2181 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002182
Victor Stinnerae39d232016-03-24 17:12:55 +01002183 # Check os.stat calls from a dir above the link
2184 os.chdir(level3)
2185 self.assertEqual(os.stat(file1),
2186 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002187 finally:
Victor Stinnerae39d232016-03-24 17:12:55 +01002188 os.chdir(orig_dir)
Brian Curtind25aef52011-06-13 15:16:04 -05002189
Brian Curtind40e6f72010-07-08 21:39:08 +00002190
Tim Golden0321cf22014-05-05 19:46:17 +01002191@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2192class Win32JunctionTests(unittest.TestCase):
2193 junction = 'junctiontest'
2194 junction_target = os.path.dirname(os.path.abspath(__file__))
2195
2196 def setUp(self):
2197 assert os.path.exists(self.junction_target)
2198 assert not os.path.exists(self.junction)
2199
2200 def tearDown(self):
2201 if os.path.exists(self.junction):
2202 # os.rmdir delegates to Windows' RemoveDirectoryW,
2203 # which removes junction points safely.
2204 os.rmdir(self.junction)
2205
2206 def test_create_junction(self):
2207 _winapi.CreateJunction(self.junction_target, self.junction)
2208 self.assertTrue(os.path.exists(self.junction))
2209 self.assertTrue(os.path.isdir(self.junction))
2210
2211 # Junctions are not recognized as links.
2212 self.assertFalse(os.path.islink(self.junction))
2213
2214 def test_unlink_removes_junction(self):
2215 _winapi.CreateJunction(self.junction_target, self.junction)
2216 self.assertTrue(os.path.exists(self.junction))
2217
2218 os.unlink(self.junction)
2219 self.assertFalse(os.path.exists(self.junction))
2220
2221
Jason R. Coombs3a092862013-05-27 23:21:28 -04002222@support.skip_unless_symlink
2223class NonLocalSymlinkTests(unittest.TestCase):
2224
2225 def setUp(self):
R David Murray44b548d2016-09-08 13:59:53 -04002226 r"""
Jason R. Coombs3a092862013-05-27 23:21:28 -04002227 Create this structure:
2228
2229 base
2230 \___ some_dir
2231 """
2232 os.makedirs('base/some_dir')
2233
2234 def tearDown(self):
2235 shutil.rmtree('base')
2236
2237 def test_directory_link_nonlocal(self):
2238 """
2239 The symlink target should resolve relative to the link, not relative
2240 to the current directory.
2241
2242 Then, link base/some_link -> base/some_dir and ensure that some_link
2243 is resolved as a directory.
2244
2245 In issue13772, it was discovered that directory detection failed if
2246 the symlink target was not specified relative to the current
2247 directory, which was a defect in the implementation.
2248 """
2249 src = os.path.join('base', 'some_link')
2250 os.symlink('some_dir', src)
2251 assert os.path.isdir(src)
2252
2253
Victor Stinnere8d51452010-08-19 01:05:19 +00002254class FSEncodingTests(unittest.TestCase):
2255 def test_nop(self):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002256 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
2257 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
Benjamin Peterson31191a92010-05-09 03:22:58 +00002258
Victor Stinnere8d51452010-08-19 01:05:19 +00002259 def test_identity(self):
2260 # assert fsdecode(fsencode(x)) == x
2261 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
2262 try:
2263 bytesfn = os.fsencode(fn)
2264 except UnicodeEncodeError:
2265 continue
Ezio Melottib3aedd42010-11-20 19:04:17 +00002266 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00002267
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002268
Brett Cannonefb00c02012-02-29 18:31:31 -05002269
2270class DeviceEncodingTests(unittest.TestCase):
2271
2272 def test_bad_fd(self):
2273 # Return None when an fd doesn't actually exist.
2274 self.assertIsNone(os.device_encoding(123456))
2275
Philip Jenveye308b7c2012-02-29 16:16:15 -08002276 @unittest.skipUnless(os.isatty(0) and (sys.platform.startswith('win') or
2277 (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
Philip Jenveyd7aff2d2012-02-29 16:21:25 -08002278 'test requires a tty and either Windows or nl_langinfo(CODESET)')
Brett Cannonefb00c02012-02-29 18:31:31 -05002279 def test_device_encoding(self):
2280 encoding = os.device_encoding(0)
2281 self.assertIsNotNone(encoding)
2282 self.assertTrue(codecs.lookup(encoding))
2283
2284
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002285class PidTests(unittest.TestCase):
2286 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
2287 def test_getppid(self):
2288 p = subprocess.Popen([sys.executable, '-c',
2289 'import os; print(os.getppid())'],
2290 stdout=subprocess.PIPE)
2291 stdout, _ = p.communicate()
2292 # We are the parent of our subprocess
2293 self.assertEqual(int(stdout), os.getpid())
2294
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002295 def test_waitpid(self):
2296 args = [sys.executable, '-c', 'pass']
Brett Cannonec6ce872016-09-06 15:50:29 -07002297 # Add an implicit test for PyUnicode_FSConverter().
2298 pid = os.spawnv(os.P_NOWAIT, _PathLike(args[0]), args)
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002299 status = os.waitpid(pid, 0)
2300 self.assertEqual(status, (pid, 0))
2301
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002302
Victor Stinner4659ccf2016-09-14 10:57:00 +02002303class SpawnTests(unittest.TestCase):
Berker Peksag47e70622016-09-15 20:23:55 +03002304 def create_args(self, *, with_env=False, use_bytes=False):
Victor Stinner4659ccf2016-09-14 10:57:00 +02002305 self.exitcode = 17
2306
2307 filename = support.TESTFN
2308 self.addCleanup(support.unlink, filename)
2309
2310 if not with_env:
2311 code = 'import sys; sys.exit(%s)' % self.exitcode
2312 else:
2313 self.env = dict(os.environ)
2314 # create an unique key
2315 self.key = str(uuid.uuid4())
2316 self.env[self.key] = self.key
2317 # read the variable from os.environ to check that it exists
2318 code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
2319 % (self.key, self.exitcode))
2320
2321 with open(filename, "w") as fp:
2322 fp.write(code)
2323
Berker Peksag81816462016-09-15 20:19:47 +03002324 args = [sys.executable, filename]
2325 if use_bytes:
2326 args = [os.fsencode(a) for a in args]
2327 self.env = {os.fsencode(k): os.fsencode(v)
2328 for k, v in self.env.items()}
2329
2330 return args
Victor Stinner4659ccf2016-09-14 10:57:00 +02002331
Berker Peksag4af23d72016-09-15 20:32:44 +03002332 @requires_os_func('spawnl')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002333 def test_spawnl(self):
2334 args = self.create_args()
2335 exitcode = os.spawnl(os.P_WAIT, args[0], *args)
2336 self.assertEqual(exitcode, self.exitcode)
2337
Berker Peksag4af23d72016-09-15 20:32:44 +03002338 @requires_os_func('spawnle')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002339 def test_spawnle(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002340 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002341 exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
2342 self.assertEqual(exitcode, self.exitcode)
2343
Berker Peksag4af23d72016-09-15 20:32:44 +03002344 @requires_os_func('spawnlp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002345 def test_spawnlp(self):
2346 args = self.create_args()
2347 exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
2348 self.assertEqual(exitcode, self.exitcode)
2349
Berker Peksag4af23d72016-09-15 20:32:44 +03002350 @requires_os_func('spawnlpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002351 def test_spawnlpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002352 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002353 exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
2354 self.assertEqual(exitcode, self.exitcode)
2355
Berker Peksag4af23d72016-09-15 20:32:44 +03002356 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002357 def test_spawnv(self):
2358 args = self.create_args()
2359 exitcode = os.spawnv(os.P_WAIT, args[0], args)
2360 self.assertEqual(exitcode, self.exitcode)
2361
Berker Peksag4af23d72016-09-15 20:32:44 +03002362 @requires_os_func('spawnve')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002363 def test_spawnve(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002364 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002365 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2366 self.assertEqual(exitcode, self.exitcode)
2367
Berker Peksag4af23d72016-09-15 20:32:44 +03002368 @requires_os_func('spawnvp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002369 def test_spawnvp(self):
2370 args = self.create_args()
2371 exitcode = os.spawnvp(os.P_WAIT, args[0], args)
2372 self.assertEqual(exitcode, self.exitcode)
2373
Berker Peksag4af23d72016-09-15 20:32:44 +03002374 @requires_os_func('spawnvpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002375 def test_spawnvpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002376 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002377 exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
2378 self.assertEqual(exitcode, self.exitcode)
2379
Berker Peksag4af23d72016-09-15 20:32:44 +03002380 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002381 def test_nowait(self):
2382 args = self.create_args()
2383 pid = os.spawnv(os.P_NOWAIT, args[0], args)
2384 result = os.waitpid(pid, 0)
2385 self.assertEqual(result[0], pid)
2386 status = result[1]
2387 if hasattr(os, 'WIFEXITED'):
2388 self.assertTrue(os.WIFEXITED(status))
2389 self.assertEqual(os.WEXITSTATUS(status), self.exitcode)
2390 else:
2391 self.assertEqual(status, self.exitcode << 8)
2392
Berker Peksag4af23d72016-09-15 20:32:44 +03002393 @requires_os_func('spawnve')
Berker Peksag81816462016-09-15 20:19:47 +03002394 def test_spawnve_bytes(self):
2395 # Test bytes handling in parse_arglist and parse_envlist (#28114)
2396 args = self.create_args(with_env=True, use_bytes=True)
2397 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2398 self.assertEqual(exitcode, self.exitcode)
2399
Steve Dower859fd7b2016-11-19 18:53:19 -08002400 @requires_os_func('spawnl')
2401 def test_spawnl_noargs(self):
2402 args = self.create_args()
2403 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
Steve Dowerbce26262016-11-19 19:17:26 -08002404 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')
Steve Dower859fd7b2016-11-19 18:53:19 -08002405
2406 @requires_os_func('spawnle')
Steve Dowerbce26262016-11-19 19:17:26 -08002407 def test_spawnle_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08002408 args = self.create_args()
2409 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
Steve Dowerbce26262016-11-19 19:17:26 -08002410 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})
Steve Dower859fd7b2016-11-19 18:53:19 -08002411
2412 @requires_os_func('spawnv')
2413 def test_spawnv_noargs(self):
2414 args = self.create_args()
2415 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
2416 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
Steve Dowerbce26262016-11-19 19:17:26 -08002417 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
2418 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])
Steve Dower859fd7b2016-11-19 18:53:19 -08002419
2420 @requires_os_func('spawnve')
Steve Dowerbce26262016-11-19 19:17:26 -08002421 def test_spawnve_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08002422 args = self.create_args()
2423 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
2424 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
Steve Dowerbce26262016-11-19 19:17:26 -08002425 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
2426 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})
Victor Stinner4659ccf2016-09-14 10:57:00 +02002427
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002428 def _test_invalid_env(self, spawn):
Serhiy Storchaka77703942017-06-25 07:33:01 +03002429 args = [sys.executable, '-c', 'pass']
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002430
Ville Skyttä49b27342017-08-03 09:00:59 +03002431 # null character in the environment variable name
Serhiy Storchaka77703942017-06-25 07:33:01 +03002432 newenv = os.environ.copy()
2433 newenv["FRUIT\0VEGETABLE"] = "cabbage"
2434 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002435 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002436 except ValueError:
2437 pass
2438 else:
2439 self.assertEqual(exitcode, 127)
2440
Ville Skyttä49b27342017-08-03 09:00:59 +03002441 # null character in the environment variable value
Serhiy Storchaka77703942017-06-25 07:33:01 +03002442 newenv = os.environ.copy()
2443 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
2444 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002445 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002446 except ValueError:
2447 pass
2448 else:
2449 self.assertEqual(exitcode, 127)
2450
Ville Skyttä49b27342017-08-03 09:00:59 +03002451 # equal character in the environment variable name
Serhiy Storchaka77703942017-06-25 07:33:01 +03002452 newenv = os.environ.copy()
2453 newenv["FRUIT=ORANGE"] = "lemon"
2454 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002455 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002456 except ValueError:
2457 pass
2458 else:
2459 self.assertEqual(exitcode, 127)
2460
Ville Skyttä49b27342017-08-03 09:00:59 +03002461 # equal character in the environment variable value
Serhiy Storchaka77703942017-06-25 07:33:01 +03002462 filename = support.TESTFN
2463 self.addCleanup(support.unlink, filename)
2464 with open(filename, "w") as fp:
2465 fp.write('import sys, os\n'
2466 'if os.getenv("FRUIT") != "orange=lemon":\n'
2467 ' raise AssertionError')
2468 args = [sys.executable, filename]
2469 newenv = os.environ.copy()
2470 newenv["FRUIT"] = "orange=lemon"
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002471 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002472 self.assertEqual(exitcode, 0)
2473
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002474 @requires_os_func('spawnve')
2475 def test_spawnve_invalid_env(self):
2476 self._test_invalid_env(os.spawnve)
2477
2478 @requires_os_func('spawnvpe')
2479 def test_spawnvpe_invalid_env(self):
2480 self._test_invalid_env(os.spawnvpe)
2481
Serhiy Storchaka77703942017-06-25 07:33:01 +03002482
Brian Curtin0151b8e2010-09-24 13:43:43 +00002483# The introduction of this TestCase caused at least two different errors on
2484# *nix buildbots. Temporarily skip this to let the buildbots move along.
2485@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
Brian Curtine8e4b3b2010-09-23 20:04:14 +00002486@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
2487class LoginTests(unittest.TestCase):
2488 def test_getlogin(self):
2489 user_name = os.getlogin()
2490 self.assertNotEqual(len(user_name), 0)
2491
2492
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002493@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
2494 "needs os.getpriority and os.setpriority")
2495class ProgramPriorityTests(unittest.TestCase):
2496 """Tests for os.getpriority() and os.setpriority()."""
2497
2498 def test_set_get_priority(self):
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002499
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002500 base = os.getpriority(os.PRIO_PROCESS, os.getpid())
2501 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
2502 try:
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002503 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
2504 if base >= 19 and new_prio <= 19:
Victor Stinnerae39d232016-03-24 17:12:55 +01002505 raise unittest.SkipTest("unable to reliably test setpriority "
2506 "at current nice level of %s" % base)
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002507 else:
2508 self.assertEqual(new_prio, base + 1)
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002509 finally:
2510 try:
2511 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
2512 except OSError as err:
Antoine Pitrou692f0382011-02-26 00:22:25 +00002513 if err.errno != errno.EACCES:
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002514 raise
2515
2516
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002517class SendfileTestServer(asyncore.dispatcher, threading.Thread):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002518
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002519 class Handler(asynchat.async_chat):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002520
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002521 def __init__(self, conn):
2522 asynchat.async_chat.__init__(self, conn)
2523 self.in_buffer = []
2524 self.closed = False
2525 self.push(b"220 ready\r\n")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002526
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002527 def handle_read(self):
2528 data = self.recv(4096)
2529 self.in_buffer.append(data)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002530
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002531 def get_data(self):
2532 return b''.join(self.in_buffer)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002533
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002534 def handle_close(self):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002535 self.close()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002536 self.closed = True
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002537
2538 def handle_error(self):
2539 raise
2540
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002541 def __init__(self, address):
2542 threading.Thread.__init__(self)
2543 asyncore.dispatcher.__init__(self)
2544 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
2545 self.bind(address)
2546 self.listen(5)
2547 self.host, self.port = self.socket.getsockname()[:2]
2548 self.handler_instance = None
2549 self._active = False
2550 self._active_lock = threading.Lock()
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002551
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002552 # --- public API
2553
2554 @property
2555 def running(self):
2556 return self._active
2557
2558 def start(self):
2559 assert not self.running
2560 self.__flag = threading.Event()
2561 threading.Thread.start(self)
2562 self.__flag.wait()
2563
2564 def stop(self):
2565 assert self.running
2566 self._active = False
2567 self.join()
2568
2569 def wait(self):
2570 # wait for handler connection to be closed, then stop the server
2571 while not getattr(self.handler_instance, "closed", False):
2572 time.sleep(0.001)
2573 self.stop()
2574
2575 # --- internals
2576
2577 def run(self):
2578 self._active = True
2579 self.__flag.set()
2580 while self._active and asyncore.socket_map:
2581 self._active_lock.acquire()
2582 asyncore.loop(timeout=0.001, count=1)
2583 self._active_lock.release()
2584 asyncore.close_all()
2585
2586 def handle_accept(self):
2587 conn, addr = self.accept()
2588 self.handler_instance = self.Handler(conn)
2589
2590 def handle_connect(self):
2591 self.close()
2592 handle_read = handle_connect
2593
2594 def writable(self):
2595 return 0
2596
2597 def handle_error(self):
2598 raise
2599
2600
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002601@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
2602class TestSendfile(unittest.TestCase):
2603
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002604 DATA = b"12345abcde" * 16 * 1024 # 160 KB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002605 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00002606 not sys.platform.startswith("solaris") and \
2607 not sys.platform.startswith("sunos")
Serhiy Storchaka43767632013-11-03 21:31:38 +02002608 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
2609 'requires headers and trailers support')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002610
2611 @classmethod
2612 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002613 cls.key = support.threading_setup()
Victor Stinnerae39d232016-03-24 17:12:55 +01002614 create_file(support.TESTFN, cls.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002615
2616 @classmethod
2617 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002618 support.threading_cleanup(*cls.key)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002619 support.unlink(support.TESTFN)
2620
2621 def setUp(self):
2622 self.server = SendfileTestServer((support.HOST, 0))
2623 self.server.start()
2624 self.client = socket.socket()
2625 self.client.connect((self.server.host, self.server.port))
2626 self.client.settimeout(1)
2627 # synchronize by waiting for "220 ready" response
2628 self.client.recv(1024)
2629 self.sockno = self.client.fileno()
2630 self.file = open(support.TESTFN, 'rb')
2631 self.fileno = self.file.fileno()
2632
2633 def tearDown(self):
2634 self.file.close()
2635 self.client.close()
2636 if self.server.running:
2637 self.server.stop()
Victor Stinnerd1cc0372017-07-12 16:05:43 +02002638 self.server = None
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002639
2640 def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
2641 """A higher level wrapper representing how an application is
2642 supposed to use sendfile().
2643 """
2644 while 1:
2645 try:
2646 if self.SUPPORT_HEADERS_TRAILERS:
2647 return os.sendfile(sock, file, offset, nbytes, headers,
2648 trailers)
2649 else:
2650 return os.sendfile(sock, file, offset, nbytes)
2651 except OSError as err:
2652 if err.errno == errno.ECONNRESET:
2653 # disconnected
2654 raise
2655 elif err.errno in (errno.EAGAIN, errno.EBUSY):
2656 # we have to retry send data
2657 continue
2658 else:
2659 raise
2660
2661 def test_send_whole_file(self):
2662 # normal send
2663 total_sent = 0
2664 offset = 0
2665 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002666 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002667 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2668 if sent == 0:
2669 break
2670 offset += sent
2671 total_sent += sent
2672 self.assertTrue(sent <= nbytes)
2673 self.assertEqual(offset, total_sent)
2674
2675 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002676 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002677 self.client.close()
2678 self.server.wait()
2679 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002680 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002681 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002682
2683 def test_send_at_certain_offset(self):
2684 # start sending a file at a certain offset
2685 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002686 offset = len(self.DATA) // 2
2687 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002688 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002689 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002690 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2691 if sent == 0:
2692 break
2693 offset += sent
2694 total_sent += sent
2695 self.assertTrue(sent <= nbytes)
2696
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002697 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002698 self.client.close()
2699 self.server.wait()
2700 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002701 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002702 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002703 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002704 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002705
2706 def test_offset_overflow(self):
2707 # specify an offset > file size
2708 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002709 try:
2710 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
2711 except OSError as e:
2712 # Solaris can raise EINVAL if offset >= file length, ignore.
2713 if e.errno != errno.EINVAL:
2714 raise
2715 else:
2716 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002717 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002718 self.client.close()
2719 self.server.wait()
2720 data = self.server.handler_instance.get_data()
2721 self.assertEqual(data, b'')
2722
2723 def test_invalid_offset(self):
2724 with self.assertRaises(OSError) as cm:
2725 os.sendfile(self.sockno, self.fileno, -1, 4096)
2726 self.assertEqual(cm.exception.errno, errno.EINVAL)
2727
Martin Panterbf19d162015-09-09 01:01:13 +00002728 def test_keywords(self):
2729 # Keyword arguments should be supported
2730 os.sendfile(out=self.sockno, offset=0, count=4096,
2731 **{'in': self.fileno})
2732 if self.SUPPORT_HEADERS_TRAILERS:
2733 os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
Martin Panter94994132015-09-09 05:29:24 +00002734 headers=(), trailers=(), flags=0)
Martin Panterbf19d162015-09-09 01:01:13 +00002735
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002736 # --- headers / trailers tests
2737
Serhiy Storchaka43767632013-11-03 21:31:38 +02002738 @requires_headers_trailers
2739 def test_headers(self):
2740 total_sent = 0
2741 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
2742 headers=[b"x" * 512])
2743 total_sent += sent
2744 offset = 4096
2745 nbytes = 4096
2746 while 1:
2747 sent = self.sendfile_wrapper(self.sockno, self.fileno,
2748 offset, nbytes)
2749 if sent == 0:
2750 break
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002751 total_sent += sent
Serhiy Storchaka43767632013-11-03 21:31:38 +02002752 offset += sent
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002753
Serhiy Storchaka43767632013-11-03 21:31:38 +02002754 expected_data = b"x" * 512 + self.DATA
2755 self.assertEqual(total_sent, len(expected_data))
2756 self.client.close()
2757 self.server.wait()
2758 data = self.server.handler_instance.get_data()
2759 self.assertEqual(hash(data), hash(expected_data))
2760
2761 @requires_headers_trailers
2762 def test_trailers(self):
2763 TESTFN2 = support.TESTFN + "2"
2764 file_data = b"abcdef"
Victor Stinnerae39d232016-03-24 17:12:55 +01002765
2766 self.addCleanup(support.unlink, TESTFN2)
2767 create_file(TESTFN2, file_data)
2768
2769 with open(TESTFN2, 'rb') as f:
Serhiy Storchaka43767632013-11-03 21:31:38 +02002770 os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
2771 trailers=[b"1234"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002772 self.client.close()
2773 self.server.wait()
2774 data = self.server.handler_instance.get_data()
Serhiy Storchaka43767632013-11-03 21:31:38 +02002775 self.assertEqual(data, b"abcdef1234")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002776
Serhiy Storchaka43767632013-11-03 21:31:38 +02002777 @requires_headers_trailers
2778 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
2779 'test needs os.SF_NODISKIO')
2780 def test_flags(self):
2781 try:
2782 os.sendfile(self.sockno, self.fileno, 0, 4096,
2783 flags=os.SF_NODISKIO)
2784 except OSError as err:
2785 if err.errno not in (errno.EBUSY, errno.EAGAIN):
2786 raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002787
2788
Larry Hastings9cf065c2012-06-22 16:30:09 -07002789def supports_extended_attributes():
2790 if not hasattr(os, "setxattr"):
2791 return False
Victor Stinnerae39d232016-03-24 17:12:55 +01002792
Larry Hastings9cf065c2012-06-22 16:30:09 -07002793 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01002794 with open(support.TESTFN, "xb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002795 try:
2796 os.setxattr(fp.fileno(), b"user.test", b"")
2797 except OSError:
2798 return False
2799 finally:
2800 support.unlink(support.TESTFN)
Victor Stinnerf95a19b2016-03-24 16:50:41 +01002801
2802 return True
Larry Hastings9cf065c2012-06-22 16:30:09 -07002803
2804
2805@unittest.skipUnless(supports_extended_attributes(),
2806 "no non-broken extended attribute support")
Victor Stinnerf95a19b2016-03-24 16:50:41 +01002807# Kernels < 2.6.39 don't respect setxattr flags.
2808@support.requires_linux_version(2, 6, 39)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002809class ExtendedAttributeTests(unittest.TestCase):
2810
Larry Hastings9cf065c2012-06-22 16:30:09 -07002811 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
Benjamin Peterson799bd802011-08-31 22:15:17 -04002812 fn = support.TESTFN
Victor Stinnerae39d232016-03-24 17:12:55 +01002813 self.addCleanup(support.unlink, fn)
2814 create_file(fn)
2815
Benjamin Peterson799bd802011-08-31 22:15:17 -04002816 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002817 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002818 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01002819
Victor Stinnerf12e5062011-10-16 22:12:03 +02002820 init_xattr = listxattr(fn)
2821 self.assertIsInstance(init_xattr, list)
Victor Stinnerae39d232016-03-24 17:12:55 +01002822
Larry Hastings9cf065c2012-06-22 16:30:09 -07002823 setxattr(fn, s("user.test"), b"", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002824 xattr = set(init_xattr)
2825 xattr.add("user.test")
2826 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002827 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
2828 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
2829 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
Victor Stinnerae39d232016-03-24 17:12:55 +01002830
Benjamin Peterson799bd802011-08-31 22:15:17 -04002831 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002832 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002833 self.assertEqual(cm.exception.errno, errno.EEXIST)
Victor Stinnerae39d232016-03-24 17:12:55 +01002834
Benjamin Peterson799bd802011-08-31 22:15:17 -04002835 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002836 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002837 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01002838
Larry Hastings9cf065c2012-06-22 16:30:09 -07002839 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002840 xattr.add("user.test2")
2841 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002842 removexattr(fn, s("user.test"), **kwargs)
Victor Stinnerae39d232016-03-24 17:12:55 +01002843
Benjamin Peterson799bd802011-08-31 22:15:17 -04002844 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002845 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002846 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01002847
Victor Stinnerf12e5062011-10-16 22:12:03 +02002848 xattr.remove("user.test")
2849 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002850 self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
2851 setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
2852 self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
2853 removexattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002854 many = sorted("user.test{}".format(i) for i in range(100))
2855 for thing in many:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002856 setxattr(fn, thing, b"x", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002857 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
Benjamin Peterson799bd802011-08-31 22:15:17 -04002858
Larry Hastings9cf065c2012-06-22 16:30:09 -07002859 def _check_xattrs(self, *args, **kwargs):
Larry Hastings9cf065c2012-06-22 16:30:09 -07002860 self._check_xattrs_str(str, *args, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002861 support.unlink(support.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +01002862
2863 self._check_xattrs_str(os.fsencode, *args, **kwargs)
2864 support.unlink(support.TESTFN)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002865
2866 def test_simple(self):
2867 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2868 os.listxattr)
2869
2870 def test_lpath(self):
Larry Hastings9cf065c2012-06-22 16:30:09 -07002871 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2872 os.listxattr, follow_symlinks=False)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002873
2874 def test_fds(self):
2875 def getxattr(path, *args):
2876 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002877 return os.getxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002878 def setxattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01002879 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002880 os.setxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002881 def removexattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01002882 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002883 os.removexattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002884 def listxattr(path, *args):
2885 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002886 return os.listxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002887 self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
2888
2889
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002890@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
2891class TermsizeTests(unittest.TestCase):
2892 def test_does_not_crash(self):
2893 """Check if get_terminal_size() returns a meaningful value.
2894
2895 There's no easy portable way to actually check the size of the
2896 terminal, so let's check if it returns something sensible instead.
2897 """
2898 try:
2899 size = os.get_terminal_size()
2900 except OSError as e:
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01002901 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002902 # Under win32 a generic OSError can be thrown if the
2903 # handle cannot be retrieved
2904 self.skipTest("failed to query terminal size")
2905 raise
2906
Antoine Pitroucfade362012-02-08 23:48:59 +01002907 self.assertGreaterEqual(size.columns, 0)
2908 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002909
2910 def test_stty_match(self):
2911 """Check if stty returns the same results
2912
2913 stty actually tests stdin, so get_terminal_size is invoked on
2914 stdin explicitly. If stty succeeded, then get_terminal_size()
2915 should work too.
2916 """
2917 try:
2918 size = subprocess.check_output(['stty', 'size']).decode().split()
2919 except (FileNotFoundError, subprocess.CalledProcessError):
2920 self.skipTest("stty invocation failed")
2921 expected = (int(size[1]), int(size[0])) # reversed order
2922
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01002923 try:
2924 actual = os.get_terminal_size(sys.__stdin__.fileno())
2925 except OSError as e:
2926 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
2927 # Under win32 a generic OSError can be thrown if the
2928 # handle cannot be retrieved
2929 self.skipTest("failed to query terminal size")
2930 raise
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002931 self.assertEqual(expected, actual)
2932
2933
Victor Stinner292c8352012-10-30 02:17:38 +01002934class OSErrorTests(unittest.TestCase):
2935 def setUp(self):
2936 class Str(str):
2937 pass
2938
Victor Stinnerafe17062012-10-31 22:47:43 +01002939 self.bytes_filenames = []
2940 self.unicode_filenames = []
Victor Stinner292c8352012-10-30 02:17:38 +01002941 if support.TESTFN_UNENCODABLE is not None:
2942 decoded = support.TESTFN_UNENCODABLE
2943 else:
2944 decoded = support.TESTFN
Victor Stinnerafe17062012-10-31 22:47:43 +01002945 self.unicode_filenames.append(decoded)
2946 self.unicode_filenames.append(Str(decoded))
Victor Stinner292c8352012-10-30 02:17:38 +01002947 if support.TESTFN_UNDECODABLE is not None:
2948 encoded = support.TESTFN_UNDECODABLE
2949 else:
2950 encoded = os.fsencode(support.TESTFN)
Victor Stinnerafe17062012-10-31 22:47:43 +01002951 self.bytes_filenames.append(encoded)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03002952 self.bytes_filenames.append(bytearray(encoded))
Victor Stinnerafe17062012-10-31 22:47:43 +01002953 self.bytes_filenames.append(memoryview(encoded))
2954
2955 self.filenames = self.bytes_filenames + self.unicode_filenames
Victor Stinner292c8352012-10-30 02:17:38 +01002956
2957 def test_oserror_filename(self):
2958 funcs = [
Victor Stinnerafe17062012-10-31 22:47:43 +01002959 (self.filenames, os.chdir,),
2960 (self.filenames, os.chmod, 0o777),
Victor Stinnerafe17062012-10-31 22:47:43 +01002961 (self.filenames, os.lstat,),
2962 (self.filenames, os.open, os.O_RDONLY),
2963 (self.filenames, os.rmdir,),
2964 (self.filenames, os.stat,),
2965 (self.filenames, os.unlink,),
Victor Stinner292c8352012-10-30 02:17:38 +01002966 ]
2967 if sys.platform == "win32":
2968 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01002969 (self.bytes_filenames, os.rename, b"dst"),
2970 (self.bytes_filenames, os.replace, b"dst"),
2971 (self.unicode_filenames, os.rename, "dst"),
2972 (self.unicode_filenames, os.replace, "dst"),
Steve Dowercc16be82016-09-08 10:35:16 -07002973 (self.unicode_filenames, os.listdir, ),
Victor Stinner292c8352012-10-30 02:17:38 +01002974 ))
Victor Stinnerafe17062012-10-31 22:47:43 +01002975 else:
2976 funcs.extend((
Victor Stinner64e039a2012-11-07 00:10:14 +01002977 (self.filenames, os.listdir,),
Victor Stinnerafe17062012-10-31 22:47:43 +01002978 (self.filenames, os.rename, "dst"),
2979 (self.filenames, os.replace, "dst"),
2980 ))
2981 if hasattr(os, "chown"):
2982 funcs.append((self.filenames, os.chown, 0, 0))
2983 if hasattr(os, "lchown"):
2984 funcs.append((self.filenames, os.lchown, 0, 0))
2985 if hasattr(os, "truncate"):
2986 funcs.append((self.filenames, os.truncate, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01002987 if hasattr(os, "chflags"):
Victor Stinneree36c242012-11-13 09:31:51 +01002988 funcs.append((self.filenames, os.chflags, 0))
2989 if hasattr(os, "lchflags"):
2990 funcs.append((self.filenames, os.lchflags, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01002991 if hasattr(os, "chroot"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002992 funcs.append((self.filenames, os.chroot,))
Victor Stinner292c8352012-10-30 02:17:38 +01002993 if hasattr(os, "link"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002994 if sys.platform == "win32":
2995 funcs.append((self.bytes_filenames, os.link, b"dst"))
2996 funcs.append((self.unicode_filenames, os.link, "dst"))
2997 else:
2998 funcs.append((self.filenames, os.link, "dst"))
Victor Stinner292c8352012-10-30 02:17:38 +01002999 if hasattr(os, "listxattr"):
3000 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01003001 (self.filenames, os.listxattr,),
3002 (self.filenames, os.getxattr, "user.test"),
3003 (self.filenames, os.setxattr, "user.test", b'user'),
3004 (self.filenames, os.removexattr, "user.test"),
Victor Stinner292c8352012-10-30 02:17:38 +01003005 ))
3006 if hasattr(os, "lchmod"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003007 funcs.append((self.filenames, os.lchmod, 0o777))
Victor Stinner292c8352012-10-30 02:17:38 +01003008 if hasattr(os, "readlink"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003009 if sys.platform == "win32":
3010 funcs.append((self.unicode_filenames, os.readlink,))
3011 else:
3012 funcs.append((self.filenames, os.readlink,))
Victor Stinner292c8352012-10-30 02:17:38 +01003013
Steve Dowercc16be82016-09-08 10:35:16 -07003014
Victor Stinnerafe17062012-10-31 22:47:43 +01003015 for filenames, func, *func_args in funcs:
3016 for name in filenames:
Victor Stinner292c8352012-10-30 02:17:38 +01003017 try:
Steve Dowercc16be82016-09-08 10:35:16 -07003018 if isinstance(name, (str, bytes)):
Victor Stinner923590e2016-03-24 09:11:48 +01003019 func(name, *func_args)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03003020 else:
3021 with self.assertWarnsRegex(DeprecationWarning, 'should be'):
3022 func(name, *func_args)
Victor Stinnerbd54f0e2012-10-31 01:12:55 +01003023 except OSError as err:
Steve Dowercc16be82016-09-08 10:35:16 -07003024 self.assertIs(err.filename, name, str(func))
Steve Dower78057b42016-11-06 19:35:08 -08003025 except UnicodeDecodeError:
3026 pass
Victor Stinner292c8352012-10-30 02:17:38 +01003027 else:
3028 self.fail("No exception thrown by {}".format(func))
3029
Charles-Francois Natali44feda32013-05-20 14:40:46 +02003030class CPUCountTests(unittest.TestCase):
3031 def test_cpu_count(self):
3032 cpus = os.cpu_count()
3033 if cpus is not None:
3034 self.assertIsInstance(cpus, int)
3035 self.assertGreater(cpus, 0)
3036 else:
3037 self.skipTest("Could not determine the number of CPUs")
3038
Victor Stinnerdaf45552013-08-28 00:53:59 +02003039
3040class FDInheritanceTests(unittest.TestCase):
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003041 def test_get_set_inheritable(self):
Victor Stinnerdaf45552013-08-28 00:53:59 +02003042 fd = os.open(__file__, os.O_RDONLY)
3043 self.addCleanup(os.close, fd)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003044 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinnerdaf45552013-08-28 00:53:59 +02003045
Victor Stinnerdaf45552013-08-28 00:53:59 +02003046 os.set_inheritable(fd, True)
3047 self.assertEqual(os.get_inheritable(fd), True)
3048
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003049 @unittest.skipIf(fcntl is None, "need fcntl")
3050 def test_get_inheritable_cloexec(self):
3051 fd = os.open(__file__, os.O_RDONLY)
3052 self.addCleanup(os.close, fd)
3053 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003054
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003055 # clear FD_CLOEXEC flag
3056 flags = fcntl.fcntl(fd, fcntl.F_GETFD)
3057 flags &= ~fcntl.FD_CLOEXEC
3058 fcntl.fcntl(fd, fcntl.F_SETFD, flags)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003059
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003060 self.assertEqual(os.get_inheritable(fd), True)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003061
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003062 @unittest.skipIf(fcntl is None, "need fcntl")
3063 def test_set_inheritable_cloexec(self):
3064 fd = os.open(__file__, os.O_RDONLY)
3065 self.addCleanup(os.close, fd)
3066 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3067 fcntl.FD_CLOEXEC)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003068
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003069 os.set_inheritable(fd, True)
3070 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3071 0)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003072
Victor Stinnerdaf45552013-08-28 00:53:59 +02003073 def test_open(self):
3074 fd = os.open(__file__, os.O_RDONLY)
3075 self.addCleanup(os.close, fd)
3076 self.assertEqual(os.get_inheritable(fd), False)
3077
3078 @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
3079 def test_pipe(self):
3080 rfd, wfd = os.pipe()
3081 self.addCleanup(os.close, rfd)
3082 self.addCleanup(os.close, wfd)
3083 self.assertEqual(os.get_inheritable(rfd), False)
3084 self.assertEqual(os.get_inheritable(wfd), False)
3085
3086 def test_dup(self):
3087 fd1 = os.open(__file__, os.O_RDONLY)
3088 self.addCleanup(os.close, fd1)
3089
3090 fd2 = os.dup(fd1)
3091 self.addCleanup(os.close, fd2)
3092 self.assertEqual(os.get_inheritable(fd2), False)
3093
3094 @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
3095 def test_dup2(self):
3096 fd = os.open(__file__, os.O_RDONLY)
3097 self.addCleanup(os.close, fd)
3098
3099 # inheritable by default
3100 fd2 = os.open(__file__, os.O_RDONLY)
3101 try:
3102 os.dup2(fd, fd2)
3103 self.assertEqual(os.get_inheritable(fd2), True)
3104 finally:
3105 os.close(fd2)
3106
3107 # force non-inheritable
3108 fd3 = os.open(__file__, os.O_RDONLY)
3109 try:
3110 os.dup2(fd, fd3, inheritable=False)
3111 self.assertEqual(os.get_inheritable(fd3), False)
3112 finally:
3113 os.close(fd3)
3114
3115 @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
3116 def test_openpty(self):
3117 master_fd, slave_fd = os.openpty()
3118 self.addCleanup(os.close, master_fd)
3119 self.addCleanup(os.close, slave_fd)
3120 self.assertEqual(os.get_inheritable(master_fd), False)
3121 self.assertEqual(os.get_inheritable(slave_fd), False)
3122
3123
Brett Cannon3f9183b2016-08-26 14:44:48 -07003124class PathTConverterTests(unittest.TestCase):
3125 # tuples of (function name, allows fd arguments, additional arguments to
3126 # function, cleanup function)
3127 functions = [
3128 ('stat', True, (), None),
3129 ('lstat', False, (), None),
Benjamin Petersona9ab1652016-09-05 15:40:59 -07003130 ('access', False, (os.F_OK,), None),
Brett Cannon3f9183b2016-08-26 14:44:48 -07003131 ('chflags', False, (0,), None),
3132 ('lchflags', False, (0,), None),
3133 ('open', False, (0,), getattr(os, 'close', None)),
3134 ]
3135
3136 def test_path_t_converter(self):
Brett Cannon3f9183b2016-08-26 14:44:48 -07003137 str_filename = support.TESTFN
Brett Cannon3ce2fd42016-08-27 09:42:40 -07003138 if os.name == 'nt':
3139 bytes_fspath = bytes_filename = None
3140 else:
3141 bytes_filename = support.TESTFN.encode('ascii')
Brett Cannonec6ce872016-09-06 15:50:29 -07003142 bytes_fspath = _PathLike(bytes_filename)
3143 fd = os.open(_PathLike(str_filename), os.O_WRONLY|os.O_CREAT)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003144 self.addCleanup(support.unlink, support.TESTFN)
Berker Peksagd0f5bab2016-08-27 21:26:35 +03003145 self.addCleanup(os.close, fd)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003146
Brett Cannonec6ce872016-09-06 15:50:29 -07003147 int_fspath = _PathLike(fd)
3148 str_fspath = _PathLike(str_filename)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003149
3150 for name, allow_fd, extra_args, cleanup_fn in self.functions:
3151 with self.subTest(name=name):
3152 try:
3153 fn = getattr(os, name)
3154 except AttributeError:
3155 continue
3156
Brett Cannon8f96a302016-08-26 19:30:11 -07003157 for path in (str_filename, bytes_filename, str_fspath,
3158 bytes_fspath):
Brett Cannon3ce2fd42016-08-27 09:42:40 -07003159 if path is None:
3160 continue
Brett Cannon3f9183b2016-08-26 14:44:48 -07003161 with self.subTest(name=name, path=path):
3162 result = fn(path, *extra_args)
3163 if cleanup_fn is not None:
3164 cleanup_fn(result)
3165
3166 with self.assertRaisesRegex(
3167 TypeError, 'should be string, bytes'):
3168 fn(int_fspath, *extra_args)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003169
3170 if allow_fd:
3171 result = fn(fd, *extra_args) # should not fail
3172 if cleanup_fn is not None:
3173 cleanup_fn(result)
3174 else:
3175 with self.assertRaisesRegex(
3176 TypeError,
3177 'os.PathLike'):
3178 fn(fd, *extra_args)
3179
3180
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003181@unittest.skipUnless(hasattr(os, 'get_blocking'),
3182 'needs os.get_blocking() and os.set_blocking()')
3183class BlockingTests(unittest.TestCase):
3184 def test_blocking(self):
3185 fd = os.open(__file__, os.O_RDONLY)
3186 self.addCleanup(os.close, fd)
3187 self.assertEqual(os.get_blocking(fd), True)
3188
3189 os.set_blocking(fd, False)
3190 self.assertEqual(os.get_blocking(fd), False)
3191
3192 os.set_blocking(fd, True)
3193 self.assertEqual(os.get_blocking(fd), True)
3194
3195
Yury Selivanov97e2e062014-09-26 12:33:06 -04003196
3197class ExportsTests(unittest.TestCase):
3198 def test_os_all(self):
3199 self.assertIn('open', os.__all__)
3200 self.assertIn('walk', os.__all__)
3201
3202
Victor Stinner6036e442015-03-08 01:58:04 +01003203class TestScandir(unittest.TestCase):
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003204 check_no_resource_warning = support.check_no_resource_warning
3205
Victor Stinner6036e442015-03-08 01:58:04 +01003206 def setUp(self):
3207 self.path = os.path.realpath(support.TESTFN)
Brett Cannon96881cd2016-06-10 14:37:21 -07003208 self.bytes_path = os.fsencode(self.path)
Victor Stinner6036e442015-03-08 01:58:04 +01003209 self.addCleanup(support.rmtree, self.path)
3210 os.mkdir(self.path)
3211
3212 def create_file(self, name="file.txt"):
Brett Cannon96881cd2016-06-10 14:37:21 -07003213 path = self.bytes_path if isinstance(name, bytes) else self.path
3214 filename = os.path.join(path, name)
Victor Stinnerae39d232016-03-24 17:12:55 +01003215 create_file(filename, b'python')
Victor Stinner6036e442015-03-08 01:58:04 +01003216 return filename
3217
3218 def get_entries(self, names):
3219 entries = dict((entry.name, entry)
3220 for entry in os.scandir(self.path))
3221 self.assertEqual(sorted(entries.keys()), names)
3222 return entries
3223
3224 def assert_stat_equal(self, stat1, stat2, skip_fields):
3225 if skip_fields:
3226 for attr in dir(stat1):
3227 if not attr.startswith("st_"):
3228 continue
3229 if attr in ("st_dev", "st_ino", "st_nlink"):
3230 continue
3231 self.assertEqual(getattr(stat1, attr),
3232 getattr(stat2, attr),
3233 (stat1, stat2, attr))
3234 else:
3235 self.assertEqual(stat1, stat2)
3236
3237 def check_entry(self, entry, name, is_dir, is_file, is_symlink):
Brett Cannona32c4d02016-06-24 14:14:44 -07003238 self.assertIsInstance(entry, os.DirEntry)
Victor Stinner6036e442015-03-08 01:58:04 +01003239 self.assertEqual(entry.name, name)
3240 self.assertEqual(entry.path, os.path.join(self.path, name))
3241 self.assertEqual(entry.inode(),
3242 os.stat(entry.path, follow_symlinks=False).st_ino)
3243
3244 entry_stat = os.stat(entry.path)
3245 self.assertEqual(entry.is_dir(),
3246 stat.S_ISDIR(entry_stat.st_mode))
3247 self.assertEqual(entry.is_file(),
3248 stat.S_ISREG(entry_stat.st_mode))
3249 self.assertEqual(entry.is_symlink(),
3250 os.path.islink(entry.path))
3251
3252 entry_lstat = os.stat(entry.path, follow_symlinks=False)
3253 self.assertEqual(entry.is_dir(follow_symlinks=False),
3254 stat.S_ISDIR(entry_lstat.st_mode))
3255 self.assertEqual(entry.is_file(follow_symlinks=False),
3256 stat.S_ISREG(entry_lstat.st_mode))
3257
3258 self.assert_stat_equal(entry.stat(),
3259 entry_stat,
3260 os.name == 'nt' and not is_symlink)
3261 self.assert_stat_equal(entry.stat(follow_symlinks=False),
3262 entry_lstat,
3263 os.name == 'nt')
3264
3265 def test_attributes(self):
3266 link = hasattr(os, 'link')
3267 symlink = support.can_symlink()
3268
3269 dirname = os.path.join(self.path, "dir")
3270 os.mkdir(dirname)
3271 filename = self.create_file("file.txt")
3272 if link:
3273 os.link(filename, os.path.join(self.path, "link_file.txt"))
3274 if symlink:
3275 os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
3276 target_is_directory=True)
3277 os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
3278
3279 names = ['dir', 'file.txt']
3280 if link:
3281 names.append('link_file.txt')
3282 if symlink:
3283 names.extend(('symlink_dir', 'symlink_file.txt'))
3284 entries = self.get_entries(names)
3285
3286 entry = entries['dir']
3287 self.check_entry(entry, 'dir', True, False, False)
3288
3289 entry = entries['file.txt']
3290 self.check_entry(entry, 'file.txt', False, True, False)
3291
3292 if link:
3293 entry = entries['link_file.txt']
3294 self.check_entry(entry, 'link_file.txt', False, True, False)
3295
3296 if symlink:
3297 entry = entries['symlink_dir']
3298 self.check_entry(entry, 'symlink_dir', True, False, True)
3299
3300 entry = entries['symlink_file.txt']
3301 self.check_entry(entry, 'symlink_file.txt', False, True, True)
3302
3303 def get_entry(self, name):
Brett Cannon96881cd2016-06-10 14:37:21 -07003304 path = self.bytes_path if isinstance(name, bytes) else self.path
3305 entries = list(os.scandir(path))
Victor Stinner6036e442015-03-08 01:58:04 +01003306 self.assertEqual(len(entries), 1)
3307
3308 entry = entries[0]
3309 self.assertEqual(entry.name, name)
3310 return entry
3311
Brett Cannon96881cd2016-06-10 14:37:21 -07003312 def create_file_entry(self, name='file.txt'):
3313 filename = self.create_file(name=name)
Victor Stinner6036e442015-03-08 01:58:04 +01003314 return self.get_entry(os.path.basename(filename))
3315
3316 def test_current_directory(self):
3317 filename = self.create_file()
3318 old_dir = os.getcwd()
3319 try:
3320 os.chdir(self.path)
3321
3322 # call scandir() without parameter: it must list the content
3323 # of the current directory
3324 entries = dict((entry.name, entry) for entry in os.scandir())
3325 self.assertEqual(sorted(entries.keys()),
3326 [os.path.basename(filename)])
3327 finally:
3328 os.chdir(old_dir)
3329
3330 def test_repr(self):
3331 entry = self.create_file_entry()
3332 self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")
3333
Brett Cannon96881cd2016-06-10 14:37:21 -07003334 def test_fspath_protocol(self):
3335 entry = self.create_file_entry()
3336 self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))
3337
3338 def test_fspath_protocol_bytes(self):
3339 bytes_filename = os.fsencode('bytesfile.txt')
3340 bytes_entry = self.create_file_entry(name=bytes_filename)
3341 fspath = os.fspath(bytes_entry)
3342 self.assertIsInstance(fspath, bytes)
3343 self.assertEqual(fspath,
3344 os.path.join(os.fsencode(self.path),bytes_filename))
3345
Victor Stinner6036e442015-03-08 01:58:04 +01003346 def test_removed_dir(self):
3347 path = os.path.join(self.path, 'dir')
3348
3349 os.mkdir(path)
3350 entry = self.get_entry('dir')
3351 os.rmdir(path)
3352
3353 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3354 if os.name == 'nt':
3355 self.assertTrue(entry.is_dir())
3356 self.assertFalse(entry.is_file())
3357 self.assertFalse(entry.is_symlink())
3358 if os.name == 'nt':
3359 self.assertRaises(FileNotFoundError, entry.inode)
3360 # don't fail
3361 entry.stat()
3362 entry.stat(follow_symlinks=False)
3363 else:
3364 self.assertGreater(entry.inode(), 0)
3365 self.assertRaises(FileNotFoundError, entry.stat)
3366 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3367
3368 def test_removed_file(self):
3369 entry = self.create_file_entry()
3370 os.unlink(entry.path)
3371
3372 self.assertFalse(entry.is_dir())
3373 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3374 if os.name == 'nt':
3375 self.assertTrue(entry.is_file())
3376 self.assertFalse(entry.is_symlink())
3377 if os.name == 'nt':
3378 self.assertRaises(FileNotFoundError, entry.inode)
3379 # don't fail
3380 entry.stat()
3381 entry.stat(follow_symlinks=False)
3382 else:
3383 self.assertGreater(entry.inode(), 0)
3384 self.assertRaises(FileNotFoundError, entry.stat)
3385 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3386
3387 def test_broken_symlink(self):
3388 if not support.can_symlink():
3389 return self.skipTest('cannot create symbolic link')
3390
3391 filename = self.create_file("file.txt")
3392 os.symlink(filename,
3393 os.path.join(self.path, "symlink.txt"))
3394 entries = self.get_entries(['file.txt', 'symlink.txt'])
3395 entry = entries['symlink.txt']
3396 os.unlink(filename)
3397
3398 self.assertGreater(entry.inode(), 0)
3399 self.assertFalse(entry.is_dir())
3400 self.assertFalse(entry.is_file()) # broken symlink returns False
3401 self.assertFalse(entry.is_dir(follow_symlinks=False))
3402 self.assertFalse(entry.is_file(follow_symlinks=False))
3403 self.assertTrue(entry.is_symlink())
3404 self.assertRaises(FileNotFoundError, entry.stat)
3405 # don't fail
3406 entry.stat(follow_symlinks=False)
3407
3408 def test_bytes(self):
Victor Stinner6036e442015-03-08 01:58:04 +01003409 self.create_file("file.txt")
3410
3411 path_bytes = os.fsencode(self.path)
3412 entries = list(os.scandir(path_bytes))
3413 self.assertEqual(len(entries), 1, entries)
3414 entry = entries[0]
3415
3416 self.assertEqual(entry.name, b'file.txt')
3417 self.assertEqual(entry.path,
3418 os.fsencode(os.path.join(self.path, 'file.txt')))
3419
Serhiy Storchaka1180e5a2017-07-11 06:36:46 +03003420 def test_bytes_like(self):
3421 self.create_file("file.txt")
3422
3423 for cls in bytearray, memoryview:
3424 path_bytes = cls(os.fsencode(self.path))
3425 with self.assertWarns(DeprecationWarning):
3426 entries = list(os.scandir(path_bytes))
3427 self.assertEqual(len(entries), 1, entries)
3428 entry = entries[0]
3429
3430 self.assertEqual(entry.name, b'file.txt')
3431 self.assertEqual(entry.path,
3432 os.fsencode(os.path.join(self.path, 'file.txt')))
3433 self.assertIs(type(entry.name), bytes)
3434 self.assertIs(type(entry.path), bytes)
3435
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03003436 @unittest.skipUnless(os.listdir in os.supports_fd,
3437 'fd support for listdir required for this test.')
3438 def test_fd(self):
3439 self.assertIn(os.scandir, os.supports_fd)
3440 self.create_file('file.txt')
3441 expected_names = ['file.txt']
3442 if support.can_symlink():
3443 os.symlink('file.txt', os.path.join(self.path, 'link'))
3444 expected_names.append('link')
3445
3446 fd = os.open(self.path, os.O_RDONLY)
3447 try:
3448 with os.scandir(fd) as it:
3449 entries = list(it)
3450 names = [entry.name for entry in entries]
3451 self.assertEqual(sorted(names), expected_names)
3452 self.assertEqual(names, os.listdir(fd))
3453 for entry in entries:
3454 self.assertEqual(entry.path, entry.name)
3455 self.assertEqual(os.fspath(entry), entry.name)
3456 self.assertEqual(entry.is_symlink(), entry.name == 'link')
3457 if os.stat in os.supports_dir_fd:
3458 st = os.stat(entry.name, dir_fd=fd)
3459 self.assertEqual(entry.stat(), st)
3460 st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
3461 self.assertEqual(entry.stat(follow_symlinks=False), st)
3462 finally:
3463 os.close(fd)
3464
Victor Stinner6036e442015-03-08 01:58:04 +01003465 def test_empty_path(self):
3466 self.assertRaises(FileNotFoundError, os.scandir, '')
3467
3468 def test_consume_iterator_twice(self):
3469 self.create_file("file.txt")
3470 iterator = os.scandir(self.path)
3471
3472 entries = list(iterator)
3473 self.assertEqual(len(entries), 1, entries)
3474
3475 # check than consuming the iterator twice doesn't raise exception
3476 entries2 = list(iterator)
3477 self.assertEqual(len(entries2), 0, entries2)
3478
3479 def test_bad_path_type(self):
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03003480 for obj in [1.234, {}, []]:
Victor Stinner6036e442015-03-08 01:58:04 +01003481 self.assertRaises(TypeError, os.scandir, obj)
3482
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003483 def test_close(self):
3484 self.create_file("file.txt")
3485 self.create_file("file2.txt")
3486 iterator = os.scandir(self.path)
3487 next(iterator)
3488 iterator.close()
3489 # multiple closes
3490 iterator.close()
3491 with self.check_no_resource_warning():
3492 del iterator
3493
3494 def test_context_manager(self):
3495 self.create_file("file.txt")
3496 self.create_file("file2.txt")
3497 with os.scandir(self.path) as iterator:
3498 next(iterator)
3499 with self.check_no_resource_warning():
3500 del iterator
3501
3502 def test_context_manager_close(self):
3503 self.create_file("file.txt")
3504 self.create_file("file2.txt")
3505 with os.scandir(self.path) as iterator:
3506 next(iterator)
3507 iterator.close()
3508
3509 def test_context_manager_exception(self):
3510 self.create_file("file.txt")
3511 self.create_file("file2.txt")
3512 with self.assertRaises(ZeroDivisionError):
3513 with os.scandir(self.path) as iterator:
3514 next(iterator)
3515 1/0
3516 with self.check_no_resource_warning():
3517 del iterator
3518
3519 def test_resource_warning(self):
3520 self.create_file("file.txt")
3521 self.create_file("file2.txt")
3522 iterator = os.scandir(self.path)
3523 next(iterator)
3524 with self.assertWarns(ResourceWarning):
3525 del iterator
3526 support.gc_collect()
3527 # exhausted iterator
3528 iterator = os.scandir(self.path)
3529 list(iterator)
3530 with self.check_no_resource_warning():
3531 del iterator
3532
Victor Stinner6036e442015-03-08 01:58:04 +01003533
Ethan Furmancdc08792016-06-02 15:06:09 -07003534class TestPEP519(unittest.TestCase):
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003535
3536 # Abstracted so it can be overridden to test pure Python implementation
3537 # if a C version is provided.
3538 fspath = staticmethod(os.fspath)
3539
Ethan Furmancdc08792016-06-02 15:06:09 -07003540 def test_return_bytes(self):
3541 for b in b'hello', b'goodbye', b'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003542 self.assertEqual(b, self.fspath(b))
Ethan Furmancdc08792016-06-02 15:06:09 -07003543
3544 def test_return_string(self):
3545 for s in 'hello', 'goodbye', 'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003546 self.assertEqual(s, self.fspath(s))
Ethan Furmancdc08792016-06-02 15:06:09 -07003547
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003548 def test_fsencode_fsdecode(self):
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003549 for p in "path/like/object", b"path/like/object":
Brett Cannonec6ce872016-09-06 15:50:29 -07003550 pathlike = _PathLike(p)
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003551
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003552 self.assertEqual(p, self.fspath(pathlike))
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003553 self.assertEqual(b"path/like/object", os.fsencode(pathlike))
3554 self.assertEqual("path/like/object", os.fsdecode(pathlike))
3555
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003556 def test_pathlike(self):
Brett Cannonec6ce872016-09-06 15:50:29 -07003557 self.assertEqual('#feelthegil', self.fspath(_PathLike('#feelthegil')))
3558 self.assertTrue(issubclass(_PathLike, os.PathLike))
3559 self.assertTrue(isinstance(_PathLike(), os.PathLike))
Ethan Furman410ef8e2016-06-04 12:06:26 -07003560
Ethan Furmancdc08792016-06-02 15:06:09 -07003561 def test_garbage_in_exception_out(self):
3562 vapor = type('blah', (), {})
3563 for o in int, type, os, vapor():
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003564 self.assertRaises(TypeError, self.fspath, o)
Ethan Furmancdc08792016-06-02 15:06:09 -07003565
3566 def test_argument_required(self):
Brett Cannon044283a2016-07-15 10:41:49 -07003567 self.assertRaises(TypeError, self.fspath)
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003568
Brett Cannon044283a2016-07-15 10:41:49 -07003569 def test_bad_pathlike(self):
3570 # __fspath__ returns a value other than str or bytes.
Brett Cannonec6ce872016-09-06 15:50:29 -07003571 self.assertRaises(TypeError, self.fspath, _PathLike(42))
Brett Cannon044283a2016-07-15 10:41:49 -07003572 # __fspath__ attribute that is not callable.
3573 c = type('foo', (), {})
3574 c.__fspath__ = 1
3575 self.assertRaises(TypeError, self.fspath, c())
3576 # __fspath__ raises an exception.
Brett Cannon044283a2016-07-15 10:41:49 -07003577 self.assertRaises(ZeroDivisionError, self.fspath,
Brett Cannonec6ce872016-09-06 15:50:29 -07003578 _PathLike(ZeroDivisionError()))
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003579
3580# Only test if the C version is provided, otherwise TestPEP519 already tested
3581# the pure Python implementation.
3582if hasattr(os, "_fspath"):
3583 class TestPEP519PurePython(TestPEP519):
3584
3585 """Explicitly test the pure Python implementation of os.fspath()."""
3586
3587 fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07003588
3589
Fred Drake2e2be372001-09-20 21:33:42 +00003590if __name__ == "__main__":
R David Murrayf2ad1732014-12-25 18:36:56 -05003591 unittest.main()