blob: 8c6a8c0815ff73fa18e5c4137908a2d3d6d3e2a5 [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
11import fractions
12import getpass
13import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Benjamin Peterson799bd802011-08-31 22:15:17 -040018import re
Victor Stinner47aacc82015-06-12 17:26:23 +020019import shutil
20import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000021import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010022import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020023import subprocess
24import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010025import sysconfig
Victor Stinner47aacc82015-06-12 17:26:23 +020026import time
27import unittest
28import uuid
29import warnings
30from test import support
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000031try:
32 import threading
33except ImportError:
34 threading = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020035try:
36 import resource
37except ImportError:
38 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020039try:
40 import fcntl
41except ImportError:
42 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010043try:
44 import _winapi
45except ImportError:
46 _winapi = None
# Collect the GIDs of every group the current user belongs to (plus the
# process's own GID).  On platforms without the grp module the list is empty.
try:
    import grp
    # Resolve the user name once; it is invariant across the group entries.
    _current_user = getpass.getuser()
    groups = [g.gr_gid for g in grp.getgrall() if _current_user in g.gr_mem]
    if hasattr(os, 'getgid'):
        process_gid = os.getgid()
        # The primary group is not necessarily listed in gr_mem.
        if process_gid not in groups:
            groups.append(process_gid)
except ImportError:
    groups = []
56try:
57 import pwd
58 all_users = [u.pw_uid for u in pwd.getpwall()]
59except ImportError:
60 all_users = []
61try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020062 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020063except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020064 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020065
Berker Peksagce643912015-05-06 06:33:17 +030066from test.support.script_helper import assert_python_ok
Xavier de Gayed1415312016-07-22 12:15:29 +020067from test.support import unix_shell
Fred Drake38c2ef02001-07-17 20:52:51 +000068
Victor Stinner923590e2016-03-24 09:11:48 +010069
# True only when os.geteuid() exists and reports an effective UID of 0.
root_in_posix = hasattr(os, 'geteuid') and os.geteuid() == 0

# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
USING_LINUXTHREADS = bool(
    hasattr(sys, 'thread_info')
    and sys.thread_info.version
    and sys.thread_info.version.startswith("linuxthreads"))

# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
# os.getgid() is only consulted on FreeBSD, so other platforms never call it.
HAVE_WHEEL_GROUP = (os.getgid() == 0
                    if sys.platform.startswith('freebsd') else False)
85
Victor Stinner923590e2016-03-24 09:11:48 +010086
@contextlib.contextmanager
def ignore_deprecation_warnings(msg_regex, quiet=False):
    """Run the enclosed block under support.check_warnings().

    The filter handed to check_warnings() is the pair
    (msg_regex, DeprecationWarning); *quiet* is forwarded unchanged.
    """
    warning_filter = (msg_regex, DeprecationWarning)
    with support.check_warnings(warning_filter, quiet=quiet):
        yield
91
92
@contextlib.contextmanager
def bytes_filename_warn(expected):
    """Context manager for code that passes bytes filenames to os functions.

    Outside Windows (os.name != 'nt') the block simply runs.  On Windows the
    block runs inside ignore_deprecation_warnings() for the bytes-API
    deprecation message, with quiet set to ``not expected``.
    """
    if os.name != 'nt':
        yield
        return

    msg = 'The Windows bytes API has been deprecated'
    with ignore_deprecation_warnings(msg, quiet=not expected):
        yield
101
102
def create_file(filename, content=b'content'):
    """Create *filename* exclusively and write the bytes *content* to it.

    The file is opened unbuffered in "xb" mode, so FileExistsError is raised
    when *filename* already exists.
    """
    stream = open(filename, "xb", 0)
    try:
        stream.write(content)
    finally:
        stream.close()
106
107
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000108# Tests creating TESTFN
class FileTests(unittest.TestCase):
    """Tests for low-level os file operations that create TESTFN."""

    def setUp(self):
        # Remove any leftover entry at TESTFN.  lexists() is used so that a
        # dangling symlink (see test_symlink_keywords) is removed as well.
        if os.path.lexists(support.TESTFN):
            os.unlink(support.TESTFN)
    # The same clean-up runs before and after every test.
    tearDown = setUp

    def test_access(self):
        f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(f)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() call must not change the reference count of
        # its path argument.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            # Rewind the descriptor itself before reading through os.read().
            os.lseek(fd, 0, os.SEEK_SET)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    @support.cpython_only
    # Skip the test on 32-bit platforms: the number of bytes must fit in a
    # Py_ssize_t type
    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
                         "needs INT_MAX < PY_SSIZE_T_MAX")
    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
    def test_large_read(self, size):
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b'test')

        # Issue #21932: Make sure that os.read() does not raise an
        # OverflowError for size larger than INT_MAX
        with open(support.TESTFN, "rb") as fp:
            data = os.read(fp.fileno(), size)

        # The test does not try to read more than 2 GB at once because the
        # operating system is free to return less bytes than requested.
        self.assertEqual(data, b'test')

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Always release the descriptor, even if a check above fails.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        # Run *args* as a command and require a zero exit status.
        retcode = subprocess.call(args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        # Open TESTFN read-only and wrap the descriptor with os.fdopen(),
        # forwarding *args* (mode, buffering) to it.
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        # Make sure TESTFN exists before the helper opens it read-only.
        fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        TESTFN2 = support.TESTFN + ".2"
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(support.unlink, TESTFN2)

        create_file(support.TESTFN, b"1")
        create_file(TESTFN2, b"2")

        # The source must disappear and the destination must carry the
        # source's content.
        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")

    def test_open_keywords(self):
        f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
            dir_fd=None)
        os.close(f)

    def test_symlink_keywords(self):
        symlink = support.get_attribute(os, "symlink")
        try:
            symlink(src='target', dst=support.TESTFN,
                target_is_directory=False, dir_fd=None)
        except (NotImplementedError, OSError):
            pass  # No OS support or unprivileged user
