blob: 01404ff4f63a71331d3a719c2845b9d957bd5993 [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
11import fractions
12import getpass
13import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Benjamin Peterson799bd802011-08-31 22:15:17 -040018import re
Victor Stinner47aacc82015-06-12 17:26:23 +020019import shutil
20import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000021import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010022import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020023import subprocess
24import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010025import sysconfig
Victor Stinner47aacc82015-06-12 17:26:23 +020026import time
27import unittest
28import uuid
29import warnings
30from test import support
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000031try:
32 import threading
33except ImportError:
34 threading = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020035try:
36 import resource
37except ImportError:
38 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020039try:
40 import fcntl
41except ImportError:
42 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010043try:
44 import _winapi
45except ImportError:
46 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020047try:
R David Murrayf2ad1732014-12-25 18:36:56 -050048 import grp
49 groups = [g.gr_gid for g in grp.getgrall() if getpass.getuser() in g.gr_mem]
50 if hasattr(os, 'getgid'):
51 process_gid = os.getgid()
52 if process_gid not in groups:
53 groups.append(process_gid)
54except ImportError:
55 groups = []
56try:
57 import pwd
58 all_users = [u.pw_uid for u in pwd.getpwall()]
59except ImportError:
60 all_users = []
61try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020062 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020063except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020064 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020065
Berker Peksagce643912015-05-06 06:33:17 +030066from test.support.script_helper import assert_python_ok
Fred Drake38c2ef02001-07-17 20:52:51 +000067
Victor Stinner923590e2016-03-24 09:11:48 +010068
# True only when the platform exposes os.geteuid() and the effective user
# id is 0 (root); False everywhere else.
root_in_posix = hasattr(os, 'geteuid') and os.geteuid() == 0
72
# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
USING_LINUXTHREADS = (
    sys.thread_info.version.startswith("linuxthreads")
    if hasattr(sys, 'thread_info') and sys.thread_info.version
    else False
)
Brian Curtineb24d742010-04-12 17:16:38 +000081
# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
# os.getgid() is only consulted on FreeBSD, so platforms without it are safe.
HAVE_WHEEL_GROUP = (
    os.getgid() == 0 if sys.platform.startswith('freebsd') else False
)
84
Victor Stinner923590e2016-03-24 09:11:48 +010085
@contextlib.contextmanager
def ignore_deprecation_warnings(msg_regex, quiet=False):
    """Run the managed block under support.check_warnings().

    The filter passed on is the pair (msg_regex, DeprecationWarning);
    *quiet* is forwarded unchanged to support.check_warnings().
    """
    warning_filter = (msg_regex, DeprecationWarning)
    with support.check_warnings(warning_filter, quiet=quiet):
        yield
90
91
@contextlib.contextmanager
def bytes_filename_warn(expected):
    """Context manager around code that passes bytes filenames to os.

    When os.name is not 'nt' the block simply runs.  On 'nt' it runs
    inside ignore_deprecation_warnings() for the bytes-API deprecation
    message, with quiet set to ``not expected``.
    """
    if os.name != 'nt':
        yield
        return

    msg = 'The Windows bytes API has been deprecated'
    with ignore_deprecation_warnings(msg, quiet=not expected):
        yield
100
101
def create_file(filename, content=b'content'):
    """Create *filename* and write *content* (bytes) to it, unbuffered.

    The file is opened in exclusive-creation mode ("xb"), so
    FileExistsError is raised if *filename* already exists.
    """
    with open(filename, mode="xb", buffering=0) as stream:
        stream.write(content)
105
106
# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for low-level os file functions that operate on support.TESTFN.

    setUp() doubles as tearDown(): both remove TESTFN if it is present, so
    every test starts and finishes without the file.
    """

    def setUp(self):
        # lexists() (not exists()) so that a dangling symlink, such as the
        # one test_symlink_keywords may create, is removed as well.
        if os.path.lexists(support.TESTFN):
            os.unlink(support.TESTFN)
    tearDown = setUp

    def test_access(self):
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_RDWR)
        os.close(fd)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT | os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() must not change the reference count of its
        # path argument.  No extra references to `path` may be created
        # between the two getrefcount() calls.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            # Rewind to the start of the file before reading back.
            os.lseek(fd, 0, os.SEEK_SET)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    @support.cpython_only
    # Skip the test on 32-bit platforms: the number of bytes must fit in a
    # Py_ssize_t type
    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
                         "needs INT_MAX < PY_SSIZE_T_MAX")
    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
    def test_large_read(self, size):
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b'test')

        # Issue #21932: Make sure that os.read() does not raise an
        # OverflowError for size larger than INT_MAX
        with open(support.TESTFN, "rb") as fp:
            data = os.read(fp.fileno(), size)

        # The test does not try to read more than 2 GB at once because the
        # operating system is free to return less bytes than requested.
        self.assertEqual(data, b'test')

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Close the descriptor even if an assertion or write fails, so
            # that a failing test does not leak it.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                             [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        # Helper: run *args* as a command and require a zero exit status.
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        # Helper: open TESTFN read-only, wrap the descriptor with
        # os.fdopen(fd, *args) and close the resulting file object.
        fd = os.open(support.TESTFN, os.O_RDONLY)
        with os.fdopen(fd, *args):
            pass

    def test_fdopen(self):
        # Make sure TESTFN exists before fdopen_helper() opens it read-only.
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        TESTFN2 = support.TESTFN + ".2"
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(support.unlink, TESTFN2)

        create_file(support.TESTFN, b"1")
        create_file(TESTFN2, b"2")

        # The destination already exists: replace() must overwrite it and
        # the source name must disappear.
        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")

    def test_open_keywords(self):
        # Every os.open() parameter can be passed by keyword.
        f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
                    dir_fd=None)
        os.close(f)

    def test_symlink_keywords(self):
        # Every os.symlink() parameter can be passed by keyword.
        symlink = support.get_attribute(os, "symlink")
        try:
            symlink(src='target', dst=support.TESTFN,
                    target_is_directory=False, dir_fd=None)
        except (NotImplementedError, OSError):
            pass  # No OS support or unprivileged user
246
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200247
# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Checks on the objects returned by os.stat() and os.statvfs().

    setUp() creates support.TESTFN with the three bytes b"ABC", which is
    why the size assertions expect 3.
    """

    def setUp(self):
        self.fname = support.TESTFN
        self.addCleanup(support.unlink, self.fname)
        create_file(self.fname, b"ABC")

    def _statvfs(self):
        # Helper: return os.statvfs(self.fname), skipping the test when the
        # call is not implemented.  Any other OSError is a genuine failure
        # and is re-raised rather than silently swallowed.
        try:
            return os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                self.skipTest('os.statvfs() failed with ENOSYS')
            raise

    @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
    def check_stat_attributes(self, fname):
        # Shared body of the stat attribute tests; *fname* may be a str or
        # a bytes path.
        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if name[:3] != 'ST_':
                continue
            attr = name.lower()
            value = getattr(result, attr)
            if name.endswith("TIME"):
                # The attribute is compared to the indexed field after
                # truncation to an integer.
                value = int(value)
            self.assertEqual(value, result[getattr(stat, name)])
            self.assertIn(attr, members)

        # Make sure that the st_?time and st_?time_ns fields roughly agree
        # (they should always agree up to around tens-of-microseconds)
        for name in 'st_atime st_mtime st_ctime'.split():
            floaty = int(getattr(result, name) * 100000)
            nanosecondy = getattr(result, name + "_ns") // 10000
            self.assertAlmostEqual(floaty, nanosecondy, delta=2)

        # Out-of-range index
        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated but, as in the original test, not required.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        with bytes_filename_warn(True):
            self.check_stat_attributes(fname)

    def test_stat_result_pickle(self):
        result = os.stat(self.fname)
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'stat_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstat_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
    def test_statvfs_attributes(self):
        result = self._statvfs()

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                   'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated but, as in the original test, not required.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs_result_pickle(self):
        result = self._statvfs()

        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'statvfs_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstatvfs_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_1686475(self):
        # Verify that an open file can be stat'ed
        try:
            os.stat(r"c:\pagefile.sys")
        except FileNotFoundError:
            self.skipTest(r'c:\pagefile.sys does not exist')
        except OSError:
            self.fail("Could not stat pagefile.sys")

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_15261(self):
        # Verify that stat'ing a closed fd does not cause crash
        r, w = os.pipe()
        try:
            os.stat(r)          # should not raise error
        finally:
            os.close(r)
            os.close(w)
        with self.assertRaises(OSError) as ctx:
            os.stat(r)
        self.assertEqual(ctx.exception.errno, errno.EBADF)

    def check_file_attributes(self, result):
        # Helper: *result* must carry st_file_attributes as an int that
        # fits in 32 unsigned bits.
        self.assertTrue(hasattr(result, 'st_file_attributes'))
        self.assertIsInstance(result.st_file_attributes, int)
        self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)

    @unittest.skipUnless(sys.platform == "win32",
                         "st_file_attributes is Win32 specific")
    def test_file_attributes(self):
        # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
        result = os.stat(self.fname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            0)

        # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
        dirname = support.TESTFN + "dir"
        os.mkdir(dirname)
        self.addCleanup(os.rmdir, dirname)

        result = os.stat(dirname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            stat.FILE_ATTRIBUTE_DIRECTORY)
