blob: 77e4a008ae61e8a4a244a93be952628ee775c754 [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
11import fractions
12import getpass
13import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Victor Stinner47aacc82015-06-12 17:26:23 +020018import shutil
19import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010021import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020022import subprocess
23import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010024import sysconfig
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020025import threading
Victor Stinner47aacc82015-06-12 17:26:23 +020026import time
27import unittest
28import uuid
29import warnings
30from test import support
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020031
Antoine Pitrouec34ab52013-08-16 20:44:38 +020032try:
33 import resource
34except ImportError:
35 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020036try:
37 import fcntl
38except ImportError:
39 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010040try:
41 import _winapi
42except ImportError:
43 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020044try:
R David Murrayf2ad1732014-12-25 18:36:56 -050045 import grp
46 groups = [g.gr_gid for g in grp.getgrall() if getpass.getuser() in g.gr_mem]
47 if hasattr(os, 'getgid'):
48 process_gid = os.getgid()
49 if process_gid not in groups:
50 groups.append(process_gid)
51except ImportError:
52 groups = []
53try:
54 import pwd
55 all_users = [u.pw_uid for u in pwd.getpwall()]
Xavier de Gaye21060102016-11-16 08:05:27 +010056except (ImportError, AttributeError):
R David Murrayf2ad1732014-12-25 18:36:56 -050057 all_users = []
58try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020059 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020060except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020061 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020062
Berker Peksagce643912015-05-06 06:33:17 +030063from test.support.script_helper import assert_python_ok
Xavier de Gayed1415312016-07-22 12:15:29 +020064from test.support import unix_shell
Fred Drake38c2ef02001-07-17 20:52:51 +000065
Victor Stinner923590e2016-03-24 09:11:48 +010066
R David Murrayf2ad1732014-12-25 18:36:56 -050067root_in_posix = False
68if hasattr(os, 'geteuid'):
69 root_in_posix = (os.geteuid() == 0)
70
Mark Dickinson7cf03892010-04-16 13:45:35 +000071# Detect whether we're on a Linux system that uses the (now outdated
72# and unmaintained) linuxthreads threading library. There's an issue
73# when combining linuxthreads with a failed execv call: see
74# http://bugs.python.org/issue4970.
Victor Stinnerd5c355c2011-04-30 14:53:09 +020075if hasattr(sys, 'thread_info') and sys.thread_info.version:
76 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
77else:
78 USING_LINUXTHREADS = False
Brian Curtineb24d742010-04-12 17:16:38 +000079
Stefan Krahebee49a2013-01-17 15:31:00 +010080# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
81HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
82
Victor Stinner923590e2016-03-24 09:11:48 +010083
Berker Peksag4af23d72016-09-15 20:32:44 +030084def requires_os_func(name):
85 return unittest.skipUnless(hasattr(os, name), 'requires os.%s' % name)
86
87
Brett Cannonec6ce872016-09-06 15:50:29 -070088class _PathLike(os.PathLike):
89
90 def __init__(self, path=""):
91 self.path = path
92
93 def __str__(self):
94 return str(self.path)
95
96 def __fspath__(self):
97 if isinstance(self.path, BaseException):
98 raise self.path
99 else:
100 return self.path
101
102
Victor Stinnerae39d232016-03-24 17:12:55 +0100103def create_file(filename, content=b'content'):
104 with open(filename, "xb", 0) as fp:
105 fp.write(content)
106
107
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000108# Tests creating TESTFN
109class FileTests(unittest.TestCase):
110 def setUp(self):
Martin Panterbf19d162015-09-09 01:01:13 +0000111 if os.path.lexists(support.TESTFN):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000112 os.unlink(support.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000113 tearDown = setUp
114
115 def test_access(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000116 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000117 os.close(f)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000118 self.assertTrue(os.access(support.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000119
Christian Heimesfdab48e2008-01-20 09:06:41 +0000120 def test_closerange(self):
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000121 first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
122 # We must allocate two consecutive file descriptors, otherwise
123 # it will mess up other file descriptors (perhaps even the three
124 # standard ones).
125 second = os.dup(first)
126 try:
127 retries = 0
128 while second != first + 1:
129 os.close(first)
130 retries += 1
131 if retries > 10:
132 # XXX test skipped
Benjamin Petersonfa0d7032009-06-01 22:42:33 +0000133 self.skipTest("couldn't allocate two consecutive fds")
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000134 first, second = second, os.dup(second)
135 finally:
136 os.close(second)
Christian Heimesfdab48e2008-01-20 09:06:41 +0000137 # close a fd that is open, and one that isn't
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000138 os.closerange(first, first + 2)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000139 self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000140
Benjamin Peterson1cc6df92010-06-30 17:39:45 +0000141 @support.cpython_only
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +0000142 def test_rename(self):
143 path = support.TESTFN
144 old = sys.getrefcount(path)
145 self.assertRaises(TypeError, os.rename, path, 0)
146 new = sys.getrefcount(path)
147 self.assertEqual(old, new)
148
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000149 def test_read(self):
150 with open(support.TESTFN, "w+b") as fobj:
151 fobj.write(b"spam")
152 fobj.flush()
153 fd = fobj.fileno()
154 os.lseek(fd, 0, 0)
155 s = os.read(fd, 4)
156 self.assertEqual(type(s), bytes)
157 self.assertEqual(s, b"spam")
158
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200159 @support.cpython_only
Victor Stinner5c6e6fc2014-07-12 11:03:53 +0200160 # Skip the test on 32-bit platforms: the number of bytes must fit in a
161 # Py_ssize_t type
162 @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
163 "needs INT_MAX < PY_SSIZE_T_MAX")
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200164 @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
165 def test_large_read(self, size):
Victor Stinnerb28ed922014-07-11 17:04:41 +0200166 self.addCleanup(support.unlink, support.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +0100167 create_file(support.TESTFN, b'test')
Victor Stinnerb28ed922014-07-11 17:04:41 +0200168
169 # Issue #21932: Make sure that os.read() does not raise an
170 # OverflowError for size larger than INT_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +0200171 with open(support.TESTFN, "rb") as fp:
172 data = os.read(fp.fileno(), size)
173
Victor Stinner8c663fd2017-11-08 14:44:44 -0800174 # The test does not try to read more than 2 GiB at once because the
Victor Stinnerb28ed922014-07-11 17:04:41 +0200175 # operating system is free to return less bytes than requested.
176 self.assertEqual(data, b'test')
177
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000178 def test_write(self):
179 # os.write() accepts bytes- and buffer-like objects but not strings
180 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
181 self.assertRaises(TypeError, os.write, fd, "beans")
182 os.write(fd, b"bacon\n")
183 os.write(fd, bytearray(b"eggs\n"))
184 os.write(fd, memoryview(b"spam\n"))
185 os.close(fd)
186 with open(support.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +0000187 self.assertEqual(fobj.read().splitlines(),
188 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000189
Victor Stinnere0daff12011-03-20 23:36:35 +0100190 def write_windows_console(self, *args):
191 retcode = subprocess.call(args,
192 # use a new console to not flood the test output
193 creationflags=subprocess.CREATE_NEW_CONSOLE,
194 # use a shell to hide the console window (SW_HIDE)
195 shell=True)
196 self.assertEqual(retcode, 0)
197
198 @unittest.skipUnless(sys.platform == 'win32',
199 'test specific to the Windows console')
200 def test_write_windows_console(self):
201 # Issue #11395: the Windows console returns an error (12: not enough
202 # space error) on writing into stdout if stdout mode is binary and the
203 # length is greater than 66,000 bytes (or less, depending on heap
204 # usage).
205 code = "print('x' * 100000)"
206 self.write_windows_console(sys.executable, "-c", code)
207 self.write_windows_console(sys.executable, "-u", "-c", code)
208
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000209 def fdopen_helper(self, *args):
210 fd = os.open(support.TESTFN, os.O_RDONLY)
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200211 f = os.fdopen(fd, *args)
212 f.close()
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000213
214 def test_fdopen(self):
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200215 fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
216 os.close(fd)
217
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000218 self.fdopen_helper()
219 self.fdopen_helper('r')
220 self.fdopen_helper('r', 100)
221
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100222 def test_replace(self):
223 TESTFN2 = support.TESTFN + ".2"
Victor Stinnerae39d232016-03-24 17:12:55 +0100224 self.addCleanup(support.unlink, support.TESTFN)
225 self.addCleanup(support.unlink, TESTFN2)
226
227 create_file(support.TESTFN, b"1")
228 create_file(TESTFN2, b"2")
229
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100230 os.replace(support.TESTFN, TESTFN2)
231 self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
232 with open(TESTFN2, 'r') as f:
233 self.assertEqual(f.read(), "1")
234
Martin Panterbf19d162015-09-09 01:01:13 +0000235 def test_open_keywords(self):
236 f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
237 dir_fd=None)
238 os.close(f)
239
240 def test_symlink_keywords(self):
241 symlink = support.get_attribute(os, "symlink")
242 try:
243 symlink(src='target', dst=support.TESTFN,
244 target_is_directory=False, dir_fd=None)
245 except (NotImplementedError, OSError):
246 pass # No OS support or unprivileged user
247
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200248
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000249# Test attributes on return values from os.*stat* family.
250class StatAttributeTests(unittest.TestCase):
251 def setUp(self):
Victor Stinner47aacc82015-06-12 17:26:23 +0200252 self.fname = support.TESTFN
253 self.addCleanup(support.unlink, self.fname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100254 create_file(self.fname, b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000255
Serhiy Storchaka43767632013-11-03 21:31:38 +0200256 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
Antoine Pitrou38425292010-09-21 18:19:07 +0000257 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000258 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000259
260 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000261 self.assertEqual(result[stat.ST_SIZE], 3)
262 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000263
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000264 # Make sure all the attributes are there
265 members = dir(result)
266 for name in dir(stat):
267 if name[:3] == 'ST_':
268 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000269 if name.endswith("TIME"):
270 def trunc(x): return int(x)
271 else:
272 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000273 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000274 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000275 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000276
Larry Hastings6fe20b32012-04-19 15:07:49 -0700277 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700278 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700279 for name in 'st_atime st_mtime st_ctime'.split():
280 floaty = int(getattr(result, name) * 100000)
281 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700282 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700283
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000284 try:
285 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200286 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000287 except IndexError:
288 pass
289
290 # Make sure that assignment fails
291 try:
292 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200293 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000294 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000295 pass
296
297 try:
298 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200299 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000300 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000301 pass
302
303 try:
304 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200305 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000306 except AttributeError:
307 pass
308
309 # Use the stat_result constructor with a too-short tuple.
310 try:
311 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200312 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000313 except TypeError:
314 pass
315
Ezio Melotti42da6632011-03-15 05:18:48 +0200316 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000317 try:
318 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
319 except TypeError:
320 pass
321
Antoine Pitrou38425292010-09-21 18:19:07 +0000322 def test_stat_attributes(self):
323 self.check_stat_attributes(self.fname)
324
325 def test_stat_attributes_bytes(self):
326 try:
327 fname = self.fname.encode(sys.getfilesystemencoding())
328 except UnicodeEncodeError:
329 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Steve Dowercc16be82016-09-08 10:35:16 -0700330 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000331
Christian Heimes25827622013-10-12 01:27:08 +0200332 def test_stat_result_pickle(self):
333 result = os.stat(self.fname)
Serhiy Storchakabad12572014-12-15 14:03:42 +0200334 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
335 p = pickle.dumps(result, proto)
336 self.assertIn(b'stat_result', p)
337 if proto < 4:
338 self.assertIn(b'cos\nstat_result\n', p)
339 unpickled = pickle.loads(p)
340 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200341
Serhiy Storchaka43767632013-11-03 21:31:38 +0200342 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000343 def test_statvfs_attributes(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700344 result = os.statvfs(self.fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000345
346 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000347 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000348
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000349 # Make sure all the attributes are there.
350 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
351 'ffree', 'favail', 'flag', 'namemax')
352 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000353 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000354
Giuseppe Scrivano96a5e502017-12-14 23:46:46 +0100355 self.assertTrue(isinstance(result.f_fsid, int))
356
357 # Test that the size of the tuple doesn't change
358 self.assertEqual(len(result), 10)
359
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000360 # Make sure that assignment really fails
361 try:
362 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200363 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000364 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000365 pass
366
367 try:
368 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200369 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000370 except AttributeError:
371 pass
372
373 # Use the constructor with a too-short tuple.
374 try:
375 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200376 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000377 except TypeError:
378 pass
379
Ezio Melotti42da6632011-03-15 05:18:48 +0200380 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000381 try:
382 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
383 except TypeError:
384 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000385
Christian Heimes25827622013-10-12 01:27:08 +0200386 @unittest.skipUnless(hasattr(os, 'statvfs'),
387 "need os.statvfs()")
388 def test_statvfs_result_pickle(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700389 result = os.statvfs(self.fname)
Victor Stinner370cb252013-10-12 01:33:54 +0200390
Serhiy Storchakabad12572014-12-15 14:03:42 +0200391 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
392 p = pickle.dumps(result, proto)
393 self.assertIn(b'statvfs_result', p)
394 if proto < 4:
395 self.assertIn(b'cos\nstatvfs_result\n', p)
396 unpickled = pickle.loads(p)
397 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200398
Serhiy Storchaka43767632013-11-03 21:31:38 +0200399 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
400 def test_1686475(self):
401 # Verify that an open file can be stat'ed
402 try:
403 os.stat(r"c:\pagefile.sys")
404 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600405 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200406 except OSError as e:
407 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000408
Serhiy Storchaka43767632013-11-03 21:31:38 +0200409 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
410 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
411 def test_15261(self):
412 # Verify that stat'ing a closed fd does not cause crash
413 r, w = os.pipe()
414 try:
415 os.stat(r) # should not raise error
416 finally:
417 os.close(r)
418 os.close(w)
419 with self.assertRaises(OSError) as ctx:
420 os.stat(r)
421 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100422
Zachary Ware63f277b2014-06-19 09:46:37 -0500423 def check_file_attributes(self, result):
424 self.assertTrue(hasattr(result, 'st_file_attributes'))
425 self.assertTrue(isinstance(result.st_file_attributes, int))
426 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
427
428 @unittest.skipUnless(sys.platform == "win32",
429 "st_file_attributes is Win32 specific")
430 def test_file_attributes(self):
431 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
432 result = os.stat(self.fname)
433 self.check_file_attributes(result)
434 self.assertEqual(
435 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
436 0)
437
438 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
Victor Stinner47aacc82015-06-12 17:26:23 +0200439 dirname = support.TESTFN + "dir"
440 os.mkdir(dirname)
441 self.addCleanup(os.rmdir, dirname)
442
443 result = os.stat(dirname)
Zachary Ware63f277b2014-06-19 09:46:37 -0500444 self.check_file_attributes(result)
445 self.assertEqual(
446 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
447 stat.FILE_ATTRIBUTE_DIRECTORY)
448
Berker Peksag0b4dc482016-09-17 15:49:59 +0300449 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
450 def test_access_denied(self):
451 # Default to FindFirstFile WIN32_FIND_DATA when access is
452 # denied. See issue 28075.
453 # os.environ['TEMP'] should be located on a volume that
454 # supports file ACLs.
455 fname = os.path.join(os.environ['TEMP'], self.fname)
456 self.addCleanup(support.unlink, fname)
457 create_file(fname, b'ABC')
458 # Deny the right to [S]YNCHRONIZE on the file to
459 # force CreateFile to fail with ERROR_ACCESS_DENIED.
460 DETACHED_PROCESS = 8
461 subprocess.check_call(
Denis Osipov897bba72017-06-07 22:15:26 +0500462 # bpo-30584: Use security identifier *S-1-5-32-545 instead
463 # of localized "Users" to not depend on the locale.
464 ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
Berker Peksag0b4dc482016-09-17 15:49:59 +0300465 creationflags=DETACHED_PROCESS
466 )
467 result = os.stat(fname)
468 self.assertNotEqual(result.st_size, 0)
469
Victor Stinner47aacc82015-06-12 17:26:23 +0200470
471class UtimeTests(unittest.TestCase):
472 def setUp(self):
473 self.dirname = support.TESTFN
474 self.fname = os.path.join(self.dirname, "f1")
475
476 self.addCleanup(support.rmtree, self.dirname)
477 os.mkdir(self.dirname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100478 create_file(self.fname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200479
Victor Stinner47aacc82015-06-12 17:26:23 +0200480 def support_subsecond(self, filename):
481 # Heuristic to check if the filesystem supports timestamp with
482 # subsecond resolution: check if float and int timestamps are different
483 st = os.stat(filename)
484 return ((st.st_atime != st[7])
485 or (st.st_mtime != st[8])
486 or (st.st_ctime != st[9]))
487
488 def _test_utime(self, set_time, filename=None):
489 if not filename:
490 filename = self.fname
491
492 support_subsecond = self.support_subsecond(filename)
493 if support_subsecond:
494 # Timestamp with a resolution of 1 microsecond (10^-6).
495 #
496 # The resolution of the C internal function used by os.utime()
497 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
498 # test with a resolution of 1 ns requires more work:
499 # see the issue #15745.
500 atime_ns = 1002003000 # 1.002003 seconds
501 mtime_ns = 4005006000 # 4.005006 seconds
502 else:
503 # use a resolution of 1 second
504 atime_ns = 5 * 10**9
505 mtime_ns = 8 * 10**9
506
507 set_time(filename, (atime_ns, mtime_ns))
508 st = os.stat(filename)
509
510 if support_subsecond:
511 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
512 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
513 else:
514 self.assertEqual(st.st_atime, atime_ns * 1e-9)
515 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
516 self.assertEqual(st.st_atime_ns, atime_ns)
517 self.assertEqual(st.st_mtime_ns, mtime_ns)
518
519 def test_utime(self):
520 def set_time(filename, ns):
521 # test the ns keyword parameter
522 os.utime(filename, ns=ns)
523 self._test_utime(set_time)
524
525 @staticmethod
526 def ns_to_sec(ns):
527 # Convert a number of nanosecond (int) to a number of seconds (float).
528 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
529 # issue, os.utime() rounds towards minus infinity.
530 return (ns * 1e-9) + 0.5e-9
531
532 def test_utime_by_indexed(self):
533 # pass times as floating point seconds as the second indexed parameter
534 def set_time(filename, ns):
535 atime_ns, mtime_ns = ns
536 atime = self.ns_to_sec(atime_ns)
537 mtime = self.ns_to_sec(mtime_ns)
538 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
539 # or utime(time_t)
540 os.utime(filename, (atime, mtime))
541 self._test_utime(set_time)
542
543 def test_utime_by_times(self):
544 def set_time(filename, ns):
545 atime_ns, mtime_ns = ns
546 atime = self.ns_to_sec(atime_ns)
547 mtime = self.ns_to_sec(mtime_ns)
548 # test the times keyword parameter
549 os.utime(filename, times=(atime, mtime))
550 self._test_utime(set_time)
551
552 @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
553 "follow_symlinks support for utime required "
554 "for this test.")
555 def test_utime_nofollow_symlinks(self):
556 def set_time(filename, ns):
557 # use follow_symlinks=False to test utimensat(timespec)
558 # or lutimes(timeval)
559 os.utime(filename, ns=ns, follow_symlinks=False)
560 self._test_utime(set_time)
561
562 @unittest.skipUnless(os.utime in os.supports_fd,
563 "fd support for utime required for this test.")
564 def test_utime_fd(self):
565 def set_time(filename, ns):
Victor Stinnerae39d232016-03-24 17:12:55 +0100566 with open(filename, 'wb', 0) as fp:
Victor Stinner47aacc82015-06-12 17:26:23 +0200567 # use a file descriptor to test futimens(timespec)
568 # or futimes(timeval)
569 os.utime(fp.fileno(), ns=ns)
570 self._test_utime(set_time)
571
572 @unittest.skipUnless(os.utime in os.supports_dir_fd,
573 "dir_fd support for utime required for this test.")
574 def test_utime_dir_fd(self):
575 def set_time(filename, ns):
576 dirname, name = os.path.split(filename)
577 dirfd = os.open(dirname, os.O_RDONLY)
578 try:
579 # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
580 os.utime(name, dir_fd=dirfd, ns=ns)
581 finally:
582 os.close(dirfd)
583 self._test_utime(set_time)
584
585 def test_utime_directory(self):
586 def set_time(filename, ns):
587 # test calling os.utime() on a directory
588 os.utime(filename, ns=ns)
589 self._test_utime(set_time, filename=self.dirname)
590
591 def _test_utime_current(self, set_time):
592 # Get the system clock
593 current = time.time()
594
595 # Call os.utime() to set the timestamp to the current system clock
596 set_time(self.fname)
597
598 if not self.support_subsecond(self.fname):
599 delta = 1.0
Victor Stinnera8e7d902017-09-18 08:49:45 -0700600 else:
Victor Stinnerc94caca2017-06-13 23:48:27 +0200601 # On Windows, the usual resolution of time.time() is 15.6 ms.
602 # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
Victor Stinnera8e7d902017-09-18 08:49:45 -0700603 #
604 # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use
605 # also 50 ms on other platforms.
Victor Stinnerc94caca2017-06-13 23:48:27 +0200606 delta = 0.050
Victor Stinner47aacc82015-06-12 17:26:23 +0200607 st = os.stat(self.fname)
608 msg = ("st_time=%r, current=%r, dt=%r"
609 % (st.st_mtime, current, st.st_mtime - current))
610 self.assertAlmostEqual(st.st_mtime, current,
611 delta=delta, msg=msg)
612
613 def test_utime_current(self):
614 def set_time(filename):
615 # Set to the current time in the new way
616 os.utime(self.fname)
617 self._test_utime_current(set_time)
618
619 def test_utime_current_old(self):
620 def set_time(filename):
621 # Set to the current time in the old explicit way.
622 os.utime(self.fname, None)
623 self._test_utime_current(set_time)
624
625 def get_file_system(self, path):
626 if sys.platform == 'win32':
627 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
628 import ctypes
629 kernel32 = ctypes.windll.kernel32
630 buf = ctypes.create_unicode_buffer("", 100)
631 ok = kernel32.GetVolumeInformationW(root, None, 0,
632 None, None, None,
633 buf, len(buf))
634 if ok:
635 return buf.value
636 # return None if the filesystem is unknown
637
638 def test_large_time(self):
639 # Many filesystems are limited to the year 2038. At least, the test
640 # pass with NTFS filesystem.
641 if self.get_file_system(self.dirname) != "NTFS":
642 self.skipTest("requires NTFS")
643
644 large = 5000000000 # some day in 2128
645 os.utime(self.fname, (large, large))
646 self.assertEqual(os.stat(self.fname).st_mtime, large)
647
648 def test_utime_invalid_arguments(self):
649 # seconds and nanoseconds parameters are mutually exclusive
650 with self.assertRaises(ValueError):
651 os.utime(self.fname, (5, 5), ns=(5, 5))
652
653
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000654from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000655
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000656class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000657 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000658 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000659
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000660 def setUp(self):
661 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000662 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000663 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000664 for key, value in self._reference().items():
665 os.environ[key] = value
666
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000667 def tearDown(self):
668 os.environ.clear()
669 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000670 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000671 os.environb.clear()
672 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000673
Christian Heimes90333392007-11-01 19:08:42 +0000674 def _reference(self):
675 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
676
677 def _empty_mapping(self):
678 os.environ.clear()
679 return os.environ
680
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000681 # Bug 1110478
Xavier de Gayed1415312016-07-22 12:15:29 +0200682 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
683 'requires a shell')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000684 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000685 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300686 os.environ.update(HELLO="World")
Xavier de Gayed1415312016-07-22 12:15:29 +0200687 with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300688 value = popen.read().strip()
689 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000690
Xavier de Gayed1415312016-07-22 12:15:29 +0200691 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
692 'requires a shell')
Christian Heimes1a13d592007-11-08 14:16:55 +0000693 def test_os_popen_iter(self):
Xavier de Gayed1415312016-07-22 12:15:29 +0200694 with os.popen("%s -c 'echo \"line1\nline2\nline3\"'"
695 % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300696 it = iter(popen)
697 self.assertEqual(next(it), "line1\n")
698 self.assertEqual(next(it), "line2\n")
699 self.assertEqual(next(it), "line3\n")
700 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000701
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000702 # Verify environ keys and values from the OS are of the
703 # correct str type.
704 def test_keyvalue_types(self):
705 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000706 self.assertEqual(type(key), str)
707 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000708
Christian Heimes90333392007-11-01 19:08:42 +0000709 def test_items(self):
710 for key, value in self._reference().items():
711 self.assertEqual(os.environ.get(key), value)
712
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000713 # Issue 7310
714 def test___repr__(self):
715 """Check that the repr() of os.environ looks like environ({...})."""
716 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000717 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
718 '{!r}: {!r}'.format(key, value)
719 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000720
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000721 def test_get_exec_path(self):
722 defpath_list = os.defpath.split(os.pathsep)
723 test_path = ['/monty', '/python', '', '/flying/circus']
724 test_env = {'PATH': os.pathsep.join(test_path)}
725
726 saved_environ = os.environ
727 try:
728 os.environ = dict(test_env)
729 # Test that defaulting to os.environ works.
730 self.assertSequenceEqual(test_path, os.get_exec_path())
731 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
732 finally:
733 os.environ = saved_environ
734
735 # No PATH environment variable
736 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
737 # Empty PATH environment variable
738 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
739 # Supplied PATH environment variable
740 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
741
Victor Stinnerb745a742010-05-18 17:17:23 +0000742 if os.supports_bytes_environ:
743 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000744 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000745 # ignore BytesWarning warning
746 with warnings.catch_warnings(record=True):
747 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000748 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000749 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000750 pass
751 else:
752 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000753
754 # bytes key and/or value
755 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
756 ['abc'])
757 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
758 ['abc'])
759 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
760 ['abc'])
761
762 @unittest.skipUnless(os.supports_bytes_environ,
763 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000764 def test_environb(self):
765 # os.environ -> os.environb
766 value = 'euro\u20ac'
767 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000768 value_bytes = value.encode(sys.getfilesystemencoding(),
769 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000770 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000771 msg = "U+20AC character is not encodable to %s" % (
772 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000773 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000774 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000775 self.assertEqual(os.environ['unicode'], value)
776 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000777
778 # os.environb -> os.environ
779 value = b'\xff'
780 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000781 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000782 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000783 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000784
Victor Stinner13ff2452018-01-22 18:32:50 +0100785 # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415).
Charles-François Natali2966f102011-11-26 11:32:46 +0100786 @support.requires_mac_ver(10, 6)
Victor Stinner60b385e2011-11-22 22:01:28 +0100787 def test_unset_error(self):
788 if sys.platform == "win32":
789 # an environment variable is limited to 32,767 characters
790 key = 'x' * 50000
Victor Stinnerb3f82682011-11-22 22:30:19 +0100791 self.assertRaises(ValueError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100792 else:
793 # "=" is not allowed in a variable name
794 key = 'key='
Victor Stinnerb3f82682011-11-22 22:30:19 +0100795 self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100796
Victor Stinner6d101392013-04-14 16:35:04 +0200797 def test_key_type(self):
798 missing = 'missingkey'
799 self.assertNotIn(missing, os.environ)
800
Victor Stinner839e5ea2013-04-14 16:43:03 +0200801 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200802 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200803 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200804 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200805
Victor Stinner839e5ea2013-04-14 16:43:03 +0200806 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200807 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200808 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200809 self.assertTrue(cm.exception.__suppress_context__)
810
Osvaldo Santana Neto8a8d2852017-07-01 14:34:45 -0300811 def _test_environ_iteration(self, collection):
812 iterator = iter(collection)
813 new_key = "__new_key__"
814
815 next(iterator) # start iteration over os.environ.items
816
817 # add a new key in os.environ mapping
818 os.environ[new_key] = "test_environ_iteration"
819
820 try:
821 next(iterator) # force iteration over modified mapping
822 self.assertEqual(os.environ[new_key], "test_environ_iteration")
823 finally:
824 del os.environ[new_key]
825
826 def test_iter_error_when_changing_os_environ(self):
827 self._test_environ_iteration(os.environ)
828
829 def test_iter_error_when_changing_os_environ_items(self):
830 self._test_environ_iteration(os.environ.items())
831
832 def test_iter_error_when_changing_os_environ_values(self):
833 self._test_environ_iteration(os.environ.values())
834
Victor Stinner6d101392013-04-14 16:35:04 +0200835
Tim Petersc4e09402003-04-25 07:11:48 +0000836class WalkTests(unittest.TestCase):
837 """Tests for os.walk()."""
838
Victor Stinner0561c532015-03-12 10:28:24 +0100839 # Wrapper to hide minor differences between os.walk and os.fwalk
840 # to tests both functions with the same code base
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +0200841 def walk(self, top, **kwargs):
Serhiy Storchakaa17ca192015-12-23 00:37:34 +0200842 if 'follow_symlinks' in kwargs:
843 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +0200844 return os.walk(top, **kwargs)
Victor Stinner0561c532015-03-12 10:28:24 +0100845
Charles-François Natali7372b062012-02-05 15:15:38 +0100846 def setUp(self):
Victor Stinner0561c532015-03-12 10:28:24 +0100847 join = os.path.join
Victor Stinner3899b542016-03-24 17:21:17 +0100848 self.addCleanup(support.rmtree, support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000849
850 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000851 # TESTFN/
852 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +0000853 # tmp1
854 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +0000855 # tmp2
856 # SUB11/ no kids
857 # SUB2/ a file kid and a dirsymlink kid
858 # tmp3
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300859 # SUB21/ not readable
860 # tmp5
Guido van Rossumd8faa362007-04-27 19:54:29 +0000861 # link/ a symlink to TESTFN.2
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200862 # broken_link
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300863 # broken_link2
864 # broken_link3
Guido van Rossumd8faa362007-04-27 19:54:29 +0000865 # TEST2/
866 # tmp4 a lone file
Victor Stinner0561c532015-03-12 10:28:24 +0100867 self.walk_path = join(support.TESTFN, "TEST1")
868 self.sub1_path = join(self.walk_path, "SUB1")
869 self.sub11_path = join(self.sub1_path, "SUB11")
870 sub2_path = join(self.walk_path, "SUB2")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300871 sub21_path = join(sub2_path, "SUB21")
Victor Stinner0561c532015-03-12 10:28:24 +0100872 tmp1_path = join(self.walk_path, "tmp1")
873 tmp2_path = join(self.sub1_path, "tmp2")
Tim Petersc4e09402003-04-25 07:11:48 +0000874 tmp3_path = join(sub2_path, "tmp3")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300875 tmp5_path = join(sub21_path, "tmp3")
Victor Stinner0561c532015-03-12 10:28:24 +0100876 self.link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000877 t2_path = join(support.TESTFN, "TEST2")
878 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200879 broken_link_path = join(sub2_path, "broken_link")
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300880 broken_link2_path = join(sub2_path, "broken_link2")
881 broken_link3_path = join(sub2_path, "broken_link3")
Tim Petersc4e09402003-04-25 07:11:48 +0000882
883 # Create stuff.
Victor Stinner0561c532015-03-12 10:28:24 +0100884 os.makedirs(self.sub11_path)
Tim Petersc4e09402003-04-25 07:11:48 +0000885 os.makedirs(sub2_path)
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300886 os.makedirs(sub21_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000887 os.makedirs(t2_path)
Victor Stinner0561c532015-03-12 10:28:24 +0100888
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300889 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
Victor Stinnere77c9742016-03-25 10:28:23 +0100890 with open(path, "x") as f:
891 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
Tim Petersc4e09402003-04-25 07:11:48 +0000892
Victor Stinner0561c532015-03-12 10:28:24 +0100893 if support.can_symlink():
894 os.symlink(os.path.abspath(t2_path), self.link_path)
895 os.symlink('broken', broken_link_path, True)
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300896 os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
897 os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300898 self.sub2_tree = (sub2_path, ["SUB21", "link"],
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300899 ["broken_link", "broken_link2", "broken_link3",
900 "tmp3"])
Victor Stinner0561c532015-03-12 10:28:24 +0100901 else:
902 self.sub2_tree = (sub2_path, [], ["tmp3"])
903
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300904 os.chmod(sub21_path, 0)
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300905 try:
906 os.listdir(sub21_path)
907 except PermissionError:
908 self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
909 else:
910 os.chmod(sub21_path, stat.S_IRWXU)
911 os.unlink(tmp5_path)
912 os.rmdir(sub21_path)
913 del self.sub2_tree[1][:1]
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300914
Victor Stinner0561c532015-03-12 10:28:24 +0100915 def test_walk_topdown(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000916 # Walk top-down.
Serhiy Storchakaa07ab292016-04-16 17:51:00 +0300917 all = list(self.walk(self.walk_path))
Victor Stinner0561c532015-03-12 10:28:24 +0100918
Tim Petersc4e09402003-04-25 07:11:48 +0000919 self.assertEqual(len(all), 4)
920 # We can't know which order SUB1 and SUB2 will appear in.
921 # Not flipped: TESTFN, SUB1, SUB11, SUB2
922 # flipped: TESTFN, SUB2, SUB1, SUB11
923 flipped = all[0][1][0] != "SUB1"
924 all[0][1].sort()
Hynek Schlawackc96f5a02012-05-15 17:55:38 +0200925 all[3 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300926 all[3 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100927 self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
928 self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
929 self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
930 self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000931
Brett Cannon3f9183b2016-08-26 14:44:48 -0700932 def test_walk_prune(self, walk_path=None):
933 if walk_path is None:
934 walk_path = self.walk_path
Tim Petersc4e09402003-04-25 07:11:48 +0000935 # Prune the search.
936 all = []
Brett Cannon3f9183b2016-08-26 14:44:48 -0700937 for root, dirs, files in self.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +0000938 all.append((root, dirs, files))
939 # Don't descend into SUB1.
940 if 'SUB1' in dirs:
941 # Note that this also mutates the dirs we appended to all!
942 dirs.remove('SUB1')
Tim Petersc4e09402003-04-25 07:11:48 +0000943
Victor Stinner0561c532015-03-12 10:28:24 +0100944 self.assertEqual(len(all), 2)
945 self.assertEqual(all[0],
Brett Cannon3f9183b2016-08-26 14:44:48 -0700946 (str(walk_path), ["SUB2"], ["tmp1"]))
Victor Stinner0561c532015-03-12 10:28:24 +0100947
948 all[1][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300949 all[1][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100950 self.assertEqual(all[1], self.sub2_tree)
951
Brett Cannon3f9183b2016-08-26 14:44:48 -0700952 def test_file_like_path(self):
Brett Cannonec6ce872016-09-06 15:50:29 -0700953 self.test_walk_prune(_PathLike(self.walk_path))
Brett Cannon3f9183b2016-08-26 14:44:48 -0700954
Victor Stinner0561c532015-03-12 10:28:24 +0100955 def test_walk_bottom_up(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000956 # Walk bottom-up.
Victor Stinner0561c532015-03-12 10:28:24 +0100957 all = list(self.walk(self.walk_path, topdown=False))
958
Victor Stinner53b0a412016-03-26 01:12:36 +0100959 self.assertEqual(len(all), 4, all)
Tim Petersc4e09402003-04-25 07:11:48 +0000960 # We can't know which order SUB1 and SUB2 will appear in.
961 # Not flipped: SUB11, SUB1, SUB2, TESTFN
962 # flipped: SUB2, SUB11, SUB1, TESTFN
963 flipped = all[3][1][0] != "SUB1"
964 all[3][1].sort()
Hynek Schlawack39bf90d2012-05-15 18:40:17 +0200965 all[2 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300966 all[2 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100967 self.assertEqual(all[3],
968 (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
969 self.assertEqual(all[flipped],
970 (self.sub11_path, [], []))
971 self.assertEqual(all[flipped + 1],
972 (self.sub1_path, ["SUB11"], ["tmp2"]))
973 self.assertEqual(all[2 - 2 * flipped],
974 self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000975
Victor Stinner0561c532015-03-12 10:28:24 +0100976 def test_walk_symlink(self):
977 if not support.can_symlink():
978 self.skipTest("need symlink support")
979
980 # Walk, following symlinks.
981 walk_it = self.walk(self.walk_path, follow_symlinks=True)
982 for root, dirs, files in walk_it:
983 if root == self.link_path:
984 self.assertEqual(dirs, [])
985 self.assertEqual(files, ["tmp4"])
986 break
987 else:
988 self.fail("Didn't follow symlink with followlinks=True")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000989
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +0200990 def test_walk_bad_dir(self):
991 # Walk top-down.
992 errors = []
993 walk_it = self.walk(self.walk_path, onerror=errors.append)
994 root, dirs, files = next(walk_it)
Serhiy Storchaka7865dff2016-10-28 09:17:38 +0300995 self.assertEqual(errors, [])
996 dir1 = 'SUB1'
997 path1 = os.path.join(root, dir1)
998 path1new = os.path.join(root, dir1 + '.new')
999 os.rename(path1, path1new)
1000 try:
1001 roots = [r for r, d, f in walk_it]
1002 self.assertTrue(errors)
1003 self.assertNotIn(path1, roots)
1004 self.assertNotIn(path1new, roots)
1005 for dir2 in dirs:
1006 if dir2 != dir1:
1007 self.assertIn(os.path.join(root, dir2), roots)
1008 finally:
1009 os.rename(path1new, path1)
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001010
Charles-François Natali7372b062012-02-05 15:15:38 +01001011
1012@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1013class FwalkTests(WalkTests):
1014 """Tests for os.fwalk()."""
1015
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001016 def walk(self, top, **kwargs):
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001017 for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
Victor Stinner0561c532015-03-12 10:28:24 +01001018 yield (root, dirs, files)
1019
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001020 def fwalk(self, *args, **kwargs):
1021 return os.fwalk(*args, **kwargs)
1022
Larry Hastingsc48fe982012-06-25 04:49:05 -07001023 def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
1024 """
1025 compare with walk() results.
1026 """
Larry Hastingsb4038062012-07-15 10:57:38 -07001027 walk_kwargs = walk_kwargs.copy()
1028 fwalk_kwargs = fwalk_kwargs.copy()
1029 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1030 walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
1031 fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)
Larry Hastingsc48fe982012-06-25 04:49:05 -07001032
Charles-François Natali7372b062012-02-05 15:15:38 +01001033 expected = {}
Larry Hastingsc48fe982012-06-25 04:49:05 -07001034 for root, dirs, files in os.walk(**walk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001035 expected[root] = (set(dirs), set(files))
1036
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001037 for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001038 self.assertIn(root, expected)
1039 self.assertEqual(expected[root], (set(dirs), set(files)))
1040
Larry Hastingsc48fe982012-06-25 04:49:05 -07001041 def test_compare_to_walk(self):
1042 kwargs = {'top': support.TESTFN}
1043 self._compare_to_walk(kwargs, kwargs)
1044
Charles-François Natali7372b062012-02-05 15:15:38 +01001045 def test_dir_fd(self):
Larry Hastingsc48fe982012-06-25 04:49:05 -07001046 try:
1047 fd = os.open(".", os.O_RDONLY)
1048 walk_kwargs = {'top': support.TESTFN}
1049 fwalk_kwargs = walk_kwargs.copy()
1050 fwalk_kwargs['dir_fd'] = fd
1051 self._compare_to_walk(walk_kwargs, fwalk_kwargs)
1052 finally:
1053 os.close(fd)
1054
1055 def test_yields_correct_dir_fd(self):
Charles-François Natali7372b062012-02-05 15:15:38 +01001056 # check returned file descriptors
Larry Hastingsb4038062012-07-15 10:57:38 -07001057 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1058 args = support.TESTFN, topdown, None
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001059 for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
Charles-François Natali7372b062012-02-05 15:15:38 +01001060 # check that the FD is valid
1061 os.fstat(rootfd)
Larry Hastings9cf065c2012-06-22 16:30:09 -07001062 # redundant check
1063 os.stat(rootfd)
1064 # check that listdir() returns consistent information
1065 self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))
Charles-François Natali7372b062012-02-05 15:15:38 +01001066
1067 def test_fd_leak(self):
1068 # Since we're opening a lot of FDs, we must be careful to avoid leaks:
1069 # we both check that calling fwalk() a large number of times doesn't
1070 # yield EMFILE, and that the minimum allocated FD hasn't changed.
1071 minfd = os.dup(1)
1072 os.close(minfd)
1073 for i in range(256):
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001074 for x in self.fwalk(support.TESTFN):
Charles-François Natali7372b062012-02-05 15:15:38 +01001075 pass
1076 newfd = os.dup(1)
1077 self.addCleanup(os.close, newfd)
1078 self.assertEqual(newfd, minfd)
1079
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001080class BytesWalkTests(WalkTests):
1081 """Tests for os.walk() with bytes."""
1082 def walk(self, top, **kwargs):
1083 if 'follow_symlinks' in kwargs:
1084 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
1085 for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs):
1086 root = os.fsdecode(broot)
1087 dirs = list(map(os.fsdecode, bdirs))
1088 files = list(map(os.fsdecode, bfiles))
1089 yield (root, dirs, files)
1090 bdirs[:] = list(map(os.fsencode, dirs))
1091 bfiles[:] = list(map(os.fsencode, files))
1092
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001093@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1094class BytesFwalkTests(FwalkTests):
1095 """Tests for os.walk() with bytes."""
1096 def fwalk(self, top='.', *args, **kwargs):
1097 for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs):
1098 root = os.fsdecode(broot)
1099 dirs = list(map(os.fsdecode, bdirs))
1100 files = list(map(os.fsdecode, bfiles))
1101 yield (root, dirs, files, topfd)
1102 bdirs[:] = list(map(os.fsencode, dirs))
1103 bfiles[:] = list(map(os.fsencode, files))
1104
Charles-François Natali7372b062012-02-05 15:15:38 +01001105
Guido van Rossume7ba4952007-06-06 23:52:48 +00001106class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001107 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001108 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001109
1110 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001111 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001112 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
1113 os.makedirs(path) # Should work
1114 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
1115 os.makedirs(path)
1116
1117 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001118 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001119 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
1120 os.makedirs(path)
1121 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
1122 'dir5', 'dir6')
1123 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001124
Serhiy Storchakae304e332017-03-24 13:27:42 +02001125 def test_mode(self):
1126 with support.temp_umask(0o002):
1127 base = support.TESTFN
1128 parent = os.path.join(base, 'dir1')
1129 path = os.path.join(parent, 'dir2')
1130 os.makedirs(path, 0o555)
1131 self.assertTrue(os.path.exists(path))
1132 self.assertTrue(os.path.isdir(path))
1133 if os.name != 'nt':
1134 self.assertEqual(stat.S_IMODE(os.stat(path).st_mode), 0o555)
1135 self.assertEqual(stat.S_IMODE(os.stat(parent).st_mode), 0o775)
1136
Terry Reedy5a22b652010-12-02 07:05:56 +00001137 def test_exist_ok_existing_directory(self):
1138 path = os.path.join(support.TESTFN, 'dir1')
1139 mode = 0o777
1140 old_mask = os.umask(0o022)
1141 os.makedirs(path, mode)
1142 self.assertRaises(OSError, os.makedirs, path, mode)
1143 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
Benjamin Peterson4717e212014-04-01 19:17:57 -04001144 os.makedirs(path, 0o776, exist_ok=True)
Terry Reedy5a22b652010-12-02 07:05:56 +00001145 os.makedirs(path, mode=mode, exist_ok=True)
1146 os.umask(old_mask)
1147
Martin Pantera82642f2015-11-19 04:48:44 +00001148 # Issue #25583: A drive root could raise PermissionError on Windows
1149 os.makedirs(os.path.abspath('/'), exist_ok=True)
1150
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001151 def test_exist_ok_s_isgid_directory(self):
1152 path = os.path.join(support.TESTFN, 'dir1')
1153 S_ISGID = stat.S_ISGID
1154 mode = 0o777
1155 old_mask = os.umask(0o022)
1156 try:
1157 existing_testfn_mode = stat.S_IMODE(
1158 os.lstat(support.TESTFN).st_mode)
Ned Deilyc622f422012-08-08 20:57:24 -07001159 try:
1160 os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
Ned Deily3a2b97e2012-08-08 21:03:02 -07001161 except PermissionError:
Ned Deilyc622f422012-08-08 20:57:24 -07001162 raise unittest.SkipTest('Cannot set S_ISGID for dir.')
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001163 if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
1164 raise unittest.SkipTest('No support for S_ISGID dir mode.')
1165 # The os should apply S_ISGID from the parent dir for us, but
1166 # this test need not depend on that behavior. Be explicit.
1167 os.makedirs(path, mode | S_ISGID)
1168 # http://bugs.python.org/issue14992
1169 # Should not fail when the bit is already set.
1170 os.makedirs(path, mode, exist_ok=True)
1171 # remove the bit.
1172 os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
Benjamin Petersonee5f1c12014-04-01 19:13:18 -04001173 # May work even when the bit is not already set when demanded.
1174 os.makedirs(path, mode | S_ISGID, exist_ok=True)
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001175 finally:
1176 os.umask(old_mask)
Terry Reedy5a22b652010-12-02 07:05:56 +00001177
1178 def test_exist_ok_existing_regular_file(self):
1179 base = support.TESTFN
1180 path = os.path.join(support.TESTFN, 'dir1')
1181 f = open(path, 'w')
1182 f.write('abc')
1183 f.close()
1184 self.assertRaises(OSError, os.makedirs, path)
1185 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
1186 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
1187 os.remove(path)
1188
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001189 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001190 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001191 'dir4', 'dir5', 'dir6')
1192 # If the tests failed, the bottom-most directory ('../dir6')
1193 # may not have been created, so we look for the outermost directory
1194 # that exists.
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001195 while not os.path.exists(path) and path != support.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001196 path = os.path.dirname(path)
1197
1198 os.removedirs(path)
1199
Andrew Svetlov405faed2012-12-25 12:18:09 +02001200
R David Murrayf2ad1732014-12-25 18:36:56 -05001201@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
1202class ChownFileTests(unittest.TestCase):
1203
Berker Peksag036a71b2015-07-21 09:29:48 +03001204 @classmethod
1205 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001206 os.mkdir(support.TESTFN)
1207
1208 def test_chown_uid_gid_arguments_must_be_index(self):
1209 stat = os.stat(support.TESTFN)
1210 uid = stat.st_uid
1211 gid = stat.st_gid
1212 for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
1213 self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
1214 self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
1215 self.assertIsNone(os.chown(support.TESTFN, uid, gid))
1216 self.assertIsNone(os.chown(support.TESTFN, -1, -1))
1217
1218 @unittest.skipUnless(len(groups) > 1, "test needs more than one group")
1219 def test_chown(self):
1220 gid_1, gid_2 = groups[:2]
1221 uid = os.stat(support.TESTFN).st_uid
1222 os.chown(support.TESTFN, uid, gid_1)
1223 gid = os.stat(support.TESTFN).st_gid
1224 self.assertEqual(gid, gid_1)
1225 os.chown(support.TESTFN, uid, gid_2)
1226 gid = os.stat(support.TESTFN).st_gid
1227 self.assertEqual(gid, gid_2)
1228
1229 @unittest.skipUnless(root_in_posix and len(all_users) > 1,
1230 "test needs root privilege and more than one user")
1231 def test_chown_with_root(self):
1232 uid_1, uid_2 = all_users[:2]
1233 gid = os.stat(support.TESTFN).st_gid
1234 os.chown(support.TESTFN, uid_1, gid)
1235 uid = os.stat(support.TESTFN).st_uid
1236 self.assertEqual(uid, uid_1)
1237 os.chown(support.TESTFN, uid_2, gid)
1238 uid = os.stat(support.TESTFN).st_uid
1239 self.assertEqual(uid, uid_2)
1240
1241 @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
1242 "test needs non-root account and more than one user")
1243 def test_chown_without_permission(self):
1244 uid_1, uid_2 = all_users[:2]
1245 gid = os.stat(support.TESTFN).st_gid
Serhiy Storchakaa9e00d12015-02-16 08:35:18 +02001246 with self.assertRaises(PermissionError):
R David Murrayf2ad1732014-12-25 18:36:56 -05001247 os.chown(support.TESTFN, uid_1, gid)
1248 os.chown(support.TESTFN, uid_2, gid)
1249
Berker Peksag036a71b2015-07-21 09:29:48 +03001250 @classmethod
1251 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001252 os.rmdir(support.TESTFN)
1253
1254
Andrew Svetlov405faed2012-12-25 12:18:09 +02001255class RemoveDirsTests(unittest.TestCase):
1256 def setUp(self):
1257 os.makedirs(support.TESTFN)
1258
1259 def tearDown(self):
1260 support.rmtree(support.TESTFN)
1261
1262 def test_remove_all(self):
1263 dira = os.path.join(support.TESTFN, 'dira')
1264 os.mkdir(dira)
1265 dirb = os.path.join(dira, 'dirb')
1266 os.mkdir(dirb)
1267 os.removedirs(dirb)
1268 self.assertFalse(os.path.exists(dirb))
1269 self.assertFalse(os.path.exists(dira))
1270 self.assertFalse(os.path.exists(support.TESTFN))
1271
1272 def test_remove_partial(self):
1273 dira = os.path.join(support.TESTFN, 'dira')
1274 os.mkdir(dira)
1275 dirb = os.path.join(dira, 'dirb')
1276 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001277 create_file(os.path.join(dira, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001278 os.removedirs(dirb)
1279 self.assertFalse(os.path.exists(dirb))
1280 self.assertTrue(os.path.exists(dira))
1281 self.assertTrue(os.path.exists(support.TESTFN))
1282
1283 def test_remove_nothing(self):
1284 dira = os.path.join(support.TESTFN, 'dira')
1285 os.mkdir(dira)
1286 dirb = os.path.join(dira, 'dirb')
1287 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001288 create_file(os.path.join(dirb, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001289 with self.assertRaises(OSError):
1290 os.removedirs(dirb)
1291 self.assertTrue(os.path.exists(dirb))
1292 self.assertTrue(os.path.exists(dira))
1293 self.assertTrue(os.path.exists(support.TESTFN))
1294
1295
Guido van Rossume7ba4952007-06-06 23:52:48 +00001296class DevNullTests(unittest.TestCase):
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00001297 def test_devnull(self):
Victor Stinnerae39d232016-03-24 17:12:55 +01001298 with open(os.devnull, 'wb', 0) as f:
Victor Stinnera6d2c762011-06-30 18:20:11 +02001299 f.write(b'hello')
1300 f.close()
1301 with open(os.devnull, 'rb') as f:
1302 self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001303
Andrew Svetlov405faed2012-12-25 12:18:09 +02001304
Guido van Rossume7ba4952007-06-06 23:52:48 +00001305class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001306 def test_urandom_length(self):
1307 self.assertEqual(len(os.urandom(0)), 0)
1308 self.assertEqual(len(os.urandom(1)), 1)
1309 self.assertEqual(len(os.urandom(10)), 10)
1310 self.assertEqual(len(os.urandom(100)), 100)
1311 self.assertEqual(len(os.urandom(1000)), 1000)
1312
1313 def test_urandom_value(self):
1314 data1 = os.urandom(16)
Victor Stinner9b1f4742016-09-06 16:18:52 -07001315 self.assertIsInstance(data1, bytes)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001316 data2 = os.urandom(16)
1317 self.assertNotEqual(data1, data2)
1318
1319 def get_urandom_subprocess(self, count):
1320 code = '\n'.join((
1321 'import os, sys',
1322 'data = os.urandom(%s)' % count,
1323 'sys.stdout.buffer.write(data)',
1324 'sys.stdout.buffer.flush()'))
1325 out = assert_python_ok('-c', code)
1326 stdout = out[1]
Pablo Galindofb77e0d2017-12-07 06:55:44 +00001327 self.assertEqual(len(stdout), count)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001328 return stdout
1329
1330 def test_urandom_subprocess(self):
1331 data1 = self.get_urandom_subprocess(16)
1332 data2 = self.get_urandom_subprocess(16)
1333 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001334
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001335
Victor Stinner9b1f4742016-09-06 16:18:52 -07001336@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
1337class GetRandomTests(unittest.TestCase):
Victor Stinner173a1f32016-09-06 19:57:40 -07001338 @classmethod
1339 def setUpClass(cls):
1340 try:
1341 os.getrandom(1)
1342 except OSError as exc:
1343 if exc.errno == errno.ENOSYS:
1344 # Python compiled on a more recent Linux version
1345 # than the current Linux kernel
1346 raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")
1347 else:
1348 raise
1349
Victor Stinner9b1f4742016-09-06 16:18:52 -07001350 def test_getrandom_type(self):
1351 data = os.getrandom(16)
1352 self.assertIsInstance(data, bytes)
1353 self.assertEqual(len(data), 16)
1354
1355 def test_getrandom0(self):
1356 empty = os.getrandom(0)
1357 self.assertEqual(empty, b'')
1358
1359 def test_getrandom_random(self):
1360 self.assertTrue(hasattr(os, 'GRND_RANDOM'))
1361
1362 # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
1363 # resource /dev/random
1364
1365 def test_getrandom_nonblock(self):
1366 # The call must not fail. Check also that the flag exists
1367 try:
1368 os.getrandom(1, os.GRND_NONBLOCK)
1369 except BlockingIOError:
1370 # System urandom is not initialized yet
1371 pass
1372
1373 def test_getrandom_value(self):
1374 data1 = os.getrandom(16)
1375 data2 = os.getrandom(16)
1376 self.assertNotEqual(data1, data2)
1377
1378
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001379# os.urandom() doesn't use a file descriptor when it is implemented with the
1380# getentropy() function, the getrandom() function or the getrandom() syscall
1381OS_URANDOM_DONT_USE_FD = (
1382 sysconfig.get_config_var('HAVE_GETENTROPY') == 1
1383 or sysconfig.get_config_var('HAVE_GETRANDOM') == 1
1384 or sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1)
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001385
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001386@unittest.skipIf(OS_URANDOM_DONT_USE_FD ,
1387 "os.random() does not use a file descriptor")
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001388class URandomFDTests(unittest.TestCase):
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001389 @unittest.skipUnless(resource, "test requires the resource module")
1390 def test_urandom_failure(self):
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001391 # Check urandom() failing when it is not able to open /dev/random.
1392 # We spawn a new process to make the test more robust (if getrlimit()
1393 # failed to restore the file descriptor limit after this, the whole
1394 # test suite would crash; this actually happened on the OS X Tiger
1395 # buildbot).
1396 code = """if 1:
1397 import errno
1398 import os
1399 import resource
1400
1401 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
1402 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
1403 try:
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001404 os.urandom(16)
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001405 except OSError as e:
1406 assert e.errno == errno.EMFILE, e.errno
1407 else:
1408 raise AssertionError("OSError not raised")
1409 """
1410 assert_python_ok('-c', code)
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001411
Antoine Pitroue472aea2014-04-26 14:33:03 +02001412 def test_urandom_fd_closed(self):
1413 # Issue #21207: urandom() should reopen its fd to /dev/urandom if
1414 # closed.
1415 code = """if 1:
1416 import os
1417 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001418 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001419 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001420 with test.support.SuppressCrashReport():
1421 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001422 sys.stdout.buffer.write(os.urandom(4))
1423 """
1424 rc, out, err = assert_python_ok('-Sc', code)
1425
1426 def test_urandom_fd_reopened(self):
1427 # Issue #21207: urandom() should detect its fd to /dev/urandom
1428 # changed to something else, and reopen it.
Victor Stinnerae39d232016-03-24 17:12:55 +01001429 self.addCleanup(support.unlink, support.TESTFN)
1430 create_file(support.TESTFN, b"x" * 256)
1431
Antoine Pitroue472aea2014-04-26 14:33:03 +02001432 code = """if 1:
1433 import os
1434 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001435 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001436 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001437 with test.support.SuppressCrashReport():
1438 for fd in range(3, 256):
1439 try:
1440 os.close(fd)
1441 except OSError:
1442 pass
1443 else:
1444 # Found the urandom fd (XXX hopefully)
1445 break
1446 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001447 with open({TESTFN!r}, 'rb') as f:
Xavier de Gaye21060102016-11-16 08:05:27 +01001448 new_fd = f.fileno()
1449 # Issue #26935: posix allows new_fd and fd to be equal but
1450 # some libc implementations have dup2 return an error in this
1451 # case.
1452 if new_fd != fd:
1453 os.dup2(new_fd, fd)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001454 sys.stdout.buffer.write(os.urandom(4))
1455 sys.stdout.buffer.write(os.urandom(4))
1456 """.format(TESTFN=support.TESTFN)
1457 rc, out, err = assert_python_ok('-Sc', code)
1458 self.assertEqual(len(out), 8)
1459 self.assertNotEqual(out[0:4], out[4:8])
1460 rc, out2, err2 = assert_python_ok('-Sc', code)
1461 self.assertEqual(len(out2), 8)
1462 self.assertNotEqual(out2, out)
1463
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001464
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001465@contextlib.contextmanager
1466def _execvpe_mockup(defpath=None):
1467 """
1468 Stubs out execv and execve functions when used as context manager.
1469 Records exec calls. The mock execv and execve functions always raise an
1470 exception as they would normally never return.
1471 """
1472 # A list of tuples containing (function name, first arg, args)
1473 # of calls to execv or execve that have been made.
1474 calls = []
1475
1476 def mock_execv(name, *args):
1477 calls.append(('execv', name, args))
1478 raise RuntimeError("execv called")
1479
1480 def mock_execve(name, *args):
1481 calls.append(('execve', name, args))
1482 raise OSError(errno.ENOTDIR, "execve called")
1483
1484 try:
1485 orig_execv = os.execv
1486 orig_execve = os.execve
1487 orig_defpath = os.defpath
1488 os.execv = mock_execv
1489 os.execve = mock_execve
1490 if defpath is not None:
1491 os.defpath = defpath
1492 yield calls
1493 finally:
1494 os.execv = orig_execv
1495 os.execve = orig_execve
1496 os.defpath = orig_defpath
1497
Victor Stinner4659ccf2016-09-14 10:57:00 +02001498
Guido van Rossume7ba4952007-06-06 23:52:48 +00001499class ExecTests(unittest.TestCase):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001500 @unittest.skipIf(USING_LINUXTHREADS,
1501 "avoid triggering a linuxthreads bug: see issue #4970")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001502 def test_execvpe_with_bad_program(self):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001503 self.assertRaises(OSError, os.execvpe, 'no such app-',
1504 ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +00001505
Steve Dowerbce26262016-11-19 19:17:26 -08001506 def test_execv_with_bad_arglist(self):
1507 self.assertRaises(ValueError, os.execv, 'notepad', ())
1508 self.assertRaises(ValueError, os.execv, 'notepad', [])
1509 self.assertRaises(ValueError, os.execv, 'notepad', ('',))
1510 self.assertRaises(ValueError, os.execv, 'notepad', [''])
1511
Thomas Heller6790d602007-08-30 17:15:14 +00001512 def test_execvpe_with_bad_arglist(self):
1513 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
Steve Dowerbce26262016-11-19 19:17:26 -08001514 self.assertRaises(ValueError, os.execvpe, 'notepad', [], {})
1515 self.assertRaises(ValueError, os.execvpe, 'notepad', [''], {})
Thomas Heller6790d602007-08-30 17:15:14 +00001516
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001517 @unittest.skipUnless(hasattr(os, '_execvpe'),
1518 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +00001519 def _test_internal_execvpe(self, test_type):
1520 program_path = os.sep + 'absolutepath'
1521 if test_type is bytes:
1522 program = b'executable'
1523 fullpath = os.path.join(os.fsencode(program_path), program)
1524 native_fullpath = fullpath
1525 arguments = [b'progname', 'arg1', 'arg2']
1526 else:
1527 program = 'executable'
1528 arguments = ['progname', 'arg1', 'arg2']
1529 fullpath = os.path.join(program_path, program)
1530 if os.name != "nt":
1531 native_fullpath = os.fsencode(fullpath)
1532 else:
1533 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001534 env = {'spam': 'beans'}
1535
Victor Stinnerb745a742010-05-18 17:17:23 +00001536 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001537 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001538 self.assertRaises(RuntimeError,
1539 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001540 self.assertEqual(len(calls), 1)
1541 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
1542
Victor Stinnerb745a742010-05-18 17:17:23 +00001543 # test os._execvpe() with a relative path:
1544 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001545 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001546 self.assertRaises(OSError,
1547 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001548 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +00001549 self.assertSequenceEqual(calls[0],
1550 ('execve', native_fullpath, (arguments, env)))
1551
1552 # test os._execvpe() with a relative path:
1553 # os.get_exec_path() reads the 'PATH' variable
1554 with _execvpe_mockup() as calls:
1555 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +00001556 if test_type is bytes:
1557 env_path[b'PATH'] = program_path
1558 else:
1559 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +00001560 self.assertRaises(OSError,
1561 os._execvpe, program, arguments, env=env_path)
1562 self.assertEqual(len(calls), 1)
1563 self.assertSequenceEqual(calls[0],
1564 ('execve', native_fullpath, (arguments, env_path)))
1565
1566 def test_internal_execvpe_str(self):
1567 self._test_internal_execvpe(str)
1568 if os.name != "nt":
1569 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001570
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001571 def test_execve_invalid_env(self):
1572 args = [sys.executable, '-c', 'pass']
1573
Ville Skyttä49b27342017-08-03 09:00:59 +03001574 # null character in the environment variable name
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001575 newenv = os.environ.copy()
1576 newenv["FRUIT\0VEGETABLE"] = "cabbage"
1577 with self.assertRaises(ValueError):
1578 os.execve(args[0], args, newenv)
1579
Ville Skyttä49b27342017-08-03 09:00:59 +03001580 # null character in the environment variable value
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001581 newenv = os.environ.copy()
1582 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
1583 with self.assertRaises(ValueError):
1584 os.execve(args[0], args, newenv)
1585
Ville Skyttä49b27342017-08-03 09:00:59 +03001586 # equal character in the environment variable name
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001587 newenv = os.environ.copy()
1588 newenv["FRUIT=ORANGE"] = "lemon"
1589 with self.assertRaises(ValueError):
1590 os.execve(args[0], args, newenv)
1591
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001592
Serhiy Storchaka43767632013-11-03 21:31:38 +02001593@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001594class Win32ErrorTests(unittest.TestCase):
Victor Stinnere77c9742016-03-25 10:28:23 +01001595 def setUp(self):
Victor Stinner32830142016-03-25 15:12:08 +01001596 try:
1597 os.stat(support.TESTFN)
1598 except FileNotFoundError:
1599 exists = False
1600 except OSError as exc:
1601 exists = True
1602 self.fail("file %s must not exist; os.stat failed with %s"
1603 % (support.TESTFN, exc))
1604 else:
1605 self.fail("file %s must not exist" % support.TESTFN)
Victor Stinnere77c9742016-03-25 10:28:23 +01001606
Thomas Wouters477c8d52006-05-27 19:21:47 +00001607 def test_rename(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001608 self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001609
1610 def test_remove(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001611 self.assertRaises(OSError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001612
1613 def test_chdir(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001614 self.assertRaises(OSError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001615
1616 def test_mkdir(self):
Victor Stinnerae39d232016-03-24 17:12:55 +01001617 self.addCleanup(support.unlink, support.TESTFN)
1618
Victor Stinnere77c9742016-03-25 10:28:23 +01001619 with open(support.TESTFN, "x") as f:
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001620 self.assertRaises(OSError, os.mkdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001621
1622 def test_utime(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001623 self.assertRaises(OSError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001624
Thomas Wouters477c8d52006-05-27 19:21:47 +00001625 def test_chmod(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001626 self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001627
Victor Stinnere77c9742016-03-25 10:28:23 +01001628
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001629class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001630 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001631 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1632 #singles.append("close")
Steve Dower39294992016-08-30 21:22:36 -07001633 #We omit close because it doesn't raise an exception on some platforms
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001634 def get_single(f):
1635 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001636 if hasattr(os, f):
1637 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001638 return helper
1639 for f in singles:
1640 locals()["test_"+f] = get_single(f)
1641
Benjamin Peterson7522c742009-01-19 21:00:09 +00001642 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001643 try:
1644 f(support.make_bad_fd(), *args)
1645 except OSError as e:
1646 self.assertEqual(e.errno, errno.EBADF)
1647 else:
Martin Panter7462b6492015-11-02 03:37:02 +00001648 self.fail("%r didn't raise an OSError with a bad file descriptor"
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001649 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001650
Serhiy Storchaka43767632013-11-03 21:31:38 +02001651 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001652 def test_isatty(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001653 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001654
Serhiy Storchaka43767632013-11-03 21:31:38 +02001655 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001656 def test_closerange(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001657 fd = support.make_bad_fd()
1658 # Make sure none of the descriptors we are about to close are
1659 # currently valid (issue 6542).
1660 for i in range(10):
1661 try: os.fstat(fd+i)
1662 except OSError:
1663 pass
1664 else:
1665 break
1666 if i < 2:
1667 raise unittest.SkipTest(
1668 "Unable to acquire a range of invalid file descriptors")
1669 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001670
Serhiy Storchaka43767632013-11-03 21:31:38 +02001671 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001672 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001673 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001674
Serhiy Storchaka43767632013-11-03 21:31:38 +02001675 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001676 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001677 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001678
Serhiy Storchaka43767632013-11-03 21:31:38 +02001679 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001680 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001681 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001682
Serhiy Storchaka43767632013-11-03 21:31:38 +02001683 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001684 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001685 self.check(os.pathconf, "PC_NAME_MAX")
1686 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001687
Serhiy Storchaka43767632013-11-03 21:31:38 +02001688 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001689 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001690 self.check(os.truncate, 0)
1691 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001692
Serhiy Storchaka43767632013-11-03 21:31:38 +02001693 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001694 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001695 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001696
Serhiy Storchaka43767632013-11-03 21:31:38 +02001697 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001698 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001699 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001700
Victor Stinner57ddf782014-01-08 15:21:28 +01001701 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1702 def test_readv(self):
1703 buf = bytearray(10)
1704 self.check(os.readv, [buf])
1705
Serhiy Storchaka43767632013-11-03 21:31:38 +02001706 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001707 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001708 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001709
Serhiy Storchaka43767632013-11-03 21:31:38 +02001710 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001711 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001712 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001713
Victor Stinner57ddf782014-01-08 15:21:28 +01001714 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1715 def test_writev(self):
1716 self.check(os.writev, [b'abc'])
1717
Victor Stinner1db9e7b2014-07-29 22:32:47 +02001718 def test_inheritable(self):
1719 self.check(os.get_inheritable)
1720 self.check(os.set_inheritable, True)
1721
1722 @unittest.skipUnless(hasattr(os, 'get_blocking'),
1723 'needs os.get_blocking() and os.set_blocking()')
1724 def test_blocking(self):
1725 self.check(os.get_blocking)
1726 self.check(os.set_blocking, True)
1727
Brian Curtin1b9df392010-11-24 20:24:31 +00001728
1729class LinkTests(unittest.TestCase):
1730 def setUp(self):
1731 self.file1 = support.TESTFN
1732 self.file2 = os.path.join(support.TESTFN + "2")
1733
Brian Curtinc0abc4e2010-11-30 23:46:54 +00001734 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00001735 for file in (self.file1, self.file2):
1736 if os.path.exists(file):
1737 os.unlink(file)
1738
Brian Curtin1b9df392010-11-24 20:24:31 +00001739 def _test_link(self, file1, file2):
Victor Stinnere77c9742016-03-25 10:28:23 +01001740 create_file(file1)
Brian Curtin1b9df392010-11-24 20:24:31 +00001741
xdegaye6a55d092017-11-12 17:57:04 +01001742 try:
1743 os.link(file1, file2)
1744 except PermissionError as e:
1745 self.skipTest('os.link(): %s' % e)
Brian Curtin1b9df392010-11-24 20:24:31 +00001746 with open(file1, "r") as f1, open(file2, "r") as f2:
1747 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1748
1749 def test_link(self):
1750 self._test_link(self.file1, self.file2)
1751
1752 def test_link_bytes(self):
1753 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1754 bytes(self.file2, sys.getfilesystemencoding()))
1755
Brian Curtinf498b752010-11-30 15:54:04 +00001756 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00001757 try:
Brian Curtinf498b752010-11-30 15:54:04 +00001758 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00001759 except UnicodeError:
1760 raise unittest.SkipTest("Unable to encode for this platform.")
1761
Brian Curtinf498b752010-11-30 15:54:04 +00001762 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00001763 self.file2 = self.file1 + "2"
1764 self._test_link(self.file1, self.file2)
1765
Serhiy Storchaka43767632013-11-03 21:31:38 +02001766@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1767class PosixUidGidTests(unittest.TestCase):
1768 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
1769 def test_setuid(self):
1770 if os.getuid() != 0:
1771 self.assertRaises(OSError, os.setuid, 0)
1772 self.assertRaises(OverflowError, os.setuid, 1<<32)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001773
Serhiy Storchaka43767632013-11-03 21:31:38 +02001774 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
1775 def test_setgid(self):
1776 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1777 self.assertRaises(OSError, os.setgid, 0)
1778 self.assertRaises(OverflowError, os.setgid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001779
Serhiy Storchaka43767632013-11-03 21:31:38 +02001780 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
1781 def test_seteuid(self):
1782 if os.getuid() != 0:
1783 self.assertRaises(OSError, os.seteuid, 0)
1784 self.assertRaises(OverflowError, os.seteuid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001785
Serhiy Storchaka43767632013-11-03 21:31:38 +02001786 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
1787 def test_setegid(self):
1788 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1789 self.assertRaises(OSError, os.setegid, 0)
1790 self.assertRaises(OverflowError, os.setegid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001791
Serhiy Storchaka43767632013-11-03 21:31:38 +02001792 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1793 def test_setreuid(self):
1794 if os.getuid() != 0:
1795 self.assertRaises(OSError, os.setreuid, 0, 0)
1796 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
1797 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001798
Serhiy Storchaka43767632013-11-03 21:31:38 +02001799 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1800 def test_setreuid_neg1(self):
1801 # Needs to accept -1. We run this in a subprocess to avoid
1802 # altering the test runner's process state (issue8045).
1803 subprocess.check_call([
1804 sys.executable, '-c',
1805 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001806
Serhiy Storchaka43767632013-11-03 21:31:38 +02001807 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1808 def test_setregid(self):
1809 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1810 self.assertRaises(OSError, os.setregid, 0, 0)
1811 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
1812 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001813
Serhiy Storchaka43767632013-11-03 21:31:38 +02001814 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1815 def test_setregid_neg1(self):
1816 # Needs to accept -1. We run this in a subprocess to avoid
1817 # altering the test runner's process state (issue8045).
1818 subprocess.check_call([
1819 sys.executable, '-c',
1820 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001821
Serhiy Storchaka43767632013-11-03 21:31:38 +02001822@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1823class Pep383Tests(unittest.TestCase):
1824 def setUp(self):
1825 if support.TESTFN_UNENCODABLE:
1826 self.dir = support.TESTFN_UNENCODABLE
1827 elif support.TESTFN_NONASCII:
1828 self.dir = support.TESTFN_NONASCII
1829 else:
1830 self.dir = support.TESTFN
1831 self.bdir = os.fsencode(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001832
Serhiy Storchaka43767632013-11-03 21:31:38 +02001833 bytesfn = []
1834 def add_filename(fn):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001835 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +02001836 fn = os.fsencode(fn)
1837 except UnicodeEncodeError:
1838 return
1839 bytesfn.append(fn)
1840 add_filename(support.TESTFN_UNICODE)
1841 if support.TESTFN_UNENCODABLE:
1842 add_filename(support.TESTFN_UNENCODABLE)
1843 if support.TESTFN_NONASCII:
1844 add_filename(support.TESTFN_NONASCII)
1845 if not bytesfn:
1846 self.skipTest("couldn't create any non-ascii filename")
Martin v. Löwis011e8422009-05-05 04:43:17 +00001847
Serhiy Storchaka43767632013-11-03 21:31:38 +02001848 self.unicodefn = set()
1849 os.mkdir(self.dir)
1850 try:
1851 for fn in bytesfn:
1852 support.create_empty_file(os.path.join(self.bdir, fn))
1853 fn = os.fsdecode(fn)
1854 if fn in self.unicodefn:
1855 raise ValueError("duplicate filename")
1856 self.unicodefn.add(fn)
1857 except:
Martin v. Löwis011e8422009-05-05 04:43:17 +00001858 shutil.rmtree(self.dir)
Serhiy Storchaka43767632013-11-03 21:31:38 +02001859 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00001860
Serhiy Storchaka43767632013-11-03 21:31:38 +02001861 def tearDown(self):
1862 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001863
Serhiy Storchaka43767632013-11-03 21:31:38 +02001864 def test_listdir(self):
1865 expected = self.unicodefn
1866 found = set(os.listdir(self.dir))
1867 self.assertEqual(found, expected)
1868 # test listdir without arguments
1869 current_directory = os.getcwd()
1870 try:
1871 os.chdir(os.sep)
1872 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
1873 finally:
1874 os.chdir(current_directory)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001875
Serhiy Storchaka43767632013-11-03 21:31:38 +02001876 def test_open(self):
1877 for fn in self.unicodefn:
1878 f = open(os.path.join(self.dir, fn), 'rb')
1879 f.close()
Victor Stinnere4110dc2013-01-01 23:05:55 +01001880
Serhiy Storchaka43767632013-11-03 21:31:38 +02001881 @unittest.skipUnless(hasattr(os, 'statvfs'),
1882 "need os.statvfs()")
1883 def test_statvfs(self):
1884 # issue #9645
1885 for fn in self.unicodefn:
1886 # should not fail with file not found error
1887 fullname = os.path.join(self.dir, fn)
1888 os.statvfs(fullname)
1889
1890 def test_stat(self):
1891 for fn in self.unicodefn:
1892 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001893
Brian Curtineb24d742010-04-12 17:16:38 +00001894@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1895class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +00001896 def _kill(self, sig):
1897 # Start sys.executable as a subprocess and communicate from the
1898 # subprocess to the parent that the interpreter is ready. When it
1899 # becomes ready, send *sig* via os.kill to the subprocess and check
1900 # that the return code is equal to *sig*.
1901 import ctypes
1902 from ctypes import wintypes
1903 import msvcrt
1904
1905 # Since we can't access the contents of the process' stdout until the
1906 # process has exited, use PeekNamedPipe to see what's inside stdout
1907 # without waiting. This is done so we can tell that the interpreter
1908 # is started and running at a point where it could handle a signal.
1909 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
1910 PeekNamedPipe.restype = wintypes.BOOL
1911 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
1912 ctypes.POINTER(ctypes.c_char), # stdout buf
1913 wintypes.DWORD, # Buffer size
1914 ctypes.POINTER(wintypes.DWORD), # bytes read
1915 ctypes.POINTER(wintypes.DWORD), # bytes avail
1916 ctypes.POINTER(wintypes.DWORD)) # bytes left
1917 msg = "running"
1918 proc = subprocess.Popen([sys.executable, "-c",
1919 "import sys;"
1920 "sys.stdout.write('{}');"
1921 "sys.stdout.flush();"
1922 "input()".format(msg)],
1923 stdout=subprocess.PIPE,
1924 stderr=subprocess.PIPE,
1925 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00001926 self.addCleanup(proc.stdout.close)
1927 self.addCleanup(proc.stderr.close)
1928 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00001929
1930 count, max = 0, 100
1931 while count < max and proc.poll() is None:
1932 # Create a string buffer to store the result of stdout from the pipe
1933 buf = ctypes.create_string_buffer(len(msg))
1934 # Obtain the text currently in proc.stdout
1935 # Bytes read/avail/left are left as NULL and unused
1936 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
1937 buf, ctypes.sizeof(buf), None, None, None)
1938 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
1939 if buf.value:
1940 self.assertEqual(msg, buf.value.decode())
1941 break
1942 time.sleep(0.1)
1943 count += 1
1944 else:
1945 self.fail("Did not receive communication from the subprocess")
1946
Brian Curtineb24d742010-04-12 17:16:38 +00001947 os.kill(proc.pid, sig)
1948 self.assertEqual(proc.wait(), sig)
1949
1950 def test_kill_sigterm(self):
1951 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00001952 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00001953
1954 def test_kill_int(self):
1955 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00001956 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00001957
1958 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001959 tagname = "test_os_%s" % uuid.uuid1()
1960 m = mmap.mmap(-1, 1, tagname)
1961 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00001962 # Run a script which has console control handling enabled.
1963 proc = subprocess.Popen([sys.executable,
1964 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001965 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00001966 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
1967 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001968 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001969 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00001970 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001971 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001972 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001973 count += 1
1974 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001975 # Forcefully kill the process if we weren't able to signal it.
1976 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001977 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00001978 os.kill(proc.pid, event)
1979 # proc.send_signal(event) could also be done here.
1980 # Allow time for the signal to be passed and the process to exit.
1981 time.sleep(0.5)
1982 if not proc.poll():
1983 # Forcefully kill the process if we weren't able to signal it.
1984 os.kill(proc.pid, signal.SIGINT)
1985 self.fail("subprocess did not stop on {}".format(name))
1986
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03001987 @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
Brian Curtineb24d742010-04-12 17:16:38 +00001988 def test_CTRL_C_EVENT(self):
1989 from ctypes import wintypes
1990 import ctypes
1991
1992 # Make a NULL value by creating a pointer with no argument.
1993 NULL = ctypes.POINTER(ctypes.c_int)()
1994 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
1995 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
1996 wintypes.BOOL)
1997 SetConsoleCtrlHandler.restype = wintypes.BOOL
1998
1999 # Calling this with NULL and FALSE causes the calling process to
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03002000 # handle Ctrl+C, rather than ignore it. This property is inherited
Brian Curtineb24d742010-04-12 17:16:38 +00002001 # by subprocesses.
2002 SetConsoleCtrlHandler(NULL, 0)
2003
2004 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
2005
2006 def test_CTRL_BREAK_EVENT(self):
2007 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
2008
2009
Brian Curtind40e6f72010-07-08 21:39:08 +00002010@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Tim Golden781bbeb2013-10-25 20:24:06 +01002011class Win32ListdirTests(unittest.TestCase):
2012 """Test listdir on Windows."""
2013
2014 def setUp(self):
2015 self.created_paths = []
2016 for i in range(2):
2017 dir_name = 'SUB%d' % i
2018 dir_path = os.path.join(support.TESTFN, dir_name)
2019 file_name = 'FILE%d' % i
2020 file_path = os.path.join(support.TESTFN, file_name)
2021 os.makedirs(dir_path)
2022 with open(file_path, 'w') as f:
2023 f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
2024 self.created_paths.extend([dir_name, file_name])
2025 self.created_paths.sort()
2026
2027 def tearDown(self):
2028 shutil.rmtree(support.TESTFN)
2029
2030 def test_listdir_no_extended_path(self):
2031 """Test when the path is not an "extended" path."""
2032 # unicode
2033 self.assertEqual(
2034 sorted(os.listdir(support.TESTFN)),
2035 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01002036
Tim Golden781bbeb2013-10-25 20:24:06 +01002037 # bytes
Steve Dowercc16be82016-09-08 10:35:16 -07002038 self.assertEqual(
2039 sorted(os.listdir(os.fsencode(support.TESTFN))),
2040 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002041
2042 def test_listdir_extended_path(self):
2043 """Test when the path starts with '\\\\?\\'."""
Tim Golden1cc35402013-10-25 21:26:06 +01002044 # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
Tim Golden781bbeb2013-10-25 20:24:06 +01002045 # unicode
2046 path = '\\\\?\\' + os.path.abspath(support.TESTFN)
2047 self.assertEqual(
2048 sorted(os.listdir(path)),
2049 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01002050
Tim Golden781bbeb2013-10-25 20:24:06 +01002051 # bytes
Steve Dowercc16be82016-09-08 10:35:16 -07002052 path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
2053 self.assertEqual(
2054 sorted(os.listdir(path)),
2055 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002056
2057
2058@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Brian Curtin3b4499c2010-12-28 14:31:47 +00002059@support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00002060class Win32SymlinkTests(unittest.TestCase):
2061 filelink = 'filelinktest'
2062 filelink_target = os.path.abspath(__file__)
2063 dirlink = 'dirlinktest'
2064 dirlink_target = os.path.dirname(filelink_target)
2065 missing_link = 'missing link'
2066
2067 def setUp(self):
2068 assert os.path.exists(self.dirlink_target)
2069 assert os.path.exists(self.filelink_target)
2070 assert not os.path.exists(self.dirlink)
2071 assert not os.path.exists(self.filelink)
2072 assert not os.path.exists(self.missing_link)
2073
2074 def tearDown(self):
2075 if os.path.exists(self.filelink):
2076 os.remove(self.filelink)
2077 if os.path.exists(self.dirlink):
2078 os.rmdir(self.dirlink)
2079 if os.path.lexists(self.missing_link):
2080 os.remove(self.missing_link)
2081
2082 def test_directory_link(self):
Jason R. Coombs3a092862013-05-27 23:21:28 -04002083 os.symlink(self.dirlink_target, self.dirlink)
Brian Curtind40e6f72010-07-08 21:39:08 +00002084 self.assertTrue(os.path.exists(self.dirlink))
2085 self.assertTrue(os.path.isdir(self.dirlink))
2086 self.assertTrue(os.path.islink(self.dirlink))
2087 self.check_stat(self.dirlink, self.dirlink_target)
2088
2089 def test_file_link(self):
2090 os.symlink(self.filelink_target, self.filelink)
2091 self.assertTrue(os.path.exists(self.filelink))
2092 self.assertTrue(os.path.isfile(self.filelink))
2093 self.assertTrue(os.path.islink(self.filelink))
2094 self.check_stat(self.filelink, self.filelink_target)
2095
2096 def _create_missing_dir_link(self):
2097 'Create a "directory" link to a non-existent target'
2098 linkname = self.missing_link
2099 if os.path.lexists(linkname):
2100 os.remove(linkname)
2101 target = r'c:\\target does not exist.29r3c740'
2102 assert not os.path.exists(target)
2103 target_is_dir = True
2104 os.symlink(target, linkname, target_is_dir)
2105
2106 def test_remove_directory_link_to_missing_target(self):
2107 self._create_missing_dir_link()
2108 # For compatibility with Unix, os.remove will check the
2109 # directory status and call RemoveDirectory if the symlink
2110 # was created with target_is_dir==True.
2111 os.remove(self.missing_link)
2112
2113 @unittest.skip("currently fails; consider for improvement")
2114 def test_isdir_on_directory_link_to_missing_target(self):
2115 self._create_missing_dir_link()
2116 # consider having isdir return true for directory links
2117 self.assertTrue(os.path.isdir(self.missing_link))
2118
2119 @unittest.skip("currently fails; consider for improvement")
2120 def test_rmdir_on_directory_link_to_missing_target(self):
2121 self._create_missing_dir_link()
2122 # consider allowing rmdir to remove directory links
2123 os.rmdir(self.missing_link)
2124
2125 def check_stat(self, link, target):
2126 self.assertEqual(os.stat(link), os.stat(target))
2127 self.assertNotEqual(os.lstat(link), os.stat(link))
2128
Brian Curtind25aef52011-06-13 15:16:04 -05002129 bytes_link = os.fsencode(link)
Steve Dowercc16be82016-09-08 10:35:16 -07002130 self.assertEqual(os.stat(bytes_link), os.stat(target))
2131 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
Brian Curtind25aef52011-06-13 15:16:04 -05002132
2133 def test_12084(self):
2134 level1 = os.path.abspath(support.TESTFN)
2135 level2 = os.path.join(level1, "level2")
2136 level3 = os.path.join(level2, "level3")
Victor Stinnerae39d232016-03-24 17:12:55 +01002137 self.addCleanup(support.rmtree, level1)
2138
2139 os.mkdir(level1)
2140 os.mkdir(level2)
2141 os.mkdir(level3)
2142
2143 file1 = os.path.abspath(os.path.join(level1, "file1"))
2144 create_file(file1)
2145
2146 orig_dir = os.getcwd()
Brian Curtind25aef52011-06-13 15:16:04 -05002147 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01002148 os.chdir(level2)
2149 link = os.path.join(level2, "link")
2150 os.symlink(os.path.relpath(file1), "link")
2151 self.assertIn("link", os.listdir(os.getcwd()))
Brian Curtind25aef52011-06-13 15:16:04 -05002152
Victor Stinnerae39d232016-03-24 17:12:55 +01002153 # Check os.stat calls from the same dir as the link
2154 self.assertEqual(os.stat(file1), os.stat("link"))
Brian Curtind25aef52011-06-13 15:16:04 -05002155
Victor Stinnerae39d232016-03-24 17:12:55 +01002156 # Check os.stat calls from a dir below the link
2157 os.chdir(level1)
2158 self.assertEqual(os.stat(file1),
2159 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002160
Victor Stinnerae39d232016-03-24 17:12:55 +01002161 # Check os.stat calls from a dir above the link
2162 os.chdir(level3)
2163 self.assertEqual(os.stat(file1),
2164 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002165 finally:
Victor Stinnerae39d232016-03-24 17:12:55 +01002166 os.chdir(orig_dir)
Brian Curtind25aef52011-06-13 15:16:04 -05002167
Miss Islington (bot)74ebbae2018-02-12 13:39:42 -08002168 @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
2169 and os.path.exists(r'C:\ProgramData'),
2170 'Test directories not found')
2171 def test_29248(self):
2172 # os.symlink() calls CreateSymbolicLink, which creates
2173 # the reparse data buffer with the print name stored
2174 # first, so the offset is always 0. CreateSymbolicLink
2175 # stores the "PrintName" DOS path (e.g. "C:\") first,
2176 # with an offset of 0, followed by the "SubstituteName"
2177 # NT path (e.g. "\??\C:\"). The "All Users" link, on
2178 # the other hand, seems to have been created manually
2179 # with an inverted order.
2180 target = os.readlink(r'C:\Users\All Users')
2181 self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))
2182
Brian Curtind40e6f72010-07-08 21:39:08 +00002183
Tim Golden0321cf22014-05-05 19:46:17 +01002184@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2185class Win32JunctionTests(unittest.TestCase):
2186 junction = 'junctiontest'
2187 junction_target = os.path.dirname(os.path.abspath(__file__))
2188
2189 def setUp(self):
2190 assert os.path.exists(self.junction_target)
2191 assert not os.path.exists(self.junction)
2192
2193 def tearDown(self):
2194 if os.path.exists(self.junction):
2195 # os.rmdir delegates to Windows' RemoveDirectoryW,
2196 # which removes junction points safely.
2197 os.rmdir(self.junction)
2198
2199 def test_create_junction(self):
2200 _winapi.CreateJunction(self.junction_target, self.junction)
2201 self.assertTrue(os.path.exists(self.junction))
2202 self.assertTrue(os.path.isdir(self.junction))
2203
2204 # Junctions are not recognized as links.
2205 self.assertFalse(os.path.islink(self.junction))
2206
2207 def test_unlink_removes_junction(self):
2208 _winapi.CreateJunction(self.junction_target, self.junction)
2209 self.assertTrue(os.path.exists(self.junction))
2210
2211 os.unlink(self.junction)
2212 self.assertFalse(os.path.exists(self.junction))
2213
2214
Jason R. Coombs3a092862013-05-27 23:21:28 -04002215@support.skip_unless_symlink
2216class NonLocalSymlinkTests(unittest.TestCase):
2217
2218 def setUp(self):
R David Murray44b548d2016-09-08 13:59:53 -04002219 r"""
Jason R. Coombs3a092862013-05-27 23:21:28 -04002220 Create this structure:
2221
2222 base
2223 \___ some_dir
2224 """
2225 os.makedirs('base/some_dir')
2226
2227 def tearDown(self):
2228 shutil.rmtree('base')
2229
2230 def test_directory_link_nonlocal(self):
2231 """
2232 The symlink target should resolve relative to the link, not relative
2233 to the current directory.
2234
2235 Then, link base/some_link -> base/some_dir and ensure that some_link
2236 is resolved as a directory.
2237
2238 In issue13772, it was discovered that directory detection failed if
2239 the symlink target was not specified relative to the current
2240 directory, which was a defect in the implementation.
2241 """
2242 src = os.path.join('base', 'some_link')
2243 os.symlink('some_dir', src)
2244 assert os.path.isdir(src)
2245
2246
Victor Stinnere8d51452010-08-19 01:05:19 +00002247class FSEncodingTests(unittest.TestCase):
2248 def test_nop(self):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002249 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
2250 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
Benjamin Peterson31191a92010-05-09 03:22:58 +00002251
Victor Stinnere8d51452010-08-19 01:05:19 +00002252 def test_identity(self):
2253 # assert fsdecode(fsencode(x)) == x
2254 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
2255 try:
2256 bytesfn = os.fsencode(fn)
2257 except UnicodeEncodeError:
2258 continue
Ezio Melottib3aedd42010-11-20 19:04:17 +00002259 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00002260
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002261
Brett Cannonefb00c02012-02-29 18:31:31 -05002262
2263class DeviceEncodingTests(unittest.TestCase):
2264
2265 def test_bad_fd(self):
2266 # Return None when an fd doesn't actually exist.
2267 self.assertIsNone(os.device_encoding(123456))
2268
Philip Jenveye308b7c2012-02-29 16:16:15 -08002269 @unittest.skipUnless(os.isatty(0) and (sys.platform.startswith('win') or
2270 (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
Philip Jenveyd7aff2d2012-02-29 16:21:25 -08002271 'test requires a tty and either Windows or nl_langinfo(CODESET)')
Brett Cannonefb00c02012-02-29 18:31:31 -05002272 def test_device_encoding(self):
2273 encoding = os.device_encoding(0)
2274 self.assertIsNotNone(encoding)
2275 self.assertTrue(codecs.lookup(encoding))
2276
2277
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002278class PidTests(unittest.TestCase):
2279 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
2280 def test_getppid(self):
2281 p = subprocess.Popen([sys.executable, '-c',
2282 'import os; print(os.getppid())'],
2283 stdout=subprocess.PIPE)
2284 stdout, _ = p.communicate()
2285 # We are the parent of our subprocess
2286 self.assertEqual(int(stdout), os.getpid())
2287
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002288 def test_waitpid(self):
2289 args = [sys.executable, '-c', 'pass']
Brett Cannonec6ce872016-09-06 15:50:29 -07002290 # Add an implicit test for PyUnicode_FSConverter().
2291 pid = os.spawnv(os.P_NOWAIT, _PathLike(args[0]), args)
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002292 status = os.waitpid(pid, 0)
2293 self.assertEqual(status, (pid, 0))
2294
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002295
Victor Stinner4659ccf2016-09-14 10:57:00 +02002296class SpawnTests(unittest.TestCase):
Berker Peksag47e70622016-09-15 20:23:55 +03002297 def create_args(self, *, with_env=False, use_bytes=False):
Victor Stinner4659ccf2016-09-14 10:57:00 +02002298 self.exitcode = 17
2299
2300 filename = support.TESTFN
2301 self.addCleanup(support.unlink, filename)
2302
2303 if not with_env:
2304 code = 'import sys; sys.exit(%s)' % self.exitcode
2305 else:
2306 self.env = dict(os.environ)
2307 # create an unique key
2308 self.key = str(uuid.uuid4())
2309 self.env[self.key] = self.key
2310 # read the variable from os.environ to check that it exists
2311 code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
2312 % (self.key, self.exitcode))
2313
2314 with open(filename, "w") as fp:
2315 fp.write(code)
2316
Berker Peksag81816462016-09-15 20:19:47 +03002317 args = [sys.executable, filename]
2318 if use_bytes:
2319 args = [os.fsencode(a) for a in args]
2320 self.env = {os.fsencode(k): os.fsencode(v)
2321 for k, v in self.env.items()}
2322
2323 return args
Victor Stinner4659ccf2016-09-14 10:57:00 +02002324
Berker Peksag4af23d72016-09-15 20:32:44 +03002325 @requires_os_func('spawnl')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002326 def test_spawnl(self):
2327 args = self.create_args()
2328 exitcode = os.spawnl(os.P_WAIT, args[0], *args)
2329 self.assertEqual(exitcode, self.exitcode)
2330
Berker Peksag4af23d72016-09-15 20:32:44 +03002331 @requires_os_func('spawnle')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002332 def test_spawnle(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002333 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002334 exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
2335 self.assertEqual(exitcode, self.exitcode)
2336
Berker Peksag4af23d72016-09-15 20:32:44 +03002337 @requires_os_func('spawnlp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002338 def test_spawnlp(self):
2339 args = self.create_args()
2340 exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
2341 self.assertEqual(exitcode, self.exitcode)
2342
Berker Peksag4af23d72016-09-15 20:32:44 +03002343 @requires_os_func('spawnlpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002344 def test_spawnlpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002345 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002346 exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
2347 self.assertEqual(exitcode, self.exitcode)
2348
Berker Peksag4af23d72016-09-15 20:32:44 +03002349 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002350 def test_spawnv(self):
2351 args = self.create_args()
2352 exitcode = os.spawnv(os.P_WAIT, args[0], args)
2353 self.assertEqual(exitcode, self.exitcode)
2354
Berker Peksag4af23d72016-09-15 20:32:44 +03002355 @requires_os_func('spawnve')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002356 def test_spawnve(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002357 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002358 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2359 self.assertEqual(exitcode, self.exitcode)
2360
Berker Peksag4af23d72016-09-15 20:32:44 +03002361 @requires_os_func('spawnvp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002362 def test_spawnvp(self):
2363 args = self.create_args()
2364 exitcode = os.spawnvp(os.P_WAIT, args[0], args)
2365 self.assertEqual(exitcode, self.exitcode)
2366
Berker Peksag4af23d72016-09-15 20:32:44 +03002367 @requires_os_func('spawnvpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002368 def test_spawnvpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002369 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002370 exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
2371 self.assertEqual(exitcode, self.exitcode)
2372
Berker Peksag4af23d72016-09-15 20:32:44 +03002373 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002374 def test_nowait(self):
2375 args = self.create_args()
2376 pid = os.spawnv(os.P_NOWAIT, args[0], args)
2377 result = os.waitpid(pid, 0)
2378 self.assertEqual(result[0], pid)
2379 status = result[1]
2380 if hasattr(os, 'WIFEXITED'):
2381 self.assertTrue(os.WIFEXITED(status))
2382 self.assertEqual(os.WEXITSTATUS(status), self.exitcode)
2383 else:
2384 self.assertEqual(status, self.exitcode << 8)
2385
Berker Peksag4af23d72016-09-15 20:32:44 +03002386 @requires_os_func('spawnve')
Berker Peksag81816462016-09-15 20:19:47 +03002387 def test_spawnve_bytes(self):
2388 # Test bytes handling in parse_arglist and parse_envlist (#28114)
2389 args = self.create_args(with_env=True, use_bytes=True)
2390 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2391 self.assertEqual(exitcode, self.exitcode)
2392
Steve Dower859fd7b2016-11-19 18:53:19 -08002393 @requires_os_func('spawnl')
2394 def test_spawnl_noargs(self):
2395 args = self.create_args()
2396 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
Steve Dowerbce26262016-11-19 19:17:26 -08002397 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')
Steve Dower859fd7b2016-11-19 18:53:19 -08002398
2399 @requires_os_func('spawnle')
Steve Dowerbce26262016-11-19 19:17:26 -08002400 def test_spawnle_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08002401 args = self.create_args()
2402 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
Steve Dowerbce26262016-11-19 19:17:26 -08002403 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})
Steve Dower859fd7b2016-11-19 18:53:19 -08002404
2405 @requires_os_func('spawnv')
2406 def test_spawnv_noargs(self):
2407 args = self.create_args()
2408 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
2409 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
Steve Dowerbce26262016-11-19 19:17:26 -08002410 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
2411 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])
Steve Dower859fd7b2016-11-19 18:53:19 -08002412
2413 @requires_os_func('spawnve')
Steve Dowerbce26262016-11-19 19:17:26 -08002414 def test_spawnve_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08002415 args = self.create_args()
2416 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
2417 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
Steve Dowerbce26262016-11-19 19:17:26 -08002418 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
2419 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})
Victor Stinner4659ccf2016-09-14 10:57:00 +02002420
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002421 def _test_invalid_env(self, spawn):
Serhiy Storchaka77703942017-06-25 07:33:01 +03002422 args = [sys.executable, '-c', 'pass']
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002423
Ville Skyttä49b27342017-08-03 09:00:59 +03002424 # null character in the environment variable name
Serhiy Storchaka77703942017-06-25 07:33:01 +03002425 newenv = os.environ.copy()
2426 newenv["FRUIT\0VEGETABLE"] = "cabbage"
2427 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002428 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002429 except ValueError:
2430 pass
2431 else:
2432 self.assertEqual(exitcode, 127)
2433
Ville Skyttä49b27342017-08-03 09:00:59 +03002434 # null character in the environment variable value
Serhiy Storchaka77703942017-06-25 07:33:01 +03002435 newenv = os.environ.copy()
2436 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
2437 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002438 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002439 except ValueError:
2440 pass
2441 else:
2442 self.assertEqual(exitcode, 127)
2443
Ville Skyttä49b27342017-08-03 09:00:59 +03002444 # equal character in the environment variable name
Serhiy Storchaka77703942017-06-25 07:33:01 +03002445 newenv = os.environ.copy()
2446 newenv["FRUIT=ORANGE"] = "lemon"
2447 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002448 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002449 except ValueError:
2450 pass
2451 else:
2452 self.assertEqual(exitcode, 127)
2453
Ville Skyttä49b27342017-08-03 09:00:59 +03002454 # equal character in the environment variable value
Serhiy Storchaka77703942017-06-25 07:33:01 +03002455 filename = support.TESTFN
2456 self.addCleanup(support.unlink, filename)
2457 with open(filename, "w") as fp:
2458 fp.write('import sys, os\n'
2459 'if os.getenv("FRUIT") != "orange=lemon":\n'
2460 ' raise AssertionError')
2461 args = [sys.executable, filename]
2462 newenv = os.environ.copy()
2463 newenv["FRUIT"] = "orange=lemon"
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002464 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002465 self.assertEqual(exitcode, 0)
2466
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002467 @requires_os_func('spawnve')
2468 def test_spawnve_invalid_env(self):
2469 self._test_invalid_env(os.spawnve)
2470
2471 @requires_os_func('spawnvpe')
2472 def test_spawnvpe_invalid_env(self):
2473 self._test_invalid_env(os.spawnvpe)
2474
Serhiy Storchaka77703942017-06-25 07:33:01 +03002475
Brian Curtin0151b8e2010-09-24 13:43:43 +00002476# The introduction of this TestCase caused at least two different errors on
2477# *nix buildbots. Temporarily skip this to let the buildbots move along.
2478@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
Brian Curtine8e4b3b2010-09-23 20:04:14 +00002479@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
2480class LoginTests(unittest.TestCase):
2481 def test_getlogin(self):
2482 user_name = os.getlogin()
2483 self.assertNotEqual(len(user_name), 0)
2484
2485
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002486@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
2487 "needs os.getpriority and os.setpriority")
2488class ProgramPriorityTests(unittest.TestCase):
2489 """Tests for os.getpriority() and os.setpriority()."""
2490
2491 def test_set_get_priority(self):
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002492
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002493 base = os.getpriority(os.PRIO_PROCESS, os.getpid())
2494 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
2495 try:
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002496 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
2497 if base >= 19 and new_prio <= 19:
Victor Stinnerae39d232016-03-24 17:12:55 +01002498 raise unittest.SkipTest("unable to reliably test setpriority "
2499 "at current nice level of %s" % base)
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002500 else:
2501 self.assertEqual(new_prio, base + 1)
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002502 finally:
2503 try:
2504 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
2505 except OSError as err:
Antoine Pitrou692f0382011-02-26 00:22:25 +00002506 if err.errno != errno.EACCES:
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002507 raise
2508
2509
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002510class SendfileTestServer(asyncore.dispatcher, threading.Thread):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002511
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002512 class Handler(asynchat.async_chat):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002513
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002514 def __init__(self, conn):
2515 asynchat.async_chat.__init__(self, conn)
2516 self.in_buffer = []
2517 self.closed = False
2518 self.push(b"220 ready\r\n")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002519
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002520 def handle_read(self):
2521 data = self.recv(4096)
2522 self.in_buffer.append(data)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002523
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002524 def get_data(self):
2525 return b''.join(self.in_buffer)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002526
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002527 def handle_close(self):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002528 self.close()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002529 self.closed = True
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002530
2531 def handle_error(self):
2532 raise
2533
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002534 def __init__(self, address):
2535 threading.Thread.__init__(self)
2536 asyncore.dispatcher.__init__(self)
2537 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
2538 self.bind(address)
2539 self.listen(5)
2540 self.host, self.port = self.socket.getsockname()[:2]
2541 self.handler_instance = None
2542 self._active = False
2543 self._active_lock = threading.Lock()
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002544
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002545 # --- public API
2546
2547 @property
2548 def running(self):
2549 return self._active
2550
2551 def start(self):
2552 assert not self.running
2553 self.__flag = threading.Event()
2554 threading.Thread.start(self)
2555 self.__flag.wait()
2556
2557 def stop(self):
2558 assert self.running
2559 self._active = False
2560 self.join()
2561
2562 def wait(self):
2563 # wait for handler connection to be closed, then stop the server
2564 while not getattr(self.handler_instance, "closed", False):
2565 time.sleep(0.001)
2566 self.stop()
2567
2568 # --- internals
2569
2570 def run(self):
2571 self._active = True
2572 self.__flag.set()
2573 while self._active and asyncore.socket_map:
2574 self._active_lock.acquire()
2575 asyncore.loop(timeout=0.001, count=1)
2576 self._active_lock.release()
2577 asyncore.close_all()
2578
2579 def handle_accept(self):
2580 conn, addr = self.accept()
2581 self.handler_instance = self.Handler(conn)
2582
2583 def handle_connect(self):
2584 self.close()
2585 handle_read = handle_connect
2586
2587 def writable(self):
2588 return 0
2589
2590 def handle_error(self):
2591 raise
2592
2593
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002594@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
2595class TestSendfile(unittest.TestCase):
2596
Victor Stinner8c663fd2017-11-08 14:44:44 -08002597 DATA = b"12345abcde" * 16 * 1024 # 160 KiB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002598 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00002599 not sys.platform.startswith("solaris") and \
2600 not sys.platform.startswith("sunos")
Serhiy Storchaka43767632013-11-03 21:31:38 +02002601 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
2602 'requires headers and trailers support')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002603
2604 @classmethod
2605 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002606 cls.key = support.threading_setup()
Victor Stinnerae39d232016-03-24 17:12:55 +01002607 create_file(support.TESTFN, cls.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002608
2609 @classmethod
2610 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002611 support.threading_cleanup(*cls.key)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002612 support.unlink(support.TESTFN)
2613
2614 def setUp(self):
2615 self.server = SendfileTestServer((support.HOST, 0))
2616 self.server.start()
2617 self.client = socket.socket()
2618 self.client.connect((self.server.host, self.server.port))
2619 self.client.settimeout(1)
2620 # synchronize by waiting for "220 ready" response
2621 self.client.recv(1024)
2622 self.sockno = self.client.fileno()
2623 self.file = open(support.TESTFN, 'rb')
2624 self.fileno = self.file.fileno()
2625
2626 def tearDown(self):
2627 self.file.close()
2628 self.client.close()
2629 if self.server.running:
2630 self.server.stop()
Victor Stinnerd1cc0372017-07-12 16:05:43 +02002631 self.server = None
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002632
2633 def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
2634 """A higher level wrapper representing how an application is
2635 supposed to use sendfile().
2636 """
2637 while 1:
2638 try:
2639 if self.SUPPORT_HEADERS_TRAILERS:
2640 return os.sendfile(sock, file, offset, nbytes, headers,
2641 trailers)
2642 else:
2643 return os.sendfile(sock, file, offset, nbytes)
2644 except OSError as err:
2645 if err.errno == errno.ECONNRESET:
2646 # disconnected
2647 raise
2648 elif err.errno in (errno.EAGAIN, errno.EBUSY):
2649 # we have to retry send data
2650 continue
2651 else:
2652 raise
2653
2654 def test_send_whole_file(self):
2655 # normal send
2656 total_sent = 0
2657 offset = 0
2658 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002659 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002660 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2661 if sent == 0:
2662 break
2663 offset += sent
2664 total_sent += sent
2665 self.assertTrue(sent <= nbytes)
2666 self.assertEqual(offset, total_sent)
2667
2668 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002669 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002670 self.client.close()
2671 self.server.wait()
2672 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002673 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002674 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002675
2676 def test_send_at_certain_offset(self):
2677 # start sending a file at a certain offset
2678 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002679 offset = len(self.DATA) // 2
2680 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002681 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002682 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002683 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2684 if sent == 0:
2685 break
2686 offset += sent
2687 total_sent += sent
2688 self.assertTrue(sent <= nbytes)
2689
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002690 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002691 self.client.close()
2692 self.server.wait()
2693 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002694 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002695 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002696 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002697 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002698
2699 def test_offset_overflow(self):
2700 # specify an offset > file size
2701 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002702 try:
2703 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
2704 except OSError as e:
2705 # Solaris can raise EINVAL if offset >= file length, ignore.
2706 if e.errno != errno.EINVAL:
2707 raise
2708 else:
2709 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002710 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002711 self.client.close()
2712 self.server.wait()
2713 data = self.server.handler_instance.get_data()
2714 self.assertEqual(data, b'')
2715
2716 def test_invalid_offset(self):
2717 with self.assertRaises(OSError) as cm:
2718 os.sendfile(self.sockno, self.fileno, -1, 4096)
2719 self.assertEqual(cm.exception.errno, errno.EINVAL)
2720
Martin Panterbf19d162015-09-09 01:01:13 +00002721 def test_keywords(self):
2722 # Keyword arguments should be supported
2723 os.sendfile(out=self.sockno, offset=0, count=4096,
2724 **{'in': self.fileno})
2725 if self.SUPPORT_HEADERS_TRAILERS:
2726 os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
Martin Panter94994132015-09-09 05:29:24 +00002727 headers=(), trailers=(), flags=0)
Martin Panterbf19d162015-09-09 01:01:13 +00002728
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002729 # --- headers / trailers tests
2730
Serhiy Storchaka43767632013-11-03 21:31:38 +02002731 @requires_headers_trailers
2732 def test_headers(self):
2733 total_sent = 0
2734 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
2735 headers=[b"x" * 512])
2736 total_sent += sent
2737 offset = 4096
2738 nbytes = 4096
2739 while 1:
2740 sent = self.sendfile_wrapper(self.sockno, self.fileno,
2741 offset, nbytes)
2742 if sent == 0:
2743 break
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002744 total_sent += sent
Serhiy Storchaka43767632013-11-03 21:31:38 +02002745 offset += sent
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002746
Serhiy Storchaka43767632013-11-03 21:31:38 +02002747 expected_data = b"x" * 512 + self.DATA
2748 self.assertEqual(total_sent, len(expected_data))
2749 self.client.close()
2750 self.server.wait()
2751 data = self.server.handler_instance.get_data()
2752 self.assertEqual(hash(data), hash(expected_data))
2753
2754 @requires_headers_trailers
2755 def test_trailers(self):
2756 TESTFN2 = support.TESTFN + "2"
2757 file_data = b"abcdef"
Victor Stinnerae39d232016-03-24 17:12:55 +01002758
2759 self.addCleanup(support.unlink, TESTFN2)
2760 create_file(TESTFN2, file_data)
2761
2762 with open(TESTFN2, 'rb') as f:
Serhiy Storchaka43767632013-11-03 21:31:38 +02002763 os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
2764 trailers=[b"1234"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002765 self.client.close()
2766 self.server.wait()
2767 data = self.server.handler_instance.get_data()
Serhiy Storchaka43767632013-11-03 21:31:38 +02002768 self.assertEqual(data, b"abcdef1234")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002769
Serhiy Storchaka43767632013-11-03 21:31:38 +02002770 @requires_headers_trailers
2771 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
2772 'test needs os.SF_NODISKIO')
2773 def test_flags(self):
2774 try:
2775 os.sendfile(self.sockno, self.fileno, 0, 4096,
2776 flags=os.SF_NODISKIO)
2777 except OSError as err:
2778 if err.errno not in (errno.EBUSY, errno.EAGAIN):
2779 raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002780
2781
Larry Hastings9cf065c2012-06-22 16:30:09 -07002782def supports_extended_attributes():
2783 if not hasattr(os, "setxattr"):
2784 return False
Victor Stinnerae39d232016-03-24 17:12:55 +01002785
Larry Hastings9cf065c2012-06-22 16:30:09 -07002786 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01002787 with open(support.TESTFN, "xb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002788 try:
2789 os.setxattr(fp.fileno(), b"user.test", b"")
2790 except OSError:
2791 return False
2792 finally:
2793 support.unlink(support.TESTFN)
Victor Stinnerf95a19b2016-03-24 16:50:41 +01002794
2795 return True
Larry Hastings9cf065c2012-06-22 16:30:09 -07002796
2797
2798@unittest.skipUnless(supports_extended_attributes(),
2799 "no non-broken extended attribute support")
Victor Stinnerf95a19b2016-03-24 16:50:41 +01002800# Kernels < 2.6.39 don't respect setxattr flags.
2801@support.requires_linux_version(2, 6, 39)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002802class ExtendedAttributeTests(unittest.TestCase):
2803
Larry Hastings9cf065c2012-06-22 16:30:09 -07002804 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
Benjamin Peterson799bd802011-08-31 22:15:17 -04002805 fn = support.TESTFN
Victor Stinnerae39d232016-03-24 17:12:55 +01002806 self.addCleanup(support.unlink, fn)
2807 create_file(fn)
2808
Benjamin Peterson799bd802011-08-31 22:15:17 -04002809 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002810 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002811 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01002812
Victor Stinnerf12e5062011-10-16 22:12:03 +02002813 init_xattr = listxattr(fn)
2814 self.assertIsInstance(init_xattr, list)
Victor Stinnerae39d232016-03-24 17:12:55 +01002815
Larry Hastings9cf065c2012-06-22 16:30:09 -07002816 setxattr(fn, s("user.test"), b"", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002817 xattr = set(init_xattr)
2818 xattr.add("user.test")
2819 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002820 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
2821 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
2822 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
Victor Stinnerae39d232016-03-24 17:12:55 +01002823
Benjamin Peterson799bd802011-08-31 22:15:17 -04002824 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002825 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002826 self.assertEqual(cm.exception.errno, errno.EEXIST)
Victor Stinnerae39d232016-03-24 17:12:55 +01002827
Benjamin Peterson799bd802011-08-31 22:15:17 -04002828 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002829 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002830 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01002831
Larry Hastings9cf065c2012-06-22 16:30:09 -07002832 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002833 xattr.add("user.test2")
2834 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002835 removexattr(fn, s("user.test"), **kwargs)
Victor Stinnerae39d232016-03-24 17:12:55 +01002836
Benjamin Peterson799bd802011-08-31 22:15:17 -04002837 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002838 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002839 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01002840
Victor Stinnerf12e5062011-10-16 22:12:03 +02002841 xattr.remove("user.test")
2842 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002843 self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
2844 setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
2845 self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
2846 removexattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002847 many = sorted("user.test{}".format(i) for i in range(100))
2848 for thing in many:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002849 setxattr(fn, thing, b"x", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002850 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
Benjamin Peterson799bd802011-08-31 22:15:17 -04002851
Larry Hastings9cf065c2012-06-22 16:30:09 -07002852 def _check_xattrs(self, *args, **kwargs):
Larry Hastings9cf065c2012-06-22 16:30:09 -07002853 self._check_xattrs_str(str, *args, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002854 support.unlink(support.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +01002855
2856 self._check_xattrs_str(os.fsencode, *args, **kwargs)
2857 support.unlink(support.TESTFN)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002858
2859 def test_simple(self):
2860 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2861 os.listxattr)
2862
2863 def test_lpath(self):
Larry Hastings9cf065c2012-06-22 16:30:09 -07002864 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2865 os.listxattr, follow_symlinks=False)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002866
2867 def test_fds(self):
2868 def getxattr(path, *args):
2869 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002870 return os.getxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002871 def setxattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01002872 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002873 os.setxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002874 def removexattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01002875 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002876 os.removexattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002877 def listxattr(path, *args):
2878 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002879 return os.listxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002880 self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
2881
2882
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002883@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
2884class TermsizeTests(unittest.TestCase):
2885 def test_does_not_crash(self):
2886 """Check if get_terminal_size() returns a meaningful value.
2887
2888 There's no easy portable way to actually check the size of the
2889 terminal, so let's check if it returns something sensible instead.
2890 """
2891 try:
2892 size = os.get_terminal_size()
2893 except OSError as e:
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01002894 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002895 # Under win32 a generic OSError can be thrown if the
2896 # handle cannot be retrieved
2897 self.skipTest("failed to query terminal size")
2898 raise
2899
Antoine Pitroucfade362012-02-08 23:48:59 +01002900 self.assertGreaterEqual(size.columns, 0)
2901 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002902
2903 def test_stty_match(self):
2904 """Check if stty returns the same results
2905
2906 stty actually tests stdin, so get_terminal_size is invoked on
2907 stdin explicitly. If stty succeeded, then get_terminal_size()
2908 should work too.
2909 """
2910 try:
2911 size = subprocess.check_output(['stty', 'size']).decode().split()
xdegaye6a55d092017-11-12 17:57:04 +01002912 except (FileNotFoundError, subprocess.CalledProcessError,
2913 PermissionError):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002914 self.skipTest("stty invocation failed")
2915 expected = (int(size[1]), int(size[0])) # reversed order
2916
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01002917 try:
2918 actual = os.get_terminal_size(sys.__stdin__.fileno())
2919 except OSError as e:
2920 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
2921 # Under win32 a generic OSError can be thrown if the
2922 # handle cannot be retrieved
2923 self.skipTest("failed to query terminal size")
2924 raise
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002925 self.assertEqual(expected, actual)
2926
2927
Victor Stinner292c8352012-10-30 02:17:38 +01002928class OSErrorTests(unittest.TestCase):
2929 def setUp(self):
2930 class Str(str):
2931 pass
2932
Victor Stinnerafe17062012-10-31 22:47:43 +01002933 self.bytes_filenames = []
2934 self.unicode_filenames = []
Victor Stinner292c8352012-10-30 02:17:38 +01002935 if support.TESTFN_UNENCODABLE is not None:
2936 decoded = support.TESTFN_UNENCODABLE
2937 else:
2938 decoded = support.TESTFN
Victor Stinnerafe17062012-10-31 22:47:43 +01002939 self.unicode_filenames.append(decoded)
2940 self.unicode_filenames.append(Str(decoded))
Victor Stinner292c8352012-10-30 02:17:38 +01002941 if support.TESTFN_UNDECODABLE is not None:
2942 encoded = support.TESTFN_UNDECODABLE
2943 else:
2944 encoded = os.fsencode(support.TESTFN)
Victor Stinnerafe17062012-10-31 22:47:43 +01002945 self.bytes_filenames.append(encoded)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03002946 self.bytes_filenames.append(bytearray(encoded))
Victor Stinnerafe17062012-10-31 22:47:43 +01002947 self.bytes_filenames.append(memoryview(encoded))
2948
2949 self.filenames = self.bytes_filenames + self.unicode_filenames
Victor Stinner292c8352012-10-30 02:17:38 +01002950
2951 def test_oserror_filename(self):
2952 funcs = [
Victor Stinnerafe17062012-10-31 22:47:43 +01002953 (self.filenames, os.chdir,),
2954 (self.filenames, os.chmod, 0o777),
Victor Stinnerafe17062012-10-31 22:47:43 +01002955 (self.filenames, os.lstat,),
2956 (self.filenames, os.open, os.O_RDONLY),
2957 (self.filenames, os.rmdir,),
2958 (self.filenames, os.stat,),
2959 (self.filenames, os.unlink,),
Victor Stinner292c8352012-10-30 02:17:38 +01002960 ]
2961 if sys.platform == "win32":
2962 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01002963 (self.bytes_filenames, os.rename, b"dst"),
2964 (self.bytes_filenames, os.replace, b"dst"),
2965 (self.unicode_filenames, os.rename, "dst"),
2966 (self.unicode_filenames, os.replace, "dst"),
Steve Dowercc16be82016-09-08 10:35:16 -07002967 (self.unicode_filenames, os.listdir, ),
Victor Stinner292c8352012-10-30 02:17:38 +01002968 ))
Victor Stinnerafe17062012-10-31 22:47:43 +01002969 else:
2970 funcs.extend((
Victor Stinner64e039a2012-11-07 00:10:14 +01002971 (self.filenames, os.listdir,),
Victor Stinnerafe17062012-10-31 22:47:43 +01002972 (self.filenames, os.rename, "dst"),
2973 (self.filenames, os.replace, "dst"),
2974 ))
2975 if hasattr(os, "chown"):
2976 funcs.append((self.filenames, os.chown, 0, 0))
2977 if hasattr(os, "lchown"):
2978 funcs.append((self.filenames, os.lchown, 0, 0))
2979 if hasattr(os, "truncate"):
2980 funcs.append((self.filenames, os.truncate, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01002981 if hasattr(os, "chflags"):
Victor Stinneree36c242012-11-13 09:31:51 +01002982 funcs.append((self.filenames, os.chflags, 0))
2983 if hasattr(os, "lchflags"):
2984 funcs.append((self.filenames, os.lchflags, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01002985 if hasattr(os, "chroot"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002986 funcs.append((self.filenames, os.chroot,))
Victor Stinner292c8352012-10-30 02:17:38 +01002987 if hasattr(os, "link"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002988 if sys.platform == "win32":
2989 funcs.append((self.bytes_filenames, os.link, b"dst"))
2990 funcs.append((self.unicode_filenames, os.link, "dst"))
2991 else:
2992 funcs.append((self.filenames, os.link, "dst"))
Victor Stinner292c8352012-10-30 02:17:38 +01002993 if hasattr(os, "listxattr"):
2994 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01002995 (self.filenames, os.listxattr,),
2996 (self.filenames, os.getxattr, "user.test"),
2997 (self.filenames, os.setxattr, "user.test", b'user'),
2998 (self.filenames, os.removexattr, "user.test"),
Victor Stinner292c8352012-10-30 02:17:38 +01002999 ))
3000 if hasattr(os, "lchmod"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003001 funcs.append((self.filenames, os.lchmod, 0o777))
Victor Stinner292c8352012-10-30 02:17:38 +01003002 if hasattr(os, "readlink"):
Victor Stinnerafe17062012-10-31 22:47:43 +01003003 if sys.platform == "win32":
3004 funcs.append((self.unicode_filenames, os.readlink,))
3005 else:
3006 funcs.append((self.filenames, os.readlink,))
Victor Stinner292c8352012-10-30 02:17:38 +01003007
Steve Dowercc16be82016-09-08 10:35:16 -07003008
Victor Stinnerafe17062012-10-31 22:47:43 +01003009 for filenames, func, *func_args in funcs:
3010 for name in filenames:
Victor Stinner292c8352012-10-30 02:17:38 +01003011 try:
Steve Dowercc16be82016-09-08 10:35:16 -07003012 if isinstance(name, (str, bytes)):
Victor Stinner923590e2016-03-24 09:11:48 +01003013 func(name, *func_args)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03003014 else:
3015 with self.assertWarnsRegex(DeprecationWarning, 'should be'):
3016 func(name, *func_args)
Victor Stinnerbd54f0e2012-10-31 01:12:55 +01003017 except OSError as err:
Steve Dowercc16be82016-09-08 10:35:16 -07003018 self.assertIs(err.filename, name, str(func))
Steve Dower78057b42016-11-06 19:35:08 -08003019 except UnicodeDecodeError:
3020 pass
Victor Stinner292c8352012-10-30 02:17:38 +01003021 else:
3022 self.fail("No exception thrown by {}".format(func))
3023
Charles-Francois Natali44feda32013-05-20 14:40:46 +02003024class CPUCountTests(unittest.TestCase):
3025 def test_cpu_count(self):
3026 cpus = os.cpu_count()
3027 if cpus is not None:
3028 self.assertIsInstance(cpus, int)
3029 self.assertGreater(cpus, 0)
3030 else:
3031 self.skipTest("Could not determine the number of CPUs")
3032
Victor Stinnerdaf45552013-08-28 00:53:59 +02003033
3034class FDInheritanceTests(unittest.TestCase):
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003035 def test_get_set_inheritable(self):
Victor Stinnerdaf45552013-08-28 00:53:59 +02003036 fd = os.open(__file__, os.O_RDONLY)
3037 self.addCleanup(os.close, fd)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003038 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinnerdaf45552013-08-28 00:53:59 +02003039
Victor Stinnerdaf45552013-08-28 00:53:59 +02003040 os.set_inheritable(fd, True)
3041 self.assertEqual(os.get_inheritable(fd), True)
3042
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003043 @unittest.skipIf(fcntl is None, "need fcntl")
3044 def test_get_inheritable_cloexec(self):
3045 fd = os.open(__file__, os.O_RDONLY)
3046 self.addCleanup(os.close, fd)
3047 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003048
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003049 # clear FD_CLOEXEC flag
3050 flags = fcntl.fcntl(fd, fcntl.F_GETFD)
3051 flags &= ~fcntl.FD_CLOEXEC
3052 fcntl.fcntl(fd, fcntl.F_SETFD, flags)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003053
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003054 self.assertEqual(os.get_inheritable(fd), True)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003055
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003056 @unittest.skipIf(fcntl is None, "need fcntl")
3057 def test_set_inheritable_cloexec(self):
3058 fd = os.open(__file__, os.O_RDONLY)
3059 self.addCleanup(os.close, fd)
3060 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3061 fcntl.FD_CLOEXEC)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003062
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003063 os.set_inheritable(fd, True)
3064 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3065 0)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003066
Victor Stinnerdaf45552013-08-28 00:53:59 +02003067 def test_open(self):
3068 fd = os.open(__file__, os.O_RDONLY)
3069 self.addCleanup(os.close, fd)
3070 self.assertEqual(os.get_inheritable(fd), False)
3071
3072 @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
3073 def test_pipe(self):
3074 rfd, wfd = os.pipe()
3075 self.addCleanup(os.close, rfd)
3076 self.addCleanup(os.close, wfd)
3077 self.assertEqual(os.get_inheritable(rfd), False)
3078 self.assertEqual(os.get_inheritable(wfd), False)
3079
3080 def test_dup(self):
3081 fd1 = os.open(__file__, os.O_RDONLY)
3082 self.addCleanup(os.close, fd1)
3083
3084 fd2 = os.dup(fd1)
3085 self.addCleanup(os.close, fd2)
3086 self.assertEqual(os.get_inheritable(fd2), False)
3087
3088 @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
3089 def test_dup2(self):
3090 fd = os.open(__file__, os.O_RDONLY)
3091 self.addCleanup(os.close, fd)
3092
3093 # inheritable by default
3094 fd2 = os.open(__file__, os.O_RDONLY)
Benjamin Petersonbbdb17d2017-12-29 13:13:06 -08003095 self.addCleanup(os.close, fd2)
3096 self.assertEqual(os.dup2(fd, fd2), fd2)
3097 self.assertTrue(os.get_inheritable(fd2))
Victor Stinnerdaf45552013-08-28 00:53:59 +02003098
3099 # force non-inheritable
3100 fd3 = os.open(__file__, os.O_RDONLY)
Benjamin Petersonbbdb17d2017-12-29 13:13:06 -08003101 self.addCleanup(os.close, fd3)
3102 self.assertEqual(os.dup2(fd, fd3, inheritable=False), fd3)
3103 self.assertFalse(os.get_inheritable(fd3))
Victor Stinnerdaf45552013-08-28 00:53:59 +02003104
3105 @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
3106 def test_openpty(self):
3107 master_fd, slave_fd = os.openpty()
3108 self.addCleanup(os.close, master_fd)
3109 self.addCleanup(os.close, slave_fd)
3110 self.assertEqual(os.get_inheritable(master_fd), False)
3111 self.assertEqual(os.get_inheritable(slave_fd), False)
3112
3113
Brett Cannon3f9183b2016-08-26 14:44:48 -07003114class PathTConverterTests(unittest.TestCase):
3115 # tuples of (function name, allows fd arguments, additional arguments to
3116 # function, cleanup function)
3117 functions = [
3118 ('stat', True, (), None),
3119 ('lstat', False, (), None),
Benjamin Petersona9ab1652016-09-05 15:40:59 -07003120 ('access', False, (os.F_OK,), None),
Brett Cannon3f9183b2016-08-26 14:44:48 -07003121 ('chflags', False, (0,), None),
3122 ('lchflags', False, (0,), None),
3123 ('open', False, (0,), getattr(os, 'close', None)),
3124 ]
3125
3126 def test_path_t_converter(self):
Brett Cannon3f9183b2016-08-26 14:44:48 -07003127 str_filename = support.TESTFN
Brett Cannon3ce2fd42016-08-27 09:42:40 -07003128 if os.name == 'nt':
3129 bytes_fspath = bytes_filename = None
3130 else:
3131 bytes_filename = support.TESTFN.encode('ascii')
Brett Cannonec6ce872016-09-06 15:50:29 -07003132 bytes_fspath = _PathLike(bytes_filename)
3133 fd = os.open(_PathLike(str_filename), os.O_WRONLY|os.O_CREAT)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003134 self.addCleanup(support.unlink, support.TESTFN)
Berker Peksagd0f5bab2016-08-27 21:26:35 +03003135 self.addCleanup(os.close, fd)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003136
Brett Cannonec6ce872016-09-06 15:50:29 -07003137 int_fspath = _PathLike(fd)
3138 str_fspath = _PathLike(str_filename)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003139
3140 for name, allow_fd, extra_args, cleanup_fn in self.functions:
3141 with self.subTest(name=name):
3142 try:
3143 fn = getattr(os, name)
3144 except AttributeError:
3145 continue
3146
Brett Cannon8f96a302016-08-26 19:30:11 -07003147 for path in (str_filename, bytes_filename, str_fspath,
3148 bytes_fspath):
Brett Cannon3ce2fd42016-08-27 09:42:40 -07003149 if path is None:
3150 continue
Brett Cannon3f9183b2016-08-26 14:44:48 -07003151 with self.subTest(name=name, path=path):
3152 result = fn(path, *extra_args)
3153 if cleanup_fn is not None:
3154 cleanup_fn(result)
3155
3156 with self.assertRaisesRegex(
3157 TypeError, 'should be string, bytes'):
3158 fn(int_fspath, *extra_args)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003159
3160 if allow_fd:
3161 result = fn(fd, *extra_args) # should not fail
3162 if cleanup_fn is not None:
3163 cleanup_fn(result)
3164 else:
3165 with self.assertRaisesRegex(
3166 TypeError,
3167 'os.PathLike'):
3168 fn(fd, *extra_args)
3169
3170
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003171@unittest.skipUnless(hasattr(os, 'get_blocking'),
3172 'needs os.get_blocking() and os.set_blocking()')
3173class BlockingTests(unittest.TestCase):
3174 def test_blocking(self):
3175 fd = os.open(__file__, os.O_RDONLY)
3176 self.addCleanup(os.close, fd)
3177 self.assertEqual(os.get_blocking(fd), True)
3178
3179 os.set_blocking(fd, False)
3180 self.assertEqual(os.get_blocking(fd), False)
3181
3182 os.set_blocking(fd, True)
3183 self.assertEqual(os.get_blocking(fd), True)
3184
3185
Yury Selivanov97e2e062014-09-26 12:33:06 -04003186
3187class ExportsTests(unittest.TestCase):
3188 def test_os_all(self):
3189 self.assertIn('open', os.__all__)
3190 self.assertIn('walk', os.__all__)
3191
3192
Victor Stinner6036e442015-03-08 01:58:04 +01003193class TestScandir(unittest.TestCase):
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003194 check_no_resource_warning = support.check_no_resource_warning
3195
Victor Stinner6036e442015-03-08 01:58:04 +01003196 def setUp(self):
3197 self.path = os.path.realpath(support.TESTFN)
Brett Cannon96881cd2016-06-10 14:37:21 -07003198 self.bytes_path = os.fsencode(self.path)
Victor Stinner6036e442015-03-08 01:58:04 +01003199 self.addCleanup(support.rmtree, self.path)
3200 os.mkdir(self.path)
3201
3202 def create_file(self, name="file.txt"):
Brett Cannon96881cd2016-06-10 14:37:21 -07003203 path = self.bytes_path if isinstance(name, bytes) else self.path
3204 filename = os.path.join(path, name)
Victor Stinnerae39d232016-03-24 17:12:55 +01003205 create_file(filename, b'python')
Victor Stinner6036e442015-03-08 01:58:04 +01003206 return filename
3207
3208 def get_entries(self, names):
3209 entries = dict((entry.name, entry)
3210 for entry in os.scandir(self.path))
3211 self.assertEqual(sorted(entries.keys()), names)
3212 return entries
3213
3214 def assert_stat_equal(self, stat1, stat2, skip_fields):
3215 if skip_fields:
3216 for attr in dir(stat1):
3217 if not attr.startswith("st_"):
3218 continue
3219 if attr in ("st_dev", "st_ino", "st_nlink"):
3220 continue
3221 self.assertEqual(getattr(stat1, attr),
3222 getattr(stat2, attr),
3223 (stat1, stat2, attr))
3224 else:
3225 self.assertEqual(stat1, stat2)
3226
3227 def check_entry(self, entry, name, is_dir, is_file, is_symlink):
Brett Cannona32c4d02016-06-24 14:14:44 -07003228 self.assertIsInstance(entry, os.DirEntry)
Victor Stinner6036e442015-03-08 01:58:04 +01003229 self.assertEqual(entry.name, name)
3230 self.assertEqual(entry.path, os.path.join(self.path, name))
3231 self.assertEqual(entry.inode(),
3232 os.stat(entry.path, follow_symlinks=False).st_ino)
3233
3234 entry_stat = os.stat(entry.path)
3235 self.assertEqual(entry.is_dir(),
3236 stat.S_ISDIR(entry_stat.st_mode))
3237 self.assertEqual(entry.is_file(),
3238 stat.S_ISREG(entry_stat.st_mode))
3239 self.assertEqual(entry.is_symlink(),
3240 os.path.islink(entry.path))
3241
3242 entry_lstat = os.stat(entry.path, follow_symlinks=False)
3243 self.assertEqual(entry.is_dir(follow_symlinks=False),
3244 stat.S_ISDIR(entry_lstat.st_mode))
3245 self.assertEqual(entry.is_file(follow_symlinks=False),
3246 stat.S_ISREG(entry_lstat.st_mode))
3247
3248 self.assert_stat_equal(entry.stat(),
3249 entry_stat,
3250 os.name == 'nt' and not is_symlink)
3251 self.assert_stat_equal(entry.stat(follow_symlinks=False),
3252 entry_lstat,
3253 os.name == 'nt')
3254
3255 def test_attributes(self):
3256 link = hasattr(os, 'link')
3257 symlink = support.can_symlink()
3258
3259 dirname = os.path.join(self.path, "dir")
3260 os.mkdir(dirname)
3261 filename = self.create_file("file.txt")
3262 if link:
xdegaye6a55d092017-11-12 17:57:04 +01003263 try:
3264 os.link(filename, os.path.join(self.path, "link_file.txt"))
3265 except PermissionError as e:
3266 self.skipTest('os.link(): %s' % e)
Victor Stinner6036e442015-03-08 01:58:04 +01003267 if symlink:
3268 os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
3269 target_is_directory=True)
3270 os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
3271
3272 names = ['dir', 'file.txt']
3273 if link:
3274 names.append('link_file.txt')
3275 if symlink:
3276 names.extend(('symlink_dir', 'symlink_file.txt'))
3277 entries = self.get_entries(names)
3278
3279 entry = entries['dir']
3280 self.check_entry(entry, 'dir', True, False, False)
3281
3282 entry = entries['file.txt']
3283 self.check_entry(entry, 'file.txt', False, True, False)
3284
3285 if link:
3286 entry = entries['link_file.txt']
3287 self.check_entry(entry, 'link_file.txt', False, True, False)
3288
3289 if symlink:
3290 entry = entries['symlink_dir']
3291 self.check_entry(entry, 'symlink_dir', True, False, True)
3292
3293 entry = entries['symlink_file.txt']
3294 self.check_entry(entry, 'symlink_file.txt', False, True, True)
3295
3296 def get_entry(self, name):
Brett Cannon96881cd2016-06-10 14:37:21 -07003297 path = self.bytes_path if isinstance(name, bytes) else self.path
3298 entries = list(os.scandir(path))
Victor Stinner6036e442015-03-08 01:58:04 +01003299 self.assertEqual(len(entries), 1)
3300
3301 entry = entries[0]
3302 self.assertEqual(entry.name, name)
3303 return entry
3304
Brett Cannon96881cd2016-06-10 14:37:21 -07003305 def create_file_entry(self, name='file.txt'):
3306 filename = self.create_file(name=name)
Victor Stinner6036e442015-03-08 01:58:04 +01003307 return self.get_entry(os.path.basename(filename))
3308
3309 def test_current_directory(self):
3310 filename = self.create_file()
3311 old_dir = os.getcwd()
3312 try:
3313 os.chdir(self.path)
3314
3315 # call scandir() without parameter: it must list the content
3316 # of the current directory
3317 entries = dict((entry.name, entry) for entry in os.scandir())
3318 self.assertEqual(sorted(entries.keys()),
3319 [os.path.basename(filename)])
3320 finally:
3321 os.chdir(old_dir)
3322
3323 def test_repr(self):
3324 entry = self.create_file_entry()
3325 self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")
3326
Brett Cannon96881cd2016-06-10 14:37:21 -07003327 def test_fspath_protocol(self):
3328 entry = self.create_file_entry()
3329 self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))
3330
3331 def test_fspath_protocol_bytes(self):
3332 bytes_filename = os.fsencode('bytesfile.txt')
3333 bytes_entry = self.create_file_entry(name=bytes_filename)
3334 fspath = os.fspath(bytes_entry)
3335 self.assertIsInstance(fspath, bytes)
3336 self.assertEqual(fspath,
3337 os.path.join(os.fsencode(self.path),bytes_filename))
3338
Victor Stinner6036e442015-03-08 01:58:04 +01003339 def test_removed_dir(self):
3340 path = os.path.join(self.path, 'dir')
3341
3342 os.mkdir(path)
3343 entry = self.get_entry('dir')
3344 os.rmdir(path)
3345
3346 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3347 if os.name == 'nt':
3348 self.assertTrue(entry.is_dir())
3349 self.assertFalse(entry.is_file())
3350 self.assertFalse(entry.is_symlink())
3351 if os.name == 'nt':
3352 self.assertRaises(FileNotFoundError, entry.inode)
3353 # don't fail
3354 entry.stat()
3355 entry.stat(follow_symlinks=False)
3356 else:
3357 self.assertGreater(entry.inode(), 0)
3358 self.assertRaises(FileNotFoundError, entry.stat)
3359 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3360
3361 def test_removed_file(self):
3362 entry = self.create_file_entry()
3363 os.unlink(entry.path)
3364
3365 self.assertFalse(entry.is_dir())
3366 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3367 if os.name == 'nt':
3368 self.assertTrue(entry.is_file())
3369 self.assertFalse(entry.is_symlink())
3370 if os.name == 'nt':
3371 self.assertRaises(FileNotFoundError, entry.inode)
3372 # don't fail
3373 entry.stat()
3374 entry.stat(follow_symlinks=False)
3375 else:
3376 self.assertGreater(entry.inode(), 0)
3377 self.assertRaises(FileNotFoundError, entry.stat)
3378 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3379
3380 def test_broken_symlink(self):
3381 if not support.can_symlink():
3382 return self.skipTest('cannot create symbolic link')
3383
3384 filename = self.create_file("file.txt")
3385 os.symlink(filename,
3386 os.path.join(self.path, "symlink.txt"))
3387 entries = self.get_entries(['file.txt', 'symlink.txt'])
3388 entry = entries['symlink.txt']
3389 os.unlink(filename)
3390
3391 self.assertGreater(entry.inode(), 0)
3392 self.assertFalse(entry.is_dir())
3393 self.assertFalse(entry.is_file()) # broken symlink returns False
3394 self.assertFalse(entry.is_dir(follow_symlinks=False))
3395 self.assertFalse(entry.is_file(follow_symlinks=False))
3396 self.assertTrue(entry.is_symlink())
3397 self.assertRaises(FileNotFoundError, entry.stat)
3398 # don't fail
3399 entry.stat(follow_symlinks=False)
3400
3401 def test_bytes(self):
Victor Stinner6036e442015-03-08 01:58:04 +01003402 self.create_file("file.txt")
3403
3404 path_bytes = os.fsencode(self.path)
3405 entries = list(os.scandir(path_bytes))
3406 self.assertEqual(len(entries), 1, entries)
3407 entry = entries[0]
3408
3409 self.assertEqual(entry.name, b'file.txt')
3410 self.assertEqual(entry.path,
3411 os.fsencode(os.path.join(self.path, 'file.txt')))
3412
Serhiy Storchaka1180e5a2017-07-11 06:36:46 +03003413 def test_bytes_like(self):
3414 self.create_file("file.txt")
3415
3416 for cls in bytearray, memoryview:
3417 path_bytes = cls(os.fsencode(self.path))
3418 with self.assertWarns(DeprecationWarning):
3419 entries = list(os.scandir(path_bytes))
3420 self.assertEqual(len(entries), 1, entries)
3421 entry = entries[0]
3422
3423 self.assertEqual(entry.name, b'file.txt')
3424 self.assertEqual(entry.path,
3425 os.fsencode(os.path.join(self.path, 'file.txt')))
3426 self.assertIs(type(entry.name), bytes)
3427 self.assertIs(type(entry.path), bytes)
3428
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03003429 @unittest.skipUnless(os.listdir in os.supports_fd,
3430 'fd support for listdir required for this test.')
3431 def test_fd(self):
3432 self.assertIn(os.scandir, os.supports_fd)
3433 self.create_file('file.txt')
3434 expected_names = ['file.txt']
3435 if support.can_symlink():
3436 os.symlink('file.txt', os.path.join(self.path, 'link'))
3437 expected_names.append('link')
3438
3439 fd = os.open(self.path, os.O_RDONLY)
3440 try:
3441 with os.scandir(fd) as it:
3442 entries = list(it)
3443 names = [entry.name for entry in entries]
3444 self.assertEqual(sorted(names), expected_names)
3445 self.assertEqual(names, os.listdir(fd))
3446 for entry in entries:
3447 self.assertEqual(entry.path, entry.name)
3448 self.assertEqual(os.fspath(entry), entry.name)
3449 self.assertEqual(entry.is_symlink(), entry.name == 'link')
3450 if os.stat in os.supports_dir_fd:
3451 st = os.stat(entry.name, dir_fd=fd)
3452 self.assertEqual(entry.stat(), st)
3453 st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
3454 self.assertEqual(entry.stat(follow_symlinks=False), st)
3455 finally:
3456 os.close(fd)
3457
Victor Stinner6036e442015-03-08 01:58:04 +01003458 def test_empty_path(self):
3459 self.assertRaises(FileNotFoundError, os.scandir, '')
3460
3461 def test_consume_iterator_twice(self):
3462 self.create_file("file.txt")
3463 iterator = os.scandir(self.path)
3464
3465 entries = list(iterator)
3466 self.assertEqual(len(entries), 1, entries)
3467
3468 # check than consuming the iterator twice doesn't raise exception
3469 entries2 = list(iterator)
3470 self.assertEqual(len(entries2), 0, entries2)
3471
3472 def test_bad_path_type(self):
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03003473 for obj in [1.234, {}, []]:
Victor Stinner6036e442015-03-08 01:58:04 +01003474 self.assertRaises(TypeError, os.scandir, obj)
3475
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003476 def test_close(self):
3477 self.create_file("file.txt")
3478 self.create_file("file2.txt")
3479 iterator = os.scandir(self.path)
3480 next(iterator)
3481 iterator.close()
3482 # multiple closes
3483 iterator.close()
3484 with self.check_no_resource_warning():
3485 del iterator
3486
3487 def test_context_manager(self):
3488 self.create_file("file.txt")
3489 self.create_file("file2.txt")
3490 with os.scandir(self.path) as iterator:
3491 next(iterator)
3492 with self.check_no_resource_warning():
3493 del iterator
3494
3495 def test_context_manager_close(self):
3496 self.create_file("file.txt")
3497 self.create_file("file2.txt")
3498 with os.scandir(self.path) as iterator:
3499 next(iterator)
3500 iterator.close()
3501
3502 def test_context_manager_exception(self):
3503 self.create_file("file.txt")
3504 self.create_file("file2.txt")
3505 with self.assertRaises(ZeroDivisionError):
3506 with os.scandir(self.path) as iterator:
3507 next(iterator)
3508 1/0
3509 with self.check_no_resource_warning():
3510 del iterator
3511
3512 def test_resource_warning(self):
3513 self.create_file("file.txt")
3514 self.create_file("file2.txt")
3515 iterator = os.scandir(self.path)
3516 next(iterator)
3517 with self.assertWarns(ResourceWarning):
3518 del iterator
3519 support.gc_collect()
3520 # exhausted iterator
3521 iterator = os.scandir(self.path)
3522 list(iterator)
3523 with self.check_no_resource_warning():
3524 del iterator
3525
Victor Stinner6036e442015-03-08 01:58:04 +01003526
Ethan Furmancdc08792016-06-02 15:06:09 -07003527class TestPEP519(unittest.TestCase):
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003528
3529 # Abstracted so it can be overridden to test pure Python implementation
3530 # if a C version is provided.
3531 fspath = staticmethod(os.fspath)
3532
Ethan Furmancdc08792016-06-02 15:06:09 -07003533 def test_return_bytes(self):
3534 for b in b'hello', b'goodbye', b'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003535 self.assertEqual(b, self.fspath(b))
Ethan Furmancdc08792016-06-02 15:06:09 -07003536
3537 def test_return_string(self):
3538 for s in 'hello', 'goodbye', 'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003539 self.assertEqual(s, self.fspath(s))
Ethan Furmancdc08792016-06-02 15:06:09 -07003540
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003541 def test_fsencode_fsdecode(self):
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003542 for p in "path/like/object", b"path/like/object":
Brett Cannonec6ce872016-09-06 15:50:29 -07003543 pathlike = _PathLike(p)
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003544
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003545 self.assertEqual(p, self.fspath(pathlike))
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003546 self.assertEqual(b"path/like/object", os.fsencode(pathlike))
3547 self.assertEqual("path/like/object", os.fsdecode(pathlike))
3548
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003549 def test_pathlike(self):
Brett Cannonec6ce872016-09-06 15:50:29 -07003550 self.assertEqual('#feelthegil', self.fspath(_PathLike('#feelthegil')))
3551 self.assertTrue(issubclass(_PathLike, os.PathLike))
3552 self.assertTrue(isinstance(_PathLike(), os.PathLike))
Ethan Furman410ef8e2016-06-04 12:06:26 -07003553
Ethan Furmancdc08792016-06-02 15:06:09 -07003554 def test_garbage_in_exception_out(self):
3555 vapor = type('blah', (), {})
3556 for o in int, type, os, vapor():
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003557 self.assertRaises(TypeError, self.fspath, o)
Ethan Furmancdc08792016-06-02 15:06:09 -07003558
3559 def test_argument_required(self):
Brett Cannon044283a2016-07-15 10:41:49 -07003560 self.assertRaises(TypeError, self.fspath)
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003561
Brett Cannon044283a2016-07-15 10:41:49 -07003562 def test_bad_pathlike(self):
3563 # __fspath__ returns a value other than str or bytes.
Brett Cannonec6ce872016-09-06 15:50:29 -07003564 self.assertRaises(TypeError, self.fspath, _PathLike(42))
Brett Cannon044283a2016-07-15 10:41:49 -07003565 # __fspath__ attribute that is not callable.
3566 c = type('foo', (), {})
3567 c.__fspath__ = 1
3568 self.assertRaises(TypeError, self.fspath, c())
3569 # __fspath__ raises an exception.
Brett Cannon044283a2016-07-15 10:41:49 -07003570 self.assertRaises(ZeroDivisionError, self.fspath,
Brett Cannonec6ce872016-09-06 15:50:29 -07003571 _PathLike(ZeroDivisionError()))
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003572
Victor Stinnerc29b5852017-11-02 07:28:27 -07003573
3574class TimesTests(unittest.TestCase):
3575 def test_times(self):
3576 times = os.times()
3577 self.assertIsInstance(times, os.times_result)
3578
3579 for field in ('user', 'system', 'children_user', 'children_system',
3580 'elapsed'):
3581 value = getattr(times, field)
3582 self.assertIsInstance(value, float)
3583
3584 if os.name == 'nt':
3585 self.assertEqual(times.children_user, 0)
3586 self.assertEqual(times.children_system, 0)
3587 self.assertEqual(times.elapsed, 0)
3588
3589
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003590# Only test if the C version is provided, otherwise TestPEP519 already tested
3591# the pure Python implementation.
3592if hasattr(os, "_fspath"):
3593 class TestPEP519PurePython(TestPEP519):
3594
3595 """Explicitly test the pure Python implementation of os.fspath()."""
3596
3597 fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07003598
3599
Fred Drake2e2be372001-09-20 21:33:42 +00003600if __name__ == "__main__":
R David Murrayf2ad1732014-12-25 18:36:56 -05003601 unittest.main()