blob: aa9b53874849d5d262c8c5d7770f1bfe995f735d [file] [log] [blame]
# As a test suite for the os module, this is woefully inadequate, but this
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
11import fractions
12import getpass
13import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Benjamin Peterson799bd802011-08-31 22:15:17 -040018import re
Victor Stinner47aacc82015-06-12 17:26:23 +020019import shutil
20import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000021import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010022import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020023import subprocess
24import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010025import sysconfig
Victor Stinner47aacc82015-06-12 17:26:23 +020026import time
27import unittest
28import uuid
29import warnings
30from test import support
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000031try:
32 import threading
33except ImportError:
34 threading = None
Antoine Pitrouec34ab52013-08-16 20:44:38 +020035try:
36 import resource
37except ImportError:
38 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020039try:
40 import fcntl
41except ImportError:
42 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010043try:
44 import _winapi
45except ImportError:
46 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020047try:
R David Murrayf2ad1732014-12-25 18:36:56 -050048 import grp
49 groups = [g.gr_gid for g in grp.getgrall() if getpass.getuser() in g.gr_mem]
50 if hasattr(os, 'getgid'):
51 process_gid = os.getgid()
52 if process_gid not in groups:
53 groups.append(process_gid)
54except ImportError:
55 groups = []
56try:
57 import pwd
58 all_users = [u.pw_uid for u in pwd.getpwall()]
59except ImportError:
60 all_users = []
61try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020062 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020063except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020064 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020065
Berker Peksagce643912015-05-06 06:33:17 +030066from test.support.script_helper import assert_python_ok
Xavier de Gayed1415312016-07-22 12:15:29 +020067from test.support import unix_shell
Fred Drake38c2ef02001-07-17 20:52:51 +000068
Victor Stinner923590e2016-03-24 09:11:48 +010069
# True when the process has an effective uid of 0; always False on
# platforms that do not provide os.geteuid().
root_in_posix = hasattr(os, 'geteuid') and os.geteuid() == 0

# Detect whether we're on a Linux system that uses the (now outdated
# and unmaintained) linuxthreads threading library.  There's an issue
# when combining linuxthreads with a failed execv call: see
# http://bugs.python.org/issue4970.
USING_LINUXTHREADS = bool(
    hasattr(sys, 'thread_info')
    and sys.thread_info.version
    and sys.thread_info.version.startswith("linuxthreads"))

# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
# os.getgid() is only consulted on FreeBSD.
HAVE_WHEEL_GROUP = (
    (os.getgid() == 0) if sys.platform.startswith('freebsd') else False)
85
Victor Stinner923590e2016-03-24 09:11:48 +010086
@contextlib.contextmanager
def ignore_deprecation_warnings(msg_regex, quiet=False):
    """Run the managed block under support.check_warnings().

    The filter passed to check_warnings() is the pair
    (msg_regex, DeprecationWarning); *quiet* is forwarded unchanged.
    """
    warning_filter = (msg_regex, DeprecationWarning)
    with support.check_warnings(warning_filter, quiet=quiet):
        yield
91
92
@contextlib.contextmanager
def bytes_filename_warn(expected):
    """Context manager for code that passes bytes filenames to os functions.

    On Windows (os.name == 'nt') the block runs under
    ignore_deprecation_warnings() for the bytes-API deprecation message,
    with quiet set to ``not expected``.  On every other platform the block
    simply runs.
    """
    msg = 'The Windows bytes API has been deprecated'
    if os.name != 'nt':
        yield
        return
    with ignore_deprecation_warnings(msg, quiet=not expected):
        yield
101
102
def create_file(filename, content=b'content'):
    """Create *filename* and write the bytes *content* to it.

    The file is opened in exclusive-creation binary mode ("xb") with
    buffering disabled, so the call fails if *filename* already exists.
    """
    fp = open(filename, "xb", 0)
    with fp:
        fp.write(content)
106
107
# Tests creating TESTFN
class FileTests(unittest.TestCase):
    # Every test here works on support.TESTFN; setUp/tearDown remove it
    # before and after each test.

    def setUp(self):
        # lexists() is also true for a dangling symlink, such as the one
        # test_symlink_keywords() creates (its target does not exist).
        if os.path.lexists(support.TESTFN):
            os.unlink(support.TESTFN)
    tearDown = setUp

    def test_access(self):
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_RDWR)
        os.close(fd)
        self.assertTrue(os.access(support.TESTFN, os.W_OK))

    def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT | os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")

    @support.cpython_only
    def test_rename(self):
        # A failing os.rename() must leave the reference count of its
        # path argument unchanged.
        path = support.TESTFN
        old = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        new = sys.getrefcount(path)
        self.assertEqual(old, new)

    def test_read(self):
        with open(support.TESTFN, "w+b") as fobj:
            fobj.write(b"spam")
            fobj.flush()
            fd = fobj.fileno()
            # Rewind through the raw descriptor before reading it back.
            os.lseek(fd, 0, os.SEEK_SET)
            s = os.read(fd, 4)
            self.assertEqual(type(s), bytes)
            self.assertEqual(s, b"spam")

    @support.cpython_only
    # Skip the test on 32-bit platforms: the number of bytes must fit in a
    # Py_ssize_t type
    @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
                         "needs INT_MAX < PY_SSIZE_T_MAX")
    @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
    def test_large_read(self, size):
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b'test')

        # Issue #21932: Make sure that os.read() does not raise an
        # OverflowError for size larger than INT_MAX
        with open(support.TESTFN, "rb") as fp:
            data = os.read(fp.fileno(), size)

        # The test does not try to read more than 2 GB at once because the
        # operating system is free to return less bytes than requested.
        self.assertEqual(data, b'test')

    def test_write(self):
        # os.write() accepts bytes- and buffer-like objects but not strings
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
        try:
            self.assertRaises(TypeError, os.write, fd, "beans")
            os.write(fd, b"bacon\n")
            os.write(fd, bytearray(b"eggs\n"))
            os.write(fd, memoryview(b"spam\n"))
        finally:
            # Close the descriptor even when a check above fails, so it is
            # not leaked and tearDown() can still unlink the file.
            os.close(fd)
        with open(support.TESTFN, "rb") as fobj:
            self.assertEqual(fobj.read().splitlines(),
                             [b"bacon", b"eggs", b"spam"])

    def write_windows_console(self, *args):
        """Run the command *args* in a new console and require exit code 0."""
        retcode = subprocess.call(
            args,
            # use a new console to not flood the test output
            creationflags=subprocess.CREATE_NEW_CONSOLE,
            # use a shell to hide the console window (SW_HIDE)
            shell=True)
        self.assertEqual(retcode, 0)

    @unittest.skipUnless(sys.platform == 'win32',
                         'test specific to the Windows console')
    def test_write_windows_console(self):
        # Issue #11395: the Windows console returns an error (12: not enough
        # space error) on writing into stdout if stdout mode is binary and the
        # length is greater than 66,000 bytes (or less, depending on heap
        # usage).
        code = "print('x' * 100000)"
        self.write_windows_console(sys.executable, "-c", code)
        self.write_windows_console(sys.executable, "-u", "-c", code)

    def fdopen_helper(self, *args):
        """Open TESTFN read-only, wrap the fd with os.fdopen(fd, *args), close it."""
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = os.fdopen(fd, *args)
        f.close()

    def test_fdopen(self):
        # Create the file first: fdopen_helper() opens it without O_CREAT.
        fd = os.open(support.TESTFN, os.O_CREAT | os.O_RDWR)
        os.close(fd)

        self.fdopen_helper()
        self.fdopen_helper('r')
        self.fdopen_helper('r', 100)

    def test_replace(self):
        TESTFN2 = support.TESTFN + ".2"
        self.addCleanup(support.unlink, support.TESTFN)
        self.addCleanup(support.unlink, TESTFN2)

        create_file(support.TESTFN, b"1")
        create_file(TESTFN2, b"2")

        # The destination already exists and must end up with the
        # source's content.
        os.replace(support.TESTFN, TESTFN2)
        self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
        with open(TESTFN2, 'r') as f:
            self.assertEqual(f.read(), "1")

    def test_open_keywords(self):
        # Every os.open() parameter must be accepted by keyword.
        f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
                    dir_fd=None)
        os.close(f)

    def test_symlink_keywords(self):
        # Every os.symlink() parameter must be accepted by keyword.
        symlink = support.get_attribute(os, "symlink")
        try:
            symlink(src='target', dst=support.TESTFN,
                    target_is_directory=False, dir_fd=None)
        except (NotImplementedError, OSError):
            pass  # No OS support or unprivileged user