247
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200248
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000249# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    """Check the result objects returned by os.stat() and os.statvfs()."""

    def setUp(self):
        self.fname = support.TESTFN
        self.addCleanup(support.unlink, self.fname)
        # Three bytes of content: the size assertions below depend on it.
        create_file(self.fname, b"ABC")

    @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
    def check_stat_attributes(self, fname):
        result = os.stat(fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if name[:3] == 'ST_':
                attr = name.lower()
                # Time attributes are floats, while the indexed time fields
                # are integers: truncate before comparing.
                if name.endswith("TIME"):
                    def trunc(x): return int(x)
                else:
                    def trunc(x): return x
                self.assertEqual(trunc(getattr(result, attr)),
                                  result[getattr(stat, name)])
                self.assertIn(attr, members)

        # Make sure that the st_?time and st_?time_ns fields roughly agree
        # (they should always agree up to around tens-of-microseconds)
        for name in 'st_atime st_mtime st_ctime'.split():
            floaty = int(getattr(result, name) * 100000)
            nanosecondy = getattr(result, name + "_ns") // 10000
            self.assertAlmostEqual(floaty, nanosecondy, delta=2)

        # Indexing past the end must fail
        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises(AttributeError):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.stat_result((10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated here but not required.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_stat_attributes(self):
        self.check_stat_attributes(self.fname)

    def test_stat_attributes_bytes(self):
        try:
            fname = self.fname.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            self.skipTest("cannot encode %a for the filesystem" % self.fname)
        with bytes_filename_warn(True):
            self.check_stat_attributes(fname)

    def test_stat_result_pickle(self):
        result = os.stat(self.fname)
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'stat_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstat_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
    def test_statvfs_attributes(self):
        try:
            result = os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                self.skipTest('os.statvfs() failed with ENOSYS')
            # Any other error is a genuine failure: report it instead of
            # continuing with ``result`` unbound.
            raise

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                    'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        # Make sure that assignment really fails
        with self.assertRaises(AttributeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        with self.assertRaises(TypeError):
            os.statvfs_result((10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated here but not required.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs_result_pickle(self):
        try:
            result = os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                self.skipTest('os.statvfs() failed with ENOSYS')
            # Any other error is a genuine failure: report it instead of
            # continuing with ``result`` unbound.
            raise

        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            p = pickle.dumps(result, proto)
            self.assertIn(b'statvfs_result', p)
            if proto < 4:
                self.assertIn(b'cos\nstatvfs_result\n', p)
            unpickled = pickle.loads(p)
            self.assertEqual(result, unpickled)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_1686475(self):
        # Verify that an open file can be stat'ed
        try:
            os.stat(r"c:\pagefile.sys")
        except FileNotFoundError:
            self.skipTest(r'c:\pagefile.sys does not exist')
        except OSError:
            self.fail("Could not stat pagefile.sys")

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_15261(self):
        # Verify that stat'ing a closed fd does not cause crash
        r, w = os.pipe()
        try:
            os.stat(r)          # should not raise error
        finally:
            os.close(r)
            os.close(w)
        with self.assertRaises(OSError) as ctx:
            os.stat(r)
        self.assertEqual(ctx.exception.errno, errno.EBADF)

    def check_file_attributes(self, result):
        # st_file_attributes must be present and be an int in [0, 0xFFFFFFFF].
        self.assertTrue(hasattr(result, 'st_file_attributes'))
        self.assertIsInstance(result.st_file_attributes, int)
        self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)

    @unittest.skipUnless(sys.platform == "win32",
                         "st_file_attributes is Win32 specific")
    def test_file_attributes(self):
        # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
        result = os.stat(self.fname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            0)

        # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
        dirname = support.TESTFN + "dir"
        os.mkdir(dirname)
        self.addCleanup(os.rmdir, dirname)

        result = os.stat(dirname)
        self.check_file_attributes(result)
        self.assertEqual(
            result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
            stat.FILE_ATTRIBUTE_DIRECTORY)
454
Victor Stinner47aacc82015-06-12 17:26:23 +0200455
class UtimeTests(unittest.TestCase):
    """Tests for os.utime() on a scratch file inside the TESTFN directory."""

    def setUp(self):
        self.dirname = support.TESTFN
        self.fname = os.path.join(self.dirname, "f1")

        self.addCleanup(support.rmtree, self.dirname)
        os.mkdir(self.dirname)
        create_file(self.fname)

        def restore_float_times(state):
            with ignore_deprecation_warnings('stat_float_times'):
                os.stat_float_times(state)

        # ensure that st_atime and st_mtime are float
        with ignore_deprecation_warnings('stat_float_times'):
            # -1 is passed to read the current setting so that it can be
            # restored by the cleanup registered below.
            old_float_times = os.stat_float_times(-1)
            self.addCleanup(restore_float_times, old_float_times)

            os.stat_float_times(True)

    def support_subsecond(self, filename):
        # Heuristic to check if the filesystem supports timestamp with
        # subsecond resolution: check if float and int timestamps are different
        st = os.stat(filename)
        return ((st.st_atime != st[7])
                or (st.st_mtime != st[8])
                or (st.st_ctime != st[9]))

    def _test_utime(self, set_time, filename=None):
        # Call set_time(filename, (atime_ns, mtime_ns)) and verify that
        # os.stat() reports the requested timestamps.  *filename* defaults
        # to the scratch file created by setUp().
        if not filename:
            filename = self.fname

        support_subsecond = self.support_subsecond(filename)
        if support_subsecond:
            # Timestamp with a resolution of 1 microsecond (10^-6).
            #
            # The resolution of the C internal function used by os.utime()
            # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
            # test with a resolution of 1 ns requires more work:
            # see the issue #15745.
            atime_ns = 1002003000   # 1.002003 seconds
            mtime_ns = 4005006000   # 4.005006 seconds
        else:
            # use a resolution of 1 second
            atime_ns = 5 * 10**9
            mtime_ns = 8 * 10**9

        set_time(filename, (atime_ns, mtime_ns))
        st = os.stat(filename)

        if support_subsecond:
            self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
            self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
        else:
            self.assertEqual(st.st_atime, atime_ns * 1e-9)
            self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
        self.assertEqual(st.st_atime_ns, atime_ns)
        self.assertEqual(st.st_mtime_ns, mtime_ns)

    def test_utime(self):
        def set_time(filename, ns):
            # test the ns keyword parameter
            os.utime(filename, ns=ns)
        self._test_utime(set_time)

    @staticmethod
    def ns_to_sec(ns):
        # Convert a number of nanosecond (int) to a number of seconds (float).
        # Round towards infinity by adding 0.5 nanosecond to avoid rounding
        # issue, os.utime() rounds towards minus infinity.
        return (ns * 1e-9) + 0.5e-9

    def test_utime_by_indexed(self):
        # pass times as floating point seconds as the second indexed parameter
        def set_time(filename, ns):
            atime_ns, mtime_ns = ns
            atime = self.ns_to_sec(atime_ns)
            mtime = self.ns_to_sec(mtime_ns)
            # test utimensat(timespec), utimes(timeval), utime(utimbuf)
            # or utime(time_t)
            os.utime(filename, (atime, mtime))
        self._test_utime(set_time)

    def test_utime_by_times(self):
        def set_time(filename, ns):
            atime_ns, mtime_ns = ns
            atime = self.ns_to_sec(atime_ns)
            mtime = self.ns_to_sec(mtime_ns)
            # test the times keyword parameter
            os.utime(filename, times=(atime, mtime))
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
                         "follow_symlinks support for utime required "
                         "for this test.")
    def test_utime_nofollow_symlinks(self):
        def set_time(filename, ns):
            # use follow_symlinks=False to test utimensat(timespec)
            # or lutimes(timeval)
            os.utime(filename, ns=ns, follow_symlinks=False)
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_fd,
                         "fd support for utime required for this test.")
    def test_utime_fd(self):
        def set_time(filename, ns):
            with open(filename, 'wb', 0) as fp:
                # use a file descriptor to test futimens(timespec)
                # or futimes(timeval)
                os.utime(fp.fileno(), ns=ns)
        self._test_utime(set_time)

    @unittest.skipUnless(os.utime in os.supports_dir_fd,
                         "dir_fd support for utime required for this test.")
    def test_utime_dir_fd(self):
        def set_time(filename, ns):
            dirname, name = os.path.split(filename)
            dirfd = os.open(dirname, os.O_RDONLY)
            try:
                # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
                os.utime(name, dir_fd=dirfd, ns=ns)
            finally:
                os.close(dirfd)
        self._test_utime(set_time)

    def test_utime_directory(self):
        def set_time(filename, ns):
            # test calling os.utime() on a directory
            os.utime(filename, ns=ns)
        self._test_utime(set_time, filename=self.dirname)

    def _test_utime_current(self, set_time):
        # Get the system clock
        current = time.time()

        # Call os.utime() to set the timestamp to the current system clock
        set_time(self.fname)

        if not self.support_subsecond(self.fname):
            delta = 1.0
        else:
            # On Windows, the usual resolution of time.time() is 15.6 ms
            delta = 0.020
        st = os.stat(self.fname)
        msg = ("st_time=%r, current=%r, dt=%r"
               % (st.st_mtime, current, st.st_mtime - current))
        self.assertAlmostEqual(st.st_mtime, current,
                               delta=delta, msg=msg)

    def test_utime_current(self):
        def set_time(filename):
            # Set to the current time in the new way
            os.utime(filename)
        self._test_utime_current(set_time)

    def test_utime_current_old(self):
        def set_time(filename):
            # Set to the current time in the old explicit way.
            os.utime(filename, None)
        self._test_utime_current(set_time)

    def get_file_system(self, path):
        # Return the name of the filesystem holding *path* as reported by
        # GetVolumeInformationW() on Windows.
        if sys.platform == 'win32':
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_unicode_buffer("", 100)
            ok = kernel32.GetVolumeInformationW(root, None, 0,
                                                None, None, None,
                                                buf, len(buf))
            if ok:
                return buf.value
        # return None if the filesystem is unknown
        return None

    def test_large_time(self):
        # Many filesystems are limited to the year 2038. At least, the test
        # pass with NTFS filesystem.
        if self.get_file_system(self.dirname) != "NTFS":
            self.skipTest("requires NTFS")

        large = 5000000000   # some day in 2128
        os.utime(self.fname, (large, large))
        self.assertEqual(os.stat(self.fname).st_mtime, large)

    def test_utime_invalid_arguments(self):
        # seconds and nanoseconds parameters are mutually exclusive
        with self.assertRaises(ValueError):
            os.utime(self.fname, (5, 5), ns=(5, 5))
644
645
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000646from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000647
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000648class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000649 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000650 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000651
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000652 def setUp(self):
653 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000654 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000655 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000656 for key, value in self._reference().items():
657 os.environ[key] = value
658
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000659 def tearDown(self):
660 os.environ.clear()
661 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000662 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000663 os.environb.clear()
664 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000665
Christian Heimes90333392007-11-01 19:08:42 +0000666 def _reference(self):
667 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
668
669 def _empty_mapping(self):
670 os.environ.clear()
671 return os.environ
672
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000673 # Bug 1110478
Xavier de Gayed1415312016-07-22 12:15:29 +0200674 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
675 'requires a shell')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000676 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000677 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300678 os.environ.update(HELLO="World")
Xavier de Gayed1415312016-07-22 12:15:29 +0200679 with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300680 value = popen.read().strip()
681 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000682
Xavier de Gayed1415312016-07-22 12:15:29 +0200683 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
684 'requires a shell')
Christian Heimes1a13d592007-11-08 14:16:55 +0000685 def test_os_popen_iter(self):
Xavier de Gayed1415312016-07-22 12:15:29 +0200686 with os.popen("%s -c 'echo \"line1\nline2\nline3\"'"
687 % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300688 it = iter(popen)
689 self.assertEqual(next(it), "line1\n")
690 self.assertEqual(next(it), "line2\n")
691 self.assertEqual(next(it), "line3\n")
692 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000693
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000694 # Verify environ keys and values from the OS are of the
695 # correct str type.
696 def test_keyvalue_types(self):
697 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000698 self.assertEqual(type(key), str)
699 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000700
Christian Heimes90333392007-11-01 19:08:42 +0000701 def test_items(self):
702 for key, value in self._reference().items():
703 self.assertEqual(os.environ.get(key), value)
704
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000705 # Issue 7310
706 def test___repr__(self):
707 """Check that the repr() of os.environ looks like environ({...})."""
708 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000709 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
710 '{!r}: {!r}'.format(key, value)
711 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000712
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000713 def test_get_exec_path(self):
714 defpath_list = os.defpath.split(os.pathsep)
715 test_path = ['/monty', '/python', '', '/flying/circus']
716 test_env = {'PATH': os.pathsep.join(test_path)}
717
718 saved_environ = os.environ
719 try:
720 os.environ = dict(test_env)
721 # Test that defaulting to os.environ works.
722 self.assertSequenceEqual(test_path, os.get_exec_path())
723 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
724 finally:
725 os.environ = saved_environ
726
727 # No PATH environment variable
728 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
729 # Empty PATH environment variable
730 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
731 # Supplied PATH environment variable
732 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
733
Victor Stinnerb745a742010-05-18 17:17:23 +0000734 if os.supports_bytes_environ:
735 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000736 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000737 # ignore BytesWarning warning
738 with warnings.catch_warnings(record=True):
739 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000740 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000741 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000742 pass
743 else:
744 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000745
746 # bytes key and/or value
747 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
748 ['abc'])
749 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
750 ['abc'])
751 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
752 ['abc'])
753
754 @unittest.skipUnless(os.supports_bytes_environ,
755 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000756 def test_environb(self):
757 # os.environ -> os.environb
758 value = 'euro\u20ac'
759 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000760 value_bytes = value.encode(sys.getfilesystemencoding(),
761 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000762 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000763 msg = "U+20AC character is not encodable to %s" % (
764 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000765 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000766 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000767 self.assertEqual(os.environ['unicode'], value)
768 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000769
770 # os.environb -> os.environ
771 value = b'\xff'
772 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000773 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000774 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000775 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000776
# On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
# #13415).
@support.requires_freebsd_version(7)
@support.requires_mac_ver(10, 6)
def test_unset_error(self):
    """Deleting an invalid variable name from os.environ must raise."""
    if sys.platform == "win32":
        # an environment variable is limited to 32,767 characters
        bad_key, expected_error = 'x' * 50000, ValueError
    else:
        # "=" is not allowed in a variable name
        bad_key, expected_error = 'key=', OSError
    with self.assertRaises(expected_error):
        del os.environ[bad_key]
Victor Stinner60b385e2011-11-22 22:01:28 +0100790
Victor Stinner6d101392013-04-14 16:35:04 +0200791 def test_key_type(self):
792 missing = 'missingkey'
793 self.assertNotIn(missing, os.environ)
794
Victor Stinner839e5ea2013-04-14 16:43:03 +0200795 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200796 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200797 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200798 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200799
Victor Stinner839e5ea2013-04-14 16:43:03 +0200800 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200801 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200802 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200803 self.assertTrue(cm.exception.__suppress_context__)
804
Victor Stinner6d101392013-04-14 16:35:04 +0200805
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    # Adapter that lets the same tests drive both os.walk and os.fwalk:
    # the 'follow_symlinks' keyword is translated to the 'followlinks'
    # keyword that os.walk expects.
    def walk(self, top, **kwargs):
        try:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        except KeyError:
            pass
        return os.walk(top, **kwargs)

    def setUp(self):
        self.addCleanup(support.rmtree, support.TESTFN)

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TESTFN/TEST2
        #           broken_link
        #       TEST2/
        #         tmp4              a lone file
        base = support.TESTFN
        path_join = os.path.join
        self.walk_path = path_join(base, "TEST1")
        self.sub1_path = path_join(self.walk_path, "SUB1")
        self.sub11_path = path_join(self.sub1_path, "SUB11")
        sub2_path = path_join(self.walk_path, "SUB2")
        self.link_path = path_join(sub2_path, "link")
        broken_link_path = path_join(sub2_path, "broken_link")
        t2_path = path_join(base, "TEST2")
        file_paths = (
            path_join(self.walk_path, "tmp1"),
            path_join(self.sub1_path, "tmp2"),
            path_join(sub2_path, "tmp3"),
            path_join(base, "TEST2", "tmp4"),
        )

        # Create stuff.
        for directory in (self.sub11_path, sub2_path, t2_path):
            os.makedirs(directory)

        for file_path in file_paths:
            with open(file_path, "x") as f:
                f.write("I'm " + file_path + " and proud of it. Blame test_os.\n")

        # Expected (root, dirs, files) entry for SUB2; it only contains
        # the symlinks when the platform can create them.
        if not support.can_symlink():
            self.sub2_tree = (sub2_path, [], ["tmp3"])
            return
        os.symlink(os.path.abspath(t2_path), self.link_path)
        os.symlink('broken', broken_link_path, True)
        self.sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"])

    def test_walk_topdown(self):
        # Walk top-down.
        visited = list(self.walk(self.walk_path))

        self.assertEqual(len(visited), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = visited[0][1][0] != "SUB1"
        sub1_index = 1 + flipped
        sub11_index = 2 + flipped
        sub2_index = 3 - 2 * flipped
        visited[0][1].sort()
        visited[sub2_index][-1].sort()
        self.assertEqual(visited[0],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(visited[sub1_index],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(visited[sub11_index], (self.sub11_path, [], []))
        self.assertEqual(visited[sub2_index], self.sub2_tree)

    def test_walk_prune(self, walk_path=None):
        if walk_path is None:
            walk_path = self.walk_path
        # Prune the search.
        visited = []
        for entry in self.walk(walk_path):
            visited.append(entry)
            subdirs = entry[1]
            # Don't descend into SUB1.
            if 'SUB1' in subdirs:
                # Note that this also mutates the dirs stored in visited!
                subdirs.remove('SUB1')

        self.assertEqual(len(visited), 2)
        self.assertEqual(visited[0],
                         (str(walk_path), ["SUB2"], ["tmp1"]))

        visited[1][-1].sort()
        self.assertEqual(visited[1], self.sub2_tree)

    def test_file_like_path(self):
        # A minimal path-like object exposing __fspath__.
        class FileLike:
            def __init__(self, path):
                self._path = path

            def __str__(self):
                return str(self._path)

            def __fspath__(self):
                return self._path

        self.test_walk_prune(FileLike(self.walk_path))

    def test_walk_bottom_up(self):
        # Walk bottom-up.
        visited = list(self.walk(self.walk_path, topdown=False))

        self.assertEqual(len(visited), 4, visited)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = visited[3][1][0] != "SUB1"
        sub11_index = flipped
        sub1_index = flipped + 1
        sub2_index = 2 - 2 * flipped
        visited[3][1].sort()
        visited[sub2_index][-1].sort()
        self.assertEqual(visited[3],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(visited[sub11_index],
                         (self.sub11_path, [], []))
        self.assertEqual(visited[sub1_index],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(visited[sub2_index],
                         self.sub2_tree)

    def test_walk_symlink(self):
        if not support.can_symlink():
            self.skipTest("need symlink support")

        # Walk, following symlinks, and stop at the first entry whose
        # root is the symlinked directory.
        walk_it = self.walk(self.walk_path, follow_symlinks=True)
        found = next(((dirs, files) for root, dirs, files in walk_it
                      if root == self.link_path), None)
        if found is None:
            self.fail("Didn't follow symlink with followlinks=True")
        dirs, files = found
        self.assertEqual(dirs, [])
        self.assertEqual(files, ["tmp4"])

    def test_walk_bad_dir(self):
        # Walk top-down.
        errors = []
        walk_it = self.walk(self.walk_path, onerror=errors.append)
        root, dirs, files = next(walk_it)
        self.assertFalse(errors)
        # Rename the first subdirectory behind the walker's back: the
        # walk must report an error and visit neither the old nor the
        # new name, while still visiting the remaining subdirectories.
        renamed, untouched = dirs[0], dirs[1:]
        old_path = os.path.join(root, renamed)
        new_path = os.path.join(root, renamed + '.new')
        os.rename(old_path, new_path)
        roots = [entry[0] for entry in walk_it]
        self.assertTrue(errors)
        self.assertNotIn(old_path, roots)
        self.assertNotIn(new_path, roots)
        for name in untouched:
            self.assertIn(os.path.join(root, name), roots)
956
Charles-François Natali7372b062012-02-05 15:15:38 +0100957
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def walk(self, top, **kwargs):
        # Drop the directory file descriptor so the inherited tests see
        # the same (root, dirs, files) triples as with os.walk().
        for root, dirs, files, root_fd in os.fwalk(top, **kwargs):
            yield (root, dirs, files)

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """
        compare with walk() results.

        Both keyword dicts are copied, then os.walk() and os.fwalk() are
        run for every combination of topdown / follow-symlinks and the
        directory and file sets reported for each root are compared.
        """
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in os.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        kwargs = {'top': support.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        # Open the descriptor *before* entering the try block: if
        # os.open() itself fails, the finally clause must not try to
        # close an unbound name and mask the original error.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': support.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            args = support.TESTFN, topdown, None
            for root, dirs, files, rootfd in os.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in os.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)
1022
class BytesWalkTests(WalkTests):
    """Tests for os.walk() with bytes."""

    def setUp(self):
        super().setUp()
        # Context managers entered here are released again in tearDown().
        self.stack = contextlib.ExitStack()
        if os.name == 'nt':
            self.stack.enter_context(bytes_filename_warn(False))

    def tearDown(self):
        self.stack.close()
        super().tearDown()

    def walk(self, top, **kwargs):
        try:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        except KeyError:
            pass
        for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs):
            root = os.fsdecode(broot)
            dirs = [os.fsdecode(name) for name in bdirs]
            files = [os.fsdecode(name) for name in bfiles]
            yield (root, dirs, files)
            # Copy any in-place changes the caller made to the decoded
            # lists (e.g. pruning) back into the bytes lists os.walk uses.
            bdirs[:] = [os.fsencode(name) for name in dirs]
            bfiles[:] = [os.fsencode(name) for name in files]
1045
Charles-François Natali7372b062012-02-05 15:15:38 +01001046
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs(), run inside a fresh TESTFN directory."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        # Restore the process umask even if one of the checks fails, so a
        # failure here cannot leak a changed umask into other tests.
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            os.makedirs(path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            os.umask(old_mask)

        # Issue #25583: A drive root could raise PermissionError on Windows
        os.makedirs(os.path.abspath('/'), exist_ok=True)

    def test_exist_ok_s_isgid_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(support.TESTFN).st_mode)
            try:
                os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            # May work even when the bit is not already set when demanded.
            os.makedirs(path, mode | S_ISGID, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        # Always remove the regular file again: tearDown() expects
        # 'dir1' to be a directory (or absent) when it calls removedirs().
        try:
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
1128
Andrew Svetlov405faed2012-12-25 12:18:09 +02001129
@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
class ChownFileTests(unittest.TestCase):
    """Tests for os.chown(), all operating on a shared TESTFN directory."""

    @classmethod
    def setUpClass(cls):
        os.mkdir(support.TESTFN)

    @classmethod
    def tearDownClass(cls):
        os.rmdir(support.TESTFN)

    def test_chown_uid_gid_arguments_must_be_index(self):
        target = support.TESTFN
        current = os.stat(target)
        uid, gid = current.st_uid, current.st_gid
        # Non-integer numbers must be rejected for both uid and gid.
        non_index_values = (-1.0, -1j, decimal.Decimal(-1),
                            fractions.Fraction(-2, 2))
        for value in non_index_values:
            self.assertRaises(TypeError, os.chown, target, value, gid)
            self.assertRaises(TypeError, os.chown, target, uid, value)
        self.assertIsNone(os.chown(target, uid, gid))
        self.assertIsNone(os.chown(target, -1, -1))

    @unittest.skipUnless(len(groups) > 1, "test needs more than one group")
    def test_chown(self):
        gid_1, gid_2 = groups[:2]
        uid = os.stat(support.TESTFN).st_uid
        # Switch the group twice and check that each change is applied.
        for wanted_gid in (gid_1, gid_2):
            os.chown(support.TESTFN, uid, wanted_gid)
            self.assertEqual(os.stat(support.TESTFN).st_gid, wanted_gid)

    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
                         "test needs root privilege and more than one user")
    def test_chown_with_root(self):
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        # Switch the owner twice and check that each change is applied.
        for wanted_uid in (uid_1, uid_2):
            os.chown(support.TESTFN, wanted_uid, gid)
            self.assertEqual(os.stat(support.TESTFN).st_uid, wanted_uid)

    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
                         "test needs non-root account and more than one user")
    def test_chown_without_permission(self):
        uid_1, uid_2 = all_users[:2]
        gid = os.stat(support.TESTFN).st_gid
        # Both calls stay inside the context manager: PermissionError
        # must be raised by one of the two ownership changes.
        with self.assertRaises(PermissionError):
            os.chown(support.TESTFN, uid_1, gid)
            os.chown(support.TESTFN, uid_2, gid)
1182
1183
class RemoveDirsTests(unittest.TestCase):
    """Tests for os.removedirs() on a TESTFN/dira/dirb directory chain."""

    def setUp(self):
        os.makedirs(support.TESTFN)

    def tearDown(self):
        support.rmtree(support.TESTFN)

    def _make_nested_dirs(self):
        # Create TESTFN/dira/dirb and return both paths.
        dira = os.path.join(support.TESTFN, 'dira')
        os.mkdir(dira)
        dirb = os.path.join(dira, 'dirb')
        os.mkdir(dirb)
        return dira, dirb

    def test_remove_all(self):
        dira, dirb = self._make_nested_dirs()
        os.removedirs(dirb)
        # Every directory of the (empty) chain is gone.
        for path in (dirb, dira, support.TESTFN):
            self.assertFalse(os.path.exists(path))

    def test_remove_partial(self):
        dira, dirb = self._make_nested_dirs()
        # A file in dira stops the removal right above dirb.
        create_file(os.path.join(dira, 'file.txt'))
        os.removedirs(dirb)
        self.assertFalse(os.path.exists(dirb))
        self.assertTrue(os.path.exists(dira))
        self.assertTrue(os.path.exists(support.TESTFN))

    def test_remove_nothing(self):
        dira, dirb = self._make_nested_dirs()
        # A file in dirb itself makes the very first removal fail.
        create_file(os.path.join(dirb, 'file.txt'))
        with self.assertRaises(OSError):
            os.removedirs(dirb)
        for path in (dirb, dira, support.TESTFN):
            self.assertTrue(os.path.exists(path))
1223
1224
class DevNullTests(unittest.TestCase):
    """Tests for the os.devnull device."""

    def test_devnull(self):
        # Writes to the null device are accepted; the with statement
        # closes the file, so no explicit close() is needed.
        with open(os.devnull, 'wb', 0) as f:
            f.write(b'hello')
        # Reading from the null device yields no data.
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001232
Andrew Svetlov405faed2012-12-25 12:18:09 +02001233
Guido van Rossume7ba4952007-06-06 23:52:48 +00001234class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001235 def test_urandom_length(self):
1236 self.assertEqual(len(os.urandom(0)), 0)
1237 self.assertEqual(len(os.urandom(1)), 1)
1238 self.assertEqual(len(os.urandom(10)), 10)
1239 self.assertEqual(len(os.urandom(100)), 100)
1240 self.assertEqual(len(os.urandom(1000)), 1000)
1241
1242 def test_urandom_value(self):
1243 data1 = os.urandom(16)
1244 data2 = os.urandom(16)
1245 self.assertNotEqual(data1, data2)
1246
1247 def get_urandom_subprocess(self, count):
1248 code = '\n'.join((
1249 'import os, sys',
1250 'data = os.urandom(%s)' % count,
1251 'sys.stdout.buffer.write(data)',
1252 'sys.stdout.buffer.flush()'))
1253 out = assert_python_ok('-c', code)
1254 stdout = out[1]
1255 self.assertEqual(len(stdout), 16)
1256 return stdout
1257
1258 def test_urandom_subprocess(self):
1259 data1 = self.get_urandom_subprocess(16)
1260 data2 = self.get_urandom_subprocess(16)
1261 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001262
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001263
# os.urandom() doesn't use a file descriptor when it is implemented with the
# getentropy() function, the getrandom() function or the getrandom() syscall
OS_URANDOM_DONT_USE_FD = any(
    sysconfig.get_config_var(config_name) == 1
    for config_name in ('HAVE_GETENTROPY',
                        'HAVE_GETRANDOM',
                        'HAVE_GETRANDOM_SYSCALL'))
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001270
@unittest.skipIf(OS_URANDOM_DONT_USE_FD,
                 "os.urandom() does not use a file descriptor")
class URandomFDTests(unittest.TestCase):
    """Tests for os.urandom() when it is backed by a file descriptor.

    Each scenario runs in a child interpreter via assert_python_ok().
    """

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open /dev/urandom.
        # We spawn a new process to make the test more robust (if the
        # file descriptor limit could not be restored after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)

    def test_urandom_fd_closed(self):
        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
        # closed.
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
        # Only the successful exit of the child matters here.
        assert_python_ok('-Sc', code)

    def test_urandom_fd_reopened(self):
        # Issue #21207: urandom() should detect its fd to /dev/urandom
        # changed to something else, and reopen it.
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b"x" * 256)

        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                for fd in range(3, 256):
                    try:
                        os.close(fd)
                    except OSError:
                        pass
                    else:
                        # Found the urandom fd (XXX hopefully)
                        break
                os.closerange(3, 256)
            with open({TESTFN!r}, 'rb') as f:
                os.dup2(f.fileno(), fd)
                sys.stdout.buffer.write(os.urandom(4))
                sys.stdout.buffer.write(os.urandom(4))
            """.format(TESTFN=support.TESTFN)
        # Two reads in one child must differ, and two children must
        # differ from each other.
        rc, out, err = assert_python_ok('-Sc', code)
        self.assertEqual(len(out), 8)
        self.assertNotEqual(out[0:4], out[4:8])
        rc, out2, err2 = assert_python_ok('-Sc', code)
        self.assertEqual(len(out2), 8)
        self.assertNotEqual(out2, out)
1343
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001344
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001345@contextlib.contextmanager
1346def _execvpe_mockup(defpath=None):
1347 """
1348 Stubs out execv and execve functions when used as context manager.
1349 Records exec calls. The mock execv and execve functions always raise an
1350 exception as they would normally never return.
1351 """
1352 # A list of tuples containing (function name, first arg, args)
1353 # of calls to execv or execve that have been made.
1354 calls = []
1355
1356 def mock_execv(name, *args):
1357 calls.append(('execv', name, args))
1358 raise RuntimeError("execv called")
1359
1360 def mock_execve(name, *args):
1361 calls.append(('execve', name, args))
1362 raise OSError(errno.ENOTDIR, "execve called")
1363
1364 try:
1365 orig_execv = os.execv
1366 orig_execve = os.execve
1367 orig_defpath = os.defpath
1368 os.execv = mock_execv
1369 os.execve = mock_execve
1370 if defpath is not None:
1371 os.defpath = defpath
1372 yield calls
1373 finally:
1374 os.execv = orig_execv
1375 os.execve = orig_execve
1376 os.defpath = orig_defpath
1377
Guido van Rossume7ba4952007-06-06 23:52:48 +00001378class ExecTests(unittest.TestCase):
@unittest.skipIf(USING_LINUXTHREADS,
                 "avoid triggering a linuxthreads bug: see issue #4970")
def test_execvpe_with_bad_program(self):
    """os.execvpe() raises OSError for a program that cannot be found."""
    program = 'no such app-'
    with self.assertRaises(OSError):
        os.execvpe(program, [program], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +00001384
Thomas Heller6790d602007-08-30 17:15:14 +00001385 def test_execvpe_with_bad_arglist(self):
1386 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
1387
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001388 @unittest.skipUnless(hasattr(os, '_execvpe'),
1389 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +00001390 def _test_internal_execvpe(self, test_type):
1391 program_path = os.sep + 'absolutepath'
1392 if test_type is bytes:
1393 program = b'executable'
1394 fullpath = os.path.join(os.fsencode(program_path), program)
1395 native_fullpath = fullpath
1396 arguments = [b'progname', 'arg1', 'arg2']
1397 else:
1398 program = 'executable'
1399 arguments = ['progname', 'arg1', 'arg2']
1400 fullpath = os.path.join(program_path, program)
1401 if os.name != "nt":
1402 native_fullpath = os.fsencode(fullpath)
1403 else:
1404 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001405 env = {'spam': 'beans'}
1406
Victor Stinnerb745a742010-05-18 17:17:23 +00001407 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001408 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001409 self.assertRaises(RuntimeError,
1410 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001411 self.assertEqual(len(calls), 1)
1412 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
1413
Victor Stinnerb745a742010-05-18 17:17:23 +00001414 # test os._execvpe() with a relative path:
1415 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001416 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001417 self.assertRaises(OSError,
1418 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001419 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +00001420 self.assertSequenceEqual(calls[0],
1421 ('execve', native_fullpath, (arguments, env)))
1422
1423 # test os._execvpe() with a relative path:
1424 # os.get_exec_path() reads the 'PATH' variable
1425 with _execvpe_mockup() as calls:
1426 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +00001427 if test_type is bytes:
1428 env_path[b'PATH'] = program_path
1429 else:
1430 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +00001431 self.assertRaises(OSError,
1432 os._execvpe, program, arguments, env=env_path)
1433 self.assertEqual(len(calls), 1)
1434 self.assertSequenceEqual(calls[0],
1435 ('execve', native_fullpath, (arguments, env_path)))
1436
1437 def test_internal_execvpe_str(self):
1438 self._test_internal_execvpe(str)
1439 if os.name != "nt":
1440 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001441
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001442
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    """Check that os functions raise OSError for a path that does not exist.

    setUp() enforces the precondition every test relies on: support.TESTFN
    must not exist when the test starts.
    """

    def setUp(self):
        try:
            os.stat(support.TESTFN)
        except FileNotFoundError:
            # Expected: the file is absent.
            pass
        except OSError as exc:
            # Any other stat failure means the state of the path is unknown.
            self.fail("file %s must not exist; os.stat failed with %s"
                      % (support.TESTFN, exc))
        else:
            self.fail("file %s must not exist" % support.TESTFN)

    def test_rename(self):
        self.assertRaises(OSError, os.rename,
                          support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        self.assertRaises(OSError, os.remove, support.TESTFN)

    def test_chdir(self):
        self.assertRaises(OSError, os.chdir, support.TESTFN)

    def test_mkdir(self):
        self.addCleanup(support.unlink, support.TESTFN)

        # Create a regular file ("x" fails if it already exists) and keep it
        # open while checking that mkdir on the same name is refused.
        with open(support.TESTFN, "x"):
            self.assertRaises(OSError, os.mkdir, support.TESTFN)

    def test_utime(self):
        self.assertRaises(OSError, os.utime, support.TESTFN, None)

    def test_chmod(self):
        self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001477
Victor Stinnere77c9742016-03-25 10:28:23 +01001478
class TestInvalidFD(unittest.TestCase):
    """Check how os functions behave when handed a bad file descriptor."""

    # Names of os functions called with the descriptor as their only
    # argument; a test_<name> method is generated for each one below.
    singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
    #singles.append("close")
    # "close" is omitted because it doesn't raise an exception on some
    # platforms.

    def get_single(f):
        # Build a test method for os.<f>; the generated test does nothing
        # when the function is missing from the os module.
        def helper(self):
            if hasattr(os, f):
                self.check(getattr(os, f))
        return helper
    for f in singles:
        locals()["test_"+f] = get_single(f)

    def check(self, f, *args):
        """Call f(bad_fd, *args) and require OSError with errno EBADF."""
        try:
            f(support.make_bad_fd(), *args)
        except OSError as err:
            self.assertEqual(err.errno, errno.EBADF)
            return
        self.fail("%r didn't raise an OSError with a bad file descriptor"
                  % f)

    @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
    def test_isatty(self):
        self.assertEqual(os.isatty(support.make_bad_fd()), False)

    @unittest.skipUnless(hasattr(os, 'closerange'),
                         'test needs os.closerange()')
    def test_closerange(self):
        fd = support.make_bad_fd()
        # Make sure none of the descriptors we are about to close are
        # currently valid (issue 6542): probe upwards from fd and stop at
        # the first descriptor that fstat() accepts.
        for offset in range(10):
            try:
                os.fstat(fd + offset)
            except OSError:
                continue
            break
        if offset < 2:
            self.skipTest(
                "Unable to acquire a range of invalid file descriptors")
        self.assertEqual(os.closerange(fd, fd + offset - 1), None)

    @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
    def test_dup2(self):
        self.check(os.dup2, 20)

    @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
    def test_fchmod(self):
        self.check(os.fchmod, 0)

    @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
    def test_fchown(self):
        self.check(os.fchown, -1, -1)

    @unittest.skipUnless(hasattr(os, 'fpathconf'),
                         'test needs os.fpathconf()')
    def test_fpathconf(self):
        for func in (os.pathconf, os.fpathconf):
            self.check(func, "PC_NAME_MAX")

    @unittest.skipUnless(hasattr(os, 'ftruncate'),
                         'test needs os.ftruncate()')
    def test_ftruncate(self):
        for func in (os.truncate, os.ftruncate):
            self.check(func, 0)

    @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
    def test_lseek(self):
        self.check(os.lseek, 0, 0)

    @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
    def test_read(self):
        self.check(os.read, 1)

    @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
    def test_readv(self):
        self.check(os.readv, [bytearray(10)])

    @unittest.skipUnless(hasattr(os, 'tcsetpgrp'),
                         'test needs os.tcsetpgrp()')
    def test_tcsetpgrpt(self):
        self.check(os.tcsetpgrp, 0)

    @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
    def test_write(self):
        self.check(os.write, b" ")

    @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
    def test_writev(self):
        self.check(os.writev, [b'abc'])

    def test_inheritable(self):
        self.check(os.get_inheritable)
        self.check(os.set_inheritable, True)

    @unittest.skipUnless(hasattr(os, 'get_blocking'),
                         'needs os.get_blocking() and os.set_blocking()')
    def test_blocking(self):
        self.check(os.get_blocking)
        self.check(os.set_blocking, True)
1577
Brian Curtin1b9df392010-11-24 20:24:31 +00001578
class LinkTests(unittest.TestCase):
    """Tests for os.link() with str, bytes and non-ASCII file names."""

    def setUp(self):
        self.file1 = support.TESTFN
        self.file2 = support.TESTFN + "2"

    def tearDown(self):
        # Tests may rebind file1/file2, so clean up whatever they point at.
        for path in (self.file1, self.file2):
            if os.path.exists(path):
                os.unlink(path)

    def _test_link(self, file1, file2):
        """Create *file1*, hard-link it as *file2* and check that both names
        open the same file."""
        create_file(file1)

        with bytes_filename_warn(False):
            os.link(file1, file2)
        with open(file1, "r") as f1, open(file2, "r") as f2:
            self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        # os.fsencode() applies the filesystem encoding together with its
        # error handler, which a plain bytes(name, encoding) call does not.
        self._test_link(os.fsencode(self.file1), os.fsencode(self.file2))

    def test_unicode_name(self):
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            raise unittest.SkipTest("Unable to encode for this platform.")

        # Rebind the instance attributes so tearDown() removes the new names.
        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
1613
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class PosixUidGidTests(unittest.TestCase):
    """Argument and permission checks for the os.set*uid / os.set*gid calls.

    Each test expects OSError when an unprivileged process asks for id 0
    (the gid variants additionally require HAVE_WHEEL_GROUP to be false)
    and OverflowError for the value 1<<32.
    """

    @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
    def test_setuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.setuid(0)
        with self.assertRaises(OverflowError):
            os.setuid(1<<32)

    @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
    def test_setgid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            with self.assertRaises(OSError):
                os.setgid(0)
        with self.assertRaises(OverflowError):
            os.setgid(1<<32)

    @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
    def test_seteuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.seteuid(0)
        with self.assertRaises(OverflowError):
            os.seteuid(1<<32)

    @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
    def test_setegid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            with self.assertRaises(OSError):
                os.setegid(0)
        with self.assertRaises(OverflowError):
            os.setegid(1<<32)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid(self):
        if os.getuid() != 0:
            with self.assertRaises(OSError):
                os.setreuid(0, 0)
        with self.assertRaises(OverflowError):
            os.setreuid(1<<32, 0)
        with self.assertRaises(OverflowError):
            os.setreuid(0, 1<<32)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid_neg1(self):
        # Needs to accept -1.  We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        script = 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'
        subprocess.check_call([sys.executable, '-c', script])

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid(self):
        if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
            with self.assertRaises(OSError):
                os.setregid(0, 0)
        with self.assertRaises(OverflowError):
            os.setregid(1<<32, 0)
        with self.assertRaises(OverflowError):
            os.setregid(0, 1<<32)

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid_neg1(self):
        # Needs to accept -1.  We run this in a subprocess to avoid
        # altering the test runner's process state (issue8045).
        script = 'import os,sys;os.setregid(-1,-1);sys.exit(0)'
        subprocess.check_call([sys.executable, '-c', script])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001669
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class Pep383Tests(unittest.TestCase):
    """Run listdir/open/stat/statvfs on files whose names are taken from
    support.TESTFN_UNICODE, TESTFN_UNENCODABLE and TESTFN_NONASCII."""

    def setUp(self):
        # Pick the first usable name for the scratch directory.
        self.dir = (support.TESTFN_UNENCODABLE
                    or support.TESTFN_NONASCII
                    or support.TESTFN)
        self.bdir = os.fsencode(self.dir)

        # Collect the candidate file names, then keep those that can be
        # encoded with os.fsencode().
        candidates = [support.TESTFN_UNICODE]
        if support.TESTFN_UNENCODABLE:
            candidates.append(support.TESTFN_UNENCODABLE)
        if support.TESTFN_NONASCII:
            candidates.append(support.TESTFN_NONASCII)
        bytesfn = []
        for name in candidates:
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                continue
            bytesfn.append(encoded)
        if not bytesfn:
            self.skipTest("couldn't create any non-ascii filename")

        self.unicodefn = set()
        os.mkdir(self.dir)
        try:
            for encoded in bytesfn:
                support.create_empty_file(os.path.join(self.bdir, encoded))
                decoded = os.fsdecode(encoded)
                if decoded in self.unicodefn:
                    raise ValueError("duplicate filename")
                self.unicodefn.add(decoded)
        except BaseException:
            # tearDown() is not run when setUp() fails, so remove the
            # directory here before propagating the error.
            shutil.rmtree(self.dir)
            raise

    def tearDown(self):
        shutil.rmtree(self.dir)

    def test_listdir(self):
        self.assertEqual(set(os.listdir(self.dir)), self.unicodefn)
        # test listdir without arguments
        saved_cwd = os.getcwd()
        try:
            os.chdir(os.sep)
            self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
        finally:
            os.chdir(saved_cwd)

    def test_open(self):
        # Opening each file must succeed; the content is irrelevant.
        for fn in self.unicodefn:
            with open(os.path.join(self.dir, fn), 'rb'):
                pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs(self):
        # issue #9645
        for fn in self.unicodefn:
            # should not fail with file not found error
            os.statvfs(os.path.join(self.dir, fn))

    def test_stat(self):
        for fn in self.unicodefn:
            fullname = os.path.join(self.dir, fn)
            os.stat(fullname)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001741
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows."""

    def _kill(self, sig):
        """Start a child interpreter, wait until it reports that it is
        running, send it *sig* with os.kill() and check that its exit code
        equals *sig*."""
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll up to max_tries times, 0.1 second apart.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            count += 1
        else:
            # Reached when the loop ends without break: either the tries
            # ran out or the child exited before reporting.
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        """Start win_console_handler.py in a new process group, wait for it
        to flag readiness through a tagged shared-memory byte, send it
        *event* with os.kill() and fail if it is still running afterwards.
        *name* is only used in the failure message."""
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the shared mapping once the test is over.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, max_tries = 0, 100
        while count < max_tries and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            count += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running; an
        # exit status of 0 is falsy too, so test against None explicitly.
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle Ctrl+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1856
1857
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        # Populate support.TESTFN with two sub-directories and two files and
        # remember their base names, sorted, as the expected listing.
        names = []
        for i in range(2):
            dir_name = 'SUB%d' % i
            file_name = 'FILE%d' % i
            file_path = os.path.join(support.TESTFN, file_name)
            os.makedirs(os.path.join(support.TESTFN, dir_name))
            with open(file_path, 'w') as f:
                f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
            names += [dir_name, file_name]
        self.created_paths = sorted(names)

    def tearDown(self):
        shutil.rmtree(support.TESTFN)

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # str path
        listing = sorted(os.listdir(support.TESTFN))
        self.assertEqual(listing, self.created_paths)

        # bytes path
        with bytes_filename_warn(False):
            listing = sorted(os.listdir(os.fsencode(support.TESTFN)))
            expected = [os.fsencode(name) for name in self.created_paths]
            self.assertEqual(listing, expected)

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        # str path
        path = '\\\\?\\' + os.path.abspath(support.TESTFN)
        listing = sorted(os.listdir(path))
        self.assertEqual(listing, self.created_paths)

        # bytes path
        with bytes_filename_warn(False):
            path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
            listing = sorted(os.listdir(path))
            expected = [os.fsencode(name) for name in self.created_paths]
            self.assertEqual(listing, expected)
Tim Golden781bbeb2013-10-25 20:24:06 +01001906
1907
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    """Tests for os.symlink() and related calls on Windows.

    Links are created under fixed relative names; the file link targets
    this test file and the directory link targets its directory.
    """
    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Preconditions: the targets exist and none of the link names do.
        for present in (self.dirlink_target, self.filelink_target):
            assert os.path.exists(present)
        for absent in (self.dirlink, self.filelink, self.missing_link):
            assert not os.path.exists(absent)

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists() is used here because this link's target does not exist.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        os.symlink(self.dirlink_target, self.dirlink)
        for predicate in (os.path.exists, os.path.isdir, os.path.islink):
            self.assertTrue(predicate(self.dirlink))
        self.check_stat(self.dirlink, self.dirlink_target)

    def test_file_link(self):
        os.symlink(self.filelink_target, self.filelink)
        for predicate in (os.path.exists, os.path.isfile, os.path.islink):
            self.assertTrue(predicate(self.filelink))
        self.check_stat(self.filelink, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        # Start from a clean slate if an earlier link is still around.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)
        target = r'c:\\target does not exist.29r3c740'
        assert not os.path.exists(target)
        # The third positional argument is target_is_directory.
        os.symlink(target, self.missing_link, True)

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        """Check that stat() on *link* matches stat() on *target* and differs
        from lstat() on the link, for both the str and the bytes path."""
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        encoded_link = os.fsencode(link)
        with bytes_filename_warn(True):
            self.assertEqual(os.stat(encoded_link), os.stat(target))
        with bytes_filename_warn(True):
            self.assertNotEqual(os.lstat(encoded_link),
                                os.stat(encoded_link))

    def test_12084(self):
        # Layout: level1/file1, level1/level2/link -> relative path to file1,
        # and level1/level2/level3.
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        self.addCleanup(support.rmtree, level1)

        for directory in (level1, level2, level3):
            os.mkdir(directory)

        file1 = os.path.abspath(os.path.join(level1, "file1"))
        create_file(file1)

        start_dir = os.getcwd()
        try:
            os.chdir(level2)
            link = os.path.join(level2, "link")
            os.symlink(os.path.relpath(file1), "link")
            self.assertIn("link", os.listdir(os.getcwd()))

            # Check os.stat calls from the same dir as the link
            self.assertEqual(os.stat(file1), os.stat("link"))

            # Check os.stat calls from level1, then from level3, each time
            # reaching the link through a path relative to that directory.
            for cwd in (level1, level3):
                os.chdir(cwd)
                self.assertEqual(os.stat(file1),
                                 os.stat(os.path.relpath(link)))
        finally:
            os.chdir(start_dir)
Brian Curtind25aef52011-06-13 15:16:04 -05002019
Brian Curtind40e6f72010-07-08 21:39:08 +00002020
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32JunctionTests(unittest.TestCase):
    """Tests for directory junctions created with _winapi.CreateJunction()."""

    # Name of the junction created by each test (relative to the cwd).
    junction = 'junctiontest'
    # Directory the junction points at: the directory holding this file.
    junction_target = os.path.dirname(os.path.abspath(__file__))

    def setUp(self):
        # Use unittest assertions rather than bare ``assert`` so that the
        # preconditions are still checked when Python runs with -O.
        self.assertTrue(os.path.exists(self.junction_target))
        self.assertFalse(os.path.exists(self.junction))

    def tearDown(self):
        if os.path.exists(self.junction):
            # os.rmdir delegates to Windows' RemoveDirectoryW,
            # which removes junction points safely.
            os.rmdir(self.junction)

    def test_create_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))
        self.assertTrue(os.path.isdir(self.junction))

        # Junctions are not recognized as links.
        self.assertFalse(os.path.islink(self.junction))

    def test_unlink_removes_junction(self):
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))

        os.unlink(self.junction)
        self.assertFalse(os.path.exists(self.junction))
2050
2051
@support.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):
    """Check that a relative symlink target is resolved from the link."""

    def setUp(self):
        # Raw docstring: the backslash in the diagram below would otherwise
        # form an invalid escape sequence.
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        os.makedirs('base/some_dir')

    def tearDown(self):
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Link base/some_link -> some_dir (i.e. base/some_dir) and ensure that
        some_link is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        os.symlink('some_dir', src)
        # assertTrue (not a bare assert) so the check survives python -O.
        self.assertTrue(os.path.isdir(src))
2082
2083
class FSEncodingTests(unittest.TestCase):
    """Checks for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # bytes given to fsencode() and str given to fsdecode() must come
        # back unchanged.
        raw_name = b'abc\xff'
        text_name = 'abc\u0141'
        self.assertEqual(os.fsencode(raw_name), raw_name)
        self.assertEqual(os.fsdecode(text_name), text_name)

    def test_identity(self):
        # fsdecode(fsencode(x)) must give x back for every name that the
        # filesystem encoding is able to encode.
        candidates = ('unicode\u0141', 'latin\xe9', 'ascii')
        for name in candidates:
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                # Name cannot be encoded here; nothing to round-trip.
                continue
            self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00002097
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002098
Brett Cannonefb00c02012-02-29 18:31:31 -05002099
class DeviceEncodingTests(unittest.TestCase):
    """Checks for os.device_encoding()."""

    def test_bad_fd(self):
        # A file descriptor that does not exist must yield None.
        missing_fd = 123456
        self.assertIsNone(os.device_encoding(missing_fd))

    @unittest.skipUnless(
        os.isatty(0) and (
            sys.platform.startswith('win') or
            (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))
        ),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # On a tty the encoding must be reported and be a codec name that
        # codecs.lookup() accepts.
        tty_encoding = os.device_encoding(0)
        self.assertIsNotNone(tty_encoding)
        self.assertTrue(codecs.lookup(tty_encoding))
2113
2114
class PidTests(unittest.TestCase):
    """Checks for os.getppid() and os.waitpid() using child interpreters."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        command = [sys.executable, '-c', 'import os; print(os.getppid())']
        child = subprocess.Popen(command, stdout=subprocess.PIPE)
        output, _ = child.communicate()
        # The child was started by this process, so the parent pid it
        # prints must be our own pid.
        self.assertEqual(int(output), os.getpid())

    def test_waitpid(self):
        args = [sys.executable, '-c', 'pass']
        pid = os.spawnv(os.P_NOWAIT, args[0], args)
        # The child runs "pass", so waitpid() reports (pid, 0).
        self.assertEqual(os.waitpid(pid, 0), (pid, 0))
2130
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002131
# When this TestCase was introduced it triggered at least two different
# errors on *nix buildbots, so it is skipped unconditionally for the time
# being to let the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Check for os.getlogin() (currently always skipped)."""

    def test_getlogin(self):
        # getlogin() must return a non-empty name.
        login = os.getlogin()
        self.assertNotEqual(len(login), 0)
2140
2141
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        # Raise our own priority value by one and read it back.
        base = os.getpriority(os.PRIO_PROCESS, os.getpid())
        os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
        try:
            new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
            if base >= 19 and new_prio <= 19:
                # Already at nice level 19 or above: the increment cannot
                # be observed reliably, so give up instead of failing.
                self.skipTest("unable to reliably test setpriority "
                              "at current nice level of %s" % base)
            self.assertEqual(new_prio, base + 1)
        finally:
            # Try to put the original priority back; EACCES from this
            # call is tolerated, any other error is propagated.
            try:
                os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
            except OSError as err:
                if err.errno != errno.EACCES:
                    raise
2164
2165
if threading is not None:
    class SendfileTestServer(asyncore.dispatcher, threading.Thread):
        """Listening socket served by an asyncore loop in its own thread.

        Each accepted connection is wrapped in a Handler, which records
        every byte it receives so that tests can compare it afterwards.
        """

        class Handler(asynchat.async_chat):
            """Per-connection channel that buffers all incoming data."""

            def __init__(self, conn):
                asynchat.async_chat.__init__(self, conn)
                self.in_buffer = []
                self.closed = False
                # Greeting the client reads to synchronize with the server.
                self.push(b"220 ready\r\n")

            def handle_read(self):
                data = self.recv(4096)
                self.in_buffer.append(data)

            def get_data(self):
                """Return everything received so far as one bytes object."""
                return b''.join(self.in_buffer)

            def handle_close(self):
                self.close()
                self.closed = True

            def handle_error(self):
                # Re-raise the exception currently being handled.
                raise

        def __init__(self, address):
            threading.Thread.__init__(self)
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.bind(address)
            self.listen(5)
            # Actual bound address (the caller may have asked for port 0).
            self.host, self.port = self.socket.getsockname()[:2]
            self.handler_instance = None
            self._active = False
            self._active_lock = threading.Lock()

        # --- public API

        @property
        def running(self):
            """True between the start of run() and the call to stop()."""
            return self._active

        def start(self):
            """Start the thread and block until run() has begun."""
            assert not self.running
            self.__flag = threading.Event()
            threading.Thread.start(self)
            self.__flag.wait()

        def stop(self):
            """Ask the loop in run() to exit and wait for the thread."""
            assert self.running
            self._active = False
            self.join()

        def wait(self):
            # wait for handler connection to be closed, then stop the server
            while not getattr(self.handler_instance, "closed", False):
                time.sleep(0.001)
            self.stop()

        # --- internals

        def run(self):
            self._active = True
            self.__flag.set()
            while self._active and asyncore.socket_map:
                # "with" guarantees the lock is released even when the
                # asyncore loop raises (handle_error re-raises errors).
                with self._active_lock:
                    asyncore.loop(timeout=0.001, count=1)
            asyncore.close_all()

        def handle_accept(self):
            conn, addr = self.accept()
            self.handler_instance = self.Handler(conn)

        def handle_connect(self):
            self.close()
        handle_read = handle_connect

        def writable(self):
            return 0

        def handle_error(self):
            # Re-raise the exception currently being handled.
            raise
2249
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002250
@unittest.skipUnless(threading is not None, "test needs threading module")
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    """Tests for os.sendfile() against a local SendfileTestServer.

    setUpClass() writes DATA to support.TESTFN; each test sends (part of)
    that file over a client socket and compares what the server received.
    """

    DATA = b"12345abcde" * 16 * 1024  # 160 KB
    # The headers/trailers arguments are not exercised on these platforms.
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith(
        ("linux", "solaris", "sunos"))
    requires_headers_trailers = unittest.skipUnless(
        SUPPORT_HEADERS_TRAILERS, 'requires headers and trailers support')

    @classmethod
    def setUpClass(cls):
        cls.key = support.threading_setup()
        create_file(support.TESTFN, cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.threading_cleanup(*cls.key)
        support.unlink(support.TESTFN)

    def setUp(self):
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        self.client = socket.socket()
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        self.file.close()
        self.client.close()
        if self.server.running:
            self.server.stop()

    def sendfile_wrapper(self, sock, file, offset, nbytes, headers=(), trailers=()):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        Retries while os.sendfile() fails with EAGAIN or EBUSY; any other
        OSError (including ECONNRESET) is propagated.  Returns the number
        of bytes reported by os.sendfile().  The defaults for *headers*
        and *trailers* are immutable tuples so no list object is shared
        between calls.
        """
        while True:
            try:
                if self.SUPPORT_HEADERS_TRAILERS:
                    return os.sendfile(sock, file, offset, nbytes, headers,
                                       trailers)
                return os.sendfile(sock, file, offset, nbytes)
            except OSError as err:
                if err.errno not in (errno.EAGAIN, errno.EBUSY):
                    # disconnected, or any other unexpected failure
                    raise
                # transient condition: we have to retry sending the data

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        # a negative offset must be rejected with EINVAL
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    def test_keywords(self):
        # Keyword arguments should be supported
        # ("in" is a Python keyword, hence the ** unpacking)
        os.sendfile(out=self.sockno, offset=0, count=4096,
                    **{'in': self.fileno})
        if self.SUPPORT_HEADERS_TRAILERS:
            os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
                        headers=(), trailers=(), flags=0)

    # --- headers / trailers tests

    @requires_headers_trailers
    def test_headers(self):
        total_sent = 0
        sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                           headers=[b"x" * 512])
        total_sent += sent
        offset = 4096
        nbytes = 4096
        while True:
            sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                         offset, nbytes)
            if sent == 0:
                break
            total_sent += sent
            offset += sent

        expected_data = b"x" * 512 + self.DATA
        self.assertEqual(total_sent, len(expected_data))
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(hash(data), hash(expected_data))

    @requires_headers_trailers
    def test_trailers(self):
        TESTFN2 = support.TESTFN + "2"
        file_data = b"abcdef"

        self.addCleanup(support.unlink, TESTFN2)
        create_file(TESTFN2, file_data)

        with open(TESTFN2, 'rb') as f:
            os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
                        trailers=[b"1234"])
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            self.assertEqual(data, b"abcdef1234")

    @requires_headers_trailers
    @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
                         'test needs os.SF_NODISKIO')
    def test_flags(self):
        # EBUSY and EAGAIN are accepted outcomes with SF_NODISKIO
        try:
            os.sendfile(self.sockno, self.fileno, 0, 4096,
                        flags=os.SF_NODISKIO)
        except OSError as err:
            if err.errno not in (errno.EBUSY, errno.EAGAIN):
                raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002437
2438
def supports_extended_attributes():
    """Return True if a "user.test" xattr can be set on a fresh file.

    Returns False when os.setxattr is missing or when setting the
    attribute on a newly created support.TESTFN raises OSError.  The
    probe file is removed in every case.
    """
    if not hasattr(os, "setxattr"):
        return False

    usable = True
    try:
        # "x" mode: the probe file must not exist beforehand.
        with open(support.TESTFN, "xb", 0) as probe:
            try:
                os.setxattr(probe.fileno(), b"user.test", b"")
            except OSError:
                usable = False
    finally:
        support.unlink(support.TESTFN)

    return usable
Larry Hastings9cf065c2012-06-22 16:30:09 -07002453
2454
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
# Kernels < 2.6.39 don't respect setxattr flags.
@support.requires_linux_version(2, 6, 39)
class ExtendedAttributeTests(unittest.TestCase):
    """Tests for os.getxattr/setxattr/removexattr/listxattr."""

    def _assert_oserror(self, expected_errno, func, *args, **kwargs):
        # Call func(*args, **kwargs); it must raise OSError with the
        # given errno.
        with self.assertRaises(OSError) as caught:
            func(*args, **kwargs)
        self.assertEqual(caught.exception.errno, expected_errno)

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        # Run the xattr scenario on support.TESTFN.  *s* converts the
        # attribute names (str or os.fsencode); the four callables are
        # the functions under test; **kwargs is forwarded to getxattr,
        # setxattr and removexattr.
        path = support.TESTFN
        self.addCleanup(support.unlink, path)
        create_file(path)

        # A fresh file has no "user.test" attribute.
        self._assert_oserror(errno.ENODATA,
                             getxattr, path, s("user.test"), **kwargs)

        initial = listxattr(path)
        self.assertIsInstance(initial, list)

        # Create the attribute empty, then replace its value.
        setxattr(path, s("user.test"), b"", **kwargs)
        expected = set(initial)
        expected.add("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"")
        setxattr(path, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(path, b"user.test", **kwargs), b"hello")

        # XATTR_CREATE on an existing attribute must fail ...
        self._assert_oserror(errno.EEXIST,
                             setxattr, path, s("user.test"), b"bye",
                             os.XATTR_CREATE, **kwargs)

        # ... and so must XATTR_REPLACE on a missing one.
        self._assert_oserror(errno.ENODATA,
                             setxattr, path, s("user.test2"), b"bye",
                             os.XATTR_REPLACE, **kwargs)

        setxattr(path, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
        expected.add("user.test2")
        self.assertEqual(set(listxattr(path)), expected)
        removexattr(path, s("user.test"), **kwargs)

        # The removed attribute can no longer be read.
        self._assert_oserror(errno.ENODATA,
                             getxattr, path, s("user.test"), **kwargs)

        expected.remove("user.test")
        self.assertEqual(set(listxattr(path)), expected)
        self.assertEqual(getxattr(path, s("user.test2"), **kwargs), b"foo")
        setxattr(path, s("user.test"), b"a"*1024, **kwargs)
        self.assertEqual(getxattr(path, s("user.test"), **kwargs), b"a"*1024)
        removexattr(path, s("user.test"), **kwargs)

        # Finally set one hundred attributes and list them all.
        names = sorted("user.test{}".format(i) for i in range(100))
        for name in names:
            setxattr(path, name, b"x", **kwargs)
        self.assertEqual(set(listxattr(path)), set(initial) | set(names))

    def _check_xattrs(self, *args, **kwargs):
        # Run the scenario with str names, then with fsencoded names,
        # deleting the test file after each run.
        for convert in (str, os.fsencode):
            self._check_xattrs_str(convert, *args, **kwargs)
            support.unlink(support.TESTFN)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        # Wrappers that open the path and call the os function on the
        # resulting file descriptor instead of on the path.
        def fd_getxattr(path, *args):
            with open(path, "rb") as fp:
                return os.getxattr(fp.fileno(), *args)

        def fd_setxattr(path, *args):
            with open(path, "wb", 0) as fp:
                os.setxattr(fp.fileno(), *args)

        def fd_removexattr(path, *args):
            with open(path, "wb", 0) as fp:
                os.removexattr(fp.fileno(), *args)

        def fd_listxattr(path, *args):
            with open(path, "rb") as fp:
                return os.listxattr(fp.fileno(), *args)

        self._check_xattrs(fd_getxattr, fd_setxattr, fd_removexattr,
                           fd_listxattr)
2538
2539
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32DeprecatedBytesAPI(unittest.TestCase):
    """Call os/nt functions with bytes filenames under bytes_filename_warn."""

    def test_deprecated(self):
        import nt
        filename = os.fsencode(support.TESTFN)
        # Each entry is a callable followed by its positional arguments.
        calls = (
            (nt._getfullpathname, filename),
            (nt._isdir, filename),
            (os.access, filename, os.R_OK),
            (os.chdir, filename),
            (os.chmod, filename, 0o777),
            (os.getcwdb,),
            (os.link, filename, filename),
            (os.listdir, filename),
            (os.lstat, filename),
            (os.mkdir, filename),
            (os.open, filename, os.O_RDONLY),
            (os.rename, filename, filename),
            (os.rmdir, filename),
            (os.startfile, filename),
            (os.stat, filename),
            (os.unlink, filename),
            (os.utime, filename),
        )
        for func, *args in calls:
            # ignore OSError, we only care about DeprecationWarning
            with bytes_filename_warn(True), contextlib.suppress(OSError):
                func(*args)

    @support.skip_unless_symlink
    def test_symlink(self):
        self.addCleanup(support.unlink, support.TESTFN)

        filename = os.fsencode(support.TESTFN)
        with bytes_filename_warn(True):
            os.symlink(filename, filename)
Victor Stinner28216442011-11-16 00:34:44 +01002578
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002579
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    """Tests for os.get_terminal_size()."""

    def _get_terminal_size_or_skip(self, *args):
        """Return os.get_terminal_size(*args), skipping when it is unavailable.

        The test is skipped if the call raises OSError on win32, or with
        errno EINVAL or ENOTTY elsewhere; any other OSError is re-raised.
        """
        try:
            return os.get_terminal_size(*args)
        except OSError as e:
            if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
                # Under win32 a generic OSError can be thrown if the
                # handle cannot be retrieved
                self.skipTest("failed to query terminal size")
            raise

    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        size = self._get_terminal_size_or_skip()

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly.  If stty succeeded, then get_terminal_size()
        should work too.
        """
        try:
            # stderr is discarded so that stty's complaint about a
            # non-terminal stdin does not pollute the test output.
            size = subprocess.check_output(
                ['stty', 'size'], stderr=subprocess.DEVNULL).decode().split()
        except (FileNotFoundError, subprocess.CalledProcessError,
                PermissionError):
            # stty is missing, not executable, or exited with an error.
            self.skipTest("stty invocation failed")
        expected = (int(size[1]), int(size[0]))  # reversed order

        actual = self._get_terminal_size_or_skip(sys.__stdin__.fileno())
        self.assertEqual(expected, actual)
2622
2623
class OSErrorTests(unittest.TestCase):
    """Check that OSError.filename is the very object passed by the caller."""

    def setUp(self):
        class Str(str):
            pass

        # Text names: a plain str and a str subclass instance.
        decoded = support.TESTFN_UNENCODABLE
        if decoded is None:
            decoded = support.TESTFN
        self.unicode_filenames = [decoded, Str(decoded)]

        # Bytes-like names: bytes, bytearray and memoryview.
        encoded = support.TESTFN_UNDECODABLE
        if encoded is None:
            encoded = os.fsencode(support.TESTFN)
        self.bytes_filenames = [encoded, bytearray(encoded),
                                memoryview(encoded)]

        self.filenames = self.bytes_filenames + self.unicode_filenames

    def _call_with_name(self, func, name, extra_args):
        # Invoke func(name, *extra_args) under the context matching the
        # type of *name*: nothing for str, bytes_filename_warn(False) for
        # bytes, and an expected DeprecationWarning for any other type.
        if isinstance(name, str):
            func(name, *extra_args)
        elif isinstance(name, bytes):
            with bytes_filename_warn(False):
                func(name, *extra_args)
        else:
            with self.assertWarnsRegex(DeprecationWarning, 'should be'):
                func(name, *extra_args)

    def test_oserror_filename(self):
        every_name = self.filenames
        bytes_names = self.bytes_filenames
        text_names = self.unicode_filenames
        on_windows = sys.platform == "win32"

        # Each entry: (names to try, function, *extra positional args).
        funcs = [
            (every_name, os.chdir,),
            (every_name, os.chmod, 0o777),
            (every_name, os.lstat,),
            (every_name, os.open, os.O_RDONLY),
            (every_name, os.rmdir,),
            (every_name, os.stat,),
            (every_name, os.unlink,),
        ]
        if on_windows:
            funcs += [
                (bytes_names, os.rename, b"dst"),
                (bytes_names, os.replace, b"dst"),
                (text_names, os.rename, "dst"),
                (text_names, os.replace, "dst"),
                # Issue #16414: Don't test undecodable names with listdir()
                # because of a Windows bug.
                #
                # With the ANSI code page 932, os.listdir(b'\xe7') return an
                # empty list (instead of failing), whereas os.listdir(b'\xff')
                # raises a FileNotFoundError. It looks like a Windows bug:
                # b'\xe7' directory does not exist, FindFirstFileA(b'\xe7')
                # fails with ERROR_FILE_NOT_FOUND (2), instead of
                # ERROR_PATH_NOT_FOUND (3).
                (text_names, os.listdir,),
            ]
        else:
            funcs += [
                (every_name, os.listdir,),
                (every_name, os.rename, "dst"),
                (every_name, os.replace, "dst"),
            ]

        # Optional functions that accept every kind of name.
        optional = (
            ("chown", (0, 0)),
            ("lchown", (0, 0)),
            ("truncate", (0,)),
            ("chflags", (0,)),
            ("lchflags", (0,)),
            ("chroot", ()),
        )
        for attr, extra in optional:
            if hasattr(os, attr):
                funcs.append((every_name, getattr(os, attr)) + extra)

        if hasattr(os, "link"):
            if on_windows:
                funcs.append((bytes_names, os.link, b"dst"))
                funcs.append((text_names, os.link, "dst"))
            else:
                funcs.append((every_name, os.link, "dst"))
        if hasattr(os, "listxattr"):
            funcs += [
                (every_name, os.listxattr,),
                (every_name, os.getxattr, "user.test"),
                (every_name, os.setxattr, "user.test", b'user'),
                (every_name, os.removexattr, "user.test"),
            ]
        if hasattr(os, "lchmod"):
            funcs.append((every_name, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            if on_windows:
                funcs.append((text_names, os.readlink,))
            else:
                funcs.append((every_name, os.readlink,))

        for names, func, *func_args in funcs:
            for name in names:
                try:
                    self._call_with_name(func, name, func_args)
                except OSError as err:
                    # Identity, not equality: the same object must be
                    # stored on the exception.
                    self.assertIs(err.filename, name)
                else:
                    self.fail("No exception thrown by {}".format(func))
2728
class CPUCountTests(unittest.TestCase):
    """Sanity check for os.cpu_count()."""

    def test_cpu_count(self):
        # os.cpu_count() may legitimately return None when the number of
        # CPUs cannot be determined; there is nothing to verify in that case.
        count = os.cpu_count()
        if count is None:
            self.skipTest("Could not determine the number of CPUs")

        # Otherwise the result must be a strictly positive integer.
        self.assertIsInstance(count, int)
        self.assertGreater(count, 0)
2737
Victor Stinnerdaf45552013-08-28 00:53:59 +02002738
class FDInheritanceTests(unittest.TestCase):
    """Check the "inheritable" flag of file descriptors created through os.

    Descriptors returned by os.open(), os.pipe(), os.dup() and os.openpty()
    are expected to be non-inheritable; os.dup2() is expected to produce an
    inheritable descriptor unless inheritable=False is passed.
    """

    def _open_this_file(self):
        """Open this source file read-only and close the fd on cleanup."""
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        return fd

    @staticmethod
    def _cloexec_bit(fd):
        """Return the FD_CLOEXEC bit of *fd* as reported by fcntl()."""
        return fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC

    def test_get_set_inheritable(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

        os.set_inheritable(fd, True)
        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

        # clear FD_CLOEXEC flag behind the back of the os module
        current = fcntl.fcntl(fd, fcntl.F_GETFD)
        fcntl.fcntl(fd, fcntl.F_SETFD, current & ~fcntl.FD_CLOEXEC)

        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        fd = self._open_this_file()
        self.assertEqual(self._cloexec_bit(fd), fcntl.FD_CLOEXEC)

        os.set_inheritable(fd, True)
        self.assertEqual(self._cloexec_bit(fd), 0)

    def test_open(self):
        fd = self._open_this_file()
        self.assertEqual(os.get_inheritable(fd), False)

    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
    def test_pipe(self):
        read_end, write_end = os.pipe()
        self.addCleanup(os.close, read_end)
        self.addCleanup(os.close, write_end)
        self.assertEqual(os.get_inheritable(read_end), False)
        self.assertEqual(os.get_inheritable(write_end), False)

    def test_dup(self):
        original = self._open_this_file()

        duplicate = os.dup(original)
        self.addCleanup(os.close, duplicate)
        self.assertEqual(os.get_inheritable(duplicate), False)

    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
    def test_dup2(self):
        source = self._open_this_file()

        # (extra keyword arguments for dup2(), expected inheritable flag)
        cases = (
            ({}, True),                         # inheritable by default
            ({'inheritable': False}, False),    # force non-inheritable
        )
        for kwargs, expected in cases:
            # Each case gets its own throw-away target descriptor, which is
            # closed right away rather than at test cleanup.
            target = os.open(__file__, os.O_RDONLY)
            try:
                os.dup2(source, target, **kwargs)
                self.assertEqual(os.get_inheritable(target), expected)
            finally:
                os.close(target)

    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
    def test_openpty(self):
        master_fd, slave_fd = os.openpty()
        for fd in (master_fd, slave_fd):
            self.addCleanup(os.close, fd)
        self.assertEqual(os.get_inheritable(master_fd), False)
        self.assertEqual(os.get_inheritable(slave_fd), False)
2821
2822
class PathTConverterTests(unittest.TestCase):
    """Check which kinds of path argument a selection of os functions accept.

    Each listed function is called with str, bytes and path-like arguments,
    with a path-like object whose __fspath__() returns an int (must be
    rejected), and with a raw file descriptor (accepted or rejected depending
    on the function).
    """

    # tuples of (function name, allows fd arguments, additional arguments to
    # function, cleanup function)
    functions = [
        ('stat', True, (), None),
        ('lstat', False, (), None),
        ('access', True, (os.F_OK,), None),
        ('chflags', False, (0,), None),
        ('lchflags', False, (0,), None),
        ('open', False, (0,), getattr(os, 'close', None)),
    ]

    def test_path_t_converter(self):
        class PathLike:
            # Minimal path-like object returning whatever it was given.
            def __init__(self, path):
                self.path = path

            def __fspath__(self):
                return self.path

        str_filename = support.TESTFN
        bytes_filename = support.TESTFN.encode('ascii')
        fd = os.open(PathLike(str_filename), os.O_WRONLY|os.O_CREAT)
        # Cleanups run in reverse order of registration.  Register the unlink
        # first so that the descriptor is closed *before* the file is
        # removed: removing a file that is still open fails on Windows.
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(os.close, fd)

        int_fspath = PathLike(fd)
        str_fspath = PathLike(str_filename)
        bytes_fspath = PathLike(bytes_filename)

        for name, allow_fd, extra_args, cleanup_fn in self.functions:
            with self.subTest(name=name):
                try:
                    fn = getattr(os, name)
                except AttributeError:
                    # Function not available on this platform.
                    continue

                # Every genuine path flavour must be accepted.
                for path in (str_filename, bytes_filename, str_fspath,
                             bytes_fspath):
                    with self.subTest(name=name, path=path):
                        result = fn(path, *extra_args)
                        if cleanup_fn is not None:
                            cleanup_fn(result)

                # __fspath__() returning an int is never a valid path, even
                # for functions that accept a file descriptor directly.
                with self.assertRaisesRegex(
                        TypeError, 'should be string, bytes'):
                    fn(int_fspath, *extra_args)

                if allow_fd:
                    result = fn(fd, *extra_args)  # should not fail
                    if cleanup_fn is not None:
                        cleanup_fn(result)
                else:
                    with self.assertRaisesRegex(
                            TypeError,
                            'os.PathLike'):
                        fn(fd, *extra_args)
2880
2881
@unittest.skipUnless(hasattr(os, 'get_blocking'),
                     'needs os.get_blocking() and os.set_blocking()')
class BlockingTests(unittest.TestCase):
    """Round-trip test for os.get_blocking() / os.set_blocking()."""

    def test_blocking(self):
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)

        # A freshly opened descriptor is expected to be in blocking mode.
        self.assertEqual(os.get_blocking(fd), True)

        # Switch to non-blocking and back again, reading the mode back after
        # each change.
        for mode in (False, True):
            os.set_blocking(fd, mode)
            self.assertEqual(os.get_blocking(fd), mode)
2895
2896
Yury Selivanov97e2e062014-09-26 12:33:06 -04002897
class ExportsTests(unittest.TestCase):
    """Spot-check the contents of os.__all__."""

    def test_os_all(self):
        exported = os.__all__
        for name in ('open', 'walk'):
            self.assertIn(name, exported)
2902
2903
class TestScandir(unittest.TestCase):
    """Tests for os.scandir() and the os.DirEntry objects it yields.

    Every test runs inside a fresh scratch directory (self.path, also
    available as bytes in self.bytes_path) that is removed on cleanup.
    """

    check_no_resource_warning = support.check_no_resource_warning

    def setUp(self):
        self.path = os.path.realpath(support.TESTFN)
        self.bytes_path = os.fsencode(self.path)
        self.addCleanup(support.rmtree, self.path)
        os.mkdir(self.path)

    def create_file(self, name="file.txt"):
        """Create *name* in the scratch directory and return its full path.

        The file is written through the module-level create_file() helper.
        The returned path is bytes when *name* is bytes, str otherwise.
        """
        path = self.bytes_path if isinstance(name, bytes) else self.path
        filename = os.path.join(path, name)
        create_file(filename, b'python')
        return filename

    def get_entries(self, names):
        """Scan the scratch directory and return a {name: DirEntry} dict.

        The test fails unless the sorted entry names are equal to *names*.
        """
        entries = {entry.name: entry for entry in os.scandir(self.path)}
        self.assertEqual(sorted(entries), names)
        return entries

    def assert_stat_equal(self, stat1, stat2, skip_fields):
        """Assert that two stat results are equal.

        When *skip_fields* is true the comparison is done attribute by
        attribute on the st_* fields, leaving out st_dev, st_ino and
        st_nlink; otherwise the two objects are compared as a whole.
        """
        if skip_fields:
            for attr in dir(stat1):
                if not attr.startswith("st_"):
                    continue
                if attr in ("st_dev", "st_ino", "st_nlink"):
                    continue
                self.assertEqual(getattr(stat1, attr),
                                 getattr(stat2, attr),
                                 (stat1, stat2, attr))
        else:
            self.assertEqual(stat1, stat2)

    def check_entry(self, entry, name, is_dir, is_file, is_symlink):
        """Check a DirEntry against the caller's expectations and os.stat().

        *is_dir* and *is_file* are the expected results of entry.is_dir()
        and entry.is_file() (both following symlinks); *is_symlink* is the
        expected result of entry.is_symlink().
        """
        self.assertIsInstance(entry, os.DirEntry)
        self.assertEqual(entry.name, name)
        self.assertEqual(entry.path, os.path.join(self.path, name))
        self.assertEqual(entry.inode(),
                         os.stat(entry.path, follow_symlinks=False).st_ino)

        # Verify the values the caller said to expect; without these checks
        # the is_dir/is_file arguments would be silently ignored.
        self.assertEqual(entry.is_dir(), is_dir)
        self.assertEqual(entry.is_file(), is_file)
        self.assertEqual(entry.is_symlink(), is_symlink)

        # Cross-check against a regular stat() call (following symlinks).
        entry_stat = os.stat(entry.path)
        self.assertEqual(entry.is_dir(),
                         stat.S_ISDIR(entry_stat.st_mode))
        self.assertEqual(entry.is_file(),
                         stat.S_ISREG(entry_stat.st_mode))
        self.assertEqual(entry.is_symlink(),
                         os.path.islink(entry.path))

        # Same cross-check without following symlinks.
        entry_lstat = os.stat(entry.path, follow_symlinks=False)
        self.assertEqual(entry.is_dir(follow_symlinks=False),
                         stat.S_ISDIR(entry_lstat.st_mode))
        self.assertEqual(entry.is_file(follow_symlinks=False),
                         stat.S_ISREG(entry_lstat.st_mode))

        # On Windows some fields are left out of the comparison (see
        # assert_stat_equal()), except when stat() has to follow a symlink.
        self.assert_stat_equal(entry.stat(),
                               entry_stat,
                               os.name == 'nt' and not is_symlink)
        self.assert_stat_equal(entry.stat(follow_symlinks=False),
                               entry_lstat,
                               os.name == 'nt')

    def test_attributes(self):
        link = hasattr(os, 'link')
        symlink = support.can_symlink()

        dirname = os.path.join(self.path, "dir")
        os.mkdir(dirname)
        filename = self.create_file("file.txt")
        if link:
            os.link(filename, os.path.join(self.path, "link_file.txt"))
        if symlink:
            os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
                       target_is_directory=True)
            os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))

        # Must stay in sorted order: get_entries() compares against sorted
        # entry names.
        names = ['dir', 'file.txt']
        if link:
            names.append('link_file.txt')
        if symlink:
            names.extend(('symlink_dir', 'symlink_file.txt'))
        entries = self.get_entries(names)

        entry = entries['dir']
        self.check_entry(entry, 'dir', True, False, False)

        entry = entries['file.txt']
        self.check_entry(entry, 'file.txt', False, True, False)

        if link:
            entry = entries['link_file.txt']
            self.check_entry(entry, 'link_file.txt', False, True, False)

        if symlink:
            entry = entries['symlink_dir']
            self.check_entry(entry, 'symlink_dir', True, False, True)

            entry = entries['symlink_file.txt']
            self.check_entry(entry, 'symlink_file.txt', False, True, True)

    def get_entry(self, name):
        """Return the single DirEntry of the scratch directory.

        The directory is scanned with a bytes path when *name* is bytes.
        The test fails unless there is exactly one entry and it is called
        *name*.
        """
        path = self.bytes_path if isinstance(name, bytes) else self.path
        entries = list(os.scandir(path))
        self.assertEqual(len(entries), 1)

        entry = entries[0]
        self.assertEqual(entry.name, name)
        return entry

    def create_file_entry(self, name='file.txt'):
        """Create file *name* and return the DirEntry describing it."""
        filename = self.create_file(name=name)
        return self.get_entry(os.path.basename(filename))

    def test_current_directory(self):
        filename = self.create_file()
        old_dir = os.getcwd()
        try:
            os.chdir(self.path)

            # call scandir() without parameter: it must list the content
            # of the current directory
            entries = {entry.name: entry for entry in os.scandir()}
            self.assertEqual(sorted(entries),
                             [os.path.basename(filename)])
        finally:
            os.chdir(old_dir)

    def test_repr(self):
        entry = self.create_file_entry()
        self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")

    def test_fspath_protocol(self):
        entry = self.create_file_entry()
        self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))

    @unittest.skipIf(os.name == "nt", "test requires bytes path support")
    def test_fspath_protocol_bytes(self):
        bytes_filename = os.fsencode('bytesfile.txt')
        bytes_entry = self.create_file_entry(name=bytes_filename)
        fspath = os.fspath(bytes_entry)
        self.assertIsInstance(fspath, bytes)
        self.assertEqual(fspath,
                         os.path.join(os.fsencode(self.path), bytes_filename))

    def test_removed_dir(self):
        path = os.path.join(self.path, 'dir')

        os.mkdir(path)
        entry = self.get_entry('dir')
        os.rmdir(path)

        # On POSIX, is_dir() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_dir())
        self.assertFalse(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)

    def test_removed_file(self):
        entry = self.create_file_entry()
        os.unlink(entry.path)

        self.assertFalse(entry.is_dir())
        # On POSIX, is_file() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_file())
        self.assertFalse(entry.is_symlink())
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)

    def test_broken_symlink(self):
        if not support.can_symlink():
            # skipTest() raises, so nothing below runs.
            self.skipTest('cannot create symbolic link')

        filename = self.create_file("file.txt")
        os.symlink(filename,
                   os.path.join(self.path, "symlink.txt"))
        entries = self.get_entries(['file.txt', 'symlink.txt'])
        entry = entries['symlink.txt']
        # Removing the target turns the symlink into a dangling one.
        os.unlink(filename)

        self.assertGreater(entry.inode(), 0)
        self.assertFalse(entry.is_dir())
        self.assertFalse(entry.is_file())  # broken symlink returns False
        self.assertFalse(entry.is_dir(follow_symlinks=False))
        self.assertFalse(entry.is_file(follow_symlinks=False))
        self.assertTrue(entry.is_symlink())
        self.assertRaises(FileNotFoundError, entry.stat)
        # don't fail
        entry.stat(follow_symlinks=False)

    def test_bytes(self):
        if os.name == "nt":
            # On Windows, os.scandir(bytes) must raise an exception
            with bytes_filename_warn(True):
                self.assertRaises(TypeError, os.scandir, b'.')
            return

        self.create_file("file.txt")

        path_bytes = os.fsencode(self.path)
        entries = list(os.scandir(path_bytes))
        self.assertEqual(len(entries), 1, entries)
        entry = entries[0]

        # A bytes path in means bytes names and paths out.
        self.assertEqual(entry.name, b'file.txt')
        self.assertEqual(entry.path,
                         os.fsencode(os.path.join(self.path, 'file.txt')))

    def test_empty_path(self):
        self.assertRaises(FileNotFoundError, os.scandir, '')

    def test_consume_iterator_twice(self):
        self.create_file("file.txt")
        iterator = os.scandir(self.path)

        entries = list(iterator)
        self.assertEqual(len(entries), 1, entries)

        # check that consuming the iterator twice doesn't raise exception
        entries2 = list(iterator)
        self.assertEqual(len(entries2), 0, entries2)

    def test_bad_path_type(self):
        for obj in [1234, 1.234, {}, []]:
            self.assertRaises(TypeError, os.scandir, obj)

    def test_close(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        iterator = os.scandir(self.path)
        next(iterator)
        iterator.close()
        # multiple closes
        iterator.close()
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager_close(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        # Closing explicitly inside the with block must not make the
        # implicit close on exit fail.
        with os.scandir(self.path) as iterator:
            next(iterator)
            iterator.close()

    def test_context_manager_exception(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with self.assertRaises(ZeroDivisionError):
            with os.scandir(self.path) as iterator:
                next(iterator)
                1/0
        with self.check_no_resource_warning():
            del iterator

    def test_resource_warning(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        # partially consumed iterator that is never closed
        iterator = os.scandir(self.path)
        next(iterator)
        with self.assertWarns(ResourceWarning):
            del iterator
            support.gc_collect()
        # exhausted iterator
        iterator = os.scandir(self.path)
        list(iterator)
        with self.check_no_resource_warning():
            del iterator
3195
Victor Stinner6036e442015-03-08 01:58:04 +01003196
class TestPEP519(unittest.TestCase):
    """Tests for os.fspath() and the __fspath__() protocol."""

    # Looked up through the class so that a subclass can swap in another
    # implementation, e.g. the pure Python one when a C version is provided.
    fspath = staticmethod(os.fspath)

    class PathLike:
        # Path-like object returning the value it was created with, or
        # raising it when that value is an exception instance.
        def __init__(self, path=''):
            self.path = path

        def __fspath__(self):
            value = self.path
            if not isinstance(value, BaseException):
                return value
            raise value

    def test_return_bytes(self):
        samples = (b'hello', b'goodbye', b'some/path/and/file')
        for b in samples:
            self.assertEqual(b, self.fspath(b))

    def test_return_string(self):
        samples = ('hello', 'goodbye', 'some/path/and/file')
        for s in samples:
            self.assertEqual(s, self.fspath(s))

    def test_fsencode_fsdecode(self):
        for p in ("path/like/object", b"path/like/object"):
            pathlike = self.PathLike(p)

            # fspath() hands back the wrapped value untouched, while
            # fsencode()/fsdecode() convert to bytes/str respectively.
            self.assertEqual(p, self.fspath(pathlike))
            self.assertEqual(b"path/like/object", os.fsencode(pathlike))
            self.assertEqual("path/like/object", os.fsdecode(pathlike))

    def test_pathlike(self):
        wrapped = self.PathLike('#feelthegil')
        self.assertEqual('#feelthegil', self.fspath(wrapped))
        # Defining __fspath__() is enough to count as an os.PathLike.
        self.assertTrue(issubclass(self.PathLike, os.PathLike))
        self.assertIsInstance(self.PathLike(), os.PathLike)

    def test_garbage_in_exception_out(self):
        vapor = type('blah', (), {})
        for o in (int, type, os, vapor()):
            with self.assertRaises(TypeError):
                self.fspath(o)

    def test_argument_required(self):
        with self.assertRaises(TypeError):
            self.fspath()

    def test_bad_pathlike(self):
        # __fspath__ returns a value other than str or bytes.
        with self.assertRaises(TypeError):
            self.fspath(self.PathLike(42))

        # __fspath__ attribute that is not callable.
        c = type('foo', (), {'__fspath__': 1})
        with self.assertRaises(TypeError):
            self.fspath(c())

        # __fspath__ raises an exception.
        failing = self.PathLike(ZeroDivisionError())
        with self.assertRaises(ZeroDivisionError):
            self.fspath(failing)
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003251
# Only test if the C version is provided, otherwise TestPEP519 already tested
# the pure Python implementation.
# When os._fspath is present, the subclass below inherits every TestPEP519
# test and runs it against os._fspath instead of os.fspath.
if hasattr(os, "_fspath"):
    class TestPEP519PurePython(TestPEP519):

        """Explicitly test the pure Python implementation of os.fspath()."""

        # Overrides the implementation hook defined on TestPEP519.
        fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07003260
3261
# Run every test in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()