453
Victor Stinner47aacc82015-06-12 17:26:23 +0200454
class UtimeTests(unittest.TestCase):
    """Tests for os.utime() across its different calling conventions.

    setUp() creates the directory support.TESTFN containing one file "f1"
    and switches os.stat() to float timestamps for the duration of the test.
    """

    def setUp(self):
        self.dirname = support.TESTFN
        self.fname = os.path.join(self.dirname, "f1")

        self.addCleanup(support.rmtree, self.dirname)
        os.mkdir(self.dirname)
        create_file(self.fname)

        def restore_float_times(state):
            with ignore_deprecation_warnings('stat_float_times'):
                os.stat_float_times(state)

        # ensure that st_atime and st_mtime are float
        with ignore_deprecation_warnings('stat_float_times'):
            old_float_times = os.stat_float_times(-1)
            self.addCleanup(restore_float_times, old_float_times)

            os.stat_float_times(True)

    def support_subsecond(self, filename):
        # Heuristic to check if the filesystem supports timestamp with
        # subsecond resolution: check if float and int timestamps are different
        st = os.stat(filename)
        return ((st.st_atime != st[7])
                or (st.st_mtime != st[8])
                or (st.st_ctime != st[9]))

    def _test_utime(self, set_time, filename=None):
        # Shared driver: call set_time(filename, (atime_ns, mtime_ns)) and
        # check that os.stat() reports those timestamps afterwards.
        # *filename* defaults to self.fname.
        if not filename:
            filename = self.fname

        support_subsecond = self.support_subsecond(filename)
        if support_subsecond:
            # Timestamp with a resolution of 1 microsecond (10^-6).
            #
            # The resolution of the C internal function used by os.utime()
            # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
            # test with a resolution of 1 ns requires more work:
            # see the issue #15745.
            atime_ns = 1002003000   # 1.002003 seconds
            mtime_ns = 4005006000   # 4.005006 seconds
        else:
            # use a resolution of 1 second
            atime_ns = 5 * 10**9
            mtime_ns = 8 * 10**9

        set_time(filename, (atime_ns, mtime_ns))
        st = os.stat(filename)

        if support_subsecond:
            self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
            self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
        else:
            self.assertEqual(st.st_atime, atime_ns * 1e-9)
            self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
        self.assertEqual(st.st_atime_ns, atime_ns)
        self.assertEqual(st.st_mtime_ns, mtime_ns)

    def test_utime(self):
        def set_time(filename, ns):
            # test the ns keyword parameter
            os.utime(filename, ns=ns)
        self._test_utime(set_time)

    @staticmethod
    def ns_to_sec(ns):
        # Convert a number of nanosecond (int) to a number of seconds (float).
        # Round towards infinity by adding 0.5 nanosecond to avoid rounding
        # issue, os.utime() rounds towards minus infinity.
        return (ns * 1e-9) + 0.5e-9

    def test_utime_by_indexed(self):
        # pass times as floating point seconds as the second indexed parameter
        def set_time(filename, ns):
            atime_ns, mtime_ns = ns
            atime = self.ns_to_sec(atime_ns)
            mtime = self.ns_to_sec(mtime_ns)
            # test utimensat(timespec), utimes(timeval), utime(utimbuf)
            # or utime(time_t)
            os.utime(filename, (atime, mtime))
        self._test_utime(set_time)

    def test_utime_by_times(self):
        def set_time(filename, ns):
            atime_ns, mtime_ns = ns
            atime = self.ns_to_sec(atime_ns)
            mtime = self.ns_to_sec(mtime_ns)
            # test the times keyword parameter
            os.utime(filename, times=(atime, mtime))
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
                         "follow_symlinks support for utime required "
                         "for this test.")
    def test_utime_nofollow_symlinks(self):
        def set_time(filename, ns):
            # use follow_symlinks=False to test utimensat(timespec)
            # or lutimes(timeval)
            os.utime(filename, ns=ns, follow_symlinks=False)
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_fd,
                         "fd support for utime required for this test.")
    def test_utime_fd(self):
        def set_time(filename, ns):
            with open(filename, 'wb', 0) as fp:
                # use a file descriptor to test futimens(timespec)
                # or futimes(timeval)
                os.utime(fp.fileno(), ns=ns)
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_dir_fd,
                         "dir_fd support for utime required for this test.")
    def test_utime_dir_fd(self):
        def set_time(filename, ns):
            dirname, name = os.path.split(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
                os.utime(name, dir_fd=dirfd, ns=ns)
            finally:
                os.close(dirfd)
        self._test_utime(set_time)

    def test_utime_directory(self):
        def set_time(filename, ns):
            # test calling os.utime() on a directory
            os.utime(filename, ns=ns)
        self._test_utime(set_time, filename=self.dirname)

    def _test_utime_current(self, set_time):
        # Shared driver: set_time(self.fname) must set the modification
        # time to (approximately) the current system clock.

        # Get the system clock
        current = time.time()

        # Call os.utime() to set the timestamp to the current system clock
        set_time(self.fname)

        if not self.support_subsecond(self.fname):
            delta = 1.0
        else:
            # On Windows, the usual resolution of time.time() is 15.6 ms
            delta = 0.020
        st = os.stat(self.fname)
        msg = ("st_time=%r, current=%r, dt=%r"
               % (st.st_mtime, current, st.st_mtime - current))
        self.assertAlmostEqual(st.st_mtime, current,
                               delta=delta, msg=msg)

    def test_utime_current(self):
        def set_time(filename):
            # Set to the current time in the new way
            os.utime(filename)
        self._test_utime_current(set_time)

    def test_utime_current_old(self):
        def set_time(filename):
            # Set to the current time in the old explicit way.
            os.utime(filename, None)
        self._test_utime_current(set_time)

    def get_file_system(self, path):
        # Return the file system name reported by GetVolumeInformationW()
        # for the drive holding *path* on Windows; None on other platforms
        # or when the call fails.
        if sys.platform == 'win32':
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            ok = kernel32.GetVolumeInformationW(root, None, 0,
                                                None, None, None,
                                                buf, len(buf))
            if ok:
                return buf.value
        # return None if the filesystem is unknown
        return None

    def test_large_time(self):
        # Many filesystems are limited to the year 2038. At least, the test
        # pass with NTFS filesystem.
        if self.get_file_system(self.dirname) != "NTFS":
            self.skipTest("requires NTFS")

        large = 5000000000   # some day in 2128
        os.utime(self.fname, (large, large))
        self.assertEqual(os.stat(self.fname).st_mtime, large)

    def test_utime_invalid_arguments(self):
        # seconds and nanoseconds parameters are mutually exclusive
        with self.assertRaises(ValueError):
            os.utime(self.fname, (5, 5), ns=(5, 5))
643
644
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000645from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000646
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000647class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000648 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000649 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000650
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000651 def setUp(self):
652 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000653 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000654 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000655 for key, value in self._reference().items():
656 os.environ[key] = value
657
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000658 def tearDown(self):
659 os.environ.clear()
660 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000661 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000662 os.environb.clear()
663 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000664
Christian Heimes90333392007-11-01 19:08:42 +0000665 def _reference(self):
666 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
667
668 def _empty_mapping(self):
669 os.environ.clear()
670 return os.environ
671
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000672 # Bug 1110478
Ezio Melottic7e139b2012-09-26 20:01:34 +0300673 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000674 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000675 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300676 os.environ.update(HELLO="World")
677 with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
678 value = popen.read().strip()
679 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000680
Ezio Melottic7e139b2012-09-26 20:01:34 +0300681 @unittest.skipUnless(os.path.exists('/bin/sh'), 'requires /bin/sh')
Christian Heimes1a13d592007-11-08 14:16:55 +0000682 def test_os_popen_iter(self):
Ezio Melottic7e139b2012-09-26 20:01:34 +0300683 with os.popen(
684 "/bin/sh -c 'echo \"line1\nline2\nline3\"'") as popen:
685 it = iter(popen)
686 self.assertEqual(next(it), "line1\n")
687 self.assertEqual(next(it), "line2\n")
688 self.assertEqual(next(it), "line3\n")
689 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000690
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000691 # Verify environ keys and values from the OS are of the
692 # correct str type.
693 def test_keyvalue_types(self):
694 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000695 self.assertEqual(type(key), str)
696 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000697
Christian Heimes90333392007-11-01 19:08:42 +0000698 def test_items(self):
699 for key, value in self._reference().items():
700 self.assertEqual(os.environ.get(key), value)
701
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000702 # Issue 7310
703 def test___repr__(self):
704 """Check that the repr() of os.environ looks like environ({...})."""
705 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000706 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
707 '{!r}: {!r}'.format(key, value)
708 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000709
def test_get_exec_path(self):
    """os.get_exec_path() splits PATH from os.environ or a supplied mapping."""
    default_dirs = os.defpath.split(os.pathsep)
    search_dirs = ['/monty', '/python', '', '/flying/circus']
    path_env = {'PATH': os.pathsep.join(search_dirs)}

    # Swap os.environ for a plain dict so that the "no env argument" and
    # "env=None" forms can be checked against a known PATH.
    original_environ = os.environ
    os.environ = dict(path_env)
    try:
        self.assertSequenceEqual(search_dirs, os.get_exec_path())
        self.assertSequenceEqual(search_dirs, os.get_exec_path(env=None))
    finally:
        os.environ = original_environ

    # No PATH environment variable
    self.assertSequenceEqual(default_dirs, os.get_exec_path({}))
    # Empty PATH environment variable
    self.assertSequenceEqual(('',), os.get_exec_path({'PATH': ''}))
    # Supplied PATH environment variable
    self.assertSequenceEqual(search_dirs, os.get_exec_path(path_env))

    if not os.supports_bytes_environ:
        return

    # env cannot contain 'PATH' and b'PATH' keys
    try:
        # ignore BytesWarning warning
        with warnings.catch_warnings(record=True):
            mixed_env = {'PATH': '1', b'PATH': b'2'}
    except BytesWarning:
        # mixed_env cannot be created with python -bb
        pass
    else:
        self.assertRaises(ValueError, os.get_exec_path, mixed_env)

    # bytes key and/or value
    for bytes_env in ({b'PATH': b'abc'},
                      {b'PATH': 'abc'},
                      {'PATH': b'abc'}):
        self.assertSequenceEqual(os.get_exec_path(bytes_env), ['abc'])
750
@unittest.skipUnless(os.supports_bytes_environ,
                     "os.environb required for this test.")
def test_environb(self):
    """Values set through os.environ / os.environb are visible in the other."""
    fs_encoding = sys.getfilesystemencoding()

    # os.environ -> os.environb
    text = 'euro\u20ac'
    try:
        text_encoded = text.encode(fs_encoding, 'surrogateescape')
    except UnicodeEncodeError:
        self.skipTest(
            "U+20AC character is not encodable to %s" % (fs_encoding,))
    os.environ['unicode'] = text
    self.assertEqual(os.environ['unicode'], text)
    self.assertEqual(os.environb[b'unicode'], text_encoded)

    # os.environb -> os.environ
    raw = b'\xff'
    os.environb[b'bytes'] = raw
    self.assertEqual(os.environb[b'bytes'], raw)
    raw_decoded = raw.decode(fs_encoding, 'surrogateescape')
    self.assertEqual(os.environ['bytes'], raw_decoded)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000773
# On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
# #13415).
@support.requires_freebsd_version(7)
@support.requires_mac_ver(10, 6)
def test_unset_error(self):
    """Deleting an invalid variable name from os.environ must raise."""
    if sys.platform != "win32":
        # "=" is not allowed in a variable name
        self.assertRaises(OSError, os.environ.__delitem__, 'key=')
        return
    # an environment variable is limited to 32,767 characters
    self.assertRaises(ValueError, os.environ.__delitem__, 'x' * 50000)
Victor Stinner60b385e2011-11-22 22:01:28 +0100787
def test_key_type(self):
    """KeyError from os.environ carries the caller's key object itself."""
    absent = 'missingkey'
    self.assertNotIn(absent, os.environ)

    def check(caught):
        # The exception argument must be the very same object that was
        # looked up, and the exception context must be suppressed.
        self.assertIs(caught.exception.args[0], absent)
        self.assertTrue(caught.exception.__suppress_context__)

    with self.assertRaises(KeyError) as lookup:
        os.environ[absent]
    check(lookup)

    with self.assertRaises(KeyError) as removal:
        del os.environ[absent]
    check(removal)
801
Victor Stinner6d101392013-04-14 16:35:04 +0200802
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    # Thin adapter so that subclasses can run the same test bodies against
    # other walkers: it accepts the 'follow_symlinks' spelling and passes it
    # to os.walk() under the name 'followlinks'.
    def walk(self, top, **kwargs):
        try:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        except KeyError:
            pass
        return os.walk(top, **kwargs)

    def setUp(self):
        """Create the directory tree walked by every test in this class."""
        join = os.path.join
        self.addCleanup(support.rmtree, support.TESTFN)

        # Layout that gets built:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TEST2
        #           broken_link
        #       TEST2/
        #         tmp4              a lone file
        self.walk_path = join(support.TESTFN, "TEST1")
        self.sub1_path = join(self.walk_path, "SUB1")
        self.sub11_path = join(self.sub1_path, "SUB11")
        sub2_path = join(self.walk_path, "SUB2")
        self.link_path = join(sub2_path, "link")
        broken_link_path = join(sub2_path, "broken_link")
        t2_path = join(support.TESTFN, "TEST2")
        file_paths = (
            join(self.walk_path, "tmp1"),
            join(self.sub1_path, "tmp2"),
            join(sub2_path, "tmp3"),
            join(support.TESTFN, "TEST2", "tmp4"),
        )

        # Create stuff.
        for directory in (self.sub11_path, sub2_path, t2_path):
            os.makedirs(directory)

        for path in file_paths:
            # Mode "x" fails if the file already exists.
            with open(path, "x") as f:
                f.write("I'm " + path + " and proud of it. Blame test_os.\n")

        if not support.can_symlink():
            self.sub2_tree = (sub2_path, [], ["tmp3"])
        else:
            os.symlink(os.path.abspath(t2_path), self.link_path)
            os.symlink('broken', broken_link_path, True)
            self.sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"])

    def test_walk_topdown(self):
        """A top-down walk visits TEST1 first and then both subtrees."""
        visited = list(self.walk(self.walk_path))

        self.assertEqual(len(visited), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = visited[0][1][0] != "SUB1"
        sub1_at, sub11_at, sub2_at = (2, 3, 1) if flipped else (1, 2, 3)
        visited[0][1].sort()
        visited[sub2_at][-1].sort()
        self.assertEqual(visited[0],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(visited[sub1_at],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(visited[sub11_at], (self.sub11_path, [], []))
        self.assertEqual(visited[sub2_at], self.sub2_tree)

    def test_walk_prune(self):
        """Removing a name from dirs during the walk prunes that subtree."""
        seen = []
        for root, dirs, files in self.walk(self.walk_path):
            seen.append((root, dirs, files))
            # Don't descend into SUB1.  The in-place removal also alters
            # the list object that was just stored in seen.
            if 'SUB1' in dirs:
                dirs.remove('SUB1')

        self.assertEqual(len(seen), 2)
        top_entry, sub2_entry = seen
        self.assertEqual(top_entry, (self.walk_path, ["SUB2"], ["tmp1"]))

        sub2_entry[-1].sort()
        self.assertEqual(sub2_entry, self.sub2_tree)

    def test_walk_bottom_up(self):
        """A bottom-up walk reports TEST1 last, after both subtrees."""
        visited = list(self.walk(self.walk_path, topdown=False))

        self.assertEqual(len(visited), 4, visited)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = visited[3][1][0] != "SUB1"
        sub11_at, sub1_at, sub2_at = (1, 2, 0) if flipped else (0, 1, 2)
        visited[3][1].sort()
        visited[sub2_at][-1].sort()
        self.assertEqual(visited[3],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(visited[sub11_at], (self.sub11_path, [], []))
        self.assertEqual(visited[sub1_at],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(visited[sub2_at], self.sub2_tree)

    def test_walk_symlink(self):
        """With symlink following enabled the walk descends into 'link'."""
        if not support.can_symlink():
            self.skipTest("need symlink support")

        # Walk, following symlinks.
        for root, dirs, files in self.walk(self.walk_path,
                                           follow_symlinks=True):
            if root == self.link_path:
                self.assertEqual(dirs, [])
                self.assertEqual(files, ["tmp4"])
                break
        else:
            self.fail("Didn't follow symlink with followlinks=True")

    def test_walk_bad_dir(self):
        """A directory renamed mid-walk is reported via onerror and skipped."""
        # Walk top-down.
        problems = []
        walker = self.walk(self.walk_path, onerror=problems.append)
        root, dirs, files = next(walker)
        self.assertFalse(problems)
        victim = dirs[0]
        renamed = victim + '.new'
        os.rename(os.path.join(root, victim), os.path.join(root, renamed))
        remaining_roots = [entry[0] for entry in walker]
        self.assertTrue(problems)
        self.assertNotIn(os.path.join(root, victim), remaining_roots)
        self.assertNotIn(os.path.join(root, renamed), remaining_roots)
        for other in dirs[1:]:
            self.assertIn(os.path.join(root, other), remaining_roots)
940
Charles-François Natali7372b062012-02-05 15:15:38 +0100941
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def walk(self, top, **kwargs):
        """Adapt os.fwalk() to the 3-tuple shape used by the WalkTests bodies.

        The directory file descriptor that os.fwalk() yields as the fourth
        item is dropped.
        """
        for root, dirs, files, root_fd in os.fwalk(top, **kwargs):
            yield (root, dirs, files)

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """
        compare with walk() results.

        For every combination of topdown and symlink following, each
        directory reported by os.fwalk(**fwalk_kwargs) must also be reported
        by os.walk(**walk_kwargs) with the same sets of dirs and files.
        The caller's dicts are copied, not modified.
        """
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            # os.walk() and os.fwalk() spell the symlink option differently.
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in os.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        """os.fwalk() agrees with os.walk() when given the same 'top'."""
        kwargs = {'top': support.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        """os.fwalk() agrees with os.walk() when given dir_fd for '.'."""
        # Open the descriptor before entering the try block: if os.open()
        # itself fails there is nothing to close, and the original error
        # must not be replaced by a NameError raised from the finally clause.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': support.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        """The descriptor yielded by os.fwalk() refers to the yielded root."""
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            args = support.TESTFN, topdown, None
            for root, dirs, files, rootfd in os.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        """Repeated os.fwalk() runs must not leak file descriptors."""
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in os.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)
1006
class BytesWalkTests(WalkTests):
    """Tests for os.walk() with bytes."""

    def setUp(self):
        """Build the tree, then enter bytes_filename_warn(False) on Windows."""
        super().setUp()
        self.stack = contextlib.ExitStack()
        if os.name == 'nt':
            self.stack.enter_context(bytes_filename_warn(False))

    def tearDown(self):
        """Leave any context entered in setUp before the base-class teardown."""
        self.stack.close()
        super().tearDown()

    def walk(self, top, **kwargs):
        """Walk the fs-encoded 'top' with os.walk() and yield decoded entries.

        After each yield the (possibly pruned) decoded lists are encoded and
        written back into the lists owned by os.walk(), so that changes made
        by the consumer still steer the traversal.
        """
        try:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        except KeyError:
            pass
        for raw_root, raw_dirs, raw_files in os.walk(os.fsencode(top), **kwargs):
            dirs = [os.fsdecode(name) for name in raw_dirs]
            files = [os.fsdecode(name) for name in raw_files]
            yield (os.fsdecode(raw_root), dirs, files)
            raw_dirs[:] = [os.fsencode(name) for name in dirs]
            raw_files[:] = [os.fsencode(name) for name in files]
1029
Charles-François Natali7372b062012-02-05 15:15:38 +01001030
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs(), run inside a fresh TESTFN directory."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        """os.makedirs() creates nested paths, including ones containing '.'."""
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_exist_ok_existing_directory(self):
        """exist_ok=True tolerates an existing directory; the default doesn't."""
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        # Restore the process umask even when one of the checks fails, so a
        # failure here cannot change the permissions used by later tests.
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            os.makedirs(path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            os.umask(old_mask)

        # Issue #25583: A drive root could raise PermissionError on Windows
        os.makedirs(os.path.abspath('/'), exist_ok=True)

    def test_exist_ok_s_isgid_directory(self):
        """exist_ok=True succeeds whether or not S_ISGID matches the request."""
        path = os.path.join(support.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(support.TESTFN).st_mode)
            try:
                os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            # May work even when the bit is not already set when demanded.
            os.makedirs(path, mode | S_ISGID, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        """os.makedirs() raises OSError when the target is a regular file."""
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        self.assertRaises(OSError, os.makedirs, path)
        self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
        self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
1112
Andrew Svetlov405faed2012-12-25 12:18:09 +02001113
@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
class ChownFileTests(unittest.TestCase):
    """Tests for os.chown(), applied to a TESTFN directory shared by the class."""

    @classmethod
    def setUpClass(cls):
        os.mkdir(support.TESTFN)

    def test_chown_uid_gid_arguments_must_be_index(self):
        """Non-integer uid/gid values raise TypeError; integers are accepted."""
        target_stat = os.stat(support.TESTFN)
        uid, gid = target_stat.st_uid, target_stat.st_gid
        rejected = (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2))
        for value in rejected:
            self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
            self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
        self.assertIsNone(os.chown(support.TESTFN, uid, gid))
        self.assertIsNone(os.chown(support.TESTFN, -1, -1))

    @unittest.skipUnless(len(groups) > 1, "test needs more than one group")
    def test_chown(self):
        """The group can be switched between the first two entries of groups."""
        gid_1, gid_2 = groups[:2]
        uid = os.stat(support.TESTFN).st_uid
        for wanted_gid in (gid_1, gid_2):
            os.chown(support.TESTFN, uid, wanted_gid)
            self.assertEqual(os.stat(support.TESTFN).st_gid, wanted_gid)

    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
                         "test needs root privilege and more than one user")
    def test_chown_with_root(self):
        """The owner can be switched between the first two entries of all_users."""
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        for wanted_uid in (uid_1, uid_2):
            os.chown(support.TESTFN, wanted_uid, gid)
            self.assertEqual(os.stat(support.TESTFN).st_uid, wanted_uid)

    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
                         "test needs non-root account and more than one user")
    def test_chown_without_permission(self):
        """Changing the owner to one of two users must raise PermissionError."""
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        # Both calls sit in one assertRaises block, so the second chown only
        # runs when the first one did not raise.
        # NOTE(review): presumably this covers the case where uid_1 is the
        # current user -- confirm.
        with self.assertRaises(PermissionError):
            os.chown(support.TESTFN, uid_1, gid)
            os.chown(support.TESTFN, uid_2, gid)

    @classmethod
    def tearDownClass(cls):
        os.rmdir(support.TESTFN)
1166
1167
class RemoveDirsTests(unittest.TestCase):
    """Tests for os.removedirs() on a TESTFN/dira/dirb hierarchy."""

    def setUp(self):
        os.makedirs(support.TESTFN)

    def tearDown(self):
        support.rmtree(support.TESTFN)

    def _make_nested(self):
        # Create TESTFN/dira/dirb one level at a time and return both paths.
        outer = os.path.join(support.TESTFN, 'dira')
        os.mkdir(outer)
        inner = os.path.join(outer, 'dirb')
        os.mkdir(inner)
        return outer, inner

    def _assert_existence(self, expectations):
        # expectations is a sequence of (path, should_exist) pairs.
        for path, should_exist in expectations:
            if should_exist:
                self.assertTrue(os.path.exists(path))
            else:
                self.assertFalse(os.path.exists(path))

    def test_remove_all(self):
        """With every level empty, the whole chain up to TESTFN is removed."""
        dira, dirb = self._make_nested()
        os.removedirs(dirb)
        self._assert_existence(
            [(dirb, False), (dira, False), (support.TESTFN, False)])

    def test_remove_partial(self):
        """Removal stops at the first parent that still contains a file."""
        dira, dirb = self._make_nested()
        create_file(os.path.join(dira, 'file.txt'))
        os.removedirs(dirb)
        self._assert_existence(
            [(dirb, False), (dira, True), (support.TESTFN, True)])

    def test_remove_nothing(self):
        """A non-empty leaf directory raises OSError and nothing is removed."""
        dira, dirb = self._make_nested()
        create_file(os.path.join(dirb, 'file.txt'))
        with self.assertRaises(OSError):
            os.removedirs(dirb)
        self._assert_existence(
            [(dirb, True), (dira, True), (support.TESTFN, True)])
1207
1208
class DevNullTests(unittest.TestCase):
    """Tests for the null device named by os.devnull."""

    def test_devnull(self):
        """Writes to os.devnull succeed and reads from it return no data."""
        # Unbuffered binary write; the with statement closes the file, so no
        # explicit close() is needed inside the block.
        with open(os.devnull, 'wb', 0) as f:
            f.write(b'hello')
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001216
Andrew Svetlov405faed2012-12-25 12:18:09 +02001217
class URandomTests(unittest.TestCase):
    """Tests for os.urandom()."""

    def test_urandom_length(self):
        """os.urandom(n) returns exactly n bytes."""
        self.assertEqual(len(os.urandom(0)), 0)
        self.assertEqual(len(os.urandom(1)), 1)
        self.assertEqual(len(os.urandom(10)), 10)
        self.assertEqual(len(os.urandom(100)), 100)
        self.assertEqual(len(os.urandom(1000)), 1000)

    def test_urandom_value(self):
        """Two consecutive 16-byte reads must differ."""
        data1 = os.urandom(16)
        data2 = os.urandom(16)
        self.assertNotEqual(data1, data2)

    def get_urandom_subprocess(self, count):
        """Return the bytes a child interpreter writes from os.urandom(count).

        The child is run through assert_python_ok(); its stdout must be
        exactly 'count' bytes long.
        """
        code = '\n'.join((
            'import os, sys',
            'data = os.urandom(%s)' % count,
            'sys.stdout.buffer.write(data)',
            'sys.stdout.buffer.flush()'))
        out = assert_python_ok('-c', code)
        stdout = out[1]
        # Compare against the requested size rather than a fixed 16, so the
        # helper is correct for any count.
        self.assertEqual(len(stdout), count)
        return stdout

    def test_urandom_subprocess(self):
        """Two separate child processes must produce different random bytes."""
        data1 = self.get_urandom_subprocess(16)
        data2 = self.get_urandom_subprocess(16)
        self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001246
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001247
# os.urandom() doesn't use a file descriptor when it is implemented with the
# getentropy() function, the getrandom() function or the getrandom() syscall.
# True as soon as one of the listed build configuration variables equals 1.
OS_URANDOM_DONT_USE_FD = any(
    sysconfig.get_config_var(config_name) == 1
    for config_name in ('HAVE_GETENTROPY',
                        'HAVE_GETRANDOM',
                        'HAVE_GETRANDOM_SYSCALL'))
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001254
@unittest.skipIf(OS_URANDOM_DONT_USE_FD,
                 "os.urandom() does not use a file descriptor")
class URandomFDTests(unittest.TestCase):
    """Tests for os.urandom() when it is backed by a file descriptor.

    Each scenario runs in a child interpreter through assert_python_ok().
    """

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open /dev/random.
        # We spawn a new process to make the test more robust (if getrlimit()
        # failed to restore the file descriptor limit after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)

    def test_urandom_fd_closed(self):
        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
        # closed.
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
        rc, out, err = assert_python_ok('-Sc', code)

    def test_urandom_fd_reopened(self):
        # Issue #21207: urandom() should detect its fd to /dev/urandom
        # changed to something else, and reopen it.
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b"x" * 256)

        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                for fd in range(3, 256):
                    try:
                        os.close(fd)
                    except OSError:
                        pass
                    else:
                        # Found the urandom fd (XXX hopefully)
                        break
                os.closerange(3, 256)
            with open({TESTFN!r}, 'rb') as f:
                os.dup2(f.fileno(), fd)
                sys.stdout.buffer.write(os.urandom(4))
                sys.stdout.buffer.write(os.urandom(4))
            """.format(TESTFN=support.TESTFN)
        # The two reads within one child must differ from each other, and a
        # second child must not reproduce the first child's output.
        rc, out, err = assert_python_ok('-Sc', code)
        self.assertEqual(len(out), 8)
        self.assertNotEqual(out[0:4], out[4:8])
        rc, out2, err2 = assert_python_ok('-Sc', code)
        self.assertEqual(len(out2), 8)
        self.assertNotEqual(out2, out)
1326 self.assertNotEqual(out2, out)
1327
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001328
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001329@contextlib.contextmanager
1330def _execvpe_mockup(defpath=None):
1331 """
1332 Stubs out execv and execve functions when used as context manager.
1333 Records exec calls. The mock execv and execve functions always raise an
1334 exception as they would normally never return.
1335 """
1336 # A list of tuples containing (function name, first arg, args)
1337 # of calls to execv or execve that have been made.
1338 calls = []
1339
1340 def mock_execv(name, *args):
1341 calls.append(('execv', name, args))
1342 raise RuntimeError("execv called")
1343
1344 def mock_execve(name, *args):
1345 calls.append(('execve', name, args))
1346 raise OSError(errno.ENOTDIR, "execve called")
1347
1348 try:
1349 orig_execv = os.execv
1350 orig_execve = os.execve
1351 orig_defpath = os.defpath
1352 os.execv = mock_execv
1353 os.execve = mock_execve
1354 if defpath is not None:
1355 os.defpath = defpath
1356 yield calls
1357 finally:
1358 os.execv = orig_execv
1359 os.execve = orig_execve
1360 os.defpath = orig_defpath
1361
Guido van Rossume7ba4952007-06-06 23:52:48 +00001362class ExecTests(unittest.TestCase):
@unittest.skipIf(USING_LINUXTHREADS,
                 "avoid triggering a linuxthreads bug: see issue #4970")
def test_execvpe_with_bad_program(self):
    """os.execvpe() raises OSError for a program that cannot be found."""
    with self.assertRaises(OSError):
        os.execvpe('no such app-', ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +00001368
def test_execvpe_with_bad_arglist(self):
    """os.execvpe() raises ValueError when the argument list is empty."""
    with self.assertRaises(ValueError):
        os.execvpe('notepad', [], None)
1371
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001372 @unittest.skipUnless(hasattr(os, '_execvpe'),
1373 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +00001374 def _test_internal_execvpe(self, test_type):
1375 program_path = os.sep + 'absolutepath'
1376 if test_type is bytes:
1377 program = b'executable'
1378 fullpath = os.path.join(os.fsencode(program_path), program)
1379 native_fullpath = fullpath
1380 arguments = [b'progname', 'arg1', 'arg2']
1381 else:
1382 program = 'executable'
1383 arguments = ['progname', 'arg1', 'arg2']
1384 fullpath = os.path.join(program_path, program)
1385 if os.name != "nt":
1386 native_fullpath = os.fsencode(fullpath)
1387 else:
1388 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001389 env = {'spam': 'beans'}
1390
Victor Stinnerb745a742010-05-18 17:17:23 +00001391 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001392 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001393 self.assertRaises(RuntimeError,
1394 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001395 self.assertEqual(len(calls), 1)
1396 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
1397
Victor Stinnerb745a742010-05-18 17:17:23 +00001398 # test os._execvpe() with a relative path:
1399 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001400 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001401 self.assertRaises(OSError,
1402 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001403 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +00001404 self.assertSequenceEqual(calls[0],
1405 ('execve', native_fullpath, (arguments, env)))
1406
1407 # test os._execvpe() with a relative path:
1408 # os.get_exec_path() reads the 'PATH' variable
1409 with _execvpe_mockup() as calls:
1410 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +00001411 if test_type is bytes:
1412 env_path[b'PATH'] = program_path
1413 else:
1414 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +00001415 self.assertRaises(OSError,
1416 os._execvpe, program, arguments, env=env_path)
1417 self.assertEqual(len(calls), 1)
1418 self.assertSequenceEqual(calls[0],
1419 ('execve', native_fullpath, (arguments, env_path)))
1420
1421 def test_internal_execvpe_str(self):
1422 self._test_internal_execvpe(str)
1423 if os.name != "nt":
1424 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001425
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001426
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    """Check that os functions raise OSError for a missing or wrong path."""

    def setUp(self):
        # Every test relies on support.TESTFN not existing beforehand.
        try:
            os.stat(support.TESTFN)
        except FileNotFoundError:
            pass  # expected: the file does not exist
        except OSError as exc:
            self.fail("file %s must not exist; os.stat failed with %s"
                      % (support.TESTFN, exc))
        else:
            self.fail("file %s must not exist" % support.TESTFN)

    def test_rename(self):
        self.assertRaises(OSError, os.rename,
                          support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        self.assertRaises(OSError, os.remove, support.TESTFN)

    def test_chdir(self):
        self.assertRaises(OSError, os.chdir, support.TESTFN)

    def test_mkdir(self):
        self.addCleanup(support.unlink, support.TESTFN)

        # mkdir must fail because a regular file with that name exists;
        # the file is kept open while os.mkdir() is attempted.
        with open(support.TESTFN, "x"):
            self.assertRaises(OSError, os.mkdir, support.TESTFN)

    def test_utime(self):
        self.assertRaises(OSError, os.utime, support.TESTFN, None)

    def test_chmod(self):
        self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001461
Victor Stinnere77c9742016-03-25 10:28:23 +01001462
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001463class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001464 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001465 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1466 #singles.append("close")
1467 #We omit close because it doesn'r raise an exception on some platforms
1468 def get_single(f):
1469 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001470 if hasattr(os, f):
1471 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001472 return helper
1473 for f in singles:
1474 locals()["test_"+f] = get_single(f)
1475
Benjamin Peterson7522c742009-01-19 21:00:09 +00001476 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001477 try:
1478 f(support.make_bad_fd(), *args)
1479 except OSError as e:
1480 self.assertEqual(e.errno, errno.EBADF)
1481 else:
Martin Panter7462b6492015-11-02 03:37:02 +00001482 self.fail("%r didn't raise an OSError with a bad file descriptor"
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001483 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001484
Serhiy Storchaka43767632013-11-03 21:31:38 +02001485 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001486 def test_isatty(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001487 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001488
Serhiy Storchaka43767632013-11-03 21:31:38 +02001489 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001490 def test_closerange(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001491 fd = support.make_bad_fd()
1492 # Make sure none of the descriptors we are about to close are
1493 # currently valid (issue 6542).
1494 for i in range(10):
1495 try: os.fstat(fd+i)
1496 except OSError:
1497 pass
1498 else:
1499 break
1500 if i < 2:
1501 raise unittest.SkipTest(
1502 "Unable to acquire a range of invalid file descriptors")
1503 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001504
Serhiy Storchaka43767632013-11-03 21:31:38 +02001505 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001506 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001507 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001508
Serhiy Storchaka43767632013-11-03 21:31:38 +02001509 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001510 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001511 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001512
Serhiy Storchaka43767632013-11-03 21:31:38 +02001513 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001514 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001515 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001516
Serhiy Storchaka43767632013-11-03 21:31:38 +02001517 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001518 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001519 self.check(os.pathconf, "PC_NAME_MAX")
1520 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001521
Serhiy Storchaka43767632013-11-03 21:31:38 +02001522 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001523 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001524 self.check(os.truncate, 0)
1525 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001526
Serhiy Storchaka43767632013-11-03 21:31:38 +02001527 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001528 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001529 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001530
Serhiy Storchaka43767632013-11-03 21:31:38 +02001531 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001532 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001533 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001534
Victor Stinner57ddf782014-01-08 15:21:28 +01001535 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1536 def test_readv(self):
1537 buf = bytearray(10)
1538 self.check(os.readv, [buf])
1539
Serhiy Storchaka43767632013-11-03 21:31:38 +02001540 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001541 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001542 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001543
Serhiy Storchaka43767632013-11-03 21:31:38 +02001544 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001545 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001546 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001547
Victor Stinner57ddf782014-01-08 15:21:28 +01001548 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1549 def test_writev(self):
1550 self.check(os.writev, [b'abc'])
1551
Victor Stinner1db9e7b2014-07-29 22:32:47 +02001552 def test_inheritable(self):
1553 self.check(os.get_inheritable)
1554 self.check(os.set_inheritable, True)
1555
1556 @unittest.skipUnless(hasattr(os, 'get_blocking'),
1557 'needs os.get_blocking() and os.set_blocking()')
1558 def test_blocking(self):
1559 self.check(os.get_blocking)
1560 self.check(os.set_blocking, True)
1561
Brian Curtin1b9df392010-11-24 20:24:31 +00001562
class LinkTests(unittest.TestCase):
    """Tests for os.link() with str, bytes and non-ASCII file names."""

    def setUp(self):
        self.file1 = support.TESTFN
        self.file2 = support.TESTFN + "2"

    def tearDown(self):
        for name in (self.file1, self.file2):
            if os.path.exists(name):
                os.unlink(name)

    def _test_link(self, file1, file2):
        """Create *file1*, hard-link it as *file2*, check both are one file."""
        create_file(file1)

        with bytes_filename_warn(False):
            os.link(file1, file2)
        with open(file1, "r") as f1, open(file2, "r") as f2:
            self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        # os.fsencode() is the canonical str -> bytes conversion for file
        # names and is what the rest of this file uses.
        self._test_link(os.fsencode(self.file1),
                        os.fsencode(self.file2))

    def test_unicode_name(self):
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            raise unittest.SkipTest("Unable to encode for this platform.")

        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
1597
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class PosixUidGidTests(unittest.TestCase):
    """Error handling of the os.set*uid() / os.set*gid() family."""

    @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
    def test_setuid(self):
        # Only an unprivileged process is expected to be refused.
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.setuid(0)
        with self.assertRaises(OverflowError):
            os.setuid(1 << 32)

    @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
    def test_setgid(self):
        if not (os.getuid() == 0 or HAVE_WHEEL_GROUP):
            with self.assertRaises(OSError):
                os.setgid(0)
        with self.assertRaises(OverflowError):
            os.setgid(1 << 32)

    @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
    def test_seteuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.seteuid(0)
        with self.assertRaises(OverflowError):
            os.seteuid(1 << 32)

    @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
    def test_setegid(self):
        if not (os.getuid() == 0 or HAVE_WHEEL_GROUP):
            with self.assertRaises(OSError):
                os.setegid(0)
        with self.assertRaises(OverflowError):
            os.setegid(1 << 32)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.setreuid(0, 0)
        with self.assertRaises(OverflowError):
            os.setreuid(1 << 32, 0)
        with self.assertRaises(OverflowError):
            os.setreuid(0, 1 << 32)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid_neg1(self):
        # -1 must be accepted.  The call is made in a child interpreter so
        # the test runner's own process state is not altered (issue8045).
        script = 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'
        subprocess.check_call([sys.executable, '-c', script])

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid(self):
        if not (os.getuid() == 0 or HAVE_WHEEL_GROUP):
            with self.assertRaises(OSError):
                os.setregid(0, 0)
        with self.assertRaises(OverflowError):
            os.setregid(1 << 32, 0)
        with self.assertRaises(OverflowError):
            os.setregid(0, 1 << 32)

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid_neg1(self):
        # -1 must be accepted.  The call is made in a child interpreter so
        # the test runner's own process state is not altered (issue8045).
        script = 'import os,sys;os.setregid(-1,-1);sys.exit(0)'
        subprocess.check_call([sys.executable, '-c', script])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001653
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class Pep383Tests(unittest.TestCase):
    """Check os functions against a directory of non-ASCII file names."""

    def setUp(self):
        if support.TESTFN_UNENCODABLE:
            self.dir = support.TESTFN_UNENCODABLE
        elif support.TESTFN_NONASCII:
            self.dir = support.TESTFN_NONASCII
        else:
            self.dir = support.TESTFN
        self.bdir = os.fsencode(self.dir)

        # Collect the candidate names that can be encoded on this platform.
        bytesfn = []
        def add_filename(fn):
            try:
                fn = os.fsencode(fn)
            except UnicodeEncodeError:
                return
            bytesfn.append(fn)
        add_filename(support.TESTFN_UNICODE)
        if support.TESTFN_UNENCODABLE:
            add_filename(support.TESTFN_UNENCODABLE)
        if support.TESTFN_NONASCII:
            add_filename(support.TESTFN_NONASCII)
        if not bytesfn:
            self.skipTest("couldn't create any non-ascii filename")

        self.unicodefn = set()
        os.mkdir(self.dir)
        # Registered right after the directory exists: cleanups run even
        # when the rest of setUp() fails, so no bare "except:" and no
        # separate tearDown() are needed.
        self.addCleanup(shutil.rmtree, self.dir)
        for fn in bytesfn:
            support.create_empty_file(os.path.join(self.bdir, fn))
            fn = os.fsdecode(fn)
            if fn in self.unicodefn:
                raise ValueError("duplicate filename")
            self.unicodefn.add(fn)

    def test_listdir(self):
        expected = self.unicodefn
        found = set(os.listdir(self.dir))
        self.assertEqual(found, expected)
        # test listdir without arguments
        current_directory = os.getcwd()
        try:
            os.chdir(os.sep)
            self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
        finally:
            os.chdir(current_directory)

    def test_open(self):
        for fn in self.unicodefn:
            # Opening must succeed; the file is closed even on error.
            with open(os.path.join(self.dir, fn), 'rb'):
                pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs(self):
        # issue #9645
        for fn in self.unicodefn:
            # should not fail with file not found error
            fullname = os.path.join(self.dir, fn)
            os.statvfs(fullname)

    def test_stat(self):
        for fn in self.unicodefn:
            os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001725
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows."""

    def _kill(self, sig):
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            count += 1
        else:
            # Reached when the attempts ran out or the child exited early.
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        # Start a child with console control handling enabled, wait until it
        # signals readiness through a shared memory byte, then send *event*
        # with os.kill and check that the child stops.  *name* is only used
        # in the failure message.
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the shared mapping when the test finishes.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                                 os.path.join(os.path.dirname(__file__),
                                              "win_console_handler.py"),
                                 tagname],
                                creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            count += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running; testing
        # its truthiness would mistake an exit status of 0 for "running".
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle Ctrl+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1840
1841
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        # Populate support.TESTFN with SUB0, SUB1 (directories) and
        # FILE0, FILE1 (files), remembering the sorted entry names.
        names = []
        for index in range(2):
            dir_name = 'SUB%d' % index
            file_name = 'FILE%d' % index
            file_path = os.path.join(support.TESTFN, file_name)
            os.makedirs(os.path.join(support.TESTFN, dir_name))
            with open(file_path, 'w') as stream:
                stream.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
            names += [dir_name, file_name]
        self.created_paths = sorted(names)

    def tearDown(self):
        shutil.rmtree(support.TESTFN)

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        expected = self.created_paths

        # unicode
        listed = sorted(os.listdir(support.TESTFN))
        self.assertEqual(listed, expected)

        # bytes
        with bytes_filename_warn(False):
            listed = sorted(os.listdir(os.fsencode(support.TESTFN)))
            self.assertEqual(listed,
                             [os.fsencode(name) for name in expected])

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        expected = self.created_paths

        # unicode
        extended = '\\\\?\\' + os.path.abspath(support.TESTFN)
        self.assertEqual(sorted(os.listdir(extended)), expected)

        # bytes
        with bytes_filename_warn(False):
            extended = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
            listed = sorted(os.listdir(extended))
            self.assertEqual(listed,
                             [os.fsencode(name) for name in expected])
Tim Golden781bbeb2013-10-25 20:24:06 +01001890
1891
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Tests for file and directory symbolic links on Windows."""

    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Use unittest assertions rather than bare "assert" statements so
        # that the preconditions are still checked under "python -O".
        self.assertTrue(os.path.exists(self.dirlink_target))
        self.assertTrue(os.path.exists(self.filelink_target))
        self.assertFalse(os.path.exists(self.dirlink))
        self.assertFalse(os.path.exists(self.filelink))
        self.assertFalse(os.path.exists(self.missing_link))

    def tearDown(self):
        # lexists() is also true for a link whose target is missing, so a
        # dangling link left behind by a failed test is removed as well.
        if os.path.lexists(self.filelink):
            os.remove(self.filelink)
        if os.path.lexists(self.dirlink):
            os.rmdir(self.dirlink)
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink)
        self.assertTrue(os.path.exists(self.dirlink))
        self.assertTrue(os.path.isdir(self.dirlink))
        self.assertTrue(os.path.islink(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        self.assertTrue(os.path.exists(self.filelink))
        self.assertTrue(os.path.isfile(self.filelink))
        self.assertTrue(os.path.islink(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        self.assertFalse(os.path.exists(target))
        target_is_dir = True
        os.symlink(target, linkname, target_is_dir)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """Check stat() follows *link* to *target* while lstat() does not."""
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        # Same checks with the link name given as bytes.
        bytes_link = os.fsencode(link)
        with bytes_filename_warn(True):
            self.assertEqual(os.stat(bytes_link), os.stat(target))
        with bytes_filename_warn(True):
            self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        self.addCleanup(support.rmtree, level1)

        os.mkdir(level1)
        os.mkdir(level2)
        os.mkdir(level3)

        file1 = os.path.abspath(os.path.join(level1, "file1"))
        create_file(file1)

        orig_dir = os.getcwd()
        try:
            os.chdir(level2)
            link = os.path.join(level2, "link")
            os.symlink(os.path.relpath(file1), "link")
            self.assertIn("link", os.listdir(os.getcwd()))

            # Check os.stat calls from the same dir as the link
            self.assertEqual(os.stat(file1), os.stat("link"))

            # Check os.stat calls from a dir below the link
            os.chdir(level1)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))

            # Check os.stat calls from a dir above the link
            os.chdir(level3)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))
        finally:
            os.chdir(orig_dir)
Brian Curtind25aef52011-06-13 15:16:04 -05002003
Brian Curtind40e6f72010-07-08 21:39:08 +00002004
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32JunctionTests(unittest.TestCase):
    """Tests for NTFS junction points created with _winapi.CreateJunction."""

    junction = 'junctiontest'
    junction_target = os.path.dirname(os.path.abspath(__file__))

    def setUp(self):
        # Use unittest assertions rather than bare "assert" statements so
        # that the preconditions are still checked under "python -O".
        self.assertTrue(os.path.exists(self.junction_target))
        self.assertFalse(os.path.exists(self.junction))

    def tearDown(self):
        # lexists() does not follow the junction, so it is cleaned up
        # regardless of the state of its target.
        if os.path.lexists(self.junction):
            # os.rmdir delegates to Windows' RemoveDirectoryW,
            # which removes junction points safely.
            os.rmdir(self.junction)

    def test_create_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))
        self.assertTrue(os.path.isdir(self.junction))

        # Junctions are not recognized as links.
        self.assertFalse(os.path.islink(self.junction))

    def test_unlink_removes_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))

        os.unlink(self.junction)
        self.assertFalse(os.path.exists(self.junction))
2034
2035
@support.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):

    def setUp(self):
        # Raw docstring: the tree drawing below contains a backslash that
        # would otherwise be an invalid escape sequence.
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        os.makedirs('base/some_dir')

    def tearDown(self):
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Then, link base/some_link -> base/some_dir and ensure that some_link
        is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        os.symlink('some_dir', src)
        # A unittest assertion (not a bare ``assert``) so the check still
        # runs when the interpreter is started with -O.
        self.assertTrue(os.path.isdir(src))
2066
2067
class FSEncodingTests(unittest.TestCase):
    # Checks for os.fsencode() and os.fsdecode().

    def test_nop(self):
        # bytes given to fsencode() and str given to fsdecode() must come
        # back unchanged.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # fsdecode(fsencode(x)) == x for every name that can be encoded.
        candidates = ('unicode\u0141', 'latin\xe9', 'ascii')
        for name in candidates:
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # This name cannot be encoded here: nothing to round-trip.
                continue
            self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00002081
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002082
Brett Cannonefb00c02012-02-29 18:31:31 -05002083
class DeviceEncodingTests(unittest.TestCase):
    # Checks for os.device_encoding().

    def test_bad_fd(self):
        # A file descriptor that does not exist yields None.
        self.assertIsNone(os.device_encoding(123456))

    # The skip condition is evaluated once, when the class body executes.
    @unittest.skipUnless(
        os.isatty(0) and (
            sys.platform.startswith('win') or
            (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))
        ),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # The encoding reported for fd 0 must be a codec Python knows.
        name = os.device_encoding(0)
        self.assertIsNotNone(name)
        self.assertTrue(codecs.lookup(name))
2097
2098
class PidTests(unittest.TestCase):
    # Checks for process-id related functions of the os module.

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        command = [sys.executable, '-c', 'import os; print(os.getppid())']
        child = subprocess.Popen(command, stdout=subprocess.PIPE)
        output = child.communicate()[0]
        # We are the parent of our subprocess
        self.assertEqual(int(output), os.getpid())

    def test_waitpid(self):
        # Spawn an interpreter that exits immediately and reap it.
        interpreter = sys.executable
        pid = os.spawnv(os.P_NOWAIT, interpreter,
                        [interpreter, '-c', 'pass'])
        self.assertEqual(os.waitpid(pid, 0), (pid, 0))
2114
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002115
Brian Curtin0151b8e2010-09-24 13:43:43 +00002116# The introduction of this TestCase caused at least two different errors on
2117# *nix buildbots. Temporarily skip this to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    def test_getlogin(self):
        # os.getlogin() must return a non-empty name.
        self.assertNotEqual(len(os.getlogin()), 0)
2124
2125
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        pid = os.getpid()
        base = os.getpriority(os.PRIO_PROCESS, pid)
        os.setpriority(os.PRIO_PROCESS, pid, base + 1)
        try:
            new_prio = os.getpriority(os.PRIO_PROCESS, pid)
            if base >= 19 and new_prio <= 19:
                # The increment cannot be observed at this nice level.
                raise unittest.SkipTest("unable to reliably test setpriority "
                                        "at current nice level of %s" % base)
            self.assertEqual(new_prio, base + 1)
        finally:
            # Try to restore the original priority; a refusal reported as
            # EACCES is tolerated, any other error is propagated.
            try:
                os.setpriority(os.PRIO_PROCESS, pid, base)
            except OSError as err:
                if err.errno != errno.EACCES:
                    raise
2148
2149
if threading is not None:
    class SendfileTestServer(asyncore.dispatcher, threading.Thread):
        # Small asyncore-based TCP server running in its own thread.  It
        # accepts a connection and records every byte received on it so a
        # test can compare that with what it sent.

        class Handler(asynchat.async_chat):
            # Per-connection handler that accumulates the received chunks.

            def __init__(self, conn):
                asynchat.async_chat.__init__(self, conn)
                self.in_buffer = []
                self.closed = False
                # Greeting sent to the client as soon as it is connected.
                self.push(b"220 ready\r\n")

            def handle_read(self):
                data = self.recv(4096)
                self.in_buffer.append(data)

            def get_data(self):
                # Everything received so far, as a single bytes object.
                return b''.join(self.in_buffer)

            def handle_close(self):
                self.close()
                self.closed = True

            def handle_error(self):
                # Re-raise the exception currently being handled instead of
                # letting asyncore deal with it.
                raise

        def __init__(self, address):
            threading.Thread.__init__(self)
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.bind(address)
            self.listen(5)
            # Actual address; useful when binding to port 0.
            self.host, self.port = self.socket.getsockname()[:2]
            self.handler_instance = None
            self._active = False
            self._active_lock = threading.Lock()

        # --- public API

        @property
        def running(self):
            return self._active

        def start(self):
            # Start the thread and block until run() has begun executing.
            assert not self.running
            self.__flag = threading.Event()
            threading.Thread.start(self)
            self.__flag.wait()

        def stop(self):
            # Ask the serving loop to exit and wait for the thread to end.
            assert self.running
            self._active = False
            self.join()

        def wait(self):
            # wait for handler connection to be closed, then stop the server
            while not getattr(self.handler_instance, "closed", False):
                time.sleep(0.001)
            self.stop()

        # --- internals

        def run(self):
            self._active = True
            self.__flag.set()
            while self._active and asyncore.socket_map:
                # ``with`` guarantees the lock is released even when
                # asyncore.loop() raises (handle_error() re-raises).
                with self._active_lock:
                    asyncore.loop(timeout=0.001, count=1)
            asyncore.close_all()

        def handle_accept(self):
            conn, addr = self.accept()
            self.handler_instance = self.Handler(conn)

        def handle_connect(self):
            self.close()
        handle_read = handle_connect

        def writable(self):
            # The listening socket never has anything to write.
            return False

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise
2233
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002234
@unittest.skipUnless(threading is not None, "test needs threading module")
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    # Tests for os.sendfile().  Each test sends (part of) a file holding
    # DATA to a SendfileTestServer and checks what the server received.

    DATA = b"12345abcde" * 16 * 1024  # 160 KB
    # The headers/trailers arguments are not exercised on these platforms.
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith(
        ("linux", "solaris", "sunos"))
    requires_headers_trailers = unittest.skipUnless(
        SUPPORT_HEADERS_TRAILERS, 'requires headers and trailers support')

    @classmethod
    def setUpClass(cls):
        cls.key = support.threading_setup()
        create_file(support.TESTFN, cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.threading_cleanup(*cls.key)
        support.unlink(support.TESTFN)

    def setUp(self):
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        self.client = socket.socket()
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        self.file.close()
        self.client.close()
        if self.server.running:
            self.server.stop()

    def sendfile_wrapper(self, sock, file, offset, nbytes, headers=(),
                         trailers=()):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        The call is retried for as long as it fails with EAGAIN or EBUSY;
        any other OSError (ECONNRESET included) is propagated.  Returns the
        value returned by os.sendfile().

        headers and trailers default to empty tuples rather than lists so
        that no mutable object is shared between calls.
        """
        while True:
            try:
                if self.SUPPORT_HEADERS_TRAILERS:
                    return os.sendfile(sock, file, offset, nbytes, headers,
                                       trailers)
                return os.sendfile(sock, file, offset, nbytes)
            except OSError as err:
                if err.errno not in (errno.EAGAIN, errno.EBUSY):
                    raise
                # otherwise: we have to retry send data

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset,
                                         nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset,
                                         nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        # A negative offset must be rejected with EINVAL.
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    def test_keywords(self):
        # Keyword arguments should be supported
        # ('in' is a Python keyword, hence the dict unpacking.)
        os.sendfile(out=self.sockno, offset=0, count=4096,
                    **{'in': self.fileno})
        if self.SUPPORT_HEADERS_TRAILERS:
            os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
                        headers=(), trailers=(), flags=0)

    # --- headers / trailers tests

    @requires_headers_trailers
    def test_headers(self):
        total_sent = 0
        sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                           headers=[b"x" * 512])
        total_sent += sent
        offset = 4096
        nbytes = 4096
        while True:
            sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                         offset, nbytes)
            if sent == 0:
                break
            total_sent += sent
            offset += sent

        expected_data = b"x" * 512 + self.DATA
        self.assertEqual(total_sent, len(expected_data))
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        # Compare the payload itself (not merely its hash), as the other
        # tests of this class do.
        self.assertEqual(len(data), len(expected_data))
        self.assertEqual(data, expected_data)

    @requires_headers_trailers
    def test_trailers(self):
        TESTFN2 = support.TESTFN + "2"
        file_data = b"abcdef"

        self.addCleanup(support.unlink, TESTFN2)
        create_file(TESTFN2, file_data)

        with open(TESTFN2, 'rb') as f:
            os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
                        trailers=[b"1234"])
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            self.assertEqual(data, b"abcdef1234")

    @requires_headers_trailers
    @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
                         'test needs os.SF_NODISKIO')
    def test_flags(self):
        # EBUSY and EAGAIN are accepted outcomes here; anything else is an
        # error.
        try:
            os.sendfile(self.sockno, self.fileno, 0, 4096,
                        flags=os.SF_NODISKIO)
        except OSError as err:
            if err.errno not in (errno.EBUSY, errno.EAGAIN):
                raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002421