247
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200248
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000249# Test attributes on return values from os.*stat* family.
250class StatAttributeTests(unittest.TestCase):
251 def setUp(self):
Victor Stinner47aacc82015-06-12 17:26:23 +0200252 self.fname = support.TESTFN
253 self.addCleanup(support.unlink, self.fname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100254 create_file(self.fname, b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000255
Serhiy Storchaka43767632013-11-03 21:31:38 +0200256 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
Antoine Pitrou38425292010-09-21 18:19:07 +0000257 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000258 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000259
260 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000261 self.assertEqual(result[stat.ST_SIZE], 3)
262 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000263
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000264 # Make sure all the attributes are there
265 members = dir(result)
266 for name in dir(stat):
267 if name[:3] == 'ST_':
268 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000269 if name.endswith("TIME"):
270 def trunc(x): return int(x)
271 else:
272 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000273 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000274 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000275 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000276
Larry Hastings6fe20b32012-04-19 15:07:49 -0700277 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700278 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700279 for name in 'st_atime st_mtime st_ctime'.split():
280 floaty = int(getattr(result, name) * 100000)
281 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700282 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700283
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000284 try:
285 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200286 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000287 except IndexError:
288 pass
289
290 # Make sure that assignment fails
291 try:
292 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200293 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000294 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000295 pass
296
297 try:
298 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200299 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000300 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000301 pass
302
303 try:
304 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200305 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000306 except AttributeError:
307 pass
308
309 # Use the stat_result constructor with a too-short tuple.
310 try:
311 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200312 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000313 except TypeError:
314 pass
315
Ezio Melotti42da6632011-03-15 05:18:48 +0200316 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000317 try:
318 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
319 except TypeError:
320 pass
321
Antoine Pitrou38425292010-09-21 18:19:07 +0000322 def test_stat_attributes(self):
323 self.check_stat_attributes(self.fname)
324
325 def test_stat_attributes_bytes(self):
326 try:
327 fname = self.fname.encode(sys.getfilesystemencoding())
328 except UnicodeEncodeError:
329 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Victor Stinner923590e2016-03-24 09:11:48 +0100330 with bytes_filename_warn(True):
Victor Stinner1ab6c2d2011-11-15 22:27:41 +0100331 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000332
Christian Heimes25827622013-10-12 01:27:08 +0200333 def test_stat_result_pickle(self):
334 result = os.stat(self.fname)
Serhiy Storchakabad12572014-12-15 14:03:42 +0200335 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
336 p = pickle.dumps(result, proto)
337 self.assertIn(b'stat_result', p)
338 if proto < 4:
339 self.assertIn(b'cos\nstat_result\n', p)
340 unpickled = pickle.loads(p)
341 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200342
Serhiy Storchaka43767632013-11-03 21:31:38 +0200343 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000344 def test_statvfs_attributes(self):
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000345 try:
346 result = os.statvfs(self.fname)
Guido van Rossumb940e112007-01-10 16:19:56 +0000347 except OSError as e:
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000348 # On AtheOS, glibc always returns ENOSYS
Martin v. Löwisf90ae202002-06-11 06:22:31 +0000349 if e.errno == errno.ENOSYS:
Victor Stinner370cb252013-10-12 01:33:54 +0200350 self.skipTest('os.statvfs() failed with ENOSYS')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000351
352 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000353 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000354
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000355 # Make sure all the attributes are there.
356 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
357 'ffree', 'favail', 'flag', 'namemax')
358 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000359 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000360
361 # Make sure that assignment really fails
362 try:
363 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200364 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000365 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000366 pass
367
368 try:
369 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200370 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000371 except AttributeError:
372 pass
373
374 # Use the constructor with a too-short tuple.
375 try:
376 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200377 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000378 except TypeError:
379 pass
380
Ezio Melotti42da6632011-03-15 05:18:48 +0200381 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000382 try:
383 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
384 except TypeError:
385 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000386
Christian Heimes25827622013-10-12 01:27:08 +0200387 @unittest.skipUnless(hasattr(os, 'statvfs'),
388 "need os.statvfs()")
389 def test_statvfs_result_pickle(self):
390 try:
391 result = os.statvfs(self.fname)
392 except OSError as e:
393 # On AtheOS, glibc always returns ENOSYS
394 if e.errno == errno.ENOSYS:
Victor Stinner370cb252013-10-12 01:33:54 +0200395 self.skipTest('os.statvfs() failed with ENOSYS')
396
Serhiy Storchakabad12572014-12-15 14:03:42 +0200397 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
398 p = pickle.dumps(result, proto)
399 self.assertIn(b'statvfs_result', p)
400 if proto < 4:
401 self.assertIn(b'cos\nstatvfs_result\n', p)
402 unpickled = pickle.loads(p)
403 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200404
Serhiy Storchaka43767632013-11-03 21:31:38 +0200405 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
406 def test_1686475(self):
407 # Verify that an open file can be stat'ed
408 try:
409 os.stat(r"c:\pagefile.sys")
410 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600411 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200412 except OSError as e:
413 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000414
Serhiy Storchaka43767632013-11-03 21:31:38 +0200415 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
416 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
417 def test_15261(self):
418 # Verify that stat'ing a closed fd does not cause crash
419 r, w = os.pipe()
420 try:
421 os.stat(r) # should not raise error
422 finally:
423 os.close(r)
424 os.close(w)
425 with self.assertRaises(OSError) as ctx:
426 os.stat(r)
427 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100428
Zachary Ware63f277b2014-06-19 09:46:37 -0500429 def check_file_attributes(self, result):
430 self.assertTrue(hasattr(result, 'st_file_attributes'))
431 self.assertTrue(isinstance(result.st_file_attributes, int))
432 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
433
434 @unittest.skipUnless(sys.platform == "win32",
435 "st_file_attributes is Win32 specific")
436 def test_file_attributes(self):
437 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
438 result = os.stat(self.fname)
439 self.check_file_attributes(result)
440 self.assertEqual(
441 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
442 0)
443
444 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
Victor Stinner47aacc82015-06-12 17:26:23 +0200445 dirname = support.TESTFN + "dir"
446 os.mkdir(dirname)
447 self.addCleanup(os.rmdir, dirname)
448
449 result = os.stat(dirname)
Zachary Ware63f277b2014-06-19 09:46:37 -0500450 self.check_file_attributes(result)
451 self.assertEqual(
452 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
453 stat.FILE_ATTRIBUTE_DIRECTORY)
454
Victor Stinner47aacc82015-06-12 17:26:23 +0200455
456class UtimeTests(unittest.TestCase):
457 def setUp(self):
458 self.dirname = support.TESTFN
459 self.fname = os.path.join(self.dirname, "f1")
460
461 self.addCleanup(support.rmtree, self.dirname)
462 os.mkdir(self.dirname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100463 create_file(self.fname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200464
Victor Stinnerc0b1e0f2015-07-20 17:12:57 +0200465 def restore_float_times(state):
Victor Stinner923590e2016-03-24 09:11:48 +0100466 with ignore_deprecation_warnings('stat_float_times'):
Victor Stinnerc0b1e0f2015-07-20 17:12:57 +0200467 os.stat_float_times(state)
468
Victor Stinner47aacc82015-06-12 17:26:23 +0200469 # ensure that st_atime and st_mtime are float
Victor Stinner923590e2016-03-24 09:11:48 +0100470 with ignore_deprecation_warnings('stat_float_times'):
Victor Stinnerc0b1e0f2015-07-20 17:12:57 +0200471 old_float_times = os.stat_float_times(-1)
472 self.addCleanup(restore_float_times, old_float_times)
Victor Stinner47aacc82015-06-12 17:26:23 +0200473
474 os.stat_float_times(True)
475
476 def support_subsecond(self, filename):
477 # Heuristic to check if the filesystem supports timestamp with
478 # subsecond resolution: check if float and int timestamps are different
479 st = os.stat(filename)
480 return ((st.st_atime != st[7])
481 or (st.st_mtime != st[8])
482 or (st.st_ctime != st[9]))
483
484 def _test_utime(self, set_time, filename=None):
485 if not filename:
486 filename = self.fname
487
488 support_subsecond = self.support_subsecond(filename)
489 if support_subsecond:
490 # Timestamp with a resolution of 1 microsecond (10^-6).
491 #
492 # The resolution of the C internal function used by os.utime()
493 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
494 # test with a resolution of 1 ns requires more work:
495 # see the issue #15745.
496 atime_ns = 1002003000 # 1.002003 seconds
497 mtime_ns = 4005006000 # 4.005006 seconds
498 else:
499 # use a resolution of 1 second
500 atime_ns = 5 * 10**9
501 mtime_ns = 8 * 10**9
502
503 set_time(filename, (atime_ns, mtime_ns))
504 st = os.stat(filename)
505
506 if support_subsecond:
507 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
508 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
509 else:
510 self.assertEqual(st.st_atime, atime_ns * 1e-9)
511 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
512 self.assertEqual(st.st_atime_ns, atime_ns)
513 self.assertEqual(st.st_mtime_ns, mtime_ns)
514
515 def test_utime(self):
516 def set_time(filename, ns):
517 # test the ns keyword parameter
518 os.utime(filename, ns=ns)
519 self._test_utime(set_time)
520
521 @staticmethod
522 def ns_to_sec(ns):
523 # Convert a number of nanosecond (int) to a number of seconds (float).
524 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
525 # issue, os.utime() rounds towards minus infinity.
526 return (ns * 1e-9) + 0.5e-9
527
528 def test_utime_by_indexed(self):
529 # pass times as floating point seconds as the second indexed parameter
530 def set_time(filename, ns):
531 atime_ns, mtime_ns = ns
532 atime = self.ns_to_sec(atime_ns)
533 mtime = self.ns_to_sec(mtime_ns)
534 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
535 # or utime(time_t)
536 os.utime(filename, (atime, mtime))
537 self._test_utime(set_time)
538
539 def test_utime_by_times(self):
540 def set_time(filename, ns):
541 atime_ns, mtime_ns = ns
542 atime = self.ns_to_sec(atime_ns)
543 mtime = self.ns_to_sec(mtime_ns)
544 # test the times keyword parameter
545 os.utime(filename, times=(atime, mtime))
546 self._test_utime(set_time)
547
548 @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
549 "follow_symlinks support for utime required "
550 "for this test.")
551 def test_utime_nofollow_symlinks(self):
552 def set_time(filename, ns):
553 # use follow_symlinks=False to test utimensat(timespec)
554 # or lutimes(timeval)
555 os.utime(filename, ns=ns, follow_symlinks=False)
556 self._test_utime(set_time)
557
558 @unittest.skipUnless(os.utime in os.supports_fd,
559 "fd support for utime required for this test.")
560 def test_utime_fd(self):
561 def set_time(filename, ns):
Victor Stinnerae39d232016-03-24 17:12:55 +0100562 with open(filename, 'wb', 0) as fp:
Victor Stinner47aacc82015-06-12 17:26:23 +0200563 # use a file descriptor to test futimens(timespec)
564 # or futimes(timeval)
565 os.utime(fp.fileno(), ns=ns)
566 self._test_utime(set_time)
567
568 @unittest.skipUnless(os.utime in os.supports_dir_fd,
569 "dir_fd support for utime required for this test.")
570 def test_utime_dir_fd(self):
571 def set_time(filename, ns):
572 dirname, name = os.path.split(filename)
573 dirfd = os.open(dirname, os.O_RDONLY)
574 try:
575 # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
576 os.utime(name, dir_fd=dirfd, ns=ns)
577 finally:
578 os.close(dirfd)
579 self._test_utime(set_time)
580
581 def test_utime_directory(self):
582 def set_time(filename, ns):
583 # test calling os.utime() on a directory
584 os.utime(filename, ns=ns)
585 self._test_utime(set_time, filename=self.dirname)
586
587 def _test_utime_current(self, set_time):
588 # Get the system clock
589 current = time.time()
590
591 # Call os.utime() to set the timestamp to the current system clock
592 set_time(self.fname)
593
594 if not self.support_subsecond(self.fname):
595 delta = 1.0
596 else:
597 # On Windows, the usual resolution of time.time() is 15.6 ms
598 delta = 0.020
599 st = os.stat(self.fname)
600 msg = ("st_time=%r, current=%r, dt=%r"
601 % (st.st_mtime, current, st.st_mtime - current))
602 self.assertAlmostEqual(st.st_mtime, current,
603 delta=delta, msg=msg)
604
605 def test_utime_current(self):
606 def set_time(filename):
607 # Set to the current time in the new way
608 os.utime(self.fname)
609 self._test_utime_current(set_time)
610
611 def test_utime_current_old(self):
612 def set_time(filename):
613 # Set to the current time in the old explicit way.
614 os.utime(self.fname, None)
615 self._test_utime_current(set_time)
616
617 def get_file_system(self, path):
618 if sys.platform == 'win32':
619 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
620 import ctypes
621 kernel32 = ctypes.windll.kernel32
622 buf = ctypes.create_unicode_buffer("", 100)
623 ok = kernel32.GetVolumeInformationW(root, None, 0,
624 None, None, None,
625 buf, len(buf))
626 if ok:
627 return buf.value
628 # return None if the filesystem is unknown
629
630 def test_large_time(self):
631 # Many filesystems are limited to the year 2038. At least, the test
632 # pass with NTFS filesystem.
633 if self.get_file_system(self.dirname) != "NTFS":
634 self.skipTest("requires NTFS")
635
636 large = 5000000000 # some day in 2128
637 os.utime(self.fname, (large, large))
638 self.assertEqual(os.stat(self.fname).st_mtime, large)
639
640 def test_utime_invalid_arguments(self):
641 # seconds and nanoseconds parameters are mutually exclusive
642 with self.assertRaises(ValueError):
643 os.utime(self.fname, (5, 5), ns=(5, 5))
644
645
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000646from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000647
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000648class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000649 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000650 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000651
def setUp(self):
    # Snapshot the environment (str and, where available, bytes views) so
    # tearDown() can restore it, then add the reference items.
    self.__save = dict(os.environ)
    if os.supports_bytes_environ:
        self.__saveb = dict(os.environb)
    reference = self._reference()
    for key in reference:
        os.environ[key] = reference[key]
658
def tearDown(self):
    # Put back the environment snapshot taken by setUp().
    environ = os.environ
    environ.clear()
    environ.update(self.__save)
    if not os.supports_bytes_environ:
        return
    environb = os.environb
    environb.clear()
    environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000665
Christian Heimes90333392007-11-01 19:08:42 +0000666 def _reference(self):
667 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
668
669 def _empty_mapping(self):
670 os.environ.clear()
671 return os.environ
672
# Bug 1110478
@unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                     'requires a shell')
def test_update2(self):
    # A variable stored with os.environ.update() must be visible to a
    # shell started with os.popen().
    os.environ.clear()
    os.environ.update(HELLO="World")
    command = "%s -c 'echo $HELLO'" % unix_shell
    with os.popen(command) as popen:
        self.assertEqual(popen.read().strip(), "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000682
@unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
                     'requires a shell')
def test_os_popen_iter(self):
    # Iterating over the object returned by os.popen() yields the
    # command's output line by line and then stops.
    command = "%s -c 'echo \"line1\nline2\nline3\"'" % unix_shell
    with os.popen(command) as popen:
        it = iter(popen)
        for expected in ("line1\n", "line2\n", "line3\n"):
            self.assertEqual(next(it), expected)
        self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000693
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000694 # Verify environ keys and values from the OS are of the
695 # correct str type.
696 def test_keyvalue_types(self):
697 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000698 self.assertEqual(type(key), str)
699 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000700
Christian Heimes90333392007-11-01 19:08:42 +0000701 def test_items(self):
702 for key, value in self._reference().items():
703 self.assertEqual(os.environ.get(key), value)
704
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000705 # Issue 7310
706 def test___repr__(self):
707 """Check that the repr() of os.environ looks like environ({...})."""
708 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000709 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
710 '{!r}: {!r}'.format(key, value)
711 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000712
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000713 def test_get_exec_path(self):
714 defpath_list = os.defpath.split(os.pathsep)
715 test_path = ['/monty', '/python', '', '/flying/circus']
716 test_env = {'PATH': os.pathsep.join(test_path)}
717
718 saved_environ = os.environ
719 try:
720 os.environ = dict(test_env)
721 # Test that defaulting to os.environ works.
722 self.assertSequenceEqual(test_path, os.get_exec_path())
723 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
724 finally:
725 os.environ = saved_environ
726
727 # No PATH environment variable
728 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
729 # Empty PATH environment variable
730 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
731 # Supplied PATH environment variable
732 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
733
Victor Stinnerb745a742010-05-18 17:17:23 +0000734 if os.supports_bytes_environ:
735 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000736 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000737 # ignore BytesWarning warning
738 with warnings.catch_warnings(record=True):
739 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000740 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000741 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000742 pass
743 else:
744 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000745
746 # bytes key and/or value
747 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
748 ['abc'])
749 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
750 ['abc'])
751 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
752 ['abc'])
753
754 @unittest.skipUnless(os.supports_bytes_environ,
755 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000756 def test_environb(self):
757 # os.environ -> os.environb
758 value = 'euro\u20ac'
759 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000760 value_bytes = value.encode(sys.getfilesystemencoding(),
761 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000762 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000763 msg = "U+20AC character is not encodable to %s" % (
764 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000765 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000766 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000767 self.assertEqual(os.environ['unicode'], value)
768 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000769
770 # os.environb -> os.environ
771 value = b'\xff'
772 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000773 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000774 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000775 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000776
# On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
# #13415).
@support.requires_freebsd_version(7)
@support.requires_mac_ver(10, 6)
def test_unset_error(self):
    """Deleting a variable with an unacceptable name must raise."""
    if sys.platform == "win32":
        # an environment variable is limited to 32,767 characters
        expected_error, bad_name = ValueError, 'x' * 50000
    else:
        # "=" is not allowed in a variable name
        expected_error, bad_name = OSError, 'key='
    with self.assertRaises(expected_error):
        del os.environ[bad_name]
Victor Stinner60b385e2011-11-22 22:01:28 +0100790
Victor Stinner6d101392013-04-14 16:35:04 +0200791 def test_key_type(self):
792 missing = 'missingkey'
793 self.assertNotIn(missing, os.environ)
794
Victor Stinner839e5ea2013-04-14 16:43:03 +0200795 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200796 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200797 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200798 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200799
Victor Stinner839e5ea2013-04-14 16:43:03 +0200800 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200801 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200802 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200803 self.assertTrue(cm.exception.__suppress_context__)
804
Victor Stinner6d101392013-04-14 16:35:04 +0200805
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    # Thin adapter hiding the minor differences between os.walk and
    # os.fwalk, so that both functions are tested with the same code base.
    def walk(self, top, **kwargs):
        try:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        except KeyError:
            pass
        return os.walk(top, **kwargs)

    def setUp(self):
        """Create the directory tree that every test in this class walks."""
        self.addCleanup(support.rmtree, support.TESTFN)
        join = os.path.join

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TEST2
        #           broken_link
        #       TEST2/
        #         tmp4              a lone file
        top = support.TESTFN
        self.walk_path = join(top, "TEST1")
        self.sub1_path = join(self.walk_path, "SUB1")
        self.sub11_path = join(self.sub1_path, "SUB11")
        sub2_path = join(self.walk_path, "SUB2")
        self.link_path = join(sub2_path, "link")
        broken_link_path = join(sub2_path, "broken_link")
        t2_path = join(top, "TEST2")
        file_paths = (
            join(self.walk_path, "tmp1"),
            join(self.sub1_path, "tmp2"),
            join(sub2_path, "tmp3"),
            join(top, "TEST2", "tmp4"),
        )

        # Create stuff.
        for directory in (self.sub11_path, sub2_path, t2_path):
            os.makedirs(directory)

        for path in file_paths:
            with open(path, "x") as stream:
                stream.write("I'm " + path + " and proud of it. Blame test_os.\n")

        # What a walk is expected to report for SUB2 depends on whether
        # the symlinks could be created.
        if support.can_symlink():
            os.symlink(os.path.abspath(t2_path), self.link_path)
            os.symlink('broken', broken_link_path, True)
            self.sub2_tree = (sub2_path, ["link"], ["broken_link", "tmp3"])
        else:
            self.sub2_tree = (sub2_path, [], ["tmp3"])

    def test_walk_topdown(self):
        # Walk top-down.
        visited = list(self.walk(self.walk_path))

        self.assertEqual(len(visited), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
        #     flipped:  TESTFN, SUB2, SUB1, SUB11
        flipped = visited[0][1][0] != "SUB1"
        sub1_at = 2 if flipped else 1
        sub11_at = sub1_at + 1
        sub2_at = 1 if flipped else 3
        # Normalise the entries whose listing order is unspecified.
        visited[0][1].sort()
        visited[sub2_at][-1].sort()
        self.assertEqual(visited[0],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(visited[sub1_at],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(visited[sub11_at], (self.sub11_path, [], []))
        self.assertEqual(visited[sub2_at], self.sub2_tree)

    def test_walk_prune(self):
        # Prune the search.
        visited = []
        for root, subdirs, filenames in self.walk(self.walk_path):
            visited.append((root, subdirs, filenames))
            # Don't descend into SUB1.
            if 'SUB1' in subdirs:
                # Note that this also mutates the dirs we appended to
                # visited!
                subdirs.remove('SUB1')

        self.assertEqual(len(visited), 2)
        top_entry, sub2_entry = visited
        self.assertEqual(top_entry, (self.walk_path, ["SUB2"], ["tmp1"]))

        sub2_entry[-1].sort()
        self.assertEqual(sub2_entry, self.sub2_tree)

    def test_walk_bottom_up(self):
        # Walk bottom-up.
        visited = list(self.walk(self.walk_path, topdown=False))

        self.assertEqual(len(visited), 4, visited)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
        #     flipped:  SUB2, SUB11, SUB1, TESTFN
        flipped = visited[3][1][0] != "SUB1"
        sub11_at = 1 if flipped else 0
        sub1_at = sub11_at + 1
        sub2_at = 0 if flipped else 2
        # Normalise the entries whose listing order is unspecified.
        visited[3][1].sort()
        visited[sub2_at][-1].sort()
        self.assertEqual(visited[3],
                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(visited[sub11_at], (self.sub11_path, [], []))
        self.assertEqual(visited[sub1_at],
                         (self.sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(visited[sub2_at], self.sub2_tree)

    def test_walk_symlink(self):
        if not support.can_symlink():
            self.skipTest("need symlink support")

        # Walk, following symlinks.
        walker = self.walk(self.walk_path, follow_symlinks=True)
        for root, subdirs, filenames in walker:
            if root != self.link_path:
                continue
            self.assertEqual(subdirs, [])
            self.assertEqual(filenames, ["tmp4"])
            break
        else:
            self.fail("Didn't follow symlink with followlinks=True")

    def test_walk_bad_dir(self):
        # Walk top-down.
        errors = []
        walker = self.walk(self.walk_path, onerror=errors.append)
        root, subdirs, filenames = next(walker)
        self.assertFalse(errors)
        # Rename the first subdirectory behind the walker's back; the
        # walk must report an error and visit neither the old nor the
        # new name.
        old_path = os.path.join(root, subdirs[0])
        new_path = os.path.join(root, subdirs[0] + '.new')
        os.rename(old_path, new_path)
        roots = [entry[0] for entry in walker]
        self.assertTrue(errors)
        self.assertNotIn(old_path, roots)
        self.assertNotIn(new_path, roots)
        for other in subdirs[1:]:
            self.assertIn(os.path.join(root, other), roots)
943
Charles-François Natali7372b062012-02-05 15:15:38 +0100944
@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
class FwalkTests(WalkTests):
    """Tests for os.fwalk()."""

    def walk(self, top, **kwargs):
        """Adapt os.fwalk() to yield os.walk()-style 3-tuples."""
        for root, dirs, files, root_fd in os.fwalk(top, **kwargs):
            yield (root, dirs, files)

    def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
        """
        compare with walk() results.

        For every combination of topdown and symlink following, each
        directory reported by os.fwalk(**fwalk_kwargs) must also be
        reported by os.walk(**walk_kwargs) with the same sets of
        sub-directories and files.  The caller's dicts are not modified.
        """
        walk_kwargs = walk_kwargs.copy()
        fwalk_kwargs = fwalk_kwargs.copy()
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            # The two functions spell the symlink option differently.
            walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
            fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)

            expected = {}
            for root, dirs, files in os.walk(**walk_kwargs):
                expected[root] = (set(dirs), set(files))

            for root, dirs, files, rootfd in os.fwalk(**fwalk_kwargs):
                self.assertIn(root, expected)
                self.assertEqual(expected[root], (set(dirs), set(files)))

    def test_compare_to_walk(self):
        kwargs = {'top': support.TESTFN}
        self._compare_to_walk(kwargs, kwargs)

    def test_dir_fd(self):
        # Open the descriptor before entering the try block: if os.open()
        # itself fails there is nothing to close, and the finally clause
        # must not hide that failure behind a NameError on 'fd'.
        fd = os.open(".", os.O_RDONLY)
        try:
            walk_kwargs = {'top': support.TESTFN}
            fwalk_kwargs = walk_kwargs.copy()
            fwalk_kwargs['dir_fd'] = fd
            self._compare_to_walk(walk_kwargs, fwalk_kwargs)
        finally:
            os.close(fd)

    def test_yields_correct_dir_fd(self):
        # check returned file descriptors
        for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
            args = support.TESTFN, topdown, None
            for root, dirs, files, rootfd in os.fwalk(*args, follow_symlinks=follow_symlinks):
                # check that the FD is valid
                os.fstat(rootfd)
                # redundant check
                os.stat(rootfd)
                # check that listdir() returns consistent information
                self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))

    def test_fd_leak(self):
        # Since we're opening a lot of FDs, we must be careful to avoid leaks:
        # we both check that calling fwalk() a large number of times doesn't
        # yield EMFILE, and that the minimum allocated FD hasn't changed.
        minfd = os.dup(1)
        os.close(minfd)
        for i in range(256):
            for x in os.fwalk(support.TESTFN):
                pass
        newfd = os.dup(1)
        self.addCleanup(os.close, newfd)
        self.assertEqual(newfd, minfd)
1009
class BytesWalkTests(WalkTests):
    """Tests for os.walk() with bytes."""

    def setUp(self):
        super().setUp()
        # Context managers entered here are left again in tearDown().
        self.stack = contextlib.ExitStack()
        if os.name == 'nt':
            self.stack.enter_context(bytes_filename_warn(False))

    def tearDown(self):
        self.stack.close()
        super().tearDown()

    def walk(self, top, **kwargs):
        """Walk *top* as bytes while presenting str names to the tests."""
        try:
            kwargs['followlinks'] = kwargs.pop('follow_symlinks')
        except KeyError:
            pass
        for raw_root, raw_dirs, raw_files in os.walk(os.fsencode(top), **kwargs):
            root = os.fsdecode(raw_root)
            dirs = [os.fsdecode(name) for name in raw_dirs]
            files = [os.fsdecode(name) for name in raw_files]
            yield (root, dirs, files)
            # Copy whatever the consumer did to the decoded lists back
            # into the lists os.walk() handed out.
            raw_dirs[:] = [os.fsencode(name) for name in dirs]
            raw_files[:] = [os.fsencode(name) for name in files]
1032
Charles-François Natali7372b062012-02-05 15:15:38 +01001033
class MakedirTests(unittest.TestCase):
    """Tests for os.makedirs(), run inside a fresh support.TESTFN directory."""

    def setUp(self):
        os.mkdir(support.TESTFN)

    def test_makedir(self):
        base = support.TESTFN
        path = os.path.join(base, 'dir1', 'dir2', 'dir3')
        os.makedirs(path)             # Should work
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
        os.makedirs(path)

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
        os.makedirs(path)
        path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
                            'dir5', 'dir6')
        os.makedirs(path)

    def test_exist_ok_existing_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        mode = 0o777
        old_mask = os.umask(0o022)
        # The umask is process-wide state: restore it even when one of
        # the checks below fails, so later tests are not affected.
        try:
            os.makedirs(path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode)
            self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
            os.makedirs(path, 0o776, exist_ok=True)
            os.makedirs(path, mode=mode, exist_ok=True)
        finally:
            os.umask(old_mask)

        # Issue #25583: A drive root could raise PermissionError on Windows
        os.makedirs(os.path.abspath('/'), exist_ok=True)

    def test_exist_ok_s_isgid_directory(self):
        path = os.path.join(support.TESTFN, 'dir1')
        S_ISGID = stat.S_ISGID
        mode = 0o777
        old_mask = os.umask(0o022)
        try:
            existing_testfn_mode = stat.S_IMODE(
                    os.lstat(support.TESTFN).st_mode)
            try:
                os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
            except PermissionError:
                raise unittest.SkipTest('Cannot set S_ISGID for dir.')
            if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
                raise unittest.SkipTest('No support for S_ISGID dir mode.')
            # The os should apply S_ISGID from the parent dir for us, but
            # this test need not depend on that behavior.  Be explicit.
            os.makedirs(path, mode | S_ISGID)
            # http://bugs.python.org/issue14992
            # Should not fail when the bit is already set.
            os.makedirs(path, mode, exist_ok=True)
            # remove the bit.
            os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
            # May work even when the bit is not already set when demanded.
            os.makedirs(path, mode | S_ISGID, exist_ok=True)
        finally:
            os.umask(old_mask)

    def test_exist_ok_existing_regular_file(self):
        path = os.path.join(support.TESTFN, 'dir1')
        with open(path, 'w') as f:
            f.write('abc')
        # Remove the file even when a check fails; tearDown() relies on
        # os.removedirs() and could not clean up around a stray file.
        try:
            self.assertRaises(OSError, os.makedirs, path)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
            self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
        finally:
            os.remove(path)

    def tearDown(self):
        path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so we look for the outermost directory
        # that exists.
        while not os.path.exists(path) and path != support.TESTFN:
            path = os.path.dirname(path)

        os.removedirs(path)
1115
Andrew Svetlov405faed2012-12-25 12:18:09 +02001116
@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
class ChownFileTests(unittest.TestCase):
    """Tests for os.chown(), applied to a support.TESTFN directory."""

    @classmethod
    def setUpClass(cls):
        os.mkdir(support.TESTFN)

    def test_chown_uid_gid_arguments_must_be_index(self):
        target = support.TESTFN
        info = os.stat(target)
        uid, gid = info.st_uid, info.st_gid
        non_integers = (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2))
        for value in non_integers:
            with self.assertRaises(TypeError):
                os.chown(target, value, gid)
            with self.assertRaises(TypeError):
                os.chown(target, uid, value)
        self.assertIsNone(os.chown(target, uid, gid))
        self.assertIsNone(os.chown(target, -1, -1))

    @unittest.skipUnless(len(groups) > 1, "test needs more than one group")
    def test_chown(self):
        target = support.TESTFN
        uid = os.stat(target).st_uid
        # Switch the group twice and check that each change took effect.
        for wanted_gid in groups[:2]:
            os.chown(target, uid, wanted_gid)
            self.assertEqual(os.stat(target).st_gid, wanted_gid)

    @unittest.skipUnless(root_in_posix and len(all_users) > 1,
                         "test needs root privilege and more than one user")
    def test_chown_with_root(self):
        target = support.TESTFN
        gid = os.stat(target).st_gid
        # Switch the owner twice and check that each change took effect.
        for wanted_uid in all_users[:2]:
            os.chown(target, wanted_uid, gid)
            self.assertEqual(os.stat(target).st_uid, wanted_uid)

    @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
                         "test needs non-root account and more than one user")
    def test_chown_without_permission(self):
        target = support.TESTFN
        first_uid, second_uid = all_users[:2]
        gid = os.stat(target).st_gid
        # Both calls share one assertRaises block, so the second call is
        # only reached if the first one did not raise.
        # NOTE(review): presumably this covers the case where one of the
        # two uids is the current user -- confirm before splitting.
        with self.assertRaises(PermissionError):
            os.chown(target, first_uid, gid)
            os.chown(target, second_uid, gid)

    @classmethod
    def tearDownClass(cls):
        os.rmdir(support.TESTFN)
1169
1170
class RemoveDirsTests(unittest.TestCase):
    """Tests for os.removedirs() on a TESTFN/dira/dirb directory chain."""

    def setUp(self):
        os.makedirs(support.TESTFN)

    def tearDown(self):
        support.rmtree(support.TESTFN)

    def test_remove_all(self):
        outer = os.path.join(support.TESTFN, 'dira')
        os.mkdir(outer)
        inner = os.path.join(outer, 'dirb')
        os.mkdir(inner)
        os.removedirs(inner)
        # The whole chain was empty, so all of it must be gone.
        for removed in (inner, outer, support.TESTFN):
            self.assertFalse(os.path.exists(removed))

    def test_remove_partial(self):
        outer = os.path.join(support.TESTFN, 'dira')
        os.mkdir(outer)
        inner = os.path.join(outer, 'dirb')
        os.mkdir(inner)
        create_file(os.path.join(outer, 'file.txt'))
        os.removedirs(inner)
        # Only the innermost directory goes; its parent holds a file.
        self.assertFalse(os.path.exists(inner))
        for kept in (outer, support.TESTFN):
            self.assertTrue(os.path.exists(kept))

    def test_remove_nothing(self):
        outer = os.path.join(support.TESTFN, 'dira')
        os.mkdir(outer)
        inner = os.path.join(outer, 'dirb')
        os.mkdir(inner)
        create_file(os.path.join(inner, 'file.txt'))
        # The innermost directory holds a file, so the call must fail
        # and leave everything in place.
        with self.assertRaises(OSError):
            os.removedirs(inner)
        for kept in (inner, outer, support.TESTFN):
            self.assertTrue(os.path.exists(kept))
1210
1211
class DevNullTests(unittest.TestCase):
    """Tests for the os.devnull device."""

    def test_devnull(self):
        """Writes to os.devnull are accepted and reads from it are empty."""
        # Unbuffered binary mode; the with statement closes the file, so
        # no explicit close() is needed.
        with open(os.devnull, 'wb', 0) as f:
            f.write(b'hello')
        with open(os.devnull, 'rb') as f:
            self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001219
Andrew Svetlov405faed2012-12-25 12:18:09 +02001220
class URandomTests(unittest.TestCase):
    """Tests for os.urandom()."""

    def test_urandom_length(self):
        """os.urandom(n) returns exactly n bytes."""
        for size in (0, 1, 10, 100, 1000):
            self.assertEqual(len(os.urandom(size)), size)

    def test_urandom_value(self):
        """Two consecutive calls must not return the same data."""
        data1 = os.urandom(16)
        data2 = os.urandom(16)
        self.assertNotEqual(data1, data2)

    def get_urandom_subprocess(self, count):
        """Return *count* bytes produced by os.urandom() in a child process.

        The child writes the bytes to its stdout; the amount of data
        received is checked against *count* before it is returned.
        """
        code = '\n'.join((
            'import os, sys',
            'data = os.urandom(%s)' % count,
            'sys.stdout.buffer.write(data)',
            'sys.stdout.buffer.flush()'))
        out = assert_python_ok('-c', code)
        stdout = out[1]
        # Compare with the requested size rather than a fixed 16, so the
        # helper also works for other values of count.
        self.assertEqual(len(stdout), count)
        return stdout

    def test_urandom_subprocess(self):
        """Two separate child processes must not produce the same data."""
        data1 = self.get_urandom_subprocess(16)
        data2 = self.get_urandom_subprocess(16)
        self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001249
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001250
# os.urandom() doesn't use a file descriptor when it is implemented with the
# getentropy() function, the getrandom() function or the getrandom() syscall.
# True as soon as one of the corresponding build-time flags is set to 1.
OS_URANDOM_DONT_USE_FD = any(
    sysconfig.get_config_var(flag) == 1
    for flag in ('HAVE_GETENTROPY',
                 'HAVE_GETRANDOM',
                 'HAVE_GETRANDOM_SYSCALL'))
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001257
@unittest.skipIf(OS_URANDOM_DONT_USE_FD,
                 "os.urandom() does not use a file descriptor")
class URandomFDTests(unittest.TestCase):
    """Tests for how os.urandom() handles its file descriptor.

    Each scenario runs in a child interpreter because it manipulates
    process-wide state (resource limits, open file descriptors).
    """

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open /dev/urandom.
        # We spawn a new process to make the test more robust (if the
        # file descriptor limit could not be restored after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)

    def test_urandom_fd_closed(self):
        # Issue #21207: urandom() should reopen its fd to /dev/urandom if
        # closed.
        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                os.closerange(3, 256)
            sys.stdout.buffer.write(os.urandom(4))
            """
        # Only the successful exit of the child matters here.
        assert_python_ok('-Sc', code)

    def test_urandom_fd_reopened(self):
        # Issue #21207: urandom() should detect its fd to /dev/urandom
        # changed to something else, and reopen it.
        self.addCleanup(support.unlink, support.TESTFN)
        create_file(support.TESTFN, b"x" * 256)

        code = """if 1:
            import os
            import sys
            import test.support
            os.urandom(4)
            with test.support.SuppressCrashReport():
                for fd in range(3, 256):
                    try:
                        os.close(fd)
                    except OSError:
                        pass
                    else:
                        # Found the urandom fd (XXX hopefully)
                        break
                os.closerange(3, 256)
            with open({TESTFN!r}, 'rb') as f:
                os.dup2(f.fileno(), fd)
                sys.stdout.buffer.write(os.urandom(4))
                sys.stdout.buffer.write(os.urandom(4))
            """.format(TESTFN=support.TESTFN)
        # The file put in place of the fd holds only b"x" bytes, so data
        # read from it would be identical across reads and across runs.
        rc, out, err = assert_python_ok('-Sc', code)
        self.assertEqual(len(out), 8)
        self.assertNotEqual(out[0:4], out[4:8])
        rc, out2, err2 = assert_python_ok('-Sc', code)
        self.assertEqual(len(out2), 8)
        self.assertNotEqual(out2, out)
1330
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001331
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001332@contextlib.contextmanager
1333def _execvpe_mockup(defpath=None):
1334 """
1335 Stubs out execv and execve functions when used as context manager.
1336 Records exec calls. The mock execv and execve functions always raise an
1337 exception as they would normally never return.
1338 """
1339 # A list of tuples containing (function name, first arg, args)
1340 # of calls to execv or execve that have been made.
1341 calls = []
1342
1343 def mock_execv(name, *args):
1344 calls.append(('execv', name, args))
1345 raise RuntimeError("execv called")
1346
1347 def mock_execve(name, *args):
1348 calls.append(('execve', name, args))
1349 raise OSError(errno.ENOTDIR, "execve called")
1350
1351 try:
1352 orig_execv = os.execv
1353 orig_execve = os.execve
1354 orig_defpath = os.defpath
1355 os.execv = mock_execv
1356 os.execve = mock_execve
1357 if defpath is not None:
1358 os.defpath = defpath
1359 yield calls
1360 finally:
1361 os.execv = orig_execv
1362 os.execve = orig_execve
1363 os.defpath = orig_defpath
1364
Guido van Rossume7ba4952007-06-06 23:52:48 +00001365class ExecTests(unittest.TestCase):
@unittest.skipIf(USING_LINUXTHREADS,
                 "avoid triggering a linuxthreads bug: see issue #4970")
def test_execvpe_with_bad_program(self):
    """os.execvpe() must raise OSError for the program 'no such app-'."""
    bogus = 'no such app-'
    with self.assertRaises(OSError):
        os.execvpe(bogus, [bogus], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +00001371
Thomas Heller6790d602007-08-30 17:15:14 +00001372 def test_execvpe_with_bad_arglist(self):
1373 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
1374
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001375 @unittest.skipUnless(hasattr(os, '_execvpe'),
1376 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +00001377 def _test_internal_execvpe(self, test_type):
1378 program_path = os.sep + 'absolutepath'
1379 if test_type is bytes:
1380 program = b'executable'
1381 fullpath = os.path.join(os.fsencode(program_path), program)
1382 native_fullpath = fullpath
1383 arguments = [b'progname', 'arg1', 'arg2']
1384 else:
1385 program = 'executable'
1386 arguments = ['progname', 'arg1', 'arg2']
1387 fullpath = os.path.join(program_path, program)
1388 if os.name != "nt":
1389 native_fullpath = os.fsencode(fullpath)
1390 else:
1391 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001392 env = {'spam': 'beans'}
1393
Victor Stinnerb745a742010-05-18 17:17:23 +00001394 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001395 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001396 self.assertRaises(RuntimeError,
1397 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001398 self.assertEqual(len(calls), 1)
1399 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
1400
Victor Stinnerb745a742010-05-18 17:17:23 +00001401 # test os._execvpe() with a relative path:
1402 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001403 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001404 self.assertRaises(OSError,
1405 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001406 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +00001407 self.assertSequenceEqual(calls[0],
1408 ('execve', native_fullpath, (arguments, env)))
1409
1410 # test os._execvpe() with a relative path:
1411 # os.get_exec_path() reads the 'PATH' variable
1412 with _execvpe_mockup() as calls:
1413 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +00001414 if test_type is bytes:
1415 env_path[b'PATH'] = program_path
1416 else:
1417 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +00001418 self.assertRaises(OSError,
1419 os._execvpe, program, arguments, env=env_path)
1420 self.assertEqual(len(calls), 1)
1421 self.assertSequenceEqual(calls[0],
1422 ('execve', native_fullpath, (arguments, env_path)))
1423
1424 def test_internal_execvpe_str(self):
1425 self._test_internal_execvpe(str)
1426 if os.name != "nt":
1427 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001428
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001429
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    # Each test checks that an os function raises OSError when applied to
    # support.TESTFN in a state the operation cannot handle (missing for most
    # tests, an existing regular file for mkdir).

    def setUp(self):
        # Every test relies on support.TESTFN not existing beforehand, so
        # fail early if a previous test left it behind.
        try:
            os.stat(support.TESTFN)
        except FileNotFoundError:
            # Expected: the path does not exist.
            pass
        except OSError as exc:
            self.fail("file %s must not exist; os.stat failed with %s"
                      % (support.TESTFN, exc))
        else:
            self.fail("file %s must not exist" % support.TESTFN)

    def test_rename(self):
        self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")

    def test_remove(self):
        self.assertRaises(OSError, os.remove, support.TESTFN)

    def test_chdir(self):
        self.assertRaises(OSError, os.chdir, support.TESTFN)

    def test_mkdir(self):
        self.addCleanup(support.unlink, support.TESTFN)

        # "x" mode creates the file and fails if it already exists; mkdir on
        # the path of that (still open) regular file must raise.
        with open(support.TESTFN, "x"):
            self.assertRaises(OSError, os.mkdir, support.TESTFN)

    def test_utime(self):
        self.assertRaises(OSError, os.utime, support.TESTFN, None)

    def test_chmod(self):
        self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001464
Victor Stinnere77c9742016-03-25 10:28:23 +01001465
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001466class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001467 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001468 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1469 #singles.append("close")
1470 #We omit close because it doesn'r raise an exception on some platforms
1471 def get_single(f):
1472 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001473 if hasattr(os, f):
1474 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001475 return helper
1476 for f in singles:
1477 locals()["test_"+f] = get_single(f)
1478
Benjamin Peterson7522c742009-01-19 21:00:09 +00001479 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001480 try:
1481 f(support.make_bad_fd(), *args)
1482 except OSError as e:
1483 self.assertEqual(e.errno, errno.EBADF)
1484 else:
Martin Panter7462b6492015-11-02 03:37:02 +00001485 self.fail("%r didn't raise an OSError with a bad file descriptor"
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001486 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001487
Serhiy Storchaka43767632013-11-03 21:31:38 +02001488 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001489 def test_isatty(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001490 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001491
Serhiy Storchaka43767632013-11-03 21:31:38 +02001492 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001493 def test_closerange(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001494 fd = support.make_bad_fd()
1495 # Make sure none of the descriptors we are about to close are
1496 # currently valid (issue 6542).
1497 for i in range(10):
1498 try: os.fstat(fd+i)
1499 except OSError:
1500 pass
1501 else:
1502 break
1503 if i < 2:
1504 raise unittest.SkipTest(
1505 "Unable to acquire a range of invalid file descriptors")
1506 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001507
Serhiy Storchaka43767632013-11-03 21:31:38 +02001508 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001509 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001510 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001511
Serhiy Storchaka43767632013-11-03 21:31:38 +02001512 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001513 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001514 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001515
Serhiy Storchaka43767632013-11-03 21:31:38 +02001516 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001517 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001518 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001519
Serhiy Storchaka43767632013-11-03 21:31:38 +02001520 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001521 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001522 self.check(os.pathconf, "PC_NAME_MAX")
1523 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001524
Serhiy Storchaka43767632013-11-03 21:31:38 +02001525 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001526 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001527 self.check(os.truncate, 0)
1528 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001529
Serhiy Storchaka43767632013-11-03 21:31:38 +02001530 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001531 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001532 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001533
Serhiy Storchaka43767632013-11-03 21:31:38 +02001534 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001535 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001536 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001537
Victor Stinner57ddf782014-01-08 15:21:28 +01001538 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1539 def test_readv(self):
1540 buf = bytearray(10)
1541 self.check(os.readv, [buf])
1542
Serhiy Storchaka43767632013-11-03 21:31:38 +02001543 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001544 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001545 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001546
Serhiy Storchaka43767632013-11-03 21:31:38 +02001547 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001548 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001549 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001550
Victor Stinner57ddf782014-01-08 15:21:28 +01001551 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1552 def test_writev(self):
1553 self.check(os.writev, [b'abc'])
1554
Victor Stinner1db9e7b2014-07-29 22:32:47 +02001555 def test_inheritable(self):
1556 self.check(os.get_inheritable)
1557 self.check(os.set_inheritable, True)
1558
1559 @unittest.skipUnless(hasattr(os, 'get_blocking'),
1560 'needs os.get_blocking() and os.set_blocking()')
1561 def test_blocking(self):
1562 self.check(os.get_blocking)
1563 self.check(os.set_blocking, True)
1564
Brian Curtin1b9df392010-11-24 20:24:31 +00001565
class LinkTests(unittest.TestCase):
    # Tests for os.link() with str, bytes and non-ASCII file names.

    def setUp(self):
        self.file1 = support.TESTFN
        self.file2 = support.TESTFN + "2"

    def tearDown(self):
        # Remove whichever of the two files the test managed to create.
        for filename in (self.file1, self.file2):
            if os.path.exists(filename):
                os.unlink(filename)

    def _test_link(self, file1, file2):
        # Create file1, hard-link it to file2 and check that both names open
        # the same underlying file.
        # NOTE(review): create_file and bytes_filename_warn are helpers
        # defined elsewhere in this module; their exact behaviour is not
        # visible from here.
        create_file(file1)

        with bytes_filename_warn(False):
            os.link(file1, file2)
        with open(file1, "r") as f1, open(file2, "r") as f2:
            self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))

    def test_link(self):
        self._test_link(self.file1, self.file2)

    def test_link_bytes(self):
        # os.fsencode() applies the filesystem encoding together with its
        # error handler, matching how the rest of this module builds bytes
        # file names.
        self._test_link(os.fsencode(self.file1), os.fsencode(self.file2))

    def test_unicode_name(self):
        try:
            os.fsencode("\xf1")
        except UnicodeError:
            raise unittest.SkipTest("Unable to encode for this platform.")

        # Update the attributes too, so that tearDown removes the renamed
        # files.
        self.file1 += "\xf1"
        self.file2 = self.file1 + "2"
        self._test_link(self.file1, self.file2)
1600
Serhiy Storchaka43767632013-11-03 21:31:38 +02001601@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1602class PosixUidGidTests(unittest.TestCase):
1603 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
1604 def test_setuid(self):
1605 if os.getuid() != 0:
1606 self.assertRaises(OSError, os.setuid, 0)
1607 self.assertRaises(OverflowError, os.setuid, 1<<32)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001608
Serhiy Storchaka43767632013-11-03 21:31:38 +02001609 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
1610 def test_setgid(self):
1611 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1612 self.assertRaises(OSError, os.setgid, 0)
1613 self.assertRaises(OverflowError, os.setgid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001614
Serhiy Storchaka43767632013-11-03 21:31:38 +02001615 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
1616 def test_seteuid(self):
1617 if os.getuid() != 0:
1618 self.assertRaises(OSError, os.seteuid, 0)
1619 self.assertRaises(OverflowError, os.seteuid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001620
Serhiy Storchaka43767632013-11-03 21:31:38 +02001621 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
1622 def test_setegid(self):
1623 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1624 self.assertRaises(OSError, os.setegid, 0)
1625 self.assertRaises(OverflowError, os.setegid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001626
Serhiy Storchaka43767632013-11-03 21:31:38 +02001627 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1628 def test_setreuid(self):
1629 if os.getuid() != 0:
1630 self.assertRaises(OSError, os.setreuid, 0, 0)
1631 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
1632 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001633
Serhiy Storchaka43767632013-11-03 21:31:38 +02001634 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1635 def test_setreuid_neg1(self):
1636 # Needs to accept -1. We run this in a subprocess to avoid
1637 # altering the test runner's process state (issue8045).
1638 subprocess.check_call([
1639 sys.executable, '-c',
1640 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001641
Serhiy Storchaka43767632013-11-03 21:31:38 +02001642 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1643 def test_setregid(self):
1644 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1645 self.assertRaises(OSError, os.setregid, 0, 0)
1646 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
1647 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001648
Serhiy Storchaka43767632013-11-03 21:31:38 +02001649 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1650 def test_setregid_neg1(self):
1651 # Needs to accept -1. We run this in a subprocess to avoid
1652 # altering the test runner's process state (issue8045).
1653 subprocess.check_call([
1654 sys.executable, '-c',
1655 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001656
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class Pep383Tests(unittest.TestCase):
    # Creates a directory holding files with non-ASCII (where possible,
    # unencodable) names and checks that listdir/open/stat/statvfs cope with
    # them.

    def setUp(self):
        # Pick the most demanding directory name the platform supports.
        if support.TESTFN_UNENCODABLE:
            self.dir = support.TESTFN_UNENCODABLE
        elif support.TESTFN_NONASCII:
            self.dir = support.TESTFN_NONASCII
        else:
            self.dir = support.TESTFN
        self.bdir = os.fsencode(self.dir)

        bytesfn = []
        def add_filename(fn):
            # Record the bytes form of fn; names that cannot be encoded are
            # silently left out.
            try:
                fn = os.fsencode(fn)
            except UnicodeEncodeError:
                return
            bytesfn.append(fn)
        add_filename(support.TESTFN_UNICODE)
        if support.TESTFN_UNENCODABLE:
            add_filename(support.TESTFN_UNENCODABLE)
        if support.TESTFN_NONASCII:
            add_filename(support.TESTFN_NONASCII)
        if not bytesfn:
            self.skipTest("couldn't create any non-ascii filename")

        self.unicodefn = set()
        os.mkdir(self.dir)
        try:
            for fn in bytesfn:
                support.create_empty_file(os.path.join(self.bdir, fn))
                fn = os.fsdecode(fn)
                if fn in self.unicodefn:
                    raise ValueError("duplicate filename")
                self.unicodefn.add(fn)
        except BaseException:
            # tearDown() is not run when setUp() fails, so remove the
            # directory here for any exception (KeyboardInterrupt included)
            # before re-raising it.
            shutil.rmtree(self.dir)
            raise

    def tearDown(self):
        shutil.rmtree(self.dir)

    def test_listdir(self):
        expected = self.unicodefn
        found = set(os.listdir(self.dir))
        self.assertEqual(found, expected)
        # test listdir without arguments
        current_directory = os.getcwd()
        try:
            os.chdir(os.sep)
            self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
        finally:
            os.chdir(current_directory)

    def test_open(self):
        for fn in self.unicodefn:
            # Opening must succeed; the file is closed again straight away.
            with open(os.path.join(self.dir, fn), 'rb'):
                pass

    @unittest.skipUnless(hasattr(os, 'statvfs'),
                         "need os.statvfs()")
    def test_statvfs(self):
        # issue #9645
        for fn in self.unicodefn:
            # should not fail with file not found error
            fullname = os.path.join(self.dir, fn)
            os.statvfs(fullname)

    def test_stat(self):
        for fn in self.unicodefn:
            os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001728
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    # Tests for os.kill() on Windows, both with plain exit codes and with
    # console control events.

    def _kill(self, sig):
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll up to max_attempts times, 0.1s apart.  The else clause runs
        # when the loop ends without a break, i.e. the attempts ran out or
        # the subprocess exited before sending its message.
        attempts, max_attempts = 0, 100
        while attempts < max_attempts and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value.decode())
                break
            time.sleep(0.1)
            attempts += 1
        else:
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        # Launch win_console_handler.py in a new process group, wait until
        # the shared one-byte mapping reads 1, then send *event* and require
        # the subprocess to stop.  *name* is only used in the failure message.
        # NOTE(review): presumably win_console_handler.py writes the 1 once
        # its handler is installed -- confirm against that script.
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release this process's view of the mapping when the test is over.
        self.addCleanup(m.close)
        m[0] = 0
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        attempts, max_attempts = 0, 100
        while attempts < max_attempts and proc.poll() is None:
            if m[0] == 1:
                break
            time.sleep(0.1)
            attempts += 1
        else:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        if not proc.poll():
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle Ctrl+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
1843
1844
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        # Populate support.TESTFN with SUB0, SUB1 (directories) and FILE0,
        # FILE1 (text files); created_paths ends up as the sorted entry names.
        self.created_paths = names = []
        for index in range(2):
            dir_name = 'SUB%d' % index
            file_name = 'FILE%d' % index
            dir_path = os.path.join(support.TESTFN, dir_name)
            file_path = os.path.join(support.TESTFN, file_name)
            # makedirs also creates support.TESTFN itself on the first pass.
            os.makedirs(dir_path)
            with open(file_path, 'w') as stream:
                stream.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
            names.extend([dir_name, file_name])
        names.sort()

    def tearDown(self):
        shutil.rmtree(support.TESTFN)

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # unicode
        found = sorted(os.listdir(support.TESTFN))
        self.assertEqual(found, self.created_paths)

        # bytes
        with bytes_filename_warn(False):
            found = sorted(os.listdir(os.fsencode(support.TESTFN)))
            expected = [os.fsencode(name) for name in self.created_paths]
            self.assertEqual(found, expected)

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        # unicode
        path = '\\\\?\\' + os.path.abspath(support.TESTFN)
        found = sorted(os.listdir(path))
        self.assertEqual(found, self.created_paths)

        # bytes
        with bytes_filename_warn(False):
            path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
            found = sorted(os.listdir(path))
            expected = [os.fsencode(name) for name in self.created_paths]
            self.assertEqual(found, expected)
Tim Golden781bbeb2013-10-25 20:24:06 +01001893
1894
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
@support.skip_unless_symlink
class Win32SymlinkTests(unittest.TestCase):
    # Symlink tests for Windows.  Links are created in the current directory
    # and point at this test file, at its directory, or at a missing target.
    filelink = 'filelinktest'
    filelink_target = os.path.abspath(__file__)
    dirlink = 'dirlinktest'
    dirlink_target = os.path.dirname(filelink_target)
    missing_link = 'missing link'

    def setUp(self):
        # Targets must be present and no link name may be left over.
        for required in (self.dirlink_target, self.filelink_target):
            assert os.path.exists(required)
        for leftover in (self.dirlink, self.filelink, self.missing_link):
            assert not os.path.exists(leftover)

    def tearDown(self):
        if os.path.exists(self.filelink):
            os.remove(self.filelink)
        if os.path.exists(self.dirlink):
            os.rmdir(self.dirlink)
        # lexists(): the missing-target link has to be found even though
        # exists() is false for it.
        if os.path.lexists(self.missing_link):
            os.remove(self.missing_link)

    def test_directory_link(self):
        link = self.dirlink
        os.symlink(self.dirlink_target, link)
        self.assertTrue(os.path.exists(link))
        self.assertTrue(os.path.isdir(link))
        self.assertTrue(os.path.islink(link))
        self.check_stat(link, self.dirlink_target)

    def test_file_link(self):
        link = self.filelink
        os.symlink(self.filelink_target, link)
        self.assertTrue(os.path.exists(link))
        self.assertTrue(os.path.isfile(link))
        self.assertTrue(os.path.islink(link))
        self.check_stat(link, self.filelink_target)

    def _create_missing_dir_link(self):
        'Create a "directory" link to a non-existent target'
        linkname = self.missing_link
        if os.path.lexists(linkname):
            os.remove(linkname)
        target = r'c:\\target does not exist.29r3c740'
        assert not os.path.exists(target)
        os.symlink(target, linkname, True)  # True: target_is_directory

    def test_remove_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # For compatibility with Unix, os.remove will check the
        # directory status and call RemoveDirectory if the symlink
        # was created with target_is_dir==True.
        os.remove(self.missing_link)

    @unittest.skip("currently fails; consider for improvement")
    def test_isdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider having isdir return true for directory links
        self.assertTrue(os.path.isdir(self.missing_link))

    @unittest.skip("currently fails; consider for improvement")
    def test_rmdir_on_directory_link_to_missing_target(self):
        self._create_missing_dir_link()
        # consider allowing rmdir to remove directory links
        os.rmdir(self.missing_link)

    def check_stat(self, link, target):
        # stat() follows the link and must match the target, whereas lstat()
        # describes the link itself and must differ.
        self.assertEqual(os.stat(link), os.stat(target))
        self.assertNotEqual(os.lstat(link), os.stat(link))

        # Same two checks with the link name given as bytes.
        bytes_link = os.fsencode(link)
        with bytes_filename_warn(True):
            self.assertEqual(os.stat(bytes_link), os.stat(target))
        with bytes_filename_warn(True):
            self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))

    def test_12084(self):
        level1 = os.path.abspath(support.TESTFN)
        level2 = os.path.join(level1, "level2")
        level3 = os.path.join(level2, "level3")
        self.addCleanup(support.rmtree, level1)

        # Build level1/level2/level3 from the outside in.
        for directory in (level1, level2, level3):
            os.mkdir(directory)

        file1 = os.path.abspath(os.path.join(level1, "file1"))
        create_file(file1)

        start_dir = os.getcwd()
        try:
            # The link lives in level2 and points at file1 through a path
            # relative to level2.
            os.chdir(level2)
            link = os.path.join(level2, "link")
            os.symlink(os.path.relpath(file1), "link")
            self.assertIn("link", os.listdir(os.getcwd()))

            # Check os.stat calls from the same dir as the link
            self.assertEqual(os.stat(file1), os.stat("link"))

            # Check os.stat calls from a dir below the link
            os.chdir(level1)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))

            # Check os.stat calls from a dir above the link
            os.chdir(level3)
            self.assertEqual(os.stat(file1),
                             os.stat(os.path.relpath(link)))
        finally:
            os.chdir(start_dir)
Brian Curtind25aef52011-06-13 15:16:04 -05002006
Brian Curtind40e6f72010-07-08 21:39:08 +00002007
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32JunctionTests(unittest.TestCase):
    """Create directory junctions with _winapi and inspect them via os."""

    # Relative name of the junction each test creates.
    junction = 'junctiontest'
    # Directory the junction points at: the one containing this file.
    junction_target = os.path.dirname(os.path.abspath(__file__))

    def setUp(self):
        # Preconditions: the target is there and no junction is left over.
        assert os.path.exists(self.junction_target)
        assert not os.path.exists(self.junction)

    def tearDown(self):
        if not os.path.exists(self.junction):
            return
        # os.rmdir delegates to Windows' RemoveDirectoryW,
        # which removes junction points safely.
        os.rmdir(self.junction)

    def _make_junction(self):
        # Create the junction and confirm it is visible on disk.
        _winapi.CreateJunction(self.junction_target, self.junction)
        self.assertTrue(os.path.exists(self.junction))

    def test_create_junction(self):
        self._make_junction()
        self.assertTrue(os.path.isdir(self.junction))

        # Junctions are not recognized as links.
        self.assertFalse(os.path.islink(self.junction))

    def test_unlink_removes_junction(self):
        self._make_junction()

        os.unlink(self.junction)
        self.assertFalse(os.path.exists(self.junction))
2037
2038
@support.skip_unless_symlink
class NonLocalSymlinkTests(unittest.TestCase):
    """Directory detection through a symlink with a link-relative target."""

    def setUp(self):
        # Raw docstring: the tree drawing contains a backslash, which would
        # otherwise form the invalid escape sequence "\_".
        r"""
        Create this structure:

        base
         \___ some_dir
        """
        os.makedirs('base/some_dir')

    def tearDown(self):
        shutil.rmtree('base')

    def test_directory_link_nonlocal(self):
        """
        The symlink target should resolve relative to the link, not relative
        to the current directory.

        Link base/some_link -> base/some_dir and ensure that some_link
        is resolved as a directory.

        In issue13772, it was discovered that directory detection failed if
        the symlink target was not specified relative to the current
        directory, which was a defect in the implementation.
        """
        src = os.path.join('base', 'some_link')
        # 'some_dir' is relative to the link's own directory ('base'), not
        # to the current working directory.
        os.symlink('some_dir', src)
        # Use a unittest assertion rather than a bare ``assert`` so the
        # check still runs when Python is started with -O.
        self.assertTrue(os.path.isdir(src))
2069
2070
class FSEncodingTests(unittest.TestCase):
    """Checks for os.fsencode() and os.fsdecode()."""

    def test_nop(self):
        # A value that already has the result type comes back unchanged.
        raw = b'abc\xff'
        text = 'abc\u0141'
        self.assertEqual(os.fsencode(raw), raw)
        self.assertEqual(os.fsdecode(text), text)

    def test_identity(self):
        # assert fsdecode(fsencode(x)) == x
        # Names that cannot be encoded at all are skipped.
        for name in ('unicode\u0141', 'latin\xe9', 'ascii'):
            try:
                encoded = os.fsencode(name)
            except UnicodeEncodeError:
                pass
            else:
                self.assertEqual(os.fsdecode(encoded), name)
Victor Stinnere8d51452010-08-19 01:05:19 +00002084
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002085
Brett Cannonefb00c02012-02-29 18:31:31 -05002086
class DeviceEncodingTests(unittest.TestCase):
    """Checks for os.device_encoding()."""

    def test_bad_fd(self):
        # Return None when an fd doesn't actually exist.
        missing_fd = 123456
        self.assertIsNone(os.device_encoding(missing_fd))

    @unittest.skipIf(
        not os.isatty(0)
        or not (sys.platform.startswith('win')
                or (hasattr(locale, 'nl_langinfo')
                    and hasattr(locale, 'CODESET'))),
        'test requires a tty and either Windows or nl_langinfo(CODESET)')
    def test_device_encoding(self):
        # For fd 0 on a tty, an encoding name that the codecs registry
        # knows must be reported.
        name = os.device_encoding(0)
        self.assertIsNotNone(name)
        self.assertTrue(codecs.lookup(name))
2100
2101
class PidTests(unittest.TestCase):
    """Process-id checks that involve a child Python interpreter."""

    @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
    def test_getppid(self):
        # The child prints its parent's pid.
        # We are the parent of our subprocess
        command = [sys.executable, '-c', 'import os; print(os.getppid())']
        child = subprocess.Popen(command, stdout=subprocess.PIPE)
        output = child.communicate()[0]
        self.assertEqual(int(output), os.getpid())

    def test_waitpid(self):
        # A child that runs 'pass' must be reported by waitpid() as
        # (pid, 0).
        argv = [sys.executable, '-c', 'pass']
        pid = os.spawnv(os.P_NOWAIT, argv[0], argv)
        self.assertEqual(os.waitpid(pid, 0), (pid, 0))
2117
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002118
# This TestCase triggered at least two different errors on *nix buildbots
# when it was introduced, so it is skipped unconditionally for now to let
# the buildbots move along.
@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
@unittest.skipIf(not hasattr(os, 'getlogin'), "test needs os.getlogin")
class LoginTests(unittest.TestCase):
    """Check that os.getlogin() returns a non-empty name."""

    def test_getlogin(self):
        login = os.getlogin()
        self.assertNotEqual(len(login), 0)
2127
2128
@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
                     "needs os.getpriority and os.setpriority")
class ProgramPriorityTests(unittest.TestCase):
    """Tests for os.getpriority() and os.setpriority()."""

    def test_set_get_priority(self):
        # Raise this process's priority value by one, read it back, then
        # try to put the original value back.
        which, who = os.PRIO_PROCESS, os.getpid()

        base = os.getpriority(which, who)
        os.setpriority(which, who, base + 1)
        try:
            new_prio = os.getpriority(which, who)
            if base >= 19 and new_prio <= 19:
                raise unittest.SkipTest("unable to reliably test setpriority "
                                        "at current nice level of %s" % base)
            self.assertEqual(new_prio, base + 1)
        finally:
            # Restoring is best effort: EACCES is tolerated (presumably the
            # process may lack permission to go back to the old value);
            # every other error propagates.
            try:
                os.setpriority(which, who, base)
            except OSError as err:
                if err.errno != errno.EACCES:
                    raise
2151
2152
if threading is not None:
    class SendfileTestServer(asyncore.dispatcher, threading.Thread):
        """Listening socket that runs an asyncore loop in its own thread.

        Each accepted connection is wrapped in a Handler, kept in
        ``handler_instance``, which accumulates whatever the peer sends.
        """

        class Handler(asynchat.async_chat):
            """Per-connection channel that buffers all received bytes."""

            def __init__(self, conn):
                asynchat.async_chat.__init__(self, conn)
                self.in_buffer = []
                self.closed = False
                # Greeting the client waits for before it starts sending.
                self.push(b"220 ready\r\n")

            def handle_read(self):
                self.in_buffer.append(self.recv(4096))

            def get_data(self):
                # Everything received so far, as one bytes object.
                return b''.join(self.in_buffer)

            def handle_close(self):
                self.close()
                self.closed = True

            def handle_error(self):
                # Re-raise the exception being handled instead of hiding it.
                raise

        def __init__(self, address):
            threading.Thread.__init__(self)
            asyncore.dispatcher.__init__(self)
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.bind(address)
            self.listen(5)
            # Record the address actually bound (the caller may pass port 0).
            bound = self.socket.getsockname()[:2]
            self.host, self.port = bound
            self.handler_instance = None
            self._active = False
            self._active_lock = threading.Lock()

        # --- public API

        @property
        def running(self):
            return self._active

        def start(self):
            assert not self.running
            self.__flag = threading.Event()
            threading.Thread.start(self)
            # Block until run() has signalled that it started.
            self.__flag.wait()

        def stop(self):
            assert self.running
            self._active = False
            self.join()

        def wait(self):
            # wait for handler connection to be closed, then stop the server
            while True:
                if getattr(self.handler_instance, "closed", False):
                    break
                time.sleep(0.001)
            self.stop()

        # --- internals

        def run(self):
            self._active = True
            self.__flag.set()
            # One short poll per iteration so that a cleared _active flag
            # is noticed promptly.
            while self._active and asyncore.socket_map:
                self._active_lock.acquire()
                asyncore.loop(timeout=0.001, count=1)
                self._active_lock.release()
            asyncore.close_all()

        def handle_accept(self):
            conn, addr = self.accept()
            self.handler_instance = self.Handler(conn)

        def handle_connect(self):
            self.close()
        # A read event on the listening socket is handled the same way.
        handle_read = handle_connect

        def writable(self):
            return 0

        def handle_error(self):
            # Re-raise the exception being handled instead of hiding it.
            raise
2236
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002237
@unittest.skipUnless(threading is not None, "test needs threading module")
@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
    """Send a file over a socket with os.sendfile() to a SendfileTestServer
    and compare what the server received with what was sent."""

    DATA = b"12345abcde" * 16 * 1024  # 160 KB
    # Header/trailer arguments are only exercised off Linux/Solaris/SunOS.
    SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith(
        ("linux", "solaris", "sunos"))
    requires_headers_trailers = unittest.skipUnless(
        SUPPORT_HEADERS_TRAILERS, 'requires headers and trailers support')

    @classmethod
    def setUpClass(cls):
        cls.key = support.threading_setup()
        create_file(support.TESTFN, cls.DATA)

    @classmethod
    def tearDownClass(cls):
        support.threading_cleanup(*cls.key)
        support.unlink(support.TESTFN)

    def setUp(self):
        self.server = SendfileTestServer((support.HOST, 0))
        self.server.start()
        self.client = socket.socket()
        self.client.connect((self.server.host, self.server.port))
        self.client.settimeout(1)
        # synchronize by waiting for "220 ready" response
        self.client.recv(1024)
        self.sockno = self.client.fileno()
        self.file = open(support.TESTFN, 'rb')
        self.fileno = self.file.fileno()

    def tearDown(self):
        self.file.close()
        self.client.close()
        if self.server.running:
            self.server.stop()

    def sendfile_wrapper(self, sock, file, offset, nbytes, headers=(), trailers=()):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().

        Retries for as long as os.sendfile() fails with EAGAIN or EBUSY;
        any other OSError (ECONNRESET, i.e. a disconnect, included) is
        propagated.  Returns what os.sendfile() returns.  *headers* and
        *trailers* are forwarded only when SUPPORT_HEADERS_TRAILERS is true.
        """
        while True:
            try:
                if self.SUPPORT_HEADERS_TRAILERS:
                    return os.sendfile(sock, file, offset, nbytes, headers,
                                       trailers)
                return os.sendfile(sock, file, offset, nbytes)
            except OSError as err:
                # EAGAIN / EBUSY: we have to retry sending the data.
                if err.errno not in (errno.EAGAIN, errno.EBUSY):
                    raise

    def test_send_whole_file(self):
        # normal send
        total_sent = 0
        offset = 0
        nbytes = 4096
        while total_sent < len(self.DATA):
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)
            self.assertEqual(offset, total_sent)

        self.assertEqual(total_sent, len(self.DATA))
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        # Compare lengths first so a size mismatch is reported as such.
        self.assertEqual(len(data), len(self.DATA))
        self.assertEqual(data, self.DATA)

    def test_send_at_certain_offset(self):
        # start sending a file at a certain offset
        total_sent = 0
        offset = len(self.DATA) // 2
        must_send = len(self.DATA) - offset
        nbytes = 4096
        while total_sent < must_send:
            sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
            if sent == 0:
                break
            offset += sent
            total_sent += sent
            self.assertLessEqual(sent, nbytes)

        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        expected = self.DATA[len(self.DATA) // 2:]
        self.assertEqual(total_sent, len(expected))
        self.assertEqual(len(data), len(expected))
        self.assertEqual(data, expected)

    def test_offset_overflow(self):
        # specify an offset > file size
        offset = len(self.DATA) + 4096
        try:
            sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
        except OSError as e:
            # Solaris can raise EINVAL if offset >= file length, ignore.
            if e.errno != errno.EINVAL:
                raise
        else:
            self.assertEqual(sent, 0)
        self.client.shutdown(socket.SHUT_RDWR)
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        self.assertEqual(data, b'')

    def test_invalid_offset(self):
        # A negative offset must be rejected with EINVAL.
        with self.assertRaises(OSError) as cm:
            os.sendfile(self.sockno, self.fileno, -1, 4096)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    def test_keywords(self):
        # Keyword arguments should be supported
        # ('in' is a Python keyword, hence the ** unpacking).
        os.sendfile(out=self.sockno, offset=0, count=4096,
            **{'in': self.fileno})
        if self.SUPPORT_HEADERS_TRAILERS:
            os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
                headers=(), trailers=(), flags=0)

    # --- headers / trailers tests

    @requires_headers_trailers
    def test_headers(self):
        # The first call sends a 512-byte header ahead of the first chunk;
        # the rest of the file follows through sendfile_wrapper().
        total_sent = 0
        sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
                            headers=[b"x" * 512])
        total_sent += sent
        offset = 4096
        nbytes = 4096
        while True:
            sent = self.sendfile_wrapper(self.sockno, self.fileno,
                                         offset, nbytes)
            if sent == 0:
                break
            total_sent += sent
            offset += sent

        expected_data = b"x" * 512 + self.DATA
        self.assertEqual(total_sent, len(expected_data))
        self.client.close()
        self.server.wait()
        data = self.server.handler_instance.get_data()
        # NOTE(review): hashes are compared rather than the payloads
        # themselves - presumably to keep a failure message short; confirm.
        self.assertEqual(hash(data), hash(expected_data))

    @requires_headers_trailers
    def test_trailers(self):
        # A trailer must arrive right after the file contents.
        TESTFN2 = support.TESTFN + "2"
        file_data = b"abcdef"

        self.addCleanup(support.unlink, TESTFN2)
        create_file(TESTFN2, file_data)

        with open(TESTFN2, 'rb') as f:
            os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
                        trailers=[b"1234"])
            self.client.close()
            self.server.wait()
            data = self.server.handler_instance.get_data()
            self.assertEqual(data, b"abcdef1234")

    @requires_headers_trailers
    @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
                         'test needs os.SF_NODISKIO')
    def test_flags(self):
        # With SF_NODISKIO the call may fail with EBUSY or EAGAIN; both
        # are accepted, anything else is an error.
        try:
            os.sendfile(self.sockno, self.fileno, 0, 4096,
                        flags=os.SF_NODISKIO)
        except OSError as err:
            if err.errno not in (errno.EBUSY, errno.EAGAIN):
                raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002424
