blob: 83e214d91eacec66d70aa9d09eb9861aabb98fca [file] [log] [blame]
Fred Drake38c2ef02001-07-17 20:52:51 +00001# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
Walter Dörwaldf0dfc7a2003-10-20 14:01:56 +00003# portable than they had been thought to be.
Fred Drake38c2ef02001-07-17 20:52:51 +00004
Victor Stinner47aacc82015-06-12 17:26:23 +02005import asynchat
6import asyncore
7import codecs
Victor Stinnerc2d095f2010-05-17 00:14:53 +00008import contextlib
Victor Stinner47aacc82015-06-12 17:26:23 +02009import decimal
10import errno
11import fractions
12import getpass
13import itertools
14import locale
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +000015import mmap
Victor Stinner47aacc82015-06-12 17:26:23 +020016import os
17import pickle
Victor Stinner47aacc82015-06-12 17:26:23 +020018import shutil
19import signal
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +000020import socket
Charles-François Natali7372b062012-02-05 15:15:38 +010021import stat
Victor Stinner47aacc82015-06-12 17:26:23 +020022import subprocess
23import sys
Victor Stinner4d6a3d62014-12-21 01:16:38 +010024import sysconfig
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020025import threading
Victor Stinner47aacc82015-06-12 17:26:23 +020026import time
27import unittest
28import uuid
29import warnings
30from test import support
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020031
Antoine Pitrouec34ab52013-08-16 20:44:38 +020032try:
33 import resource
34except ImportError:
35 resource = None
Victor Stinner7ba6b0f2013-09-08 11:47:54 +020036try:
37 import fcntl
38except ImportError:
39 fcntl = None
Tim Golden0321cf22014-05-05 19:46:17 +010040try:
41 import _winapi
42except ImportError:
43 _winapi = None
Victor Stinnerb28ed922014-07-11 17:04:41 +020044try:
R David Murrayf2ad1732014-12-25 18:36:56 -050045 import grp
46 groups = [g.gr_gid for g in grp.getgrall() if getpass.getuser() in g.gr_mem]
47 if hasattr(os, 'getgid'):
48 process_gid = os.getgid()
49 if process_gid not in groups:
50 groups.append(process_gid)
51except ImportError:
52 groups = []
53try:
54 import pwd
55 all_users = [u.pw_uid for u in pwd.getpwall()]
Xavier de Gaye21060102016-11-16 08:05:27 +010056except (ImportError, AttributeError):
R David Murrayf2ad1732014-12-25 18:36:56 -050057 all_users = []
58try:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020059 from _testcapi import INT_MAX, PY_SSIZE_T_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +020060except ImportError:
Victor Stinner5c6e6fc2014-07-12 11:03:53 +020061 INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
Antoine Pitrouec34ab52013-08-16 20:44:38 +020062
Berker Peksagce643912015-05-06 06:33:17 +030063from test.support.script_helper import assert_python_ok
Xavier de Gayed1415312016-07-22 12:15:29 +020064from test.support import unix_shell
Fred Drake38c2ef02001-07-17 20:52:51 +000065
Victor Stinner923590e2016-03-24 09:11:48 +010066
R David Murrayf2ad1732014-12-25 18:36:56 -050067root_in_posix = False
68if hasattr(os, 'geteuid'):
69 root_in_posix = (os.geteuid() == 0)
70
Mark Dickinson7cf03892010-04-16 13:45:35 +000071# Detect whether we're on a Linux system that uses the (now outdated
72# and unmaintained) linuxthreads threading library. There's an issue
73# when combining linuxthreads with a failed execv call: see
74# http://bugs.python.org/issue4970.
Victor Stinnerd5c355c2011-04-30 14:53:09 +020075if hasattr(sys, 'thread_info') and sys.thread_info.version:
76 USING_LINUXTHREADS = sys.thread_info.version.startswith("linuxthreads")
77else:
78 USING_LINUXTHREADS = False
Brian Curtineb24d742010-04-12 17:16:38 +000079
Stefan Krahebee49a2013-01-17 15:31:00 +010080# Issue #14110: Some tests fail on FreeBSD if the user is in the wheel group.
81HAVE_WHEEL_GROUP = sys.platform.startswith('freebsd') and os.getgid() == 0
82
Victor Stinner923590e2016-03-24 09:11:48 +010083
Berker Peksag4af23d72016-09-15 20:32:44 +030084def requires_os_func(name):
85 return unittest.skipUnless(hasattr(os, name), 'requires os.%s' % name)
86
87
Brett Cannonec6ce872016-09-06 15:50:29 -070088class _PathLike(os.PathLike):
89
90 def __init__(self, path=""):
91 self.path = path
92
93 def __str__(self):
94 return str(self.path)
95
96 def __fspath__(self):
97 if isinstance(self.path, BaseException):
98 raise self.path
99 else:
100 return self.path
101
102
Victor Stinnerae39d232016-03-24 17:12:55 +0100103def create_file(filename, content=b'content'):
104 with open(filename, "xb", 0) as fp:
105 fp.write(content)
106
107
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000108# Tests creating TESTFN
109class FileTests(unittest.TestCase):
110 def setUp(self):
Martin Panterbf19d162015-09-09 01:01:13 +0000111 if os.path.lexists(support.TESTFN):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000112 os.unlink(support.TESTFN)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000113 tearDown = setUp
114
115 def test_access(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000116 f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000117 os.close(f)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000118 self.assertTrue(os.access(support.TESTFN, os.W_OK))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000119
Christian Heimesfdab48e2008-01-20 09:06:41 +0000120 def test_closerange(self):
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000121 first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
122 # We must allocate two consecutive file descriptors, otherwise
123 # it will mess up other file descriptors (perhaps even the three
124 # standard ones).
125 second = os.dup(first)
126 try:
127 retries = 0
128 while second != first + 1:
129 os.close(first)
130 retries += 1
131 if retries > 10:
132 # XXX test skipped
Benjamin Petersonfa0d7032009-06-01 22:42:33 +0000133 self.skipTest("couldn't allocate two consecutive fds")
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000134 first, second = second, os.dup(second)
135 finally:
136 os.close(second)
Christian Heimesfdab48e2008-01-20 09:06:41 +0000137 # close a fd that is open, and one that isn't
Antoine Pitroub9ee06c2008-08-16 22:03:17 +0000138 os.closerange(first, first + 2)
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000139 self.assertRaises(OSError, os.write, first, b"a")
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000140
Benjamin Peterson1cc6df92010-06-30 17:39:45 +0000141 @support.cpython_only
Hirokazu Yamamoto4c19e6e2008-09-08 23:41:21 +0000142 def test_rename(self):
143 path = support.TESTFN
144 old = sys.getrefcount(path)
145 self.assertRaises(TypeError, os.rename, path, 0)
146 new = sys.getrefcount(path)
147 self.assertEqual(old, new)
148
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000149 def test_read(self):
150 with open(support.TESTFN, "w+b") as fobj:
151 fobj.write(b"spam")
152 fobj.flush()
153 fd = fobj.fileno()
154 os.lseek(fd, 0, 0)
155 s = os.read(fd, 4)
156 self.assertEqual(type(s), bytes)
157 self.assertEqual(s, b"spam")
158
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200159 @support.cpython_only
Victor Stinner5c6e6fc2014-07-12 11:03:53 +0200160 # Skip the test on 32-bit platforms: the number of bytes must fit in a
161 # Py_ssize_t type
162 @unittest.skipUnless(INT_MAX < PY_SSIZE_T_MAX,
163 "needs INT_MAX < PY_SSIZE_T_MAX")
Victor Stinner6e1ccfe2014-07-11 17:35:06 +0200164 @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
165 def test_large_read(self, size):
Victor Stinnerb28ed922014-07-11 17:04:41 +0200166 self.addCleanup(support.unlink, support.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +0100167 create_file(support.TESTFN, b'test')
Victor Stinnerb28ed922014-07-11 17:04:41 +0200168
169 # Issue #21932: Make sure that os.read() does not raise an
170 # OverflowError for size larger than INT_MAX
Victor Stinnerb28ed922014-07-11 17:04:41 +0200171 with open(support.TESTFN, "rb") as fp:
172 data = os.read(fp.fileno(), size)
173
Victor Stinner8c663fd2017-11-08 14:44:44 -0800174 # The test does not try to read more than 2 GiB at once because the
Victor Stinnerb28ed922014-07-11 17:04:41 +0200175 # operating system is free to return less bytes than requested.
176 self.assertEqual(data, b'test')
177
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000178 def test_write(self):
179 # os.write() accepts bytes- and buffer-like objects but not strings
180 fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
181 self.assertRaises(TypeError, os.write, fd, "beans")
182 os.write(fd, b"bacon\n")
183 os.write(fd, bytearray(b"eggs\n"))
184 os.write(fd, memoryview(b"spam\n"))
185 os.close(fd)
186 with open(support.TESTFN, "rb") as fobj:
Antoine Pitroud62269f2008-09-15 23:54:52 +0000187 self.assertEqual(fobj.read().splitlines(),
188 [b"bacon", b"eggs", b"spam"])
Antoine Pitrou9cadb1b2008-09-15 23:02:56 +0000189
Victor Stinnere0daff12011-03-20 23:36:35 +0100190 def write_windows_console(self, *args):
191 retcode = subprocess.call(args,
192 # use a new console to not flood the test output
193 creationflags=subprocess.CREATE_NEW_CONSOLE,
194 # use a shell to hide the console window (SW_HIDE)
195 shell=True)
196 self.assertEqual(retcode, 0)
197
198 @unittest.skipUnless(sys.platform == 'win32',
199 'test specific to the Windows console')
200 def test_write_windows_console(self):
201 # Issue #11395: the Windows console returns an error (12: not enough
202 # space error) on writing into stdout if stdout mode is binary and the
203 # length is greater than 66,000 bytes (or less, depending on heap
204 # usage).
205 code = "print('x' * 100000)"
206 self.write_windows_console(sys.executable, "-c", code)
207 self.write_windows_console(sys.executable, "-u", "-c", code)
208
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000209 def fdopen_helper(self, *args):
210 fd = os.open(support.TESTFN, os.O_RDONLY)
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200211 f = os.fdopen(fd, *args)
212 f.close()
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000213
214 def test_fdopen(self):
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200215 fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
216 os.close(fd)
217
Amaury Forgeot d'Arce2e36ba2008-08-01 00:14:22 +0000218 self.fdopen_helper()
219 self.fdopen_helper('r')
220 self.fdopen_helper('r', 100)
221
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100222 def test_replace(self):
223 TESTFN2 = support.TESTFN + ".2"
Victor Stinnerae39d232016-03-24 17:12:55 +0100224 self.addCleanup(support.unlink, support.TESTFN)
225 self.addCleanup(support.unlink, TESTFN2)
226
227 create_file(support.TESTFN, b"1")
228 create_file(TESTFN2, b"2")
229
Antoine Pitrouf3b2d882012-01-30 22:08:52 +0100230 os.replace(support.TESTFN, TESTFN2)
231 self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
232 with open(TESTFN2, 'r') as f:
233 self.assertEqual(f.read(), "1")
234
Martin Panterbf19d162015-09-09 01:01:13 +0000235 def test_open_keywords(self):
236 f = os.open(path=__file__, flags=os.O_RDONLY, mode=0o777,
237 dir_fd=None)
238 os.close(f)
239
240 def test_symlink_keywords(self):
241 symlink = support.get_attribute(os, "symlink")
242 try:
243 symlink(src='target', dst=support.TESTFN,
244 target_is_directory=False, dir_fd=None)
245 except (NotImplementedError, OSError):
246 pass # No OS support or unprivileged user
247
Victor Stinnerbef7fdf2011-07-01 13:45:30 +0200248
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000249# Test attributes on return values from os.*stat* family.
250class StatAttributeTests(unittest.TestCase):
251 def setUp(self):
Victor Stinner47aacc82015-06-12 17:26:23 +0200252 self.fname = support.TESTFN
253 self.addCleanup(support.unlink, self.fname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100254 create_file(self.fname, b"ABC")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000255
Serhiy Storchaka43767632013-11-03 21:31:38 +0200256 @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
Antoine Pitrou38425292010-09-21 18:19:07 +0000257 def check_stat_attributes(self, fname):
Antoine Pitrou38425292010-09-21 18:19:07 +0000258 result = os.stat(fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000259
260 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000261 self.assertEqual(result[stat.ST_SIZE], 3)
262 self.assertEqual(result.st_size, 3)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000263
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000264 # Make sure all the attributes are there
265 members = dir(result)
266 for name in dir(stat):
267 if name[:3] == 'ST_':
268 attr = name.lower()
Martin v. Löwis4d394df2005-01-23 09:19:22 +0000269 if name.endswith("TIME"):
270 def trunc(x): return int(x)
271 else:
272 def trunc(x): return x
Ezio Melottib3aedd42010-11-20 19:04:17 +0000273 self.assertEqual(trunc(getattr(result, attr)),
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000274 result[getattr(stat, name)])
Benjamin Peterson577473f2010-01-19 00:09:57 +0000275 self.assertIn(attr, members)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000276
Larry Hastings6fe20b32012-04-19 15:07:49 -0700277 # Make sure that the st_?time and st_?time_ns fields roughly agree
Larry Hastings76ad59b2012-05-03 00:30:07 -0700278 # (they should always agree up to around tens-of-microseconds)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700279 for name in 'st_atime st_mtime st_ctime'.split():
280 floaty = int(getattr(result, name) * 100000)
281 nanosecondy = getattr(result, name + "_ns") // 10000
Larry Hastings76ad59b2012-05-03 00:30:07 -0700282 self.assertAlmostEqual(floaty, nanosecondy, delta=2)
Larry Hastings6fe20b32012-04-19 15:07:49 -0700283
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000284 try:
285 result[200]
Andrew Svetlov737fb892012-12-18 21:14:22 +0200286 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000287 except IndexError:
288 pass
289
290 # Make sure that assignment fails
291 try:
292 result.st_mode = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200293 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000294 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000295 pass
296
297 try:
298 result.st_rdev = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200299 self.fail("No exception raised")
Guido van Rossum1fff8782001-10-18 21:19:31 +0000300 except (AttributeError, TypeError):
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000301 pass
302
303 try:
304 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200305 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000306 except AttributeError:
307 pass
308
309 # Use the stat_result constructor with a too-short tuple.
310 try:
311 result2 = os.stat_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200312 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000313 except TypeError:
314 pass
315
Ezio Melotti42da6632011-03-15 05:18:48 +0200316 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000317 try:
318 result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
319 except TypeError:
320 pass
321
Antoine Pitrou38425292010-09-21 18:19:07 +0000322 def test_stat_attributes(self):
323 self.check_stat_attributes(self.fname)
324
325 def test_stat_attributes_bytes(self):
326 try:
327 fname = self.fname.encode(sys.getfilesystemencoding())
328 except UnicodeEncodeError:
329 self.skipTest("cannot encode %a for the filesystem" % self.fname)
Steve Dowercc16be82016-09-08 10:35:16 -0700330 self.check_stat_attributes(fname)
Tim Peterse0c446b2001-10-18 21:57:37 +0000331
Christian Heimes25827622013-10-12 01:27:08 +0200332 def test_stat_result_pickle(self):
333 result = os.stat(self.fname)
Serhiy Storchakabad12572014-12-15 14:03:42 +0200334 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
335 p = pickle.dumps(result, proto)
336 self.assertIn(b'stat_result', p)
337 if proto < 4:
338 self.assertIn(b'cos\nstat_result\n', p)
339 unpickled = pickle.loads(p)
340 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200341
Serhiy Storchaka43767632013-11-03 21:31:38 +0200342 @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000343 def test_statvfs_attributes(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700344 result = os.statvfs(self.fname)
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000345
346 # Make sure direct access works
Ezio Melottib3aedd42010-11-20 19:04:17 +0000347 self.assertEqual(result.f_bfree, result[3])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000348
Brett Cannoncfaf10c2008-05-16 00:45:35 +0000349 # Make sure all the attributes are there.
350 members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
351 'ffree', 'favail', 'flag', 'namemax')
352 for value, member in enumerate(members):
Ezio Melottib3aedd42010-11-20 19:04:17 +0000353 self.assertEqual(getattr(result, 'f_' + member), result[value])
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000354
Giuseppe Scrivano96a5e502017-12-14 23:46:46 +0100355 self.assertTrue(isinstance(result.f_fsid, int))
356
357 # Test that the size of the tuple doesn't change
358 self.assertEqual(len(result), 10)
359
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000360 # Make sure that assignment really fails
361 try:
362 result.f_bfree = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200363 self.fail("No exception raised")
Collin Winter42dae6a2007-03-28 21:44:53 +0000364 except AttributeError:
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000365 pass
366
367 try:
368 result.parrot = 1
Andrew Svetlov737fb892012-12-18 21:14:22 +0200369 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000370 except AttributeError:
371 pass
372
373 # Use the constructor with a too-short tuple.
374 try:
375 result2 = os.statvfs_result((10,))
Andrew Svetlov737fb892012-12-18 21:14:22 +0200376 self.fail("No exception raised")
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000377 except TypeError:
378 pass
379
Ezio Melotti42da6632011-03-15 05:18:48 +0200380 # Use the constructor with a too-long tuple.
Guido van Rossum98bf58f2001-10-18 20:34:25 +0000381 try:
382 result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
383 except TypeError:
384 pass
Fred Drake38c2ef02001-07-17 20:52:51 +0000385
Christian Heimes25827622013-10-12 01:27:08 +0200386 @unittest.skipUnless(hasattr(os, 'statvfs'),
387 "need os.statvfs()")
388 def test_statvfs_result_pickle(self):
Benjamin Peterson4eaf7f92017-10-25 23:55:14 -0700389 result = os.statvfs(self.fname)
Victor Stinner370cb252013-10-12 01:33:54 +0200390
Serhiy Storchakabad12572014-12-15 14:03:42 +0200391 for proto in range(pickle.HIGHEST_PROTOCOL + 1):
392 p = pickle.dumps(result, proto)
393 self.assertIn(b'statvfs_result', p)
394 if proto < 4:
395 self.assertIn(b'cos\nstatvfs_result\n', p)
396 unpickled = pickle.loads(p)
397 self.assertEqual(result, unpickled)
Christian Heimes25827622013-10-12 01:27:08 +0200398
Serhiy Storchaka43767632013-11-03 21:31:38 +0200399 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
400 def test_1686475(self):
401 # Verify that an open file can be stat'ed
402 try:
403 os.stat(r"c:\pagefile.sys")
404 except FileNotFoundError:
Zachary Ware101d9e72013-12-08 00:44:27 -0600405 self.skipTest(r'c:\pagefile.sys does not exist')
Serhiy Storchaka43767632013-11-03 21:31:38 +0200406 except OSError as e:
407 self.fail("Could not stat pagefile.sys")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000408
Serhiy Storchaka43767632013-11-03 21:31:38 +0200409 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
410 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
411 def test_15261(self):
412 # Verify that stat'ing a closed fd does not cause crash
413 r, w = os.pipe()
414 try:
415 os.stat(r) # should not raise error
416 finally:
417 os.close(r)
418 os.close(w)
419 with self.assertRaises(OSError) as ctx:
420 os.stat(r)
421 self.assertEqual(ctx.exception.errno, errno.EBADF)
Richard Oudkerk2240ac12012-07-06 12:05:32 +0100422
Zachary Ware63f277b2014-06-19 09:46:37 -0500423 def check_file_attributes(self, result):
424 self.assertTrue(hasattr(result, 'st_file_attributes'))
425 self.assertTrue(isinstance(result.st_file_attributes, int))
426 self.assertTrue(0 <= result.st_file_attributes <= 0xFFFFFFFF)
427
428 @unittest.skipUnless(sys.platform == "win32",
429 "st_file_attributes is Win32 specific")
430 def test_file_attributes(self):
431 # test file st_file_attributes (FILE_ATTRIBUTE_DIRECTORY not set)
432 result = os.stat(self.fname)
433 self.check_file_attributes(result)
434 self.assertEqual(
435 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
436 0)
437
438 # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set)
Victor Stinner47aacc82015-06-12 17:26:23 +0200439 dirname = support.TESTFN + "dir"
440 os.mkdir(dirname)
441 self.addCleanup(os.rmdir, dirname)
442
443 result = os.stat(dirname)
Zachary Ware63f277b2014-06-19 09:46:37 -0500444 self.check_file_attributes(result)
445 self.assertEqual(
446 result.st_file_attributes & stat.FILE_ATTRIBUTE_DIRECTORY,
447 stat.FILE_ATTRIBUTE_DIRECTORY)
448
Berker Peksag0b4dc482016-09-17 15:49:59 +0300449 @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
450 def test_access_denied(self):
451 # Default to FindFirstFile WIN32_FIND_DATA when access is
452 # denied. See issue 28075.
453 # os.environ['TEMP'] should be located on a volume that
454 # supports file ACLs.
455 fname = os.path.join(os.environ['TEMP'], self.fname)
456 self.addCleanup(support.unlink, fname)
457 create_file(fname, b'ABC')
458 # Deny the right to [S]YNCHRONIZE on the file to
459 # force CreateFile to fail with ERROR_ACCESS_DENIED.
460 DETACHED_PROCESS = 8
461 subprocess.check_call(
Denis Osipov897bba72017-06-07 22:15:26 +0500462 # bpo-30584: Use security identifier *S-1-5-32-545 instead
463 # of localized "Users" to not depend on the locale.
464 ['icacls.exe', fname, '/deny', '*S-1-5-32-545:(S)'],
Berker Peksag0b4dc482016-09-17 15:49:59 +0300465 creationflags=DETACHED_PROCESS
466 )
467 result = os.stat(fname)
468 self.assertNotEqual(result.st_size, 0)
469
Victor Stinner47aacc82015-06-12 17:26:23 +0200470
471class UtimeTests(unittest.TestCase):
472 def setUp(self):
473 self.dirname = support.TESTFN
474 self.fname = os.path.join(self.dirname, "f1")
475
476 self.addCleanup(support.rmtree, self.dirname)
477 os.mkdir(self.dirname)
Victor Stinnerae39d232016-03-24 17:12:55 +0100478 create_file(self.fname)
Victor Stinner47aacc82015-06-12 17:26:23 +0200479
Victor Stinner47aacc82015-06-12 17:26:23 +0200480 def support_subsecond(self, filename):
481 # Heuristic to check if the filesystem supports timestamp with
482 # subsecond resolution: check if float and int timestamps are different
483 st = os.stat(filename)
484 return ((st.st_atime != st[7])
485 or (st.st_mtime != st[8])
486 or (st.st_ctime != st[9]))
487
488 def _test_utime(self, set_time, filename=None):
489 if not filename:
490 filename = self.fname
491
492 support_subsecond = self.support_subsecond(filename)
493 if support_subsecond:
494 # Timestamp with a resolution of 1 microsecond (10^-6).
495 #
496 # The resolution of the C internal function used by os.utime()
497 # depends on the platform: 1 sec, 1 us, 1 ns. Writing a portable
498 # test with a resolution of 1 ns requires more work:
499 # see the issue #15745.
500 atime_ns = 1002003000 # 1.002003 seconds
501 mtime_ns = 4005006000 # 4.005006 seconds
502 else:
503 # use a resolution of 1 second
504 atime_ns = 5 * 10**9
505 mtime_ns = 8 * 10**9
506
507 set_time(filename, (atime_ns, mtime_ns))
508 st = os.stat(filename)
509
510 if support_subsecond:
511 self.assertAlmostEqual(st.st_atime, atime_ns * 1e-9, delta=1e-6)
512 self.assertAlmostEqual(st.st_mtime, mtime_ns * 1e-9, delta=1e-6)
513 else:
514 self.assertEqual(st.st_atime, atime_ns * 1e-9)
515 self.assertEqual(st.st_mtime, mtime_ns * 1e-9)
516 self.assertEqual(st.st_atime_ns, atime_ns)
517 self.assertEqual(st.st_mtime_ns, mtime_ns)
518
519 def test_utime(self):
520 def set_time(filename, ns):
521 # test the ns keyword parameter
522 os.utime(filename, ns=ns)
523 self._test_utime(set_time)
524
525 @staticmethod
526 def ns_to_sec(ns):
527 # Convert a number of nanosecond (int) to a number of seconds (float).
528 # Round towards infinity by adding 0.5 nanosecond to avoid rounding
529 # issue, os.utime() rounds towards minus infinity.
530 return (ns * 1e-9) + 0.5e-9
531
532 def test_utime_by_indexed(self):
533 # pass times as floating point seconds as the second indexed parameter
534 def set_time(filename, ns):
535 atime_ns, mtime_ns = ns
536 atime = self.ns_to_sec(atime_ns)
537 mtime = self.ns_to_sec(mtime_ns)
538 # test utimensat(timespec), utimes(timeval), utime(utimbuf)
539 # or utime(time_t)
540 os.utime(filename, (atime, mtime))
541 self._test_utime(set_time)
542
543 def test_utime_by_times(self):
544 def set_time(filename, ns):
545 atime_ns, mtime_ns = ns
546 atime = self.ns_to_sec(atime_ns)
547 mtime = self.ns_to_sec(mtime_ns)
548 # test the times keyword parameter
549 os.utime(filename, times=(atime, mtime))
550 self._test_utime(set_time)
551
552 @unittest.skipUnless(os.utime in os.supports_follow_symlinks,
553 "follow_symlinks support for utime required "
554 "for this test.")
555 def test_utime_nofollow_symlinks(self):
556 def set_time(filename, ns):
557 # use follow_symlinks=False to test utimensat(timespec)
558 # or lutimes(timeval)
559 os.utime(filename, ns=ns, follow_symlinks=False)
560 self._test_utime(set_time)
561
562 @unittest.skipUnless(os.utime in os.supports_fd,
563 "fd support for utime required for this test.")
564 def test_utime_fd(self):
565 def set_time(filename, ns):
Victor Stinnerae39d232016-03-24 17:12:55 +0100566 with open(filename, 'wb', 0) as fp:
Victor Stinner47aacc82015-06-12 17:26:23 +0200567 # use a file descriptor to test futimens(timespec)
568 # or futimes(timeval)
569 os.utime(fp.fileno(), ns=ns)
570 self._test_utime(set_time)
571
572 @unittest.skipUnless(os.utime in os.supports_dir_fd,
573 "dir_fd support for utime required for this test.")
574 def test_utime_dir_fd(self):
575 def set_time(filename, ns):
576 dirname, name = os.path.split(filename)
577 dirfd = os.open(dirname, os.O_RDONLY)
578 try:
579 # pass dir_fd to test utimensat(timespec) or futimesat(timeval)
580 os.utime(name, dir_fd=dirfd, ns=ns)
581 finally:
582 os.close(dirfd)
583 self._test_utime(set_time)
584
585 def test_utime_directory(self):
586 def set_time(filename, ns):
587 # test calling os.utime() on a directory
588 os.utime(filename, ns=ns)
589 self._test_utime(set_time, filename=self.dirname)
590
591 def _test_utime_current(self, set_time):
592 # Get the system clock
593 current = time.time()
594
595 # Call os.utime() to set the timestamp to the current system clock
596 set_time(self.fname)
597
598 if not self.support_subsecond(self.fname):
599 delta = 1.0
Victor Stinnera8e7d902017-09-18 08:49:45 -0700600 else:
Victor Stinnerc94caca2017-06-13 23:48:27 +0200601 # On Windows, the usual resolution of time.time() is 15.6 ms.
602 # bpo-30649: Tolerate 50 ms for slow Windows buildbots.
Victor Stinnera8e7d902017-09-18 08:49:45 -0700603 #
604 # x86 Gentoo Refleaks 3.x once failed with dt=20.2 ms. So use
605 # also 50 ms on other platforms.
Victor Stinnerc94caca2017-06-13 23:48:27 +0200606 delta = 0.050
Victor Stinner47aacc82015-06-12 17:26:23 +0200607 st = os.stat(self.fname)
608 msg = ("st_time=%r, current=%r, dt=%r"
609 % (st.st_mtime, current, st.st_mtime - current))
610 self.assertAlmostEqual(st.st_mtime, current,
611 delta=delta, msg=msg)
612
613 def test_utime_current(self):
614 def set_time(filename):
615 # Set to the current time in the new way
616 os.utime(self.fname)
617 self._test_utime_current(set_time)
618
619 def test_utime_current_old(self):
620 def set_time(filename):
621 # Set to the current time in the old explicit way.
622 os.utime(self.fname, None)
623 self._test_utime_current(set_time)
624
625 def get_file_system(self, path):
626 if sys.platform == 'win32':
627 root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
628 import ctypes
629 kernel32 = ctypes.windll.kernel32
630 buf = ctypes.create_unicode_buffer("", 100)
631 ok = kernel32.GetVolumeInformationW(root, None, 0,
632 None, None, None,
633 buf, len(buf))
634 if ok:
635 return buf.value
636 # return None if the filesystem is unknown
637
638 def test_large_time(self):
639 # Many filesystems are limited to the year 2038. At least, the test
640 # pass with NTFS filesystem.
641 if self.get_file_system(self.dirname) != "NTFS":
642 self.skipTest("requires NTFS")
643
644 large = 5000000000 # some day in 2128
645 os.utime(self.fname, (large, large))
646 self.assertEqual(os.stat(self.fname).st_mtime, large)
647
648 def test_utime_invalid_arguments(self):
649 # seconds and nanoseconds parameters are mutually exclusive
650 with self.assertRaises(ValueError):
651 os.utime(self.fname, (5, 5), ns=(5, 5))
652
653
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000654from test import mapping_tests
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000655
Walter Dörwald0a6d0ff2004-05-31 16:29:04 +0000656class EnvironTests(mapping_tests.BasicTestMappingProtocol):
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000657 """check that os.environ object conform to mapping protocol"""
Walter Dörwald118f9312004-06-02 18:42:25 +0000658 type2test = None
Christian Heimes90333392007-11-01 19:08:42 +0000659
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000660 def setUp(self):
661 self.__save = dict(os.environ)
Victor Stinnerb745a742010-05-18 17:17:23 +0000662 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000663 self.__saveb = dict(os.environb)
Christian Heimes90333392007-11-01 19:08:42 +0000664 for key, value in self._reference().items():
665 os.environ[key] = value
666
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000667 def tearDown(self):
668 os.environ.clear()
669 os.environ.update(self.__save)
Victor Stinnerb745a742010-05-18 17:17:23 +0000670 if os.supports_bytes_environ:
Victor Stinner208d28c2010-05-07 00:54:14 +0000671 os.environb.clear()
672 os.environb.update(self.__saveb)
Raymond Hettinger2c2d3222003-03-09 07:05:43 +0000673
Christian Heimes90333392007-11-01 19:08:42 +0000674 def _reference(self):
675 return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
676
677 def _empty_mapping(self):
678 os.environ.clear()
679 return os.environ
680
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000681 # Bug 1110478
Xavier de Gayed1415312016-07-22 12:15:29 +0200682 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
683 'requires a shell')
Martin v. Löwis5510f652005-02-17 21:23:20 +0000684 def test_update2(self):
Christian Heimes90333392007-11-01 19:08:42 +0000685 os.environ.clear()
Ezio Melottic7e139b2012-09-26 20:01:34 +0300686 os.environ.update(HELLO="World")
Xavier de Gayed1415312016-07-22 12:15:29 +0200687 with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300688 value = popen.read().strip()
689 self.assertEqual(value, "World")
Martin v. Löwis1d11de62005-01-29 13:29:23 +0000690
Xavier de Gayed1415312016-07-22 12:15:29 +0200691 @unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
692 'requires a shell')
Christian Heimes1a13d592007-11-08 14:16:55 +0000693 def test_os_popen_iter(self):
Xavier de Gayed1415312016-07-22 12:15:29 +0200694 with os.popen("%s -c 'echo \"line1\nline2\nline3\"'"
695 % unix_shell) as popen:
Ezio Melottic7e139b2012-09-26 20:01:34 +0300696 it = iter(popen)
697 self.assertEqual(next(it), "line1\n")
698 self.assertEqual(next(it), "line2\n")
699 self.assertEqual(next(it), "line3\n")
700 self.assertRaises(StopIteration, next, it)
Christian Heimes1a13d592007-11-08 14:16:55 +0000701
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000702 # Verify environ keys and values from the OS are of the
703 # correct str type.
704 def test_keyvalue_types(self):
705 for key, val in os.environ.items():
Ezio Melottib3aedd42010-11-20 19:04:17 +0000706 self.assertEqual(type(key), str)
707 self.assertEqual(type(val), str)
Guido van Rossum67aca9e2007-06-13 21:51:27 +0000708
Christian Heimes90333392007-11-01 19:08:42 +0000709 def test_items(self):
710 for key, value in self._reference().items():
711 self.assertEqual(os.environ.get(key), value)
712
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000713 # Issue 7310
714 def test___repr__(self):
715 """Check that the repr() of os.environ looks like environ({...})."""
716 env = os.environ
Victor Stinner96f0de92010-07-29 00:29:00 +0000717 self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join(
718 '{!r}: {!r}'.format(key, value)
719 for key, value in env.items())))
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000720
Gregory P. Smithb6e8c7e2010-02-27 07:22:22 +0000721 def test_get_exec_path(self):
722 defpath_list = os.defpath.split(os.pathsep)
723 test_path = ['/monty', '/python', '', '/flying/circus']
724 test_env = {'PATH': os.pathsep.join(test_path)}
725
726 saved_environ = os.environ
727 try:
728 os.environ = dict(test_env)
729 # Test that defaulting to os.environ works.
730 self.assertSequenceEqual(test_path, os.get_exec_path())
731 self.assertSequenceEqual(test_path, os.get_exec_path(env=None))
732 finally:
733 os.environ = saved_environ
734
735 # No PATH environment variable
736 self.assertSequenceEqual(defpath_list, os.get_exec_path({}))
737 # Empty PATH environment variable
738 self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''}))
739 # Supplied PATH environment variable
740 self.assertSequenceEqual(test_path, os.get_exec_path(test_env))
741
Victor Stinnerb745a742010-05-18 17:17:23 +0000742 if os.supports_bytes_environ:
743 # env cannot contain 'PATH' and b'PATH' keys
Victor Stinner38430e22010-08-19 17:10:18 +0000744 try:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000745 # ignore BytesWarning warning
746 with warnings.catch_warnings(record=True):
747 mixed_env = {'PATH': '1', b'PATH': b'2'}
Victor Stinner38430e22010-08-19 17:10:18 +0000748 except BytesWarning:
Victor Stinner6f35eda2010-10-29 00:38:58 +0000749 # mixed_env cannot be created with python -bb
Victor Stinner38430e22010-08-19 17:10:18 +0000750 pass
751 else:
752 self.assertRaises(ValueError, os.get_exec_path, mixed_env)
Victor Stinnerb745a742010-05-18 17:17:23 +0000753
754 # bytes key and/or value
755 self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}),
756 ['abc'])
757 self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}),
758 ['abc'])
759 self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}),
760 ['abc'])
761
762 @unittest.skipUnless(os.supports_bytes_environ,
763 "os.environb required for this test.")
Victor Stinner84ae1182010-05-06 22:05:07 +0000764 def test_environb(self):
765 # os.environ -> os.environb
766 value = 'euro\u20ac'
767 try:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000768 value_bytes = value.encode(sys.getfilesystemencoding(),
769 'surrogateescape')
Victor Stinner84ae1182010-05-06 22:05:07 +0000770 except UnicodeEncodeError:
Benjamin Peterson180799d2010-05-06 22:25:42 +0000771 msg = "U+20AC character is not encodable to %s" % (
772 sys.getfilesystemencoding(),)
Benjamin Peterson932d3f42010-05-06 22:26:31 +0000773 self.skipTest(msg)
Victor Stinner84ae1182010-05-06 22:05:07 +0000774 os.environ['unicode'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000775 self.assertEqual(os.environ['unicode'], value)
776 self.assertEqual(os.environb[b'unicode'], value_bytes)
Victor Stinner84ae1182010-05-06 22:05:07 +0000777
778 # os.environb -> os.environ
779 value = b'\xff'
780 os.environb[b'bytes'] = value
Ezio Melottib3aedd42010-11-20 19:04:17 +0000781 self.assertEqual(os.environb[b'bytes'], value)
Victor Stinner84ae1182010-05-06 22:05:07 +0000782 value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape')
Ezio Melottib3aedd42010-11-20 19:04:17 +0000783 self.assertEqual(os.environ['bytes'], value_str)
Ezio Melotti19e4acf2010-02-22 15:59:01 +0000784
Charles-François Natali2966f102011-11-26 11:32:46 +0100785 # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
786 # #13415).
787 @support.requires_freebsd_version(7)
788 @support.requires_mac_ver(10, 6)
Victor Stinner60b385e2011-11-22 22:01:28 +0100789 def test_unset_error(self):
790 if sys.platform == "win32":
791 # an environment variable is limited to 32,767 characters
792 key = 'x' * 50000
Victor Stinnerb3f82682011-11-22 22:30:19 +0100793 self.assertRaises(ValueError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100794 else:
795 # "=" is not allowed in a variable name
796 key = 'key='
Victor Stinnerb3f82682011-11-22 22:30:19 +0100797 self.assertRaises(OSError, os.environ.__delitem__, key)
Victor Stinner60b385e2011-11-22 22:01:28 +0100798
Victor Stinner6d101392013-04-14 16:35:04 +0200799 def test_key_type(self):
800 missing = 'missingkey'
801 self.assertNotIn(missing, os.environ)
802
Victor Stinner839e5ea2013-04-14 16:43:03 +0200803 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200804 os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200805 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200806 self.assertTrue(cm.exception.__suppress_context__)
Victor Stinner6d101392013-04-14 16:35:04 +0200807
Victor Stinner839e5ea2013-04-14 16:43:03 +0200808 with self.assertRaises(KeyError) as cm:
Victor Stinner6d101392013-04-14 16:35:04 +0200809 del os.environ[missing]
Victor Stinner839e5ea2013-04-14 16:43:03 +0200810 self.assertIs(cm.exception.args[0], missing)
Victor Stinner0c2dd0c2013-08-23 19:19:15 +0200811 self.assertTrue(cm.exception.__suppress_context__)
812
Osvaldo Santana Neto8a8d2852017-07-01 14:34:45 -0300813 def _test_environ_iteration(self, collection):
814 iterator = iter(collection)
815 new_key = "__new_key__"
816
817 next(iterator) # start iteration over os.environ.items
818
819 # add a new key in os.environ mapping
820 os.environ[new_key] = "test_environ_iteration"
821
822 try:
823 next(iterator) # force iteration over modified mapping
824 self.assertEqual(os.environ[new_key], "test_environ_iteration")
825 finally:
826 del os.environ[new_key]
827
828 def test_iter_error_when_changing_os_environ(self):
829 self._test_environ_iteration(os.environ)
830
831 def test_iter_error_when_changing_os_environ_items(self):
832 self._test_environ_iteration(os.environ.items())
833
834 def test_iter_error_when_changing_os_environ_values(self):
835 self._test_environ_iteration(os.environ.values())
836
Victor Stinner6d101392013-04-14 16:35:04 +0200837
Tim Petersc4e09402003-04-25 07:11:48 +0000838class WalkTests(unittest.TestCase):
839 """Tests for os.walk()."""
840
Victor Stinner0561c532015-03-12 10:28:24 +0100841 # Wrapper to hide minor differences between os.walk and os.fwalk
842 # to tests both functions with the same code base
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +0200843 def walk(self, top, **kwargs):
Serhiy Storchakaa17ca192015-12-23 00:37:34 +0200844 if 'follow_symlinks' in kwargs:
845 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +0200846 return os.walk(top, **kwargs)
Victor Stinner0561c532015-03-12 10:28:24 +0100847
Charles-François Natali7372b062012-02-05 15:15:38 +0100848 def setUp(self):
Victor Stinner0561c532015-03-12 10:28:24 +0100849 join = os.path.join
Victor Stinner3899b542016-03-24 17:21:17 +0100850 self.addCleanup(support.rmtree, support.TESTFN)
Tim Petersc4e09402003-04-25 07:11:48 +0000851
852 # Build:
Guido van Rossumd8faa362007-04-27 19:54:29 +0000853 # TESTFN/
854 # TEST1/ a file kid and two directory kids
Tim Petersc4e09402003-04-25 07:11:48 +0000855 # tmp1
856 # SUB1/ a file kid and a directory kid
Guido van Rossumd8faa362007-04-27 19:54:29 +0000857 # tmp2
858 # SUB11/ no kids
859 # SUB2/ a file kid and a dirsymlink kid
860 # tmp3
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300861 # SUB21/ not readable
862 # tmp5
Guido van Rossumd8faa362007-04-27 19:54:29 +0000863 # link/ a symlink to TESTFN.2
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200864 # broken_link
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300865 # broken_link2
866 # broken_link3
Guido van Rossumd8faa362007-04-27 19:54:29 +0000867 # TEST2/
868 # tmp4 a lone file
Victor Stinner0561c532015-03-12 10:28:24 +0100869 self.walk_path = join(support.TESTFN, "TEST1")
870 self.sub1_path = join(self.walk_path, "SUB1")
871 self.sub11_path = join(self.sub1_path, "SUB11")
872 sub2_path = join(self.walk_path, "SUB2")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300873 sub21_path = join(sub2_path, "SUB21")
Victor Stinner0561c532015-03-12 10:28:24 +0100874 tmp1_path = join(self.walk_path, "tmp1")
875 tmp2_path = join(self.sub1_path, "tmp2")
Tim Petersc4e09402003-04-25 07:11:48 +0000876 tmp3_path = join(sub2_path, "tmp3")
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300877 tmp5_path = join(sub21_path, "tmp3")
Victor Stinner0561c532015-03-12 10:28:24 +0100878 self.link_path = join(sub2_path, "link")
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000879 t2_path = join(support.TESTFN, "TEST2")
880 tmp4_path = join(support.TESTFN, "TEST2", "tmp4")
Hynek Schlawack66bfcc12012-05-15 16:32:21 +0200881 broken_link_path = join(sub2_path, "broken_link")
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300882 broken_link2_path = join(sub2_path, "broken_link2")
883 broken_link3_path = join(sub2_path, "broken_link3")
Tim Petersc4e09402003-04-25 07:11:48 +0000884
885 # Create stuff.
Victor Stinner0561c532015-03-12 10:28:24 +0100886 os.makedirs(self.sub11_path)
Tim Petersc4e09402003-04-25 07:11:48 +0000887 os.makedirs(sub2_path)
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300888 os.makedirs(sub21_path)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000889 os.makedirs(t2_path)
Victor Stinner0561c532015-03-12 10:28:24 +0100890
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300891 for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
Victor Stinnere77c9742016-03-25 10:28:23 +0100892 with open(path, "x") as f:
893 f.write("I'm " + path + " and proud of it. Blame test_os.\n")
Tim Petersc4e09402003-04-25 07:11:48 +0000894
Victor Stinner0561c532015-03-12 10:28:24 +0100895 if support.can_symlink():
896 os.symlink(os.path.abspath(t2_path), self.link_path)
897 os.symlink('broken', broken_link_path, True)
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300898 os.symlink(join('tmp3', 'broken'), broken_link2_path, True)
899 os.symlink(join('SUB21', 'tmp5'), broken_link3_path, True)
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300900 self.sub2_tree = (sub2_path, ["SUB21", "link"],
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300901 ["broken_link", "broken_link2", "broken_link3",
902 "tmp3"])
Victor Stinner0561c532015-03-12 10:28:24 +0100903 else:
904 self.sub2_tree = (sub2_path, [], ["tmp3"])
905
Serhiy Storchakaaf4e4742016-10-25 14:34:38 +0300906 os.chmod(sub21_path, 0)
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300907 try:
908 os.listdir(sub21_path)
909 except PermissionError:
910 self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
911 else:
912 os.chmod(sub21_path, stat.S_IRWXU)
913 os.unlink(tmp5_path)
914 os.rmdir(sub21_path)
915 del self.sub2_tree[1][:1]
Serhiy Storchaka42babab2016-10-25 14:28:38 +0300916
Victor Stinner0561c532015-03-12 10:28:24 +0100917 def test_walk_topdown(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000918 # Walk top-down.
Serhiy Storchakaa07ab292016-04-16 17:51:00 +0300919 all = list(self.walk(self.walk_path))
Victor Stinner0561c532015-03-12 10:28:24 +0100920
Tim Petersc4e09402003-04-25 07:11:48 +0000921 self.assertEqual(len(all), 4)
922 # We can't know which order SUB1 and SUB2 will appear in.
923 # Not flipped: TESTFN, SUB1, SUB11, SUB2
924 # flipped: TESTFN, SUB2, SUB1, SUB11
925 flipped = all[0][1][0] != "SUB1"
926 all[0][1].sort()
Hynek Schlawackc96f5a02012-05-15 17:55:38 +0200927 all[3 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300928 all[3 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100929 self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
930 self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
931 self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
932 self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000933
Brett Cannon3f9183b2016-08-26 14:44:48 -0700934 def test_walk_prune(self, walk_path=None):
935 if walk_path is None:
936 walk_path = self.walk_path
Tim Petersc4e09402003-04-25 07:11:48 +0000937 # Prune the search.
938 all = []
Brett Cannon3f9183b2016-08-26 14:44:48 -0700939 for root, dirs, files in self.walk(walk_path):
Tim Petersc4e09402003-04-25 07:11:48 +0000940 all.append((root, dirs, files))
941 # Don't descend into SUB1.
942 if 'SUB1' in dirs:
943 # Note that this also mutates the dirs we appended to all!
944 dirs.remove('SUB1')
Tim Petersc4e09402003-04-25 07:11:48 +0000945
Victor Stinner0561c532015-03-12 10:28:24 +0100946 self.assertEqual(len(all), 2)
947 self.assertEqual(all[0],
Brett Cannon3f9183b2016-08-26 14:44:48 -0700948 (str(walk_path), ["SUB2"], ["tmp1"]))
Victor Stinner0561c532015-03-12 10:28:24 +0100949
950 all[1][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300951 all[1][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100952 self.assertEqual(all[1], self.sub2_tree)
953
Brett Cannon3f9183b2016-08-26 14:44:48 -0700954 def test_file_like_path(self):
Brett Cannonec6ce872016-09-06 15:50:29 -0700955 self.test_walk_prune(_PathLike(self.walk_path))
Brett Cannon3f9183b2016-08-26 14:44:48 -0700956
Victor Stinner0561c532015-03-12 10:28:24 +0100957 def test_walk_bottom_up(self):
Tim Petersc4e09402003-04-25 07:11:48 +0000958 # Walk bottom-up.
Victor Stinner0561c532015-03-12 10:28:24 +0100959 all = list(self.walk(self.walk_path, topdown=False))
960
Victor Stinner53b0a412016-03-26 01:12:36 +0100961 self.assertEqual(len(all), 4, all)
Tim Petersc4e09402003-04-25 07:11:48 +0000962 # We can't know which order SUB1 and SUB2 will appear in.
963 # Not flipped: SUB11, SUB1, SUB2, TESTFN
964 # flipped: SUB2, SUB11, SUB1, TESTFN
965 flipped = all[3][1][0] != "SUB1"
966 all[3][1].sort()
Hynek Schlawack39bf90d2012-05-15 18:40:17 +0200967 all[2 - 2 * flipped][-1].sort()
Serhiy Storchaka28f98202016-10-25 19:01:41 +0300968 all[2 - 2 * flipped][1].sort()
Victor Stinner0561c532015-03-12 10:28:24 +0100969 self.assertEqual(all[3],
970 (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
971 self.assertEqual(all[flipped],
972 (self.sub11_path, [], []))
973 self.assertEqual(all[flipped + 1],
974 (self.sub1_path, ["SUB11"], ["tmp2"]))
975 self.assertEqual(all[2 - 2 * flipped],
976 self.sub2_tree)
Tim Petersc4e09402003-04-25 07:11:48 +0000977
Victor Stinner0561c532015-03-12 10:28:24 +0100978 def test_walk_symlink(self):
979 if not support.can_symlink():
980 self.skipTest("need symlink support")
981
982 # Walk, following symlinks.
983 walk_it = self.walk(self.walk_path, follow_symlinks=True)
984 for root, dirs, files in walk_it:
985 if root == self.link_path:
986 self.assertEqual(dirs, [])
987 self.assertEqual(files, ["tmp4"])
988 break
989 else:
990 self.fail("Didn't follow symlink with followlinks=True")
Guido van Rossumd8faa362007-04-27 19:54:29 +0000991
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +0200992 def test_walk_bad_dir(self):
993 # Walk top-down.
994 errors = []
995 walk_it = self.walk(self.walk_path, onerror=errors.append)
996 root, dirs, files = next(walk_it)
Serhiy Storchaka7865dff2016-10-28 09:17:38 +0300997 self.assertEqual(errors, [])
998 dir1 = 'SUB1'
999 path1 = os.path.join(root, dir1)
1000 path1new = os.path.join(root, dir1 + '.new')
1001 os.rename(path1, path1new)
1002 try:
1003 roots = [r for r, d, f in walk_it]
1004 self.assertTrue(errors)
1005 self.assertNotIn(path1, roots)
1006 self.assertNotIn(path1new, roots)
1007 for dir2 in dirs:
1008 if dir2 != dir1:
1009 self.assertIn(os.path.join(root, dir2), roots)
1010 finally:
1011 os.rename(path1new, path1)
Serhiy Storchaka0bddc9e2015-12-23 00:08:24 +02001012
Charles-François Natali7372b062012-02-05 15:15:38 +01001013
1014@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1015class FwalkTests(WalkTests):
1016 """Tests for os.fwalk()."""
1017
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001018 def walk(self, top, **kwargs):
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001019 for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
Victor Stinner0561c532015-03-12 10:28:24 +01001020 yield (root, dirs, files)
1021
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001022 def fwalk(self, *args, **kwargs):
1023 return os.fwalk(*args, **kwargs)
1024
Larry Hastingsc48fe982012-06-25 04:49:05 -07001025 def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
1026 """
1027 compare with walk() results.
1028 """
Larry Hastingsb4038062012-07-15 10:57:38 -07001029 walk_kwargs = walk_kwargs.copy()
1030 fwalk_kwargs = fwalk_kwargs.copy()
1031 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1032 walk_kwargs.update(topdown=topdown, followlinks=follow_symlinks)
1033 fwalk_kwargs.update(topdown=topdown, follow_symlinks=follow_symlinks)
Larry Hastingsc48fe982012-06-25 04:49:05 -07001034
Charles-François Natali7372b062012-02-05 15:15:38 +01001035 expected = {}
Larry Hastingsc48fe982012-06-25 04:49:05 -07001036 for root, dirs, files in os.walk(**walk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001037 expected[root] = (set(dirs), set(files))
1038
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001039 for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
Charles-François Natali7372b062012-02-05 15:15:38 +01001040 self.assertIn(root, expected)
1041 self.assertEqual(expected[root], (set(dirs), set(files)))
1042
Larry Hastingsc48fe982012-06-25 04:49:05 -07001043 def test_compare_to_walk(self):
1044 kwargs = {'top': support.TESTFN}
1045 self._compare_to_walk(kwargs, kwargs)
1046
Charles-François Natali7372b062012-02-05 15:15:38 +01001047 def test_dir_fd(self):
Larry Hastingsc48fe982012-06-25 04:49:05 -07001048 try:
1049 fd = os.open(".", os.O_RDONLY)
1050 walk_kwargs = {'top': support.TESTFN}
1051 fwalk_kwargs = walk_kwargs.copy()
1052 fwalk_kwargs['dir_fd'] = fd
1053 self._compare_to_walk(walk_kwargs, fwalk_kwargs)
1054 finally:
1055 os.close(fd)
1056
1057 def test_yields_correct_dir_fd(self):
Charles-François Natali7372b062012-02-05 15:15:38 +01001058 # check returned file descriptors
Larry Hastingsb4038062012-07-15 10:57:38 -07001059 for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
1060 args = support.TESTFN, topdown, None
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001061 for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
Charles-François Natali7372b062012-02-05 15:15:38 +01001062 # check that the FD is valid
1063 os.fstat(rootfd)
Larry Hastings9cf065c2012-06-22 16:30:09 -07001064 # redundant check
1065 os.stat(rootfd)
1066 # check that listdir() returns consistent information
1067 self.assertEqual(set(os.listdir(rootfd)), set(dirs) | set(files))
Charles-François Natali7372b062012-02-05 15:15:38 +01001068
1069 def test_fd_leak(self):
1070 # Since we're opening a lot of FDs, we must be careful to avoid leaks:
1071 # we both check that calling fwalk() a large number of times doesn't
1072 # yield EMFILE, and that the minimum allocated FD hasn't changed.
1073 minfd = os.dup(1)
1074 os.close(minfd)
1075 for i in range(256):
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001076 for x in self.fwalk(support.TESTFN):
Charles-François Natali7372b062012-02-05 15:15:38 +01001077 pass
1078 newfd = os.dup(1)
1079 self.addCleanup(os.close, newfd)
1080 self.assertEqual(newfd, minfd)
1081
Serhiy Storchaka5f6a0b42016-02-08 16:23:28 +02001082class BytesWalkTests(WalkTests):
1083 """Tests for os.walk() with bytes."""
1084 def walk(self, top, **kwargs):
1085 if 'follow_symlinks' in kwargs:
1086 kwargs['followlinks'] = kwargs.pop('follow_symlinks')
1087 for broot, bdirs, bfiles in os.walk(os.fsencode(top), **kwargs):
1088 root = os.fsdecode(broot)
1089 dirs = list(map(os.fsdecode, bdirs))
1090 files = list(map(os.fsdecode, bfiles))
1091 yield (root, dirs, files)
1092 bdirs[:] = list(map(os.fsencode, dirs))
1093 bfiles[:] = list(map(os.fsencode, files))
1094
Serhiy Storchaka8f6b3442017-03-07 14:33:21 +02001095@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
1096class BytesFwalkTests(FwalkTests):
1097 """Tests for os.walk() with bytes."""
1098 def fwalk(self, top='.', *args, **kwargs):
1099 for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs):
1100 root = os.fsdecode(broot)
1101 dirs = list(map(os.fsdecode, bdirs))
1102 files = list(map(os.fsdecode, bfiles))
1103 yield (root, dirs, files, topfd)
1104 bdirs[:] = list(map(os.fsencode, dirs))
1105 bfiles[:] = list(map(os.fsencode, files))
1106
Charles-François Natali7372b062012-02-05 15:15:38 +01001107
Guido van Rossume7ba4952007-06-06 23:52:48 +00001108class MakedirTests(unittest.TestCase):
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001109 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001110 os.mkdir(support.TESTFN)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001111
1112 def test_makedir(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001113 base = support.TESTFN
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001114 path = os.path.join(base, 'dir1', 'dir2', 'dir3')
1115 os.makedirs(path) # Should work
1116 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
1117 os.makedirs(path)
1118
1119 # Try paths with a '.' in them
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001120 self.assertRaises(OSError, os.makedirs, os.curdir)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001121 path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
1122 os.makedirs(path)
1123 path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
1124 'dir5', 'dir6')
1125 os.makedirs(path)
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001126
Serhiy Storchakae304e332017-03-24 13:27:42 +02001127 def test_mode(self):
1128 with support.temp_umask(0o002):
1129 base = support.TESTFN
1130 parent = os.path.join(base, 'dir1')
1131 path = os.path.join(parent, 'dir2')
1132 os.makedirs(path, 0o555)
1133 self.assertTrue(os.path.exists(path))
1134 self.assertTrue(os.path.isdir(path))
1135 if os.name != 'nt':
1136 self.assertEqual(stat.S_IMODE(os.stat(path).st_mode), 0o555)
1137 self.assertEqual(stat.S_IMODE(os.stat(parent).st_mode), 0o775)
1138
Terry Reedy5a22b652010-12-02 07:05:56 +00001139 def test_exist_ok_existing_directory(self):
1140 path = os.path.join(support.TESTFN, 'dir1')
1141 mode = 0o777
1142 old_mask = os.umask(0o022)
1143 os.makedirs(path, mode)
1144 self.assertRaises(OSError, os.makedirs, path, mode)
1145 self.assertRaises(OSError, os.makedirs, path, mode, exist_ok=False)
Benjamin Peterson4717e212014-04-01 19:17:57 -04001146 os.makedirs(path, 0o776, exist_ok=True)
Terry Reedy5a22b652010-12-02 07:05:56 +00001147 os.makedirs(path, mode=mode, exist_ok=True)
1148 os.umask(old_mask)
1149
Martin Pantera82642f2015-11-19 04:48:44 +00001150 # Issue #25583: A drive root could raise PermissionError on Windows
1151 os.makedirs(os.path.abspath('/'), exist_ok=True)
1152
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001153 def test_exist_ok_s_isgid_directory(self):
1154 path = os.path.join(support.TESTFN, 'dir1')
1155 S_ISGID = stat.S_ISGID
1156 mode = 0o777
1157 old_mask = os.umask(0o022)
1158 try:
1159 existing_testfn_mode = stat.S_IMODE(
1160 os.lstat(support.TESTFN).st_mode)
Ned Deilyc622f422012-08-08 20:57:24 -07001161 try:
1162 os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID)
Ned Deily3a2b97e2012-08-08 21:03:02 -07001163 except PermissionError:
Ned Deilyc622f422012-08-08 20:57:24 -07001164 raise unittest.SkipTest('Cannot set S_ISGID for dir.')
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001165 if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID):
1166 raise unittest.SkipTest('No support for S_ISGID dir mode.')
1167 # The os should apply S_ISGID from the parent dir for us, but
1168 # this test need not depend on that behavior. Be explicit.
1169 os.makedirs(path, mode | S_ISGID)
1170 # http://bugs.python.org/issue14992
1171 # Should not fail when the bit is already set.
1172 os.makedirs(path, mode, exist_ok=True)
1173 # remove the bit.
1174 os.chmod(path, stat.S_IMODE(os.lstat(path).st_mode) & ~S_ISGID)
Benjamin Petersonee5f1c12014-04-01 19:13:18 -04001175 # May work even when the bit is not already set when demanded.
1176 os.makedirs(path, mode | S_ISGID, exist_ok=True)
Gregory P. Smitha81c8562012-06-03 14:30:44 -07001177 finally:
1178 os.umask(old_mask)
Terry Reedy5a22b652010-12-02 07:05:56 +00001179
1180 def test_exist_ok_existing_regular_file(self):
1181 base = support.TESTFN
1182 path = os.path.join(support.TESTFN, 'dir1')
1183 f = open(path, 'w')
1184 f.write('abc')
1185 f.close()
1186 self.assertRaises(OSError, os.makedirs, path)
1187 self.assertRaises(OSError, os.makedirs, path, exist_ok=False)
1188 self.assertRaises(OSError, os.makedirs, path, exist_ok=True)
1189 os.remove(path)
1190
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001191 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001192 path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3',
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001193 'dir4', 'dir5', 'dir6')
1194 # If the tests failed, the bottom-most directory ('../dir6')
1195 # may not have been created, so we look for the outermost directory
1196 # that exists.
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001197 while not os.path.exists(path) and path != support.TESTFN:
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001198 path = os.path.dirname(path)
1199
1200 os.removedirs(path)
1201
Andrew Svetlov405faed2012-12-25 12:18:09 +02001202
R David Murrayf2ad1732014-12-25 18:36:56 -05001203@unittest.skipUnless(hasattr(os, 'chown'), "Test needs chown")
1204class ChownFileTests(unittest.TestCase):
1205
Berker Peksag036a71b2015-07-21 09:29:48 +03001206 @classmethod
1207 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001208 os.mkdir(support.TESTFN)
1209
1210 def test_chown_uid_gid_arguments_must_be_index(self):
1211 stat = os.stat(support.TESTFN)
1212 uid = stat.st_uid
1213 gid = stat.st_gid
1214 for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)):
1215 self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid)
1216 self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value)
1217 self.assertIsNone(os.chown(support.TESTFN, uid, gid))
1218 self.assertIsNone(os.chown(support.TESTFN, -1, -1))
1219
1220 @unittest.skipUnless(len(groups) > 1, "test needs more than one group")
1221 def test_chown(self):
1222 gid_1, gid_2 = groups[:2]
1223 uid = os.stat(support.TESTFN).st_uid
1224 os.chown(support.TESTFN, uid, gid_1)
1225 gid = os.stat(support.TESTFN).st_gid
1226 self.assertEqual(gid, gid_1)
1227 os.chown(support.TESTFN, uid, gid_2)
1228 gid = os.stat(support.TESTFN).st_gid
1229 self.assertEqual(gid, gid_2)
1230
1231 @unittest.skipUnless(root_in_posix and len(all_users) > 1,
1232 "test needs root privilege and more than one user")
1233 def test_chown_with_root(self):
1234 uid_1, uid_2 = all_users[:2]
1235 gid = os.stat(support.TESTFN).st_gid
1236 os.chown(support.TESTFN, uid_1, gid)
1237 uid = os.stat(support.TESTFN).st_uid
1238 self.assertEqual(uid, uid_1)
1239 os.chown(support.TESTFN, uid_2, gid)
1240 uid = os.stat(support.TESTFN).st_uid
1241 self.assertEqual(uid, uid_2)
1242
1243 @unittest.skipUnless(not root_in_posix and len(all_users) > 1,
1244 "test needs non-root account and more than one user")
1245 def test_chown_without_permission(self):
1246 uid_1, uid_2 = all_users[:2]
1247 gid = os.stat(support.TESTFN).st_gid
Serhiy Storchakaa9e00d12015-02-16 08:35:18 +02001248 with self.assertRaises(PermissionError):
R David Murrayf2ad1732014-12-25 18:36:56 -05001249 os.chown(support.TESTFN, uid_1, gid)
1250 os.chown(support.TESTFN, uid_2, gid)
1251
Berker Peksag036a71b2015-07-21 09:29:48 +03001252 @classmethod
1253 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05001254 os.rmdir(support.TESTFN)
1255
1256
Andrew Svetlov405faed2012-12-25 12:18:09 +02001257class RemoveDirsTests(unittest.TestCase):
1258 def setUp(self):
1259 os.makedirs(support.TESTFN)
1260
1261 def tearDown(self):
1262 support.rmtree(support.TESTFN)
1263
1264 def test_remove_all(self):
1265 dira = os.path.join(support.TESTFN, 'dira')
1266 os.mkdir(dira)
1267 dirb = os.path.join(dira, 'dirb')
1268 os.mkdir(dirb)
1269 os.removedirs(dirb)
1270 self.assertFalse(os.path.exists(dirb))
1271 self.assertFalse(os.path.exists(dira))
1272 self.assertFalse(os.path.exists(support.TESTFN))
1273
1274 def test_remove_partial(self):
1275 dira = os.path.join(support.TESTFN, 'dira')
1276 os.mkdir(dira)
1277 dirb = os.path.join(dira, 'dirb')
1278 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001279 create_file(os.path.join(dira, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001280 os.removedirs(dirb)
1281 self.assertFalse(os.path.exists(dirb))
1282 self.assertTrue(os.path.exists(dira))
1283 self.assertTrue(os.path.exists(support.TESTFN))
1284
1285 def test_remove_nothing(self):
1286 dira = os.path.join(support.TESTFN, 'dira')
1287 os.mkdir(dira)
1288 dirb = os.path.join(dira, 'dirb')
1289 os.mkdir(dirb)
Victor Stinnerae39d232016-03-24 17:12:55 +01001290 create_file(os.path.join(dirb, 'file.txt'))
Andrew Svetlov405faed2012-12-25 12:18:09 +02001291 with self.assertRaises(OSError):
1292 os.removedirs(dirb)
1293 self.assertTrue(os.path.exists(dirb))
1294 self.assertTrue(os.path.exists(dira))
1295 self.assertTrue(os.path.exists(support.TESTFN))
1296
1297
Guido van Rossume7ba4952007-06-06 23:52:48 +00001298class DevNullTests(unittest.TestCase):
Martin v. Löwisbdec50f2004-06-08 08:29:33 +00001299 def test_devnull(self):
Victor Stinnerae39d232016-03-24 17:12:55 +01001300 with open(os.devnull, 'wb', 0) as f:
Victor Stinnera6d2c762011-06-30 18:20:11 +02001301 f.write(b'hello')
1302 f.close()
1303 with open(os.devnull, 'rb') as f:
1304 self.assertEqual(f.read(), b'')
Andrew M. Kuchlingb386f6a2003-12-23 16:36:11 +00001305
Andrew Svetlov405faed2012-12-25 12:18:09 +02001306
Guido van Rossume7ba4952007-06-06 23:52:48 +00001307class URandomTests(unittest.TestCase):
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001308 def test_urandom_length(self):
1309 self.assertEqual(len(os.urandom(0)), 0)
1310 self.assertEqual(len(os.urandom(1)), 1)
1311 self.assertEqual(len(os.urandom(10)), 10)
1312 self.assertEqual(len(os.urandom(100)), 100)
1313 self.assertEqual(len(os.urandom(1000)), 1000)
1314
1315 def test_urandom_value(self):
1316 data1 = os.urandom(16)
Victor Stinner9b1f4742016-09-06 16:18:52 -07001317 self.assertIsInstance(data1, bytes)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001318 data2 = os.urandom(16)
1319 self.assertNotEqual(data1, data2)
1320
1321 def get_urandom_subprocess(self, count):
1322 code = '\n'.join((
1323 'import os, sys',
1324 'data = os.urandom(%s)' % count,
1325 'sys.stdout.buffer.write(data)',
1326 'sys.stdout.buffer.flush()'))
1327 out = assert_python_ok('-c', code)
1328 stdout = out[1]
Pablo Galindofb77e0d2017-12-07 06:55:44 +00001329 self.assertEqual(len(stdout), count)
Georg Brandl2daf6ae2012-02-20 19:54:16 +01001330 return stdout
1331
1332 def test_urandom_subprocess(self):
1333 data1 = self.get_urandom_subprocess(16)
1334 data2 = self.get_urandom_subprocess(16)
1335 self.assertNotEqual(data1, data2)
Martin v. Löwisdc3883f2004-08-29 15:46:35 +00001336
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001337
Victor Stinner9b1f4742016-09-06 16:18:52 -07001338@unittest.skipUnless(hasattr(os, 'getrandom'), 'need os.getrandom()')
1339class GetRandomTests(unittest.TestCase):
Victor Stinner173a1f32016-09-06 19:57:40 -07001340 @classmethod
1341 def setUpClass(cls):
1342 try:
1343 os.getrandom(1)
1344 except OSError as exc:
1345 if exc.errno == errno.ENOSYS:
1346 # Python compiled on a more recent Linux version
1347 # than the current Linux kernel
1348 raise unittest.SkipTest("getrandom() syscall fails with ENOSYS")
1349 else:
1350 raise
1351
Victor Stinner9b1f4742016-09-06 16:18:52 -07001352 def test_getrandom_type(self):
1353 data = os.getrandom(16)
1354 self.assertIsInstance(data, bytes)
1355 self.assertEqual(len(data), 16)
1356
1357 def test_getrandom0(self):
1358 empty = os.getrandom(0)
1359 self.assertEqual(empty, b'')
1360
1361 def test_getrandom_random(self):
1362 self.assertTrue(hasattr(os, 'GRND_RANDOM'))
1363
1364 # Don't test os.getrandom(1, os.GRND_RANDOM) to not consume the rare
1365 # resource /dev/random
1366
1367 def test_getrandom_nonblock(self):
1368 # The call must not fail. Check also that the flag exists
1369 try:
1370 os.getrandom(1, os.GRND_NONBLOCK)
1371 except BlockingIOError:
1372 # System urandom is not initialized yet
1373 pass
1374
1375 def test_getrandom_value(self):
1376 data1 = os.getrandom(16)
1377 data2 = os.getrandom(16)
1378 self.assertNotEqual(data1, data2)
1379
1380
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001381# os.urandom() doesn't use a file descriptor when it is implemented with the
1382# getentropy() function, the getrandom() function or the getrandom() syscall
1383OS_URANDOM_DONT_USE_FD = (
1384 sysconfig.get_config_var('HAVE_GETENTROPY') == 1
1385 or sysconfig.get_config_var('HAVE_GETRANDOM') == 1
1386 or sysconfig.get_config_var('HAVE_GETRANDOM_SYSCALL') == 1)
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001387
Victor Stinnerd8f432a2015-09-18 16:24:31 +02001388@unittest.skipIf(OS_URANDOM_DONT_USE_FD ,
1389 "os.random() does not use a file descriptor")
Victor Stinner4d6a3d62014-12-21 01:16:38 +01001390class URandomFDTests(unittest.TestCase):
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001391 @unittest.skipUnless(resource, "test requires the resource module")
1392 def test_urandom_failure(self):
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001393 # Check urandom() failing when it is not able to open /dev/random.
1394 # We spawn a new process to make the test more robust (if getrlimit()
1395 # failed to restore the file descriptor limit after this, the whole
1396 # test suite would crash; this actually happened on the OS X Tiger
1397 # buildbot).
1398 code = """if 1:
1399 import errno
1400 import os
1401 import resource
1402
1403 soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
1404 resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
1405 try:
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001406 os.urandom(16)
Antoine Pitroueba25ba2013-08-24 20:52:27 +02001407 except OSError as e:
1408 assert e.errno == errno.EMFILE, e.errno
1409 else:
1410 raise AssertionError("OSError not raised")
1411 """
1412 assert_python_ok('-c', code)
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001413
Antoine Pitroue472aea2014-04-26 14:33:03 +02001414 def test_urandom_fd_closed(self):
1415 # Issue #21207: urandom() should reopen its fd to /dev/urandom if
1416 # closed.
1417 code = """if 1:
1418 import os
1419 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001420 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001421 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001422 with test.support.SuppressCrashReport():
1423 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001424 sys.stdout.buffer.write(os.urandom(4))
1425 """
1426 rc, out, err = assert_python_ok('-Sc', code)
1427
1428 def test_urandom_fd_reopened(self):
1429 # Issue #21207: urandom() should detect its fd to /dev/urandom
1430 # changed to something else, and reopen it.
Victor Stinnerae39d232016-03-24 17:12:55 +01001431 self.addCleanup(support.unlink, support.TESTFN)
1432 create_file(support.TESTFN, b"x" * 256)
1433
Antoine Pitroue472aea2014-04-26 14:33:03 +02001434 code = """if 1:
1435 import os
1436 import sys
Steve Dowerd5a0be62015-03-07 21:25:54 -08001437 import test.support
Antoine Pitroue472aea2014-04-26 14:33:03 +02001438 os.urandom(4)
Steve Dowerd5a0be62015-03-07 21:25:54 -08001439 with test.support.SuppressCrashReport():
1440 for fd in range(3, 256):
1441 try:
1442 os.close(fd)
1443 except OSError:
1444 pass
1445 else:
1446 # Found the urandom fd (XXX hopefully)
1447 break
1448 os.closerange(3, 256)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001449 with open({TESTFN!r}, 'rb') as f:
Xavier de Gaye21060102016-11-16 08:05:27 +01001450 new_fd = f.fileno()
1451 # Issue #26935: posix allows new_fd and fd to be equal but
1452 # some libc implementations have dup2 return an error in this
1453 # case.
1454 if new_fd != fd:
1455 os.dup2(new_fd, fd)
Antoine Pitroue472aea2014-04-26 14:33:03 +02001456 sys.stdout.buffer.write(os.urandom(4))
1457 sys.stdout.buffer.write(os.urandom(4))
1458 """.format(TESTFN=support.TESTFN)
1459 rc, out, err = assert_python_ok('-Sc', code)
1460 self.assertEqual(len(out), 8)
1461 self.assertNotEqual(out[0:4], out[4:8])
1462 rc, out2, err2 = assert_python_ok('-Sc', code)
1463 self.assertEqual(len(out2), 8)
1464 self.assertNotEqual(out2, out)
1465
Antoine Pitrouec34ab52013-08-16 20:44:38 +02001466
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001467@contextlib.contextmanager
1468def _execvpe_mockup(defpath=None):
1469 """
1470 Stubs out execv and execve functions when used as context manager.
1471 Records exec calls. The mock execv and execve functions always raise an
1472 exception as they would normally never return.
1473 """
1474 # A list of tuples containing (function name, first arg, args)
1475 # of calls to execv or execve that have been made.
1476 calls = []
1477
1478 def mock_execv(name, *args):
1479 calls.append(('execv', name, args))
1480 raise RuntimeError("execv called")
1481
1482 def mock_execve(name, *args):
1483 calls.append(('execve', name, args))
1484 raise OSError(errno.ENOTDIR, "execve called")
1485
1486 try:
1487 orig_execv = os.execv
1488 orig_execve = os.execve
1489 orig_defpath = os.defpath
1490 os.execv = mock_execv
1491 os.execve = mock_execve
1492 if defpath is not None:
1493 os.defpath = defpath
1494 yield calls
1495 finally:
1496 os.execv = orig_execv
1497 os.execve = orig_execve
1498 os.defpath = orig_defpath
1499
Victor Stinner4659ccf2016-09-14 10:57:00 +02001500
Guido van Rossume7ba4952007-06-06 23:52:48 +00001501class ExecTests(unittest.TestCase):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001502 @unittest.skipIf(USING_LINUXTHREADS,
1503 "avoid triggering a linuxthreads bug: see issue #4970")
Guido van Rossume7ba4952007-06-06 23:52:48 +00001504 def test_execvpe_with_bad_program(self):
Mark Dickinson7cf03892010-04-16 13:45:35 +00001505 self.assertRaises(OSError, os.execvpe, 'no such app-',
1506 ['no such app-'], None)
Guido van Rossume7ba4952007-06-06 23:52:48 +00001507
Steve Dowerbce26262016-11-19 19:17:26 -08001508 def test_execv_with_bad_arglist(self):
1509 self.assertRaises(ValueError, os.execv, 'notepad', ())
1510 self.assertRaises(ValueError, os.execv, 'notepad', [])
1511 self.assertRaises(ValueError, os.execv, 'notepad', ('',))
1512 self.assertRaises(ValueError, os.execv, 'notepad', [''])
1513
Thomas Heller6790d602007-08-30 17:15:14 +00001514 def test_execvpe_with_bad_arglist(self):
1515 self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
Steve Dowerbce26262016-11-19 19:17:26 -08001516 self.assertRaises(ValueError, os.execvpe, 'notepad', [], {})
1517 self.assertRaises(ValueError, os.execvpe, 'notepad', [''], {})
Thomas Heller6790d602007-08-30 17:15:14 +00001518
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001519 @unittest.skipUnless(hasattr(os, '_execvpe'),
1520 "No internal os._execvpe function to test.")
Victor Stinnerb745a742010-05-18 17:17:23 +00001521 def _test_internal_execvpe(self, test_type):
1522 program_path = os.sep + 'absolutepath'
1523 if test_type is bytes:
1524 program = b'executable'
1525 fullpath = os.path.join(os.fsencode(program_path), program)
1526 native_fullpath = fullpath
1527 arguments = [b'progname', 'arg1', 'arg2']
1528 else:
1529 program = 'executable'
1530 arguments = ['progname', 'arg1', 'arg2']
1531 fullpath = os.path.join(program_path, program)
1532 if os.name != "nt":
1533 native_fullpath = os.fsencode(fullpath)
1534 else:
1535 native_fullpath = fullpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001536 env = {'spam': 'beans'}
1537
Victor Stinnerb745a742010-05-18 17:17:23 +00001538 # test os._execvpe() with an absolute path
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001539 with _execvpe_mockup() as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001540 self.assertRaises(RuntimeError,
1541 os._execvpe, fullpath, arguments)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001542 self.assertEqual(len(calls), 1)
1543 self.assertEqual(calls[0], ('execv', fullpath, (arguments,)))
1544
Victor Stinnerb745a742010-05-18 17:17:23 +00001545 # test os._execvpe() with a relative path:
1546 # os.get_exec_path() returns defpath
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001547 with _execvpe_mockup(defpath=program_path) as calls:
Victor Stinnerb745a742010-05-18 17:17:23 +00001548 self.assertRaises(OSError,
1549 os._execvpe, program, arguments, env=env)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001550 self.assertEqual(len(calls), 1)
Victor Stinnerb745a742010-05-18 17:17:23 +00001551 self.assertSequenceEqual(calls[0],
1552 ('execve', native_fullpath, (arguments, env)))
1553
1554 # test os._execvpe() with a relative path:
1555 # os.get_exec_path() reads the 'PATH' variable
1556 with _execvpe_mockup() as calls:
1557 env_path = env.copy()
Victor Stinner38430e22010-08-19 17:10:18 +00001558 if test_type is bytes:
1559 env_path[b'PATH'] = program_path
1560 else:
1561 env_path['PATH'] = program_path
Victor Stinnerb745a742010-05-18 17:17:23 +00001562 self.assertRaises(OSError,
1563 os._execvpe, program, arguments, env=env_path)
1564 self.assertEqual(len(calls), 1)
1565 self.assertSequenceEqual(calls[0],
1566 ('execve', native_fullpath, (arguments, env_path)))
1567
1568 def test_internal_execvpe_str(self):
1569 self._test_internal_execvpe(str)
1570 if os.name != "nt":
1571 self._test_internal_execvpe(bytes)
Victor Stinnerc2d095f2010-05-17 00:14:53 +00001572
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001573 def test_execve_invalid_env(self):
1574 args = [sys.executable, '-c', 'pass']
1575
Ville Skyttä49b27342017-08-03 09:00:59 +03001576 # null character in the environment variable name
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001577 newenv = os.environ.copy()
1578 newenv["FRUIT\0VEGETABLE"] = "cabbage"
1579 with self.assertRaises(ValueError):
1580 os.execve(args[0], args, newenv)
1581
Ville Skyttä49b27342017-08-03 09:00:59 +03001582 # null character in the environment variable value
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001583 newenv = os.environ.copy()
1584 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
1585 with self.assertRaises(ValueError):
1586 os.execve(args[0], args, newenv)
1587
Ville Skyttä49b27342017-08-03 09:00:59 +03001588 # equal character in the environment variable name
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03001589 newenv = os.environ.copy()
1590 newenv["FRUIT=ORANGE"] = "lemon"
1591 with self.assertRaises(ValueError):
1592 os.execve(args[0], args, newenv)
1593
Gregory P. Smith4ae37772010-05-08 18:05:46 +00001594
Serhiy Storchaka43767632013-11-03 21:31:38 +02001595@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001596class Win32ErrorTests(unittest.TestCase):
Victor Stinnere77c9742016-03-25 10:28:23 +01001597 def setUp(self):
Victor Stinner32830142016-03-25 15:12:08 +01001598 try:
1599 os.stat(support.TESTFN)
1600 except FileNotFoundError:
1601 exists = False
1602 except OSError as exc:
1603 exists = True
1604 self.fail("file %s must not exist; os.stat failed with %s"
1605 % (support.TESTFN, exc))
1606 else:
1607 self.fail("file %s must not exist" % support.TESTFN)
Victor Stinnere77c9742016-03-25 10:28:23 +01001608
Thomas Wouters477c8d52006-05-27 19:21:47 +00001609 def test_rename(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001610 self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak")
Thomas Wouters477c8d52006-05-27 19:21:47 +00001611
1612 def test_remove(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001613 self.assertRaises(OSError, os.remove, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001614
1615 def test_chdir(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001616 self.assertRaises(OSError, os.chdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001617
1618 def test_mkdir(self):
Victor Stinnerae39d232016-03-24 17:12:55 +01001619 self.addCleanup(support.unlink, support.TESTFN)
1620
Victor Stinnere77c9742016-03-25 10:28:23 +01001621 with open(support.TESTFN, "x") as f:
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001622 self.assertRaises(OSError, os.mkdir, support.TESTFN)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001623
1624 def test_utime(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001625 self.assertRaises(OSError, os.utime, support.TESTFN, None)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001626
Thomas Wouters477c8d52006-05-27 19:21:47 +00001627 def test_chmod(self):
Andrew Svetlov2606a6f2012-12-19 14:33:35 +02001628 self.assertRaises(OSError, os.chmod, support.TESTFN, 0)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001629
Victor Stinnere77c9742016-03-25 10:28:23 +01001630
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001631class TestInvalidFD(unittest.TestCase):
Benjamin Peterson05e782f2009-01-19 15:15:02 +00001632 singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat",
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001633 "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
1634 #singles.append("close")
Steve Dower39294992016-08-30 21:22:36 -07001635 #We omit close because it doesn't raise an exception on some platforms
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001636 def get_single(f):
1637 def helper(self):
Benjamin Peterson7522c742009-01-19 21:00:09 +00001638 if hasattr(os, f):
1639 self.check(getattr(os, f))
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001640 return helper
1641 for f in singles:
1642 locals()["test_"+f] = get_single(f)
1643
Benjamin Peterson7522c742009-01-19 21:00:09 +00001644 def check(self, f, *args):
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001645 try:
1646 f(support.make_bad_fd(), *args)
1647 except OSError as e:
1648 self.assertEqual(e.errno, errno.EBADF)
1649 else:
Martin Panter7462b6492015-11-02 03:37:02 +00001650 self.fail("%r didn't raise an OSError with a bad file descriptor"
Benjamin Peterson5c6d7872009-02-06 02:40:07 +00001651 % f)
Benjamin Peterson7522c742009-01-19 21:00:09 +00001652
Serhiy Storchaka43767632013-11-03 21:31:38 +02001653 @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001654 def test_isatty(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001655 self.assertEqual(os.isatty(support.make_bad_fd()), False)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001656
Serhiy Storchaka43767632013-11-03 21:31:38 +02001657 @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001658 def test_closerange(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001659 fd = support.make_bad_fd()
1660 # Make sure none of the descriptors we are about to close are
1661 # currently valid (issue 6542).
1662 for i in range(10):
1663 try: os.fstat(fd+i)
1664 except OSError:
1665 pass
1666 else:
1667 break
1668 if i < 2:
1669 raise unittest.SkipTest(
1670 "Unable to acquire a range of invalid file descriptors")
1671 self.assertEqual(os.closerange(fd, fd + i-1), None)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001672
Serhiy Storchaka43767632013-11-03 21:31:38 +02001673 @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001674 def test_dup2(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001675 self.check(os.dup2, 20)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001676
Serhiy Storchaka43767632013-11-03 21:31:38 +02001677 @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001678 def test_fchmod(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001679 self.check(os.fchmod, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001680
Serhiy Storchaka43767632013-11-03 21:31:38 +02001681 @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001682 def test_fchown(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001683 self.check(os.fchown, -1, -1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001684
Serhiy Storchaka43767632013-11-03 21:31:38 +02001685 @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001686 def test_fpathconf(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001687 self.check(os.pathconf, "PC_NAME_MAX")
1688 self.check(os.fpathconf, "PC_NAME_MAX")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001689
Serhiy Storchaka43767632013-11-03 21:31:38 +02001690 @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001691 def test_ftruncate(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001692 self.check(os.truncate, 0)
1693 self.check(os.ftruncate, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001694
Serhiy Storchaka43767632013-11-03 21:31:38 +02001695 @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001696 def test_lseek(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001697 self.check(os.lseek, 0, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001698
Serhiy Storchaka43767632013-11-03 21:31:38 +02001699 @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001700 def test_read(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001701 self.check(os.read, 1)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001702
Victor Stinner57ddf782014-01-08 15:21:28 +01001703 @unittest.skipUnless(hasattr(os, 'readv'), 'test needs os.readv()')
1704 def test_readv(self):
1705 buf = bytearray(10)
1706 self.check(os.readv, [buf])
1707
Serhiy Storchaka43767632013-11-03 21:31:38 +02001708 @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001709 def test_tcsetpgrpt(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001710 self.check(os.tcsetpgrp, 0)
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001711
Serhiy Storchaka43767632013-11-03 21:31:38 +02001712 @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001713 def test_write(self):
Serhiy Storchaka43767632013-11-03 21:31:38 +02001714 self.check(os.write, b" ")
Benjamin Petersone1cdfd72009-01-18 21:02:37 +00001715
Victor Stinner57ddf782014-01-08 15:21:28 +01001716 @unittest.skipUnless(hasattr(os, 'writev'), 'test needs os.writev()')
1717 def test_writev(self):
1718 self.check(os.writev, [b'abc'])
1719
Victor Stinner1db9e7b2014-07-29 22:32:47 +02001720 def test_inheritable(self):
1721 self.check(os.get_inheritable)
1722 self.check(os.set_inheritable, True)
1723
1724 @unittest.skipUnless(hasattr(os, 'get_blocking'),
1725 'needs os.get_blocking() and os.set_blocking()')
1726 def test_blocking(self):
1727 self.check(os.get_blocking)
1728 self.check(os.set_blocking, True)
1729
Brian Curtin1b9df392010-11-24 20:24:31 +00001730
1731class LinkTests(unittest.TestCase):
1732 def setUp(self):
1733 self.file1 = support.TESTFN
1734 self.file2 = os.path.join(support.TESTFN + "2")
1735
Brian Curtinc0abc4e2010-11-30 23:46:54 +00001736 def tearDown(self):
Brian Curtin1b9df392010-11-24 20:24:31 +00001737 for file in (self.file1, self.file2):
1738 if os.path.exists(file):
1739 os.unlink(file)
1740
Brian Curtin1b9df392010-11-24 20:24:31 +00001741 def _test_link(self, file1, file2):
Victor Stinnere77c9742016-03-25 10:28:23 +01001742 create_file(file1)
Brian Curtin1b9df392010-11-24 20:24:31 +00001743
xdegaye6a55d092017-11-12 17:57:04 +01001744 try:
1745 os.link(file1, file2)
1746 except PermissionError as e:
1747 self.skipTest('os.link(): %s' % e)
Brian Curtin1b9df392010-11-24 20:24:31 +00001748 with open(file1, "r") as f1, open(file2, "r") as f2:
1749 self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno()))
1750
1751 def test_link(self):
1752 self._test_link(self.file1, self.file2)
1753
1754 def test_link_bytes(self):
1755 self._test_link(bytes(self.file1, sys.getfilesystemencoding()),
1756 bytes(self.file2, sys.getfilesystemencoding()))
1757
Brian Curtinf498b752010-11-30 15:54:04 +00001758 def test_unicode_name(self):
Brian Curtin43f0c272010-11-30 15:40:04 +00001759 try:
Brian Curtinf498b752010-11-30 15:54:04 +00001760 os.fsencode("\xf1")
Brian Curtin43f0c272010-11-30 15:40:04 +00001761 except UnicodeError:
1762 raise unittest.SkipTest("Unable to encode for this platform.")
1763
Brian Curtinf498b752010-11-30 15:54:04 +00001764 self.file1 += "\xf1"
Brian Curtinfc889c42010-11-28 23:59:46 +00001765 self.file2 = self.file1 + "2"
1766 self._test_link(self.file1, self.file2)
1767
Serhiy Storchaka43767632013-11-03 21:31:38 +02001768@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1769class PosixUidGidTests(unittest.TestCase):
1770 @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
1771 def test_setuid(self):
1772 if os.getuid() != 0:
1773 self.assertRaises(OSError, os.setuid, 0)
1774 self.assertRaises(OverflowError, os.setuid, 1<<32)
Thomas Wouters477c8d52006-05-27 19:21:47 +00001775
Serhiy Storchaka43767632013-11-03 21:31:38 +02001776 @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
1777 def test_setgid(self):
1778 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1779 self.assertRaises(OSError, os.setgid, 0)
1780 self.assertRaises(OverflowError, os.setgid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001781
Serhiy Storchaka43767632013-11-03 21:31:38 +02001782 @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
1783 def test_seteuid(self):
1784 if os.getuid() != 0:
1785 self.assertRaises(OSError, os.seteuid, 0)
1786 self.assertRaises(OverflowError, os.seteuid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001787
Serhiy Storchaka43767632013-11-03 21:31:38 +02001788 @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
1789 def test_setegid(self):
1790 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1791 self.assertRaises(OSError, os.setegid, 0)
1792 self.assertRaises(OverflowError, os.setegid, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001793
Serhiy Storchaka43767632013-11-03 21:31:38 +02001794 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1795 def test_setreuid(self):
1796 if os.getuid() != 0:
1797 self.assertRaises(OSError, os.setreuid, 0, 0)
1798 self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
1799 self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001800
Serhiy Storchaka43767632013-11-03 21:31:38 +02001801 @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
1802 def test_setreuid_neg1(self):
1803 # Needs to accept -1. We run this in a subprocess to avoid
1804 # altering the test runner's process state (issue8045).
1805 subprocess.check_call([
1806 sys.executable, '-c',
1807 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001808
Serhiy Storchaka43767632013-11-03 21:31:38 +02001809 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1810 def test_setregid(self):
1811 if os.getuid() != 0 and not HAVE_WHEEL_GROUP:
1812 self.assertRaises(OSError, os.setregid, 0, 0)
1813 self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
1814 self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001815
Serhiy Storchaka43767632013-11-03 21:31:38 +02001816 @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
1817 def test_setregid_neg1(self):
1818 # Needs to accept -1. We run this in a subprocess to avoid
1819 # altering the test runner's process state (issue8045).
1820 subprocess.check_call([
1821 sys.executable, '-c',
1822 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
Benjamin Petersonebe87ba2010-03-06 20:34:24 +00001823
Serhiy Storchaka43767632013-11-03 21:31:38 +02001824@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
1825class Pep383Tests(unittest.TestCase):
1826 def setUp(self):
1827 if support.TESTFN_UNENCODABLE:
1828 self.dir = support.TESTFN_UNENCODABLE
1829 elif support.TESTFN_NONASCII:
1830 self.dir = support.TESTFN_NONASCII
1831 else:
1832 self.dir = support.TESTFN
1833 self.bdir = os.fsencode(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001834
Serhiy Storchaka43767632013-11-03 21:31:38 +02001835 bytesfn = []
1836 def add_filename(fn):
Victor Stinnerd91df1a2010-08-18 10:56:19 +00001837 try:
Serhiy Storchaka43767632013-11-03 21:31:38 +02001838 fn = os.fsencode(fn)
1839 except UnicodeEncodeError:
1840 return
1841 bytesfn.append(fn)
1842 add_filename(support.TESTFN_UNICODE)
1843 if support.TESTFN_UNENCODABLE:
1844 add_filename(support.TESTFN_UNENCODABLE)
1845 if support.TESTFN_NONASCII:
1846 add_filename(support.TESTFN_NONASCII)
1847 if not bytesfn:
1848 self.skipTest("couldn't create any non-ascii filename")
Martin v. Löwis011e8422009-05-05 04:43:17 +00001849
Serhiy Storchaka43767632013-11-03 21:31:38 +02001850 self.unicodefn = set()
1851 os.mkdir(self.dir)
1852 try:
1853 for fn in bytesfn:
1854 support.create_empty_file(os.path.join(self.bdir, fn))
1855 fn = os.fsdecode(fn)
1856 if fn in self.unicodefn:
1857 raise ValueError("duplicate filename")
1858 self.unicodefn.add(fn)
1859 except:
Martin v. Löwis011e8422009-05-05 04:43:17 +00001860 shutil.rmtree(self.dir)
Serhiy Storchaka43767632013-11-03 21:31:38 +02001861 raise
Martin v. Löwis011e8422009-05-05 04:43:17 +00001862
Serhiy Storchaka43767632013-11-03 21:31:38 +02001863 def tearDown(self):
1864 shutil.rmtree(self.dir)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001865
Serhiy Storchaka43767632013-11-03 21:31:38 +02001866 def test_listdir(self):
1867 expected = self.unicodefn
1868 found = set(os.listdir(self.dir))
1869 self.assertEqual(found, expected)
1870 # test listdir without arguments
1871 current_directory = os.getcwd()
1872 try:
1873 os.chdir(os.sep)
1874 self.assertEqual(set(os.listdir()), set(os.listdir(os.sep)))
1875 finally:
1876 os.chdir(current_directory)
Martin v. Löwis011e8422009-05-05 04:43:17 +00001877
Serhiy Storchaka43767632013-11-03 21:31:38 +02001878 def test_open(self):
1879 for fn in self.unicodefn:
1880 f = open(os.path.join(self.dir, fn), 'rb')
1881 f.close()
Victor Stinnere4110dc2013-01-01 23:05:55 +01001882
Serhiy Storchaka43767632013-11-03 21:31:38 +02001883 @unittest.skipUnless(hasattr(os, 'statvfs'),
1884 "need os.statvfs()")
1885 def test_statvfs(self):
1886 # issue #9645
1887 for fn in self.unicodefn:
1888 # should not fail with file not found error
1889 fullname = os.path.join(self.dir, fn)
1890 os.statvfs(fullname)
1891
1892 def test_stat(self):
1893 for fn in self.unicodefn:
1894 os.stat(os.path.join(self.dir, fn))
Benjamin Petersonef3e4c22009-04-11 19:48:14 +00001895
Brian Curtineb24d742010-04-12 17:16:38 +00001896@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
1897class Win32KillTests(unittest.TestCase):
Brian Curtinc3acbc32010-05-28 16:08:40 +00001898 def _kill(self, sig):
1899 # Start sys.executable as a subprocess and communicate from the
1900 # subprocess to the parent that the interpreter is ready. When it
1901 # becomes ready, send *sig* via os.kill to the subprocess and check
1902 # that the return code is equal to *sig*.
1903 import ctypes
1904 from ctypes import wintypes
1905 import msvcrt
1906
1907 # Since we can't access the contents of the process' stdout until the
1908 # process has exited, use PeekNamedPipe to see what's inside stdout
1909 # without waiting. This is done so we can tell that the interpreter
1910 # is started and running at a point where it could handle a signal.
1911 PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
1912 PeekNamedPipe.restype = wintypes.BOOL
1913 PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
1914 ctypes.POINTER(ctypes.c_char), # stdout buf
1915 wintypes.DWORD, # Buffer size
1916 ctypes.POINTER(wintypes.DWORD), # bytes read
1917 ctypes.POINTER(wintypes.DWORD), # bytes avail
1918 ctypes.POINTER(wintypes.DWORD)) # bytes left
1919 msg = "running"
1920 proc = subprocess.Popen([sys.executable, "-c",
1921 "import sys;"
1922 "sys.stdout.write('{}');"
1923 "sys.stdout.flush();"
1924 "input()".format(msg)],
1925 stdout=subprocess.PIPE,
1926 stderr=subprocess.PIPE,
1927 stdin=subprocess.PIPE)
Brian Curtin43ec5772010-11-05 15:17:11 +00001928 self.addCleanup(proc.stdout.close)
1929 self.addCleanup(proc.stderr.close)
1930 self.addCleanup(proc.stdin.close)
Brian Curtinc3acbc32010-05-28 16:08:40 +00001931
1932 count, max = 0, 100
1933 while count < max and proc.poll() is None:
1934 # Create a string buffer to store the result of stdout from the pipe
1935 buf = ctypes.create_string_buffer(len(msg))
1936 # Obtain the text currently in proc.stdout
1937 # Bytes read/avail/left are left as NULL and unused
1938 rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
1939 buf, ctypes.sizeof(buf), None, None, None)
1940 self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
1941 if buf.value:
1942 self.assertEqual(msg, buf.value.decode())
1943 break
1944 time.sleep(0.1)
1945 count += 1
1946 else:
1947 self.fail("Did not receive communication from the subprocess")
1948
Brian Curtineb24d742010-04-12 17:16:38 +00001949 os.kill(proc.pid, sig)
1950 self.assertEqual(proc.wait(), sig)
1951
1952 def test_kill_sigterm(self):
1953 # SIGTERM doesn't mean anything special, but make sure it works
Brian Curtinc3acbc32010-05-28 16:08:40 +00001954 self._kill(signal.SIGTERM)
Brian Curtineb24d742010-04-12 17:16:38 +00001955
1956 def test_kill_int(self):
1957 # os.kill on Windows can take an int which gets set as the exit code
Brian Curtinc3acbc32010-05-28 16:08:40 +00001958 self._kill(100)
Brian Curtineb24d742010-04-12 17:16:38 +00001959
1960 def _kill_with_event(self, event, name):
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001961 tagname = "test_os_%s" % uuid.uuid1()
1962 m = mmap.mmap(-1, 1, tagname)
1963 m[0] = 0
Brian Curtineb24d742010-04-12 17:16:38 +00001964 # Run a script which has console control handling enabled.
1965 proc = subprocess.Popen([sys.executable,
1966 os.path.join(os.path.dirname(__file__),
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001967 "win_console_handler.py"), tagname],
Brian Curtineb24d742010-04-12 17:16:38 +00001968 creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
1969 # Let the interpreter startup before we send signals. See #3137.
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001970 count, max = 0, 100
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001971 while count < max and proc.poll() is None:
Brian Curtinf668df52010-10-15 14:21:06 +00001972 if m[0] == 1:
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001973 break
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001974 time.sleep(0.1)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001975 count += 1
1976 else:
Hirokazu Yamamoto8e9fe9f2010-12-05 02:41:46 +00001977 # Forcefully kill the process if we weren't able to signal it.
1978 os.kill(proc.pid, signal.SIGINT)
Hirokazu Yamamoto54c950f2010-10-08 08:38:15 +00001979 self.fail("Subprocess didn't finish initialization")
Brian Curtineb24d742010-04-12 17:16:38 +00001980 os.kill(proc.pid, event)
1981 # proc.send_signal(event) could also be done here.
1982 # Allow time for the signal to be passed and the process to exit.
1983 time.sleep(0.5)
1984 if not proc.poll():
1985 # Forcefully kill the process if we weren't able to signal it.
1986 os.kill(proc.pid, signal.SIGINT)
1987 self.fail("subprocess did not stop on {}".format(name))
1988
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03001989 @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
Brian Curtineb24d742010-04-12 17:16:38 +00001990 def test_CTRL_C_EVENT(self):
1991 from ctypes import wintypes
1992 import ctypes
1993
1994 # Make a NULL value by creating a pointer with no argument.
1995 NULL = ctypes.POINTER(ctypes.c_int)()
1996 SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
1997 SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
1998 wintypes.BOOL)
1999 SetConsoleCtrlHandler.restype = wintypes.BOOL
2000
2001 # Calling this with NULL and FALSE causes the calling process to
Serhiy Storchaka0424eaf2015-09-12 17:45:25 +03002002 # handle Ctrl+C, rather than ignore it. This property is inherited
Brian Curtineb24d742010-04-12 17:16:38 +00002003 # by subprocesses.
2004 SetConsoleCtrlHandler(NULL, 0)
2005
2006 self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
2007
2008 def test_CTRL_BREAK_EVENT(self):
2009 self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
2010
2011
Brian Curtind40e6f72010-07-08 21:39:08 +00002012@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Tim Golden781bbeb2013-10-25 20:24:06 +01002013class Win32ListdirTests(unittest.TestCase):
2014 """Test listdir on Windows."""
2015
2016 def setUp(self):
2017 self.created_paths = []
2018 for i in range(2):
2019 dir_name = 'SUB%d' % i
2020 dir_path = os.path.join(support.TESTFN, dir_name)
2021 file_name = 'FILE%d' % i
2022 file_path = os.path.join(support.TESTFN, file_name)
2023 os.makedirs(dir_path)
2024 with open(file_path, 'w') as f:
2025 f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
2026 self.created_paths.extend([dir_name, file_name])
2027 self.created_paths.sort()
2028
2029 def tearDown(self):
2030 shutil.rmtree(support.TESTFN)
2031
2032 def test_listdir_no_extended_path(self):
2033 """Test when the path is not an "extended" path."""
2034 # unicode
2035 self.assertEqual(
2036 sorted(os.listdir(support.TESTFN)),
2037 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01002038
Tim Golden781bbeb2013-10-25 20:24:06 +01002039 # bytes
Steve Dowercc16be82016-09-08 10:35:16 -07002040 self.assertEqual(
2041 sorted(os.listdir(os.fsencode(support.TESTFN))),
2042 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002043
2044 def test_listdir_extended_path(self):
2045 """Test when the path starts with '\\\\?\\'."""
Tim Golden1cc35402013-10-25 21:26:06 +01002046 # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
Tim Golden781bbeb2013-10-25 20:24:06 +01002047 # unicode
2048 path = '\\\\?\\' + os.path.abspath(support.TESTFN)
2049 self.assertEqual(
2050 sorted(os.listdir(path)),
2051 self.created_paths)
Victor Stinner923590e2016-03-24 09:11:48 +01002052
Tim Golden781bbeb2013-10-25 20:24:06 +01002053 # bytes
Steve Dowercc16be82016-09-08 10:35:16 -07002054 path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN))
2055 self.assertEqual(
2056 sorted(os.listdir(path)),
2057 [os.fsencode(path) for path in self.created_paths])
Tim Golden781bbeb2013-10-25 20:24:06 +01002058
2059
2060@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
Brian Curtin3b4499c2010-12-28 14:31:47 +00002061@support.skip_unless_symlink
Brian Curtind40e6f72010-07-08 21:39:08 +00002062class Win32SymlinkTests(unittest.TestCase):
2063 filelink = 'filelinktest'
2064 filelink_target = os.path.abspath(__file__)
2065 dirlink = 'dirlinktest'
2066 dirlink_target = os.path.dirname(filelink_target)
2067 missing_link = 'missing link'
2068
2069 def setUp(self):
2070 assert os.path.exists(self.dirlink_target)
2071 assert os.path.exists(self.filelink_target)
2072 assert not os.path.exists(self.dirlink)
2073 assert not os.path.exists(self.filelink)
2074 assert not os.path.exists(self.missing_link)
2075
2076 def tearDown(self):
2077 if os.path.exists(self.filelink):
2078 os.remove(self.filelink)
2079 if os.path.exists(self.dirlink):
2080 os.rmdir(self.dirlink)
2081 if os.path.lexists(self.missing_link):
2082 os.remove(self.missing_link)
2083
2084 def test_directory_link(self):
Jason R. Coombs3a092862013-05-27 23:21:28 -04002085 os.symlink(self.dirlink_target, self.dirlink)
Brian Curtind40e6f72010-07-08 21:39:08 +00002086 self.assertTrue(os.path.exists(self.dirlink))
2087 self.assertTrue(os.path.isdir(self.dirlink))
2088 self.assertTrue(os.path.islink(self.dirlink))
2089 self.check_stat(self.dirlink, self.dirlink_target)
2090
2091 def test_file_link(self):
2092 os.symlink(self.filelink_target, self.filelink)
2093 self.assertTrue(os.path.exists(self.filelink))
2094 self.assertTrue(os.path.isfile(self.filelink))
2095 self.assertTrue(os.path.islink(self.filelink))
2096 self.check_stat(self.filelink, self.filelink_target)
2097
2098 def _create_missing_dir_link(self):
2099 'Create a "directory" link to a non-existent target'
2100 linkname = self.missing_link
2101 if os.path.lexists(linkname):
2102 os.remove(linkname)
2103 target = r'c:\\target does not exist.29r3c740'
2104 assert not os.path.exists(target)
2105 target_is_dir = True
2106 os.symlink(target, linkname, target_is_dir)
2107
2108 def test_remove_directory_link_to_missing_target(self):
2109 self._create_missing_dir_link()
2110 # For compatibility with Unix, os.remove will check the
2111 # directory status and call RemoveDirectory if the symlink
2112 # was created with target_is_dir==True.
2113 os.remove(self.missing_link)
2114
2115 @unittest.skip("currently fails; consider for improvement")
2116 def test_isdir_on_directory_link_to_missing_target(self):
2117 self._create_missing_dir_link()
2118 # consider having isdir return true for directory links
2119 self.assertTrue(os.path.isdir(self.missing_link))
2120
2121 @unittest.skip("currently fails; consider for improvement")
2122 def test_rmdir_on_directory_link_to_missing_target(self):
2123 self._create_missing_dir_link()
2124 # consider allowing rmdir to remove directory links
2125 os.rmdir(self.missing_link)
2126
2127 def check_stat(self, link, target):
2128 self.assertEqual(os.stat(link), os.stat(target))
2129 self.assertNotEqual(os.lstat(link), os.stat(link))
2130
Brian Curtind25aef52011-06-13 15:16:04 -05002131 bytes_link = os.fsencode(link)
Steve Dowercc16be82016-09-08 10:35:16 -07002132 self.assertEqual(os.stat(bytes_link), os.stat(target))
2133 self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
Brian Curtind25aef52011-06-13 15:16:04 -05002134
2135 def test_12084(self):
2136 level1 = os.path.abspath(support.TESTFN)
2137 level2 = os.path.join(level1, "level2")
2138 level3 = os.path.join(level2, "level3")
Victor Stinnerae39d232016-03-24 17:12:55 +01002139 self.addCleanup(support.rmtree, level1)
2140
2141 os.mkdir(level1)
2142 os.mkdir(level2)
2143 os.mkdir(level3)
2144
2145 file1 = os.path.abspath(os.path.join(level1, "file1"))
2146 create_file(file1)
2147
2148 orig_dir = os.getcwd()
Brian Curtind25aef52011-06-13 15:16:04 -05002149 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01002150 os.chdir(level2)
2151 link = os.path.join(level2, "link")
2152 os.symlink(os.path.relpath(file1), "link")
2153 self.assertIn("link", os.listdir(os.getcwd()))
Brian Curtind25aef52011-06-13 15:16:04 -05002154
Victor Stinnerae39d232016-03-24 17:12:55 +01002155 # Check os.stat calls from the same dir as the link
2156 self.assertEqual(os.stat(file1), os.stat("link"))
Brian Curtind25aef52011-06-13 15:16:04 -05002157
Victor Stinnerae39d232016-03-24 17:12:55 +01002158 # Check os.stat calls from a dir below the link
2159 os.chdir(level1)
2160 self.assertEqual(os.stat(file1),
2161 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002162
Victor Stinnerae39d232016-03-24 17:12:55 +01002163 # Check os.stat calls from a dir above the link
2164 os.chdir(level3)
2165 self.assertEqual(os.stat(file1),
2166 os.stat(os.path.relpath(link)))
Brian Curtind25aef52011-06-13 15:16:04 -05002167 finally:
Victor Stinnerae39d232016-03-24 17:12:55 +01002168 os.chdir(orig_dir)
Brian Curtind25aef52011-06-13 15:16:04 -05002169
Brian Curtind40e6f72010-07-08 21:39:08 +00002170
Tim Golden0321cf22014-05-05 19:46:17 +01002171@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
2172class Win32JunctionTests(unittest.TestCase):
2173 junction = 'junctiontest'
2174 junction_target = os.path.dirname(os.path.abspath(__file__))
2175
2176 def setUp(self):
2177 assert os.path.exists(self.junction_target)
2178 assert not os.path.exists(self.junction)
2179
2180 def tearDown(self):
2181 if os.path.exists(self.junction):
2182 # os.rmdir delegates to Windows' RemoveDirectoryW,
2183 # which removes junction points safely.
2184 os.rmdir(self.junction)
2185
2186 def test_create_junction(self):
2187 _winapi.CreateJunction(self.junction_target, self.junction)
2188 self.assertTrue(os.path.exists(self.junction))
2189 self.assertTrue(os.path.isdir(self.junction))
2190
2191 # Junctions are not recognized as links.
2192 self.assertFalse(os.path.islink(self.junction))
2193
2194 def test_unlink_removes_junction(self):
2195 _winapi.CreateJunction(self.junction_target, self.junction)
2196 self.assertTrue(os.path.exists(self.junction))
2197
2198 os.unlink(self.junction)
2199 self.assertFalse(os.path.exists(self.junction))
2200
2201
Jason R. Coombs3a092862013-05-27 23:21:28 -04002202@support.skip_unless_symlink
2203class NonLocalSymlinkTests(unittest.TestCase):
2204
2205 def setUp(self):
R David Murray44b548d2016-09-08 13:59:53 -04002206 r"""
Jason R. Coombs3a092862013-05-27 23:21:28 -04002207 Create this structure:
2208
2209 base
2210 \___ some_dir
2211 """
2212 os.makedirs('base/some_dir')
2213
2214 def tearDown(self):
2215 shutil.rmtree('base')
2216
2217 def test_directory_link_nonlocal(self):
2218 """
2219 The symlink target should resolve relative to the link, not relative
2220 to the current directory.
2221
2222 Then, link base/some_link -> base/some_dir and ensure that some_link
2223 is resolved as a directory.
2224
2225 In issue13772, it was discovered that directory detection failed if
2226 the symlink target was not specified relative to the current
2227 directory, which was a defect in the implementation.
2228 """
2229 src = os.path.join('base', 'some_link')
2230 os.symlink('some_dir', src)
2231 assert os.path.isdir(src)
2232
2233
Victor Stinnere8d51452010-08-19 01:05:19 +00002234class FSEncodingTests(unittest.TestCase):
2235 def test_nop(self):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002236 self.assertEqual(os.fsencode(b'abc\xff'), b'abc\xff')
2237 self.assertEqual(os.fsdecode('abc\u0141'), 'abc\u0141')
Benjamin Peterson31191a92010-05-09 03:22:58 +00002238
Victor Stinnere8d51452010-08-19 01:05:19 +00002239 def test_identity(self):
2240 # assert fsdecode(fsencode(x)) == x
2241 for fn in ('unicode\u0141', 'latin\xe9', 'ascii'):
2242 try:
2243 bytesfn = os.fsencode(fn)
2244 except UnicodeEncodeError:
2245 continue
Ezio Melottib3aedd42010-11-20 19:04:17 +00002246 self.assertEqual(os.fsdecode(bytesfn), fn)
Victor Stinnere8d51452010-08-19 01:05:19 +00002247
Victor Stinnerbf9bcab2010-05-09 03:15:33 +00002248
Brett Cannonefb00c02012-02-29 18:31:31 -05002249
2250class DeviceEncodingTests(unittest.TestCase):
2251
2252 def test_bad_fd(self):
2253 # Return None when an fd doesn't actually exist.
2254 self.assertIsNone(os.device_encoding(123456))
2255
Philip Jenveye308b7c2012-02-29 16:16:15 -08002256 @unittest.skipUnless(os.isatty(0) and (sys.platform.startswith('win') or
2257 (hasattr(locale, 'nl_langinfo') and hasattr(locale, 'CODESET'))),
Philip Jenveyd7aff2d2012-02-29 16:21:25 -08002258 'test requires a tty and either Windows or nl_langinfo(CODESET)')
Brett Cannonefb00c02012-02-29 18:31:31 -05002259 def test_device_encoding(self):
2260 encoding = os.device_encoding(0)
2261 self.assertIsNotNone(encoding)
2262 self.assertTrue(codecs.lookup(encoding))
2263
2264
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002265class PidTests(unittest.TestCase):
2266 @unittest.skipUnless(hasattr(os, 'getppid'), "test needs os.getppid")
2267 def test_getppid(self):
2268 p = subprocess.Popen([sys.executable, '-c',
2269 'import os; print(os.getppid())'],
2270 stdout=subprocess.PIPE)
2271 stdout, _ = p.communicate()
2272 # We are the parent of our subprocess
2273 self.assertEqual(int(stdout), os.getpid())
2274
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002275 def test_waitpid(self):
2276 args = [sys.executable, '-c', 'pass']
Brett Cannonec6ce872016-09-06 15:50:29 -07002277 # Add an implicit test for PyUnicode_FSConverter().
2278 pid = os.spawnv(os.P_NOWAIT, _PathLike(args[0]), args)
Victor Stinnerd3ffd322015-09-15 10:11:03 +02002279 status = os.waitpid(pid, 0)
2280 self.assertEqual(status, (pid, 0))
2281
Amaury Forgeot d'Arc4b6fdf32010-09-07 21:31:17 +00002282
Victor Stinner4659ccf2016-09-14 10:57:00 +02002283class SpawnTests(unittest.TestCase):
Berker Peksag47e70622016-09-15 20:23:55 +03002284 def create_args(self, *, with_env=False, use_bytes=False):
Victor Stinner4659ccf2016-09-14 10:57:00 +02002285 self.exitcode = 17
2286
2287 filename = support.TESTFN
2288 self.addCleanup(support.unlink, filename)
2289
2290 if not with_env:
2291 code = 'import sys; sys.exit(%s)' % self.exitcode
2292 else:
2293 self.env = dict(os.environ)
2294 # create an unique key
2295 self.key = str(uuid.uuid4())
2296 self.env[self.key] = self.key
2297 # read the variable from os.environ to check that it exists
2298 code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)'
2299 % (self.key, self.exitcode))
2300
2301 with open(filename, "w") as fp:
2302 fp.write(code)
2303
Berker Peksag81816462016-09-15 20:19:47 +03002304 args = [sys.executable, filename]
2305 if use_bytes:
2306 args = [os.fsencode(a) for a in args]
2307 self.env = {os.fsencode(k): os.fsencode(v)
2308 for k, v in self.env.items()}
2309
2310 return args
Victor Stinner4659ccf2016-09-14 10:57:00 +02002311
Berker Peksag4af23d72016-09-15 20:32:44 +03002312 @requires_os_func('spawnl')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002313 def test_spawnl(self):
2314 args = self.create_args()
2315 exitcode = os.spawnl(os.P_WAIT, args[0], *args)
2316 self.assertEqual(exitcode, self.exitcode)
2317
Berker Peksag4af23d72016-09-15 20:32:44 +03002318 @requires_os_func('spawnle')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002319 def test_spawnle(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002320 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002321 exitcode = os.spawnle(os.P_WAIT, args[0], *args, self.env)
2322 self.assertEqual(exitcode, self.exitcode)
2323
Berker Peksag4af23d72016-09-15 20:32:44 +03002324 @requires_os_func('spawnlp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002325 def test_spawnlp(self):
2326 args = self.create_args()
2327 exitcode = os.spawnlp(os.P_WAIT, args[0], *args)
2328 self.assertEqual(exitcode, self.exitcode)
2329
Berker Peksag4af23d72016-09-15 20:32:44 +03002330 @requires_os_func('spawnlpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002331 def test_spawnlpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002332 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002333 exitcode = os.spawnlpe(os.P_WAIT, args[0], *args, self.env)
2334 self.assertEqual(exitcode, self.exitcode)
2335
Berker Peksag4af23d72016-09-15 20:32:44 +03002336 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002337 def test_spawnv(self):
2338 args = self.create_args()
2339 exitcode = os.spawnv(os.P_WAIT, args[0], args)
2340 self.assertEqual(exitcode, self.exitcode)
2341
Berker Peksag4af23d72016-09-15 20:32:44 +03002342 @requires_os_func('spawnve')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002343 def test_spawnve(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002344 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002345 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2346 self.assertEqual(exitcode, self.exitcode)
2347
Berker Peksag4af23d72016-09-15 20:32:44 +03002348 @requires_os_func('spawnvp')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002349 def test_spawnvp(self):
2350 args = self.create_args()
2351 exitcode = os.spawnvp(os.P_WAIT, args[0], args)
2352 self.assertEqual(exitcode, self.exitcode)
2353
Berker Peksag4af23d72016-09-15 20:32:44 +03002354 @requires_os_func('spawnvpe')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002355 def test_spawnvpe(self):
Berker Peksag47e70622016-09-15 20:23:55 +03002356 args = self.create_args(with_env=True)
Victor Stinner4659ccf2016-09-14 10:57:00 +02002357 exitcode = os.spawnvpe(os.P_WAIT, args[0], args, self.env)
2358 self.assertEqual(exitcode, self.exitcode)
2359
Berker Peksag4af23d72016-09-15 20:32:44 +03002360 @requires_os_func('spawnv')
Victor Stinner4659ccf2016-09-14 10:57:00 +02002361 def test_nowait(self):
2362 args = self.create_args()
2363 pid = os.spawnv(os.P_NOWAIT, args[0], args)
2364 result = os.waitpid(pid, 0)
2365 self.assertEqual(result[0], pid)
2366 status = result[1]
2367 if hasattr(os, 'WIFEXITED'):
2368 self.assertTrue(os.WIFEXITED(status))
2369 self.assertEqual(os.WEXITSTATUS(status), self.exitcode)
2370 else:
2371 self.assertEqual(status, self.exitcode << 8)
2372
Berker Peksag4af23d72016-09-15 20:32:44 +03002373 @requires_os_func('spawnve')
Berker Peksag81816462016-09-15 20:19:47 +03002374 def test_spawnve_bytes(self):
2375 # Test bytes handling in parse_arglist and parse_envlist (#28114)
2376 args = self.create_args(with_env=True, use_bytes=True)
2377 exitcode = os.spawnve(os.P_WAIT, args[0], args, self.env)
2378 self.assertEqual(exitcode, self.exitcode)
2379
Steve Dower859fd7b2016-11-19 18:53:19 -08002380 @requires_os_func('spawnl')
2381 def test_spawnl_noargs(self):
2382 args = self.create_args()
2383 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0])
Steve Dowerbce26262016-11-19 19:17:26 -08002384 self.assertRaises(ValueError, os.spawnl, os.P_NOWAIT, args[0], '')
Steve Dower859fd7b2016-11-19 18:53:19 -08002385
2386 @requires_os_func('spawnle')
Steve Dowerbce26262016-11-19 19:17:26 -08002387 def test_spawnle_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08002388 args = self.create_args()
2389 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], {})
Steve Dowerbce26262016-11-19 19:17:26 -08002390 self.assertRaises(ValueError, os.spawnle, os.P_NOWAIT, args[0], '', {})
Steve Dower859fd7b2016-11-19 18:53:19 -08002391
2392 @requires_os_func('spawnv')
2393 def test_spawnv_noargs(self):
2394 args = self.create_args()
2395 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ())
2396 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [])
Steve Dowerbce26262016-11-19 19:17:26 -08002397 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], ('',))
2398 self.assertRaises(ValueError, os.spawnv, os.P_NOWAIT, args[0], [''])
Steve Dower859fd7b2016-11-19 18:53:19 -08002399
2400 @requires_os_func('spawnve')
Steve Dowerbce26262016-11-19 19:17:26 -08002401 def test_spawnve_noargs(self):
Steve Dower859fd7b2016-11-19 18:53:19 -08002402 args = self.create_args()
2403 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], (), {})
2404 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [], {})
Steve Dowerbce26262016-11-19 19:17:26 -08002405 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], ('',), {})
2406 self.assertRaises(ValueError, os.spawnve, os.P_NOWAIT, args[0], [''], {})
Victor Stinner4659ccf2016-09-14 10:57:00 +02002407
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002408 def _test_invalid_env(self, spawn):
Serhiy Storchaka77703942017-06-25 07:33:01 +03002409 args = [sys.executable, '-c', 'pass']
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002410
Ville Skyttä49b27342017-08-03 09:00:59 +03002411 # null character in the environment variable name
Serhiy Storchaka77703942017-06-25 07:33:01 +03002412 newenv = os.environ.copy()
2413 newenv["FRUIT\0VEGETABLE"] = "cabbage"
2414 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002415 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002416 except ValueError:
2417 pass
2418 else:
2419 self.assertEqual(exitcode, 127)
2420
Ville Skyttä49b27342017-08-03 09:00:59 +03002421 # null character in the environment variable value
Serhiy Storchaka77703942017-06-25 07:33:01 +03002422 newenv = os.environ.copy()
2423 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
2424 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002425 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002426 except ValueError:
2427 pass
2428 else:
2429 self.assertEqual(exitcode, 127)
2430
Ville Skyttä49b27342017-08-03 09:00:59 +03002431 # equal character in the environment variable name
Serhiy Storchaka77703942017-06-25 07:33:01 +03002432 newenv = os.environ.copy()
2433 newenv["FRUIT=ORANGE"] = "lemon"
2434 try:
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002435 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002436 except ValueError:
2437 pass
2438 else:
2439 self.assertEqual(exitcode, 127)
2440
Ville Skyttä49b27342017-08-03 09:00:59 +03002441 # equal character in the environment variable value
Serhiy Storchaka77703942017-06-25 07:33:01 +03002442 filename = support.TESTFN
2443 self.addCleanup(support.unlink, filename)
2444 with open(filename, "w") as fp:
2445 fp.write('import sys, os\n'
2446 'if os.getenv("FRUIT") != "orange=lemon":\n'
2447 ' raise AssertionError')
2448 args = [sys.executable, filename]
2449 newenv = os.environ.copy()
2450 newenv["FRUIT"] = "orange=lemon"
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002451 exitcode = spawn(os.P_WAIT, args[0], args, newenv)
Serhiy Storchaka77703942017-06-25 07:33:01 +03002452 self.assertEqual(exitcode, 0)
2453
Serhiy Storchakaaf5392f2017-06-25 09:48:54 +03002454 @requires_os_func('spawnve')
2455 def test_spawnve_invalid_env(self):
2456 self._test_invalid_env(os.spawnve)
2457
2458 @requires_os_func('spawnvpe')
2459 def test_spawnvpe_invalid_env(self):
2460 self._test_invalid_env(os.spawnvpe)
2461
Serhiy Storchaka77703942017-06-25 07:33:01 +03002462
Brian Curtin0151b8e2010-09-24 13:43:43 +00002463# The introduction of this TestCase caused at least two different errors on
2464# *nix buildbots. Temporarily skip this to let the buildbots move along.
2465@unittest.skip("Skip due to platform/environment differences on *NIX buildbots")
Brian Curtine8e4b3b2010-09-23 20:04:14 +00002466@unittest.skipUnless(hasattr(os, 'getlogin'), "test needs os.getlogin")
2467class LoginTests(unittest.TestCase):
2468 def test_getlogin(self):
2469 user_name = os.getlogin()
2470 self.assertNotEqual(len(user_name), 0)
2471
2472
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002473@unittest.skipUnless(hasattr(os, 'getpriority') and hasattr(os, 'setpriority'),
2474 "needs os.getpriority and os.setpriority")
2475class ProgramPriorityTests(unittest.TestCase):
2476 """Tests for os.getpriority() and os.setpriority()."""
2477
2478 def test_set_get_priority(self):
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002479
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002480 base = os.getpriority(os.PRIO_PROCESS, os.getpid())
2481 os.setpriority(os.PRIO_PROCESS, os.getpid(), base + 1)
2482 try:
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002483 new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
2484 if base >= 19 and new_prio <= 19:
Victor Stinnerae39d232016-03-24 17:12:55 +01002485 raise unittest.SkipTest("unable to reliably test setpriority "
2486 "at current nice level of %s" % base)
Giampaolo Rodolàcfbcec32011-02-28 19:27:16 +00002487 else:
2488 self.assertEqual(new_prio, base + 1)
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002489 finally:
2490 try:
2491 os.setpriority(os.PRIO_PROCESS, os.getpid(), base)
2492 except OSError as err:
Antoine Pitrou692f0382011-02-26 00:22:25 +00002493 if err.errno != errno.EACCES:
Giampaolo Rodolà18e8bcb2011-02-25 20:57:54 +00002494 raise
2495
2496
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002497class SendfileTestServer(asyncore.dispatcher, threading.Thread):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002498
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002499 class Handler(asynchat.async_chat):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002500
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002501 def __init__(self, conn):
2502 asynchat.async_chat.__init__(self, conn)
2503 self.in_buffer = []
2504 self.closed = False
2505 self.push(b"220 ready\r\n")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002506
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002507 def handle_read(self):
2508 data = self.recv(4096)
2509 self.in_buffer.append(data)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002510
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002511 def get_data(self):
2512 return b''.join(self.in_buffer)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002513
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002514 def handle_close(self):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002515 self.close()
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002516 self.closed = True
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002517
2518 def handle_error(self):
2519 raise
2520
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002521 def __init__(self, address):
2522 threading.Thread.__init__(self)
2523 asyncore.dispatcher.__init__(self)
2524 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
2525 self.bind(address)
2526 self.listen(5)
2527 self.host, self.port = self.socket.getsockname()[:2]
2528 self.handler_instance = None
2529 self._active = False
2530 self._active_lock = threading.Lock()
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002531
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02002532 # --- public API
2533
2534 @property
2535 def running(self):
2536 return self._active
2537
2538 def start(self):
2539 assert not self.running
2540 self.__flag = threading.Event()
2541 threading.Thread.start(self)
2542 self.__flag.wait()
2543
2544 def stop(self):
2545 assert self.running
2546 self._active = False
2547 self.join()
2548
2549 def wait(self):
2550 # wait for handler connection to be closed, then stop the server
2551 while not getattr(self.handler_instance, "closed", False):
2552 time.sleep(0.001)
2553 self.stop()
2554
2555 # --- internals
2556
2557 def run(self):
2558 self._active = True
2559 self.__flag.set()
2560 while self._active and asyncore.socket_map:
2561 self._active_lock.acquire()
2562 asyncore.loop(timeout=0.001, count=1)
2563 self._active_lock.release()
2564 asyncore.close_all()
2565
2566 def handle_accept(self):
2567 conn, addr = self.accept()
2568 self.handler_instance = self.Handler(conn)
2569
2570 def handle_connect(self):
2571 self.close()
2572 handle_read = handle_connect
2573
2574 def writable(self):
2575 return 0
2576
2577 def handle_error(self):
2578 raise
2579
2580
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002581@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
2582class TestSendfile(unittest.TestCase):
2583
Victor Stinner8c663fd2017-11-08 14:44:44 -08002584 DATA = b"12345abcde" * 16 * 1024 # 160 KiB
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002585 SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Giampaolo Rodolà4bc68572011-02-25 21:46:01 +00002586 not sys.platform.startswith("solaris") and \
2587 not sys.platform.startswith("sunos")
Serhiy Storchaka43767632013-11-03 21:31:38 +02002588 requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS,
2589 'requires headers and trailers support')
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002590
2591 @classmethod
2592 def setUpClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002593 cls.key = support.threading_setup()
Victor Stinnerae39d232016-03-24 17:12:55 +01002594 create_file(support.TESTFN, cls.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002595
2596 @classmethod
2597 def tearDownClass(cls):
R David Murrayf2ad1732014-12-25 18:36:56 -05002598 support.threading_cleanup(*cls.key)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002599 support.unlink(support.TESTFN)
2600
2601 def setUp(self):
2602 self.server = SendfileTestServer((support.HOST, 0))
2603 self.server.start()
2604 self.client = socket.socket()
2605 self.client.connect((self.server.host, self.server.port))
2606 self.client.settimeout(1)
2607 # synchronize by waiting for "220 ready" response
2608 self.client.recv(1024)
2609 self.sockno = self.client.fileno()
2610 self.file = open(support.TESTFN, 'rb')
2611 self.fileno = self.file.fileno()
2612
2613 def tearDown(self):
2614 self.file.close()
2615 self.client.close()
2616 if self.server.running:
2617 self.server.stop()
Victor Stinnerd1cc0372017-07-12 16:05:43 +02002618 self.server = None
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002619
2620 def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
2621 """A higher level wrapper representing how an application is
2622 supposed to use sendfile().
2623 """
2624 while 1:
2625 try:
2626 if self.SUPPORT_HEADERS_TRAILERS:
2627 return os.sendfile(sock, file, offset, nbytes, headers,
2628 trailers)
2629 else:
2630 return os.sendfile(sock, file, offset, nbytes)
2631 except OSError as err:
2632 if err.errno == errno.ECONNRESET:
2633 # disconnected
2634 raise
2635 elif err.errno in (errno.EAGAIN, errno.EBUSY):
2636 # we have to retry send data
2637 continue
2638 else:
2639 raise
2640
2641 def test_send_whole_file(self):
2642 # normal send
2643 total_sent = 0
2644 offset = 0
2645 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002646 while total_sent < len(self.DATA):
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002647 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2648 if sent == 0:
2649 break
2650 offset += sent
2651 total_sent += sent
2652 self.assertTrue(sent <= nbytes)
2653 self.assertEqual(offset, total_sent)
2654
2655 self.assertEqual(total_sent, len(self.DATA))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002656 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002657 self.client.close()
2658 self.server.wait()
2659 data = self.server.handler_instance.get_data()
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002660 self.assertEqual(len(data), len(self.DATA))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002661 self.assertEqual(data, self.DATA)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002662
2663 def test_send_at_certain_offset(self):
2664 # start sending a file at a certain offset
2665 total_sent = 0
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002666 offset = len(self.DATA) // 2
2667 must_send = len(self.DATA) - offset
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002668 nbytes = 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002669 while total_sent < must_send:
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002670 sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
2671 if sent == 0:
2672 break
2673 offset += sent
2674 total_sent += sent
2675 self.assertTrue(sent <= nbytes)
2676
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002677 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002678 self.client.close()
2679 self.server.wait()
2680 data = self.server.handler_instance.get_data()
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002681 expected = self.DATA[len(self.DATA) // 2:]
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002682 self.assertEqual(total_sent, len(expected))
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002683 self.assertEqual(len(data), len(expected))
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002684 self.assertEqual(data, expected)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002685
2686 def test_offset_overflow(self):
2687 # specify an offset > file size
2688 offset = len(self.DATA) + 4096
Antoine Pitrou18dd0df2011-02-26 14:29:24 +00002689 try:
2690 sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
2691 except OSError as e:
2692 # Solaris can raise EINVAL if offset >= file length, ignore.
2693 if e.errno != errno.EINVAL:
2694 raise
2695 else:
2696 self.assertEqual(sent, 0)
Antoine Pitrou2de51ff2011-02-26 17:52:50 +00002697 self.client.shutdown(socket.SHUT_RDWR)
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002698 self.client.close()
2699 self.server.wait()
2700 data = self.server.handler_instance.get_data()
2701 self.assertEqual(data, b'')
2702
2703 def test_invalid_offset(self):
2704 with self.assertRaises(OSError) as cm:
2705 os.sendfile(self.sockno, self.fileno, -1, 4096)
2706 self.assertEqual(cm.exception.errno, errno.EINVAL)
2707
Martin Panterbf19d162015-09-09 01:01:13 +00002708 def test_keywords(self):
2709 # Keyword arguments should be supported
2710 os.sendfile(out=self.sockno, offset=0, count=4096,
2711 **{'in': self.fileno})
2712 if self.SUPPORT_HEADERS_TRAILERS:
2713 os.sendfile(self.sockno, self.fileno, offset=0, count=4096,
Martin Panter94994132015-09-09 05:29:24 +00002714 headers=(), trailers=(), flags=0)
Martin Panterbf19d162015-09-09 01:01:13 +00002715
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002716 # --- headers / trailers tests
2717
Serhiy Storchaka43767632013-11-03 21:31:38 +02002718 @requires_headers_trailers
2719 def test_headers(self):
2720 total_sent = 0
2721 sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
2722 headers=[b"x" * 512])
2723 total_sent += sent
2724 offset = 4096
2725 nbytes = 4096
2726 while 1:
2727 sent = self.sendfile_wrapper(self.sockno, self.fileno,
2728 offset, nbytes)
2729 if sent == 0:
2730 break
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002731 total_sent += sent
Serhiy Storchaka43767632013-11-03 21:31:38 +02002732 offset += sent
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002733
Serhiy Storchaka43767632013-11-03 21:31:38 +02002734 expected_data = b"x" * 512 + self.DATA
2735 self.assertEqual(total_sent, len(expected_data))
2736 self.client.close()
2737 self.server.wait()
2738 data = self.server.handler_instance.get_data()
2739 self.assertEqual(hash(data), hash(expected_data))
2740
2741 @requires_headers_trailers
2742 def test_trailers(self):
2743 TESTFN2 = support.TESTFN + "2"
2744 file_data = b"abcdef"
Victor Stinnerae39d232016-03-24 17:12:55 +01002745
2746 self.addCleanup(support.unlink, TESTFN2)
2747 create_file(TESTFN2, file_data)
2748
2749 with open(TESTFN2, 'rb') as f:
Serhiy Storchaka43767632013-11-03 21:31:38 +02002750 os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
2751 trailers=[b"1234"])
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002752 self.client.close()
2753 self.server.wait()
2754 data = self.server.handler_instance.get_data()
Serhiy Storchaka43767632013-11-03 21:31:38 +02002755 self.assertEqual(data, b"abcdef1234")
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002756
Serhiy Storchaka43767632013-11-03 21:31:38 +02002757 @requires_headers_trailers
2758 @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
2759 'test needs os.SF_NODISKIO')
2760 def test_flags(self):
2761 try:
2762 os.sendfile(self.sockno, self.fileno, 0, 4096,
2763 flags=os.SF_NODISKIO)
2764 except OSError as err:
2765 if err.errno not in (errno.EBUSY, errno.EAGAIN):
2766 raise
Giampaolo Rodolàc9c2c8b2011-02-25 14:39:16 +00002767
2768
Larry Hastings9cf065c2012-06-22 16:30:09 -07002769def supports_extended_attributes():
2770 if not hasattr(os, "setxattr"):
2771 return False
Victor Stinnerae39d232016-03-24 17:12:55 +01002772
Larry Hastings9cf065c2012-06-22 16:30:09 -07002773 try:
Victor Stinnerae39d232016-03-24 17:12:55 +01002774 with open(support.TESTFN, "xb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002775 try:
2776 os.setxattr(fp.fileno(), b"user.test", b"")
2777 except OSError:
2778 return False
2779 finally:
2780 support.unlink(support.TESTFN)
Victor Stinnerf95a19b2016-03-24 16:50:41 +01002781
2782 return True
Larry Hastings9cf065c2012-06-22 16:30:09 -07002783
2784
2785@unittest.skipUnless(supports_extended_attributes(),
2786 "no non-broken extended attribute support")
Victor Stinnerf95a19b2016-03-24 16:50:41 +01002787# Kernels < 2.6.39 don't respect setxattr flags.
2788@support.requires_linux_version(2, 6, 39)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002789class ExtendedAttributeTests(unittest.TestCase):
2790
Larry Hastings9cf065c2012-06-22 16:30:09 -07002791 def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
Benjamin Peterson799bd802011-08-31 22:15:17 -04002792 fn = support.TESTFN
Victor Stinnerae39d232016-03-24 17:12:55 +01002793 self.addCleanup(support.unlink, fn)
2794 create_file(fn)
2795
Benjamin Peterson799bd802011-08-31 22:15:17 -04002796 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002797 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002798 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01002799
Victor Stinnerf12e5062011-10-16 22:12:03 +02002800 init_xattr = listxattr(fn)
2801 self.assertIsInstance(init_xattr, list)
Victor Stinnerae39d232016-03-24 17:12:55 +01002802
Larry Hastings9cf065c2012-06-22 16:30:09 -07002803 setxattr(fn, s("user.test"), b"", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002804 xattr = set(init_xattr)
2805 xattr.add("user.test")
2806 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002807 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
2808 setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
2809 self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
Victor Stinnerae39d232016-03-24 17:12:55 +01002810
Benjamin Peterson799bd802011-08-31 22:15:17 -04002811 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002812 setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002813 self.assertEqual(cm.exception.errno, errno.EEXIST)
Victor Stinnerae39d232016-03-24 17:12:55 +01002814
Benjamin Peterson799bd802011-08-31 22:15:17 -04002815 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002816 setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002817 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01002818
Larry Hastings9cf065c2012-06-22 16:30:09 -07002819 setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002820 xattr.add("user.test2")
2821 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002822 removexattr(fn, s("user.test"), **kwargs)
Victor Stinnerae39d232016-03-24 17:12:55 +01002823
Benjamin Peterson799bd802011-08-31 22:15:17 -04002824 with self.assertRaises(OSError) as cm:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002825 getxattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002826 self.assertEqual(cm.exception.errno, errno.ENODATA)
Victor Stinnerae39d232016-03-24 17:12:55 +01002827
Victor Stinnerf12e5062011-10-16 22:12:03 +02002828 xattr.remove("user.test")
2829 self.assertEqual(set(listxattr(fn)), xattr)
Larry Hastings9cf065c2012-06-22 16:30:09 -07002830 self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
2831 setxattr(fn, s("user.test"), b"a"*1024, **kwargs)
2832 self.assertEqual(getxattr(fn, s("user.test"), **kwargs), b"a"*1024)
2833 removexattr(fn, s("user.test"), **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002834 many = sorted("user.test{}".format(i) for i in range(100))
2835 for thing in many:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002836 setxattr(fn, thing, b"x", **kwargs)
Victor Stinnerf12e5062011-10-16 22:12:03 +02002837 self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
Benjamin Peterson799bd802011-08-31 22:15:17 -04002838
Larry Hastings9cf065c2012-06-22 16:30:09 -07002839 def _check_xattrs(self, *args, **kwargs):
Larry Hastings9cf065c2012-06-22 16:30:09 -07002840 self._check_xattrs_str(str, *args, **kwargs)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002841 support.unlink(support.TESTFN)
Victor Stinnerae39d232016-03-24 17:12:55 +01002842
2843 self._check_xattrs_str(os.fsencode, *args, **kwargs)
2844 support.unlink(support.TESTFN)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002845
2846 def test_simple(self):
2847 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2848 os.listxattr)
2849
2850 def test_lpath(self):
Larry Hastings9cf065c2012-06-22 16:30:09 -07002851 self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
2852 os.listxattr, follow_symlinks=False)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002853
2854 def test_fds(self):
2855 def getxattr(path, *args):
2856 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002857 return os.getxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002858 def setxattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01002859 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002860 os.setxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002861 def removexattr(path, *args):
Victor Stinnerae39d232016-03-24 17:12:55 +01002862 with open(path, "wb", 0) as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002863 os.removexattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002864 def listxattr(path, *args):
2865 with open(path, "rb") as fp:
Larry Hastings9cf065c2012-06-22 16:30:09 -07002866 return os.listxattr(fp.fileno(), *args)
Benjamin Peterson799bd802011-08-31 22:15:17 -04002867 self._check_xattrs(getxattr, setxattr, removexattr, listxattr)
2868
2869
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002870@unittest.skipUnless(hasattr(os, 'get_terminal_size'), "requires os.get_terminal_size")
2871class TermsizeTests(unittest.TestCase):
2872 def test_does_not_crash(self):
2873 """Check if get_terminal_size() returns a meaningful value.
2874
2875 There's no easy portable way to actually check the size of the
2876 terminal, so let's check if it returns something sensible instead.
2877 """
2878 try:
2879 size = os.get_terminal_size()
2880 except OSError as e:
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01002881 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002882 # Under win32 a generic OSError can be thrown if the
2883 # handle cannot be retrieved
2884 self.skipTest("failed to query terminal size")
2885 raise
2886
Antoine Pitroucfade362012-02-08 23:48:59 +01002887 self.assertGreaterEqual(size.columns, 0)
2888 self.assertGreaterEqual(size.lines, 0)
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002889
2890 def test_stty_match(self):
2891 """Check if stty returns the same results
2892
2893 stty actually tests stdin, so get_terminal_size is invoked on
2894 stdin explicitly. If stty succeeded, then get_terminal_size()
2895 should work too.
2896 """
2897 try:
2898 size = subprocess.check_output(['stty', 'size']).decode().split()
xdegaye6a55d092017-11-12 17:57:04 +01002899 except (FileNotFoundError, subprocess.CalledProcessError,
2900 PermissionError):
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002901 self.skipTest("stty invocation failed")
2902 expected = (int(size[1]), int(size[0])) # reversed order
2903
Antoine Pitrou81a1fa52012-02-09 00:11:00 +01002904 try:
2905 actual = os.get_terminal_size(sys.__stdin__.fileno())
2906 except OSError as e:
2907 if sys.platform == "win32" or e.errno in (errno.EINVAL, errno.ENOTTY):
2908 # Under win32 a generic OSError can be thrown if the
2909 # handle cannot be retrieved
2910 self.skipTest("failed to query terminal size")
2911 raise
Antoine Pitroubcf2b592012-02-08 23:28:36 +01002912 self.assertEqual(expected, actual)
2913
2914
Victor Stinner292c8352012-10-30 02:17:38 +01002915class OSErrorTests(unittest.TestCase):
2916 def setUp(self):
2917 class Str(str):
2918 pass
2919
Victor Stinnerafe17062012-10-31 22:47:43 +01002920 self.bytes_filenames = []
2921 self.unicode_filenames = []
Victor Stinner292c8352012-10-30 02:17:38 +01002922 if support.TESTFN_UNENCODABLE is not None:
2923 decoded = support.TESTFN_UNENCODABLE
2924 else:
2925 decoded = support.TESTFN
Victor Stinnerafe17062012-10-31 22:47:43 +01002926 self.unicode_filenames.append(decoded)
2927 self.unicode_filenames.append(Str(decoded))
Victor Stinner292c8352012-10-30 02:17:38 +01002928 if support.TESTFN_UNDECODABLE is not None:
2929 encoded = support.TESTFN_UNDECODABLE
2930 else:
2931 encoded = os.fsencode(support.TESTFN)
Victor Stinnerafe17062012-10-31 22:47:43 +01002932 self.bytes_filenames.append(encoded)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03002933 self.bytes_filenames.append(bytearray(encoded))
Victor Stinnerafe17062012-10-31 22:47:43 +01002934 self.bytes_filenames.append(memoryview(encoded))
2935
2936 self.filenames = self.bytes_filenames + self.unicode_filenames
Victor Stinner292c8352012-10-30 02:17:38 +01002937
2938 def test_oserror_filename(self):
2939 funcs = [
Victor Stinnerafe17062012-10-31 22:47:43 +01002940 (self.filenames, os.chdir,),
2941 (self.filenames, os.chmod, 0o777),
Victor Stinnerafe17062012-10-31 22:47:43 +01002942 (self.filenames, os.lstat,),
2943 (self.filenames, os.open, os.O_RDONLY),
2944 (self.filenames, os.rmdir,),
2945 (self.filenames, os.stat,),
2946 (self.filenames, os.unlink,),
Victor Stinner292c8352012-10-30 02:17:38 +01002947 ]
2948 if sys.platform == "win32":
2949 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01002950 (self.bytes_filenames, os.rename, b"dst"),
2951 (self.bytes_filenames, os.replace, b"dst"),
2952 (self.unicode_filenames, os.rename, "dst"),
2953 (self.unicode_filenames, os.replace, "dst"),
Steve Dowercc16be82016-09-08 10:35:16 -07002954 (self.unicode_filenames, os.listdir, ),
Victor Stinner292c8352012-10-30 02:17:38 +01002955 ))
Victor Stinnerafe17062012-10-31 22:47:43 +01002956 else:
2957 funcs.extend((
Victor Stinner64e039a2012-11-07 00:10:14 +01002958 (self.filenames, os.listdir,),
Victor Stinnerafe17062012-10-31 22:47:43 +01002959 (self.filenames, os.rename, "dst"),
2960 (self.filenames, os.replace, "dst"),
2961 ))
2962 if hasattr(os, "chown"):
2963 funcs.append((self.filenames, os.chown, 0, 0))
2964 if hasattr(os, "lchown"):
2965 funcs.append((self.filenames, os.lchown, 0, 0))
2966 if hasattr(os, "truncate"):
2967 funcs.append((self.filenames, os.truncate, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01002968 if hasattr(os, "chflags"):
Victor Stinneree36c242012-11-13 09:31:51 +01002969 funcs.append((self.filenames, os.chflags, 0))
2970 if hasattr(os, "lchflags"):
2971 funcs.append((self.filenames, os.lchflags, 0))
Victor Stinner292c8352012-10-30 02:17:38 +01002972 if hasattr(os, "chroot"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002973 funcs.append((self.filenames, os.chroot,))
Victor Stinner292c8352012-10-30 02:17:38 +01002974 if hasattr(os, "link"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002975 if sys.platform == "win32":
2976 funcs.append((self.bytes_filenames, os.link, b"dst"))
2977 funcs.append((self.unicode_filenames, os.link, "dst"))
2978 else:
2979 funcs.append((self.filenames, os.link, "dst"))
Victor Stinner292c8352012-10-30 02:17:38 +01002980 if hasattr(os, "listxattr"):
2981 funcs.extend((
Victor Stinnerafe17062012-10-31 22:47:43 +01002982 (self.filenames, os.listxattr,),
2983 (self.filenames, os.getxattr, "user.test"),
2984 (self.filenames, os.setxattr, "user.test", b'user'),
2985 (self.filenames, os.removexattr, "user.test"),
Victor Stinner292c8352012-10-30 02:17:38 +01002986 ))
2987 if hasattr(os, "lchmod"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002988 funcs.append((self.filenames, os.lchmod, 0o777))
Victor Stinner292c8352012-10-30 02:17:38 +01002989 if hasattr(os, "readlink"):
Victor Stinnerafe17062012-10-31 22:47:43 +01002990 if sys.platform == "win32":
2991 funcs.append((self.unicode_filenames, os.readlink,))
2992 else:
2993 funcs.append((self.filenames, os.readlink,))
Victor Stinner292c8352012-10-30 02:17:38 +01002994
Steve Dowercc16be82016-09-08 10:35:16 -07002995
Victor Stinnerafe17062012-10-31 22:47:43 +01002996 for filenames, func, *func_args in funcs:
2997 for name in filenames:
Victor Stinner292c8352012-10-30 02:17:38 +01002998 try:
Steve Dowercc16be82016-09-08 10:35:16 -07002999 if isinstance(name, (str, bytes)):
Victor Stinner923590e2016-03-24 09:11:48 +01003000 func(name, *func_args)
Serhiy Storchakad73c3182016-08-06 23:22:08 +03003001 else:
3002 with self.assertWarnsRegex(DeprecationWarning, 'should be'):
3003 func(name, *func_args)
Victor Stinnerbd54f0e2012-10-31 01:12:55 +01003004 except OSError as err:
Steve Dowercc16be82016-09-08 10:35:16 -07003005 self.assertIs(err.filename, name, str(func))
Steve Dower78057b42016-11-06 19:35:08 -08003006 except UnicodeDecodeError:
3007 pass
Victor Stinner292c8352012-10-30 02:17:38 +01003008 else:
3009 self.fail("No exception thrown by {}".format(func))
3010
Charles-Francois Natali44feda32013-05-20 14:40:46 +02003011class CPUCountTests(unittest.TestCase):
3012 def test_cpu_count(self):
3013 cpus = os.cpu_count()
3014 if cpus is not None:
3015 self.assertIsInstance(cpus, int)
3016 self.assertGreater(cpus, 0)
3017 else:
3018 self.skipTest("Could not determine the number of CPUs")
3019
Victor Stinnerdaf45552013-08-28 00:53:59 +02003020
3021class FDInheritanceTests(unittest.TestCase):
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003022 def test_get_set_inheritable(self):
Victor Stinnerdaf45552013-08-28 00:53:59 +02003023 fd = os.open(__file__, os.O_RDONLY)
3024 self.addCleanup(os.close, fd)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003025 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinnerdaf45552013-08-28 00:53:59 +02003026
Victor Stinnerdaf45552013-08-28 00:53:59 +02003027 os.set_inheritable(fd, True)
3028 self.assertEqual(os.get_inheritable(fd), True)
3029
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003030 @unittest.skipIf(fcntl is None, "need fcntl")
3031 def test_get_inheritable_cloexec(self):
3032 fd = os.open(__file__, os.O_RDONLY)
3033 self.addCleanup(os.close, fd)
3034 self.assertEqual(os.get_inheritable(fd), False)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003035
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003036 # clear FD_CLOEXEC flag
3037 flags = fcntl.fcntl(fd, fcntl.F_GETFD)
3038 flags &= ~fcntl.FD_CLOEXEC
3039 fcntl.fcntl(fd, fcntl.F_SETFD, flags)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003040
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003041 self.assertEqual(os.get_inheritable(fd), True)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003042
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003043 @unittest.skipIf(fcntl is None, "need fcntl")
3044 def test_set_inheritable_cloexec(self):
3045 fd = os.open(__file__, os.O_RDONLY)
3046 self.addCleanup(os.close, fd)
3047 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3048 fcntl.FD_CLOEXEC)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003049
Victor Stinner4f7a36f2013-09-08 14:14:38 +02003050 os.set_inheritable(fd, True)
3051 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC,
3052 0)
Victor Stinner7ba6b0f2013-09-08 11:47:54 +02003053
Victor Stinnerdaf45552013-08-28 00:53:59 +02003054 def test_open(self):
3055 fd = os.open(__file__, os.O_RDONLY)
3056 self.addCleanup(os.close, fd)
3057 self.assertEqual(os.get_inheritable(fd), False)
3058
3059 @unittest.skipUnless(hasattr(os, 'pipe'), "need os.pipe()")
3060 def test_pipe(self):
3061 rfd, wfd = os.pipe()
3062 self.addCleanup(os.close, rfd)
3063 self.addCleanup(os.close, wfd)
3064 self.assertEqual(os.get_inheritable(rfd), False)
3065 self.assertEqual(os.get_inheritable(wfd), False)
3066
3067 def test_dup(self):
3068 fd1 = os.open(__file__, os.O_RDONLY)
3069 self.addCleanup(os.close, fd1)
3070
3071 fd2 = os.dup(fd1)
3072 self.addCleanup(os.close, fd2)
3073 self.assertEqual(os.get_inheritable(fd2), False)
3074
3075 @unittest.skipUnless(hasattr(os, 'dup2'), "need os.dup2()")
3076 def test_dup2(self):
3077 fd = os.open(__file__, os.O_RDONLY)
3078 self.addCleanup(os.close, fd)
3079
3080 # inheritable by default
3081 fd2 = os.open(__file__, os.O_RDONLY)
Benjamin Petersonbbdb17d2017-12-29 13:13:06 -08003082 self.addCleanup(os.close, fd2)
3083 self.assertEqual(os.dup2(fd, fd2), fd2)
3084 self.assertTrue(os.get_inheritable(fd2))
Victor Stinnerdaf45552013-08-28 00:53:59 +02003085
3086 # force non-inheritable
3087 fd3 = os.open(__file__, os.O_RDONLY)
Benjamin Petersonbbdb17d2017-12-29 13:13:06 -08003088 self.addCleanup(os.close, fd3)
3089 self.assertEqual(os.dup2(fd, fd3, inheritable=False), fd3)
3090 self.assertFalse(os.get_inheritable(fd3))
Victor Stinnerdaf45552013-08-28 00:53:59 +02003091
3092 @unittest.skipUnless(hasattr(os, 'openpty'), "need os.openpty()")
3093 def test_openpty(self):
3094 master_fd, slave_fd = os.openpty()
3095 self.addCleanup(os.close, master_fd)
3096 self.addCleanup(os.close, slave_fd)
3097 self.assertEqual(os.get_inheritable(master_fd), False)
3098 self.assertEqual(os.get_inheritable(slave_fd), False)
3099
3100
Brett Cannon3f9183b2016-08-26 14:44:48 -07003101class PathTConverterTests(unittest.TestCase):
3102 # tuples of (function name, allows fd arguments, additional arguments to
3103 # function, cleanup function)
3104 functions = [
3105 ('stat', True, (), None),
3106 ('lstat', False, (), None),
Benjamin Petersona9ab1652016-09-05 15:40:59 -07003107 ('access', False, (os.F_OK,), None),
Brett Cannon3f9183b2016-08-26 14:44:48 -07003108 ('chflags', False, (0,), None),
3109 ('lchflags', False, (0,), None),
3110 ('open', False, (0,), getattr(os, 'close', None)),
3111 ]
3112
3113 def test_path_t_converter(self):
Brett Cannon3f9183b2016-08-26 14:44:48 -07003114 str_filename = support.TESTFN
Brett Cannon3ce2fd42016-08-27 09:42:40 -07003115 if os.name == 'nt':
3116 bytes_fspath = bytes_filename = None
3117 else:
3118 bytes_filename = support.TESTFN.encode('ascii')
Brett Cannonec6ce872016-09-06 15:50:29 -07003119 bytes_fspath = _PathLike(bytes_filename)
3120 fd = os.open(_PathLike(str_filename), os.O_WRONLY|os.O_CREAT)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003121 self.addCleanup(support.unlink, support.TESTFN)
Berker Peksagd0f5bab2016-08-27 21:26:35 +03003122 self.addCleanup(os.close, fd)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003123
Brett Cannonec6ce872016-09-06 15:50:29 -07003124 int_fspath = _PathLike(fd)
3125 str_fspath = _PathLike(str_filename)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003126
3127 for name, allow_fd, extra_args, cleanup_fn in self.functions:
3128 with self.subTest(name=name):
3129 try:
3130 fn = getattr(os, name)
3131 except AttributeError:
3132 continue
3133
Brett Cannon8f96a302016-08-26 19:30:11 -07003134 for path in (str_filename, bytes_filename, str_fspath,
3135 bytes_fspath):
Brett Cannon3ce2fd42016-08-27 09:42:40 -07003136 if path is None:
3137 continue
Brett Cannon3f9183b2016-08-26 14:44:48 -07003138 with self.subTest(name=name, path=path):
3139 result = fn(path, *extra_args)
3140 if cleanup_fn is not None:
3141 cleanup_fn(result)
3142
3143 with self.assertRaisesRegex(
3144 TypeError, 'should be string, bytes'):
3145 fn(int_fspath, *extra_args)
Brett Cannon3f9183b2016-08-26 14:44:48 -07003146
3147 if allow_fd:
3148 result = fn(fd, *extra_args) # should not fail
3149 if cleanup_fn is not None:
3150 cleanup_fn(result)
3151 else:
3152 with self.assertRaisesRegex(
3153 TypeError,
3154 'os.PathLike'):
3155 fn(fd, *extra_args)
3156
3157
Victor Stinner1db9e7b2014-07-29 22:32:47 +02003158@unittest.skipUnless(hasattr(os, 'get_blocking'),
3159 'needs os.get_blocking() and os.set_blocking()')
3160class BlockingTests(unittest.TestCase):
3161 def test_blocking(self):
3162 fd = os.open(__file__, os.O_RDONLY)
3163 self.addCleanup(os.close, fd)
3164 self.assertEqual(os.get_blocking(fd), True)
3165
3166 os.set_blocking(fd, False)
3167 self.assertEqual(os.get_blocking(fd), False)
3168
3169 os.set_blocking(fd, True)
3170 self.assertEqual(os.get_blocking(fd), True)
3171
3172
Yury Selivanov97e2e062014-09-26 12:33:06 -04003173
3174class ExportsTests(unittest.TestCase):
3175 def test_os_all(self):
3176 self.assertIn('open', os.__all__)
3177 self.assertIn('walk', os.__all__)
3178
3179
Victor Stinner6036e442015-03-08 01:58:04 +01003180class TestScandir(unittest.TestCase):
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003181 check_no_resource_warning = support.check_no_resource_warning
3182
Victor Stinner6036e442015-03-08 01:58:04 +01003183 def setUp(self):
3184 self.path = os.path.realpath(support.TESTFN)
Brett Cannon96881cd2016-06-10 14:37:21 -07003185 self.bytes_path = os.fsencode(self.path)
Victor Stinner6036e442015-03-08 01:58:04 +01003186 self.addCleanup(support.rmtree, self.path)
3187 os.mkdir(self.path)
3188
3189 def create_file(self, name="file.txt"):
Brett Cannon96881cd2016-06-10 14:37:21 -07003190 path = self.bytes_path if isinstance(name, bytes) else self.path
3191 filename = os.path.join(path, name)
Victor Stinnerae39d232016-03-24 17:12:55 +01003192 create_file(filename, b'python')
Victor Stinner6036e442015-03-08 01:58:04 +01003193 return filename
3194
3195 def get_entries(self, names):
3196 entries = dict((entry.name, entry)
3197 for entry in os.scandir(self.path))
3198 self.assertEqual(sorted(entries.keys()), names)
3199 return entries
3200
3201 def assert_stat_equal(self, stat1, stat2, skip_fields):
3202 if skip_fields:
3203 for attr in dir(stat1):
3204 if not attr.startswith("st_"):
3205 continue
3206 if attr in ("st_dev", "st_ino", "st_nlink"):
3207 continue
3208 self.assertEqual(getattr(stat1, attr),
3209 getattr(stat2, attr),
3210 (stat1, stat2, attr))
3211 else:
3212 self.assertEqual(stat1, stat2)
3213
3214 def check_entry(self, entry, name, is_dir, is_file, is_symlink):
Brett Cannona32c4d02016-06-24 14:14:44 -07003215 self.assertIsInstance(entry, os.DirEntry)
Victor Stinner6036e442015-03-08 01:58:04 +01003216 self.assertEqual(entry.name, name)
3217 self.assertEqual(entry.path, os.path.join(self.path, name))
3218 self.assertEqual(entry.inode(),
3219 os.stat(entry.path, follow_symlinks=False).st_ino)
3220
3221 entry_stat = os.stat(entry.path)
3222 self.assertEqual(entry.is_dir(),
3223 stat.S_ISDIR(entry_stat.st_mode))
3224 self.assertEqual(entry.is_file(),
3225 stat.S_ISREG(entry_stat.st_mode))
3226 self.assertEqual(entry.is_symlink(),
3227 os.path.islink(entry.path))
3228
3229 entry_lstat = os.stat(entry.path, follow_symlinks=False)
3230 self.assertEqual(entry.is_dir(follow_symlinks=False),
3231 stat.S_ISDIR(entry_lstat.st_mode))
3232 self.assertEqual(entry.is_file(follow_symlinks=False),
3233 stat.S_ISREG(entry_lstat.st_mode))
3234
3235 self.assert_stat_equal(entry.stat(),
3236 entry_stat,
3237 os.name == 'nt' and not is_symlink)
3238 self.assert_stat_equal(entry.stat(follow_symlinks=False),
3239 entry_lstat,
3240 os.name == 'nt')
3241
3242 def test_attributes(self):
3243 link = hasattr(os, 'link')
3244 symlink = support.can_symlink()
3245
3246 dirname = os.path.join(self.path, "dir")
3247 os.mkdir(dirname)
3248 filename = self.create_file("file.txt")
3249 if link:
xdegaye6a55d092017-11-12 17:57:04 +01003250 try:
3251 os.link(filename, os.path.join(self.path, "link_file.txt"))
3252 except PermissionError as e:
3253 self.skipTest('os.link(): %s' % e)
Victor Stinner6036e442015-03-08 01:58:04 +01003254 if symlink:
3255 os.symlink(dirname, os.path.join(self.path, "symlink_dir"),
3256 target_is_directory=True)
3257 os.symlink(filename, os.path.join(self.path, "symlink_file.txt"))
3258
3259 names = ['dir', 'file.txt']
3260 if link:
3261 names.append('link_file.txt')
3262 if symlink:
3263 names.extend(('symlink_dir', 'symlink_file.txt'))
3264 entries = self.get_entries(names)
3265
3266 entry = entries['dir']
3267 self.check_entry(entry, 'dir', True, False, False)
3268
3269 entry = entries['file.txt']
3270 self.check_entry(entry, 'file.txt', False, True, False)
3271
3272 if link:
3273 entry = entries['link_file.txt']
3274 self.check_entry(entry, 'link_file.txt', False, True, False)
3275
3276 if symlink:
3277 entry = entries['symlink_dir']
3278 self.check_entry(entry, 'symlink_dir', True, False, True)
3279
3280 entry = entries['symlink_file.txt']
3281 self.check_entry(entry, 'symlink_file.txt', False, True, True)
3282
3283 def get_entry(self, name):
Brett Cannon96881cd2016-06-10 14:37:21 -07003284 path = self.bytes_path if isinstance(name, bytes) else self.path
3285 entries = list(os.scandir(path))
Victor Stinner6036e442015-03-08 01:58:04 +01003286 self.assertEqual(len(entries), 1)
3287
3288 entry = entries[0]
3289 self.assertEqual(entry.name, name)
3290 return entry
3291
Brett Cannon96881cd2016-06-10 14:37:21 -07003292 def create_file_entry(self, name='file.txt'):
3293 filename = self.create_file(name=name)
Victor Stinner6036e442015-03-08 01:58:04 +01003294 return self.get_entry(os.path.basename(filename))
3295
3296 def test_current_directory(self):
3297 filename = self.create_file()
3298 old_dir = os.getcwd()
3299 try:
3300 os.chdir(self.path)
3301
3302 # call scandir() without parameter: it must list the content
3303 # of the current directory
3304 entries = dict((entry.name, entry) for entry in os.scandir())
3305 self.assertEqual(sorted(entries.keys()),
3306 [os.path.basename(filename)])
3307 finally:
3308 os.chdir(old_dir)
3309
3310 def test_repr(self):
3311 entry = self.create_file_entry()
3312 self.assertEqual(repr(entry), "<DirEntry 'file.txt'>")
3313
Brett Cannon96881cd2016-06-10 14:37:21 -07003314 def test_fspath_protocol(self):
3315 entry = self.create_file_entry()
3316 self.assertEqual(os.fspath(entry), os.path.join(self.path, 'file.txt'))
3317
3318 def test_fspath_protocol_bytes(self):
3319 bytes_filename = os.fsencode('bytesfile.txt')
3320 bytes_entry = self.create_file_entry(name=bytes_filename)
3321 fspath = os.fspath(bytes_entry)
3322 self.assertIsInstance(fspath, bytes)
3323 self.assertEqual(fspath,
3324 os.path.join(os.fsencode(self.path),bytes_filename))
3325
Victor Stinner6036e442015-03-08 01:58:04 +01003326 def test_removed_dir(self):
3327 path = os.path.join(self.path, 'dir')
3328
3329 os.mkdir(path)
3330 entry = self.get_entry('dir')
3331 os.rmdir(path)
3332
3333 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3334 if os.name == 'nt':
3335 self.assertTrue(entry.is_dir())
3336 self.assertFalse(entry.is_file())
3337 self.assertFalse(entry.is_symlink())
3338 if os.name == 'nt':
3339 self.assertRaises(FileNotFoundError, entry.inode)
3340 # don't fail
3341 entry.stat()
3342 entry.stat(follow_symlinks=False)
3343 else:
3344 self.assertGreater(entry.inode(), 0)
3345 self.assertRaises(FileNotFoundError, entry.stat)
3346 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3347
3348 def test_removed_file(self):
3349 entry = self.create_file_entry()
3350 os.unlink(entry.path)
3351
3352 self.assertFalse(entry.is_dir())
3353 # On POSIX, is_dir() result depends if scandir() filled d_type or not
3354 if os.name == 'nt':
3355 self.assertTrue(entry.is_file())
3356 self.assertFalse(entry.is_symlink())
3357 if os.name == 'nt':
3358 self.assertRaises(FileNotFoundError, entry.inode)
3359 # don't fail
3360 entry.stat()
3361 entry.stat(follow_symlinks=False)
3362 else:
3363 self.assertGreater(entry.inode(), 0)
3364 self.assertRaises(FileNotFoundError, entry.stat)
3365 self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False)
3366
3367 def test_broken_symlink(self):
3368 if not support.can_symlink():
3369 return self.skipTest('cannot create symbolic link')
3370
3371 filename = self.create_file("file.txt")
3372 os.symlink(filename,
3373 os.path.join(self.path, "symlink.txt"))
3374 entries = self.get_entries(['file.txt', 'symlink.txt'])
3375 entry = entries['symlink.txt']
3376 os.unlink(filename)
3377
3378 self.assertGreater(entry.inode(), 0)
3379 self.assertFalse(entry.is_dir())
3380 self.assertFalse(entry.is_file()) # broken symlink returns False
3381 self.assertFalse(entry.is_dir(follow_symlinks=False))
3382 self.assertFalse(entry.is_file(follow_symlinks=False))
3383 self.assertTrue(entry.is_symlink())
3384 self.assertRaises(FileNotFoundError, entry.stat)
3385 # don't fail
3386 entry.stat(follow_symlinks=False)
3387
3388 def test_bytes(self):
Victor Stinner6036e442015-03-08 01:58:04 +01003389 self.create_file("file.txt")
3390
3391 path_bytes = os.fsencode(self.path)
3392 entries = list(os.scandir(path_bytes))
3393 self.assertEqual(len(entries), 1, entries)
3394 entry = entries[0]
3395
3396 self.assertEqual(entry.name, b'file.txt')
3397 self.assertEqual(entry.path,
3398 os.fsencode(os.path.join(self.path, 'file.txt')))
3399
Serhiy Storchaka1180e5a2017-07-11 06:36:46 +03003400 def test_bytes_like(self):
3401 self.create_file("file.txt")
3402
3403 for cls in bytearray, memoryview:
3404 path_bytes = cls(os.fsencode(self.path))
3405 with self.assertWarns(DeprecationWarning):
3406 entries = list(os.scandir(path_bytes))
3407 self.assertEqual(len(entries), 1, entries)
3408 entry = entries[0]
3409
3410 self.assertEqual(entry.name, b'file.txt')
3411 self.assertEqual(entry.path,
3412 os.fsencode(os.path.join(self.path, 'file.txt')))
3413 self.assertIs(type(entry.name), bytes)
3414 self.assertIs(type(entry.path), bytes)
3415
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03003416 @unittest.skipUnless(os.listdir in os.supports_fd,
3417 'fd support for listdir required for this test.')
3418 def test_fd(self):
3419 self.assertIn(os.scandir, os.supports_fd)
3420 self.create_file('file.txt')
3421 expected_names = ['file.txt']
3422 if support.can_symlink():
3423 os.symlink('file.txt', os.path.join(self.path, 'link'))
3424 expected_names.append('link')
3425
3426 fd = os.open(self.path, os.O_RDONLY)
3427 try:
3428 with os.scandir(fd) as it:
3429 entries = list(it)
3430 names = [entry.name for entry in entries]
3431 self.assertEqual(sorted(names), expected_names)
3432 self.assertEqual(names, os.listdir(fd))
3433 for entry in entries:
3434 self.assertEqual(entry.path, entry.name)
3435 self.assertEqual(os.fspath(entry), entry.name)
3436 self.assertEqual(entry.is_symlink(), entry.name == 'link')
3437 if os.stat in os.supports_dir_fd:
3438 st = os.stat(entry.name, dir_fd=fd)
3439 self.assertEqual(entry.stat(), st)
3440 st = os.stat(entry.name, dir_fd=fd, follow_symlinks=False)
3441 self.assertEqual(entry.stat(follow_symlinks=False), st)
3442 finally:
3443 os.close(fd)
3444
Victor Stinner6036e442015-03-08 01:58:04 +01003445 def test_empty_path(self):
3446 self.assertRaises(FileNotFoundError, os.scandir, '')
3447
3448 def test_consume_iterator_twice(self):
3449 self.create_file("file.txt")
3450 iterator = os.scandir(self.path)
3451
3452 entries = list(iterator)
3453 self.assertEqual(len(entries), 1, entries)
3454
3455 # check than consuming the iterator twice doesn't raise exception
3456 entries2 = list(iterator)
3457 self.assertEqual(len(entries2), 0, entries2)
3458
3459 def test_bad_path_type(self):
Serhiy Storchakaea720fe2017-03-30 09:12:31 +03003460 for obj in [1.234, {}, []]:
Victor Stinner6036e442015-03-08 01:58:04 +01003461 self.assertRaises(TypeError, os.scandir, obj)
3462
Serhiy Storchakaffe96ae2016-02-11 13:21:30 +02003463 def test_close(self):
3464 self.create_file("file.txt")
3465 self.create_file("file2.txt")
3466 iterator = os.scandir(self.path)
3467 next(iterator)
3468 iterator.close()
3469 # multiple closes
3470 iterator.close()
3471 with self.check_no_resource_warning():
3472 del iterator
3473
3474 def test_context_manager(self):
3475 self.create_file("file.txt")
3476 self.create_file("file2.txt")
3477 with os.scandir(self.path) as iterator:
3478 next(iterator)
3479 with self.check_no_resource_warning():
3480 del iterator
3481
3482 def test_context_manager_close(self):
3483 self.create_file("file.txt")
3484 self.create_file("file2.txt")
3485 with os.scandir(self.path) as iterator:
3486 next(iterator)
3487 iterator.close()
3488
3489 def test_context_manager_exception(self):
3490 self.create_file("file.txt")
3491 self.create_file("file2.txt")
3492 with self.assertRaises(ZeroDivisionError):
3493 with os.scandir(self.path) as iterator:
3494 next(iterator)
3495 1/0
3496 with self.check_no_resource_warning():
3497 del iterator
3498
3499 def test_resource_warning(self):
3500 self.create_file("file.txt")
3501 self.create_file("file2.txt")
3502 iterator = os.scandir(self.path)
3503 next(iterator)
3504 with self.assertWarns(ResourceWarning):
3505 del iterator
3506 support.gc_collect()
3507 # exhausted iterator
3508 iterator = os.scandir(self.path)
3509 list(iterator)
3510 with self.check_no_resource_warning():
3511 del iterator
3512
Victor Stinner6036e442015-03-08 01:58:04 +01003513
Ethan Furmancdc08792016-06-02 15:06:09 -07003514class TestPEP519(unittest.TestCase):
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003515
3516 # Abstracted so it can be overridden to test pure Python implementation
3517 # if a C version is provided.
3518 fspath = staticmethod(os.fspath)
3519
Ethan Furmancdc08792016-06-02 15:06:09 -07003520 def test_return_bytes(self):
3521 for b in b'hello', b'goodbye', b'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003522 self.assertEqual(b, self.fspath(b))
Ethan Furmancdc08792016-06-02 15:06:09 -07003523
3524 def test_return_string(self):
3525 for s in 'hello', 'goodbye', 'some/path/and/file':
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003526 self.assertEqual(s, self.fspath(s))
Ethan Furmancdc08792016-06-02 15:06:09 -07003527
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003528 def test_fsencode_fsdecode(self):
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003529 for p in "path/like/object", b"path/like/object":
Brett Cannonec6ce872016-09-06 15:50:29 -07003530 pathlike = _PathLike(p)
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003531
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003532 self.assertEqual(p, self.fspath(pathlike))
Ethan Furmanc1cbeed2016-06-04 10:19:27 -07003533 self.assertEqual(b"path/like/object", os.fsencode(pathlike))
3534 self.assertEqual("path/like/object", os.fsdecode(pathlike))
3535
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003536 def test_pathlike(self):
Brett Cannonec6ce872016-09-06 15:50:29 -07003537 self.assertEqual('#feelthegil', self.fspath(_PathLike('#feelthegil')))
3538 self.assertTrue(issubclass(_PathLike, os.PathLike))
3539 self.assertTrue(isinstance(_PathLike(), os.PathLike))
Ethan Furman410ef8e2016-06-04 12:06:26 -07003540
Ethan Furmancdc08792016-06-02 15:06:09 -07003541 def test_garbage_in_exception_out(self):
3542 vapor = type('blah', (), {})
3543 for o in int, type, os, vapor():
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003544 self.assertRaises(TypeError, self.fspath, o)
Ethan Furmancdc08792016-06-02 15:06:09 -07003545
3546 def test_argument_required(self):
Brett Cannon044283a2016-07-15 10:41:49 -07003547 self.assertRaises(TypeError, self.fspath)
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003548
Brett Cannon044283a2016-07-15 10:41:49 -07003549 def test_bad_pathlike(self):
3550 # __fspath__ returns a value other than str or bytes.
Brett Cannonec6ce872016-09-06 15:50:29 -07003551 self.assertRaises(TypeError, self.fspath, _PathLike(42))
Brett Cannon044283a2016-07-15 10:41:49 -07003552 # __fspath__ attribute that is not callable.
3553 c = type('foo', (), {})
3554 c.__fspath__ = 1
3555 self.assertRaises(TypeError, self.fspath, c())
3556 # __fspath__ raises an exception.
Brett Cannon044283a2016-07-15 10:41:49 -07003557 self.assertRaises(ZeroDivisionError, self.fspath,
Brett Cannonec6ce872016-09-06 15:50:29 -07003558 _PathLike(ZeroDivisionError()))
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003559
Victor Stinnerc29b5852017-11-02 07:28:27 -07003560
3561class TimesTests(unittest.TestCase):
3562 def test_times(self):
3563 times = os.times()
3564 self.assertIsInstance(times, os.times_result)
3565
3566 for field in ('user', 'system', 'children_user', 'children_system',
3567 'elapsed'):
3568 value = getattr(times, field)
3569 self.assertIsInstance(value, float)
3570
3571 if os.name == 'nt':
3572 self.assertEqual(times.children_user, 0)
3573 self.assertEqual(times.children_system, 0)
3574 self.assertEqual(times.elapsed, 0)
3575
3576
Brett Cannonc78ca1e2016-06-24 12:03:43 -07003577# Only test if the C version is provided, otherwise TestPEP519 already tested
3578# the pure Python implementation.
3579if hasattr(os, "_fspath"):
3580 class TestPEP519PurePython(TestPEP519):
3581
3582 """Explicitly test the pure Python implementation of os.fspath()."""
3583
3584 fspath = staticmethod(os._fspath)
Ethan Furmancdc08792016-06-02 15:06:09 -07003585
3586
Fred Drake2e2be372001-09-20 21:33:42 +00003587if __name__ == "__main__":
R David Murrayf2ad1732014-12-25 18:36:56 -05003588 unittest.main()