blob: 95c74824d7d24e46e1abeadc776f796cd1eb782c [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
11import fractions
12import getpass
13import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Benjamin Peterson799bd802011-08-31 22:15:17 -040018import re
Victor Stinner47aacc82015-06-12 17:26:23 +020019import shutil
20import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000021import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010022import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020023import subprocess
24import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010025import sysconfig
Victor Stinner47aacc82015-06-12 17:26:23 +020026import time
27import unittest
28import uuid
29import warnings
30from test import support
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000031try:
32 import threading
33except ImportError:
34 threading = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020035try:
36 import resource
37except ImportError:
38 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020039try:
40 import fcntl
41except ImportError:
42 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010043try:
44 import _winapi
45except ImportError:
46 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020047try:
R David Murrayf2ad1732014-12-25 18:36:56 -050048 import grp
49 groups = [g.gr_gid for g in grp.getgrall() if getpass.getuser() in g.gr_mem]
50 if hasattr(os, 'getgid'):
51 process_gid = os.getgid()
52 if process_gid not in groups:
53 groups.append(process_gid)
54except ImportError:
55 groups = []
56try:
57 import pwd
58 all_users = [u.pw_uid for u in pwd.getpwall()]
59except ImportError:
60 all_users = []
61try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020062 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020063except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020064 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020065
Berker Peksagce643912015-05-06 06:33:17 +030066from test.support.script_helper import assert_python_ok
Xavier de Gayed1415312016-07-22 12:15:29 +020067from test.support import unix_shell
Fred Drake38c2ef02001-07-17 20:52:51 +000068
Victor Stinner923590e2016-03-24 09:11:48 +010069
R David Murrayf2ad1732014-12-25 18:36:56 -050070root_in_posix = False
71if hasattr(os, 'geteuid'):
72 root_in_posix = (os.geteuid() == 0)
73
Mark Dickinson7cf03892010-04-16 13:45:35 +000074# Detect whether we're on a Linux system that uses the (now outdated
75# and unmaintained) linuxthreads threading library. There's an issue
76# when combining linuxthreads with a failed execv call: see
77# http://bugs.python.org/issue4970.
Victor Stinnerd5c355c2011-04-30 14:53:09 +020078if hasattr(sys, 'thread_info') and sys.thread_info.version:
79 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
80else:
81 USING_LINUXTHREADS = False
Brian Curtineb24d742010-04-12 17:16:38 +000082
Stefan Krahebee49a2013-01-17 15:31:00 +010083# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
84HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
85
Victor Stinner923590e2016-03-24 09:11:48 +010086
87@contextlib.contextmanager
88def ignore_deprecation_warnings(msg_regex, quiet=False):
89 with support.check_warnings((msg_regex, DeprecationWarning), quiet=quiet):
90 yield
91
92
Brett Cannonec6ce872016-09-06 15:50:29 -070093class _PathLike(os.PathLike):
94
95 def __init__(self, path=""):
96 self.path = path
97
98 def __str__(self):
99 return str(self.path)
100
101 def __fspath__(self):
102 if isinstance(self.path, BaseException):
103 raise self.path
104 else:
105 return self.path
106
107
Victor Stinnerae39d232016-03-24 17:12:55 +0100108def create_file(filename, content=b'content'):
109 with open(filename, "xb", 0) as fp:
110 fp.write(content)
111
112
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000113# Tests creating TESTFN
114class FileTests(unittest.TestCase):
115 def setUp(self):
Martin Panterbf19d162015-09-09 01:01:13 +0000116 if os.path.lexists(support.TESTFN):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000117 os.unlink(support.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000118 tearDown = setUp
119
120 def test_access(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000121 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000122 os.close(f)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000123 self.assertTrue(os.access(support.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000124
Christian Heimesfdab48e2008-01-20 09:06:41 +0000125 def test_closerange(self):
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000126 first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
127 # We must allocate two consecutive file descriptors, otherwise
128 # it will mess up other file descriptors (perhaps even the three
129 # standard ones).
130 second = os.dup(first)
131 try:
132 retries = 0
133 while second != first + 1:
134 os.close(first)
135 retries += 1
136 if retries > 10:
137 # XXX test skipped
Benjamin Petersonfa0d7032009-06-01 22:42:33 +0000138 self.skipTest("couldn't allocate two consecutive fds")
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000139 first, second = second, os.dup(second)
140 finally:
141 os.close(second)
Christian Heimesfdab48e2008-01-20 09:06:41 +0000142 # close a fd that is open, and one that isn't
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000143 os.closerange(first, first + 2)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000144 self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000145
Benjamin Peterson1cc6df92010-06-30 17:39:45 +0000146 @support.cpython_only
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +0000147 def test_rename(self):
148 path = support.TESTFN
149 old = sys.getrefcount(path)
150 self.assertRaises(TypeError, os.rename, path, 0)
151 new = sys.getrefcount(path)
152 self.assertEqual(old, new)
153
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000154 def test_read(self):
155 with open(support.TESTFN, "w+b") as fobj:
156 fobj.write(b"spam")
157 fobj.flush()
158 fd = fobj.fileno()
159 os.lseek(fd, 0, 0)
160 s = os.read(fd, 4)
161 self.assertEqual(type(s), bytes)
162 self.assertEqual(s, b"spam")
163
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200164 @support.cpython_only
Victor Stinner5c6e6fc2014-07-12 11:03:53 +0200165 # Skip the test on 32-bit platforms: the number of bytes must fit in a
166 # Py_ssize_t type
167 @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
168 "needs INT_MAX < PY_SSIZE_T_MAX")
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200169 @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
170 def test_large_read(self, size):
Victor Stinnerb28ed922014-07-11 17:04:41 +0200171 self.addCleanup(support.unlink, support.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +0100172 create_file(support.TESTFN, b'test')
Victor Stinnerb28ed922014-07-11 17:04:41 +0200173
174 # Issue #21932: Make sure that os.read() does not raise an
175 # OverflowError for size larger than INT_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +0200176 with open(support.TESTFN, "rb") as fp:
177 data = os.read(fp.fileno(), size)
178
179 # The test does not try to read more than 2 GB at once because the
180 # operating system is free to return less bytes than requested.
181 self.assertEqual(data, b'test')
182
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000183 def test_write(self):
184 # os.write() accepts bytes- and buffer-like objects but not strings
185 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
186 self.assertRaises(TypeError, os.write, fd, "beans")
187 os.write(fd, b"bacon\n")
188 os.write(fd, bytearray(b"eggs\n"))
189 os.write(fd, memoryview(b"spam\n"))
190 os.close(fd)
191 with open(support.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +0000192 self.assertEqual(fobj.read().splitlines(),
193 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000194
Victor Stinnere0daff12011-03-20 23:36:35 +0100195 def write_windows_console(self, *args):
196 retcode = subprocess.call(args,
197 # use a new console to not flood the test output
198 creationflags=subprocess.CREATE_NEW_CONSOLE,
199 # use a shell to hide the console window (SW_HIDE)
200 shell=True)
201 self.assertEqual(retcode, 0)
202
203 @unittest.skipUnless(sys.platform == 'win32',
204 'test specific to the Windows console')
205 def test_write_windows_console(self):
206 # Issue #11395: the Windows console returns an error (12: not enough
207 # space error) on writing into stdout if stdout mode is binary and the
208 # length is greater than 66,000 bytes (or less, depending on heap
209 # usage).
210 code = "print('x' * 100000)"
211 self.write_windows_console(sys.executable, "-c", code)
212 self.write_windows_console(sys.executable, "-u", "-c", code)
213
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000214 def fdopen_helper(self, *args):
215 fd = os.open(support.TESTFN, os.O_RDONLY)
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200216 f = os.fdopen(fd, *args)
217 f.close()
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000218
219 def test_fdopen(self):
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200220 fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
221 os.close(fd)
222
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000223 self.fdopen_helper()
224 self.fdopen_helper('r')
225 self.fdopen_helper('r', 100)
226
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100227 def test_replace(self):
228 TESTFN2 = support.TESTFN + ".2"
Victor Stinnerae39d232016-03-24 17:12:55 +0100229 self.addCleanup(support.unlink, support.TESTFN)
230 self.addCleanup(support.unlink, TESTFN2)
231
232 create_file(support.TESTFN, b"1")
233 create_file(TESTFN2, b"2")
234
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100235 os.replace(support.TESTFN, TESTFN2)
236 self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
237 with open(TESTFN2, 'r') as f:
238 self.assertEqual(f.read(), "1")
239
Martin Panterbf19d162015-09-09 01:01:13 +0000240 def test_open_keywords(self):
241 f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
242 dir_fd=None)
243 os.close(f)
244
245 def test_symlink_keywords(self):
246 symlink = support.get_attribute(os, "symlink")
247 try:
248 symlink(src='target', dst=support.TESTFN,
249 target_is_directory=False, dir_fd=None)
250 except (NotImplementedError, OSError):
251 pass # No OS support or unprivileged user
252
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200253
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000254# Test attributes on return values from os.*stat* family.
255class StatAttributeTests(unittest.TestCase):
256 def setUp(self):
Victor Stinner47aacc82015-06-12 17:26:23 +0200257 self.fname = support.TESTFN
258 self.addCleanup(support.unlink, self.fname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100259 create_file(self.fname, b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000260
Serhiy Storchaka43767632013-11-03 21:31:38 +0200261 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
Antoine Pitrou38425292010-09-21 18:19:07 +0000262 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000263 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000264
265 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000266 self.assertEqual(result[stat.ST_SIZE], 3)
267 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000268
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000269 # Make sure all the attributes are there
270 members = dir(result)
271 for name in dir(stat):
272 if name[:3] == 'ST_':
273 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000274 if name.endswith("TIME"):
275 def trunc(x): return int(x)
276 else:
277 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000278 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000279 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000280 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000281
Larry Hastings6fe20b32012-04-19 15:07:49 -0700282 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700283 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700284 for name in 'st_atime st_mtime st_ctime'.split():
285 floaty = int(getattr(result, name) * 100000)
286 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700287 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700288
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000289 try:
290 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200291 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000292 except IndexError:
293 pass
294
295 # Make sure that assignment fails
296 try:
297 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200298 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000299 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000300 pass
301
302 try:
303 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200304 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000305 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000306 pass
307
308 try:
309 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200310 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000311 except AttributeError:
312 pass
313
314 # Use the stat_result constructor with a too-short tuple.
315 try:
316 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200317 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000318 except TypeError:
319 pass
320
Ezio Melotti42da6632011-03-15 05:18:48 +0200321 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000322 try:
323 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
324 except TypeError:
325 pass
326
Antoine Pitrou38425292010-09-21 18:19:07 +0000327 def test_stat_attributes(self):
328 self.check_stat_attributes(self.fname)
329
330 def test_stat_attributes_bytes(self):
331 try:
332 fname = self.fname.encode(sys.getfilesystemencoding())
333 except UnicodeEncodeError:
334 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Steve Dowercc16be82016-09-08 10:35:16 -0700335 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000336
Christian Heimes25827622013-10-12 01:27:08 +0200337 def test_stat_result_pickle(self):
338 result = os.stat(self.fname)
Serhiy Storchakabad12572014-12-15 14:03:42 +0200339 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
340 p = pickle.dumps(result, proto)
341 self.assertIn(b'stat_result', p)
342 if proto < 4:
343 self.assertIn(b'cos\nstat_result\n', p)
344 unpickled = pickle.loads(p)
345 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200346
Serhiy Storchaka43767632013-11-03 21:31:38 +0200347 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000348 def test_statvfs_attributes(self):
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000349 try:
350 result = os.statvfs(self.fname)
Guido van Rossumb940e112007-01-10 16:19:56 +0000351 except OSError as e:
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000352 # On AtheOS, glibc always returns ENOSYS
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000353 if e.errno == errno.ENOSYS:
Victor Stinner370cb252013-10-12 01:33:54 +0200354 self.skipTest('os.statvfs() failed with ENOSYS')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000355
356 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000357 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000358
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000359 # Make sure all the attributes are there.
360 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
361 'ffree', 'favail', 'flag', 'namemax')
362 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000363 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000364
365 # Make sure that assignment really fails
366 try:
367 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200368 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000369 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000370 pass
371
372 try:
373 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200374 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000375 except AttributeError:
376 pass
377
378 # Use the constructor with a too-short tuple.
379 try:
380 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200381 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000382 except TypeError:
383 pass
384
Ezio Melotti42da6632011-03-15 05:18:48 +0200385 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000386 try:
387 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
388 except TypeError:
389 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000390
Christian Heimes25827622013-10-12 01:27:08 +0200391 @unittest.skipUnless(hasattr(os, 'statvfs'),
392 "need os.statvfs()")
393 def test_statvfs_result_pickle(self):
394 try:
395 result = os.statvfs(self.fname)
396 except OSError as e:
397 # On AtheOS, glibc always returns ENOSYS
398 if e.errno == errno.ENOSYS:
Victor Stinner370cb252013-10-12 01:33:54 +0200399 self.skipTest('os.statvfs() failed with ENOSYS')
400
Serhiy Storchakabad12572014-12-15 14:03:42 +0200401 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
402 p = pickle.dumps(result, proto)
403 self.assertIn(b'statvfs_result', p)
404 if proto < 4:
405 self.assertIn(b'cos\nstatvfs_result\n', p)
406 unpickled = pickle.loads(p)
407 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200408
Serhiy Storchaka43767632013-11-03 21:31:38 +0200409 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
410 def test_1686475(self):
411 # Verify that an open file can be stat'ed
412 try:
413 os.stat(r"c:\pagefile.sys")
414 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600415 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200416 except OSError as e:
417 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000418
Serhiy Storchaka43767632013-11-03 21:31:38 +0200419 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
420 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
421 def test_15261(self):
422 # Verify that stat'ing a closed fd does not cause crash
423 r, w = os.pipe()
424 try:
425 os.stat(r) # should not raise error
426 finally:
427 os.close(r)
428 os.close(w)
429 with self.assertRaises(OSError) as ctx:
430 os.stat(r)
431 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100432
Zachary Ware63f277b2014-06-19 09:46:37 -0500433 def check_file_attributes(self, result):
434 self.assertTrue(hasattr(result, 'st_file_attributes'))
435 self.assertTrue(isinstance(result.st_file_attributes, int))
436 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
437
438 @unittest.skipUnless(sys.platform == "win32",
439 "st_file_attributes is Win32 specific")
440 def test_file_attributes(self):
441 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
442 result = os.stat(self.fname)
443 self.check_file_attributes(result)
444 self.assertEqual(
445 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
446 0)
447
448 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
Victor Stinner47aacc82015-06-12 17:26:23 +0200449 dirname = support.TESTFN + "dir"
450 os.mkdir(dirname)
451 self.addCleanup(os.rmdir, dirname)
452
453 result = os.stat(dirname)
Zachary Ware63f277b2014-06-19 09:46:37 -0500454 self.check_file_attributes(result)
455 self.assertEqual(
456 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
457 stat.FILE_ATTRIBUTE_DIRECTORY)
458
Victor Stinner47aacc82015-06-12 17:26:23 +0200459
460class UtimeTests(unittest.TestCase):
461 def setUp(self):
462 self.dirname = support.TESTFN
463 self.fname = os.path.join(self.dirname, "f1")
464
465 self.addCleanup(support.rmtree, self.dirname)
466 os.mkdir(self.dirname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100467 create_file(self.fname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200468
Victor Stinnerc0b1e0f2015-07-20 17:12:57 +0200469 def restore_float_times(state):
Victor Stinner923590e2016-03-24 09:11:48 +0100470 with ignore_deprecation_warnings('stat_float_times'):
Victor Stinnerc0b1e0f2015-07-20 17:12:57 +0200471 os.stat_float_times(state)
472
Victor Stinner47aacc82015-06-12 17:26:23 +0200473 # ensure that st_atime and st_mtime are float
Victor Stinner923590e2016-03-24 09:11:48 +0100474 with ignore_deprecation_warnings('stat_float_times'):
Victor Stinnerc0b1e0f2015-07-20 17:12:57 +0200475 old_float_times = os.stat_float_times(-1)
476 self.addCleanup(restore_float_times, old_float_times)
Victor Stinner47aacc82015-06-12 17:26:23 +0200477
478 os.stat_float_times(True)
479
480 def support_subsecond(self, filename):
481 # Heuristic to check if the filesystem supports timestamp with
482 # subsecond resolution: check if float and int timestamps are different
483 st = os.stat(filename)
484 return ((st.st_atime != st[7])
485 or (st.st_mtime != st[8])
486 or (st.st_ctime != st[9]))
487
488 def _test_utime(self, set_time, filename=None):
489 if not filename:
490 filename = self.fname
491
492 support_subsecond = self.support_subsecond(filename)
493 if support_subsecond:
494 # Timestamp with a resolution of 1 microsecond (10^-6).
495 #
496 # The resolution of the C internal function used by os.utime()
497 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
498 # test with a resolution of 1 ns requires more work:
499 # see the issue #15745.
500 atime_ns = 1002003000 # 1.002003 seconds
501 mtime_ns = 4005006000 # 4.005006 seconds
502 else:
503 # use a resolution of 1 second
504 atime_ns = 5 * 10**9
505 mtime_ns = 8 * 10**9
506
507 set_time(filename, (atime_ns, mtime_ns))
508 st = os.stat(filename)
509
510 if support_subsecond:
511 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
512 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
513 else:
514 self.assertEqual(st.st_atime, atime_ns * 1e-9)
515 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
516 self.assertEqual(st.st_atime_ns, atime_ns)
517 self.assertEqual(st.st_mtime_ns, mtime_ns)
518
519 def test_utime(self):
520 def set_time(filename, ns):
521 # test the ns keyword parameter
522 os.utime(filename, ns=ns)
523 self._test_utime(set_time)
524
525 @staticmethod
526 def ns_to_sec(ns):
527 # Convert a number of nanosecond (int) to a number of seconds (float).
528 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
529 # issue, os.utime() rounds towards minus infinity.
530 return (ns * 1e-9) + 0.5e-9
531
532 def test_utime_by_indexed(self):
533 # pass times as floating point seconds as the second indexed parameter
534 def set_time(filename, ns):
535 atime_ns, mtime_ns = ns
536 atime = self.ns_to_sec(atime_ns)
537 mtime = self.ns_to_sec(mtime_ns)
538 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
539 # or utime(time_t)
540 os.utime(filename, (atime, mtime))
541 self._test_utime(set_time)
542
543 def test_utime_by_times(self):
544 def set_time(filename, ns):
545 atime_ns, mtime_ns = ns
546 atime = self.ns_to_sec(atime_ns)
547 mtime = self.ns_to_sec(mtime_ns)
548 # test the times keyword parameter
549 os.utime(filename, times=(atime, mtime))
550 self._test_utime(set_time)
551
552 @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
553 "follow_symlinks support for utime required "
554 "for this test.")
555 def test_utime_nofollow_symlinks(self):
556 def set_time(filename, ns):
557 # use follow_symlinks=False to test utimensat(timespec)
558 # or lutimes(timeval)
559 os.utime(filename, ns=ns, follow_symlinks=False)
560 self._test_utime(set_time)
561
562 @unittest.skipUnless(os.utime in os.supports_fd,
563 "fd support for utime required for this test.")
564 def test_utime_fd(self):
565 def set_time(filename, ns):
Victor Stinnerae39d232016-03-24 17:12:55 +0100566 with open(filename, 'wb', 0) as fp:
Victor Stinner47aacc82015-06-12 17:26:23 +0200567 # use a file descriptor to test futimens(timespec)
568 # or futimes(timeval)
569 os.utime(fp.fileno(), ns=ns)
570 self._test_utime(set_time)
571
572 @unittest.skipUnless(os.utime in os.supports_dir_fd,
573 "dir_fd support for utime required for this test.")
574 def test_utime_dir_fd(self):
575 def set_time(filename, ns):
576 dirname, name = os.path.split(filename)
577 dirfd = os.open(dirname, os.O_RDONLY)
578 try:
579 # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
580 os.utime(name, dir_fd=dirfd, ns=ns)
581 finally:
582 os.close(dirfd)
583 self._test_utime(set_time)
584
585 def test_utime_directory(self):
586 def set_time(filename, ns):
587 # test calling os.utime() on a directory
588 os.utime(filename, ns=ns)
589 self._test_utime(set_time, filename=self.dirname)
590
591 def _test_utime_current(self, set_time):
592 # Get the system clock
593 current = time.time()
594
595 # Call os.utime() to set the timestamp to the current system clock
596 set_time(self.fname)
597
598 if not self.support_subsecond(self.fname):
599 delta = 1.0
600 else:
601 # On Windows, the usual resolution of time.time() is 15.6 ms
602 delta = 0.020
603 st = os.stat(self.fname)
604 msg = ("st_time=%r, current=%r, dt=%r"
605 % (st.st_mtime, current, st.st_mtime - current))
606 self.assertAlmostEqual(st.st_mtime, current,
607 delta=delta, msg=msg)
608
609 def test_utime_current(self):
610 def set_time(filename):
611 # Set to the current time in the new way
612 os.utime(self.fname)
613 self._test_utime_current(set_time)
614
615 def test_utime_current_old(self):
616 def set_time(filename):
617 # Set to the current time in the old explicit way.
618 os.utime(self.fname, None)
619 self._test_utime_current(set_time)
620
621 def get_file_system(self, path):
622 if sys.platform == 'win32':
623 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
624 import ctypes
625 kernel32 = ctypes.windll.kernel32
626 buf = ctypes.create_unicode_buffer("", 100)
627 ok = kernel32.GetVolumeInformationW(root, None, 0,
628 None, None, None,
629 buf, len(buf))
630 if ok:
631 return buf.value
632 # return None if the filesystem is unknown
633
634 def test_large_time(self):
635 # Many filesystems are limited to the year 2038. At least, the test
636 # pass with NTFS filesystem.
637 if self.get_file_system(self.dirname) != "NTFS":
638 self.skipTest("requires NTFS")
639
640 large = 5000000000 # some day in 2128
641 os.utime(self.fname, (large, large))
642 self.assertEqual(os.stat(self.fname).st_mtime, large)
643
644 def test_utime_invalid_arguments(self):
645 # seconds and nanoseconds parameters are mutually exclusive
646 with self.assertRaises(ValueError):
647 os.utime(self.fname, (5, 5), ns=(5, 5))
648
649
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000650from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000651
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000652class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000653 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000654 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000655
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000656 def setUp(self):
657 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000658 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000659 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000660 for key, value in self._reference().items():
661 os.environ[key] = value
662
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000663 def tearDown(self):
664 os.environ.clear()
665 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000666 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000667 os.environb.clear()
668 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000669
Christian Heimes90333392007-11-01 19:08:42 +0000670 def _reference(self):
671 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
672
673 def _empty_mapping(self):
674 os.environ.clear()
675 return os.environ
676
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000677 # Bug 1110478
Xavier de Gayed1415312016-07-22 12:15:29 +0200678 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
679 'requires a shell')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000680 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000681 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300682 os.environ.update(HELLO="World")
Xavier de Gayed1415312016-07-22 12:15:29 +0200683 with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300684 value = popen.read().strip()
685 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000686
Xavier de Gayed1415312016-07-22 12:15:29 +0200687 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
688 'requires a shell')
Christian Heimes1a13d592007-11-08 14:16:55 +0000689 def test_os_popen_iter(self):
Xavier de Gayed1415312016-07-22 12:15:29 +0200690 with os.popen("%s -c 'echo \"line1\nline2\nline3\"'"
691 % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300692 it = iter(popen)
693 self.assertEqual(next(it), "line1\n")
694 self.assertEqual(next(it), "line2\n")
695 self.assertEqual(next(it), "line3\n")
696 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000697
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000698 # Verify environ keys and values from the OS are of the
699 # correct str type.
700 def test_keyvalue_types(self):
701 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000702 self.assertEqual(type(key), str)
703 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000704
Christian Heimes90333392007-11-01 19:08:42 +0000705 def test_items(self):
706 for key, value in self._reference().items():
707 self.assertEqual(os.environ.get(key), value)
708
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000709 # Issue 7310
710 def test___repr__(self):
711 """Check that the repr() of os.environ looks like environ({...})."""
712 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000713 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
714 '{!r}: {!r}'.format(key, value)
715 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000716
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000717 def test_get_exec_path(self):
718 defpath_list = os.defpath.split(os.pathsep)
719 test_path = ['/monty', '/python', '', '/flying/circus']
720 test_env = {'PATH': os.pathsep.join(test_path)}
721
722 saved_environ = os.environ
723 try:
724 os.environ = dict(test_env)
725 # Test that defaulting to os.environ works.
726 self.assertSequenceEqual(test_path, os.get_exec_path())
727 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
728 finally:
729 os.environ = saved_environ
730
731 # No PATH environment variable
732 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
733 # Empty PATH environment variable
734 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
735 # Supplied PATH environment variable
736 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
737
Victor Stinnerb745a742010-05-18 17:17:23 +0000738 if os.supports_bytes_environ:
739 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000740 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000741 # ignore BytesWarning warning
742 with warnings.catch_warnings(record=True):
743 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000744 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000745 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000746 pass
747 else:
748 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000749
750 # bytes key and/or value
751 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
752 ['abc'])
753 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
754 ['abc'])
755 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
756 ['abc'])
757
758 @unittest.skipUnless(os.supports_bytes_environ,
759 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000760 def test_environb(self):
761 # os.environ -> os.environb
762 value = 'euro\u20ac'
763 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000764 value_bytes = value.encode(sys.getfilesystemencoding(),
765 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000766 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000767 msg = "U+20AC character is not encodable to %s" % (
768 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000769 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000770 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000771 self.assertEqual(os.environ['unicode'], value)
772 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000773
774 # os.environb -> os.environ
775 value = b'\xff'
776 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000777 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000778 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000779 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000780
Charles-François Natali2966f102011-11-26 11:32:46 +0100781 # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
782 # #13415).
783 @support.requires_freebsd_version(7)
784 @support.requires_mac_ver(10, 6)
Victor Stinner60b385e2011-11-22 22:01:28 +0100785 def test_unset_error(self):
786 if sys.platform == "win32":
787 # an environment variable is limited to 32,767 characters
788 key = 'x' * 50000
Victor Stinnerb3f82682011-11-22 22:30:19 +0100789 self.assertRaises(ValueError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100790 else:
791 # "=" is not allowed in a variable name
792 key = 'key='
Victor Stinnerb3f82682011-11-22 22:30:19 +0100793 self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100794
Victor Stinner6d101392013-04-14 16:35:04 +0200795 def test_key_type(self):
796 missing = 'missingkey'
797 self.assertNotIn(missing, os.environ)
798
Victor Stinner839e5ea2013-04-14 16:43:03 +0200799 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200800 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200801 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200802 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200803
Victor Stinner839e5ea2013-04-14 16:43:03 +0200804 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200805 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200806 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200807 self.assertTrue(cm.exception.__suppress_context__)
808
Victor Stinner6d101392013-04-14 16:35:04 +0200809
Tim Petersc4e09402003-04-25 07:11:48 +0000810class WalkTests(unittest.TestCase):
811 """Tests for os.walk()."""
812
Victor Stinner0561c532015-03-12 10:28:24 +0100813 # Wrapper to hide minor differences between os.walk and os.fwalk
814 # to tests both functions with the same code base
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +0200815 def walk(self, top, **kwargs):
Serhiy Storchakaa17ca192015-12-23 00:37:34 +0200816 if 'follow_symlinks' in kwargs:
817 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +0200818 return os.walk(top, **kwargs)
Victor Stinner0561c532015-03-12 10:28:24 +0100819
Charles-François Natali7372b062012-02-05 15:15:38 +0100820 def setUp(self):
Victor Stinner0561c532015-03-12 10:28:24 +0100821 join = os.path.join
Victor Stinner3899b542016-03-24 17:21:17 +0100822 self.addCleanup(support.rmtree, support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000823
824 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000825 # TESTFN/
826 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +0000827 # tmp1
828 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +0000829 # tmp2
830 # SUB11/ no kids
831 # SUB2/ a file kid and a dirsymlink kid
832 # tmp3
833 # link/ a symlink to TESTFN.2
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200834 # broken_link
Guido van Rossumd8faa362007-04-27 19:54:29 +0000835 # TEST2/
836 # tmp4 a lone file
Victor Stinner0561c532015-03-12 10:28:24 +0100837 self.walk_path = join(support.TESTFN, "TEST1")
838 self.sub1_path = join(self.walk_path, "SUB1")
839 self.sub11_path = join(self.sub1_path, "SUB11")
840 sub2_path = join(self.walk_path, "SUB2")
841 tmp1_path = join(self.walk_path, "tmp1")
842 tmp2_path = join(self.sub1_path, "tmp2")
Tim Petersc4e09402003-04-25 07:11:48 +0000843 tmp3_path = join(sub2_path, "tmp3")
Victor Stinner0561c532015-03-12 10:28:24 +0100844 self.link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000845 t2_path = join(support.TESTFN, "TEST2")
846 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200847 broken_link_path = join(sub2_path, "broken_link")
Tim Petersc4e09402003-04-25 07:11:48 +0000848
849 # Create stuff.
Victor Stinner0561c532015-03-12 10:28:24 +0100850 os.makedirs(self.sub11_path)
Tim Petersc4e09402003-04-25 07:11:48 +0000851 os.makedirs(sub2_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000852 os.makedirs(t2_path)
Victor Stinner0561c532015-03-12 10:28:24 +0100853
Guido van Rossumd8faa362007-04-27 19:54:29 +0000854 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
Victor Stinnere77c9742016-03-25 10:28:23 +0100855 with open(path, "x") as f:
856 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
Tim Petersc4e09402003-04-25 07:11:48 +0000857
Victor Stinner0561c532015-03-12 10:28:24 +0100858 if support.can_symlink():
859 os.symlink(os.path.abspath(t2_path), self.link_path)
860 os.symlink('broken', broken_link_path, True)
Serhiy Storchakaadca8462016-03-08 21:13:35 +0200861 self.sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"])
Victor Stinner0561c532015-03-12 10:28:24 +0100862 else:
863 self.sub2_tree = (sub2_path, [], ["tmp3"])
864
865 def test_walk_topdown(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000866 # Walk top-down.
Serhiy Storchakaa07ab292016-04-16 17:51:00 +0300867 all = list(self.walk(self.walk_path))
Victor Stinner0561c532015-03-12 10:28:24 +0100868
Tim Petersc4e09402003-04-25 07:11:48 +0000869 self.assertEqual(len(all), 4)
870 # We can't know which order SUB1 and SUB2 will appear in.
871 # Not flipped: TESTFN, SUB1, SUB11, SUB2
872 # flipped: TESTFN, SUB2, SUB1, SUB11
873 flipped = all[0][1][0] != "SUB1"
874 all[0][1].sort()
Hynek Schlawackc96f5a02012-05-15 17:55:38 +0200875 all[3 - 2 * flipped][-1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100876 self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
877 self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
878 self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
879 self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000880
Brett Cannon3f9183b2016-08-26 14:44:48 -0700881 def test_walk_prune(self, walk_path=None):
882 if walk_path is None:
883 walk_path = self.walk_path
Tim Petersc4e09402003-04-25 07:11:48 +0000884 # Prune the search.
885 all = []
Brett Cannon3f9183b2016-08-26 14:44:48 -0700886 for root, dirs, files in self.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +0000887 all.append((root, dirs, files))
888 # Don't descend into SUB1.
889 if 'SUB1' in dirs:
890 # Note that this also mutates the dirs we appended to all!
891 dirs.remove('SUB1')
Tim Petersc4e09402003-04-25 07:11:48 +0000892
Victor Stinner0561c532015-03-12 10:28:24 +0100893 self.assertEqual(len(all), 2)
894 self.assertEqual(all[0],
Brett Cannon3f9183b2016-08-26 14:44:48 -0700895 (str(walk_path), ["SUB2"], ["tmp1"]))
Victor Stinner0561c532015-03-12 10:28:24 +0100896
897 all[1][-1].sort()
898 self.assertEqual(all[1], self.sub2_tree)
899
Brett Cannon3f9183b2016-08-26 14:44:48 -0700900 def test_file_like_path(self):
Brett Cannonec6ce872016-09-06 15:50:29 -0700901 self.test_walk_prune(_PathLike(self.walk_path))
Brett Cannon3f9183b2016-08-26 14:44:48 -0700902
Victor Stinner0561c532015-03-12 10:28:24 +0100903 def test_walk_bottom_up(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000904 # Walk bottom-up.
Victor Stinner0561c532015-03-12 10:28:24 +0100905 all = list(self.walk(self.walk_path, topdown=False))
906
Victor Stinner53b0a412016-03-26 01:12:36 +0100907 self.assertEqual(len(all), 4, all)
Tim Petersc4e09402003-04-25 07:11:48 +0000908 # We can't know which order SUB1 and SUB2 will appear in.
909 # Not flipped: SUB11, SUB1, SUB2, TESTFN
910 # flipped: SUB2, SUB11, SUB1, TESTFN
911 flipped = all[3][1][0] != "SUB1"
912 all[3][1].sort()
Hynek Schlawack39bf90d2012-05-15 18:40:17 +0200913 all[2 - 2 * flipped][-1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100914 self.assertEqual(all[3],
915 (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
916 self.assertEqual(all[flipped],
917 (self.sub11_path, [], []))
918 self.assertEqual(all[flipped + 1],
919 (self.sub1_path, ["SUB11"], ["tmp2"]))
920 self.assertEqual(all[2 - 2 * flipped],
921 self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000922
Victor Stinner0561c532015-03-12 10:28:24 +0100923 def test_walk_symlink(self):
924 if not support.can_symlink():
925 self.skipTest("need symlink support")
926
927 # Walk, following symlinks.
928 walk_it = self.walk(self.walk_path, follow_symlinks=True)
929 for root, dirs, files in walk_it:
930 if root == self.link_path:
931 self.assertEqual(dirs, [])
932 self.assertEqual(files, ["tmp4"])
933 break
934 else:
935 self.fail("Didn't follow symlink with followlinks=True")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000936
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +0200937 def test_walk_bad_dir(self):
938 # Walk top-down.
939 errors = []
940 walk_it = self.walk(self.walk_path, onerror=errors.append)
941 root, dirs, files = next(walk_it)
942 self.assertFalse(errors)
943 dir1 = dirs[0]
944 dir1new = dir1 + '.new'
945 os.rename(os.path.join(root, dir1), os.path.join(root, dir1new))
946 roots = [r for r, d, f in walk_it]
947 self.assertTrue(errors)
948 self.assertNotIn(os.path.join(root, dir1), roots)
949 self.assertNotIn(os.path.join(root, dir1new), roots)
950 for dir2 in dirs[1:]:
951 self.assertIn(os.path.join(root, dir2), roots)
952
Charles-François Natali7372b062012-02-05 15:15:38 +0100953
954@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
955class FwalkTests(WalkTests):
956 """Tests for os.fwalk()."""
957
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +0200958 def walk(self, top, **kwargs):
959 for root, dirs, files, root_fd in os.fwalk(top, **kwargs):
Victor Stinner0561c532015-03-12 10:28:24 +0100960 yield (root, dirs, files)
961
Larry Hastingsc48fe982012-06-25 04:49:05 -0700962 def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
963 """
964 compare with walk() results.
965 """
Larry Hastingsb4038062012-07-15 10:57:38 -0700966 walk_kwargs = walk_kwargs.copy()
967 fwalk_kwargs = fwalk_kwargs.copy()
968 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
969 walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
970 fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)
Larry Hastingsc48fe982012-06-25 04:49:05 -0700971
Charles-François Natali7372b062012-02-05 15:15:38 +0100972 expected = {}
Larry Hastingsc48fe982012-06-25 04:49:05 -0700973 for root, dirs, files in os.walk(**walk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +0100974 expected[root] = (set(dirs), set(files))
975
Larry Hastingsc48fe982012-06-25 04:49:05 -0700976 for root, dirs, files, rootfd in os.fwalk(**fwalk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +0100977 self.assertIn(root, expected)
978 self.assertEqual(expected[root], (set(dirs), set(files)))
979
Larry Hastingsc48fe982012-06-25 04:49:05 -0700980 def test_compare_to_walk(self):
981 kwargs = {'top': support.TESTFN}
982 self._compare_to_walk(kwargs, kwargs)
983
Charles-François Natali7372b062012-02-05 15:15:38 +0100984 def test_dir_fd(self):
Larry Hastingsc48fe982012-06-25 04:49:05 -0700985 try:
986 fd = os.open(".", os.O_RDONLY)
987 walk_kwargs = {'top': support.TESTFN}
988 fwalk_kwargs = walk_kwargs.copy()
989 fwalk_kwargs['dir_fd'] = fd
990 self._compare_to_walk(walk_kwargs, fwalk_kwargs)
991 finally:
992 os.close(fd)
993
994 def test_yields_correct_dir_fd(self):
Charles-François Natali7372b062012-02-05 15:15:38 +0100995 # check returned file descriptors
Larry Hastingsb4038062012-07-15 10:57:38 -0700996 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
997 args = support.TESTFN, topdown, None
998 for root, dirs, files, rootfd in os.fwalk(*args, follow_symlinks=follow_symlinks):
Charles-François Natali7372b062012-02-05 15:15:38 +0100999 # check that the FD is valid
1000 os.fstat(rootfd)
Larry Hastings9cf065c2012-06-22 16:30:09 -07001001 # redundant check
1002 os.stat(rootfd)
1003 # check that listdir() returns consistent information
1004 self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))
Charles-François Natali7372b062012-02-05 15:15:38 +01001005
1006 def test_fd_leak(self):
1007 # Since we're opening a lot of FDs, we must be careful to avoid leaks:
1008 # we both check that calling fwalk() a large number of times doesn't
1009 # yield EMFILE, and that the minimum allocated FD hasn't changed.
1010 minfd = os.dup(1)
1011 os.close(minfd)
1012 for i in range(256):
1013 for x in os.fwalk(support.TESTFN):
1014 pass
1015 newfd = os.dup(1)
1016 self.addCleanup(os.close, newfd)
1017 self.assertEqual(newfd, minfd)
1018
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001019class BytesWalkTests(WalkTests):
1020 """Tests for os.walk() with bytes."""
Serhiy Storchakaada6db72016-03-08 21:26:26 +02001021 def setUp(self):
1022 super().setUp()
1023 self.stack = contextlib.ExitStack()
Serhiy Storchakaada6db72016-03-08 21:26:26 +02001024
1025 def tearDown(self):
1026 self.stack.close()
1027 super().tearDown()
1028
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001029 def walk(self, top, **kwargs):
1030 if 'follow_symlinks' in kwargs:
1031 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
1032 for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs):
1033 root = os.fsdecode(broot)
1034 dirs = list(map(os.fsdecode, bdirs))
1035 files = list(map(os.fsdecode, bfiles))
1036 yield (root, dirs, files)
1037 bdirs[:] = list(map(os.fsencode, dirs))
1038 bfiles[:] = list(map(os.fsencode, files))
1039
Charles-François Natali7372b062012-02-05 15:15:38 +01001040
Guido van Rossume7ba4952007-06-06 23:52:48 +00001041class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001042 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001043 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001044
1045 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001046 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001047 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
1048 os.makedirs(path) # Should work
1049 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
1050 os.makedirs(path)
1051
1052 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001053 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001054 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
1055 os.makedirs(path)
1056 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
1057 'dir5', 'dir6')
1058 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001059
Terry Reedy5a22b652010-12-02 07:05:56 +00001060 def test_exist_ok_existing_directory(self):
1061 path = os.path.join(support.TESTFN, 'dir1')
1062 mode = 0o777
1063 old_mask = os.umask(0o022)
1064 os.makedirs(path, mode)
1065 self.assertRaises(OSError, os.makedirs, path, mode)
1066 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
Benjamin Peterson4717e212014-04-01 19:17:57 -04001067 os.makedirs(path, 0o776, exist_ok=True)
Terry Reedy5a22b652010-12-02 07:05:56 +00001068 os.makedirs(path, mode=mode, exist_ok=True)
1069 os.umask(old_mask)
1070
Martin Pantera82642f2015-11-19 04:48:44 +00001071 # Issue #25583: A drive root could raise PermissionError on Windows
1072 os.makedirs(os.path.abspath('/'), exist_ok=True)
1073
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001074 def test_exist_ok_s_isgid_directory(self):
1075 path = os.path.join(support.TESTFN, 'dir1')
1076 S_ISGID = stat.S_ISGID
1077 mode = 0o777
1078 old_mask = os.umask(0o022)
1079 try:
1080 existing_testfn_mode = stat.S_IMODE(
1081 os.lstat(support.TESTFN).st_mode)
Ned Deilyc622f422012-08-08 20:57:24 -07001082 try:
1083 os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
Ned Deily3a2b97e2012-08-08 21:03:02 -07001084 except PermissionError:
Ned Deilyc622f422012-08-08 20:57:24 -07001085 raise unittest.SkipTest('Cannot set S_ISGID for dir.')
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001086 if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
1087 raise unittest.SkipTest('No support for S_ISGID dir mode.')
1088 # The os should apply S_ISGID from the parent dir for us, but
1089 # this test need not depend on that behavior. Be explicit.
1090 os.makedirs(path, mode | S_ISGID)
1091 # http://bugs.python.org/issue14992
1092 # Should not fail when the bit is already set.
1093 os.makedirs(path, mode, exist_ok=True)
1094 # remove the bit.
1095 os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
Benjamin Petersonee5f1c12014-04-01 19:13:18 -04001096 # May work even when the bit is not already set when demanded.
1097 os.makedirs(path, mode | S_ISGID, exist_ok=True)
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001098 finally:
1099 os.umask(old_mask)
Terry Reedy5a22b652010-12-02 07:05:56 +00001100
1101 def test_exist_ok_existing_regular_file(self):
1102 base = support.TESTFN
1103 path = os.path.join(support.TESTFN, 'dir1')
1104 f = open(path, 'w')
1105 f.write('abc')
1106 f.close()
1107 self.assertRaises(OSError, os.makedirs, path)
1108 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
1109 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
1110 os.remove(path)
1111
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001112 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001113 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001114 'dir4', 'dir5', 'dir6')
1115 # If the tests failed, the bottom-most directory ('../dir6')
1116 # may not have been created, so we look for the outermost directory
1117 # that exists.
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001118 while not os.path.exists(path) and path != support.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001119 path = os.path.dirname(path)
1120
1121 os.removedirs(path)
1122
Andrew Svetlov405faed2012-12-25 12:18:09 +02001123
R David Murrayf2ad1732014-12-25 18:36:56 -05001124@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
1125class ChownFileTests(unittest.TestCase):
1126
Berker Peksag036a71b2015-07-21 09:29:48 +03001127 @classmethod
1128 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001129 os.mkdir(support.TESTFN)
1130
1131 def test_chown_uid_gid_arguments_must_be_index(self):
1132 stat = os.stat(support.TESTFN)
1133 uid = stat.st_uid
1134 gid = stat.st_gid
1135 for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
1136 self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
1137 self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
1138 self.assertIsNone(os.chown(support.TESTFN, uid, gid))
1139 self.assertIsNone(os.chown(support.TESTFN, -1, -1))
1140
1141 @unittest.skipUnless(len(groups) > 1, "test needs more than one group")
1142 def test_chown(self):
1143 gid_1, gid_2 = groups[:2]
1144 uid = os.stat(support.TESTFN).st_uid
1145 os.chown(support.TESTFN, uid, gid_1)
1146 gid = os.stat(support.TESTFN).st_gid
1147 self.assertEqual(gid, gid_1)
1148 os.chown(support.TESTFN, uid, gid_2)
1149 gid = os.stat(support.TESTFN).st_gid
1150 self.assertEqual(gid, gid_2)
1151
1152 @unittest.skipUnless(root_in_posix and len(all_users) > 1,
1153 "test needs root privilege and more than one user")
1154 def test_chown_with_root(self):
1155 uid_1, uid_2 = all_users[:2]
1156 gid = os.stat(support.TESTFN).st_gid
1157 os.chown(support.TESTFN, uid_1, gid)
1158 uid = os.stat(support.TESTFN).st_uid
1159 self.assertEqual(uid, uid_1)
1160 os.chown(support.TESTFN, uid_2, gid)
1161 uid = os.stat(support.TESTFN).st_uid
1162 self.assertEqual(uid, uid_2)
1163
1164 @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
1165 "test needs non-root account and more than one user")
1166 def test_chown_without_permission(self):
1167 uid_1, uid_2 = all_users[:2]
1168 gid = os.stat(support.TESTFN).st_gid
Serhiy Storchakaa9e00d12015-02-16 08:35:18 +02001169 with self.assertRaises(PermissionError):
R David Murrayf2ad1732014-12-25 18:36:56 -05001170 os.chown(support.TESTFN, uid_1, gid)
1171 os.chown(support.TESTFN, uid_2, gid)
1172
Berker Peksag036a71b2015-07-21 09:29:48 +03001173 @classmethod
1174 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001175 os.rmdir(support.TESTFN)
1176
1177
Andrew Svetlov405faed2012-12-25 12:18:09 +02001178class RemoveDirsTests(unittest.TestCase):
1179 def setUp(self):
1180 os.makedirs(support.TESTFN)
1181
1182 def tearDown(self):
1183 support.rmtree(support.TESTFN)
1184
1185 def test_remove_all(self):
1186 dira = os.path.join(support.TESTFN, 'dira')
1187 os.mkdir(dira)
1188 dirb = os.path.join(dira, 'dirb')
1189 os.mkdir(dirb)
1190 os.removedirs(dirb)
1191 self.assertFalse(os.path.exists(dirb))
1192 self.assertFalse(os.path.exists(dira))
1193 self.assertFalse(os.path.exists(support.TESTFN))
1194
1195 def test_remove_partial(self):
1196 dira = os.path.join(support.TESTFN, 'dira')
1197 os.mkdir(dira)
1198 dirb = os.path.join(dira, 'dirb')
1199 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001200 create_file(os.path.join(dira, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001201 os.removedirs(dirb)
1202 self.assertFalse(os.path.exists(dirb))
1203 self.assertTrue(os.path.exists(dira))
1204 self.assertTrue(os.path.exists(support.TESTFN))
1205
1206 def test_remove_nothing(self):
1207 dira = os.path.join(support.TESTFN, 'dira')
1208 os.mkdir(dira)
1209 dirb = os.path.join(dira, 'dirb')
1210 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001211 create_file(os.path.join(dirb, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001212 with self.assertRaises(OSError):
1213 os.removedirs(dirb)
1214 self.assertTrue(os.path.exists(dirb))
1215 self.assertTrue(os.path.exists(dira))
1216 self.assertTrue(os.path.exists(support.TESTFN))
1217
1218
Guido van Rossume7ba4952007-06-06 23:52:48 +00001219class DevNullTests(unittest.TestCase):
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00001220 def test_devnull(self):
Victor Stinnerae39d232016-03-24 17:12:55 +01001221 with open(os.devnull, 'wb', 0) as f:
Victor Stinnera6d2c762011-06-30 18:20:11 +02001222 f.write(b'hello')
1223 f.close()
1224 with open(os.devnull, 'rb') as f:
1225 self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001226
Andrew Svetlov405faed2012-12-25 12:18:09 +02001227
Guido van Rossume7ba4952007-06-06 23:52:48 +00001228class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001229 def test_urandom_length(self):
1230 self.assertEqual(len(os.urandom(0)), 0)
1231 self.assertEqual(len(os.urandom(1)), 1)
1232 self.assertEqual(len(os.urandom(10)), 10)
1233 self.assertEqual(len(os.urandom(100)), 100)
1234 self.assertEqual(len(os.urandom(1000)), 1000)
1235
1236 def test_urandom_value(self):
1237 data1 = os.urandom(16)
Victor Stinner9b1f4742016-09-06 16:18:52 -07001238 self.assertIsInstance(data1, bytes)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001239 data2 = os.urandom(16)
1240 self.assertNotEqual(data1, data2)
1241
1242 def get_urandom_subprocess(self, count):
1243 code = '\n'.join((
1244 'import os, sys',
1245 'data = os.urandom(%s)' % count,
1246 'sys.stdout.buffer.write(data)',
1247 'sys.stdout.buffer.flush()'))
1248 out = assert_python_ok('-c', code)
1249 stdout = out[1]
1250 self.assertEqual(len(stdout), 16)
1251 return stdout
1252
1253 def test_urandom_subprocess(self):
1254 data1 = self.get_urandom_subprocess(16)
1255 data2 = self.get_urandom_subprocess(16)
1256 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001257
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001258
Victor Stinner9b1f4742016-09-06 16:18:52 -07001259@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
1260class GetRandomTests(unittest.TestCase):
Victor Stinner173a1f32016-09-06 19:57:40 -07001261 @classmethod
1262 def setUpClass(cls):
1263 try:
1264 os.getrandom(1)
1265 except OSError as exc:
1266 if exc.errno == errno.ENOSYS:
1267 # Python compiled on a more recent Linux version
1268 # than the current Linux kernel
1269 raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")
1270 else:
1271 raise
1272
Victor Stinner9b1f4742016-09-06 16:18:52 -07001273 def test_getrandom_type(self):
1274 data = os.getrandom(16)
1275 self.assertIsInstance(data, bytes)
1276 self.assertEqual(len(data), 16)
1277
1278 def test_getrandom0(self):
1279 empty = os.getrandom(0)
1280 self.assertEqual(empty, b'')
1281
1282 def test_getrandom_random(self):
1283 self.assertTrue(hasattr(os, 'GRND_RANDOM'))
1284
1285 # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
1286 # resource /dev/random
1287
1288 def test_getrandom_nonblock(self):
1289 # The call must not fail. Check also that the flag exists
1290 try:
1291 os.getrandom(1, os.GRND_NONBLOCK)
1292 except BlockingIOError:
1293 # System urandom is not initialized yet
1294 pass
1295
1296 def test_getrandom_value(self):
1297 data1 = os.getrandom(16)
1298 data2 = os.getrandom(16)
1299 self.assertNotEqual(data1, data2)
1300
1301
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001302# os.urandom() doesn't use a file descriptor when it is implemented with the
1303# getentropy() function, the getrandom() function or the getrandom() syscall
1304OS_URANDOM_DONT_USE_FD = (
1305 sysconfig.get_config_var('HAVE_GETENTROPY') == 1
1306 or sysconfig.get_config_var('HAVE_GETRANDOM') == 1
1307 or sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1)
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001308
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001309@unittest.skipIf(OS_URANDOM_DONT_USE_FD ,
1310 "os.random() does not use a file descriptor")
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001311class URandomFDTests(unittest.TestCase):
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001312 @unittest.skipUnless(resource, "test requires the resource module")
1313 def test_urandom_failure(self):
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001314 # Check urandom() failing when it is not able to open /dev/random.
1315 # We spawn a new process to make the test more robust (if getrlimit()
1316 # failed to restore the file descriptor limit after this, the whole
1317 # test suite would crash; this actually happened on the OS X Tiger
1318 # buildbot).
1319 code = """if 1:
1320 import errno
1321 import os
1322 import resource
1323
1324 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
1325 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
1326 try:
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001327 os.urandom(16)
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001328 except OSError as e:
1329 assert e.errno == errno.EMFILE, e.errno
1330 else:
1331 raise AssertionError("OSError not raised")
1332 """
1333 assert_python_ok('-c', code)
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001334
Antoine Pitroue472aea2014-04-26 14:33:03 +02001335 def test_urandom_fd_closed(self):
1336 # Issue #21207: urandom() should reopen its fd to /dev/urandom if
1337 # closed.
1338 code = """if 1:
1339 import os
1340 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001341 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001342 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001343 with test.support.SuppressCrashReport():
1344 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001345 sys.stdout.buffer.write(os.urandom(4))
1346 """
1347 rc, out, err = assert_python_ok('-Sc', code)
1348
1349 def test_urandom_fd_reopened(self):
1350 # Issue #21207: urandom() should detect its fd to /dev/urandom
1351 # changed to something else, and reopen it.
Victor Stinnerae39d232016-03-24 17:12:55 +01001352 self.addCleanup(support.unlink, support.TESTFN)
1353 create_file(support.TESTFN, b"x" * 256)
1354
Antoine Pitroue472aea2014-04-26 14:33:03 +02001355 code = """if 1:
1356 import os
1357 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001358 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001359 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001360 with test.support.SuppressCrashReport():
1361 for fd in range(3, 256):
1362 try:
1363 os.close(fd)
1364 except OSError:
1365 pass
1366 else:
1367 # Found the urandom fd (XXX hopefully)
1368 break
1369 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001370 with open({TESTFN!r}, 'rb') as f:
1371 os.dup2(f.fileno(), fd)
1372 sys.stdout.buffer.write(os.urandom(4))
1373 sys.stdout.buffer.write(os.urandom(4))
1374 """.format(TESTFN=support.TESTFN)
1375 rc, out, err = assert_python_ok('-Sc', code)
1376 self.assertEqual(len(out), 8)
1377 self.assertNotEqual(out[0:4], out[4:8])
1378 rc, out2, err2 = assert_python_ok('-Sc', code)
1379 self.assertEqual(len(out2), 8)
1380 self.assertNotEqual(out2, out)
1381
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001382
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001383@contextlib.contextmanager
1384def _execvpe_mockup(defpath=None):
1385 """
1386 Stubs out execv and execve functions when used as context manager.
1387 Records exec calls. The mock execv and execve functions always raise an
1388 exception as they would normally never return.
1389 """
1390 # A list of tuples containing (function name, first arg, args)
1391 # of calls to execv or execve that have been made.
1392 calls = []
1393
1394 def mock_execv(name, *args):
1395 calls.append(('execv', name, args))
1396 raise RuntimeError("execv called")
1397
1398 def mock_execve(name, *args):
1399 calls.append(('execve', name, args))
1400 raise OSError(errno.ENOTDIR, "execve called")
1401
1402 try:
1403 orig_execv = os.execv
1404 orig_execve = os.execve
1405 orig_defpath = os.defpath
1406 os.execv = mock_execv
1407 os.execve = mock_execve
1408 if defpath is not None:
1409 os.defpath = defpath
1410 yield calls
1411 finally:
1412 os.execv = orig_execv
1413 os.execve = orig_execve
1414 os.defpath = orig_defpath
1415
Guido van Rossume7ba4952007-06-06 23:52:48 +00001416class ExecTests(unittest.TestCase):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001417 @unittest.skipIf(USING_LINUXTHREADS,
1418 "avoid triggering a linuxthreads bug: see issue #4970")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001419 def test_execvpe_with_bad_program(self):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001420 self.assertRaises(OSError, os.execvpe, 'no such app-',
1421 ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +00001422
Thomas Heller6790d602007-08-30 17:15:14 +00001423 def test_execvpe_with_bad_arglist(self):
1424 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
1425
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001426 @unittest.skipUnless(hasattr(os, '_execvpe'),
1427 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +00001428 def _test_internal_execvpe(self, test_type):
1429 program_path = os.sep + 'absolutepath'
1430 if test_type is bytes:
1431 program = b'executable'
1432 fullpath = os.path.join(os.fsencode(program_path), program)
1433 native_fullpath = fullpath
1434 arguments = [b'progname', 'arg1', 'arg2']
1435 else:
1436 program = 'executable'
1437 arguments = ['progname', 'arg1', 'arg2']
1438 fullpath = os.path.join(program_path, program)
1439 if os.name != "nt":
1440 native_fullpath = os.fsencode(fullpath)
1441 else:
1442 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001443 env = {'spam': 'beans'}
1444
Victor Stinnerb745a742010-05-18 17:17:23 +00001445 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001446 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001447 self.assertRaises(RuntimeError,
1448 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001449 self.assertEqual(len(calls), 1)
1450 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
1451
Victor Stinnerb745a742010-05-18 17:17:23 +00001452 # test os._execvpe() with a relative path:
1453 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001454 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001455 self.assertRaises(OSError,
1456 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001457 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +00001458 self.assertSequenceEqual(calls[0],
1459 ('execve', native_fullpath, (arguments, env)))
1460
1461 # test os._execvpe() with a relative path:
1462 # os.get_exec_path() reads the 'PATH' variable
1463 with _execvpe_mockup() as calls:
1464 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +00001465 if test_type is bytes:
1466 env_path[b'PATH'] = program_path
1467 else:
1468 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +00001469 self.assertRaises(OSError,
1470 os._execvpe, program, arguments, env=env_path)
1471 self.assertEqual(len(calls), 1)
1472 self.assertSequenceEqual(calls[0],
1473 ('execve', native_fullpath, (arguments, env_path)))
1474
1475 def test_internal_execvpe_str(self):
1476 self._test_internal_execvpe(str)
1477 if os.name != "nt":
1478 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001479
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001480
Serhiy Storchaka43767632013-11-03 21:31:38 +02001481@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001482class Win32ErrorTests(unittest.TestCase):
Victor Stinnere77c9742016-03-25 10:28:23 +01001483 def setUp(self):
Victor Stinner32830142016-03-25 15:12:08 +01001484 try:
1485 os.stat(support.TESTFN)
1486 except FileNotFoundError:
1487 exists = False
1488 except OSError as exc:
1489 exists = True
1490 self.fail("file %s must not exist; os.stat failed with %s"
1491 % (support.TESTFN, exc))
1492 else:
1493 self.fail("file %s must not exist" % support.TESTFN)
Victor Stinnere77c9742016-03-25 10:28:23 +01001494
Thomas Wouters477c8d52006-05-27 19:21:47 +00001495 def test_rename(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001496 self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001497
1498 def test_remove(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001499 self.assertRaises(OSError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001500
1501 def test_chdir(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001502 self.assertRaises(OSError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001503
1504 def test_mkdir(self):
Victor Stinnerae39d232016-03-24 17:12:55 +01001505 self.addCleanup(support.unlink, support.TESTFN)
1506
Victor Stinnere77c9742016-03-25 10:28:23 +01001507 with open(support.TESTFN, "x") as f:
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001508 self.assertRaises(OSError, os.mkdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001509
1510 def test_utime(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001511 self.assertRaises(OSError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001512
Thomas Wouters477c8d52006-05-27 19:21:47 +00001513 def test_chmod(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001514 self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001515
Victor Stinnere77c9742016-03-25 10:28:23 +01001516
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001517class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001518 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001519 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1520 #singles.append("close")
Steve Dower39294992016-08-30 21:22:36 -07001521 #We omit close because it doesn't raise an exception on some platforms
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001522 def get_single(f):
1523 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001524 if hasattr(os, f):
1525 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001526 return helper
1527 for f in singles:
1528 locals()["test_"+f] = get_single(f)
1529
Benjamin Peterson7522c742009-01-19 21:00:09 +00001530 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001531 try:
1532 f(support.make_bad_fd(), *args)
1533 except OSError as e:
1534 self.assertEqual(e.errno, errno.EBADF)
1535 else:
Martin Panter7462b6492015-11-02 03:37:02 +00001536 self.fail("%r didn't raise an OSError with a bad file descriptor"
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001537 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001538
Serhiy Storchaka43767632013-11-03 21:31:38 +02001539 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001540 def test_isatty(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001541 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001542
Serhiy Storchaka43767632013-11-03 21:31:38 +02001543 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001544 def test_closerange(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001545 fd = support.make_bad_fd()
1546 # Make sure none of the descriptors we are about to close are
1547 # currently valid (issue 6542).
1548 for i in range(10):
1549 try: os.fstat(fd+i)
1550 except OSError:
1551 pass
1552 else:
1553 break
1554 if i < 2:
1555 raise unittest.SkipTest(
1556 "Unable to acquire a range of invalid file descriptors")
1557 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001558
Serhiy Storchaka43767632013-11-03 21:31:38 +02001559 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001560 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001561 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001562
Serhiy Storchaka43767632013-11-03 21:31:38 +02001563 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001564 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001565 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001566
Serhiy Storchaka43767632013-11-03 21:31:38 +02001567 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001568 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001569 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001570
Serhiy Storchaka43767632013-11-03 21:31:38 +02001571 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001572 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001573 self.check(os.pathconf, "PC_NAME_MAX")
1574 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001575
Serhiy Storchaka43767632013-11-03 21:31:38 +02001576 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001577 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001578 self.check(os.truncate, 0)
1579 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001580
Serhiy Storchaka43767632013-11-03 21:31:38 +02001581 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001582 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001583 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001584
Serhiy Storchaka43767632013-11-03 21:31:38 +02001585 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001586 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001587 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001588
Victor Stinner57ddf782014-01-08 15:21:28 +01001589 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1590 def test_readv(self):
1591 buf = bytearray(10)
1592 self.check(os.readv, [buf])
1593
Serhiy Storchaka43767632013-11-03 21:31:38 +02001594 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001595 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001596 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001597
Serhiy Storchaka43767632013-11-03 21:31:38 +02001598 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001599 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001600 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001601
Victor Stinner57ddf782014-01-08 15:21:28 +01001602 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1603 def test_writev(self):
1604 self.check(os.writev, [b'abc'])
1605
Victor Stinner1db9e7b2014-07-29 22:32:47 +02001606 def test_inheritable(self):
1607 self.check(os.get_inheritable)
1608 self.check(os.set_inheritable, True)
1609
1610 @unittest.skipUnless(hasattr(os, 'get_blocking'),
1611 'needs os.get_blocking() and os.set_blocking()')
1612 def test_blocking(self):
1613 self.check(os.get_blocking)
1614 self.check(os.set_blocking, True)
1615
Brian Curtin1b9df392010-11-24 20:24:31 +00001616
1617class LinkTests(unittest.TestCase):
1618 def setUp(self):
1619 self.file1 = support.TESTFN
1620 self.file2 = os.path.join(support.TESTFN + "2")
1621
Brian Curtinc0abc4e2010-11-30 23:46:54 +00001622 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00001623 for file in (self.file1, self.file2):
1624 if os.path.exists(file):
1625 os.unlink(file)
1626
Brian Curtin1b9df392010-11-24 20:24:31 +00001627 def _test_link(self, file1, file2):
Victor Stinnere77c9742016-03-25 10:28:23 +01001628 create_file(file1)
Brian Curtin1b9df392010-11-24 20:24:31 +00001629
Steve Dowercc16be82016-09-08 10:35:16 -07001630 os.link(file1, file2)
Brian Curtin1b9df392010-11-24 20:24:31 +00001631 with open(file1, "r") as f1, open(file2, "r") as f2:
1632 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1633
1634 def test_link(self):
1635 self._test_link(self.file1, self.file2)
1636
1637 def test_link_bytes(self):
1638 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1639 bytes(self.file2, sys.getfilesystemencoding()))
1640
Brian Curtinf498b752010-11-30 15:54:04 +00001641 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00001642 try:
Brian Curtinf498b752010-11-30 15:54:04 +00001643 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00001644 except UnicodeError:
1645 raise unittest.SkipTest("Unable to encode for this platform.")
1646
Brian Curtinf498b752010-11-30 15:54:04 +00001647 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00001648 self.file2 = self.file1 + "2"
1649 self._test_link(self.file1, self.file2)
1650
Serhiy Storchaka43767632013-11-03 21:31:38 +02001651@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1652class PosixUidGidTests(unittest.TestCase):
1653 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
1654 def test_setuid(self):
1655 if os.getuid() != 0:
1656 self.assertRaises(OSError, os.setuid, 0)
1657 self.assertRaises(OverflowError, os.setuid, 1<<32)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001658
Serhiy Storchaka43767632013-11-03 21:31:38 +02001659 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
1660 def test_setgid(self):
1661 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1662 self.assertRaises(OSError, os.setgid, 0)
1663 self.assertRaises(OverflowError, os.setgid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001664
Serhiy Storchaka43767632013-11-03 21:31:38 +02001665 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
1666 def test_seteuid(self):
1667 if os.getuid() != 0:
1668 self.assertRaises(OSError, os.seteuid, 0)
1669 self.assertRaises(OverflowError, os.seteuid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001670
Serhiy Storchaka43767632013-11-03 21:31:38 +02001671 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
1672 def test_setegid(self):
1673 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1674 self.assertRaises(OSError, os.setegid, 0)
1675 self.assertRaises(OverflowError, os.setegid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001676
Serhiy Storchaka43767632013-11-03 21:31:38 +02001677 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1678 def test_setreuid(self):
1679 if os.getuid() != 0:
1680 self.assertRaises(OSError, os.setreuid, 0, 0)
1681 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
1682 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001683
Serhiy Storchaka43767632013-11-03 21:31:38 +02001684 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1685 def test_setreuid_neg1(self):
1686 # Needs to accept -1. We run this in a subprocess to avoid
1687 # altering the test runner's process state (issue8045).
1688 subprocess.check_call([
1689 sys.executable, '-c',
1690 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001691
Serhiy Storchaka43767632013-11-03 21:31:38 +02001692 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1693 def test_setregid(self):
1694 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1695 self.assertRaises(OSError, os.setregid, 0, 0)
1696 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
1697 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001698
Serhiy Storchaka43767632013-11-03 21:31:38 +02001699 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1700 def test_setregid_neg1(self):
1701 # Needs to accept -1. We run this in a subprocess to avoid
1702 # altering the test runner's process state (issue8045).
1703 subprocess.check_call([
1704 sys.executable, '-c',
1705 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001706
Serhiy Storchaka43767632013-11-03 21:31:38 +02001707@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1708class Pep383Tests(unittest.TestCase):
1709 def setUp(self):
1710 if support.TESTFN_UNENCODABLE:
1711 self.dir = support.TESTFN_UNENCODABLE
1712 elif support.TESTFN_NONASCII:
1713 self.dir = support.TESTFN_NONASCII
1714 else:
1715 self.dir = support.TESTFN
1716 self.bdir = os.fsencode(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001717
Serhiy Storchaka43767632013-11-03 21:31:38 +02001718 bytesfn = []
1719 def add_filename(fn):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001720 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +02001721 fn = os.fsencode(fn)
1722 except UnicodeEncodeError:
1723 return
1724 bytesfn.append(fn)
1725 add_filename(support.TESTFN_UNICODE)
1726 if support.TESTFN_UNENCODABLE:
1727 add_filename(support.TESTFN_UNENCODABLE)
1728 if support.TESTFN_NONASCII:
1729 add_filename(support.TESTFN_NONASCII)
1730 if not bytesfn:
1731 self.skipTest("couldn't create any non-ascii filename")
Martin v. Löwis011e8422009-05-05 04:43:17 +00001732
Serhiy Storchaka43767632013-11-03 21:31:38 +02001733 self.unicodefn = set()
1734 os.mkdir(self.dir)
1735 try:
1736 for fn in bytesfn:
1737 support.create_empty_file(os.path.join(self.bdir, fn))
1738 fn = os.fsdecode(fn)
1739 if fn in self.unicodefn:
1740 raise ValueError("duplicate filename")
1741 self.unicodefn.add(fn)
1742 except:
Martin v. Löwis011e8422009-05-05 04:43:17 +00001743 shutil.rmtree(self.dir)
Serhiy Storchaka43767632013-11-03 21:31:38 +02001744 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00001745
Serhiy Storchaka43767632013-11-03 21:31:38 +02001746 def tearDown(self):
1747 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001748
Serhiy Storchaka43767632013-11-03 21:31:38 +02001749 def test_listdir(self):
1750 expected = self.unicodefn
1751 found = set(os.listdir(self.dir))
1752 self.assertEqual(found, expected)
1753 # test listdir without arguments
1754 current_directory = os.getcwd()
1755 try:
1756 os.chdir(os.sep)
1757 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
1758 finally:
1759 os.chdir(current_directory)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001760
Serhiy Storchaka43767632013-11-03 21:31:38 +02001761 def test_open(self):
1762 for fn in self.unicodefn:
1763 f = open(os.path.join(self.dir, fn), 'rb')
1764 f.close()
Victor Stinnere4110dc2013-01-01 23:05:55 +01001765
Serhiy Storchaka43767632013-11-03 21:31:38 +02001766 @unittest.skipUnless(hasattr(os, 'statvfs'),
1767 "need os.statvfs()")
1768 def test_statvfs(self):
1769 # issue #9645
1770 for fn in self.unicodefn:
1771 # should not fail with file not found error
1772 fullname = os.path.join(self.dir, fn)
1773 os.statvfs(fullname)
1774
1775 def test_stat(self):
1776 for fn in self.unicodefn:
1777 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001778
Brian Curtineb24d742010-04-12 17:16:38 +00001779@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1780class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +00001781 def _kill(self, sig):
1782 # Start sys.executable as a subprocess and communicate from the
1783 # subprocess to the parent that the interpreter is ready. When it
1784 # becomes ready, send *sig* via os.kill to the subprocess and check
1785 # that the return code is equal to *sig*.
1786 import ctypes
1787 from ctypes import wintypes
1788 import msvcrt
1789
1790 # Since we can't access the contents of the process' stdout until the
1791 # process has exited, use PeekNamedPipe to see what's inside stdout
1792 # without waiting. This is done so we can tell that the interpreter
1793 # is started and running at a point where it could handle a signal.
1794 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
1795 PeekNamedPipe.restype = wintypes.BOOL
1796 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
1797 ctypes.POINTER(ctypes.c_char), # stdout buf
1798 wintypes.DWORD, # Buffer size
1799 ctypes.POINTER(wintypes.DWORD), # bytes read
1800 ctypes.POINTER(wintypes.DWORD), # bytes avail
1801 ctypes.POINTER(wintypes.DWORD)) # bytes left
1802 msg = "running"
1803 proc = subprocess.Popen([sys.executable, "-c",
1804 "import sys;"
1805 "sys.stdout.write('{}');"
1806 "sys.stdout.flush();"
1807 "input()".format(msg)],
1808 stdout=subprocess.PIPE,
1809 stderr=subprocess.PIPE,
1810 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00001811 self.addCleanup(proc.stdout.close)
1812 self.addCleanup(proc.stderr.close)
1813 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00001814
1815 count, max = 0, 100
1816 while count < max and proc.poll() is None:
1817 # Create a string buffer to store the result of stdout from the pipe
1818 buf = ctypes.create_string_buffer(len(msg))
1819 # Obtain the text currently in proc.stdout
1820 # Bytes read/avail/left are left as NULL and unused
1821 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
1822 buf, ctypes.sizeof(buf), None, None, None)
1823 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
1824 if buf.value:
1825 self.assertEqual(msg, buf.value.decode())
1826 break
1827 time.sleep(0.1)
1828 count += 1
1829 else:
1830 self.fail("Did not receive communication from the subprocess")
1831
Brian Curtineb24d742010-04-12 17:16:38 +00001832 os.kill(proc.pid, sig)
1833 self.assertEqual(proc.wait(), sig)
1834
1835 def test_kill_sigterm(self):
1836 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00001837 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00001838
1839 def test_kill_int(self):
1840 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00001841 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00001842
1843 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001844 tagname = "test_os_%s" % uuid.uuid1()
1845 m = mmap.mmap(-1, 1, tagname)
1846 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00001847 # Run a script which has console control handling enabled.
1848 proc = subprocess.Popen([sys.executable,
1849 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001850 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00001851 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
1852 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001853 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001854 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00001855 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001856 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001857 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001858 count += 1
1859 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001860 # Forcefully kill the process if we weren't able to signal it.
1861 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001862 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00001863 os.kill(proc.pid, event)
1864 # proc.send_signal(event) could also be done here.
1865 # Allow time for the signal to be passed and the process to exit.
1866 time.sleep(0.5)
1867 if not proc.poll():
1868 # Forcefully kill the process if we weren't able to signal it.
1869 os.kill(proc.pid, signal.SIGINT)
1870 self.fail("subprocess did not stop on {}".format(name))
1871
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03001872 @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
Brian Curtineb24d742010-04-12 17:16:38 +00001873 def test_CTRL_C_EVENT(self):
1874 from ctypes import wintypes
1875 import ctypes
1876
1877 # Make a NULL value by creating a pointer with no argument.
1878 NULL = ctypes.POINTER(ctypes.c_int)()
1879 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
1880 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
1881 wintypes.BOOL)
1882 SetConsoleCtrlHandler.restype = wintypes.BOOL
1883
1884 # Calling this with NULL and FALSE causes the calling process to
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03001885 # handle Ctrl+C, rather than ignore it. This property is inherited
Brian Curtineb24d742010-04-12 17:16:38 +00001886 # by subprocesses.
1887 SetConsoleCtrlHandler(NULL, 0)
1888
1889 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
1890
1891 def test_CTRL_BREAK_EVENT(self):
1892 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1893
1894
Brian Curtind40e6f72010-07-08 21:39:08 +00001895@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Tim Golden781bbeb2013-10-25 20:24:06 +01001896class Win32ListdirTests(unittest.TestCase):
1897 """Test listdir on Windows."""
1898
1899 def setUp(self):
1900 self.created_paths = []
1901 for i in range(2):
1902 dir_name = 'SUB%d' % i
1903 dir_path = os.path.join(support.TESTFN, dir_name)
1904 file_name = 'FILE%d' % i
1905 file_path = os.path.join(support.TESTFN, file_name)
1906 os.makedirs(dir_path)
1907 with open(file_path, 'w') as f:
1908 f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
1909 self.created_paths.extend([dir_name, file_name])
1910 self.created_paths.sort()
1911
1912 def tearDown(self):
1913 shutil.rmtree(support.TESTFN)
1914
1915 def test_listdir_no_extended_path(self):
1916 """Test when the path is not an "extended" path."""
1917 # unicode
1918 self.assertEqual(
1919 sorted(os.listdir(support.TESTFN)),
1920 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01001921
Tim Golden781bbeb2013-10-25 20:24:06 +01001922 # bytes
Steve Dowercc16be82016-09-08 10:35:16 -07001923 self.assertEqual(
1924 sorted(os.listdir(os.fsencode(support.TESTFN))),
1925 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01001926
1927 def test_listdir_extended_path(self):
1928 """Test when the path starts with '\\\\?\\'."""
Tim Golden1cc35402013-10-25 21:26:06 +01001929 # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
Tim Golden781bbeb2013-10-25 20:24:06 +01001930 # unicode
1931 path = '\\\\?\\' + os.path.abspath(support.TESTFN)
1932 self.assertEqual(
1933 sorted(os.listdir(path)),
1934 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01001935
Tim Golden781bbeb2013-10-25 20:24:06 +01001936 # bytes
Steve Dowercc16be82016-09-08 10:35:16 -07001937 path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
1938 self.assertEqual(
1939 sorted(os.listdir(path)),
1940 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01001941
1942
1943@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Brian Curtin3b4499c2010-12-28 14:31:47 +00001944@support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00001945class Win32SymlinkTests(unittest.TestCase):
1946 filelink = 'filelinktest'
1947 filelink_target = os.path.abspath(__file__)
1948 dirlink = 'dirlinktest'
1949 dirlink_target = os.path.dirname(filelink_target)
1950 missing_link = 'missing link'
1951
1952 def setUp(self):
1953 assert os.path.exists(self.dirlink_target)
1954 assert os.path.exists(self.filelink_target)
1955 assert not os.path.exists(self.dirlink)
1956 assert not os.path.exists(self.filelink)
1957 assert not os.path.exists(self.missing_link)
1958
1959 def tearDown(self):
1960 if os.path.exists(self.filelink):
1961 os.remove(self.filelink)
1962 if os.path.exists(self.dirlink):
1963 os.rmdir(self.dirlink)
1964 if os.path.lexists(self.missing_link):
1965 os.remove(self.missing_link)
1966
1967 def test_directory_link(self):
Jason R. Coombs3a092862013-05-27 23:21:28 -04001968 os.symlink(self.dirlink_target, self.dirlink)
Brian Curtind40e6f72010-07-08 21:39:08 +00001969 self.assertTrue(os.path.exists(self.dirlink))
1970 self.assertTrue(os.path.isdir(self.dirlink))
1971 self.assertTrue(os.path.islink(self.dirlink))
1972 self.check_stat(self.dirlink, self.dirlink_target)
1973
1974 def test_file_link(self):
1975 os.symlink(self.filelink_target, self.filelink)
1976 self.assertTrue(os.path.exists(self.filelink))
1977 self.assertTrue(os.path.isfile(self.filelink))
1978 self.assertTrue(os.path.islink(self.filelink))
1979 self.check_stat(self.filelink, self.filelink_target)
1980
1981 def _create_missing_dir_link(self):
1982 'Create a "directory" link to a non-existent target'
1983 linkname = self.missing_link
1984 if os.path.lexists(linkname):
1985 os.remove(linkname)
1986 target = r'c:\\target does not exist.29r3c740'
1987 assert not os.path.exists(target)
1988 target_is_dir = True
1989 os.symlink(target, linkname, target_is_dir)
1990
1991 def test_remove_directory_link_to_missing_target(self):
1992 self._create_missing_dir_link()
1993 # For compatibility with Unix, os.remove will check the
1994 # directory status and call RemoveDirectory if the symlink
1995 # was created with target_is_dir==True.
1996 os.remove(self.missing_link)
1997
1998 @unittest.skip("currently fails; consider for improvement")
1999 def test_isdir_on_directory_link_to_missing_target(self):
2000 self._create_missing_dir_link()
2001 # consider having isdir return true for directory links
2002 self.assertTrue(os.path.isdir(self.missing_link))
2003
2004 @unittest.skip("currently fails; consider for improvement")
2005 def test_rmdir_on_directory_link_to_missing_target(self):
2006 self._create_missing_dir_link()
2007 # consider allowing rmdir to remove directory links
2008 os.rmdir(self.missing_link)
2009
2010 def check_stat(self, link, target):
2011 self.assertEqual(os.stat(link), os.stat(target))
2012 self.assertNotEqual(os.lstat(link), os.stat(link))
2013
Brian Curtind25aef52011-06-13 15:16:04 -05002014 bytes_link = os.fsencode(link)
Steve Dowercc16be82016-09-08 10:35:16 -07002015 self.assertEqual(os.stat(bytes_link), os.stat(target))
2016 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
Brian Curtind25aef52011-06-13 15:16:04 -05002017
2018 def test_12084(self):
2019 level1 = os.path.abspath(support.TESTFN)
2020 level2 = os.path.join(level1, "level2")
2021 level3 = os.path.join(level2, "level3")
Victor Stinnerae39d232016-03-24 17:12:55 +01002022 self.addCleanup(support.rmtree, level1)
2023
2024 os.mkdir(level1)
2025 os.mkdir(level2)
2026 os.mkdir(level3)
2027
2028 file1 = os.path.abspath(os.path.join(level1, "file1"))
2029 create_file(file1)
2030
2031 orig_dir = os.getcwd()
Brian Curtind25aef52011-06-13 15:16:04 -05002032 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01002033 os.chdir(level2)
2034 link = os.path.join(level2, "link")
2035 os.symlink(os.path.relpath(file1), "link")
2036 self.assertIn("link", os.listdir(os.getcwd()))
Brian Curtind25aef52011-06-13 15:16:04 -05002037
Victor Stinnerae39d232016-03-24 17:12:55 +01002038 # Check os.stat calls from the same dir as the link
2039 self.assertEqual(os.stat(file1), os.stat("link"))
Brian Curtind25aef52011-06-13 15:16:04 -05002040
Victor Stinnerae39d232016-03-24 17:12:55 +01002041 # Check os.stat calls from a dir below the link
2042 os.chdir(level1)
2043 self.assertEqual(os.stat(file1),
2044 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002045
Victor Stinnerae39d232016-03-24 17:12:55 +01002046 # Check os.stat calls from a dir above the link
2047 os.chdir(level3)
2048 self.assertEqual(os.stat(file1),
2049 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002050 finally:
Victor Stinnerae39d232016-03-24 17:12:55 +01002051 os.chdir(orig_dir)
Brian Curtind25aef52011-06-13 15:16:04 -05002052
Brian Curtind40e6f72010-07-08 21:39:08 +00002053
Tim Golden0321cf22014-05-05 19:46:17 +01002054@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2055class Win32JunctionTests(unittest.TestCase):
2056 junction = 'junctiontest'
2057 junction_target = os.path.dirname(os.path.abspath(__file__))
2058
2059 def setUp(self):
2060 assert os.path.exists(self.junction_target)
2061 assert not os.path.exists(self.junction)
2062
2063 def tearDown(self):
2064 if os.path.exists(self.junction):
2065 # os.rmdir delegates to Windows' RemoveDirectoryW,
2066 # which removes junction points safely.
2067 os.rmdir(self.junction)
2068
2069 def test_create_junction(self):
2070 _winapi.CreateJunction(self.junction_target, self.junction)
2071 self.assertTrue(os.path.exists(self.junction))
2072 self.assertTrue(os.path.isdir(self.junction))
2073
2074 # Junctions are not recognized as links.
2075 self.assertFalse(os.path.islink(self.junction))
2076
2077 def test_unlink_removes_junction(self):
2078 _winapi.CreateJunction(self.junction_target, self.junction)
2079 self.assertTrue(os.path.exists(self.junction))
2080
2081 os.unlink(self.junction)
2082 self.assertFalse(os.path.exists(self.junction))
2083
2084
Jason R. Coombs3a092862013-05-27 23:21:28 -04002085@support.skip_unless_symlink
2086class NonLocalSymlinkTests(unittest.TestCase):
2087
2088 def setUp(self):
R David Murray44b548d2016-09-08 13:59:53 -04002089 r"""
Jason R. Coombs3a092862013-05-27 23:21:28 -04002090 Create this structure:
2091
2092 base
2093 \___ some_dir
2094 """
2095 os.makedirs('base/some_dir')
2096
2097 def tearDown(self):
2098 shutil.rmtree('base')
2099
2100 def test_directory_link_nonlocal(self):
2101 """
2102 The symlink target should resolve relative to the link, not relative
2103 to the current directory.
2104
2105 Then, link base/some_link -> base/some_dir and ensure that some_link
2106 is resolved as a directory.
2107
2108 In issue13772, it was discovered that directory detection failed if
2109 the symlink target was not specified relative to the current
2110 directory, which was a defect in the implementation.
2111 """
2112 src = os.path.join('base', 'some_link')
2113 os.symlink('some_dir', src)
2114 assert os.path.isdir(src)
2115
2116
Victor Stinnere8d51452010-08-19 01:05:19 +00002117class FSEncodingTests(unittest.TestCase):
2118 def test_nop(self):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002119 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
2120 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
Benjamin Peterson31191a92010-05-09 03:22:58 +00002121
Victor Stinnere8d51452010-08-19 01:05:19 +00002122 def test_identity(self):
2123 # assert fsdecode(fsencode(x)) == x
2124 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
2125 try:
2126 bytesfn = os.fsencode(fn)
2127 except UnicodeEncodeError:
2128 continue
Ezio Melottib3aedd42010-11-20 19:04:17 +00002129 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00002130
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002131
Brett Cannonefb00c02012-02-29 18:31:31 -05002132
2133class DeviceEncodingTests(unittest.TestCase):
2134
2135 def test_bad_fd(self):
2136 # Return None when an fd doesn't actually exist.
2137 self.assertIsNone(os.device_encoding(123456))
2138
Philip Jenveye308b7c2012-02-29 16:16:15 -08002139 @unittest.skipUnless(os.isatty(0) and (sys.platform.startswith('win') or
2140 (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
Philip Jenveyd7aff2d2012-02-29 16:21:25 -08002141 'test requires a tty and either Windows or nl_langinfo(CODESET)')
Brett Cannonefb00c02012-02-29 18:31:31 -05002142 def test_device_encoding(self):
2143 encoding = os.device_encoding(0)
2144 self.assertIsNotNone(encoding)
2145 self.assertTrue(codecs.lookup(encoding))
2146
2147
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002148class PidTests(unittest.TestCase):
2149 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
2150 def test_getppid(self):
2151 p = subprocess.Popen([sys.executable, '-c',
2152 'import os; print(os.getppid())'],
2153 stdout=subprocess.PIPE)
2154 stdout, _ = p.communicate()
2155 # We are the parent of our subprocess
2156 self.assertEqual(int(stdout), os.getpid())
2157
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002158 def test_waitpid(self):
2159 args = [sys.executable, '-c', 'pass']
Brett Cannonec6ce872016-09-06 15:50:29 -07002160 # Add an implicit test for PyUnicode_FSConverter().
2161 pid = os.spawnv(os.P_NOWAIT, _PathLike(args[0]), args)
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002162 status = os.waitpid(pid, 0)
2163 self.assertEqual(status, (pid, 0))
2164
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002165
Brian Curtin0151b8e2010-09-24 13:43:43 +00002166# The introduction of this TestCase caused at least two different errors on
2167# *nix buildbots. Temporarily skip this to let the buildbots move along.
2168@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
Brian Curtine8e4b3b2010-09-23 20:04:14 +00002169@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
2170class LoginTests(unittest.TestCase):
2171 def test_getlogin(self):
2172 user_name = os.getlogin()
2173 self.assertNotEqual(len(user_name), 0)
2174
2175
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002176@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
2177 "needs os.getpriority and os.setpriority")
2178class ProgramPriorityTests(unittest.TestCase):
2179 """Tests for os.getpriority() and os.setpriority()."""
2180
2181 def test_set_get_priority(self):
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002182
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002183 base = os.getpriority(os.PRIO_PROCESS, os.getpid())
2184 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
2185 try:
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002186 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
2187 if base >= 19 and new_prio <= 19:
Victor Stinnerae39d232016-03-24 17:12:55 +01002188 raise unittest.SkipTest("unable to reliably test setpriority "
2189 "at current nice level of %s" % base)
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002190 else:
2191 self.assertEqual(new_prio, base + 1)
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002192 finally:
2193 try:
2194 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
2195 except OSError as err:
Antoine Pitrou692f0382011-02-26 00:22:25 +00002196 if err.errno != errno.EACCES:
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002197 raise
2198
2199
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002200if threading is not None:
2201 class SendfileTestServer(asyncore.dispatcher, threading.Thread):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002202
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002203 class Handler(asynchat.async_chat):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002204
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002205 def __init__(self, conn):
2206 asynchat.async_chat.__init__(self, conn)
2207 self.in_buffer = []
2208 self.closed = False
2209 self.push(b"220 ready\r\n")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002210
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002211 def handle_read(self):
2212 data = self.recv(4096)
2213 self.in_buffer.append(data)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002214
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002215 def get_data(self):
2216 return b''.join(self.in_buffer)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002217
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002218 def handle_close(self):
2219 self.close()
2220 self.closed = True
2221
2222 def handle_error(self):
2223 raise
2224
2225 def __init__(self, address):
2226 threading.Thread.__init__(self)
2227 asyncore.dispatcher.__init__(self)
2228 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
2229 self.bind(address)
2230 self.listen(5)
2231 self.host, self.port = self.socket.getsockname()[:2]
2232 self.handler_instance = None
2233 self._active = False
2234 self._active_lock = threading.Lock()
2235
2236 # --- public API
2237
2238 @property
2239 def running(self):
2240 return self._active
2241
2242 def start(self):
2243 assert not self.running
2244 self.__flag = threading.Event()
2245 threading.Thread.start(self)
2246 self.__flag.wait()
2247
2248 def stop(self):
2249 assert self.running
2250 self._active = False
2251 self.join()
2252
2253 def wait(self):
2254 # wait for handler connection to be closed, then stop the server
2255 while not getattr(self.handler_instance, "closed", False):
2256 time.sleep(0.001)
2257 self.stop()
2258
2259 # --- internals
2260
2261 def run(self):
2262 self._active = True
2263 self.__flag.set()
2264 while self._active and asyncore.socket_map:
2265 self._active_lock.acquire()
2266 asyncore.loop(timeout=0.001, count=1)
2267 self._active_lock.release()
2268 asyncore.close_all()
2269
2270 def handle_accept(self):
2271 conn, addr = self.accept()
2272 self.handler_instance = self.Handler(conn)
2273
2274 def handle_connect(self):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002275 self.close()
Giampaolo Rodola'566f8a62011-05-18 21:28:39 +02002276 handle_read = handle_connect
2277
2278 def writable(self):
2279 return 0
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002280
2281 def handle_error(self):
2282 raise
2283
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002284
Giampaolo Rodolà46134642011-02-25 20:01:05 +00002285@unittest.skipUnless(threading is not None, "test needs threading module")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002286@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
2287class TestSendfile(unittest.TestCase):
2288
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002289 DATA = b"12345abcde" * 16 * 1024 # 160 KB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002290 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00002291 not sys.platform.startswith("solaris") and \
2292 not sys.platform.startswith("sunos")
Serhiy Storchaka43767632013-11-03 21:31:38 +02002293 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
2294 'requires headers and trailers support')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002295
2296 @classmethod
2297 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002298 cls.key = support.threading_setup()
Victor Stinnerae39d232016-03-24 17:12:55 +01002299 create_file(support.TESTFN, cls.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002300
2301 @classmethod
2302 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002303 support.threading_cleanup(*cls.key)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002304 support.unlink(support.TESTFN)
2305
2306 def setUp(self):
2307 self.server = SendfileTestServer((support.HOST, 0))
2308 self.server.start()
2309 self.client = socket.socket()
2310 self.client.connect((self.server.host, self.server.port))
2311 self.client.settimeout(1)
2312 # synchronize by waiting for "220 ready" response
2313 self.client.recv(1024)
2314 self.sockno = self.client.fileno()
2315 self.file = open(support.TESTFN, 'rb')
2316 self.fileno = self.file.fileno()
2317
2318 def tearDown(self):
2319 self.file.close()
2320 self.client.close()
2321 if self.server.running:
2322 self.server.stop()
2323
2324 def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
2325 """A higher level wrapper representing how an application is
2326 supposed to use sendfile().
2327 """
2328 while 1:
2329 try:
2330 if self.SUPPORT_HEADERS_TRAILERS:
2331 return os.sendfile(sock, file, offset, nbytes, headers,
2332 trailers)
2333 else:
2334 return os.sendfile(sock, file, offset, nbytes)
2335 except OSError as err:
2336 if err.errno == errno.ECONNRESET:
2337 # disconnected
2338 raise
2339 elif err.errno in (errno.EAGAIN, errno.EBUSY):
2340 # we have to retry send data
2341 continue
2342 else:
2343 raise
2344
2345 def test_send_whole_file(self):
2346 # normal send
2347 total_sent = 0
2348 offset = 0
2349 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002350 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002351 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2352 if sent == 0:
2353 break
2354 offset += sent
2355 total_sent += sent
2356 self.assertTrue(sent <= nbytes)
2357 self.assertEqual(offset, total_sent)
2358
2359 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002360 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002361 self.client.close()
2362 self.server.wait()
2363 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002364 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002365 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002366
2367 def test_send_at_certain_offset(self):
2368 # start sending a file at a certain offset
2369 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002370 offset = len(self.DATA) // 2
2371 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002372 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002373 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002374 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2375 if sent == 0:
2376 break
2377 offset += sent
2378 total_sent += sent
2379 self.assertTrue(sent <= nbytes)
2380
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002381 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002382 self.client.close()
2383 self.server.wait()
2384 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002385 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002386 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002387 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002388 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002389
2390 def test_offset_overflow(self):
2391 # specify an offset > file size
2392 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002393 try:
2394 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
2395 except OSError as e:
2396 # Solaris can raise EINVAL if offset >= file length, ignore.
2397 if e.errno != errno.EINVAL:
2398 raise
2399 else:
2400 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002401 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002402 self.client.close()
2403 self.server.wait()
2404 data = self.server.handler_instance.get_data()
2405 self.assertEqual(data, b'')
2406
2407 def test_invalid_offset(self):
2408 with self.assertRaises(OSError) as cm:
2409 os.sendfile(self.sockno, self.fileno, -1, 4096)
2410 self.assertEqual(cm.exception.errno, errno.EINVAL)
2411
Martin Panterbf19d162015-09-09 01:01:13 +00002412 def test_keywords(self):
2413 # Keyword arguments should be supported
2414 os.sendfile(out=self.sockno, offset=0, count=4096,
2415 **{'in': self.fileno})
2416 if self.SUPPORT_HEADERS_TRAILERS:
2417 os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
Martin Panter94994132015-09-09 05:29:24 +00002418 headers=(), trailers=(), flags=0)
Martin Panterbf19d162015-09-09 01:01:13 +00002419
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002420 # --- headers / trailers tests
2421
Serhiy Storchaka43767632013-11-03 21:31:38 +02002422 @requires_headers_trailers
2423 def test_headers(self):
2424 total_sent = 0
2425 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
2426 headers=[b"x" * 512])
2427 total_sent += sent
2428 offset = 4096
2429 nbytes = 4096
2430 while 1:
2431 sent = self.sendfile_wrapper(self.sockno, self.fileno,
2432 offset, nbytes)
2433 if sent == 0:
2434 break
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002435 total_sent += sent
Serhiy Storchaka43767632013-11-03 21:31:38 +02002436 offset += sent
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002437
Serhiy Storchaka43767632013-11-03 21:31:38 +02002438 expected_data = b"x" * 512 + self.DATA
2439 self.assertEqual(total_sent, len(expected_data))
2440 self.client.close()
2441 self.server.wait()
2442 data = self.server.handler_instance.get_data()
2443 self.assertEqual(hash(data), hash(expected_data))
2444
2445 @requires_headers_trailers
2446 def test_trailers(self):
2447 TESTFN2 = support.TESTFN + "2"
2448 file_data = b"abcdef"
Victor Stinnerae39d232016-03-24 17:12:55 +01002449
2450 self.addCleanup(support.unlink, TESTFN2)
2451 create_file(TESTFN2, file_data)
2452
2453 with open(TESTFN2, 'rb') as f:
Serhiy Storchaka43767632013-11-03 21:31:38 +02002454 os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
2455 trailers=[b"1234"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002456 self.client.close()
2457 self.server.wait()
2458 data = self.server.handler_instance.get_data()
Serhiy Storchaka43767632013-11-03 21:31:38 +02002459 self.assertEqual(data, b"abcdef1234")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002460
Serhiy Storchaka43767632013-11-03 21:31:38 +02002461 @requires_headers_trailers
2462 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
2463 'test needs os.SF_NODISKIO')
2464 def test_flags(self):
2465 try:
2466 os.sendfile(self.sockno, self.fileno, 0, 4096,
2467 flags=os.SF_NODISKIO)
2468 except OSError as err:
2469 if err.errno not in (errno.EBUSY, errno.EAGAIN):
2470 raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002471
2472
Larry Hastings9cf065c2012-06-22 16:30:09 -07002473def supports_extended_attributes():
2474 if not hasattr(os, "setxattr"):
2475 return False
Victor Stinnerae39d232016-03-24 17:12:55 +01002476
Larry Hastings9cf065c2012-06-22 16:30:09 -07002477 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01002478 with open(support.TESTFN, "xb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002479 try:
2480 os.setxattr(fp.fileno(), b"user.test", b"")
2481 except OSError:
2482 return False
2483 finally:
2484 support.unlink(support.TESTFN)
Victor Stinnerf95a19b2016-03-24 16:50:41 +01002485
2486 return True
Larry Hastings9cf065c2012-06-22 16:30:09 -07002487
2488
2489@unittest.skipUnless(supports_extended_attributes(),
2490 "no non-broken extended attribute support")
Victor Stinnerf95a19b2016-03-24 16:50:41 +01002491# Kernels < 2.6.39 don't respect setxattr flags.
2492@support.requires_linux_version(2, 6, 39)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002493class ExtendedAttributeTests(unittest.TestCase):
2494
Larry Hastings9cf065c2012-06-22 16:30:09 -07002495 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
Benjamin Peterson799bd802011-08-31 22:15:17 -04002496 fn = support.TESTFN
Victor Stinnerae39d232016-03-24 17:12:55 +01002497 self.addCleanup(support.unlink, fn)
2498 create_file(fn)
2499
Benjamin Peterson799bd802011-08-31 22:15:17 -04002500 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002501 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002502 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01002503
Victor Stinnerf12e5062011-10-16 22:12:03 +02002504 init_xattr = listxattr(fn)
2505 self.assertIsInstance(init_xattr, list)
Victor Stinnerae39d232016-03-24 17:12:55 +01002506
Larry Hastings9cf065c2012-06-22 16:30:09 -07002507 setxattr(fn, s("user.test"), b"", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002508 xattr = set(init_xattr)
2509 xattr.add("user.test")
2510 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002511 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
2512 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
2513 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
Victor Stinnerae39d232016-03-24 17:12:55 +01002514
Benjamin Peterson799bd802011-08-31 22:15:17 -04002515 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002516 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002517 self.assertEqual(cm.exception.errno, errno.EEXIST)
Victor Stinnerae39d232016-03-24 17:12:55 +01002518
Benjamin Peterson799bd802011-08-31 22:15:17 -04002519 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002520 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002521 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01002522
Larry Hastings9cf065c2012-06-22 16:30:09 -07002523 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002524 xattr.add("user.test2")
2525 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002526 removexattr(fn, s("user.test"), **kwargs)
Victor Stinnerae39d232016-03-24 17:12:55 +01002527
Benjamin Peterson799bd802011-08-31 22:15:17 -04002528 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002529 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002530 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01002531
Victor Stinnerf12e5062011-10-16 22:12:03 +02002532 xattr.remove("user.test")
2533 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002534 self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
2535 setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
2536 self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
2537 removexattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002538 many = sorted("user.test{}".format(i) for i in range(100))
2539 for thing in many:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002540 setxattr(fn, thing, b"x", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002541 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
Benjamin Peterson799bd802011-08-31 22:15:17 -04002542
Larry Hastings9cf065c2012-06-22 16:30:09 -07002543 def _check_xattrs(self, *args, **kwargs):
Larry Hastings9cf065c2012-06-22 16:30:09 -07002544 self._check_xattrs_str(str, *args, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002545 support.unlink(support.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +01002546
2547 self._check_xattrs_str(os.fsencode, *args, **kwargs)
2548 support.unlink(support.TESTFN)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002549
2550 def test_simple(self):
2551 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2552 os.listxattr)
2553
2554 def test_lpath(self):
Larry Hastings9cf065c2012-06-22 16:30:09 -07002555 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2556 os.listxattr, follow_symlinks=False)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002557
2558 def test_fds(self):
2559 def getxattr(path, *args):
2560 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002561 return os.getxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002562 def setxattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01002563 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002564 os.setxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002565 def removexattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01002566 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002567 os.removexattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002568 def listxattr(path, *args):
2569 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002570 return os.listxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002571 self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
2572
2573
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002574@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
2575class TermsizeTests(unittest.TestCase):
2576 def test_does_not_crash(self):
2577 """Check if get_terminal_size() returns a meaningful value.
2578
2579 There's no easy portable way to actually check the size of the
2580 terminal, so let's check if it returns something sensible instead.
2581 """
2582 try:
2583 size = os.get_terminal_size()
2584 except OSError as e:
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01002585 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002586 # Under win32 a generic OSError can be thrown if the
2587 # handle cannot be retrieved
2588 self.skipTest("failed to query terminal size")
2589 raise
2590
Antoine Pitroucfade362012-02-08 23:48:59 +01002591 self.assertGreaterEqual(size.columns, 0)
2592 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002593
2594 def test_stty_match(self):
2595 """Check if stty returns the same results
2596
2597 stty actually tests stdin, so get_terminal_size is invoked on
2598 stdin explicitly. If stty succeeded, then get_terminal_size()
2599 should work too.
2600 """
2601 try:
2602 size = subprocess.check_output(['stty', 'size']).decode().split()
2603 except (FileNotFoundError, subprocess.CalledProcessError):
2604 self.skipTest("stty invocation failed")
2605 expected = (int(size[1]), int(size[0])) # reversed order
2606
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01002607 try:
2608 actual = os.get_terminal_size(sys.__stdin__.fileno())
2609 except OSError as e:
2610 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
2611 # Under win32 a generic OSError can be thrown if the
2612 # handle cannot be retrieved
2613 self.skipTest("failed to query terminal size")
2614 raise
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002615 self.assertEqual(expected, actual)
2616
2617
Victor Stinner292c8352012-10-30 02:17:38 +01002618class OSErrorTests(unittest.TestCase):
2619 def setUp(self):
2620 class Str(str):
2621 pass
2622
Victor Stinnerafe17062012-10-31 22:47:43 +01002623 self.bytes_filenames = []
2624 self.unicode_filenames = []
Victor Stinner292c8352012-10-30 02:17:38 +01002625 if support.TESTFN_UNENCODABLE is not None:
2626 decoded = support.TESTFN_UNENCODABLE
2627 else:
2628 decoded = support.TESTFN
Victor Stinnerafe17062012-10-31 22:47:43 +01002629 self.unicode_filenames.append(decoded)
2630 self.unicode_filenames.append(Str(decoded))
Victor Stinner292c8352012-10-30 02:17:38 +01002631 if support.TESTFN_UNDECODABLE is not None:
2632 encoded = support.TESTFN_UNDECODABLE
2633 else:
2634 encoded = os.fsencode(support.TESTFN)
Victor Stinnerafe17062012-10-31 22:47:43 +01002635 self.bytes_filenames.append(encoded)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03002636 self.bytes_filenames.append(bytearray(encoded))
Victor Stinnerafe17062012-10-31 22:47:43 +01002637 self.bytes_filenames.append(memoryview(encoded))
2638
2639 self.filenames = self.bytes_filenames + self.unicode_filenames
Victor Stinner292c8352012-10-30 02:17:38 +01002640
2641 def test_oserror_filename(self):
2642 funcs = [
Victor Stinnerafe17062012-10-31 22:47:43 +01002643 (self.filenames, os.chdir,),
2644 (self.filenames, os.chmod, 0o777),
Victor Stinnerafe17062012-10-31 22:47:43 +01002645 (self.filenames, os.lstat,),
2646 (self.filenames, os.open, os.O_RDONLY),
2647 (self.filenames, os.rmdir,),
2648 (self.filenames, os.stat,),
2649 (self.filenames, os.unlink,),
Victor Stinner292c8352012-10-30 02:17:38 +01002650 ]
2651 if sys.platform == "win32":
2652 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01002653 (self.bytes_filenames, os.rename, b"dst"),
2654 (self.bytes_filenames, os.replace, b"dst"),
2655 (self.unicode_filenames, os.rename, "dst"),
2656 (self.unicode_filenames, os.replace, "dst"),
Steve Dowercc16be82016-09-08 10:35:16 -07002657 (self.unicode_filenames, os.listdir, ),
Victor Stinner292c8352012-10-30 02:17:38 +01002658 ))
Victor Stinnerafe17062012-10-31 22:47:43 +01002659 else:
2660 funcs.extend((
Victor Stinner64e039a2012-11-07 00:10:14 +01002661 (self.filenames, os.listdir,),
Victor Stinnerafe17062012-10-31 22:47:43 +01002662 (self.filenames, os.rename, "dst"),
2663 (self.filenames, os.replace, "dst"),
2664 ))
2665 if hasattr(os, "chown"):
2666 funcs.append((self.filenames, os.chown, 0, 0))
2667 if hasattr(os, "lchown"):
2668 funcs.append((self.filenames, os.lchown, 0, 0))
2669 if hasattr(os, "truncate"):
2670 funcs.append((self.filenames, os.truncate, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01002671 if hasattr(os, "chflags"):
Victor Stinneree36c242012-11-13 09:31:51 +01002672 funcs.append((self.filenames, os.chflags, 0))
2673 if hasattr(os, "lchflags"):
2674 funcs.append((self.filenames, os.lchflags, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01002675 if hasattr(os, "chroot"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002676 funcs.append((self.filenames, os.chroot,))
Victor Stinner292c8352012-10-30 02:17:38 +01002677 if hasattr(os, "link"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002678 if sys.platform == "win32":
2679 funcs.append((self.bytes_filenames, os.link, b"dst"))
2680 funcs.append((self.unicode_filenames, os.link, "dst"))
2681 else:
2682 funcs.append((self.filenames, os.link, "dst"))
Victor Stinner292c8352012-10-30 02:17:38 +01002683 if hasattr(os, "listxattr"):
2684 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01002685 (self.filenames, os.listxattr,),
2686 (self.filenames, os.getxattr, "user.test"),
2687 (self.filenames, os.setxattr, "user.test", b'user'),
2688 (self.filenames, os.removexattr, "user.test"),
Victor Stinner292c8352012-10-30 02:17:38 +01002689 ))
2690 if hasattr(os, "lchmod"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002691 funcs.append((self.filenames, os.lchmod, 0o777))
Victor Stinner292c8352012-10-30 02:17:38 +01002692 if hasattr(os, "readlink"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002693 if sys.platform == "win32":
2694 funcs.append((self.unicode_filenames, os.readlink,))
2695 else:
2696 funcs.append((self.filenames, os.readlink,))
Victor Stinner292c8352012-10-30 02:17:38 +01002697
Steve Dowercc16be82016-09-08 10:35:16 -07002698
Victor Stinnerafe17062012-10-31 22:47:43 +01002699 for filenames, func, *func_args in funcs:
2700 for name in filenames:
Victor Stinner292c8352012-10-30 02:17:38 +01002701 try:
Steve Dowercc16be82016-09-08 10:35:16 -07002702 if isinstance(name, (str, bytes)):
Victor Stinner923590e2016-03-24 09:11:48 +01002703 func(name, *func_args)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03002704 else:
2705 with self.assertWarnsRegex(DeprecationWarning, 'should be'):
2706 func(name, *func_args)
Victor Stinnerbd54f0e2012-10-31 01:12:55 +01002707 except OSError as err:
Steve Dowercc16be82016-09-08 10:35:16 -07002708 self.assertIs(err.filename, name, str(func))
2709 except RuntimeError as err:
2710 if sys.platform != 'win32':
2711 raise
2712
2713 # issue27781: undecodable bytes currently raise RuntimeError
2714 # by 3.6.0b4 this will become UnicodeDecodeError or nothing
2715 self.assertIsInstance(err.__context__, UnicodeDecodeError)
Victor Stinner292c8352012-10-30 02:17:38 +01002716 else:
2717 self.fail("No exception thrown by {}".format(func))
2718
Charles-Francois Natali44feda32013-05-20 14:40:46 +02002719class CPUCountTests(unittest.TestCase):
2720 def test_cpu_count(self):
2721 cpus = os.cpu_count()
2722 if cpus is not None:
2723 self.assertIsInstance(cpus, int)
2724 self.assertGreater(cpus, 0)
2725 else:
2726 self.skipTest("Could not determine the number of CPUs")
2727
Victor Stinnerdaf45552013-08-28 00:53:59 +02002728
2729class FDInheritanceTests(unittest.TestCase):
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002730 def test_get_set_inheritable(self):
Victor Stinnerdaf45552013-08-28 00:53:59 +02002731 fd = os.open(__file__, os.O_RDONLY)
2732 self.addCleanup(os.close, fd)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002733 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinnerdaf45552013-08-28 00:53:59 +02002734
Victor Stinnerdaf45552013-08-28 00:53:59 +02002735 os.set_inheritable(fd, True)
2736 self.assertEqual(os.get_inheritable(fd), True)
2737
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002738 @unittest.skipIf(fcntl is None, "need fcntl")
2739 def test_get_inheritable_cloexec(self):
2740 fd = os.open(__file__, os.O_RDONLY)
2741 self.addCleanup(os.close, fd)
2742 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002743
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002744 # clear FD_CLOEXEC flag
2745 flags = fcntl.fcntl(fd, fcntl.F_GETFD)
2746 flags &= ~fcntl.FD_CLOEXEC
2747 fcntl.fcntl(fd, fcntl.F_SETFD, flags)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002748
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002749 self.assertEqual(os.get_inheritable(fd), True)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002750
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002751 @unittest.skipIf(fcntl is None, "need fcntl")
2752 def test_set_inheritable_cloexec(self):
2753 fd = os.open(__file__, os.O_RDONLY)
2754 self.addCleanup(os.close, fd)
2755 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
2756 fcntl.FD_CLOEXEC)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002757
Victor Stinner4f7a36f2013-09-08 14:14:38 +02002758 os.set_inheritable(fd, True)
2759 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
2760 0)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02002761
Victor Stinnerdaf45552013-08-28 00:53:59 +02002762 def test_open(self):
2763 fd = os.open(__file__, os.O_RDONLY)
2764 self.addCleanup(os.close, fd)
2765 self.assertEqual(os.get_inheritable(fd), False)
2766
2767 @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
2768 def test_pipe(self):
2769 rfd, wfd = os.pipe()
2770 self.addCleanup(os.close, rfd)
2771 self.addCleanup(os.close, wfd)
2772 self.assertEqual(os.get_inheritable(rfd), False)
2773 self.assertEqual(os.get_inheritable(wfd), False)
2774
2775 def test_dup(self):
2776 fd1 = os.open(__file__, os.O_RDONLY)
2777 self.addCleanup(os.close, fd1)
2778
2779 fd2 = os.dup(fd1)
2780 self.addCleanup(os.close, fd2)
2781 self.assertEqual(os.get_inheritable(fd2), False)
2782
2783 @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
2784 def test_dup2(self):
2785 fd = os.open(__file__, os.O_RDONLY)
2786 self.addCleanup(os.close, fd)
2787
2788 # inheritable by default
2789 fd2 = os.open(__file__, os.O_RDONLY)
2790 try:
2791 os.dup2(fd, fd2)
2792 self.assertEqual(os.get_inheritable(fd2), True)
2793 finally:
2794 os.close(fd2)
2795
2796 # force non-inheritable
2797 fd3 = os.open(__file__, os.O_RDONLY)
2798 try:
2799 os.dup2(fd, fd3, inheritable=False)
2800 self.assertEqual(os.get_inheritable(fd3), False)
2801 finally:
2802 os.close(fd3)
2803
2804 @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
2805 def test_openpty(self):
2806 master_fd, slave_fd = os.openpty()
2807 self.addCleanup(os.close, master_fd)
2808 self.addCleanup(os.close, slave_fd)
2809 self.assertEqual(os.get_inheritable(master_fd), False)
2810 self.assertEqual(os.get_inheritable(slave_fd), False)
2811
2812
Brett Cannon3f9183b2016-08-26 14:44:48 -07002813class PathTConverterTests(unittest.TestCase):
2814 # tuples of (function name, allows fd arguments, additional arguments to
2815 # function, cleanup function)
2816 functions = [
2817 ('stat', True, (), None),
2818 ('lstat', False, (), None),
Benjamin Petersona9ab1652016-09-05 15:40:59 -07002819 ('access', False, (os.F_OK,), None),
Brett Cannon3f9183b2016-08-26 14:44:48 -07002820 ('chflags', False, (0,), None),
2821 ('lchflags', False, (0,), None),
2822 ('open', False, (0,), getattr(os, 'close', None)),
2823 ]
2824
2825 def test_path_t_converter(self):
Brett Cannon3f9183b2016-08-26 14:44:48 -07002826 str_filename = support.TESTFN
Brett Cannon3ce2fd42016-08-27 09:42:40 -07002827 if os.name == 'nt':
2828 bytes_fspath = bytes_filename = None
2829 else:
2830 bytes_filename = support.TESTFN.encode('ascii')
Brett Cannonec6ce872016-09-06 15:50:29 -07002831 bytes_fspath = _PathLike(bytes_filename)
2832 fd = os.open(_PathLike(str_filename), os.O_WRONLY|os.O_CREAT)
Brett Cannon3f9183b2016-08-26 14:44:48 -07002833 self.addCleanup(support.unlink, support.TESTFN)
Berker Peksagd0f5bab2016-08-27 21:26:35 +03002834 self.addCleanup(os.close, fd)
Brett Cannon3f9183b2016-08-26 14:44:48 -07002835
Brett Cannonec6ce872016-09-06 15:50:29 -07002836 int_fspath = _PathLike(fd)
2837 str_fspath = _PathLike(str_filename)
Brett Cannon3f9183b2016-08-26 14:44:48 -07002838
2839 for name, allow_fd, extra_args, cleanup_fn in self.functions:
2840 with self.subTest(name=name):
2841 try:
2842 fn = getattr(os, name)
2843 except AttributeError:
2844 continue
2845
Brett Cannon8f96a302016-08-26 19:30:11 -07002846 for path in (str_filename, bytes_filename, str_fspath,
2847 bytes_fspath):
Brett Cannon3ce2fd42016-08-27 09:42:40 -07002848 if path is None:
2849 continue
Brett Cannon3f9183b2016-08-26 14:44:48 -07002850 with self.subTest(name=name, path=path):
2851 result = fn(path, *extra_args)
2852 if cleanup_fn is not None:
2853 cleanup_fn(result)
2854
2855 with self.assertRaisesRegex(
2856 TypeError, 'should be string, bytes'):
2857 fn(int_fspath, *extra_args)
Brett Cannon3f9183b2016-08-26 14:44:48 -07002858
2859 if allow_fd:
2860 result = fn(fd, *extra_args) # should not fail
2861 if cleanup_fn is not None:
2862 cleanup_fn(result)
2863 else:
2864 with self.assertRaisesRegex(
2865 TypeError,
2866 'os.PathLike'):
2867 fn(fd, *extra_args)
2868
2869
Victor Stinner1db9e7b2014-07-29 22:32:47 +02002870@unittest.skipUnless(hasattr(os, 'get_blocking'),
2871 'needs os.get_blocking() and os.set_blocking()')
2872class BlockingTests(unittest.TestCase):
2873 def test_blocking(self):
2874 fd = os.open(__file__, os.O_RDONLY)
2875 self.addCleanup(os.close, fd)
2876 self.assertEqual(os.get_blocking(fd), True)
2877
2878 os.set_blocking(fd, False)
2879 self.assertEqual(os.get_blocking(fd), False)
2880
2881 os.set_blocking(fd, True)
2882 self.assertEqual(os.get_blocking(fd), True)
2883
2884
Yury Selivanov97e2e062014-09-26 12:33:06 -04002885
2886class ExportsTests(unittest.TestCase):
2887 def test_os_all(self):
2888 self.assertIn('open', os.__all__)
2889 self.assertIn('walk', os.__all__)
2890
2891
Victor Stinner6036e442015-03-08 01:58:04 +01002892class TestScandir(unittest.TestCase):
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02002893 check_no_resource_warning = support.check_no_resource_warning
2894
Victor Stinner6036e442015-03-08 01:58:04 +01002895 def setUp(self):
2896 self.path = os.path.realpath(support.TESTFN)
Brett Cannon96881cd2016-06-10 14:37:21 -07002897 self.bytes_path = os.fsencode(self.path)
Victor Stinner6036e442015-03-08 01:58:04 +01002898 self.addCleanup(support.rmtree, self.path)
2899 os.mkdir(self.path)
2900
2901 def create_file(self, name="file.txt"):
Brett Cannon96881cd2016-06-10 14:37:21 -07002902 path = self.bytes_path if isinstance(name, bytes) else self.path
2903 filename = os.path.join(path, name)
Victor Stinnerae39d232016-03-24 17:12:55 +01002904 create_file(filename, b'python')
Victor Stinner6036e442015-03-08 01:58:04 +01002905 return filename
2906
2907 def get_entries(self, names):
2908 entries = dict((entry.name, entry)
2909 for entry in os.scandir(self.path))
2910 self.assertEqual(sorted(entries.keys()), names)
2911 return entries
2912
2913 def assert_stat_equal(self, stat1, stat2, skip_fields):
2914 if skip_fields:
2915 for attr in dir(stat1):
2916 if not attr.startswith("st_"):
2917 continue
2918 if attr in ("st_dev", "st_ino", "st_nlink"):
2919 continue
2920 self.assertEqual(getattr(stat1, attr),
2921 getattr(stat2, attr),
2922 (stat1, stat2, attr))
2923 else:
2924 self.assertEqual(stat1, stat2)
2925
2926 def check_entry(self, entry, name, is_dir, is_file, is_symlink):
Brett Cannona32c4d02016-06-24 14:14:44 -07002927 self.assertIsInstance(entry, os.DirEntry)
Victor Stinner6036e442015-03-08 01:58:04 +01002928 self.assertEqual(entry.name, name)
2929 self.assertEqual(entry.path, os.path.join(self.path, name))
2930 self.assertEqual(entry.inode(),
2931 os.stat(entry.path, follow_symlinks=False).st_ino)
2932
2933 entry_stat = os.stat(entry.path)
2934 self.assertEqual(entry.is_dir(),
2935 stat.S_ISDIR(entry_stat.st_mode))
2936 self.assertEqual(entry.is_file(),
2937 stat.S_ISREG(entry_stat.st_mode))
2938 self.assertEqual(entry.is_symlink(),
2939 os.path.islink(entry.path))
2940
2941 entry_lstat = os.stat(entry.path, follow_symlinks=False)
2942 self.assertEqual(entry.is_dir(follow_symlinks=False),
2943 stat.S_ISDIR(entry_lstat.st_mode))
2944 self.assertEqual(entry.is_file(follow_symlinks=False),
2945 stat.S_ISREG(entry_lstat.st_mode))
2946
2947 self.assert_stat_equal(entry.stat(),
2948 entry_stat,
2949 os.name == 'nt' and not is_symlink)
2950 self.assert_stat_equal(entry.stat(follow_symlinks=False),
2951 entry_lstat,
2952 os.name == 'nt')
2953
2954 def test_attributes(self):
2955 link = hasattr(os, 'link')
2956 symlink = support.can_symlink()
2957
2958 dirname = os.path.join(self.path, "dir")
2959 os.mkdir(dirname)
2960 filename = self.create_file("file.txt")
2961 if link:
2962 os.link(filename, os.path.join(self.path, "link_file.txt"))
2963 if symlink:
2964 os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
2965 target_is_directory=True)
2966 os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
2967
2968 names = ['dir', 'file.txt']
2969 if link:
2970 names.append('link_file.txt')
2971 if symlink:
2972 names.extend(('symlink_dir', 'symlink_file.txt'))
2973 entries = self.get_entries(names)
2974
2975 entry = entries['dir']
2976 self.check_entry(entry, 'dir', True, False, False)
2977
2978 entry = entries['file.txt']
2979 self.check_entry(entry, 'file.txt', False, True, False)
2980
2981 if link:
2982 entry = entries['link_file.txt']
2983 self.check_entry(entry, 'link_file.txt', False, True, False)
2984
2985 if symlink:
2986 entry = entries['symlink_dir']
2987 self.check_entry(entry, 'symlink_dir', True, False, True)
2988
2989 entry = entries['symlink_file.txt']
2990 self.check_entry(entry, 'symlink_file.txt', False, True, True)
2991
2992 def get_entry(self, name):
Brett Cannon96881cd2016-06-10 14:37:21 -07002993 path = self.bytes_path if isinstance(name, bytes) else self.path
2994 entries = list(os.scandir(path))
Victor Stinner6036e442015-03-08 01:58:04 +01002995 self.assertEqual(len(entries), 1)
2996
2997 entry = entries[0]
2998 self.assertEqual(entry.name, name)
2999 return entry
3000
Brett Cannon96881cd2016-06-10 14:37:21 -07003001 def create_file_entry(self, name='file.txt'):
3002 filename = self.create_file(name=name)
Victor Stinner6036e442015-03-08 01:58:04 +01003003 return self.get_entry(os.path.basename(filename))
3004
3005 def test_current_directory(self):
3006 filename = self.create_file()
3007 old_dir = os.getcwd()
3008 try:
3009 os.chdir(self.path)
3010
3011 # call scandir() without parameter: it must list the content
3012 # of the current directory
3013 entries = dict((entry.name, entry) for entry in os.scandir())
3014 self.assertEqual(sorted(entries.keys()),
3015 [os.path.basename(filename)])
3016 finally:
3017 os.chdir(old_dir)
3018
3019 def test_repr(self):
3020 entry = self.create_file_entry()
3021 self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")
3022
Brett Cannon96881cd2016-06-10 14:37:21 -07003023 def test_fspath_protocol(self):
3024 entry = self.create_file_entry()
3025 self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))
3026
3027 def test_fspath_protocol_bytes(self):
3028 bytes_filename = os.fsencode('bytesfile.txt')
3029 bytes_entry = self.create_file_entry(name=bytes_filename)
3030 fspath = os.fspath(bytes_entry)
3031 self.assertIsInstance(fspath, bytes)
3032 self.assertEqual(fspath,
3033 os.path.join(os.fsencode(self.path),bytes_filename))
3034
Victor Stinner6036e442015-03-08 01:58:04 +01003035 def test_removed_dir(self):
3036 path = os.path.join(self.path, 'dir')
3037
3038 os.mkdir(path)
3039 entry = self.get_entry('dir')
3040 os.rmdir(path)
3041
3042 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3043 if os.name == 'nt':
3044 self.assertTrue(entry.is_dir())
3045 self.assertFalse(entry.is_file())
3046 self.assertFalse(entry.is_symlink())
3047 if os.name == 'nt':
3048 self.assertRaises(FileNotFoundError, entry.inode)
3049 # don't fail
3050 entry.stat()
3051 entry.stat(follow_symlinks=False)
3052 else:
3053 self.assertGreater(entry.inode(), 0)
3054 self.assertRaises(FileNotFoundError, entry.stat)
3055 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3056
3057 def test_removed_file(self):
3058 entry = self.create_file_entry()
3059 os.unlink(entry.path)
3060
3061 self.assertFalse(entry.is_dir())
3062 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3063 if os.name == 'nt':
3064 self.assertTrue(entry.is_file())
3065 self.assertFalse(entry.is_symlink())
3066 if os.name == 'nt':
3067 self.assertRaises(FileNotFoundError, entry.inode)
3068 # don't fail
3069 entry.stat()
3070 entry.stat(follow_symlinks=False)
3071 else:
3072 self.assertGreater(entry.inode(), 0)
3073 self.assertRaises(FileNotFoundError, entry.stat)
3074 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3075
3076 def test_broken_symlink(self):
3077 if not support.can_symlink():
3078 return self.skipTest('cannot create symbolic link')
3079
3080 filename = self.create_file("file.txt")
3081 os.symlink(filename,
3082 os.path.join(self.path, "symlink.txt"))
3083 entries = self.get_entries(['file.txt', 'symlink.txt'])
3084 entry = entries['symlink.txt']
3085 os.unlink(filename)
3086
3087 self.assertGreater(entry.inode(), 0)
3088 self.assertFalse(entry.is_dir())
3089 self.assertFalse(entry.is_file()) # broken symlink returns False
3090 self.assertFalse(entry.is_dir(follow_symlinks=False))
3091 self.assertFalse(entry.is_file(follow_symlinks=False))
3092 self.assertTrue(entry.is_symlink())
3093 self.assertRaises(FileNotFoundError, entry.stat)
3094 # don't fail
3095 entry.stat(follow_symlinks=False)
3096
3097 def test_bytes(self):
Victor Stinner6036e442015-03-08 01:58:04 +01003098 self.create_file("file.txt")
3099
3100 path_bytes = os.fsencode(self.path)
3101 entries = list(os.scandir(path_bytes))
3102 self.assertEqual(len(entries), 1, entries)
3103 entry = entries[0]
3104
3105 self.assertEqual(entry.name, b'file.txt')
3106 self.assertEqual(entry.path,
3107 os.fsencode(os.path.join(self.path, 'file.txt')))
3108
3109 def test_empty_path(self):
3110 self.assertRaises(FileNotFoundError, os.scandir, '')
3111
3112 def test_consume_iterator_twice(self):
3113 self.create_file("file.txt")
3114 iterator = os.scandir(self.path)
3115
3116 entries = list(iterator)
3117 self.assertEqual(len(entries), 1, entries)
3118
3119 # check than consuming the iterator twice doesn't raise exception
3120 entries2 = list(iterator)
3121 self.assertEqual(len(entries2), 0, entries2)
3122
3123 def test_bad_path_type(self):
3124 for obj in [1234, 1.234, {}, []]:
3125 self.assertRaises(TypeError, os.scandir, obj)
3126
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003127 def test_close(self):
3128 self.create_file("file.txt")
3129 self.create_file("file2.txt")
3130 iterator = os.scandir(self.path)
3131 next(iterator)
3132 iterator.close()
3133 # multiple closes
3134 iterator.close()
3135 with self.check_no_resource_warning():
3136 del iterator
3137
3138 def test_context_manager(self):
3139 self.create_file("file.txt")
3140 self.create_file("file2.txt")
3141 with os.scandir(self.path) as iterator:
3142 next(iterator)
3143 with self.check_no_resource_warning():
3144 del iterator
3145
3146 def test_context_manager_close(self):
3147 self.create_file("file.txt")
3148 self.create_file("file2.txt")
3149 with os.scandir(self.path) as iterator:
3150 next(iterator)
3151 iterator.close()
3152
3153 def test_context_manager_exception(self):
3154 self.create_file("file.txt")
3155 self.create_file("file2.txt")
3156 with self.assertRaises(ZeroDivisionError):
3157 with os.scandir(self.path) as iterator:
3158 next(iterator)
3159 1/0
3160 with self.check_no_resource_warning():
3161 del iterator
3162
3163 def test_resource_warning(self):
3164 self.create_file("file.txt")
3165 self.create_file("file2.txt")
3166 iterator = os.scandir(self.path)
3167 next(iterator)
3168 with self.assertWarns(ResourceWarning):
3169 del iterator
3170 support.gc_collect()
3171 # exhausted iterator
3172 iterator = os.scandir(self.path)
3173 list(iterator)
3174 with self.check_no_resource_warning():
3175 del iterator
3176
Victor Stinner6036e442015-03-08 01:58:04 +01003177
Ethan Furmancdc08792016-06-02 15:06:09 -07003178class TestPEP519(unittest.TestCase):
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003179
3180 # Abstracted so it can be overridden to test pure Python implementation
3181 # if a C version is provided.
3182 fspath = staticmethod(os.fspath)
3183
Ethan Furmancdc08792016-06-02 15:06:09 -07003184 def test_return_bytes(self):
3185 for b in b'hello', b'goodbye', b'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003186 self.assertEqual(b, self.fspath(b))
Ethan Furmancdc08792016-06-02 15:06:09 -07003187
3188 def test_return_string(self):
3189 for s in 'hello', 'goodbye', 'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003190 self.assertEqual(s, self.fspath(s))
Ethan Furmancdc08792016-06-02 15:06:09 -07003191
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003192 def test_fsencode_fsdecode(self):
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003193 for p in "path/like/object", b"path/like/object":
Brett Cannonec6ce872016-09-06 15:50:29 -07003194 pathlike = _PathLike(p)
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003195
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003196 self.assertEqual(p, self.fspath(pathlike))
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003197 self.assertEqual(b"path/like/object", os.fsencode(pathlike))
3198 self.assertEqual("path/like/object", os.fsdecode(pathlike))
3199
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003200 def test_pathlike(self):
Brett Cannonec6ce872016-09-06 15:50:29 -07003201 self.assertEqual('#feelthegil', self.fspath(_PathLike('#feelthegil')))
3202 self.assertTrue(issubclass(_PathLike, os.PathLike))
3203 self.assertTrue(isinstance(_PathLike(), os.PathLike))
Ethan Furman410ef8e2016-06-04 12:06:26 -07003204
Ethan Furmancdc08792016-06-02 15:06:09 -07003205 def test_garbage_in_exception_out(self):
3206 vapor = type('blah', (), {})
3207 for o in int, type, os, vapor():
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003208 self.assertRaises(TypeError, self.fspath, o)
Ethan Furmancdc08792016-06-02 15:06:09 -07003209
3210 def test_argument_required(self):
Brett Cannon044283a2016-07-15 10:41:49 -07003211 self.assertRaises(TypeError, self.fspath)
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003212
Brett Cannon044283a2016-07-15 10:41:49 -07003213 def test_bad_pathlike(self):
3214 # __fspath__ returns a value other than str or bytes.
Brett Cannonec6ce872016-09-06 15:50:29 -07003215 self.assertRaises(TypeError, self.fspath, _PathLike(42))
Brett Cannon044283a2016-07-15 10:41:49 -07003216 # __fspath__ attribute that is not callable.
3217 c = type('foo', (), {})
3218 c.__fspath__ = 1
3219 self.assertRaises(TypeError, self.fspath, c())
3220 # __fspath__ raises an exception.
Brett Cannon044283a2016-07-15 10:41:49 -07003221 self.assertRaises(ZeroDivisionError, self.fspath,
Brett Cannonec6ce872016-09-06 15:50:29 -07003222 _PathLike(ZeroDivisionError()))
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003223
3224# Only test if the C version is provided, otherwise TestPEP519 already tested
3225# the pure Python implementation.
3226if hasattr(os, "_fspath"):
3227 class TestPEP519PurePython(TestPEP519):
3228
3229 """Explicitly test the pure Python implementation of os.fspath()."""
3230
3231 fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07003232
3233
Fred Drake2e2be372001-09-20 21:33:42 +00003234if __name__ == "__main__":
R David Murrayf2ad1732014-12-25 18:36:56 -05003235 unittest.main()