2425
def supports_extended_attributes():
    """Return True if a "user.test" extended attribute can be set on a file.

    Returns False when os has no setxattr, or when setting the attribute
    on a freshly created support.TESTFN raises OSError.  The probe file is
    removed before returning.
    """
    if not hasattr(os, "setxattr"):
        return False

    try:
        # "x" mode: the probe file must not exist yet.
        with open(support.TESTFN, "xb", 0) as probe:
            try:
                os.setxattr(probe.fileno(), b"user.test", b"")
            except OSError:
                usable = False
            else:
                usable = True
    finally:
        support.unlink(support.TESTFN)

    return usable
Larry Hastings9cf065c2012-06-22 16:30:09 -07002440
2441
@unittest.skipUnless(supports_extended_attributes(),
                     "no non-broken extended attribute support")
# Kernels < 2.6.39 don't respect setxattr flags.
@support.requires_linux_version(2, 6, 39)
class ExtendedAttributeTests(unittest.TestCase):
    """Exercise os.getxattr/setxattr/removexattr/listxattr on a scratch file."""

    def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
        # Run one fixed scenario against the four callables.  *s* converts
        # an attribute name to the type under test (str or bytes); **kwargs
        # is forwarded to getxattr/setxattr/removexattr.
        fn = support.TESTFN
        self.addCleanup(support.unlink, fn)
        create_file(fn)

        first = s("user.test")
        second = s("user.test2")

        def expect_errno(code, func, *args):
            # func(*args, **kwargs) must raise OSError carrying *code*.
            with self.assertRaises(OSError) as cm:
                func(*args, **kwargs)
            self.assertEqual(cm.exception.errno, code)

        # Nothing is set yet.
        expect_errno(errno.ENODATA, getxattr, fn, first)

        init_xattr = listxattr(fn)
        self.assertIsInstance(init_xattr, list)

        # Set an empty value, then replace it.
        setxattr(fn, first, b"", **kwargs)
        expected = set(init_xattr)
        expected.add("user.test")
        self.assertEqual(set(listxattr(fn)), expected)
        self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
        setxattr(fn, first, b"hello", os.XATTR_REPLACE, **kwargs)
        self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")

        # XATTR_CREATE refuses an existing name; XATTR_REPLACE refuses a
        # missing one.
        expect_errno(errno.EEXIST,
                     setxattr, fn, first, b"bye", os.XATTR_CREATE)
        expect_errno(errno.ENODATA,
                     setxattr, fn, second, b"bye", os.XATTR_REPLACE)

        setxattr(fn, second, b"foo", os.XATTR_CREATE, **kwargs)
        expected.add("user.test2")
        self.assertEqual(set(listxattr(fn)), expected)
        removexattr(fn, first, **kwargs)

        # The removed attribute is gone, the other one is untouched.
        expect_errno(errno.ENODATA, getxattr, fn, first)

        expected.remove("user.test")
        self.assertEqual(set(listxattr(fn)), expected)
        self.assertEqual(getxattr(fn, second, **kwargs), b"foo")

        # A 1024-byte value survives a round trip.
        big = b"a"*1024
        setxattr(fn, first, big, **kwargs)
        self.assertEqual(getxattr(fn, first, **kwargs), big)
        removexattr(fn, first, **kwargs)

        # A hundred attributes at once.
        many = sorted("user.test{}".format(i) for i in range(100))
        for thing in many:
            setxattr(fn, thing, b"x", **kwargs)
        self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))

    def _check_xattrs(self, *args, **kwargs):
        # Run the scenario with str names, then with bytes names, on a
        # fresh file each time.
        for convert in (str, os.fsencode):
            self._check_xattrs_str(convert, *args, **kwargs)
            support.unlink(support.TESTFN)

    def test_simple(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr)

    def test_lpath(self):
        self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
                           os.listxattr, follow_symlinks=False)

    def test_fds(self):
        # Same scenario, but each call goes through a file descriptor
        # opened just for that call.
        def via_fd(func, *open_args):
            def call(path, *args):
                with open(path, *open_args) as fp:
                    return func(fp.fileno(), *args)
            return call

        self._check_xattrs(via_fd(os.getxattr, "rb"),
                           via_fd(os.setxattr, "wb", 0),
                           via_fd(os.removexattr, "wb", 0),
                           via_fd(os.listxattr, "rb"))