2422
def supports_extended_attributes():
    """Return True if an extended attribute can be set on a new file.

    False is returned when os.setxattr is missing or when setting a
    "user.test" attribute on a freshly created support.TESTFN fails with
    OSError.  The probe file is removed before returning.
    """
    if not hasattr(os, "setxattr"):
        return False

    supported = True
    try:
        # "x" mode: the probe file must not already exist.
        with open(support.TESTFN, "xb", 0) as probe:
            try:
                os.setxattr(probe.fileno(), b"user.test", b"")
            except OSError:
                supported = False
    finally:
        support.unlink(support.TESTFN)

    return supported
Larry Hastings9cf065c2012-06-22 16:30:09 -07002437
2438
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
# Kernels < 2.6.39 don't respect setxattr flags.
@support.requires_linux_version(2, 6, 39)
class ExtendedAttributeTests(unittest.TestCase):
    # Exercises getxattr/setxattr/removexattr/listxattr through paths,
    # through follow_symlinks=False and through file descriptors.

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        # Run the whole scenario on support.TESTFN.  ``s`` converts the
        # attribute names (str or os.fsencode); ``kwargs`` is forwarded to
        # getxattr, setxattr and removexattr.
        path = support.TESTFN
        self.addCleanup(support.unlink, path)
        create_file(path)

        def expect_errno(code, func, *args):
            # func(path, *args, **kwargs) must fail with this errno.
            with self.assertRaises(OSError) as caught:
                func(path, *args, **kwargs)
            self.assertEqual(caught.exception.errno, code)

        # Nothing is set yet.
        expect_errno(errno.ENODATA, getxattr, s("user.test"))

        baseline = listxattr(path)
        self.assertIsInstance(baseline, list)

        # Create an empty attribute, then replace its value.
        setxattr(path, s("user.test"), b"", **kwargs)
        expected = set(baseline)
        expected.add("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"")
        setxattr(path, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"hello")

        # XATTR_CREATE on an existing name, XATTR_REPLACE on a missing one.
        expect_errno(errno.EEXIST, setxattr,
                     s("user.test"), b"bye", os.XATTR_CREATE)
        expect_errno(errno.ENODATA, setxattr,
                     s("user.test2"), b"bye", os.XATTR_REPLACE)

        setxattr(path, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
        expected.add("user.test2")
        self.assertEqual(set(listxattr(path)), expected)
        removexattr(path, s("user.test"), **kwargs)

        # The removed attribute is gone, the other one is untouched.
        expect_errno(errno.ENODATA, getxattr, s("user.test"))

        expected.remove("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, s("user.test2"), **kwargs), b"foo")

        # A 1024-byte value round-trips.
        big_value = b"a"*1024
        setxattr(path, s("user.test"), big_value, **kwargs)
        self.assertEqual(getxattr(path, s("user.test"), **kwargs), big_value)
        removexattr(path, s("user.test"), **kwargs)

        # Many attributes at once.
        many = sorted("user.test{}".format(i) for i in range(100))
        for thing in many:
            setxattr(path, thing, b"x", **kwargs)
        self.assertEqual(set(listxattr(path)), set(baseline) | set(many))

    def _check_xattrs(self, *args, **kwargs):
        # Run the scenario with str names, then with bytes names, removing
        # the test file after each run.
        for convert in (str, os.fsencode):
            self._check_xattrs_str(convert, *args, **kwargs)
            support.unlink(support.TESTFN)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        def via_fd(func, mode, *open_args):
            # Adapt ``func`` so it takes a path but operates on a file
            # descriptor obtained by opening that path.
            def call(path, *args):
                with open(path, mode, *open_args) as fp:
                    return func(fp.fileno(), *args)
            return call

        self._check_xattrs(via_fd(os.getxattr, "rb"),
                           via_fd(os.setxattr, "wb", 0),
                           via_fd(os.removexattr, "wb", 0),
                           via_fd(os.listxattr, "rb"))
2522
2523
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32DeprecatedBytesAPI(unittest.TestCase):
    # Calls a series of os/nt functions with a bytes filename inside
    # bytes_filename_warn(True).

    def test_deprecated(self):
        import nt
        filename = os.fsencode(support.TESTFN)
        # Each entry is the function followed by its arguments.
        calls = [
            (nt._getfullpathname, filename),
            (nt._isdir, filename),
            (os.access, filename, os.R_OK),
            (os.chdir, filename),
            (os.chmod, filename, 0o777),
            (os.getcwdb,),
            (os.link, filename, filename),
            (os.listdir, filename),
            (os.lstat, filename),
            (os.mkdir, filename),
            (os.open, filename, os.O_RDONLY),
            (os.rename, filename, filename),
            (os.rmdir, filename),
            (os.startfile, filename),
            (os.stat, filename),
            (os.unlink, filename),
            (os.utime, filename),
        ]
        for func, *args in calls:
            # ignore OSError, we only care about DeprecationWarning
            with bytes_filename_warn(True), contextlib.suppress(OSError):
                func(*args)

    @support.skip_unless_symlink
    def test_symlink(self):
        filename = os.fsencode(support.TESTFN)
        self.addCleanup(support.unlink, support.TESTFN)

        with bytes_filename_warn(True):
            os.symlink(filename, filename)
Victor Stinner28216442011-11-16 00:34:44 +01002562
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002563
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        try:
            size = os.get_terminal_size()
        except OSError as e:
            if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
                # Under win32 a generic OSError can be thrown if the
                # handle cannot be retrieved
                self.skipTest("failed to query terminal size")
            raise

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly.  If stty succeeded, then get_terminal_size()
        should work too.
        """
        try:
            # stderr is discarded so that stty's complaint when stdin is not
            # a terminal does not pollute the test output.
            size = subprocess.check_output(
                ['stty', 'size'], stderr=subprocess.DEVNULL).decode().split()
        except (FileNotFoundError, PermissionError,
                subprocess.CalledProcessError):
            # stty is missing, cannot be executed, or exited with an error.
            self.skipTest("stty invocation failed")
        expected = (int(size[1]), int(size[0]))  # reversed order

        try:
            actual = os.get_terminal_size(sys.__stdin__.fileno())
        except OSError as e:
            if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
                # Under win32 a generic OSError can be thrown if the
                # handle cannot be retrieved
                self.skipTest("failed to query terminal size")
            raise
        self.assertEqual(expected, actual)
2606
2607
class OSErrorTests(unittest.TestCase):
    # Checks that an OSError raised by a path-taking os function carries,
    # as its ``filename`` attribute, the very object that was passed in.

    def setUp(self):
        class Str(str):
            pass

        if support.TESTFN_UNENCODABLE is None:
            decoded = support.TESTFN
        else:
            decoded = support.TESTFN_UNENCODABLE
        if support.TESTFN_UNDECODABLE is None:
            encoded = os.fsencode(support.TESTFN)
        else:
            encoded = support.TESTFN_UNDECODABLE

        # Each name is tried both as the plain type and as a second kind of
        # object (a str subclass, a memoryview).
        self.bytes_filenames = [encoded, memoryview(encoded)]
        self.unicode_filenames = [decoded, Str(decoded)]

        self.filenames = self.bytes_filenames + self.unicode_filenames

    def test_oserror_filename(self):
        on_windows = sys.platform == "win32"
        every = self.filenames
        only_bytes = self.bytes_filenames
        only_text = self.unicode_filenames

        # Each entry: (filenames to try, function, extra arguments...).
        funcs = [
            (every, os.chdir,),
            (every, os.chmod, 0o777),
            (every, os.lstat,),
            (every, os.open, os.O_RDONLY),
            (every, os.rmdir,),
            (every, os.stat,),
            (every, os.unlink,),
        ]
        if on_windows:
            funcs += [
                (only_bytes, os.rename, b"dst"),
                (only_bytes, os.replace, b"dst"),
                (only_text, os.rename, "dst"),
                (only_text, os.replace, "dst"),
                # Issue #16414: Don't test undecodable names with listdir()
                # because of a Windows bug.
                #
                # With the ANSI code page 932, os.listdir(b'\xe7') return an
                # empty list (instead of failing), whereas os.listdir(b'\xff')
                # raises a FileNotFoundError. It looks like a Windows bug:
                # b'\xe7' directory does not exist, FindFirstFileA(b'\xe7')
                # fails with ERROR_FILE_NOT_FOUND (2), instead of
                # ERROR_PATH_NOT_FOUND (3).
                (only_text, os.listdir,),
            ]
        else:
            funcs += [
                (every, os.listdir,),
                (every, os.rename, "dst"),
                (every, os.replace, "dst"),
            ]

        # Optional functions that accept every kind of filename.
        optional = (
            ("chown", (0, 0)),
            ("lchown", (0, 0)),
            ("truncate", (0,)),
            ("chflags", (0,)),
            ("lchflags", (0,)),
            ("chroot", ()),
        )
        for attr, extra in optional:
            if hasattr(os, attr):
                funcs.append((every, getattr(os, attr)) + extra)

        if hasattr(os, "link"):
            if on_windows:
                funcs.append((only_bytes, os.link, b"dst"))
                funcs.append((only_text, os.link, "dst"))
            else:
                funcs.append((every, os.link, "dst"))
        if hasattr(os, "listxattr"):
            funcs += [
                (every, os.listxattr,),
                (every, os.getxattr, "user.test"),
                (every, os.setxattr, "user.test", b'user'),
                (every, os.removexattr, "user.test"),
            ]
        if hasattr(os, "lchmod"):
            funcs.append((every, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            funcs.append(
                ((only_text if on_windows else every), os.readlink,))

        for names, func, *extra_args in funcs:
            for candidate in names:
                try:
                    with bytes_filename_warn(False):
                        func(candidate, *extra_args)
                except OSError as exc:
                    # Identity, not equality: the same object is expected.
                    self.assertIs(exc.filename, candidate)
                else:
                    self.fail("No exception thrown by {}".format(func))
2705
class CPUCountTests(unittest.TestCase):
    # os.cpu_count() must return either None or a strictly positive int.
    def test_cpu_count(self):
        count = os.cpu_count()
        if count is None:
            # skipTest() raises, so nothing below runs in this case.
            self.skipTest("Could not determine the number of CPUs")
        self.assertIsInstance(count, int)
        self.assertGreater(count, 0)
2714
Victor Stinnerdaf45552013-08-28 00:53:59 +02002715
class FDInheritanceTests(unittest.TestCase):
    # Checks the inheritable flag of file descriptors created through the
    # os module, as reported by os.get_inheritable() and, where available,
    # by the FD_CLOEXEC flag read through fcntl.

    def _open_self(self):
        # Open this source file read-only and close it at test teardown.
        handle = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, handle)
        return handle

    def test_get_set_inheritable(self):
        handle = self._open_self()
        self.assertEqual(os.get_inheritable(handle), False)

        os.set_inheritable(handle, True)
        self.assertEqual(os.get_inheritable(handle), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        handle = self._open_self()
        self.assertEqual(os.get_inheritable(handle), False)

        # clear FD_CLOEXEC flag
        current = fcntl.fcntl(handle, fcntl.F_GETFD)
        fcntl.fcntl(handle, fcntl.F_SETFD, current & ~fcntl.FD_CLOEXEC)

        self.assertEqual(os.get_inheritable(handle), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        handle = self._open_self()
        cloexec = fcntl.fcntl(handle, fcntl.F_GETFD) & fcntl.FD_CLOEXEC
        self.assertEqual(cloexec, fcntl.FD_CLOEXEC)

        os.set_inheritable(handle, True)
        cloexec = fcntl.fcntl(handle, fcntl.F_GETFD) & fcntl.FD_CLOEXEC
        self.assertEqual(cloexec, 0)

    def test_open(self):
        handle = self._open_self()
        self.assertEqual(os.get_inheritable(handle), False)

    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
    def test_pipe(self):
        ends = os.pipe()
        for end in ends:
            self.addCleanup(os.close, end)
        for end in ends:
            self.assertEqual(os.get_inheritable(end), False)

    def test_dup(self):
        original = self._open_self()

        duplicate = os.dup(original)
        self.addCleanup(os.close, duplicate)
        self.assertEqual(os.get_inheritable(duplicate), False)

    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
    def test_dup2(self):
        source = self._open_self()

        cases = (
            # inheritable by default
            ({}, True),
            # force non-inheritable
            ({'inheritable': False}, False),
        )
        for options, expected in cases:
            target = os.open(__file__, os.O_RDONLY)
            try:
                os.dup2(source, target, **options)
                self.assertEqual(os.get_inheritable(target), expected)
            finally:
                os.close(target)

    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
    def test_openpty(self):
        pty_fds = os.openpty()
        for pty_fd in pty_fds:
            self.addCleanup(os.close, pty_fd)
        for pty_fd in pty_fds:
            self.assertEqual(os.get_inheritable(pty_fd), False)
2798
2799
@unittest.skipUnless(hasattr(os, 'get_blocking'),
                     'needs os.get_blocking() and os.set_blocking()')
class BlockingTests(unittest.TestCase):
    # Round-trips the blocking mode of a descriptor through
    # os.set_blocking() and os.get_blocking().
    def test_blocking(self):
        handle = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, handle)
        self.assertEqual(os.get_blocking(handle), True)

        # Switch to non-blocking, then back to blocking.
        for mode in (False, True):
            os.set_blocking(handle, mode)
            self.assertEqual(os.get_blocking(handle), mode)
2813
2814
Yury Selivanov97e2e062014-09-26 12:33:06 -04002815
class ExportsTests(unittest.TestCase):
    # Spot-check that os.__all__ lists a couple of well-known names.
    def test_os_all(self):
        for exported in ('open', 'walk'):
            self.assertIn(exported, os.__all__)
2820
2821
class TestScandir(unittest.TestCase):
    # Tests for os.scandir() and the entry objects it yields.  setUp()
    # creates a fresh scratch directory (self.path) for every test and
    # registers its removal.
    check_no_resource_warning = support.check_no_resource_warning

    def setUp(self):
        self.path = os.path.realpath(support.TESTFN)
        self.addCleanup(support.rmtree, self.path)
        os.mkdir(self.path)

    def create_file(self, name="file.txt"):
        # Create *name* inside the scratch directory through the module-level
        # create_file() helper (b'python' is presumably the file content --
        # see that helper) and return the full path.
        filename = os.path.join(self.path, name)
        create_file(filename, b'python')
        return filename

    def get_entries(self, names):
        # Scan the scratch directory, check that the sorted entry names are
        # exactly *names*, and return a {name: entry} mapping.
        entries = {entry.name: entry for entry in os.scandir(self.path)}
        self.assertEqual(sorted(entries), names)
        return entries

    def assert_stat_equal(self, stat1, stat2, skip_fields):
        # Compare two stat results.  When *skip_fields* is true, compare the
        # st_* attributes one by one and ignore st_dev, st_ino and st_nlink;
        # otherwise require the two results to be equal as a whole.
        if skip_fields:
            for attr in dir(stat1):
                if not attr.startswith("st_"):
                    continue
                if attr in ("st_dev", "st_ino", "st_nlink"):
                    continue
                self.assertEqual(getattr(stat1, attr),
                                 getattr(stat2, attr),
                                 (stat1, stat2, attr))
        else:
            self.assertEqual(stat1, stat2)

    def check_entry(self, entry, name, is_dir, is_file, is_symlink):
        # Check *entry* both against the caller's expectations (*name*,
        # *is_dir*, *is_file*, *is_symlink*) and against what os.stat()
        # reports for the same path.  *is_dir* and *is_file* describe the
        # entry with symlinks followed.
        self.assertEqual(entry.name, name)
        self.assertEqual(entry.path, os.path.join(self.path, name))
        self.assertEqual(entry.inode(),
                         os.stat(entry.path, follow_symlinks=False).st_ino)

        # Explicit expectations supplied by the caller.
        self.assertEqual(entry.is_dir(), is_dir)
        self.assertEqual(entry.is_file(), is_file)
        self.assertEqual(entry.is_symlink(), is_symlink)
        # Without following symlinks, a symlink is neither a directory nor
        # a regular file, whatever its target is.
        self.assertEqual(entry.is_dir(follow_symlinks=False),
                         is_dir and not is_symlink)
        self.assertEqual(entry.is_file(follow_symlinks=False),
                         is_file and not is_symlink)

        # Cross-check with os.stat(), symlinks followed.
        entry_stat = os.stat(entry.path)
        self.assertEqual(entry.is_dir(),
                         stat.S_ISDIR(entry_stat.st_mode))
        self.assertEqual(entry.is_file(),
                         stat.S_ISREG(entry_stat.st_mode))
        self.assertEqual(entry.is_symlink(),
                         os.path.islink(entry.path))

        # Cross-check with os.stat(), symlinks not followed.
        entry_lstat = os.stat(entry.path, follow_symlinks=False)
        self.assertEqual(entry.is_dir(follow_symlinks=False),
                         stat.S_ISDIR(entry_lstat.st_mode))
        self.assertEqual(entry.is_file(follow_symlinks=False),
                         stat.S_ISREG(entry_lstat.st_mode))

        # On Windows ('nt') the comparison skips st_dev, st_ino and
        # st_nlink (see assert_stat_equal), except for a followed symlink.
        self.assert_stat_equal(entry.stat(),
                               entry_stat,
                               os.name == 'nt' and not is_symlink)
        self.assert_stat_equal(entry.stat(follow_symlinks=False),
                               entry_lstat,
                               os.name == 'nt')

    def test_attributes(self):
        # Populate the directory with a sub-directory, a file and, when the
        # platform allows it, a hard link and two symlinks; then check the
        # entry of each one.
        link = hasattr(os, 'link')
        symlink = support.can_symlink()

        dirname = os.path.join(self.path, "dir")
        os.mkdir(dirname)
        filename = self.create_file("file.txt")
        if link:
            os.link(filename, os.path.join(self.path, "link_file.txt"))
        if symlink:
            os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
                       target_is_directory=True)
            os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))

        # get_entries() compares against a sorted list; these names are
        # appended in sorted order.
        names = ['dir', 'file.txt']
        if link:
            names.append('link_file.txt')
        if symlink:
            names.extend(('symlink_dir', 'symlink_file.txt'))
        entries = self.get_entries(names)

        entry = entries['dir']
        self.check_entry(entry, 'dir', True, False, False)

        entry = entries['file.txt']
        self.check_entry(entry, 'file.txt', False, True, False)

        if link:
            entry = entries['link_file.txt']
            self.check_entry(entry, 'link_file.txt', False, True, False)

        if symlink:
            entry = entries['symlink_dir']
            self.check_entry(entry, 'symlink_dir', True, False, True)

            entry = entries['symlink_file.txt']
            self.check_entry(entry, 'symlink_file.txt', False, True, True)

    def get_entry(self, name):
        # Return the single entry of the scratch directory, after checking
        # that there is exactly one and that it is called *name*.
        entries = list(os.scandir(self.path))
        self.assertEqual(len(entries), 1)

        entry = entries[0]
        self.assertEqual(entry.name, name)
        return entry

    def create_file_entry(self):
        # Create the default file and return its scandir() entry.
        filename = self.create_file()
        return self.get_entry(os.path.basename(filename))

    def test_current_directory(self):
        filename = self.create_file()
        old_dir = os.getcwd()
        try:
            os.chdir(self.path)

            # call scandir() without parameter: it must list the content
            # of the current directory
            entries = {entry.name: entry for entry in os.scandir()}
            self.assertEqual(sorted(entries),
                             [os.path.basename(filename)])
        finally:
            os.chdir(old_dir)

    def test_repr(self):
        entry = self.create_file_entry()
        self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")

    def test_removed_dir(self):
        # Query an entry after its directory has been removed.
        path = os.path.join(self.path, 'dir')

        os.mkdir(path)
        entry = self.get_entry('dir')
        os.rmdir(path)

        # On POSIX, is_dir() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_dir())
        self.assertFalse(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat,
                              follow_symlinks=False)

    def test_removed_file(self):
        # Query an entry after its file has been removed.
        entry = self.create_file_entry()
        os.unlink(entry.path)

        self.assertFalse(entry.is_dir())
        # On POSIX, is_file() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat,
                              follow_symlinks=False)

    def test_broken_symlink(self):
        if not support.can_symlink():
            # skipTest() raises; no explicit return is needed.
            self.skipTest('cannot create symbolic link')

        filename = self.create_file("file.txt")
        os.symlink(filename,
                   os.path.join(self.path, "symlink.txt"))
        entries = self.get_entries(['file.txt', 'symlink.txt'])
        entry = entries['symlink.txt']
        # Remove the target so that the symlink becomes dangling.
        os.unlink(filename)

        self.assertGreater(entry.inode(), 0)
        self.assertFalse(entry.is_dir())
        self.assertFalse(entry.is_file())  # broken symlink returns False
        self.assertFalse(entry.is_dir(follow_symlinks=False))
        self.assertFalse(entry.is_file(follow_symlinks=False))
        self.assertTrue(entry.is_symlink())
        self.assertRaises(FileNotFoundError, entry.stat)
        # don't fail
        entry.stat(follow_symlinks=False)

    def test_bytes(self):
        if os.name == "nt":
            # On Windows, os.scandir(bytes) must raise an exception
            with bytes_filename_warn(True):
                self.assertRaises(TypeError, os.scandir, b'.')
            return

        self.create_file("file.txt")

        path_bytes = os.fsencode(self.path)
        entries = list(os.scandir(path_bytes))
        self.assertEqual(len(entries), 1, entries)
        entry = entries[0]

        # A bytes path must produce bytes names and bytes paths.
        self.assertEqual(entry.name, b'file.txt')
        self.assertEqual(entry.path,
                         os.fsencode(os.path.join(self.path, 'file.txt')))

    def test_empty_path(self):
        self.assertRaises(FileNotFoundError, os.scandir, '')

    def test_consume_iterator_twice(self):
        self.create_file("file.txt")
        iterator = os.scandir(self.path)

        entries = list(iterator)
        self.assertEqual(len(entries), 1, entries)

        # check that consuming the iterator twice doesn't raise an exception
        entries2 = list(iterator)
        self.assertEqual(len(entries2), 0, entries2)

    def test_bad_path_type(self):
        for obj in [1234, 1.234, {}, []]:
            self.assertRaises(TypeError, os.scandir, obj)

    def test_close(self):
        # Two files are created so that a single next() call leaves the
        # iterator unexhausted.
        self.create_file("file.txt")
        self.create_file("file2.txt")
        iterator = os.scandir(self.path)
        next(iterator)
        iterator.close()
        # multiple closes
        iterator.close()
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager_close(self):
        # Calling close() inside the with block must not break leaving it.
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
            iterator.close()

    def test_context_manager_exception(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with self.assertRaises(ZeroDivisionError):
            with os.scandir(self.path) as iterator:
                next(iterator)
                1/0
        with self.check_no_resource_warning():
            del iterator

    def test_resource_warning(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        # Partially consumed and never closed: dropping it must warn.
        iterator = os.scandir(self.path)
        next(iterator)
        with self.assertWarns(ResourceWarning):
            del iterator
            support.gc_collect()
        # exhausted iterator
        iterator = os.scandir(self.path)
        list(iterator)
        with self.check_no_resource_warning():
            del iterator
3096
Victor Stinner6036e442015-03-08 01:58:04 +01003097
# Run the tests through unittest when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()