2525
2526
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32DeprecatedBytesAPI(unittest.TestCase):
    """Call filesystem functions with bytes names under
    bytes_filename_warn(True)."""

    def test_deprecated(self):
        import nt
        filename = os.fsencode(support.TESTFN)
        # (function, positional arguments) pairs to exercise.
        calls = [
            (nt._getfullpathname, (filename,)),
            (nt._isdir, (filename,)),
            (os.access, (filename, os.R_OK)),
            (os.chdir, (filename,)),
            (os.chmod, (filename, 0o777)),
            (os.getcwdb, ()),
            (os.link, (filename, filename)),
            (os.listdir, (filename,)),
            (os.lstat, (filename,)),
            (os.mkdir, (filename,)),
            (os.open, (filename, os.O_RDONLY)),
            (os.rename, (filename, filename)),
            (os.rmdir, (filename,)),
            (os.startfile, (filename,)),
            (os.stat, (filename,)),
            (os.unlink, (filename,)),
            (os.utime, (filename,)),
        ]
        for func, args in calls:
            with bytes_filename_warn(True):
                try:
                    func(*args)
                except OSError:
                    # ignore OSError, we only care about DeprecationWarning
                    pass

    @support.skip_unless_symlink
    def test_symlink(self):
        self.addCleanup(support.unlink, support.TESTFN)

        name = os.fsencode(support.TESTFN)
        with bytes_filename_warn(True):
            os.symlink(name, name)
Victor Stinner28216442011-11-16 00:34:44 +01002565
Victor Stinner1ab6c2d2011-11-15 22:27:41 +01002566
@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
class TermsizeTests(unittest.TestCase):
    """Sanity checks for os.get_terminal_size()."""

    def _size_or_skip(self, query):
        # Return query(); turn "cannot query a terminal" failures into a
        # skip.  Other OSErrors propagate.
        try:
            return query()
        except OSError as e:
            if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
                # Under win32 a generic OSError can be thrown if the
                # handle cannot be retrieved
                self.skipTest("failed to query terminal size")
            raise

    def test_does_not_crash(self):
        """Check if get_terminal_size() returns a meaningful value.

        There's no easy portable way to actually check the size of the
        terminal, so let's check if it returns something sensible instead.
        """
        size = self._size_or_skip(os.get_terminal_size)

        self.assertGreaterEqual(size.columns, 0)
        self.assertGreaterEqual(size.lines, 0)

    def test_stty_match(self):
        """Check if stty returns the same results

        stty actually tests stdin, so get_terminal_size is invoked on
        stdin explicitly.  If stty succeeded, then get_terminal_size()
        should work too.
        """
        try:
            fields = subprocess.check_output(['stty', 'size']).decode().split()
        except (FileNotFoundError, subprocess.CalledProcessError):
            self.skipTest("stty invocation failed")
        # stty prints the two numbers in the opposite order to the
        # (columns, lines) pair being compared.
        expected = (int(fields[1]), int(fields[0]))

        actual = self._size_or_skip(
            lambda: os.get_terminal_size(sys.__stdin__.fileno()))
        self.assertEqual(expected, actual)
2609
2610
class OSErrorTests(unittest.TestCase):
    """OSError.filename must be the very object the caller passed in."""

    def setUp(self):
        class Str(str):
            pass

        # Text names: prefer an unencodable name when one is available,
        # and also try a str subclass instance.
        decoded = support.TESTFN_UNENCODABLE
        if decoded is None:
            decoded = support.TESTFN
        self.unicode_filenames = [decoded, Str(decoded)]

        # Bytes names: prefer an undecodable name when one is available,
        # and also try a memoryview over it.
        encoded = support.TESTFN_UNDECODABLE
        if encoded is None:
            encoded = os.fsencode(support.TESTFN)
        self.bytes_filenames = [encoded, memoryview(encoded)]

        self.filenames = self.bytes_filenames + self.unicode_filenames

    def test_oserror_filename(self):
        on_windows = sys.platform == "win32"
        every = self.filenames
        as_bytes = self.bytes_filenames
        as_text = self.unicode_filenames

        # Each entry: (names to try, function, extra positional args...).
        funcs = [
            (every, os.chdir,),
            (every, os.chmod, 0o777),
            (every, os.lstat,),
            (every, os.open, os.O_RDONLY),
            (every, os.rmdir,),
            (every, os.stat,),
            (every, os.unlink,),
        ]
        if on_windows:
            funcs += [
                (as_bytes, os.rename, b"dst"),
                (as_bytes, os.replace, b"dst"),
                (as_text, os.rename, "dst"),
                (as_text, os.replace, "dst"),
                # Issue #16414: Don't test undecodable names with listdir()
                # because of a Windows bug.
                #
                # With the ANSI code page 932, os.listdir(b'\xe7') return an
                # empty list (instead of failing), whereas os.listdir(b'\xff')
                # raises a FileNotFoundError. It looks like a Windows bug:
                # b'\xe7' directory does not exist, FindFirstFileA(b'\xe7')
                # fails with ERROR_FILE_NOT_FOUND (2), instead of
                # ERROR_PATH_NOT_FOUND (3).
                (as_text, os.listdir,),
            ]
        else:
            funcs += [
                (every, os.listdir,),
                (every, os.rename, "dst"),
                (every, os.replace, "dst"),
            ]

        # Optional functions that take every kind of name.
        optional = (
            ("chown", (0, 0)),
            ("lchown", (0, 0)),
            ("truncate", (0,)),
            ("chflags", (0,)),
            ("lchflags", (0,)),
            ("chroot", ()),
        )
        for attr, extra in optional:
            if hasattr(os, attr):
                funcs.append((every, getattr(os, attr)) + extra)

        if hasattr(os, "link"):
            if on_windows:
                funcs.append((as_bytes, os.link, b"dst"))
                funcs.append((as_text, os.link, "dst"))
            else:
                funcs.append((every, os.link, "dst"))
        if hasattr(os, "listxattr"):
            funcs += [
                (every, os.listxattr,),
                (every, os.getxattr, "user.test"),
                (every, os.setxattr, "user.test", b'user'),
                (every, os.removexattr, "user.test"),
            ]
        if hasattr(os, "lchmod"):
            funcs.append((every, os.lchmod, 0o777))
        if hasattr(os, "readlink"):
            funcs.append((as_text if on_windows else every, os.readlink,))

        for entry in funcs:
            names, func, func_args = entry[0], entry[1], entry[2:]
            for name in names:
                try:
                    with bytes_filename_warn(False):
                        func(name, *func_args)
                except OSError as err:
                    # Identity, not equality: the same object must come back.
                    self.assertIs(err.filename, name)
                else:
                    self.fail("No exception thrown by {}".format(func))
2708
class CPUCountTests(unittest.TestCase):
    # Sanity check for os.cpu_count().

    def test_cpu_count(self):
        # os.cpu_count() returns either None (count unknown) or a
        # strictly positive int.
        cpus = os.cpu_count()
        if cpus is None:
            # skipTest() raises, so nothing below runs in this case.
            self.skipTest("Could not determine the number of CPUs")
        self.assertIsInstance(cpus, int)
        self.assertGreater(cpus, 0)
2717
Victor Stinnerdaf45552013-08-28 00:53:59 +02002718
class FDInheritanceTests(unittest.TestCase):
    # Checks that descriptors created through the os module are
    # non-inheritable unless explicitly requested otherwise, and that
    # os.get_inheritable()/os.set_inheritable() agree with FD_CLOEXEC.

    def _open_readonly_fd(self):
        # Open this source file read-only and make sure the descriptor is
        # closed when the test finishes.
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        return fd

    def test_get_set_inheritable(self):
        fd = self._open_readonly_fd()
        self.assertEqual(os.get_inheritable(fd), False)

        os.set_inheritable(fd, True)
        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        fd = self._open_readonly_fd()
        self.assertEqual(os.get_inheritable(fd), False)

        # clear FD_CLOEXEC flag
        flags = fcntl.fcntl(fd, fcntl.F_GETFD)
        flags &= ~fcntl.FD_CLOEXEC
        fcntl.fcntl(fd, fcntl.F_SETFD, flags)

        self.assertEqual(os.get_inheritable(fd), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        fd = self._open_readonly_fd()
        self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
                         fcntl.FD_CLOEXEC)

        os.set_inheritable(fd, True)
        self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
                         0)

    def test_open(self):
        fd = self._open_readonly_fd()
        self.assertEqual(os.get_inheritable(fd), False)

    @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
    def test_pipe(self):
        rfd, wfd = os.pipe()
        self.addCleanup(os.close, rfd)
        self.addCleanup(os.close, wfd)
        self.assertEqual(os.get_inheritable(rfd), False)
        self.assertEqual(os.get_inheritable(wfd), False)

    def test_dup(self):
        fd1 = self._open_readonly_fd()

        fd2 = os.dup(fd1)
        self.addCleanup(os.close, fd2)
        self.assertEqual(os.get_inheritable(fd2), False)

    @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
    def test_dup2(self):
        fd = self._open_readonly_fd()

        # inheritable by default
        fd2 = self._open_readonly_fd()
        os.dup2(fd, fd2)
        self.assertEqual(os.get_inheritable(fd2), True)

        # force non-inheritable
        fd3 = self._open_readonly_fd()
        os.dup2(fd, fd3, inheritable=False)
        self.assertEqual(os.get_inheritable(fd3), False)

    @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
    def test_openpty(self):
        master_fd, slave_fd = os.openpty()
        self.addCleanup(os.close, master_fd)
        self.addCleanup(os.close, slave_fd)
        self.assertEqual(os.get_inheritable(master_fd), False)
        self.assertEqual(os.get_inheritable(slave_fd), False)
2801
2802
@unittest.skipUnless(hasattr(os, 'get_blocking'),
                     'needs os.get_blocking() and os.set_blocking()')
class BlockingTests(unittest.TestCase):
    # Round-trip test for os.get_blocking()/os.set_blocking().

    def test_blocking(self):
        fd = os.open(__file__, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        # A freshly opened descriptor is expected to be in blocking mode.
        self.assertEqual(os.get_blocking(fd), True)

        # Switch to non-blocking and back; each change must be reported
        # by os.get_blocking().
        for blocking in (False, True):
            os.set_blocking(fd, blocking)
            self.assertEqual(os.get_blocking(fd), blocking)
2816
2817
Yury Selivanov97e2e062014-09-26 12:33:06 -04002818
class ExportsTests(unittest.TestCase):
    # Spot-check the public names listed in os.__all__.

    def test_os_all(self):
        for name in ('open', 'walk'):
            self.assertIn(name, os.__all__)
2823
2824
class TestScandir(unittest.TestCase):
    # Tests for os.scandir() and the os.DirEntry objects it yields.  Each
    # test runs against a scratch directory created by setUp().
    check_no_resource_warning = support.check_no_resource_warning

    def setUp(self):
        self.path = os.path.realpath(support.TESTFN)
        self.bytes_path = os.fsencode(self.path)
        self.addCleanup(support.rmtree, self.path)
        os.mkdir(self.path)

    def create_file(self, name="file.txt"):
        # Create a file called *name* in the scratch directory and return
        # its full path.  The path is bytes when *name* is bytes, str
        # otherwise.
        path = self.bytes_path if isinstance(name, bytes) else self.path
        filename = os.path.join(path, name)
        create_file(filename, b'python')
        return filename

    def get_entries(self, names):
        # Scan the scratch directory, check that it contains exactly
        # *names* (which must be sorted) and return a name -> DirEntry
        # mapping.
        entries = {entry.name: entry for entry in os.scandir(self.path)}
        self.assertEqual(sorted(entries), names)
        return entries

    def assert_stat_equal(self, stat1, stat2, skip_fields):
        # Compare two stat results.  When *skip_fields* is true, compare
        # the st_* attributes one by one and leave out st_dev, st_ino and
        # st_nlink; otherwise compare the two results as a whole.
        if not skip_fields:
            self.assertEqual(stat1, stat2)
            return
        for attr in dir(stat1):
            if not attr.startswith("st_"):
                continue
            if attr in ("st_dev", "st_ino", "st_nlink"):
                continue
            self.assertEqual(getattr(stat1, attr),
                             getattr(stat2, attr),
                             (stat1, stat2, attr))

    def check_entry(self, entry, name, is_dir, is_file, is_symlink):
        # Check *entry* both against the caller's expectations (*name*,
        # *is_dir*, *is_file*, *is_symlink*) and against what os.stat()
        # and os.path report for the same path.
        self.assertIsInstance(entry, os.DirEntry)
        self.assertEqual(entry.name, name)
        self.assertEqual(entry.path, os.path.join(self.path, name))
        self.assertEqual(entry.inode(),
                         os.stat(entry.path, follow_symlinks=False).st_ino)

        # Expected values supplied by the caller (symlinks are followed
        # by default for is_dir() and is_file()).
        self.assertEqual(entry.is_dir(), is_dir)
        self.assertEqual(entry.is_file(), is_file)
        self.assertEqual(entry.is_symlink(), is_symlink)

        entry_stat = os.stat(entry.path)
        self.assertEqual(entry.is_dir(),
                         stat.S_ISDIR(entry_stat.st_mode))
        self.assertEqual(entry.is_file(),
                         stat.S_ISREG(entry_stat.st_mode))
        self.assertEqual(entry.is_symlink(),
                         os.path.islink(entry.path))

        entry_lstat = os.stat(entry.path, follow_symlinks=False)
        self.assertEqual(entry.is_dir(follow_symlinks=False),
                         stat.S_ISDIR(entry_lstat.st_mode))
        self.assertEqual(entry.is_file(follow_symlinks=False),
                         stat.S_ISREG(entry_lstat.st_mode))

        self.assert_stat_equal(entry.stat(),
                               entry_stat,
                               os.name == 'nt' and not is_symlink)
        self.assert_stat_equal(entry.stat(follow_symlinks=False),
                               entry_lstat,
                               os.name == 'nt')

    def test_attributes(self):
        link = hasattr(os, 'link')
        symlink = support.can_symlink()

        dirname = os.path.join(self.path, "dir")
        os.mkdir(dirname)
        filename = self.create_file("file.txt")
        if link:
            os.link(filename, os.path.join(self.path, "link_file.txt"))
        if symlink:
            os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
                       target_is_directory=True)
            os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))

        # (name, is_dir, is_file, is_symlink), kept in sorted name order
        # because get_entries() compares against the sorted listing.
        expected = [
            ('dir', True, False, False),
            ('file.txt', False, True, False),
        ]
        if link:
            expected.append(('link_file.txt', False, True, False))
        if symlink:
            expected.append(('symlink_dir', True, False, True))
            expected.append(('symlink_file.txt', False, True, True))

        entries = self.get_entries([name for name, *_ in expected])
        for name, *flags in expected:
            self.check_entry(entries[name], name, *flags)

    def get_entry(self, name):
        # Return the single entry of the scratch directory, checking that
        # it is called *name*.  A bytes *name* scans the bytes path.
        path = self.bytes_path if isinstance(name, bytes) else self.path
        entries = list(os.scandir(path))
        self.assertEqual(len(entries), 1)

        entry = entries[0]
        self.assertEqual(entry.name, name)
        return entry

    def create_file_entry(self, name='file.txt'):
        # Create a file and return the DirEntry that scandir() yields
        # for it.
        filename = self.create_file(name=name)
        return self.get_entry(os.path.basename(filename))

    def check_removed_entry(self, entry):
        # Shared checks for an entry whose file or directory was removed
        # after the entry had been obtained.
        if os.name == 'nt':
            self.assertRaises(FileNotFoundError, entry.inode)
            # don't fail
            entry.stat()
            entry.stat(follow_symlinks=False)
        else:
            self.assertGreater(entry.inode(), 0)
            self.assertRaises(FileNotFoundError, entry.stat)
            self.assertRaises(FileNotFoundError, entry.stat,
                              follow_symlinks=False)

    def test_current_directory(self):
        filename = self.create_file()
        old_dir = os.getcwd()
        try:
            os.chdir(self.path)

            # call scandir() without parameter: it must list the content
            # of the current directory
            names = sorted(entry.name for entry in os.scandir())
            self.assertEqual(names, [os.path.basename(filename)])
        finally:
            os.chdir(old_dir)

    def test_repr(self):
        entry = self.create_file_entry()
        self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")

    def test_fspath_protocol(self):
        entry = self.create_file_entry()
        self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))

    @unittest.skipIf(os.name == "nt", "test requires bytes path support")
    def test_fspath_protocol_bytes(self):
        bytes_filename = os.fsencode('bytesfile.txt')
        bytes_entry = self.create_file_entry(name=bytes_filename)
        fspath = os.fspath(bytes_entry)
        self.assertIsInstance(fspath, bytes)
        self.assertEqual(fspath,
                         os.path.join(self.bytes_path, bytes_filename))

    def test_removed_dir(self):
        path = os.path.join(self.path, 'dir')

        os.mkdir(path)
        entry = self.get_entry('dir')
        os.rmdir(path)

        # On POSIX, is_dir() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_dir())
        self.assertFalse(entry.is_file())
        self.assertFalse(entry.is_symlink())
        self.check_removed_entry(entry)

    def test_removed_file(self):
        entry = self.create_file_entry()
        os.unlink(entry.path)

        self.assertFalse(entry.is_dir())
        # On POSIX, is_file() result depends if scandir() filled d_type or not
        if os.name == 'nt':
            self.assertTrue(entry.is_file())
        self.assertFalse(entry.is_symlink())
        self.check_removed_entry(entry)

    def test_broken_symlink(self):
        if not support.can_symlink():
            self.skipTest('cannot create symbolic link')

        filename = self.create_file("file.txt")
        os.symlink(filename,
                   os.path.join(self.path, "symlink.txt"))
        entries = self.get_entries(['file.txt', 'symlink.txt'])
        entry = entries['symlink.txt']
        os.unlink(filename)

        self.assertGreater(entry.inode(), 0)
        self.assertFalse(entry.is_dir())
        self.assertFalse(entry.is_file())  # broken symlink returns False
        self.assertFalse(entry.is_dir(follow_symlinks=False))
        self.assertFalse(entry.is_file(follow_symlinks=False))
        self.assertTrue(entry.is_symlink())
        self.assertRaises(FileNotFoundError, entry.stat)
        # don't fail
        entry.stat(follow_symlinks=False)

    def test_bytes(self):
        if os.name == "nt":
            # On Windows, os.scandir(bytes) must raise an exception
            with bytes_filename_warn(True):
                self.assertRaises(TypeError, os.scandir, b'.')
            return

        self.create_file("file.txt")

        entries = list(os.scandir(self.bytes_path))
        self.assertEqual(len(entries), 1, entries)
        entry = entries[0]

        self.assertEqual(entry.name, b'file.txt')
        self.assertEqual(entry.path,
                         os.fsencode(os.path.join(self.path, 'file.txt')))

    def test_empty_path(self):
        self.assertRaises(FileNotFoundError, os.scandir, '')

    def test_consume_iterator_twice(self):
        self.create_file("file.txt")
        iterator = os.scandir(self.path)

        entries = list(iterator)
        self.assertEqual(len(entries), 1, entries)

        # check that consuming the iterator twice doesn't raise exception
        entries2 = list(iterator)
        self.assertEqual(len(entries2), 0, entries2)

    def test_bad_path_type(self):
        for obj in [1234, 1.234, {}, []]:
            self.assertRaises(TypeError, os.scandir, obj)

    def test_close(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        iterator = os.scandir(self.path)
        next(iterator)
        iterator.close()
        # multiple closes
        iterator.close()
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
        with self.check_no_resource_warning():
            del iterator

    def test_context_manager_close(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with os.scandir(self.path) as iterator:
            next(iterator)
            iterator.close()

    def test_context_manager_exception(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        with self.assertRaises(ZeroDivisionError):
            with os.scandir(self.path) as iterator:
                next(iterator)
                1/0
        with self.check_no_resource_warning():
            del iterator

    def test_resource_warning(self):
        self.create_file("file.txt")
        self.create_file("file2.txt")
        iterator = os.scandir(self.path)
        next(iterator)
        with self.assertWarns(ResourceWarning):
            del iterator
            support.gc_collect()
        # exhausted iterator
        iterator = os.scandir(self.path)
        list(iterator)
        with self.check_no_resource_warning():
            del iterator
3116
Victor Stinner6036e442015-03-08 01:58:04 +01003117
class TestPEP519(unittest.TestCase):
    # Tests for the PEP 519 file system path protocol: os.fspath(),
    # os.PathLike, and os.fsencode()/os.fsdecode() on path-like objects.

    # Abstracted so it can be overridden to test pure Python implementation
    # if a C version is provided.
    fspath = staticmethod(os.fspath)

    class PathLike:
        # Minimal path-like object.  __fspath__() returns the stored
        # value, or raises it when the value is an exception instance.
        def __init__(self, path=''):
            self.path = path

        def __fspath__(self):
            if isinstance(self.path, BaseException):
                raise self.path
            return self.path

    def test_return_bytes(self):
        for b in b'hello', b'goodbye', b'some/path/and/file':
            with self.subTest(path=b):
                self.assertEqual(b, self.fspath(b))

    def test_return_string(self):
        for s in 'hello', 'goodbye', 'some/path/and/file':
            with self.subTest(path=s):
                self.assertEqual(s, self.fspath(s))

    def test_fsencode_fsdecode(self):
        for p in "path/like/object", b"path/like/object":
            with self.subTest(path=p):
                pathlike = self.PathLike(p)

                self.assertEqual(p, self.fspath(pathlike))
                self.assertEqual(b"path/like/object", os.fsencode(pathlike))
                self.assertEqual("path/like/object", os.fsdecode(pathlike))

    def test_pathlike(self):
        self.assertEqual('#feelthegil',
                         self.fspath(self.PathLike('#feelthegil')))
        self.assertTrue(issubclass(self.PathLike, os.PathLike))
        self.assertIsInstance(self.PathLike(), os.PathLike)

    def test_garbage_in_exception_out(self):
        vapor = type('blah', (), {})
        for o in int, type, os, vapor():
            with self.subTest(obj=o):
                self.assertRaises(TypeError, self.fspath, o)

    def test_argument_required(self):
        self.assertRaises(TypeError, self.fspath)

    def test_bad_pathlike(self):
        # __fspath__ returns a value other than str or bytes.
        self.assertRaises(TypeError, self.fspath, self.PathLike(42))
        # __fspath__ attribute that is not callable.
        c = type('foo', (), {})
        c.__fspath__ = 1
        self.assertRaises(TypeError, self.fspath, c())
        # __fspath__ raises an exception.
        self.assertRaises(ZeroDivisionError, self.fspath,
                          self.PathLike(ZeroDivisionError()))
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003172
# Only test if the C version is provided, otherwise TestPEP519 already tested
# the pure Python implementation.
if hasattr(os, "_fspath"):
    class TestPEP519PurePython(TestPEP519):

        """Explicitly test the pure Python implementation of os.fspath()."""

        # Overrides the hook defined on TestPEP519 so every inherited test
        # runs against os._fspath instead of os.fspath.
        fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07003181
3182
# Run the whole test module when executed as a script.
if __name__ == "__main__":
    unittest.main()